Coverage for pure3270/protocol/printer.py: 92%
130 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-11 20:54 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-11 20:54 +0000
1"""Printer session support for TN3270E protocol."""
3import asyncio
4import logging
5import time
6from typing import Optional, List, Tuple
7from .tn3270e_header import TN3270EHeader
8from .utils import (
9 SCS_DATA,
10 TN3270E_RSF_NO_RESPONSE,
11 TN3270E_RSF_ERROR_RESPONSE,
12 TN3270E_RSF_ALWAYS_RESPONSE,
13 TN3270E_RSF_POSITIVE_RESPONSE,
14 TN3270E_RSF_NEGATIVE_RESPONSE,
15 PRINT_EOJ,
16 TN3270E_DEVICE_TYPE,
17 TN3270E_FUNCTIONS,
18 TN3270E_IS,
19 TN3270E_REQUEST,
20 TN3270E_SEND,
21 TN3270E_BIND_IMAGE,
22 TN3270E_DATA_STREAM_CTL,
23 TN3270E_RESPONSES,
24 TN3270E_SCS_CTL_CODES,
25 TN3270E_SYSREQ,
26 TN3270E_IBM_DYNAMIC,
27)
28from .exceptions import ProtocolError, ParseError
30logger = logging.getLogger(__name__)
33class PrinterJob:
34 """Represents a printer job in a TN3270E printer session."""
36 def __init__(self, job_id: str = ""):
37 """Initialize a printer job."""
38 if not job_id:
39 # Generate a default ID if none provided
40 job_id = f"job_{int(time.time() * 1000) % 100000}"
41 self.job_id = job_id
42 self.data = bytearray()
43 self.status = "active" # active, completed, error
44 self.start_time = time.time()
45 self.end_time: Optional[float] = None
46 self.pages: List[bytes] = []
48 def add_data(self, data: bytes) -> None:
49 """Add SCS character data to the job."""
50 self.data.extend(data)
51 logger.debug(f"Added {len(data)} bytes to printer job {self.job_id}")
53 def complete_job(self) -> None:
54 """Mark the job as completed."""
55 self.status = "completed"
56 self.end_time = time.time()
57 logger.info(f"Printer job {self.job_id} completed")
59 def set_error(self, error_msg: str) -> None:
60 """Mark the job as having an error."""
61 self.status = "error"
62 self.end_time = time.time()
63 logger.error(f"Printer job {self.job_id} error: {error_msg}")
65 def get_duration(self) -> float:
66 """Get the job duration in seconds."""
67 if self.end_time is not None:
68 return self.end_time - self.start_time
69 return time.time() - self.start_time
71 def get_page_count(self) -> int:
72 """Get the number of pages in the job."""
73 # Simple page counting based on form feeds (0x0C)
74 page_count = 1 # At least one page
75 for byte in self.data:
76 if byte == 0x0C: # Form feed
77 page_count += 1
78 return page_count
80 def get_data_size(self) -> int:
81 """Get the size of the job data in bytes."""
82 return len(self.data)
84 def __repr__(self) -> str:
85 """String representation of the printer job."""
86 return (
87 f"PrinterJob(id='{self.job_id}', status='{self.status}', "
88 f"pages={self.get_page_count()}, size={self.get_data_size()} bytes, "
89 f"duration={self.get_duration():.2f}s)"
90 )
93class PrinterSession:
94 """TN3270E printer session handler."""
96 def __init__(self):
97 """Initialize the printer session."""
98 self.is_active = False
99 self.current_job: Optional[PrinterJob] = None
100 self.completed_jobs: List[PrinterJob] = []
101 self.sequence_number = 0
102 self.max_jobs = 100 # Limit to prevent memory issues
103 self.job_counter = 0
105 def activate(self) -> None:
106 """Activate the printer session."""
107 self.is_active = True
108 logger.info("Printer session activated")
110 def deactivate(self) -> None:
111 """Deactivate the printer session."""
112 if self.current_job:
113 self.current_job.set_error("Session deactivated")
114 self._finish_current_job()
115 self.is_active = False
116 logger.info("Printer session deactivated")
118 def start_new_job(self, job_id: str = "") -> PrinterJob:
119 """Start a new printer job."""
120 if not self.is_active:
121 raise ProtocolError("Printer session not active")
123 # Finish any existing job
124 if self.current_job:
125 self.current_job.set_error("New job started before completion")
126 self._finish_current_job()
128 # Create new job
129 if not job_id:
130 self.job_counter += 1
131 job_id = f"job_{self.job_counter}"
133 self.current_job = PrinterJob(job_id)
134 logger.info(f"Started new printer job: {job_id}")
135 return self.current_job
137 def add_scs_data(self, data: bytes) -> None:
138 """Add SCS character data to the current job."""
139 if not self.is_active:
140 raise ProtocolError("Printer session not active")
142 if not self.current_job:
143 self.start_new_job()
145 if self.current_job:
146 self.current_job.add_data(data)
148 def handle_print_eoj(self) -> None:
149 """Handle PRINT-EOJ (End of Job) command."""
150 if not self.is_active:
151 raise ProtocolError("Printer session not active")
153 if self.current_job:
154 self.current_job.complete_job()
155 self._finish_current_job()
156 else:
157 logger.warning("PRINT-EOJ received but no active job")
159 def _finish_current_job(self) -> None:
160 """Finish the current job and add to completed jobs."""
161 if self.current_job:
162 # Add to completed jobs
163 self.completed_jobs.append(self.current_job)
165 # Limit the number of stored jobs
166 if len(self.completed_jobs) > self.max_jobs:
167 # Remove oldest jobs
168 self.completed_jobs = self.completed_jobs[-self.max_jobs :]
170 # Clear current job
171 self.current_job = None
172 logger.info("Current printer job finished and stored")
174 def get_current_job(self) -> Optional[PrinterJob]:
175 """Get the current active job."""
176 return self.current_job
178 def get_completed_jobs(self) -> List[PrinterJob]:
179 """Get the list of completed jobs."""
180 return self.completed_jobs.copy()
182 def get_job_statistics(self) -> dict:
183 """Get printer job statistics."""
184 active_job = 1 if self.current_job else 0
185 completed_count = len(self.completed_jobs)
186 total_pages = sum(job.get_page_count() for job in self.completed_jobs)
187 total_bytes = sum(job.get_data_size() for job in self.completed_jobs)
189 return {
190 "active_jobs": active_job,
191 "completed_jobs": completed_count,
192 "total_pages": total_pages,
193 "total_bytes": total_bytes,
194 "average_pages_per_job": total_pages / max(completed_count, 1),
195 "average_bytes_per_job": total_bytes / max(completed_count, 1),
196 }
198 def clear_completed_jobs(self) -> None:
199 """Clear the list of completed jobs."""
200 self.completed_jobs.clear()
201 logger.info("Cleared completed printer jobs")
203 def handle_scs_control_code(self, scs_code: int) -> None:
204 """Handle SCS control codes."""
205 if not self.is_active:
206 raise ProtocolError("Printer session not active")
208 if scs_code == PRINT_EOJ:
209 self.handle_print_eoj()
210 logger.debug("Handled SCS PRINT-EOJ control code")
211 else:
212 logger.warning(f"Unhandled SCS control code: 0x{scs_code:02x}")
214 def process_tn3270e_message(self, header: TN3270EHeader, data: bytes) -> None:
215 """Process a TN3270E message for printer session."""
216 if not self.is_active:
217 raise ProtocolError("Printer session not active")
219 if header.data_type == SCS_DATA or header.data_type == TN3270E_SCS_CTL_CODES:
220 # Add SCS data to current job
221 self.add_scs_data(data)
222 logger.debug(f"Processed {len(data)} bytes of SCS data")
223 elif header.data_type == TN3270E_RESPONSES:
224 # Handle response messages
225 if header.response_flag == TN3270E_RSF_ERROR_RESPONSE:
226 logger.error(
227 f"Received error response for sequence {header.seq_number}"
228 )
229 if self.current_job:
230 self.current_job.set_error(
231 f"Error response received for sequence {header.seq_number}"
232 )
233 elif header.response_flag == TN3270E_RSF_NEGATIVE_RESPONSE:
234 logger.warning(
235 f"Received negative response for sequence {header.seq_number}"
236 )
237 else:
238 logger.warning(
239 f"Unhandled TN3270E data type: {header.get_data_type_name()}"
240 )
242 def __repr__(self) -> str:
243 """String representation of the printer session."""
244 stats = self.get_job_statistics()
245 return (
246 f"PrinterSession(active={self.is_active}, "
247 f"current_job={'Yes' if self.current_job else 'No'}, "
248 f"completed_jobs={stats['completed_jobs']})"
249 )