1"""Cross-process and cross-host reader/writer lock built on :class:`SoftFileLock` primitives."""
2
3from __future__ import annotations
4
5import atexit
6import hmac
7import os
8import re
9import secrets
10import socket
11import stat
12import sys
13import threading
14import time
15import uuid
16from contextlib import contextmanager, suppress
17from dataclasses import dataclass
18from pathlib import Path
19from typing import TYPE_CHECKING, Literal
20from weakref import WeakValueDictionary
21
22from filelock._api import AcquireReturnProxy
23from filelock._error import Timeout
24from filelock._soft import SoftFileLock
25from filelock._util import ensure_directory_exists
26
27if TYPE_CHECKING:
28 from collections.abc import Callable, Generator
29
30
# Lock mode held by an instance.
_Mode = Literal["read", "write"]
# Infix of the private name a stale marker is renamed to before removal (``<name>.break.<pid>.<hex>``); names
# containing it are skipped when scanning the readers directory.
_BREAK_SUFFIX = ".break"
# Largest marker payload, in bytes, that is accepted as well-formed.
_MAX_MARKER_SIZE = 1024
# 0 (a no-op flag) on platforms whose ``os`` module does not define these constants.
_O_NOFOLLOW = getattr(os, "O_NOFOLLOW", 0)
_O_NONBLOCK = getattr(os, "O_NONBLOCK", 0)
# dirfd-relative I/O is a Unix-only optimization; Windows cannot ``os.open()`` a directory at all, and
# its ``os`` module skips dir_fd support entirely. When disabled, callers fall back to full-path ops.
_SUPPORTS_DIR_FD = sys.platform != "win32" and os.open in os.supports_dir_fd

# Every live instance, keyed by resolved lock path, so the atexit and at-fork hooks can reach them. Values are
# weak references, so registration alone does not keep an instance alive.
_all_instances: WeakValueDictionary[Path, SoftReadWriteLock] = WeakValueDictionary()
_all_instances_lock = threading.Lock()
# Flags so ``_register_hooks`` installs each process-wide hook only once.
_atexit_registered = False
_fork_registered = False
44
45
@dataclass(frozen=True)
class _Paths:
    """Sidecar paths derived from the user-supplied lock file path."""

    state: str  # ``<lock_file>.state``: SoftFileLock taken only around marker inspection/creation
    write: str  # ``<lock_file>.write``: the writer marker file
    readers: str  # ``<lock_file>.readers``: directory holding one marker file per reader
52
@dataclass
class _Locks:
    """In-process and cross-process locks owned by a single :class:`SoftReadWriteLock` instance."""

    internal: threading.Lock  # guards the instance's mutable fields (hold, closed/fork flags, readers dirfd)
    transaction: threading.Lock  # serializes fresh (non-reentrant) acquire attempts within this process
    state: SoftFileLock  # cross-process mutex held while checking and creating marker files
58
59
@dataclass(frozen=True)
class _MarkerInfo:
    """Validated contents of a marker file, as produced by ``_parse_marker_bytes``."""

    token: str  # 32 lowercase hex characters (``secrets.token_hex(16)``) identifying one hold
    pid: int  # process id of the process that wrote the marker
    hostname: str  # host name recorded by the process that wrote the marker
65
66
@dataclass
class _Hold:
    """Everything that exists only while a lock is held; ``None`` when the instance has no lock."""

    level: int  # reentrancy depth; the marker is removed when it drops to zero
    mode: _Mode  # "read" or "write"
    write_thread_id: int | None  # ``threading.get_ident()`` of the writer thread; ``None`` for read holds
    marker_name: str  # dirfd-relative reader name, or a full path (writer, or platform without dir_fd support)
    is_reader: bool  # True when ``marker_name`` lives in the readers directory
    token: str  # random token written into the marker; a mismatch means a peer evicted and replaced it
    heartbeat_thread: _HeartbeatThread  # daemon thread that refreshes the marker's mtime
    heartbeat_stop: threading.Event  # set to ask ``heartbeat_thread`` to exit
