Integration Scenarios

This section provides real-world integration examples and production deployment scenarios for Pure3270 in enterprise environments.

For basic usage patterns and API examples, see the Examples section. For advanced patterns and optimization techniques, see Advanced Patterns.

Enterprise Integration Examples

Bank Processing System Integration

Complete enterprise banking application integration with transaction processing:

import asyncio
import json
import logging
from datetime import datetime
from typing import Dict, List, Optional
from dataclasses import dataclass
from pure3270 import AsyncSession, setup_logging

@dataclass
class Transaction:
    """Banking transaction data structure.

    A plain record describing one transaction to be keyed into the
    mainframe. The class performs no validation of its own.
    """
    transaction_id: str  # Identifier used in log messages and as the key of batch results
    account_number: str  # Typed into the account field of the entry screen
    transaction_type: str  # e.g. "DEPOSIT" / "WITHDRAWAL"; batches are grouped by this value
    # NOTE(review): float is lossy for currency - consider decimal.Decimal
    # (the ":.2f" formatting applied when the amount is entered accepts both).
    amount: float
    description: str  # Only the first 30 characters are entered on screen
    timestamp: datetime  # Supplied by the caller; not read by the processing code in this example
    status: str  # Supplied by the caller (e.g. "PENDING"); not updated by the processing code

class BankingSystemInterface:
    """Interface to mainframe banking system via TN3270.

    Drives one transaction-entry dialogue on the host: borrow a session from
    ``session_manager``, navigate to the entry screen, type the fields,
    submit, and interpret the response screen. Outcomes are written to the
    ``banking.transactions`` logger.

    The ``*_DELAY`` class attributes are the pauses (in seconds) inserted
    after host interactions. Override them on a subclass or an instance to
    tune for a faster or slower host.
    """

    CLEAR_DELAY = 0.5        # after CLEAR, before typing the module code
    SCREEN_LOAD_DELAY = 1.0  # after ENTER on the module code
    FIELD_CLEAR_DELAY = 0.1  # after clearing a field, before typing into it
    SUBMIT_DELAY = 2.0       # after ENTER on the completed form
    BATCH_DELAY = 0.1        # between two transactions of a batch

    def __init__(self, hosts: List[str], credentials: dict):
        """Store configuration; nothing is connected until ``initialize``.

        Args:
            hosts: Host names this interface may connect to.
            credentials: Login details. Stored only; no method of this
                class reads them.
        """
        self.hosts = hosts
        self.credentials = credentials
        self.session_manager = None
        self.transaction_logger = logging.getLogger('banking.transactions')
        self.performance_monitor = {}

    async def initialize(self):
        """Initialize banking system connection."""
        setup_logging(level="INFO", component="banking")

        # Initialize session manager with failover.
        # NOTE(review): _create_session_manager() is not defined in this
        # class and must be supplied by a subclass. The object it returns
        # needs async get_session(), return_session(session) and stop().
        self.session_manager = await self._create_session_manager()

        self._configure_transaction_logging()

    def _configure_transaction_logging(
            self, log_path: str = 'banking_transactions.log') -> None:
        """Attach the transaction log file handler exactly once.

        The ``banking.transactions`` logger is shared by every instance, so
        adding a handler unconditionally would write each record once per
        ``initialize()`` call.
        """
        # delay=True: the file is opened on first write, so a handler that
        # turns out to be a duplicate can be discarded without touching disk.
        handler = logging.FileHandler(log_path, delay=True)
        already_attached = any(
            isinstance(existing, logging.FileHandler)
            and existing.baseFilename == handler.baseFilename
            for existing in self.transaction_logger.handlers
        )
        if already_attached:
            handler.close()
        else:
            handler.setFormatter(logging.Formatter(
                '%(asctime)s - %(levelname)s - %(message)s'
            ))
            self.transaction_logger.addHandler(handler)
        self.transaction_logger.setLevel(logging.INFO)

    async def process_transaction(self, transaction: Transaction) -> bool:
        """
        Process a banking transaction through the mainframe system.

        Returns:
            True when the host reported completion. False on any failure,
            including exceptions, which are logged and not re-raised.
        """
        log = self.transaction_logger
        txn_id = transaction.transaction_id
        session = None
        try:
            session = await self.session_manager.get_session()
            log.info("Starting transaction %s for account %s",
                     txn_id, transaction.account_number)

            await self._navigate_to_transaction_screen(session)

            if not await self._enter_transaction_data(session, transaction):
                log.error("Transaction %s failed data entry", txn_id)
                return False

            if not await self._submit_transaction(session, transaction):
                log.error("Transaction %s failed submission", txn_id)
                return False

            log.info("Transaction %s completed successfully", txn_id)
            return True

        except Exception as e:
            log.error("Transaction %s failed with error: %s", txn_id, e)
            return False
        finally:
            # Hand the session back even when the dialogue failed.
            if session is not None:
                await self.session_manager.return_session(session)

    async def _navigate_to_transaction_screen(self, session: AsyncSession):
        """Navigate to the transaction entry screen.

        Raises:
            RuntimeError: The screen shown after entering ``TXN`` does not
                contain "TRANSACTION ENTRY".
        """
        # Clear any existing screen
        await session.key("CLEAR")
        await asyncio.sleep(self.CLEAR_DELAY)

        # Navigate to transaction module
        await session.string("TXN")
        await session.key("ENTER")

        # Wait for screen to load
        await asyncio.sleep(self.SCREEN_LOAD_DELAY)

        # read() is a coroutine (it is awaited everywhere else in these
        # examples); it must be awaited before conversion.
        screen = session.ascii(await session.read())
        if "TRANSACTION ENTRY" not in screen.upper():
            raise RuntimeError("Failed to navigate to transaction screen")

    async def _enter_transaction_data(self, session: AsyncSession,
                                      transaction: Transaction) -> bool:
        """Enter transaction data into mainframe screen.

        Returns:
            True when all four fields were typed, False if any step raised
            (the error is logged).
        """
        # (row, column, text) for each input field, in entry order.
        fields = (
            (5, 10, transaction.account_number),
            (6, 10, transaction.transaction_type),
            (7, 10, f"{transaction.amount:.2f}"),
            (8, 10, transaction.description[:30]),
        )
        try:
            for row, col, text in fields:
                await self._enter_field(session, row, col, text)
            return True
        except Exception as e:
            self.transaction_logger.error("Data entry failed: %s", e)
            return False

    async def _enter_field(self, session: AsyncSession, row: int,
                           col: int, value: str):
        """Enter value into specific screen field."""
        await self._position_cursor(session, row, col)

        # NOTE(review): on a 3270 terminal CLEAR normally wipes the whole
        # screen, not one field. An erase-to-end-of-field key is probably
        # intended here; confirm the key name pure3270 accepts.
        await session.key("CLEAR")
        await asyncio.sleep(self.FIELD_CLEAR_DELAY)

        await session.string(value)

    async def _position_cursor(self, session: AsyncSession, row: int, col: int):
        """Position cursor at specific screen location.

        Simplified: sends ``row`` DOWN keys followed by ``col`` RIGHT keys,
        so the result is relative to wherever the cursor currently is.
        """
        for _ in range(row):
            await session.key("DOWN")
        for _ in range(col):
            await session.key("RIGHT")

    async def _submit_transaction(self, session: AsyncSession,
                                  transaction: Transaction) -> bool:
        """Submit transaction and verify result.

        Returns:
            True only when the response screen contains "TRANSACTION
            COMPLETED". A screen containing "ERROR" is logged. Any other
            screen is treated as a failure as well.
        """
        await session.key("ENTER")
        await asyncio.sleep(self.SUBMIT_DELAY)

        response = session.ascii(await session.read())
        shouted = response.upper()

        if "TRANSACTION COMPLETED" in shouted:
            return True
        if "ERROR" in shouted:
            error_msg = self._extract_error_message(response)
            self.transaction_logger.error("Transaction error: %s", error_msg)
        # Unknown response, treat as error
        return False

    def _extract_error_message(self, screen_text: str) -> str:
        """Return the first screen line mentioning "error" (any case),
        stripped, or "Unknown error" when no line does."""
        for line in screen_text.split('\n'):
            if "ERROR" in line.upper():
                return line.strip()
        return "Unknown error"

    async def batch_process_transactions(self, transactions: List[Transaction]) -> Dict[str, bool]:
        """Process multiple transactions, grouped by transaction type.

        Returns:
            Mapping of ``transaction_id`` to the outcome of
            ``process_transaction`` for that transaction.
        """
        results: Dict[str, bool] = {}

        # Group transactions by type; first-seen order of types is kept.
        transaction_groups: Dict[str, List[Transaction]] = {}
        for transaction in transactions:
            transaction_groups.setdefault(
                transaction.transaction_type, []).append(transaction)

        for ttype, group_transactions in transaction_groups.items():
            self.transaction_logger.info(
                "Processing %d transactions of type %s",
                len(group_transactions), ttype
            )

            for transaction in group_transactions:
                results[transaction.transaction_id] = (
                    await self.process_transaction(transaction))

                # Small delay to avoid overwhelming mainframe
                await asyncio.sleep(self.BATCH_DELAY)

        return results

    async def close(self):
        """Cleanup banking system connection (no-op if never initialized)."""
        if self.session_manager:
            await self.session_manager.stop()

class HighAvailabilityBankingSystem:
    """High-availability banking system with multiple mainframe connections.

    Wraps two ``BankingSystemInterface`` objects. Transactions go to the
    primary until it is judged unhealthy. From then on every transaction
    goes to the backup (there is no automatic fail-back).
    """

    def __init__(self, primary_hosts: List[str], backup_hosts: List[str]):
        """Create the two interfaces; both receive empty credentials."""
        self.primary_interface = BankingSystemInterface(primary_hosts, {})
        self.backup_interface = BankingSystemInterface(backup_hosts, {})
        self.is_primary_active = True
        # NOTE(review): not consulted anywhere in this class; failover
        # currently happens on the first unhealthy probe.
        self.failover_threshold = 3

    async def initialize(self):
        """Initialize both primary and backup systems.

        Raises:
            Exception: Whatever either interface's ``initialize`` raised,
                re-raised after being printed.
        """
        try:
            await self.primary_interface.initialize()
            await self.backup_interface.initialize()
            print("High-availability banking system initialized")
        except Exception as e:
            print(f"Initialization failed: {e}")
            raise

    async def process_transaction_with_failover(self, transaction: Transaction) -> bool:
        """Process transaction with automatic failover capability.

        Returns:
            True when the transaction succeeded on the primary or, after
            failover, on the backup. False otherwise. This includes the
            case where the primary rejected the transaction but is healthy:
            that is a transaction failure, not a system failure.
        """
        if not self.is_primary_active:
            return await self._retry_with_backup(transaction)

        try:
            if await self.primary_interface.process_transaction(transaction):
                return True
            primary_healthy = await self._check_primary_health()
        except Exception as e:
            print(f"Primary system failed: {e}")
            primary_healthy = False

        if primary_healthy:
            return False

        # NOTE(review): if the primary committed the transaction before
        # becoming unreachable, replaying it on the backup may post it
        # twice. Confirm the host side is idempotent.
        await self._perform_failover()
        return await self._retry_with_backup(transaction)

    async def _check_primary_health(self) -> bool:
        """Check if primary system is healthy.

        Borrows a session from the primary and waits up to 5 seconds for a
        read. The session is handed back whether or not the read succeeded.
        """
        manager = self.primary_interface.session_manager
        try:
            session = await manager.get_session()
        except Exception:
            return False

        healthy = True
        try:
            await asyncio.wait_for(session.read(), timeout=5.0)
        except Exception:
            # "except Exception" rather than a bare except, so task
            # cancellation is not mistaken for an unhealthy host.
            healthy = False

        try:
            await manager.return_session(session)
        except Exception:
            healthy = False
        return healthy

    async def _perform_failover(self):
        """Perform failover to backup system."""
        print("Performing failover to backup system")
        self.is_primary_active = False

        # Log failover event
        logging.info("Primary system failover completed")

    async def _retry_with_backup(self, transaction: Transaction) -> bool:
        """Retry transaction with backup system; exceptions become False."""
        try:
            return await self.backup_interface.process_transaction(transaction)
        except Exception as e:
            print(f"Backup system failed: {e}")
            return False

