Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/joblib/executor.py: 36%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

39 statements  

1"""Utility function to construct a loky.ReusableExecutor with custom pickler. 

2 

3This module provides efficient ways of working with data stored in 

4shared memory with numpy.memmap arrays without inducing any memory 

5copy between the parent and child processes. 

6""" 

7# Author: Thomas Moreau <thomas.moreau.2010@gmail.com> 

8# Copyright: 2017, Thomas Moreau 

9# License: BSD 3 clause 

10 

11from ._memmapping_reducer import TemporaryResourcesManager, get_memmapping_reducers 

12from .externals.loky.reusable_executor import _ReusablePoolExecutor 

13 

# Reuse key of the executor requested by the previous call to
# MemmappingExecutor.get_memmapping_executor; None until the first call.
_executor_args = None


def get_memmapping_executor(n_jobs, **kwargs):
    """Return a MemmappingExecutor with ``n_jobs`` workers.

    Module-level shortcut: every keyword argument is forwarded unchanged to
    :meth:`MemmappingExecutor.get_memmapping_executor`.
    """
    factory = MemmappingExecutor.get_memmapping_executor
    return factory(n_jobs, **kwargs)

19 

20 

class MemmappingExecutor(_ReusablePoolExecutor):
    """Reusable loky executor whose reducers memmap large numpy arrays.

    Instances are obtained through :meth:`get_memmapping_executor`. Each
    executor holds a ``_temp_folder_manager`` (a
    ``TemporaryResourcesManager``) that the memmapping reducers use to
    resolve the temporary folder where data is dumped.
    """

    @classmethod
    def get_memmapping_executor(
        cls,
        n_jobs,
        timeout=300,
        initializer=None,
        initargs=(),
        env=None,
        temp_folder=None,
        context_id=None,
        **backend_args,
    ):
        """Factory for ReusableExecutor with automatic memmapping for large
        numpy arrays.

        Parameters
        ----------
        n_jobs : int
            Number of workers, forwarded to ``get_reusable_executor``.
        timeout, initializer, initargs, env
            Forwarded to ``get_reusable_executor``. Together with
            ``backend_args`` they form the reuse key: reuse of the existing
            executor is only requested when they equal the values of the
            previous call.
        temp_folder : str or None
            Folder handed to the ``TemporaryResourcesManager`` built for
            this call. That manager is only attached to a newly created
            executor.
        context_id : optional
            When not None, registered as a new context on the temporary
            folder manager of the returned executor.
        **backend_args
            Forwarded to ``get_memmapping_reducers`` and part of the reuse
            key.

        Returns
        -------
        MemmappingExecutor
            The (new or reused) executor returned by loky.
        """
        global _executor_args
        # Decide on reuse here instead of deferring the test to loky: the
        # reducers built below are new objects at each call, so loky would
        # never see the arguments as equal.
        executor_args = dict(backend_args)
        executor_args.update(
            timeout=timeout, initializer=initializer, initargs=initargs
        )
        # Keep env under a dedicated key. Merging its variables flat into
        # executor_args would let an env var whose name matches a backend
        # argument or "timeout"/"initializer"/"initargs" shadow (or be
        # shadowed by) that entry, hiding a real configuration change from
        # the reuse check. "env" itself cannot collide: it is a named
        # parameter, so it never appears in backend_args.
        executor_args["env"] = dict(env) if env else {}
        reuse = _executor_args is None or _executor_args == executor_args
        _executor_args = executor_args

        manager = TemporaryResourcesManager(temp_folder)

        # Reducers access the temporary folder in which to store temporary
        # pickles through a call to manager.resolve_temp_folder_name.
        # Resolving the folder name dynamically makes it possible to use
        # different folders across calls of the same reusable executor.
        job_reducers, result_reducers = get_memmapping_reducers(
            unlink_on_gc_collect=True,
            temp_folder_resolver=manager.resolve_temp_folder_name,
            **backend_args,
        )
        _executor, executor_is_reused = super().get_reusable_executor(
            n_jobs,
            job_reducers=job_reducers,
            result_reducers=result_reducers,
            reuse=reuse,
            timeout=timeout,
            initializer=initializer,
            initargs=initargs,
            env=env,
        )

        if not executor_is_reused:
            # Only set a _temp_folder_manager for new executors. Reused
            # executors already have a _temp_folder_manager that must not
            # be re-assigned like that because it is referenced in various
            # places in the reducing machinery of the executor.
            _executor._temp_folder_manager = manager

        if context_id is not None:
            # Only register the specified context once we know which manager
            # the current executor is using, in order to not register an
            # atexit finalizer twice for the same folder.
            _executor._temp_folder_manager.register_new_context(context_id)

        return _executor

    def terminate(self, kill_workers=False):
        """Shut down the executor, then clean up its temporary resources.

        Parameters
        ----------
        kill_workers : bool, default False
            Forwarded to ``shutdown``. Also used as the ``force`` flag of the
            temporary-resource cleanup.
        """
        self.shutdown(kill_workers=kill_workers)

        # When workers are killed in a brutal manner, they cannot execute the
        # finalizer of their shared memmaps. The refcount of those memmaps may
        # be off by an unknown number, so instead of decref'ing them, we force
        # delete the whole temporary folder, and unregister them. There is no
        # risk of PermissionError at folder deletion because at this
        # point, all child processes are dead, so all references to temporary
        # memmaps are closed. Otherwise, just try to delete as much as possible
        # with allow_non_empty=True; whatever cannot be deleted now will be
        # cleaned up later on by the resource_tracker.
        with self._submit_resize_lock:
            self._temp_folder_manager._clean_temporary_resources(
                force=kill_workers, allow_non_empty=True
            )

    @property
    def _temp_folder(self):
        """Temporary folder name of this executor, cached after first access.

        Legacy property used by the tests. It could be removed if the
        memmapping tests were refactored. SHOULD ONLY BE USED IN TESTS!
        """
        # The value is cached because the tests read it late: at that point,
        # all contexts have been unregistered, and resolve_temp_folder_name
        # raises an error.
        if getattr(self, "_cached_temp_folder", None) is not None:
            return self._cached_temp_folder
        else:
            self._cached_temp_folder = (
                self._temp_folder_manager.resolve_temp_folder_name()
            )
            return self._cached_temp_folder

116 

117 

class _TestingMemmappingExecutor(MemmappingExecutor):
    """Test-only MemmappingExecutor that also offers a Pool-like API.

    Lets the memmapping tests exercise the same code through either a
    Pool-style interface (``apply_async`` / ``map``) or the Executor
    interface. This is only for testing purposes.
    """

    def apply_async(self, func, args):
        """Submit ``func(*args)`` and return the resulting future.

        The future is given a ``get`` attribute aliasing its ``result``
        method, mirroring the Pool-style ``.get()`` accessor.
        """
        fut = self.submit(func, *args)
        setattr(fut, "get", fut.result)
        return fut

    def map(self, f, *args):
        """Run the parent ``map`` and return its results as a list."""
        results = super().map(f, *args)
        return list(results)