Coverage for pure3270/protocol/data_stream.py: 87%
264 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-11 20:54 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-11 20:54 +0000
1"""Data stream parser and sender for 3270 protocol."""
3from typing import List, Tuple, Optional
4import logging
5from typing import Optional
6from ..emulation.screen_buffer import ScreenBuffer
8logger = logging.getLogger(__name__)
class ParseError(Exception):
    """Raised when a 3270 data stream cannot be parsed."""
# ---------------------------------------------------------------------------
# Byte values the parser dispatches on (see DataStreamParser.parse).
#
# NOTE(review): several of these values differ from the code points published
# in the IBM 3270 Data Stream Programmer's Reference (for example SBA is X'11'
# and RA is X'3C' there).  Confirm whether the values below are intentional
# before interoperating with a real host.
# ---------------------------------------------------------------------------
WCC = 0xF5  # Write Control Character follows
AID = 0xF6  # Attention ID follows
READ_PARTITION = 0xF1
SBA = 0x10  # Set Buffer Address
SF = 0x1D  # Start Field
RA = 0xF3  # Repeat to Address
GE = 0x29  # Graphic Escape
BIND = 0x28
WRITE = 0x05
EOA = 0x0D  # End of Addressable
SCS_CTL_CODES = 0x04
DATA_STREAM_CTL = 0x40
STRUCTURED_FIELD = 0x3C  # '<'

# SCS control codes
PRINT_EOJ = 0x01

# TN3270E subnegotiation message types
# NOTE(review): RFC 2355 assigns different numeric values to these names;
# verify against the negotiation code that consumes them.
TN3270E_DEVICE_TYPE = 0x00
TN3270E_FUNCTIONS = 0x01
TN3270E_IS = 0x02
TN3270E_REQUEST = 0x03
TN3270E_SEND = 0x04

# TN3270E device type names
TN3270E_IBM_DYNAMIC = "IBM-DYNAMIC"
TN3270E_IBM_3278_2 = "IBM-3278-2"
TN3270E_IBM_3278_3 = "IBM-3278-3"
TN3270E_IBM_3278_4 = "IBM-3278-4"
TN3270E_IBM_3278_5 = "IBM-3278-5"
TN3270E_IBM_3279_2 = "IBM-3279-2"
TN3270E_IBM_3279_3 = "IBM-3279-3"
TN3270E_IBM_3279_4 = "IBM-3279-4"
TN3270E_IBM_3279_5 = "IBM-3279-5"

# TN3270E functions (each value is a distinct single bit)
TN3270E_BIND_IMAGE = 0x01
TN3270E_DATA_STREAM_CTL = 0x02
TN3270E_RESPONSES = 0x04
TN3270E_SCS_CTL_CODES = 0x08
TN3270E_SYSREQ = 0x10

