1# -*- coding: utf-8 -*-
2#
3# Copyright (C) 2020 Radim Rehurek <me@radimrehurek.com>
4#
5# This code is distributed under the terms and conditions
6# from the MIT License (MIT).
7#
8
9"""Common functionality for concurrent processing. The main entry point is :func:`create_pool`."""
10
11import concurrent.futures
12import contextlib
13import logging
14
15logger = logging.getLogger(__name__)
16
17
class ConcurrentFuturesPool(object):
    """Thread-backed stand-in for the subset of multiprocessing.pool.Pool used here.

    Work is dispatched to a :class:`concurrent.futures.ThreadPoolExecutor`
    rather than to worker processes.
    """

    def __init__(self, max_workers):
        """Create the underlying thread pool executor with ``max_workers`` threads."""
        thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
        self.executor = thread_pool

    def imap_unordered(self, function, items):
        """Apply ``function`` to each of ``items``, yielding results as they complete.

        This is a generator: nothing is submitted until the first result is
        requested, at which point every item is submitted at once.  Results
        come back in completion order, not input order.  An exception raised
        by ``function`` is re-raised when the corresponding result is reached.
        """
        pending = []
        for item in items:
            pending.append(self.executor.submit(function, item))

        for finished in concurrent.futures.as_completed(pending):
            outcome = finished.result()
            yield outcome

    def terminate(self):
        """Shut the executor down, blocking until submitted work has finished."""
        executor = self.executor
        executor.shutdown(wait=True)
30
31
@contextlib.contextmanager
def create_pool(processes=1):  # arg is called processes due to historical reasons
    """Context manager that yields a worker pool and shuts it down on exit.

    :param int processes: number of workers handed to the pool as ``max_workers``.
    :returns: a context manager whose ``with`` target is a
        :class:`ConcurrentFuturesPool`.

    The pool is terminated when the ``with`` block exits, whether it exits
    normally or by raising; an exception from the block propagates to the
    caller after the shutdown.
    """
    logger.info("creating concurrent futures pool with %i workers", processes)
    pool = ConcurrentFuturesPool(max_workers=processes)
    try:
        yield pool
    finally:
        # An exception raised in the caller's ``with`` body is thrown into this
        # generator at the ``yield``; without ``finally`` the shutdown would be
        # skipped and the executor left running.
        pool.terminate()