Coverage for pure3270/patching/patching.py: 84%
128 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-11 20:54 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-11 20:54 +0000
1"""
2Monkey patching utilities for pure3270 compatibility.
3Applies patches to align with expected library versions.
4"""
6import sys
7import logging
8from unittest.mock import patch as mock_patch
9from typing import Optional, Dict, Any
10import types
11from contextlib import contextmanager
13from pure3270.patching.s3270_wrapper import Pure3270S3270Wrapper
15logger = logging.getLogger(__name__)
class Pure3270PatchError(Exception):
    """Error raised when a patching operation fails.

    Creating an instance also writes its message to the module logger at
    ERROR level, so the failure is recorded even if the exception is later
    swallowed by a caller.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Log the first positional argument as given; with no arguments fall
        # back to the exception's own string form.
        if args:
            message = args[0]
        else:
            message = str(self)
        logger.error(message)
class MonkeyPatchManager:
    """
    Manager for applying monkey patches to modules.

    Handles version checks and patch application for compatibility.

    Every patch applied through this manager records the object that was
    modified together with the value it held beforehand, so ``unpatch`` can
    put things back exactly: replaced attributes are restored, attributes and
    ``sys.modules`` entries that did not exist before are removed again.
    """

    # Marker stored when the patched attribute / module entry was absent
    # before patching (``None`` cannot be used: it is a legitimate value).
    _MISSING = object()

    def __init__(self, patches: Optional[Dict[str, Any]] = None):
        """
        Initialize the patch manager.

        Args:
            patches: Dictionary of patches to apply (module_name: patch_functions).
        """
        self.patches = patches or {}
        self.applied = set()
        self.originals = {}
        self.patched = {}
        self.selective_patches = {}
        # (kind, id(owner), name) -> (key, kind, owner, name, previous value).
        # Holding ``owner`` keeps it alive, so its id() cannot be reused.
        self._restore_plan = {}
        # Keys of ``originals`` that are covered by ``_restore_plan``.
        self._tracked_keys = set()
        logger.info("Initialized MonkeyPatchManager")

    def apply(self, module_name: str) -> bool:
        """
        Apply patches to a specific module.

        Each callable registered under ``module_name`` in ``self.patches`` is
        invoked with the module object. A module is only processed once per
        manager; later calls return immediately.

        Args:
            module_name: Name of the module to patch.

        Returns:
            True if patches were applied successfully.

        Raises:
            ValueError: If the module is not present in ``sys.modules``.
        """
        if module_name in self.applied:
            return True
        module = sys.modules.get(module_name)
        if module is None:
            raise ValueError(f"Module {module_name} not found.")
        for patch_fn in self.patches.get(module_name, []):
            patch_fn(module)
        self.applied.add(module_name)
        return True

    def _store_original(self, key: str, value: Any) -> None:
        """Remember ``value`` under ``key`` unless that key is already stored."""
        if key not in self.originals:
            self.originals[key] = value

    def _snapshot_attribute(self, owner: Any, attr_name: str) -> Any:
        """Return the raw value ``owner`` itself holds for ``attr_name``.

        The owner's own namespace is read (rather than ``getattr``) so that
        ``staticmethod``/``classmethod`` wrappers are captured intact and
        inherited attributes are reported as absent. Returns ``_MISSING``
        when the owner has no such attribute of its own.
        """
        namespace = getattr(owner, "__dict__", None)
        if namespace is None:
            # No per-object namespace available; fall back to plain lookup.
            return getattr(owner, attr_name, self._MISSING)
        return namespace.get(attr_name, self._MISSING)

    def _remember(
        self, key: str, kind: str, owner: Any, name: str, snapshot: Any
    ) -> None:
        """Record how to undo a patch; only the first snapshot per target is kept."""
        self._tracked_keys.add(key)
        slot = (kind, id(owner), name)
        if slot not in self._restore_plan:
            self._restore_plan[slot] = (key, kind, owner, name, snapshot)

    def _track_attribute(self, key: str, owner: Any, attr_name: str) -> None:
        """Store the current ``owner.attr_name`` under ``key`` and plan its restore."""
        self._store_original(key, getattr(owner, attr_name, None))
        self._remember(
            key, "attr", owner, attr_name, self._snapshot_attribute(owner, attr_name)
        )

    def _apply_module_patch(self, module_name: str, replacement: Any) -> None:
        """Replace ``sys.modules[module_name]`` with ``replacement``.

        Args:
            module_name: Key in ``sys.modules`` to replace (may be dotted).
            replacement: Object to install in its place.
        """
        original = sys.modules.get(module_name)
        self._store_original(module_name, original)
        self._remember(
            module_name,
            "module",
            None,
            module_name,
            sys.modules.get(module_name, self._MISSING),
        )
        sys.modules[module_name] = replacement
        self.patched[module_name] = replacement
        # Replacements are not required to carry __name__; never fail after
        # sys.modules has already been modified.
        label = getattr(replacement, "__name__", repr(replacement))
        logger.info(f"Patched module: {module_name} -> {label}")

    def _apply_method_patch(
        self,
        obj: Any,
        method_name: str,
        new_method: Any,
        docstring: Optional[str] = None,
    ) -> None:
        """Set ``obj.method_name`` to ``new_method`` and record the previous value.

        Args:
            obj: Class or instance to patch.
            method_name: Attribute name to set.
            new_method: Replacement callable.
            docstring: Optional docstring assigned to ``new_method``.
        """
        if isinstance(obj, type):
            class_name = obj.__name__
        else:
            class_name = type(obj).__name__
        key = f"{class_name}.{method_name}"
        self._track_attribute(key, obj, method_name)
        setattr(obj, method_name, new_method)
        if docstring:
            new_method.__doc__ = docstring
        self.patched[key] = new_method
        logger.info(f"Added method: {class_name}.{method_name}")

    def _check_version_compatibility(
        self, module: Any, expected_version: str = "0.3.0"
    ) -> bool:
        """Compare the version from ``get_p3270_version()`` with ``expected_version``.

        Args:
            module: Accepted for interface compatibility; not used here.
            expected_version: Version string to compare against. A falsy value
                disables the check.

        Returns:
            True when the check is disabled or the versions are equal,
            otherwise False (a warning is logged).
        """
        # NOTE(review): this default ("0.3.0") differs from the "0.1.6" default
        # of apply_patches, which always passes its own value explicitly.
        from pure3270.emulation.ebcdic import get_p3270_version

        actual_version = get_p3270_version()

        if expected_version and actual_version != expected_version:
            logger.warning(
                f"Version mismatch: expected {expected_version}, got {actual_version}"
            )
            logger.info("Graceful degradation: proceeding with partial compatibility")
            return False
        return True

    def apply_patches(
        self,
        patch_sessions: bool = True,
        patch_commands: bool = True,
        strict_version: bool = False,
        expected_version: str = "0.1.6",
    ) -> None:
        """Replace ``p3270.S3270`` with ``Pure3270S3270Wrapper``.

        Args:
            patch_sessions: When True, patch ``S3270`` on the ``p3270`` package
                and, if loaded, on the ``p3270.p3270`` submodule.
            patch_commands: Accepted for interface compatibility; not used by
                this method.
            strict_version: When True, a version mismatch or a missing
                ``p3270`` raises instead of being tolerated.
            expected_version: Version passed to the compatibility check.

        Raises:
            Pure3270PatchError: On a strict-mode version mismatch, when
                ``p3270`` is missing in strict mode, or on any other failure
                while patching.
        """
        try:
            import p3270

            version_compatible = self._check_version_compatibility(
                p3270, expected_version
            )
            if not version_compatible:
                if strict_version:
                    from pure3270.emulation.ebcdic import get_p3270_version

                    actual_version = get_p3270_version()
                    raise Pure3270PatchError(
                        f"Version incompatible: {actual_version or 'unknown'}"
                    )
                logger.info(
                    "Graceful degradation: Version mismatch but continuing with patching"
                )
            if patch_sessions:
                # Patch the S3270 class at module level
                self._track_attribute("p3270.S3270", p3270, "S3270")
                from pure3270.patching.s3270_wrapper import Pure3270S3270Wrapper

                setattr(p3270, "S3270", Pure3270S3270Wrapper)
                logger.info("Patched Session")

                # The package re-exports S3270 from p3270.p3270; patch that
                # binding too so direct references also get the wrapper.
                if hasattr(p3270, "p3270"):
                    p3270_module = sys.modules.get("p3270.p3270")
                    if p3270_module and hasattr(p3270_module, "S3270"):
                        self._track_attribute(
                            "p3270.p3270.S3270", p3270_module, "S3270"
                        )
                        setattr(p3270_module, "S3270", Pure3270S3270Wrapper)
                        logger.info("Patched inner S3270 class")
            logger.info("Patches applied")
        except ImportError as e:
            logger.warning(f"p3270 not installed: {e}")
            if strict_version:
                raise Pure3270PatchError("p3270 required for strict mode") from e
            self._store_original("p3270.S3270", None)
        except Pure3270PatchError:
            # Already the right type and already logged by its constructor;
            # do not log and wrap it a second time.
            raise
        except Exception as e:
            logger.error(f"Patch error: {e}")
            raise Pure3270PatchError(str(e)) from e

    def _restore_by_name(self, key: str, original: Any) -> None:
        """Best-effort restore for an ``originals`` entry with no recorded target.

        Dotted keys are resolved as ``<module path>.<attribute>`` through
        ``sys.modules``; undotted keys are treated as ``sys.modules`` entries.
        """
        if "." not in key:
            sys.modules[key] = original
            logger.debug(f"Restored module {key}")
            return
        obj_name, method = key.rsplit(".", 1)
        try:
            if obj_name in sys.modules:
                obj = sys.modules[obj_name]
            else:
                parts = obj_name.split(".")
                obj = sys.modules.get(parts[0])
                if obj and len(parts) > 1:
                    for part in parts[1:]:
                        obj = getattr(obj, part, None)
                        if obj is None:
                            break
            if obj is not None:
                setattr(obj, method, original)
                logger.debug(f"Restored method {key}")
        except Exception as e:
            logger.warning(f"Failed to restore method {key}: {e}")

    def unpatch(self) -> None:
        """Undo every patch recorded by this manager and reset its bookkeeping.

        Recorded targets are restored in reverse order of application.
        Failures on individual targets are logged as warnings and do not stop
        the remaining restores.
        """
        for key, kind, owner, name, snapshot in reversed(
            list(self._restore_plan.values())
        ):
            try:
                if kind == "module":
                    if snapshot is self._MISSING:
                        sys.modules.pop(name, None)
                    else:
                        sys.modules[name] = snapshot
                    logger.debug(f"Restored module {key}")
                    continue
                if snapshot is self._MISSING:
                    # The attribute was added by the patch: remove it again.
                    try:
                        delattr(owner, name)
                    except AttributeError:
                        pass
                else:
                    setattr(owner, name, snapshot)
                logger.debug(f"Restored method {key}")
            except Exception as e:
                logger.warning(f"Failed to restore method {key}: {e}")

        # Entries placed in ``originals`` without a recorded target fall back
        # to name-based resolution.
        for key, original in self.originals.items():
            if key in self._tracked_keys or original is None:
                continue
            self._restore_by_name(key, original)

        self.originals.clear()
        self.patched.clear()
        self._restore_plan.clear()
        self._tracked_keys.clear()
        logger.info("Unpatched all")
@contextmanager
def PatchContext(patches: Optional[Dict[str, Any]] = None):
    """
    Provide a MonkeyPatchManager whose patches are undone on exit.

    The manager is only constructed here; applying patches is left to the
    code inside the ``with`` body. ``unpatch()`` is called when the body
    finishes, whether it completed normally or raised.

    Args:
        patches: Patch mapping handed to the MonkeyPatchManager constructor.

    Yields:
        The newly created MonkeyPatchManager.
    """
    patch_manager = MonkeyPatchManager(patches)
    try:
        yield patch_manager
    finally:
        patch_manager.unpatch()
def enable_replacement(
    patch_sessions: bool = True,
    patch_commands: bool = True,
    strict_version: bool = False,
    expected_version: str = "0.1.6",
) -> MonkeyPatchManager:
    """
    Create a MonkeyPatchManager and apply the replacement patches with it.

    All arguments are forwarded unchanged to
    ``MonkeyPatchManager.apply_patches``.

    Args:
        patch_sessions: Whether to patch sessions.
        patch_commands: Whether to patch commands.
        strict_version: Whether to enforce strict version check.
        expected_version: The expected version for compatibility (default "0.1.6").

    Returns:
        The manager that applied the patches; call its ``unpatch()`` to undo them.

    Raises:
        Pure3270PatchError: Propagated from ``apply_patches`` when patching fails.
    """
    options = {
        "patch_sessions": patch_sessions,
        "patch_commands": patch_commands,
        "strict_version": strict_version,
        "expected_version": expected_version,
    }
    manager = MonkeyPatchManager()
    manager.apply_patches(**options)
    return manager


# Short alias for enable_replacement.
patch = enable_replacement