Protocol Examples
This section provides comprehensive examples of TN3270/TN3270E protocol operations, from basic connections to advanced scenarios.
For basic protocol operations and EBCDIC handling examples, see the Examples section. For detailed protocol specifications, see the pure3270.protocol module documentation.
Basic TN3270 Connection Flow
Complete step-by-step TN3270 connection with detailed protocol operations:
import asyncio
from pure3270 import AsyncSession, setup_logging
async def basic_tn3270_connection():
    """Demonstrate a complete TN3270 connection flow.

    Returns:
        The connected ``AsyncSession``. It is deliberately left open, so the
        caller owns it and must ``await session.close()`` when finished.

    Raises:
        Any exception raised by the session calls. The session is closed
        before the exception propagates, so nothing leaks on failure.
    """
    # Setup detailed logging to see protocol exchange
    setup_logging(level="DEBUG", component="tn3270")

    # Step 1: Create session with specific terminal model.
    # Exactly one session is created. It is not wrapped in ``async with``
    # because this function hands the live session back to the caller.
    session = AsyncSession(terminal_type="IBM-3278-2")
    try:
        # Step 2: Connect to host
        # This initiates TN3270 negotiation
        await session.connect('mainframe.example.com', port=23)

        # Step 3: Read initial screen
        await session.read()
        print(f"Initial screen size: {session.screen_buffer.rows}x{session.screen_buffer.cols}")

        # Step 4: Send attention identifier (AID) to get initial screen
        await session.key('Enter')
        # read() is a coroutine: it has to be awaited before its bytes can
        # be handed to ascii().
        screen = session.ascii(await session.read())
        print(f"Screen after Enter:\n{screen}")

        # Step 5: Navigate through application
        await session.string("LOGON")  # Enter application name
        await session.key('Enter')     # Submit
    except BaseException:
        # Includes cancellation: never leave a half-used connection behind.
        await session.close()
        raise
    return session
TN3270E Negotiation Details
Detailed TN3270E protocol negotiation with structured fields:
import asyncio
from pure3270.protocol.negotiator import Negotiator
from pure3270.protocol.tn3270e_header import TN3270EHeader, TN3270EDataStream
async def tn3270e_negotiation_example():
    """Demonstrate TN3270E protocol negotiation.

    Connects with ``use_tn3270e=True`` and prints whether the session
    reports TN3270E as active or fell back to plain TN3270.
    """
    # Imported locally so this example runs on its own: its import list
    # does not bring AsyncSession into scope.
    from pure3270 import AsyncSession

    async with AsyncSession() as session:
        # Connect triggers automatic TN3270E negotiation
        await session.connect('host.example.com', port=23, use_tn3270e=True)

        # TN3270E negotiation involves:
        # 1. Device type negotiation (TN3270E command)
        # 2. Bind session (TN3270E command)
        # 3. LU assignment (TN3270E command)

        # Check if TN3270E is active.
        # NOTE(review): _tn3270e_active is a private attribute; getattr with
        # a default covers session objects that do not define it.
        if getattr(session, '_tn3270e_active', False):
            print("TN3270E negotiation successful")
            # TN3270E provides additional features:
            # - Structured fields (SF)
            # - SCS printer data streams
            # - Extended error handling

            # Send structured field query
            # In real implementation, this would create proper SF
            print("Structured field capabilities available")
        else:
            print("Standard TN3270 connection established")
Data Stream Parsing Examples
Comprehensive data stream parsing and manipulation:
import asyncio
from pure3270 import AsyncSession
from pure3270.protocol.data_stream import DataStreamParser
from pure3270.emulation.screen_buffer import ScreenBuffer
from pure3270.emulation.ebcdic import translate_ebcdic_to_ascii
def _decode_buffer_address(high, low):
    """Decode the two address bytes that follow an SBA or RA order.

    When the two high-order bits of the first byte are ``00`` the pair is a
    14-bit binary address; otherwise each byte carries six address bits in
    its low-order bits (12-bit coded form).
    """
    if (high & 0xC0) == 0x00:
        return ((high & 0x3F) << 8) | low
    return ((high & 0x3F) << 6) | (low & 0x3F)


