Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/toolz/sandbox/parallel.py: 89%
19 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:47 +0000
1import functools
2from toolz.itertoolz import partition_all
3from toolz.utils import no_default
# Private sentinel meaning "no initial value supplied". Using a dedicated
# object (instead of ``None``) lets callers pass ``None`` as a genuine seed.
_NO_INITIAL = object()


def _reduce(func, seq, initial=_NO_INITIAL):
    """
    Reduce ``seq`` with ``func``, optionally seeded with ``initial``.

    Thin wrapper over ``functools.reduce`` that accepts ``initial`` as a
    keyword argument, so it can be bound with ``functools.partial``.

    inputs:

    ``func``    - binary function ``(accumulator, item) -> accumulator``
    ``seq``     - items to reduce
    ``initial`` - optional seed value. When omitted, the first item of
                  ``seq`` seeds the reduction. Any supplied value, including
                  ``None``, is used as the seed.

    Raises ``TypeError`` (from ``functools.reduce``) when ``seq`` is empty
    and no ``initial`` is supplied.
    """
    if initial is _NO_INITIAL:
        return functools.reduce(func, seq)
    return functools.reduce(func, seq, initial)
def fold(binop, seq, default=no_default, map=map, chunksize=128, combine=None):
    """
    Reduce without guarantee of ordered reduction.

    inputs:

    ``binop``     - associative operator. The associative property allows us to
                    leverage a parallel map to perform reductions in parallel.
    ``seq``       - a sequence to be aggregated
    ``default``   - an identity element like 0 for ``add`` or 1 for mul.
                    Returned unchanged when ``seq`` is empty.
    ``map``       - an implementation of ``map``. This may be parallel and
                    determines how work is distributed.
    ``chunksize`` - Number of elements of ``seq`` that should be handled
                    within a single function call. Must be greater than 1.
    ``combine``   - Binary operator to combine two intermediate results.
                    If ``binop`` is of type (total, item) -> total
                    then ``combine`` is of type (total, total) -> total
                    Defaults to ``binop`` for common case of operators like add

    Fold chunks up the collection into blocks of size ``chunksize`` and then
    feeds each of these to calls to ``reduce``. This work is distributed
    with a call to ``map``, gathered back and then refolded to finish the
    computation. In this way ``fold`` specifies only how to chunk up data but
    leaves the distribution of this work to an externally provided ``map``
    function. This function can be sequential or rely on multithreading,
    multiprocessing, or even distributed solutions.

    If ``map`` intends to serialize functions it should be prepared to accept
    and serialize lambdas. Note that the standard ``pickle`` module fails
    here.

    If ``seq`` is empty, ``default`` is returned; if no ``default`` was
    given, ``TypeError`` is raised, mirroring ``functools.reduce``.

    Example
    -------

    >>> # Provide a parallel map to accomplish a parallel sum
    >>> from operator import add
    >>> fold(add, [1, 2, 3, 4], chunksize=2, map=map)
    10
    """
    # With chunksize == 1 every chunk yields one intermediate result per
    # input item, so the recursive re-fold below would never shrink.
    assert chunksize > 1

    if combine is None:
        combine = binop

    chunks = partition_all(chunksize, seq)

    # ``no_default`` is a sentinel: compare by identity, since ``==`` can be
    # overloaded by whatever type ``default`` happens to be.
    if default is no_default:
        reducer = functools.partial(_reduce, binop)
    else:
        reducer = functools.partial(_reduce, binop, initial=default)

    # Evaluate sequence in chunks via map
    results = list(map(reducer, chunks))  # TODO: Support complete laziness

    if not results:
        # Empty input: recursing on an empty list would never terminate,
        # so resolve the result here instead.
        if default is no_default:
            raise TypeError("fold() of empty sequence with no default value")
        return default

    if len(results) == 1:  # Return completed result
        return results[0]

    # Recurse to reaggregate intermediate results
    return fold(combine, results, map=map, chunksize=chunksize)