Advanced Patterns

This section covers advanced usage patterns, optimization techniques, and complex scenarios for Pure3270 implementation.

For practical examples of the patterns described here, see the Examples section.

Complex Session Management Patterns

Advanced session lifecycle management with resource pooling and state tracking. For connection pooling implementations, see the Examples section on connection management.

import asyncio
import time
from dataclasses import dataclass
from typing import Dict, List, Optional, Set
from contextlib import asynccontextmanager
from pure3270 import AsyncSession, setup_logging
from pure3270.exceptions import ConnectionError, TN3270Error

@dataclass
class SessionMetrics:
    """Per-session record of usage and performance counters.

    AdvancedSessionManager keeps one instance per session in its ``metrics``
    dict, keyed by ``id(session)``.
    """
    created_at: float  # time.time() value taken when the session was created
    last_activity: float  # time.time() value of the last recorded activity
    request_count: int  # incremented each time the session is checked out of the pool
    error_count: int  # the manager initialises this to 0; reported as "errors" in statistics
    total_bytes_sent: int  # the manager initialises this to 0; not updated by the code shown here
    total_bytes_received: int  # the manager initialises this to 0; not updated by the code shown here
    average_response_time: float  # initialised to 0.0; presumably seconds -- TODO confirm
    peak_memory_usage: int  # initialised to 0; presumably bytes -- TODO confirm

