Coverage for pure3270/protocol/ssl_wrapper.py: 95%
43 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-11 20:54 +0000
"""SSL/TLS wrapper for secure TN3270 connections using stdlib ssl module."""

import logging
import ssl
from typing import Optional

# Module-level logger named after this module.
logger = logging.getLogger(__name__)
class SSLError(Exception):
    """Error during SSL operations."""
class SSLWrapper:
    """Layers SSL/TLS on top of asyncio connections using Python's ssl module.

    The wrapper builds a client-side ``ssl.SSLContext`` on demand and keeps
    it in ``self.context`` so later calls to :meth:`get_context` reuse it.
    """

    def __init__(
        self,
        verify: bool = True,
        cafile: Optional[str] = None,
        capath: Optional[str] = None,
    ):
        """
        Initialize the SSLWrapper.

        :param verify: Whether to verify the server's certificate.
        :param cafile: Path to CA certificate file.
        :param capath: Path to CA certificates directory.
        """
        self.verify = verify
        self.cafile = cafile
        self.capath = capath
        # Populated by create_context(); stays None until a context has been
        # fully configured.
        self.context: Optional[ssl.SSLContext] = None

    def create_context(self) -> ssl.SSLContext:
        """
        Create an SSLContext for secure connections.

        The context is assembled in a local variable and stored on
        ``self.context`` only after every configuration step succeeded, so a
        failure never leaves a partially configured context behind.

        When verification is enabled and neither ``cafile`` nor ``capath``
        was supplied, the platform's default CA certificates are loaded.

        :return: Configured SSLContext.
        :raises SSLError: If the ssl module reports an ``ssl.SSLError`` while
            the context is being built. Other exceptions are not caught here
            and propagate unchanged.
        """
        try:
            # Create SSL context for TLS client
            context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

            # Set verification options
            if self.verify:
                context.check_hostname = True
                context.verify_mode = ssl.CERT_REQUIRED
                if self.cafile:
                    context.load_verify_locations(cafile=self.cafile)
                if self.capath:
                    context.load_verify_locations(capath=self.capath)
                if not self.cafile and not self.capath:
                    # A bare SSLContext starts with an empty trust store;
                    # without this, CERT_REQUIRED would reject every server.
                    context.load_default_certs()
            else:
                # check_hostname has to be switched off first: the ssl module
                # refuses CERT_NONE while hostname checking is enabled.
                context.check_hostname = False
                context.verify_mode = ssl.CERT_NONE
                logger.warning("SSL verification disabled")

            # Set minimum TLS version to 1.2
            context.minimum_version = ssl.TLSVersion.TLSv1_2

            # Enable cipher suites for compatibility
            context.set_ciphers("HIGH:!aNULL:!MD5")
        except ssl.SSLError as e:
            logger.error(f"SSL context creation failed: {e}")
            # Chain explicitly so the original ssl error stays attached.
            raise SSLError(f"SSL context creation failed: {e}") from e

        # Publish the context only once it is completely configured.
        self.context = context
        logger.debug("SSLContext created successfully")
        return context

    def wrap_connection(self, telnet_connection):
        """
        Wrap an existing telnet connection with SSL (if asyncio doesn't handle natively).

        :param telnet_connection: The telnet connection object (e.g., from asyncio.open_connection).
        :return: The connection that was passed in, unchanged.
        Note: This is a stub; asyncio.open_connection handles SSL natively via ssl parameter.
        """
        # Since asyncio supports SSL natively, this method is for compatibility or custom wrapping.
        # For basics, make sure a context exists, log, and return the original.
        self.get_context()
        logger.info("Using native asyncio SSL support; no additional wrapping needed")
        return telnet_connection

    def get_context(self) -> ssl.SSLContext:
        """Get the SSLContext (create if not exists)."""
        if self.context is None:
            self.create_context()
        return self.context

    def decrypt(self, encrypted_data: bytes) -> bytes:
        """Stub for decrypting data (for testing); returns the input unchanged."""
        return encrypted_data
# Usage example (for docstrings):
# wrapper = SSLWrapper(verify=True)
# context = wrapper.create_context()
# handler = TN3270Handler(host="example.com", port=992, ssl_context=context)