Advanced Patterns¶
This section covers advanced usage patterns, optimization techniques, and complex scenarios for Pure3270 implementation.
For practical examples of the patterns described here, see:
Examples - Basic usage examples and API demonstrations
Protocol Examples - Detailed protocol-level examples
Integration Scenarios - Real-world enterprise integration patterns
Complex Session Management Patterns¶
Advanced session lifecycle management with resource pooling and state tracking. For connection pooling implementations, see Examples section on connection management.
import asyncio
import time
from dataclasses import dataclass
from typing import Dict, List, Optional, Set
from contextlib import asynccontextmanager
from pure3270 import AsyncSession, setup_logging
from pure3270.exceptions import ConnectionError, TN3270Error
@dataclass
class SessionMetrics:
    """Track session performance and usage metrics."""

    created_at: float  # epoch seconds when the session was created
    last_activity: float  # epoch seconds of the most recent checkout/return
    request_count: int  # number of times the session was checked out
    error_count: int  # cumulative errors observed on this session
    total_bytes_sent: int  # bytes written to the host (not updated in this example)
    total_bytes_received: int  # bytes read from the host (not updated in this example)
    average_response_time: float  # mean response time in seconds
    peak_memory_usage: int  # high-water memory mark in bytes
class AdvancedSessionManager:
    """
    Advanced session management with comprehensive monitoring,
    resource pooling, and automatic recovery.

    Idle sessions are pooled per host; a background task periodically
    health-checks the pool, evicts idle sessions, and rebalances.
    """

    def __init__(self, hosts: Dict[str, dict], max_sessions_per_host: int = 5):
        # hosts: {host: {"port": 23, "ssl": True, "terminal": "IBM-3278-2"}}
        self.hosts = hosts
        self.max_sessions_per_host = max_sessions_per_host
        self.sessions: Dict[str, List[AsyncSession]] = {}  # idle sessions per host
        self.metrics: Dict[int, SessionMetrics] = {}  # keyed by id(session)
        self.active_sessions: Set[AsyncSession] = set()  # currently checked-out
        # Maps id(session) -> host. Needed because a checked-out session is
        # popped from self.sessions, so pool membership alone cannot identify
        # its host when it is returned.
        self._session_hosts: Dict[int, str] = {}
        self._monitoring_task = None
        self._lock = asyncio.Lock()

    async def start_monitoring(self):
        """Start background monitoring and maintenance tasks."""
        async def monitor_sessions():
            while True:
                try:
                    await self._health_check_all_sessions()
                    await self._cleanup_idle_sessions()
                    await self._rebalance_session_pool()
                    await asyncio.sleep(30)  # Check every 30 seconds
                except Exception as e:
                    print(f"Monitoring error: {e}")
                    await asyncio.sleep(5)  # brief backoff after a failure
        self._monitoring_task = asyncio.create_task(monitor_sessions())

    async def stop_monitoring(self):
        """Stop monitoring tasks and cleanup."""
        if self._monitoring_task:
            self._monitoring_task.cancel()
            try:
                await self._monitoring_task
            except asyncio.CancelledError:
                pass  # expected on cancellation

    async def get_session(self, host: Optional[str] = None) -> AsyncSession:
        """
        Get an active session with automatic session management.

        Args:
            host: Specific host to get session from, or None for auto-select

        Raises:
            ValueError: if host is not one of the configured hosts.
            ConnectionError: if a new session cannot be established.
        """
        if host is None:
            # Auto-select host with best metrics
            host = await self._select_best_host()
        if host not in self.hosts:
            raise ValueError(f"Unknown host: {host}")
        async with self._lock:
            # Drain the idle pool until a healthy session is found
            # (previously only the head of the pool was ever considered).
            while self.sessions.get(host):
                session = self.sessions[host].pop(0)
                if await self._is_session_healthy(session):
                    self.active_sessions.add(session)
                    await self._update_metrics(session, "checkout")
                    return session
                # Session is unhealthy, close it and try the next one
                await self._close_session(session)
            # No usable pooled session: create a new one
            return await self._create_session(host)

    async def return_session(self, session: AsyncSession):
        """Return session to pool with health validation."""
        # BUG FIX: the original searched the idle pool lists for the session,
        # but checked-out sessions were popped from those lists, so the host
        # was never found and every returned session was closed. The owning
        # host is now tracked in _session_hosts at creation time.
        host = self._session_hosts.get(id(session))
        if host is None:
            # Session not from pool, close it
            await self._close_session(session)
            return
        async with self._lock:
            self.active_sessions.discard(session)
            # Validate before returning to pool
            if await self._is_session_healthy(session):
                if len(self.sessions.get(host, [])) < self.max_sessions_per_host:
                    self.sessions.setdefault(host, []).append(session)
                    await self._update_metrics(session, "return")
                else:
                    # Pool full, close session
                    await self._close_session(session)
            else:
                await self._close_session(session)

    async def _create_session(self, host: str) -> AsyncSession:
        """Create a new session with proper initialization."""
        config = self.hosts[host]
        session = AsyncSession(terminal_type=config.get("terminal", "IBM-3278-2"))
        try:
            await session.connect(
                host,
                port=config.get("port", 23),
                ssl_context=config.get("ssl_context", None)
            )
            # Initialize session metrics
            self.metrics[id(session)] = SessionMetrics(
                created_at=time.time(),
                last_activity=time.time(),
                request_count=0,
                error_count=0,
                total_bytes_sent=0,
                total_bytes_received=0,
                average_response_time=0.0,
                peak_memory_usage=0
            )
            # Remember the owning host so return_session can find the pool.
            self._session_hosts[id(session)] = host
            self.active_sessions.add(session)
            return session
        except Exception as e:
            await self._close_session(session)
            raise ConnectionError(f"Failed to create session for {host}: {e}")

    async def _close_session(self, session: AsyncSession):
        """Safely close a session and cleanup."""
        try:
            await session.close()
        except Exception as e:
            print(f"Error closing session: {e}")
        finally:
            self.active_sessions.discard(session)
            self.metrics.pop(id(session), None)
            self._session_hosts.pop(id(session), None)

    async def _is_session_healthy(self, session: AsyncSession) -> bool:
        """Perform comprehensive health check on session."""
        try:
            # Quick response test.
            # NOTE(review): this consumes a read from the session; confirm
            # that is acceptable for pooled sessions.
            start_time = time.time()
            await asyncio.wait_for(session.read(), timeout=2.0)
            response_time = time.time() - start_time
            # Check response time is reasonable
            return response_time <= 5.0
        except Exception:
            return False

    async def _health_check_all_sessions(self):
        """Perform health checks on all sessions in pool."""
        async with self._lock:
            for host, sessions in self.sessions.items():
                healthy_sessions = []
                for session in sessions:
                    if await self._is_session_healthy(session):
                        healthy_sessions.append(session)
                    else:
                        await self._close_session(session)
                self.sessions[host] = healthy_sessions

    async def _cleanup_idle_sessions(self):
        """Remove idle sessions to free resources."""
        current_time = time.time()
        idle_threshold = 300  # 5 minutes
        async with self._lock:
            for host, sessions in list(self.sessions.items()):
                active_sessions = []
                for session in sessions:
                    metrics = self.metrics.get(id(session))
                    if metrics and current_time - metrics.last_activity > idle_threshold:
                        # Idle too long: close unless currently checked out
                        if session not in self.active_sessions:
                            await self._close_session(session)
                        else:
                            active_sessions.append(session)
                    else:
                        active_sessions.append(session)
                self.sessions[host] = active_sessions

    async def _rebalance_session_pool(self):
        """Rebalance session distribution across hosts (not yet implemented)."""
        pass

    async def _select_best_host(self) -> str:
        """Select the host with the best metrics (currently: fewest active sessions)."""
        best_host = None
        best_score = float('inf')
        for host, sessions in self.sessions.items():
            # Simple scoring based on active session count
            active_count = len([s for s in sessions if s in self.active_sessions])
            if active_count < best_score:
                best_score = active_count
                best_host = host
        # Fall back to the first configured host when the pool is empty.
        return best_host or list(self.hosts.keys())[0]

    async def _update_metrics(self, session: AsyncSession, operation: str):
        """Update session metrics based on operation."""
        metrics = self.metrics.get(id(session))
        if metrics:
            if operation == "checkout":
                metrics.request_count += 1
            metrics.last_activity = time.time()

    async def get_statistics(self) -> Dict:
        """Get comprehensive session pool statistics."""
        async with self._lock:
            stats = {
                "total_sessions": sum(len(sessions) for sessions in self.sessions.values()),
                "active_sessions": len(self.active_sessions),
                "hosts": {}
            }
            for host, sessions in self.sessions.items():
                host_stats = {
                    "available_sessions": len(sessions),
                    "active_sessions": len([s for s in sessions if s in self.active_sessions])
                }
                # Add per-session metrics for this host
                for session in sessions:
                    if id(session) in self.metrics:
                        metrics = self.metrics[id(session)]
                        host_stats.setdefault("session_metrics", []).append({
                            "requests": metrics.request_count,
                            "errors": metrics.error_count,
                            "uptime": time.time() - metrics.created_at
                        })
                stats["hosts"][host] = host_stats
            return stats
