Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/smart_open/concurrency.py: 51%

43 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:57 +0000

1# -*- coding: utf-8 -*- 

2# 

3# Copyright (C) 2020 Radim Rehurek <me@radimrehurek.com> 

4# 

5# This code is distributed under the terms and conditions 

6# from the MIT License (MIT). 

7# 

8 

9"""Common functionality for concurrent processing. 

10 

11The main entry point is :func:`create_pool`. 

12""" 

13 

14import contextlib 

15import logging 

16import warnings 

17 

logger = logging.getLogger(__name__)

# AWS Lambda does not support multiprocessing.Queue or multiprocessing.Pool,
# but it does support threads, so concurrent.futures' ThreadPoolExecutor can
# stand in there.  The _CONCURRENT_FUTURES flag records whether the import
# worked; it was introduced for Python 2, which has no concurrent.futures.
try:
    import concurrent.futures
except ImportError:
    _CONCURRENT_FUTURES = False
    warnings.warn("concurrent.futures could not be imported and won't be used")
else:
    _CONCURRENT_FUTURES = True

# Some sandboxes (App Engine, and possibly others) do not provide
# multiprocessing.  create_pool checks _MULTIPROCESSING before attempting to
# build a process pool.
try:
    import multiprocessing.pool
except ImportError:
    _MULTIPROCESSING = False
    warnings.warn("multiprocessing could not be imported and won't be used")
else:
    _MULTIPROCESSING = True

39 

40 

class DummyPool(object):
    """Serial stand-in for the subset of multiprocessing.pool.Pool used here.

    Each call runs in whichever thread consumes the returned iterator, one
    item at a time.  There are no workers, so nothing needs cleaning up.
    """

    def imap_unordered(self, function, items):
        """Return a lazy iterator of ``function(item)`` over ``items``, in order."""
        lazy_results = map(function, items)
        return lazy_results

    def terminate(self):
        """Do nothing: a serial pool owns no workers to stop."""
        return None

48 

49 

class ConcurrentFuturesPool(object):
    """Thread-backed stand-in for the subset of multiprocessing.pool.Pool used here.

    Calls run on a ``concurrent.futures.ThreadPoolExecutor`` instead of in
    child processes.
    """

    def __init__(self, max_workers):
        """Create the backing executor.

        :param max_workers: maximum number of worker threads.
        """
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers)

    def imap_unordered(self, function, items):
        """Return an iterator over ``function(item)`` results in completion order.

        Nothing is submitted until the iterator is first advanced.  At that
        point every item is submitted to the executor.  As with
        ``multiprocessing.pool.Pool.imap_unordered``, an exception raised by
        one call is re-raised from ``next()``.  Iteration can then continue
        with the remaining results instead of ending.
        """
        def completed_futures():
            # This is a generator so that nothing is submitted until iteration
            # starts.  It yields futures, not results.  A failing call then
            # raises inside map() below rather than inside this generator.
            # An exception escaping a generator finishes it, which would drop
            # every result not yet yielded.
            futures = [self.executor.submit(function, item) for item in items]
            for future in concurrent.futures.as_completed(futures):
                yield future

        # map() stays usable after its function raises, unlike a generator.
        return map(concurrent.futures.Future.result, completed_futures())

    def terminate(self):
        """Shut the executor down, waiting for submitted calls to finish."""
        self.executor.shutdown(wait=True)

62 

63 

@contextlib.contextmanager
def create_pool(processes=1):
    """Yield a worker pool for the duration of a ``with`` block.

    The first usable option wins:

    1. a ``multiprocessing.pool.Pool``;
    2. a thread-backed ``ConcurrentFuturesPool``;
    3. a serial ``DummyPool``.

    A falsy ``processes`` (e.g. 0 or None) always selects the ``DummyPool``.
    The pool is terminated when the block exits, including when it exits
    because of an exception.

    :param processes: number of workers to request.
    :yields: an object providing ``imap_unordered(function, items)`` and
        ``terminate()``.
    """
    pool = None
    if _MULTIPROCESSING and processes:
        logger.info("creating multiprocessing pool with %i workers", processes)
        try:
            pool = multiprocessing.pool.Pool(processes=processes)
        except OSError as err:
            # Pool construction can fail even though the import succeeded.
            # The module comments note that AWS Lambda does not support
            # multiprocessing.Pool; there it presumably raises OSError.
            # Fall back to the options below instead of crashing.
            logger.warning(
                "could not create multiprocessing pool (%s), falling back", err)
    if pool is None and _CONCURRENT_FUTURES and processes:
        logger.info("creating concurrent futures pool with %i workers", processes)
        pool = ConcurrentFuturesPool(max_workers=processes)
    if pool is None:
        logger.info("creating dummy pool")
        pool = DummyPool()
    try:
        yield pool
    finally:
        # Without finally, an exception raised in the with-body would skip
        # this call and leak worker processes or threads.
        pool.terminate()