# Example usage
async def banking_system_example():
    """Demonstrate enterprise banking system integration.

    Builds a high-availability system, pushes two sample transactions
    through it with failover enabled, and always closes both interfaces.
    """

    # Configure banking system
    primary_hosts = ["mainframe1.bank.com", "mainframe2.bank.com"]
    backup_hosts = ["backup-mainframe.bank.com"]

    ha_system = HighAvailabilityBankingSystem(primary_hosts, backup_hosts)

    try:
        # Inside the try block so that a primary that connected before the
        # backup failed to initialize is still closed below.
        await ha_system.initialize()

        # Create sample transactions
        transactions = [
            Transaction(
                transaction_id="TXN001",
                account_number="1234567890",
                transaction_type="DEPOSIT",
                amount=1000.00,
                description="Customer deposit",
                timestamp=datetime.now(),
                status="PENDING"
            ),
            Transaction(
                transaction_id="TXN002",
                account_number="0987654321",
                transaction_type="WITHDRAWAL",
                amount=500.00,
                description="Cash withdrawal",
                timestamp=datetime.now(),
                status="PENDING"
            )
        ]

        # Process transactions with failover
        for transaction in transactions:
            success = await ha_system.process_transaction_with_failover(transaction)
            print(f"Transaction {transaction.transaction_id}: {'SUCCESS' if success else 'FAILED'}")

    finally:
        # close() does nothing for an interface whose session manager was
        # never created, so this is safe after a partial initialization.
        # The nested finally guarantees the backup is closed even when
        # closing the primary raises.
        try:
            await ha_system.primary_interface.close()
        finally:
            await ha_system.backup_interface.close()

Multi-Session Management

Enterprise Multi-Host Session Management

Managing multiple mainframe connections with load balancing and session affinity:

import asyncio
import hashlib
from typing import Dict, List, Set, Optional
from dataclasses import dataclass
from enum import Enum
from pure3270 import AsyncSession, setup_logging

class SessionRole(Enum):
    """Define session roles for different operations.

    The role is used to decide which host types a session may be opened on
    and to match pooled sessions to requests.
    """
    READ_ONLY = "read"  # Accepted on every host type
    TRANSACTIONAL = "write"  # Production or staging hosts only
    BATCH = "batch"  # Production, staging or development hosts
    ADMIN = "admin"  # Production hosts only
    PRINTER = "printer"  # The session manager's suitability check accepts no host type for this role

class HostType(Enum):
    """Define different types of mainframe hosts.

    The host type restricts which session roles may be opened on a host.
    """
    PRODUCTION = "production"  # Accepts read-only, transactional, batch and admin sessions
    STAGING = "staging"  # Accepts read-only, transactional and batch sessions
    DEVELOPMENT = "development"  # Accepts read-only and batch sessions
    ARCHIVE = "archive"  # Accepts read-only sessions only

@dataclass
class HostConfig:
    """Configuration for mainframe host."""
    host: str  # Host name; also the key under which the session manager registers the host
    port: int  # Passed to connect(port=...)
    host_type: HostType  # Determines which session roles the host may serve
    terminal_type: str = "IBM-3278-4"  # Passed to AsyncSession(terminal_type=...)
    max_sessions: int = 10  # Upper bound on concurrently active sessions for this host
    timeout: float = 30.0  # NOTE(review): not read by any code in this example
    # Forwarded unchanged as connect(ssl_context=...).
    # NOTE(review): annotated as dict, but TLS APIs usually expect an
    # ssl.SSLContext - confirm what pure3270 accepts.
    ssl_context: Optional[dict] = None

@dataclass
class SessionInfo:
    """Information about active session.

    Bookkeeping record kept by the session manager alongside the live
    session object, which is stored separately under the same session_id.
    """
    session_id: str  # 8 hex characters derived from host, role and creation time
    host_config: HostConfig  # Host the session is connected to
    role: SessionRole  # Role the session was created for
    created_at: float  # Event-loop clock value at creation (not wall-clock time)
    last_activity: float  # Event-loop clock value of the last hand-out or return
    active: bool = True  # Set to False when the session is cleaned up

class EnterpriseSessionManager:
    """Enterprise-grade session manager with multiple host support.

    State is kept in four dictionaries guarded by one ``asyncio.Lock``:
    ``hosts`` (host name to config), ``sessions`` (session id to
    bookkeeping), ``session_pools`` (host name to bookkeeping list) and
    ``active_sessions`` (session id to live session).

    ``asyncio.Lock`` is not re-entrant. Methods that take the lock must
    therefore never be awaited while it is held, so ``get_session``
    releases it before creating or waiting for a session.
    """

    # Host types on which each role may run. A role that is missing here
    # (PRINTER) is not accepted on any host.
    _ROLE_HOST_TYPES = {
        SessionRole.TRANSACTIONAL: (HostType.PRODUCTION, HostType.STAGING),
        SessionRole.BATCH: (HostType.PRODUCTION, HostType.STAGING,
                            HostType.DEVELOPMENT),
        SessionRole.ADMIN: (HostType.PRODUCTION,),
    }

    def __init__(self):
        self.hosts: Dict[str, HostConfig] = {}
        self.sessions: Dict[str, SessionInfo] = {}
        self.session_pools: Dict[str, List[SessionInfo]] = {}
        self.active_sessions: Dict[str, AsyncSession] = {}
        self._lock = asyncio.Lock()
        self._health_monitor_task = None

    async def add_host(self, host_config: HostConfig):
        """Add a mainframe host to the manager (or update its config)."""
        self.hosts[host_config.host] = host_config
        # setdefault: re-registering a host must not orphan the sessions
        # that are already pooled for it.
        self.session_pools.setdefault(host_config.host, [])
        print(f"Added host: {host_config.host} ({host_config.host_type.value})")

    async def get_session(self, host: Optional[str] = None, role: SessionRole = SessionRole.TRANSACTIONAL) -> AsyncSession:
        """
        Get a session for specific host and role.

        An existing pooled session with the same role is reused when there
        is one. Otherwise a new session is opened while the host is below
        ``max_sessions``, and failing that the call waits for capacity.

        Args:
            host: Registered host name, or None to pick the least-loaded
                host that is suitable for ``role``.
            role: Role the session will be used for.

        Raises:
            ValueError: ``host`` is not registered, or no registered host
                suits ``role``.
            TimeoutError: The pool stayed full for the whole waiting period.
        """
        if host is None:
            host = await self._select_optimal_host(role)

        if host not in self.hosts:
            raise ValueError(f"Unknown host: {host}")

        async with self._lock:
            pooled = self._find_pooled_session(host, role)
            if pooled is not None:
                return pooled
            under_limit = (self._active_session_count(host)
                           < self.hosts[host].max_sessions)

        # The lock is released here on purpose: both calls below acquire it
        # themselves, and awaiting them with the lock held would deadlock.
        # The price is that concurrent callers can briefly overshoot
        # max_sessions.
        if under_limit:
            return await self._create_session(host, role)
        return await self._wait_for_available_session(host, role)

    def _active_session_count(self, host: str) -> int:
        """Number of sessions currently marked active for ``host``."""
        return sum(1 for info in self.sessions.values()
                   if info.active and info.host_config.host == host)

    def _find_pooled_session(self, host: str, role: SessionRole) -> Optional[AsyncSession]:
        """Return a pooled session for ``host``/``role`` or None.

        Picks the session that has been idle longest (smallest
        ``last_activity``) and stamps it as used. Call with the lock held.
        """
        candidates = [info for info in self.session_pools[host]
                      if info.active and info.role == role]
        if not candidates:
            return None
        chosen = min(candidates, key=lambda info: info.last_activity)
        chosen.last_activity = asyncio.get_running_loop().time()
        return self.active_sessions[chosen.session_id]

    async def _select_optimal_host(self, role: SessionRole) -> str:
        """Return the suitable host with the fewest active sessions.

        Raises:
            ValueError: No registered host is suitable for ``role``.
        """
        loads = {host: self._active_session_count(host)
                 for host, config in self.hosts.items()
                 if self._host_suitable_for_role(config, role)}

        if not loads:
            raise ValueError(f"No suitable host found for role {role}")

        # Ties go to the host that was registered first.
        return min(loads, key=loads.get)

    def _host_suitable_for_role(self, config: HostConfig, role: SessionRole) -> bool:
        """Check if host is suitable for the given role."""
        if role == SessionRole.READ_ONLY:
            return True
        return config.host_type in self._ROLE_HOST_TYPES.get(role, ())

    async def _create_session(self, host: str, role: SessionRole) -> AsyncSession:
        """Connect a new session to ``host`` and register it.

        Acquires the lock for the registration step, so it must not be
        awaited while the lock is held.
        """
        host_config = self.hosts[host]

        session = AsyncSession(terminal_type=host_config.terminal_type)
        await session.connect(host, port=host_config.port, ssl_context=host_config.ssl_context)

        now = asyncio.get_running_loop().time()

        # md5 is used only to derive a short label, not for security.
        session_id = hashlib.md5(f"{host}:{role}:{now}".encode()).hexdigest()[:8]

        session_info = SessionInfo(
            session_id=session_id,
            host_config=host_config,
            role=role,
            created_at=now,
            last_activity=now
        )

        async with self._lock:
            self.sessions[session_id] = session_info
            self.session_pools[host].append(session_info)
            self.active_sessions[session_id] = session

        print(f"Created {role.value} session {session_id} for {host}")
        return session

    async def return_session(self, session: AsyncSession):
        """Return session to pool (refreshes its last-activity stamp).

        Sessions this manager does not know are ignored.
        """
        async with self._lock:
            for session_id, known in self.active_sessions.items():
                if known is not session:
                    continue
                session_info = self.sessions.get(session_id)
                if session_info:
                    session_info.last_activity = asyncio.get_running_loop().time()
                    print(f"Returned {session_info.role.value} session {session_id}")
                break

    async def _wait_for_available_session(self, host: str, role: SessionRole) -> AsyncSession:
        """Wait for available session in pool.

        Polls once per second, up to 30 times. It checks the pool directly
        instead of going back through ``get_session``, which would nest
        another 30-second wait inside every poll.

        Raises:
            TimeoutError: Nothing became available in time.
        """
        # This is a simplified implementation
        # In production, would use asyncio.Queue or similar
        for _ in range(30):
            await asyncio.sleep(1)

            async with self._lock:
                pooled = self._find_pooled_session(host, role)
                if pooled is not None:
                    return pooled
                has_capacity = (self._active_session_count(host)
                                < self.hosts[host].max_sessions)

            if has_capacity:
                try:
                    return await self._create_session(host, role)
                except Exception as e:
                    # Keep polling on connection errors. Cancellation is
                    # not an Exception and still propagates.
                    print(f"Session creation for {host} failed, retrying: {e}")

        raise TimeoutError("No session available")

    async def start_health_monitoring(self):
        """Start background health monitoring (at most one monitor task)."""
        if self._health_monitor_task and not self._health_monitor_task.done():
            return

        async def monitor():
            while True:
                try:
                    await self._check_all_sessions()
                    await asyncio.sleep(30)  # Check every 30 seconds
                except Exception as e:
                    print(f"Health monitor error: {e}")
                    await asyncio.sleep(5)

        self._health_monitor_task = asyncio.create_task(monitor())

    async def _check_all_sessions(self):
        """Check health of all active sessions and drop the failing ones.

        NOTE(review): a session counts as unhealthy when ``read()`` raises
        or does not finish within 2 seconds. Confirm that an idle but
        healthy session returns from ``read()`` promptly.
        """
        # Snapshot under the lock, probe without it, so slow hosts do not
        # block every other caller.
        async with self._lock:
            to_probe = [(session_id, self.active_sessions[session_id])
                        for session_id, info in self.sessions.items()
                        if info.active and session_id in self.active_sessions]

        unhealthy_sessions = []
        for session_id, session in to_probe:
            try:
                await asyncio.wait_for(session.read(), timeout=2.0)
            except Exception:
                unhealthy_sessions.append(session_id)

        if unhealthy_sessions:
            async with self._lock:
                for session_id in unhealthy_sessions:
                    await self._cleanup_session(session_id)

    async def _cleanup_session(self, session_id: str):
        """Close an unhealthy session and take it out of its pool.

        The bookkeeping record stays in ``sessions`` with ``active`` set to
        False. Does nothing for unknown or already cleaned-up ids. Does not
        take the lock itself.
        """
        session_info = self.sessions.get(session_id)
        if session_info is None or not session_info.active:
            return

        session_info.active = False

        session = self.active_sessions.pop(session_id, None)
        if session:
            try:
                await session.close()
            except Exception as e:
                # Best effort: the session is being discarded anyway.
                print(f"Error closing session {session_id}: {e}")

        host = session_info.host_config.host
        if host in self.session_pools:
            self.session_pools[host] = [info for info in self.session_pools[host]
                                        if info.session_id != session_id]

        print(f"Cleaned up unhealthy session {session_id}")