# Context manager for automatic session lifecycle management
@asynccontextmanager
async def managed_session(manager: AdvancedSessionManager, host: str = None):
    """
    Check a session out of *manager*, yield it, and always check it back in.

    Usage:
        async with managed_session(session_manager, "mainframe.example.com") as session:
            await session.string("TEST")
            await session.key("Enter")
    """
    checked_out = None
    try:
        checked_out = await manager.get_session(host)
        yield checked_out
    finally:
        # Only return what was actually checked out; if get_session raised,
        # checked_out is still None and there is nothing to give back.
        if checked_out:
            await manager.return_session(checked_out)
# Example usage
async def session_management_example():
    """Demonstrate advanced session management patterns."""
    # Configure multiple hosts
    hosts = {
        "mainframe1.example.com": {"port": 23, "terminal": "IBM-3278-4"},
        "mainframe2.example.com": {"port": 23, "terminal": "IBM-3279-3"},
        "printerhost.example.com": {"port": 23, "terminal": "IBM-3281-1"}
    }
    manager = AdvancedSessionManager(hosts)
    await manager.start_monitoring()
    try:
        # Use managed session
        async with managed_session(manager, "mainframe1.example.com") as session:
            await session.string("HELLO")
            await session.key("Enter")
            # BUG FIX: session.read() is a coroutine and must be awaited
            # before the screen data can be converted to ASCII.
            response = session.ascii(await session.read())
            print(f"Response: {response}")
        # Get statistics
        stats = await manager.get_statistics()
        print(f"Session pool statistics: {stats}")
    finally:
        await manager.stop_monitoring()
