1"""serialization utilities for apply messages"""
2
3# Copyright (c) IPython Development Team.
4# Distributed under the terms of the Modified BSD License.
5
6import pickle
7
# Protocol used for every pickle this module produces. pickle.DEFAULT_PROTOCOL
# exists in every Python 3 release (and this module already relies on
# Python 3 features such as memoryview.nbytes), so the old HIGHEST_PROTOCOL
# fallback for interpreters lacking it was unreachable.
# NOTE(review): presumably DEFAULT rather than HIGHEST was chosen so peers on
# older interpreters can still unpickle messages -- confirm before changing.
PICKLE_PROTOCOL = pickle.DEFAULT_PROTOCOL
12
13from itertools import chain
14
15from jupyter_client.session import MAX_BYTES, MAX_ITEMS
16
17from .canning import (
18 CannedObject,
19 can,
20 can_sequence,
21 istype,
22 sequence_types,
23 uncan,
24 uncan_sequence,
25)
26
27# -----------------------------------------------------------------------------
28# Serialization Functions
29# -----------------------------------------------------------------------------
30
31
class PrePickled:
    """Container for an object that has already been serialized.

    Wrap an object that will be sent many times so that its serialization
    cost is paid once, at construction. ``serialize_object`` recognizes
    instances of this class and hands back a copy of ``buffers`` instead of
    serializing again.
    """

    def __init__(self, obj):
        serialized = serialize_object(obj)
        self.buffers = serialized
41
42
def _nbytes(buf):
    """Return the size of *buf* in bytes.

    Objects that support the buffer protocol (``bytes``, ``bytearray``,
    ``memoryview``, ``array.array``, ...) are measured with
    ``memoryview.nbytes``, which counts bytes. ``len()`` would count
    *elements*, which differs whenever the item size is larger than one byte.
    Objects without a buffer interface fall back to ``len()``, as before.
    """
    if isinstance(buf, memoryview):
        return buf.nbytes
    try:
        view = memoryview(buf)
    except TypeError:
        # no buffer protocol: best-effort size, matching the old behaviour
        return len(buf)
    # release the temporary view promptly so the exporter is not kept pinned
    with view:
        return view.nbytes
50
51
def _extract_buffers(obj, threshold=MAX_BYTES):
    """Detach large data buffers from a canned object.

    Only a ``CannedObject`` with a non-empty ``buffers`` list is touched.
    Each buffer whose byte size exceeds *threshold* is replaced by ``None``
    in ``obj.buffers`` (so it is not pickled) and collected for separate
    sending. Smaller ``memoryview`` buffers are converted to ``bytes`` in
    place, because pickling buffer objects just results in broken pointers.

    Returns the detached buffers, in slot order.
    """
    extracted = []
    if not (isinstance(obj, CannedObject) and obj.buffers):
        return extracted
    slots = obj.buffers
    for index, data in enumerate(slots):
        if _nbytes(data) > threshold:
            # too big to pickle: leave a None placeholder, send out-of-band
            slots[index] = None
            extracted.append(data)
        elif isinstance(data, memoryview):
            # small enough to inline, but must become real bytes first
            slots[index] = data.tobytes()
    return extracted
67
68
def _restore_buffers(obj, buffers):
    """Fill the ``None`` slots that _extract_buffers left in a canned object.

    Only a ``CannedObject`` with a non-empty ``buffers`` list is touched.
    Replacement buffers are taken from the front of *buffers*, which is
    consumed (mutated) in place.
    """
    if not isinstance(obj, CannedObject) or not obj.buffers:
        return
    slots = obj.buffers
    for index, data in enumerate(slots):
        if data is None:
            slots[index] = buffers.pop(0)
75
76
def serialize_object(obj, buffer_threshold=MAX_BYTES, item_threshold=MAX_ITEMS):
    """Serialize an object into a list of sendable buffers.

    Parameters
    ----------
    obj : object
        The object to be serialized. A ``PrePickled`` instance yields a copy
        of its already-serialized buffers.
    buffer_threshold : int
        The threshold (in bytes) for pulling out data buffers
        to avoid pickling them.
    item_threshold : int
        The maximum number of items over which canning will iterate.
        Containers (lists, dicts) larger than this will be pickled without
        introspection.

    Returns
    -------
    [bufs] : list of buffers representing the serialized object.
        The first element is the pickled canned object; the data buffers
        extracted because they exceeded ``buffer_threshold`` follow it.
    """
    if isinstance(obj, PrePickled):
        return obj.buffers[:]
    buffers = []
    if istype(obj, sequence_types) and len(obj) < item_threshold:
        cobj = can_sequence(obj)
        for c in cobj:
            buffers.extend(_extract_buffers(c, buffer_threshold))
    elif istype(obj, dict) and len(obj) < item_threshold:
        # Sorted keys give a deterministic layout (the same layout this
        # function always produced). Keys of mutually incomparable types,
        # e.g. {1: ..., "a": ...}, make sorted() raise TypeError; fall back
        # to insertion order rather than failing to serialize.
        try:
            keys = sorted(obj)
        except TypeError:
            keys = list(obj)
        cobj = {}
        for k in keys:
            c = can(obj[k])
            buffers.extend(_extract_buffers(c, buffer_threshold))
            # cobj's insertion order == order in which buffers were extracted;
            # deserialize_object walks cobj in that same order.
            cobj[k] = c
    else:
        cobj = can(obj)
        buffers.extend(_extract_buffers(cobj, buffer_threshold))

    buffers.insert(0, pickle.dumps(cobj, PICKLE_PROTOCOL))
    return buffers


