# -*- coding: utf-8 -*-
#
# Copyright (C) 2020 Radim Rehurek <me@radimrehurek.com>
#
# This code is distributed under the terms and conditions
# from the MIT License (MIT).
#

"""Common functionality for concurrent processing. The main entry point is :func:`create_pool`."""

import concurrent.futures
import contextlib
import logging
from collections import deque
from concurrent.futures import ThreadPoolExecutor as _ThreadPoolExecutor

logger = logging.getLogger(__name__)


class ThreadPoolExecutor(_ThreadPoolExecutor):
    """Subclass with a lazily consuming, ordered imap method."""

    def imap(self, fn, *iterables, timeout=None, queued_tasks_per_worker=2):
        """Ordered imap that consumes the input iterables just-in-time.

        References:
            https://gist.github.com/ddelange/c98b05437f80e4b16bf4fc20fde9c999

        Args:
            fn: Function to apply to each item.
            iterables: One or more iterables whose zipped items are passed to fn as positional arguments.
            timeout: Per-future result retrieval timeout in seconds.
            queued_tasks_per_worker: Number of additional items per worker to fetch from the iterables
                to fill the queue: this determines the total queue size.
                Setting 0 results in true just-in-time behaviour: when a worker finishes a task,
                it waits until a result is consumed from the imap generator, at which point next()
                is called on the input iterable(s) and a new task is submitted.
                The default of 2 ensures there is always some work to pick up. Note that at imap
                startup, the queue fills up before the first yield occurs.

        Example:
            long_generator = itertools.count()
            with ThreadPoolExecutor(42) as pool:
                result_generator = pool.imap(fn, long_generator)
                for result in result_generator:
                    print(result)
        """
        # Bounded queue of pending futures: at most one in-flight task per worker
        # plus queued_tasks_per_worker extra queued tasks per worker.
        futures, maxlen = deque(), self._max_workers * (queued_tasks_per_worker + 1)
        popleft, append, submit = futures.popleft, futures.append, self.submit

        def get():
            """Block until the next task is done and return the result."""
            return popleft().result(timeout)

        # Submit tasks just-in-time: pull the next item(s) from the input iterables
        # only while there is room in the queue, yielding results in submission order.
        for args in zip(*iterables):
            append(submit(fn, *args))
            if len(futures) == maxlen:
                yield get()

        # Input exhausted: drain the remaining futures in submission order.
        while futures:
            yield get()


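# Illustrative sketch only (expensive_fn and slow_inputs are hypothetical names): with
# queued_tasks_per_worker=0, imap behaves strictly just-in-time, i.e. beyond the initial
# fill of one task per worker, a new item is pulled from the input and submitted only
# when a result is consumed from the generator:
#
#     with ThreadPoolExecutor(max_workers=2) as pool:
#         results = pool.imap(expensive_fn, slow_inputs, queued_tasks_per_worker=0)
#         first = next(results)  # submits the first tasks and blocks for the first result
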
# ConcurrentFuturesPool and create_pool were once used in smart_open.s3.iter_bucket.
# Left here for backwards compatibility.


class ConcurrentFuturesPool(object):
    """A class that mimics multiprocessing.pool.Pool but uses concurrent futures instead of processes."""

    def __init__(self, max_workers):
        self.executor = ThreadPoolExecutor(max_workers=max_workers)

    def imap_unordered(self, function, items):
        futures = [self.executor.submit(function, item) for item in items]
        for future in concurrent.futures.as_completed(futures):
            yield future.result()

    def terminate(self):
        self.executor.shutdown(wait=True)


@contextlib.contextmanager
def create_pool(processes=1):  # arg is called processes due to historical reasons
    """Yield a ConcurrentFuturesPool backed by the given number of worker threads."""
    logger.info("creating concurrent futures pool with %i workers", processes)
    pool = ConcurrentFuturesPool(max_workers=processes)
    try:
        yield pool
    finally:
        # Shut down the executor even if the caller's with-block raises.
        pool.terminate()
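

# A minimal usage sketch (illustrative only; the worker function _square and the
# choice of processes=2 are examples, not part of this module): create_pool() yields
# a pool whose imap_unordered() applies a function to each item using threads and
# yields results as they complete.
if __name__ == "__main__":
    def _square(x):
        return x * x

    with create_pool(processes=2) as pool:
        for result in pool.imap_unordered(_square, range(5)):
            print(result)  # results arrive in completion order, not necessarily input order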