1"""
2Custom handlers may be created to handle other objects. Each custom handler
3must derive from :class:`jsonpickle.handlers.BaseHandler` and
4implement ``flatten`` and ``restore``.
5
6A handler can be bound to other types by calling
7:func:`jsonpickle.handlers.register`.
8
9"""
10
11import array
12import copy
13import datetime
14import io
15import queue
16import re
17import threading
18import uuid
19from typing import Any, Callable, Dict, Optional, Type, TypeVar, Union
20
21from . import util
22
# Generic placeholder for the type of the exemplar wrapped by CloneFactory.
T = TypeVar("T")
# The real Pickler/Unpickler classes cannot be imported here: doing so from
# pickler/unpickler would create a circular import.  Two TypeVars with
# string bounds stand in for them instead; the suppressions below silence
# the checker complaints that this workaround triggers.
ContextType = Union[  # type: ignore[valid-type]
    TypeVar("Pickler", bound="Pickler"),  # noqa: F821
    TypeVar("Unpickler", bound="Unpickler"),  # noqa: F821
]
# Used in Registry.register's return annotation for the decorated handler class.
HandlerType = Type[Any]
# Either a class object or a string.
KeyType = Union[Type[Any], str]
# Return annotation of BaseHandler.flatten: a dict, a str, or None.
HandlerReturn = Optional[Union[Dict[str, Any], str]]
# The three datetime-module types that DatetimeHandler is registered for.
DateTime = Union[datetime.datetime, datetime.date, datetime.time]
33
34
class Registry:
    """Table that maps classes, and their importable names, to handlers."""

    def __init__(self) -> None:
        # exact matches, keyed by the class object and by its importable name
        self._handlers = {}
        # handlers registered with base=True, keyed by the class object only
        self._base_handlers = {}

    def get(self, cls_or_name: Type, default: Optional[Any] = None) -> Any:
        """
        Find the handler registered for a type or for a fully qualified name.

        :param cls_or_name: the type or its fully qualified name
        :param default: value returned when no handler matches

        An exact match always wins. Failing that, and only when
        ``cls_or_name`` is a type, the handlers registered with
        ``base=True`` are scanned and the first one whose class is a
        superclass of ``cls_or_name`` is returned.
        """
        exact = self._handlers.get(cls_or_name)
        if exact is not None:
            return exact
        # names (strings) can only match exactly; types may match via a base
        if util._is_type(cls_or_name):
            for base_cls, base_handler in self._base_handlers.items():
                if issubclass(cls_or_name, base_cls):
                    return base_handler
        return default

    def register(
        self, cls: Type[Any], handler: Optional[KeyType] = None, base: bool = False
    ) -> Optional[Callable[[HandlerType], HandlerType]]:
        """Register a custom handler for a class

        :param cls: The custom object class to handle
        :param handler: The custom handler class (if
            None, a decorator wrapper is returned)
        :param base: Indicates whether the handler should
            be registered for all subclasses
        :raises TypeError: if a handler is given and ``cls`` is not a
            class/type

        Omitting the `handler` argument turns the call into a
        decorator factory::

            @jsonpickle.handlers.register(Foo, base=True)
            class FooHandler(jsonpickle.handlers.BaseHandler):
                pass

        """
        if handler is not None:
            if not util._is_type(cls):
                raise TypeError(f'{cls!r} is not a class/type')
            # keep an entry under the name as well as under the type itself,
            # for the ugly cases like _sre.SRE_Pattern that cannot be loaded
            # back directly
            name = util.importable_name(cls)
            self._handlers[name] = handler
            self._handlers[cls] = handler
            if base:
                # subclass checks need the real type, so no name entry here
                self._base_handlers[cls] = handler
            return None

        def _bind(handler_cls):
            self.register(cls, handler=handler_cls, base=base)
            return handler_cls

        return _bind

    def unregister(self, cls: Type[Any]) -> None:
        """Drop every entry stored for ``cls``; unknown classes are ignored."""
        exact = self._handlers
        exact.pop(cls, None)
        exact.pop(util.importable_name(cls), None)
        self._base_handlers.pop(cls, None)
