Coverage for pure3270/protocol/ssl_wrapper.py: 95%

43 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-11 20:54 +0000

1"""SSL/TLS wrapper for secure TN3270 connections using stdlib ssl module.""" 

2 

3import ssl 

4import logging 

5from typing import Optional 

6 

7logger = logging.getLogger(__name__) 

8 

9 

10class SSLError(Exception): 

11 """Error during SSL operations.""" 

12 

13 pass 

14 

15 

16class SSLWrapper: 

17 """Layers SSL/TLS on top of asyncio connections using Python's ssl module.""" 

18 

19 """Layers SSL/TLS on top of asyncio connections using Python's ssl module.""" 

20 

21 def __init__( 

22 self, 

23 verify: bool = True, 

24 cafile: Optional[str] = None, 

25 capath: Optional[str] = None, 

26 ): 

27 """ 

28 Initialize the SSLWrapper. 

29 

30 :param verify: Whether to verify the server's certificate. 

31 :param cafile: Path to CA certificate file. 

32 :param capath: Path to CA certificates directory. 

33 """ 

34 self.verify = verify 

35 self.cafile = cafile 

36 self.capath = capath 

37 self.context: Optional[ssl.SSLContext] = None 

38 

39 def create_context(self) -> ssl.SSLContext: 

40 """ 

41 Create an SSLContext for secure connections. 

42 

43 :return: Configured SSLContext. 

44 :raises SSLError: If context creation fails. 

45 """ 

46 try: 

47 # Create SSL context for TLS client 

48 self.context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 

49 

50 # Set verification options 

51 if self.verify: 

52 self.context.check_hostname = True 

53 self.context.verify_mode = ssl.CERT_REQUIRED 

54 if self.cafile: 

55 self.context.load_verify_locations(cafile=self.cafile) 

56 if self.capath: 

57 self.context.load_verify_locations(capath=self.capath) 

58 else: 

59 self.context.check_hostname = False 

60 self.context.verify_mode = ssl.CERT_NONE 

61 logger.warning("SSL verification disabled") 

62 

63 # Set minimum TLS version to 1.2 

64 self.context.minimum_version = ssl.TLSVersion.TLSv1_2 

65 

66 # Enable cipher suites for compatibility 

67 self.context.set_ciphers("HIGH:!aNULL:!MD5") 

68 

69 logger.debug("SSLContext created successfully") 

70 return self.context 

71 

72 except ssl.SSLError as e: 

73 logger.error(f"SSL context creation failed: {e}") 

74 raise SSLError(f"SSL context creation failed: {e}") 

75 

76 def wrap_connection(self, telnet_connection): 

77 """ 

78 Wrap an existing telnet connection with SSL (if asyncio doesn't handle natively). 

79 

80 :param telnet_connection: The telnet connection object (e.g., from asyncio.open_connection). 

81 :return: Wrapped connection. 

82 Note: This is a stub; asyncio.open_connection handles SSL natively via ssl parameter. 

83 """ 

84 # Since asyncio supports SSL natively, this method is for compatibility or custom wrapping. 

85 # For basics, log and return original. 

86 self.get_context() # Ensure context is created 

87 logger.info("Using native asyncio SSL support; no additional wrapping needed") 

88 return telnet_connection 

89 

90 def get_context(self) -> ssl.SSLContext: 

91 """Get the SSLContext (create if not exists).""" 

92 if self.context is None: 

93 self.create_context() 

94 return self.context 

95 

96 def decrypt(self, encrypted_data: bytes) -> bytes: 

97 """Stub for decrypting data (for testing).""" 

98 return encrypted_data 

99 

100 

101# Usage example (for docstrings): 

102# wrapper = SSLWrapper(verify=True) 

103# context = wrapper.create_context() 

104# handler = TN3270Handler(host="example.com", port=992, ssl_context=context)