Coverage for pure3270/emulation/screen_buffer.py: 87%
180 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-11 20:54 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-11 20:54 +0000
1"""Screen buffer management for 3270 emulation."""
3from typing import List, Tuple, Optional
4from .ebcdic import EBCDICCodec
class Field:
    """A single 3270 field: its screen span, display attributes and raw content.

    ``content`` is kept as bytes and is converted to and from text with
    ``EBCDICCodec`` by :meth:`get_content` and :meth:`set_content`.
    """

    def __init__(
        self,
        start: Tuple[int, int],
        end: Tuple[int, int],
        protected: bool = False,
        numeric: bool = False,
        modified: bool = False,
        selected: bool = False,
        intensity: int = 0,  # 0=normal, 1=highlighted, 2=non-display, 3=blink
        color: int = 0,  # 0=neutral/default, 1=blue, 2=red, 3=pink, etc.
        background: int = 0,  # 0=neutral/default, 1=blue, 2=red, 3=pink, etc.
        validation: int = 0,  # 0=no validation, 1=mandatory fill, 2=trigger
        outlining: int = 0,  # 0=no outline, 1=underscore, 2=rightline, 3=overline
        content: Optional[bytes] = None,
    ):
        """
        Create a field description.

        :param start: ``(row, col)`` of the first position of the field.
        :param end: ``(row, col)`` of the last position of the field.
        :param protected: True for a protected (non-input) field.
        :param numeric: True when only numeric input is accepted.
        :param modified: Initial state of the modified flag.
        :param selected: Initial state of the selected flag.
        :param intensity: 0=normal, 1=highlighted, 2=non-display, 3=blink.
        :param color: Foreground color (0=default, 1=blue, 2=red, ...).
        :param background: Background color (0=default, 1=blue, 2=red, ...).
        :param validation: 0=none, 1=mandatory fill, 2=trigger.
        :param outlining: 0=none, 1=underscore, 2=rightline, 3=overline.
        :param content: Initial content bytes; ``None`` or empty is stored
            as ``b""``.
        """
        # Position of the field on the screen.
        self.start, self.end = start, end

        # Boolean state flags.
        self.protected = protected
        self.numeric = numeric
        self.modified = modified
        self.selected = selected

        # Display / extended attributes.
        self.intensity = intensity
        self.color = color
        self.background = background
        self.validation = validation
        self.outlining = outlining

        # Any falsy value (None, b"") collapses to an empty bytes object.
        self.content = content if content else b""

    def get_content(self) -> str:
        """Return the field content decoded to a Unicode string.

        An empty field yields ``""`` without touching the codec.
        """
        raw = self.content
        if raw:
            text, _consumed = EBCDICCodec().decode(raw)
            return text
        return ""

    def set_content(self, text: str) -> None:
        """Encode ``text`` into the field content and flag the field as modified."""
        data, _consumed = EBCDICCodec().encode(text)
        self.content = data
        self.modified = True

    def __repr__(self) -> str:
        return (
            f"Field(start={self.start}, end={self.end}, "
            f"protected={self.protected}, intensity={self.intensity})"
        )
