Coverage for pure3270/protocol/negotiator.py: 71%

161 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-11 20:54 +0000

1""" 

2Negotiator for TN3270 protocol specifics. 

3Handles Telnet negotiation and TN3270E subnegotiation. 

4""" 

5 

6import asyncio 

7import logging 

8from typing import Optional, TYPE_CHECKING, List 

9from .data_stream import DataStreamParser 

10from .utils import ( 

11 send_iac, 

12 send_subnegotiation, 

13 TN3270E_DEVICE_TYPE, 

14 TN3270E_FUNCTIONS, 

15 TN3270E_IS, 

16 TN3270E_REQUEST, 

17 TN3270E_SEND, 

18 TN3270E_BIND_IMAGE, 

19 TN3270E_DATA_STREAM_CTL, 

20 TN3270E_RESPONSES, 

21 TN3270E_SCS_CTL_CODES, 

22 TN3270E_SYSREQ, 

23 TN3270E_IBM_DYNAMIC, 

24) 

25from .exceptions import NegotiationError, ProtocolError, ParseError 

26from ..emulation.screen_buffer import ScreenBuffer 

27 

28if TYPE_CHECKING: 

29 from .tn3270_handler import TN3270Handler 

30 

31logger = logging.getLogger(__name__) 

32 

33 

34class Negotiator: 

35 """ 

36 Handles TN3270 negotiation logic. 

37 """ 

38 

39 def __init__( 

40 self, 

41 writer: Optional[asyncio.StreamWriter], 

42 parser: DataStreamParser, 

43 screen_buffer: ScreenBuffer, 

44 handler: Optional["TN3270Handler"] = None, 

45 ): 

46 """ 

47 Initialize the Negotiator. 

48 

49 Args: 

50 writer: StreamWriter for sending commands. 

51 parser: DataStreamParser for parsing responses. 

52 screen_buffer: ScreenBuffer to update during negotiation. 

53 handler: TN3270Handler instance for accessing reader methods. 

54 """ 

55 logger.debug("Negotiator.__init__ called") 

56 self.writer = writer 

57 self.parser = parser 

58 self.screen_buffer = screen_buffer 

59 self.handler = handler 

60 self._ascii_mode = False 

61 logger.debug(f"Negotiator._ascii_mode initialized to {self._ascii_mode}") 

62 self.negotiated_tn3270e = False 

63 self._lu_name: Optional[str] = None 

64 self.screen_rows = 24 

65 self.screen_cols = 80 

66 self.is_printer_session = False 

67 self.supported_device_types: List[str] = [ 

68 "IBM-3278-2", 

69 "IBM-3278-3", 

70 "IBM-3278-4", 

71 "IBM-3278-5", 

72 "IBM-3279-2", 

73 "IBM-3279-3", 

74 "IBM-3279-4", 

75 "IBM-3279-5", 

76 "IBM-DYNAMIC", 

77 ] 

78 self.requested_device_type: Optional[str] = None 

79 self.negotiated_device_type: Optional[str] = None 

80 self.supported_functions: int = ( 

81 TN3270E_BIND_IMAGE 

82 | TN3270E_DATA_STREAM_CTL 

83 | TN3270E_RESPONSES 

84 | TN3270E_SCS_CTL_CODES 

85 ) 

86 self.negotiated_functions: int = 0 

87 

88 async def negotiate(self) -> None: 

89 """ 

90 Perform initial Telnet negotiation. 

91 

92 Sends DO TERMINAL-TYPE and waits for responses. 

93 

94 Raises: 

95 NegotiationError: If negotiation fails. 

96 """ 

97 if self.writer is None: 

98 raise ProtocolError("Writer is None; cannot negotiate.") 

99 send_iac(self.writer, b"\xff\xfd\x27") # DO TERMINAL-TYPE 

100 await self.writer.drain() 

101 # Handle response (simplified) 

102 data = await self._read_iac() 

103 if not data: 

104 raise NegotiationError("No response to DO TERMINAL-TYPE") 

105 

106 async def _negotiate_tn3270(self) -> None: 

107 """ 

108 Negotiate TN3270E subnegotiation. 

109 

110 Sends TN3270E request and handles BIND, etc. 

111 

112 Raises: 

113 NegotiationError: On subnegotiation failure. 

114 """ 

115 if self.writer is None: 

116 raise ProtocolError("Writer is None; cannot negotiate TN3270.") 

117 # Send TN3270E subnegotiation 

118 tn3270e_request = b"\x00\x00\x01\x00\x00\x18\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" 

119 send_subnegotiation(self.writer, b"\x19", tn3270e_request) 

120 await self.writer.drain() 

121 # Parse response 

122 try: 

123 response = await self._receive_data(10.0) 

124 if ( 

125 b"\x28" in response or b"\xff\xfb\x24" in response 

126 ): # TN3270E positive response 

