1"""Cross-process and cross-host reader/writer lock built on :class:`SoftFileLock` primitives."""
2
3from __future__ import annotations
4
5import atexit
6import hmac
7import os
8import re
9import secrets
10import socket
11import stat
12import sys
13import threading
14import time
15import uuid
16from contextlib import contextmanager, suppress
17from dataclasses import dataclass
18from pathlib import Path
19from typing import TYPE_CHECKING, Literal
20from weakref import WeakValueDictionary
21
22from filelock._api import AcquireReturnProxy
23from filelock._error import Timeout
24from filelock._soft import SoftFileLock
25from filelock._util import ensure_directory_exists
26
27if TYPE_CHECKING:
28 from collections.abc import Callable, Generator
29
30
# The two modes a lock can be held in.
_Mode = Literal["read", "write"]
# Infix added to a marker's name when it is renamed aside during stale-marker eviction; directory scans skip any
# entry whose name contains it.
_BREAK_SUFFIX = ".break"
# Largest marker payload accepted by the parser; anything longer is treated as malformed.
_MAX_MARKER_SIZE = 1024
# ``os.O_NOFOLLOW`` where the platform defines it, otherwise 0 so OR-ing it into open flags is a no-op.
_O_NOFOLLOW = getattr(os, "O_NOFOLLOW", 0)
# dirfd-relative I/O is a Unix-only optimization; Windows cannot ``os.open()`` a directory at all, and
# its ``os`` module skips dir_fd support entirely. When disabled, callers fall back to full-path ops.
_SUPPORTS_DIR_FD = sys.platform != "win32" and os.open in os.supports_dir_fd

# Weak registry of live lock instances; the fork hook and the atexit hook iterate over its values.
_all_instances: WeakValueDictionary[Path, SoftReadWriteLock] = WeakValueDictionary()
# Held while an instance adds itself to ``_all_instances``; replaced with a fresh lock in a forked child.
_all_instances_lock = threading.Lock()
# One-shot flags so the atexit and at-fork hooks are each registered at most once per process.
_atexit_registered = False
_fork_registered = False
43
44
@dataclass(frozen=True)
class _Paths:
    """Sidecar paths derived from the lock file path."""

    # ``<lock_file>.state``: path of the SoftFileLock taken around state transitions.
    state: str
    # ``<lock_file>.write``: the writer marker file.
    write: str
    # ``<lock_file>.readers``: directory holding one marker file per reader.
    readers: str
50
51
@dataclass
class _Locks:
    """The locks owned by one lock instance; the whole bundle is replaced in a forked child."""

    # In-process mutex guarding the instance's hold/closed/fork-invalidated bookkeeping.
    internal: threading.Lock
    # In-process mutex that serializes non-reentrant acquire attempts on the instance.
    transaction: threading.Lock
    # Cross-process file lock (on the ``.state`` path) taken around each marker create/scan/evict step.
    state: SoftFileLock
57
58
@dataclass(frozen=True)
class _MarkerInfo:
    """Validated contents of a marker file."""

    # 32 lowercase hex characters identifying the holder that wrote the marker.
    token: str
    # Process id recorded by the holder.
    pid: int
    # Hostname recorded by the holder.
    hostname: str
64
65
@dataclass
class _Hold:
    """Everything that exists only while a lock is held; ``None`` when the instance has no lock."""

    # Reentrancy depth; the marker is removed when this drops to zero.
    level: int
    # Which kind of lock is held.
    mode: _Mode
    # Identifier of the thread that took the write lock; ``None`` for read locks.
    write_thread_id: int | None
    # Marker location: the writer path, or the reader entry (dirfd-relative name or full path).
    marker_name: str
    # ``True`` when the marker lives inside the readers directory.
    is_reader: bool
    # Random token written into the marker, used to recognise it as ours later.
    token: str
    # Background thread that refreshes the marker's mtime, and the event that stops it.
    heartbeat_thread: _HeartbeatThread
    heartbeat_stop: threading.Event