class AdvancedSessionManager:
    """
    Per-host session pool with health checks, idle cleanup and metrics.

    Sessions are handed out by :meth:`get_session` and handed back by
    :meth:`return_session`. Idle sessions wait in ``self.sessions[host]``;
    checked-out sessions are tracked in ``self.active_sessions``. The host
    every session belongs to is remembered (keyed by ``id(session)``) so a
    returned session can be put back into the correct pool.
    """

    def __init__(self, hosts: Dict[str, dict], max_sessions_per_host: int = 5):
        """
        Args:
            hosts: Mapping of host name to connection options, e.g.
                ``{host: {"port": 23, "ssl_context": None, "terminal": "IBM-3278-2"}}``.
            max_sessions_per_host: Maximum number of idle sessions kept per host.
        """
        self.hosts = hosts
        self.max_sessions_per_host = max_sessions_per_host
        self.sessions: Dict[str, List[AsyncSession]] = {}
        # Keyed by id(session), not by host name.
        self.metrics: Dict[int, SessionMetrics] = {}
        self.active_sessions: Set[AsyncSession] = set()
        # id(session) -> host; lets return_session() find the owning pool
        # even though a checked-out session is no longer inside any pool list.
        self._session_hosts: Dict[int, str] = {}
        self._monitoring_task = None
        self._lock = asyncio.Lock()

    async def start_monitoring(self):
        """Start the background maintenance loop (no-op if it is already running)."""
        if self._monitoring_task is not None and not self._monitoring_task.done():
            return

        async def monitor_sessions():
            while True:
                try:
                    await self._health_check_all_sessions()
                    await self._cleanup_idle_sessions()
                    await self._rebalance_session_pool()
                    await asyncio.sleep(30)  # Check every 30 seconds
                except asyncio.CancelledError:
                    # Never swallow cancellation, otherwise stop_monitoring()
                    # could wait forever on interpreters where CancelledError
                    # derives from Exception.
                    raise
                except Exception as e:
                    print(f"Monitoring error: {e}")
                    await asyncio.sleep(5)

        self._monitoring_task = asyncio.create_task(monitor_sessions())

    async def stop_monitoring(self):
        """Cancel the background maintenance loop and wait for it to finish."""
        task, self._monitoring_task = self._monitoring_task, None
        if task is None:
            return
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

    async def get_session(self, host: Optional[str] = None) -> "AsyncSession":
        """
        Check a session out of the pool, creating one if none is available.

        Args:
            host: Specific host to get session from, or None for auto-select

        Returns:
            A connected session that is registered in ``active_sessions``.

        Raises:
            ValueError: If ``host`` is not configured (or no host is configured).
            ConnectionError: If a new session had to be created and failed.
        """
        if host is None:
            # Auto-select the least loaded host
            host = await self._select_best_host()

        if host not in self.hosts:
            raise ValueError(f"Unknown host: {host}")

        async with self._lock:
            pool = self.sessions.get(host, [])
            # Keep trying pooled sessions until a healthy one is found;
            # unhealthy ones are closed on the way.
            while pool:
                session = pool.pop(0)
                if await self._is_session_healthy(session):
                    self.active_sessions.add(session)
                    self._session_hosts[id(session)] = host
                    await self._update_metrics(session, "checkout")
                    return session
                await self._close_session(session)

        # Nothing reusable in the pool
        return await self._create_session(host)

    async def return_session(self, session: "AsyncSession"):
        """
        Hand a session back to the pool.

        The session is pooled again when it is healthy and the host's pool has
        room; otherwise (or when the session is unknown to this manager) it is
        closed.
        """
        async with self._lock:
            self.active_sessions.discard(session)

            host = self._session_hosts.get(id(session))
            if host is None:
                # Session was not handed out by this manager, close it
                await self._close_session(session)
                return

            pool = self.sessions.setdefault(host, [])
            if any(pooled is session for pooled in pool):
                # Returned twice; it is already waiting in the pool.
                return

            has_room = len(pool) < self.max_sessions_per_host
            if has_room and await self._is_session_healthy(session):
                pool.append(session)
                await self._update_metrics(session, "return")
            else:
                await self._close_session(session)

    async def _create_session(self, host: str) -> "AsyncSession":
        """Connect a new session to ``host`` and register its metrics.

        Raises:
            ConnectionError: If connecting fails; the original error is chained.
        """
        config = self.hosts[host]

        session = AsyncSession(terminal_type=config.get("terminal", "IBM-3278-2"))

        try:
            await session.connect(
                host,
                port=config.get("port", 23),
                ssl_context=config.get("ssl_context", None)
            )
        except Exception as e:
            await self._close_session(session)
            raise ConnectionError(f"Failed to create session for {host}: {e}") from e

        now = time.time()
        self.metrics[id(session)] = SessionMetrics(
            created_at=now,
            last_activity=now,
            request_count=0,
            error_count=0,
            total_bytes_sent=0,
            total_bytes_received=0,
            average_response_time=0.0,
            peak_memory_usage=0
        )
        self._session_hosts[id(session)] = host
        self.active_sessions.add(session)
        return session

    async def _close_session(self, session: "AsyncSession"):
        """Close a session (best effort) and forget all bookkeeping for it."""
        try:
            await session.close()
        except Exception as e:
            print(f"Error closing session: {e}")
        finally:
            self.active_sessions.discard(session)
            self.metrics.pop(id(session), None)
            self._session_hosts.pop(id(session), None)

    async def _is_session_healthy(self, session: "AsyncSession") -> bool:
        """Probe the session with a read; any error or timeout means unhealthy."""
        try:
            started = time.time()
            await asyncio.wait_for(session.read(), timeout=2.0)
        except Exception:
            return False

        # Also reject a probe that completed but took unreasonably long.
        return time.time() - started <= 5.0

    async def _health_check_all_sessions(self):
        """Probe every pooled session and close the ones that fail."""
        async with self._lock:
            for host, sessions in list(self.sessions.items()):
                healthy_sessions = []
                for session in sessions:
                    if await self._is_session_healthy(session):
                        healthy_sessions.append(session)
                    else:
                        await self._close_session(session)
                self.sessions[host] = healthy_sessions

    async def _cleanup_idle_sessions(self):
        """Close pooled sessions that have been idle for more than 5 minutes."""
        current_time = time.time()
        idle_threshold = 300  # 5 minutes

        async with self._lock:
            for host, sessions in list(self.sessions.items()):
                kept = []
                for session in sessions:
                    metrics = self.metrics.get(id(session))
                    # A session without metrics cannot be judged idle; keep it
                    # instead of dropping it from the pool without closing it.
                    is_idle = (
                        metrics is not None
                        and current_time - metrics.last_activity > idle_threshold
                    )
                    if is_idle and session not in self.active_sessions:
                        await self._close_session(session)
                    else:
                        kept.append(session)
                self.sessions[host] = kept

    async def _rebalance_session_pool(self):
        """Rebalance session distribution across hosts."""
        # Implementation for load balancing across multiple hosts
        pass

    async def _select_best_host(self) -> str:
        """Return the configured host with the fewest checked-out sessions.

        Ties are resolved in favour of the host that appears first in
        ``self.hosts``.

        Raises:
            ValueError: If no host is configured.
        """
        if not self.hosts:
            raise ValueError("No hosts configured")

        load = {host: 0 for host in self.hosts}
        for session in self.active_sessions:
            owner = self._session_hosts.get(id(session))
            if owner in load:
                load[owner] += 1

        return min(load, key=load.get)

    async def _update_metrics(self, session: "AsyncSession", operation: str):
        """Record a ``"checkout"`` or ``"return"`` on the session's metrics."""
        metrics = self.metrics.get(id(session))
        if metrics is None:
            return
        if operation == "checkout":
            metrics.request_count += 1
        # Both operations count as activity; otherwise a session returned after
        # a long checkout would immediately look idle to the cleanup task.
        metrics.last_activity = time.time()

    async def get_statistics(self) -> Dict:
        """Return a snapshot of pool sizes, active counts and per-session metrics.

        ``total_sessions`` counts pooled (idle) sessions; ``active_sessions``
        counts checked-out sessions.
        """
        async with self._lock:
            active_by_host: Dict[str, int] = {}
            for session in self.active_sessions:
                owner = self._session_hosts.get(id(session))
                if owner is not None:
                    active_by_host[owner] = active_by_host.get(owner, 0) + 1

            stats = {
                "total_sessions": sum(len(sessions) for sessions in self.sessions.values()),
                "active_sessions": len(self.active_sessions),
                "hosts": {}
            }

            # Report every configured host, plus any host that owns a pool.
            for host in dict.fromkeys([*self.hosts, *self.sessions]):
                sessions = self.sessions.get(host, [])
                host_stats = {
                    "available_sessions": len(sessions),
                    "active_sessions": active_by_host.get(host, 0)
                }

                now = time.time()
                session_metrics = [
                    {
                        "requests": self.metrics[id(session)].request_count,
                        "errors": self.metrics[id(session)].error_count,
                        "uptime": now - self.metrics[id(session)].created_at
                    }
                    for session in sessions
                    if id(session) in self.metrics
                ]
                if session_metrics:
                    host_stats["session_metrics"] = session_metrics

                stats["hosts"][host] = host_stats

            return stats

# Context manager for automatic session lifecycle management
@asynccontextmanager
async def managed_session(manager: "AdvancedSessionManager", host: Optional[str] = None):
    """
    Check a session out of ``manager`` and return it when the block exits.

    The session is handed back through ``manager.return_session`` whether the
    body finishes normally or raises. Nothing is returned if the checkout
    itself fails.

    Args:
        manager: Session manager to check the session out of.
        host: Host to connect to, or None to let the manager choose.

    Usage:
        async with managed_session(session_manager, "mainframe.example.com") as session:
            await session.string("TEST")
            await session.key("Enter")
    """
    session = await manager.get_session(host)
    try:
        yield session
    finally:
        # Compare against None explicitly: a session object that happens to be
        # falsy (e.g. defines __bool__ or __len__) must still be returned.
        if session is not None:
            await manager.return_session(session)