async def data_stream_parsing_example():
    """Demonstrate parsing of a 3270 data stream into a screen buffer.

    Handles the SF, SBA, IC and RA orders and treats every other byte as
    character data. Other orders (SFE, SA, MF, PT, EUA, GE) are not
    interpreted by this simplified example.

    NOTE(review): assumes ``session.read()`` returns the bytes starting at
    the first order/data byte (command code, WCC and Telnet framing already
    removed) -- confirm against the session API.
    """
    # 3270 order codes
    ORDER_SBA = 0x11  # Set Buffer Address: 2 address bytes
    ORDER_IC = 0x13   # Insert Cursor: no operands
    ORDER_SF = 0x1D   # Start Field: 1 attribute byte
    ORDER_RA = 0x3C   # Repeat to Address: 2 address bytes + fill character

    rows, cols = 24, 80
    screen_size = rows * cols

    async with AsyncSession() as session:
        await session.connect('mainframe.example.com')

        # Get the raw data stream from host
        raw_data = await session.read()
        total = len(raw_data)
        print(f"Received {total} bytes of data stream")

        # Convert to screen buffer
        screen = ScreenBuffer(rows, cols)

        # ``index`` walks the byte stream; ``address`` is the screen position
        # the next byte lands on. They advance independently, so the stream
        # index must never be used as a screen position.
        index = 0
        address = 0
        while index < total:
            byte = raw_data[index]

            if byte == ORDER_SF:
                if index + 1 >= total:
                    break  # stream ends inside the order
                screen.add_field_attribute(
                    position=address,
                    attribute=raw_data[index + 1],
                )
                # The attribute byte itself occupies one screen position.
                address = (address + 1) % screen_size
                index += 2

            elif byte == ORDER_SBA:
                if index + 2 >= total:
                    break
                address = _decode_buffer_address(
                    raw_data[index + 1], raw_data[index + 2]
                ) % screen_size
                index += 3

            elif byte == ORDER_IC:
                # Cursor placement does not change buffer contents.
                index += 1

            elif byte == ORDER_RA:
                if index + 3 >= total:
                    break
                stop = _decode_buffer_address(
                    raw_data[index + 1], raw_data[index + 2]
                ) % screen_size
                fill = raw_data[index + 3]
                # Fill up to, not including, the stop address, wrapping at
                # the end of the screen; stop == address fills everything.
                span = (stop - address) % screen_size or screen_size
                for offset in range(span):
                    screen.write_char(fill, position=(address + offset) % screen_size)
                address = stop
                index += 4

            else:
                # Regular text data (EBCDIC encoded)
                screen.write_char(byte, position=address)
                address = (address + 1) % screen_size
                index += 1

        print(f"Parsed screen: {screen.to_text()}")
async def advanced_screen_buffer_manipulation():
    """Fill every input field on the current screen and submit the form.

    A field starts at its attribute byte and runs up to the next attribute
    byte (or the end of the buffer for the last one). Only unprotected
    fields are filled.

    NOTE(review): assumes ``screen_buffer.field_attributes`` maps buffer
    position -> raw 3270 attribute byte -- confirm against ScreenBuffer.
    """
    # In the 3270 attribute byte the 0x20 bit means *protected*; an input
    # field is one where that bit is clear.
    FA_PROTECTED = 0x20

    async with AsyncSession(terminal_type="IBM-3279-3") as session:
        await session.connect('mainframe.example.com')
        sb = session.screen_buffer
        buffer_size = len(sb.buffer)

        # Sort explicitly: field boundaries depend on position order, not on
        # the order in which attributes were inserted into the mapping.
        attr_positions = sorted(sb.field_attributes)

        # Find all unprotected fields (input fields)
        input_fields = []
        for idx, start in enumerate(attr_positions):
            attr_byte = sb.field_attributes[start]
            if attr_byte & FA_PROTECTED:
                continue
            if idx + 1 < len(attr_positions):
                end = attr_positions[idx + 1]
            else:
                end = buffer_size
            input_fields.append({'start': start, 'end': end, 'attributes': [attr_byte]})

        print(f"Found {len(input_fields)} input fields")

        # Navigate to each field and fill it
        for i, field in enumerate(input_fields):
            field_pos = field['start'] + 1  # Skip attribute byte
            field_length = field['end'] - field_pos
            if field_length <= 0:
                # Two adjacent attribute bytes: nothing to type into.
                continue

            # Set cursor to field position
            row, col = sb.position_to_coords(field_pos)
            sb.set_position(row, col)

            # Generate test data for field
            test_data = f"INPUT{i+1:03d}"

            # Clear field first
            for j in range(field_length):
                sb.write_char(0x40, position=field_pos + j)  # Space in EBCDIC

            # Enter data, truncated to what fits in the field
            for j, char in enumerate(test_data[:field_length]):
                ebcdic_char = session.ebcdic(char)
                sb.write_char(ebcdic_char[0], position=field_pos + j)

        # Submit form
        await session.key('Enter')

        # Read response; read() is a coroutine and must be awaited before
        # its result is converted.
        response = session.ascii(await session.read())
        print(f"Form submission result: {response}")