78
79
80class _SoftRWMeta(type):
81 _instances: WeakValueDictionary[Path, SoftReadWriteLock]
82 _instances_lock: threading.Lock
83
84 def __call__( # noqa: PLR0913
85 cls,
86 lock_file: str | os.PathLike[str],
87 timeout: float = -1,
88 *,
89 blocking: bool = True,
90 is_singleton: bool = True,
91 heartbeat_interval: float = 30.0,
92 stale_threshold: float | None = None,
93 poll_interval: float = 0.25,
94 ) -> SoftReadWriteLock:
95 if not is_singleton:
96 return super().__call__(
97 lock_file,
98 timeout,
99 blocking=blocking,
100 is_singleton=is_singleton,
101 heartbeat_interval=heartbeat_interval,
102 stale_threshold=stale_threshold,
103 poll_interval=poll_interval,
104 )
105
106 normalized = Path(lock_file).resolve()
107 with cls._instances_lock:
108 instance = cls._instances.get(normalized)
109 if instance is None:
110 instance = super().__call__(
111 lock_file,
112 timeout,
113 blocking=blocking,
114 is_singleton=is_singleton,
115 heartbeat_interval=heartbeat_interval,
116 stale_threshold=stale_threshold,
117 poll_interval=poll_interval,
118 )
119 cls._instances[normalized] = instance
120 elif instance.timeout != timeout or instance.blocking != blocking:
121 msg = (
122 f"Singleton lock created with timeout={instance.timeout}, blocking={instance.blocking},"
123 f" cannot be changed to timeout={timeout}, blocking={blocking}"
124 )
125 raise ValueError(msg)
126 return instance
127
128
class SoftReadWriteLock(metaclass=_SoftRWMeta):
    """
    Cross-process and cross-host reader/writer lock built on :class:`SoftFileLock` primitives.

    Use this class instead of :class:`~filelock.ReadWriteLock` when the lock file lives on a network
    filesystem (NFS, Lustre with ``-o flock``, HPC cluster shared storage). ``ReadWriteLock`` is backed
    by SQLite and cannot run on NFS because SQLite's ``fcntl`` locking is unreliable there.

    Layout on disk for a lock at ``foo.lock``:

    - ``foo.lock.state`` — a :class:`SoftFileLock` taken only during state transitions (microseconds).
    - ``foo.lock.write`` — writer marker; its presence means a writer is claiming or holding the lock.
    - ``foo.lock.readers/<uuid>.<pid>`` — one file per reader.

    Each marker stores a random token (``secrets.token_hex(16)``), the holder's pid, and the holder's
    hostname. A daemon heartbeat thread refreshes ``mtime`` on every held marker. A marker whose mtime
    has not advanced in ``stale_threshold`` seconds may be evicted by any process on any host, giving
    correct behavior when a compute node crashes with a lock held.

    Writer acquire is two-phase and writer-preferring: phase 1 claims ``.write`` (blocking any new
    reader), phase 2 waits for existing readers to drain. Writer starvation is impossible.

    Reentrancy, upgrade/downgrade rules, thread pinning, and singleton caching by resolved path match
    :class:`~filelock.ReadWriteLock`.

    Forking while holding a lock invalidates the inherited instance in the child so the child cannot
    double-own the lock with its parent; ``release()`` on a fork-invalidated instance is a no-op, and
    the child must re-acquire if it needs a lock.

    Trust boundary: protects against same-UID non-cooperating processes (one host or cross-host) and
    same-host different-UID users via ``0o600`` / ``0o700`` permissions. Does not protect against root
    compromise, NTP tampering on same-UID cross-host nodes, or multi-tenant mounts where hostile
    co-tenants share the UID.

    :param lock_file: path to the lock file; sidecar state/write/readers live next to it
    :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
    :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately on contention
    :param is_singleton: if ``True``, reuse existing instances for the same resolved path
    :param heartbeat_interval: seconds between heartbeat refreshes; default 30 s
    :param stale_threshold: seconds of ``mtime`` inactivity before a marker is stale; defaults to
        ``3 * heartbeat_interval``, matching etcd's ``LeaseKeepAlive`` convention
    :param poll_interval: seconds between acquire retries under contention; default 0.25 s

    .. versionadded:: 3.27.0

    """

    _instances: WeakValueDictionary[Path, SoftReadWriteLock] = WeakValueDictionary()
    _instances_lock = threading.Lock()

    def __init__(  # noqa: PLR0913
        self,
        lock_file: str | os.PathLike[str],
        timeout: float = -1,
        *,
        blocking: bool = True,
        is_singleton: bool = True,  # noqa: ARG002
        heartbeat_interval: float = 30.0,
        stale_threshold: float | None = None,
        poll_interval: float = 0.25,
    ) -> None:
        if heartbeat_interval <= 0:
            msg = f"heartbeat_interval must be positive, got {heartbeat_interval}"
            raise ValueError(msg)
        if stale_threshold is None:
            stale_threshold = heartbeat_interval * 3
        if stale_threshold <= heartbeat_interval:
            msg = f"stale_threshold must exceed heartbeat_interval ({stale_threshold} <= {heartbeat_interval})"
            raise ValueError(msg)
        if poll_interval <= 0:
            msg = f"poll_interval must be positive, got {poll_interval}"
            raise ValueError(msg)

        self.lock_file: str = os.fspath(lock_file)
        self.timeout: float = timeout
        self.blocking: bool = blocking
        self.heartbeat_interval: float = heartbeat_interval
        self.stale_threshold: float = stale_threshold
        self.poll_interval: float = poll_interval

        self._paths = _Paths(
            state=f"{self.lock_file}.state",
            write=f"{self.lock_file}.write",
            readers=f"{self.lock_file}.readers",
        )
        ensure_directory_exists(self.lock_file)
        self._locks = _Locks(
            internal=threading.Lock(),
            transaction=threading.Lock(),
            state=SoftFileLock(self._paths.state, timeout=-1),
        )
        self._readers_dir_fd: int | None = None
        self._hold: _Hold | None = None
        self._fork_invalidated: bool = False
        self._closed: bool = False

        # The registry is only ever iterated (fork reset, atexit cleanup), never looked up by key, so the key just
        # has to be unique per live instance. Keying on the resolved path alone would let a second
        # ``is_singleton=False`` instance for the same file silently evict the first, which would then miss the
        # fork reset and the atexit release.
        registry_key = Path(self.lock_file).resolve() / f"{id(self):x}"
        with _all_instances_lock:
            _all_instances[registry_key] = self
            _register_hooks()

    @contextmanager
    def read_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> Generator[None]:
        """
        Context manager that acquires and releases a shared read lock.

        Falls back to instance defaults for *timeout* and *blocking* when ``None``.

        :param timeout: maximum wait time in seconds, or ``None`` to use the instance default
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default

        :raises RuntimeError: if a write lock is already held on this instance
        :raises Timeout: if the lock cannot be acquired within *timeout* seconds

        """
        self.acquire_read(timeout, blocking=blocking)
        try:
            yield
        finally:
            self.release()

    @contextmanager
    def write_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> Generator[None]:
        """
        Context manager that acquires and releases an exclusive write lock.

        Falls back to instance defaults for *timeout* and *blocking* when ``None``.

        :param timeout: maximum wait time in seconds, or ``None`` to use the instance default
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default

        :raises RuntimeError: if a read lock is already held, or a write lock is held by a different thread
        :raises Timeout: if the lock cannot be acquired within *timeout* seconds

        """
        self.acquire_write(timeout, blocking=blocking)
        try:
            yield
        finally:
            self.release()

    def acquire_read(self, timeout: float | None = None, *, blocking: bool | None = None) -> AcquireReturnProxy:
        """
        Acquire a shared read lock.

        If this instance already holds a read lock, the lock level is incremented (reentrant). Attempting to acquire a
        read lock while holding a write lock raises :class:`RuntimeError` (downgrade not allowed). On the 0→1
        transition a daemon heartbeat thread is started that refreshes the reader marker's ``mtime`` every
        ``heartbeat_interval`` seconds so peers on other hosts do not evict the marker as stale.

        :param timeout: maximum wait time in seconds, or ``None`` to use the instance default; ``-1`` means block
            indefinitely
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable;
            ``None`` uses the instance default

        :returns: a proxy that can be used as a context manager to release the lock

        :raises RuntimeError: if a write lock is already held on this instance, if this instance was invalidated by
            :func:`os.fork`, or if :meth:`close` was called
        :raises Timeout: if the lock cannot be acquired within *timeout* seconds

        """
        return self._acquire("read", timeout, blocking=blocking)

    def acquire_write(self, timeout: float | None = None, *, blocking: bool | None = None) -> AcquireReturnProxy:
        """
        Acquire an exclusive write lock.

        If this instance already holds a write lock from the same thread, the lock level is incremented (reentrant).
        Attempting to acquire a write lock while holding a read lock raises :class:`RuntimeError` (upgrade not
        allowed). Write locks are pinned to the acquiring thread: a different thread trying to re-enter also raises
        :class:`RuntimeError`.

        Writer acquisition runs in two phases. Phase 1 atomically claims ``<path>.write`` via ``O_CREAT | O_EXCL``,
        which immediately blocks any new reader on any host. Phase 2 waits for existing readers to drain. Writer
        starvation is impossible: new readers see ``<path>.write`` during phase 2 and wait behind the pending writer.

        :param timeout: maximum wait time in seconds, or ``None`` to use the instance default; ``-1`` means block
            indefinitely
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable;
            ``None`` uses the instance default

        :returns: a proxy that can be used as a context manager to release the lock

        :raises RuntimeError: if a read lock is already held, if a write lock is held by a different thread, if this
            instance was invalidated by :func:`os.fork`, or if :meth:`close` was called
        :raises Timeout: if the lock cannot be acquired within *timeout* seconds

        """
        return self._acquire("write", timeout, blocking=blocking)

    def close(self) -> None:
        """
        Release any held lock and release internal filesystem resources.

        Idempotent. After calling this method the instance can no longer acquire locks — subsequent acquires raise
        :class:`RuntimeError`. A fork-invalidated instance is closed without raising.
        """
        self.release(force=True)
        with self._locks.internal:
            if self._closed:
                return
            self._closed = True
            if self._readers_dir_fd is not None:
                with suppress(OSError):
                    os.close(self._readers_dir_fd)
                self._readers_dir_fd = None

    def release(self, *, force: bool = False) -> None:
        """
        Release one level of the current lock.

        When the lock level reaches zero the heartbeat thread is stopped and the held marker file is unlinked. On a
        fork-invalidated instance (that is, the child of a :func:`os.fork` call made while the parent held a lock)
        this method is a no-op so inherited ``with`` blocks can unwind cleanly in the child.

        The writer marker is left in place when it carries a valid token that is not ours: that means a peer evicted
        our marker as stale and now owns ``.write``, and deleting it would break the peer's lock.

        :param force: if ``True``, release the lock completely regardless of the current lock level

        :raises RuntimeError: if no lock is currently held and *force* is ``False``

        """
        with self._locks.internal:
            if self._fork_invalidated:
                # Inherited state from the parent is meaningless in the child; clear any counters and return.
                self._hold = None
                return
            hold = self._hold
            if hold is None:
                if force:
                    return
                msg = f"Cannot release a lock on {self.lock_file} (lock id: {id(self)}) that is not held"
                raise RuntimeError(msg)
            if force:
                hold.level = 0
            else:
                hold.level -= 1
            if hold.level > 0:
                return
            self._hold = None

        # Order matters: signal → join → unlink. A late tick on a deleted marker is harmless, and the
        # token check in the heartbeat callback would catch any re-acquisition race, but joining first
        # removes even that theoretical race.
        hold.heartbeat_stop.set()
        hold.heartbeat_thread.join(timeout=self.heartbeat_interval + 1.0)
        if hold.is_reader:
            # Reader names are unique per acquisition, so nobody else can own this path.
            _unlink(hold.marker_name, dir_fd=self._readers_dir_fd)
        else:
            self._unlink_writer_marker_unless_foreign(hold)

    @classmethod
    def get_lock(
        cls,
        lock_file: str | os.PathLike[str],
        timeout: float = -1,
        *,
        blocking: bool = True,
    ) -> SoftReadWriteLock:
        """
        Return the singleton :class:`SoftReadWriteLock` for *lock_file*.

        :param lock_file: path to the lock file; sidecar state/write/readers live next to it
        :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable

        :returns: the singleton lock instance

        :raises ValueError: if an instance already exists for this path with different *timeout* or *blocking* values

        """
        return cls(lock_file, timeout, blocking=blocking)

    def _acquire(
        self,
        mode: _Mode,
        timeout: float | None,
        *,
        blocking: bool | None,
    ) -> AcquireReturnProxy:
        """
        Shared implementation of :meth:`acquire_read` and :meth:`acquire_write`.

        Handles the reentrant fast path, serializes first-time acquisition through the transaction lock, claims the
        on-disk marker, and starts the heartbeat before the hold becomes visible to other threads.
        """
        effective_timeout = self.timeout if timeout is None else timeout
        effective_blocking = self.blocking if blocking is None else blocking

        with self._locks.internal:
            if self._fork_invalidated:
                msg = f"SoftReadWriteLock on {self.lock_file} was invalidated by fork(); construct a new instance"
                raise RuntimeError(msg)
            if self._closed:
                msg = f"SoftReadWriteLock on {self.lock_file} has been closed"
                raise RuntimeError(msg)
            if self._hold is not None:
                return self._validate_reentrant(mode)

        start = time.perf_counter()
        if not effective_blocking:
            acquired = self._locks.transaction.acquire(blocking=False)
        elif effective_timeout == -1:
            acquired = self._locks.transaction.acquire(blocking=True)
        else:
            acquired = self._locks.transaction.acquire(blocking=True, timeout=effective_timeout)
        if not acquired:
            raise Timeout(self.lock_file) from None
        try:
            # Another thread may have completed an acquire while we waited for the transaction lock.
            with self._locks.internal:
                if self._hold is not None:
                    return self._validate_reentrant(mode)

            deadline = None if effective_timeout == -1 else start + effective_timeout
            token = secrets.token_hex(16)
            claim_slot = self._acquire_writer_slot if mode == "write" else self._acquire_reader_slot
            marker_name, is_reader = claim_slot(token, deadline=deadline, blocking=effective_blocking)

            stop_event = threading.Event()
            heartbeat = _HeartbeatThread(
                refresh=self._refresh_marker,
                interval=self.heartbeat_interval,
                stop_event=stop_event,
                name=f"filelock-heartbeat-{id(self):x}",
            )
            hold = _Hold(
                level=1,
                mode=mode,
                write_thread_id=threading.get_ident() if mode == "write" else None,
                marker_name=marker_name,
                is_reader=is_reader,
                token=token,
                heartbeat_thread=heartbeat,
                heartbeat_stop=stop_event,
            )

            # Start the heartbeat and publish the hold under one critical section. A concurrent ``release()`` can
            # therefore never observe a hold whose thread has not been started (``join()`` would raise), and the
            # heartbeat's first tick cannot run before the hold is visible because it needs this same lock.
            with self._locks.internal:
                try:
                    heartbeat.start()
                except BaseException:
                    # Without a heartbeat the marker would only block peers until it goes stale; drop it now.
                    _unlink(marker_name, dir_fd=self._readers_dir_fd if is_reader else None)
                    raise
                self._hold = hold
            return AcquireReturnProxy(lock=self)
        finally:
            self._locks.transaction.release()

    def _validate_reentrant(self, mode: _Mode) -> AcquireReturnProxy:
        """Bump the hold level for a compatible re-acquire; callers must hold the internal lock."""
        hold = self._hold
        assert hold is not None  # noqa: S101
        if hold.mode != mode:
            opposite = "write" if mode == "read" else "read"
            direction = "downgrade" if mode == "read" else "upgrade"
            msg = (
                f"Cannot acquire {mode} lock on {self.lock_file} (lock id: {id(self)}): "
                f"already holding a {opposite} lock ({direction} not allowed)"
            )
            raise RuntimeError(msg)
        if mode == "write" and (cur := threading.get_ident()) != hold.write_thread_id:
            msg = (
                f"Cannot acquire write lock on {self.lock_file} (lock id: {id(self)}) "
                f"from thread {cur} while it is held by thread {hold.write_thread_id}"
            )
            raise RuntimeError(msg)
        hold.level += 1
        return AcquireReturnProxy(lock=self)

    def _acquire_writer_slot(
        self,
        token: str,
        *,
        deadline: float | None,
        blocking: bool,
    ) -> tuple[str, bool]:
        """
        Claim the writer marker, then wait for readers to drain.

        :returns: ``(marker path, False)``; the flag says the marker is not a reader entry

        :raises Timeout: if either phase does not complete before *deadline* (or at once when not *blocking*)

        """
        # Phase 2 scans readers/ via dirfd (where supported), so we need it open even though writers never
        # create files inside.
        self._open_readers_dir()

        def try_claim_writer() -> bool:
            with self._locks.state:
                _break_stale_marker(self._paths.write, stale_threshold=self.stale_threshold, now=time.time())
                if _file_exists(self._paths.write):
                    return False
                try:
                    _atomic_create_marker(self._paths.write, token)
                except FileExistsError:
                    return False
                return True

        def readers_drained_touching() -> bool:
            with self._locks.state:
                # Refresh our writer marker on every scan iteration. Otherwise phase 2 can exceed
                # ``stale_threshold`` under contention and a peer would treat us as stale and evict us.
                with suppress(OSError):
                    _touch(self._paths.write)
                self._break_stale_readers(time.time())
                return not self._any_readers()

        self._wait_for(try_claim_writer, deadline=deadline, blocking=blocking)
        try:
            self._wait_for(readers_drained_touching, deadline=deadline, blocking=blocking)
        except BaseException:
            # Give up our writer claim so readers can make progress again. This must cover more than ``Timeout``:
            # a KeyboardInterrupt during the poll sleep or an OSError from the directory scan would otherwise
            # leave ``.write`` behind with no heartbeat, blocking every peer until it goes stale.
            _unlink(self._paths.write)
            raise
        return self._paths.write, False

    def _acquire_reader_slot(
        self,
        token: str,
        *,
        deadline: float | None,
        blocking: bool,
    ) -> tuple[str, bool]:
        """
        Create a reader marker once no writer marker is present.

        :returns: ``(marker name, True)``; the name is dirfd-relative when a readers dirfd is open, else a full path

        :raises Timeout: if a writer marker stays present until *deadline* (or at once when not *blocking*)

        """
        self._open_readers_dir()
        reader_name = f"{uuid.uuid4().hex}.{os.getpid()}"
        dir_fd = self._readers_dir_fd
        full_reader_path = str(Path(self._paths.readers) / reader_name)

        def try_claim_reader() -> bool:
            with self._locks.state:
                _break_stale_marker(self._paths.write, stale_threshold=self.stale_threshold, now=time.time())
                if _file_exists(self._paths.write):
                    return False
                if dir_fd is not None:
                    _atomic_create_marker(reader_name, token, dir_fd=dir_fd)
                else:  # pragma: win32 cover
                    _atomic_create_marker(full_reader_path, token)
                return True

        self._wait_for(try_claim_reader, deadline=deadline, blocking=blocking)
        return (reader_name if dir_fd is not None else full_reader_path), True

    def _wait_for(
        self,
        predicate: Callable[[], bool],
        *,
        deadline: float | None,
        blocking: bool,
    ) -> None:
        """
        Poll *predicate* every ``poll_interval`` seconds until it returns ``True``.

        :param predicate: zero-argument callable; a truthy result ends the wait
        :param deadline: ``time.perf_counter()`` value after which to give up, or ``None`` for no limit
        :param blocking: if ``False``, give up after the first failed attempt

        :raises Timeout: if *predicate* has not succeeded by *deadline*, or on the first failure when not *blocking*

        """
        while True:
            if predicate():
                return
            now = time.perf_counter()
            if not blocking:
                raise Timeout(self.lock_file)
            if deadline is not None and now >= deadline:
                raise Timeout(self.lock_file)
            sleep_for = self.poll_interval
            if deadline is not None:
                # Never sleep past the deadline.
                sleep_for = min(sleep_for, max(deadline - now, 0.0))
            time.sleep(sleep_for)

    def _open_readers_dir(self) -> None:
        """
        Ensure the readers directory exists and, where supported, cache a directory fd for it.

        :raises RuntimeError: if the readers path exists but is a symlink or not a directory

        """
        readers_path = Path(self._paths.readers)
        with suppress(FileExistsError):
            readers_path.mkdir(mode=0o700)
        # mkdir has no O_NOFOLLOW, so verify via lstat that we did not land on an attacker-placed symlink
        # or a regular file before we open or scan inside.
        st = os.lstat(self._paths.readers)
        if stat.S_ISLNK(st.st_mode) or not stat.S_ISDIR(st.st_mode):
            msg = f"{self._paths.readers} exists but is not a directory or is a symlink; refusing to use it"
            raise RuntimeError(msg)
        if self._readers_dir_fd is None and _SUPPORTS_DIR_FD:
            flags = os.O_RDONLY | getattr(os, "O_DIRECTORY", 0) | _O_NOFOLLOW
            self._readers_dir_fd = os.open(self._paths.readers, flags)

    def _any_readers(self) -> bool:
        """Return ``True`` if the readers directory contains at least one reader marker."""
        entries = self._iter_reader_entries()
        try:
            return next(entries, None) is not None
        finally:
            # Close the generator explicitly so the underlying scandir handle is released right away rather
            # than whenever the suspended generator happens to be collected.
            entries.close()

    def _iter_reader_entries(self) -> Generator[tuple[str, bool]]:
        """
        Yield ``(name, dirfd_relative)`` pairs for every live reader marker.

        ``dirfd_relative`` is ``True`` when *name* should be passed to ``dir_fd=``-aware syscalls; ``False``
        when *name* is a full path because dirfd-relative I/O is unavailable on this platform.
        """
        if self._readers_dir_fd is not None:
            with os.scandir(self._readers_dir_fd) as it:
                for entry in it:
                    if not _is_housekeeping_name(entry.name):
                        yield entry.name, True
            return
        readers_path = Path(self._paths.readers)  # pragma: win32 cover
        with os.scandir(readers_path) as it:  # pragma: win32 cover
            for entry in it:  # pragma: win32 cover
                if not _is_housekeeping_name(entry.name):  # pragma: win32 cover
                    yield str(readers_path / entry.name), False  # pragma: win32 cover

    def _break_stale_readers(self, now: float) -> None:
        """Evict every reader marker whose mtime is older than ``stale_threshold`` relative to *now*."""
        # Snapshot the listing first so eviction (rename/unlink) never runs while the directory is being scanned.
        try:
            names: list[tuple[str, int | None]] = [
                (name, self._readers_dir_fd if dirfd_relative else None)
                for name, dirfd_relative in self._iter_reader_entries()
            ]
        except OSError:  # pragma: no cover - transient NFS scandir hiccup
            return
        for name, fd in names:
            _break_stale_marker(name, stale_threshold=self.stale_threshold, now=now, dir_fd=fd)

    def _refresh_marker(self) -> bool:
        """
        Heartbeat callback: refresh the held marker's mtime if it is still ours.

        :returns: ``True`` to keep the heartbeat running; ``False`` when no lock is held, the marker cannot be read
            or parsed, its token is not ours, or it vanished before it could be touched

        """
        with self._locks.internal:
            hold = self._hold
            if hold is None:  # pragma: no cover - race between stop_event.set and join
                return False
            marker_name = hold.marker_name
            token = hold.token
            dir_fd = self._readers_dir_fd if hold.is_reader else None

        read_result = _read_marker(marker_name, dir_fd=dir_fd)
        if read_result is None:
            return False
        info, _mtime = read_result
        # Token mismatch means another process already evicted our marker and created its own; stop the
        # thread so it does not touch a stranger's file.
        if info is None or not hmac.compare_digest(info.token, token):
            return False
        try:
            _touch(marker_name, dir_fd=dir_fd)
        except FileNotFoundError:  # pragma: no cover - race between successful read and touch
            return False
        return True

    def _unlink_writer_marker_unless_foreign(self, hold: _Hold) -> None:
        """
        Remove the writer marker on release unless it demonstrably belongs to someone else.

        The writer path is shared by every writer, so after we were evicted as stale (for example while the process
        was suspended) the file at that path may be a peer's live marker. Only a readable marker with a valid token
        that differs from ours is left alone; a missing, unreadable, or malformed marker is unlinked as before.
        """
        found = _read_marker(hold.marker_name)
        if found is not None:
            info, _mtime = found
            if info is not None and not hmac.compare_digest(info.token, hold.token):
                return
        _unlink(hold.marker_name)

    def _reset_after_fork_in_child(self) -> None:  # pragma: no cover - fork child not tracked
        """Drop all state inherited from the parent and mark this instance as fork-invalidated."""
        # Replace every lock this instance owns with a fresh one; the inherited locks may still be held
        # by threads that no longer exist in the child. The readers dirfd and the SoftFileLock state
        # mutex both get dropped for the same reason — the child re-creates them on its next acquire.
        self._locks = _Locks(
            internal=threading.Lock(),
            transaction=threading.Lock(),
            state=SoftFileLock(self._paths.state, timeout=-1),
        )
        self._hold = None
        inherited_fd, self._readers_dir_fd = self._readers_dir_fd, None
        if inherited_fd is not None:
            # The child has its own copy of the descriptor; closing it here cannot affect the parent and
            # avoids leaking one fd per instance in every forked child.
            with suppress(OSError):
                os.close(inherited_fd)
        self._fork_invalidated = True
