Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/ipykernel/serialize.py: 8%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

89 statements  

1"""serialization utilities for apply messages""" 

2 

3# Copyright (c) IPython Development Team. 

4# Distributed under the terms of the Modified BSD License. 

5 

6import pickle 

7import warnings 

8from itertools import chain 

9 

10try: 

11 # available since ipyparallel 5.0.0 

12 from ipyparallel.serialize.canning import ( 

13 CannedObject, 

14 can, 

15 can_sequence, 

16 istype, 

17 sequence_types, 

18 uncan, 

19 uncan_sequence, 

20 ) 

21 from ipyparallel.serialize.serialize import PICKLE_PROTOCOL 

22except ImportError: 

23 # Deprecated since ipykernel 4.3.0 

24 from ipykernel.pickleutil import ( 

25 PICKLE_PROTOCOL, 

26 CannedObject, 

27 can, 

28 can_sequence, 

29 istype, 

30 sequence_types, 

31 uncan, 

32 uncan_sequence, 

33 ) 

34 

35from jupyter_client.session import MAX_BYTES, MAX_ITEMS 

36 

# Emitted once at import time: the message tells users that this module has
# moved to ipyparallel.serialize. stacklevel=2 asks for the warning to be
# attributed one frame above this module rather than to this line.
warnings.warn(
    "ipykernel.serialize is deprecated. It has moved to ipyparallel.serialize",
    category=DeprecationWarning,
    stacklevel=2,
)

42 

43# ----------------------------------------------------------------------------- 

44# Serialization Functions 

45# ----------------------------------------------------------------------------- 

46 

47 

def _extract_buffers(obj, threshold=MAX_BYTES):
    """Pull buffers larger than ``threshold`` out of a canned object.

    Parameters
    ----------
    obj : object
        Only ``CannedObject`` instances with a non-empty ``buffers`` list are
        processed; any other object is left untouched.
    threshold : int
        Size in bytes above which a buffer is extracted instead of pickled.

    Returns
    -------
    list
        The extracted buffers, in the order they appeared in ``obj.buffers``.
        Each extracted slot of ``obj.buffers`` is replaced in place by ``None``.
    """
    extracted = []
    if isinstance(obj, CannedObject) and obj.buffers:
        for i, buf in enumerate(obj.buffers):
            is_view = isinstance(buf, memoryview)
            # len() of a memoryview counts first-dimension items, not bytes,
            # so views with itemsize > 1 or several dimensions must be
            # measured with nbytes to be compared against a byte threshold.
            size = buf.nbytes if is_view else len(buf)
            if size > threshold:
                # buffer larger than threshold, prevent pickling
                obj.buffers[i] = None
                extracted.append(buf)
            elif is_view:
                # buffer too small for a separate send: coerce to bytes,
                # because pickling buffer objects just results in broken pointers
                obj.buffers[i] = buf.tobytes()
    return extracted

62 

63 

def _restore_buffers(obj, buffers):
    """Put buffers removed by ``_extract_buffers`` back into ``obj``.

    Every ``None`` slot in ``obj.buffers`` is filled, in order, with the next
    item popped from the front of ``buffers``. ``buffers`` is consumed in
    place. Objects that are not a ``CannedObject``, or that have no buffers,
    are left untouched.
    """
    if not isinstance(obj, CannedObject) or not obj.buffers:
        return
    for index, slot in enumerate(obj.buffers):
        if slot is not None:
            continue
        obj.buffers[index] = buffers.pop(0)

70 

71 

def serialize_object(obj, buffer_threshold=MAX_BYTES, item_threshold=MAX_ITEMS):
    """Turn an object into a list of buffers suitable for sending.

    Parameters
    ----------
    obj : object
        The object to be serialized.
    buffer_threshold : int
        Size threshold (in bytes) above which data buffers are pulled out
        and sent separately rather than pickled.
    item_threshold : int
        Containers (sequences, dicts) with fewer items than this are canned
        item by item; larger ones are canned as a single object.

    Returns
    -------
    list
        The pickled canned object first, followed by any extracted buffers.
    """
    if istype(obj, sequence_types) and len(obj) < item_threshold:
        canned = can_sequence(obj)
        extracted = list(
            chain.from_iterable(_extract_buffers(item, buffer_threshold) for item in canned)
        )
    elif istype(obj, dict) and len(obj) < item_threshold:
        canned = {}
        extracted = []
        # Keys are visited in sorted order; deserialize_object walks them the
        # same way, which keeps the extracted buffers aligned.
        for key in sorted(obj):
            canned_value = can(obj[key])
            extracted += _extract_buffers(canned_value, buffer_threshold)
            canned[key] = canned_value
    else:
        canned = can(obj)
        extracted = list(_extract_buffers(canned, buffer_threshold))

    # Pickle only after extraction, so the large buffers are already replaced
    # by None placeholders inside the canned object.
    return [pickle.dumps(canned, PICKLE_PROTOCOL), *extracted]

