Coverage for pure3270/protocol/tn3270_handler.py: 69%
201 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-11 20:54 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-11 20:54 +0000
1"""
2TN3270 protocol handler for pure3270.
3Handles negotiation, data sending/receiving, and protocol specifics.
4"""
6import asyncio
7import ssl
8import logging
9from typing import Optional
10from .data_stream import DataStreamParser
11from ..emulation.screen_buffer import ScreenBuffer
12from .utils import send_iac, send_subnegotiation
13from .exceptions import NegotiationError, ProtocolError, ParseError
14from .negotiator import Negotiator
16logger = logging.getLogger(__name__)
class TN3270Handler:
    """
    Handler for the TN3270 protocol over Telnet.

    Owns the asyncio stream pair, a ScreenBuffer, a DataStreamParser bound to
    that buffer, and a Negotiator. Negotiation state (ASCII mode, LU name,
    screen size, printer-session flag) lives on the negotiator; this class
    exposes it through delegating methods and properties.
    """

    # Byte following ESC (0x1B) that marks a VT100 escape sequence:
    # "[" CSI, "(" / ")" character-set designation, "#" DEC commands,
    # "7" save cursor, "8" restore cursor.
    _VT100_ESCAPE_FINALS = frozenset(b"[()#78")
    # Byte following ESC for the single-character controls IND, RI and RIS.
    _VT100_CONTROL_FINALS = frozenset(b"DMc")
    # LF, CR, TAB, BS and ESC: control bytes counted by the density heuristic.
    _VT100_CONTROL_BYTES = frozenset((0x0A, 0x0D, 0x09, 0x08, 0x1B))

    def __init__(
        self,
        reader: Optional[asyncio.StreamReader],
        writer: Optional[asyncio.StreamWriter],
        ssl_context: Optional[ssl.SSLContext] = None,
        host: str = "localhost",
        port: int = 23,
    ) -> None:
        """
        Initialize the TN3270 handler.

        No I/O happens here and no argument is validated; streams are only
        checked (or opened) by connect().

        Args:
            reader: Asyncio stream reader, or None to have connect() open one.
            writer: Asyncio stream writer, or None to have connect() open one.
            ssl_context: Optional SSL context for secure connections.
            host: Default target host used by connect().
            port: Default target port used by connect().
        """
        self.reader = reader
        self.writer = writer
        self.ssl_context = ssl_context
        self.host = host
        self.port = port
        self.screen_buffer = ScreenBuffer()
        self.parser = DataStreamParser(self.screen_buffer)
        self._connected = False
        self.negotiator = Negotiator(self.writer, self.parser, self.screen_buffer, self)

    async def connect(
        self,
        host: Optional[str] = None,
        port: Optional[int] = None,
        ssl_context: Optional[ssl.SSLContext] = None,
    ) -> None:
        """
        Connect the handler.

        If both reader and writer are already set (e.g. injected by a test
        fixture) they are sanity-checked and the handler is marked connected
        without opening a socket or negotiating. Otherwise a new connection is
        opened and both negotiation phases are run on it.

        Args:
            host: Target host; falls back to the instance value when falsy.
            port: Target port; falls back to the instance value when falsy.
            ssl_context: SSL context; falls back to the instance value.

        Raises:
            ValueError: If pre-set reader/writer lack read/write methods.
            ConnectionError: If opening the connection, validating the
                streams, or negotiating fails. The original exception is
                chained as ``__cause__``.
        """
        if self.reader is not None and self.writer is not None:
            if not hasattr(self.reader, "read") or not hasattr(self.writer, "write"):
                raise ValueError("Invalid reader or writer objects")
            self._connected = True
            return

        connect_host = host or self.host
        connect_port = port or self.port
        connect_ssl = ssl_context or self.ssl_context

        opened_writer = None  # writer created by this call, if any
        attached = False  # True once handler state points at the new streams
        try:
            reader, writer = await asyncio.open_connection(
                connect_host, connect_port, ssl=connect_ssl
            )
            opened_writer = writer
            if reader is None or writer is None:
                raise ConnectionError("Failed to obtain valid reader/writer")
            if not hasattr(reader, "read") or not hasattr(writer, "write"):
                raise ConnectionError("Invalid stream objects returned")

            self.reader = reader
            self.writer = writer
            self._connected = True
            # The negotiator was built with the (None) writer from __init__.
            self.negotiator.writer = self.writer
            attached = True

            await self.negotiator.negotiate()
            await self.negotiator._negotiate_tn3270()
        except Exception as e:
            logger.error("Connection failed: %s", e)
            if attached:
                # Roll back so the handler does not report a half-negotiated
                # connection as live, and so a retry opens a fresh socket
                # instead of taking the "streams already present" shortcut.
                self.reader = None
                self.writer = None
                self._connected = False
                self.negotiator.writer = None
            if opened_writer is not None:
                try:
                    opened_writer.close()
                except Exception:
                    # Best effort: the connect failure is what gets reported.
                    logger.debug(
                        "Ignoring error while closing failed connection",
                        exc_info=True,
                    )
            raise ConnectionError(f"Failed to connect: {e}") from e

    async def negotiate(self) -> None:
        """
        Perform initial Telnet negotiation.

        Delegates to negotiator.
        """
        await self.negotiator.negotiate()

    async def _negotiate_tn3270(self) -> None:
        """
        Negotiate TN3270E subnegotiation.

        Delegates to negotiator.
        """
        await self.negotiator._negotiate_tn3270()

    def set_ascii_mode(self) -> None:
        """
        Set the handler to ASCII mode fallback.

        Delegates to negotiator.
        """
        self.negotiator.set_ascii_mode()

    async def send_data(self, data: bytes) -> None:
        """
        Send data over the connection and wait for the buffer to drain.

        Args:
            data: Bytes to send.

        Raises:
            ProtocolError: If writer is None. Errors raised by the writer
                itself propagate unchanged.
        """
        if self.writer is None:
            logger.error("Not connected")
            raise ProtocolError("Writer is None; cannot send data.")
        self.writer.write(data)
        await self.writer.drain()

    async def receive_data(self, timeout: float = 5.0) -> bytes:
        """
        Receive up to 4096 bytes, feed them to the active parser, return them.

        In ASCII mode (already set on the negotiator, or auto-detected from
        the payload) the bytes go through a VT100Parser and are returned
        untouched. Otherwise they go through the 3270 data stream parser and
        everything from the first EOR marker (IAC EOR, 0xFF 0x19) onward is
        dropped from the returned value.

        Args:
            timeout: Receive timeout in seconds.

        Returns:
            Received bytes.

        Raises:
            asyncio.TimeoutError: If timeout exceeded.
            ProtocolError: If reader is None.
        """
        logger.debug("receive_data called on handler %s", id(self))
        logger.debug("Handler negotiator object ID: %s", id(self.negotiator))
        if self.reader is None:
            logger.error("Not connected")
            raise ProtocolError("Reader is None; cannot receive data.")

        try:
            logger.debug("Attempting to read data with timeout %s", timeout)
            data = await asyncio.wait_for(self.reader.read(4096), timeout=timeout)
            logger.debug("Received %s bytes of data", len(data))
        except asyncio.TimeoutError:
            logger.debug("Timeout while reading data")
            raise
        except Exception as e:
            logger.error("Error reading data: %s", e)
            raise

        ascii_mode = self.negotiator._ascii_mode
        logger.debug(
            "Checking ASCII mode: negotiator._ascii_mode = %s on negotiator object %s",
            ascii_mode,
            id(self.negotiator),
        )

        # s3270 compatibility: switch to ASCII mode as soon as the payload
        # looks like VT100 output, and remember that on the negotiator.
        if not ascii_mode and data and self._detect_vt100_sequences(data):
            logger.info(
                "Detected VT100 sequences, enabling ASCII mode (s3270 compatibility)"
            )
            ascii_mode = True
            self.negotiator._ascii_mode = True

        if ascii_mode:
            logger.debug("In ASCII mode, parsing VT100 data")
            self._parse_vt100(data)
            return data

        logger.debug("In TN3270 mode, parsing 3270 data")
        try:
            self.parser.parse(data)
        except ParseError as e:
            # Deliberate best effort: fall through and return the raw data.
            logger.warning("Failed to parse received data: %s", e)

        # Strip EOR if present (no-op when the marker is absent).
        return data.split(b"\xff\x19")[0]

    def _parse_vt100(self, data: bytes) -> None:
        """Apply VT100 data to the screen buffer; failures are logged, not raised."""
        try:
            logger.debug("Parsing VT100 data (%s bytes)", len(data))
            # Imported lazily, as in the original code.
            from .vt100_parser import VT100Parser

            VT100Parser(self.screen_buffer).parse(data)
            logger.debug("VT100 parsing completed successfully")
        except Exception as e:
            logger.warning("Error parsing VT100 data: %s", e)
            import traceback

            logger.warning("Traceback: %s", traceback.format_exc())

    def _detect_vt100_sequences(self, data: bytes) -> bool:
        """
        Detect VT100 escape sequences in data for s3270 compatibility.

        Two stages: first look for ESC followed by a known VT100 byte; failing
        that, apply density heuristics on the share of printable ASCII
        (0x20-0x7E) and of LF/CR/TAB/BS/ESC control bytes.

        Args:
            data: Raw data to check for VT100 sequences.

        Returns:
            True if VT100 sequences detected, False otherwise. Inputs shorter
            than two bytes always yield False.
        """
        total_chars = len(data)
        if total_chars < 2:
            return False

        # Stage 1: ESC immediately followed by a recognised byte.
        for current, following in zip(data, data[1:]):
            if current != 0x1B:
                continue
            if following in self._VT100_ESCAPE_FINALS:
                logger.debug(
                    "Detected VT100 escape sequence: ESC %s", chr(following)
                )
                return True
            if following in self._VT100_CONTROL_FINALS:
                logger.debug(
                    "Detected VT100 control sequence: ESC %s", chr(following)
                )
                return True

        # Stage 2: density heuristics. total_chars >= 2, so no zero division.
        printable_ascii_count = sum(1 for b in data if 0x20 <= b <= 0x7E)
        control_char_count = sum(1 for b in data if b in self._VT100_CONTROL_BYTES)
        printable_ratio = printable_ascii_count / total_chars
        control_ratio = control_char_count / total_chars

        # Mostly printable text mixed with some control characters.
        if (
            printable_ratio > 0.5
            and control_ratio > 0.05
            and printable_ascii_count > 10
        ):
            logger.debug(
                "Mixed ASCII/control character pattern suggests ASCII mode: "
                "printable=%.2f, control=%.2f",
                printable_ratio,
                control_ratio,
            )
            return True

        # Any ESC at all inside predominantly printable data.
        esc_count = data.count(0x1B)
        if esc_count > 0 and printable_ratio > 0.6:
            logger.debug(
                "ESC characters with high ASCII density (%s ESC chars, "
                "printable=%.2f) suggests ASCII mode",
                esc_count,
                printable_ratio,
            )
            return True

        # A long run of almost purely printable ASCII (plain-text interface).
        if printable_ratio > 0.8 and printable_ascii_count > 50:
            logger.debug(
                "Very high ASCII character density (%.2f) "
                "suggests ASCII terminal mode",
                printable_ratio,
            )
            return True

        return False

    async def send_scs_data(self, scs_data: bytes) -> None:
        """
        Send SCS character data for printer sessions.

        Args:
            scs_data: SCS character data to send.

        Raises:
            ProtocolError: If not connected, not a printer session, or the
                writer is None.
        """
        if not self._connected:
            raise ProtocolError("Not connected")
        if not self.negotiator.is_printer_session:
            raise ProtocolError("Not a printer session")
        if self.writer is None:
            raise ProtocolError("Writer is None; cannot send SCS data.")

        self.writer.write(scs_data)
        await self.writer.drain()
        logger.debug("Sent %s bytes of SCS data", len(scs_data))

    async def send_print_eoj(self) -> None:
        """
        Send PRINT-EOJ (End of Job) command for printer sessions.

        Raises:
            ProtocolError: If not connected, not a printer session, or the
                writer is None.
        """
        if not self._connected:
            raise ProtocolError("Not connected")
        if not self.negotiator.is_printer_session:
            raise ProtocolError("Not a printer session")
        if self.writer is None:
            raise ProtocolError("Writer is None; cannot send PRINT-EOJ.")

        # Imported lazily, as in the original code.
        from .data_stream import DataStreamSender

        eoj_command = DataStreamSender().build_scs_ctl_codes(0x01)  # PRINT_EOJ
        self.writer.write(eoj_command)
        await self.writer.drain()
        logger.debug("Sent PRINT-EOJ command")

    async def _read_iac(self) -> bytes:
        """
        Read a three-byte IAC (Interpret As Command) sequence.

        Returns:
            The three bytes read, the first of which is 0xFF.

        Raises:
            ProtocolError: If reader is None.
            ParseError: If the first byte read is not 0xFF.
            asyncio.IncompleteReadError: Propagated from readexactly() when
                the stream ends before three bytes arrive.
        """
        if self.reader is None:
            raise ProtocolError("Reader is None; cannot read IAC.")
        iac = await self.reader.readexactly(3)
        if iac[0] != 0xFF:
            raise ParseError("Invalid IAC sequence")
        return iac

    async def close(self) -> None:
        """
        Close the connection.

        The handler is always left in the disconnected state (writer cleared,
        connected flag False), even when closing the writer raises; such an
        error still propagates to the caller.
        """
        writer = self.writer
        try:
            if writer:
                writer.close()
                await writer.wait_closed()
        finally:
            if writer:
                self.writer = None
            self._connected = False

    def is_connected(self) -> bool:
        """
        Check if the handler is connected.

        Returns:
            True only when the connected flag is set, both streams are
            present, the writer is not closing and the reader is not at EOF.
            The liveness probes are skipped for stream objects that lack
            is_closing()/at_eof(); any error while probing counts as
            disconnected.
        """
        if not self._connected or self.writer is None or self.reader is None:
            return False
        try:
            if hasattr(self.writer, "is_closing") and self.writer.is_closing():
                return False
            if hasattr(self.reader, "at_eof") and self.reader.at_eof():
                return False
        except Exception:
            # If the liveness check itself fails, assume not connected.
            return False
        return True

    def is_printer_session_active(self) -> bool:
        """
        Check if this is a printer session.

        Returns:
            bool: The negotiator's printer-session flag.
        """
        return self.negotiator.is_printer_session

    @property
    def negotiated_tn3270e(self) -> bool:
        """Get TN3270E negotiation status."""
        return self.negotiator.negotiated_tn3270e

    @property
    def lu_name(self) -> Optional[str]:
        """Get the LU name."""
        return self.negotiator.lu_name

    @lu_name.setter
    def lu_name(self, value: Optional[str]) -> None:
        """Set the LU name."""
        self.negotiator.lu_name = value

    @property
    def screen_rows(self) -> int:
        """Get screen rows."""
        return self.negotiator.screen_rows

    @property
    def screen_cols(self) -> int:
        """Get screen columns."""
        return self.negotiator.screen_cols

    @property
    def is_printer_session(self) -> bool:
        """Get printer session status."""
        return self.negotiator.is_printer_session

    @is_printer_session.setter
    def is_printer_session(self, value: bool) -> None:
        """Set printer session status."""
        self.negotiator.is_printer_session = value