Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/joblib/externals/loky/backend/context.py: 28%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

170 statements  

1############################################################################### 

2# Basic context management with LokyContext 

3# 

4# author: Thomas Moreau and Olivier Grisel 

5# 

6# adapted from multiprocessing/context.py 

7# * Create a context ensuring loky uses only objects that are compatible 

8# * Add LokyContext to the list of context of multiprocessing so loky can be 

9# used with multiprocessing.set_start_method 

10# * Implement a CFS-aware and physical-core aware cpu_count function.

11# 

12import os 

13import sys 

14import math 

15import subprocess 

16import traceback 

17import warnings 

18import multiprocessing as mp 

19from multiprocessing import get_context as mp_get_context 

20from multiprocessing.context import BaseContext 

21from concurrent.futures.process import _MAX_WINDOWS_WORKERS 

22 

23 

24from .process import LokyProcess, LokyInitMainProcess 

25 

# Apparently, on older Python versions, loky cannot work with 61 workers on
# Windows but only with 60: ¯\_(ツ)_/¯
if sys.version_info < (3, 10):
    _MAX_WINDOWS_WORKERS = _MAX_WINDOWS_WORKERS - 1

# Names of the start methods accepted by this module. "fork" and "forkserver"
# are only offered on non-Windows platforms.
START_METHODS = ["loky", "loky_init_main", "spawn"]
if sys.platform != "win32":
    START_METHODS += ["fork", "forkserver"]

# Default start method for this module; stays None until it is assigned by
# set_start_method().
_DEFAULT_START_METHOD = None

# Cache for the number of physical cores to avoid repeating subprocess calls.
# It should not change during the lifetime of the program.
physical_cores_cache = None

40 

41 

def get_context(method=None):
    """Return the multiprocessing context matching a start method.

    If ``method`` is falsy, the default start method set through
    ``set_start_method`` is used, and ``"loky"`` is used when no default has
    been set. A ``UserWarning`` is emitted when the resolved method is
    ``"fork"``.

    Raise a ValueError if ``multiprocessing.get_context`` does not know the
    requested start method.
    """
    # Try to overload the default context
    method = method or _DEFAULT_START_METHOD or "loky"
    if method == "fork":
        # Warn the user about potential issues whenever 'fork' is selected.
        warnings.warn(
            "`fork` start method should not be used with "
            "`loky` as it does not respect POSIX. Try using "
            "`spawn` or `loky` instead.",
            UserWarning,
        )
    try:
        return mp_get_context(method)
    except ValueError as exc:
        # Chain explicitly so the traceback presents the multiprocessing error
        # as the direct cause instead of "another exception occurred while
        # handling ...".
        raise ValueError(
            f"Unknown context '{method}'. Value should be in "
            f"{START_METHODS}."
        ) from exc

60 

61 

def set_start_method(method, force=False):
    """Set the default start method used by ``get_context``.

    ``method`` must be None or one of the names listed in ``START_METHODS``.

    Raise a RuntimeError if a default start method has already been set and
    ``force`` is false. Raise an AssertionError if ``method`` is not a valid
    start method.
    """
    global _DEFAULT_START_METHOD
    if _DEFAULT_START_METHOD is not None and not force:
        raise RuntimeError("context has already been set")
    if method is not None and method not in START_METHODS:
        # Raised explicitly rather than through an ``assert`` statement so the
        # validation is not stripped when Python runs with ``-O``. The
        # AssertionError type is kept so existing callers that catch it keep
        # working.
        raise AssertionError(
            f"'{method}' is not a valid start_method. It should be in "
            f"{START_METHODS}"
        )

    _DEFAULT_START_METHOD = method

72 

73 

def get_start_method():
    """Return the module-level default start method (None if it is unset)."""
    current_default = _DEFAULT_START_METHOD
    return current_default

76 

77 

