Coverage for pure3270/protocol/utils.py: 95%

107 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-11 20:54 +0000

1"""Utility functions for TN3270 protocol handling.""" 

2 

3import logging 

4 

5logger = logging.getLogger(__name__) 

6 

# --- Telnet command bytes (RFC 854) -----------------------------------------
IAC = 0xFF  # Interpret As Command: introduces every Telnet command
SB = 0xFA  # Subnegotiation Begin
SE = 0xF0  # Subnegotiation End
WILL = 0xFB
WONT = 0xFC
DO = 0xFD
DONT = 0xFE

# --- Telnet option codes used by TN3270E ------------------------------------
TN3270E = 0x28  # TN3270E Telnet option (decimal 40)
# NOTE(review): 0x19 (decimal 25) looks like the END-OF-RECORD *option* code;
# the EOR *command* byte sent after IAC is 0xEF in RFC 885 -- confirm which
# meaning callers expect.
EOR = 0x19  # End of Record

# --- TN3270E header DATA-TYPE values ----------------------------------------
TN3270_DATA = 0x00
SCS_DATA = 0x01
RESPONSE = 0x02
BIND_IMAGE = 0x03
UNBIND = 0x04
NVT_DATA = 0x05
REQUEST = 0x06
SSCP_LU_DATA = 0x07
PRINT_EOJ = 0x08

# --- TN3270E subnegotiation message types -----------------------------------
# NOTE(review): RFC 2355 appears to number these differently (DEVICE-TYPE
# 0x02, FUNCTIONS 0x03, IS 0x04, REQUEST 0x07, SEND 0x08) -- confirm these
# values are intentional before relying on them on the wire.
TN3270E_DEVICE_TYPE = 0x00
TN3270E_FUNCTIONS = 0x01
TN3270E_IS = 0x02
TN3270E_REQUEST = 0x03
TN3270E_SEND = 0x04

# --- TN3270E device type names ----------------------------------------------
TN3270E_IBM_DYNAMIC = "IBM-DYNAMIC"
TN3270E_IBM_3278_2 = "IBM-3278-2"
TN3270E_IBM_3278_3 = "IBM-3278-3"
TN3270E_IBM_3278_4 = "IBM-3278-4"
TN3270E_IBM_3278_5 = "IBM-3278-5"
TN3270E_IBM_3279_2 = "IBM-3279-2"
TN3270E_IBM_3279_3 = "IBM-3279-3"
TN3270E_IBM_3279_4 = "IBM-3279-4"
TN3270E_IBM_3279_5 = "IBM-3279-5"

# --- TN3270E functions -------------------------------------------------------
# Each value occupies a distinct bit, so they can be OR-ed into one mask.
TN3270E_BIND_IMAGE = 0x01
TN3270E_DATA_STREAM_CTL = 0x02
TN3270E_RESPONSES = 0x04
TN3270E_SCS_CTL_CODES = 0x08
TN3270E_SYSREQ = 0x10

# --- TN3270E request flags ---------------------------------------------------
TN3270E_RQF_ERR_COND_CLEARED = 0x00
TN3270E_RQF_MORE_THAN_ONE_RQST = 0x01
TN3270E_RQF_CANCEL_RQST = 0x02

# --- TN3270E response flags --------------------------------------------------
TN3270E_RSF_NO_RESPONSE = 0x00
TN3270E_RSF_ERROR_RESPONSE = 0x01
TN3270E_RSF_ALWAYS_RESPONSE = 0x02
TN3270E_RSF_POSITIVE_RESPONSE = 0x00
# NOTE(review): RFC 2355 appears to define NEGATIVE-RESPONSE as 0x01 -- confirm.
TN3270E_RSF_NEGATIVE_RESPONSE = 0x02

# --- Structured field constants ----------------------------------------------
STRUCTURED_FIELD = 0x3C  # '<' character
QUERY_REPLY_SF = 0x88
READ_PARTITION_QUERY = 0x02
READ_PARTITION_QUERY_LIST = 0x03

# --- Query reply types -------------------------------------------------------
QUERY_REPLY_DEVICE_TYPE = 0x01
QUERY_REPLY_CHARACTERISTICS = 0x02
QUERY_REPLY_HIGHLIGHTING = 0x03
QUERY_REPLY_COLOR = 0x04
QUERY_REPLY_EXTENDED_ATTRIBUTES = 0x05
QUERY_REPLY_GRAPHICS = 0x06
QUERY_REPLY_DBCS_ASIA = 0x07
QUERY_REPLY_DBCS_EUROPE = 0x08
QUERY_REPLY_DBCS_MIDDLE_EAST = 0x09
QUERY_REPLY_LINE_TYPE = 0x0A
QUERY_REPLY_OEM_AUXILIARY_DEVICE = 0x0B
QUERY_REPLY_TRANSPARENCY = 0x0C
QUERY_REPLY_FORMAT_STORAGE = 0x0D
QUERY_REPLY_DDM = 0x0E
QUERY_REPLY_RPQ_NAMES = 0x0F
QUERY_REPLY_SEGMENT = 0x10
QUERY_REPLY_PROCEDURE = 0x11
QUERY_REPLY_GRID = 0x12

