1"""serialization utilities for apply messages"""
2
3# Copyright (c) IPython Development Team.
4# Distributed under the terms of the Modified BSD License.
5
6import pickle
7import warnings
8from itertools import chain
9
10try:
11 # available since ipyparallel 5.0.0
12 from ipyparallel.serialize.canning import (
13 CannedObject,
14 can,
15 can_sequence,
16 istype,
17 sequence_types,
18 uncan,
19 uncan_sequence,
20 )
21 from ipyparallel.serialize.serialize import PICKLE_PROTOCOL
22except ImportError:
23 # Deprecated since ipykernel 4.3.0
24 from ipykernel.pickleutil import (
25 PICKLE_PROTOCOL,
26 CannedObject,
27 can,
28 can_sequence,
29 istype,
30 sequence_types,
31 uncan,
32 uncan_sequence,
33 )
34
35from jupyter_client.session import MAX_BYTES, MAX_ITEMS
36
# Importing this module is deprecated. stacklevel=2 attributes the warning to
# the code performing the import rather than to this line.
warnings.warn(
    "ipykernel.serialize is deprecated. It has moved to ipyparallel.serialize",
    category=DeprecationWarning,
    stacklevel=2,
)
42
43# -----------------------------------------------------------------------------
44# Serialization Functions
45# -----------------------------------------------------------------------------
46
47
def _extract_buffers(obj, threshold=MAX_BYTES):
    """Pull data buffers larger than ``threshold`` bytes out of a canned object.

    If ``obj`` is a ``CannedObject`` with a non-empty ``buffers`` list, each
    buffer whose size in bytes exceeds ``threshold`` is removed from
    ``obj.buffers`` (its slot is set to ``None`` as a placeholder) and
    returned, so it can be sent alongside the pickle instead of inside it.
    Smaller ``memoryview`` buffers are converted to ``bytes`` in place,
    because pickling a buffer object just results in broken pointers.

    Parameters
    ----------
    obj : object
        Typically the result of ``can``/``can_sequence``. Anything that is
        not a ``CannedObject`` with buffers is ignored.
    threshold : int
        Size in bytes above which a buffer is extracted.

    Returns
    -------
    list
        The extracted buffers, in the order of their slots in ``obj.buffers``.
        ``obj`` is mutated in place.
    """
    buffers = []
    if isinstance(obj, CannedObject) and obj.buffers:
        for i, buf in enumerate(obj.buffers):
            # The threshold is in bytes. len() of a memoryview counts elements
            # of the first dimension only, so use nbytes for memoryviews.
            nbytes = buf.nbytes if isinstance(buf, memoryview) else len(buf)
            if nbytes > threshold:
                # buffer larger than threshold, prevent pickling
                obj.buffers[i] = None
                buffers.append(buf)
            # buffer too small for separate send, coerce to bytes
            # because pickling buffer objects just results in broken pointers
            elif isinstance(buf, memoryview):
                obj.buffers[i] = buf.tobytes()
    return buffers
62
63
def _restore_buffers(obj, buffers):
    """Refill the placeholder slots left behind by ``_extract_buffers``.

    Every ``None`` entry in ``obj.buffers`` is replaced, in slot order, by the
    next item popped from the front of ``buffers``. ``buffers`` is consumed in
    place, so afterwards the caller holds only the unused remainder. Objects
    that are not ``CannedObject`` instances, or that carry no buffers, are
    left untouched.
    """
    if not (isinstance(obj, CannedObject) and obj.buffers):
        return
    slots = obj.buffers
    for position, current in enumerate(slots):
        if current is None:
            slots[position] = buffers.pop(0)
70
71
def serialize_object(obj, buffer_threshold=MAX_BYTES, item_threshold=MAX_ITEMS):
    """Serialize an object into a list of sendable buffers.

    Parameters
    ----------
    obj : object
        The object to be serialized
    buffer_threshold : int
        The threshold (in bytes) for pulling out data buffers
        to avoid pickling them.
    item_threshold : int
        The maximum number of items over which canning will iterate.
        Sequences (``sequence_types``) and dicts with ``item_threshold``
        items or more are canned as a single object, without per-element
        introspection.

    Returns
    -------
    [bufs] : list of buffers representing the serialized object.
        ``bufs[0]`` is the pickled canned object. Any following entries are
        the large data buffers extracted from it, in extraction order.
    """
    buffers = []
    if istype(obj, sequence_types) and len(obj) < item_threshold:
        cobj = can_sequence(obj)
        for c in cobj:
            buffers.extend(_extract_buffers(c, buffer_threshold))
    elif istype(obj, dict) and len(obj) < item_threshold:
        try:
            keys = sorted(obj)
        except TypeError:
            # Keys of mutually unorderable types (e.g. ``1`` and ``"a"``):
            # fall back to insertion order instead of failing. ``cobj`` is
            # built in the same order its buffers are extracted, and pickle
            # preserves dict order, so that order travels with the message.
            # NOTE: a receiver that re-sorts the keys cannot decode such dicts.
            keys = list(obj)
        cobj = {}
        for k in keys:
            c = can(obj[k])
            buffers.extend(_extract_buffers(c, buffer_threshold))
            cobj[k] = c
    else:
        cobj = can(obj)
        buffers.extend(_extract_buffers(cobj, buffer_threshold))

    buffers.insert(0, pickle.dumps(cobj, PICKLE_PROTOCOL))
    return buffers
