Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/ipyparallel/serialize/serialize.py: 17%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

96 statements  

1"""serialization utilities for apply messages""" 

2 

3# Copyright (c) IPython Development Team. 

4# Distributed under the terms of the Modified BSD License. 

5 

6import pickle 

7 

# Pickle protocol handed to pickle.dumps by the serializers in this module:
# the pickle module's default protocol when it exposes one, otherwise the
# highest protocol available.
PICKLE_PROTOCOL = getattr(pickle, "DEFAULT_PROTOCOL", pickle.HIGHEST_PROTOCOL)

12 

13from itertools import chain 

14 

15from jupyter_client.session import MAX_BYTES, MAX_ITEMS 

16 

17from .canning import ( 

18 CannedObject, 

19 can, 

20 can_sequence, 

21 istype, 

22 sequence_types, 

23 uncan, 

24 uncan_sequence, 

25) 

26 

27# ----------------------------------------------------------------------------- 

28# Serialization Functions 

29# ----------------------------------------------------------------------------- 

30 

31 

class PrePickled:
    """Hold the already-serialized form of an object.

    The object is serialized once, when the wrapper is constructed, so an
    object that is re-used many times does not have to be pickled again
    for every use.
    """

    def __init__(self, obj):
        # Serialize eagerly (default thresholds) and keep only the
        # resulting list of buffers.
        serialized = serialize_object(obj)
        self.buffers = serialized

41 

42 

43def _nbytes(buf): 

44 """Return byte-size of a memoryview or buffer""" 

45 if isinstance(buf, memoryview): 

46 return buf.nbytes 

47 else: 

48 # not a memoryview, raw bytes 

49 return len(buf) 

50 

51 

def _extract_buffers(obj, threshold=MAX_BYTES):
    """Pull buffers larger than ``threshold`` bytes out of a canned object.

    Only ``CannedObject`` instances with a non-empty ``buffers`` list are
    touched; ``obj.buffers`` is modified in place.

    Returns
    -------
    list
        The extracted buffers, in the order they appeared in
        ``obj.buffers``.  Each extracted slot is left as ``None``.
    """
    pulled = []
    if not isinstance(obj, CannedObject) or not obj.buffers:
        return pulled

    for index, candidate in enumerate(obj.buffers):
        if _nbytes(candidate) > threshold:
            # Large buffer: hand it back for a separate send and blank the
            # slot so the data itself is not pickled.
            pulled.append(candidate)
            obj.buffers[index] = None
            continue
        if isinstance(candidate, memoryview):
            # Too small for a separate send: coerce to bytes, because
            # pickling buffer objects just results in broken pointers.
            obj.buffers[index] = candidate.tobytes()
    return pulled

67 

68 

def _restore_buffers(obj, buffers):
    """Put back the buffers that _extract_buffers removed.

    Every ``None`` slot in ``obj.buffers`` is filled, in order, with the
    next buffer taken from the front of ``buffers``.  Both ``obj.buffers``
    and ``buffers`` are modified in place.  Anything that is not a
    ``CannedObject`` with a non-empty ``buffers`` list is left alone.
    """
    if not isinstance(obj, CannedObject) or not obj.buffers:
        return

    empty_slots = [
        index for index, slot in enumerate(obj.buffers) if slot is None
    ]
    for index in empty_slots:
        obj.buffers[index] = buffers.pop(0)

75 

76 

def serialize_object(obj, buffer_threshold=MAX_BYTES, item_threshold=MAX_ITEMS):
    """Turn an object into a list of buffers that can be sent.

    Parameters
    ----------
    obj : object
        The object to serialize.  A ``PrePickled`` instance is not
        serialized again; a copy of its stored buffer list is returned.
    buffer_threshold : int
        Size (in bytes) above which data buffers are pulled out and sent
        separately instead of being pickled.
    item_threshold : int
        Sequences and dicts with fewer items than this are canned item by
        item; larger containers are canned as a single object.

    Returns
    -------
    [bufs] : list of buffers.  The first entry is the pickle of the canned
        object; any extracted data buffers follow it.
    """
    if isinstance(obj, PrePickled):
        return obj.buffers[:]

    if istype(obj, sequence_types) and len(obj) < item_threshold:
        canned = can_sequence(obj)
        extracted = [
            buf
            for item in canned
            for buf in _extract_buffers(item, buffer_threshold)
        ]
    elif istype(obj, dict) and len(obj) < item_threshold:
        canned = {}
        extracted = []
        # Keys are visited in sorted order, so the extracted buffers are
        # laid out in sorted-key order.
        for key in sorted(obj):
            canned_value = can(obj[key])
            extracted.extend(_extract_buffers(canned_value, buffer_threshold))
            canned[key] = canned_value
    else:
        canned = can(obj)
        extracted = list(_extract_buffers(canned, buffer_threshold))

    # Pickle only after extraction, so the large buffers are not included.
    return [pickle.dumps(canned, PICKLE_PROTOCOL), *extracted]

