Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/joblib/externals/loky/backend/context.py: 30%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

151 statements  

1############################################################################### 

2# Basic context management with LokyContext 

3# 

4# author: Thomas Moreau and Olivier Grisel 

5# 

6# adapted from multiprocessing/context.py 

7# * Create a context ensuring loky uses only objects that are compatible 

8# * Add LokyContext to the list of context of multiprocessing so loky can be 

9# used with multiprocessing.set_start_method 

10# * Implement a CFS-aware and physical-core aware cpu_count function. 

11# 

12import os 

13import sys 

14import math 

15import subprocess 

16import traceback 

17import warnings 

18import multiprocessing as mp 

19from multiprocessing import get_context as mp_get_context 

20from multiprocessing.context import BaseContext 

21 

22 

23from .process import LokyProcess, LokyInitMainProcess 

24 

# Apparently, on older Python versions, loky cannot work with 61 workers on
# Windows and has to settle for 60 instead: ¯\_(ツ)_/¯
if sys.version_info < (3, 8):
    # Compat for versions before 3.8, which do not define this constant.
    _MAX_WINDOWS_WORKERS = 60
else:
    from concurrent.futures.process import _MAX_WINDOWS_WORKERS

    if sys.version_info < (3, 10):
        _MAX_WINDOWS_WORKERS -= 1

# Names of the start methods accepted by loky. The fork-based ones are only
# offered when not running on Windows.
START_METHODS = ["loky", "loky_init_main", "spawn"]
if sys.platform != "win32":
    START_METHODS.extend(["fork", "forkserver"])

# Module-wide default start method; None until one is explicitly selected.
_DEFAULT_START_METHOD = None

# Cache for the number of physical cores to avoid repeating subprocess calls.
# It should not change during the lifetime of the program.
physical_cores_cache = None

45 

46 

def get_context(method=None):
    """Return the multiprocessing context registered under ``method``.

    Parameters
    ----------
    method : str or None
        Name of the requested start method. When falsy, the module-level
        ``_DEFAULT_START_METHOD`` is used, falling back to ``"loky"``.

    Returns
    -------
    The context object returned by ``multiprocessing.get_context``.

    Raises
    ------
    ValueError
        If ``method`` is not a known context name. The error raised by
        ``multiprocessing.get_context`` is attached as ``__cause__``.
    """
    # Try to overload the default context
    method = method or _DEFAULT_START_METHOD or "loky"
    if method == "fork":
        # If 'fork' is explicitly requested, warn user about potential issues.
        warnings.warn(
            "`fork` start method should not be used with "
            "`loky` as it does not respect POSIX. Try using "
            "`spawn` or `loky` instead.",
            UserWarning,
        )
    try:
        return mp_get_context(method)
    except ValueError as err:
        # Chain explicitly so the traceback shows the lookup failure as the
        # direct cause rather than as an error raised while handling another.
        raise ValueError(
            f"Unknown context '{method}'. Value should be in "
            f"{START_METHODS}."
        ) from err

65 

66 

def set_start_method(method, force=False):
    """Record ``method`` as the module-wide default start method.

    ``method`` must be None or one of ``START_METHODS``. Once a default has
    been recorded, it can only be replaced by passing ``force=True``;
    otherwise a RuntimeError is raised.
    """
    global _DEFAULT_START_METHOD

    already_configured = _DEFAULT_START_METHOD is not None
    if already_configured and not force:
        raise RuntimeError("context has already been set")

    is_acceptable = method is None or method in START_METHODS
    assert is_acceptable, (
        f"'{method}' is not a valid start_method. It should be in "
        f"{START_METHODS}"
    )

    _DEFAULT_START_METHOD = method

77 

78 

def get_start_method():
    """Return the module-wide default start method (None when unset)."""
    return _DEFAULT_START_METHOD

81 

82 

