1# -*- coding: utf-8 -*-
2#
3# Copyright (C) 2020 Radim Rehurek <me@radimrehurek.com>
4#
5# This code is distributed under the terms and conditions
6# from the MIT License (MIT).
7#
8
9"""Common functionality for concurrent processing. The main entry point is :func:`create_pool`."""
10
11import contextlib
12import logging
13import warnings
14
logger = logging.getLogger(__name__)

# Some environments (AWS Lambda, for one) support neither multiprocessing.Queue
# nor multiprocessing.Pool, yet they do support threads and therefore the
# ThreadPoolExecutor from concurrent.futures.
# The flag below records whether concurrent.futures could be imported; it keeps
# the module usable on python 2, where concurrent.futures doesn't exist.
try:
    import concurrent.futures
except ImportError:
    _CONCURRENT_FUTURES = False
    warnings.warn("concurrent.futures could not be imported and won't be used")
else:
    _CONCURRENT_FUTURES = True

# App Engine (and possibly other sandboxes) ship without multiprocessing.
# According to the original note, iter_bucket is the only method currently
# relying on it; the _MULTIPROCESSING flag says whether it may be used.
try:
    import multiprocessing.pool
except ImportError:
    _MULTIPROCESSING = False
    warnings.warn("multiprocessing could not be imported and won't be used")
else:
    _MULTIPROCESSING = True
36
37
class DummyPool(object):
    """Serial stand-in exposing the subset of multiprocessing.pool.Pool used here.

    It offers the same two methods as the other pools in this module
    (``imap_unordered`` and ``terminate``) without starting any workers.
    """

    def imap_unordered(self, function, items):
        """Apply ``function`` to every element of ``items`` with the built-in ``map``.

        :param function: callable taking a single item.
        :param items: iterable of inputs.
        :returns: whatever ``map(function, items)`` returns.
        """
        mapped = map(function, items)
        return mapped

    def terminate(self):
        """Do nothing; present only so the pool interface is complete."""
        return None
45
46
class ConcurrentFuturesPool(object):
    """A class that mimics multiprocessing.pool.Pool but uses concurrent futures instead of processes.

    Work is executed by a ``concurrent.futures.ThreadPoolExecutor``. Only the
    two methods used by this module's pools are provided:
    ``imap_unordered`` and ``terminate``.
    """

    def __init__(self, max_workers):
        """Create the underlying thread pool.

        :param max_workers: forwarded to ``ThreadPoolExecutor`` as its first argument.
        """
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers)
        # Futures submitted so far and not yet handed to terminate(); kept so
        # that terminate() can cancel the ones that have not started running.
        self._futures = []

    def imap_unordered(self, function, items):
        """Run ``function`` on each item and yield the results as they complete.

        This is a generator: nothing is submitted until it is first advanced.
        At that point every element of ``items`` is submitted at once, so
        ``items`` is consumed in full before the first result is yielded.

        :param function: callable taking a single item.
        :param items: iterable of inputs.
        :returns: generator of ``future.result()`` values in completion order;
            ``result()`` re-raises any exception raised by ``function``.
        """
        futures = [self.executor.submit(function, item) for item in items]
        self._futures.extend(futures)
        for future in concurrent.futures.as_completed(futures):
            yield future.result()

    def terminate(self):
        """Cancel work that has not started, then shut the executor down.

        Without the cancellation step, ``shutdown(wait=True)`` would run every
        queued task to completion even after the consumer stopped iterating.
        Tasks that are already running cannot be cancelled and are waited for.
        Futures are cancelled one by one rather than via ``cancel_futures=``,
        which only exists on newer Python versions.
        """
        pending, self._futures = self._futures, []
        for future in pending:
            # No-op (returns False) for futures that are running or finished.
            future.cancel()
        self.executor.shutdown(wait=True)
59
60
@contextlib.contextmanager
def create_pool(processes=1):
    """Yield a worker pool and terminate it when the ``with`` block exits.

    The backend is chosen in this order:

    1. ``multiprocessing.pool.Pool`` if multiprocessing was imported successfully;
    2. :class:`ConcurrentFuturesPool` if ``concurrent.futures`` was imported successfully;
    3. :class:`DummyPool` otherwise.

    A falsy ``processes`` (e.g. ``0`` or ``None``) always selects :class:`DummyPool`.

    :param processes: number of workers for the pool.
    :returns: context manager yielding an object that provides
        ``imap_unordered(function, items)`` and ``terminate()``.
    """
    if _MULTIPROCESSING and processes:
        logger.info("creating multiprocessing pool with %i workers", processes)
        pool = multiprocessing.pool.Pool(processes=processes)
    elif _CONCURRENT_FUTURES and processes:
        logger.info("creating concurrent futures pool with %i workers", processes)
        pool = ConcurrentFuturesPool(max_workers=processes)
    else:
        logger.info("creating dummy pool")
        pool = DummyPool()
    try:
        yield pool
    finally:
        # Runs on normal exit, when the with-body raises, and when the
        # generator is closed, so workers are never left behind.
        pool.terminate()