Error Handling and Recovery Strategies¶
Comprehensive error handling and recovery patterns for production environments. For basic error handling examples, see the Examples section on error handling patterns.
import asyncio
import logging
import time
import traceback
from dataclasses import dataclass
from enum import Enum
from typing import Callable, Optional, Dict, Any

from pure3270 import AsyncSession
from pure3270.exceptions import TN3270Error, ConnectionError, TimeoutError
class ErrorSeverity(Enum):
    """Error severity levels for handling strategy determination."""

    LOW = "low"  # Non-critical, can continue
    MEDIUM = "medium"  # May require recovery
    HIGH = "high"  # Critical, needs immediate attention
    CRITICAL = "critical"  # Fatal, terminate session
class RecoveryStrategy(Enum):
    """Available recovery strategies."""

    RETRY = "retry"  # re-attempt the failed operation with backoff
    RECONNECT = "reconnect"  # tear down and re-establish the session
    SWITCH_HOST = "switch_host"  # fail over to a configured backup host
    ESCALATE = "escalate"  # alert/log for human intervention
    TERMINATE = "terminate"  # give up; caller sees the failure
@dataclass
class ErrorContext:
    """Context information for error handling decisions."""

    error_type: type  # class of the exception being handled
    error_message: str  # str(error) captured at failure time
    session_state: str  # e.g. "reading", "writing", "connecting"
    host: str  # host the session was talking to ("unknown" if not tracked)
    operation: str  # operation name, e.g. "read", "connect"
    timestamp: float  # epoch seconds when the error occurred
    retry_count: int = 0  # retries already attempted for this error
@dataclass
class RecoveryConfig:
    """Configuration for error recovery mechanisms."""

    max_retries: int = 3  # attempts for the RETRY strategy
    retry_delay: float = 1.0  # initial delay between retries, seconds
    backoff_multiplier: float = 2.0  # delay growth factor per retry
    max_reconnect_attempts: int = 5  # attempts for the RECONNECT strategy
    # Fixed annotation: the default is None (consumers treat a falsy value as
    # "no failover configured"), so the type is Optional[list], not list.
    failover_hosts: Optional[list] = None
    circuit_breaker_threshold: int = 5  # failures before the breaker opens
    circuit_breaker_timeout: float = 300.0  # seconds before a half-open probe