79
80
81class _SoftRWMeta(type):
82 _instances: WeakValueDictionary[Path, SoftReadWriteLock]
83 _instances_lock: threading.Lock
84
85 def __call__( # noqa: PLR0913
86 cls,
87 lock_file: str | os.PathLike[str],
88 timeout: float = -1,
89 *,
90 blocking: bool = True,
91 is_singleton: bool = True,
92 heartbeat_interval: float = 30.0,
93 stale_threshold: float | None = None,
94 poll_interval: float = 0.25,
95 ) -> SoftReadWriteLock:
96 if not is_singleton:
97 return super().__call__(
98 lock_file,
99 timeout,
100 blocking=blocking,
101 is_singleton=is_singleton,
102 heartbeat_interval=heartbeat_interval,
103 stale_threshold=stale_threshold,
104 poll_interval=poll_interval,
105 )
106
107 normalized = Path(lock_file).resolve()
108 with cls._instances_lock:
109 instance = cls._instances.get(normalized)
110 if instance is None:
111 instance = super().__call__(
112 lock_file,
113 timeout,
114 blocking=blocking,
115 is_singleton=is_singleton,
116 heartbeat_interval=heartbeat_interval,
117 stale_threshold=stale_threshold,
118 poll_interval=poll_interval,
119 )
120 cls._instances[normalized] = instance
121 elif instance.timeout != timeout or instance.blocking != blocking:
122 msg = (
123 f"Singleton lock created with timeout={instance.timeout}, blocking={instance.blocking},"
124 f" cannot be changed to timeout={timeout}, blocking={blocking}"
125 )
126 raise ValueError(msg)
127 return instance
128
129
class SoftReadWriteLock(metaclass=_SoftRWMeta):
    """
    Cross-process and cross-host reader/writer lock built on :class:`SoftFileLock` primitives.

    Use this class instead of :class:`~filelock.ReadWriteLock` when the lock file lives on a network
    filesystem (NFS, Lustre with ``-o flock``, HPC cluster shared storage). ``ReadWriteLock`` is backed
    by SQLite and cannot run on NFS because SQLite's ``fcntl`` locking is unreliable there.

    Layout on disk for a lock at ``foo.lock``:

    - ``foo.lock.state`` — a :class:`SoftFileLock` taken only during state transitions (microseconds).
    - ``foo.lock.write`` — writer marker; its presence means a writer is claiming or holding the lock.
    - ``foo.lock.readers/<uuid>.<pid>`` — one file per reader (random hex UUID plus the reader's pid).

    Each marker stores a random token (``secrets.token_hex(16)``), the holder's pid, and the holder's
    hostname. A daemon heartbeat thread refreshes ``mtime`` on every held marker. A marker whose mtime
    has not advanced in ``stale_threshold`` seconds may be evicted by any process on any host, giving
    correct behavior when a compute node crashes with a lock held.

    Writer acquire is two-phase and writer-preferring: phase 1 claims ``.write`` (blocking any new
    reader), phase 2 waits for existing readers to drain. Writer starvation is impossible.

    Reentrancy, upgrade/downgrade rules, thread pinning, and singleton caching by resolved path match
    :class:`~filelock.ReadWriteLock`.

    Forking while holding a lock invalidates the inherited instance in the child so the child cannot
    double-own the lock with its parent; ``release()`` on a fork-invalidated instance is a no-op, and
    the child must re-acquire if it needs a lock. An instance that held no lock at fork time stays usable.

    Trust boundary: protects against same-UID non-cooperating processes (one host or cross-host) and
    same-host different-UID users via ``0o600`` / ``0o700`` permissions. Does not protect against root
    compromise, NTP tampering on same-UID cross-host nodes, or multi-tenant mounts where hostile
    co-tenants share the UID.

    :param lock_file: path to the lock file; sidecar state/write/readers live next to it
    :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
    :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately on contention
    :param is_singleton: if ``True``, reuse existing instances for the same resolved path
    :param heartbeat_interval: seconds between heartbeat refreshes; default 30 s
    :param stale_threshold: seconds of ``mtime`` inactivity before a marker is stale; defaults to
        ``3 * heartbeat_interval``, matching etcd's ``LeaseKeepAlive`` convention
    :param poll_interval: seconds between acquire retries under contention; default 0.25 s

    .. versionadded:: 3.27.0

    """

    _instances: WeakValueDictionary[Path, SoftReadWriteLock] = WeakValueDictionary()
    _instances_lock = threading.Lock()

    def __init__(  # noqa: PLR0913
        self,
        lock_file: str | os.PathLike[str],
        timeout: float = -1,
        *,
        blocking: bool = True,
        is_singleton: bool = True,  # noqa: ARG002
        heartbeat_interval: float = 30.0,
        stale_threshold: float | None = None,
        poll_interval: float = 0.25,
    ) -> None:
        if heartbeat_interval <= 0:
            msg = f"heartbeat_interval must be positive, got {heartbeat_interval}"
            raise ValueError(msg)
        if stale_threshold is None:
            stale_threshold = heartbeat_interval * 3
        if stale_threshold <= heartbeat_interval:
            msg = f"stale_threshold must exceed heartbeat_interval ({stale_threshold} <= {heartbeat_interval})"
            raise ValueError(msg)
        if poll_interval <= 0:
            msg = f"poll_interval must be positive, got {poll_interval}"
            raise ValueError(msg)

        self.lock_file: str = os.fspath(lock_file)
        self.timeout: float = timeout
        self.blocking: bool = blocking
        self.heartbeat_interval: float = heartbeat_interval
        self.stale_threshold: float = stale_threshold
        self.poll_interval: float = poll_interval

        self._paths = _Paths(
            state=f"{self.lock_file}.state",
            write=f"{self.lock_file}.write",
            readers=f"{self.lock_file}.readers",
        )
        ensure_directory_exists(self.lock_file)
        self._locks = _Locks(
            internal=threading.Lock(),
            transaction=threading.Lock(),
            state=SoftFileLock(self._paths.state, timeout=-1),
        )
        self._readers_dir_fd: int | None = None
        self._hold: _Hold | None = None
        self._fork_invalidated: bool = False
        self._closed: bool = False

        # NOTE(review): this registry is keyed by resolved path, so with ``is_singleton=False`` a second
        # instance for the same path replaces the first here, and the first is then no longer reached by
        # the fork/atexit hooks that iterate this registry. Confirm whether that is intended.
        with _all_instances_lock:
            _all_instances[Path(self.lock_file).resolve()] = self
            _register_hooks()

    @contextmanager
    def read_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> Generator[None]:
        """
        Context manager that acquires and releases a shared read lock.

        Falls back to instance defaults for *timeout* and *blocking* when ``None``.

        :param timeout: maximum wait time in seconds, or ``None`` to use the instance default
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default

        :raises RuntimeError: if a write lock is already held on this instance
        :raises Timeout: if the lock cannot be acquired within *timeout* seconds

        """
        self.acquire_read(timeout, blocking=blocking)
        try:
            yield
        finally:
            self.release()

    @contextmanager
    def write_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> Generator[None]:
        """
        Context manager that acquires and releases an exclusive write lock.

        Falls back to instance defaults for *timeout* and *blocking* when ``None``.

        :param timeout: maximum wait time in seconds, or ``None`` to use the instance default
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default

        :raises RuntimeError: if a read lock is already held, or a write lock is held by a different thread
        :raises Timeout: if the lock cannot be acquired within *timeout* seconds

        """
        self.acquire_write(timeout, blocking=blocking)
        try:
            yield
        finally:
            self.release()

    def acquire_read(self, timeout: float | None = None, *, blocking: bool | None = None) -> AcquireReturnProxy:
        """
        Acquire a shared read lock.

        If this instance already holds a read lock, the lock level is incremented (reentrant). Attempting to acquire a
        read lock while holding a write lock raises :class:`RuntimeError` (downgrade not allowed). On the 0→1
        transition a daemon heartbeat thread is started that refreshes the reader marker's ``mtime`` every
        ``heartbeat_interval`` seconds so peers on other hosts do not evict the marker as stale.

        :param timeout: maximum wait time in seconds, or ``None`` to use the instance default; ``-1`` means block
            indefinitely
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable;
            ``None`` uses the instance default

        :returns: a proxy that can be used as a context manager to release the lock

        :raises RuntimeError: if a write lock is already held on this instance, if this instance was invalidated by
            :func:`os.fork`, or if :meth:`close` was called
        :raises Timeout: if the lock cannot be acquired within *timeout* seconds

        """
        return self._acquire("read", timeout, blocking=blocking)

    def acquire_write(self, timeout: float | None = None, *, blocking: bool | None = None) -> AcquireReturnProxy:
        """
        Acquire an exclusive write lock.

        If this instance already holds a write lock from the same thread, the lock level is incremented (reentrant).
        Attempting to acquire a write lock while holding a read lock raises :class:`RuntimeError` (upgrade not
        allowed). Write locks are pinned to the acquiring thread: a different thread trying to re-enter also raises
        :class:`RuntimeError`.

        Writer acquisition runs in two phases. Phase 1 atomically claims ``<path>.write`` via ``O_CREAT | O_EXCL``,
        which immediately blocks any new reader on any host. Phase 2 waits for existing readers to drain. Writer
        starvation is impossible: new readers see ``<path>.write`` during phase 2 and wait behind the pending writer.

        :param timeout: maximum wait time in seconds, or ``None`` to use the instance default; ``-1`` means block
            indefinitely
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable;
            ``None`` uses the instance default

        :returns: a proxy that can be used as a context manager to release the lock

        :raises RuntimeError: if a read lock is already held, if a write lock is held by a different thread, if this
            instance was invalidated by :func:`os.fork`, or if :meth:`close` was called
        :raises Timeout: if the lock cannot be acquired within *timeout* seconds

        """
        return self._acquire("write", timeout, blocking=blocking)

    def close(self) -> None:
        """
        Release any held lock and release internal filesystem resources.

        Idempotent. After calling this method the instance can no longer acquire locks — subsequent acquires raise
        :class:`RuntimeError`. A fork-invalidated instance is closed without raising.
        """
        self.release(force=True)
        with self._locks.internal:
            if self._closed:
                return
            self._closed = True
            if self._readers_dir_fd is not None:
                with suppress(OSError):
                    os.close(self._readers_dir_fd)
                self._readers_dir_fd = None

    def release(self, *, force: bool = False) -> None:
        """
        Release one level of the current lock.

        When the lock level reaches zero the heartbeat thread is stopped and the held marker file is unlinked. On a
        fork-invalidated instance (that is, the child of a :func:`os.fork` call made while the parent held a lock)
        this method is a no-op so inherited ``with`` blocks can unwind cleanly in the child.

        :param force: if ``True``, release the lock completely regardless of the current lock level

        :raises RuntimeError: if no lock is currently held and *force* is ``False``

        """
        with self._locks.internal:
            if self._fork_invalidated:
                # Inherited state from the parent is meaningless in the child; clear any counters and return.
                self._hold = None
                return
            hold = self._hold
            if hold is None:
                if force:
                    return
                msg = f"Cannot release a lock on {self.lock_file} (lock id: {id(self)}) that is not held"
                raise RuntimeError(msg)
            if force:
                hold.level = 0
            else:
                hold.level -= 1
            if hold.level > 0:
                return
            self._hold = None

        # Order matters: signal → join → unlink. A late tick on a deleted marker is harmless, and the
        # token check in the heartbeat callback would catch any re-acquisition race, but joining first
        # removes even that theoretical race. The join happens outside the internal lock because the
        # heartbeat callback itself takes that lock.
        hold.heartbeat_stop.set()
        hold.heartbeat_thread.join(timeout=self.heartbeat_interval + 1.0)
        if hold.is_reader:
            _unlink(hold.marker_name, dir_fd=self._readers_dir_fd)
        else:
            _unlink(hold.marker_name)

    @classmethod
    def get_lock(
        cls,
        lock_file: str | os.PathLike[str],
        timeout: float = -1,
        *,
        blocking: bool = True,
    ) -> SoftReadWriteLock:
        """
        Return the singleton :class:`SoftReadWriteLock` for *lock_file*.

        :param lock_file: path to the lock file; sidecar state/write/readers live next to it
        :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable

        :returns: the singleton lock instance

        :raises ValueError: if an instance already exists for this path with different *timeout* or *blocking* values

        """
        return cls(lock_file, timeout, blocking=blocking)

    def _acquire(
        self,
        mode: _Mode,
        timeout: float | None,
        *,
        blocking: bool | None,
    ) -> AcquireReturnProxy:
        timeout = self.timeout if timeout is None else timeout
        blocking = self.blocking if blocking is None else blocking

        with self._locks.internal:
            if self._fork_invalidated:
                msg = f"SoftReadWriteLock on {self.lock_file} was invalidated by fork(); construct a new instance"
                raise RuntimeError(msg)
            if self._closed:
                msg = f"SoftReadWriteLock on {self.lock_file} has been closed"
                raise RuntimeError(msg)
            if self._hold is not None:
                return self._validate_reentrant(mode)

        # The transaction lock serializes fresh acquisitions within this process; its wait counts against timeout.
        start = time.perf_counter()
        if not blocking:
            acquired = self._locks.transaction.acquire(blocking=False)
        elif timeout == -1:
            acquired = self._locks.transaction.acquire(blocking=True)
        else:
            acquired = self._locks.transaction.acquire(blocking=True, timeout=timeout)
        if not acquired:
            raise Timeout(self.lock_file) from None
        try:
            return self._do_acquire_inner(mode, timeout, start, blocking=blocking)
        finally:
            self._locks.transaction.release()

    def _do_acquire_inner(
        self,
        mode: _Mode,
        effective_timeout: float,
        start: float,
        *,
        blocking: bool,
    ) -> AcquireReturnProxy:
        """Claim a marker on disk, record the hold, and start its heartbeat; the caller holds the transaction lock."""
        with self._locks.internal:
            # Another thread may have acquired while we waited for the transaction lock.
            if self._hold is not None:
                return self._validate_reentrant(mode)
        deadline = None if effective_timeout == -1 else start + effective_timeout
        token = secrets.token_hex(16)
        if mode == "write":
            marker_name, is_reader = self._acquire_writer_slot(token, deadline=deadline, blocking=blocking)
        else:
            marker_name, is_reader = self._acquire_reader_slot(token, deadline=deadline, blocking=blocking)

        # From here on the marker exists on disk; any failure must remove it, or peers would wait for it to go stale.
        try:
            stop_event = threading.Event()
            heartbeat = _HeartbeatThread(
                refresh=self._refresh_marker,
                interval=self.heartbeat_interval,
                stop_event=stop_event,
                name=f"filelock-heartbeat-{id(self):x}",
            )
            with self._locks.internal:
                self._hold = _Hold(
                    level=1,
                    mode=mode,
                    write_thread_id=threading.get_ident() if mode == "write" else None,
                    marker_name=marker_name,
                    is_reader=is_reader,
                    token=token,
                    heartbeat_thread=heartbeat,
                    heartbeat_stop=stop_event,
                )
                # Start under the lock so a concurrent release() can never join a not-yet-started thread. The
                # heartbeat waits one interval before taking this lock, so this cannot deadlock.
                try:
                    heartbeat.start()
                except BaseException:
                    self._hold = None
                    raise
        except BaseException:
            _unlink(marker_name, dir_fd=self._readers_dir_fd if is_reader else None)
            raise
        return AcquireReturnProxy(lock=self)

    def _validate_reentrant(self, mode: _Mode) -> AcquireReturnProxy:
        hold = self._hold
        assert hold is not None  # noqa: S101
        if hold.mode != mode:
            opposite = "write" if mode == "read" else "read"
            direction = "downgrade" if mode == "read" else "upgrade"
            msg = (
                f"Cannot acquire {mode} lock on {self.lock_file} (lock id: {id(self)}): "
                f"already holding a {opposite} lock ({direction} not allowed)"
            )
            raise RuntimeError(msg)
        if mode == "write" and (cur := threading.get_ident()) != hold.write_thread_id:
            msg = (
                f"Cannot acquire write lock on {self.lock_file} (lock id: {id(self)}) "
                f"from thread {cur} while it is held by thread {hold.write_thread_id}"
            )
            raise RuntimeError(msg)
        hold.level += 1
        return AcquireReturnProxy(lock=self)

    def _acquire_writer_slot(
        self,
        token: str,
        *,
        deadline: float | None,
        blocking: bool,
    ) -> tuple[str, bool]:
        # Phase 2 scans readers/ via dirfd (where supported), so we need it open even though writers never
        # create files inside.
        self._open_readers_dir()

        def try_claim_writer() -> bool:
            with self._locks.state:
                _break_stale_marker(self._paths.write, stale_threshold=self.stale_threshold, now=time.time())
                if _file_exists(self._paths.write):
                    return False
                try:
                    _atomic_create_marker(self._paths.write, token)
                except FileExistsError:
                    return False
                return True

        def readers_drained_touching() -> bool:
            with self._locks.state:
                # Refresh our writer marker on every scan iteration. Otherwise phase 2 can exceed
                # ``stale_threshold`` under contention and a peer would treat us as stale and evict us.
                with suppress(OSError):
                    _touch(self._paths.write)
                self._break_stale_readers(time.time())
                return not self._any_readers()

        self._wait_for(try_claim_writer, deadline=deadline, blocking=blocking)
        try:
            self._wait_for(readers_drained_touching, deadline=deadline, blocking=blocking)
        except BaseException:
            # Give up our writer claim so readers can make progress again. This must happen for *any* failure
            # (Timeout, KeyboardInterrupt during the poll sleep, an OSError from the readers scan): no heartbeat
            # is running yet, so a leaked claim would block every reader until it aged past stale_threshold.
            _unlink(self._paths.write)
            raise
        return self._paths.write, False

    def _acquire_reader_slot(
        self,
        token: str,
        *,
        deadline: float | None,
        blocking: bool,
    ) -> tuple[str, bool]:
        self._open_readers_dir()
        reader_name = f"{uuid.uuid4().hex}.{os.getpid()}"
        dir_fd = self._readers_dir_fd
        full_reader_path = str(Path(self._paths.readers) / reader_name)

        def try_claim_reader() -> bool:
            with self._locks.state:
                _break_stale_marker(self._paths.write, stale_threshold=self.stale_threshold, now=time.time())
                # A present (or pending) writer blocks new readers: this is what makes writers preferred.
                if _file_exists(self._paths.write):
                    return False
                if dir_fd is not None:
                    _atomic_create_marker(reader_name, token, dir_fd=dir_fd)
                else:  # pragma: win32 cover
                    _atomic_create_marker(full_reader_path, token)
                return True

        self._wait_for(try_claim_reader, deadline=deadline, blocking=blocking)
        return (reader_name if dir_fd is not None else full_reader_path), True

    def _wait_for(
        self,
        predicate: Callable[[], bool],
        *,
        deadline: float | None,
        blocking: bool,
    ) -> None:
        while True:
            if predicate():
                return
            now = time.perf_counter()
            if not blocking:
                raise Timeout(self.lock_file)
            if deadline is not None and now >= deadline:
                raise Timeout(self.lock_file)
            sleep_for = self.poll_interval
            if deadline is not None:
                sleep_for = min(sleep_for, max(deadline - now, 0.0))
            time.sleep(sleep_for)

    def _open_readers_dir(self) -> None:
        readers_path = Path(self._paths.readers)
        with suppress(FileExistsError):
            readers_path.mkdir(mode=0o700)
        # mkdir has no O_NOFOLLOW, so verify via lstat that we did not land on an attacker-placed symlink
        # or a regular file before we open or scan inside.
        st = os.lstat(self._paths.readers)
        if stat.S_ISLNK(st.st_mode) or not stat.S_ISDIR(st.st_mode):
            msg = f"{self._paths.readers} exists but is not a directory or is a symlink; refusing to use it"
            raise RuntimeError(msg)
        if self._readers_dir_fd is None and _SUPPORTS_DIR_FD:
            flags = os.O_RDONLY | getattr(os, "O_DIRECTORY", 0) | _O_NOFOLLOW
            self._readers_dir_fd = os.open(self._paths.readers, flags)

    def _any_readers(self) -> bool:
        """Return ``True`` if at least one live reader marker exists."""
        entries = self._iter_reader_entries()
        try:
            return next(entries, None) is not None
        finally:
            # Close the generator explicitly so its scandir handle is released now, not whenever it is collected.
            entries.close()

    def _iter_reader_entries(self) -> Generator[tuple[str, bool]]:
        """
        Yield ``(name, dirfd_relative)`` pairs for every live reader marker.

        ``dirfd_relative`` is ``True`` when *name* should be passed to ``dir_fd=``-aware syscalls; ``False``
        when *name* is a full path because dirfd-relative I/O is unavailable on this platform.
        """
        if self._readers_dir_fd is not None:
            with os.scandir(self._readers_dir_fd) as it:
                for entry in it:
                    if not _is_housekeeping_name(entry.name):
                        yield entry.name, True
            return
        readers_path = Path(self._paths.readers)  # pragma: win32 cover
        with os.scandir(readers_path) as it:  # pragma: win32 cover
            for entry in it:  # pragma: win32 cover
                if not _is_housekeeping_name(entry.name):  # pragma: win32 cover
                    yield str(readers_path / entry.name), False  # pragma: win32 cover

    def _break_stale_readers(self, now: float) -> None:
        # Snapshot the names first so breaking (renaming/unlinking) does not mutate the directory mid-scan.
        names: list[tuple[str, int | None]] = []
        try:
            for name, dirfd_relative in self._iter_reader_entries():
                names.append((name, self._readers_dir_fd if dirfd_relative else None))
        except OSError:  # pragma: no cover - transient NFS scandir hiccup
            return
        for name, fd in names:
            _break_stale_marker(name, stale_threshold=self.stale_threshold, now=now, dir_fd=fd)

    def _refresh_marker(self) -> bool:
        """
        Heartbeat callback: bump the held marker's ``mtime`` if the marker is still ours.

        :returns: ``False`` to stop the heartbeat (lock released, marker missing or unreadable, or replaced by a
            peer), ``True`` to keep it running

        """
        with self._locks.internal:
            hold = self._hold
            if hold is None:  # pragma: no cover - race between stop_event.set and join
                return False
            marker_name = hold.marker_name
            token = hold.token
            dir_fd = self._readers_dir_fd if hold.is_reader else None

        read_result = _read_marker(marker_name, dir_fd=dir_fd)
        if read_result is None:
            return False
        info, _mtime = read_result
        # Token mismatch means another process already evicted our marker and created its own; stop the
        # thread so it does not touch a stranger's file.
        if info is None or not hmac.compare_digest(info.token, token):
            return False
        try:
            _touch(marker_name, dir_fd=dir_fd)
        except FileNotFoundError:  # pragma: no cover - race between successful read and touch
            return False
        except OSError:  # pragma: no cover - transient filesystem error
            # Presumably transient (e.g. an NFS hiccup). Letting it propagate would kill the daemon heartbeat
            # thread and let the marker silently go stale; retry on the next tick instead. If the marker really
            # is gone, the next read returns None and stops the thread cleanly.
            return True
        return True

    def _reset_after_fork_in_child(self) -> None:  # pragma: no cover - fork child not tracked
        # Replace every lock this instance owns with a fresh one; the inherited locks may still be held
        # by threads that no longer exist in the child. The inherited readers dirfd is closed (the child's
        # copy is its own descriptor, so the parent is unaffected) and ``_open_readers_dir`` re-opens it on
        # the next acquire.
        #
        # Only an instance that held a lock at fork time is invalidated, as the class and ``release()``
        # docstrings state; an idle instance stays usable in the child. Invalidation is sticky so an
        # invalidated instance stays invalidated across further forks.
        held_at_fork = self._hold is not None
        self._locks = _Locks(
            internal=threading.Lock(),
            transaction=threading.Lock(),
            state=SoftFileLock(self._paths.state, timeout=-1),
        )
        inherited_fd, self._readers_dir_fd = self._readers_dir_fd, None
        if inherited_fd is not None:
            with suppress(OSError):
                os.close(inherited_fd)
        self._hold = None
        self._fork_invalidated = self._fork_invalidated or held_at_fork
