Coverage for pure3270/protocol/utils.py: 95%
107 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-11 20:54 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-11 20:54 +0000
1"""Utility functions for TN3270 protocol handling."""
3import logging
5logger = logging.getLogger(__name__)
7# Telnet constants
8IAC = 0xFF
9SB = 0xFA
10SE = 0xF0
11WILL = 0xFB
12WONT = 0xFC
13DO = 0xFD
14DONT = 0xFE
16# TN3270E constants
17TN3270E = 0x28 # TN3270E Telnet option
18EOR = 0x19 # End of Record
20# TN3270E Data Types
21TN3270_DATA = 0x00
22SCS_DATA = 0x01
23RESPONSE = 0x02
24BIND_IMAGE = 0x03
25UNBIND = 0x04
26NVT_DATA = 0x05
27REQUEST = 0x06
28SSCP_LU_DATA = 0x07
29PRINT_EOJ = 0x08
31# TN3270E Subnegotiation Message Types
32TN3270E_DEVICE_TYPE = 0x00
33TN3270E_FUNCTIONS = 0x01
34TN3270E_IS = 0x02
35TN3270E_REQUEST = 0x03
36TN3270E_SEND = 0x04
38# TN3270E Device Types
39TN3270E_IBM_DYNAMIC = "IBM-DYNAMIC"
40TN3270E_IBM_3278_2 = "IBM-3278-2"
41TN3270E_IBM_3278_3 = "IBM-3278-3"
42TN3270E_IBM_3278_4 = "IBM-3278-4"
43TN3270E_IBM_3278_5 = "IBM-3278-5"
44TN3270E_IBM_3279_2 = "IBM-3279-2"
45TN3270E_IBM_3279_3 = "IBM-3279-3"
46TN3270E_IBM_3279_4 = "IBM-3279-4"
47TN3270E_IBM_3279_5 = "IBM-3279-5"
49# TN3270E Functions
50TN3270E_BIND_IMAGE = 0x01
51TN3270E_DATA_STREAM_CTL = 0x02
52TN3270E_RESPONSES = 0x04
53TN3270E_SCS_CTL_CODES = 0x08
54TN3270E_SYSREQ = 0x10
56# TN3270E Request Flags
57TN3270E_RQF_ERR_COND_CLEARED = 0x00
58TN3270E_RQF_MORE_THAN_ONE_RQST = 0x01
59TN3270E_RQF_CANCEL_RQST = 0x02
61# TN3270E Response Flags
62TN3270E_RSF_NO_RESPONSE = 0x00
63TN3270E_RSF_ERROR_RESPONSE = 0x01
64TN3270E_RSF_ALWAYS_RESPONSE = 0x02
65TN3270E_RSF_POSITIVE_RESPONSE = 0x00
66TN3270E_RSF_NEGATIVE_RESPONSE = 0x02
68# Structured Field Constants
69STRUCTURED_FIELD = 0x3C # '<' character
70QUERY_REPLY_SF = 0x88
71READ_PARTITION_QUERY = 0x02
72READ_PARTITION_QUERY_LIST = 0x03
74# Query Reply Types
75QUERY_REPLY_DEVICE_TYPE = 0x01
76QUERY_REPLY_CHARACTERISTICS = 0x02
77QUERY_REPLY_HIGHLIGHTING = 0x03
78QUERY_REPLY_COLOR = 0x04
79QUERY_REPLY_EXTENDED_ATTRIBUTES = 0x05
80QUERY_REPLY_GRAPHICS = 0x06
81QUERY_REPLY_DBCS_ASIA = 0x07
82QUERY_REPLY_DBCS_EUROPE = 0x08
83QUERY_REPLY_DBCS_MIDDLE_EAST = 0x09
84QUERY_REPLY_LINE_TYPE = 0x0A
85QUERY_REPLY_OEM_AUXILIARY_DEVICE = 0x0B
86QUERY_REPLY_TRANSPARENCY = 0x0C
87QUERY_REPLY_FORMAT_STORAGE = 0x0D
88QUERY_REPLY_DDM = 0x0E
89QUERY_REPLY_RPQ_NAMES = 0x0F
90QUERY_REPLY_SEGMENT = 0x10
91QUERY_REPLY_PROCEDURE = 0x11
92QUERY_REPLY_GRID = 0x12
95def send_iac(writer, data: bytes) -> None:
96 """
97 Send IAC command.
99 Args:
100 writer: StreamWriter.
101 data: Data bytes after IAC.
102 """
103 if writer:
104 writer.write(bytes([IAC]) + data)
105 # drain() should be awaited in async context
106 # The caller is responsible for awaiting drain() if needed
109def send_subnegotiation(writer, opt: bytes, data: bytes) -> None:
110 """
111 Send subnegotiation.
113 Args:
114 writer: StreamWriter.
115 opt: Option byte.
116 data: Subnegotiation data.
117 """
118 if writer:
119 sub = bytes([IAC, SB]) + opt + data + bytes([IAC, SE])
120 writer.write(sub)
121 # drain() should be awaited in async context
122 # The caller is responsible for awaiting drain() if needed
125def strip_telnet_iac(
126 data: bytes, handle_eor_ga: bool = False, enable_logging: bool = False
127) -> bytes:
128 """
129 Strip Telnet IAC sequences from data.
131 :param data: Raw bytes containing potential IAC sequences.
132 :param handle_eor_ga: If True, specifically handle EOR (0x19) and GA (0xf9) commands.
133 :param enable_logging: If True, log EOR/GA stripping.
134 :return: Cleaned bytes without IAC sequences.
135 """
136 clean_data = b""
137 i = 0
138 while i < len(data):
139 if data[i] == IAC:
140 if i + 1 < len(data):
141 cmd = data[i + 1]
142 if cmd == SB:
143 # Skip subnegotiation until SE
144 j = i + 2
145 while j < len(data) and data[j] != SE:
146 j += 1
147 if j < len(data) and data[j] == SE:
148 j += 1
149 i = j
150 continue
151 elif cmd in (WILL, WONT, DO, DONT):
152 i += 3
153 continue
154 elif handle_eor_ga and cmd in (0x19, 0xF9): # EOR or GA
155 if enable_logging:
156 if cmd == 0x19:
157 logger.debug("Stripping IAC EOR in fallback")
158 else:
159 logger.debug("Stripping IAC GA in fallback")
160 i += 2
161 continue
162 else:
163 i += 2
164 continue
165 else:
166 # Incomplete IAC at end, break to avoid index error
167 break
168 else:
169 clean_data += bytes([data[i]])
170 i += 1
171 return clean_data