class CircuitBreaker:
    """Circuit breaker pattern for preventing cascade failures.

    States: "closed" (normal operation), "open" (rejecting calls after
    repeated failures), "half-open" (probing after the recovery timeout).
    """

    def __init__(self, failure_threshold: int, recovery_timeout: float):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = 0
        self.state = "closed"  # closed, open, half-open

    def can_execute(self) -> bool:
        """Check if operation can be executed."""
        if self.state == "open":
            elapsed = time.time() - self.last_failure_time
            if elapsed < self.recovery_timeout:
                return False
            # Recovery timeout elapsed: let a single probe call through.
            self.state = "half-open"
            return True
        # Both "closed" and "half-open" permit execution.
        return True

    def record_success(self):
        """Record successful operation and reset the breaker."""
        self.failure_count = 0
        self.state = "closed"

    def record_failure(self):
        """Record failed operation; open the breaker at the threshold."""
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = "open"
class AdvancedErrorHandler:
    """Advanced error handling with automatic recovery strategies.

    Classifies an exception into a severity, maps the severity onto a
    recovery strategy, and executes that strategy with per-host circuit
    breakers to prevent cascade failures.
    """

    def __init__(self, recovery_config: Optional[RecoveryConfig] = None):
        self.config = recovery_config or RecoveryConfig()
        self.circuit_breakers: Dict[str, CircuitBreaker] = {}  # keyed by host
        self.error_callbacks: Dict[type, Callable] = {}  # reserved for custom hooks
        # Default severity -> strategy policy.
        self.recovery_strategies: Dict[ErrorSeverity, RecoveryStrategy] = {
            ErrorSeverity.LOW: RecoveryStrategy.RETRY,
            ErrorSeverity.MEDIUM: RecoveryStrategy.RETRY,
            ErrorSeverity.HIGH: RecoveryStrategy.RECONNECT,
            ErrorSeverity.CRITICAL: RecoveryStrategy.TERMINATE
        }

    async def handle_error(self, error: Exception, context: ErrorContext) -> bool:
        """
        Handle error with appropriate recovery strategy.

        Returns:
            bool: True if error was handled successfully, False otherwise
        """
        severity = self._determine_severity(error, context)
        strategy = self.recovery_strategies.get(severity, RecoveryStrategy.TERMINATE)
        logging.error(f"Error: {error}, Severity: {severity}, Strategy: {strategy}")
        try:
            if strategy == RecoveryStrategy.RETRY:
                return await self._handle_with_retry(error, context, severity)
            elif strategy == RecoveryStrategy.RECONNECT:
                return await self._handle_with_reconnect(error, context, severity)
            elif strategy == RecoveryStrategy.SWITCH_HOST:
                return await self._handle_with_failover(error, context, severity)
            elif strategy == RecoveryStrategy.ESCALATE:
                await self._handle_with_escalation(error, context, severity)
                return False
            else:  # TERMINATE
                return False
        except Exception as recovery_error:
            # Recovery itself must never raise into the caller.
            logging.error(f"Recovery failed: {recovery_error}")
            return False

    def _determine_severity(self, error: Exception, context: ErrorContext) -> ErrorSeverity:
        """Determine error severity based on error type and context."""
        if isinstance(error, ConnectionError):
            return ErrorSeverity.HIGH
        elif isinstance(error, TimeoutError):
            return ErrorSeverity.MEDIUM
        elif isinstance(error, TN3270Error):
            # Protocol-level failures are treated as more serious.
            if "protocol" in str(error).lower():
                return ErrorSeverity.HIGH
            else:
                return ErrorSeverity.MEDIUM
        elif isinstance(error, KeyboardInterrupt):
            return ErrorSeverity.CRITICAL
        else:
            return ErrorSeverity.LOW

    async def _handle_with_retry(self, error: Exception, context: ErrorContext,
                                 severity: ErrorSeverity) -> bool:
        """Handle error with retry strategy (exponential backoff)."""
        max_retries = self.config.max_retries
        delay = self.config.retry_delay
        for attempt in range(max_retries):
            try:
                if await self._execute_with_circuit_breaker(context.host):
                    return True
                else:
                    # Circuit breaker is open for this host; stop retrying.
                    break
            except Exception as e:
                logging.warning(f"Retry attempt {attempt + 1} failed: {e}")
                if attempt < max_retries - 1:
                    await asyncio.sleep(delay)
                    delay *= self.config.backoff_multiplier
                else:
                    # Final retry failed, escalate to reconnection
                    return await self._handle_with_reconnect(error, context, severity)
        return False

    async def _handle_with_reconnect(self, error: Exception, context: ErrorContext,
                                     severity: ErrorSeverity) -> bool:
        """Handle error with reconnection strategy."""
        for attempt in range(self.config.max_reconnect_attempts):
            try:
                logging.info(f"Reconnection attempt {attempt + 1}")
                # Linear backoff before every reconnect after the first
                if attempt > 0:
                    await asyncio.sleep(self.config.retry_delay * attempt)
                # Create new session (implementation depends on your session management)
                new_session = await self._create_new_session(context.host)
                if new_session:
                    # Smoke-test the new connection before declaring success
                    await asyncio.wait_for(new_session.read(), timeout=10.0)
                    logging.info("Reconnection successful")
                    return True
            except Exception as e:
                logging.warning(f"Reconnection attempt {attempt + 1} failed: {e}")
        return False

    async def _handle_with_failover(self, error: Exception, context: ErrorContext,
                                    severity: ErrorSeverity) -> bool:
        """Handle error with host failover strategy."""
        if not self.config.failover_hosts:
            return False
        for failover_host in self.config.failover_hosts:
            try:
                logging.info(f"Attempting failover to {failover_host}")
                new_session = await self._create_new_session(failover_host)
                if new_session:
                    logging.info(f"Failover to {failover_host} successful")
                    return True
            except Exception as e:
                logging.warning(f"Failover to {failover_host} failed: {e}")
        return False

    async def _handle_with_escalation(self, error: Exception, context: ErrorContext,
                                      severity: ErrorSeverity):
        """Handle error with escalation (alerts, logging, etc.)."""
        logging.critical(f"Critical error requiring escalation: {error}")
        logging.critical(f"Context: {context}")
        # In real implementation, send alerts, create tickets, etc.
        await self._send_alert(error, context)

    async def _execute_with_circuit_breaker(self, host: str) -> bool:
        """Execute operation with circuit breaker protection."""
        if host not in self.circuit_breakers:
            self.circuit_breakers[host] = CircuitBreaker(
                self.config.circuit_breaker_threshold,
                self.config.circuit_breaker_timeout
            )
        breaker = self.circuit_breakers[host]
        if not breaker.can_execute():
            logging.warning(f"Circuit breaker open for {host}")
            return False
        try:
            # Execute operation here
            result = True  # Placeholder for actual operation
            breaker.record_success()
            return result
        except Exception:
            breaker.record_failure()
            # Bare raise re-raises the active exception with its traceback
            # intact (the original used `raise e`).
            raise

    async def _create_new_session(self, host: str) -> AsyncSession:
        """Create a new session for reconnection."""
        session = AsyncSession()
        await session.connect(host)
        return session

    async def _send_alert(self, error: Exception, context: ErrorContext):
        """Send alert for critical errors."""
        # Implementation would send alerts via email, Slack, etc.
        logging.critical(f"ALERT: {error} at {context.host} during {context.operation}")
