Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/jsonpickle/handlers.py: 54%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

157 statements  

1""" 

2Custom handlers may be created to handle other objects. Each custom handler 

3must derive from :class:`jsonpickle.handlers.BaseHandler` and 

4implement ``flatten`` and ``restore``. 

5 

6A handler can be bound to other types by calling 

7:func:`jsonpickle.handlers.register`. 

8 

9""" 

10 

11import array 

12import copy 

13import datetime 

14import inspect 

15import io 

16import queue 

17import re 

18import threading 

19import uuid 

20from collections.abc import Callable 

21from typing import TYPE_CHECKING, Any, NoReturn, TypeAlias, TypeVar 

22 

23from . import util 

24 

25T = TypeVar("T") 

26 

27if TYPE_CHECKING: 

28 from .pickler import Pickler # noqa: F401 

29 from .unpickler import Unpickler # noqa: F401 

30 

31ContextType: TypeAlias = "Pickler | Unpickler" 

32RestoreType: TypeAlias = "Unpickler" 

33HandlerType: TypeAlias = type 

34KeyType: TypeAlias = type | str 

35HandlerReturn: TypeAlias = dict[str, Any] | str | None 

36DateTime: TypeAlias = datetime.date | datetime.time 

37 

38 

class Registry:
    """Registry that maps classes (and their importable names) to handlers."""

    def __init__(self) -> None:
        # Exact-match handlers, keyed by the type and by its importable name.
        self._handlers = {}
        # Handlers registered with base=True, keyed by type only; consulted
        # with issubclass() when no exact match exists.
        self._base_handlers = {}

    def get(self, cls_or_name: KeyType, default: Any | None = None) -> Any:
        """
        :param cls_or_name: the type or its fully qualified name
        :param default: default value, if a matching handler is not found

        Looks up a handler by type reference or its fully
        qualified name. If a direct match
        is not found, the search is performed over all
        handlers registered with base=True.
        """
        handler = self._handlers.get(cls_or_name)
        if handler is not None:
            return handler
        # attempt to find a base class; only meaningful for real types,
        # string names cannot be passed to issubclass()
        if util._is_type(cls_or_name):
            for cls, base_handler in self._base_handlers.items():
                if issubclass(cls_or_name, cls):
                    return base_handler
        return default

    def register(
        self, cls: type, handler: KeyType | None = None, base: bool = False
    ) -> Callable[[HandlerType], HandlerType] | None:
        """Register a custom handler for a class

        :param cls: The custom object class to handle
        :param handler: The custom handler class (if
            None, a decorator wrapper is returned)
        :param base: Indicates whether the handler should
            be registered for all subclasses
        :raises TypeError: if a handler is given and `cls` is not a class/type

        This function can be also used as a decorator
        by omitting the `handler` argument::

            @jsonpickle.handlers.register(Foo, base=True)
            class FooHandler(jsonpickle.handlers.BaseHandler):
                pass

        """
        if handler is None:

            def _register(handler_cls: HandlerType) -> HandlerType:
                # Re-enter register() with the decorated class as the handler
                # and hand the class back unchanged.
                self.register(cls, handler=handler_cls, base=base)
                return handler_cls

            return _register
        if not util._is_type(cls):
            raise TypeError(f"{cls!r} is not a class/type")
        # store both the name and the actual type for the ugly cases like
        # _sre.SRE_Pattern that cannot be loaded back directly
        self._handlers[util.importable_name(cls)] = self._handlers[cls] = handler
        if base:
            # only store the actual type for subclass checking
            self._base_handlers[cls] = handler

    def unregister(self, cls: type) -> None:
        """Remove every entry stored for `cls`; unknown classes are ignored."""
        self._handlers.pop(cls, None)
        self._handlers.pop(util.importable_name(cls), None)
        self._base_handlers.pop(cls, None)

101 

102 

# Module-level registry instance, plus module-level aliases for its bound
# methods so callers can use `handlers.register(...)` and friends directly.
registry = Registry()
register = registry.register
unregister = registry.unregister
get = registry.get

107 

108 

