Coverage for pure3270/patching/s3270_wrapper.py: 52%
340 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-11 20:54 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-11 20:54 +0000
1"""
2Wrapper class to make pure3270 compatible with p3270's S3270 interface.
3"""
5import logging
6from typing import Optional, Any
7from pure3270.session import Session, SessionError
9logger = logging.getLogger(__name__)
12class Pure3270S3270Wrapper:
13 """
14 A wrapper that implements the same interface as p3270.S3270
15 but uses pure3270.Session internally.
17 This allows p3270.P3270Client to work with pure3270 instead of
18 spawning the external s3270 binary.
19 """
21 numOfInstances = 0
23 def __init__(self, args, encoding="latin1"):
24 """
25 Initialize the wrapper.
27 Args:
28 args: Command line arguments (ignored, for compatibility)
29 encoding: Text encoding (ignored, for compatibility)
30 """
31 self.args = args
32 self.encoding = encoding
33 self.buffer = None
34 self.statusMsg = None
36 # Create our pure3270 session
37 self._session = Session()
39 # Increment instance counter
40 Pure3270S3270Wrapper.numOfInstances += 1
41 logger.debug(
42 f"Created Pure3270S3270Wrapper instance #{Pure3270S3270Wrapper.numOfInstances}"
43 )
45 def do(self, cmd: str) -> bool:
46 """
47 Execute an s3270 command.
49 Args:
50 cmd: The s3270 command to execute
52 Returns:
53 True if command succeeded, False otherwise
54 """
55 logger.debug(f"Executing s3270 command: {cmd}")
57 try:
58 # Parse the command and map it to pure3270 operations
59 result = self._execute_command(cmd)
60 return result
61 except Exception as e:
62 logger.error(f"Error executing command '{cmd}': {e}")
63 self.statusMsg = self._create_error_status()
64 return False
66 def _execute_command(self, cmd: str) -> bool:
67 """
68 Parse and execute an s3270 command.
70 Args:
71 cmd: The s3270 command to execute
73 Returns:
74 True if command succeeded, False otherwise
75 """
76 # Strip whitespace and newlines
77 cmd = cmd.strip()
78 if cmd.endswith("\n"):
79 cmd = cmd[:-1]
81 # Handle different command types
82 if cmd.startswith("Connect("):
83 return self._handle_connect(cmd)
84 elif cmd == "Disconnect":
85 return self._handle_disconnect()
86 elif cmd == "Quit":
87 return self._handle_quit()
88 elif cmd == "Enter":
89 return self._handle_enter()
90 elif cmd.startswith("PF("):
91 return self._handle_pf(cmd)
92 elif cmd.startswith("PA("):
93 return self._handle_pa(cmd)
94 elif cmd == "BackSpace":
95 return self._handle_backspace()
96 elif cmd == "BackTab":
97 return self._handle_backtab()
98 elif cmd == "Home":
99 return self._handle_home()
100 elif cmd == "Tab":
101 return self._handle_tab()
102 elif cmd.startswith("Key("):
103 return self._handle_key(cmd)
104 elif cmd == "Clear":
105 return self._handle_clear()
106 elif cmd == "Delete":
107 return self._handle_delete()
108 elif cmd == "DeleteField":
109 return self._handle_delete_field()
110 elif cmd == "DeleteWord":
111 return self._handle_delete_word()
112 elif cmd == "Erase":
113 return self._handle_erase()
114 elif cmd == "Down":
115 return self._handle_down()
116 elif cmd == "Up":
117 return self._handle_up()
118 elif cmd == "Left":
119 return self._handle_left()
120 elif cmd == "Right":
121 return self._handle_right()
122 elif cmd.startswith("MoveCursor("):
123 return self._handle_move_cursor(cmd)
124 elif cmd.startswith("String("):
125 return self._handle_string(cmd)
126 elif cmd.startswith("PrintText("):
127 return self._handle_print_text(cmd)
128 elif cmd == "NoOpCommand":
129 return self._handle_noop()
130 elif cmd.startswith("Ascii("):
131 return self._handle_ascii(cmd)
132 elif cmd.startswith("Wait("):
133 return self._handle_wait(cmd)
134 else:
135 logger.warning(f"Unknown command: {cmd}")
136 self.statusMsg = self._create_status()
137 return True # For compatibility, we don't fail on unknown commands
139 def _handle_connect(self, cmd: str) -> bool:
140 """Handle Connect command."""
141 # Parse connection parameters from command
142 # Examples: Connect(B:hostname), Connect(L:lu@hostname)
143 logger.info(f"Handling connect command: {cmd}")
145 # Parse the hostname from the command
146 # Format is Connect(B:hostname) or Connect(L:lu@hostname)
147 hostname = "localhost" # Default
148 port = 23 # Default
150 try:
151 # Extract the parameter from inside the parentheses
152 if "(" in cmd and ")" in cmd:
153 param = cmd.split("(", 1)[1].split(")", 1)[0]
154 # Handle B:hostname format
155 if param.startswith("B:"):
156 hostname = param[2:] # Remove "B:"
157 # Handle L:lu@hostname format
158 elif param.startswith("L:") and "@" in param:
159 hostname = param.split("@", 1)[1]
160 # Handle plain hostname
161 else:
162 hostname = param
164 # Check if hostname includes port
165 if ":" in hostname:
166 host_parts = hostname.split(":")
167 hostname = host_parts[0]
168 port = int(host_parts[1])
170 except Exception as e:
171 logger.warning(f"Error parsing hostname from connect command: {e}")
173 logger.info(f"Connecting to hostname: {hostname}, port: {port}")
175 # Actually connect to the host
176 try:
177 self._session.connect(host=hostname, port=port)
178 self.statusMsg = self._create_status(connection_state=f"C({hostname})")
179 return True
180 except Exception as e:
181 logger.error(f"Error connecting to {hostname}:{port}: {e}")
182 self.statusMsg = self._create_error_status()
183 return False
185 def _handle_disconnect(self) -> bool:
186 """Handle Disconnect command."""
187 logger.info("Handling disconnect command")
188 self.statusMsg = self._create_status(connection_state="N")
189 return True
191 def _handle_quit(self) -> bool:
192 """Handle Quit command."""
193 logger.info("Handling quit command")
194 self.statusMsg = self._create_status()
195 return True
197 def _handle_enter(self) -> bool:
198 """Handle Enter command."""
199 logger.info("Handling enter command")
200 # Send Enter key action
201 try:
202 self._session.enter()
203 self.statusMsg = self._create_status()
204 return True
205 except Exception as e:
206 logger.error(f"Error sending Enter: {e}")
207 self.statusMsg = self._create_error_status()
208 return False
210 def _handle_pf(self, cmd: str) -> bool:
211 """Handle PF command."""
212 # Parse PF number: PF(1), PF(2), etc.
213 try:
214 pf_num = int(cmd[3:-1]) # Extract number from PF(n)
215 logger.info(f"Handling PF command: {pf_num}")
216 # Send PF key action
217 self._session.pf(pf_num)
218 self.statusMsg = self._create_status()
219 return True
220 except Exception as e:
221 logger.error(f"Error handling PF command '{cmd}': {e}")
222 self.statusMsg = self._create_error_status()
223 return False
225 def _handle_pa(self, cmd: str) -> bool:
226 """Handle PA command."""
227 # Parse PA number: PA(1), PA(2), etc.
228 try:
229 pa_num = int(cmd[3:-1]) # Extract number from PA(n)
230 logger.info(f"Handling PA command: {pa_num}")
231 # Send PA key action
232 self._session.pa(pa_num)
233 self.statusMsg = self._create_status()
234 return True
235 except Exception as e:
236 logger.error(f"Error handling PA command '{cmd}': {e}")
237 self.statusMsg = self._create_error_status()
238 return False
240 def _handle_backspace(self) -> bool:
241 """Handle BackSpace command."""
242 logger.info("Handling backspace command")
243 # Send BackSpace key action
244 try:
245 self._session.backspace() # Using backspace method
246 self.statusMsg = self._create_status()
247 return True
248 except Exception as e:
249 logger.error(f"Error sending BackSpace: {e}")
250 self.statusMsg = self._create_error_status()
251 return False
253 def _handle_backtab(self) -> bool:
254 """Handle BackTab command."""
255 logger.info("Handling backtab command")
256 # Send BackTab key action
257 self.statusMsg = self._create_status()
258 return True
260 def _handle_home(self) -> bool:
261 """Handle Home command."""
262 logger.info("Handling home command")
263 # Send Home key action
264 try:
265 self._session.home()
266 self.statusMsg = self._create_status()
267 return True
268 except Exception as e:
269 logger.error(f"Error sending Home: {e}")
270 self.statusMsg = self._create_error_status()
271 return False
273 def _handle_tab(self) -> bool:
274 """Handle Tab command."""
275 logger.info("Handling tab command")
276 # Send Tab key action
277 self.statusMsg = self._create_status()
278 return True
280 def _handle_key(self, cmd: str) -> bool:
281 """Handle Key command."""
282 # Parse key: Key(A), Key(B), etc.
283 try:
284 key = cmd[4:-1] # Extract key from Key(X)
285 logger.info(f"Handling key command: {key}")
286 # Send key action
287 # For now, we'll just set status
288 self.statusMsg = self._create_status()
289 return True
290 except Exception as e:
291 logger.error(f"Error handling Key command '{cmd}': {e}")
292 self.statusMsg = self._create_error_status()
293 return False
295 def _handle_clear(self) -> bool:
296 """Handle Clear command."""
297 logger.info("Handling clear command")
298 # Send Clear action using erase method
299 try:
300 self._session.erase()
301 self.statusMsg = self._create_status()
302 return True
303 except Exception as e:
304 logger.error(f"Error handling Clear: {e}")
305 self.statusMsg = self._create_error_status()
306 return False
308 def _handle_delete(self) -> bool:
309 """Handle Delete command."""
310 logger.info("Handling delete command")
311 # Send Delete action
312 try:
313 self._session.erase() # Using erase as equivalent
314 self.statusMsg = self._create_status()
315 return True
316 except Exception as e:
317 logger.error(f"Error handling Delete: {e}")
318 self.statusMsg = self._create_error_status()
319 return False
321 def _handle_delete_field(self) -> bool:
322 """Handle DeleteField command."""
323 logger.info("Handling delete field command")
324 # Send DeleteField action
325 try:
326 self._session.erase_eof() # Using erase_eof as equivalent
327 self.statusMsg = self._create_status()
328 return True
329 except Exception as e:
330 logger.error(f"Error handling DeleteField: {e}")
331 self.statusMsg = self._create_error_status()
332 return False
334 def _handle_delete_word(self) -> bool:
335 """Handle DeleteWord command."""
336 logger.info("Handling delete word command")
337 # Send DeleteWord action
338 self.statusMsg = self._create_status()
339 return True
341 def _handle_erase(self) -> bool:
342 """Handle Erase command."""
343 logger.info("Handling erase command")
344 # Send Erase action
345 try:
346 self._session.erase()
347 self.statusMsg = self._create_status()
348 return True
349 except Exception as e:
350 logger.error(f"Error handling Erase: {e}")
351 self.statusMsg = self._create_error_status()
352 return False
354 def _handle_down(self) -> bool:
355 """Handle Down command."""
356 logger.info("Handling down command")
357 # Send Down action
358 try:
359 self._session.down()
360 self.statusMsg = self._create_status()
361 return True
362 except Exception as e:
363 logger.error(f"Error handling Down: {e}")
364 self.statusMsg = self._create_error_status()
365 return False
367 def _handle_up(self) -> bool:
368 """Handle Up command."""
369 logger.info("Handling up command")
370 # Send Up action
371 try:
372 self._session.up()
373 self.statusMsg = self._create_status()
374 return True
375 except Exception as e:
376 logger.error(f"Error handling Up: {e}")
377 self.statusMsg = self._create_error_status()
378 return False
380 def _handle_left(self) -> bool:
381 """Handle Left command."""
382 logger.info("Handling left command")
383 # Send Left action
384 try:
385 self._session.left()
386 self.statusMsg = self._create_status()
387 return True
388 except Exception as e:
389 logger.error(f"Error handling Left: {e}")
390 self.statusMsg = self._create_error_status()
391 return False
393 def _handle_right(self) -> bool:
394 """Handle Right command."""
395 logger.info("Handling right command")
396 # Send Right action
397 try:
398 self._session.right()
399 self.statusMsg = self._create_status()
400 return True
401 except Exception as e:
402 logger.error(f"Error handling Right: {e}")
403 self.statusMsg = self._create_error_status()
404 return False
406 def _handle_move_cursor(self, cmd: str) -> bool:
407 """Handle MoveCursor command."""
408 # Parse coordinates: MoveCursor(row, col)
409 try:
410 coords = cmd[11:-1] # Extract coordinates from MoveCursor(row, col)
411 row, col = map(int, coords.split(","))
412 logger.info(f"Handling move cursor command: row={row}, col={col}")
413 # Move cursor action
414 # For now, we'll just set status
415 self.statusMsg = self._create_status()
416 return True
417 except Exception as e:
418 logger.error(f"Error handling MoveCursor command '{cmd}': {e}")
419 self.statusMsg = self._create_error_status()
420 return False
422 def _handle_string(self, cmd: str) -> bool:
423 """Handle String command."""
424 # Parse string: String("text")
425 try:
426 # Extract text from String("text")
427 text = cmd[7:-1] # Remove String( and )
428 logger.info(f"Handling string command: {text}")
429 # Send text using compose method
430 self._session.compose(text)
431 self.statusMsg = self._create_status()
432 return True
433 except Exception as e:
434 logger.error(f"Error handling String command '{cmd}': {e}")
435 self.statusMsg = self._create_error_status()
436 return False
438 def _handle_print_text(self, cmd: str) -> bool:
439 """Handle PrintText command."""
440 # Parse PrintText command
441 logger.info(f"Handling print text command: {cmd}")
442 # For PrintText(string), we need to populate the buffer
443 if "string" in cmd:
444 try:
445 # Get screen content and put it in buffer
446 screen_content = self._session.read()
447 self.buffer = (
448 screen_content.decode("latin1", errors="replace")
449 if isinstance(screen_content, bytes)
450 else str(screen_content)
451 )
452 logger.debug(f"Buffer populated with: {self.buffer}")
453 except Exception as e:
454 logger.error(f"Error getting screen content: {e}")
455 self.buffer = ""
456 self.statusMsg = self._create_status()
457 return True
459 def _handle_noop(self) -> bool:
460 """Handle NoOpCommand."""
461 logger.info("Handling noop command")
462 # No-op action
463 self.statusMsg = self._create_status()
464 return True
466 def _handle_ascii(self, cmd: str) -> bool:
467 """Handle Ascii command."""
468 # Parse Ascii command: Ascii(row, col, length) or Ascii(row, col, rows, cols)
469 try:
470 params = cmd[6:-1] # Extract parameters from Ascii(...)
471 param_list = list(map(int, params.split(",")))
472 logger.info(f"Handling ascii command with params: {param_list}")
473 # For now, we'll just set an empty buffer and status
474 self.buffer = "" # In a real implementation, we'd extract text from screen
475 self.statusMsg = self._create_status()
476 return True
477 except Exception as e:
478 logger.error(f"Error handling Ascii command '{cmd}': {e}")
479 self.statusMsg = self._create_error_status()
480 return False
482 def _handle_wait(self, cmd: str) -> bool:
483 """Handle Wait command."""
484 # Parse Wait command: Wait(timeout, condition, ...)
485 try:
486 logger.info(f"Handling wait command: {cmd}")
487 # For now, we'll just set status
488 self.statusMsg = self._create_status()
489 return True
490 except Exception as e:
491 logger.error(f"Error handling Wait command '{cmd}': {e}")
492 self.statusMsg = self._create_error_status()
493 return False
495 def _create_status(
496 self,
497 keyboard="U",
498 screen="F",
499 field="U",
500 connection_state="C(hostname)",
501 emulator="I",
502 model="2",
503 rows="24",
504 cols="80",
505 cursor_row="0",
506 cursor_col="0",
507 win_id="0x0",
508 exec_time="-",
509 ) -> str:
510 """
511 Create a status message compatible with s3270 format.
513 Returns:
514 A status message string with 12 space-separated fields
515 """
516 status_fields = [
517 keyboard, # Keyboard state: U=Unlocked, L=Locked, E=Error
518 screen, # Screen formatting: F=Formatted, U=Unformatted
519 field, # Field protection: P=Protected, U=Unprotected
520 connection_state, # Connection state: C(hostname)=Connected, N=Not connected
521 emulator, # Emulator mode: I=3270, L=NVT Line, C=NVT Char, P=Unnegotiated, N=Not connected
522 model, # Model number
523 rows, # Number of rows
524 cols, # Number of columns
525 cursor_row, # Cursor row (0-based)
526 cursor_col, # Cursor column (0-based)
527 win_id, # Window ID
528 exec_time, # Execution time
529 ]
530 return " ".join(status_fields)
532 def _create_error_status(self) -> str:
533 """Create an error status message."""
534 return self._create_status(keyboard="E", connection_state="N")
536 def check(self, doNotCheck=False) -> bool:
537 """
538 Check the result of the executed command.
540 Args:
541 doNotCheck: If True, don't perform checking (for Quit command)
543 Returns:
544 True if check passed, False otherwise
545 """
546 if doNotCheck:
547 return True
548 # In the original s3270, this would read from the subprocess
549 # For our implementation, we assume the command already set status
550 logger.debug("Checking command result")
551 return True