662
663
664class _HeartbeatThread(threading.Thread):
665 def __init__(
666 self,
667 refresh: Callable[[], bool],
668 interval: float,
669 stop_event: threading.Event,
670 name: str,
671 ) -> None:
672 super().__init__(name=name, daemon=True)
673 self._refresh = refresh
674 self._interval = interval
675 self._stop_event = stop_event
676
677 def run(self) -> None:
678 while not self._stop_event.wait(self._interval):
679 if not self._refresh():
680 self._stop_event.set()
681 return
682
683
def _atomic_create_marker(name: str, token: str, *, dir_fd: int | None = None) -> None:
    """
    Exclusively create marker *name* and write the holder's token, pid and hostname into it.

    :param name: full path, or a name relative to *dir_fd*
    :param token: random hex token identifying this hold
    :param dir_fd: directory descriptor *name* is relative to, or ``None`` to use *name* as a path

    :raises FileExistsError: if *name* already exists
    :raises OSError: on any other failure; a partially written marker is removed before re-raising

    """
    # The marker parser only accepts 1-253 printable, non-space ASCII characters as the hostname. A hostname
    # outside that set would make our own marker unparseable, and the heartbeat treats an unparseable marker
    # as lost. Sanitize it, and render the payload *before* creating the file so an encoding error can no
    # longer leave an empty marker behind.
    hostname = "".join(ch if "\x21" <= ch <= "\x7e" else "_" for ch in socket.gethostname())[:253] or "unknown"
    payload = f"{token}\n{os.getpid()}\n{hostname}\n".encode("ascii")

    # O_NOFOLLOW blocks the symlink-overwrite attack where an attacker pre-creates the marker path as a
    # symlink pointing at a victim file. Mode 0o600 keeps the token unreadable to other users.
    flags = os.O_CREAT | os.O_EXCL | os.O_WRONLY | _O_NOFOLLOW
    if _SUPPORTS_DIR_FD and dir_fd is not None:
        fd = os.open(name, flags, 0o600, dir_fd=dir_fd)
    else:
        fd = os.open(name, flags, 0o600)
    try:
        try:
            # os.write may write fewer bytes than requested; loop until the whole payload is on disk.
            remaining = memoryview(payload)
            while remaining:
                written = os.write(fd, remaining)
                if written <= 0:
                    msg = f"short write while creating marker {name}"
                    raise OSError(msg)
                remaining = remaining[written:]
        finally:
            # On NFS, close() can be where a deferred write error surfaces, so it is covered by the cleanup below.
            os.close(fd)
    except BaseException:
        # An empty or truncated marker is unparseable; peers only remove such markers once they go stale, and
        # until then it would block them. Remove it ourselves.
        _unlink(name, dir_fd=dir_fd)
        raise