class SessionAffinityManager:
    """Manage session affinity for user sessions.

    Remembers which session id each user was given so that repeated
    requests from the same user receive the same session while it is active.
    """

    def __init__(self, session_manager: EnterpriseSessionManager):
        self.session_manager = session_manager
        self.user_sessions: Dict[str, str] = {}  # user_id -> session_id
        self._lock = asyncio.Lock()

    async def get_session_for_user(self, user_id: str, preferred_host: Optional[str] = None) -> AsyncSession:
        """Get session with affinity for specific user.

        Args:
            user_id: Key the session is remembered under.
            preferred_host: Host to use for a new session. When omitted the
                host is derived from ``user_id``.

        Raises:
            ValueError: No host is registered, or the host is unknown.
        """
        async with self._lock:
            remembered_id = self.user_sessions.get(user_id)
            if remembered_id is not None:
                session_info = self.session_manager.sessions.get(remembered_id)
                remembered = self.session_manager.active_sessions.get(remembered_id)

                if session_info and session_info.active and remembered is not None:
                    return remembered

                # The remembered session is gone; forget it.
                self.user_sessions.pop(user_id, None)

            host = preferred_host or self._hash_to_host(user_id)
            session = await self.session_manager.get_session(host, SessionRole.TRANSACTIONAL)

            # Find the id the manager registered this session under.
            for sid, candidate in self.session_manager.active_sessions.items():
                if candidate is session:
                    self.user_sessions[user_id] = sid
                    break

            return session

    def _hash_to_host(self, user_id: str) -> str:
        """Map a user ID to a registered host, deterministically.

        The first 32 bits of the MD5 digest are taken modulo the number of
        hosts. This is plain modulo hashing, so adding or removing a host
        remaps most users.

        Raises:
            ValueError: No host is registered.
        """
        host_names = list(self.session_manager.hosts.keys())
        if not host_names:
            raise ValueError("No hosts registered")
        hash_value = int(hashlib.md5(user_id.encode()).hexdigest()[:8], 16)
        return host_names[hash_value % len(host_names)]

    async def release_user_session(self, user_id: str):
        """Forget the user's session and return it to the manager."""
        async with self._lock:
            session_id = self.user_sessions.pop(user_id, None)
            if session_id:
                session = self.session_manager.active_sessions.get(session_id)
                if session:
                    await self.session_manager.return_session(session)

# Example usage of enterprise session management
async def enterprise_session_example():
    """Demonstrate enterprise multi-session management.

    Registers two production hosts and one development host, starts the
    health monitor, obtains one session per role plus one user-affine
    session, and finally stops the monitor and closes every open session.
    """

    # Configure hosts
    session_manager = EnterpriseSessionManager()

    # Add production hosts
    await session_manager.add_host(HostConfig(
        host="prod-mainframe1.corp.com",
        port=23,
        host_type=HostType.PRODUCTION,
        terminal_type="IBM-3278-4",
        max_sessions=15
    ))

    await session_manager.add_host(HostConfig(
        host="prod-mainframe2.corp.com",
        port=23,
        host_type=HostType.PRODUCTION,
        terminal_type="IBM-3278-4",
        max_sessions=15
    ))

    # Add development host
    await session_manager.add_host(HostConfig(
        host="dev-mainframe.corp.com",
        port=23,
        host_type=HostType.DEVELOPMENT,
        terminal_type="IBM-3278-2",
        max_sessions=5
    ))

    # Start health monitoring
    await session_manager.start_health_monitoring()

    # Create session affinity manager
    affinity_manager = SessionAffinityManager(session_manager)

    try:
        # Get different types of sessions
        read_session = await session_manager.get_session(role=SessionRole.READ_ONLY)
        write_session = await session_manager.get_session(role=SessionRole.TRANSACTIONAL)
        admin_session = await session_manager.get_session(role=SessionRole.ADMIN)

        # Get session with user affinity
        user_session = await affinity_manager.get_session_for_user("user123")

        # Use sessions...
        print("All sessions obtained successfully")

    finally:
        # Stop the monitor first so it cannot probe sessions mid-close.
        if session_manager._health_monitor_task:
            session_manager._health_monitor_task.cancel()

        # Close every connection that was opened above; without this the
        # sockets stay open until the process exits.
        for session in list(session_manager.active_sessions.values()):
            try:
                await session.close()
            except Exception as e:
                print(f"Error closing session: {e}")

Printer Emulation Scenarios

Enterprise Printer Management

Multi-printer management for enterprise environments:

import asyncio
import json
from typing import Dict, List, Optional
from dataclasses import dataclass
from enum import Enum
from pure3270 import AsyncPrinterSession, AsyncSession

class PrinterType(Enum):
    """Types of printers in enterprise environment.

    Only LASER and MATRIX have a dedicated formatting path in the printer
    manager; every other type is sent through the generic path.
    """
    LASER = "laser"  # Content is wrapped in PCL control sequences
    MATRIX = "dot_matrix"  # Line endings are converted for impact printing
    THERMAL = "thermal"  # Generic path
    BARCODE = "barcode"  # Generic path
    ARCHIVE = "archive"  # Generic path

class PrintJobStatus(Enum):
    """Status of print jobs."""
    PENDING = "pending"  # Default for a new job; the only state from which a job can be cancelled
    PRINTING = "printing"  # Set when the queue processor starts working on the job
    COMPLETED = "completed"  # The job was processed successfully
    FAILED = "failed"  # Processing reported failure or raised
    CANCELLED = "cancelled"  # Cancelled while pending; skipped by the queue processor

@dataclass
class PrintJob:
    """Enterprise print job definition."""
    job_id: str  # Key under which the manager stores and queues the job
    printer_host: str  # Host the printer session is opened against
    document_name: str  # Descriptive only; not read by the printer manager
    content: str  # Text to print; encoded as ASCII before sending
    priority: int  # NOTE(review): not consulted - the queue is processed first-in, first-out
    copies: int  # Honoured by the laser path only; other paths send the content once
    printer_type: PrinterType  # Selects the formatting path
    submitted_at: float  # Supplied by the caller (the example uses the event-loop clock)
    status: PrintJobStatus = PrintJobStatus.PENDING  # Updated by the manager as the job progresses
    error_message: Optional[str] = None  # Text of the exception when processing raised

@dataclass
class PrinterStatus:
    """Status information for printer."""
    host: str  # Printer host name (same value as the manager's dictionary key)
    status: str  # "unknown" at registration; later the polled status rendered as "0x.."
    job_count: int  # Set to the length of the printer output returned by the last poll
    paper_level: int  # Initialised to 100; never updated by the manager
    toner_level: int  # Initialised to 100; never updated by the manager
    last_activity: float  # Event-loop clock value of registration or of the last successful status poll
    error_count: int  # Incremented when a job or a status poll raises

class EnterprisePrinterManager:
    """Enterprise printer management system.

    Keeps a registry of printers, a first-in-first-out queue of print
    jobs, and one lazily created printer session per host.
    """

    def __init__(self):
        self.printers: Dict[str, PrinterStatus] = {}
        self.print_jobs: Dict[str, PrintJob] = {}
        self.job_queue: List[str] = []
        self.printer_sessions: Dict[str, AsyncPrinterSession] = {}
        self._lock = asyncio.Lock()
        self._monitoring_task = None

    async def add_printer(self, printer_host: str, printer_type: PrinterType = PrinterType.LASER):
        """Add printer to management system.

        ``printer_type`` is only echoed in the confirmation message. The
        formatting path is chosen per job from ``PrintJob.printer_type``.
        """
        self.printers[printer_host] = PrinterStatus(
            host=printer_host,
            status="unknown",
            job_count=0,
            paper_level=100,
            toner_level=100,
            last_activity=asyncio.get_event_loop().time(),
            error_count=0
        )
        print(f"Added printer: {printer_host} ({printer_type.value})")

    async def submit_print_job(self, job: PrintJob) -> str:
        """Submit print job to enterprise printer system; returns its id."""
        async with self._lock:
            self.print_jobs[job.job_id] = job
            self.job_queue.append(job.job_id)

            print(f"Print job {job.job_id} submitted to {job.printer_host}")
            return job.job_id

    async def process_print_queue(self):
        """Process pending print jobs until the queue is empty.

        The lock is held for the whole drain, so ``submit_print_job`` and
        ``cancel_print_job`` wait until this method returns. Failures are
        recorded on the job and never raised.
        """
        async with self._lock:
            while self.job_queue:
                job_id = self.job_queue.pop(0)
                job = self.print_jobs[job_id]

                if job.status == PrintJobStatus.CANCELLED:
                    continue

                try:
                    job.status = PrintJobStatus.PRINTING

                    success = await self._process_job(job)

                    if success:
                        job.status = PrintJobStatus.COMPLETED
                        print(f"Job {job.job_id} completed successfully")
                    else:
                        job.status = PrintJobStatus.FAILED
                        print(f"Job {job.job_id} failed")

                except Exception as e:
                    job.status = PrintJobStatus.FAILED
                    job.error_message = str(e)
                    print(f"Job {job.job_id} failed with error: {e}")

    async def _process_job(self, job: PrintJob) -> bool:
        """Process individual print job.

        Raises:
            Exception: Anything raised while obtaining the printer session.
                The printer's error counter is incremented first.
        """
        try:
            printer_session = await self._get_printer_session(job.printer_host)

            if job.printer_type == PrinterType.LASER:
                return await self._print_laser_job(printer_session, job)
            if job.printer_type == PrinterType.MATRIX:
                return await self._print_matrix_job(printer_session, job)
            return await self._print_generic_job(printer_session, job)

        except Exception:
            # The job may name a host that was never registered.
            printer = self.printers.get(job.printer_host)
            if printer is not None:
                printer.error_count += 1
            raise

    async def _get_printer_session(self, printer_host: str) -> AsyncPrinterSession:
        """Get or create printer session."""
        session = self.printer_sessions.get(printer_host)
        if session is None:
            session = AsyncPrinterSession(host=printer_host)
            await session.connect()
            # Cached only after a successful connect.
            self.printer_sessions[printer_host] = session
        return session

    async def _print_laser_job(self, session: AsyncPrinterSession, job: PrintJob) -> bool:
        """Print job on laser printer, once per requested copy."""
        try:
            # The payload is the same for every copy; build it once.
            print_data = self._format_for_laser_printer(job.content)

            for copy in range(job.copies):
                await session.send_print_data(print_data)

                # Small delay between copies
                if copy < job.copies - 1:
                    await asyncio.sleep(0.5)

            return True

        except Exception as e:
            print(f"Laser print failed: {e}")
            return False

    async def _print_matrix_job(self, session: AsyncPrinterSession, job: PrintJob) -> bool:
        """Print job on dot matrix printer."""
        try:
            print_data = self._format_for_matrix_printer(job.content)
            await session.send_print_data(print_data)
            return True

        except Exception as e:
            print(f"Matrix print failed: {e}")
            return False

    async def _print_generic_job(self, session: AsyncPrinterSession, job: PrintJob) -> bool:
        """Generic print job for unknown printer types.

        Content that is not pure ASCII makes the job fail (returns False).
        """
        try:
            print_data = job.content.encode('ascii')
            await session.send_print_data(print_data)
            return True

        except Exception as e:
            print(f"Generic print failed: {e}")
            return False

    def _format_for_laser_printer(self, content: str) -> bytes:
        """Format content for laser printer (simplified PCL).

        Non-ASCII characters are dropped from the content.
        """
        # Single backslashes: these must be real control bytes, not the
        # four-character text "\x1B".
        return b"".join((
            b"\x1BE",    # ESC E: reset
            b"\x1B(0N",  # ESC ( 0 N: select symbol set
            content.encode('ascii', errors='ignore'),
            b"\x0C",     # form feed is a bare control code, no ESC prefix
        ))

    def _format_for_matrix_printer(self, content: str) -> bytes:
        """Format content for dot matrix printer.

        Non-ASCII characters are dropped and every line ends in CR LF.
        """
        ascii_content = content.encode('ascii', errors='ignore')
        # Collapse existing CR LF first so it does not become CR CR LF.
        return ascii_content.replace(b'\r\n', b'\n').replace(b'\n', b'\r\n')

    async def cancel_print_job(self, job_id: str) -> bool:
        """Cancel pending print job.

        Returns:
            True when the job existed and was still pending.
        """
        async with self._lock:
            job = self.print_jobs.get(job_id)
            if job is not None and job.status == PrintJobStatus.PENDING:
                job.status = PrintJobStatus.CANCELLED
                return True
            return False

    async def get_printer_status(self, printer_host: str) -> Optional[PrinterStatus]:
        """Get current status of printer (None if not registered)."""
        return self.printers.get(printer_host)

    async def get_print_job_status(self, job_id: str) -> Optional[PrintJob]:
        """Get the print job record (None if the id is unknown)."""
        return self.print_jobs.get(job_id)

    async def start_monitoring(self):
        """Start background printer monitoring."""
        async def monitor():
            while True:
                try:
                    await self._monitor_all_printers()
                    await asyncio.sleep(30)  # Check every 30 seconds
                except Exception as e:
                    print(f"Printer monitoring error: {e}")
                    await asyncio.sleep(5)

        self._monitoring_task = asyncio.create_task(monitor())

    async def _monitor_all_printers(self):
        """Monitor status of all printers that have an open session."""
        for printer_host in list(self.printer_sessions.keys()):
            try:
                await self._check_printer_status(printer_host)
            except Exception as e:
                print(f"Error checking printer {printer_host}: {e}")

    async def _check_printer_status(self, printer_host: str):
        """Check status of specific printer and update its record.

        A session may exist for a host that was never registered with
        ``add_printer``; in that case the printer is still polled but
        there is no record to update.
        """
        session = self.printer_sessions[printer_host]
        record = self.printers.get(printer_host)

        try:
            status = await session.get_printer_status()

            if record is not None:
                record.status = f"0x{status:02x}"
                record.last_activity = asyncio.get_event_loop().time()

            output = await session.get_printer_output()
            if output and record is not None:
                record.job_count = len(output)

        except Exception as e:
            print(f"Printer {printer_host} check failed: {e}")
            if record is not None:
                record.error_count += 1

    async def cleanup(self):
        """Stop monitoring and close all printer sessions."""
        # Stop the monitor first so it cannot poll a session mid-close.
        if self._monitoring_task:
            self._monitoring_task.cancel()
            self._monitoring_task = None

        for printer_host, session in list(self.printer_sessions.items()):
            try:
                await session.close()
            except Exception as e:
                # Best effort: keep closing the remaining sessions.
                print(f"Error closing printer session {printer_host}: {e}")

        self.printer_sessions.clear()