def deserialize_object(buffers, g=None, item_threshold=MAX_ITEMS):
    """reconstruct an object serialized by serialize_object from data buffers.

    Parameters
    ----------
    buffers : list of buffers/bytes
        The first element is the pickled canned object; following elements
        supply its extracted data buffers and may be followed by unrelated
        buffers. The input sequence itself is not modified.
    g : globals to be used when uncanning
    item_threshold : int
        Container size limit. Should equal the ``item_threshold`` given to
        serialize_object, so both sides agree on which containers were
        canned element by element.

    Returns
    -------
    (newobj, bufs) : unpacked object, and the list of remaining unused buffers.
    """
    bufs = list(buffers)
    pobj = bufs.pop(0)
    # SECURITY: pickle.loads can execute arbitrary code. Only deserialize
    # buffers received from a trusted peer.
    canned = pickle.loads(pobj)
    if istype(canned, sequence_types) and len(canned) < item_threshold:
        for c in canned:
            _restore_buffers(c, bufs)
        newobj = uncan_sequence(canned, g)
    elif istype(canned, dict) and len(canned) < item_threshold:
        # Walk values in the canned dict's own insertion order: it is exactly
        # the order serialize_object extracted their buffers in (sorted order
        # for sortable keys, so older senders are handled too). Re-sorting
        # here would fail for incomparable key types.
        newobj = {}
        for k, c in canned.items():
            _restore_buffers(c, bufs)
            newobj[k] = uncan(c, g)
    else:
        _restore_buffers(canned, bufs)
        newobj = uncan(canned, g)

    return newobj, bufs
147
148
def pack_apply_message(
    f, args, kwargs, buffer_threshold=MAX_BYTES, item_threshold=MAX_ITEMS
):
    """pack up a function, args, and kwargs to be sent over the wire

    Each element of args/kwargs will be canned for special treatment,
    but inspection will not go any deeper than that.

    Any object whose data is larger than `buffer_threshold` bytes will have
    that data sent as a separate buffer instead of being pickled
    (only numpy arrays and bytes/buffers support zero-copy).

    Message will be a list of bytes/buffers of the format:

        [ cf, <f_bufs>, pinfo, <arg_bufs>, <kwarg_bufs> ]

    - ``cf``: the pickled, canned ``f``, followed by ``<f_bufs>``, any data
      buffers extracted from it.
    - ``pinfo``: a pickled dict with ``nargs``, ``narg_bufs`` (how many of the
      following buffers belong to args) and ``kw_keys`` (sorted kwarg names).
    - ``<arg_bufs>`` / ``<kwarg_bufs>``: the ``serialize_object`` output for
      each positional argument, then for each kwarg value in ``kw_keys`` order.

    With length at least two + len(args) + len(kwargs)
    """
    arg_bufs = list(
        chain.from_iterable(
            serialize_object(arg, buffer_threshold, item_threshold) for arg in args
        )
    )

    # Sorted for a deterministic layout; the receiver decodes in kw_keys order.
    kw_keys = sorted(kwargs)
    kwarg_bufs = list(
        chain.from_iterable(
            serialize_object(kwargs[key], buffer_threshold, item_threshold)
            for key in kw_keys
        )
    )

    info = dict(nargs=len(args), narg_bufs=len(arg_bufs), kw_keys=kw_keys)
    # Honour the caller's buffer_threshold for f as well, like args/kwargs;
    # deserialize_object restores any number of extracted buffers for f.
    # item_threshold stays at its default so f still decodes with
    # deserialize_object's default container threshold.
    msg = serialize_object(f, buffer_threshold)
    msg.append(pickle.dumps(info, PICKLE_PROTOCOL))
    msg.extend(arg_bufs)
    msg.extend(kwarg_bufs)

    return msg
188
189
def unpack_apply_message(bufs, g=None, copy=True):
    """Rebuild ``(f, args, kwargs)`` from buffers made by pack_apply_message().

    Parameters
    ----------
    bufs : sequence of buffers/bytes
        The packed message. It is copied into a new list before decoding.
    g : dict, optional
        Globals passed through to uncanning.
    copy : bool
        Accepted for API compatibility; not used here.

    Returns
    -------
    (f, args, kwargs) : the function, a tuple of positional arguments, and a
        dict of keyword arguments.
    """
    remaining = list(bufs)
    # NOTE(review): the integrity checks below are `assert` statements, so
    # they are skipped when Python runs with -O.
    assert len(remaining) >= 2, "not enough buffers!"
    f, remaining = deserialize_object(remaining, g)
    # SECURITY: pickle.loads can execute arbitrary code; only unpack messages
    # that come from a trusted peer.
    info = pickle.loads(remaining.pop(0))
    split_at = info['narg_bufs']
    arg_bufs = remaining[:split_at]
    kwarg_bufs = remaining[split_at:]

    positional = []
    for _ in range(info['nargs']):
        value, arg_bufs = deserialize_object(arg_bufs, g)
        positional.append(value)
    positional = tuple(positional)
    assert not arg_bufs, "Shouldn't be any arg bufs left over"

    keyword = {}
    for name in info['kw_keys']:
        keyword[name], kwarg_bufs = deserialize_object(kwarg_bufs, g)
    assert not kwarg_bufs, "Shouldn't be any kwarg bufs left over"

    return f, positional, keyword