# Example usage
async def session_management_example():
    """Demonstrate advanced session management patterns.

    Builds a manager for three hosts, starts its monitoring task, runs one
    request through a managed session, prints the pool statistics and always
    stops monitoring at the end.
    """

    # Configure multiple hosts
    hosts = {
        "mainframe1.example.com": {"port": 23, "terminal": "IBM-3278-4"},
        "mainframe2.example.com": {"port": 23, "terminal": "IBM-3279-3"},
        "printerhost.example.com": {"port": 23, "terminal": "IBM-3281-1"}
    }

    manager = AdvancedSessionManager(hosts)
    await manager.start_monitoring()

    try:
        # Use managed session
        async with managed_session(manager, "mainframe1.example.com") as session:
            await session.string("HELLO")
            await session.key("Enter")
            # read() is a coroutine on AsyncSession: await it first, then decode
            # the resulting data (passing the un-awaited coroutine was a bug).
            screen_data = await session.read()
            response = session.ascii(screen_data)
            print(f"Response: {response}")

        # Get statistics
        stats = await manager.get_statistics()
        print(f"Session pool statistics: {stats}")

    finally:
        await manager.stop_monitoring()

Error Handling and Recovery Strategies

Comprehensive error handling and recovery patterns for production environments. For basic error handling examples, see the Examples section on error handling patterns.

import asyncio
import logging
import time
import traceback
from enum import Enum
from typing import Callable, Optional, Dict, Any
from dataclasses import dataclass
from pure3270 import AsyncSession
from pure3270.exceptions import TN3270Error, ConnectionError, TimeoutError

class ErrorSeverity(Enum):
    """Severity levels used to choose a recovery strategy.

    AdvancedErrorHandler assigns one of these to each error and looks the
    matching RecoveryStrategy up in its ``recovery_strategies`` table.
    """
    LOW = "low"            # Non-critical; processing can continue
    MEDIUM = "medium"      # May require recovery
    HIGH = "high"          # Critical; needs immediate attention
    CRITICAL = "critical"  # Fatal; the session should be terminated

class RecoveryStrategy(Enum):
    """Recovery actions that AdvancedErrorHandler.handle_error can dispatch to.

    The handler's default severity table only uses RETRY, RECONNECT and
    TERMINATE; SWITCH_HOST and ESCALATE are available when that table is
    customised.
    """
    RETRY = "retry"              # repeat the operation, with backoff between attempts
    RECONNECT = "reconnect"      # open a new session to the same host
    SWITCH_HOST = "switch_host"  # try the configured failover hosts
    ESCALATE = "escalate"        # log and alert; the error counts as unhandled
    TERMINATE = "terminate"      # give up; the error counts as unhandled

@dataclass
class ErrorContext:
    """Details about a failed operation, passed to the error handler.

    ResilientSessionWrapper creates one per wrapped call and fills in
    ``error_type`` and ``error_message`` once an exception has been caught.
    """
    error_type: type  # class of the caught exception (type(None) until one occurs)
    error_message: str  # str() of the caught exception ("" until one occurs)
    session_state: str  # what the session was doing, e.g. "reading" or "connecting"
    host: str  # target host; "unknown" when the caller could not determine it
    operation: str  # name of the session operation, e.g. "read" or "key"
    timestamp: float  # time.time() value taken when the context was created
    retry_count: int = 0  # defaults to 0; not updated by the code shown here

@dataclass
class RecoveryConfig:
    """Tunable limits and delays for AdvancedErrorHandler's recovery strategies."""
    max_retries: int = 3  # attempts made by the retry strategy
    retry_delay: float = 1.0  # seconds slept before the first re-attempt
    backoff_multiplier: float = 2.0  # factor applied to the delay after each failed retry
    max_reconnect_attempts: int = 5  # attempts made by the reconnect strategy
    failover_hosts: Optional[list] = None  # hosts tried in order by the failover strategy; None disables it
    circuit_breaker_threshold: int = 5  # failures after which a host's circuit breaker opens
    circuit_breaker_timeout: float = 300.0  # seconds an open breaker waits before allowing a trial call

class CircuitBreaker:
    """Three-state (closed / open / half-open) guard against repeated failures.

    The breaker starts closed. Once ``failure_threshold`` failures have been
    recorded it opens and rejects calls until ``recovery_timeout`` seconds have
    passed since the last failure, at which point it becomes half-open and lets
    calls through again. A recorded success closes it and resets the counter.
    """

    def __init__(self, failure_threshold: int, recovery_timeout: float):
        self.state = "closed"  # closed, open, half-open
        self.failure_count = 0
        self.last_failure_time = 0
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout

    def can_execute(self) -> bool:
        """Return True when a call may go ahead; may move open -> half-open."""
        if self.state != "open":
            # closed and half-open both let the call through
            return True

        seconds_since_failure = time.time() - self.last_failure_time
        if seconds_since_failure < self.recovery_timeout:
            return False

        self.state = "half-open"
        return True

    def record_success(self):
        """Reset the failure counter and close the breaker."""
        self.state = "closed"
        self.failure_count = 0

    def record_failure(self):
        """Count a failure and open the breaker once the threshold is reached."""
        self.last_failure_time = time.time()
        self.failure_count += 1

        threshold_reached = self.failure_count >= self.failure_threshold
        if threshold_reached:
            self.state = "open"