# Example usage of enterprise printer management
async def printer_management_example():
    """Demonstrate enterprise printer management.

    Registers three printers, starts background monitoring, submits two
    print jobs, processes the queue and prints each job's status.

    Printer registration and monitor start-up happen inside the
    ``try``/``finally`` so that sessions opened before a failure are still
    released by ``cleanup``.
    """

    # Create printer manager
    printer_manager = EnterprisePrinterManager()

    try:
        # Add printers
        await printer_manager.add_printer("laser-printer1.corp.com", PrinterType.LASER)
        await printer_manager.add_printer("matrix-printer1.corp.com", PrinterType.MATRIX)
        await printer_manager.add_printer("archive-printer1.corp.com", PrinterType.ARCHIVE)

        # Start monitoring
        await printer_manager.start_monitoring()

        # Event-loop clock reading used as the submission timestamp.
        submitted_at = asyncio.get_running_loop().time()

        # Submit various print jobs
        jobs = [
            PrintJob(
                job_id="JOB001",
                printer_host="laser-printer1.corp.com",
                document_name="Monthly Report",
                content="Monthly business report content...",
                priority=1,
                copies=3,
                printer_type=PrinterType.LASER,
                submitted_at=submitted_at,
            ),
            PrintJob(
                job_id="JOB002",
                printer_host="matrix-printer1.corp.com",
                document_name="Shipping Labels",
                content="Shipping label content...",
                priority=2,
                copies=1,
                printer_type=PrinterType.MATRIX,
                submitted_at=submitted_at,
            ),
        ]

        # Submit jobs
        for job in jobs:
            await printer_manager.submit_print_job(job)

        # Process queue
        await printer_manager.process_print_queue()

        # Check status
        for job in jobs:
            status = await printer_manager.get_print_job_status(job.job_id)
            print(f"Job {job.job_id}: {status.status.value if status else 'Unknown'}")

    finally:
        await printer_manager.cleanup()

MCP Server Integration Examples

Pure3270 MCP Server for AI Integration

Integration with Model Context Protocol servers for AI-driven terminal operations:

import asyncio
import json
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
from pure3270 import AsyncSession
from mcp_server import MCPServer, tool, resource

@dataclass
class MCPTerminalSession:
    """Bookkeeping record for one terminal session exposed through MCP.

    Instances are created by ``Pure3270MCPServer.create_terminal_session``
    and stored in the server's ``sessions`` mapping under ``session_id``.
    """
    # Short identifier returned to the MCP client (first 8 characters of a UUID4 string).
    session_id: str
    # Host name the session was opened against.
    host: str
    # Terminal model string handed to AsyncSession, e.g. "IBM-3278-2".
    terminal_type: str
    # Caller-supplied context decoded from JSON; an empty dict when none was given.
    ai_context: Dict[str, Any]
    # Text of the most recent screen captured by a "read" command; starts as "".
    last_screen_state: str