108
109
def deserialize_object(buffers, g=None, item_threshold=MAX_ITEMS):
    """Reconstruct an object serialized by serialize_object from data buffers.

    Parameters
    ----------
    buffers : list of buffers/bytes
        ``buffers[0]`` is the pickled canned object. The following entries
        are the out-of-band data buffers extracted by ``serialize_object``.
        The input sequence itself is not modified.
    g : dict, optional
        Globals to be used when uncanning.
    item_threshold : int
        Should match the ``item_threshold`` given to ``serialize_object``.
        Sequences and dicts with fewer items were canned element by element
        and are walked here. Larger ones are uncanned as a single object.

    Returns
    -------
    (newobj, bufs) : unpacked object, and the list of remaining unused buffers.
    """
    bufs = list(buffers)
    pobj = bufs.pop(0)
    # NOTE(security): pickle.loads can execute arbitrary code. Only call this
    # on messages from a trusted peer.
    canned = pickle.loads(pobj)
    if istype(canned, sequence_types) and len(canned) < item_threshold:
        for c in canned:
            _restore_buffers(c, bufs)
        newobj = uncan_sequence(canned, g)
    elif istype(canned, dict) and len(canned) < item_threshold:
        newobj = {}
        # Walk keys in the pickled dict's order. That is the order in which
        # serialize_object inserted them and extracted their buffers, and
        # unlike sorting it also works for mutually unorderable keys.
        for k in canned:
            c = canned[k]
            _restore_buffers(c, bufs)
            newobj[k] = uncan(c, g)
    else:
        _restore_buffers(canned, bufs)
        newobj = uncan(canned, g)

    return newobj, bufs
140
141
def pack_apply_message(f, args, kwargs, buffer_threshold=MAX_BYTES, item_threshold=MAX_ITEMS):
    """Pack a callable plus its positional and keyword arguments for the wire.

    Each positional argument and each keyword value is serialized on its own
    with ``serialize_object``. Canning looks at each argument but goes no
    deeper than that. Data larger than ``buffer_threshold`` bytes is sent
    as a separate buffer instead of being copied into a pickle (only numpy
    arrays and bytes/buffers support zero-copy).

    The resulting message is a list of bytes/buffers laid out as::

        [ cf, pinfo, <arg_bufs>, <kwarg_bufs> ]

    where ``cf`` is the pickled canned ``f`` and ``pinfo`` is a pickled dict
    with ``nargs``, ``narg_bufs`` and the sorted ``kw_keys``. Its length is
    at least two + len(args) + len(kwargs).
    """
    arg_bufs = []
    for arg in args:
        arg_bufs.extend(serialize_object(arg, buffer_threshold, item_threshold))

    # Keyword values are emitted in sorted-key order. The receiver relies on
    # kw_keys to know which value each group of buffers belongs to.
    kw_keys = sorted(kwargs)
    kwarg_bufs = []
    for key in kw_keys:
        kwarg_bufs.extend(serialize_object(kwargs[key], buffer_threshold, item_threshold))

    info = {"nargs": len(args), "narg_bufs": len(arg_bufs), "kw_keys": kw_keys}

    return [
        pickle.dumps(can(f), PICKLE_PROTOCOL),
        pickle.dumps(info, PICKLE_PROTOCOL),
        *arg_bufs,
        *kwarg_bufs,
    ]
177
178
def unpack_apply_message(bufs, g=None, copy=True):
    """Rebuild ``(f, args, kwargs)`` from buffers packed by pack_apply_message().

    Parameters
    ----------
    bufs : sequence of buffers/bytes
        The message frames ``[cf, pinfo, <arg_bufs>, <kwarg_bufs>]``. A copy
        is taken, so the caller's sequence is not modified.
    g : dict, optional
        Globals forwarded to ``uncan`` and ``deserialize_object``.
    copy : bool
        Accepted for API compatibility. This function does not use it.

    Returns
    -------
    (f, args, kwargs) : the uncanned callable, a tuple of positional
        arguments, and a dict of keyword arguments.
    """
    frames = list(bufs)  # private copy so the pops below are safe
    assert len(frames) >= 2, "not enough buffers!"
    f = uncan(pickle.loads(frames.pop(0)), g)
    info = pickle.loads(frames.pop(0))

    split_at = info["narg_bufs"]
    arg_bufs = frames[:split_at]
    kwarg_bufs = frames[split_at:]

    # Each call consumes one argument's buffers and hands back the rest.
    positional = []
    for _ in range(info["nargs"]):
        value, arg_bufs = deserialize_object(arg_bufs, g)
        positional.append(value)
    assert not arg_bufs, "Shouldn't be any arg bufs left over"

    kwargs = {}
    for name in info["kw_keys"]:
        kwargs[name], kwarg_bufs = deserialize_object(kwarg_bufs, g)
    assert not kwarg_bufs, "Shouldn't be any kwarg bufs left over"

    return f, tuple(positional), kwargs