1###############################################################################
2# Basic context management with LokyContext
3#
4# author: Thomas Moreau and Olivier Grisel
5#
6# adapted from multiprocessing/context.py
7# * Create a context ensuring loky uses only objects that are compatible
8# * Add LokyContext to the list of context of multiprocessing so loky can be
9# used with multiprocessing.set_start_method
# * Implement a CFS-aware and physical-core aware cpu_count function.
11#
12import os
13import sys
14import math
15import subprocess
16import traceback
17import warnings
18import multiprocessing as mp
19from multiprocessing import get_context as mp_get_context
20from multiprocessing.context import BaseContext
21from concurrent.futures.process import _MAX_WINDOWS_WORKERS
22
23
24from .process import LokyProcess, LokyInitMainProcess
25
# On Python versions older than 3.10, loky can only drive 60 workers on
# Windows instead of 61, so the limit imported from concurrent.futures is
# lowered by one there.
if sys.version_info < (3, 10):
    _MAX_WINDOWS_WORKERS -= 1

# Start methods accepted by this module; the fork-based ones are only offered
# on non-Windows platforms.
START_METHODS = ["loky", "loky_init_main", "spawn"]
if sys.platform != "win32":
    START_METHODS.extend(["fork", "forkserver"])

# Start method selected through set_start_method (None until it is called).
_DEFAULT_START_METHOD = None

# Memoized physical core count, so the subprocess-based detection is not
# repeated. The value is not expected to change while the program runs.
physical_cores_cache = None
40
41
def get_context(method=None):
    """Return the multiprocessing context registered under ``method``.

    When ``method`` is falsy, the module-level default start method is used,
    and ``"loky"`` is used if no default has been set. Requesting ``"fork"``
    emits a ``UserWarning``.

    Raises
    ------
    ValueError
        If ``multiprocessing.get_context`` does not know ``method``.
    """
    # Resolution order: explicit argument, configured default, then "loky".
    method = method or _DEFAULT_START_METHOD or "loky"

    if method == "fork":
        # 'fork' is still honored, but the caller is told about the risk.
        fork_message = (
            "`fork` start method should not be used with "
            "`loky` as it does not respect POSIX. Try using "
            "`spawn` or `loky` instead."
        )
        warnings.warn(fork_message, UserWarning)

    try:
        context = mp_get_context(method)
    except ValueError:
        # Re-raise with a message listing the start methods known here.
        raise ValueError(
            f"Unknown context '{method}'. Value should be in "
            f"{START_METHODS}."
        )
    return context
60
61
def set_start_method(method, force=False):
    """Record ``method`` as the module-wide default start method.

    Parameters
    ----------
    method : str or None
        One of ``START_METHODS``, or None to clear the default.
    force : bool
        Allow overriding a default that has already been set.

    Raises
    ------
    RuntimeError
        If a default is already set and ``force`` is false.
    AssertionError
        If ``method`` is neither None nor listed in ``START_METHODS``.
    """
    global _DEFAULT_START_METHOD

    already_configured = _DEFAULT_START_METHOD is not None
    if already_configured and not force:
        raise RuntimeError("context has already been set")

    # NOTE: validation relies on ``assert`` and is therefore skipped when
    # Python runs with -O.
    is_valid = method is None or method in START_METHODS
    assert is_valid, (
        f"'{method}' is not a valid start_method. It should be in "
        f"{START_METHODS}"
    )

    _DEFAULT_START_METHOD = method
72
73
def get_start_method():
    """Return the default start method, or None if none has been set."""
    return _DEFAULT_START_METHOD