class ScreenBuffer:
    """Manages the 3270 screen buffer, including characters, attributes, and fields.

    The screen is stored as a flat ``bytearray`` of ``rows * cols`` EBCDIC
    bytes plus a parallel attribute buffer holding three bytes per cell.
    Fields are linear spans of that flat buffer, so a field may wrap from the
    end of one row onto the next; every containment test in this class is
    therefore done on linear buffer indexes rather than on (row, col)
    rectangles.
    """

    # Mask of the "protected" flag inside the first attribute byte of a cell.
    _PROTECTED_BIT = 0x40
    # Number of attribute bytes stored per screen cell.
    _ATTR_WIDTH = 3

    def __init__(self, rows: int = 24, cols: int = 80) -> None:
        """
        Initialize the ScreenBuffer.

        :param rows: Number of rows (default 24).
        :param cols: Number of columns (default 80).
        """
        self.rows = rows
        self.cols = cols
        self.size = rows * cols
        # EBCDIC character buffer - initialize to spaces (0x40)
        self.buffer = bytearray(b"\x40" * self.size)
        # Attributes buffer: 3 bytes per position. Byte 0 carries the
        # protected bit (0x40).
        # NOTE(review): byte 1 is read as "intensity" and byte 2 as
        # "validation" by _detect_fields, while set_modified() stores the
        # modified flag in byte 2 - confirm the intended layout.
        self.attributes = bytearray(self.size * 3)
        # List of fields
        self.fields: List["Field"] = []
        # Cursor position
        self.cursor_row = 0
        self.cursor_col = 0
        # Default field attributes
        self._default_protected = True
        self._default_numeric = False

    def clear(self) -> None:
        """Clear the screen buffer, attributes and fields and home the cursor."""
        self.buffer = bytearray(b"\x40" * self.size)
        self.attributes = bytearray(self.size * 3)
        self.fields = []
        self.cursor_row = 0
        self.cursor_col = 0

    def set_position(self, row: int, col: int) -> None:
        """Set cursor position (no bounds checking is performed)."""
        self.cursor_row = row
        self.cursor_col = col

    def get_position(self) -> Tuple[int, int]:
        """Get current cursor position as ``(row, col)``."""
        return (self.cursor_row, self.cursor_col)

    def _in_bounds(self, row: int, col: int) -> bool:
        """Return True when (row, col) addresses a cell on the screen."""
        return 0 <= row < self.rows and 0 <= col < self.cols

    def _field_span(self, field: "Field") -> Tuple[int, int]:
        """Return the (first, last) linear buffer indexes covered by ``field``."""
        start_row, start_col = field.start
        end_row, end_col = field.end
        return (
            start_row * self.cols + start_col,
            end_row * self.cols + end_col,
        )

    @staticmethod
    def _decode_buffer_address(high: int, low: int) -> int:
        """Decode the two address bytes that follow an SBA order.

        When the two top bits of the first byte are ``00`` the pair is a
        14-bit binary address; otherwise it is a 12-bit coded address that
        keeps six significant bits in each byte.
        """
        if high & 0xC0 == 0x00:
            return ((high & 0x3F) << 8) | low
        return ((high & 0x3F) << 6) | (low & 0x3F)

    def write_char(
        self,
        ebcdic_byte: int,
        row: int,
        col: int,
        protected: bool = False,
        circumvent_protection: bool = False,
    ) -> None:
        """
        Write an EBCDIC character to the buffer at position.

        Out-of-range positions are ignored. A cell whose protected bit is set
        is left untouched unless ``circumvent_protection`` is True.

        :param ebcdic_byte: EBCDIC byte value.
        :param row: Row position.
        :param col: Column position.
        :param protected: Protection attribute to set on the written cell.
        :param circumvent_protection: If True, write even to protected cells.
        """
        if not self._in_bounds(row, col):
            return
        pos = row * self.cols + col
        attr_offset = pos * self._ATTR_WIDTH
        is_protected = bool(self.attributes[attr_offset] & self._PROTECTED_BIT)
        if is_protected and not circumvent_protection:
            return  # Skip writing to protected cell
        self.buffer[pos] = ebcdic_byte
        # Replace the protection bit, keep the other bits of byte 0.
        self.attributes[attr_offset] = (self.attributes[attr_offset] & 0xBF) | (
            self._PROTECTED_BIT if protected else 0x00
        )
        # Keep the owning field (if any) in sync with the buffer.
        self._update_field_content(row, col, ebcdic_byte)

    def _update_field_content(self, row: int, col: int, ebcdic_byte: int) -> None:
        """
        Mark the field containing a position as modified and refresh its content.

        Containment is tested on linear buffer indexes so that a field that
        wraps across rows only matches cells that really belong to it. Only
        the first matching field is updated.

        :param row: Row position.
        :param col: Column position.
        :param ebcdic_byte: EBCDIC byte value written.
        """
        pos = row * self.cols + col
        for field in self.fields:
            first, last = self._field_span(field)
            if not first <= pos <= last:
                continue
            field.modified = True
            # Re-read the field's span from the screen buffer (the same way
            # _detect_fields builds content) so later reads are not stale.
            first = max(first, 0)
            snapshot = bytearray(self.buffer[first : last + 1])
            offset = pos - first
            if 0 <= offset < len(snapshot):
                snapshot[offset] = ebcdic_byte
            field.content = bytes(snapshot)
            break

    def update_from_stream(self, data: bytes) -> None:
        """
        Update buffer from a 3270 data stream (basic implementation).

        Recognised bytes: 0xF5 (Write, the following WCC byte is skipped),
        0x10 (SBA, the following two bytes are decoded as a buffer address
        and the cursor is moved there) and 0x05 / 0x0D (ignored). Every other
        byte is stored at the cursor, which then advances with wrap-around.
        Field detection is re-run once the stream has been consumed.

        :param data: Raw 3270 data stream bytes.
        """
        i = 0
        length = len(data)
        while i < length:
            order = data[i]
            i += 1
            if order == 0xF5:  # Write
                # NOTE(review): 0xF5 is also the EBCDIC digit '5'; it is
                # treated as a command wherever it appears, as before.
                if i < length:
                    i += 1  # skip WCC
                continue
            if order == 0x10:  # SBA
                if i + 1 >= length:
                    break  # truncated order: address bytes are missing
                address = self._decode_buffer_address(data[i], data[i + 1])
                i += 2
                if self.size:
                    # Addresses past the end of the screen wrap around.
                    address %= self.size
                    self.set_position(*divmod(address, self.cols))
                else:
                    self.set_position(0, 0)
                continue
            if order in (0x05, 0x0D):  # Unknown/EOA
                continue
            # Treat as data byte
            pos = self.cursor_row * self.cols + self.cursor_col
            if pos < self.size:
                self.buffer[pos] = order
            self.cursor_col += 1
            if self.cursor_col >= self.cols:
                self.cursor_col = 0
                self.cursor_row += 1
                if self.cursor_row >= self.rows:
                    self.cursor_row = 0  # wrap around
        # Update fields (basic detection)
        self._detect_fields()

    def _build_field(self, first: int, last: int, attr_pos: int) -> "Field":
        """Create an unprotected Field covering linear indexes ``first..last``.

        ``attr_pos`` is the cell whose attribute bytes 1 and 2 supply the
        field's intensity and validation values.
        """
        attr_offset = attr_pos * self._ATTR_WIDTH
        return Field(
            divmod(first, self.cols),
            divmod(last, self.cols),
            protected=False,
            content=bytes(self.buffer[first : last + 1]),
            intensity=self.attributes[attr_offset + 1],
            validation=self.attributes[attr_offset + 2],
        )

    def _detect_fields(self) -> None:
        """Detect field boundaries based on attribute changes (simplified).

        Each maximal run of cells whose protected bit is clear becomes one
        unprotected field; protected cells separate fields and produce no
        field themselves. ``self.fields`` is rebuilt from scratch.
        """
        self.fields = []
        run_start = None  # linear index where the current run began
        for pos in range(self.size):
            protected = bool(
                self.attributes[pos * self._ATTR_WIDTH] & self._PROTECTED_BIT
            )
            if run_start is None:
                if not protected:
                    run_start = pos
            elif protected:
                # The run ended on the previous cell. Deriving the end
                # coordinate from the linear index keeps it on the previous
                # row when the terminating cell is in column 0.
                # NOTE(review): extended attributes are taken from the
                # terminating protected cell, as in the earlier code -
                # confirm that this is the intended source.
                self.fields.append(self._build_field(run_start, pos - 1, pos))
                run_start = None
        if run_start is not None:
            # A run still open at the end of the screen extends to the last
            # cell; that cell is unprotected by construction.
            last = self.size - 1
            self.fields.append(self._build_field(run_start, last, last))

    def to_text(self) -> str:
        """
        Convert screen buffer to Unicode text string.

        :return: Multi-line string representation, one line per screen row.
        """
        codec = EBCDICCodec()
        lines = []
        for row in range(self.rows):
            line_bytes = bytes(self.buffer[row * self.cols : (row + 1) * self.cols])
            line_text, _ = codec.decode(line_bytes)
            lines.append(line_text)
        return "\n".join(lines)

    def get_field_content(self, field_index: int) -> str:
        """
        Get content of a specific field.

        :param field_index: Index in fields list.
        :return: Unicode string content, or "" when the index is out of range.
        """
        if 0 <= field_index < len(self.fields):
            return self.fields[field_index].get_content()
        return ""

    def read_modified_fields(self) -> List[Tuple[Tuple[int, int], str]]:
        """
        Read modified fields (RMF support, basic).

        :return: List of (start position, content) for modified fields.
        """
        return [
            (field.start, field.get_content())
            for field in self.fields
            if field.modified
        ]

    def set_modified(self, row: int, col: int, modified: bool = True) -> None:
        """Set modified flag for position (stored in attribute byte 2)."""
        if self._in_bounds(row, col):
            pos = row * self.cols + col
            attr_offset = pos * self._ATTR_WIDTH + 2  # Assume byte 2 for modified
            self.attributes[attr_offset] = 0x01 if modified else 0x00

    def is_position_modified(self, row: int, col: int) -> bool:
        """Check if position is modified; out-of-range positions are not."""
        if self._in_bounds(row, col):
            pos = row * self.cols + col
            return bool(self.attributes[pos * self._ATTR_WIDTH + 2])
        return False

    def __repr__(self) -> str:
        return f"ScreenBuffer({self.rows}x{self.cols}, fields={len(self.fields)})"

    def get_field_at_position(self, row: int, col: int) -> Optional["Field"]:
        """Get the field containing the given position, if any.

        The test is made on linear buffer indexes, so fields that wrap across
        rows are matched correctly. Positions off the screen match nothing.
        """
        if not self._in_bounds(row, col):
            return None
        pos = row * self.cols + col
        for field in self.fields:
            first, last = self._field_span(field)
            if first <= pos <= last:
                return field
        return None

    def remove_field(self, field: "Field") -> None:
        """Remove a field from the fields list and clear its content in the buffer.

        Every cell of the field's linear span is reset to an EBCDIC space
        with zeroed attributes, then fields are re-detected. Because cleared
        cells are unprotected, re-detection may report a field over the same
        region again. Nothing happens when ``field`` is not in the list.
        """
        if field not in self.fields:
            return
        self.fields.remove(field)
        first, last = self._field_span(field)
        # Clamp to the screen so malformed coordinates cannot grow the buffers.
        first = max(first, 0)
        last = min(last, self.size - 1)
        count = last - first + 1
        if count > 0:
            self.buffer[first : last + 1] = b"\x40" * count  # Space in EBCDIC
            self.attributes[
                first * self._ATTR_WIDTH : (last + 1) * self._ATTR_WIDTH
            ] = bytes(count * self._ATTR_WIDTH)
        # Re-detect fields to update boundaries
        self._detect_fields()

    def update_fields(self) -> None:
        """Update field detection and attributes."""
        self._detect_fields()