Coverage for pure3270/patching/patching.py: 84%

128 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-11 20:54 +0000

1""" 

2Monkey patching utilities for pure3270 compatibility. 

3Applies patches to align with expected library versions. 

4""" 

5 

6import sys 

7import logging 

8from unittest.mock import patch as mock_patch 

9from typing import Optional, Dict, Any 

10import types 

11from contextlib import contextmanager 

12 

13from pure3270.patching.s3270_wrapper import Pure3270S3270Wrapper 

14 

15logger = logging.getLogger(__name__) 

16 

17 

18class Pure3270PatchError(Exception): 

19 """Raised on patching errors.""" 

20 

21 def __init__(self, *args, **kwargs): 

22 super().__init__(*args, **kwargs) 

23 logger.error(args[0] if args else str(self)) 

24 

25 

26class MonkeyPatchManager: 

27 """ 

28 Manager for applying monkey patches to modules. 

29 

30 Handles version checks and patch application for compatibility. 

31 """ 

32 

33 def __init__(self, patches: Optional[Dict[str, Any]] = None): 

34 """ 

35 Initialize the patch manager. 

36 

37 Args: 

38 patches: Dictionary of patches to apply (module_name: patch_functions). 

39 """ 

40 self.patches = patches or {} 

41 self.applied = set() 

42 self.originals = {} 

43 self.patched = {} 

44 self.selective_patches = {} 

45 logger.info("Initialized MonkeyPatchManager") 

46 

47 def apply(self, module_name: str) -> bool: 

48 """ 

49 Apply patches to a specific module. 

50 

51 Args: 

52 module_name: Name of the module to patch. 

53 

54 Returns: 

55 True if patches were applied successfully. 

56 

57 Raises: 

58 ValueError: If module not found or patching fails. 

59 """ 

60 if module_name in self.applied: 

61 return True 

62 module = sys.modules.get(module_name) 

63 if not module: 

64 raise ValueError(f"Module {module_name} not found.") 

65 for patch in self.patches.get(module_name, []): 

66 patch(module) 

67 self.applied.add(module_name) 

68 return True 

69 

70 def _store_original(self, key: str, value: Any) -> None: 

71 if key not in self.originals: 

72 self.originals[key] = value 

73 

74 def _apply_module_patch(self, module_name: str, replacement: Any) -> None: 

75 original = sys.modules.get(module_name) 

76 self._store_original(module_name, original) 

77 sys.modules[module_name] = replacement 

78 self.patched[module_name] = replacement 

79 logger.info(f"Patched module: {module_name} -> {replacement.__name__}") 

80 

81 def _apply_method_patch( 

82 self, 

83 obj: Any, 

84 method_name: str, 

85 new_method: Any, 

86 docstring: Optional[str] = None, 

87 ) -> None: 

88 original = getattr(obj, method_name, None) 

89 if isinstance(obj, type): 

90 class_name = obj.__name__ 

91 else: 

92 class_name = type(obj).__name__ 

93 key = f"{class_name}.{method_name}" 

94 self._store_original(key, original) 

95 setattr(obj, method_name, new_method) 

96 if docstring: 

97 new_method.__doc__ = docstring 

98 self.patched[key] = new_method 

99 logger.info(f"Added method: {class_name}.{method_name}") 

100 

101 def _check_version_compatibility( 

102 self, module: Any, expected_version: str = "0.3.0" 

103 ) -> bool: 

104 # Get the actual version of p3270 

105 from pure3270.emulation.ebcdic import get_p3270_version 

106 

107 actual_version = get_p3270_version() 

108 

109 if expected_version and actual_version != expected_version: 

110 logger.warning( 

111 f"Version mismatch: expected {expected_version}, got {actual_version}" 

112 ) 

113 logger.info("Graceful degradation: proceeding with partial compatibility") 

114 return False 

115 return True 

116 

117 def apply_patches( 

118 self, 

119 patch_sessions: bool = True, 

120 patch_commands: bool = True, 

121 strict_version: bool = False, 

122 expected_version: str = "0.1.6", 

123 ) -> None: 

124 try: 

125 import p3270 

126 

127 version_compatible = self._check_version_compatibility( 

128 p3270, expected_version 

129 ) 

130 if strict_version and not version_compatible: 

131 from pure3270.emulation.ebcdic import get_p3270_version 