659
660
661class _HeartbeatThread(threading.Thread):
662 def __init__(
663 self,
664 refresh: Callable[[], bool],
665 interval: float,
666 stop_event: threading.Event,
667 name: str,
668 ) -> None:
669 super().__init__(name=name, daemon=True)
670 self._refresh = refresh
671 self._interval = interval
672 self._stop_event = stop_event
673
674 def run(self) -> None:
675 while not self._stop_event.wait(self._interval):
676 if not self._refresh():
677 self._stop_event.set()
678 return
679
680
def _atomic_create_marker(name: str, token: str, *, dir_fd: int | None = None) -> None:
    """
    Exclusively create the marker file *name* and write ``token``, pid and hostname into it.

    If writing or closing fails, the freshly created file is removed again so that no empty or truncated marker is
    left behind to block peers until it goes stale.

    :param name: marker path, or a name relative to *dir_fd* when one is given and dirfd I/O is supported
    :param token: the holder's token, written as the first line
    :param dir_fd: optional directory descriptor to create the marker relative to

    :raises FileExistsError: if the marker already exists
    :raises OSError: if the file cannot be created or written
    :raises UnicodeEncodeError: if the hostname is not pure ASCII (raised before any file is created)

    """
    # Build the payload before touching the filesystem so an encoding failure cannot leave an empty marker.
    payload = f"{token}\n{os.getpid()}\n{socket.gethostname()}\n".encode("ascii")
    # O_NOFOLLOW blocks the symlink-overwrite attack where an attacker pre-creates the marker path as a
    # symlink pointing at a victim file. Mode 0o600 keeps the token unreadable to other users.
    flags = os.O_CREAT | os.O_EXCL | os.O_WRONLY | _O_NOFOLLOW
    use_dir_fd = _SUPPORTS_DIR_FD and dir_fd is not None
    if use_dir_fd:
        fd = os.open(name, flags, 0o600, dir_fd=dir_fd)
    else:
        fd = os.open(name, flags, 0o600)
    try:
        try:
            written = 0
            while written < len(payload):
                count = os.write(fd, payload[written:])
                if count <= 0:
                    msg = f"short write while creating marker {name}"
                    raise OSError(msg)
                written += count
        finally:
            # Network filesystems may report a deferred write error only on close, so close inside the guard.
            os.close(fd)
    except BaseException:
        # O_EXCL succeeded, so the file is ours; remove it rather than leave an unparseable marker behind.
        with suppress(OSError):
            if use_dir_fd:
                os.unlink(name, dir_fd=dir_fd)
            else:
                os.unlink(name)
        raise