# Query Reply structured field: the SF type byte, then the reply type codes
QUERY_REPLY_SF = 0x88
QUERY_REPLY_DEVICE_TYPE = 0x01
QUERY_REPLY_CHARACTERISTICS = 0x02
QUERY_REPLY_HIGHLIGHTING = 0x03
QUERY_REPLY_COLOR = 0x04
QUERY_REPLY_EXTENDED_ATTRIBUTES = 0x05
QUERY_REPLY_GRAPHICS = 0x06
QUERY_REPLY_DBCS_ASIA = 0x07
QUERY_REPLY_DBCS_EUROPE = 0x08
QUERY_REPLY_DBCS_MIDDLE_EAST = 0x09
QUERY_REPLY_LINE_TYPE = 0x0A
QUERY_REPLY_OEM_AUXILIARY_DEVICE = 0x0B
QUERY_REPLY_TRANSPARENCY = 0x0C
QUERY_REPLY_FORMAT_STORAGE = 0x0D
QUERY_REPLY_DDM = 0x0E
QUERY_REPLY_RPQ_NAMES = 0x0F
QUERY_REPLY_SEGMENT = 0x10
QUERY_REPLY_PROCEDURE = 0x11
QUERY_REPLY_GRID = 0x12
class DataStreamParser:
    """Parses incoming 3270 data streams and updates the screen buffer.

    The parser walks the stream one byte at a time.  A byte equal to one of
    the module-level order constants is dispatched to the matching
    ``_handle_*`` method (which consumes that order's operand bytes); every
    other byte is written to the screen at the current position.
    """

    def __init__(self, screen_buffer: "ScreenBuffer") -> None:
        """
        Initialize the DataStreamParser.

        :param screen_buffer: ScreenBuffer to update.
        """
        self.screen = screen_buffer
        self._data = b""  # stream currently being parsed
        self._pos = 0  # index of the next unread byte in self._data
        self.wcc: Optional[int] = None  # last Write Control Character seen
        self.aid: Optional[int] = None  # last Attention ID seen

    def get_aid(self) -> Optional[int]:
        """Get the current AID value (``None`` until one has been parsed)."""
        return self.aid

    def _take(
        self, count: int, error: str = "Unexpected end of data stream"
    ) -> bytes:
        """
        Consume and return the next ``count`` bytes of the stream.

        :param count: Number of operand bytes required.
        :param error: Message to log and raise if the stream is too short.
        :raises ParseError: If fewer than ``count`` bytes remain.
        """
        end = self._pos + count
        if end > len(self._data):
            logger.error(error)
            raise ParseError(error)
        chunk = self._data[self._pos:end]
        self._pos = end
        return chunk

    def parse(self, data: bytes) -> None:
        """
        Parse 3270 data stream.

        :param data: Incoming 3270 data stream bytes.
        :raises ParseError: If parsing fails.
        """
        self._data = data
        self._pos = 0
        logger.debug("Parsing %d bytes of data stream", len(data))

        # NOTE(review): DATA_STREAM_CTL is 0x40, the same value _handle_sf
        # writes as a space, so a 0x40 byte in the stream is dispatched as a
        # control order (consuming the byte after it) rather than written as
        # data.  STRUCTURED_FIELD is not dispatched here at all.  Confirm both
        # are intended.
        try:
            while self._pos < len(self._data):
                order = self._data[self._pos]
                self._pos += 1

                if order == WCC:  # WCC (Write Control Character)
                    self.wcc = self._take(1)[0]
                    self._handle_wcc(self.wcc)
                elif order == AID:  # AID (Attention ID)
                    self.aid = self._take(1)[0]
                    logger.debug("AID received: 0x%02x", self.aid)
                elif order == READ_PARTITION:  # Read Partition
                    pass  # Handle if needed
                elif order == SBA:  # SBA (Set Buffer Address)
                    self._handle_sba()
                elif order == SF:  # SF (Start Field)
                    self._handle_sf()
                elif order == RA:  # RA (Repeat to Address)
                    self._handle_ra()
                elif order == GE:  # GE (Graphic Escape)
                    self._handle_ge()
                elif order == BIND:  # BIND
                    logger.debug("BIND received, configuring terminal type")
                    # The rest of the buffer is not interpreted.
                    self._pos = len(self._data)
                elif order == WRITE:  # W (Write)
                    self._handle_write()
                elif order == EOA:  # EOA (End of Addressable)
                    break
                elif order == SCS_CTL_CODES:  # SCS Control Codes
                    self._handle_scs_ctl_codes()
                elif order == DATA_STREAM_CTL:  # Data Stream Control
                    self._handle_data_stream_ctl()
                else:
                    self._handle_data(order)
        except IndexError as exc:
            # Keep the original traceback attached for debugging.
            raise ParseError("Unexpected end of data stream") from exc

    def _handle_wcc(self, wcc: int) -> None:
        """
        Handle Write Control Character.

        :param wcc: The WCC byte; if its lowest bit is set the screen is
            cleared.
        """
        # Simplified: only the lowest bit is interpreted.
        if wcc & 0x01:
            self.screen.clear()
        logger.debug("WCC: 0x%02x", wcc)

    def _handle_sba(self) -> None:
        """
        Handle Set Buffer Address.

        Reads a two-byte big-endian buffer address and moves the screen
        position to the corresponding row and column.

        :raises ParseError: If fewer than two operand bytes remain.
        """
        addr_high, addr_low = self._take(2)
        address = (addr_high << 8) | addr_low
        row, col = divmod(address, self.screen.cols)
        self.screen.set_position(row, col)
        logger.debug("SBA to row %s, col %s", row, col)

    def _handle_sf(self) -> None:
        """
        Handle Start Field.

        Reads the one-byte field attribute, writes a 0x40 placeholder at the
        current position and records the attribute in the screen buffer's
        three-bytes-per-cell ``attributes`` storage.

        :raises ParseError: If the attribute byte is missing.
        """
        attr = self._take(1)[0]

        # Decode the attribute byte using this module's bit layout.
        protected = bool(attr & 0x40)  # Bit 6: protected
        numeric = bool(attr & 0x20)  # Bit 5: numeric
        intensity = (attr >> 3) & 0x03  # Bits 4-3: intensity
        modified = bool(attr & 0x04)  # Bit 2: modified data tag
        validation = attr & 0x03  # Bits 1-0: validation

        # Update field attributes at current position
        row, col = self.screen.get_position()
        self.screen.write_char(0x40, row, col, protected=protected)

        # Store extended attributes only when the position is on screen.
        if 0 <= row < self.screen.rows and 0 <= col < self.screen.cols:
            attr_offset = (row * self.screen.cols + col) * 3
            # Byte 0: raw attribute, byte 1: intensity, byte 2: validation.
            # A more complete implementation would map these properly.
            self.screen.attributes[attr_offset] = attr
            self.screen.attributes[attr_offset + 1] = intensity
            self.screen.attributes[attr_offset + 2] = validation

        logger.debug(
            "SF: protected=%s, numeric=%s, intensity=%s, modified=%s, "
            "validation=%s",
            protected,
            numeric,
            intensity,
            modified,
            validation,
        )

    def _handle_ra(self) -> None:
        """
        Handle Repeat to Address (basic).

        Consumes the three operand bytes (repeat character plus a two-byte
        value).  The repeat itself is not implemented yet; the operands are
        only logged.

        :raises ParseError: If fewer than three operand bytes remain.
        """
        # Exactly three bytes are needed; requiring a fourth would skip an RA
        # that ends the buffer and mis-read its operands as orders or data.
        repeat_char, addr_high, addr_low = self._take(3)
        count = (addr_high << 8) | addr_low
        # Implement repeat logic...
        logger.debug("RA: repeat 0x%02x %s times", repeat_char, count)

    def _handle_scs_ctl_codes(self) -> None:
        """
        Handle SCS Control Codes for printer sessions.

        :raises ParseError: If the control code byte is missing.
        """
        scs_code = self._take(
            1, "Unexpected end of data stream in SCS control codes"
        )[0]

        if scs_code == PRINT_EOJ:
            # In a real implementation, this would trigger printer job
            # completion.
            logger.debug("SCS PRINT-EOJ received")
        else:
            logger.debug("Unknown SCS control code: 0x%02x", scs_code)

    def _handle_data_stream_ctl(self) -> None:
        """
        Handle Data Stream Control for printer data streams.

        :raises ParseError: If the control code byte is missing.
        """
        ctl_code = self._take(
            1, "Unexpected end of data stream in data stream control"
        )[0]
        # Implementation would handle specific data stream control functions
        logger.debug("Data Stream Control code: 0x%02x", ctl_code)

    def _handle_ge(self) -> None:
        """Handle Graphic Escape (stub): consumes nothing, only logs."""
        logger.debug("GE encountered (graphics not supported)")

    def _handle_write(self) -> None:
        """Handle Write order: clear the screen before subsequent data."""
        self.screen.clear()
        # Subsequent data is written to buffer
        logger.debug("Write order: clearing and writing")

    def _handle_scs_data(self, data: bytes) -> None:
        """
        Handle SCS character stream data for printer sessions.

        Currently only logs the size of the data.

        :param data: SCS character data
        """
        # In a full implementation, this would process SCS character data
        # for printer output rather than screen display
        logger.debug("SCS data received: %s bytes", len(data))

    def _handle_data(self, byte: int) -> None:
        """
        Handle data byte.

        Writes the byte at the current screen position and advances the
        position by one column, moving to the start of the next row at the
        right edge.

        :param byte: Data byte to write.
        """
        row, col = self.screen.get_position()
        self.screen.write_char(byte, row, col)
        col += 1
        if col >= self.screen.cols:
            col = 0
            row += 1
        self.screen.set_position(row, col)

    def _handle_structured_field(self) -> None:
        """
        Handle Structured Field command.

        Not called from ``parse`` at present.  The structured field is
        skipped rather than interpreted.
        """
        logger.debug("Structured Field command received")
        # In a full implementation, this would parse the structured field
        # and handle queries, replies, etc.
        self._skip_structured_field()

    def _skip_structured_field(self) -> None:
        """Skip structured field data up to the next order byte or the end."""
        # Built once per call instead of once per byte.
        command_bytes = frozenset(
            (
                WCC,
                AID,
                READ_PARTITION,
                SBA,
                SF,
                RA,
                GE,
                BIND,
                WRITE,
                EOA,
                SCS_CTL_CODES,
                DATA_STREAM_CTL,
                STRUCTURED_FIELD,
            )
        )
        while (
            self._pos < len(self._data)
            and self._data[self._pos] not in command_bytes
        ):
            self._pos += 1
        logger.debug("Skipped structured field")

    def _handle_read_partition_query(self) -> None:
        """Handle Read Partition Query command (currently only logs)."""
        logger.debug("Read Partition Query command received")
        # In a full implementation, this would trigger sending Query Reply SFs
        # to inform the host about our capabilities

    def build_query_reply_sf(self, query_type: int, data: bytes = b"") -> bytes:
        """
        Build Query Reply Structured Field.

        Layout: ``STRUCTURED_FIELD``, two-byte big-endian length,
        ``QUERY_REPLY_SF``, ``query_type``, then ``data``.  The length counts
        every byte except the leading ``STRUCTURED_FIELD`` identifier.

        :param query_type: Query reply type
        :param data: Query reply data
        :return: Query Reply Structured Field bytes
        :raises ValueError: If the field is too long for the two-byte length.
        """
        sf = bytearray((STRUCTURED_FIELD, 0x00, 0x00, QUERY_REPLY_SF, query_type))
        sf.extend(data)

        length = len(sf) - 1  # Exclude the SF identifier
        if length > 0xFFFF:
            # Masking the length would produce a corrupt field.
            raise ValueError(
                f"Query reply of {length} bytes exceeds the 16-bit length field"
            )
        sf[1:3] = length.to_bytes(2, "big")
        return bytes(sf)

    def build_device_type_query_reply(self) -> bytes:
        """
        Build Device Type Query Reply Structured Field.

        :return: Device Type Query Reply SF bytes
        """
        # For simplicity, we'll report our device type
        device_type = b"IBM-3278-4-E"  # Example device type
        return self.build_query_reply_sf(QUERY_REPLY_DEVICE_TYPE, device_type)

    def build_characteristics_query_reply(self) -> bytes:
        """
        Build Characteristics Query Reply Structured Field.

        :return: Characteristics Query Reply SF bytes
        """
        # Report basic characteristics: flags byte 1, 2 and 3.
        characteristics = bytes((0x01, 0x00, 0x00))
        return self.build_query_reply_sf(QUERY_REPLY_CHARACTERISTICS, characteristics)