class ResilientSessionWrapper:
    """Wrapper that adds resilience to existing sessions.

    Intercepts a fixed set of session operations and routes their failures
    through an AdvancedErrorHandler; when the handler reports a successful
    recovery, the operation is retried once.
    """

    def __init__(self, session: AsyncSession, error_handler: AdvancedErrorHandler):
        self.session = session
        self.error_handler = error_handler
        # Operations routed through the recovery machinery; all other
        # attribute access falls through to the wrapped session.
        self.operation_retry_map = {
            "read": self._safe_read,
            "write": self._safe_write,
            "connect": self._safe_connect,
            "key": self._safe_key,
            "string": self._safe_string
        }

    def __getattr__(self, name):
        """Proxy attribute access to wrapped session."""
        if name in self.operation_retry_map:
            return self.operation_retry_map[name]
        return getattr(self.session, name)

    async def _guarded_call(self, operation: str, session_state: str, host: str,
                            method, *args, **kwargs):
        """Run *method* with error handling; retry once if recovery succeeded.

        Shared implementation for the _safe_* wrappers, which previously
        duplicated this logic verbatim.
        """
        context = ErrorContext(
            error_type=type(None),
            error_message="",
            session_state=session_state,
            host=host,
            operation=operation,
            timestamp=time.time()
        )
        try:
            return await method(*args, **kwargs)
        except Exception as error:
            context.error_type = type(error)
            context.error_message = str(error)
            handled = await self.error_handler.handle_error(error, context)
            if handled:
                # Recovery succeeded: retry the operation once.
                return await method(*args, **kwargs)
            # Bare raise keeps the original traceback intact.
            raise

    async def _safe_read(self, *args, **kwargs):
        """Safe read operation with error handling."""
        # host should be populated from the session — TODO confirm
        return await self._guarded_call("read", "reading", "unknown",
                                        self.session.read, *args, **kwargs)

    async def _safe_write(self, data: bytes, *args, **kwargs):
        """Safe write operation with error handling."""
        return await self._guarded_call("write", "writing", "unknown",
                                        self.session.write, data, *args, **kwargs)

    async def _safe_connect(self, *args, **kwargs):
        """Safe connect operation with error handling."""
        host = args[0] if args else "unknown"
        return await self._guarded_call("connect", "connecting", host,
                                        self.session.connect, *args, **kwargs)

    async def _safe_key(self, key: str, *args, **kwargs):
        """Safe key operation with error handling."""
        return await self._guarded_call("key", "keying", "unknown",
                                        self.session.key, key, *args, **kwargs)

    async def _safe_string(self, text: str, *args, **kwargs):
        """Safe string operation with error handling."""
        return await self._guarded_call("string", "string_input", "unknown",
                                        self.session.string, text, *args, **kwargs)