class AdvancedErrorHandler:
    """Classify errors by severity and run the matching recovery strategy.

    Sessions opened by a successful reconnect or failover are kept in
    ``recovered_sessions`` (keyed by host) so the caller can adopt them;
    sessions that fail their connection test are closed immediately.
    """

    def __init__(self, recovery_config: Optional[RecoveryConfig] = None):
        """
        Args:
            recovery_config: Limits and delays for recovery; defaults to
                ``RecoveryConfig()`` when omitted.
        """
        self.config = recovery_config or RecoveryConfig()
        self.circuit_breakers: Dict[str, CircuitBreaker] = {}
        # Registry for per-exception-type callbacks; handle_error() does not
        # consult it in this example.
        self.error_callbacks: Dict[type, Callable] = {}
        # host -> session opened by the most recent successful recovery.
        self.recovered_sessions: Dict[str, AsyncSession] = {}
        self.recovery_strategies: Dict[ErrorSeverity, RecoveryStrategy] = {
            ErrorSeverity.LOW: RecoveryStrategy.RETRY,
            ErrorSeverity.MEDIUM: RecoveryStrategy.RETRY,
            ErrorSeverity.HIGH: RecoveryStrategy.RECONNECT,
            ErrorSeverity.CRITICAL: RecoveryStrategy.TERMINATE
        }

    async def handle_error(self, error: Exception, context: ErrorContext) -> bool:
        """
        Handle error with appropriate recovery strategy.

        Args:
            error: The exception that was caught.
            context: Where and during which operation it happened.

        Returns:
            bool: True if error was handled successfully, False otherwise
            (including when the recovery itself raised).
        """
        severity = self._determine_severity(error, context)
        strategy = self.recovery_strategies.get(severity, RecoveryStrategy.TERMINATE)

        logging.error(f"Error: {error}, Severity: {severity}, Strategy: {strategy}")

        try:
            if strategy == RecoveryStrategy.RETRY:
                return await self._handle_with_retry(error, context, severity)
            elif strategy == RecoveryStrategy.RECONNECT:
                return await self._handle_with_reconnect(error, context, severity)
            elif strategy == RecoveryStrategy.SWITCH_HOST:
                return await self._handle_with_failover(error, context, severity)
            elif strategy == RecoveryStrategy.ESCALATE:
                await self._handle_with_escalation(error, context, severity)
                return False
            else:  # TERMINATE
                return False

        except Exception as recovery_error:
            logging.error(f"Recovery failed: {recovery_error}")
            return False

    def _determine_severity(self, error: Exception, context: ErrorContext) -> ErrorSeverity:
        """Determine error severity based on error type and message.

        ``context`` is accepted for future use; only ``error`` is inspected.
        """
        if isinstance(error, ConnectionError):
            return ErrorSeverity.HIGH
        elif isinstance(error, TimeoutError):
            return ErrorSeverity.MEDIUM
        elif isinstance(error, TN3270Error):
            if "protocol" in str(error).lower():
                return ErrorSeverity.HIGH
            else:
                return ErrorSeverity.MEDIUM
        elif isinstance(error, KeyboardInterrupt):
            # Only reachable when a caller passes one in explicitly;
            # "except Exception" never catches KeyboardInterrupt.
            return ErrorSeverity.CRITICAL
        else:
            return ErrorSeverity.LOW

    async def _handle_with_retry(self, error: Exception, context: ErrorContext,
                               severity: ErrorSeverity) -> bool:
        """Retry with exponential backoff; fall back to reconnect when exhausted.

        Returns False without further attempts when the host's circuit breaker
        refuses the call.
        """
        max_retries = self.config.max_retries
        delay = self.config.retry_delay

        for attempt in range(max_retries):
            try:
                if await self._execute_with_circuit_breaker(context.host):
                    return True
                else:
                    break
            except Exception as e:
                logging.warning(f"Retry attempt {attempt + 1} failed: {e}")
                if attempt < max_retries - 1:
                    await asyncio.sleep(delay)
                    delay *= self.config.backoff_multiplier
                else:
                    # Final retry failed, escalate
                    return await self._handle_with_reconnect(error, context, severity)

        return False

    async def _handle_with_reconnect(self, error: Exception, context: ErrorContext,
                                   severity: ErrorSeverity) -> bool:
        """Open and test a new session to the same host, with growing delays.

        On success the new session is stored in ``recovered_sessions``.
        """
        for attempt in range(self.config.max_reconnect_attempts):
            new_session = None
            try:
                logging.info(f"Reconnection attempt {attempt + 1}")

                # Wait before reconnecting
                if attempt > 0:
                    await asyncio.sleep(self.config.retry_delay * attempt)

                # Create new session (implementation depends on your session management)
                new_session = await self._create_new_session(context.host)

                if new_session is not None:
                    # Test connection
                    await asyncio.wait_for(new_session.read(), timeout=10.0)
                    await self._keep_recovered_session(context.host, new_session)
                    logging.info("Reconnection successful")
                    return True

            except Exception as e:
                logging.warning(f"Reconnection attempt {attempt + 1} failed: {e}")
                # A session that failed its test is useless; do not leak it.
                if new_session is not None:
                    await self._close_quietly(new_session)

        return False

    async def _handle_with_failover(self, error: Exception, context: ErrorContext,
                                  severity: ErrorSeverity) -> bool:
        """Try each configured failover host in order until one connects.

        On success the new session is stored in ``recovered_sessions``.
        """
        if not self.config.failover_hosts:
            return False

        for failover_host in self.config.failover_hosts:
            try:
                logging.info(f"Attempting failover to {failover_host}")

                new_session = await self._create_new_session(failover_host)
                if new_session is not None:
                    await self._keep_recovered_session(failover_host, new_session)
                    logging.info(f"Failover to {failover_host} successful")
                    return True

            except Exception as e:
                logging.warning(f"Failover to {failover_host} failed: {e}")

        return False

    async def _handle_with_escalation(self, error: Exception, context: ErrorContext,
                                    severity: ErrorSeverity):
        """Handle error with escalation (alerts, logging, etc.)."""
        logging.critical(f"Critical error requiring escalation: {error}")
        logging.critical(f"Context: {context}")

        # In real implementation, send alerts, create tickets, etc.
        await self._send_alert(error, context)

    async def _execute_with_circuit_breaker(self, host: str) -> bool:
        """Run the (placeholder) operation under the host's circuit breaker.

        Returns:
            False when the breaker is open, otherwise the operation's result.
        """
        if host not in self.circuit_breakers:
            self.circuit_breakers[host] = CircuitBreaker(
                self.config.circuit_breaker_threshold,
                self.config.circuit_breaker_timeout
            )

        breaker = self.circuit_breakers[host]

        if not breaker.can_execute():
            logging.warning(f"Circuit breaker open for {host}")
            return False

        try:
            # Execute operation here
            result = True  # Placeholder for actual operation
            breaker.record_success()
            return result
        except Exception:
            breaker.record_failure()
            raise  # bare raise keeps the original traceback intact

    async def _create_new_session(self, host: str) -> AsyncSession:
        """Create and connect a new session; closes it again if connecting fails."""
        session = AsyncSession()
        try:
            await session.connect(host)
        except Exception:
            await self._close_quietly(session)
            raise
        return session

    async def _keep_recovered_session(self, host: str, session: AsyncSession):
        """Remember a recovered session for ``host``, closing the one it replaces."""
        previous = self.recovered_sessions.get(host)
        self.recovered_sessions[host] = session
        if previous is not None and previous is not session:
            await self._close_quietly(previous)

    async def _close_quietly(self, session: AsyncSession):
        """Close a session, logging (not raising) any error from close()."""
        try:
            await session.close()
        except Exception as e:
            logging.warning(f"Error closing session: {e}")

    async def _send_alert(self, error: Exception, context: ErrorContext):
        """Send alert for critical errors."""
        # Implementation would send alerts via email, Slack, etc.
        logging.critical(f"ALERT: {error} at {context.host} during {context.operation}")

