Coverage for pure3270/protocol/tn3270e_header.py: 85%

48 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-11 20:54 +0000

1"""TN3270E message header structure for pure3270.""" 

2 

3import struct 

4import logging 

5from typing import Optional 

6from .utils import ( 

7 TN3270_DATA, 

8 SCS_DATA, 

9 RESPONSE, 

10 BIND_IMAGE, 

11 UNBIND, 

12 NVT_DATA, 

13 REQUEST, 

14 SSCP_LU_DATA, 

15 PRINT_EOJ, 

16 TN3270E_RSF_NO_RESPONSE, 

17 TN3270E_RSF_ERROR_RESPONSE, 

18 TN3270E_RSF_ALWAYS_RESPONSE, 

19 TN3270E_RSF_POSITIVE_RESPONSE, 

20 TN3270E_RSF_NEGATIVE_RESPONSE, 

21) 

22 

23logger = logging.getLogger(__name__) 

24 

25 

26class TN3270EHeader: 

27 """ 

28 TN3270E Message Header Structure (RFC 2355 Section 3). 

29 

30 The TN3270E header is a 5-byte structure with the following fields: 

31 

32 Byte 0: DATA-TYPE 

33 Byte 1: REQUEST-FLAG 

34 Byte 2: RESPONSE-FLAG 

35 Byte 3-4: SEQ-NUMBER (16-bit big-endian) 

36 """ 

37 

38 def __init__( 

39 self, 

40 data_type: int = TN3270_DATA, 

41 request_flag: int = 0, 

42 response_flag: int = TN3270E_RSF_NO_RESPONSE, 

43 seq_number: int = 0, 

44 ): 

45 """ 

46 Initialize a TN3270E header. 

47 

48 Args: 

49 data_type: Type of data (TN3270_DATA, SCS_DATA, etc.) 

50 request_flag: Request flags 

51 response_flag: Response flags 

52 seq_number: Sequence number (0-65535) 

53 """ 

54 self.data_type = data_type 

55 self.request_flag = request_flag 

56 self.response_flag = response_flag 

57 self.seq_number = seq_number 

58 

59 @classmethod 

60 def from_bytes(cls, data: bytes) -> Optional["TN3270EHeader"]: 

61 """ 

62 Parse a TN3270E header from bytes. 

63 

64 Args: 

65 data: 5 bytes containing the TN3270E header 

66 

67 Returns: 

68 TN3270EHeader instance or None if parsing fails 

69 """ 

70 if len(data) < 5: 

71 return None 

72 

73 try: 

74 # Unpack the 5-byte header 

75 # B = unsigned char (1 byte) 

76 # H = unsigned short (2 bytes, big-endian) 

77 data_type, request_flag, response_flag, seq_number = struct.unpack( 

78 "!BBBH", data[:5] 

79 ) 

80 return cls(data_type, request_flag, response_flag, seq_number) 

81 except struct.error: 

82 return None 

83 

84 def to_bytes(self) -> bytes: 

85 """ 

86 Convert the header to bytes. 

87 

88 Returns: 

89 5 bytes representing the TN3270E header 

90 """ 

91 return struct.pack( 

92 "!BBBH", 

93 self.data_type, 

94 self.request_flag, 

95 self.response_flag, 

96 self.seq_number, 

97 ) 

98 

99 def __repr__(self) -> str: 

100 """String representation of the header.""" 

101 type_names = { 

102 TN3270_DATA: "TN3270_DATA", 

103 SCS_DATA: "SCS_DATA", 

104 RESPONSE: "RESPONSE", 

105 BIND_IMAGE: "BIND_IMAGE", 

106 UNBIND: "UNBIND", 

107 NVT_DATA: "NVT_DATA", 

108 REQUEST: "REQUEST", 

109 SSCP_LU_DATA: "SSCP_LU_DATA", 

110 PRINT_EOJ: "PRINT_EOJ", 

111 } 

112 

113 response_names = { 

114 TN3270E_RSF_NO_RESPONSE: "NO_RESPONSE", 

115 TN3270E_RSF_ERROR_RESPONSE: "ERROR_RESPONSE", 

116 TN3270E_RSF_ALWAYS_RESPONSE: "ALWAYS_RESPONSE", 

117 TN3270E_RSF_POSITIVE_RESPONSE: "POSITIVE_RESPONSE", 

118 TN3270E_RSF_NEGATIVE_RESPONSE: "NEGATIVE_RESPONSE", 

119 } 

120 

121 type_name = type_names.get(self.data_type, f"0x{self.data_type:02x}") 

122 response_name = response_names.get( 

123 self.response_flag, f"0x{self.response_flag:02x}" 

124 ) 

125 

126 return ( 

127 f"TN3270EHeader(data_type={type_name}, " 

128 f"request_flag=0x{self.request_flag:02x}, " 

129 f"response_flag={response_name}, " 

130 f"seq_number={self.seq_number})" 

131 ) 

132 

133 def is_tn3270_data(self) -> bool: 

134 """Check if this is TN3270_DATA type.""" 

135 return self.data_type == TN3270_DATA 

136 

137 def is_scs_data(self) -> bool: 

138 """Check if this is SCS_DATA type.""" 

139 return self.data_type == SCS_DATA 

140 

141 def is_response(self) -> bool: 

142 """Check if this is RESPONSE type.""" 

143 return self.data_type == RESPONSE 

144 

145 def is_error_response(self) -> bool: 

146 """Check if this is an error response.""" 

147 return self.response_flag == TN3270E_RSF_ERROR_RESPONSE 

148 

149 def is_negative_response(self) -> bool: 

150 """Check if this is a negative response.""" 

151 return self.response_flag == TN3270E_RSF_NEGATIVE_RESPONSE 

152 

153 def is_positive_response(self) -> bool: 

154 """Check if this is a positive response.""" 

155 return self.response_flag == TN3270E_RSF_POSITIVE_RESPONSE 

156 

157 def is_always_response(self) -> bool: 

158 """Check if this is an always response.""" 

159 return self.response_flag == TN3270E_RSF_ALWAYS_RESPONSE 

160 

161 def get_data_type_name(self) -> str: 

162 """Get the name of the data type.""" 

163 type_names = { 

164 TN3270_DATA: "TN3270_DATA", 

165 SCS_DATA: "SCS_DATA", 

166 RESPONSE: "RESPONSE", 

167 BIND_IMAGE: "BIND_IMAGE", 

168 UNBIND: "UNBIND", 

169 NVT_DATA: "NVT_DATA", 

170 REQUEST: "REQUEST", 

171 SSCP_LU_DATA: "SSCP_LU_DATA", 

172 PRINT_EOJ: "PRINT_EOJ", 

173 } 

174 return type_names.get(self.data_type, f"UNKNOWN(0x{self.data_type:02x})") 

175 

176 def get_response_flag_name(self) -> str: 

177 """Get the name of the response flag.""" 

178 response_names = { 

179 TN3270E_RSF_NO_RESPONSE: "NO_RESPONSE", 

180 TN3270E_RSF_ERROR_RESPONSE: "ERROR_RESPONSE", 

181 TN3270E_RSF_ALWAYS_RESPONSE: "ALWAYS_RESPONSE", 

182 TN3270E_RSF_POSITIVE_RESPONSE: "POSITIVE_RESPONSE", 

183 TN3270E_RSF_NEGATIVE_RESPONSE: "NEGATIVE_RESPONSE", 

184 } 

185 return response_names.get( 

186 self.response_flag, f"UNKNOWN(0x{self.response_flag:02x})" 

187 )