Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/smart_open/concurrency.py: 51%
43 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:57 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:57 +0000
1# -*- coding: utf-8 -*-
2#
3# Copyright (C) 2020 Radim Rehurek <me@radimrehurek.com>
4#
5# This code is distributed under the terms and conditions
6# from the MIT License (MIT).
7#
9"""Common functionality for concurrent processing.
11The main entry point is :func:`create_pool`.
12"""
14import contextlib
15import logging
16import warnings
18logger = logging.getLogger(__name__)
20# AWS Lambda environments do not support multiprocessing.Queue or multiprocessing.Pool.
21# However they do support Threads and therefore concurrent.futures's ThreadPoolExecutor.
22# We use this flag to allow python 2 backward compatibility, where concurrent.futures doesn't exist.
23_CONCURRENT_FUTURES = False
24try:
25 import concurrent.futures
26 _CONCURRENT_FUTURES = True
27except ImportError:
28 warnings.warn("concurrent.futures could not be imported and won't be used")
30# Multiprocessing is unavailable in App Engine (and possibly other sandboxes).
31# The only method currently relying on it is iter_bucket, which is instructed
32# whether to use it by the MULTIPROCESSING flag.
33_MULTIPROCESSING = False
34try:
35 import multiprocessing.pool
36 _MULTIPROCESSING = True
37except ImportError:
38 warnings.warn("multiprocessing could not be imported and won't be used")
class DummyPool(object):
    """Serial stand-in for the parts of ``multiprocessing.pool.Pool`` used here.

    Nothing is run concurrently: work is performed in the caller's thread
    by the builtin ``map``.
    """

    def imap_unordered(self, function, items):
        """Apply ``function`` to every element of ``items``.

        :param function: callable taking a single item.
        :param items: iterable of inputs.
        :returns: the object produced by the builtin ``map(function, items)``.
        """
        mapped = map(function, items)
        return mapped

    def terminate(self):
        """Do nothing; this pool owns no workers that need stopping."""
        return None
class ConcurrentFuturesPool(object):
    """Thread-backed look-alike of ``multiprocessing.pool.Pool``.

    Work is dispatched to a ``concurrent.futures.ThreadPoolExecutor``
    rather than to separate processes.
    """

    def __init__(self, max_workers):
        """Create the underlying executor with ``max_workers`` threads."""
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)

    def imap_unordered(self, function, items):
        """Yield ``function(item)`` for each item, in order of completion.

        This is a generator: nothing is submitted to the executor until
        iteration starts. All items are then submitted up front, and
        results are yielded as their futures finish. An exception raised
        by ``function`` propagates out of the generator when that future's
        result is fetched.
        """
        pending = []
        for item in items:
            pending.append(self.executor.submit(function, item))

        for finished in concurrent.futures.as_completed(pending):
            yield finished.result()

    def terminate(self):
        """Shut the executor down, blocking until pending work has finished."""
        self.executor.shutdown(wait=True)
@contextlib.contextmanager
def create_pool(processes=1):
    """Context manager yielding a pool-like object with ``imap_unordered``.

    The backend is chosen in this order:

    1. ``multiprocessing.pool.Pool`` when multiprocessing was importable
       and ``processes`` is truthy;
    2. :class:`ConcurrentFuturesPool` when ``concurrent.futures`` was
       importable and ``processes`` is truthy;
    3. :class:`DummyPool` otherwise (including ``processes=0``/``None``).

    :param processes: number of workers to create; a falsy value selects
        the serial dummy pool.
    :yields: the created pool.

    The pool's ``terminate()`` is always called on exit, including when the
    body of the ``with`` statement raises, so workers are not leaked.
    """
    if _MULTIPROCESSING and processes:
        logger.info("creating multiprocessing pool with %i workers", processes)
        pool = multiprocessing.pool.Pool(processes=processes)
    elif _CONCURRENT_FUTURES and processes:
        logger.info("creating concurrent futures pool with %i workers", processes)
        pool = ConcurrentFuturesPool(max_workers=processes)
    else:
        logger.info("creating dummy pool")
        pool = DummyPool()

    # contextmanager re-raises an exception from the with-body at the yield;
    # without try/finally the cleanup below would be skipped in that case.
    try:
        yield pool
    finally:
        pool.terminate()