class Pure3270MCPServer(MCPServer):
    """MCP server providing TN3270 terminal access to AI systems.

    Every tool works on a session created by ``create_terminal_session``.
    Two mappings are kept per session ID: ``sessions`` holds the
    ``MCPTerminalSession`` metadata, and ``_connections`` holds the live
    ``AsyncSession`` so that consecutive tool calls reuse the same
    connection instead of reconnecting (and leaking a socket) on each call.
    """

    def __init__(self):
        super().__init__("pure3270-terminal")
        self.sessions: Dict[str, MCPTerminalSession] = {}
        # Live connections and the port each one was opened on, by session ID.
        self._connections: Dict[str, AsyncSession] = {}
        self._session_ports: Dict[str, int] = {}
        self.global_settings = {
            "default_terminal": "IBM-3278-2",
            "default_timeout": 30.0,
            "enable_ai_assistance": True
        }

    @tool("create_terminal_session")
    async def create_terminal_session(
        self,
        host: str,
        port: int = 23,
        terminal_type: str = None,
        ai_context: str = None
    ) -> str:
        """
        Create a new TN3270 terminal session.

        Args:
            host: Mainframe hostname
            port: TN3270 port (default 23)
            terminal_type: Terminal model (default: IBM-3278-2)
            ai_context: JSON object with context for AI assistance

        Returns:
            Session ID for subsequent operations

        Raises:
            ValueError: If ``ai_context`` is not valid JSON or does not
                decode to a JSON object.
        """
        terminal_type = terminal_type or self.global_settings["default_terminal"]

        # Decode the context before connecting so that malformed input
        # cannot leave an orphaned connection behind.
        context = json.loads(ai_context) if ai_context else {}
        if not isinstance(context, dict):
            raise ValueError("ai_context must be a JSON object")

        # Create session
        session = AsyncSession(terminal_type=terminal_type)
        await session.connect(host, port)

        # Generate session ID
        import uuid
        session_id = str(uuid.uuid4())[:8]

        # Create MCP session
        mcp_session = MCPTerminalSession(
            session_id=session_id,
            host=host,
            terminal_type=terminal_type,
            ai_context=context,
            last_screen_state=""
        )

        self.sessions[session_id] = mcp_session
        # Keep the live connection so later tool calls reuse it.
        self._connections[session_id] = session
        self._session_ports[session_id] = port

        print(f"Created terminal session {session_id} for {host}")
        return session_id

    @tool("execute_terminal_command")
    async def execute_terminal_command(
        self,
        session_id: str,
        command: str,
        parameters: str = None
    ) -> str:
        """
        Execute a terminal command through the session.

        Args:
            session_id: Session ID from create_terminal_session
            command: Command to execute ("key", "string", "read" or "clear")
            parameters: JSON parameters for the command

        Returns:
            JSON document describing the result; failures while running the
            command are reported as ``{"status": "error", ...}`` rather than
            raised.

        Raises:
            ValueError: If ``session_id`` is unknown.
        """
        if session_id not in self.sessions:
            raise ValueError(f"Session {session_id} not found")

        session_info = self.sessions[session_id]
        session = await self._get_session_object(session_info)

        try:
            params = json.loads(parameters) if parameters else {}
            result = await self._run_command(session, command, params)

            # Update session state
            if command == "read":
                session_info.last_screen_state = result.get("screen_data", "")

            return json.dumps(result)

        except Exception as e:
            error_result = {
                "status": "error",
                "action": command,
                "error": str(e)
            }
            return json.dumps(error_result)

    @tool("ai_assisted_navigation")
    async def ai_assisted_navigation(
        self,
        session_id: str,
        goal_description: str,
        current_screen: str = None
    ) -> str:
        """
        AI-assisted navigation to accomplish a goal.

        Args:
            session_id: Session ID
            goal_description: Natural language description of what to accomplish
            current_screen: Current screen state (optional, will be read if not provided)

        Returns:
            JSON document with the planned/executed steps and the final
            screen. Execution stops at the first failing step, which is
            recorded as an entry of type "error" in ``executed_steps``.

        Raises:
            ValueError: If ``session_id`` is unknown.
        """
        if session_id not in self.sessions:
            raise ValueError(f"Session {session_id} not found")

        session_info = self.sessions[session_id]
        session = await self._get_session_object(session_info)

        # Get current screen if not provided
        if not current_screen:
            screen_data = await session.read()
            current_screen = session.ascii(screen_data)

        # AI planning (simplified - in real implementation would use LLM)
        navigation_plan = self._plan_navigation(current_screen, goal_description, session_info.ai_context)

        # Execute navigation plan
        executed_steps = []
        for step in navigation_plan:
            try:
                if step["type"] == "key":
                    await session.key(step["key"])
                elif step["type"] == "string":
                    await session.string(step["text"])

                executed_steps.append(step)
                await asyncio.sleep(0.5)  # Brief pause between steps

            except Exception as e:
                executed_steps.append({
                    "type": "error",
                    "error": str(e),
                    "step": step
                })
                break

        # Get final screen state
        final_screen_data = await session.read()
        final_screen = session.ascii(final_screen_data)
        session_info.last_screen_state = final_screen

        result = {
            "status": "success",
            "goal": goal_description,
            "steps_planned": len(navigation_plan),
            "steps_executed": len(executed_steps),
            "executed_steps": executed_steps,
            "final_screen": final_screen,
            "session_id": session_id
        }

        return json.dumps(result)

    @tool("batch_terminal_operations")
    async def batch_terminal_operations(
        self,
        session_id: str,
        operations: str
    ) -> str:
        """
        Execute multiple terminal operations in batch.

        Args:
            session_id: Session ID
            operations: JSON array of operations to execute; each item is an
                object with "command" and optional "parameters", using the
                same commands as ``execute_terminal_command``

        Returns:
            JSON document with one result entry per operation. A failing
            operation is reported in its entry and does not stop the batch.

        Raises:
            ValueError: If ``session_id`` is unknown or ``operations`` is not
                valid JSON.
        """
        if session_id not in self.sessions:
            raise ValueError(f"Session {session_id} not found")

        session_info = self.sessions[session_id]
        session = await self._get_session_object(session_info)

        ops = json.loads(operations)
        results = []

        for op in ops:
            try:
                command = op.get("command", "")
                params = op.get("parameters", {})
                outcome = await self._run_command(session, command, params)

                entry = {"operation": op, "status": "success"}
                if "screen_data" in outcome:
                    entry["screen_data"] = outcome["screen_data"]
                    session_info.last_screen_state = outcome["screen_data"]
                results.append(entry)

            except Exception as e:
                results.append({
                    "operation": op,
                    "status": "error",
                    "error": str(e)
                })

        return json.dumps({
            "status": "success",
            "total_operations": len(ops),
            "successful_operations": len([r for r in results if r["status"] == "success"]),
            "results": results
        })

    @tool("get_session_info")
    async def get_session_info(self, session_id: str) -> str:
        """Get information about a terminal session.

        The last screen is truncated to its first 100 characters followed by
        "..." when it is longer than that.

        Raises:
            ValueError: If ``session_id`` is unknown.
        """
        if session_id not in self.sessions:
            raise ValueError(f"Session {session_id} not found")

        session_info = self.sessions[session_id]

        screen_preview = session_info.last_screen_state
        if len(screen_preview) > 100:
            screen_preview = screen_preview[:100] + "..."

        return json.dumps({
            "session_id": session_info.session_id,
            "host": session_info.host,
            "terminal_type": session_info.terminal_type,
            "ai_context": session_info.ai_context,
            "last_screen_state": screen_preview
        })

    @tool("close_terminal_session")
    async def close_terminal_session(self, session_id: str) -> str:
        """Close a terminal session and forget it.

        The cached connection (if any) is closed. When closing fails the
        session stays registered and an error document is returned, so the
        caller can retry.

        Raises:
            ValueError: If ``session_id`` is unknown.
        """
        if session_id not in self.sessions:
            raise ValueError(f"Session {session_id} not found")

        session = self._connections.get(session_id)

        try:
            if session is not None:
                await session.close()
            self._connections.pop(session_id, None)
            self._session_ports.pop(session_id, None)
            del self.sessions[session_id]
            return json.dumps({"status": "success", "message": f"Session {session_id} closed"})
        except Exception as e:
            return json.dumps({"status": "error", "error": str(e)})

    @resource("terminal://sessions")
    async def list_sessions(self) -> str:
        """List all active terminal sessions."""
        sessions_list = [
            {
                "session_id": session_id,
                "host": session_info.host,
                "terminal_type": session_info.terminal_type,
                "ai_context_keys": list(session_info.ai_context.keys())
            }
            for session_id, session_info in self.sessions.items()
        ]

        return json.dumps({
            "status": "success",
            "session_count": len(sessions_list),
            "sessions": sessions_list
        })

    async def _get_session_object(self, session_info: MCPTerminalSession) -> AsyncSession:
        """Return the live AsyncSession for ``session_info``.

        The connection opened by ``create_terminal_session`` is reused. Only
        when no connection is cached for the session ID (for example a
        record that was added to ``sessions`` directly) is a new one opened
        and cached, using the recorded port when one is known.
        """
        session = self._connections.get(session_info.session_id)
        if session is None:
            session = AsyncSession(terminal_type=session_info.terminal_type)
            port = self._session_ports.get(session_info.session_id)
            if port is None:
                await session.connect(session_info.host)
            else:
                await session.connect(session_info.host, port)
            self._connections[session_info.session_id] = session
        return session

    async def _run_command(self, session: AsyncSession, command: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """Run one terminal command and return its result dictionary.

        Shared by the single-command and batch tools so both support the
        same command set.

        Raises:
            ValueError: If ``command`` is not one of the known commands.
        """
        if command == "key":
            key_name = params.get("key", "")
            await session.key(key_name)
            return {"status": "success", "action": "key_pressed", "key": key_name}

        if command == "string":
            text = params.get("text", "")
            await session.string(text)
            return {"status": "success", "action": "string_entered", "text": text}

        if command == "read":
            timeout = params.get("timeout", self.global_settings["default_timeout"])
            screen_data = await asyncio.wait_for(session.read(), timeout=timeout)
            return {
                "status": "success",
                "action": "screen_read",
                "screen_data": session.ascii(screen_data),
                "bytes_received": len(screen_data)
            }

        if command == "clear":
            await session.key("CLEAR")
            return {"status": "success", "action": "screen_cleared"}

        raise ValueError(f"Unknown command: {command}")

    def _plan_navigation(self, current_screen: str, goal: str, ai_context: Dict) -> List[Dict]:
        """Plan navigation steps based on goal and current screen.

        Simplified keyword matching on ``goal``; ``current_screen`` and
        ``ai_context`` are accepted but not consulted. In a real
        implementation an AI/LLM would do the planning.
        """
        goal_text = goal.lower()
        steps = []

        if "login" in goal_text:
            steps.append({"type": "key", "key": "CLEAR"})
            steps.append({"type": "string", "text": "LOGON"})
            steps.append({"type": "key", "key": "ENTER"})

        elif "menu" in goal_text:
            steps.append({"type": "key", "key": "ENTER"})

        elif "exit" in goal_text:
            steps.append({"type": "key", "key": "F3"})

        else:
            # No keyword matched: fall back to a single ENTER key press
            steps.append({"type": "key", "key": "ENTER"})

        return steps

# Example MCP client usage
class MCPClient:
    """Demo client for the Pure3270 MCP server.

    Both operations return canned data; a real client would issue requests
    to the server at ``server_url``.
    """

    def __init__(self, server_url: str = "http://localhost:8000"):
        self.server_url = server_url

    async def create_session(self, host: str, terminal_type: str = "IBM-3278-2") -> str:
        """Open a terminal session and return its ID (mocked)."""
        # A real implementation would make an HTTP request to the MCP server;
        # the demo hands back a fixed identifier.
        mock_session_id = "session123"
        return mock_session_id

    async def ai_navigate_to_goal(self, session_id: str, goal: str) -> dict:
        """Ask the server to navigate towards ``goal`` (mocked)."""
        # A real implementation would call the server's AI navigation tool.
        canned_reply = dict(
            status="success",
            goal=goal,
            steps_executed=3,
            final_screen="Application main menu",
        )
        return canned_reply

# Example usage of MCP integration
async def mcp_integration_example():
    """Walk through the MCP client workflow: open a session, then navigate by goal."""

    # In production the MCP server runs as a separate process, e.g.:
    # server = Pure3270MCPServer()
    # await server.start()

    mcp_client = MCPClient()

    # Open a terminal session against the target host
    target_host, terminal_model = "mainframe.example.com", "IBM-3278-4"
    sid = await mcp_client.create_session(target_host, terminal_model)

    # Let the AI navigation tool work towards the goal
    goal = "Navigate to transaction entry screen"
    outcome = await mcp_client.ai_navigate_to_goal(sid, goal)

    print(f"AI navigation result: {outcome}")

Network Resilience Patterns

Enterprise Network Resilience

Advanced network resilience patterns for mission-critical environments:

import asyncio
import time
from typing import Dict, List, Optional, Callable
from dataclasses import dataclass
from enum import Enum
from pure3270 import AsyncSession, setup_logging
from pure3270.exceptions import ConnectionError

class NetworkCondition(Enum):
    """Network condition states recorded per host by the health checks."""
    # Health-check latency was below the rule's latency threshold.
    HEALTHY = "healthy"
    # Latency was at or above the threshold but below twice the threshold.
    DEGRADED = "degraded"
    # Latency reached twice the threshold, or the health check raised.
    FAILED = "failed"
    # NOTE(review): never assigned by the code shown alongside this enum.
    RECOVERING = "recovering"

class CircuitBreakerState(Enum):
    """Circuit breaker states used by ``AdvancedCircuitBreaker``."""
    CLOSED = "closed"      # Normal operation: every call is allowed
    OPEN = "open"          # Blocking requests until the recovery timeout elapses
    HALF_OPEN = "half_open" # Testing recovery with a limited number of calls

@dataclass
class NetworkMetrics:
    """Network performance and reliability metrics tracked for one host."""
    # Duration of the last successful health check in milliseconds.
    latency_ms: float
    # Added as-is to the host score; not updated by the code shown here.
    packet_loss_percent: float
    # NOTE(review): neither read nor updated by the code shown here.
    bandwidth_mbps: float
    # Raised by 5.0 per failure (capped at 100.0) and lowered by 1.0 per
    # success (floored at 0.0). NOTE(review): unit looks like percent - confirm.
    error_rate: float
    # Raised by 0.1 per success and lowered by 1.0 per failure, kept in [0, 100].
    availability_percent: float
    # time.time() at creation or at the last successful health check.
    last_check: float

@dataclass
class FailoverRule:
    """Rule for automatic failover decisions for one primary host."""
    # Host the rule is registered under.
    primary_host: str
    # Candidate replacements, tried in list order.
    backup_hosts: List[str]
    # NOTE(review): only echoed in the status report by the code shown here;
    # no decision reads it.
    health_threshold: float
    # Latency limit in milliseconds used both to classify health-check results
    # and to accept a backup host.
    latency_threshold_ms: float
    # A backup is accepted only while its error_rate is below this value.
    error_rate_threshold: float

class AdvancedCircuitBreaker:
    """Circuit breaker that gates calls to an unreliable resource.

    State machine:

    * CLOSED -> OPEN once ``failure_threshold`` failures have been recorded
      without an intervening success.
    * OPEN -> HALF_OPEN when ``recovery_timeout`` seconds have passed since
      the last failure; at most ``half_open_max_calls`` trial calls are then
      admitted (the call that triggers the transition counts as the first).
    * HALF_OPEN -> CLOSED after ``half_open_max_calls`` successes recorded
      during that probe; HALF_OPEN -> OPEN on any failure.

    ``success_count`` remains a lifetime total; recovery is decided from
    ``half_open_successes``, which only counts successes of the current
    probe, so successes from before the outage cannot close the breaker
    prematurely.
    """

    def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 60.0,
                 half_open_max_calls: int = 3):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_max_calls = half_open_max_calls

        self.state = CircuitBreakerState.CLOSED
        self.failure_count = 0
        self.last_failure_time = 0
        self.half_open_calls = 0
        self.success_count = 0
        self.half_open_successes = 0

    def can_execute(self) -> bool:
        """Return True if a call may be attempted now.

        May move the breaker from OPEN to HALF_OPEN as a side effect.
        """
        if self.state == CircuitBreakerState.CLOSED:
            return True

        if self.state == CircuitBreakerState.OPEN:
            if time.time() - self.last_failure_time < self.recovery_timeout:
                return False
            # Start a new probe; this call is its first trial call.
            self.state = CircuitBreakerState.HALF_OPEN
            self.half_open_calls = 1
            self.half_open_successes = 0
            return True

        # HALF_OPEN: admit trial calls until the probe budget is used up.
        if self.half_open_calls < self.half_open_max_calls:
            self.half_open_calls += 1
            return True
        return False

    def record_success(self):
        """Record successful operation; may close a half-open breaker."""
        self.failure_count = 0
        self.success_count += 1

        if self.state == CircuitBreakerState.HALF_OPEN:
            self.half_open_successes += 1
            if self.half_open_successes >= self.half_open_max_calls:
                self.state = CircuitBreakerState.CLOSED
                self.half_open_calls = 0
                self.half_open_successes = 0
                print("Circuit breaker recovered - CLOSED state")

    def record_failure(self):
        """Record failed operation; may open the breaker."""
        self.failure_count += 1
        self.last_failure_time = time.time()

        if self.failure_count >= self.failure_threshold:
            if self.state != CircuitBreakerState.OPEN:
                self.state = CircuitBreakerState.OPEN
                print(f"Circuit breaker OPEN after {self.failure_count} failures")
        elif self.state == CircuitBreakerState.HALF_OPEN:
            # Any failure during a probe re-opens the breaker immediately.
            self.state = CircuitBreakerState.OPEN
            print("Circuit breaker OPEN - half-open test failed")