def cpu_count(only_physical_cores=False):
    """Return the number of CPUs the current process can use.

    The returned number of CPUs accounts for:
     * the number of CPUs in the system, as given by
       ``multiprocessing.cpu_count``;
     * the CPU affinity settings of the current process
       (available on some Unix systems);
     * Cgroup CPU bandwidth limit (available on Linux only, typically
       set by docker and similar container orchestration systems);
     * the value of the LOKY_MAX_CPU_COUNT environment variable if defined.
    and is given as the minimum of these constraints.

    If ``only_physical_cores`` is True, return the number of physical cores
    instead of the number of logical cores (hyperthreading / SMT). Note that
    this option is not enforced if the number of usable cores is controlled in
    any other way such as: process affinity, Cgroup restricted CPU bandwidth
    or the LOKY_MAX_CPU_COUNT environment variable. If the number of physical
    cores is not found, return the number of logical cores.

    Note that on Windows, the returned number of CPUs cannot exceed 61 (or 60
    for Python < 3.10), see:
    https://bugs.python.org/issue26903. This limit is also applied to the
    number of physical cores.

    It is also always larger or equal to 1.
    """
    # Note: os.cpu_count() is allowed to return None in its docstring
    os_cpu_count = os.cpu_count() or 1
    if sys.platform == "win32":
        # On Windows, attempting to use more than 61 CPUs would result in a
        # OS-level error. See https://bugs.python.org/issue26903. According to
        # https://learn.microsoft.com/en-us/windows/win32/procthread/processor-groups
        # it might be possible to go beyond with a lot of extra work but this
        # does not look easy.
        os_cpu_count = min(os_cpu_count, _MAX_WINDOWS_WORKERS)

    cpu_count_user = _cpu_count_user(os_cpu_count)
    aggregate_cpu_count = max(min(os_cpu_count, cpu_count_user), 1)

    if not only_physical_cores:
        return aggregate_cpu_count

    if cpu_count_user < os_cpu_count:
        # Respect user setting
        return max(cpu_count_user, 1)

    cpu_count_physical, exception = _count_physical_cores()
    if cpu_count_physical != "not found":
        if sys.platform == "win32":
            # The physical core count is obtained independently from
            # os.cpu_count(), so the Windows worker limit applied above has to
            # be enforced here as well.
            cpu_count_physical = min(cpu_count_physical, _MAX_WINDOWS_WORKERS)
        return cpu_count_physical

    # Fallback to default behavior
    if exception is not None:
        # warns only the first time: once the "not found" result is cached,
        # _count_physical_cores() no longer returns the exception.
        warnings.warn(
            "Could not find the number of physical cores for the "
            f"following reason:\n{exception}\n"
            "Returning the number of logical cores instead. You can "
            "silence this warning by setting LOKY_MAX_CPU_COUNT to "
            "the number of cores you want to use."
        )
        traceback.print_tb(exception.__traceback__)

    return aggregate_cpu_count

141 

142 

143def _cpu_count_cgroup(os_cpu_count): 

144 # Cgroup CPU bandwidth limit available in Linux since 2.6 kernel 

145 cpu_max_fname = "/sys/fs/cgroup/cpu.max" 

146 cfs_quota_fname = "/sys/fs/cgroup/cpu/cpu.cfs_quota_us" 

147 cfs_period_fname = "/sys/fs/cgroup/cpu/cpu.cfs_period_us" 

148 if os.path.exists(cpu_max_fname): 

149 # cgroup v2 

150 # https://www.kernel.org/doc/html/latest/admin-guide/cgroup-v2.html 

151 with open(cpu_max_fname) as fh: 

152 cpu_quota_us, cpu_period_us = fh.read().strip().split() 

153 elif os.path.exists(cfs_quota_fname) and os.path.exists(cfs_period_fname): 

154 # cgroup v1 

155 # https://www.kernel.org/doc/html/latest/scheduler/sched-bwc.html#management 

156 with open(cfs_quota_fname) as fh: 

157 cpu_quota_us = fh.read().strip() 

158 with open(cfs_period_fname) as fh: 

159 cpu_period_us = fh.read().strip() 

160 else: 

161 # No Cgroup CPU bandwidth limit (e.g. non-Linux platform) 

162 cpu_quota_us = "max" 

163 cpu_period_us = 100_000 # unused, for consistency with default values 

164 

165 if cpu_quota_us == "max": 

166 # No active Cgroup quota on a Cgroup-capable platform 

167 return os_cpu_count 

168 else: 

169 cpu_quota_us = int(cpu_quota_us) 