694
695
def _read_marker(name: str, *, dir_fd: int | None = None) -> tuple[_MarkerInfo | None, float] | None:
    """
    Read a marker file and return its parsed contents together with its mtime.

    :param name: marker path, or a name relative to *dir_fd* when one is given and dirfd I/O is supported
    :param dir_fd: optional directory descriptor to open the marker relative to

    :returns: ``None`` if the file cannot be opened, stat-ed or read; otherwise ``(info, mtime)`` where ``info`` is
        ``None`` when the contents do not parse as a valid marker

    """
    # O_NOFOLLOW is defense in depth: we already created this file, but a hostile replacement by symlink
    # between create and read would be caught here.
    open_flags = os.O_RDONLY | _O_NOFOLLOW
    try:
        if _SUPPORTS_DIR_FD and dir_fd is not None:
            handle = os.open(name, open_flags, dir_fd=dir_fd)
        else:
            handle = os.open(name, open_flags)
    except OSError:
        return None
    try:
        mtime = os.fstat(handle).st_mtime
        # Read one byte past the limit so an oversized marker is detectable by the parser.
        raw = os.read(handle, _MAX_MARKER_SIZE + 1)
    except OSError:  # pragma: no cover - fstat/read after a successful open is hard to provoke
        return None
    finally:
        os.close(handle)
    return _parse_marker_bytes(raw), mtime