Field Attribute Handling Examples
Advanced field attribute handling and manipulation:
import asyncio
from pure3270 import AsyncSession
from pure3270.emulation.field_attributes import FieldAttribute
async def field_attribute_examples():
    """Build a small login form from field attributes and validate input.

    Writes attribute bytes and labels into the session's screen buffer, runs
    two validation checks (one that must pass, one that must be rejected)
    and then submits the form.

    NOTE(review): assumes ``screen_buffer.field_attributes`` stores raw 3270
    attribute bytes keyed by buffer position -- confirm against ScreenBuffer.
    """
    async with AsyncSession() as session:
        await session.connect('mainframe.example.com')
        sb = session.screen_buffer

        # Field attribute bits as laid out in the 3270 attribute byte
        FA_PROTECTED = 0x20             # Field is read-only
        FA_UNPROTECTED = 0x00           # Protected bit clear: field can be modified
        FA_NUMERIC = 0x10               # Numeric-only input
        FA_INTENSITY_NORMAL = 0x00      # Display bits (mask 0x0C): normal
        FA_INTENSITY_HIGH = 0x08        # Display bits: intensified
        FA_INTENSITY_NONDISPLAY = 0x0C  # Display bits: non-display

        # Labels are written this many positions before the attribute byte.
        LABEL_OFFSET = 15

        screen_size = sb.rows * sb.cols

        # Create a login form with various field types
        fields = [
            {'pos': 100, 'type': FA_UNPROTECTED | FA_INTENSITY_NORMAL, 'label': 'Username'},
            {'pos': 150, 'type': FA_UNPROTECTED | FA_NUMERIC, 'label': 'Account'},
            # Passwords must not echo, hence the non-display bits.
            {'pos': 200, 'type': FA_UNPROTECTED | FA_INTENSITY_NONDISPLAY, 'label': 'Password'},
            {'pos': 250, 'type': FA_PROTECTED | FA_INTENSITY_HIGH, 'label': 'Message'},
        ]

        for field in fields:
            # Set field attribute byte
            sb.field_attributes[field['pos']] = field['type']

            # Add field label if specified
            label = field.get('label')
            if not label:
                continue
            label_start = field['pos'] - LABEL_OFFSET
            # Stay on the screen and stop before the attribute byte so a long
            # label can never overwrite it.
            label_limit = min(field['pos'], screen_size)
            for i, byte in enumerate(session.ebcdic(label)):
                target = label_start + i
                if 0 <= target < label_limit:
                    sb.write_char(byte, position=target)

        # Demonstrate field validation
        def validate_field_data(field_pos, expected_length, data):
            """Validate data for a specific field.

            Returns True when ``data`` is acceptable; raises ValueError when
            the position has no attribute or is protected, when a numeric
            field receives a non-digit, or when the data is too long.
            """
            field_attr = sb.field_attributes.get(field_pos)

            # No attribute at this position, or protected bit set
            if field_attr is None or field_attr & FA_PROTECTED:
                raise ValueError("Field is read-only")

            # Check for numeric-only fields
            if field_attr & FA_NUMERIC and not all(ch.isdigit() for ch in data):
                raise ValueError("Numeric field contains non-digit")

            # Check length
            if len(data) > expected_length:
                raise ValueError(f"Data too long for field (max {expected_length})")

            return True

        # Test field validation. Each case is checked on its own so the
        # expected rejection cannot hide the result of the expected success.
        cases = [
            ("1234567890", True),   # Should succeed
            ("ABC123", False),      # Should fail - contains letters
        ]
        all_as_expected = True
        for data, should_pass in cases:
            try:
                validate_field_data(150, 10, data)
                passed = True
            except ValueError as e:
                passed = False
                print(f"Validation error: {e}")
            if passed != should_pass:
                all_as_expected = False
        if all_as_expected:
            print("Field validation working correctly")

        # Process form submission
        await session.key('Enter')