# Example usage of advanced error handling
async def error_handling_example():
    """Demonstrate advanced error handling patterns."""
    # Configure error handling: retries with exponential backoff,
    # reconnection, and two failover hosts.
    recovery_config = RecoveryConfig(
        max_retries=3,
        retry_delay=1.0,
        backoff_multiplier=2.0,
        max_reconnect_attempts=5,
        failover_hosts=["backup1.example.com", "backup2.example.com"]
    )
    error_handler = AdvancedErrorHandler(recovery_config)
    # Create resilient session wrapper around a plain session
    session = AsyncSession()
    resilient_session = ResilientSessionWrapper(session, error_handler)
    try:
        # Connect with automatic error handling
        await resilient_session.connect('mainframe.example.com')
        # Perform operations with automatic recovery
        await resilient_session.string("LOGON")
        await resilient_session.key("Enter")
        # This will automatically retry and recover from errors
        screen_data = await resilient_session.read()
    except Exception as e:
        # Reached only after the handler has exhausted its strategies
        print(f"Operation failed after all recovery attempts: {e}")
    finally:
        # Close the underlying session directly, regardless of wrapper state
        await session.close()
Performance Optimization Techniques¶
Performance optimization strategies for high-throughput environments. For detailed performance examples and benchmarking, see the Examples section on performance optimization.
import asyncio
import time
import gc
from typing import Dict, List, Optional, Callable
from dataclasses import dataclass
from pure3270 import AsyncSession
from pure3270.emulation.screen_buffer import ScreenBuffer
# BUG FIX: the decorator was misspelled "@datlass", which raised NameError
# at import time; it must be @dataclass.
@dataclass
class PerformanceMetrics:
    """Performance metrics for optimization analysis."""

    operations_per_second: float  # throughput of completed operations
    average_response_time: float  # mean seconds per operation
    peak_memory_usage: int  # high-water memory mark in bytes
    throughput_bytes_per_second: int  # data-transfer throughput
    error_rate: float  # fraction of operations that failed
    connection_reuse_ratio: float  # pooled reuses / total checkouts
class ConnectionPool:
    """Optimized connection pool for high-performance scenarios."""

    def __init__(self, host: str, port: int, min_size: int = 5, max_size: int = 20):
        self.host = host
        self.port = port
        self.min_size = min_size  # target minimum pool size (not yet enforced)
        self.max_size = max_size  # hard cap on idle + active sessions
        self.idle_pool: List[AsyncSession] = []
        self.active_sessions: Dict[int, AsyncSession] = {}  # id(session) -> session
        self.in_use: set = set()
        self._lock = asyncio.Lock()
        self._metrics = PerformanceMetrics(0, 0, 0, 0, 0, 0)

    async def get_session(self) -> AsyncSession:
        """Get session from pool with optimization."""
        async with self._lock:
            # Try to reuse idle sessions, draining unhealthy ones
            while self.idle_pool:
                session = self.idle_pool.pop(0)
                if await self._quick_health_check(session):
                    self.active_sessions[id(session)] = session
                    return session
                # Session is unhealthy, close it and try the next one
                await self._close_session(session)
            # Create new session if pool not at max
            if len(self.active_sessions) + len(self.idle_pool) < self.max_size:
                session = await self._create_optimized_session()
                # BUG FIX: register checked-out sessions; previously they were
                # never added to active_sessions, so the max_size cap was not
                # enforced and return_session had nothing to pop.
                self.active_sessions[id(session)] = session
                return session
            # Pool at capacity, wait for available session
            return await self._wait_for_session()

    async def return_session(self, session: AsyncSession):
        """Return session to pool with optimization."""
        async with self._lock:
            session_id = id(session)
            self.active_sessions.pop(session_id, None)
            # Only return healthy sessions to pool
            if await self._quick_health_check(session):
                if len(self.idle_pool) < self.max_size:
                    self.idle_pool.append(session)
                else:
                    await self._close_session(session)
            else:
                await self._close_session(session)

    async def _create_optimized_session(self) -> AsyncSession:
        """Create session with performance optimizations."""
        session = AsyncSession()
        session._performance_mode = True
        # Pre-configure for performance
        session._buffer_pool = []
        session._optimization_level = "high"
        await session.connect(self.host, self.port)
        return session

    async def _quick_health_check(self, session: AsyncSession) -> bool:
        """Quick health check for performance.

        NOTE(review): this consumes a read from the session; confirm that is
        acceptable for pooled sessions.
        """
        try:
            # Short timeout keeps the check cheap
            await asyncio.wait_for(session.read(), timeout=1.0)
            return True
        except Exception:  # was a bare except, which also swallowed SystemExit
            return False

    async def _close_session(self, session: AsyncSession):
        """Clean up session resources."""
        try:
            await session.close()
        except Exception:  # was a bare except
            pass
        finally:
            # Force garbage collection
            gc.collect()

    async def _wait_for_session(self) -> AsyncSession:
        """Wait for available session from pool."""
        # Implementation would use asyncio.Queue or similar
        # For brevity, just create a new session and register it
        session = await self._create_optimized_session()
        self.active_sessions[id(session)] = session
        return session