def cpu_count(only_physical_cores=False):
    """Return the number of CPUs the current process can use.

    The returned number of CPUs accounts for:
     * the number of CPUs in the system, as given by
       ``multiprocessing.cpu_count``;
     * the CPU affinity settings of the current process
       (available on some Unix systems);
     * Cgroup CPU bandwidth limit (available on Linux only, typically
       set by docker and similar container orchestration systems);
     * the value of the LOKY_MAX_CPU_COUNT environment variable if defined.
    and is given as the minimum of these constraints.

    If ``only_physical_cores`` is True, return the number of physical cores
    instead of the number of logical cores (hyperthreading / SMT). Note that
    this option is not enforced if the number of usable cores is controlled in
    any other way such as: process affinity, Cgroup restricted CPU bandwidth
    or the LOKY_MAX_CPU_COUNT environment variable. If the number of physical
    cores is not found, return the number of logical cores.

    Note that on Windows, the returned number of CPUs cannot exceed 61 (or 60
    for Python < 3.10), see:
    https://bugs.python.org/issue26903.

    It is also always larger or equal to 1.

    Parameters
    ----------
    only_physical_cores : bool, default=False
        Whether to count physical cores rather than logical ones.

    Returns
    -------
    int
        The number of usable CPUs, never smaller than 1.
    """
    # Note: os.cpu_count() is allowed to return None in its docstring
    os_cpu_count = os.cpu_count() or 1
    if sys.platform == "win32":
        # On Windows, attempting to use more than 61 CPUs would result in a
        # OS-level error. See https://bugs.python.org/issue26903. According to
        # https://learn.microsoft.com/en-us/windows/win32/procthread/processor-groups
        # it might be possible to go beyond with a lot of extra work but this
        # does not look easy.
        os_cpu_count = min(os_cpu_count, _MAX_WINDOWS_WORKERS)

    cpu_count_user = _cpu_count_user(os_cpu_count)
    aggregate_cpu_count = max(min(os_cpu_count, cpu_count_user), 1)

    if not only_physical_cores:
        return aggregate_cpu_count

    if cpu_count_user < os_cpu_count:
        # Respect user setting
        return max(cpu_count_user, 1)

    cpu_count_physical, exception = _count_physical_cores()
    if cpu_count_physical != "not found":
        # Never report more physical cores than usable logical CPUs: this
        # keeps the documented Windows worker limit effective on this path.
        return min(cpu_count_physical, aggregate_cpu_count)

    # Fallback to default behavior
    if exception is not None:
        # Warns only when the lookup reported an exception alongside the
        # "not found" marker.
        warnings.warn(
            "Could not find the number of physical cores for the "
            f"following reason:\n{exception}\n"
            "Returning the number of logical cores instead. You can "
            "silence this warning by setting LOKY_MAX_CPU_COUNT to "
            "the number of cores you want to use."
        )
        traceback.print_tb(exception.__traceback__)

    return aggregate_cpu_count

146 

147 

def _cpu_count_cgroup(os_cpu_count):
    """Return the CPU count allowed by the Cgroup CPU bandwidth limit.

    The quota and period are read from the cgroup v2 ``cpu.max`` file when it
    exists, otherwise from the cgroup v1 ``cpu.cfs_quota_us`` and
    ``cpu.cfs_period_us`` files. When no positive quota is in effect,
    ``os_cpu_count`` is returned unchanged.
    """
    # Cgroup CPU bandwidth limit available in Linux since 2.6 kernel
    v2_max_path = "/sys/fs/cgroup/cpu.max"
    v1_quota_path = "/sys/fs/cgroup/cpu/cpu.cfs_quota_us"
    v1_period_path = "/sys/fs/cgroup/cpu/cpu.cfs_period_us"

    if os.path.exists(v2_max_path):
        # cgroup v2
        # https://www.kernel.org/doc/html/latest/admin-guide/cgroup-v2.html
        with open(v2_max_path) as stream:
            quota, period = stream.read().strip().split()
    elif os.path.exists(v1_quota_path) and os.path.exists(v1_period_path):
        # cgroup v1
        # https://www.kernel.org/doc/html/latest/scheduler/sched-bwc.html#management
        with open(v1_quota_path) as stream:
            quota = stream.read().strip()
        with open(v1_period_path) as stream:
            period = stream.read().strip()
    else:
        # No Cgroup CPU bandwidth limit (e.g. non-Linux platform); the period
        # is unused and only set for consistency with default values.
        quota, period = "max", 100_000

    if quota == "max":
        # No active Cgroup quota on a Cgroup-capable platform
        return os_cpu_count

    quota, period = int(quota), int(period)
    if quota <= 0 or period <= 0:  # pragma: no cover
        # Setting a negative cpu_quota_us value is a valid way to disable
        # cgroup CPU bandwidth limits
        return os_cpu_count
    return math.ceil(quota / period)

