Coverage for pure3270/protocol/data_stream.py: 87%

264 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-11 20:54 +0000

1"""Data stream parser and sender for 3270 protocol.""" 

2 

3from typing import List, Tuple, Optional 

4import logging 

5from typing import Optional 

6from ..emulation.screen_buffer import ScreenBuffer 

7 

8logger = logging.getLogger(__name__) 

9 

10 

class ParseError(Exception):
    """Raised when a 3270 data stream cannot be parsed.

    The parser in this module raises it when an order is missing its
    operand bytes, i.e. the stream ends earlier than expected.
    """

15 

16 

# ---------------------------------------------------------------------------
# Byte values that DataStreamParser.parse() dispatches on.
# NOTE(review): the numeric values are kept exactly as the module defines
# them; they have not been cross-checked against the IBM 3270 data stream
# reference here -- confirm before relying on them for wire compatibility.
# ---------------------------------------------------------------------------
WCC = 0xF5  # Write Control Character
AID = 0xF6  # Attention ID
READ_PARTITION = 0xF1  # Read Partition
SBA = 0x10  # Set Buffer Address
SF = 0x1D  # Start Field
RA = 0xF3  # Repeat to Address
GE = 0x29  # Graphic Escape
BIND = 0x28  # BIND
WRITE = 0x05  # Write
EOA = 0x0D  # End of Addressable
SCS_CTL_CODES = 0x04  # SCS control codes (printer sessions)
DATA_STREAM_CTL = 0x40  # Data stream control (printer data streams)
STRUCTURED_FIELD = 0x3C  # '<'

# SCS control codes (the byte that follows SCS_CTL_CODES).
PRINT_EOJ = 0x01

# TN3270E subnegotiation message types.
TN3270E_DEVICE_TYPE = 0x00
TN3270E_FUNCTIONS = 0x01
TN3270E_IS = 0x02
TN3270E_REQUEST = 0x03
TN3270E_SEND = 0x04

# TN3270E device type names.
TN3270E_IBM_DYNAMIC = "IBM-DYNAMIC"
TN3270E_IBM_3278_2 = "IBM-3278-2"
TN3270E_IBM_3278_3 = "IBM-3278-3"
TN3270E_IBM_3278_4 = "IBM-3278-4"
TN3270E_IBM_3278_5 = "IBM-3278-5"
TN3270E_IBM_3279_2 = "IBM-3279-2"
TN3270E_IBM_3279_3 = "IBM-3279-3"
TN3270E_IBM_3279_4 = "IBM-3279-4"
TN3270E_IBM_3279_5 = "IBM-3279-5"

# TN3270E functions; each value is a distinct single bit.
TN3270E_BIND_IMAGE = 1 << 0
TN3270E_DATA_STREAM_CTL = 1 << 1
TN3270E_RESPONSES = 1 << 2
TN3270E_SCS_CTL_CODES = 1 << 3
TN3270E_SYSREQ = 1 << 4

# Query Reply structured field: the SF type byte, then the reply type codes.
QUERY_REPLY_SF = 0x88
QUERY_REPLY_DEVICE_TYPE = 0x01
QUERY_REPLY_CHARACTERISTICS = 0x02
QUERY_REPLY_HIGHLIGHTING = 0x03
QUERY_REPLY_COLOR = 0x04
QUERY_REPLY_EXTENDED_ATTRIBUTES = 0x05
QUERY_REPLY_GRAPHICS = 0x06
QUERY_REPLY_DBCS_ASIA = 0x07
QUERY_REPLY_DBCS_EUROPE = 0x08
QUERY_REPLY_DBCS_MIDDLE_EAST = 0x09
QUERY_REPLY_LINE_TYPE = 0x0A
QUERY_REPLY_OEM_AUXILIARY_DEVICE = 0x0B
QUERY_REPLY_TRANSPARENCY = 0x0C
QUERY_REPLY_FORMAT_STORAGE = 0x0D
QUERY_REPLY_DDM = 0x0E
QUERY_REPLY_RPQ_NAMES = 0x0F
QUERY_REPLY_SEGMENT = 0x10
QUERY_REPLY_PROCEDURE = 0x11
QUERY_REPLY_GRID = 0x12