Screen Buffer Reconstruction Examples
Advanced screen buffer reconstruction from protocol data:
import asyncio
from pure3270 import AsyncSession
from pure3270.emulation.screen_buffer import ScreenBuffer
from pure3270.emulation.addressing import parse_address
def _governing_attribute(field_attributes, address, size):
    """Return the attribute byte of the field that contains ``address``.

    Scans backwards (wrapping around the screen) for the nearest attribute
    position. Returns None when the screen has no field attributes at all.
    """
    for step in range(size):
        candidate = (address - step) % size
        if candidate in field_attributes:
            return field_attributes[candidate]
    return None


async def screen_reconstruction_example():
    """Reconstruct a screen from a raw 3270 data stream and print it.

    Interprets the SF, SBA, IC, RA and EUA orders; every other byte is
    stored as character data at the current buffer address.

    Returns:
        The ScreenBuffer that was rebuilt from the stream.

    NOTE(review): assumes ``session.read()`` returns the bytes starting at
    the first order/data byte (command code and WCC already removed) and
    that ``field_attributes`` is a mutable mapping of position -> attribute
    byte -- confirm against the library.
    """
    # Imported locally so this example runs on its own: its import list
    # does not bring the EBCDIC translation helper into scope.
    from pure3270.emulation.ebcdic import translate_ebcdic_to_ascii

    # 3270 order codes
    ORDER_SBA = 0x11  # Set Buffer Address
    ORDER_EUA = 0x12  # Erase Unprotected to Address
    ORDER_IC = 0x13   # Insert Cursor
    ORDER_SF = 0x1D   # Start Field
    ORDER_RA = 0x3C   # Repeat to Address
    # Number of operand bytes that follow each order code
    ORDER_OPERANDS = {ORDER_SBA: 2, ORDER_EUA: 2, ORDER_IC: 0, ORDER_SF: 1, ORDER_RA: 3}
    FA_PROTECTED = 0x20

    async with AsyncSession(terminal_type="IBM-3278-4") as session:
        await session.connect('mainframe.example.com')

        # Capture raw network data
        raw_stream = await session.read()
        total = len(raw_stream)

        # Create screen buffer for reconstruction
        screen = ScreenBuffer(session.screen_buffer.rows,
                              session.screen_buffer.cols)
        size = len(screen.buffer)
        attrs = screen.field_attributes

        # Parse orders and reconstruct screen
        i = 0
        current_address = 0  # Start at screen position 0
        while i < total and size:
            byte = raw_stream[i]
            operand_count = ORDER_OPERANDS.get(byte)

            if operand_count is None:
                # Regular character data; it replaces any attribute that
                # used to live at this position.
                attrs.pop(current_address, None)
                screen.buffer[current_address] = byte
                current_address = (current_address + 1) % size
                i += 1
                continue

            if i + operand_count >= total:
                # Stream ends in the middle of an order: stop instead of
                # indexing past the end.
                break

            if byte == ORDER_SF:
                attrs[current_address] = raw_stream[i + 1]
                # Next position after field attribute
                current_address = (current_address + 1) % size

            elif byte == ORDER_SBA:
                current_address = parse_address(raw_stream[i + 1], raw_stream[i + 2]) % size

            elif byte == ORDER_RA:
                # Operand order is stop address first, fill character last.
                stop = parse_address(raw_stream[i + 1], raw_stream[i + 2]) % size
                fill = raw_stream[i + 3]
                # Fill up to, not including, the stop address, wrapping at
                # the end of the screen; stop == current fills everything.
                span = (stop - current_address) % size or size
                for offset in range(span):
                    pos = (current_address + offset) % size
                    attrs.pop(pos, None)
                    screen.buffer[pos] = fill
                current_address = stop

            elif byte == ORDER_EUA:
                stop = parse_address(raw_stream[i + 1], raw_stream[i + 2]) % size
                span = (stop - current_address) % size or size
                governing = _governing_attribute(attrs, current_address, size)
                for offset in range(span):
                    pos = (current_address + offset) % size
                    if pos in attrs:
                        # Attribute positions are kept; they only change
                        # which protection applies from here on.
                        governing = attrs[pos]
                        continue
                    if governing is None or not (governing & FA_PROTECTED):
                        screen.buffer[pos] = 0x00  # erased positions become nulls
                current_address = stop

            # ORDER_IC only moves the cursor; buffer contents are unchanged.

            i += 1 + operand_count

        # Convert to readable format
        screen_bytes = screen.to_bytes()
        print(f"Reconstructed screen ({screen.rows}x{screen.cols}):")
        print("=" * 50)
        for row in range(screen.rows):
            line_start = row * screen.cols
            line_data = screen_bytes[line_start:line_start + screen.cols]
            print(f"{row+1:2d}: {translate_ebcdic_to_ascii(line_data)}")

        return screen
