Coverage for pure3270/protocol/tn3270_handler.py: 69%

201 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-11 20:54 +0000

1""" 

2TN3270 protocol handler for pure3270. 

3Handles negotiation, data sending/receiving, and protocol specifics. 

4""" 

5 

6import asyncio 

7import ssl 

8import logging 

9from typing import Optional 

10from .data_stream import DataStreamParser 

11from ..emulation.screen_buffer import ScreenBuffer 

12from .utils import send_iac, send_subnegotiation 

13from .exceptions import NegotiationError, ProtocolError, ParseError 

14from .negotiator import Negotiator 

15 

16logger = logging.getLogger(__name__) 

17 

18 

19class TN3270Handler: 

20 """ 

21 Handler for TN3270 protocol over Telnet. 

22 

23 Manages stream I/O, negotiation, and data parsing for 3270 emulation. 

24 """ 

25 

async def connect(
    self,
    host: Optional[str] = None,
    port: Optional[int] = None,
    ssl_context: Optional[ssl.SSLContext] = None,
) -> None:
    """
    Connect the handler and run Telnet/TN3270 negotiation.

    When the handler already holds both a reader and a writer, they are
    only validated and the handler is marked connected; no socket is
    opened and no negotiation is performed on that path.

    Otherwise a connection is opened with ``asyncio.open_connection``,
    the streams are stored on the handler and the negotiator, and both
    negotiation steps are awaited. If anything on this path fails, the
    handler is rolled back to the state it had before the call and the
    newly opened writer is closed, so a later retry opens a fresh
    connection instead of reusing a half-negotiated one.

    Args:
        host: Host to connect to; falls back to ``self.host`` when falsy.
        port: Port to connect to; falls back to ``self.port`` when falsy.
        ssl_context: SSL context; falls back to ``self.ssl_context``.

    Raises:
        ValueError: If pre-supplied reader/writer lack ``read``/``write``.
        ConnectionError: If opening the connection or negotiation fails.
            The original exception is attached as ``__cause__``.
    """
    # If already have reader/writer (from fixture), validate and mark as connected
    if self.reader is not None and self.writer is not None:
        if not hasattr(self.reader, "read") or not hasattr(self.writer, "write"):
            raise ValueError("Invalid reader or writer objects")
        self._connected = True
        return

    # Snapshot the current state so a failure can be undone completely.
    previous_reader = self.reader
    previous_writer = self.writer
    previous_negotiator_writer = getattr(self.negotiator, "writer", None)
    opened_writer = None

    try:
        # Use provided params or fallback to instance values
        connect_host = host or self.host
        connect_port = port or self.port
        connect_ssl = ssl_context or self.ssl_context

        reader, writer = await asyncio.open_connection(
            connect_host, connect_port, ssl=connect_ssl
        )
        opened_writer = writer

        # Validate streams
        if reader is None or writer is None:
            raise ConnectionError("Failed to obtain valid reader/writer")
        if not hasattr(reader, "read") or not hasattr(writer, "write"):
            raise ConnectionError("Invalid stream objects returned")

        self.reader = reader
        self.writer = writer
        self._connected = True

        # The negotiator writes its own replies, so it needs the new writer.
        self.negotiator.writer = self.writer

        # Perform negotiation
        await self.negotiator.negotiate()
        await self.negotiator._negotiate_tn3270()
    except Exception as e:
        logger.error(f"Connection failed: {e}")

        # Undo every state change made above; without this the handler
        # would keep reporting itself as connected and a retry would take
        # the "already have reader/writer" shortcut and skip negotiation.
        self._connected = False
        self.reader = previous_reader
        self.writer = previous_writer
        self.negotiator.writer = previous_negotiator_writer

        # Best-effort release of the socket opened by this call.
        if opened_writer is not None:
            try:
                opened_writer.close()
            except Exception as close_error:
                logger.debug(
                    f"Error closing writer after failed connect: {close_error}"
                )

        raise ConnectionError(f"Failed to connect: {e}") from e

69 

def __init__(
    self,
    reader: Optional[asyncio.StreamReader],
    writer: Optional[asyncio.StreamWriter],
    ssl_context: Optional[ssl.SSLContext] = None,
    host: str = "localhost",
    port: int = 23,
):
    """
    Set up a handler around an optional pair of streams.

    No validation is performed here and nothing is raised for ``None``
    streams; the handler simply starts in the "not connected" state.

    Args:
        reader: Asyncio stream reader (can be None for testing).
        writer: Asyncio stream writer (can be None for testing).
        ssl_context: Optional SSL context for secure connections.
        host: Target host for connection.
        port: Target port for connection.
    """
    # Plain connection settings and state first, so that every attribute
    # exists before the negotiator receives a reference to this handler.
    self._connected = False
    self.reader, self.writer = reader, writer
    self.ssl_context = ssl_context
    self.host, self.port = host, port

    # The parser and the negotiator share a single screen buffer.
    screen = ScreenBuffer()
    stream_parser = DataStreamParser(screen)
    self.screen_buffer = screen
    self.parser = stream_parser
    self.negotiator = Negotiator(writer, stream_parser, screen, self)

