Coverage for pure3270/protocol/vt100_parser.py: 0%

216 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-11 20:54 +0000

1""" 

2VT100 terminal emulator for pure3270 ASCII mode support. 

3 

4This module provides VT100 escape sequence parsing to support ASCII terminal emulation 

5as a fallback when TN3270 negotiation fails, matching s3270 behavior. 

6""" 

7 

8import re 

9import logging 

10from typing import List, Tuple 

11 

logger = logging.getLogger(__name__)


class VT100Parser:
    """VT100 escape sequence parser for ASCII terminal emulation.

    The parser keeps its own cursor (``current_row`` / ``current_col``) and
    writes characters into ``screen_buffer.buffer``. The screen buffer object
    is expected to expose ``rows``, ``cols``, an index-assignable ``buffer``
    and a ``clear()`` method, since those are the only members used here.
    """

    # Value stored for blanked cells and for characters that fail conversion
    # (the EBCDIC space).
    _EBCDIC_SPACE = 0x40

    def __init__(self, screen_buffer):
        """
        Initialize VT100 parser.

        Args:
            screen_buffer: ScreenBuffer instance to update with parsed content
        """
        self.screen_buffer = screen_buffer
        self.current_row = 0
        self.current_col = 0
        self.saved_row = 0
        self.saved_col = 0
        self.charset = "B"  # Default charset (G0 designation)
        self.graphics_charset = "0"  # Graphics charset (G1 designation)
        self.is_alt_charset = False  # Toggled by SO / SI

    def parse(self, data: bytes) -> None:
        """
        Parse VT100 escape sequences and update screen buffer.

        Bytes that are not valid ASCII are dropped before parsing. Parsing is
        best-effort: any exception is logged as a warning and not re-raised.

        Args:
            data: Raw ASCII data with VT100 escape sequences
        """
        try:
            text = data.decode("ascii", errors="ignore")
            self._parse_text(text)
        except Exception as e:
            logger.warning("Error parsing VT100 data: %s", e)

    def _parse_text(self, text: str) -> None:
        """
        Parse text with VT100 escape sequences.

        Args:
            text: ASCII text with escape sequences
        """
        i = 0
        while i < len(text):
            ch = text[i]
            if ch == "\x1b":  # ESC: the handler returns the resume index
                i = self._parse_escape_sequence(text, i)
                continue
            if ch == "\x0e":  # SO (Shift Out) - activate alternate charset
                self.is_alt_charset = True
            elif ch == "\x0f":  # SI (Shift In) - activate standard charset
                self.is_alt_charset = False
            else:
                self._write_char(ch)
            i += 1

    def _parse_escape_sequence(self, text: str, start: int) -> int:
        """
        Parse a VT100 escape sequence.

        Args:
            text: Text containing escape sequence
            start: Start position of ESC character

        Returns:
            Position after the escape sequence
        """
        if start + 1 >= len(text):
            # Lone ESC at the end of the data: nothing more to consume.
            return start + 1

        kind = text[start + 1]

        # CSI (Control Sequence Introducer) - ESC [
        if kind == "[":
            return self._parse_csi_sequence(text, start + 2)

        # Three-character sequences: ESC ( x, ESC ) x, ESC # x
        if kind in "()#":
            if start + 2 >= len(text):
                return start + 2  # Truncated: the argument byte is missing
            if kind == "(":
                self.charset = text[start + 2]  # Designate G0 character set
            elif kind == ")":
                self.graphics_charset = text[start + 2]  # Designate G1
            # ESC # x (DEC double-height/width line) is consumed and ignored.
            return start + 3

        # Two-character sequences. Keypad modes (ESC = / ESC >) and unknown
        # sequences are consumed without any effect.
        if kind == "7":
            self._save_cursor()
        elif kind == "8":
            self._restore_cursor()
        elif kind == "D":
            self._index()
        elif kind == "M":
            self._reverse_index()
        elif kind == "c":
            self._reset()
        return start + 2

    def _parse_csi_sequence(self, text: str, start: int) -> int:
        """
        Parse CSI (Control Sequence Introducer) sequence.

        The sequence ends at its final byte, which is any character in the
        range ``@`` (0x40) to ``~`` (0x7E). Accepting only letters here would
        make sequences such as ``ESC [ 2 ~`` swallow the text that follows
        them and execute the next letter as a command.

        Args:
            text: Text containing CSI sequence
            start: Start position after ESC [

        Returns:
            Position after the CSI sequence, or ``len(text)`` when the data
            ends before a final byte is seen (the partial sequence is dropped)
        """
        end = start
        length = len(text)
        while end < length and not ("@" <= text[end] <= "~"):
            end += 1

        if end >= length:
            return length

        params_str = text[start:end]
        command = text[end]

        # Empty and non-numeric fields (e.g. "?25") count as 0. An empty
        # parameter string splits to [""], which yields the default [0].
        params = [
            int(field) if field.isdigit() else 0
            for field in params_str.split(";")
        ]

        self._handle_csi_command(command, params)

        return end + 1

    @staticmethod
    def _positive_param(params: List[int], index: int) -> int:
        """Return ``params[index]`` when present and positive, otherwise 1."""
        if index < len(params) and params[index] > 0:
            return params[index]
        return 1

    def _handle_csi_command(self, command: str, params: List[int]) -> None:
        """
        Handle CSI command.

        Errors raised by a handler are logged as warnings and suppressed so
        that one bad sequence does not abort the rest of the data.

        Args:
            command: Command character (the CSI final byte)
            params: List of parameters
        """
        try:
            screen = self.screen_buffer
            if command == "H" or command == "f":  # Cursor Position
                # ESC [ r ; c H  -- parameters are 1-based
                row = self._positive_param(params, 0)
                col = self._positive_param(params, 1)
                self._move_cursor(row - 1, col - 1)
            elif command == "A":  # Cursor Up
                count = self._positive_param(params, 0)
                self._move_cursor(
                    max(0, self.current_row - count), self.current_col
                )
            elif command == "B":  # Cursor Down
                count = self._positive_param(params, 0)
                self._move_cursor(
                    min(screen.rows - 1, self.current_row + count),
                    self.current_col,
                )
            elif command == "C":  # Cursor Forward
                count = self._positive_param(params, 0)
                self._move_cursor(
                    self.current_row,
                    min(screen.cols - 1, self.current_col + count),
                )
            elif command == "D":  # Cursor Backward
                count = self._positive_param(params, 0)
                self._move_cursor(
                    self.current_row, max(0, self.current_col - count)
                )
            elif command == "J":  # Erase in Display
                self._erase_display(params[0] if params else 0)
            elif command == "K":  # Erase in Line
                self._erase_line(params[0] if params else 0)
            elif command == "s":  # Save Cursor Position
                self._save_cursor()
            elif command == "u":  # Restore Cursor Position
                self._restore_cursor()
            elif command == "m":  # Select Graphic Rendition (SGR)
                self._handle_sgr(params)
            elif command in ("n", "h", "l"):
                # Device Status Report / Set Mode / Reset Mode: ignored
                pass
            else:
                logger.debug(
                    "Unhandled CSI command: %s with params %s", command, params
                )
        except Exception as e:
            logger.warning("Error handling CSI command %s: %s", command, e)

    def _handle_sgr(self, params: List[int]) -> None:
        """
        Handle Select Graphic Rendition (SGR) commands.

        Currently a no-op: the parameters are consumed and discarded.

        Args:
            params: SGR parameters
        """
        pass

    def _store_byte(self, pos: int, value: int) -> None:
        """Store ``value`` at ``pos`` unless ``pos`` is past the buffer end."""
        buffer = self.screen_buffer.buffer
        if pos < len(buffer):
            buffer[pos] = value

    def _write_char(self, char: str) -> None:
        """
        Write a character to the current cursor position.

        Newline, carriage return, tab, backspace and bell only affect the
        cursor (bell is ignored). Any other character is converted to EBCDIC,
        stored at the cursor, and the cursor advances with wrap-around; the
        row is clamped to the last line rather than scrolling.

        Args:
            char: Character to write
        """
        screen = self.screen_buffer
        if char == "\n":
            self.current_row = min(screen.rows - 1, self.current_row + 1)
            self.current_col = 0
        elif char == "\r":
            self.current_col = 0
        elif char == "\t":
            # Tab to next 8-character boundary, clamped to the last column
            self.current_col = min(
                screen.cols - 1, ((self.current_col // 8) + 1) * 8
            )
        elif char == "\b":
            if self.current_col > 0:
                self.current_col -= 1
        elif char == "\x07":
            pass  # Bell is ignored
        elif self.current_row < screen.rows and self.current_col < screen.cols:
            pos = self.current_row * screen.cols + self.current_col
            try:
                # Imported lazily, as in the original implementation.
                from ..emulation.ebcdic import translate_ascii_to_ebcdic

                ebcdic_bytes = translate_ascii_to_ebcdic(char)
                # An empty conversion result falls back to a space.
                value = ebcdic_bytes[0] if ebcdic_bytes else self._EBCDIC_SPACE
                self._store_byte(pos, value)
            except Exception as e:
                logger.debug(
                    "Error converting character '%s' to EBCDIC: %s", char, e
                )
                # Store as space if conversion fails
                self._store_byte(pos, self._EBCDIC_SPACE)

            # Advance the cursor, wrapping to the start of the next line
            self.current_col += 1
            if self.current_col >= screen.cols:
                self.current_col = 0
                self.current_row = min(screen.rows - 1, self.current_row + 1)

    def _move_cursor(self, row: int, col: int) -> None:
        """
        Move cursor to specified position, clamped to the screen bounds.

        Args:
            row: Row position (0-based)
            col: Column position (0-based)
        """
        self.current_row = max(0, min(self.screen_buffer.rows - 1, row))
        self.current_col = max(0, min(self.screen_buffer.cols - 1, col))

    def _save_cursor(self) -> None:
        """Save current cursor position."""
        self.saved_row = self.current_row
        self.saved_col = self.current_col

    def _restore_cursor(self) -> None:
        """Restore saved cursor position."""
        self.current_row = self.saved_row
        self.current_col = self.saved_col

    def _blank_range(self, start: int, stop: int) -> None:
        """Fill buffer cells ``start`` .. ``stop - 1`` with EBCDIC spaces.

        ``stop`` is clamped to the buffer length so a cursor near the end of
        the screen can never cause an out-of-range write.
        """
        buffer = self.screen_buffer.buffer
        for i in range(start, min(stop, len(buffer))):
            buffer[i] = self._EBCDIC_SPACE

    def _erase_display(self, param: int) -> None:
        """
        Erase display.

        Args:
            param: Erase parameter
                0: Clear from cursor to end of screen
                1: Clear from beginning of screen to cursor (inclusive)
                2: Clear entire screen (delegates to ``screen_buffer.clear()``)
                Other values are ignored.
        """
        cursor_pos = self.current_row * self.screen_buffer.cols + self.current_col
        if param == 0:
            self._blank_range(cursor_pos, len(self.screen_buffer.buffer))
        elif param == 1:
            self._blank_range(0, cursor_pos + 1)
        elif param == 2:
            self.screen_buffer.clear()

    def _erase_line(self, param: int) -> None:
        """
        Erase line.

        Args:
            param: Erase parameter
                0: Clear from cursor to end of line
                1: Clear from beginning of line to cursor (inclusive)
                2: Clear entire line
                Other values are ignored.
        """
        cols = self.screen_buffer.cols
        line_start = self.current_row * cols
        line_end = line_start + cols
        cursor_pos = line_start + self.current_col
        if param == 0:
            self._blank_range(cursor_pos, line_end)
        elif param == 1:
            self._blank_range(line_start, cursor_pos + 1)
        elif param == 2:
            self._blank_range(line_start, line_end)

    def _index(self) -> None:
        """Index (IND) - Move cursor down one line.

        Scrolling is not implemented: on the last line the cursor stays put.
        """
        if self.current_row < self.screen_buffer.rows - 1:
            self.current_row += 1

    def _reverse_index(self) -> None:
        """Reverse Index (RI) - Move cursor up one line.

        Reverse scrolling is not implemented: on the first line the cursor
        stays put.
        """
        if self.current_row > 0:
            self.current_row -= 1

    def _reset(self) -> None:
        """Reset terminal to initial state and clear the screen buffer."""
        self.current_row = 0
        self.current_col = 0
        self.saved_row = 0
        self.saved_col = 0
        self.charset = "B"
        self.graphics_charset = "0"
        self.is_alt_charset = False
        self.screen_buffer.clear()