async def interactive_screen_update():
    """Wait for the host to send a screen update and report it.

    Reads the initial screen, then waits up to 30 seconds per read for new
    data. Stops after the first non-empty update or the first timeout.
    """
    async with AsyncSession() as session:
        await session.connect('mainframe.example.com')

        # Initial screen; read() is a coroutine and must be awaited before
        # its bytes are converted.
        initial_screen = session.ascii(await session.read())
        print("Initial screen received")

        # Wait for screen changes
        while True:
            try:
                # Read with timeout to avoid blocking indefinitely. Only the
                # read sits in the try so other errors are not mistaken for
                # a timeout.
                new_data = await asyncio.wait_for(session.read(), timeout=30.0)
            except asyncio.TimeoutError:
                print("No screen updates received in 30 seconds")
                break

            if not new_data:
                continue  # nothing received yet, keep waiting

            # Compare with previous screen
            updated_screen = session.ascii(new_data)
            print("Screen update received")
            if updated_screen != initial_screen:
                print("Screen content changed")
            break  # Exit for demo purposes
Protocol Error Handling Examples
Comprehensive protocol error handling and recovery:
import asyncio
import logging
from pure3270 import AsyncSession, setup_logging
from pure3270.exceptions import TN3270Error, ConnectionError, ProtocolError
async def protocol_error_handling_example():
    """Demonstrate protocol error handling: retries, timeouts and recovery.

    Connects with retries, reads a screen with a timeout, sends one input
    through a recovery-aware helper and always closes the session. Failures
    are logged on the 'tn3270.protocol' logger rather than propagated.
    """
    # Setup detailed protocol logging
    logger = logging.getLogger('tn3270.protocol')
    logger.setLevel(logging.DEBUG)

    host = 'mainframe.example.com'
    port = 23

    async def close_quietly(candidate):
        """Best-effort close for failure paths; a close error is only logged."""
        try:
            await candidate.close()
        except Exception as close_error:
            logger.debug(f"Ignoring error while closing session: {close_error}")

    async def robust_connection():
        """Connect with retries; return the connected session or raise ConnectionError."""
        # Retry configuration
        max_retries = 3
        retry_delay = 5.0
        timeout = 30.0

        for attempt in range(max_retries + 1):
            candidate = AsyncSession(terminal_type="IBM-3278-2")
            try:
                # Connection timeout
                await asyncio.wait_for(
                    candidate.connect(host, port=port),
                    timeout=timeout,
                )
                # Verify connection with test read
                await asyncio.wait_for(candidate.read(), timeout=10.0)
            except asyncio.TimeoutError:
                print(f"Connection timeout on attempt {attempt + 1}")
            except ConnectionError as e:
                print(f"Connection error: {e}")
            except ProtocolError as e:
                print(f"Protocol error: {e}")
            except Exception as e:
                print(f"Unexpected error: {type(e).__name__}: {e}")
            else:
                print(f"Connection successful on attempt {attempt + 1}")
                return candidate

            # The attempt failed: release its session before retrying so
            # failed attempts do not pile up open connections.
            await close_quietly(candidate)
            if attempt < max_retries:
                print(f"Retrying in {retry_delay} seconds...")
                await asyncio.sleep(retry_delay)

        raise ConnectionError("All connection attempts failed")

    # Bound before the try so the finally clause can test it even when
    # connecting fails.
    session = None

    # Handle different error scenarios
    try:
        session = await robust_connection()

        # Screen read with error handling
        try:
            screen_data = await asyncio.wait_for(session.read(), timeout=15.0)
            print(f"Screen data received: {len(screen_data)} bytes")
        except asyncio.TimeoutError:
            print("Screen read timeout - host may be unavailable")
            # Attempt reconnection with a fresh session
            await close_quietly(session)
            session = await robust_connection()
        except Exception as e:
            logger.error(f"Screen read error: {e}")
            # Attempt recovery
            try:
                await session.send(session.ebcdic('RESET'))
                screen_data = await session.read()
                logger.info("Recovery successful")
            except Exception as recovery_error:
                logger.error(f"Recovery failed: {recovery_error}")
                raise

        # Keyboard input with validation
        async def safe_input(prompt_text):
            """Send ``prompt_text`` and return the resulting screen text.

            On any failure (including an "ERROR" marker on the screen) a
            PA1 + CLEAR recovery sequence is tried and the screen after it
            is returned; if the recovery fails too, that error is raised.
            """
            try:
                # Clear any pending input
                await session.key('CLEAR')
                # Enter prompt text
                await session.string(prompt_text)
                # Submit
                await session.key('ENTER')
                # Validate response
                screen = session.ascii(await session.read())
                # Check for error messages
                if "ERROR" in screen.upper():
                    raise ValueError("Input resulted in error")
                return screen
            except Exception as e:
                logger.error(f"Input error: {e}")
                # Try recovery sequence
                try:
                    await session.key('PA1')  # Attention
                    await asyncio.sleep(1)
                    await session.key('CLEAR')  # Clear screen
                    await asyncio.sleep(1)
                    return session.ascii(await session.read())
                except Exception as recovery_error:
                    logger.error(f"Recovery failed: {recovery_error}")
                    raise

        # Use safe input function
        response = await safe_input("TEST INPUT")
        print(f"Response: {response}")

    except Exception as e:
        logger.error(f"Session failed: {e}")
    finally:
        if session is not None:
            await close_quietly(session)