697
698
def _read_marker(name: str, *, dir_fd: int | None = None) -> tuple[_MarkerInfo | None, float] | None:
    """
    Read and parse marker *name* without following symlinks or blocking.

    :param name: full path, or a name relative to *dir_fd*
    :param dir_fd: directory descriptor *name* is relative to, or ``None``

    :returns: ``None`` if the file cannot be opened or read, otherwise ``(info, mtime)`` where *info* is ``None``
        for malformed content

    """
    # The file is ours; these guard a hostile mid-flight swap. O_NOFOLLOW rejects a symlink; O_NONBLOCK keeps
    # a real FIFO from blocking the open forever, so it reads as a malformed marker instead of wedging a peer
    # that holds the state lock.
    open_kwargs = {"dir_fd": dir_fd} if _SUPPORTS_DIR_FD and dir_fd is not None else {}
    try:
        fd = os.open(name, os.O_RDONLY | _O_NOFOLLOW | _O_NONBLOCK, **open_kwargs)
    except OSError:
        return None
    try:
        mtime = os.fstat(fd).st_mtime
        # One byte past the limit, so an oversized marker is detectable by the parser.
        payload = os.read(fd, _MAX_MARKER_SIZE + 1)
    except OSError:  # pragma: no cover - e.g. EAGAIN from a hostile FIFO that has a writer attached
        return None
    finally:
        os.close(fd)
    return _parse_marker_bytes(payload), mtime