108 

109 

def deserialize_object(buffers, g=None):
    """Rebuild an object produced by ``serialize_object`` from its buffers.

    The first buffer is unpickled with ``pickle.loads``, so it must come from
    a trusted source.

    Parameters
    ----------
    buffers : list of buffers/bytes
        The pickled canned object followed by any extracted data buffers.
    g : dict, optional
        Globals passed through to the uncanning functions.

    Returns
    -------
    (newobj, bufs)
        The unpacked object and the list of buffers that were not consumed.
    """
    remaining = list(buffers)
    canned = pickle.loads(remaining.pop(0))

    if istype(canned, sequence_types) and len(canned) < MAX_ITEMS:
        for item in canned:
            _restore_buffers(item, remaining)
        return uncan_sequence(canned, g), remaining

    if istype(canned, dict) and len(canned) < MAX_ITEMS:
        restored = {}
        # Sorted key order mirrors the order used when the buffers were extracted.
        for key in sorted(canned):
            entry = canned[key]
            _restore_buffers(entry, remaining)
            restored[key] = uncan(entry, g)
        return restored, remaining

    _restore_buffers(canned, remaining)
    return uncan(canned, g), remaining

140 

141 

def pack_apply_message(f, args, kwargs, buffer_threshold=MAX_BYTES, item_threshold=MAX_ITEMS):
    """Pack a function, its args and its kwargs for sending over the wire.

    Each element of args/kwargs is canned for special treatment, but
    inspection does not go any deeper than that.

    Any object whose data is larger than `buffer_threshold` will not have its
    data copied (only numpy arrays and bytes/buffers support zero-copy).

    The message is a list of bytes/buffers of the format:

    [ cf, pinfo, <arg_bufs>, <kwarg_bufs> ]

    With length at least two + len(args) + len(kwargs)
    """
    arg_bufs = []
    for arg in args:
        arg_bufs.extend(serialize_object(arg, buffer_threshold, item_threshold))

    # kwargs travel in sorted-key order; the key list is stored in `info`
    # so the receiver can pair the buffers back up with their names.
    kw_keys = sorted(kwargs)
    kwarg_bufs = []
    for key in kw_keys:
        kwarg_bufs.extend(serialize_object(kwargs[key], buffer_threshold, item_threshold))

    info = {"nargs": len(args), "narg_bufs": len(arg_bufs), "kw_keys": kw_keys}

    return [
        pickle.dumps(can(f), PICKLE_PROTOCOL),
        pickle.dumps(info, PICKLE_PROTOCOL),
        *arg_bufs,
        *kwarg_bufs,
    ]

177 

178 

def unpack_apply_message(bufs, g=None, copy=True):
    """Unpack f, args, kwargs from buffers packed by pack_apply_message().

    The first two buffers are unpickled with ``pickle.loads``, so the message
    must come from a trusted source.

    Parameters
    ----------
    bufs : iterable of buffers/bytes
        The message: pickled function, pickled info dict, then argument buffers.
    g : dict, optional
        Globals passed through to the uncanning functions.
    copy : bool
        Not used by this function.

    Returns
    -------
    (f, args, kwargs)
        The original function, positional args (as a tuple) and keyword args.
    """
    frames = list(bufs)  # work on a private copy of the caller's sequence
    assert len(frames) >= 2, "not enough buffers!"
    packed_f, packed_info, *payload = frames

    f = uncan(pickle.loads(packed_f), g)
    info = pickle.loads(packed_info)

    # The first `narg_bufs` payload buffers belong to args, the rest to kwargs.
    split = info["narg_bufs"]
    arg_bufs = payload[:split]
    kwarg_bufs = payload[split:]

    positional = []
    for _ in range(info["nargs"]):
        value, arg_bufs = deserialize_object(arg_bufs, g)
        positional.append(value)
    assert not arg_bufs, "Shouldn't be any arg bufs left over"

    kwargs = {}
    for key in info["kw_keys"]:
        value, kwarg_bufs = deserialize_object(kwarg_bufs, g)
        kwargs[key] = value
    assert not kwarg_bufs, "Shouldn't be any kwarg bufs left over"

    return f, tuple(positional), kwargs