Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/jsonpickle/handlers.py: 56%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

146 statements  

1""" 

2Custom handlers may be created to handle other objects. Each custom handler 

3must derive from :class:`jsonpickle.handlers.BaseHandler` and 

4implement ``flatten`` and ``restore``. 

5 

6A handler can be bound to other types by calling 

7:func:`jsonpickle.handlers.register`. 

8 

9""" 

10 

11import array 

12import copy 

13import datetime 

14import io 

15import queue 

16import re 

17import threading 

18import uuid 

19from typing import ( 

20 TYPE_CHECKING, 

21 Any, 

22 Callable, 

23 Dict, 

24 NoReturn, 

25 Optional, 

26 Type, 

27 TypeAlias, 

28 TypeVar, 

29 Union, 

30) 

31 

32from . import util 

33 

34T = TypeVar("T") 

35 

36if TYPE_CHECKING: 

37 from .pickler import Pickler # noqa: F401 

38 from .unpickler import Unpickler # noqa: F401 

39 

40ContextType: TypeAlias = Union["Pickler", "Unpickler"] 

41RestoreType: TypeAlias = "Unpickler" 

42HandlerType = Type[Any] 

43KeyType = Union[Type[Any], str] 

44HandlerReturn = Optional[Union[dict[str, Any], str]] 

45DateTime = Union[datetime.datetime, datetime.date, datetime.time] 

46 

47 

48class Registry: 

49 def __init__(self) -> None: 

50 self._handlers = {} 

51 self._base_handlers = {} 

52 

53 def get(self, cls_or_name: type, default: Optional[Any] = None) -> Any: 

54 """ 

55 :param cls_or_name: the type or its fully qualified name 

56 :param default: default value, if a matching handler is not found 

57 

58 Looks up a handler by type reference or its fully 

59 qualified name. If a direct match 

60 is not found, the search is performed over all 

61 handlers registered with base=True. 

62 """ 

63 handler = self._handlers.get(cls_or_name) 

64 # attempt to find a base class 

65 if handler is None and util._is_type(cls_or_name): 

66 for cls, base_handler in self._base_handlers.items(): 

67 if issubclass(cls_or_name, cls): 

68 return base_handler 

69 return default if handler is None else handler 

70 

71 def register( 

72 self, cls: Type[Any], handler: Optional[KeyType] = None, base: bool = False 

73 ) -> Optional[Callable[[HandlerType], HandlerType]]: 

74 """Register the a custom handler for a class 

75 

76 :param cls: The custom object class to handle 

77 :param handler: The custom handler class (if 

78 None, a decorator wrapper is returned) 

79 :param base: Indicates whether the handler should 

80 be registered for all subclasses 

81 

82 This function can be also used as a decorator 

83 by omitting the `handler` argument:: 

84 

85 @jsonpickle.handlers.register(Foo, base=True) 

86 class FooHandler(jsonpickle.handlers.BaseHandler): 

87 pass 

88 

89 """ 

90 if handler is None: 

91 

92 def _register(handler_cls: Type[Any]) -> Type[Any]: 

93 self.register(cls, handler=handler_cls, base=base) 

94 return handler_cls 

95 

96 return _register 

97 if not util._is_type(cls): 

98 raise TypeError(f"{cls!r} is not a class/type") 

99 # store both the name and the actual type for the ugly cases like 

100 # _sre.SRE_Pattern that cannot be loaded back directly 

101 self._handlers[util.importable_name(cls)] = self._handlers[cls] = handler 

102 if base: 

103 # only store the actual type for subclass checking 

104 self._base_handlers[cls] = handler 

105 

106 def unregister(self, cls: Type[Any]) -> None: 

107 self._handlers.pop(cls, None) 

108 self._handlers.pop(util.importable_name(cls), None) 

109 self._base_handlers.pop(cls, None) 

110 

111 

112registry = Registry() 

113register = registry.register 

114unregister = registry.unregister 

115get = registry.get 

116 

117 

118class BaseHandler: 

119 def __init__(self, context: Any): 

120 """ 

121 Initialize a new handler to handle a registered type. 

122 

123 :Parameters: 

124 - `context`: reference to pickler/unpickler 

125 

126 """ 

127 self.context = context 

128 

129 def flatten(self, obj: Any, data: Dict[str, Any]) -> HandlerReturn: 

130 """ 

131 Flatten `obj` into a json-friendly form and write result to `data`. 

132 

133 :param object obj: The object to be serialized. 

134 :param dict data: A partially filled dictionary which will contain the 

135 json-friendly representation of `obj` once this method has 

136 finished. 

137 """ 

138 raise NotImplementedError("You must implement flatten() in %s" % self.__class__) 

139 

140 def restore(self, data: Dict[str, Any]) -> Any: 

141 """ 

142 Restore an object of the registered type from the json-friendly 

143 representation `obj` and return it. 

144 """ 

145 raise NotImplementedError("You must implement restore() in %s" % self.__class__) 

146 

147 @classmethod 

148 def handles(self, cls: Type[Any]) -> Type[Any]: 

149 """ 

150 Register this handler for the given class. Suitable as a decorator, 

151 e.g.:: 

152 

153 @MyCustomHandler.handles 

154 class MyCustomClass: 

155 def __reduce__(self): 

156 ... 

157 """ 

158 registry.register(cls, self) 

159 return cls 

160 

161 def __call__(self, context: ContextType) -> "BaseHandler": 

162 """This permits registering either Handler instances or classes 

163 

164 :Parameters: 

165 - `context`: reference to pickler/unpickler 

166 """ 

