Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/jsonpickle/handlers.py: 54%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

156 statements  

1""" 

2Custom handlers may be created to handle other objects. Each custom handler 

3must derive from :class:`jsonpickle.handlers.BaseHandler` and 

4implement ``flatten`` and ``restore``. 

5 

6A handler can be bound to other types by calling 

7:func:`jsonpickle.handlers.register`. 

8 

9""" 

10 

11import array 

12import copy 

13import datetime 

14import inspect 

15import io 

16import queue 

17import re 

18import threading 

19import uuid 

20from typing import ( 

21 TYPE_CHECKING, 

22 Any, 

23 Callable, 

24 Dict, 

25 NoReturn, 

26 Optional, 

27 Type, 

28 TypeAlias, 

29 TypeVar, 

30 Union, 

31) 

32 

33from . import util 

34 

35T = TypeVar("T") 

36 

37if TYPE_CHECKING: 

38 from .pickler import Pickler # noqa: F401 

39 from .unpickler import Unpickler # noqa: F401 

40 

41ContextType: TypeAlias = Union["Pickler", "Unpickler"] 

42RestoreType: TypeAlias = "Unpickler" 

43HandlerType = Type[Any] 

44KeyType = Union[Type[Any], str] 

45HandlerReturn = Optional[Union[dict[str, Any], str]] 

46DateTime = Union[datetime.datetime, datetime.date, datetime.time] 

47 

48 

49class Registry: 

50 def __init__(self) -> None: 

51 self._handlers = {} 

52 self._base_handlers = {} 

53 

54 def get(self, cls_or_name: type, default: Optional[Any] = None) -> Any: 

55 """ 

56 :param cls_or_name: the type or its fully qualified name 

57 :param default: default value, if a matching handler is not found 

58 

59 Looks up a handler by type reference or its fully 

60 qualified name. If a direct match 

61 is not found, the search is performed over all 

62 handlers registered with base=True. 

63 """ 

64 handler = self._handlers.get(cls_or_name) 

65 # attempt to find a base class 

66 if handler is None and util._is_type(cls_or_name): 

67 for cls, base_handler in self._base_handlers.items(): 

68 if issubclass(cls_or_name, cls): 

69 return base_handler 

70 return default if handler is None else handler 

71 

72 def register( 

73 self, cls: Type[Any], handler: Optional[KeyType] = None, base: bool = False 

74 ) -> Optional[Callable[[HandlerType], HandlerType]]: 

75 """Register the a custom handler for a class 

76 

77 :param cls: The custom object class to handle 

78 :param handler: The custom handler class (if 

79 None, a decorator wrapper is returned) 

80 :param base: Indicates whether the handler should 

81 be registered for all subclasses 

82 

83 This function can be also used as a decorator 

84 by omitting the `handler` argument:: 

85 

86 @jsonpickle.handlers.register(Foo, base=True) 

87 class FooHandler(jsonpickle.handlers.BaseHandler): 

88 pass 

89 

90 """ 

91 if handler is None: 

92 

93 def _register(handler_cls: Type[Any]) -> Type[Any]: 

94 self.register(cls, handler=handler_cls, base=base) 

95 return handler_cls 

96 

97 return _register 

98 if not util._is_type(cls): 

99 raise TypeError(f"{cls!r} is not a class/type") 

100 # store both the name and the actual type for the ugly cases like 

101 # _sre.SRE_Pattern that cannot be loaded back directly 

102 self._handlers[util.importable_name(cls)] = self._handlers[cls] = handler 

103 if base: 

104 # only store the actual type for subclass checking 

105 self._base_handlers[cls] = handler 

106 

107 def unregister(self, cls: Type[Any]) -> None: 

108 self._handlers.pop(cls, None) 

109 self._handlers.pop(util.importable_name(cls), None) 

110 self._base_handlers.pop(cls, None) 

111 

112 

113registry = Registry() 

114register = registry.register 

115unregister = registry.unregister 

116get = registry.get 

