Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/toolz/sandbox/parallel.py: 89%

19 statements  

coverage.py v7.2.2, created at 2023-03-26 06:47 +0000

import functools
from toolz.itertoolz import partition_all
from toolz.utils import no_default


def _reduce(func, seq, initial=None):
    if initial is None:
        return functools.reduce(func, seq)
    else:
        return functools.reduce(func, seq, initial)


def fold(binop, seq, default=no_default, map=map, chunksize=128, combine=None):
    """
    Reduce without guarantee of ordered reduction.

    inputs:

    ``binop``     - associative operator. The associative property allows us to
                    leverage a parallel map to perform reductions in parallel.
    ``seq``       - a sequence to be aggregated
    ``default``   - an identity element like 0 for ``add`` or 1 for mul

    ``map``       - an implementation of ``map``. This may be parallel and
                    determines how work is distributed.
    ``chunksize`` - Number of elements of ``seq`` that should be handled
                    within a single function call
    ``combine``   - Binary operator to combine two intermediate results.
                    If ``binop`` is of type (total, item) -> total
                    then ``combine`` is of type (total, total) -> total
                    Defaults to ``binop`` for common case of operators like add

    Fold chunks up the collection into blocks of size ``chunksize`` and then
    feeds each of these to calls to ``reduce``. This work is distributed
    with a call to ``map``, gathered back and then refolded to finish the
    computation. In this way ``fold`` specifies only how to chunk up data but
    leaves the distribution of this work to an externally provided ``map``
    function. This function can be sequential or rely on multithreading,
    multiprocessing, or even distributed solutions.

    If ``map`` intends to serialize functions it should be prepared to accept
    and serialize lambdas. Note that the standard ``pickle`` module fails
    here.

    Example
    -------

    >>> # Provide a parallel map to accomplish a parallel sum
    >>> from operator import add
    >>> fold(add, [1, 2, 3, 4], chunksize=2, map=map)
    10
    """
    assert chunksize > 1

    if combine is None:
        combine = binop

    chunks = partition_all(chunksize, seq)

    # Evaluate sequence in chunks via map
    if default == no_default:
        results = map(
            functools.partial(_reduce, binop),
            chunks)
    else:
        results = map(
            functools.partial(_reduce, binop, initial=default),
            chunks)

    results = list(results)  # TODO: Support complete laziness

    if len(results) == 1:    # Return completed result
        return results[0]
    else:                    # Recurse to reaggregate intermediate results
        return fold(combine, results, map=map, chunksize=chunksize)
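
The chunk, reduce, and combine flow described in the docstring can be illustrated with a standard-library-only sketch. This is not the toolz implementation: `chunked`, `sketch_fold`, and `count_item` are hypothetical names introduced here for illustration. The sketch always requires an explicit initial value, and it merges the per-chunk totals in a single pass rather than recursing. It uses a thread pool's `map` to distribute the per-chunk reductions and shows a case where `combine` must differ from `binop`.

```python
import functools
from concurrent.futures import ThreadPoolExecutor
from itertools import islice
from operator import add


def chunked(seq, size):
    """Yield successive tuples of at most ``size`` items (hypothetical helper)."""
    it = iter(seq)
    while True:
        chunk = tuple(islice(it, size))
        if not chunk:
            return
        yield chunk


def sketch_fold(binop, seq, initial, map=map, chunksize=128, combine=None):
    """Illustrative stand-in for ``fold``; assumes ``initial`` is always given."""
    if combine is None:
        combine = binop
    # Reduce each chunk independently; the supplied ``map`` decides where
    # (sequentially, in threads, ...) that work runs.
    partials = list(map(lambda chunk: functools.reduce(binop, chunk, initial),
                        chunked(seq, chunksize)))
    # Merge the per-chunk totals with ``combine`` in one sequential pass.
    return functools.reduce(combine, partials, initial)


def count_item(total, item):
    # Type (total, item) -> total: ignores the item's value, bumps the count.
    return total + 1


with ThreadPoolExecutor(max_workers=4) as pool:
    # ``add`` serves as both binop and combine.
    total = sketch_fold(add, range(1000), 0, map=pool.map, chunksize=100)
    # Counting needs a separate combine: per-chunk counts must be summed,
    # not passed through ``count_item`` again.
    count = sketch_fold(count_item, "abcdefghij", 0, map=pool.map,
                        chunksize=3, combine=add)
```

Threads are used here because they can run lambdas without serializing them. A process-based `map` would need the reducing function to be picklable, which is the limitation the docstring warns about.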