76
77
def cpu_count(only_physical_cores=False):
    """Return how many CPUs the current process is allowed to use.

    The result is the smallest of the following limits, and never less
    than 1:

    * the number of CPUs reported by ``os.cpu_count`` (capped on Windows,
      see below);
    * the CPU affinity mask of the current process, where it can be read;
    * the Cgroup CPU bandwidth quota (Linux only, typically configured by
      docker and similar container orchestration systems);
    * the ``LOKY_MAX_CPU_COUNT`` environment variable, when defined.

    With ``only_physical_cores=True`` the number of physical cores is
    returned instead of the number of logical cores (hyperthreading / SMT).
    That request is ignored whenever affinity, a Cgroup quota or
    ``LOKY_MAX_CPU_COUNT`` already restrict the count. If the number of
    physical cores cannot be determined, the logical count is returned.

    On Windows the result never exceeds 61 (60 for Python < 3.10), see
    https://bugs.python.org/issue26903.
    """
    # os.cpu_count() is documented as possibly returning None.
    logical_count = os.cpu_count() or 1
    if sys.platform == "win32":
        # Using more than 61 CPUs on Windows results in an OS-level error
        # (https://bugs.python.org/issue26903). Per
        # https://learn.microsoft.com/en-us/windows/win32/procthread/processor-groups
        # going beyond might be possible, but it does not look easy.
        logical_count = min(logical_count, _MAX_WINDOWS_WORKERS)

    user_limit = _cpu_count_user(logical_count)
    usable_count = max(min(logical_count, user_limit), 1)

    # When the user limit is below the logical count, ``usable_count`` equals
    # ``max(user_limit, 1)``, so the user setting is respected even if
    # physical cores were requested.
    if not only_physical_cores or user_limit < logical_count:
        return usable_count

    physical_count, error = _count_physical_cores()
    if physical_count != "not found":
        return physical_count

    # Detection failed: fall back to the logical count. ``error`` is only
    # provided on the first failed attempt, so this warns once.
    if error is not None:
        warnings.warn(
            "Could not find the number of physical cores for the "
            f"following reason:\n{error}\n"
            "Returning the number of logical cores instead. You can "
            "silence this warning by setting LOKY_MAX_CPU_COUNT to "
            "the number of cores you want to use."
        )
        traceback.print_tb(error.__traceback__)

    return usable_count
141
142
143def _cpu_count_cgroup(os_cpu_count):
144 # Cgroup CPU bandwidth limit available in Linux since 2.6 kernel
145 cpu_max_fname = "/sys/fs/cgroup/cpu.max"
146 cfs_quota_fname = "/sys/fs/cgroup/cpu/cpu.cfs_quota_us"
147 cfs_period_fname = "/sys/fs/cgroup/cpu/cpu.cfs_period_us"
148 if os.path.exists(cpu_max_fname):
149 # cgroup v2
150 # https://www.kernel.org/doc/html/latest/admin-guide/cgroup-v2.html
151 with open(cpu_max_fname) as fh:
152 cpu_quota_us, cpu_period_us = fh.read().strip().split()
153 elif os.path.exists(cfs_quota_fname) and os.path.exists(cfs_period_fname):
154 # cgroup v1
155 # https://www.kernel.org/doc/html/latest/scheduler/sched-bwc.html#management
156 with open(cfs_quota_fname) as fh:
157 cpu_quota_us = fh.read().strip()
158 with open(cfs_period_fname) as fh:
159 cpu_period_us = fh.read().strip()
160 else:
161 # No Cgroup CPU bandwidth limit (e.g. non-Linux platform)
162 cpu_quota_us = "max"
163 cpu_period_us = 100_000 # unused, for consistency with default values
164
165 if cpu_quota_us == "max":
166 # No active Cgroup quota on a Cgroup-capable platform
167 return os_cpu_count
168 else:
169 cpu_quota_us = int(cpu_quota_us)
170 cpu_period_us = int(cpu_period_us)
171 if cpu_quota_us > 0 and cpu_period_us > 0:
172 return math.ceil(cpu_quota_us / cpu_period_us)
173 else: # pragma: no cover
174 # Setting a negative cpu_quota_us value is a valid way to disable
175 # cgroup CPU bandwith limits
176 return os_cpu_count
177
178
179def _cpu_count_affinity(os_cpu_count):
180 # Number of available CPUs given affinity settings
181 if hasattr(os, "sched_getaffinity"):
182 try:
183 return len(os.sched_getaffinity(0))
184 except NotImplementedError:
185 pass
186
187 # On some platforms, os.sched_getaffinity does not exist or raises
188 # NotImplementedError, let's try with the psutil if installed.
189 try:
190 import psutil
191
192 p = psutil.Process()
193 if hasattr(p, "cpu_affinity"):
194 return len(p.cpu_affinity())
195
196 except ImportError: # pragma: no cover
197 if (
198 sys.platform == "linux"
199 and os.environ.get("LOKY_MAX_CPU_COUNT") is None
200 ):
201 # Some platforms don't implement os.sched_getaffinity on Linux which
202 # can cause severe oversubscription problems. Better warn the
203 # user in this particularly pathological case which can wreck
204 # havoc, typically on CI workers.
205 warnings.warn(
206 "Failed to inspect CPU affinity constraints on this system. "
207 "Please install psutil or explictly set LOKY_MAX_CPU_COUNT."
208 )
209
210 # This can happen for platforms that do not implement any kind of CPU
211 # infinity such as macOS-based platforms.
212 return os_cpu_count
213
214
def _cpu_count_user(os_cpu_count):
    """Return the smallest user-defined limit on the number of usable CPUs.

    The limits considered are the CPU affinity, the Cgroup CPU quota and the
    loky-specific ``LOKY_MAX_CPU_COUNT`` environment variable (a soft limit
    that defaults to ``os_cpu_count`` when unset).
    """
    limits = (
        _cpu_count_affinity(os_cpu_count),
        _cpu_count_cgroup(os_cpu_count),
        int(os.environ.get("LOKY_MAX_CPU_COUNT", os_cpu_count)),
    )
    return min(limits)