117 

118 

119def handler_accepts_handler_context(fn: Callable[..., Any]) -> bool: 

120 """ 

121 Check if the handler function has a handler_context parameter. 

122 """ 

123 try: 

124 params = inspect.signature(fn).parameters 

125 except (TypeError, ValueError): 

126 return False 

127 

128 param = params.get("handler_context") 

129 if param is None: 

130 return False 

131 

132 return param.kind in ( 

133 inspect.Parameter.POSITIONAL_OR_KEYWORD, 

134 inspect.Parameter.KEYWORD_ONLY, 

135 inspect.Parameter.VAR_KEYWORD, 

136 ) 

137 

138 

139class BaseHandler: 

140 def __init__(self, context: Any): 

141 """ 

142 Initialize a new handler to handle a registered type. 

143 

144 :Parameters: 

145 - `context`: reference to pickler/unpickler 

146 

147 """ 

148 self.context = context 

149 

150 def flatten(self, obj: Any, data: Dict[str, Any]) -> HandlerReturn: 

151 """ 

152 Flatten `obj` into a json-friendly form and write result to `data`. 

153 

154 :param object obj: The object to be serialized. 

155 :param dict data: A partially filled dictionary which will contain the 

156 json-friendly representation of `obj` once this method has 

157 finished. 

158 """ 

159 raise NotImplementedError("You must implement flatten() in %s" % self.__class__) 

160 

161 def restore(self, data: Dict[str, Any]) -> Any: 

162 """ 

163 Restore an object of the registered type from the json-friendly 

164 representation `obj` and return it. 

165 """ 

166 raise NotImplementedError("You must implement restore() in %s" % self.__class__) 

167 

168 @classmethod 

169 def handles(self, cls: Type[Any]) -> Type[Any]: 

170 """ 

171 Register this handler for the given class. Suitable as a decorator, 

172 e.g.:: 

173 

174 @MyCustomHandler.handles 

175 class MyCustomClass: 

176 def __reduce__(self): 

177 ... 

178 """ 

179 registry.register(cls, self) 

180 return cls 

181 

182 def __call__(self, context: ContextType) -> "BaseHandler": 

183 """This permits registering either Handler instances or classes 

184 

185 :Parameters: 

186 - `context`: reference to pickler/unpickler 

187 """ 

188 self.context = context 

189 return self 

190 

191 

192class ArrayHandler(BaseHandler): 

193 """Flatten and restore array.array objects""" 

194 

195 def flatten(self, obj: array.array, data: Dict[str, Any]) -> HandlerReturn: # type: ignore[type-arg] 

196 data["typecode"] = obj.typecode 

197 data["values"] = self.context.flatten(obj.tolist(), reset=False) 

198 return data 

199 

200 def restore(self, data: Dict[str, Any]) -> array.array: # type: ignore[type-arg] 

201 typecode = data["typecode"] 

202 values = self.context.restore(data["values"], reset=False) 

203 if typecode == "c": 

204 values = [bytes(x) for x in values] 

205 return array.array(typecode, values) 

206 

207 

208ArrayHandler.handles(array.array) 

209 

210 

211class DatetimeHandler(BaseHandler): 

212 """Custom handler for datetime objects 

213 

214 Datetime objects use __reduce__, and they generate binary strings encoding 

215 the payload. This handler encodes that payload to reconstruct the 

216 object. 

217 

218 """ 

219 

220 def flatten(self, obj: DateTime, data: Dict[str, Any]) -> HandlerReturn: 

221 pickler = self.context 

222 if not pickler.unpicklable: 

223 if hasattr(obj, "isoformat"): 

224 result = obj.isoformat() 

225 else: 

226 result = str(obj) 

227 return result 

228 cls, args = obj.__reduce__() # type: ignore[misc] 

229 flatten = pickler.flatten 

230 payload = util.b64encode(args[0]) 

231 args = [payload] + [flatten(i, reset=False) for i in args[1:]] 