def handler_accepts_handler_context(fn: Callable[..., Any]) -> bool:
    """
    Check if the handler function has a handler_context parameter.

    :param fn: the callable whose signature is inspected
    :returns: True when `fn` declares a parameter named ``handler_context``
        whose kind is positional-or-keyword, keyword-only or var-keyword.
        False when there is no such parameter, when it has another kind
        (positional-only, var-positional), or when the signature of `fn`
        cannot be determined.
    """
    try:
        params = inspect.signature(fn).parameters
    except (TypeError, ValueError):
        # inspect.signature() raised: `fn` has no retrievable signature
        return False

    param = params.get("handler_context")
    if param is None:
        return False

    return param.kind in (
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
        inspect.Parameter.KEYWORD_ONLY,
        inspect.Parameter.VAR_KEYWORD,
    )

127 

128 

class BaseHandler:
    """Base class for handlers; subclasses implement flatten() and restore()."""

    def __init__(self, context: Any):
        """
        Initialize a new handler to handle a registered type.

        :Parameters:
          - `context`: reference to pickler/unpickler

        """
        self.context = context

    def flatten(self, obj: Any, data: dict[str, Any]) -> HandlerReturn:
        """
        Flatten `obj` into a json-friendly form and write result to `data`.

        :param object obj: The object to be serialized.
        :param dict data: A partially filled dictionary which will contain the
            json-friendly representation of `obj` once this method has
            finished.
        :raises NotImplementedError: always, in this base implementation
        """
        raise NotImplementedError(f"You must implement flatten() in {self.__class__}")

    def restore(self, data: dict[str, Any]) -> Any:
        """
        Restore an object of the registered type from the json-friendly
        representation `data` and return it.

        :raises NotImplementedError: always, in this base implementation
        """
        raise NotImplementedError(f"You must implement restore() in {self.__class__}")

    @classmethod
    def handles(self, cls: type) -> type:
        """
        Register this handler for the given class. Suitable as a decorator,
        e.g.::

            @MyCustomHandler.handles
            class MyCustomClass:
                def __reduce__(self):
                    ...
        """
        # This is a classmethod, so `self` is the handler class itself;
        # `cls` is the class being handled and is returned unchanged.
        registry.register(cls, self)
        return cls

    def __call__(self, context: ContextType) -> "BaseHandler":
        """This permits registering either Handler instances or classes

        :Parameters:
          - `context`: reference to pickler/unpickler
        """
        # Calling an instance rebinds its context and returns the same
        # instance, mirroring what calling the class with a context yields.
        self.context = context
        return self

180 

181 

class ArrayHandler(BaseHandler):
    """Flatten and restore array.array objects"""

    def flatten(self, obj: array.array, data: dict[str, Any]) -> HandlerReturn:  # type: ignore[type-arg]
        """Write the array's typecode and flattened element list into `data`."""
        data["typecode"] = obj.typecode
        data["values"] = self.context.flatten(obj.tolist(), reset=False)
        return data

    def restore(self, data: dict[str, Any]) -> array.array:  # type: ignore[type-arg]
        """Build an array.array from the stored typecode and restored values."""
        typecode = data["typecode"]
        values = self.context.restore(data["values"], reset=False)
        if typecode == "c":
            # NOTE(review): "c" is not among the typecodes documented for
            # Python 3's array module -- confirm whether this branch is
            # still reachable.
            values = [bytes(x) for x in values]
        return array.array(typecode, values)


ArrayHandler.handles(array.array)

199 

200 

class DatetimeHandler(BaseHandler):
    """Custom handler for datetime objects

    Datetime objects use __reduce__, and they generate binary strings encoding
    the payload. This handler encodes that payload to reconstruct the
    object.

    """

    def flatten(self, obj: DateTime, data: dict[str, Any]) -> HandlerReturn:
        """Flatten `obj` into `data`, or into a plain string.

        When the pickler's ``unpicklable`` attribute is false, a string is
        returned instead of `data`: ``obj.isoformat()`` if available,
        otherwise ``str(obj)``.
        """
        pickler = self.context
        if not pickler.unpicklable:
            if hasattr(obj, "isoformat"):
                return obj.isoformat()
            return str(obj)
        cls, args = obj.__reduce__()  # type: ignore[str-unpack]
        flatten = pickler.flatten
        # The first reduce argument is base64-encoded; the remaining ones go
        # through the pickler.
        payload = util.b64encode(args[0])  # type: ignore[arg-type]
        args = [payload] + [flatten(i, reset=False) for i in args[1:]]
        data["__reduce__"] = (flatten(cls, reset=False), args)
        return data

    def restore(self, data: dict[str, Any]) -> Any:
        """Rebuild the object from the ``__reduce__`` entry written by flatten()."""
        cls, args = data["__reduce__"]
        unpickler = self.context
        restore = unpickler.restore
        cls = restore(cls, reset=False)
        # Mirror of flatten(): decode the base64 payload, restore the rest.
        value = util.b64decode(args[0])
        params = (value,) + tuple(restore(i, reset=False) for i in args[1:])
        return cls.__new__(cls, *params)