class ResilientSessionWrapper:
    """Proxy around a session that routes failures through an error handler.

    ``read``, ``write``, ``connect``, ``key`` and ``string`` are intercepted:
    when the underlying call raises, the error handler is consulted and, if it
    reports the error as handled, the call is retried once. Every other
    attribute is forwarded to the wrapped session unchanged.
    """

    # Attributes assigned in __init__. __getattr__ must never try to resolve
    # them through itself, or a partially initialised instance (copy, pickle,
    # __new__) would recurse without end.
    _OWN_ATTRIBUTES = ("session", "error_handler", "operation_retry_map")

    def __init__(self, session: "AsyncSession", error_handler: "AdvancedErrorHandler"):
        """
        Args:
            session: The session whose operations should be guarded.
            error_handler: Object whose ``handle_error(error, context)``
                coroutine decides whether a failed call may be retried.
        """
        self.session = session
        self.error_handler = error_handler
        self.operation_retry_map = {
            "read": self._safe_read,
            "write": self._safe_write,
            "connect": self._safe_connect,
            "key": self._safe_key,
            "string": self._safe_string
        }

    def __getattr__(self, name):
        """Return the guarded operation for ``name`` or proxy to the session.

        Only invoked when normal attribute lookup fails.
        """
        if name in self._OWN_ATTRIBUTES:
            raise AttributeError(name)
        if name in self.operation_retry_map:
            return self.operation_retry_map[name]
        return getattr(self.session, name)

    def _session_host(self) -> str:
        """Best-effort host name of the wrapped session, else "unknown".

        NOTE(review): assumes the session exposes its host as ``.host``;
        falls back to "unknown" when it does not -- confirm against AsyncSession.
        """
        return getattr(self.session, "host", None) or "unknown"

    async def _run_with_recovery(self, operation: str, session_state: str,
                                 host: str, args: tuple, kwargs: dict):
        """Call ``self.session.<operation>`` and retry once if the handler recovers.

        Args:
            operation: Name of the session coroutine method to call.
            session_state: Label stored in the error context (e.g. "reading").
            host: Host stored in the error context.
            args: Positional arguments for the operation.
            kwargs: Keyword arguments for the operation.

        Raises:
            Exception: The original error when the handler could not recover,
                or whatever the single retry raises.
        """
        started_at = time.time()
        try:
            return await getattr(self.session, operation)(*args, **kwargs)
        except Exception as error:
            context = ErrorContext(
                error_type=type(error),
                error_message=str(error),
                session_state=session_state,
                host=host,
                operation=operation,
                timestamp=started_at
            )

            handled = await self.error_handler.handle_error(error, context)
            if not handled:
                raise  # bare raise keeps the original traceback intact

            # Retry the operation once
            return await getattr(self.session, operation)(*args, **kwargs)

    async def _safe_read(self, *args, **kwargs):
        """Safe read operation with error handling."""
        return await self._run_with_recovery(
            "read", "reading", self._session_host(), args, kwargs
        )

    async def _safe_write(self, data: bytes, *args, **kwargs):
        """Safe write operation with error handling."""
        return await self._run_with_recovery(
            "write", "writing", self._session_host(), (data, *args), kwargs
        )

    async def _safe_connect(self, *args, **kwargs):
        """Safe connect operation with error handling.

        The host for the error context is the first positional argument or the
        ``host`` keyword, falling back to "unknown".
        """
        host = args[0] if args else kwargs.get("host", "unknown")
        return await self._run_with_recovery(
            "connect", "connecting", host, args, kwargs
        )

    async def _safe_key(self, key: str, *args, **kwargs):
        """Safe key operation with error handling."""
        return await self._run_with_recovery(
            "key", "keying", self._session_host(), (key, *args), kwargs
        )

    async def _safe_string(self, text: str, *args, **kwargs):
        """Safe string operation with error handling."""
        return await self._run_with_recovery(
            "string", "string_input", self._session_host(), (text, *args), kwargs
        )