170 cpu_period_us = int(cpu_period_us) 

171 if cpu_quota_us > 0 and cpu_period_us > 0: 

172 return math.ceil(cpu_quota_us / cpu_period_us) 

173 else: # pragma: no cover 

174 # Setting a negative cpu_quota_us value is a valid way to disable 

175 # cgroup CPU bandwith limits 

176 return os_cpu_count 

177 

178 

179def _cpu_count_affinity(os_cpu_count): 

180 # Number of available CPUs given affinity settings 

181 if hasattr(os, "sched_getaffinity"): 

182 try: 

183 return len(os.sched_getaffinity(0)) 

184 except NotImplementedError: 

185 pass 

186 

187 # On some platforms, os.sched_getaffinity does not exist or raises 

188 # NotImplementedError, let's try with the psutil if installed. 

189 try: 

190 import psutil 

191 

192 p = psutil.Process() 

193 if hasattr(p, "cpu_affinity"): 

194 return len(p.cpu_affinity()) 

195 

196 except ImportError: # pragma: no cover 

197 if ( 

198 sys.platform == "linux" 

199 and os.environ.get("LOKY_MAX_CPU_COUNT") is None 

200 ): 

201 # Some platforms don't implement os.sched_getaffinity on Linux which 

202 # can cause severe oversubscription problems. Better warn the 

203 # user in this particularly pathological case which can wreck 

204 # havoc, typically on CI workers. 

205 warnings.warn( 

206 "Failed to inspect CPU affinity constraints on this system. " 

207 "Please install psutil or explictly set LOKY_MAX_CPU_COUNT." 

208 ) 

209 

210 # This can happen for platforms that do not implement any kind of CPU 

211 # infinity such as macOS-based platforms. 

212 return os_cpu_count 

213 

214 

def _cpu_count_user(os_cpu_count):
    """Number of user defined available CPUs.

    This is the minimum of the affinity limit, the Cgroup limit and the
    LOKY_MAX_CPU_COUNT environment variable (``os_cpu_count`` when unset).
    """
    affinity_limit = _cpu_count_affinity(os_cpu_count)
    cgroup_limit = _cpu_count_cgroup(os_cpu_count)

    # User defined soft-limit passed as a loky specific environment variable.
    env_value = os.environ.get("LOKY_MAX_CPU_COUNT", os_cpu_count)
    loky_limit = int(env_value)

    return min(affinity_limit, cgroup_limit, loky_limit)

225 

226 

def _count_physical_cores():
    """Return a tuple (number of physical cores, exception)

    When the count is found, the exception slot is None. When it is not
    found, the tuple is ("not found", exception).

    The outcome (including "not found") is stored in a module-level cache to
    avoid repeating subprocess calls; a cached outcome is always returned
    with None in the exception slot.
    """
    global physical_cores_cache

    # Serve the cached outcome when there is one
    if physical_cores_cache is not None:
        return physical_cores_cache, None

    error = None
    try:
        platform_counters = {
            "linux": _count_physical_cores_linux,
            "win32": _count_physical_cores_win32,
            "darwin": _count_physical_cores_darwin,
        }
        counter = platform_counters.get(sys.platform)
        if counter is None:
            raise NotImplementedError(f"unsupported platform: {sys.platform}")
        n_physical = counter()

        # A count below 1 means no valid value was found
        if n_physical < 1:
            raise ValueError(f"found {n_physical} physical cores < 1")

    except Exception as exc:
        error = exc
        n_physical = "not found"

    # Remember the outcome for subsequent calls
    physical_cores_cache = n_physical

    return n_physical, error

265 

266 

267def _count_physical_cores_linux(): 

268 try: 

269 cpu_info = subprocess.run( 

270 "lscpu --parse=core".split(), capture_output=True, text=True 

271 ) 

272 cpu_info = cpu_info.stdout.splitlines() 

273 cpu_info = {line for line in cpu_info if not line.startswith("#")} 

274 return len(cpu_info) 

275 except: 

276 pass # fallback to /proc/cpuinfo 

277 

278 cpu_info = subprocess.run( 

279 "cat /proc/cpuinfo".split(), capture_output=True, text=True 

280 ) 

