Coverage for pure3270/emulation/screen_buffer.py: 87%

180 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-11 20:54 +0000

1"""Screen buffer management for 3270 emulation.""" 

2 

3from typing import List, Tuple, Optional 

4from .ebcdic import EBCDICCodec 

5 

6 

7class Field: 

8 """Represents a 3270 field with content, attributes, and boundaries.""" 

9 

10 def __init__( 

11 self, 

12 start: Tuple[int, int], 

13 end: Tuple[int, int], 

14 protected: bool = False, 

15 numeric: bool = False, 

16 modified: bool = False, 

17 selected: bool = False, 

18 intensity: int = 0, # 0=normal, 1=highlighted, 2=non-display, 3=blink 

19 color: int = 0, # 0=neutral/default, 1=blue, 2=red, 3=pink, etc. 

20 background: int = 0, # 0=neutral/default, 1=blue, 2=red, 3=pink, etc. 

21 validation: int = 0, # 0=no validation, 1=mandatory fill, 2=trigger 

22 outlining: int = 0, # 0=no outline, 1=underscore, 2=rightline, 3=overline 

23 content: Optional[bytes] = None, 

24 ): 

25 """ 

26 Initialize a Field. 

27 

28 :param start: Tuple of (row, col) for field start position. 

29 :param end: Tuple of (row, col) for field end position. 

30 :param protected: Whether the field is protected (non-input). 

31 :param numeric: Whether the field accepts only numeric input. 

32 :param modified: Whether the field has been modified. 

33 :param selected: Whether the field is selected. 

34 :param intensity: Field intensity (0=normal, 1=highlighted, 2=non-display, 3=blink). 

35 :param color: Foreground color (0=default, 1=blue, 2=red, etc.). 

36 :param background: Background color/highlight (0=default, 1=blue, 2=red, etc.). 

37 :param validation: Validation attribute (0=none, 1=mandatory, 2=trigger). 

38 :param outlining: Outlining attribute (0=none, 1=underscore, 2=rightline, 3=overline). 

39 :param content: Initial EBCDIC content bytes. 

40 """ 

41 self.start = start 

42 self.end = end 

43 self.protected = protected 

44 self.numeric = numeric 

45 self.modified = modified 

46 self.selected = selected 

47 self.intensity = intensity 

48 self.color = color 

49 self.background = background 

50 self.validation = validation 

51 self.outlining = outlining 

52 self.content = content or b"" 

53 

54 def get_content(self) -> str: 

55 """Get field content as Unicode string.""" 

56 if not self.content: 

57 return "" 

58 codec = EBCDICCodec() 

59 decoded, _ = codec.decode(self.content) 

60 return decoded 

61 

62 def set_content(self, text: str): 

63 """Set field content from Unicode string.""" 

64 codec = EBCDICCodec() 

65 encoded, _ = codec.encode(text) 

66 self.content = encoded 

67 self.modified = True 

68 

69 def __repr__(self) -> str: 

70 return f"Field(start={self.start}, end={self.end}, protected={self.protected}, intensity={self.intensity})" 

71 

72 

73class ScreenBuffer: 

74 """Manages the 3270 screen buffer, including characters, attributes, and fields.""" 

75 

76 def __init__(self, rows: int = 24, cols: int = 80): 

77 """ 

78 Initialize the ScreenBuffer. 

79 

80 :param rows: Number of rows (default 24). 

81 :param cols: Number of columns (default 80). 

82 """ 

83 self.rows = rows 

84 self.cols = cols 

85 self.size = rows * cols 

86 # EBCDIC character buffer - initialize to spaces 

87 self.buffer = bytearray(b"\x40" * self.size) 

88 # Attributes buffer: 3 bytes per position (protection, foreground, background/highlight) 

89 self.attributes = bytearray(self.size * 3) 

90 # List of fields 

91 self.fields: List[Field] = [] 

92 # Cursor position 

93 self.cursor_row = 0 

94 self.cursor_col = 0 

95 # Default field attributes 

96 self._default_protected = True 

97 self._default_numeric = False 

98 

99 def clear(self): 

100 """Clear the screen buffer and reset fields.""" 

101 self.buffer = bytearray(b"\x40" * self.size) 

102 self.attributes = bytearray(self.size * 3) 

103 self.fields = [] 

104 self.cursor_row = 0 

105 self.cursor_col = 0 

106 

107 def set_position(self, row: int, col: int): 

108 """Set cursor position.""" 

109 self.cursor_row = row 

110 self.cursor_col = col 

111 

112 def get_position(self) -> Tuple[int, int]: 

113 """Get current cursor position.""" 

114 return (self.cursor_row, self.cursor_col) 

