Coverage for pure3270/protocol/printer.py: 92%

130 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-11 20:54 +0000

1"""Printer session support for TN3270E protocol.""" 

2 

3import asyncio 

4import logging 

5import time 

6from typing import Optional, List, Tuple 

7from .tn3270e_header import TN3270EHeader 

8from .utils import ( 

9 SCS_DATA, 

10 TN3270E_RSF_NO_RESPONSE, 

11 TN3270E_RSF_ERROR_RESPONSE, 

12 TN3270E_RSF_ALWAYS_RESPONSE, 

13 TN3270E_RSF_POSITIVE_RESPONSE, 

14 TN3270E_RSF_NEGATIVE_RESPONSE, 

15 PRINT_EOJ, 

16 TN3270E_DEVICE_TYPE, 

17 TN3270E_FUNCTIONS, 

18 TN3270E_IS, 

19 TN3270E_REQUEST, 

20 TN3270E_SEND, 

21 TN3270E_BIND_IMAGE, 

22 TN3270E_DATA_STREAM_CTL, 

23 TN3270E_RESPONSES, 

24 TN3270E_SCS_CTL_CODES, 

25 TN3270E_SYSREQ, 

26 TN3270E_IBM_DYNAMIC, 

27) 

28from .exceptions import ProtocolError, ParseError 

29 

30logger = logging.getLogger(__name__) 

31 

32 

33class PrinterJob: 

34 """Represents a printer job in a TN3270E printer session.""" 

35 

36 def __init__(self, job_id: str = ""): 

37 """Initialize a printer job.""" 

38 if not job_id: 

39 # Generate a default ID if none provided 

40 job_id = f"job_{int(time.time() * 1000) % 100000}" 

41 self.job_id = job_id 

42 self.data = bytearray() 

43 self.status = "active" # active, completed, error 

44 self.start_time = time.time() 

45 self.end_time: Optional[float] = None 

46 self.pages: List[bytes] = [] 

47 

48 def add_data(self, data: bytes) -> None: 

49 """Add SCS character data to the job.""" 

50 self.data.extend(data) 

51 logger.debug(f"Added {len(data)} bytes to printer job {self.job_id}") 

52 

53 def complete_job(self) -> None: 

54 """Mark the job as completed.""" 

55 self.status = "completed" 

56 self.end_time = time.time() 

57 logger.info(f"Printer job {self.job_id} completed") 

58 

59 def set_error(self, error_msg: str) -> None: 

60 """Mark the job as having an error.""" 

61 self.status = "error" 

62 self.end_time = time.time() 

63 logger.error(f"Printer job {self.job_id} error: {error_msg}") 

64 

65 def get_duration(self) -> float: 

66 """Get the job duration in seconds.""" 

67 if self.end_time is not None: 

68 return self.end_time - self.start_time 

69 return time.time() - self.start_time 

70 

71 def get_page_count(self) -> int: 

72 """Get the number of pages in the job.""" 

73 # Simple page counting based on form feeds (0x0C) 

74 page_count = 1 # At least one page 

75 for byte in self.data: 

76 if byte == 0x0C: # Form feed 

77 page_count += 1 

78 return page_count 

79 

80 def get_data_size(self) -> int: 

81 """Get the size of the job data in bytes.""" 

82 return len(self.data) 

83 

84 def __repr__(self) -> str: 

85 """String representation of the printer job.""" 

86 return ( 

87 f"PrinterJob(id='{self.job_id}', status='{self.status}', " 

88 f"pages={self.get_page_count()}, size={self.get_data_size()} bytes, " 

89 f"duration={self.get_duration():.2f}s)" 

90 ) 

91 

92 

93class PrinterSession: 

94 """TN3270E printer session handler.""" 

95 

96 def __init__(self): 

97 """Initialize the printer session.""" 

98 self.is_active = False 

99 self.current_job: Optional[PrinterJob] = None 

100 self.completed_jobs: List[PrinterJob] = [] 

101 self.sequence_number = 0 

102 self.max_jobs = 100 # Limit to prevent memory issues 

103 self.job_counter = 0 

104 

105 def activate(self) -> None: 

106 """Activate the printer session.""" 

107 self.is_active = True 

108 logger.info("Printer session activated") 

109 

110 def deactivate(self) -> None: 

111 """Deactivate the printer session.""" 

112 if self.current_job: 

113 self.current_job.set_error("Session deactivated") 

114 self._finish_current_job() 

115 self.is_active = False 

116 logger.info("Printer session deactivated") 

117 

118 def start_new_job(self, job_id: str = "") -> PrinterJob: 

119 """Start a new printer job.""" 

120 if not self.is_active: 

121 raise ProtocolError("Printer session not active") 

122 

123 # Finish any existing job 

124 if self.current_job: 

125 self.current_job.set_error("New job started before completion") 

126 self._finish_current_job() 