97
98
# Module-wide Registry instance.  Its bound methods are re-exported under
# plain module-level names so they can be called as, for example,
# jsonpickle.handlers.register(...).
registry = Registry()
register = registry.register
unregister = registry.unregister
get = registry.get
103
104
class BaseHandler:
    """Base class for handlers; subclasses provide ``flatten`` and ``restore``."""

    def __init__(self, context: Any):
        """
        Create a handler for a registered type.

        :Parameters:
          - `context`: reference to pickler/unpickler

        """
        self.context = context

    def flatten(self, obj: Any, data: Dict[str, Any]) -> HandlerReturn:
        """
        Write a json-friendly form of `obj` into `data`.

        :param object obj: The object to be serialized.
        :param dict data: A partially filled dictionary which will contain the
            json-friendly representation of `obj` once this method has
            finished.
        :raises NotImplementedError: always, in this base implementation.
        """
        message = 'You must implement flatten() in %s' % self.__class__
        raise NotImplementedError(message)

    def restore(self, obj: Any) -> Any:
        """
        Build and return an object of the registered type from its
        json-friendly representation `obj`.

        :raises NotImplementedError: always, in this base implementation.
        """
        message = 'You must implement restore() in %s' % self.__class__
        raise NotImplementedError(message)

    @classmethod
    def handles(handler_cls, cls: Type[Any]) -> Type[Any]:
        """
        Register this handler class for `cls` in the module-level registry
        and return `cls` unchanged, which makes it usable as a decorator,
        e.g.::

            @MyCustomHandler.handles
            class MyCustomClass:
                def __reduce__(self):
                    ...
        """
        registry.register(cls, handler_cls)
        return cls

    # Calling an instance mimics calling the class: it takes a context and
    # hands back a handler bound to it.
    def __call__(self, context: ContextType) -> "BaseHandler":  # type: ignore[valid-type]
        """This permits registering either Handler instances or classes

        :Parameters:
          - `context`: reference to pickler/unpickler
        """
        self.context = context
        return self
157
158
class ArrayHandler(BaseHandler):
    """Handler for array.array: stores the typecode and the element list"""

    def flatten(self, obj: array.array, data: Dict[str, Any]) -> HandlerReturn:
        """Record the typecode and the flattened element list in `data`."""
        data['typecode'] = obj.typecode
        elements = obj.tolist()
        data['values'] = self.context.flatten(elements, reset=False)
        return data

    def restore(self, data: Dict[str, Any]) -> array.array:
        """Rebuild the array from the stored typecode and element list."""
        code = data['typecode']
        items = self.context.restore(data['values'], reset=False)
        # for the 'c' typecode each restored element goes through bytes() first
        elements = [bytes(item) for item in items] if code == 'c' else items
        return array.array(code, elements)


ArrayHandler.handles(array.array)
176
177
class DatetimeHandler(BaseHandler):
    """Handler for datetime, date and time objects

    These objects implement __reduce__, whose first argument is a binary
    payload. The payload is passed through util.b64encode when flattening
    and util.b64decode when restoring; the object is then rebuilt from it.

    """

    def flatten(self, obj: DateTime, data: Dict[str, Any]) -> HandlerReturn:
        """Store the encoded __reduce__ result, or return a plain string
        when the pickler is not producing unpicklable output."""
        pickler = self.context
        if not pickler.unpicklable:
            # one-way output: a readable string instead of the reduce payload
            return obj.isoformat() if hasattr(obj, 'isoformat') else str(obj)

        cls, reduce_args = obj.__reduce__()  # type: ignore[misc]
        flatten = pickler.flatten
        # first argument is the binary payload; the rest are flattened as-is
        encoded = [util.b64encode(reduce_args[0])]
        encoded.extend(flatten(extra, reset=False) for extra in reduce_args[1:])
        data['__reduce__'] = (flatten(cls, reset=False), encoded)
        return data

    def restore(self, data: Dict[str, Any]) -> DateTime:
        """Decode the stored __reduce__ entry and construct the object."""
        flat_cls, flat_args = data['__reduce__']
        restore = self.context.restore
        cls = restore(flat_cls, reset=False)
        params = [util.b64decode(flat_args[0])]
        params.extend(restore(extra, reset=False) for extra in flat_args[1:])
        return cls.__new__(cls, *params)


