Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/toolz/sandbox/parallel.py: 26%


19 statements  

import functools
from toolz.itertoolz import partition_all
from toolz.utils import no_default


def _reduce(func, seq, initial=None):
    # None doubles as "no initial value", so None itself cannot be used as a seed
    if initial is None:
        return functools.reduce(func, seq)
    else:
        return functools.reduce(func, seq, initial)
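
Because _reduce uses None to mean "no initial value", a caller cannot seed a reduction with None itself. For example, _reduce(f, [], None) raises TypeError instead of returning None. It also means fold(..., default=None) behaves as if no default had been given. A common workaround is a private sentinel object. The sketch below shows that approach; reduce_with_sentinel, _MISSING and none_aware_max are hypothetical names for illustration and are not part of toolz.

```python
import functools

# Hypothetical private sentinel: unlike None, no caller can pass it by accident.
_MISSING = object()


def reduce_with_sentinel(func, seq, initial=_MISSING):
    """Variant of _reduce in which None is a legal initial value."""
    if initial is _MISSING:
        return functools.reduce(func, seq)
    return functools.reduce(func, seq, initial)


def none_aware_max(acc, x):
    # None in the accumulator means "no value seen yet"
    return x if acc is None or x > acc else acc


print(reduce_with_sentinel(none_aware_max, [3, 1, 4], None))  # 4
print(reduce_with_sentinel(none_aware_max, [], None))         # None
print(reduce_with_sentinel(lambda a, b: a + b, [1, 2, 3]))    # 6
```

The module already uses this pattern for fold's default parameter (the no_default sentinel). The helper's own initial parameter does not use it.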


def fold(binop, seq, default=no_default, map=map, chunksize=128, combine=None):
    """
    Reduce without guarantee of ordered reduction.

    Parameters
    ----------
    binop
        Associative operator. The associative property allows us to
        leverage a parallel map to perform reductions in parallel.
    seq
        A sequence to be aggregated.
    default
        An identity element, like 0 for ``add`` or 1 for ``mul``. It seeds
        the reduction of every chunk, not just the first, which is why it
        must be an identity.
    map
        An implementation of ``map``. This may be parallel and
        determines how work is distributed.
    chunksize
        Number of elements of ``seq`` that should be handled within a
        single function call. Must be greater than 1.
    combine
        Binary operator to combine two intermediate results.
        If ``binop`` is of type (total, item) -> total
        then ``combine`` is of type (total, total) -> total.
        Defaults to ``binop``, which suits the common case of operators
        like ``add``.

    ``fold`` splits the collection into chunks of size ``chunksize`` and
    reduces each chunk with ``reduce``. This work is distributed with a call
    to ``map``; the intermediate results are gathered back and folded again
    with ``combine`` until a single result remains. In this way ``fold``
    specifies only how to chunk up data but leaves the distribution of this
    work to an externally provided ``map`` function. This function can be
    sequential or rely on multithreading, multiprocessing, or even
    distributed solutions.

    If ``map`` serializes the functions it sends to workers, it must be able
    to serialize lambdas, because ``binop`` and ``combine`` may be lambdas.
    The standard ``pickle`` module cannot serialize lambdas.

    Example
    -------

    >>> # Pass a parallel map in place of the builtin to sum in parallel
    >>> from operator import add
    >>> fold(add, [1, 2, 3, 4], chunksize=2, map=map)
    10
    """
    assert chunksize > 1  # chunks of size 1 would never shrink the data

    if combine is None:
        combine = binop

    chunks = partition_all(chunksize, seq)

    # Evaluate sequence in chunks via map
    if default is no_default:
        results = map(
            functools.partial(_reduce, binop),
            chunks)
    else:
        results = map(
            functools.partial(_reduce, binop, initial=default),
            chunks)

    results = list(results)  # TODO: Support complete laziness

    if len(results) > 1:  # Recurse to reaggregate intermediate results
        return fold(combine, results, map=map, chunksize=chunksize)
    elif len(results) == 1:  # Return completed result
        return results[0]
    elif default is not no_default:  # Empty seq: nothing to reduce
        return default
    else:  # Empty seq and no default; recursing here would never terminate
        raise TypeError("fold() of empty sequence with no default value")
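
The strategy described in the docstring (chunk the data, reduce each chunk through a pluggable map, then refold the partial results with combine) can be sketched without toolz. This sketch makes several assumptions of its own:

- concurrent.futures.ThreadPoolExecutor.map serves as the parallel map.
- chunked and chunked_fold are hypothetical names, and chunked stands in for toolz.itertoolz.partition_all.
- It refolds in a loop instead of recursing.
- It returns default, or raises TypeError, for an empty sequence.

None of these choices come from toolz.

```python
import functools
from concurrent.futures import ThreadPoolExecutor
from itertools import islice
from operator import add

# Hypothetical sentinel standing in for toolz.utils.no_default.
_NO_DEFAULT = object()


def chunked(seq, size):
    """Yield tuples of up to `size` items, like toolz.itertoolz.partition_all."""
    it = iter(seq)
    while True:
        chunk = tuple(islice(it, size))
        if not chunk:
            return
        yield chunk


def chunked_fold(binop, seq, default=_NO_DEFAULT, map=map, chunksize=128,
                 combine=None):
    """Chunk, reduce each chunk via `map`, then refold the partial results."""
    if chunksize < 2:
        raise ValueError("chunksize must be at least 2 so each round shrinks")
    if combine is None:
        combine = binop

    if default is _NO_DEFAULT:
        reduce_chunk = functools.partial(functools.reduce, binop)
    else:
        # default seeds every chunk, so it must be an identity element
        def reduce_chunk(chunk):
            return functools.reduce(binop, chunk, default)

    results = list(map(reduce_chunk, chunked(seq, chunksize)))
    if not results:  # empty seq
        if default is _NO_DEFAULT:
            raise TypeError("chunked_fold() of empty sequence with no default")
        return default

    # Each round divides the number of partial results by about chunksize.
    merge = functools.partial(functools.reduce, combine)
    while len(results) > 1:
        results = list(map(merge, chunked(results, chunksize)))
    return results[0]


def count_step(total, item):
    # (total, item) -> total: counts items and ignores their values
    return total + 1


with ThreadPoolExecutor(max_workers=4) as pool:
    total = chunked_fold(add, range(1, 101), default=0, map=pool.map,
                         chunksize=10)
    count = chunked_fold(count_step, "abcdefghij", default=0, map=pool.map,
                         chunksize=3, combine=add)

print(total)  # 5050
print(count)  # 10
```

The count example shows why combine exists. If count_step were also used to merge the per-chunk counts, count_step(3, 3) would return 4 rather than 6. A thread pool is used here only because it needs no pickling. For CPU-bound pure-Python operators, a process pool's map would likely be needed for a real speedup, and it would then hit the lambda-serialization caveat from the docstring.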