Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/smart_open/concurrency.py: 50%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

44 statements  

1# -*- coding: utf-8 -*- 

2# 

3# Copyright (C) 2020 Radim Rehurek <me@radimrehurek.com> 

4# 

5# This code is distributed under the terms and conditions 

6# from the MIT License (MIT). 

7# 

8 

9"""Common functionality for concurrent processing. The main entry point is :func:`create_pool`.""" 

10 

11import contextlib 

12import logging 

13import warnings 

14 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# AWS Lambda environments support neither multiprocessing.Queue nor
# multiprocessing.Pool.  They do support threads, and therefore
# concurrent.futures' ThreadPoolExecutor.
# The flag below records whether concurrent.futures could be imported; it
# exists for Python 2 backward compatibility, where that package is missing.
try:
    import concurrent.futures
except ImportError:
    _CONCURRENT_FUTURES = False
    warnings.warn("concurrent.futures could not be imported and won't be used")
else:
    _CONCURRENT_FUTURES = True

# Multiprocessing is unavailable in App Engine (and possibly other sandboxes).
# The only method currently relying on it is iter_bucket, which consults the
# _MULTIPROCESSING flag to decide whether to use it.
try:
    import multiprocessing.pool
except ImportError:
    _MULTIPROCESSING = False
    warnings.warn("multiprocessing could not be imported and won't be used")
else:
    _MULTIPROCESSING = True

36 

37 

class DummyPool(object):
    """Stand-in for multiprocessing.pool.Pool that uses no workers.

    Only the two methods this module relies on are provided:
    :meth:`imap_unordered` and :meth:`terminate`.
    """

    def imap_unordered(self, function, items):
        """Return a lazy iterator applying ``function`` to each of ``items``.

        The work is done by the built-in :func:`map`, so results come back
        in input order and are computed in the calling thread.
        """
        mapped = map(function, items)
        return mapped

    def terminate(self):
        """Do nothing: there are no workers to stop."""

45 

46 

class ConcurrentFuturesPool(object):
    """Thread-backed look-alike of multiprocessing.pool.Pool.

    Work is dispatched to a ``concurrent.futures.ThreadPoolExecutor``
    instead of to worker processes.
    """

    def __init__(self, max_workers):
        """Create the underlying executor with ``max_workers`` threads."""
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers)

    def imap_unordered(self, function, items):
        """Yield ``function(item)`` for every item, in completion order.

        This is a generator: nothing is submitted until iteration starts.
        At that point every item is submitted to the executor before the
        first result is yielded.  An exception raised by ``function`` is
        re-raised here when the corresponding result is retrieved.
        """
        pending = []
        for item in items:
            pending.append(self.executor.submit(function, item))

        for finished in concurrent.futures.as_completed(pending):
            yield finished.result()

    def terminate(self):
        """Shut the executor down, blocking until running work has finished."""
        self.executor.shutdown(wait=True)

59 

60 

@contextlib.contextmanager
def create_pool(processes=1):
    """Context manager that yields a worker pool and terminates it on exit.

    The pool implementation is chosen in this order:

    1. ``multiprocessing.pool.Pool`` if multiprocessing was importable and
       ``processes`` is truthy;
    2. :class:`ConcurrentFuturesPool` (threads) if concurrent.futures was
       importable and ``processes`` is truthy;
    3. :class:`DummyPool` otherwise (e.g. ``processes=0``).

    :param processes: number of workers to create; a falsy value selects
        the dummy pool.
    :returns: yields the pool object, which exposes ``imap_unordered`` and
        ``terminate``.

    The pool is terminated even when the body of the ``with`` statement
    raises, so workers are not leaked on error.
    """
    if _MULTIPROCESSING and processes:
        logger.info("creating multiprocessing pool with %i workers", processes)
        pool = multiprocessing.pool.Pool(processes=processes)
    elif _CONCURRENT_FUTURES and processes:
        logger.info("creating concurrent futures pool with %i workers", processes)
        pool = ConcurrentFuturesPool(max_workers=processes)
    else:
        logger.info("creating dummy pool")
        pool = DummyPool()

    try:
        yield pool
    finally:
        # contextmanager re-raises the caller's exception at the yield;
        # without the finally clause the cleanup below would be skipped.
        pool.terminate()