1import functools
2from toolz.itertoolz import partition_all
3from toolz.utils import no_default
4
5
def _reduce(func, seq, initial=None):
    """Fold ``seq`` left-to-right with ``func``, optionally seeded by ``initial``.

    ``initial=None`` means "no seed": ``functools.reduce`` is then called
    with only ``func`` and ``seq``.  As a consequence ``None`` itself can
    never be used as a seed value through this helper.

    Being a module-level function, ``functools.partial`` objects built from
    it are picklable by the standard ``pickle`` module (provided ``func`` and
    ``initial`` are), which matters for process-based ``map`` implementations.

    Raises ``TypeError`` (from ``functools.reduce``) when ``seq`` is empty and
    no seed is given.
    """
    reduce_args = (seq,) if initial is None else (seq, initial)
    return functools.reduce(func, *reduce_args)
11
12
def fold(binop, seq, default=no_default, map=map, chunksize=128, combine=None):
    """
    Reduce without guarantee of ordered reduction.

    Parameters
    ----------
    binop
        Associative operator. The associative property allows us to
        leverage a parallel map to perform reductions in parallel.
    seq
        A sequence to be aggregated.
    default
        An identity element like 0 for ``add`` or 1 for ``mul``.  It seeds
        the reduction of *every* chunk, so it must be a true identity for
        ``binop``.  It is returned unchanged when ``seq`` is empty.  A
        ``default`` of ``None`` is not used to seed the chunks (each chunk is
        then reduced as if no default had been given).
    map
        An implementation of ``map``. This may be parallel and determines
        how work is distributed.
    chunksize
        Number of elements of ``seq`` that should be handled within a
        single function call.  Must be greater than 1.
    combine
        Binary operator to combine two intermediate results.
        If ``binop`` is of type (total, item) -> total
        then ``combine`` is of type (total, total) -> total.
        Defaults to ``binop`` for the common case of operators like add.

    Returns
    -------
    The reduction of ``seq``, or ``default`` if ``seq`` is empty.

    Raises
    ------
    TypeError
        If ``seq`` is empty and no ``default`` was given (mirroring
        ``functools.reduce``).

    Notes
    -----
    Fold chunks up the collection into blocks of size ``chunksize`` and then
    feeds each of these to calls to ``reduce``. This work is distributed
    with a call to ``map``, gathered back and then refolded to finish the
    computation. In this way ``fold`` specifies only how to chunk up data but
    leaves the distribution of this work to an externally provided ``map``
    function. This function can be sequential or rely on multithreading,
    multiprocessing, or even distributed solutions.

    ``map`` is called with a ``functools.partial`` wrapping ``binop`` (and
    ``default``, if given), and later with one wrapping ``combine``.  If
    ``map`` serializes the functions it receives, these must therefore be
    serializable too; note that the standard ``pickle`` module cannot
    serialize lambdas.

    Examples
    --------
    >>> # Provide a parallel map to accomplish a parallel sum
    >>> from operator import add
    >>> fold(add, [1, 2, 3, 4], chunksize=2, map=map)
    10
    >>> fold(add, [], default=0)
    0
    """
    # Not just a sanity check: with chunksize == 1 every pass would produce
    # as many intermediate results as it consumed, so the recursion below
    # would never terminate.
    assert chunksize > 1

    if combine is None:
        combine = binop

    # Identity check rather than ``==``: ``default`` may be any object,
    # e.g. an array whose ``==`` is elementwise and has no single truth value.
    if default is no_default:
        reduce_chunk = functools.partial(_reduce, binop)
    else:
        reduce_chunk = functools.partial(_reduce, binop, initial=default)

    # Evaluate sequence in chunks via map
    chunks = partition_all(chunksize, seq)
    results = list(map(reduce_chunk, chunks))  # TODO: Support complete laziness

    if not results:
        # No chunks means ``seq`` was empty; recursing on ``[]`` would never
        # make progress, so finish here like ``functools.reduce`` does.
        if default is no_default:
            raise TypeError("fold() of empty sequence with no default value")
        return default

    if len(results) == 1:  # Return completed result
        return results[0]

    # Recurse to reaggregate intermediate results.  ``default`` is not
    # forwarded: every chunk was already seeded with it.
    return fold(combine, results, map=map, chunksize=chunksize)