100 

async def negotiate(self) -> None:
    """
    Run the initial Telnet negotiation.

    This method only forwards to ``self.negotiator.negotiate()`` and
    awaits its completion.
    """
    negotiator = self.negotiator
    await negotiator.negotiate()

108 

async def _negotiate_tn3270(self) -> None:
    """
    Run the TN3270E subnegotiation step.

    This method only forwards to ``self.negotiator._negotiate_tn3270()``
    and awaits its completion.
    """
    negotiator = self.negotiator
    await negotiator._negotiate_tn3270()

116 

def set_ascii_mode(self) -> None:
    """
    Switch the handler to the ASCII fallback mode.

    The switch itself is performed by ``self.negotiator.set_ascii_mode()``;
    this method only forwards the call.
    """
    negotiator = self.negotiator
    negotiator.set_ascii_mode()

124 

async def send_data(self, data: bytes) -> None:
    """
    Write ``data`` to the connection and wait for the buffer to drain.

    Args:
        data: Bytes to send.

    Raises:
        ProtocolError: If no writer is attached to the handler.

    Exceptions raised by the writer's ``write``/``drain`` are not wrapped
    and propagate to the caller unchanged.
    """
    stream = self.writer
    if stream is not None:
        stream.write(data)
        await stream.drain()
        return

    logger.error("Not connected")
    raise ProtocolError("Writer is None; cannot send data.")

140 

async def receive_data(self, timeout: float = 5.0) -> bytes:
    """
    Read one chunk from the connection and feed it to the active parser.

    Up to 4096 bytes are read. The chunk is parsed as VT100 data when the
    negotiator is in ASCII mode, or when ``_detect_vt100_sequences``
    recognises it as VT100 (in which case ASCII mode is also switched on
    in the negotiator). Otherwise it is handed to the 3270 data stream
    parser. Parser failures are logged and do not abort the call.

    Args:
        timeout: Receive timeout in seconds.

    Returns:
        The received bytes. In 3270 mode everything from the first
        ``b"\\xff\\x19"`` marker onwards is removed; in ASCII mode the
        chunk is returned untouched.

    Raises:
        asyncio.TimeoutError: If timeout exceeded.
        ProtocolError: If reader is None.
    """
    logger.debug(f"receive_data called on handler {id(self)}")
    logger.debug(f"Handler negotiator object ID: {id(self.negotiator)}")

    stream = self.reader
    if stream is None:
        logger.error("Not connected")
        raise ProtocolError("Reader is None; cannot receive data.")

    try:
        logger.debug(f"Attempting to read data with timeout {timeout}")
        data = await asyncio.wait_for(stream.read(4096), timeout=timeout)
        logger.debug(f"Received {len(data)} bytes of data")
    except asyncio.TimeoutError:
        logger.debug("Timeout while reading data")
        raise
    except Exception as e:
        logger.error(f"Error reading data: {e}")
        raise

    # Start from the mode the negotiator currently reports.
    ascii_mode = self.negotiator._ascii_mode
    logger.debug(
        f"Checking ASCII mode: negotiator._ascii_mode = {ascii_mode} on negotiator object {id(self.negotiator)}"
    )

    # s3270 compatibility: a non-empty chunk that looks like VT100 output
    # flips both this call and the negotiator into ASCII mode.
    if not ascii_mode and len(data) > 0 and self._detect_vt100_sequences(data):
        logger.info(
            "Detected VT100 sequences, enabling ASCII mode (s3270 compatibility)"
        )
        self.negotiator._ascii_mode = True
        ascii_mode = True

    if not ascii_mode:
        # 3270 path: update the screen buffer through the data stream parser.
        logger.debug("In TN3270 mode, parsing 3270 data")
        try:
            self.parser.parse(data)
        except ParseError as e:
            # Keep going with the raw bytes when the parser rejects them.
            logger.warning(f"Failed to parse received data: {e}")

        # Cut the chunk at the first end-of-record marker, if there is one.
        # NOTE(review): RFC 885 assigns 0x19 to the END-OF-RECORD option and
        # 0xEF to the EOR command; confirm 0xFF 0x19 is the intended marker.
        head, marker, _rest = data.partition(b"\xff\x19")
        return head if marker else data

    # ASCII path: VT100 parsing is best-effort; any failure is only logged.
    logger.debug("In ASCII mode, parsing VT100 data")
    try:
        logger.debug(f"Parsing VT100 data ({len(data)} bytes)")
        from .vt100_parser import VT100Parser

        VT100Parser(self.screen_buffer).parse(data)
        logger.debug("VT100 parsing completed successfully")
    except Exception as e:
        logger.warning(f"Error parsing VT100 data: {e}")
        import traceback

        logger.warning(f"Traceback: {traceback.format_exc()}")
    return data