232 data["__reduce__"] = (flatten(cls, reset=False), args) 

233 return data 

234 

235 def restore(self, data: Dict[str, Any]) -> Any: 

236 cls, args = data["__reduce__"] 

237 unpickler = self.context 

238 restore = unpickler.restore 

239 cls = restore(cls, reset=False) 

240 value = util.b64decode(args[0]) 

241 params = (value,) + tuple([restore(i, reset=False) for i in args[1:]]) 

242 return cls.__new__(cls, *params) 

243 

244 

245DatetimeHandler.handles(datetime.datetime) 

246DatetimeHandler.handles(datetime.date) 

247DatetimeHandler.handles(datetime.time) 

248 

249 

250class RegexHandler(BaseHandler): 

251 """Flatten _sre.SRE_Pattern (compiled regex) objects""" 

252 

253 def flatten(self, obj: re.Pattern[str], data: Dict[str, Any]) -> HandlerReturn: 

254 data["pattern"] = obj.pattern 

255 return data 

256 

257 def restore(self, data: Dict[str, Any]) -> re.Pattern[str]: 

258 return re.compile(data["pattern"]) 

259 

260 

261RegexHandler.handles(type(re.compile(""))) 

262 

263 

264class QueueHandler(BaseHandler): 

265 """Opaquely serializes Queue objects 

266 

267 Queues contains mutex and condition variables which cannot be serialized. 

268 Construct a new Queue instance when restoring. 

269 

270 """ 

271 

272 def flatten(self, obj: queue.Queue[Any], data: Dict[str, Any]) -> HandlerReturn: 

273 return data 

274 

275 def restore(self, data: Dict[str, Any]) -> queue.Queue[Any]: 

276 return queue.Queue() 

277 

278 

279QueueHandler.handles(queue.Queue) 

280 

281 

282class CloneFactory: 

283 """Serialization proxy for collections.defaultdict's default_factory""" 

284 

285 def __init__(self, exemplar: Any) -> None: 

286 self.exemplar = exemplar 

287 

288 def __call__(self, clone: Callable[[Any], Any] = copy.copy) -> Any: 

289 """Create new instances by making copies of the provided exemplar""" 

290 return clone(self.exemplar) 

291 

292 def __repr__(self) -> str: 

293 return f"<CloneFactory object at 0x{id(self):x} ({self.exemplar})>" 

294 

295 

296class UUIDHandler(BaseHandler): 

297 """Serialize uuid.UUID objects""" 

298 

299 def flatten(self, obj: uuid.UUID, data: Dict[str, Any]) -> HandlerReturn: 

300 data["hex"] = obj.hex 

301 return data 

302 

303 def restore(self, data: Dict[str, Any]) -> uuid.UUID: 

304 return uuid.UUID(data["hex"]) 

305 

306 

307UUIDHandler.handles(uuid.UUID) 

308 

309 

310class LockHandler(BaseHandler): 

311 """Serialize threading.Lock objects""" 

312 

313 def flatten(self, obj: Any, data: dict[str, Any]) -> HandlerReturn: 

314 data["locked"] = obj.locked() 

315 return data 

316 

317 def restore(self, data: Dict[str, Any]) -> Any: 

318 lock = threading.Lock() 

319 if data.get("locked", False): 

320 lock.acquire() 

321 return lock 

322 

323 

324_lock = threading.Lock() 

325LockHandler.handles(_lock.__class__) 

326 

327 

328class TextIOHandler(BaseHandler): 

329 """Serialize file descriptors as None because we cannot roundtrip""" 

330 

331 def flatten(self, obj: io.TextIOBase, data: Dict[str, Any]) -> None: 

332 return None 

333 

334 def restore(self, data: Dict[str, Any]) -> NoReturn: 

335 """Restore should never get called because flatten() returns None""" 

336 raise AssertionError("Restoring IO.TextIOHandler is not supported") 

337 

338 

339TextIOHandler.handles(io.TextIOWrapper)