225
226
def _count_physical_cores():
    """Return a ``(physical core count, exception)`` pair.

    On success the exception is None. On failure the count is the string
    ``"not found"`` and the exception explains why.

    The outcome, including ``"not found"``, is stored in
    ``physical_cores_cache`` so the detection runs only once. A cached result
    is always returned with a None exception.
    """
    global physical_cores_cache

    # Serve the cached outcome when detection has already run.
    if physical_cores_cache is not None:
        return physical_cores_cache, None

    error = None
    try:
        platform_counters = {
            "linux": _count_physical_cores_linux,
            "win32": _count_physical_cores_win32,
            "darwin": _count_physical_cores_darwin,
        }
        counter = platform_counters.get(sys.platform)
        if counter is None:
            raise NotImplementedError(f"unsupported platform: {sys.platform}")

        core_count = counter()

        # A count below 1 means no valid value was found.
        if core_count < 1:
            raise ValueError(f"found {core_count} physical cores < 1")

    except Exception as exc:
        error = exc
        core_count = "not found"

    # Remember the outcome for subsequent calls.
    physical_cores_cache = core_count

    return core_count, error
265
266
267def _count_physical_cores_linux():
268 try:
269 cpu_info = subprocess.run(
270 "lscpu --parse=core".split(), capture_output=True, text=True
271 )
272 cpu_info = cpu_info.stdout.splitlines()
273 cpu_info = {line for line in cpu_info if not line.startswith("#")}
274 return len(cpu_info)
275 except:
276 pass # fallback to /proc/cpuinfo
277
278 cpu_info = subprocess.run(
279 "cat /proc/cpuinfo".split(), capture_output=True, text=True
280 )
281 cpu_info = cpu_info.stdout.splitlines()
282 cpu_info = {line for line in cpu_info if line.startswith("core id")}
283 return len(cpu_info)
284
285
286def _count_physical_cores_win32():
287 try:
288 cmd = "-Command (Get-CimInstance -ClassName Win32_Processor).NumberOfCores"
289 cpu_info = subprocess.run(
290 f"powershell.exe {cmd}".split(),
291 capture_output=True,
292 text=True,
293 )
294 cpu_info = cpu_info.stdout.splitlines()
295 return int(cpu_info[0])
296 except:
297 pass # fallback to wmic (older Windows versions; deprecated now)
298
299 cpu_info = subprocess.run(
300 "wmic CPU Get NumberOfCores /Format:csv".split(),
301 capture_output=True,
302 text=True,
303 )
304 cpu_info = cpu_info.stdout.splitlines()
305 cpu_info = [
306 l.split(",")[1] for l in cpu_info if (l and l != "Node,NumberOfCores")
307 ]
308 return sum(map(int, cpu_info))
309
310
311def _count_physical_cores_darwin():
312 cpu_info = subprocess.run(
313 "sysctl -n hw.physicalcpu".split(),
314 capture_output=True,
315 text=True,
316 )
317 cpu_info = cpu_info.stdout
318 return int(cpu_info)
319
320
class LokyContext(BaseContext):
    """Context relying on the LokyProcess.

    Queues are created from loky's own ``queues`` module and accept an
    optional ``reducers`` argument. On non-Windows platforms the
    synchronization primitives come from loky's ``synchronize`` module; on
    Windows the implementations inherited from ``BaseContext`` are used.
    """

    _name = "loky"
    Process = LokyProcess
    cpu_count = staticmethod(cpu_count)

    def Queue(self, maxsize=0, reducers=None):
        """Returns a queue object"""
        from .queues import Queue

        return Queue(maxsize, reducers=reducers, ctx=self.get_context())

    def SimpleQueue(self, reducers=None):
        """Returns a simple queue object"""
        from .queues import SimpleQueue

        return SimpleQueue(reducers=reducers, ctx=self.get_context())

    if sys.platform != "win32":
        # For Unix platform, use our custom implementation of synchronize
        # ensuring that we use the loky.backend.resource_tracker to clean-up
        # the semaphores in case of a worker crash.

        def Semaphore(self, value=1):
            """Returns a semaphore object"""
            from .synchronize import Semaphore

            return Semaphore(value=value)

        def BoundedSemaphore(self, value=1):
            """Returns a bounded semaphore object"""
            # ``value`` defaults to 1 like ``Semaphore`` above and like
            # ``BaseContext.BoundedSemaphore`` (used on Windows), so that
            # ``ctx.BoundedSemaphore()`` behaves the same on every platform.
            from .synchronize import BoundedSemaphore

            return BoundedSemaphore(value)

        def Lock(self):
            """Returns a lock object"""
            from .synchronize import Lock

            return Lock()

        def RLock(self):
            """Returns a recurrent lock object"""
            from .synchronize import RLock

            return RLock()

        def Condition(self, lock=None):
            """Returns a condition object"""
            from .synchronize import Condition

            return Condition(lock)

        def Event(self):
            """Returns an event object"""
            from .synchronize import Event

            return Event()
381
382
class LokyInitMainContext(LokyContext):
    """Variant of ``LokyContext`` whose processes do load the main module.

    It exists for compatibility when ``cloudpickle`` is not available on the
    running system: functions defined in the ``main`` module can then still
    be loaded, provided proper safeguards are used. The ``executor`` should
    be created under ``if __name__ == "__main__":`` while the functions and
    variables used from main should be defined outside of that block.

    This mirrors the default behavior of multiprocessing under Windows and
    of the ``spawn`` start method on a posix system. For more details, see
    the end of the following section of the python doc:
    https://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming
    """

    # Name under which this context is registered.
    _name = "loky_init_main"
    # Process class used by this context.
    Process = LokyInitMainProcess
400
401
# Make both loky contexts discoverable through multiprocessing.get_context by
# adding them to multiprocessing's table of concrete contexts.
ctx_loky = LokyContext()
mp.context._concrete_contexts.update(
    loky=ctx_loky,
    loky_init_main=LokyInitMainContext(),
)