216 

def _detect_vt100_sequences(self, data: bytes) -> bool:
    """
    Decide whether ``data`` looks like VT100/ASCII terminal output.

    Two kinds of evidence are accepted: an ESC byte directly followed by
    one of a fixed set of introducer characters, or byte statistics
    (share of printable ASCII, share of common control characters,
    presence of ESC) crossing fixed thresholds.

    Args:
        data: Raw data to check for VT100 sequences

    Returns:
        True if VT100 sequences detected, False otherwise
    """
    total_chars = len(data)
    if total_chars < 2:
        return False

    # Bytes that may follow ESC:
    #   [  CSI            ( )  character set designation
    #   #  DEC commands   7 8  save / restore cursor
    escape_followers = (
        ord("["),
        ord("("),
        ord(")"),
        ord("#"),
        ord("7"),
        ord("8"),
    )
    # IND, RI, RIS
    control_followers = (ord("D"), ord("M"), ord("c"))

    # Walk adjacent byte pairs; the first recognised ESC pair settles it.
    for current, next_char in zip(data, data[1:]):
        if current != 0x1B:
            continue
        if next_char in escape_followers:
            logger.debug(
                f"Detected VT100 escape sequence: ESC {chr(next_char) if chr(next_char).isprintable() else f'0x{next_char:02x}'}"
            )
            return True
        if next_char in control_followers:
            logger.debug(
                f"Detected VT100 control sequence: ESC {chr(next_char)}"
            )
            return True

    # No explicit sequence found: fall back to byte statistics.
    # NOTE(review): 0x40, the EBCDIC space, is inside the printable-ASCII
    # range counted here, so blank-heavy 3270 data may satisfy the density
    # rules below - confirm this is acceptable.
    control_bytes = (0x0A, 0x0D, 0x09, 0x08, 0x1B)
    printable_ascii_count = 0
    control_char_count = 0
    for byte in data:
        if 0x20 <= byte <= 0x7E:
            printable_ascii_count += 1
        elif byte in control_bytes:
            control_char_count += 1

    # total_chars is at least 2 here, so the divisions are safe.
    printable_ratio = printable_ascii_count / total_chars
    control_ratio = control_char_count / total_chars

    # Rule 1: mostly text, some control characters, more than 10 printable bytes.
    if printable_ratio > 0.5 and control_ratio > 0.05 and printable_ascii_count > 10:
        logger.debug(
            f"Mixed ASCII/control character pattern suggests ASCII mode: "
            f"printable={printable_ratio:.2f}, control={control_ratio:.2f}"
        )
        return True

    # Rule 2: at least one ESC byte together with a high share of text.
    esc_count = data.count(0x1B)
    if esc_count > 0 and printable_ratio > 0.6:
        logger.debug(
            f"ESC characters with high ASCII density ({esc_count} ESC chars, "
            f"printable={printable_ratio:.2f}) suggests ASCII mode"
        )
        return True

    # Rule 3: a long chunk that is almost entirely printable ASCII.
    if printable_ratio > 0.8 and printable_ascii_count > 50:
        logger.debug(
            f"Very high ASCII character density ({printable_ratio:.2f}) "
            f"suggests ASCII terminal mode"
        )
        return True

    return False

306 

async def send_scs_data(self, scs_data: bytes) -> None:
    """
    Write SCS character data to the connection of a printer session.

    Args:
        scs_data: SCS character data to send

    Raises:
        ProtocolError: If the handler is not connected, the negotiator
            does not report a printer session, or no writer is attached.
    """
    # Preconditions are checked in this order; the first failure wins.
    if not self._connected:
        raise ProtocolError("Not connected")
    if not self.negotiator.is_printer_session:
        raise ProtocolError("Not a printer session")

    stream = self.writer
    if stream is None:
        raise ProtocolError("Writer is None; cannot send SCS data.")

    # The bytes are written exactly as given and flushed before logging.
    stream.write(scs_data)
    await stream.drain()
    logger.debug(f"Sent {len(scs_data)} bytes of SCS data")

