Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/smart_open/concurrency.py: 43%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

35 statements  

1# -*- coding: utf-8 -*- 

2# 

3# Copyright (C) 2020 Radim Rehurek <me@radimrehurek.com> 

4# 

5# This code is distributed under the terms and conditions 

6# from the MIT License (MIT). 

7# 

8 

9"""Common functionality for concurrent processing. The main entry point is :func:`create_pool`.""" 

10 

11import concurrent.futures 

12import contextlib 

13import logging 

14from collections import deque 

15from concurrent.futures import ThreadPoolExecutor as _ThreadPoolExecutor 

16 

# Module-level logger; create_pool reports the pool it creates through it.
logger = logging.getLogger(__name__)

18 

19 

class ThreadPoolExecutor(_ThreadPoolExecutor):
    """``concurrent.futures.ThreadPoolExecutor`` subclass with a lazy, order-preserving imap method."""

    def imap(self, fn, *iterables, timeout=None, queued_tasks_per_worker=2):
        """Ordered imap that consumes iterables just-in-time.

        References:
            https://gist.github.com/ddelange/c98b05437f80e4b16bf4fc20fde9c999

        Args:
            fn: Function to apply.
            iterables: One (or more) iterable(s) to pass to fn (using zip) as positional argument(s).
            timeout: Per-future result retrieval timeout in seconds.
            queued_tasks_per_worker: Amount of additional items per worker to fetch from iterables to
                fill the queue: this determines the total queue size.
                Setting 0 will result in a true just-in-time behaviour: when a worker finishes a task,
                it waits until a result is consumed from the imap generator, at which point next()
                is called on the input iterable(s) and a new task is submitted.
                Default 2 ensures there is always some work to pick up. Note that at imap startup,
                the queue will fill up before the first yield occurs.

        Yields:
            ``fn(*args)`` for each tuple of zipped input items, in input order.

        Raises:
            Whatever ``fn`` raised for the item whose result is being yielded, or the
            timeout error raised by ``Future.result`` when a result is not ready within
            ``timeout`` seconds.

        Note:
            When the generator stops for any reason (exhausted, closed early by the
            consumer, or raising), futures that are still queued and have not started
            are cancelled. Tasks already running cannot be cancelled and simply finish.

        Example:
            long_generator = itertools.count()
            with ThreadPoolExecutor(42) as pool:
                result_generator = pool.imap(fn, long_generator)
                for result in result_generator:
                    print(result)
        """
        futures, maxlen = deque(), self._max_workers * (queued_tasks_per_worker + 1)
        popleft, append, submit = futures.popleft, futures.append, self.submit

        def get():
            """Block until the next task is done and return the result."""
            return popleft().result(timeout)

        try:
            # Fill the queue up to maxlen, then submit one new task per result consumed.
            for args in zip(*iterables):
                append(submit(fn, *args))
                if len(futures) == maxlen:
                    yield get()

            # Input exhausted: drain whatever is still in flight, in order.
            while futures:
                yield get()
        finally:
            # On early exit (consumer break/close(), task error, timeout) the remaining
            # futures would otherwise keep occupying the pool and run fn for results
            # nobody will read. cancel() is a no-op for futures already running or done.
            for future in futures:
                future.cancel()

62 

63 

64# ConcurrentFuturesPool and create_pool were once used in smart_open.s3.iter_bucket. 

65# Left here for backwards compatibility. 

66 

67 

class ConcurrentFuturesPool:
    """Thread-backed stand-in for the ``multiprocessing.pool.Pool`` methods used historically.

    Work is handed to a ``ThreadPoolExecutor`` instead of to worker processes.
    """

    def __init__(self, max_workers):
        # A single executor is created up front and shared by every imap_unordered call.
        self.executor = ThreadPoolExecutor(max_workers=max_workers)

    def imap_unordered(self, function, items):
        """Apply ``function`` to every item, yielding results in completion order.

        Every item is submitted before the first result is yielded; an exception
        raised by ``function`` is re-raised when its result is reached.
        """
        pending = []
        for item in items:
            pending.append(self.executor.submit(function, item))
        for finished in concurrent.futures.as_completed(pending):
            yield finished.result()

    def terminate(self):
        """Shut the executor down, blocking until submitted tasks have finished."""
        self.executor.shutdown(wait=True)

80 

81 

@contextlib.contextmanager
def create_pool(processes=1):  # arg is called processes due to historical reasons
    """Context manager yielding a ConcurrentFuturesPool with ``processes`` worker threads.

    The pool is terminated when the ``with`` block exits, whether it exits normally
    or by raising.

    Args:
        processes: Number of worker threads (named "processes" for historical reasons).

    Yields:
        The ConcurrentFuturesPool instance.
    """
    logger.info("creating concurrent futures pool with %i workers", processes)
    pool = ConcurrentFuturesPool(max_workers=processes)
    try:
        yield pool
    finally:
        # An exception in the with-body is thrown in at the yield; without the
        # finally it would skip terminate() and leave the executor un-shut-down.
        pool.terminate()