Source code for pure3270.protocol.printer

"""Advanced printer session support for TN3270E protocol with comprehensive SCS control codes."""

import asyncio
import logging
import threading
import time
from typing import Any, Callable, Dict, List, Optional, Set, Tuple

from .exceptions import ParseError, ProtocolError
from .tn3270e_header import TN3270EHeader
from .utils import (  # Additional constants for advanced printer support
    BIND_IMAGE,
    NVT_DATA,
    PRINT_EOJ,
    PRINTER_STATUS_DATA_TYPE,
    REQUEST,
    RESPONSE,
    SCS_DATA,
    SSCP_LU_DATA,
    TN3270_DATA,
    TN3270E_BIND_IMAGE,
    TN3270E_DATA_STREAM_CTL,
    TN3270E_DEVICE_TYPE,
    TN3270E_FUNCTIONS,
    TN3270E_IBM_DYNAMIC,
    TN3270E_IS,
    TN3270E_REQUEST,
    TN3270E_RESPONSES,
    TN3270E_RSF_ALWAYS_RESPONSE,
    TN3270E_RSF_ERROR_RESPONSE,
    TN3270E_RSF_NO_RESPONSE,
    TN3270E_SCS_CTL_CODES,
    TN3270E_SEND,
    TN3270E_SYSREQ,
    UNBIND,
)

logger = logging.getLogger(__name__)