330 

async def send_print_eoj(self) -> None:
    """
    Send the PRINT-EOJ (End of Job) command on a printer session.

    The command bytes are produced by
    ``DataStreamSender.build_scs_ctl_codes(0x01)``.

    Raises:
        ProtocolError: If the handler is not connected, the negotiator
            does not report a printer session, or no writer is attached.
    """
    if not self._connected:
        raise ProtocolError("Not connected")
    if not self.negotiator.is_printer_session:
        raise ProtocolError("Not a printer session")

    from .data_stream import DataStreamSender

    # 0x01 is the code the original source labels PRINT_EOJ.
    eoj_command = DataStreamSender().build_scs_ctl_codes(0x01)

    # The writer is checked only after the command has been built.
    stream = self.writer
    if stream is None:
        raise ProtocolError("Writer is None; cannot send PRINT-EOJ.")

    stream.write(eoj_command)
    await stream.drain()
    logger.debug("Sent PRINT-EOJ command")

355 

async def _read_iac(self) -> bytes:
    """
    Read a three-byte IAC (Interpret As Command) sequence.

    Exactly three bytes are read; only the first byte is checked, and it
    must be 0xFF.

    Returns:
        The three bytes that were read.

    Raises:
        ProtocolError: If no reader is attached to the handler.
        ParseError: If the first byte is not 0xFF.
    """
    stream = self.reader
    if stream is None:
        raise ProtocolError("Reader is None; cannot read IAC.")

    sequence = await stream.readexactly(3)
    if sequence[0] == 0xFF:
        return sequence
    raise ParseError("Invalid IAC sequence")

372 

async def close(self) -> None:
    """
    Close the connection and mark the handler as disconnected.

    When a writer is attached it is closed and ``wait_closed()`` is
    awaited. The handler state is reset even if closing fails or is
    cancelled, so a failed close cannot leave the handler reporting a
    live connection or holding a dead writer. Any exception raised while
    closing still propagates to the caller.
    """
    try:
        if self.writer:
            try:
                self.writer.close()
                await self.writer.wait_closed()
            finally:
                # Drop the writer regardless of how closing went.
                self.writer = None
    finally:
        self._connected = False

380 

def is_connected(self) -> bool:
    """
    Report whether the handler currently has a usable connection.

    The handler counts as connected only when its connected flag is set,
    both streams are present, the writer is not closing and the reader is
    not at EOF. The last two checks are skipped for stream objects that
    do not offer ``is_closing`` / ``at_eof``.
    """
    if not self._connected:
        return False
    if self.writer is None or self.reader is None:
        return False

    try:
        # asyncio.StreamWriter exposes is_closing(); other writers may not.
        writer_closing = hasattr(self.writer, "is_closing") and self.writer.is_closing()
        if writer_closing:
            return False
        # asyncio.StreamReader exposes at_eof(); other readers may not.
        reader_at_eof = hasattr(self.reader, "at_eof") and self.reader.at_eof()
        return not reader_at_eof
    except Exception:
        # A failing liveness probe is treated as "not connected".
        return False

397 

def is_printer_session_active(self) -> bool:
    """
    Report whether the negotiator flags this session as a printer session.

    Returns:
        bool: The negotiator's ``is_printer_session`` value.
    """
    negotiator = self.negotiator
    return negotiator.is_printer_session

406 

@property
def negotiated_tn3270e(self) -> bool:
    """Return the TN3270E negotiation status held by the negotiator."""
    negotiator = self.negotiator
    return negotiator.negotiated_tn3270e

411 

@property
def lu_name(self) -> Optional[str]:
    """Return the LU name stored on the negotiator."""
    negotiator = self.negotiator
    return negotiator.lu_name

@lu_name.setter
def lu_name(self, value: Optional[str]) -> None:
    """Store ``value`` as the LU name on the negotiator."""
    negotiator = self.negotiator
    negotiator.lu_name = value

421 

@property
def screen_rows(self) -> int:
    """Return the number of screen rows reported by the negotiator."""
    negotiator = self.negotiator
    return negotiator.screen_rows

426 

@property
def screen_cols(self) -> int:
    """Return the number of screen columns reported by the negotiator."""
    negotiator = self.negotiator
    return negotiator.screen_cols

431 

@property
def is_printer_session(self) -> bool:
    """Return the printer-session flag stored on the negotiator."""
    negotiator = self.negotiator
    return negotiator.is_printer_session

@is_printer_session.setter
def is_printer_session(self, value: bool) -> None:
    """Store ``value`` as the printer-session flag on the negotiator."""
    negotiator = self.negotiator
    negotiator.is_printer_session = value