281 cpu_info = cpu_info.stdout.splitlines() 

282 cpu_info = {line for line in cpu_info if line.startswith("core id")} 

283 return len(cpu_info) 

284 

285 

286def _count_physical_cores_win32(): 

287 try: 

288 cmd = "-Command (Get-CimInstance -ClassName Win32_Processor).NumberOfCores" 

289 cpu_info = subprocess.run( 

290 f"powershell.exe {cmd}".split(), 

291 capture_output=True, 

292 text=True, 

293 ) 

294 cpu_info = cpu_info.stdout.splitlines() 

295 return int(cpu_info[0]) 

296 except: 

297 pass # fallback to wmic (older Windows versions; deprecated now) 

298 

299 cpu_info = subprocess.run( 

300 "wmic CPU Get NumberOfCores /Format:csv".split(), 

301 capture_output=True, 

302 text=True, 

303 ) 

304 cpu_info = cpu_info.stdout.splitlines() 

305 cpu_info = [ 

306 l.split(",")[1] for l in cpu_info if (l and l != "Node,NumberOfCores") 

307 ] 

308 return sum(map(int, cpu_info)) 

309 

310 

311def _count_physical_cores_darwin(): 

312 cpu_info = subprocess.run( 

313 "sysctl -n hw.physicalcpu".split(), 

314 capture_output=True, 

315 text=True, 

316 ) 

317 cpu_info = cpu_info.stdout 

318 return int(cpu_info) 

319 

320 

class LokyContext(BaseContext):
    """Context relying on the LokyProcess."""

    _name = "loky"
    Process = LokyProcess
    cpu_count = staticmethod(cpu_count)

    def Queue(self, maxsize=0, reducers=None):
        """Build a queue bound to this context."""
        from .queues import Queue as queue_factory

        return queue_factory(
            maxsize, reducers=reducers, ctx=self.get_context()
        )

    def SimpleQueue(self, reducers=None):
        """Build a simple queue bound to this context."""
        from .queues import SimpleQueue as simple_queue_factory

        return simple_queue_factory(
            reducers=reducers, ctx=self.get_context()
        )

    if sys.platform != "win32":
        # For Unix platform, use our custom implementation of synchronize
        # ensuring that we use the loky.backend.resource_tracker to clean-up
        # the semaphores in case of a worker crash.

        def Semaphore(self, value=1):
            """Build a semaphore object."""
            from .synchronize import Semaphore as semaphore_factory

            return semaphore_factory(value=value)

        def BoundedSemaphore(self, value):
            """Build a bounded semaphore object."""
            from .synchronize import BoundedSemaphore as bounded_factory

            return bounded_factory(value)

        def Lock(self):
            """Build a lock object."""
            from .synchronize import Lock as lock_factory

            return lock_factory()

        def RLock(self):
            """Build a recurrent lock object."""
            from .synchronize import RLock as rlock_factory

            return rlock_factory()

        def Condition(self, lock=None):
            """Build a condition object."""
            from .synchronize import Condition as condition_factory

            return condition_factory(lock)

        def Event(self):
            """Build an event object."""
            from .synchronize import Event as event_factory

            return event_factory()

381 

382 

class LokyInitMainContext(LokyContext):
    """Extra context using LokyInitMainProcess, which does load the main module

    This context is used for compatibility in the case ``cloudpickle`` is not
    present on the running system. It allows loading functions defined in
    the ``main`` module, using proper safeguards. The declaration of the
    ``executor`` should be protected by ``if __name__ == "__main__":`` and the
    functions and variables used from main should be outside of this block.

    This mimics the default behavior of multiprocessing under Windows and the
    behavior of the ``spawn`` start method on a posix system.
    For more details, see the end of the following section of the Python docs:
    https://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming
    """

    _name = "loky_init_main"
    Process = LokyInitMainProcess

400 

401 

# Register loky context so it works with multiprocessing.get_context
ctx_loky = LokyContext()
# NOTE: the registration writes into ``_concrete_contexts``, an
# underscore-prefixed (non-public) mapping of ``multiprocessing.context``.
mp.context._concrete_contexts["loky"] = ctx_loky
mp.context._concrete_contexts["loky_init_main"] = LokyInitMainContext()