# Example usage of advanced error handling
async def error_handling_example():
    """Walk through a logon sequence on a session guarded by error recovery.

    Any error that survives the recovery attempts is printed, and the
    underlying session is always closed at the end.
    """

    # Recovery settings: retry limits, backoff and failover targets
    handler = AdvancedErrorHandler(
        RecoveryConfig(
            max_retries=3,
            retry_delay=1.0,
            backoff_multiplier=2.0,
            max_reconnect_attempts=5,
            failover_hosts=["backup1.example.com", "backup2.example.com"]
        )
    )

    # Wrap a plain session so its operations go through the handler
    raw_session = AsyncSession()
    guarded = ResilientSessionWrapper(raw_session, handler)

    try:
        # Each call below is retried/recovered automatically on failure
        await guarded.connect('mainframe.example.com')
        await guarded.string("LOGON")
        await guarded.key("Enter")

        # Result is not used further in this example
        await guarded.read()

    except Exception as e:
        print(f"Operation failed after all recovery attempts: {e}")
    finally:
        await raw_session.close()

Performance Optimization Techniques

Performance optimization strategies for high-throughput environments. For detailed performance examples and benchmarking, see the Examples section on performance optimization.

import asyncio
import time
import gc
from typing import Dict, List, Optional, Callable
from dataclasses import dataclass
from pure3270 import AsyncSession
from pure3270.emulation.screen_buffer import ScreenBuffer

@dataclass
class PerformanceMetrics:
    """Performance metrics for optimization analysis.

    ConnectionPool creates one zero-initialised instance; nothing in the code
    shown here updates the fields afterwards.
    """
    operations_per_second: float
    average_response_time: float  # presumably seconds -- TODO confirm
    peak_memory_usage: int  # presumably bytes -- TODO confirm
    throughput_bytes_per_second: int
    error_rate: float
    connection_reuse_ratio: float

class ConnectionPool:
    """Connection pool for a single host.

    Idle sessions wait in ``idle_pool``; sessions handed out by
    :meth:`get_session` are tracked in ``active_sessions`` (keyed by
    ``id(session)``) until :meth:`return_session` is called, so the
    ``max_size`` check sees both idle and checked-out sessions.
    """

    def __init__(self, host: str, port: int, min_size: int = 5, max_size: int = 20):
        """
        Args:
            host: Host every pooled session connects to.
            port: Port every pooled session connects to.
            min_size: Stored on the pool; not enforced by the code shown here.
            max_size: Upper bound used when deciding whether to open or keep
                a session.
        """
        self.host = host
        self.port = port
        self.min_size = min_size
        self.max_size = max_size
        self.idle_pool: List[AsyncSession] = []
        self.active_sessions: Dict[int, AsyncSession] = {}
        self.in_use: set = set()  # unused; kept so existing attribute access still works
        self._lock = asyncio.Lock()
        self._metrics = PerformanceMetrics(0, 0, 0, 0, 0, 0)

    async def get_session(self) -> "AsyncSession":
        """Check out a session, reusing a healthy idle one when possible.

        Unhealthy idle sessions are closed and skipped. If no idle session is
        usable, a new one is created (directly while below ``max_size``,
        otherwise via ``_wait_for_session``).
        """
        async with self._lock:
            # Try idle sessions until a healthy one turns up
            while self.idle_pool:
                session = self.idle_pool.pop(0)
                if await self._quick_health_check(session):
                    return self._mark_active(session)
                await self._close_session(session)

            # Create new session if pool not at max
            if len(self.active_sessions) + len(self.idle_pool) < self.max_size:
                session = await self._create_optimized_session()
            else:
                # Pool at capacity, wait for available session
                session = await self._wait_for_session()
            return self._mark_active(session)

    async def return_session(self, session: "AsyncSession"):
        """Hand a session back; unhealthy or surplus sessions are closed."""
        async with self._lock:
            self.active_sessions.pop(id(session), None)

            if any(idle is session for idle in self.idle_pool):
                # Returned twice; it is already waiting in the pool.
                return

            # Only return healthy sessions to pool
            has_room = len(self.idle_pool) < self.max_size
            if has_room and await self._quick_health_check(session):
                self.idle_pool.append(session)
            else:
                await self._close_session(session)

    def _mark_active(self, session: "AsyncSession") -> "AsyncSession":
        """Record the session as checked out and return it."""
        self.active_sessions[id(session)] = session
        return session

    async def _create_optimized_session(self) -> "AsyncSession":
        """Create a session, tag it with the tuning attributes and connect it."""
        session = AsyncSession()
        # NOTE(review): these private attributes are set on the session object
        # as-is; whether AsyncSession reads them is not visible here -- confirm.
        session._performance_mode = True

        # Pre-configure for performance
        session._buffer_pool = []
        session._optimization_level = "high"

        await session.connect(self.host, self.port)
        return session

    async def _quick_health_check(self, session: "AsyncSession") -> bool:
        """Return True if a read completes within one second without error."""
        try:
            # Set short timeout for health check
            await asyncio.wait_for(session.read(), timeout=1.0)
            return True
        except Exception:
            # "except Exception" (not a bare except) so task cancellation and
            # KeyboardInterrupt are not mistaken for an unhealthy session.
            return False

    async def _close_session(self, session: "AsyncSession"):
        """Close a session on a best-effort basis.

        No garbage collection is forced here: a full ``gc.collect()`` on every
        close would block the event loop while the pool lock is held.
        """
        try:
            await session.close()
        except Exception:
            # Best effort: the session is being discarded anyway.
            pass

    async def _wait_for_session(self) -> "AsyncSession":
        """Wait for available session from pool."""
        # Implementation would use asyncio.Queue or similar
        # For brevity, just create new session
        return await self._create_optimized_session()

