Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/ipyparallel/client/remotefunction.py: 20%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

151 statements  

1"""Remote Functions and decorators for Views.""" 

2 

3# Copyright (c) IPython Development Team. 

4# Distributed under the terms of the Modified BSD License. 

5import warnings 

6from inspect import signature 

7 

8from decorator import decorator 

9 

10from ..serialize import PrePickled 

11from . import map as Map 

12from .asyncresult import AsyncMapResult 

13 

14# ----------------------------------------------------------------------------- 

15# Functions and Decorators 

16# ----------------------------------------------------------------------------- 

17 

18 

19def remote(view, block=None, **flags): 

20 """Turn a function into a remote function. 

21 

22 This method can be used for map:: 

23 

24 In [1]: @remote(view,block=True) 

25 ...: def func(a): 

26 ...: pass 

27 """ 

28 

29 def remote_function(f): 

30 return RemoteFunction(view, f, block=block, **flags) 

31 

32 return remote_function 

33 

34 

35def parallel(view, dist='b', block=None, ordered=True, **flags): 

36 """Turn a function into a parallel remote function. 

37 

38 This method can be used for map:: 

39 

40 In [1]: @parallel(view, block=True) 

41 ...: def func(a): 

42 ...: pass 

43 """ 

44 

45 def parallel_function(f): 

46 return ParallelFunction( 

47 view, f, dist=dist, block=block, ordered=ordered, **flags 

48 ) 

49 

50 return parallel_function 

51 

52 

53def getname(f): 

54 """Get the name of an object. 

55 

56 For use in case of callables that are not functions, and 

57 thus may not have __name__ defined. 

58 

59 Order: f.__name__ > f.name > str(f) 

60 """ 

61 try: 

62 return f.__name__ 

63 except Exception: 

64 pass 

65 try: 

66 return f.name 

67 except Exception: 

68 pass 

69 

70 return str(f) 

71 

72 

73@decorator 

74def sync_view_results(f, self, *args, **kwargs): 

75 """sync relevant results from self.client to our results attribute. 

76 

77 This is a clone of view.sync_results, but for remote functions 

78 """ 

79 view = self.view 

80 if view._in_sync_results: 

81 return f(self, *args, **kwargs) 

82 view._in_sync_results = True 

83 try: 

84 ret = f(self, *args, **kwargs) 

85 finally: 

86 view._in_sync_results = False 

87 view._sync_results() 

88 return ret 

89 

90 

91# -------------------------------------------------------------------------- 

92# Classes 

93# -------------------------------------------------------------------------- 

94 

95 

96class RemoteFunction: 

97 """Turn an existing function into a remote function. 

98 

99 Parameters 

100 ---------- 

101 

102 view : View instance 

103 The view to be used for execution 

104 f : callable 

105 The function to be wrapped into a remote function 

106 block : bool [default: None] 

107 Whether to wait for results or not. The default behavior is 

108 to use the current `block` attribute of `view` 

109 

110 **flags : remaining kwargs are passed to View.temp_flags 

111 """ 

112 

113 view = None # the remote connection 

114 func = None # the wrapped function 

115 block = None # whether to block 

116 flags = None # dict of extra kwargs for temp_flags 

117 

118 def __init__(self, view, f, block=None, **flags): 

119 self.view = view 

120 self.func = f 

121 self.block = block 

122 self.flags = flags 

123 

124 # copy function attributes for nicer inspection 

125 # of decorated functions 

126 self.__name__ = getname(f) 

127 if getattr(f, '__doc__', None): 

128 self.__doc__ = f'{self.__class__.__name__} wrapping:\n{f.__doc__}' 

129 if getattr(f, '__signature__', None): 

130 self.__signature__ = f.__signature__ 

131 else: 

132 try: 

133 self.__signature__ = signature(f) 

134 except Exception: 

135 # no signature, but that's okay 

136 pass 

137 

138 def __call__(self, *args, **kwargs): 

139 block = self.view.block if self.block is None else self.block 

140 with self.view.temp_flags(block=block, **self.flags): 

141 return self.view.apply(self.func, *args, **kwargs) 

142 

143 

144def _map(f, *sequences): 

145 return list(map(f, *sequences)) 

146 

147 

148_prepickled_map = None 

149 

150 

151class ParallelFunction(RemoteFunction): 

152 """Class for mapping a function to sequences. 

153 

154 This will distribute the sequences according the a mapper, and call 

155 the function on each sub-sequence. If called via map, then the function 

156 will be called once on each element, rather that each sub-sequence. 

157 

158 Parameters 

159 ---------- 

160 

161 view : View instance 

162 The view to be used for execution 

163 f : callable 

164 The function to be wrapped into a remote function 

165 dist : str [default: 'b'] 

166 The key for which mapObject to use to distribute sequences 

167 options are: 

168 

169 * 'b' : use contiguous chunks in order 

170 * 'r' : use round-robin striping 

171 

172 block : bool [default: None] 

173 Whether to wait for results or not. The default behavior is 

174 to use the current `block` attribute of `view` 

175 chunksize : int or None 

176 The size of chunk to use when breaking up sequences in a load-balanced manner 

177 ordered : bool [default: True] 

178 Whether the result should be kept in order. If False, 

179 results become available as they arrive, regardless of submission order. 

180 return_exceptions : bool [default: False] 

181 **flags 

182 remaining kwargs are passed to View.temp_flags 

183 """ 