182 

183 

def _cpu_count_affinity(os_cpu_count):
    """Return the number of CPUs allowed by the process affinity settings.

    ``os.sched_getaffinity`` is tried first, then ``psutil`` if it can be
    imported. When neither yields an answer, ``os_cpu_count`` is returned.
    """
    # Number of available CPUs given affinity settings
    if hasattr(os, "sched_getaffinity"):
        try:
            allowed_cpus = os.sched_getaffinity(0)
            return len(allowed_cpus)
        except NotImplementedError:
            pass

    # On PyPy and possibly other platforms, os.sched_getaffinity does not exist
    # or raises NotImplementedError, let's try with the psutil if installed.
    try:
        import psutil

        current_process = psutil.Process()
        if hasattr(current_process, "cpu_affinity"):
            allowed_cpus = current_process.cpu_affinity()
            return len(allowed_cpus)

    except ImportError:  # pragma: no cover
        on_linux = sys.platform == "linux"
        if on_linux and "LOKY_MAX_CPU_COUNT" not in os.environ:
            # PyPy does not implement os.sched_getaffinity on Linux which
            # can cause severe oversubscription problems. Better warn the
            # user in this particularly pathological case which can wreak
            # havoc, typically on CI workers.
            warnings.warn(
                "Failed to inspect CPU affinity constraints on this system. "
                "Please install psutil or explictly set LOKY_MAX_CPU_COUNT."
            )

    # This can happen for platforms that do not implement any kind of CPU
    # affinity such as macOS-based platforms.
    return os_cpu_count

218 

219 

def _cpu_count_user(os_cpu_count):
    """Return the smallest of the user-controlled CPU limits.

    The limits considered are the CPU affinity, the Cgroup bandwidth limit
    and the LOKY_MAX_CPU_COUNT environment variable, which defaults to
    ``os_cpu_count`` when undefined.
    """
    limits = (
        _cpu_count_affinity(os_cpu_count),
        _cpu_count_cgroup(os_cpu_count),
        # User defined soft-limit passed as a loky specific environment
        # variable.
        int(os.environ.get("LOKY_MAX_CPU_COUNT", os_cpu_count)),
    )
    return min(limits)

230 

231 