80 

81 

class DataStreamParser:
    """Parses incoming 3270 data streams and updates the screen buffer.

    The parser walks the stream one byte at a time.  A byte equal to one of
    the module's order constants is dispatched to a handler, which consumes
    that order's operand bytes; any other byte is written to the screen as
    data at the current buffer position.

    NOTE(review): ``parse`` has no branch for ``STRUCTURED_FIELD``, so
    ``_handle_structured_field`` is never reached from it, and
    ``READ_PARTITION`` is consumed without calling
    ``_handle_read_partition_query``.  Also ``DATA_STREAM_CTL`` is 0x40, the
    same value ``_handle_sf`` writes as a space, so a 0x40 data byte is
    dispatched as a control order.  Confirm whether these are intended.
    """

    def __init__(self, screen_buffer: ScreenBuffer):
        """
        Initialize the DataStreamParser.

        :param screen_buffer: ScreenBuffer to update.
        """
        self.screen = screen_buffer
        self._data = b""  # Stream currently being parsed
        self._pos = 0  # Index of the next unread byte in self._data
        self.wcc: Optional[int] = None  # Write Control Character
        self.aid: Optional[int] = None  # Attention ID

    def get_aid(self) -> Optional[int]:
        """Get the current AID value (None until an AID has been parsed)."""
        return self.aid

    def _take(
        self, count: int, error_message: str = "Unexpected end of data stream"
    ) -> bytes:
        """Consume and return the next ``count`` bytes of the stream.

        This is the single place where truncated input is detected, so every
        order handler reports it the same way.

        :param count: Number of operand bytes required.
        :param error_message: Text logged and raised when too few bytes remain.
        :return: The consumed bytes.
        :raises ParseError: If fewer than ``count`` bytes remain; the read
            position is left unchanged in that case.
        """
        end = self._pos + count
        if end > len(self._data):
            logger.error(error_message)
            raise ParseError(error_message)
        chunk = self._data[self._pos:end]
        self._pos = end
        return chunk

    def parse(self, data: bytes) -> None:
        """
        Parse 3270 data stream.

        Parsing stops at the end of ``data``, at an EOA byte, or after a
        BIND byte (the rest of the stream is skipped).

        :param data: Incoming 3270 data stream bytes.
        :raises ParseError: If parsing fails.
        """
        self._data = data
        self._pos = 0
        logger.debug("Parsing %d bytes of data stream", len(data))

        try:
            while self._pos < len(self._data):
                order = self._data[self._pos]
                self._pos += 1

                if order == WCC:  # WCC (Write Control Character)
                    self.wcc = self._take(1)[0]
                    self._handle_wcc(self.wcc)
                elif order == AID:  # AID (Attention ID)
                    self.aid = self._take(1)[0]
                    logger.debug("AID received: 0x%02x", self.aid)
                elif order == READ_PARTITION:  # Read Partition
                    pass  # Handle if needed
                elif order == SBA:  # SBA (Set Buffer Address)
                    self._handle_sba()
                elif order == SF:  # SF (Start Field)
                    self._handle_sf()
                elif order == RA:  # RA (Repeat to Address)
                    self._handle_ra()
                elif order == GE:  # GE (Graphic Escape)
                    self._handle_ge()
                elif order == BIND:  # BIND
                    logger.debug("BIND received, configuring terminal type")
                    # The remainder of the stream is not interpreted.
                    self._pos = len(self._data)
                elif order == WRITE:  # W (Write)
                    self._handle_write()
                elif order == EOA:  # EOA (End of Addressable)
                    break
                elif order == SCS_CTL_CODES:  # SCS Control Codes
                    self._handle_scs_ctl_codes()
                elif order == DATA_STREAM_CTL:  # Data Stream Control
                    self._handle_data_stream_ctl()
                else:
                    self._handle_data(order)

        except IndexError as exc:
            # Keep the original cause attached for debugging.
            raise ParseError("Unexpected end of data stream") from exc

    def _handle_wcc(self, wcc: int) -> None:
        """Handle Write Control Character.

        :param wcc: The WCC byte that followed the WCC order.
        """
        # Simplified: the screen is cleared when the lowest bit is set.
        # NOTE(review): the original comment described this bit as "reset
        # modified flags", but the code clears the screen -- confirm intent.
        if wcc & 0x01:
            self.screen.clear()
        logger.debug("WCC: 0x%02x", wcc)

    def _handle_sba(self) -> None:
        """Handle Set Buffer Address.

        Reads a two-byte big-endian address and moves the screen position to
        the corresponding row and column.

        :raises ParseError: If fewer than two operand bytes remain.
        """
        addr_high, addr_low = self._take(2)
        address = (addr_high << 8) | addr_low
        row, col = divmod(address, self.screen.cols)
        self.screen.set_position(row, col)
        logger.debug("SBA to row %s, col %s", row, col)

    def _handle_sf(self) -> None:
        """Handle Start Field.

        Reads the attribute byte, writes a 0x40 (space) at the current
        position carrying the protected flag, and records the raw attribute,
        intensity and validation values in ``screen.attributes``.

        :raises ParseError: If the attribute byte is missing.
        """
        attr = self._take(1)[0]

        # Decode the individual attribute bits.
        protected = bool(attr & 0x40)  # Bit 6: protected
        numeric = bool(attr & 0x20)  # Bit 5: numeric
        intensity = (attr >> 3) & 0x03  # Bits 4-3: intensity
        modified = bool(attr & 0x04)  # Bit 2: modified data tag
        validation = attr & 0x03  # Bits 1-0: validation

        # Update field attributes at current position
        row, col = self.screen.get_position()
        self.screen.write_char(0x40, row, col, protected=protected)  # Space with attr

        # Attribute storage uses three consecutive slots per screen cell.
        if 0 <= row < self.screen.rows and 0 <= col < self.screen.cols:
            attr_offset = (row * self.screen.cols + col) * 3
            # Slot 0: raw attribute byte
            self.screen.attributes[attr_offset] = attr
            # For now intensity goes in slot 1 and validation in slot 2;
            # a more complete implementation would map these properly.
            self.screen.attributes[attr_offset + 1] = intensity
            self.screen.attributes[attr_offset + 2] = validation

        logger.debug(
            "SF: protected=%s, numeric=%s, intensity=%s, modified=%s, validation=%s",
            protected,
            numeric,
            intensity,
            modified,
            validation,
        )

    def _handle_ra(self) -> None:
        """Handle Repeat to Address (basic).

        Consumes the three operand bytes (character, address high, address
        low).  The repeat itself is not implemented yet; the order is only
        logged.

        :raises ParseError: If fewer than three operand bytes remain.
            Previously a truncated RA was skipped silently and its operand
            bytes were then parsed as orders or data; an RA that ended the
            stream exactly was skipped too because of an off-by-one check.
        """
        repeat_char, addr_high, addr_low = self._take(3)
        count = (addr_high << 8) | addr_low
        # Implement repeat logic...
        logger.debug("RA: repeat 0x%02x %d times", repeat_char, count)

    def _handle_scs_ctl_codes(self) -> None:
        """Handle SCS Control Codes for printer sessions.

        :raises ParseError: If the control code byte is missing.
        """
        scs_code = self._take(
            1, "Unexpected end of data stream in SCS control codes"
        )[0]

        if scs_code == PRINT_EOJ:
            logger.debug("SCS PRINT-EOJ received")
            # Handle End of Job processing
            # In a real implementation, this would trigger printer job completion
        else:
            logger.debug("Unknown SCS control code: 0x%02x", scs_code)

    def _handle_data_stream_ctl(self) -> None:
        """Handle Data Stream Control for printer data streams.

        :raises ParseError: If the control code byte is missing.
        """
        ctl_code = self._take(
            1, "Unexpected end of data stream in data stream control"
        )[0]
        logger.debug("Data Stream Control code: 0x%02x", ctl_code)
        # Implementation would handle specific data stream control functions

    def _handle_ge(self) -> None:
        """Handle Graphic Escape (stub): only logs, consumes no operand."""
        logger.debug("GE encountered (graphics not supported)")

    def _handle_write(self) -> None:
        """Handle Write order: clear and write data."""
        self.screen.clear()
        # Subsequent data is written to buffer
        logger.debug("Write order: clearing and writing")

    def _handle_scs_data(self, data: bytes) -> None:
        """
        Handle SCS character stream data for printer sessions.

        :param data: SCS character data
        """
        # In a full implementation, this would process SCS character data
        # for printer output rather than screen display
        logger.debug("SCS data received: %d bytes", len(data))

    def _handle_data(self, byte: int) -> None:
        """Write one data byte at the current position and advance it.

        The column wraps to 0 and the row increments at the end of a line.
        NOTE(review): the row is not wrapped or clamped here, so writing past
        the last column of the last row passes ``row == screen.rows`` to
        ``set_position`` -- confirm how ScreenBuffer handles that.

        :param byte: Data byte to write.
        """
        row, col = self.screen.get_position()
        self.screen.write_char(byte, row, col)
        col += 1
        if col >= self.screen.cols:
            col = 0
            row += 1
        self.screen.set_position(row, col)

    def _handle_structured_field(self) -> None:
        """Handle Structured Field command."""
        logger.debug("Structured Field command received")
        # Skip structured field for now (advanced feature)
        # In a full implementation, this would parse the structured field
        # and handle queries, replies, etc.
        self._skip_structured_field()

    def _skip_structured_field(self) -> None:
        """Skip structured field data.

        Advances the read position until the next byte equal to one of the
        order constants, or to the end of the data.
        """
        # Built once per call rather than once per byte scanned.
        boundaries = frozenset(
            (
                WCC,
                AID,
                READ_PARTITION,
                SBA,
                SF,
                RA,
                GE,
                BIND,
                WRITE,
                EOA,
                SCS_CTL_CODES,
                DATA_STREAM_CTL,
                STRUCTURED_FIELD,
            )
        )
        data = self._data
        end = len(data)
        while self._pos < end and data[self._pos] not in boundaries:
            self._pos += 1
        logger.debug("Skipped structured field")

    def _handle_read_partition_query(self) -> None:
        """Handle Read Partition Query command."""
        logger.debug("Read Partition Query command received")
        # In a full implementation, this would trigger sending Query Reply SFs
        # to inform the host about our capabilities

    def build_query_reply_sf(self, query_type: int, data: bytes = b"") -> bytes:
        """
        Build Query Reply Structured Field.

        Layout: SF identifier, two-byte big-endian length, Query Reply SF
        type, reply type, reply data.  The length counts everything after
        the SF identifier, including the two length bytes themselves.

        :param query_type: Query reply type
        :param data: Query reply data
        :return: Query Reply Structured Field bytes
        :raises ValueError: If the field is too long for the two-byte length
            (previously the length was silently truncated).
        """
        sf = bytearray([STRUCTURED_FIELD, 0x00, 0x00, QUERY_REPLY_SF, query_type])
        sf.extend(data)

        length = len(sf) - 1  # Exclude the SF identifier
        if length > 0xFFFF:
            raise ValueError(
                f"Query reply too long for a 2-byte length field: {length} bytes"
            )
        sf[1] = (length >> 8) & 0xFF
        sf[2] = length & 0xFF

        return bytes(sf)

    def build_device_type_query_reply(self) -> bytes:
        """
        Build Device Type Query Reply Structured Field.

        :return: Device Type Query Reply SF bytes
        """
        # For simplicity, we'll report our device type
        device_type = b"IBM-3278-4-E"  # Example device type
        return self.build_query_reply_sf(QUERY_REPLY_DEVICE_TYPE, device_type)

    def build_characteristics_query_reply(self) -> bytes:
        """
        Build Characteristics Query Reply Structured Field.

        :return: Characteristics Query Reply SF bytes
        """
        # Report basic characteristics: flags bytes 1, 2 and 3.
        characteristics = bytearray([0x01, 0x00, 0x00])

        return self.build_query_reply_sf(QUERY_REPLY_CHARACTERISTICS, characteristics)