Connection Recovery Examples
Network resilience and connection recovery patterns:
import asyncio
import time
from contextlib import asynccontextmanager
from pure3270 import AsyncSession
from pure3270.exceptions import ConnectionError
@asynccontextmanager
async def resilient_session(host, port=23, max_retries=5, retry_delay=2.0):
    """Context manager for resilient TN3270 sessions.

    Tries to connect up to ``max_retries + 1`` times, multiplying the wait
    between attempts by 1.5 each time, and yields the connected session.
    The session is closed when the ``async with`` block exits.

    Args:
        host: Host name to connect to.
        port: TCP port, 23 by default.
        max_retries: Number of retries after the first attempt.
        retry_delay: Initial delay in seconds between attempts.

    Raises:
        ConnectionError: when every attempt failed; the last failure is
            chained as the cause.
    """
    session = None
    last_error = None

    # Connecting and yielding are kept apart on purpose. The generator must
    # yield exactly once, and an exception raised inside the caller's
    # ``async with`` body must not be mistaken for a connection failure.
    for attempt in range(max_retries + 1):
        candidate = AsyncSession()
        try:
            await candidate.connect(host, port, ssl_context=None)
            # Test connection with health check
            await asyncio.wait_for(candidate.read(), timeout=10.0)
        except Exception as e:
            last_error = e
            print(f"Connection attempt {attempt + 1} failed: {e}")
            try:
                await candidate.close()
            except Exception as close_error:
                # Best effort: the attempt is already being abandoned.
                print(f"Ignoring error while closing failed session: {close_error}")
            if attempt < max_retries:
                print(f"Waiting {retry_delay} seconds before retry...")
                await asyncio.sleep(retry_delay)
                retry_delay *= 1.5  # Exponential backoff
        else:
            session = candidate
            print(f"Session established after {attempt + 1} attempts")
            break

    if session is None:
        raise ConnectionError(
            f"Failed to establish session after {max_retries + 1} attempts: {last_error}"
        ) from last_error

    try:
        yield session
    finally:
        await session.close()
