Coverage for pure3270/protocol/negotiator.py: 71%
161 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-11 20:54 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-11 20:54 +0000
1"""
2Negotiator for TN3270 protocol specifics.
3Handles Telnet negotiation and TN3270E subnegotiation.
4"""
6import asyncio
7import logging
8from typing import Optional, TYPE_CHECKING, List
9from .data_stream import DataStreamParser
10from .utils import (
11 send_iac,
12 send_subnegotiation,
13 TN3270E_DEVICE_TYPE,
14 TN3270E_FUNCTIONS,
15 TN3270E_IS,
16 TN3270E_REQUEST,
17 TN3270E_SEND,
18 TN3270E_BIND_IMAGE,
19 TN3270E_DATA_STREAM_CTL,
20 TN3270E_RESPONSES,
21 TN3270E_SCS_CTL_CODES,
22 TN3270E_SYSREQ,
23 TN3270E_IBM_DYNAMIC,
24)
25from .exceptions import NegotiationError, ProtocolError, ParseError
26from ..emulation.screen_buffer import ScreenBuffer
28if TYPE_CHECKING:
29 from .tn3270_handler import TN3270Handler
31logger = logging.getLogger(__name__)
34class Negotiator:
35 """
36 Handles TN3270 negotiation logic.
37 """
39 def __init__(
40 self,
41 writer: Optional[asyncio.StreamWriter],
42 parser: DataStreamParser,
43 screen_buffer: ScreenBuffer,
44 handler: Optional["TN3270Handler"] = None,
45 ):
46 """
47 Initialize the Negotiator.
49 Args:
50 writer: StreamWriter for sending commands.
51 parser: DataStreamParser for parsing responses.
52 screen_buffer: ScreenBuffer to update during negotiation.
53 handler: TN3270Handler instance for accessing reader methods.
54 """
55 logger.debug("Negotiator.__init__ called")
56 self.writer = writer
57 self.parser = parser
58 self.screen_buffer = screen_buffer
59 self.handler = handler
60 self._ascii_mode = False
61 logger.debug(f"Negotiator._ascii_mode initialized to {self._ascii_mode}")
62 self.negotiated_tn3270e = False
63 self._lu_name: Optional[str] = None
64 self.screen_rows = 24
65 self.screen_cols = 80
66 self.is_printer_session = False
67 self.supported_device_types: List[str] = [
68 "IBM-3278-2",
69 "IBM-3278-3",
70 "IBM-3278-4",
71 "IBM-3278-5",
72 "IBM-3279-2",
73 "IBM-3279-3",
74 "IBM-3279-4",
75 "IBM-3279-5",
76 "IBM-DYNAMIC",
77 ]
78 self.requested_device_type: Optional[str] = None
79 self.negotiated_device_type: Optional[str] = None
80 self.supported_functions: int = (
81 TN3270E_BIND_IMAGE
82 | TN3270E_DATA_STREAM_CTL
83 | TN3270E_RESPONSES
84 | TN3270E_SCS_CTL_CODES
85 )
86 self.negotiated_functions: int = 0
88 async def negotiate(self) -> None:
89 """
90 Perform initial Telnet negotiation.
92 Sends DO TERMINAL-TYPE and waits for responses.
94 Raises:
95 NegotiationError: If negotiation fails.
96 """
97 if self.writer is None:
98 raise ProtocolError("Writer is None; cannot negotiate.")
99 send_iac(self.writer, b"\xff\xfd\x27") # DO TERMINAL-TYPE
100 await self.writer.drain()
101 # Handle response (simplified)
102 data = await self._read_iac()
103 if not data:
104 raise NegotiationError("No response to DO TERMINAL-TYPE")
106 async def _negotiate_tn3270(self) -> None:
107 """
108 Negotiate TN3270E subnegotiation.
110 Sends TN3270E request and handles BIND, etc.
112 Raises:
113 NegotiationError: On subnegotiation failure.
114 """
115 if self.writer is None:
116 raise ProtocolError("Writer is None; cannot negotiate TN3270.")
117 # Send TN3270E subnegotiation
118 tn3270e_request = b"\x00\x00\x01\x00\x00\x18\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
119 send_subnegotiation(self.writer, b"\x19", tn3270e_request)
120 await self.writer.drain()
121 # Parse response
122 try:
123 response = await self._receive_data(10.0)
124 if (
125 b"\x28" in response or b"\xff\xfb\x24" in response
126 ): # TN3270E positive response
127 self.negotiated_tn3270e = True
128 self.parser.parse(response)
129 logger.info("TN3270E negotiation successful")
131 # Check if this is a printer session based on LU name or BIND response
132 if self.lu_name and ("LTR" in self.lu_name or "PTR" in self.lu_name):
133 self.is_printer_session = True
134 logger.info(f"Printer session detected for LU: {self.lu_name}")
135 else:
136 self.negotiated_tn3270e = False
137 self.set_ascii_mode()
138 logger.info("TN3270E negotiation failed, fallback to ASCII")
139 except (ParseError, ProtocolError, asyncio.TimeoutError) as e:
140 logger.warning(f"TN3270E negotiation failed with specific error: {e}")
141 self.negotiated_tn3270e = False
142 self.set_ascii_mode()
143 except Exception as e:
144 logger.error(f"Unexpected error during TN3270E negotiation: {e}")
145 self.negotiated_tn3270e = False
146 self.set_ascii_mode()
148 def set_ascii_mode(self) -> None:
149 """
150 Set to ASCII mode fallback, matching s3270 behavior.
152 Disables EBCDIC processing and enables ASCII/VT100 terminal emulation.
153 """
154 logger.debug(
155 f"BEFORE set_ascii_mode: _ascii_mode = {self._ascii_mode} on negotiator object {id(self)}"
156 )
157 self._ascii_mode = True
158 logger.debug(
159 f"AFTER set_ascii_mode: _ascii_mode = {self._ascii_mode} on negotiator object {id(self)}"
160 )
161 logger.info("Switched to ASCII/VT100 mode (s3270 compatibility)")
163 async def _receive_data(self, timeout: float = 5.0) -> bytes:
164 """
165 Receive data with timeout (internal).
167 Args:
168 timeout: Receive timeout in seconds.
170 Returns:
171 Received bytes.
173 Raises:
174 asyncio.TimeoutError: If timeout exceeded.
175 """
176 if self.handler:
177 return await self.handler.receive_data(timeout)
178 raise NotImplementedError("Handler required for receiving data")
180 async def _read_iac(self) -> bytes:
181 """
182 Read IAC sequence (internal).
184 Returns:
185 IAC response bytes.
187 Raises:
188 ParseError: If IAC parsing fails.
189 """
190 if self.handler:
191 return await self.handler._read_iac()
192 raise NotImplementedError("Handler required for reading IAC")
194 def is_printer_session_active(self) -> bool:
195 """
196 Check if this is a printer session.
198 Returns:
199 bool: True if printer session.
200 """
201 return self.is_printer_session
203 @property
204 def lu_name(self) -> Optional[str]:
205 """Get the LU name."""
206 return self._lu_name
208 @lu_name.setter
209 def lu_name(self, value: Optional[str]) -> None:
210 """Set the LU name."""
211 self._lu_name = value
213 def _parse_tn3270e_subnegotiation(self, data: bytes) -> None:
214 """
215 Parse TN3270E subnegotiation message.
217 Args:
218 data: TN3270E subnegotiation data (without IAC SB and IAC SE)
219 """
220 if len(data) < 3:
221 logger.warning(f"Invalid TN3270E subnegotiation data: {data.hex()}")
222 return
224 # Parse subnegotiation header
225 # data[0] = TN3270E option (should be 0x28)
226 # data[1] = message type
227 # data[2] = message sub-type
229 if data[0] != 0x28: # TN3270E option
230 logger.warning(f"Invalid TN3270E option in subnegotiation: 0x{data[0]:02x}")
231 return
233 message_type = data[1] if len(data) > 1 else None
234 message_subtype = data[2] if len(data) > 2 else None
236 if message_type == TN3270E_DEVICE_TYPE:
237 self._handle_device_type_subnegotiation(data[1:])
238 elif message_type == TN3270E_FUNCTIONS:
239 self._handle_functions_subnegotiation(data[1:])
240 else:
241 logger.debug(f"Unhandled TN3270E subnegotiation type: 0x{message_type:02x}")
243 def _handle_device_type_subnegotiation(self, data: bytes) -> None:
244 """
245 Handle DEVICE-TYPE subnegotiation message.
247 Args:
248 data: DEVICE-TYPE subnegotiation data (message type already stripped)
249 """
250 if len(data) < 2:
251 logger.warning("Invalid DEVICE-TYPE subnegotiation data")
252 return
254 sub_type = data[0]
256 if sub_type == TN3270E_IS:
257 # DEVICE-TYPE IS - server is telling us what device type to use
258 if len(data) > 1:
259 # Extract device type string (null-terminated or until end)
260 device_type_bytes = data[1:]
261 # Find null terminator if present
262 null_pos = device_type_bytes.find(0x00)
263 if null_pos != -1:
264 device_type_bytes = device_type_bytes[:null_pos]
266 device_type = device_type_bytes.decode("ascii", errors="ignore").strip()
267 logger.info(f"Server requested device type: {device_type}")
269 # Handle IBM-DYNAMIC specially
270 if device_type == TN3270E_IBM_DYNAMIC:
271 logger.info("IBM-DYNAMIC device type negotiated")
272 self.negotiated_device_type = TN3270E_IBM_DYNAMIC
273 # For IBM-DYNAMIC, we may need to negotiate screen size dynamically
274 else:
275 self.negotiated_device_type = device_type
277 elif sub_type == TN3270E_REQUEST:
278 # DEVICE-TYPE REQUEST - server is asking what device types we support
279 logger.info("Server requested supported device types")
280 self._send_supported_device_types()
281 else:
282 logger.warning(
283 f"Unhandled DEVICE-TYPE subnegotiation subtype: 0x{sub_type:02x}"
284 )
286 def _handle_functions_subnegotiation(self, data: bytes) -> None:
287 """
288 Handle FUNCTIONS subnegotiation message.
290 Args:
291 data: FUNCTIONS subnegotiation data (message type already stripped)
292 """
293 if len(data) < 2:
294 logger.warning("Invalid FUNCTIONS subnegotiation data")
295 return
297 sub_type = data[0]
299 if sub_type == TN3270E_IS:
300 # FUNCTIONS IS - server is telling us what functions are enabled
301 if len(data) > 1:
302 # Parse function bits
303 function_bits = 0
304 for i in range(1, len(data)):
305 function_bits |= data[i]
307 logger.info(f"Server enabled functions: 0x{function_bits:02x}")
308 self.negotiated_functions = function_bits
310 # Log specific functions
311 if function_bits & TN3270E_BIND_IMAGE:
312 logger.debug("BIND-IMAGE function enabled")
313 if function_bits & TN3270E_DATA_STREAM_CTL:
314 logger.debug("DATA-STREAM-CTL function enabled")
315 if function_bits & TN3270E_RESPONSES:
316 logger.debug("RESPONSES function enabled")
317 if function_bits & TN3270E_SCS_CTL_CODES:
318 logger.debug("SCS-CTL-CODES function enabled")
319 if function_bits & TN3270E_SYSREQ:
320 logger.debug("SYSREQ function enabled")
322 elif sub_type == TN3270E_REQUEST:
323 # FUNCTIONS REQUEST - server is asking what functions we support
324 logger.info("Server requested supported functions")
325 self._send_supported_functions()
326 else:
327 logger.warning(
328 f"Unhandled FUNCTIONS subnegotiation subtype: 0x{sub_type:02x}"
329 )
331 def _send_supported_device_types(self) -> None:
332 """Send our supported device types to the server."""
333 if self.writer is None:
334 logger.error("Cannot send device types: writer is None")
335 return
337 # Send DEVICE-TYPE SEND response with our supported types
338 # For simplicity, we'll just send our first supported type
339 if self.supported_device_types:
340 device_type = self.supported_device_types[0].encode("ascii") + b"\x00"
341 sub_data = bytes([TN3270E_DEVICE_TYPE, TN3270E_SEND]) + device_type
342 send_subnegotiation(self.writer, bytes([0x28]), sub_data)
343 logger.debug(
344 f"Sent supported device type: {self.supported_device_types[0]}"
345 )
347 def _send_supported_functions(self) -> None:
348 """Send our supported functions to the server."""
349 if self.writer is None:
350 logger.error("Cannot send functions: writer is None")
351 return
353 # Send FUNCTIONS SEND response with our supported functions
354 function_bytes = [
355 (self.supported_functions >> i) & 0xFF
356 for i in range(0, 8, 8)
357 if (self.supported_functions >> i) & 0xFF
358 ]
360 if function_bytes:
361 sub_data = bytes([TN3270E_FUNCTIONS, TN3270E_SEND] + function_bytes)
362 send_subnegotiation(self.writer, bytes([0x28]), sub_data)
363 logger.debug(f"Sent supported functions: 0x{self.supported_functions:02x}")