DatetimeHandler.handles(datetime.datetime)
DatetimeHandler.handles(datetime.date)
DatetimeHandler.handles(datetime.time)
215
216
class RegexHandler(BaseHandler):
    """Flatten and restore compiled regex (``re.Pattern``) objects

    Both the pattern source and the compile flags are stored, so a pattern
    compiled with e.g. ``re.IGNORECASE`` matches the same way after a round
    trip.
    """

    def flatten(self, obj: re.Pattern, data: Dict[str, Any]) -> HandlerReturn:
        """Record the pattern source and its flags in `data`.

        :param obj: the compiled pattern to serialize
        :param data: dictionary that receives the ``pattern`` and ``flags``
            entries
        :return: `data`
        """
        data['pattern'] = obj.pattern
        # Without the flags, a pattern compiled with re.IGNORECASE,
        # re.MULTILINE, re.DOTALL, ... would silently come back with
        # different matching behavior.  re.DEBUG is masked out because it
        # only prints diagnostics at compile time.
        data['flags'] = int(obj.flags) & ~int(re.DEBUG)
        return data

    def restore(self, data: Dict[str, Any]) -> re.Pattern:
        """Compile a pattern from the stored source and flags.

        :param data: dictionary holding ``pattern`` and, optionally, ``flags``
        :return: the compiled pattern
        """
        # Payloads without a 'flags' entry are still accepted and compile
        # with no flags.
        return re.compile(data['pattern'], data.get('flags', 0))


RegexHandler.handles(type(re.compile('')))
229
230
class QueueHandler(BaseHandler):
    """Opaque handler for Queue objects

    A Queue holds a mutex and condition variables, none of which can be
    serialized, so restoring simply builds a brand new Queue instance.

    """

    def flatten(self, obj: queue.Queue, data: Dict[str, Any]) -> HandlerReturn:
        """Return `data` untouched; nothing is read from the queue."""
        return data

    def restore(self, data: Dict[str, Any]) -> queue.Queue:
        """Return a newly constructed queue.Queue; `data` is not consulted."""
        replacement = queue.Queue()
        return replacement


QueueHandler.handles(queue.Queue)
247
248
class CloneFactory:
    """Stand-in for a collections.defaultdict default_factory during serialization"""

    def __init__(self, exemplar: T) -> None:
        # the object every call hands out copies of
        self.exemplar = exemplar

    def __call__(self, clone: Callable[[T], T] = copy.copy) -> T:
        """Return ``clone(exemplar)``; by default a shallow copy of the exemplar"""
        template = self.exemplar
        return clone(template)  # type: ignore[arg-type]

    def __repr__(self) -> str:
        address, exemplar = id(self), self.exemplar
        return f'<CloneFactory object at 0x{address:x} ({exemplar})>'
261
262
class UUIDHandler(BaseHandler):
    """Handler for uuid.UUID objects, stored by their hex string"""

    def flatten(self, obj: uuid.UUID, data: Dict[str, Any]) -> HandlerReturn:
        """Put the UUID's ``hex`` attribute into `data` and return `data`."""
        hex_digits = obj.hex
        data['hex'] = hex_digits
        return data

    def restore(self, data: Dict[str, Any]) -> uuid.UUID:
        """Construct a UUID from the stored ``hex`` entry."""
        hex_digits = data['hex']
        return uuid.UUID(hex_digits)


UUIDHandler.handles(uuid.UUID)
275
276
class LockHandler(BaseHandler):
    """Handler for threading.Lock objects; only the locked state is kept"""

    def flatten(self, obj: threading.Lock, data: Dict[str, Any]) -> HandlerReturn:
        """Record whether the lock is currently held."""
        is_held = obj.locked()
        data['locked'] = is_held
        return data

    def restore(self, data: Dict[str, Any]) -> threading.Lock:
        """Create a new lock, acquiring it when the stored state says so."""
        restored = threading.Lock()
        # a missing 'locked' entry is treated as "not locked"
        was_held = data.get('locked', False)
        if was_held:
            restored.acquire()
        return restored


# threading.Lock() is called to obtain an instance, and the handler is
# registered for that instance's class.
_lock = threading.Lock()
LockHandler.handles(_lock.__class__)
293
294
class TextIOHandler(BaseHandler):
    """Flatten text file objects to None, since they cannot be round-tripped"""

    def flatten(self, obj: io.TextIOBase, data: Dict[str, Any]) -> None:
        """Always return None; neither `obj` nor `data` is used."""
        return None

    def restore(self, data: Dict[str, Any]):
        """Not expected to be called, because flatten() returns None"""
        message = 'Restoring IO.TextIOHandler is not supported'
        raise AssertionError(message)


TextIOHandler.handles(io.TextIOWrapper)