115 

116 

def deserialize_object(buffers, g=None):
    """Rebuild an object from the buffers produced by serialize_object.

    Parameters
    ----------
    buffers : list of buffers/bytes
        The first buffer is the pickled canned object; the following ones
        are consumed as needed to refill extracted data buffers.  The
        input is copied, not modified.
    g : globals to be used when uncanning

    Returns
    -------
    (newobj, bufs) : the unpacked object, and the list of buffers that
        were not used.

    Notes
    -----
    The first buffer is passed to ``pickle.loads``, so only buffers from a
    trusted source should be given to this function.
    """
    remaining = list(buffers)
    canned = pickle.loads(remaining.pop(0))

    # NOTE(review): the per-item paths compare against the module-level
    # MAX_ITEMS, while serialize_object accepts a custom item_threshold.
    # Confirm callers never serialize with a non-default threshold.
    if istype(canned, sequence_types) and len(canned) < MAX_ITEMS:
        for item in canned:
            _restore_buffers(item, remaining)
        return uncan_sequence(canned, g), remaining

    if istype(canned, dict) and len(canned) < MAX_ITEMS:
        rebuilt = {}
        # Sorted-key order matches the order used when extracting buffers.
        for key in sorted(canned):
            item = canned[key]
            _restore_buffers(item, remaining)
            rebuilt[key] = uncan(item, g)
        return rebuilt, remaining

    _restore_buffers(canned, remaining)
    return uncan(canned, g), remaining

147 

148 

def pack_apply_message(
    f, args, kwargs, buffer_threshold=MAX_BYTES, item_threshold=MAX_ITEMS
):
    """Pack a function, its args and its kwargs for sending over the wire.

    Each element of args/kwargs is serialized on its own with
    serialize_object, using the given thresholds; inspection does not go
    any deeper than that.  ``f`` itself is serialized with the default
    thresholds.

    Any object whose data is larger than `buffer_threshold` will not have
    its data copied (only numpy arrays and bytes/buffers support zero-copy).

    The message is a list of bytes/buffers of the format:

    [ cf, pinfo, <arg_bufs>, <kwarg_bufs> ]

    where ``pinfo`` is a pickled dict with the keys ``nargs``,
    ``narg_bufs`` and ``kw_keys``, and kwargs are laid out in sorted key
    order.  Its length is at least two + len(args) + len(kwargs).
    """
    arg_bufs = []
    for arg in args:
        arg_bufs.extend(serialize_object(arg, buffer_threshold, item_threshold))

    kw_keys = sorted(kwargs)
    kwarg_bufs = []
    for key in kw_keys:
        kwarg_bufs.extend(
            serialize_object(kwargs[key], buffer_threshold, item_threshold)
        )

    info = {
        'nargs': len(args),
        'narg_bufs': len(arg_bufs),
        'kw_keys': kw_keys,
    }

    msg = serialize_object(f)
    msg += [pickle.dumps(info, PICKLE_PROTOCOL), *arg_bufs, *kwarg_bufs]
    return msg

188 

189 

def unpack_apply_message(bufs, g=None, copy=True):
    """Unpack f, args, kwargs from buffers packed by pack_apply_message().

    Parameters
    ----------
    bufs : iterable of buffers/bytes
        The packed message.  It is copied into a new list before anything
        is popped from it.
    g : globals to be used when uncanning
    copy : bool
        Not used by this function.

    Returns
    -------
    (f, args, kwargs) : the original function, the positional arguments as
        a tuple, and the keyword arguments as a dict.

    Raises
    ------
    AssertionError
        If fewer than two buffers are given, or if buffers are left over
        after all args or all kwargs have been unpacked.  The checks are
        raised explicitly (not via ``assert``) so they still run under
        ``python -O``.

    Notes
    -----
    The message is decoded with ``pickle.loads``; only unpack messages
    from a trusted source.
    """
    bufs = list(bufs)  # allow us to pop
    if len(bufs) < 2:
        raise AssertionError("not enough buffers!")

    f, bufs = deserialize_object(bufs, g)
    info = pickle.loads(bufs.pop(0))

    # Everything after the info buffer is arg buffers followed by kwarg
    # buffers; narg_bufs marks the boundary between the two.
    narg_bufs = info['narg_bufs']
    arg_bufs, kwarg_bufs = bufs[:narg_bufs], bufs[narg_bufs:]

    args = []
    for _ in range(info['nargs']):
        arg, arg_bufs = deserialize_object(arg_bufs, g)
        args.append(arg)
    args = tuple(args)
    if arg_bufs:
        raise AssertionError("Shouldn't be any arg bufs left over")

    kwargs = {}
    for key in info['kw_keys']:
        kwarg, kwarg_bufs = deserialize_object(kwarg_bufs, g)
        kwargs[key] = kwarg
    if kwarg_bufs:
        raise AssertionError("Shouldn't be any kwarg bufs left over")

    return f, args, kwargs