132 

133 actual_version = get_p3270_version() 

134 raise Pure3270PatchError( 

135 f"Version incompatible: {actual_version or 'unknown'}" 

136 ) 

137 if not version_compatible and not strict_version: 

138 logger.info( 

139 "Graceful degradation: Version mismatch but continuing with patching" 

140 ) 

141 if patch_sessions: 

142 # Patch the S3270 class at module level 

143 original = getattr(p3270, "S3270", None) 

144 self._store_original("p3270.S3270", original) 

145 from pure3270.patching.s3270_wrapper import Pure3270S3270Wrapper 

146 

147 setattr(p3270, "S3270", Pure3270S3270Wrapper) 

148 logger.info("Patched Session") 

149 

150 # Also patch the S3270 reference in the p3270 module's global namespace 

151 # This ensures that any code that references S3270 directly gets our wrapper 

152 if hasattr(p3270, "p3270"): 

153 # Patch the S3270 in the actual p3270.p3270 module as well 

154 p3270_module = sys.modules.get("p3270.p3270") 

155 if p3270_module and hasattr(p3270_module, "S3270"): 

156 original_inner = getattr(p3270_module, "S3270", None) 

157 self._store_original("p3270.p3270.S3270", original_inner) 

158 setattr(p3270_module, "S3270", Pure3270S3270Wrapper) 

159 logger.info("Patched inner S3270 class") 

160 logger.info("Patches applied") 

161 except ImportError as e: 

162 logger.warning(f"p3270 not installed: {e}") 

163 if strict_version: 

164 raise Pure3270PatchError("p3270 required for strict mode") 

165 self._store_original("p3270.S3270", None) 

166 except Exception as e: 

167 logger.error(f"Patch error: {e}") 

168 raise Pure3270PatchError(str(e)) 

169 

170 def unpatch(self) -> None: 

171 for key, original in self.originals.items(): 

172 if original is None: 

173 continue # Skip None originals 

174 if "." in key: 

175 # Method patch 

176 obj_name, method = key.rsplit(".", 1) 

177 # Try to reconstruct the object from the name 

178 try: 

179 if obj_name in sys.modules: 

180 obj = sys.modules[obj_name] 

181 else: 

182 # For class methods, try to find the class 

183 parts = obj_name.split(".") 

184 obj = sys.modules.get(parts[0]) 

185 if obj and len(parts) > 1: 

186 for part in parts[1:]: 

187 obj = getattr(obj, part, None) 

188 if obj is None: 

189 break 

190 if obj is not None: 

191 setattr(obj, method, original) 

192 logger.debug(f"Restored method {key}") 

193 except Exception as e: 

194 logger.warning(f"Failed to restore method {key}: {e}") 

195 else: 

196 # Module patch 

197 sys.modules[key] = original 

198 logger.debug(f"Restored module {key}") 

199 self.originals.clear() 

200 self.patched.clear() 

201 logger.info("Unpatched all") 

202 

203 

204@contextmanager 

205def PatchContext(patches: Optional[Dict[str, Any]] = None): 

206 """ 

207 Context manager for temporary patching. 

208 

209 Args: 

210 patches: Patches to apply. 

211 

212 Yields: 

213 Manager instance. 

214 """ 

215 manager = MonkeyPatchManager(patches) 

216 try: 

217 yield manager 

218 finally: 

219 manager.unpatch() 

220 

221 

222def enable_replacement( 

223 patch_sessions: bool = True, 

224 patch_commands: bool = True, 

225 strict_version: bool = False, 

226 expected_version: str = "0.1.6", 

227) -> MonkeyPatchManager: 

228 """ 

229 Enable replacement patching with version check. 

230 

231 Args: 

232 patch_sessions: Whether to patch sessions. 

233 patch_commands: Whether to patch commands. 

234 strict_version: Whether to enforce strict version check. 

235 expected_version: The expected version for compatibility (default "0.1.6"). 

236 

237 Raises: 

238 ValueError: If version or replacement fails. 

239 """ 

240 manager = MonkeyPatchManager() 

241 manager.apply_patches( 

242 patch_sessions=patch_sessions, 

243 patch_commands=patch_commands, 

244 strict_version=strict_version, 

245 expected_version=expected_version, 

246 ) 

247 return manager 

248 

249 

250patch = enable_replacement