374 

375 

class DataStreamSender:
    """Constructs outgoing 3270 data streams."""

    def __init__(self):
        """Create the sender with its own ScreenBuffer (used for ``cols``)."""
        self.screen = ScreenBuffer()

    def build_read_modified_all(self) -> bytes:
        """Build Read Modified All (RMA) command.

        :return: The two bytes 0x7D (AID Enter) and 0xF1 (Read Partition,
            simplified for RMA).
        """
        return bytes((0x7D, 0xF1))

    def build_read_modified_fields(self) -> bytes:
        """Build Read Modified Fields (RMF) command.

        :return: The three bytes 0x7D (AID Enter), 0xF6 (Read Modified) and
            0xF0 (all fields).
        """
        return bytes((0x7D, 0xF6, 0xF0))

    def build_scs_ctl_codes(self, scs_code: int) -> bytes:
        """
        Build SCS Control Codes for printer sessions.

        :param scs_code: SCS control code to send
        :return: ``SCS_CTL_CODES`` followed by ``scs_code``.
        """
        return bytes((SCS_CTL_CODES, scs_code))

    def build_data_stream_ctl(self, ctl_code: int) -> bytes:
        """
        Build Data Stream Control command.

        :param ctl_code: Data stream control code
        :return: ``DATA_STREAM_CTL`` followed by ``ctl_code``.
        """
        return bytes((DATA_STREAM_CTL, ctl_code))

    def build_write(self, data: bytes, wcc: int = 0xC1) -> bytes:
        """
        Build Write command with data.

        :param data: Data to write.
        :param wcc: Write Control Character.
        :return: 0xF5, ``wcc``, 0x05 (Write), the data, then 0x0D (EOA).
        """
        out = bytearray((0xF5, wcc, 0x05))  # WCC marker, WCC value, Write
        out.extend(data)
        out.append(0x0D)  # EOA
        return bytes(out)

    def build_sba(self, row: int, col: int) -> bytes:
        """
        Build Set Buffer Address.

        The linear address ``row * screen.cols + col`` is sent as two bytes,
        high byte first; each byte is masked to 8 bits.

        :param row: Row.
        :param col: Column.
        :return: 0x10 (SBA) followed by the two address bytes.
        """
        address = row * self.screen.cols + col
        return bytes((0x10, (address >> 8) & 0xFF, address & 0xFF))

    def build_sf(self, protected: bool = True, numeric: bool = False) -> bytes:
        """
        Build Start Field.

        :param protected: Protected attribute (sets bit 0x40).
        :param numeric: Numeric attribute (sets bit 0x20).
        :return: 0x1D (SF) followed by the attribute byte.
        """
        protected_bit = 0x40 if protected else 0x00
        numeric_bit = 0x20 if numeric else 0x00
        return bytes((0x1D, protected_bit | numeric_bit))

    def build_key_press(self, aid: int) -> bytes:
        """
        Build data stream for key press (AID).

        :param aid: Attention ID (e.g., 0x7D for Enter).
        :return: A single byte holding ``aid``.
        """
        return bytes((aid,))

    def build_input_stream(
        self,
        modified_fields: List[Tuple[Tuple[int, int], bytes]],
        aid: int,
        cols: int = 80,
    ) -> bytes:
        """
        Build 3270 input data stream for modified fields and AID.

        Each field contributes an SBA to its start position followed by its
        content; the AID byte is appended last.  As a side effect
        ``self.screen.cols`` is set to ``cols``.

        :param modified_fields: List of ((row, col), content_bytes) for each modified field.
        :param aid: Attention ID byte for the key press.
        :param cols: Number of columns for SBA calculation.
        :return: Complete input data stream bytes.
        """
        # build_sba reads the column count from the screen buffer.
        self.screen.cols = cols
        out = bytearray()
        for (row, col), content in modified_fields:
            out.extend(self.build_sba(row, col))  # SBA to field start
            out.extend(content)  # Field data
        out.append(aid)
        return bytes(out)
482 # Append AID 

483 stream.append(aid) 

484 return bytes(stream)