184 

185 chunksize = None 

186 ordered = None 

187 mapObject = None 

188 

189 def __init__( 

190 self, 

191 view, 

192 f, 

193 dist='b', 

194 block=None, 

195 chunksize=None, 

196 ordered=True, 

197 return_exceptions=False, 

198 **flags, 

199 ): 

200 super().__init__(view, f, block=block, **flags) 

201 self.chunksize = chunksize 

202 self.ordered = ordered 

203 self.return_exceptions = return_exceptions 

204 

205 mapClass = Map.dists[dist] 

206 self.mapObject = mapClass() 

207 

208 @sync_view_results 

209 def __call__(self, *sequences, **kwargs): 

210 global _prepickled_map 

211 if _prepickled_map is None: 

212 _prepickled_map = PrePickled(_map) 

213 client = self.view.client 

214 _mapping = kwargs.pop('__ipp_mapping', False) 

215 if kwargs: 

216 raise TypeError(f"Unexpected keyword arguments: {kwargs}") 

217 

218 lens = [] 

219 maxlen = minlen = -1 

220 for i, seq in enumerate(sequences): 

221 try: 

222 n = len(seq) 

223 except Exception: 

224 seq = list(seq) 

225 if isinstance(sequences, tuple): 

226 # can't alter a tuple 

227 sequences = list(sequences) 

228 sequences[i] = seq 

229 n = len(seq) 

230 if n > maxlen: 

231 maxlen = n 

232 if minlen == -1 or n < minlen: 

233 minlen = n 

234 lens.append(n) 

235 

236 if maxlen == 0: 

237 # nothing to iterate over 

238 return [] 

239 

240 # check that the length of sequences match 

241 if not _mapping and minlen != maxlen: 

242 msg = f'all sequences must have equal length, but have {lens}' 

243 raise ValueError(msg) 

244 

245 balanced = 'Balanced' in self.view.__class__.__name__ 

246 if balanced: 

247 if self.chunksize: 

248 nparts = maxlen // self.chunksize + int(maxlen % self.chunksize > 0) 

249 else: 

250 nparts = maxlen 

251 targets = [None] * nparts 

252 else: 

253 if self.chunksize: 

254 warnings.warn( 

255 "`chunksize` is ignored unless load balancing", UserWarning 

256 ) 

257 # multiplexed: 

258 targets = self.view.targets 

259 # 'all' is lazily evaluated at execution time, which is now: 

260 if targets == 'all': 

261 targets = client._build_targets(targets)[1] 

262 elif isinstance(targets, int): 

263 # single-engine view, targets must be iterable 

264 targets = [targets] 

265 nparts = len(targets) 

266 

267 futures = [] 

268 

269 pf = PrePickled(self.func) 

270 

271 chunk_sizes = {} 

272 chunk_size = 1 

273 

274 for index, t in enumerate(targets): 

275 args = [] 

276 for seq in sequences: 

277 part = self.mapObject.getPartition(seq, index, nparts, maxlen) 

278 args.append(part) 

279 

280 if sum(len(arg) for arg in args) == 0: 

281 continue 

282 

283 if _mapping: 

284 chunk_size = min(len(arg) for arg in args) 

285 

286 args = [PrePickled(arg) for arg in args] 

287 

288 if _mapping: 

289 f = _prepickled_map 

290 args = [pf] + args 

291 else: 

292 f = pf 

293 

294 view = self.view if balanced else client[t] 

295 with view.temp_flags(block=False, **self.flags): 

296 ar = view.apply(f, *args) 

297 ar.owner = False 

298 

299 msg_id = ar.msg_ids[0] 

300 chunk_sizes[msg_id] = chunk_size 

301 futures.extend(ar._children) 

302 

303 r = AsyncMapResult( 

304 self.view.client, 

305 futures, 

306 self.mapObject, 

307 fname=getname(self.func), 

308 ordered=self.ordered, 

309 return_exceptions=self.return_exceptions, 

310 chunk_sizes=chunk_sizes, 

311 ) 

312 

313 if self.block: 

314 try: 

315 return r.get() 

316 except KeyboardInterrupt: 

317 return r 

318 else: 

319 return r 

320 

321 def map(self, *sequences): 

322 """call a function on each element of one or more sequence(s) remotely. 

323 This should behave very much like the builtin map, but return an AsyncMapResult 

324 if self.block is False. 

325 

326 That means it can take generators (will be cast to lists locally), 

327 and mismatched sequence lengths will be padded with None. 

328 """ 

329 return self(*sequences, __ipp_mapping=True) 

330 

331 

332__all__ = ['remote', 'parallel', 'RemoteFunction', 'ParallelFunction']