713
714
715def _parse_marker_bytes(data: bytes) -> _MarkerInfo | None:
716 # Trust nothing about attacker-controlled markers; any deviation returns None so callers fall through
717 # to stale cleanup. ``re.match`` caches compiled patterns internally, so the regex is built only once
718 # despite being defined inline.
719 if not data or len(data) > _MAX_MARKER_SIZE:
720 return None
721 try:
722 text = data.decode("ascii")
723 except UnicodeDecodeError:
724 return None
725 match = re.match(
726 r"""
727 \A # start of string
728 (?P<token> [0-9a-f]{32} ) \n # 128-bit hex token
729 (?P<pid> [1-9][0-9]{0,9} ) \n # decimal pid: no leading zero, ≤ 10 digits
730 (?P<hostname> [\x21-\x7e]{1,253}) # printable non-whitespace ASCII (RFC 1123 hostname limit)
731 \n* # tolerate sloppy writers that append extra newlines
732 \Z # end of string
733 """,
734 text,
735 re.VERBOSE,
736 )
737 if match is None:
738 return None
739 pid = int(match["pid"], 10)
740 if pid > 2**31 - 1:
741 return None
742 return _MarkerInfo(token=match["token"], pid=pid, hostname=match["hostname"])
743
744
def _break_stale_marker(  # noqa: PLR0911
    name: str,
    *,
    stale_threshold: float,
    now: float,
    dir_fd: int | None = None,
) -> bool:
    """
    Evict the marker *name* if its mtime is more than *stale_threshold* seconds older than *now*.

    :param name: marker path, or a name relative to *dir_fd* when one is given and dirfd I/O is supported
    :param stale_threshold: age in seconds beyond which the marker counts as stale
    :param now: the caller's current time, compared against the marker's mtime
    :param dir_fd: optional directory descriptor the marker name is relative to

    :returns: ``True`` if a stale marker (or its renamed break file) was unlinked; ``False`` if the marker is
        unreadable, not stale, could not be renamed, or failed re-verification after the rename

    """
    # Atomic break pattern: read → rename to unique break-name → re-verify → unlink. The rename gives us a
    # private name nobody else can touch; if the re-verify sees a newer mtime or a different token, the
    # legitimate holder's heartbeat fired between read and rename and we must abort (leaving the .break.*
    # file behind rather than rollback-renaming, because rollback is itself racy).
    read_result = _read_marker(name, dir_fd=dir_fd)
    if read_result is None:
        return False
    info_before, mtime_before = read_result
    if now - mtime_before <= stale_threshold:
        return False
    # A stale marker whose contents do not parse has no token to re-verify, so it is removed directly.
    if info_before is None:
        _unlink(name, dir_fd=dir_fd)
        return True

    break_name = f"{name}{_BREAK_SUFFIX}.{os.getpid()}.{secrets.token_hex(16)}"
    try:
        if _SUPPORTS_DIR_FD and dir_fd is not None:
            os.rename(name, break_name, src_dir_fd=dir_fd, dst_dir_fd=dir_fd)
        else:
            Path(name).rename(break_name)
    except OSError:  # pragma: no cover - race where the marker vanishes between read and rename
        return False

    # Re-read under the private name and compare against what was seen before the rename.
    read_after = _read_marker(break_name, dir_fd=dir_fd)
    if read_after is None:  # pragma: no cover - race where a peer unlinks the break-name file
        return False
    info_after, mtime_after = read_after
    if info_after is None:  # pragma: no cover - content replaced post-rename by a racing peer
        _unlink(break_name, dir_fd=dir_fd)
        return True
    if not hmac.compare_digest(info_before.token, info_after.token):  # pragma: no cover - race only
        return False
    if mtime_after > mtime_before:  # pragma: no cover - heartbeat raced our rename
        return False
    _unlink(break_name, dir_fd=dir_fd)
    return True