DatetimeHandler.handles(datetime.datetime)
DatetimeHandler.handles(datetime.date)
DatetimeHandler.handles(datetime.time)

238 

239 

class RegexHandler(BaseHandler):
    """Flatten _sre.SRE_Pattern (compiled regex) objects"""

    def flatten(self, obj: re.Pattern[str], data: dict[str, Any]) -> HandlerReturn:
        """Store the pattern source and its compile flags in `data`."""
        data["pattern"] = obj.pattern
        # Keep the compile flags so that e.g. re.IGNORECASE survives a round
        # trip. re.DEBUG is masked out because it only triggers debug output
        # at compile time and does not affect matching.
        data["flags"] = obj.flags & ~re.DEBUG
        return data

    def restore(self, data: dict[str, Any]) -> re.Pattern[str]:
        """Recompile the stored pattern with its stored flags.

        Payloads without a "flags" entry are compiled with no explicit flags.
        """
        return re.compile(data["pattern"], data.get("flags", 0))


RegexHandler.handles(type(re.compile("")))

252 

253 

class QueueHandler(BaseHandler):
    """Opaquely serializes Queue objects

    Queues contains mutex and condition variables which cannot be serialized.
    Construct a new Queue instance when restoring.

    """

    def flatten(self, obj: queue.Queue[Any], data: dict[str, Any]) -> HandlerReturn:
        """Return `data` untouched; nothing from `obj` is recorded."""
        return data

    def restore(self, data: dict[str, Any]) -> queue.Queue[Any]:
        """Return a brand-new ``queue.Queue()``; `data` is not consulted."""
        return queue.Queue()


QueueHandler.handles(queue.Queue)

270 

271 

class CloneFactory:
    """Serialization proxy for collections.defaultdict's default_factory"""

    def __init__(self, exemplar: Any) -> None:
        """Remember `exemplar`, the object that calls will copy."""
        self.exemplar = exemplar

    def __call__(self, clone: Callable[[Any], Any] = copy.copy) -> Any:
        """Create new instances by making copies of the provided exemplar

        :param clone: function applied to the exemplar; defaults to
            ``copy.copy``
        """
        return clone(self.exemplar)

    def __repr__(self) -> str:
        return f"<CloneFactory object at 0x{id(self):x} ({self.exemplar})>"

284 

285 

class UUIDHandler(BaseHandler):
    """Serialize uuid.UUID objects"""

    def flatten(self, obj: uuid.UUID, data: dict[str, Any]) -> HandlerReturn:
        """Store the UUID's ``hex`` string in `data`."""
        data["hex"] = obj.hex
        return data

    def restore(self, data: dict[str, Any]) -> uuid.UUID:
        """Build a ``uuid.UUID`` from the stored ``hex`` string."""
        return uuid.UUID(data["hex"])


UUIDHandler.handles(uuid.UUID)

298 

299 

class LockHandler(BaseHandler):
    """Serialize threading.Lock objects"""

    def flatten(self, obj: Any, data: dict[str, Any]) -> HandlerReturn:
        """Record whether the lock is currently held."""
        data["locked"] = obj.locked()
        return data

    def restore(self, data: dict[str, Any]) -> Any:
        """Create a new lock and acquire it if the stored state was locked.

        A missing "locked" entry is treated as unlocked.
        """
        lock = threading.Lock()
        if data.get("locked", False):
            lock.acquire()
        return lock


# The handler is registered against the class of an actual lock instance.
_lock = threading.Lock()
LockHandler.handles(_lock.__class__)

316 

317 

class TextIOHandler(BaseHandler):
    """Serialize file descriptors as None because we cannot roundtrip"""

    def flatten(self, obj: io.TextIOBase, data: dict[str, Any]) -> None:
        """Always return None; neither `obj` nor `data` is used."""
        return None

    def restore(self, data: dict[str, Any]) -> NoReturn:
        """Restore should never get called because flatten() returns None"""
        raise AssertionError("Restoring IO.TextIOHandler is not supported")


TextIOHandler.handles(io.TextIOWrapper)