class ScreenBufferOptimizer:
    """Short-lived cache for screen buffers returned by async operations.

    Results are cached only when they expose a ``to_text`` attribute. Each
    cached object is stamped with a ``_timestamp`` attribute (``time.time()``)
    that drives both freshness checks and oldest-first eviction.

    Args:
        cache_ttl: Seconds a cached buffer is considered fresh (default 5.0).
        cache_size_limit: Maximum number of cached buffers (default 100).
            A value of 0 or less disables caching.
    """

    def __init__(self, cache_ttl: float = 5.0, cache_size_limit: int = 100):
        self.buffer_cache: Dict[str, ScreenBuffer] = {}
        self.parse_cache: Dict[str, dict] = {}
        self._cache_ttl = cache_ttl
        self._cache_size_limit = cache_size_limit

    def optimize_screen_reading(self, session: AsyncSession, operation: Callable):
        """Wrap an async ``operation`` so fresh cached results are reused.

        Args:
            session: Session whose ``host`` attribute namespaces the cache key.
            operation: Async callable to wrap.

        Returns:
            An async wrapper with the same call signature as ``operation``.
            It returns the cached buffer when one younger than the TTL exists
            for the same host, operation, positional and keyword arguments;
            otherwise it awaits ``operation`` and caches the result if it has
            a ``to_text`` attribute.
        """
        # Local import: the module-level import block is outside this class.
        import functools

        # The operation is part of the key so that two different operations
        # wrapped through the same optimizer never share cache entries.
        op_name = getattr(operation, "__qualname__", type(operation).__name__)
        op_tag = f"{op_name}@{id(operation):x}"

        @functools.wraps(operation)
        async def wrapper(*args, **kwargs):
            # Keyword arguments are part of the key too; sorting makes the
            # key independent of the order in which they were passed.
            cache_key = (
                f"{session.host}:{op_tag}:{args!r}:{sorted(kwargs.items())!r}"
            )

            cached_buffer = self.buffer_cache.get(cache_key)
            if cached_buffer is not None:
                stamp = getattr(cached_buffer, "_timestamp", None)
                if stamp and time.time() - stamp < self._cache_ttl:
                    return cached_buffer

            result = await operation(*args, **kwargs)

            # Only screen-buffer-like results are cached.
            if hasattr(result, 'to_text'):
                self._cache_screen_buffer(cache_key, result)

            return result

        return wrapper

    def _cache_screen_buffer(self, key: str, buffer: ScreenBuffer):
        """Store ``buffer`` under ``key``, evicting the oldest entry if full.

        Overwriting an existing key never evicts another entry. Entries
        without a usable ``_timestamp`` are treated as the oldest.
        """
        if self._cache_size_limit <= 0:
            return

        is_new_key = key not in self.buffer_cache
        if is_new_key and len(self.buffer_cache) >= self._cache_size_limit:
            oldest_key = min(
                self.buffer_cache,
                key=lambda k: getattr(self.buffer_cache[k], "_timestamp", None) or 0.0,
            )
            del self.buffer_cache[oldest_key]

        buffer._timestamp = time.time()
        self.buffer_cache[key] = buffer

class BatchOperationOptimizer:
    """Run zero-argument async callables in fixed-size batches.

    Args:
        batch_size: Number of scheduled tasks that triggers a batch flush.
        timeout: Seconds allowed for one batch to finish.
    """

    def __init__(self, batch_size: int = 10, timeout: float = 0.1):
        self.batch_size = batch_size
        self.timeout = timeout
        self.pending_operations = []

    async def submit_batch(self, operations: List[Callable]):
        """Schedule ``operations`` and gather their results batch by batch.

        Each callable is invoked with no arguments and wrapped in a task, so
        it starts running as soon as it is scheduled. Whenever
        ``batch_size`` tasks are pending they are awaited together.

        Args:
            operations: Callables that each return an awaitable.

        Returns:
            A list with one entry per operation, in submission order. An
            operation that raised contributes its exception object instead of
            a value.

        Raises:
            asyncio.TimeoutError: When a batch does not finish within
                ``timeout`` seconds. The tasks of that batch are cancelled.
        """
        results = []
        for operation in operations:
            self.pending_operations.append(asyncio.create_task(operation()))

            if len(self.pending_operations) >= self.batch_size:
                results.extend(await self._process_batch())

        # Flush whatever is left over after the last full batch.
        if self.pending_operations:
            results.extend(await self._process_batch())

        return results

    async def _process_batch(self):
        """Await every pending task and return their results as a list.

        The pending list is snapshotted and emptied BEFORE awaiting, so a
        timeout or cancellation cannot leave finished or cancelled tasks
        behind to be re-awaited by the next batch.
        """
        if not self.pending_operations:
            return []

        batch = list(self.pending_operations)
        self.pending_operations.clear()

        # return_exceptions=True keeps one failing operation from discarding
        # the results of the others.
        return await asyncio.wait_for(
            asyncio.gather(*batch, return_exceptions=True),
            timeout=self.timeout,
        )