class ScreenBufferOptimizer:
    """Optimizations for screen buffer operations."""

    def __init__(self):
        self.buffer_cache: Dict[str, ScreenBuffer] = {}
        self.parse_cache: Dict[str, dict] = {}
        self._cache_size_limit = 100

    def optimize_screen_reading(self, session: AsyncSession, operation: Callable):
        """Decorator to optimize screen reading operations."""
        async def wrapper(*args, **kwargs):
            # Serve a cached screen when one exists and is under 5 seconds old.
            cache_key = f"{session.host}:{hash(str(args))}"
            cached = self.buffer_cache.get(cache_key)
            if cached is not None and cached._timestamp and time.time() - cached._timestamp < 5.0:
                return cached
            outcome = await operation(*args, **kwargs)
            # Only screen-buffer-like results (anything exposing to_text)
            # get cached; other return values pass straight through.
            if hasattr(outcome, 'to_text'):
                self._cache_screen_buffer(cache_key, outcome)
            return outcome
        return wrapper

    def _cache_screen_buffer(self, key: str, buffer: ScreenBuffer):
        """Cache screen buffer, evicting the stalest entry when full."""
        if len(self.buffer_cache) >= self._cache_size_limit:
            stalest = min(self.buffer_cache,
                          key=lambda k: self.buffer_cache[k]._timestamp)
            del self.buffer_cache[stalest]
        buffer._timestamp = time.time()
        self.buffer_cache[key] = buffer
class BatchOperationOptimizer:
    """Optimizations for batch operations."""

    def __init__(self, batch_size: int = 10, timeout: float = 0.1):
        self.batch_size = batch_size  # operations per batch flush
        self.timeout = timeout  # seconds allowed for one batch to complete
        self.pending_operations = []  # started tasks awaiting the next flush

    async def submit_batch(self, operations: List[Callable]):
        """Submit operations for batch processing.

        Each operation is an argument-less coroutine function; it is started
        as a task immediately and flushed once batch_size tasks are pending.
        Any remainder is flushed at the end.
        """
        for operation in operations:
            task = asyncio.create_task(operation())
            self.pending_operations.append(task)
            if len(self.pending_operations) >= self.batch_size:
                await self._process_batch()
        # Process remaining operations
        if self.pending_operations:
            await self._process_batch()

    async def _process_batch(self):
        """Process pending operations in batch.

        NOTE: asyncio.wait_for cancels the gather (and all tasks in it) and
        raises TimeoutError if the batch exceeds self.timeout.
        """
        if not self.pending_operations:
            return
        try:
            # Wait for all operations with timeout
            completed = await asyncio.wait_for(
                asyncio.gather(*self.pending_operations, return_exceptions=True),
                timeout=self.timeout
            )
        finally:
            # BUG FIX: the list was only cleared on success, so a timeout left
            # cancelled tasks queued and re-awaited in the next batch.
            self.pending_operations.clear()
        # Return results
        return completed