93 

94 

def send_iac(writer, data: bytes) -> None:
    """
    Write an IAC-prefixed Telnet command to ``writer``.

    Args:
        writer: Object exposing ``write(bytes)`` (e.g. a StreamWriter). A
            falsy writer such as ``None`` turns the call into a no-op.
        data: Command bytes that follow the IAC byte.
    """
    if not writer:
        return
    frame = bytes((IAC,)) + data
    # Only queues the bytes: drain() is never called here, so an async caller
    # is responsible for awaiting it when the data must be flushed.
    writer.write(frame)

107 

108 

def send_subnegotiation(writer, opt: bytes, data: bytes) -> None:
    """
    Write a Telnet subnegotiation frame: IAC SB <opt> <data> IAC SE.

    Args:
        writer: Object exposing ``write(bytes)`` (e.g. a StreamWriter). A
            falsy writer such as ``None`` turns the call into a no-op.
        opt: Option byte(s) placed right after IAC SB.
        data: Subnegotiation payload. It is written verbatim; no IAC
            doubling is applied to it by this function.
    """
    if not writer:
        return
    opening = bytes((IAC, SB))
    closing = bytes((IAC, SE))
    # Only queues the bytes: drain() is never called here, so an async caller
    # is responsible for awaiting it when the data must be flushed.
    writer.write(opening + opt + data + closing)

123 

124 

def _subnegotiation_end(data: bytes, start: int) -> int:
    """Return the index just past the subnegotiation whose payload starts at ``start``."""
    size = len(data)
    pos = start
    # A subnegotiation ends at the two-byte sequence IAC SE, not at a bare SE
    # byte: 0xF0 may legitimately occur inside the payload.
    while pos + 1 < size:
        if data[pos] != IAC:
            pos += 1
        elif data[pos + 1] == SE:
            return pos + 2
        else:
            # IAC IAC (escaped 0xFF) or another IAC pair inside the payload.
            pos += 2
    # No IAC SE in the buffer. Fall back to the historical behaviour of
    # stopping at the first bare SE byte so lenient callers keep working;
    # when there is none either, the rest of the buffer is consumed.
    for pos in range(start, size):
        if data[pos] == SE:
            return pos + 1
    return size


def strip_telnet_iac(
    data: bytes, handle_eor_ga: bool = False, enable_logging: bool = False
) -> bytes:
    """
    Strip Telnet IAC sequences from data.

    Removed sequences:

    * ``IAC SB ... IAC SE`` subnegotiations (the payload may contain 0xF0),
    * three-byte ``IAC WILL/WONT/DO/DONT <option>`` negotiations,
    * every other two-byte ``IAC <cmd>`` sequence, including ``IAC IAC``
      (which is dropped, not unescaped to a literal 0xFF),
    * a lone trailing ``IAC``, together with anything after it.

    :param data: Raw bytes containing potential IAC sequences.
    :param handle_eor_ga: If True, EOR (0x19 or 0xEF) and GA (0xF9) commands
        are eligible for debug logging. They are stripped either way, because
        every two-byte IAC command is removed.
    :param enable_logging: If True (and ``handle_eor_ga`` is True), log each
        EOR/GA command that is stripped.
    :return: Cleaned bytes without IAC sequences.
    """
    # bytearray keeps accumulation linear; repeated ``bytes +=`` is quadratic.
    cleaned = bytearray()
    size = len(data)
    i = 0
    while i < size:
        current = data[i]
        if current != IAC:
            cleaned.append(current)
            i += 1
            continue
        if i + 1 >= size:
            # Incomplete IAC at end of buffer: nothing more can be parsed.
            break
        cmd = data[i + 1]
        if cmd == SB:
            i = _subnegotiation_end(data, i + 2)
        elif cmd in (WILL, WONT, DO, DONT):
            # IAC + verb + option byte.
            i += 3
        else:
            if handle_eor_ga and enable_logging:
                # 0xEF is the Telnet EOR command (RFC 885); 0x19 is kept
                # because earlier versions treated it as EOR here.
                if cmd in (0x19, 0xEF):
                    logger.debug("Stripping IAC EOR in fallback")
                elif cmd == 0xF9:
                    logger.debug("Stripping IAC GA in fallback")
            i += 2
    return bytes(cleaned)