127 self.negotiated_tn3270e = True 

128 self.parser.parse(response) 

129 logger.info("TN3270E negotiation successful") 

130 

131 # Check if this is a printer session based on LU name or BIND response 

132 if self.lu_name and ("LTR" in self.lu_name or "PTR" in self.lu_name): 

133 self.is_printer_session = True 

134 logger.info(f"Printer session detected for LU: {self.lu_name}") 

135 else: 

136 self.negotiated_tn3270e = False 

137 self.set_ascii_mode() 

138 logger.info("TN3270E negotiation failed, fallback to ASCII") 

139 except (ParseError, ProtocolError, asyncio.TimeoutError) as e: 

140 logger.warning(f"TN3270E negotiation failed with specific error: {e}") 

141 self.negotiated_tn3270e = False 

142 self.set_ascii_mode() 

143 except Exception as e: 

144 logger.error(f"Unexpected error during TN3270E negotiation: {e}") 

145 self.negotiated_tn3270e = False 

146 self.set_ascii_mode() 

147 

148 def set_ascii_mode(self) -> None: 

149 """ 

150 Set to ASCII mode fallback, matching s3270 behavior. 

151 

152 Disables EBCDIC processing and enables ASCII/VT100 terminal emulation. 

153 """ 

154 logger.debug( 

155 f"BEFORE set_ascii_mode: _ascii_mode = {self._ascii_mode} on negotiator object {id(self)}" 

156 ) 

157 self._ascii_mode = True 

158 logger.debug( 

159 f"AFTER set_ascii_mode: _ascii_mode = {self._ascii_mode} on negotiator object {id(self)}" 

160 ) 

161 logger.info("Switched to ASCII/VT100 mode (s3270 compatibility)") 

162 

163 async def _receive_data(self, timeout: float = 5.0) -> bytes: 

164 """ 

165 Receive data with timeout (internal). 

166 

167 Args: 

168 timeout: Receive timeout in seconds. 

169 

170 Returns: 

171 Received bytes. 

172 

173 Raises: 

174 asyncio.TimeoutError: If timeout exceeded. 

175 """ 

176 if self.handler: 

177 return await self.handler.receive_data(timeout) 

178 raise NotImplementedError("Handler required for receiving data") 

179 

180 async def _read_iac(self) -> bytes: 

181 """ 

182 Read IAC sequence (internal). 

183 

184 Returns: 

185 IAC response bytes. 

186 

187 Raises: 

188 ParseError: If IAC parsing fails. 

189 """ 

190 if self.handler: 

191 return await self.handler._read_iac() 

192 raise NotImplementedError("Handler required for reading IAC") 

193 

194 def is_printer_session_active(self) -> bool: 

195 """ 

196 Check if this is a printer session. 

197 

198 Returns: 

199 bool: True if printer session. 

200 """ 

201 return self.is_printer_session 

202 

203 @property 

204 def lu_name(self) -> Optional[str]: 

205 """Get the LU name.""" 

206 return self._lu_name 

207 

208 @lu_name.setter 

209 def lu_name(self, value: Optional[str]) -> None: 

210 """Set the LU name.""" 

211 self._lu_name = value 

212 

213 def _parse_tn3270e_subnegotiation(self, data: bytes) -> None: 

214 """ 

215 Parse TN3270E subnegotiation message. 

216 

217 Args: 

218 data: TN3270E subnegotiation data (without IAC SB and IAC SE) 

219 """ 

220 if len(data) < 3: 

221 logger.warning(f"Invalid TN3270E subnegotiation data: {data.hex()}") 

222 return 

223 

224 # Parse subnegotiation header 

225 # data[0] = TN3270E option (should be 0x28) 

226 # data[1] = message type 

227 # data[2] = message sub-type 

228 

229 if data[0] != 0x28: # TN3270E option 

230 logger.warning(f"Invalid TN3270E option in subnegotiation: 0x{data[0]:02x}") 

231 return 

232 

233 message_type = data[1] if len(data) > 1 else None 

234 message_subtype = data[2] if len(data) > 2 else None 

235 

236 if message_type == TN3270E_DEVICE_TYPE: 

237 self._handle_device_type_subnegotiation(data[1:]) 

238 elif message_type == TN3270E_FUNCTIONS: 

239 self._handle_functions_subnegotiation(data[1:]) 

240 else: 

241 logger.debug(f"Unhandled TN3270E subnegotiation type: 0x{message_type:02x}") 

242 

243 def _handle_device_type_subnegotiation(self, data: bytes) -> None: 

244 """ 

245 Handle DEVICE-TYPE subnegotiation message. 

246 

247 Args: 

248 data: DEVICE-TYPE subnegotiation data (message type already stripped) 

249 """ 

250 if len(data) < 2: 

251 logger.warning("Invalid DEVICE-TYPE subnegotiation data") 

252 return 

253 