class AsyncBatchedTN3270Client:
    """High-performance batched TN3270 client."""

    def __init__(self, host: str, port: int = 23):
        # port now defaults to the standard telnet port: the usage example
        # constructs this client with only a host, which was a TypeError
        # when port was required.
        self.host = host
        self.port = port
        self.pool = ConnectionPool(host, port, min_size=5, max_size=20)
        self.buffer_optimizer = ScreenBufferOptimizer()
        self.batch_optimizer = BatchOperationOptimizer()
        self.metrics = PerformanceMetrics(0, 0, 0, 0, 0, 0)

    async def batch_operations(self, operations: List[dict]) -> list:
        """
        Execute operations in batch for optimal performance.

        operations: List of dicts with 'type': 'key'|'string'|'read', 'data': value

        Returns the data produced by the 'read' operations, in order.
        (Return annotation changed from List[Any]: Any was never imported in
        this module, which raised NameError at class-definition time.)
        """
        session = await self.pool.get_session()
        try:
            # Group operations by type for optimization
            key_operations = []
            string_operations = []
            read_operations = []
            for op in operations:
                if op['type'] == 'key':
                    key_operations.append(op['data'])
                elif op['type'] == 'string':
                    string_operations.append(op['data'])
                elif op['type'] == 'read':
                    read_operations.append(op['data'])
            results = []
            # Batch key operations
            if key_operations:
                start_time = time.time()
                for key in key_operations:
                    await session.key(key)
                key_time = time.time() - start_time
                print(f"Executed {len(key_operations)} key operations in {key_time:.3f}s")
            # Batch string operations
            if string_operations:
                start_time = time.time()
                for text in string_operations:
                    await session.string(text)
                string_time = time.time() - start_time
                print(f"Executed {len(string_operations)} string operations in {string_time:.3f}s")
            # Batch read operations
            if read_operations:
                start_time = time.time()
                for _ in read_operations:
                    results.append(await session.read())
                read_time = time.time() - start_time
                print(f"Executed {len(read_operations)} read operations in {read_time:.3f}s")
            return results
        finally:
            await self.pool.return_session(session)

    async def high_throughput_scenario(self, num_operations: int = 1000):
        """Demonstrate high-throughput scenario."""
        start_time = time.time()
        # Create batch operations: mostly Tab presses with an Enter every
        # 10th operation and a read every 20th.
        operations = []
        for i in range(num_operations):
            operations.append({
                'type': 'key',
                'data': 'Enter' if i % 10 == 0 else 'Tab'  # was a placeholder-free f-string
            })
            if i % 20 == 0:
                operations.append({
                    'type': 'read',
                    'data': None
                })
        # Execute batch
        results = await self.batch_operations(operations)
        total_time = time.time() - start_time
        # Calculate metrics
        self.metrics.operations_per_second = num_operations / total_time
        self.metrics.average_response_time = total_time / num_operations
        print("Performance metrics:")
        print(f" Operations: {num_operations}")
        print(f" Total time: {total_time:.3f}s")
        print(f" Ops/second: {self.metrics.operations_per_second:.1f}")
        print(f" Avg response: {self.metrics.average_response_time:.4f}s")
class MemoryOptimizer:
    """Memory usage optimization for large-scale operations."""

    def __init__(self, max_memory_mb: int = 512):
        self.max_memory_bytes = max_memory_mb * 1024 * 1024
        self.session_memory_tracker = {}

    def track_session_memory(self, session: AsyncSession):
        """Track memory usage of a session."""
        import os
        import psutil  # third-party; imported lazily as in the original
        rss = psutil.Process(os.getpid()).memory_info().rss
        self.session_memory_tracker[id(session)] = rss
        if self._should_cleanup():
            self._cleanup_memory()

    def _should_cleanup(self) -> bool:
        """Check if memory cleanup is needed."""
        import os
        import psutil
        current = psutil.Process(os.getpid()).memory_info().rss
        return current > self.max_memory_bytes

    def _cleanup_memory(self):
        """Perform memory cleanup."""
        # Clear caches via the collector, then reset the tracker
        gc.collect()
        self.session_memory_tracker.clear()
# Example usage of performance optimizations
async def performance_optimization_example():
    """Demonstrate performance optimization techniques."""
    # Create optimized client
    client = AsyncBatchedTN3270Client('mainframe.example.com')
    # Configure memory optimization with a 256 MB budget
    memory_optimizer = MemoryOptimizer(max_memory_mb=256)
    try:
        # High-throughput scenario: 1000 batched operations
        await client.high_throughput_scenario(1000)
        # Memory tracking on a pooled session
        session = await client.pool.get_session()
        memory_optimizer.track_session_memory(session)
        await client.pool.return_session(session)
    finally:
        # Cleanup (calls the optimizer's private hook directly, as this is a demo)
        memory_optimizer._cleanup_memory()