127 

128 # Create new job 

129 if not job_id: 

130 self.job_counter += 1 

131 job_id = f"job_{self.job_counter}" 

132 

133 self.current_job = PrinterJob(job_id) 

134 logger.info(f"Started new printer job: {job_id}") 

135 return self.current_job 

136 

137 def add_scs_data(self, data: bytes) -> None: 

138 """Add SCS character data to the current job.""" 

139 if not self.is_active: 

140 raise ProtocolError("Printer session not active") 

141 

142 if not self.current_job: 

143 self.start_new_job() 

144 

145 if self.current_job: 

146 self.current_job.add_data(data) 

147 

148 def handle_print_eoj(self) -> None: 

149 """Handle PRINT-EOJ (End of Job) command.""" 

150 if not self.is_active: 

151 raise ProtocolError("Printer session not active") 

152 

153 if self.current_job: 

154 self.current_job.complete_job() 

155 self._finish_current_job() 

156 else: 

157 logger.warning("PRINT-EOJ received but no active job") 

158 

159 def _finish_current_job(self) -> None: 

160 """Finish the current job and add to completed jobs.""" 

161 if self.current_job: 

162 # Add to completed jobs 

163 self.completed_jobs.append(self.current_job) 

164 

165 # Limit the number of stored jobs 

166 if len(self.completed_jobs) > self.max_jobs: 

167 # Remove oldest jobs 

168 self.completed_jobs = self.completed_jobs[-self.max_jobs :] 

169 

170 # Clear current job 

171 self.current_job = None 

172 logger.info("Current printer job finished and stored") 

173 

174 def get_current_job(self) -> Optional[PrinterJob]: 

175 """Get the current active job.""" 

176 return self.current_job 

177 

178 def get_completed_jobs(self) -> List[PrinterJob]: 

179 """Get the list of completed jobs.""" 

180 return self.completed_jobs.copy() 

181 

182 def get_job_statistics(self) -> dict: 

183 """Get printer job statistics.""" 

184 active_job = 1 if self.current_job else 0 

185 completed_count = len(self.completed_jobs) 

186 total_pages = sum(job.get_page_count() for job in self.completed_jobs) 

187 total_bytes = sum(job.get_data_size() for job in self.completed_jobs) 

188 

189 return { 

190 "active_jobs": active_job, 

191 "completed_jobs": completed_count, 

192 "total_pages": total_pages, 

193 "total_bytes": total_bytes, 

194 "average_pages_per_job": total_pages / max(completed_count, 1), 

195 "average_bytes_per_job": total_bytes / max(completed_count, 1), 

196 } 

197 

198 def clear_completed_jobs(self) -> None: 

199 """Clear the list of completed jobs.""" 

200 self.completed_jobs.clear() 

201 logger.info("Cleared completed printer jobs") 

202 

203 def handle_scs_control_code(self, scs_code: int) -> None: 

204 """Handle SCS control codes.""" 

205 if not self.is_active: 

206 raise ProtocolError("Printer session not active") 

207 

208 if scs_code == PRINT_EOJ: 

209 self.handle_print_eoj() 

210 logger.debug("Handled SCS PRINT-EOJ control code") 

211 else: 

212 logger.warning(f"Unhandled SCS control code: 0x{scs_code:02x}") 

213 

214 def process_tn3270e_message(self, header: TN3270EHeader, data: bytes) -> None: 

215 """Process a TN3270E message for printer session.""" 

216 if not self.is_active: 

217 raise ProtocolError("Printer session not active") 

218 

219 if header.data_type == SCS_DATA or header.data_type == TN3270E_SCS_CTL_CODES: 

220 # Add SCS data to current job 

221 self.add_scs_data(data) 

222 logger.debug(f"Processed {len(data)} bytes of SCS data") 

223 elif header.data_type == TN3270E_RESPONSES: 

224 # Handle response messages 

225 if header.response_flag == TN3270E_RSF_ERROR_RESPONSE: 

226 logger.error( 

227 f"Received error response for sequence {header.seq_number}" 

228 ) 

229 if self.current_job: 

230 self.current_job.set_error( 

231 f"Error response received for sequence {header.seq_number}" 

232 ) 

233 elif header.response_flag == TN3270E_RSF_NEGATIVE_RESPONSE: 

234 logger.warning( 

235 f"Received negative response for sequence {header.seq_number}" 

236 ) 

237 else: 

238 logger.warning( 

239 f"Unhandled TN3270E data type: {header.get_data_type_name()}" 

240 ) 

241 

242 def __repr__(self) -> str: 

243 """String representation of the printer session.""" 

244 stats = self.get_job_statistics() 

245 return ( 

246 f"PrinterSession(active={self.is_active}, " 

247 f"current_job={'Yes' if self.current_job else 'No'}, " 

248 f"completed_jobs={stats['completed_jobs']})" 

249 )