717
718
719def _parse_marker_bytes(data: bytes) -> _MarkerInfo | None:
720 # Trust nothing about attacker-controlled markers; any deviation returns None so callers fall through
721 # to stale cleanup. ``re.match`` caches compiled patterns internally, so the regex is built only once
722 # despite being defined inline.
723 if not data or len(data) > _MAX_MARKER_SIZE:
724 return None
725 try:
726 text = data.decode("ascii")
727 except UnicodeDecodeError:
728 return None
729 match = re.match(
730 r"""
731 \A # start of string
732 (?P<token> [0-9a-f]{32} ) \n # 128-bit hex token
733 (?P<pid> [1-9][0-9]{0,9} ) \n # decimal pid: no leading zero, ≤ 10 digits
734 (?P<hostname> [\x21-\x7e]{1,253}) # printable non-whitespace ASCII (RFC 1123 hostname limit)
735 \n* # tolerate sloppy writers that append extra newlines
736 \Z # end of string
737 """,
738 text,
739 re.VERBOSE,
740 )
741 if match is None:
742 return None
743 pid = int(match["pid"], 10)
744 if pid > 2**31 - 1:
745 return None
746 return _MarkerInfo(token=match["token"], pid=pid, hostname=match["hostname"])
747
748
def _break_stale_marker(  # noqa: PLR0911
    name: str,
    *,
    stale_threshold: float,
    now: float,
    dir_fd: int | None = None,
) -> bool:
    """
    Remove marker *name* if its ``mtime`` is more than *stale_threshold* seconds older than *now*.

    :param name: marker to inspect — a full path, or a name relative to *dir_fd*
    :param stale_threshold: seconds of ``mtime`` inactivity after which the marker counts as stale
    :param now: current wall-clock time (callers pass ``time.time()``), compared against the marker's ``mtime``
    :param dir_fd: directory descriptor *name* is relative to, or ``None``

    :returns: ``True`` if a stale marker was removed; ``False`` if it was missing, still fresh, or the break was
        aborted because the re-verification failed

    """
    # Atomic break pattern: read → rename to unique break-name → re-verify → unlink. The rename gives us a
    # private name nobody else can touch; if the re-verify sees a newer mtime or a different token, the
    # legitimate holder's heartbeat fired between read and rename and we must abort (leaving the .break.*
    # file behind rather than rollback-renaming, because rollback is itself racy).
    # NOTE(review): on those abort paths the marker no longer exists under *name*, so the holder's next
    # heartbeat read of *name* fails and its heartbeat stops — confirm this trade-off is intended.
    read_result = _read_marker(name, dir_fd=dir_fd)
    if read_result is None:
        return False
    info_before, mtime_before = read_result
    if now - mtime_before <= stale_threshold:
        return False
    # Malformed and stale: no token to re-verify against, so remove it directly.
    if info_before is None:
        _unlink(name, dir_fd=dir_fd)
        return True

    # Random suffix makes the break name unique per attempt; the _BREAK_SUFFIX infix keeps reader scans from
    # counting it as a live reader.
    break_name = f"{name}{_BREAK_SUFFIX}.{os.getpid()}.{secrets.token_hex(16)}"
    try:
        if _SUPPORTS_DIR_FD and dir_fd is not None:
            os.rename(name, break_name, src_dir_fd=dir_fd, dst_dir_fd=dir_fd)
        else:
            Path(name).rename(break_name)
    except OSError:  # pragma: no cover - race where the marker vanishes between read and rename
        return False

    read_after = _read_marker(break_name, dir_fd=dir_fd)
    if read_after is None:  # pragma: no cover - race where a peer unlinks the break-name file
        return False
    info_after, mtime_after = read_after
    if info_after is None:  # pragma: no cover - content replaced post-rename by a racing peer
        _unlink(break_name, dir_fd=dir_fd)
        return True
    if not hmac.compare_digest(info_before.token, info_after.token):  # pragma: no cover - race only
        return False
    # A rename does not change mtime, so a newer mtime means the holder touched the file in between.
    if mtime_after > mtime_before:  # pragma: no cover - heartbeat raced our rename
        return False
    _unlink(break_name, dir_fd=dir_fd)
    return True
