Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/joblib/executor.py: 36%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

39 statements  

1"""Utility function to construct a loky.ReusableExecutor with custom pickler. 

2 

3This module provides efficient ways of working with data stored in 

4shared memory with numpy.memmap arrays without inducing any memory 

5copy between the parent and child processes. 

6""" 

7# Author: Thomas Moreau <thomas.moreau.2010@gmail.com> 

8# Copyright: 2017, Thomas Moreau 

9# License: BSD 3 clause 

10 

11from ._memmapping_reducer import get_memmapping_reducers 

12from ._memmapping_reducer import TemporaryResourcesManager 

13from .externals.loky.reusable_executor import _ReusablePoolExecutor 

14 

15 

# Arguments of the most recent call to
# MemmappingExecutor.get_memmapping_executor. The next call compares its own
# arguments against this value to decide whether the executor may be reused,
# then overwrites it. None until the first call.
_executor_args = None

17 

18 

def get_memmapping_executor(n_jobs, **kwargs):
    """Return a reusable executor with automatic memmapping of large arrays.

    Module-level shortcut for ``MemmappingExecutor.get_memmapping_executor``:
    ``n_jobs`` and every keyword argument are passed through unchanged.
    """
    factory = MemmappingExecutor.get_memmapping_executor
    return factory(n_jobs, **kwargs)

21 

22 

class MemmappingExecutor(_ReusablePoolExecutor):
    """Reusable loky executor whose reducers memmap large numpy arrays.

    The ``get_memmapping_executor`` factory attaches a
    ``TemporaryResourcesManager`` to new executors as
    ``_temp_folder_manager``. The memmapping reducers use it to resolve the
    folder holding temporary pickles, and ``terminate`` uses it to clean
    that folder up.
    """

    @classmethod
    def get_memmapping_executor(cls, n_jobs, timeout=300, initializer=None,
                                initargs=(), env=None, temp_folder=None,
                                context_id=None, **backend_args):
        """Factory for ReusableExecutor with automatic memmapping for large
        numpy arrays.

        Parameters
        ----------
        n_jobs : int
            Number of workers, forwarded to ``get_reusable_executor``.
        timeout, initializer, initargs, env
            Forwarded to ``get_reusable_executor``. They are also part of
            the reuse check: if any of them (or ``backend_args``) differ from
            the previous call, the previous executor is not reused.
        temp_folder : str or None
            Folder handed to the ``TemporaryResourcesManager`` that is
            attached to a newly created executor.
        context_id : optional
            When not None, registered as a new context on the temporary
            folder manager of the returned executor.
        **backend_args
            Forwarded to ``get_memmapping_reducers`` and included in the
            reuse check.

        Returns
        -------
        MemmappingExecutor
            The previous executor when it is reused, otherwise a new one.
        """
        global _executor_args
        # Check here whether we can reuse the executor, instead of deferring
        # the test to loky, because the reducers are new objects at each call.
        #
        # ``env`` is stored under its own key rather than merged into the
        # top-level mapping. Otherwise environment variable names could
        # collide with ``backend_args`` keys (or be overwritten by
        # timeout/initializer/initargs) and make different configurations
        # compare equal. ``backend_args`` can never contain any of these four
        # keys because they are named parameters of this method.
        executor_args = backend_args.copy()
        executor_args.update(
            timeout=timeout,
            initializer=initializer,
            initargs=initargs,
            env=dict(env) if env else {},
        )
        reuse = _executor_args is None or _executor_args == executor_args
        _executor_args = executor_args

        manager = TemporaryResourcesManager(temp_folder)

        # Reducers access the temporary folder in which to store temporary
        # pickles through a call to manager.resolve_temp_folder_name.
        # Resolving the folder name dynamically allows different folders to
        # be used across calls of the same reusable executor.
        job_reducers, result_reducers = get_memmapping_reducers(
            unlink_on_gc_collect=True,
            temp_folder_resolver=manager.resolve_temp_folder_name,
            **backend_args)
        _executor, executor_is_reused = super().get_reusable_executor(
            n_jobs, job_reducers=job_reducers, result_reducers=result_reducers,
            reuse=reuse, timeout=timeout, initializer=initializer,
            initargs=initargs, env=env
        )

        if not executor_is_reused:
            # Only set a _temp_folder_manager for new executors. Reused
            # executors already have a _temp_folder_manager that must not be
            # re-assigned like that because it is referenced in various
            # places in the reducing machinery of the executor.
            _executor._temp_folder_manager = manager

        if context_id is not None:
            # Only register the specified context once we know which manager
            # the current executor is using, in order not to register an
            # atexit finalizer twice for the same folder.
            _executor._temp_folder_manager.register_new_context(context_id)

        return _executor

    def terminate(self, kill_workers=False):
        """Shut the executor down, then clean its temporary resources.

        Parameters
        ----------
        kill_workers : bool, default=False
            Forwarded to ``shutdown``. It is also the ``force`` flag of the
            temporary resources cleanup.
        """
        self.shutdown(kill_workers=kill_workers)

        # When workers are killed in a brutal manner, they cannot execute the
        # finalizer of their shared memmaps. The refcount of those memmaps may
        # be off by an unknown number, so instead of decref'ing them, we force
        # delete the whole temporary folder and unregister them. There is no
        # risk of PermissionError at folder deletion because at this point
        # all child processes are dead, so all references to temporary
        # memmaps are closed. Otherwise, just try to delete as much as
        # possible with allow_non_empty=True; whatever cannot be deleted now
        # will be cleaned up later on by the resource_tracker.
        with self._submit_resize_lock:
            self._temp_folder_manager._clean_temporary_resources(
                force=kill_workers, allow_non_empty=True
            )

    @property
    def _temp_folder(self):
        # Legacy property in tests; could be removed if we refactored the
        # memmapping tests. SHOULD ONLY BE USED IN TESTS!
        # The value is cached because tests call this property late. By then
        # all contexts have been unregistered and resolve_temp_folder_name
        # raises an error.
        cached = getattr(self, '_cached_temp_folder', None)
        if cached is None:
            cached = self._temp_folder_manager.resolve_temp_folder_name()
            self._cached_temp_folder = cached
        return cached

103 

104 

class _TestingMemmappingExecutor(MemmappingExecutor):
    """Testing-only executor exposing a small Pool-like API.

    ``apply_async`` and ``map`` let the memmapping tests drive this executor
    with the same calls they use on a Pool. Not meant for production use.
    """

    def apply_async(self, func, args):
        """Submit ``func(*args)`` and return the resulting future.

        The future additionally gets a ``get`` attribute aliasing its
        ``result`` method, mirroring the ``get`` call used on Pool results.
        """
        fut = self.submit(func, *args)
        setattr(fut, 'get', fut.result)
        return fut

    def map(self, f, *args):
        """Run ``f`` over ``args`` and return all results as a list."""
        results = super().map(f, *args)
        return list(results)