115 

116 def write_char( 

117 self, 

118 ebcdic_byte: int, 

119 row: int, 

120 col: int, 

121 protected: bool = False, 

122 circumvent_protection: bool = False, 

123 ): 

124 """ 

125 Write an EBCDIC character to the buffer at position. 

126 

127 :param ebcdic_byte: EBCDIC byte value. 

128 :param row: Row position. 

129 :param col: Column position. 

130 :param protected: Protection attribute to set. 

131 :param circumvent_protection: If True, write even to protected fields. 

132 """ 

133 if 0 <= row < self.rows and 0 <= col < self.cols: 

134 pos = row * self.cols + col 

135 attr_offset = pos * 3 

136 is_protected = bool(self.attributes[attr_offset] & 0x40) # Bit 6: protected 

137 if is_protected and not circumvent_protection: 

138 return # Skip writing to protected field 

139 self.buffer[pos] = ebcdic_byte 

140 # Set protection bit (bit 6) 

141 self.attributes[attr_offset] = (self.attributes[attr_offset] & 0xBF) | ( 

142 0x40 if protected else 0x00 

143 ) 

144 

145 # Update field content and mark as modified if this position belongs to a field 

146 self._update_field_content(row, col, ebcdic_byte) 

147 

148 def _update_field_content(self, row: int, col: int, ebcdic_byte: int): 

149 """ 

150 Update the field content when a character is written to a position. 

151 

152 :param row: Row position. 

153 :param col: Column position. 

154 :param ebcdic_byte: EBCDIC byte value written. 

155 """ 

156 # Find the field that contains this position 

157 for field in self.fields: 

158 start_row, start_col = field.start 

159 end_row, end_col = field.end 

160 

161 # Check if the position is within this field 

162 if start_row <= row <= end_row and ( 

163 start_row != end_row or (start_col <= col <= end_col) 

164 ): 

165 # Position is within this field, mark as modified 

166 field.modified = True 

167 

168 # For now, we'll just mark the field as modified 

169 # A more complete implementation would update the field's content buffer 

170 break 

171 

172 def update_from_stream(self, data: bytes): 

173 """ 

174 Update buffer from a 3270 data stream (basic implementation). 

175 

176 :param data: Raw 3270 data stream bytes. 

177 """ 

178 i = 0 

179 while i < len(data): 

180 order = data[i] 

181 i += 1 

182 if order == 0xF5: # Write 

183 if i < len(data): 

184 i += 1 # skip WCC 

185 continue 

186 elif order == 0x10: # SBA 

187 if i + 1 < len(data): 

188 i += 2 # skip address bytes 

189 self.set_position(0, 0) # Address 0x0000 -> row 0, col 0 

190 continue 

191 elif order in (0x05, 0x0D): # Unknown/EOA 

192 continue 

193 else: 

194 # Treat as data byte 

195 pos = self.cursor_row * self.cols + self.cursor_col 

196 if pos < self.size: 

197 self.buffer[pos] = order 

198 self.cursor_col += 1 

199 if self.cursor_col >= self.cols: 

200 self.cursor_col = 0 

201 self.cursor_row += 1 

202 if self.cursor_row >= self.rows: 

203 self.cursor_row = 0 # wrap around 

204 # Update fields (basic detection) 

205 self._detect_fields() 

206 

207 def _detect_fields(self): 

208 """Detect field boundaries based on attribute changes (simplified).""" 

209 self.fields = [] 

210 in_field = False 

211 start = (0, 0) 

212 for row in range(self.rows): 

213 for col in range(self.cols): 

214 pos = row * self.cols + col 

215 attr_offset = pos * 3 

216 protected = bool( 

217 self.attributes[attr_offset] & 0x40 

218 ) # Bit 6: protected 

219 if not in_field and not protected: 

220 in_field = True 

221 start = (row, col) 

222 elif in_field and protected: 

223 in_field = False 

224 end = (row, col - 1) if col > 0 else (row, self.cols - 1) 

225 # Calculate content from start to end 

226 start_pos = start[0] * self.cols + start[1] 

227 end_pos = row * self.cols + (col - 1) 

228 content = bytes(self.buffer[start_pos : end_pos + 1]) 

229 

230 # Extract extended attributes 

231 intensity = ( 

232 self.attributes[attr_offset + 1] 

233 if attr_offset + 1 < len(self.attributes) 

234 else 0 

235 ) 

236 validation = ( 

237 self.attributes[attr_offset + 2] 

238 if attr_offset + 2 < len(self.attributes) 

239 else 0 

240 ) 

241 

242 # Input fields are not protected (protected=False) 