254 sub_type = data[0] 

255 

256 if sub_type == TN3270E_IS: 

257 # DEVICE-TYPE IS - server is telling us what device type to use 

258 if len(data) > 1: 

259 # Extract device type string (null-terminated or until end) 

260 device_type_bytes = data[1:] 

261 # Find null terminator if present 

262 null_pos = device_type_bytes.find(0x00) 

263 if null_pos != -1: 

264 device_type_bytes = device_type_bytes[:null_pos] 

265 

266 device_type = device_type_bytes.decode("ascii", errors="ignore").strip() 

267 logger.info(f"Server requested device type: {device_type}") 

268 

269 # Handle IBM-DYNAMIC specially 

270 if device_type == TN3270E_IBM_DYNAMIC: 

271 logger.info("IBM-DYNAMIC device type negotiated") 

272 self.negotiated_device_type = TN3270E_IBM_DYNAMIC 

273 # For IBM-DYNAMIC, we may need to negotiate screen size dynamically 

274 else: 

275 self.negotiated_device_type = device_type 

276 

277 elif sub_type == TN3270E_REQUEST: 

278 # DEVICE-TYPE REQUEST - server is asking what device types we support 

279 logger.info("Server requested supported device types") 

280 self._send_supported_device_types() 

281 else: 

282 logger.warning( 

283 f"Unhandled DEVICE-TYPE subnegotiation subtype: 0x{sub_type:02x}" 

284 ) 

285 

286 def _handle_functions_subnegotiation(self, data: bytes) -> None: 

287 """ 

288 Handle FUNCTIONS subnegotiation message. 

289 

290 Args: 

291 data: FUNCTIONS subnegotiation data (message type already stripped) 

292 """ 

293 if len(data) < 2: 

294 logger.warning("Invalid FUNCTIONS subnegotiation data") 

295 return 

296 

297 sub_type = data[0] 

298 

299 if sub_type == TN3270E_IS: 

300 # FUNCTIONS IS - server is telling us what functions are enabled 

301 if len(data) > 1: 

302 # Parse function bits 

303 function_bits = 0 

304 for i in range(1, len(data)): 

305 function_bits |= data[i] 

306 

307 logger.info(f"Server enabled functions: 0x{function_bits:02x}") 

308 self.negotiated_functions = function_bits 

309 

310 # Log specific functions 

311 if function_bits & TN3270E_BIND_IMAGE: 

312 logger.debug("BIND-IMAGE function enabled") 

313 if function_bits & TN3270E_DATA_STREAM_CTL: 

314 logger.debug("DATA-STREAM-CTL function enabled") 

315 if function_bits & TN3270E_RESPONSES: 

316 logger.debug("RESPONSES function enabled") 

317 if function_bits & TN3270E_SCS_CTL_CODES: 

318 logger.debug("SCS-CTL-CODES function enabled") 

319 if function_bits & TN3270E_SYSREQ: 

320 logger.debug("SYSREQ function enabled") 

321 

322 elif sub_type == TN3270E_REQUEST: 

323 # FUNCTIONS REQUEST - server is asking what functions we support 

324 logger.info("Server requested supported functions") 

325 self._send_supported_functions() 

326 else: 

327 logger.warning( 

328 f"Unhandled FUNCTIONS subnegotiation subtype: 0x{sub_type:02x}" 

329 ) 

330 

331 def _send_supported_device_types(self) -> None: 

332 """Send our supported device types to the server.""" 

333 if self.writer is None: 

334 logger.error("Cannot send device types: writer is None") 

335 return 

336 

337 # Send DEVICE-TYPE SEND response with our supported types 

338 # For simplicity, we'll just send our first supported type 

339 if self.supported_device_types: 

340 device_type = self.supported_device_types[0].encode("ascii") + b"\x00" 

341 sub_data = bytes([TN3270E_DEVICE_TYPE, TN3270E_SEND]) + device_type 

342 send_subnegotiation(self.writer, bytes([0x28]), sub_data) 

343 logger.debug( 

344 f"Sent supported device type: {self.supported_device_types[0]}" 

345 ) 

346 

347 def _send_supported_functions(self) -> None: 

348 """Send our supported functions to the server.""" 

349 if self.writer is None: 

350 logger.error("Cannot send functions: writer is None") 

351 return 

352 

353 # Send FUNCTIONS SEND response with our supported functions 

354 function_bytes = [ 

355 (self.supported_functions >> i) & 0xFF 

356 for i in range(0, 8, 8) 

357 if (self.supported_functions >> i) & 0xFF 

358 ] 

359 

360 if function_bytes: 

361 sub_data = bytes([TN3270E_FUNCTIONS, TN3270E_SEND] + function_bytes) 

362 send_subnegotiation(self.writer, bytes([0x28]), sub_data) 

363 logger.debug(f"Sent supported functions: 0x{self.supported_functions:02x}")