788
789
def _unlink(name: str, *, dir_fd: int | None = None) -> None:
    """
    Remove *name*, treating an already-missing file as success.

    :param name: path to remove, or a name relative to *dir_fd* when one is given and dirfd I/O is supported
    :param dir_fd: optional directory descriptor the name is relative to

    """
    if _SUPPORTS_DIR_FD and dir_fd is not None:
        # Path.unlink has no dir_fd support, so we stay on os.unlink for the dirfd path.
        with suppress(FileNotFoundError):
            os.unlink(name, dir_fd=dir_fd)
        return
    Path(name).unlink(missing_ok=True)
797
798
def _touch(name: str, *, dir_fd: int | None = None) -> None:
    """
    Set the access and modification times of *name* to the current time.

    :param name: path to touch, or a name relative to *dir_fd* when one is given and dirfd I/O is supported
    :param dir_fd: optional directory descriptor the name is relative to

    :raises OSError: if the file does not exist or cannot be updated

    """
    relative_to_dir = _SUPPORTS_DIR_FD and dir_fd is not None
    if not relative_to_dir:
        os.utime(name, None)
        return
    os.utime(name, None, dir_fd=dir_fd)
804
805
806def _file_exists(path: str) -> bool:
807 try:
808 st = os.lstat(path)
809 except FileNotFoundError:
810 return False
811 return stat.S_ISREG(st.st_mode)
812
813
814def _is_housekeeping_name(name: str) -> bool:
815 return name.startswith(".") or _BREAK_SUFFIX in name
816
817
def _reset_all_after_fork() -> None:  # pragma: no cover - fork child, not tracked by coverage
    """Replace every inherited lock in the forked child and invalidate every registered instance."""
    global _all_instances_lock  # noqa: PLW0603
    # User-created threading locks do not auto-reset across fork: any lock held by a parent thread stays
    # locked in the child with no owner to release it. Replace the module-level lock and every instance's
    # locks with fresh ones; the child is single-threaded at this point so no synchronization is needed.
    _all_instances_lock = threading.Lock()
    # The singleton-cache lock is class-level and has the same problem: if a parent thread was constructing a
    # lock at fork time, the child would deadlock the next time it constructs one.
    SoftReadWriteLock._instances_lock = threading.Lock()  # noqa: SLF001
    for instance in list(_all_instances.values()):
        instance._reset_after_fork_in_child()  # noqa: SLF001
826
827
def _cleanup_all_instances() -> None:  # pragma: no cover - runs from atexit at interpreter shutdown
    """Force-release every registered lock instance, ignoring any error from an individual release."""
    # Snapshot first: the weak registry can shrink while we iterate.
    survivors = list(_all_instances.values())
    for lock in survivors:
        with suppress(Exception):
            lock.release(force=True)
832
833
def _register_hooks() -> None:
    """Register the atexit cleanup and, where available, the after-fork reset; each at most once."""
    global _atexit_registered, _fork_registered  # noqa: PLW0603
    if not _atexit_registered:
        atexit.register(_cleanup_all_instances)
        _atexit_registered = True
    if _fork_registered:
        return
    # ``os.register_at_fork`` does not exist on every platform.
    if not hasattr(os, "register_at_fork"):
        return
    # after_in_child replaces inherited state so the child cannot double-own any lock the parent held.
    os.register_at_fork(after_in_child=_reset_all_after_fork)
    _fork_registered = True
843
844
# Public API of this module; every other name here is an implementation detail.
__all__ = [
    "SoftReadWriteLock",
]