243 self.fields.append( 

244 Field( 

245 start, 

246 end, 

247 protected=False, 

248 content=content, 

249 intensity=intensity, 

250 validation=validation, 

251 ) 

252 ) 

253 if in_field: 

254 end = (self.rows - 1, self.cols - 1) 

255 # Calculate content from start to end 

256 start_pos = start[0] * self.cols + start[1] 

257 end_pos = end[0] * self.cols + end[1] 

258 content = bytes(self.buffer[start_pos : end_pos + 1]) 

259 # Determine protection status of the final field 

260 end_pos_attr = end_pos * 3 

261 is_protected = bool( 

262 self.attributes[end_pos_attr] & 0x40 

263 ) # Bit 6: protected 

264 

265 # Extract extended attributes 

266 intensity = ( 

267 self.attributes[end_pos_attr + 1] 

268 if end_pos_attr + 1 < len(self.attributes) 

269 else 0 

270 ) 

271 validation = ( 

272 self.attributes[end_pos_attr + 2] 

273 if end_pos_attr + 2 < len(self.attributes) 

274 else 0 

275 ) 

276 

277 self.fields.append( 

278 Field( 

279 start, 

280 end, 

281 protected=is_protected, 

282 content=content, 

283 intensity=intensity, 

284 validation=validation, 

285 ) 

286 ) 

287 

288 def to_text(self) -> str: 

289 """ 

290 Convert screen buffer to Unicode text string. 

291 

292 :return: Multi-line string representation. 

293 """ 

294 codec = EBCDICCodec() 

295 lines = [] 

296 for row in range(self.rows): 

297 line_bytes = bytes(self.buffer[row * self.cols : (row + 1) * self.cols]) 

298 line_text, _ = codec.decode(line_bytes) 

299 lines.append(line_text) 

300 return "\n".join(lines) 

301 

302 def get_field_content(self, field_index: int) -> str: 

303 """ 

304 Get content of a specific field. 

305 

306 :param field_index: Index in fields list. 

307 :return: Unicode string content. 

308 """ 

309 if 0 <= field_index < len(self.fields): 

310 return self.fields[field_index].get_content() 

311 return "" 

312 

313 def read_modified_fields(self) -> List[Tuple[Tuple[int, int], str]]: 

314 """ 

315 Read modified fields (RMF support, basic). 

316 

317 :return: List of (position, content) for modified fields. 

318 """ 

319 modified = [] 

320 for field in self.fields: 

321 if field.modified: 

322 content = field.get_content() 

323 modified.append((field.start, content)) 

324 return modified 

325 

326 def set_modified(self, row: int, col: int, modified: bool = True): 

327 """Set modified flag for position.""" 

328 if 0 <= row < self.rows and 0 <= col < self.cols: 

329 pos = row * self.cols + col 

330 attr_offset = pos * 3 + 2 # Assume byte 2 for modified 

331 self.attributes[attr_offset] = 0x01 if modified else 0x00 

332 

333 def is_position_modified(self, row: int, col: int) -> bool: 

334 """Check if position is modified.""" 

335 if 0 <= row < self.rows and 0 <= col < self.cols: 

336 pos = row * self.cols + col 

337 attr_offset = pos * 3 + 2 

338 return bool(self.attributes[attr_offset]) 

339 return False 

340 

341 def __repr__(self) -> str: 

342 return f"ScreenBuffer({self.rows}x{self.cols}, fields={len(self.fields)})" 

343 

344 def get_field_at_position(self, row: int, col: int) -> Optional[Field]: 

345 """Get the field containing the given position, if any.""" 

346 for field in self.fields: 

347 start_row, start_col = field.start 

348 end_row, end_col = field.end 

349 if start_row <= row <= end_row and start_col <= col <= end_col: 

350 return field 

351 return None 

352 

353 def remove_field(self, field: Field) -> None: 

354 """Remove a field from the fields list and clear its content in the buffer.""" 

355 if field in self.fields: 

356 self.fields.remove(field) 

357 # Clear the buffer content for this field 

358 start_row, start_col = field.start 

359 end_row, end_col = field.end 

360 for r in range(start_row, end_row + 1): 

361 for c in range(start_col, end_col + 1): 

362 if r < self.rows and c < self.cols: 

363 pos = r * self.cols + c 

364 self.buffer[pos] = 0x40 # Space in EBCDIC 

365 # Clear attributes 

366 attr_offset = pos * 3 

367 self.attributes[attr_offset : attr_offset + 3] = b"\x00\x00\x00" 

368 # Re-detect fields to update boundaries 

369 self._detect_fields() 

370 

371 def update_fields(self) -> None: 

372 """Update field detection and attributes.""" 

373 self._detect_fields()