Coverage for pure3270/protocol/vt100_parser.py: 0%
216 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-11 20:54 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-11 20:54 +0000
1"""
2VT100 terminal emulator for pure3270 ASCII mode support.
4This module provides VT100 escape sequence parsing to support ASCII terminal emulation
5as a fallback when TN3270 negotiation fails, matching s3270 behavior.
6"""
8import re
9import logging
10from typing import List, Tuple
12logger = logging.getLogger(__name__)
15class VT100Parser:
16 """VT100 escape sequence parser for ASCII terminal emulation."""
18 def __init__(self, screen_buffer):
19 """
20 Initialize VT100 parser.
22 Args:
23 screen_buffer: ScreenBuffer instance to update with parsed content
24 """
25 self.screen_buffer = screen_buffer
26 self.current_row = 0
27 self.current_col = 0
28 self.saved_row = 0
29 self.saved_col = 0
30 self.charset = "B" # Default charset
31 self.graphics_charset = "0" # Graphics charset
32 self.is_alt_charset = False # Alternative character set mode
34 def parse(self, data: bytes) -> None:
35 """
36 Parse VT100 escape sequences and update screen buffer.
38 Args:
39 data: Raw ASCII data with VT100 escape sequences
40 """
41 try:
42 text = data.decode("ascii", errors="ignore")
43 self._parse_text(text)
44 except Exception as e:
45 logger.warning(f"Error parsing VT100 data: {e}")
47 def _parse_text(self, text: str) -> None:
48 """
49 Parse text with VT100 escape sequences.
51 Args:
52 text: ASCII text with escape sequences
53 """
54 # Process text character by character
55 i = 0
56 while i < len(text):
57 if text[i] == "\x1b": # ESC character
58 # Parse escape sequence
59 seq_end = self._parse_escape_sequence(text, i)
60 i = seq_end
61 elif text[i] == "\x0e": # SO (Shift Out) - Activate alternate charset
62 self.is_alt_charset = True
63 i += 1
64 elif text[i] == "\x0f": # SI (Shift In) - Activate standard charset
65 self.is_alt_charset = False
66 i += 1
67 else:
68 # Regular character
69 self._write_char(text[i])
70 i += 1
72 def _parse_escape_sequence(self, text: str, start: int) -> int:
73 """
74 Parse a VT100 escape sequence.
76 Args:
77 text: Text containing escape sequence
78 start: Start position of ESC character
80 Returns:
81 Position after the escape sequence
82 """
83 if start + 1 >= len(text):
84 return start + 1
86 # Check for CSI (Control Sequence Introducer) - ESC [
87 if text[start + 1] == "[":
88 return self._parse_csi_sequence(text, start + 2)
89 # Check for other escape sequences
90 elif text[start + 1] == "(":
91 # ESC ( - Designate G0 Character Set
92 if start + 2 < len(text):
93 self.charset = text[start + 2]
94 return start + 3
95 return start + 2
96 elif text[start + 1] == ")":
97 # ESC ) - Designate G1 Character Set
98 if start + 2 < len(text):
99 self.graphics_charset = text[start + 2]
100 return start + 3
101 return start + 2
102 elif text[start + 1] == "#":
103 # ESC # - DEC Double-Height/Width Line
104 return start + 3 if start + 2 < len(text) else start + 2
105 elif text[start + 1] == "7":
106 # ESC 7 - Save Cursor
107 self._save_cursor()
108 return start + 2
109 elif text[start + 1] == "8":
110 # ESC 8 - Restore Cursor
111 self._restore_cursor()
112 return start + 2
113 elif text[start + 1] == "=":
114 # ESC = - Application Keypad Mode
115 return start + 2
116 elif text[start + 1] == ">":
117 # ESC > - Normal Keypad Mode
118 return start + 2
119 elif text[start + 1] == "D":
120 # ESC D - Index (IND)
121 self._index()
122 return start + 2
123 elif text[start + 1] == "M":
124 # ESC M - Reverse Index (RI)
125 self._reverse_index()
126 return start + 2
127 elif text[start + 1] == "c":
128 # ESC c - Reset
129 self._reset()
130 return start + 2
131 else:
132 # Simple escape sequence - ESC followed by one character
133 return start + 2
135 def _parse_csi_sequence(self, text: str, start: int) -> int:
136 """
137 Parse CSI (Control Sequence Introducer) sequence.
139 Args:
140 text: Text containing CSI sequence
141 start: Start position after ESC [
143 Returns:
144 Position after the CSI sequence
145 """
146 # Find the end of the sequence (alphabetic final character)
147 i = start
148 while i < len(text) and not text[i].isalpha():
149 i += 1
151 if i >= len(text):
152 return len(text)
154 # Extract parameters
155 params_str = text[start:i]
156 command = text[i]
158 # Parse parameters
159 params = []
160 if params_str:
161 for param in params_str.split(";"):
162 if param.isdigit():
163 params.append(int(param))
164 elif param == "":
165 params.append(0) # Empty parameter defaults to 0
166 else:
167 params.append(0) # Default value for non-numeric
168 else:
169 params = [0] # Default parameter
171 # Handle commands
172 self._handle_csi_command(command, params)
174 return i + 1
176 def _handle_csi_command(self, command: str, params: list) -> None:
177 """
178 Handle CSI command.
180 Args:
181 command: Command character
182 params: List of parameters
183 """
184 try:
185 if command == "H" or command == "f": # Cursor Position
186 # ESC [ r ; c H or ESC [ r ; c f
187 row = params[0] if len(params) > 0 and params[0] > 0 else 1
188 col = params[1] if len(params) > 1 and params[1] > 0 else 1
189 self._move_cursor(row - 1, col - 1) # Convert to 0-based
190 elif command == "A": # Cursor Up
191 rows = params[0] if len(params) > 0 and params[0] > 0 else 1
192 self._move_cursor(max(0, self.current_row - rows), self.current_col)
193 elif command == "B": # Cursor Down
194 rows = params[0] if len(params) > 0 and params[0] > 0 else 1
195 self._move_cursor(
196 min(self.screen_buffer.rows - 1, self.current_row + rows),
197 self.current_col,
198 )
199 elif command == "C": # Cursor Forward
200 cols = params[0] if len(params) > 0 and params[0] > 0 else 1
201 self._move_cursor(
202 self.current_row,
203 min(self.screen_buffer.cols - 1, self.current_col + cols),
204 )
205 elif command == "D": # Cursor Backward
206 cols = params[0] if len(params) > 0 and params[0] > 0 else 1
207 self._move_cursor(self.current_row, max(0, self.current_col - cols))
208 elif command == "J": # Erase in Display
209 param = params[0] if len(params) > 0 else 0
210 self._erase_display(param)
211 elif command == "K": # Erase in Line
212 param = params[0] if len(params) > 0 else 0
213 self._erase_line(param)
214 elif command == "s": # Save Cursor Position
215 self._save_cursor()
216 elif command == "u": # Restore Cursor Position
217 self._restore_cursor()
218 elif command == "m": # Select Graphic Rendition (SGR)
219 self._handle_sgr(params)
220 elif command == "n": # Device Status Report
221 # Ignore for now
222 pass
223 elif command == "h": # Set Mode
224 # Ignore for now
225 pass
226 elif command == "l": # Reset Mode
227 # Ignore for now
228 pass
229 else:
230 logger.debug(f"Unhandled CSI command: {command} with params {params}")
231 except Exception as e:
232 logger.warning(f"Error handling CSI command {command}: {e}")
234 def _handle_sgr(self, params: List[int]) -> None:
235 """
236 Handle Select Graphic Rendition (SGR) commands.
238 Args:
239 params: SGR parameters
240 """
241 # For now, just consume the parameters
242 # In a full implementation, this would handle colors, bold, etc.
243 pass
245 def _write_char(self, char: str) -> None:
246 """
247 Write a character to the current cursor position.
249 Args:
250 char: Character to write
251 """
252 if char == "\n":
253 self.current_row = min(self.screen_buffer.rows - 1, self.current_row + 1)
254 self.current_col = 0
255 elif char == "\r":
256 self.current_col = 0
257 elif char == "\t":
258 # Tab to next 8-character boundary
259 self.current_col = min(
260 self.screen_buffer.cols - 1, ((self.current_col // 8) + 1) * 8
261 )
262 elif char == "\b":
263 # Backspace
264 if self.current_col > 0:
265 self.current_col -= 1
266 elif char == "\x07": # Bell
267 # Ignore bell character
268 pass
269 else:
270 # Regular character
271 if (
272 self.current_row < self.screen_buffer.rows
273 and self.current_col < self.screen_buffer.cols
274 ):
275 # Convert ASCII to EBCDIC for storage in screen buffer
276 try:
277 # Try to use the existing EBCDIC translation utilities
278 from ..emulation.ebcdic import translate_ascii_to_ebcdic
280 ebcdic_bytes = translate_ascii_to_ebcdic(char)
281 if ebcdic_bytes:
282 ebcdic_byte = ebcdic_bytes[0]
283 pos = (
284 self.current_row * self.screen_buffer.cols
285 + self.current_col
286 )
287 if pos < len(self.screen_buffer.buffer):
288 self.screen_buffer.buffer[pos] = ebcdic_byte
289 else:
290 # Fallback to space if conversion fails
291 pos = (
292 self.current_row * self.screen_buffer.cols
293 + self.current_col
294 )
295 if pos < len(self.screen_buffer.buffer):
296 self.screen_buffer.buffer[pos] = 0x40 # Space in EBCDIC
297 except Exception as e:
298 logger.debug(f"Error converting character '{char}' to EBCDIC: {e}")
299 # Store as space if conversion fails
300 pos = self.current_row * self.screen_buffer.cols + self.current_col
301 if pos < len(self.screen_buffer.buffer):
302 self.screen_buffer.buffer[pos] = 0x40 # Space in EBCDIC
304 # Move cursor
305 self.current_col += 1
306 if self.current_col >= self.screen_buffer.cols:
307 self.current_col = 0
308 self.current_row = min(
309 self.screen_buffer.rows - 1, self.current_row + 1
310 )
312 def _move_cursor(self, row: int, col: int) -> None:
313 """
314 Move cursor to specified position.
316 Args:
317 row: Row position (0-based)
318 col: Column position (0-based)
319 """
320 self.current_row = max(0, min(self.screen_buffer.rows - 1, row))
321 self.current_col = max(0, min(self.screen_buffer.cols - 1, col))
323 def _save_cursor(self) -> None:
324 """Save current cursor position."""
325 self.saved_row = self.current_row
326 self.saved_col = self.current_col
328 def _restore_cursor(self) -> None:
329 """Restore saved cursor position."""
330 self.current_row = self.saved_row
331 self.current_col = self.saved_col
333 def _erase_display(self, param: int) -> None:
334 """
335 Erase display.
337 Args:
338 param: Erase parameter
339 0: Clear from cursor to end of screen
340 1: Clear from beginning of screen to cursor
341 2: Clear entire screen
342 """
343 if param == 0:
344 # Clear from cursor to end of screen
345 start_pos = self.current_row * self.screen_buffer.cols + self.current_col
346 for i in range(start_pos, len(self.screen_buffer.buffer)):
347 self.screen_buffer.buffer[i] = 0x40 # Space
348 elif param == 1:
349 # Clear from beginning of screen to cursor
350 end_pos = self.current_row * self.screen_buffer.cols + self.current_col
351 for i in range(0, end_pos + 1):
352 self.screen_buffer.buffer[i] = 0x40 # Space
353 elif param == 2:
354 # Clear entire screen
355 self.screen_buffer.clear()
357 def _erase_line(self, param: int) -> None:
358 """
359 Erase line.
361 Args:
362 param: Erase parameter
363 0: Clear from cursor to end of line
364 1: Clear from beginning of line to cursor
365 2: Clear entire line
366 """
367 if param == 0:
368 # Clear from cursor to end of line
369 start_pos = self.current_row * self.screen_buffer.cols + self.current_col
370 end_pos = (
371 self.current_row * self.screen_buffer.cols + self.screen_buffer.cols
372 )
373 for i in range(start_pos, min(end_pos, len(self.screen_buffer.buffer))):
374 self.screen_buffer.buffer[i] = 0x40 # Space
375 elif param == 1:
376 # Clear from beginning of line to cursor
377 start_pos = self.current_row * self.screen_buffer.cols
378 end_pos = self.current_row * self.screen_buffer.cols + self.current_col
379 for i in range(start_pos, min(end_pos + 1, len(self.screen_buffer.buffer))):
380 self.screen_buffer.buffer[i] = 0x40 # Space
381 elif param == 2:
382 # Clear entire line
383 start_pos = self.current_row * self.screen_buffer.cols
384 end_pos = (
385 self.current_row * self.screen_buffer.cols + self.screen_buffer.cols
386 )
387 for i in range(start_pos, min(end_pos, len(self.screen_buffer.buffer))):
388 self.screen_buffer.buffer[i] = 0x40 # Space
390 def _index(self) -> None:
391 """Index (IND) - Move cursor down one line, scrolling if needed."""
392 if self.current_row < self.screen_buffer.rows - 1:
393 self.current_row += 1
394 # In a full implementation, this would scroll the screen
396 def _reverse_index(self) -> None:
397 """Reverse Index (RI) - Move cursor up one line, scrolling if needed."""
398 if self.current_row > 0:
399 self.current_row -= 1
400 # In a full implementation, this would reverse scroll the screen
402 def _reset(self) -> None:
403 """Reset terminal to initial state."""
404 self.current_row = 0
405 self.current_col = 0
406 self.saved_row = 0
407 self.saved_col = 0
408 self.charset = "B"
409 self.graphics_charset = "0"
410 self.is_alt_charset = False
411 self.screen_buffer.clear()