class SessionManager:
    """Session pool with periodic health monitoring.

    Sessions are created by ``start()``, borrowed with ``get_session()``,
    handed back with ``return_session()`` and closed by ``stop()``. All pool
    mutations happen while holding one asyncio lock.

    NOTE(review): the health probe is a ``read()`` with a timeout, so any
    data that arrives during a probe is consumed and discarded.
    """

    def __init__(self, host, port=23, pool_size=3, health_check_interval=60):
        self.host = host
        self.port = port
        self.pool_size = pool_size
        self.health_check_interval = health_check_interval
        self.sessions = []
        self._health_task = None
        self._lock = asyncio.Lock()

    @staticmethod
    async def _probe(session, timeout):
        """Return None when the session looks usable, else the error seen.

        A timeout counts as healthy: it only means the host had nothing to
        send, which is the normal state of an idle session.
        """
        try:
            await asyncio.wait_for(session.read(), timeout=timeout)
        except asyncio.TimeoutError:
            return None
        except Exception as error:
            # Exception, not a bare except: task cancellation must still
            # propagate so the health task can be stopped.
            return error
        return None

    @staticmethod
    async def _discard(session):
        """Close a session that is leaving the pool; close errors are reported only."""
        try:
            await session.close()
        except Exception as error:
            print(f"Ignoring error while closing session: {error}")

    async def start(self):
        """Initialize session pool and start the health monitor."""
        async with self._lock:
            for _ in range(self.pool_size):
                session = None
                try:
                    session = AsyncSession()
                    await session.connect(self.host, self.port)
                except Exception as e:
                    print(f"Failed to create pool session: {e}")
                    if session is not None:
                        await self._discard(session)
                else:
                    self.sessions.append(session)
                    print(f"Pool session created ({len(self.sessions)}/{self.pool_size})")

        # Start health monitoring (only once, even if start() is repeated)
        if self._health_task is None:
            self._health_task = asyncio.create_task(self._health_monitor())

    async def get_session(self):
        """Get an available session from pool.

        Dead sessions are dropped first. When nothing usable is left a new
        session is connected; errors from that connect propagate.
        """
        async with self._lock:
            # Remove any dead sessions
            healthy_sessions = []
            for session in self.sessions:
                # Quick health check
                if await self._probe(session, 2.0) is None:
                    healthy_sessions.append(session)
                else:
                    await self._discard(session)
            self.sessions = healthy_sessions

            # Create new session if pool is depleted
            if not self.sessions:
                print("Pool depleted, creating new session...")
                session = AsyncSession()
                try:
                    await session.connect(self.host, self.port)
                except BaseException:
                    await self._discard(session)
                    raise
                return session

            # Return session and remove from pool temporarily
            return self.sessions.pop(0)

    async def return_session(self, session):
        """Return session to pool; a session that fails validation is closed instead."""
        async with self._lock:
            # Quick validation
            error = await self._probe(session, 1.0)
            if error is None:
                self.sessions.append(session)
                print(f"Session returned to pool ({len(self.sessions)} active)")
            else:
                print(f"Returned session failed validation: {error}")
                await self._discard(session)

    async def _health_monitor(self):
        """Background task: probe pooled sessions every ``health_check_interval`` seconds."""
        while True:
            await asyncio.sleep(self.health_check_interval)
            async with self._lock:
                survivors = []
                for session in self.sessions:
                    if await self._probe(session, 5.0) is None:
                        survivors.append(session)
                    else:
                        print("Health check failed, removing session from pool")
                        await self._discard(session)
                self.sessions = survivors

    async def stop(self):
        """Shutdown session pool: stop the monitor, then close every pooled session."""
        task, self._health_task = self._health_task, None
        if task is not None:
            task.cancel()
            # Wait until the task has really finished so it cannot touch the
            # pool while it is being emptied below. gather() absorbs the
            # task's own cancellation.
            await asyncio.gather(task, return_exceptions=True)

        async with self._lock:
            for session in self.sessions:
                await self._discard(session)
            self.sessions.clear()
