Coverage for pure3270/patching/s3270_wrapper.py: 52%

340 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-11 20:54 +0000

1""" 

2Wrapper class to make pure3270 compatible with p3270's S3270 interface. 

3""" 

4 

5import logging 

6from typing import Optional, Any 

7from pure3270.session import Session, SessionError 

8 

logger = logging.getLogger(__name__)


class Pure3270S3270Wrapper:
    """
    Drop-in replacement for ``p3270.S3270`` backed by ``pure3270.Session``.

    ``p3270.P3270Client`` drives its emulator through ``do(cmd)`` /
    ``check()`` and then reads ``statusMsg`` and ``buffer``. This class
    offers the same surface but translates each s3270 command string into a
    call on an in-process ``Session`` instead of talking to an external
    ``s3270`` binary.

    Several commands (Tab, BackTab, Key, DeleteWord, MoveCursor, Ascii, Wait)
    are accepted and acknowledged but are not yet forwarded to the session.
    """

    # Number of wrapper instances created so far (mirrors p3270.S3270).
    numOfInstances = 0

    # Fallback connection target used when the Connect command omits a part.
    _DEFAULT_HOST = "localhost"
    _DEFAULT_PORT = 23

    # Connection-state field value reported while no host is connected.
    _NOT_CONNECTED = "N"

    # Valid key numbers for the PF(n) and PA(n) commands.
    _PF_KEYS = range(1, 25)
    _PA_KEYS = range(1, 4)

    # Commands matched by exact text -> name of the handler method.
    _EXACT_COMMANDS = {
        "Disconnect": "_handle_disconnect",
        "Quit": "_handle_quit",
        "Enter": "_handle_enter",
        "BackSpace": "_handle_backspace",
        "BackTab": "_handle_backtab",
        "Home": "_handle_home",
        "Tab": "_handle_tab",
        "Clear": "_handle_clear",
        "Delete": "_handle_delete",
        "DeleteField": "_handle_delete_field",
        "DeleteWord": "_handle_delete_word",
        "Erase": "_handle_erase",
        "Down": "_handle_down",
        "Up": "_handle_up",
        "Left": "_handle_left",
        "Right": "_handle_right",
        "NoOpCommand": "_handle_noop",
    }

    # Commands written as ``Name(...)`` -> name of the handler method, which
    # receives the full command text.
    _CALL_COMMANDS = {
        "Connect": "_handle_connect",
        "PF": "_handle_pf",
        "PA": "_handle_pa",
        "Key": "_handle_key",
        "MoveCursor": "_handle_move_cursor",
        "String": "_handle_string",
        "PrintText": "_handle_print_text",
        "Ascii": "_handle_ascii",
        "Wait": "_handle_wait",
    }

    def __init__(self, args, encoding="latin1"):
        """
        Initialize the wrapper.

        Args:
            args: Command line arguments (stored but otherwise ignored; kept
                for compatibility with p3270.S3270)
            encoding: Text encoding (stored but otherwise ignored; kept for
                compatibility with p3270.S3270)
        """
        self.args = args
        self.encoding = encoding
        self.buffer: Optional[str] = None
        self.statusMsg: Optional[str] = None

        # Connection-state field reported by _create_status() when the caller
        # does not override it; updated by Connect / Disconnect / Quit.
        self._connection_state = self._NOT_CONNECTED

        # Create our pure3270 session
        self._session = Session()

        # Increment instance counter
        Pure3270S3270Wrapper.numOfInstances += 1
        logger.debug(
            "Created Pure3270S3270Wrapper instance #%s",
            Pure3270S3270Wrapper.numOfInstances,
        )

    # ------------------------------------------------------------------
    # Public p3270.S3270-compatible interface
    # ------------------------------------------------------------------

    def do(self, cmd: str) -> bool:
        """
        Execute an s3270 command.

        Any exception escaping the command handler is logged and converted
        into an error status, so this method itself never raises.

        Args:
            cmd: The s3270 command to execute

        Returns:
            True if command succeeded, False otherwise
        """
        logger.debug("Executing s3270 command: %s", cmd)

        try:
            return self._execute_command(cmd)
        except Exception as e:
            logger.error("Error executing command '%s': %s", cmd, e)
            self.statusMsg = self._create_error_status()
            return False

    def check(self, doNotCheck=False) -> bool:
        """
        Check the result of the executed command.

        The real s3270 reads the subprocess output here; in this wrapper the
        handler has already stored ``statusMsg``, so there is nothing to do.

        Args:
            doNotCheck: If True, don't perform checking (for Quit command)

        Returns:
            Always True
        """
        if doNotCheck:
            return True
        logger.debug("Checking command result")
        return True

    # ------------------------------------------------------------------
    # Command parsing helpers
    # ------------------------------------------------------------------

    def _execute_command(self, cmd: str) -> bool:
        """
        Parse and execute an s3270 command.

        Args:
            cmd: The s3270 command to execute

        Returns:
            True if command succeeded, False otherwise. Unknown commands are
            logged and reported as successful for compatibility.
        """
        # strip() already removes trailing newlines along with other whitespace.
        cmd = cmd.strip()

        handler_name = self._EXACT_COMMANDS.get(cmd)
        if handler_name is not None:
            return getattr(self, handler_name)()

        name, paren, _ = cmd.partition("(")
        if paren:
            handler_name = self._CALL_COMMANDS.get(name)
            if handler_name is not None:
                return getattr(self, handler_name)(cmd)

        logger.warning("Unknown command: %s", cmd)
        self.statusMsg = self._create_status()
        return True  # For compatibility, we don't fail on unknown commands

    @staticmethod
    def _command_args(cmd: str) -> str:
        """Return the text between the first "(" and a trailing ")" of *cmd*.

        Returns an empty string when *cmd* contains no "(".
        """
        _, _, tail = cmd.partition("(")
        return tail[:-1] if tail.endswith(")") else tail

    @classmethod
    def _extract_string_text(cls, cmd: str) -> str:
        """Return the text to type for a ``String(...)`` command.

        A single pair of enclosing double quotes is removed and ``\\"`` is
        unescaped, so ``String("abc")`` yields ``abc`` and not ``"abc"``.
        """
        text = cls._command_args(cmd)
        if len(text) >= 2 and text[0] == '"' and text[-1] == '"':
            text = text[1:-1].replace('\\"', '"')
        return text

    @classmethod
    def _parse_connect_target(cls, param: str) -> tuple:
        """Split a Connect argument into ``(hostname, port)``.

        Handles single-letter option prefixes (``B:host``, ``L:lu@host``,
        possibly chained), an ``lu@`` part, ``host:port`` and bracketed IPv6
        (``[addr]:port``). A missing host falls back to ``localhost``; a
        missing or invalid port falls back to 23 without altering the host.
        """
        host = param.strip()

        # Drop "X:" option prefixes. "a:23" stays intact: a purely numeric
        # remainder means single-letter host plus port, not prefix plus host.
        while (
            len(host) >= 2
            and host[0].isalpha()
            and host[1] == ":"
            and not host[2:].isdigit()
        ):
            host = host[2:]

        # Drop the logical-unit name of an "lu@host" target.
        if "@" in host:
            host = host.split("@", 1)[1]

        port = cls._DEFAULT_PORT
        port_text = None
        if host.startswith("["):
            inner, closing, tail = host[1:].partition("]")
            if closing:
                host = inner
                if tail.startswith(":"):
                    port_text = tail[1:]
        elif host.count(":") == 1:
            # More than one colon without brackets is treated as a bare IPv6
            # address and left untouched.
            host, _, port_text = host.partition(":")

        if port_text is not None:
            try:
                candidate = int(port_text)
            except ValueError:
                candidate = 0
            if 1 <= candidate <= 65535:
                port = candidate
            else:
                logger.warning(
                    "Ignoring invalid port %r in connect command; using %s",
                    port_text,
                    port,
                )

        return host or cls._DEFAULT_HOST, port

    # ------------------------------------------------------------------
    # Shared handler plumbing
    # ------------------------------------------------------------------

    def _fail(self, failure_text: str, exc: Exception) -> bool:
        """Log ``"<failure_text>: <exc>"``, store an error status, return False."""
        logger.error("%s: %s", failure_text, exc)
        self.statusMsg = self._create_error_status()
        return False

    def _run_session_action(self, method_name: str, failure_text: str, *args) -> bool:
        """Call ``self._session.<method_name>(*args)`` and record the outcome.

        Returns:
            True (and a normal status) if the call returned, False (and an
            error status) if it raised.
        """
        try:
            getattr(self._session, method_name)(*args)
        except Exception as exc:
            return self._fail(failure_text, exc)
        self.statusMsg = self._create_status()
        return True

    def _handle_numbered_key(self, cmd: str, label: str, valid, method_name: str) -> bool:
        """Parse ``<label>(n)``, validate n against *valid* and send the key."""
        failure_text = f"Error handling {label} command '{cmd}'"
        try:
            number = int(self._command_args(cmd))
            if number not in valid:
                raise ValueError(
                    f"{label} key number must be between "
                    f"{valid[0]} and {valid[-1]}, got {number}"
                )
        except ValueError as exc:
            return self._fail(failure_text, exc)
        logger.info("Handling %s command: %s", label, number)
        return self._run_session_action(method_name, failure_text, number)

    def _acknowledge(self) -> bool:
        """Store a normal status without touching the session; return True."""
        self.statusMsg = self._create_status()
        return True

    def _close_session(self) -> None:
        """Best-effort release of the underlying session.

        NOTE(review): assumes ``Session`` exposes ``close()`` and may be
        connected again afterwards - confirm against pure3270.session.
        """
        closer = getattr(getattr(self, "_session", None), "close", None)
        if not callable(closer):
            return
        try:
            closer()
        except Exception as exc:
            # Closing is cleanup only; the caller still reports "not connected".
            logger.warning("Error closing session: %s", exc)

    # ------------------------------------------------------------------
    # Command handlers
    # ------------------------------------------------------------------

    def _handle_connect(self, cmd: str) -> bool:
        """Handle Connect command, e.g. Connect(B:hostname) or Connect(L:lu@hostname)."""
        logger.info("Handling connect command: %s", cmd)
        hostname, port = self._parse_connect_target(self._command_args(cmd))
        logger.info("Connecting to hostname: %s, port: %s", hostname, port)

        try:
            self._session.connect(host=hostname, port=port)
        except Exception as exc:
            self._connection_state = self._NOT_CONNECTED
            return self._fail(f"Error connecting to {hostname}:{port}", exc)

        # Remember the state so later statuses keep reporting this host.
        self._connection_state = f"C({hostname})"
        self.statusMsg = self._create_status()
        return True

    def _handle_disconnect(self) -> bool:
        """Handle Disconnect command: release the session and report "N"."""
        logger.info("Handling disconnect command")
        self._close_session()
        self._connection_state = self._NOT_CONNECTED
        self.statusMsg = self._create_status()
        return True

    def _handle_quit(self) -> bool:
        """Handle Quit command: release the session and report "N"."""
        logger.info("Handling quit command")
        self._close_session()
        self._connection_state = self._NOT_CONNECTED
        self.statusMsg = self._create_status()
        return True

    def _handle_enter(self) -> bool:
        """Handle Enter command."""
        logger.info("Handling enter command")
        return self._run_session_action("enter", "Error sending Enter")

    def _handle_pf(self, cmd: str) -> bool:
        """Handle PF command: PF(1) .. PF(24)."""
        return self._handle_numbered_key(cmd, "PF", self._PF_KEYS, "pf")

    def _handle_pa(self, cmd: str) -> bool:
        """Handle PA command: PA(1) .. PA(3)."""
        return self._handle_numbered_key(cmd, "PA", self._PA_KEYS, "pa")

    def _handle_backspace(self) -> bool:
        """Handle BackSpace command."""
        logger.info("Handling backspace command")
        return self._run_session_action("backspace", "Error sending BackSpace")

    def _handle_backtab(self) -> bool:
        """Handle BackTab command (acknowledged only, not sent to the session)."""
        logger.info("Handling backtab command")
        return self._acknowledge()

    def _handle_home(self) -> bool:
        """Handle Home command."""
        logger.info("Handling home command")
        return self._run_session_action("home", "Error sending Home")

    def _handle_tab(self) -> bool:
        """Handle Tab command (acknowledged only, not sent to the session)."""
        logger.info("Handling tab command")
        return self._acknowledge()

    def _handle_key(self, cmd: str) -> bool:
        """Handle Key command: Key(X) (acknowledged only, not sent to the session)."""
        logger.info("Handling key command: %s", self._command_args(cmd))
        return self._acknowledge()

    def _handle_clear(self) -> bool:
        """Handle Clear command via the session's erase() method."""
        logger.info("Handling clear command")
        return self._run_session_action("erase", "Error handling Clear")

    def _handle_delete(self) -> bool:
        """Handle Delete command.

        NOTE(review): mapped to erase() "as equivalent"; confirm this matches
        the intended s3270 Delete semantics.
        """
        logger.info("Handling delete command")
        return self._run_session_action("erase", "Error handling Delete")

    def _handle_delete_field(self) -> bool:
        """Handle DeleteField command.

        NOTE(review): mapped to erase_eof() "as equivalent"; confirm this
        matches the intended s3270 DeleteField semantics.
        """
        logger.info("Handling delete field command")
        return self._run_session_action("erase_eof", "Error handling DeleteField")

    def _handle_delete_word(self) -> bool:
        """Handle DeleteWord command (acknowledged only, not sent to the session)."""
        logger.info("Handling delete word command")
        return self._acknowledge()

    def _handle_erase(self) -> bool:
        """Handle Erase command."""
        logger.info("Handling erase command")
        return self._run_session_action("erase", "Error handling Erase")

    def _handle_down(self) -> bool:
        """Handle Down command."""
        logger.info("Handling down command")
        return self._run_session_action("down", "Error handling Down")

    def _handle_up(self) -> bool:
        """Handle Up command."""
        logger.info("Handling up command")
        return self._run_session_action("up", "Error handling Up")

    def _handle_left(self) -> bool:
        """Handle Left command."""
        logger.info("Handling left command")
        return self._run_session_action("left", "Error handling Left")

    def _handle_right(self) -> bool:
        """Handle Right command."""
        logger.info("Handling right command")
        return self._run_session_action("right", "Error handling Right")

    def _handle_move_cursor(self, cmd: str) -> bool:
        """Handle MoveCursor command: MoveCursor(row, col).

        The coordinates are validated but the cursor is not yet moved in the
        session.
        """
        try:
            row, col = map(int, self._command_args(cmd).split(","))
        except ValueError as exc:
            return self._fail(f"Error handling MoveCursor command '{cmd}'", exc)
        logger.info("Handling move cursor command: row=%s, col=%s", row, col)
        return self._acknowledge()

    def _handle_string(self, cmd: str) -> bool:
        """Handle String command: String("text") types *text* via compose()."""
        failure_text = f"Error handling String command '{cmd}'"
        text = self._extract_string_text(cmd)
        logger.info("Handling string command: %s", text)
        return self._run_session_action("compose", failure_text, text)

    def _handle_print_text(self, cmd: str) -> bool:
        """Handle PrintText command.

        For the ``string`` variant the current screen content is read from
        the session into ``self.buffer`` (empty string if reading fails).
        Always reports success.
        """
        logger.info("Handling print text command: %s", cmd)
        if "string" in cmd:
            try:
                screen_content = self._session.read()
                if isinstance(screen_content, bytes):
                    self.buffer = screen_content.decode("latin1", errors="replace")
                else:
                    self.buffer = str(screen_content)
                logger.debug("Buffer populated with: %s", self.buffer)
            except Exception as e:
                logger.error("Error getting screen content: %s", e)
                self.buffer = ""
        return self._acknowledge()

    def _handle_noop(self) -> bool:
        """Handle NoOpCommand."""
        logger.info("Handling noop command")
        return self._acknowledge()

    def _handle_ascii(self, cmd: str) -> bool:
        """Handle Ascii command: Ascii(row, col, length) or Ascii(row, col, rows, cols).

        The parameters are validated as integers; the buffer is set to an
        empty string because screen extraction is not implemented yet.
        """
        try:
            param_list = [int(part) for part in self._command_args(cmd).split(",")]
        except ValueError as exc:
            return self._fail(f"Error handling Ascii command '{cmd}'", exc)
        logger.info("Handling ascii command with params: %s", param_list)
        self.buffer = ""  # In a real implementation, we'd extract text from screen
        return self._acknowledge()

    def _handle_wait(self, cmd: str) -> bool:
        """Handle Wait command (acknowledged only, no waiting is performed)."""
        logger.info("Handling wait command: %s", cmd)
        return self._acknowledge()

    # ------------------------------------------------------------------
    # Status line construction
    # ------------------------------------------------------------------

    def _create_status(
        self,
        keyboard="U",
        screen="F",
        field="U",
        connection_state=None,
        emulator="I",
        model="2",
        rows="24",
        cols="80",
        cursor_row="0",
        cursor_col="0",
        win_id="0x0",
        exec_time="-",
    ) -> str:
        """
        Create a status message compatible with s3270 format.

        Args:
            connection_state: ``C(hostname)`` or ``N``. When omitted, the
                state recorded by the last Connect / Disconnect / Quit is
                used (``N`` if none has been recorded).
            Other arguments: one value per status field, in s3270 order;
                non-string values are converted with ``str()``.

        Returns:
            A status message string with 12 space-separated fields
        """
        if connection_state is None:
            # getattr keeps instances created without __init__ usable.
            connection_state = getattr(
                self, "_connection_state", self._NOT_CONNECTED
            )

        status_fields = (
            keyboard,  # Keyboard state: U=Unlocked, L=Locked, E=Error
            screen,  # Screen formatting: F=Formatted, U=Unformatted
            field,  # Field protection: P=Protected, U=Unprotected
            connection_state,  # C(hostname)=Connected, N=Not connected
            emulator,  # I=3270, L=NVT Line, C=NVT Char, P=Unnegotiated, N=Not connected
            model,  # Model number
            rows,  # Number of rows
            cols,  # Number of columns
            cursor_row,  # Cursor row (0-based)
            cursor_col,  # Cursor column (0-based)
            win_id,  # Window ID
            exec_time,  # Execution time
        )
        return " ".join(str(value) for value in status_fields)

    def _create_error_status(self) -> str:
        """Create an error status message (keyboard ``E``, connection ``N``)."""
        return self._create_status(
            keyboard="E", connection_state=self._NOT_CONNECTED
        )