792
793
def _unlink(name: str, *, dir_fd: int | None = None) -> None:
    """Remove *name* (relative to *dir_fd* when dirfd I/O is available); a missing file is not an error."""
    relative = _SUPPORTS_DIR_FD and dir_fd is not None
    try:
        if relative:
            # Path.unlink has no dir_fd support, so the dirfd path goes through os.unlink.
            os.unlink(name, dir_fd=dir_fd)
        else:
            Path(name).unlink()
    except FileNotFoundError:
        pass
801
802
def _touch(name: str, *, dir_fd: int | None = None) -> None:
    """Set *name*'s access and modification times to now, relative to *dir_fd* when dirfd I/O is available."""
    extra = {"dir_fd": dir_fd} if _SUPPORTS_DIR_FD and dir_fd is not None else {}
    os.utime(name, None, **extra)
808
809
810def _file_exists(path: str) -> bool:
811 try:
812 st = os.lstat(path)
813 except FileNotFoundError:
814 return False
815 return stat.S_ISREG(st.st_mode)
816
817
818def _is_housekeeping_name(name: str) -> bool:
819 return name.startswith(".") or _BREAK_SUFFIX in name
820
821
def _reset_all_after_fork() -> None:  # pragma: no cover - fork child, not tracked by coverage
    """Replace inherited threading state in a freshly forked child (registered via ``os.register_at_fork``)."""
    global _all_instances_lock  # noqa: PLW0603
    # User-created threading locks do not auto-reset across fork: any lock held by a parent thread stays
    # locked in the child with no owner to release it. Replace the module-level lock, the singleton cache
    # lock, and every instance's locks with fresh ones; the child is single-threaded at this point so no
    # synchronization is needed.
    _all_instances_lock = threading.Lock()
    # Without this, a parent thread inside the singleton constructor at fork time would deadlock every
    # construction in the child.
    SoftReadWriteLock._instances_lock = threading.Lock()  # noqa: SLF001
    singleton_cache = SoftReadWriteLock._instances  # noqa: SLF001

    # Reset each instance exactly once. The singleton cache is included because a non-singleton instance for
    # the same path can displace the cached one from ``_all_instances``.
    live = {id(inst): inst for inst in (*_all_instances.values(), *singleton_cache.values())}
    for instance in live.values():
        instance._reset_after_fork_in_child()  # noqa: SLF001

    # Evict fork-invalidated instances from the singleton cache, so that "construct a new instance" in the
    # child really yields a fresh, usable lock instead of the same invalidated object.
    for key, cached in list(singleton_cache.items()):
        if cached._fork_invalidated:  # noqa: SLF001
            singleton_cache.pop(key, None)
830
831
def _cleanup_all_instances() -> None:  # pragma: no cover - runs from atexit at interpreter shutdown
    """Best-effort full release of every registered instance's lock at interpreter exit."""
    # Iterate over a snapshot so the weak registry can shrink while we work without breaking the loop.
    snapshot = tuple(_all_instances.values())
    for lock in snapshot:
        with suppress(Exception):
            lock.release(force=True)
836
837
def _register_hooks() -> None:
    """Install the process-wide atexit cleanup and at-fork reset hooks, each at most once."""
    global _atexit_registered, _fork_registered  # noqa: PLW0603
    if not _atexit_registered:
        atexit.register(_cleanup_all_instances)
        _atexit_registered = True
    if _fork_registered or not hasattr(os, "register_at_fork"):
        return
    # after_in_child replaces inherited state so the child cannot double-own any lock the parent held.
    os.register_at_fork(after_in_child=_reset_all_after_fork)
    _fork_registered = True
847
848
# Public API of this module; every underscore-prefixed name is an implementation detail.
__all__ = [
    "SoftReadWriteLock",
]