[docs] class PrinterJob: """Represents a printer job in a TN3270E printer session with advanced features."""
[docs] def __init__(self, job_id: str = "", max_data_size: int = 1048576): """Initialize a printer job with thread-safe operations.""" if not job_id: # Generate a default ID if none provided job_id = f"job_{int(time.time() * 1000) % 100000}" self.job_id = job_id self.max_data_size = max_data_size self.data = bytearray() self.status = "active" # active, completed, error, paused self.start_time = time.time() self.end_time: Optional[float] = None self.pages: List[bytes] = [] self.scs_control_codes: List[int] = [] self.page_count = 0 self.line_count = 0 self.error_message: Optional[str] = None self.lock = threading.RLock() # Reentrant lock for nested calls self.metadata: Dict[str, Any] = {}
[docs] def add_data(self, data: bytes) -> None: """Add SCS character data to the job with thread safety.""" with self.lock: self.data.extend(data) if len(self.data) > self.max_data_size: self.data = self.data[-self.max_data_size :] logger.debug(f"Added {len(data)} bytes to printer job {self.job_id}")
[docs] def add_scs_control_code(self, scs_code: int) -> None: """Add SCS control code to the job.""" with self.lock: self.scs_control_codes.append(scs_code) logger.debug( f"Added SCS control code 0x{scs_code:02x} to printer job {self.job_id}" )
[docs] def increment_line_count(self) -> None: """Increment line count for the job.""" with self.lock: self.line_count += 1
[docs] def increment_page_count(self) -> None: """Increment page count for the job.""" with self.lock: self.page_count += 1
[docs] def complete_job(self) -> None: """Mark the job as completed with thread safety.""" with self.lock: if self.status == "active": self.status = "completed" self.end_time = time.time() logger.info(f"Printer job {self.job_id} completed successfully")
[docs] def set_error(self, error_msg: str) -> None: """Mark the job as having an error with thread safety.""" with self.lock: self.status = "error" self.end_time = time.time() self.error_message = error_msg logger.error(f"Printer job {self.job_id} error: {error_msg}")
[docs] def pause_job(self) -> None: """Pause the job with thread safety.""" with self.lock: if self.status == "active": self.status = "paused" logger.info(f"Printer job {self.job_id} paused")
[docs] def resume_job(self) -> None: """Resume the job with thread safety.""" with self.lock: if self.status == "paused": self.status = "active" logger.info(f"Printer job {self.job_id} resumed")
[docs] def get_duration(self) -> float: """Get the job duration in seconds.""" if self.end_time is not None: return self.end_time - self.start_time return time.time() - self.start_time
[docs] def get_page_count(self) -> int: """Get the number of pages in the job.""" # Simple page counting based on form feeds (0x0C) page_count = 1 # At least one page for byte in self.data: if byte == 0x0C: # Form feed page_count += 1 return page_count
[docs] def get_data_size(self) -> int: """Get the size of the job data in bytes.""" return len(self.data)
def __repr__(self) -> str: """String representation of the printer job.""" return ( f"PrinterJob(id='{self.job_id}', status='{self.status}', " f"pages={self.get_page_count()}, lines={self.line_count}, " f"size={self.get_data_size()} bytes, " f"scs_codes={len(self.scs_control_codes)}, " f"duration={self.get_duration():.2f}s)" )
[docs] def get_scs_control_codes(self) -> List[int]: """Get the list of SCS control codes.""" with self.lock: return self.scs_control_codes.copy()
[docs] def get_error_message(self) -> Optional[str]: """Get the error message if any.""" with self.lock: return self.error_message
[docs] def is_thread_safe(self) -> bool: """Check if the job supports thread-safe operations.""" return True
[docs] class PrinterSession: """Advanced TN3270E printer session handler with comprehensive SCS support."""
[docs] def __init__(self) -> None: """Initialize the printer session with thread safety.""" self.is_active = False self.current_job: Optional[PrinterJob] = None self.completed_jobs: List[PrinterJob] = [] self.sequence_number = 0 self.max_jobs = 50 # Limit to prevent memory issues self.job_counter = 0 self.lock = threading.RLock() # Reentrant lock for nested calls self.scs_handlers: Dict[int, Callable[..., None]] = {} self.tn3270e_functions: Set[int] = set() self.device_type = TN3270E_IBM_DYNAMIC self.last_activity = time.time() self.error_count = 0 self.max_errors = 10
[docs] def activate(self) -> None: """Activate the printer session with thread safety.""" with self.lock: self.is_active = True self.last_activity = time.time() self._initialize_scs_handlers() logger.info( f"Printer session activated with device type: {self.device_type}" )
[docs] def deactivate(self) -> None: """Deactivate the printer session with thread safety.""" with self.lock: if self.current_job: self.current_job.set_error("Session deactivated") self._finish_current_job() self.is_active = False logger.info("Printer session deactivated")
def _initialize_scs_handlers(self) -> None: """Initialize SCS control code handlers.""" self.scs_handlers = { PRINT_EOJ: self._handle_print_eoj_scs, # 0x08 - PRINT-EOJ 0x01: self._handle_soh_scs, # Start of Header 0x03: self._handle_cr_scs, # Carriage Return 0x04: self._handle_nl_scs, # New Line 0x05: self._handle_ff_scs, # Form Feed 0x06: self._handle_ht_scs, # Horizontal Tab 0x07: self._handle_vt_scs, # Vertical Tab # 0x08 is PRINT_EOJ, not Backspace in SCS context 0x09: self._handle_lf_scs, # Line Feed 0x0A: self._handle_ir_scs, # Index Return 0x0B: self._handle_vcs_scs, # Vertical Channel Select 0x0C: self._handle_ff_scs, # Form Feed (duplicate) 0x0D: self._handle_cr_scs, # Carriage Return (duplicate) 0x0E: self._handle_so_scs, # Shift Out 0x0F: self._handle_si_scs, # Shift In 0x10: self._handle_trn_scs, # Transparent 0x11: self._handle_it_scs, # Indent Tab 0x12: self._handle_irs_scs, # Intermittent Right Space 0x13: self._handle_suo_scs, # Set Uppercase On 0x14: self._handle_suf_scs, # Set Uppercase Off 0x15: self._handle_bel_scs, # Bell 0x16: self._handle_ea_scs, # Enable Alarm 0x17: self._handle_da_scs, # Disable Alarm 0x18: self._handle_nop_scs, # No Operation 0x19: self._handle_ems_scs, # End of Message Set 0x1A: self._handle_ubs_scs, # Unit Backspace 0x1B: self._handle_cuu_scs, # Cursor Up 0x1C: self._handle_cud_scs, # Cursor Down 0x1D: self._handle_cuf_scs, # Cursor Forward 0x1E: self._handle_cub_scs, # Cursor Backward 0x1F: self._handle_cuu_scs, # Cursor Up (duplicate) }
[docs] def start_new_job(self, job_id: str = "") -> PrinterJob: """Start a new printer job.""" if not self.is_active: raise ProtocolError("Printer session not active") # Finish any existing job if self.current_job: self.current_job.set_error("New job started before completion") self._finish_current_job() # Create new job if not job_id: self.job_counter += 1 job_id = f"job_{self.job_counter}" self.current_job = PrinterJob(job_id) logger.info(f"Started new printer job: {job_id}") return self.current_job
[docs] def add_scs_data(self, data: bytes, holding_lock: bool = False) -> None: """Add SCS character data to the current job. Args: data: SCS data to add holding_lock: True if caller already holds self.lock (internal use only) """ if not holding_lock: if not self.is_active: raise ProtocolError("Printer session not active") # Check if we need to start a new job if not self.current_job: if holding_lock: # We're already holding the lock, so start job without re-acquiring self.job_counter += 1 job_id = f"job_{self.job_counter}" self.current_job = PrinterJob(job_id) logger.info(f"Started new printer job: {self.current_job.job_id}") else: self.start_new_job() if self.current_job: self.current_job.add_data(data)
[docs] def handle_print_eoj(self) -> None: """Handle PRINT-EOJ (End of Job) command.""" if not self.is_active: raise ProtocolError("Printer session not active") if self.current_job: self.current_job.complete_job() self._finish_current_job() else: logger.warning("PRINT-EOJ received but no active job")
def _finish_current_job(self) -> None: """Finish the current job and add to completed jobs.""" if self.current_job: # Add to completed jobs self.completed_jobs.append(self.current_job) # Limit the number of stored jobs if len(self.completed_jobs) > self.max_jobs: # Remove oldest jobs self.completed_jobs = self.completed_jobs[-self.max_jobs :] # Clear current job self.current_job = None logger.info("Current printer job finished and stored")
[docs] def prune(self) -> None: """Prune completed jobs to the last max_jobs.""" if len(self.completed_jobs) > self.max_jobs: self.completed_jobs = self.completed_jobs[-self.max_jobs :] logger.debug(f"Pruned completed jobs to last {self.max_jobs}")
[docs] def get_current_job(self) -> Optional[PrinterJob]: """Get the current active job.""" return self.current_job
[docs] def get_completed_jobs(self) -> List[PrinterJob]: """Get the list of completed jobs.""" return self.completed_jobs.copy()
[docs] def get_job_statistics(self) -> Dict[str, Any]: """Get printer job statistics.""" active_job = 1 if self.current_job else 0 completed_count = len(self.completed_jobs) total_pages = sum(job.get_page_count() for job in self.completed_jobs) total_bytes = sum(job.get_data_size() for job in self.completed_jobs) return { "active_jobs": active_job, "completed_jobs": completed_count, "total_pages": total_pages, "total_bytes": total_bytes, "average_pages_per_job": total_pages / max(completed_count, 1), "average_bytes_per_job": total_bytes / max(completed_count, 1), }
[docs] def clear_completed_jobs(self) -> None: """Clear the list of completed jobs.""" self.completed_jobs.clear() logger.info("Cleared completed printer jobs")
[docs] def close(self) -> None: """Close the printer session and clear all jobs.""" self.deactivate() self.clear_completed_jobs() logger.info("Printer session closed and jobs cleared")
[docs] def handle_scs_control_code(self, scs_code: int) -> None: """Handle SCS control codes with comprehensive support.""" if not self.is_active: raise ProtocolError("Printer session not active") with self.lock: self.last_activity = time.time() if scs_code in self.scs_handlers: try: self.scs_handlers[scs_code]() if self.current_job: self.current_job.add_scs_control_code(scs_code) except Exception as e: logger.error( f"Error handling SCS control code 0x{scs_code:02x}: {e}" ) raise ParseError( f"Failed to handle SCS control code 0x{scs_code:02x}", context={"scs_code": scs_code}, original_exception=e, ) from e else: logger.warning(f"Unhandled SCS control code: 0x{scs_code:02x}")
# SCS Control Code Handlers def _handle_print_eoj_scs(self) -> None: """Handle PRINT-EOJ SCS control code.""" self.handle_print_eoj() logger.debug("Handled SCS PRINT-EOJ control code") def _handle_soh_scs(self) -> None: """Handle Start of Header SCS control code.""" logger.debug("Handled SCS SOH (Start of Header)") def _handle_cr_scs(self) -> None: """Handle Carriage Return SCS control code.""" if self.current_job: self.current_job.increment_line_count() logger.debug("Handled SCS CR (Carriage Return)") def _handle_nl_scs(self) -> None: """Handle New Line SCS control code.""" if self.current_job: self.current_job.increment_line_count() logger.debug("Handled SCS NL (New Line)") def _handle_ff_scs(self) -> None: """Handle Form Feed SCS control code.""" if self.current_job: self.current_job.increment_page_count() logger.debug("Handled SCS FF (Form Feed)") def _handle_ht_scs(self) -> None: """Handle Horizontal Tab SCS control code.""" logger.debug("Handled SCS HT (Horizontal Tab)") def _handle_vt_scs(self) -> None: """Handle Vertical Tab SCS control code.""" logger.debug("Handled SCS VT (Vertical Tab)") def _handle_bs_scs(self) -> None: """Handle Backspace SCS control code.""" logger.debug("Handled SCS BS (Backspace)") def _handle_lf_scs(self) -> None: """Handle Line Feed SCS control code.""" if self.current_job: self.current_job.increment_line_count() logger.debug("Handled SCS LF (Line Feed)") def _handle_ir_scs(self) -> None: """Handle Index Return SCS control code.""" logger.debug("Handled SCS IR (Index Return)") def _handle_vcs_scs(self) -> None: """Handle Vertical Channel Select SCS control code.""" logger.debug("Handled SCS VCS (Vertical Channel Select)") def _handle_so_scs(self) -> None: """Handle Shift Out SCS control code.""" logger.debug("Handled SCS SO (Shift Out)") def _handle_si_scs(self) -> None: """Handle Shift In SCS control code.""" logger.debug("Handled SCS SI (Shift In)") def _handle_trn_scs(self) -> None: """Handle Transparent SCS control code.""" logger.debug("Handled SCS TRN (Transparent)") def _handle_it_scs(self) -> None: """Handle Indent Tab SCS control code.""" logger.debug("Handled SCS IT (Indent Tab)") def _handle_irs_scs(self) -> None: """Handle Intermittent Right Space SCS control code.""" logger.debug("Handled SCS IRS (Intermittent Right Space)") def _handle_suo_scs(self) -> None: """Handle Set Uppercase On SCS control code.""" logger.debug("Handled SCS SUO (Set Uppercase On)") def _handle_suf_scs(self) -> None: """Handle Set Uppercase Off SCS control code.""" logger.debug("Handled SCS SUF (Set Uppercase Off)") def _handle_bel_scs(self) -> None: """Handle Bell SCS control code.""" logger.debug("Handled SCS BEL (Bell)") def _handle_ea_scs(self) -> None: """Handle Enable Alarm SCS control code.""" logger.debug("Handled SCS EA (Enable Alarm)") def _handle_da_scs(self) -> None: """Handle Disable Alarm SCS control code.""" logger.debug("Handled SCS DA (Disable Alarm)") def _handle_nop_scs(self) -> None: """Handle No Operation SCS control code.""" logger.debug("Handled SCS NOP (No Operation)") def _handle_ems_scs(self) -> None: """Handle End of Message Set SCS control code.""" logger.debug("Handled SCS EMS (End of Message Set)") def _handle_ubs_scs(self) -> None: """Handle Unit Backspace SCS control code.""" logger.debug("Handled SCS UBS (Unit Backspace)") def _handle_cuu_scs(self) -> None: """Handle Cursor Up SCS control code.""" logger.debug("Handled SCS CUU (Cursor Up)") def _handle_cud_scs(self) -> None: """Handle Cursor Down SCS control code.""" logger.debug("Handled SCS CUD (Cursor Down)") def _handle_cuf_scs(self) -> None: """Handle Cursor Forward SCS control code.""" logger.debug("Handled SCS CUF (Cursor Forward)") def _handle_cub_scs(self) -> None: """Handle Cursor Backward SCS control code.""" logger.debug("Handled SCS CUB (Cursor Backward)")
[docs] def process_tn3270e_message(self, header: TN3270EHeader, data: bytes) -> None: """Process a TN3270E message for printer session with comprehensive support.""" if not self.is_active: raise ProtocolError("Printer session not active") with self.lock: self.last_activity = time.time() # Handle different TN3270E data types if header.data_type == SCS_DATA: # Add SCS data to current job (we already hold the lock) self.add_scs_data(data, holding_lock=True) logger.debug(f"Processed {len(data)} bytes of SCS data") elif header.data_type == TN3270E_SCS_CTL_CODES: # Handle SCS control codes if data: scs_code = data[0] self.handle_scs_control_code(scs_code) logger.debug(f"Processed SCS control codes: {data.hex()}") elif header.data_type == TN3270E_RESPONSES: # Handle response messages with enhanced error handling self._handle_tn3270e_response(header, data) elif header.data_type == BIND_IMAGE: # Handle BIND-IMAGE structured field self._handle_bind_image_message(header, data) elif header.data_type == REQUEST: # Handle REQUEST messages self._handle_request_message(header, data) elif header.data_type == RESPONSE: # Handle RESPONSE messages self._handle_response_message(header, data) elif header.data_type == TN3270_DATA: # Handle 3270 data stream self._handle_3270_data_message(header, data) elif header.data_type == NVT_DATA: # Handle NVT data logger.debug(f"Received NVT data: {data!r}") elif header.data_type == SSCP_LU_DATA: # Handle SSCP-LU data logger.debug(f"Received SSCP-LU data: {data.hex()}") elif header.data_type == UNBIND: # Handle UNBIND logger.info("Received UNBIND message") elif header.data_type == PRINTER_STATUS_DATA_TYPE: # Handle printer status data self._handle_printer_status_message(header, data) else: logger.warning( f"Unhandled TN3270E data type: {header.get_data_type_name()}" )
def _handle_tn3270e_response(self, header: TN3270EHeader, data: bytes) -> None: """Handle TN3270E response messages.""" if header.response_flag == TN3270E_RSF_ERROR_RESPONSE: logger.error(f"Received error response for sequence {header.seq_number}") if self.current_job: self.current_job.set_error( f"Error response received for sequence {header.seq_number}" ) # Parse error response details (sense data) if len(data) > 0: code = data[0] logger.warning(f"Error response code: 0x{code:02x}") elif header.response_flag == TN3270E_RSF_NO_RESPONSE: # No response expected - this is normal for most messages pass elif header.response_flag == TN3270E_RSF_ALWAYS_RESPONSE: logger.debug(f"Received always response for sequence {header.seq_number}") def _handle_bind_image_message(self, header: TN3270EHeader, data: bytes) -> None: """Handle BIND-IMAGE messages.""" logger.debug(f"Received BIND-IMAGE message: {data.hex()}") # BIND-IMAGE handling would be implemented here # This could update device capabilities, screen dimensions, etc. def _handle_request_message(self, header: TN3270EHeader, data: bytes) -> None: """Handle REQUEST messages.""" logger.debug(f"Received REQUEST message: {data.hex()}") def _handle_response_message(self, header: TN3270EHeader, data: bytes) -> None: """Handle RESPONSE messages.""" logger.debug(f"Received RESPONSE message: {data.hex()}") def _handle_3270_data_message(self, header: TN3270EHeader, data: bytes) -> None: """Handle 3270 data stream messages.""" logger.debug(f"Received 3270 data message: {data.hex()}") def _handle_printer_status_message( self, header: TN3270EHeader, data: bytes ) -> None: """Handle printer status messages.""" logger.debug(f"Received printer status message: {data.hex()}") def __repr__(self) -> str: """String representation of the printer session.""" stats = self.get_job_statistics() return ( f"PrinterSession(active={self.is_active}, " f"current_job={'Yes' if self.current_job else 'No'}, " f"completed_jobs={stats['completed_jobs']}, " f"total_pages={stats['total_pages']}, " f"total_bytes={stats['total_bytes']}, " f"errors={self.error_count}, " f"device_type={self.device_type})" )
[docs] def get_session_info(self) -> Dict[str, Any]: """Get comprehensive session information.""" with self.lock: stats = self.get_job_statistics() return { "is_active": self.is_active, "device_type": self.device_type, "current_job_id": self.current_job.job_id if self.current_job else None, "completed_jobs": stats["completed_jobs"], "total_pages": stats["total_pages"], "total_bytes": stats["total_bytes"], "error_count": self.error_count, "last_activity": self.last_activity, "tn3270e_functions": list(self.tn3270e_functions), "scs_handlers_count": len(self.scs_handlers), }
[docs] def set_device_type(self, device_type: str) -> None: """Set the device type for the printer session.""" with self.lock: self.device_type = device_type logger.info(f"Printer session device type set to: {device_type}")
[docs] def add_tn3270e_function(self, function: int) -> None: """Add a supported TN3270E function.""" with self.lock: self.tn3270e_functions.add(function) logger.debug(f"Added TN3270E function: 0x{function:02x}")
[docs] def supports_tn3270e_function(self, function: int) -> bool: """Check if a TN3270E function is supported.""" with self.lock: return function in self.tn3270e_functions
[docs] def get_error_rate(self) -> float: """Get the current error rate.""" with self.lock: total_operations = len(self.completed_jobs) + (1 if self.current_job else 0) if total_operations == 0: return 0.0 return self.error_count / total_operations
[docs] def is_healthy(self) -> bool: """Check if the session is healthy.""" with self.lock: return ( self.is_active and self.error_count <= self.max_errors and self.get_error_rate() < 0.1 )