async def session_manager_example():
    """Demonstrate session management patterns.

    First uses a single resilient session, then borrows a session from a
    SessionManager pool and hands it back.
    """
    # Create resilient connection
    async with resilient_session('mainframe.example.com') as session:
        # Use session normally
        await session.key('Enter')
        # read() is a coroutine; the response is consumed and not needed here.
        await session.read()
        print("Resilient session working")

    # Use session manager
    manager = SessionManager('mainframe.example.com')
    await manager.start()
    try:
        # Get session from pool
        session = await manager.get_session()
        try:
            # Use session
            await session.string("TEST")
            await session.key('Enter')
            response = session.ascii(await session.read())
            print(f"Pool session response: {response}")
        finally:
            # Return session to pool even when using it failed, so the
            # manager can validate or close it.
            await manager.return_session(session)
    finally:
        await manager.stop()
async def network_interruption_recovery():
    """Monitor a session and reconnect after timeouts or connection loss.

    Reads in a loop with a 10 second timeout. After 5 minutes without data
    the session is replaced. On ConnectionError up to five reconnects are
    tried, doubling the delay after each failure; if all fail the original
    error is re-raised. The current session is always closed on exit.
    """
    host = 'mainframe.example.com'

    async def close_quietly(candidate):
        """Best-effort close; an error while closing is only reported."""
        try:
            await candidate.close()
        except Exception as close_error:
            print(f"Ignoring error while closing session: {close_error}")

    async def open_session():
        """Create and connect a new session, closing it again if connecting fails."""
        fresh = AsyncSession()
        try:
            await fresh.connect(host)
        except BaseException:
            await close_quietly(fresh)
            raise
        return fresh

    session = await open_session()

    # Monitor network state. A monotonic clock is used because elapsed time
    # must not jump when the wall clock is adjusted.
    last_activity = time.monotonic()
    timeout_threshold = 300  # 5 minutes

    try:
        while True:
            try:
                # Read with short timeout
                data = await asyncio.wait_for(session.read(), timeout=10.0)
                if data:
                    last_activity = time.monotonic()
                    print(f"Received {len(data)} bytes")

            except asyncio.TimeoutError:
                # Check if session is stale
                if time.monotonic() - last_activity > timeout_threshold:
                    print("Session timeout - reconnecting")
                    await close_quietly(session)
                    session = await open_session()
                    last_activity = time.monotonic()

            except ConnectionError as e:
                print(f"Connection lost: {e}")
                print("Attempting reconnection...")
                # Release the broken connection before replacing it.
                await close_quietly(session)

                # Exponential backoff reconnection
                delay = 1.0
                for _ in range(5):
                    await asyncio.sleep(delay)
                    try:
                        session = await open_session()
                    except Exception:
                        # Exception, not a bare except: cancellation and
                        # Ctrl-C must still get through.
                        delay *= 2
                    else:
                        print("Reconnection successful")
                        last_activity = time.monotonic()
                        break
                else:
                    # Every attempt failed: surface the original error.
                    raise

            # Small delay between iterations
            await asyncio.sleep(1)

    except KeyboardInterrupt:
        print("Monitoring stopped by user")
    finally:
        await close_quietly(session)