class AsyncBatchedTN3270Client:
    """Batched TN3270 client built on a connection pool.

    Args:
        host: Host name handed to the connection pool.
        port: Port handed to the connection pool.
    """

    def __init__(self, host: str, port: int):
        self.host = host
        self.port = port
        self.pool = ConnectionPool(host, port, min_size=5, max_size=20)
        self.buffer_optimizer = ScreenBufferOptimizer()
        self.batch_optimizer = BatchOperationOptimizer()
        self.metrics = PerformanceMetrics(0, 0, 0, 0, 0, 0)

    @staticmethod
    async def _run_timed(label: str, items: list, action: Callable) -> list:
        """Await ``action(item)`` for each item in order and print the timing."""
        started = time.perf_counter()
        outputs = [await action(item) for item in items]
        elapsed = time.perf_counter() - started
        print(f"Executed {len(items)} {label} operations in {elapsed:.3f}s")
        return outputs

    async def batch_operations(self, operations: List[dict]) -> List[Any]:
        """
        Execute operations on one pooled session, grouped by type.

        operations: List of dicts with 'type': 'key'|'string'|'read', 'data': value

        Operations are NOT executed in the order given: all 'key' operations
        run first, then all 'string' operations, then all 'read' operations.
        Entries whose 'type' is none of the three are silently ignored.

        Returns:
            One entry per 'read' operation, in order; 'key' and 'string'
            operations contribute nothing to the result.

        The session is always handed back to the pool, also on error.
        """
        session = await self.pool.get_session()

        try:
            # Group operations by type
            key_operations = []
            string_operations = []
            read_operations = []

            for op in operations:
                op_type = op['type']
                if op_type == 'key':
                    key_operations.append(op['data'])
                elif op_type == 'string':
                    string_operations.append(op['data'])
                elif op_type == 'read':
                    read_operations.append(op['data'])

            if key_operations:
                await self._run_timed("key", key_operations, session.key)

            if string_operations:
                await self._run_timed("string", string_operations, session.string)

            results = []
            if read_operations:
                # The 'data' of a read operation is ignored; each entry just
                # triggers one session.read().
                results = await self._run_timed(
                    "read", read_operations, lambda _unused: session.read()
                )

            return results

        finally:
            await self.pool.return_session(session)

    async def high_throughput_scenario(self, num_operations: int = 1000):
        """Run a synthetic key/read workload and record throughput metrics.

        For each ``i`` in ``range(num_operations)`` one key operation is
        queued ('Enter' when ``i`` is a multiple of 10, otherwise 'Tab'), plus
        one read operation when ``i`` is a multiple of 20. The workload is run
        through ``batch_operations`` and ``self.metrics`` is updated with
        ``operations_per_second`` and ``average_response_time``.

        Both metrics are based on ``num_operations`` (the queued reads are not
        counted) and are set to 0.0 instead of dividing by zero when there
        were no operations or no measurable elapsed time.
        """
        started = time.perf_counter()

        # Create batch operations
        operations = []
        for i in range(num_operations):
            operations.append({
                'type': 'key',
                'data': 'Enter' if i % 10 == 0 else 'Tab',
            })

            if i % 20 == 0:  # Add reads periodically
                operations.append({
                    'type': 'read',
                    'data': None,
                })

        # Execute batch
        await self.batch_operations(operations)

        total_time = time.perf_counter() - started

        # Calculate metrics, guarding both divisions.
        self.metrics.operations_per_second = (
            num_operations / total_time if total_time > 0 else 0.0
        )
        self.metrics.average_response_time = (
            total_time / num_operations if num_operations > 0 else 0.0
        )

        print("Performance metrics:")
        print(f"  Operations: {num_operations}")
        print(f"  Total time: {total_time:.3f}s")
        print(f"  Ops/second: {self.metrics.operations_per_second:.1f}")
        print(f"  Avg response: {self.metrics.average_response_time:.4f}s")

class MemoryOptimizer:
    """Watches process memory against a ceiling during large-scale runs.

    The ceiling is given in megabytes and stored in bytes. Readings are the
    resident set size (RSS) of the current process as reported by ``psutil``.
    """

    def __init__(self, max_memory_mb: int = 512):
        self.session_memory_tracker = {}
        self.max_memory_bytes = max_memory_mb * 1024 * 1024

    @staticmethod
    def _process_rss() -> int:
        """Return the current process RSS in bytes via ``psutil``."""
        import psutil
        import os

        return psutil.Process(os.getpid()).memory_info().rss

    def track_session_memory(self, session: AsyncSession):
        """Record the current process RSS under ``id(session)``.

        The stored figure is the RSS of the whole process at the time of the
        call, not a per-session measurement. After recording, a cleanup is
        run when the process is over the configured ceiling.
        """
        self.session_memory_tracker[id(session)] = self._process_rss()

        if not self._should_cleanup():
            return
        self._cleanup_memory()

    def _should_cleanup(self) -> bool:
        """Tell whether the process RSS currently exceeds the ceiling."""
        over_limit = self._process_rss() > self.max_memory_bytes
        return over_limit

    def _cleanup_memory(self):
        """Run a garbage collection and forget all recorded readings."""
        # Collect first, then drop the per-session bookkeeping.
        gc.collect()
        self.session_memory_tracker.clear()

# Example usage of performance optimizations
async def performance_optimization_example():
    """Demonstrate the batched client together with memory tracking.

    Runs a 1000-operation high-throughput scenario against
    ``mainframe.example.com`` on port 23, then borrows one pooled session,
    records the process memory for it, and hands the session back. The
    memory optimizer's cleanup runs at the end in every case.
    """

    # Create optimized client. The constructor requires both host and port;
    # 23 is the standard Telnet port.
    client = AsyncBatchedTN3270Client('mainframe.example.com', 23)

    # Configure memory optimization
    memory_optimizer = MemoryOptimizer(max_memory_mb=256)

    try:
        # High-throughput scenario
        await client.high_throughput_scenario(1000)

        # Memory tracking
        session = await client.pool.get_session()
        try:
            memory_optimizer.track_session_memory(session)
        finally:
            # Hand the session back even if tracking raised.
            await client.pool.return_session(session)

    finally:
        # Cleanup
        memory_optimizer._cleanup_memory()