def _count_physical_cores():
    """Return a tuple (number of physical cores, exception)

    When the lookup succeeds, the exception slot is None. When it fails, the
    tuple is ("not found", exception).

    The outcome (including "not found") is stored in the module-level
    ``physical_cores_cache`` so that subprocess calls are not repeated; a
    cached outcome is always returned with None as the exception.
    """
    global physical_cores_cache

    # First check if the value is cached
    if physical_cores_cache is not None:
        return physical_cores_cache, None

    # Not cached yet, find it
    failure = None
    try:
        platform = sys.platform
        if platform == "linux":
            completed = subprocess.run(
                "lscpu --parse=core".split(), capture_output=True, text=True
            )
            # Count the distinct non-comment lines of the output.
            distinct_rows = set()
            for row in completed.stdout.splitlines():
                if not row.startswith("#"):
                    distinct_rows.add(row)
            n_physical = len(distinct_rows)
        elif platform == "win32":
            completed = subprocess.run(
                "wmic CPU Get NumberOfCores /Format:csv".split(),
                capture_output=True,
                text=True,
            )
            # Skip blank lines and the CSV header, keep the second column.
            core_fields = [
                row.split(",")[1]
                for row in completed.stdout.splitlines()
                if (row and row != "Node,NumberOfCores")
            ]
            n_physical = sum(int(field) for field in core_fields)
        elif platform == "darwin":
            completed = subprocess.run(
                "sysctl -n hw.physicalcpu".split(),
                capture_output=True,
                text=True,
            )
            n_physical = int(completed.stdout)
        else:
            raise NotImplementedError(f"unsupported platform: {sys.platform}")

        # if n_physical < 1, we did not find a valid value
        if n_physical < 1:
            raise ValueError(f"found {n_physical} physical cores < 1")

    except Exception as e:
        failure = e
        n_physical = "not found"

    # Put the result in cache
    physical_cores_cache = n_physical

    return n_physical, failure

292 

293 

class LokyContext(BaseContext):
    """Multiprocessing context whose processes are LokyProcess instances."""

    _name = "loky"
    Process = LokyProcess
    cpu_count = staticmethod(cpu_count)

    def Queue(self, maxsize=0, reducers=None):
        """Return a loky queue bound to this context."""
        from .queues import Queue as loky_queue

        ctx = self.get_context()
        return loky_queue(maxsize, reducers=reducers, ctx=ctx)

    def SimpleQueue(self, reducers=None):
        """Return a loky simple queue bound to this context."""
        from .queues import SimpleQueue as loky_simple_queue

        ctx = self.get_context()
        return loky_simple_queue(reducers=reducers, ctx=ctx)

    if sys.platform != "win32":
        # For Unix platform, use our custom implementation of synchronize
        # ensuring that we use the loky.backend.resource_tracker to clean-up
        # the semaphores in case of a worker crash.

        def Semaphore(self, value=1):
            """Return a semaphore from the loky synchronize module."""
            from .synchronize import Semaphore as loky_semaphore

            return loky_semaphore(value=value)

        def BoundedSemaphore(self, value):
            """Return a bounded semaphore from the loky synchronize module."""
            from .synchronize import BoundedSemaphore as loky_bounded

            return loky_bounded(value)

        def Lock(self):
            """Return a lock from the loky synchronize module."""
            from .synchronize import Lock as loky_lock

            return loky_lock()

        def RLock(self):
            """Return a recursive lock from the loky synchronize module."""
            from .synchronize import RLock as loky_rlock

            return loky_rlock()

        def Condition(self, lock=None):
            """Return a condition from the loky synchronize module."""
            from .synchronize import Condition as loky_condition

            return loky_condition(lock)

        def Event(self):
            """Return an event from the loky synchronize module."""
            from .synchronize import Event as loky_event

            return loky_event()

354 

355 

class LokyInitMainContext(LokyContext):
    """Extra context with LokyProcess, which does load the main module

    This context is used for compatibility in the case ``cloudpickle`` is not
    present on the running system. It makes it possible to load functions
    defined in the ``main`` module, using proper safeguards. The declaration
    of the ``executor`` should be protected by ``if __name__ == "__main__":``
    and the functions and variables used from main should be out of this
    block.

    This mimics the default behavior of multiprocessing under Windows and the
    behavior of the ``spawn`` start method on a posix system.
    For more details, see the end of the following section of python doc
    https://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming
    """

    # Name under which this context is identified.
    _name = "loky_init_main"
    # Process class used by this context.
    Process = LokyInitMainProcess

373 

374 

# Register the loky contexts in multiprocessing's table of concrete contexts
# so that they work with multiprocessing.get_context. The "loky" instance is
# also kept available at module level as ``ctx_loky``.
ctx_loky = LokyContext()
mp.context._concrete_contexts["loky"] = ctx_loky
mp.context._concrete_contexts["loky_init_main"] = LokyInitMainContext()