class NetworkResilienceManager:
    """Enterprise network resilience management.

    Tracks per-host metrics and conditions, runs one background health
    monitor task per registered host, and hands out sessions through
    ``get_resilient_session`` using per-host circuit breakers, failover to
    backup hosts and retry with backoff.
    """

    def __init__(self):
        self.hosts: Dict[str, NetworkMetrics] = {}
        self.circuit_breakers: Dict[str, AdvancedCircuitBreaker] = {}
        self.failover_rules: Dict[str, FailoverRule] = {}
        self.health_monitor_tasks: Dict[str, asyncio.Task] = {}
        self.network_conditions: Dict[str, NetworkCondition] = {}
        self._lock = asyncio.Lock()

    async def add_host_with_failover(self, rule: "FailoverRule"):
        """Add host with automatic failover configuration.

        Registers fresh metrics for the primary and every backup host and
        starts a health monitor task for each of them.
        """
        self.failover_rules[rule.primary_host] = rule
        self.hosts[rule.primary_host] = NetworkMetrics(0, 0, 0, 0, 0, time.time())

        for backup_host in rule.backup_hosts:
            self.hosts[backup_host] = NetworkMetrics(0, 0, 0, 0, 0, time.time())

        # Start health monitoring for all hosts
        await self._start_health_monitoring(rule)

    async def get_resilient_session(self, host: str = None, max_retries: int = 5) -> "AsyncSession":
        """Get session with comprehensive resilience features.

        Args:
            host: Host to connect to; when None the best-scoring host is
                selected.
            max_retries: Maximum number of connection attempts.

        Returns:
            A connected session that has passed a read check.

        Raises:
            ValueError: If ``host`` is not registered (or no host is).
            ConnectionError: If the breaker is open and no backup qualifies,
                or every attempt failed.

        Successes and failures are recorded on the breaker of the host that
        was actually contacted, and a session whose check fails is closed
        before the next attempt.
        """

        if host is None:
            host = await self._select_optimal_host()

        if host not in self.hosts:
            raise ValueError(f"Host {host} not registered")

        # Check circuit breaker
        if not self._breaker_for(host).can_execute():
            # Try failover
            failover_host = await self._find_working_backup(host)
            if failover_host:
                print(f"Circuit breaker open for {host}, using backup {failover_host}")
                host = failover_host
            else:
                raise ConnectionError(f"All hosts failed for {host}")

        # Retry with exponential backoff
        last_error = None
        delay = 1.0

        for attempt in range(max_retries):
            # Re-resolve each attempt: ``host`` may have changed via failover.
            breaker = self._breaker_for(host)
            session = None

            try:
                session = await self._create_resilient_session(host)

                # Test session with health check
                await asyncio.wait_for(session.read(), timeout=5.0)

                # Update success metrics
                await self._update_success_metrics(host)
                breaker.record_success()

                return session

            except Exception as e:
                last_error = e

                # Do not leak a connection that failed its check.
                await self._close_quietly(session)

                # Record failure
                breaker.record_failure()
                await self._update_failure_metrics(host)

                print(f"Connection attempt {attempt + 1} to {host} failed: {e}")

                # Try failover if this is a critical failure
                if await self._is_critical_failure(e):
                    failover_host = await self._find_working_backup(host)
                    if failover_host:
                        print(f"Critical failure, switching to backup {failover_host}")
                        host = failover_host

                # Exponential backoff
                if attempt < max_retries - 1:
                    await asyncio.sleep(delay)
                    delay *= 1.5

        raise ConnectionError(f"Failed to connect after {max_retries} attempts: {last_error}")

    def _breaker_for(self, host: str) -> "AdvancedCircuitBreaker":
        """Return the circuit breaker for ``host``, creating it on first use."""
        breaker = self.circuit_breakers.get(host)
        if breaker is None:
            breaker = AdvancedCircuitBreaker()
            self.circuit_breakers[host] = breaker
        return breaker

    @staticmethod
    async def _close_quietly(session) -> None:
        """Close ``session`` if it exists, reporting instead of raising errors."""
        if session is None:
            return
        try:
            await session.close()
        except Exception as e:
            print(f"Error closing session: {e}")

    async def _create_resilient_session(self, host: str) -> "AsyncSession":
        """Create session with resilience configurations."""
        session = AsyncSession()

        # Configure session for resilience.
        # NOTE(review): these are private attributes set from outside; whether
        # AsyncSession honours them is not visible here - confirm.
        session._resilience_mode = True
        session._connection_timeout = 30.0
        session._read_timeout = 30.0
        session._write_timeout = 30.0

        await session.connect(host)
        return session

    async def _select_optimal_host(self) -> str:
        """Select optimal host based on performance metrics.

        Hosts without a recorded condition count as healthy. When no host is
        healthy, the host with the lowest error rate is returned.

        Raises:
            ValueError: If no hosts are registered.
        """
        if not self.hosts:
            raise ValueError("No hosts registered")

        suitable_hosts = []

        for host, metrics in self.hosts.items():
            if self.network_conditions.get(host, NetworkCondition.HEALTHY) == NetworkCondition.HEALTHY:
                score = self._calculate_host_score(host, metrics)
                suitable_hosts.append((host, score))

        if not suitable_hosts:
            # All hosts unhealthy, return best available
            return min(self.hosts.keys(), key=lambda h: self.hosts[h].error_rate)

        return min(suitable_hosts, key=lambda x: x[1])[0]

    def _calculate_host_score(self, host: str, metrics: "NetworkMetrics") -> float:
        """Calculate host fitness score (lower is better)."""
        score = 0.0

        # Latency penalty
        score += metrics.latency_ms / 100.0

        # Error rate penalty
        score += metrics.error_rate * 10.0

        # Packet loss penalty
        score += metrics.packet_loss_percent

        # Availability penalty (lower availability increases score)
        score += (100 - metrics.availability_percent) / 10.0

        return score

    async def _find_working_backup(self, primary_host: str) -> Optional[str]:
        """Find working backup host for primary.

        Returns the first backup (in rule order) that is healthy and within
        the rule's latency and error-rate thresholds, or None when the host
        has no rule or no backup qualifies.
        """
        if primary_host not in self.failover_rules:
            return None

        rule = self.failover_rules[primary_host]

        # Try backup hosts in order
        for backup_host in rule.backup_hosts:
            condition = self.network_conditions.get(backup_host, NetworkCondition.HEALTHY)

            if condition == NetworkCondition.HEALTHY:
                metrics = self.hosts.get(backup_host, NetworkMetrics(0, 0, 0, 0, 0, time.time()))

                # Check if backup meets thresholds
                if (metrics.latency_ms < rule.latency_threshold_ms and
                    metrics.error_rate < rule.error_rate_threshold):
                    return backup_host

        return None

    async def _is_critical_failure(self, error: Exception) -> bool:
        """Determine if failure is critical enough to trigger immediate failover."""
        critical_errors = (
            ConnectionRefusedError,
            TimeoutError,
            # Before Python 3.11 asyncio.wait_for raises this distinct class,
            # which is not a subclass of the built-in TimeoutError.
            asyncio.TimeoutError,
            OSError,  # Network unreachable
        )

        return isinstance(error, critical_errors)

    async def _start_health_monitoring(self, rule: "FailoverRule"):
        """Start health monitoring for host and its backups.

        A monitor that is already running for a host is cancelled before its
        replacement starts, so re-registering a host does not leave a
        duplicate polling loop behind.
        """
        all_hosts = [rule.primary_host] + rule.backup_hosts

        for host in all_hosts:
            previous = self.health_monitor_tasks.get(host)
            if previous is not None and not previous.done():
                previous.cancel()

            task = asyncio.create_task(self._monitor_host(host, rule))
            self.health_monitor_tasks[host] = task

    async def _monitor_host(self, host: str, rule: "FailoverRule"):
        """Monitor individual host health until the task is cancelled."""
        while True:
            try:
                await self._check_host_health(host, rule)
                await asyncio.sleep(30)  # Check every 30 seconds
            except asyncio.CancelledError:
                # Always let cancellation through (Exception subclass on 3.7).
                raise
            except Exception as e:
                print(f"Health monitor error for {host}: {e}")
                await asyncio.sleep(5)

    async def _check_host_health(self, host: str, rule: "FailoverRule"):
        """Perform comprehensive health check on host.

        Connects, reads once and closes, then records the elapsed time as
        latency and classifies the host against the rule's latency
        threshold. Any failure marks the host FAILED and raises its error
        rate. The probe session is closed on every path.
        """
        # Monotonic clock: the measured duration must not be affected by
        # wall-clock adjustments.
        start_time = time.monotonic()
        session = None

        try:
            # Test connection
            session = AsyncSession()
            await asyncio.wait_for(session.connect(host), timeout=10.0)

            # Test read operation
            await asyncio.wait_for(session.read(), timeout=5.0)

            await session.close()
            session = None  # Closed normally; nothing left to release

            # Calculate metrics
            latency = (time.monotonic() - start_time) * 1000  # Convert to ms

            # Update host metrics
            if host in self.hosts:
                self.hosts[host].latency_ms = latency
                self.hosts[host].last_check = time.time()

                # Determine network condition
                if latency < rule.latency_threshold_ms:
                    self.network_conditions[host] = NetworkCondition.HEALTHY
                elif latency < rule.latency_threshold_ms * 2:
                    self.network_conditions[host] = NetworkCondition.DEGRADED
                else:
                    self.network_conditions[host] = NetworkCondition.FAILED

        except Exception as e:
            print(f"Health check failed for {host}: {e}")

            if host in self.hosts:
                self.hosts[host].error_rate = min(100.0, self.hosts[host].error_rate + 5.0)
                self.network_conditions[host] = NetworkCondition.FAILED

        finally:
            # Release the probe connection if it was not closed above.
            await self._close_quietly(session)

    async def _update_success_metrics(self, host: str):
        """Update metrics after successful operation."""
        if host in self.hosts:
            # Gradually decrease error rate on success
            self.hosts[host].error_rate = max(0.0, self.hosts[host].error_rate - 1.0)

            # Update availability
            self.hosts[host].availability_percent = min(100.0,
                self.hosts[host].availability_percent + 0.1)

    async def _update_failure_metrics(self, host: str):
        """Update metrics after failed operation."""
        if host in self.hosts:
            # Increase error rate
            self.hosts[host].error_rate = min(100.0,
                self.hosts[host].error_rate + 5.0)

            # Decrease availability
            self.hosts[host].availability_percent = max(0.0,
                self.hosts[host].availability_percent - 1.0)

    async def get_network_status(self) -> Dict:
        """Get comprehensive network status report.

        Returns:
            Dict with ``overall_condition``, per-host ``hosts`` details,
            ``failover_rules`` and a ``timestamp``. Hosts that have not been
            health-checked yet are reported with condition "failed" and are
            not counted towards ``overall_condition``.
        """
        status = {
            "overall_condition": "unknown",
            "hosts": {},
            "failover_rules": {},
            "timestamp": time.time()
        }

        # Determine overall condition
        healthy_hosts = len([h for h in self.network_conditions.values()
                           if h == NetworkCondition.HEALTHY])
        total_hosts = len(self.network_conditions)

        if total_hosts == 0:
            status["overall_condition"] = "no_hosts"
        elif healthy_hosts == total_hosts:
            status["overall_condition"] = "all_healthy"
        elif healthy_hosts > total_hosts / 2:
            status["overall_condition"] = "degraded"
        else:
            status["overall_condition"] = "critical"

        # Host details
        for host in self.hosts:
            # A host that has never been contacted has no breaker yet; report
            # it like a fresh breaker (closed, zero failures).
            breaker = self.circuit_breakers.get(host)
            if breaker is None:
                breaker_state = CircuitBreakerState.CLOSED.value
                breaker_failures = 0
            else:
                breaker_state = breaker.state.value
                breaker_failures = breaker.failure_count

            status["hosts"][host] = {
                "condition": self.network_conditions.get(host, NetworkCondition.FAILED).value,
                "metrics": {
                    "latency_ms": self.hosts[host].latency_ms,
                    "error_rate": self.hosts[host].error_rate,
                    "availability_percent": self.hosts[host].availability_percent,
                    "last_check": self.hosts[host].last_check
                },
                "circuit_breaker": {
                    "state": breaker_state,
                    "failure_count": breaker_failures
                }
            }

        # Failover rules
        for primary, rule in self.failover_rules.items():
            status["failover_rules"][primary] = {
                "backup_hosts": rule.backup_hosts,
                "health_threshold": rule.health_threshold,
                "latency_threshold_ms": rule.latency_threshold_ms,
                "error_rate_threshold": rule.error_rate_threshold
            }

        return status