167 self.context = context 

168 return self 

169 

170 

171class ArrayHandler(BaseHandler): 

172 """Flatten and restore array.array objects""" 

173 

174 def flatten(self, obj: array.array, data: Dict[str, Any]) -> HandlerReturn: # type: ignore[type-arg] 

175 data["typecode"] = obj.typecode 

176 data["values"] = self.context.flatten(obj.tolist(), reset=False) 

177 return data 

178 

179 def restore(self, data: Dict[str, Any]) -> array.array: # type: ignore[type-arg] 

180 typecode = data["typecode"] 

181 values = self.context.restore(data["values"], reset=False) 

182 if typecode == "c": 

183 values = [bytes(x) for x in values] 

184 return array.array(typecode, values) 

185 

186 

187ArrayHandler.handles(array.array) 

188 

189 

190class DatetimeHandler(BaseHandler): 

191 """Custom handler for datetime objects 

192 

193 Datetime objects use __reduce__, and they generate binary strings encoding 

194 the payload. This handler encodes that payload to reconstruct the 

195 object. 

196 

197 """ 

198 

199 def flatten(self, obj: DateTime, data: Dict[str, Any]) -> HandlerReturn: 

200 pickler = self.context 

201 if not pickler.unpicklable: 

202 if hasattr(obj, "isoformat"): 

203 result = obj.isoformat() 

204 else: 

205 result = str(obj) 

206 return result 

207 cls, args = obj.__reduce__() # type: ignore[misc] 

208 flatten = pickler.flatten 

209 payload = util.b64encode(args[0]) 

210 args = [payload] + [flatten(i, reset=False) for i in args[1:]] 

211 data["__reduce__"] = (flatten(cls, reset=False), args) 

212 return data 

213 

214 def restore(self, data: Dict[str, Any]) -> Any: 

215 cls, args = data["__reduce__"] 

216 unpickler = self.context 

217 restore = unpickler.restore 

218 cls = restore(cls, reset=False) 

219 value = util.b64decode(args[0]) 

220 params = (value,) + tuple([restore(i, reset=False) for i in args[1:]]) 

221 return cls.__new__(cls, *params) 

222 

223 

224DatetimeHandler.handles(datetime.datetime) 

225DatetimeHandler.handles(datetime.date) 

226DatetimeHandler.handles(datetime.time) 

227 

228 

229class RegexHandler(BaseHandler): 

230 """Flatten _sre.SRE_Pattern (compiled regex) objects""" 

231 

232 def flatten(self, obj: re.Pattern[str], data: Dict[str, Any]) -> HandlerReturn: 

233 data["pattern"] = obj.pattern 

234 return data 

235 

236 def restore(self, data: Dict[str, Any]) -> re.Pattern[str]: 

237 return re.compile(data["pattern"]) 

238 

239 

240RegexHandler.handles(type(re.compile(""))) 

241 

242 

243class QueueHandler(BaseHandler): 

244 """Opaquely serializes Queue objects 

245 

246 Queues contains mutex and condition variables which cannot be serialized. 

247 Construct a new Queue instance when restoring. 

248 

249 """ 

250 

251 def flatten(self, obj: queue.Queue[Any], data: Dict[str, Any]) -> HandlerReturn: 

252 return data 

253 

254 def restore(self, data: Dict[str, Any]) -> queue.Queue[Any]: 

255 return queue.Queue() 

256 

257 

258QueueHandler.handles(queue.Queue) 

259 

260 

261class CloneFactory: 

262 """Serialization proxy for collections.defaultdict's default_factory""" 

263 

264 def __init__(self, exemplar: Any) -> None: 

265 self.exemplar = exemplar 

266 

267 def __call__(self, clone: Callable[[Any], Any] = copy.copy) -> Any: 

268 """Create new instances by making copies of the provided exemplar""" 

269 return clone(self.exemplar) 

270 

271 def __repr__(self) -> str: 

272 return f"<CloneFactory object at 0x{id(self):x} ({self.exemplar})>" 

273 

274 

275class UUIDHandler(BaseHandler): 

276 """Serialize uuid.UUID objects""" 

277 

278 def flatten(self, obj: uuid.UUID, data: Dict[str, Any]) -> HandlerReturn: 

279 data["hex"] = obj.hex 

280 return data 

281 

282 def restore(self, data: Dict[str, Any]) -> uuid.UUID: 

283 return uuid.UUID(data["hex"]) 

284 

285 

286UUIDHandler.handles(uuid.UUID) 

287 

288 

289class LockHandler(BaseHandler): 

290 """Serialize threading.Lock objects""" 

291 

292 def flatten(self, obj: Any, data: dict[str, Any]) -> HandlerReturn: 

293 data["locked"] = obj.locked() 

294 return data 

295 

296 def restore(self, data: Dict[str, Any]) -> Any: 

297 lock = threading.Lock() 

298 if data.get("locked", False): 

299 lock.acquire() 

300 return lock 

301 

302 

303_lock = threading.Lock() 

304LockHandler.handles(_lock.__class__) 

305 

306 

307class TextIOHandler(BaseHandler): 

308 """Serialize file descriptors as None because we cannot roundtrip""" 

309 

310 def flatten(self, obj: io.TextIOBase, data: Dict[str, Any]) -> None: 

311 return None 

312 

313 def restore(self, data: Dict[str, Any]) -> NoReturn: 

314 """Restore should never get called because flatten() returns None""" 

315 raise AssertionError("Restoring IO.TextIOHandler is not supported") 

316 

317 

318TextIOHandler.handles(io.TextIOWrapper)