class DataStreamSender:
    """Constructs outgoing 3270 data streams."""

    def __init__(self) -> None:
        """Initialize the DataStreamSender with its own ScreenBuffer."""
        # Only the buffer's ``cols`` is read here (by build_sba).
        self.screen = ScreenBuffer()

    @staticmethod
    def _encode_sba(address: int) -> bytes:
        """
        Encode a linear buffer address as an SBA order.

        :param address: Zero-based linear buffer address.
        :return: ``SBA`` followed by the address as two big-endian bytes.
        :raises ValueError: If the address does not fit in two bytes.
        """
        if not 0 <= address <= 0xFFFF:
            # Masking would silently address a different cell.
            raise ValueError(f"Buffer address {address} does not fit in 16 bits")
        return bytes((SBA, address >> 8, address & 0xFF))

    def build_read_modified_all(self) -> bytes:
        """Build Read Modified All (RMA) command."""
        # AID Enter, Read Partition (simplified for RMA)
        return bytes((0x7D, READ_PARTITION))

    def build_read_modified_fields(self) -> bytes:
        """Build Read Modified Fields (RMF) command."""
        # AID Enter, Read Modified, all fields
        return bytes((0x7D, 0xF6, 0xF0))

    def build_scs_ctl_codes(self, scs_code: int) -> bytes:
        """
        Build SCS Control Codes for printer sessions.

        :param scs_code: SCS control code to send
        :return: ``SCS_CTL_CODES`` followed by the code byte.
        """
        return bytes((SCS_CTL_CODES, scs_code))

    def build_data_stream_ctl(self, ctl_code: int) -> bytes:
        """
        Build Data Stream Control command.

        :param ctl_code: Data stream control code
        :return: ``DATA_STREAM_CTL`` followed by the code byte.
        """
        return bytes((DATA_STREAM_CTL, ctl_code))

    def build_write(self, data: bytes, wcc: int = 0xC1) -> bytes:
        """
        Build Write command with data.

        :param data: Data to write.
        :param wcc: Write Control Character.
        :return: ``WCC``, the wcc byte, ``WRITE``, the data, then ``EOA``.
        """
        stream = bytearray((WCC, wcc, WRITE))
        stream.extend(data)
        stream.append(EOA)
        return bytes(stream)

    def build_sba(self, row: int, col: int) -> bytes:
        """
        Build Set Buffer Address.

        The linear address is computed with this sender's ``screen.cols``.

        :param row: Row.
        :param col: Column.
        :return: SBA order bytes.
        :raises ValueError: If the resulting address does not fit in two bytes.
        """
        return self._encode_sba(row * self.screen.cols + col)

    def build_sf(self, protected: bool = True, numeric: bool = False) -> bytes:
        """
        Build Start Field.

        :param protected: Protected attribute.
        :param numeric: Numeric attribute.
        :return: ``SF`` followed by the attribute byte.
        """
        attr = (0x40 if protected else 0x00) | (0x20 if numeric else 0x00)
        return bytes((SF, attr))

    def build_key_press(self, aid: int) -> bytes:
        """
        Build data stream for key press (AID).

        :param aid: Attention ID (e.g., 0x7D for Enter).
        :return: A single byte holding the AID.
        """
        return bytes((aid,))

    def build_input_stream(
        self,
        modified_fields: List[Tuple[Tuple[int, int], bytes]],
        aid: int,
        cols: int = 80,
    ) -> bytes:
        """
        Build 3270 input data stream for modified fields and AID.

        Each field is emitted as an SBA to its start position followed by its
        content; the AID byte is appended last.  ``cols`` is used only for
        this call and does not alter ``self.screen``.

        :param modified_fields: List of ((row, col), content_bytes) for each modified field.
        :param aid: Attention ID byte for the key press.
        :param cols: Number of columns for SBA calculation.
        :return: Complete input data stream bytes.
        :raises ValueError: If a field address does not fit in two bytes.
        """
        stream = bytearray()
        for (row, col), content in modified_fields:
            # Computed locally so later build_sba calls are unaffected.
            stream.extend(self._encode_sba(row * cols + col))
            stream.extend(content)
        stream.append(aid)
        return bytes(stream)