# Example usage of network resilience
async def network_resilience_example():
    """Demonstrate enterprise network resilience patterns.

    Registers a primary host with two backups, obtains a session through the
    resilience manager, sends a command, prints the response and the overall
    network status. The health monitor tasks are cancelled and the session
    is closed on every exit path.
    """

    # Create resilience manager
    manager = NetworkResilienceManager()
    session = None

    try:
        # Configure failover rules (this also starts the health monitors)
        await manager.add_host_with_failover(FailoverRule(
            primary_host="mainframe1.corp.com",
            backup_hosts=["mainframe2.corp.com", "mainframe3.corp.com"],
            health_threshold=0.95,
            latency_threshold_ms=500.0,
            error_rate_threshold=5.0
        ))

        # Get resilient session (automatically handles failover)
        session = await manager.get_resilient_session()

        # Use session normally
        await session.string("TEST")
        await session.key("ENTER")
        # read() is a coroutine: await it before converting the data.
        response = session.ascii(await session.read())
        print(f"Response: {response}")

        # Get network status
        status = await manager.get_network_status()
        print(f"Network status: {status['overall_condition']}")

    finally:
        # Cleanup: stop the monitors first so a failing close cannot skip it
        for task in manager.health_monitor_tasks.values():
            task.cancel()

        if session is not None:
            await session.close()

Production Deployment Examples

Enterprise Production Deployment

Production deployment configurations and monitoring:

import asyncio
import yaml
import logging
from typing import Dict, List, Optional
from dataclasses import dataclass
from pure3270 import AsyncSession

@dataclass
class ProductionConfig:
    """Production deployment configuration.

    Built either from a YAML file whose top-level keys match these field
    names or from the defaults in ``ProductionDeploymentManager._load_config``.
    """
    # Host names to operate against (the defaults are two mainframe hosts).
    hosts: List[str]
    # When False, monitoring setup returns without configuring anything.
    monitoring_enabled: bool
    # Named numeric targets; the defaults use max_response_time_ms,
    # min_availability_percent and max_error_rate_percent.
    performance_targets: Dict[str, float]
    # Named security settings. NOTE(review): the defaults also store the
    # integer session_timeout_minutes here, which does not match the declared
    # bool value type.
    security_settings: Dict[str, bool]
    # Session-count limits; the defaults use min_sessions_per_host,
    # max_sessions_per_host and autoscale_threshold.
    scaling_config: Dict[str, int]
    # Alert thresholds kept as strings such as "2s", "5%" or "99%".
    alerting_rules: Dict[str, str]

class ProductionDeploymentManager:
    """Production deployment manager for enterprise TN3270 operations.

    Loads a ``ProductionConfig``, keeps a pool of connected sessions per
    host, routes work items to hosts through the load balancer, records the
    outcome of each item in the metrics collector, and grows or shrinks the
    pools according to the utilization the collector reports.
    """

    # Utilization (percent) below which a pool is shrunk when
    # ``scaling_config`` has no "scale_down_threshold" entry.
    DEFAULT_SCALE_DOWN_THRESHOLD = 20
    # Upper bound on the sessions added to one pool by a single scale-up pass.
    SCALE_UP_STEP = 5

    def __init__(self, config_file: str = "production_config.yaml"):
        """Load the configuration and, if enabled, set up monitoring.

        Args:
            config_file: Path of the YAML configuration file. When the file
                does not exist the built-in defaults are used.
        """
        self.config = self._load_config(config_file)
        self.session_pools: Dict[str, List[AsyncSession]] = {}
        self.metrics_collector = None
        self.alert_manager = None
        self.load_balancer = None
        self._setup_monitoring()

    def _load_config(self, config_file: str) -> ProductionConfig:
        """Load production configuration from file.

        The top-level keys of the YAML document are passed to
        ``ProductionConfig`` as keyword arguments, so they must match its
        field names. A missing file falls back to the defaults below.
        """
        try:
            with open(config_file, 'r', encoding="utf-8") as f:
                config_data = yaml.safe_load(f)

            return ProductionConfig(**config_data)

        except FileNotFoundError:
            # Default production configuration
            return ProductionConfig(
                hosts=["mainframe1.corp.com", "mainframe2.corp.com"],
                monitoring_enabled=True,
                performance_targets={
                    "max_response_time_ms": 1000,
                    "min_availability_percent": 99.5,
                    "max_error_rate_percent": 1.0
                },
                security_settings={
                    "require_ssl": True,
                    "certificate_validation": True,
                    "session_timeout_minutes": 30
                },
                scaling_config={
                    "min_sessions_per_host": 5,
                    "max_sessions_per_host": 50,
                    "autoscale_threshold": 80
                },
                alerting_rules={
                    "response_time_threshold": "2s",
                    "error_rate_threshold": "5%",
                    "availability_threshold": "99%"
                }
            )

    def _setup_monitoring(self):
        """Setup production monitoring and alerting.

        Does nothing when monitoring is disabled, in which case
        ``metrics_collector`` and ``alert_manager`` stay ``None``.
        """
        if not self.config.monitoring_enabled:
            return

        # Setup logging
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('production_tn3270.log'),
                logging.StreamHandler()
            ]
        )

        # Setup metrics collection
        self.metrics_collector = ProductionMetricsCollector(self.config)

        # Setup alerting
        self.alert_manager = ProductionAlertManager(self.config)

    async def initialize_production_environment(self):
        """Initialize production environment with all components.

        Creates the session pools, starts the monitoring tasks (when
        enabled) and creates the load balancer.
        """
        print("Initializing production TN3270 environment...")

        # Initialize session pools
        await self._initialize_session_pools()

        # Start monitoring
        if self.config.monitoring_enabled:
            await self.metrics_collector.start()
            await self.alert_manager.start()

        # Setup load balancing
        self.load_balancer = ProductionLoadBalancer(self.config)

        print("Production environment initialized successfully")

    async def _initialize_session_pools(self):
        """Initialize session pools for all production hosts.

        A session that fails to connect is reported and skipped, so a pool
        may end up with fewer than "min_sessions_per_host" sessions.
        """
        for host in self.config.hosts:
            pool = []

            for i in range(self.config.scaling_config["min_sessions_per_host"]):
                try:
                    session = await self._create_production_session(host)
                    pool.append(session)
                    print(f"Created production session {i+1} for {host}")
                except Exception as e:
                    print(f"Failed to create session {i+1} for {host}: {e}")

            self.session_pools[host] = pool
            print(f"Initialized pool with {len(pool)} sessions for {host}")

    async def _create_production_session(self, host: str) -> AsyncSession:
        """Create and connect a session for ``host``.

        When "require_ssl" is set the connection uses an ``ssl.SSLContext``;
        certificate and hostname checks are disabled only when
        "certificate_validation" is falsy.
        """
        session = AsyncSession(terminal_type="IBM-3278-4")

        # Configure for production performance
        # NOTE(review): these are private attributes of AsyncSession; confirm
        # that the installed pure3270 version actually honours them.
        session._connection_timeout = 15.0
        session._read_timeout = 30.0
        session._write_timeout = 30.0
        session._enable_keepalive = True
        session._buffer_size = 8192

        # Security settings
        security = self.config.security_settings
        if security.get("require_ssl"):
            import ssl  # local import: only needed for TLS connections

            # A real SSLContext is required; a plain dict of option names is
            # not something the transport layer can use.
            ssl_context = ssl.create_default_context()
            if not security.get("certificate_validation"):
                # check_hostname must be cleared before verify_mode can be
                # lowered to CERT_NONE.
                ssl_context.check_hostname = False
                ssl_context.verify_mode = ssl.CERT_NONE
            await session.connect(host, ssl_context=ssl_context)
        else:
            await session.connect(host)

        return session

    async def _get_session_from_pool(self, host: str) -> AsyncSession:
        """Take a session for ``host`` out of its pool.

        The most recently returned session is handed out first, so
        consecutive work items for one host reuse the same session. When the
        pool is empty a new session is created on demand.
        """
        pool = self.session_pools.setdefault(host, [])
        if pool:
            return pool.pop()
        return await self._create_production_session(host)

    async def _return_session_to_pool(self, host: str, session: AsyncSession) -> None:
        """Put ``session`` back into the pool for ``host``.

        A session that would push the pool past "max_sessions_per_host" is
        closed instead of being kept.
        """
        pool = self.session_pools.setdefault(host, [])
        if len(pool) < self.config.scaling_config["max_sessions_per_host"]:
            pool.append(session)
        else:
            await self._discard_session(session)

    async def _discard_session(self, session: AsyncSession) -> None:
        """Close ``session`` on a best-effort basis; close errors are reported only."""
        try:
            await session.close()
        except Exception as e:
            print(f"Error closing session: {e}")

    async def process_production_workload(self, workload: List[Dict]) -> Dict:
        """Process production workload with monitoring and scaling.

        Work items are processed one after another. An item whose processing
        raises is reported as an error result and its session is closed
        rather than returned to the pool, since its state is unknown.

        Args:
            workload: Work items, each a dict with an optional "operation"
                and optional "parameters".

        Returns:
            A report with "total_items", "successful_items",
            "processing_time_seconds", "items_per_second" and
            "host_distribution".

        Raises:
            RuntimeError: If the environment has not been initialized.
        """
        if self.load_balancer is None:
            raise RuntimeError(
                "Production environment is not initialized; "
                "call initialize_production_environment() first"
            )

        loop = asyncio.get_running_loop()
        start_time = loop.time()
        results = []
        monitoring = self.config.monitoring_enabled

        # Distribute workload across hosts
        for work_item in workload:
            host = self.load_balancer.select_host(work_item)
            session = None

            try:
                session = await self._get_session_from_pool(host)
                result = await self._process_work_item(session, work_item)
            except Exception as e:
                results.append({"status": "error", "error": str(e)})

                # Do not hand a possibly broken session to the next item.
                if session is not None:
                    await self._discard_session(session)

                if monitoring:
                    self.metrics_collector.record_failure(host, work_item, e)
                continue

            results.append(result)
            await self._return_session_to_pool(host, session)

            # Record metrics; an "error" result (e.g. unknown operation)
            # counts as a failure even though nothing was raised.
            if monitoring:
                if result.get("status") == "success":
                    self.metrics_collector.record_success(host, work_item)
                else:
                    failure = ValueError(result.get("error", "unknown error"))
                    self.metrics_collector.record_failure(host, work_item, failure)

        # Check if scaling is needed
        if monitoring:
            await self._check_scaling_requirements()

        processing_time = loop.time() - start_time
        successful_items = sum(1 for r in results if r.get("status") == "success")

        # Generate performance report
        report = {
            "total_items": len(workload),
            "successful_items": successful_items,
            "processing_time_seconds": processing_time,
            # An empty or instantaneous run must not divide by zero.
            "items_per_second": len(workload) / processing_time if processing_time > 0 else 0.0,
            "host_distribution": self.load_balancer.get_distribution_stats()
        }

        return report

    async def _process_work_item(self, session: AsyncSession, work_item: Dict) -> Dict:
        """Process individual work item.

        Supported operations are "key", "string" and "read" (the default).
        Any other operation yields an error result instead of raising.
        """
        operation = work_item.get("operation", "read")
        parameters = work_item.get("parameters", {})

        if operation == "key":
            key = parameters.get("key", "")
            await session.key(key)
            return {"status": "success", "operation": "key", "key": key}

        elif operation == "string":
            text = parameters.get("text", "")
            await session.string(text)
            # Only a short preview of the text is echoed back in the result.
            preview = (text[:20] + "...") if len(text) > 20 else text
            return {"status": "success", "operation": "string", "text": preview}

        elif operation == "read":
            timeout = parameters.get("timeout", 30.0)
            screen_data = await asyncio.wait_for(session.read(), timeout=timeout)
            screen_text = session.ascii(screen_data)
            return {
                "status": "success",
                "operation": "read",
                "screen_length": len(screen_text),
                "first_100_chars": screen_text[:100]
            }

        else:
            return {"status": "error", "error": f"Unknown operation: {operation}"}

    async def _check_scaling_requirements(self):
        """Check if automatic scaling is needed.

        Scales a host up above "autoscale_threshold" and down below
        "scale_down_threshold" (optional; defaults to
        ``DEFAULT_SCALE_DOWN_THRESHOLD``). Hosts without metrics count as 0%
        utilized.
        """
        metrics = self.metrics_collector.get_current_metrics()
        scale_up_at = self.config.scaling_config["autoscale_threshold"]
        scale_down_at = self.config.scaling_config.get(
            "scale_down_threshold", self.DEFAULT_SCALE_DOWN_THRESHOLD
        )

        for host in self.config.hosts:
            utilization = metrics.get(host, {}).get("utilization_percent", 0)

            if utilization > scale_up_at:
                # Scale up
                await self._scale_up_host(host)
            elif utilization < scale_down_at:
                # Scale down
                await self._scale_down_host(host)

    async def _scale_up_host(self, host: str):
        """Scale up sessions for host, never exceeding "max_sessions_per_host"."""
        # setdefault so a host that has no pool yet cannot raise KeyError below.
        pool = self.session_pools.setdefault(host, [])
        max_count = self.config.scaling_config["max_sessions_per_host"]

        sessions_to_add = min(self.SCALE_UP_STEP, max_count - len(pool))

        for _ in range(max(0, sessions_to_add)):
            try:
                session = await self._create_production_session(host)
                pool.append(session)
                print(f"Scaled up: added session to {host}")
            except Exception as e:
                print(f"Scale up failed for {host}: {e}")

    async def _scale_down_host(self, host: str):
        """Scale down sessions for host by one, keeping "min_sessions_per_host"."""
        min_count = self.config.scaling_config["min_sessions_per_host"]
        pool = self.session_pools.get(host, [])

        if len(pool) > min_count:
            # Remove one session
            session = pool.pop()
            try:
                await session.close()
                print(f"Scaled down: removed session from {host}")
            except Exception as e:
                print(f"Error closing session during scale down: {e}")

    async def shutdown_production_environment(self):
        """Gracefully shutdown production environment.

        Closes every pooled session, empties the pools so that a repeated
        shutdown does not close the same sessions again, and stops the
        monitoring tasks.
        """
        print("Shutting down production environment...")

        # Close all session pools
        for pool in self.session_pools.values():
            for session in pool:
                await self._discard_session(session)
            pool.clear()

        # Stop monitoring
        if self.metrics_collector:
            await self.metrics_collector.stop()

        if self.alert_manager:
            await self.alert_manager.stop()

        print("Production environment shutdown complete")

class ProductionMetricsCollector:
    """Production metrics collection and reporting.

    Counts successful and failed operations per host, optionally keeps
    response-time samples, and runs a background task that prints an ALERT
    line whenever a host misses one of the configured performance targets.
    """

    # Cap on stored response-time samples per host so memory stays bounded.
    MAX_RESPONSE_SAMPLES = 1000

    def __init__(self, config: ProductionConfig):
        self.config = config
        self.metrics: Dict[str, Dict] = {}
        self.collection_task = None

    async def start(self):
        """Start metrics collection; a no-op while a collection task is running."""
        if self.collection_task is not None and not self.collection_task.done():
            return
        self.collection_task = asyncio.create_task(self._collect_metrics())

    async def stop(self):
        """Stop metrics collection and wait for the background task to finish."""
        task = self.collection_task
        if task is None:
            return

        self.collection_task = None
        task.cancel()
        try:
            # Awaiting lets the task unwind before the caller continues.
            await task
        except asyncio.CancelledError:
            pass

    def _record(self, host: str, outcome_key: str, response_time_ms: Optional[float]) -> None:
        """Count one operation for ``host`` under ``outcome_key``."""
        if host not in self.metrics:
            self.metrics[host] = {
                "total_operations": 0,
                "successful_operations": 0,
                "failed_operations": 0,
                "response_times": [],
                "start_time": asyncio.get_event_loop().time()
            }

        entry = self.metrics[host]
        entry["total_operations"] += 1
        entry[outcome_key] += 1

        if response_time_ms is not None:
            samples = entry["response_times"]
            samples.append(response_time_ms)
            # Keep only the most recent samples.
            del samples[:-self.MAX_RESPONSE_SAMPLES]

    def record_success(self, host: str, work_item: Dict,
                       response_time_ms: Optional[float] = None):
        """Record successful operation.

        Args:
            host: Host the operation ran against.
            work_item: The processed work item (currently not inspected).
            response_time_ms: Optional duration of the operation; feeds the
                "response_time_ms" average in ``get_current_metrics``.
        """
        self._record(host, "successful_operations", response_time_ms)

    def record_failure(self, host: str, work_item: Dict, error: Exception,
                       response_time_ms: Optional[float] = None):
        """Record failed operation.

        Args:
            host: Host the operation ran against.
            work_item: The failed work item (currently not inspected).
            error: The failure cause (currently not inspected).
            response_time_ms: Optional duration of the operation.
        """
        self._record(host, "failed_operations", response_time_ms)

    def get_current_metrics(self) -> Dict:
        """Get current metrics snapshot.

        Returns:
            A dict keyed by host. Hosts with no recorded operations are
            omitted. "response_time_ms" is the average of the stored samples,
            or 0.0 when none were recorded.
        """
        metrics = {}

        for host, data in self.metrics.items():
            total = data["total_operations"]
            if total <= 0:
                continue

            success_rate = (data["successful_operations"] / total) * 100
            error_rate = (data["failed_operations"] / total) * 100
            samples = data.get("response_times") or []
            avg_response_ms = sum(samples) / len(samples) if samples else 0.0

            metrics[host] = {
                "total_operations": total,
                "success_rate_percent": success_rate,
                "error_rate_percent": error_rate,
                "availability_percent": success_rate,
                "response_time_ms": avg_response_ms,
                "utilization_percent": min(100, total / 100)  # Simplified
            }

        return metrics

    async def _collect_metrics(self):
        """Background metrics collection task.

        Every minute compares each host's snapshot with the performance
        targets. After an unexpected error the check is retried in ten
        seconds; cancellation ends the task.
        """
        while True:
            try:
                targets = self.config.performance_targets

                # Check performance targets
                for host, metrics in self.get_current_metrics().items():
                    if metrics["response_time_ms"] > targets["max_response_time_ms"]:
                        print(f"ALERT: {host} response time exceeds target")

                    if metrics["availability_percent"] < targets["min_availability_percent"]:
                        print(f"ALERT: {host} availability below target")

                    if metrics["error_rate_percent"] > targets["max_error_rate_percent"]:
                        print(f"ALERT: {host} error rate exceeds target")

                await asyncio.sleep(60)  # Collect every minute

            except asyncio.CancelledError:
                # Never swallow cancellation (it is an Exception subclass on
                # Python 3.7), otherwise stop() could not end this loop.
                raise
            except Exception as e:
                print(f"Metrics collection error: {e}")
                await asyncio.sleep(10)

class ProductionAlertManager:
    """Production alerting system.

    Every alert passed to ``send_alert`` is appended to ``alert_history`` and
    logged. A background task raises a CRITICAL "alert_storm" alert when too
    many alerts arrive within a short window.

    NOTE: ``alert_history`` is never trimmed here; long-running deployments
    should archive or prune it.
    """

    # Length of the sliding window used for storm detection, in seconds.
    ALERT_WINDOW_SECONDS = 300
    # A storm is reported when more than this many alerts fall in the window.
    ALERT_STORM_THRESHOLD = 10
    # Alert type used for the storm alert itself.
    STORM_ALERT_TYPE = "alert_storm"

    def __init__(self, config: ProductionConfig):
        self.config = config
        self.alert_history: List[Dict] = []
        self.alert_task = None

    async def start(self):
        """Start alert monitoring; a no-op while the monitor task is running."""
        if self.alert_task is not None and not self.alert_task.done():
            return
        self.alert_task = asyncio.create_task(self._monitor_alerts())

    async def stop(self):
        """Stop alert monitoring and wait for the background task to finish."""
        task = self.alert_task
        if task is None:
            return

        self.alert_task = None
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

    async def send_alert(self, alert_type: str, message: str, severity: str = "WARNING"):
        """Send alert to appropriate channels.

        Args:
            alert_type: Short machine-readable category of the alert.
            message: Human-readable description.
            severity: "CRITICAL", "ERROR", or anything else, which is logged
                as a warning.
        """
        alert = {
            "timestamp": asyncio.get_running_loop().time(),
            "type": alert_type,
            "message": message,
            "severity": severity
        }

        self.alert_history.append(alert)

        # Log alert
        if severity == "CRITICAL":
            logging.critical(f"CRITICAL ALERT: {message}")
        elif severity == "ERROR":
            logging.error(f"ERROR ALERT: {message}")
        else:
            logging.warning(f"WARNING ALERT: {message}")

        # In production, would send to external alert systems
        # (email, Slack, PagerDuty, etc.)

    async def _monitor_alerts(self):
        """Background alert monitoring task.

        Checks every 30 seconds for an alert storm. Storm alerts are not
        counted towards the threshold, and at most one storm alert is sent
        per window, so the detector cannot keep itself triggered.
        """
        while True:
            try:
                # Read the clock once instead of once per history entry.
                now = asyncio.get_running_loop().time()
                recent_alerts = [a for a in self.alert_history
                                 if now - a["timestamp"] < self.ALERT_WINDOW_SECONDS]

                storm_already_reported = any(
                    a["type"] == self.STORM_ALERT_TYPE for a in recent_alerts
                )
                regular_count = sum(
                    1 for a in recent_alerts if a["type"] != self.STORM_ALERT_TYPE
                )

                # Detect alert storms
                if regular_count > self.ALERT_STORM_THRESHOLD and not storm_already_reported:
                    await self.send_alert(
                        self.STORM_ALERT_TYPE,
                        f"High alert volume: {regular_count} alerts in 5 minutes",
                        "CRITICAL"
                    )

                await asyncio.sleep(30)

            except asyncio.CancelledError:
                # Never swallow cancellation (it is an Exception subclass on
                # Python 3.7), otherwise stop() could not end this loop.
                raise
            except Exception as e:
                print(f"Alert monitoring error: {e}")
                await asyncio.sleep(10)

class ProductionLoadBalancer:
    """Production load balancer for TN3270 sessions.

    Keeps a statistics entry per configured host and counts how many work
    items were routed to each one.
    """

    def __init__(self, config: ProductionConfig):
        self.config = config

        # Initialize host stats
        self.host_stats: Dict[str, Dict] = {
            host: self._new_host_stats() for host in config.hosts
        }

    @staticmethod
    def _new_host_stats() -> Dict:
        """Return a fresh, zeroed statistics entry for one host."""
        return {
            "active_sessions": 0,
            "total_requests": 0,
            "avg_response_time": 0.0,
            "error_count": 0
        }

    def select_host(self, work_item: Dict) -> str:
        """Select the host for a work item and count the request.

        Every item is currently routed to the first configured host
        (simplified for the demo); ``work_item`` is not inspected. Keeping
        consecutive items on one host lets a multi-step sequence such as
        read / type / submit stay on the same pool.

        Raises:
            IndexError: If no hosts are configured.
        """
        hosts = self.config.hosts
        if not hosts:
            raise IndexError("No production hosts configured")

        host = hosts[0]  # Simplified for demo

        # Track the routing decision so the distribution report is meaningful.
        if host not in self.host_stats:
            self.host_stats[host] = self._new_host_stats()
        self.host_stats[host]["total_requests"] += 1

        return host

    def get_distribution_stats(self) -> Dict:
        """Get load distribution statistics (the live per-host stats mapping)."""
        return self.host_stats

# Example production deployment
async def production_deployment_example():
    """Run a small repeated workload through a full deployment lifecycle.

    Builds a deployment manager, initializes the environment, processes ten
    repetitions of a read / type / submit cycle, prints the resulting report
    and the current metrics, and always shuts the environment down.
    """

    manager = ProductionDeploymentManager("production_config.yaml")

    try:
        await manager.initialize_production_environment()

        # One read / type / submit cycle, repeated for load testing
        cycle = [
            {"operation": "read", "parameters": {"timeout": 10.0}},
            {"operation": "string", "parameters": {"text": "LOGON"}},
            {"operation": "key", "parameters": {"key": "ENTER"}},
        ]
        workload = cycle * 10

        report = await manager.process_production_workload(workload)
        total_items = report['total_items']

        print("Production workload processing report:")
        print(f"  Total items: {total_items}")
        print(f"  Success rate: {report['successful_items']}/{total_items}")
        print(f"  Processing rate: {report['items_per_second']:.2f} items/sec")

        # Metrics exist only when monitoring is enabled
        collector = manager.metrics_collector
        if collector:
            snapshot = collector.get_current_metrics()
            print("Current metrics:")
            for host in snapshot:
                print(f"  {host}: {snapshot[host]}")

    finally:
        await manager.shutdown_production_environment()