1"""Cross-process and cross-host reader/writer lock built on :class:`SoftFileLock` primitives."""
2
3from __future__ import annotations
4
5import atexit
6import hmac
7import os
8import re
9import secrets
10import socket
11import stat
12import sys
13import threading
14import time
15import uuid
16from contextlib import contextmanager, suppress
17from dataclasses import dataclass
18from pathlib import Path
19from typing import TYPE_CHECKING, Literal
20from weakref import WeakValueDictionary
21
22from filelock._api import AcquireReturnProxy
23from filelock._error import Timeout
24from filelock._soft import SoftFileLock
25from filelock._util import ensure_directory_exists
26
27if TYPE_CHECKING:
28 from collections.abc import Callable, Generator
29
30
# The two lock modes an instance can hold.
_Mode = Literal["read", "write"]
# Infix of the private name a stale marker is renamed to while it is being broken; directory entries whose
# name contains it are skipped when the readers directory is scanned.
_BREAK_SUFFIX = ".break"
# Upper bound, in bytes, on marker content; anything longer is rejected as malformed.
_MAX_MARKER_SIZE = 1024
# Both flags fall back to 0 (a no-op when OR-ed into open flags) when the ``os`` module lacks them.
_O_NOFOLLOW = getattr(os, "O_NOFOLLOW", 0)
_O_NONBLOCK = getattr(os, "O_NONBLOCK", 0)
# dirfd-relative I/O is a Unix-only optimization; Windows cannot ``os.open()`` a directory at all, and
# its ``os`` module skips dir_fd support entirely. When disabled, callers fall back to full-path ops.
_SUPPORTS_DIR_FD = sys.platform != "win32" and os.open in os.supports_dir_fd

# Registry of live lock instances, walked by the fork and atexit hooks. Values are weak references, so the
# registry itself never keeps an instance alive.
_all_instances: WeakValueDictionary[Path, SoftReadWriteLock] = WeakValueDictionary()
_all_instances_lock = threading.Lock()
# Flags checked by ``_register_hooks`` so each process-wide hook is installed only once.
_atexit_registered = False
_fork_registered = False
44
45
@dataclass(frozen=True)
class _Paths:
    """Sidecar paths derived from the lock file path."""

    state: str  # ``<lock_file>.state``: file behind the SoftFileLock that serializes state transitions
    write: str  # ``<lock_file>.write``: the writer marker file
    readers: str  # ``<lock_file>.readers``: directory holding one marker file per reader
51
52
@dataclass
class _Locks:
    """The synchronization primitives owned by one lock instance."""

    internal: threading.Lock  # guards the in-memory fields of the instance (hold, closed flag, ...)
    transaction: threading.Lock  # held for a whole first-level acquire so threads claim a slot one at a time
    state: SoftFileLock  # on-disk mutex taken around marker inspection and creation
58
59
@dataclass(frozen=True)
class _MarkerInfo:
    """Parsed content of a marker file."""

    token: str  # random token identifying the holder that wrote the marker
    pid: int  # pid recorded by the holder
    hostname: str  # hostname recorded by the holder
65
66
@dataclass
class _Hold:
    """Everything that exists only while a lock is held; ``None`` when the instance has no lock."""

    level: int  # reentrancy depth; the marker is removed when this drops to zero
    mode: _Mode  # whether the hold is a read or a write lock
    write_thread_id: int | None  # ident of the thread that took the write lock; ``None`` for read holds
    marker_name: str  # writer marker path, or the reader marker name (dirfd-relative or full path)
    is_reader: bool  # ``True`` when the marker lives in the readers directory
    token: str  # token written into the marker; the heartbeat compares against it before touching
    heartbeat_thread: _HeartbeatThread  # thread that refreshes the marker while the lock is held
    heartbeat_stop: threading.Event  # set to ask the heartbeat thread to exit
79
80
81class _SoftRWMeta(type):
82 _instances: WeakValueDictionary[Path, SoftReadWriteLock]
83 _instances_lock: threading.Lock
84
85 def __call__( # noqa: PLR0913
86 cls,
87 lock_file: str | os.PathLike[str],
88 timeout: float = -1,
89 *,
90 blocking: bool = True,
91 is_singleton: bool = True,
92 heartbeat_interval: float = 30.0,
93 stale_threshold: float | None = None,
94 poll_interval: float = 0.25,
95 ) -> SoftReadWriteLock:
96 if not is_singleton:
97 return super().__call__(
98 lock_file,
99 timeout,
100 blocking=blocking,
101 is_singleton=is_singleton,
102 heartbeat_interval=heartbeat_interval,
103 stale_threshold=stale_threshold,
104 poll_interval=poll_interval,
105 )
106
107 normalized = Path(lock_file).resolve()
108 with cls._instances_lock:
109 instance = cls._instances.get(normalized)
110 if instance is None:
111 instance = super().__call__(
112 lock_file,
113 timeout,
114 blocking=blocking,
115 is_singleton=is_singleton,
116 heartbeat_interval=heartbeat_interval,
117 stale_threshold=stale_threshold,
118 poll_interval=poll_interval,
119 )
120 cls._instances[normalized] = instance
121 elif instance.timeout != timeout or instance.blocking != blocking:
122 msg = (
123 f"Singleton lock created with timeout={instance.timeout}, blocking={instance.blocking},"
124 f" cannot be changed to timeout={timeout}, blocking={blocking}"
125 )
126 raise ValueError(msg)
127 return instance
128
129
130class SoftReadWriteLock(metaclass=_SoftRWMeta):
131 """
132 Cross-process and cross-host reader/writer lock built on :class:`SoftFileLock` primitives.
133
134 Use this class instead of :class:`~filelock.ReadWriteLock` when the lock file lives on a network
135 filesystem (NFS, Lustre with ``-o flock``, HPC cluster shared storage). ``ReadWriteLock`` is backed
136 by SQLite and cannot run on NFS because SQLite's ``fcntl`` locking is unreliable there.
137
138 Layout on disk for a lock at ``foo.lock``:
139
140 - ``foo.lock.state`` — a :class:`SoftFileLock` taken only during state transitions (microseconds).
141 - ``foo.lock.write`` — writer marker; its presence means a writer is claiming or holding the lock.
    - ``foo.lock.readers/<uuid>.<pid>`` — one file per reader.
143
144 Each marker stores a random token (``secrets.token_hex(16)``), the holder's pid, and the holder's
145 hostname. A daemon heartbeat thread refreshes ``mtime`` on every held marker. A marker whose mtime
146 has not advanced in ``stale_threshold`` seconds may be evicted by any process on any host, giving
147 correct behavior when a compute node crashes with a lock held.
148
149 Writer acquire is two-phase and writer-preferring: phase 1 claims ``.write`` (blocking any new
150 reader), phase 2 waits for existing readers to drain. Writer starvation is impossible.
151
152 Reentrancy, upgrade/downgrade rules, thread pinning, and singleton caching by resolved path match
153 :class:`~filelock.ReadWriteLock`.
154
155 Forking while holding a lock invalidates the inherited instance in the child so the child cannot
156 double-own the lock with its parent; ``release()`` on a fork-invalidated instance is a no-op, and
157 the child must re-acquire if it needs a lock.
158
159 Trust boundary: protects against same-UID non-cooperating processes (one host or cross-host) and
160 same-host different-UID users via ``0o600`` / ``0o700`` permissions. Does not protect against root
161 compromise, NTP tampering on same-UID cross-host nodes, or multi-tenant mounts where hostile
162 co-tenants share the UID.
163
164 :param lock_file: path to the lock file; sidecar state/write/readers live next to it
165 :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
166 :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately on contention
167 :param is_singleton: if ``True``, reuse existing instances for the same resolved path
168 :param heartbeat_interval: seconds between heartbeat refreshes; default 30 s
169 :param stale_threshold: seconds of ``mtime`` inactivity before a marker is stale; defaults to
170 ``3 * heartbeat_interval``, matching etcd's ``LeaseKeepAlive`` convention
171 :param poll_interval: seconds between acquire retries under contention; default 0.25 s
172
173 .. versionadded:: 3.27.0
174
175 """
176
177 _instances: WeakValueDictionary[Path, SoftReadWriteLock] = WeakValueDictionary()
178 _instances_lock = threading.Lock()
179
180 def __init__( # noqa: PLR0913
181 self,
182 lock_file: str | os.PathLike[str],
183 timeout: float = -1,
184 *,
185 blocking: bool = True,
186 is_singleton: bool = True, # noqa: ARG002
187 heartbeat_interval: float = 30.0,
188 stale_threshold: float | None = None,
189 poll_interval: float = 0.25,
190 ) -> None:
191 if heartbeat_interval <= 0:
192 msg = f"heartbeat_interval must be positive, got {heartbeat_interval}"
193 raise ValueError(msg)
194 if stale_threshold is None:
195 stale_threshold = heartbeat_interval * 3
196 if stale_threshold <= heartbeat_interval:
197 msg = f"stale_threshold must exceed heartbeat_interval ({stale_threshold} <= {heartbeat_interval})"
198 raise ValueError(msg)
199 if poll_interval <= 0:
200 msg = f"poll_interval must be positive, got {poll_interval}"
201 raise ValueError(msg)
202
203 self.lock_file: str = os.fspath(lock_file)
204 self.timeout: float = timeout
205 self.blocking: bool = blocking
206 self.heartbeat_interval: float = heartbeat_interval
207 self.stale_threshold: float = stale_threshold
208 self.poll_interval: float = poll_interval
209
210 self._paths = _Paths(
211 state=f"{self.lock_file}.state",
212 write=f"{self.lock_file}.write",
213 readers=f"{self.lock_file}.readers",
214 )
215 ensure_directory_exists(self.lock_file)
216 self._locks = _Locks(
217 internal=threading.Lock(),
218 transaction=threading.Lock(),
219 state=SoftFileLock(self._paths.state, timeout=-1),
220 )
221 self._readers_dir_fd: int | None = None
222 self._hold: _Hold | None = None
223 self._fork_invalidated: bool = False
224 self._closed: bool = False
225
226 with _all_instances_lock:
227 _all_instances[Path(self.lock_file).resolve()] = self
228 _register_hooks()
229
    @contextmanager
    def read_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> Generator[None]:
        """
        Context manager that acquires and releases a shared read lock.

        Falls back to instance defaults for *timeout* and *blocking* when ``None``. Exactly one lock level is
        released on exit, whether the body completes or raises.

        :param timeout: maximum wait time in seconds, or ``None`` to use the instance default
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default

        :raises RuntimeError: if a write lock is already held on this instance, if this instance was invalidated by
            :func:`os.fork`, or if :meth:`close` was called
        :raises Timeout: if the lock cannot be acquired within *timeout* seconds

        """
        self.acquire_read(timeout, blocking=blocking)
        try:
            yield
        finally:
            # Runs even when the ``with`` body raises, so acquire and release stay balanced.
            self.release()
249
    @contextmanager
    def write_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> Generator[None]:
        """
        Context manager that acquires and releases an exclusive write lock.

        Falls back to instance defaults for *timeout* and *blocking* when ``None``. Exactly one lock level is
        released on exit, whether the body completes or raises.

        :param timeout: maximum wait time in seconds, or ``None`` to use the instance default
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default

        :raises RuntimeError: if a read lock is already held, if a write lock is held by a different thread, if this
            instance was invalidated by :func:`os.fork`, or if :meth:`close` was called
        :raises Timeout: if the lock cannot be acquired within *timeout* seconds

        """
        self.acquire_write(timeout, blocking=blocking)
        try:
            yield
        finally:
            # Runs even when the ``with`` body raises, so acquire and release stay balanced.
            self.release()
269
    def acquire_read(self, timeout: float | None = None, *, blocking: bool | None = None) -> AcquireReturnProxy:
        """
        Acquire a shared read lock.

        If this instance already holds a read lock, the lock level is incremented (reentrant). Attempting to acquire a
        read lock while holding a write lock raises :class:`RuntimeError` (downgrade not allowed). On the 0→1
        transition a daemon heartbeat thread is started that refreshes the reader marker's ``mtime`` every
        ``heartbeat_interval`` seconds so peers on other hosts do not evict the marker as stale.

        Unlike write locks, reentrant read acquires are not pinned to the thread that took the first level. Every
        successful call must be paired with one :meth:`release`.

        :param timeout: maximum wait time in seconds, or ``None`` to use the instance default; ``-1`` means block
            indefinitely
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable;
            ``None`` uses the instance default

        :returns: a proxy that can be used as a context manager to release the lock

        :raises RuntimeError: if a write lock is already held on this instance, if this instance was invalidated by
            :func:`os.fork`, or if :meth:`close` was called
        :raises Timeout: if the lock cannot be acquired within *timeout* seconds

        """
        # All policy (defaults, reentrancy, fork/close checks) lives in ``_acquire``.
        return self._acquire("read", timeout, blocking=blocking)
292
    def acquire_write(self, timeout: float | None = None, *, blocking: bool | None = None) -> AcquireReturnProxy:
        """
        Acquire an exclusive write lock.

        If this instance already holds a write lock from the same thread, the lock level is incremented (reentrant).
        Attempting to acquire a write lock while holding a read lock raises :class:`RuntimeError` (upgrade not
        allowed). Write locks are pinned to the acquiring thread: a different thread trying to re-enter also raises
        :class:`RuntimeError`.

        Writer acquisition runs in two phases. Phase 1 atomically claims ``<path>.write`` via ``O_CREAT | O_EXCL``,
        which immediately blocks any new reader on any host. Phase 2 waits for existing readers to drain. Writer
        starvation is impossible: new readers see ``<path>.write`` during phase 2 and wait behind the pending writer.
        If phase 2 times out, the ``<path>.write`` claim is removed again before :class:`~filelock.Timeout` is raised.

        :param timeout: maximum wait time in seconds, or ``None`` to use the instance default; ``-1`` means block
            indefinitely
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable;
            ``None`` uses the instance default

        :returns: a proxy that can be used as a context manager to release the lock

        :raises RuntimeError: if a read lock is already held, if a write lock is held by a different thread, if this
            instance was invalidated by :func:`os.fork`, or if :meth:`close` was called
        :raises Timeout: if the lock cannot be acquired within *timeout* seconds

        """
        # All policy (defaults, reentrancy, fork/close checks) lives in ``_acquire``.
        return self._acquire("write", timeout, blocking=blocking)
319
320 def close(self) -> None:
321 """
322 Release any held lock and release internal filesystem resources.
323
324 Idempotent. After calling this method the instance can no longer acquire locks — subsequent acquires raise
325 :class:`RuntimeError`. A fork-invalidated instance is closed without raising.
326 """
327 self.release(force=True)
328 with self._locks.internal:
329 if self._closed:
330 return
331 self._closed = True
332 if self._readers_dir_fd is not None:
333 with suppress(OSError):
334 os.close(self._readers_dir_fd)
335 self._readers_dir_fd = None
336
337 def release(self, *, force: bool = False) -> None:
338 """
339 Release one level of the current lock.
340
341 When the lock level reaches zero the heartbeat thread is stopped and the held marker file is unlinked. On a
342 fork-invalidated instance (that is, the child of a :func:`os.fork` call made while the parent held a lock)
343 this method is a no-op so inherited ``with`` blocks can unwind cleanly in the child.
344
345 :param force: if ``True``, release the lock completely regardless of the current lock level
346
347 :raises RuntimeError: if no lock is currently held and *force* is ``False``
348
349 """
350 with self._locks.internal:
351 if self._fork_invalidated:
352 # Inherited state from the parent is meaningless in the child; clear any counters and return.
353 self._hold = None
354 return
355 hold = self._hold
356 if hold is None:
357 if force:
358 return
359 msg = f"Cannot release a lock on {self.lock_file} (lock id: {id(self)}) that is not held"
360 raise RuntimeError(msg)
361 if force:
362 hold.level = 0
363 else:
364 hold.level -= 1
365 if hold.level > 0:
366 return
367 self._hold = None
368
369 # Order matters: signal → join → unlink. A late tick on a deleted marker is harmless, and the
370 # token check in the heartbeat callback would catch any re-acquisition race, but joining first
371 # removes even that theoretical race.
372 hold.heartbeat_stop.set()
373 hold.heartbeat_thread.join(timeout=self.heartbeat_interval + 1.0)
374 if hold.is_reader:
375 _unlink(hold.marker_name, dir_fd=self._readers_dir_fd)
376 else:
377 _unlink(hold.marker_name)
378
    @classmethod
    def get_lock(
        cls,
        lock_file: str | os.PathLike[str],
        timeout: float = -1,
        *,
        blocking: bool = True,
    ) -> SoftReadWriteLock:
        """
        Return the singleton :class:`SoftReadWriteLock` for *lock_file*.

        Equivalent to calling the class with only these three arguments, so every other constructor option takes
        its default when a new instance has to be created.

        :param lock_file: path to the lock file; sidecar state/write/readers live next to it
        :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable

        :returns: the singleton lock instance

        :raises ValueError: if an instance already exists for this path with different *timeout* or *blocking* values

        """
        return cls(lock_file, timeout, blocking=blocking)
400
401 def _acquire(
402 self,
403 mode: _Mode,
404 timeout: float | None,
405 *,
406 blocking: bool | None,
407 ) -> AcquireReturnProxy:
408 timeout = self.timeout if timeout is None else timeout
409 blocking = self.blocking if blocking is None else blocking
410
411 with self._locks.internal:
412 if self._fork_invalidated:
413 msg = f"SoftReadWriteLock on {self.lock_file} was invalidated by fork(); construct a new instance"
414 raise RuntimeError(msg)
415 if self._closed:
416 msg = f"SoftReadWriteLock on {self.lock_file} has been closed"
417 raise RuntimeError(msg)
418 if self._hold is not None:
419 return self._validate_reentrant(mode)
420
421 start = time.perf_counter()
422 if not blocking:
423 acquired = self._locks.transaction.acquire(blocking=False)
424 elif timeout == -1:
425 acquired = self._locks.transaction.acquire(blocking=True)
426 else:
427 acquired = self._locks.transaction.acquire(blocking=True, timeout=timeout)
428 if not acquired:
429 raise Timeout(self.lock_file) from None
430 try:
431 return self._do_acquire_inner(mode, timeout, start, blocking=blocking)
432 finally:
433 self._locks.transaction.release()
434
435 def _do_acquire_inner(
436 self,
437 mode: _Mode,
438 effective_timeout: float,
439 start: float,
440 *,
441 blocking: bool,
442 ) -> AcquireReturnProxy:
443 with self._locks.internal:
444 if self._hold is not None:
445 return self._validate_reentrant(mode)
446 deadline = None if effective_timeout == -1 else start + effective_timeout
447 token = secrets.token_hex(16)
448 if mode == "write":
449 marker_name, is_reader = self._acquire_writer_slot(token, deadline=deadline, blocking=blocking)
450 else:
451 marker_name, is_reader = self._acquire_reader_slot(token, deadline=deadline, blocking=blocking)
452 stop_event = threading.Event()
453 heartbeat = _HeartbeatThread(
454 refresh=self._refresh_marker,
455 interval=self.heartbeat_interval,
456 stop_event=stop_event,
457 name=f"filelock-heartbeat-{id(self):x}",
458 )
459 with self._locks.internal:
460 self._hold = _Hold(
461 level=1,
462 mode=mode,
463 write_thread_id=threading.get_ident() if mode == "write" else None,
464 marker_name=marker_name,
465 is_reader=is_reader,
466 token=token,
467 heartbeat_thread=heartbeat,
468 heartbeat_stop=stop_event,
469 )
470 heartbeat.start()
471 return AcquireReturnProxy(lock=self)
472
473 def _validate_reentrant(self, mode: _Mode) -> AcquireReturnProxy:
474 hold = self._hold
475 assert hold is not None # noqa: S101
476 if hold.mode != mode:
477 opposite = "write" if mode == "read" else "read"
478 direction = "downgrade" if mode == "read" else "upgrade"
479 msg = (
480 f"Cannot acquire {mode} lock on {self.lock_file} (lock id: {id(self)}): "
481 f"already holding a {opposite} lock ({direction} not allowed)"
482 )
483 raise RuntimeError(msg)
484 if mode == "write" and (cur := threading.get_ident()) != hold.write_thread_id:
485 msg = (
486 f"Cannot acquire write lock on {self.lock_file} (lock id: {id(self)}) "
487 f"from thread {cur} while it is held by thread {hold.write_thread_id}"
488 )
489 raise RuntimeError(msg)
490 hold.level += 1
491 return AcquireReturnProxy(lock=self)
492
493 def _acquire_writer_slot(
494 self,
495 token: str,
496 *,
497 deadline: float | None,
498 blocking: bool,
499 ) -> tuple[str, bool]:
500 # Phase 2 scans readers/ via dirfd (where supported), so we need it open even though writers never
501 # create files inside.
502 self._open_readers_dir()
503
504 def try_claim_writer() -> bool:
505 with self._locks.state:
506 _break_stale_marker(self._paths.write, stale_threshold=self.stale_threshold, now=time.time())
507 if _file_exists(self._paths.write):
508 return False
509 try:
510 _atomic_create_marker(self._paths.write, token)
511 except FileExistsError:
512 return False
513 return True
514
515 def readers_drained_touching() -> bool:
516 with self._locks.state:
517 # Refresh our writer marker on every scan iteration. Otherwise phase 2 can exceed
518 # ``stale_threshold`` under contention and a peer would treat us as stale and evict us.
519 with suppress(OSError):
520 _touch(self._paths.write)
521 self._break_stale_readers(time.time())
522 return not self._any_readers()
523
524 self._wait_for(try_claim_writer, deadline=deadline, blocking=blocking)
525 try:
526 self._wait_for(readers_drained_touching, deadline=deadline, blocking=blocking)
527 except Timeout:
528 # Give up our writer claim so readers can make progress again.
529 _unlink(self._paths.write)
530 raise
531 return self._paths.write, False
532
533 def _acquire_reader_slot(
534 self,
535 token: str,
536 *,
537 deadline: float | None,
538 blocking: bool,
539 ) -> tuple[str, bool]:
540 self._open_readers_dir()
541 reader_name = f"{uuid.uuid4().hex}.{os.getpid()}"
542 dir_fd = self._readers_dir_fd
543 full_reader_path = str(Path(self._paths.readers) / reader_name)
544
545 def try_claim_reader() -> bool:
546 with self._locks.state:
547 _break_stale_marker(self._paths.write, stale_threshold=self.stale_threshold, now=time.time())
548 if _file_exists(self._paths.write):
549 return False
550 if dir_fd is not None:
551 _atomic_create_marker(reader_name, token, dir_fd=dir_fd)
552 else: # pragma: win32 cover
553 _atomic_create_marker(full_reader_path, token)
554 return True
555
556 self._wait_for(try_claim_reader, deadline=deadline, blocking=blocking)
557 return (reader_name if dir_fd is not None else full_reader_path), True
558
559 def _wait_for(
560 self,
561 predicate: Callable[[], bool],
562 *,
563 deadline: float | None,
564 blocking: bool,
565 ) -> None:
566 while True:
567 if predicate():
568 return
569 now = time.perf_counter()
570 if not blocking:
571 raise Timeout(self.lock_file)
572 if deadline is not None and now >= deadline:
573 raise Timeout(self.lock_file)
574 sleep_for = self.poll_interval
575 if deadline is not None:
576 sleep_for = min(sleep_for, max(deadline - now, 0.0))
577 time.sleep(sleep_for)
578
579 def _open_readers_dir(self) -> None:
580 readers_path = Path(self._paths.readers)
581 with suppress(FileExistsError):
582 readers_path.mkdir(mode=0o700)
583 # mkdir has no O_NOFOLLOW, so verify via lstat that we did not land on an attacker-placed symlink
584 # or a regular file before we open or scan inside.
585 st = os.lstat(self._paths.readers)
586 if stat.S_ISLNK(st.st_mode) or not stat.S_ISDIR(st.st_mode):
587 msg = f"{self._paths.readers} exists but is not a directory or is a symlink; refusing to use it"
588 raise RuntimeError(msg)
589 if self._readers_dir_fd is None and _SUPPORTS_DIR_FD:
590 flags = os.O_RDONLY | getattr(os, "O_DIRECTORY", 0) | _O_NOFOLLOW
591 self._readers_dir_fd = os.open(self._paths.readers, flags)
592
593 def _any_readers(self) -> bool:
594 for _ in self._iter_reader_entries():
595 return True
596 return False
597
598 def _iter_reader_entries(self) -> Generator[tuple[str, bool]]:
599 """
600 Yield ``(name, dirfd_relative)`` pairs for every live reader marker.
601
602 ``dirfd_relative`` is ``True`` when *name* should be passed to ``dir_fd=``-aware syscalls; ``False``
603 when *name* is a full path because dirfd-relative I/O is unavailable on this platform.
604 """
605 if self._readers_dir_fd is not None:
606 with os.scandir(self._readers_dir_fd) as it:
607 for entry in it:
608 if not _is_housekeeping_name(entry.name):
609 yield entry.name, True
610 return
611 readers_path = Path(self._paths.readers) # pragma: win32 cover
612 with os.scandir(readers_path) as it: # pragma: win32 cover
613 for entry in it: # pragma: win32 cover
614 if not _is_housekeeping_name(entry.name): # pragma: win32 cover
615 yield str(readers_path / entry.name), False # pragma: win32 cover
616
617 def _break_stale_readers(self, now: float) -> None:
618 names: list[tuple[str, int | None]] = []
619 try:
620 for name, dirfd_relative in self._iter_reader_entries():
621 names.append((name, self._readers_dir_fd if dirfd_relative else None))
622 except OSError: # pragma: no cover - transient NFS scandir hiccup
623 return
624 for name, fd in names:
625 _break_stale_marker(name, stale_threshold=self.stale_threshold, now=now, dir_fd=fd)
626
627 def _refresh_marker(self) -> bool:
628 with self._locks.internal:
629 hold = self._hold
630 if hold is None: # pragma: no cover - race between stop_event.set and join
631 return False
632 marker_name = hold.marker_name
633 token = hold.token
634 dir_fd = self._readers_dir_fd if hold.is_reader else None
635
636 read_result = _read_marker(marker_name, dir_fd=dir_fd)
637 if read_result is None:
638 return False
639 info, _mtime = read_result
640 # Token mismatch means another process already evicted our marker and created its own; stop the
641 # thread so it does not touch a stranger's file.
642 if info is None or not hmac.compare_digest(info.token, token):
643 return False
644 # A transient touch failure (ESTALE / EIO on the NFS-style filesystems this lock targets) must not
645 # kill the heartbeat thread: the read above just confirmed the marker is still ours, so swallow the
646 # error and retry on the next tick rather than letting the lease lapse while we still believe we
647 # hold the lock. FileNotFoundError is different in kind -- the marker we just read has since been
648 # unlinked, i.e. a peer evicted us -- so stop the heartbeat at once instead of waiting a tick.
649 try:
650 _touch(marker_name, dir_fd=dir_fd)
651 except FileNotFoundError:
652 return False
653 except OSError:
654 pass
655 return True
656
657 def _reset_after_fork_in_child(self) -> None: # pragma: no cover - fork child not tracked
658 # Replace every lock this instance owns with a fresh one; the inherited locks may still be held
659 # by threads that no longer exist in the child. The readers dirfd and the SoftFileLock state
660 # mutex both get dropped for the same reason — the child re-creates them on its next acquire.
661 self._locks = _Locks(
662 internal=threading.Lock(),
663 transaction=threading.Lock(),
664 state=SoftFileLock(self._paths.state, timeout=-1),
665 )
666 self._hold = None
667 self._readers_dir_fd = None
668 self._fork_invalidated = True
669
670
671class _HeartbeatThread(threading.Thread):
672 def __init__(
673 self,
674 refresh: Callable[[], bool],
675 interval: float,
676 stop_event: threading.Event,
677 name: str,
678 ) -> None:
679 super().__init__(name=name, daemon=True)
680 self._refresh = refresh
681 self._interval = interval
682 self._stop_event = stop_event
683
684 def run(self) -> None:
685 while not self._stop_event.wait(self._interval):
686 if not self._refresh():
687 self._stop_event.set()
688 return
689
690
def _atomic_create_marker(name: str, token: str, *, dir_fd: int | None = None) -> None:
    """
    Exclusively create the marker file *name* containing *token*, the current pid and the hostname.

    :param name: marker path, or a name relative to *dir_fd* when one is given and dirfd I/O is supported
    :param token: holder token written as the first line
    :param dir_fd: optional directory descriptor that *name* is relative to

    :raises FileExistsError: if something already exists at *name*
    :raises UnicodeEncodeError: if the token or hostname is not ASCII; nothing is created in that case
    :raises OSError: if the file cannot be created or written; a file created by this call is removed again

    """
    # Build the payload before touching the filesystem: if encoding fails, no empty marker is left behind
    # to block peers until it goes stale.
    content = f"{token}\n{os.getpid()}\n{socket.gethostname()}\n".encode("ascii")
    # O_NOFOLLOW blocks the symlink-overwrite attack where an attacker pre-creates the marker path as a
    # symlink pointing at a victim file. Mode 0o600 keeps the token unreadable to other users.
    flags = os.O_CREAT | os.O_EXCL | os.O_WRONLY | _O_NOFOLLOW
    relative = _SUPPORTS_DIR_FD and dir_fd is not None
    fd = os.open(name, flags, 0o600, dir_fd=dir_fd) if relative else os.open(name, flags, 0o600)
    written = False
    try:
        os.write(fd, content)
        written = True
    finally:
        os.close(fd)
        if not written:
            # O_EXCL guarantees this call created the file, so removing it is safe. Close first (above):
            # some platforms refuse to unlink a file that is still open.
            with suppress(OSError):
                if relative:
                    os.unlink(name, dir_fd=dir_fd)
                else:
                    os.unlink(name)
704
705
def _read_marker(name: str, *, dir_fd: int | None = None) -> tuple[_MarkerInfo | None, float] | None:
    """
    Read a marker file and its modification time.

    :returns: ``None`` if the file cannot be opened or read; otherwise ``(info, mtime)`` where *info* is
        ``None`` when the content does not parse as a marker

    """
    # Guards against the path being swapped for something hostile: O_NOFOLLOW refuses a symlink, and
    # O_NONBLOCK stops a FIFO from blocking the open forever, so it shows up as a malformed marker
    # instead of wedging a peer that holds the state lock.
    flags = os.O_RDONLY | _O_NOFOLLOW | _O_NONBLOCK
    relative = _SUPPORTS_DIR_FD and dir_fd is not None
    try:
        fd = os.open(name, flags, dir_fd=dir_fd) if relative else os.open(name, flags)
    except OSError:
        return None
    try:
        st = os.fstat(fd)
        data = os.read(fd, _MAX_MARKER_SIZE + 1)
    except OSError:  # pragma: no cover - e.g. EAGAIN from a hostile FIFO that has a writer attached
        return None
    finally:
        os.close(fd)
    return _parse_marker_bytes(data), st.st_mtime
724
725
def _parse_marker_bytes(data: bytes) -> _MarkerInfo | None:
    """
    Parse raw marker content, returning ``None`` for anything that deviates from the expected layout.

    Marker content may come from another, possibly hostile, writer, so every check is strict. The regex
    is written inline; ``re.match`` caches compiled patterns, so it is not recompiled on each call.
    """
    size = len(data)
    if size == 0 or size > _MAX_MARKER_SIZE:
        return None
    try:
        text = data.decode("ascii")
    except UnicodeDecodeError:
        return None
    parsed = re.match(
        r"""
            \A                                  # start of string
            (?P<token>    [0-9a-f]{32}      ) \n  # 128-bit hex token
            (?P<pid>      [1-9][0-9]{0,9}   ) \n  # decimal pid: no leading zero, ≤ 10 digits
            (?P<hostname> [\x21-\x7e]{1,253})     # printable non-whitespace ASCII (RFC 1123 hostname limit)
            \n*                                 # tolerate sloppy writers that append extra newlines
            \Z                                  # end of string
        """,
        text,
        re.VERBOSE,
    )
    if parsed is None:
        return None
    token, pid_text, hostname = parsed.group("token", "pid", "hostname")
    pid = int(pid_text, 10)
    # Ten digits can still exceed a signed 32-bit pid.
    if pid > 2**31 - 1:
        return None
    return _MarkerInfo(token=token, pid=pid, hostname=hostname)
754
755
def _break_stale_marker(  # noqa: PLR0911
    name: str,
    *,
    stale_threshold: float,
    now: float,
    dir_fd: int | None = None,
) -> bool:
    """
    Remove the marker *name* if its ``mtime`` is more than *stale_threshold* seconds before *now*.

    :param name: marker path, or a name relative to *dir_fd*
    :param stale_threshold: age in seconds beyond which the marker counts as stale
    :param now: current time, compared against the marker's ``mtime``
    :param dir_fd: optional directory descriptor that *name* is relative to

    :returns: ``True`` if a stale marker was removed; ``False`` if the marker is unreadable, not stale, or
        the break was abandoned because of a race

    """
    # Atomic break pattern: read → rename to unique break-name → re-verify → unlink. The rename gives us a
    # private name nobody else can touch; if the re-verify sees a newer mtime or a different token, the
    # legitimate holder's heartbeat fired between read and rename and we must abort (leaving the .break.*
    # file behind rather than rollback-renaming, because rollback is itself racy).
    read_result = _read_marker(name, dir_fd=dir_fd)
    if read_result is None:
        return False
    info_before, mtime_before = read_result
    if now - mtime_before <= stale_threshold:
        return False
    if info_before is None:
        # Stale and malformed: there is no token to re-verify, so remove it directly.
        _unlink(name, dir_fd=dir_fd)
        return True

    break_name = f"{name}{_BREAK_SUFFIX}.{os.getpid()}.{secrets.token_hex(16)}"
    try:
        if _SUPPORTS_DIR_FD and dir_fd is not None:
            os.rename(name, break_name, src_dir_fd=dir_fd, dst_dir_fd=dir_fd)
        else:
            Path(name).rename(break_name)
    except OSError:  # pragma: no cover - race where the marker vanishes between read and rename
        return False

    read_after = _read_marker(break_name, dir_fd=dir_fd)
    if read_after is None:  # pragma: no cover - race where a peer unlinks the break-name file
        return False
    info_after, mtime_after = read_after
    if info_after is None:  # pragma: no cover - content replaced post-rename by a racing peer
        _unlink(break_name, dir_fd=dir_fd)
        return True
    if not hmac.compare_digest(info_before.token, info_after.token):  # pragma: no cover - race only
        return False
    if mtime_after > mtime_before:  # pragma: no cover - heartbeat raced our rename
        return False
    _unlink(break_name, dir_fd=dir_fd)
    return True
799
800
def _unlink(name: str, *, dir_fd: int | None = None) -> None:
    """Remove *name* (optionally relative to *dir_fd*); a file that is already gone is not an error."""
    if not _SUPPORTS_DIR_FD or dir_fd is None:
        Path(name).unlink(missing_ok=True)
        return
    # pathlib cannot unlink relative to a directory descriptor, hence os.unlink on this branch.
    with suppress(FileNotFoundError):
        os.unlink(name, dir_fd=dir_fd)
808
809
def _touch(name: str, *, dir_fd: int | None = None) -> None:
    """Set the access and modification times of *name* (optionally relative to *dir_fd*) to now."""
    # Only pass dir_fd when it is both supported and given; platforms without support reject the keyword.
    extra = {"dir_fd": dir_fd} if _SUPPORTS_DIR_FD and dir_fd is not None else {}
    os.utime(name, None, **extra)
815
816
817def _file_exists(path: str) -> bool:
818 try:
819 st = os.lstat(path)
820 except FileNotFoundError:
821 return False
822 return stat.S_ISREG(st.st_mode)
823
824
def _is_housekeeping_name(name: str) -> bool:
    """Return ``True`` for directory entries that are not reader markers: dotfiles and break files."""
    if name.startswith("."):
        return True
    return _BREAK_SUFFIX in name
827
828
def _reset_all_after_fork() -> None:  # pragma: no cover - fork child, not tracked by coverage
    """Fork hook for the child: replace the registry lock and invalidate every registered instance."""
    global _all_instances_lock  # noqa: PLW0603
    # A threading lock that some parent thread held at fork time stays locked in the child, where that
    # thread does not exist to release it. So nothing inherited is reused: the module-level lock is
    # swapped for a new one, and each instance swaps its own. The child has a single thread at this
    # point, so no synchronization is needed while doing so.
    _all_instances_lock = threading.Lock()
    survivors = tuple(_all_instances.values())
    for instance in survivors:
        instance._reset_after_fork_in_child()  # noqa: SLF001
837
838
def _cleanup_all_instances() -> None:  # pragma: no cover - runs from atexit at interpreter shutdown
    """Exit hook: force-release every registered instance, ignoring individual failures."""
    leftovers = tuple(_all_instances.values())
    for instance in leftovers:
        # One failing instance must not stop the remaining ones from being released.
        with suppress(Exception):
            instance.release(force=True)
843
844
def _register_hooks() -> None:
    """Install the atexit cleanup and, where available, the fork hook, each guarded by a module flag."""
    global _atexit_registered, _fork_registered  # noqa: PLW0603
    if not _atexit_registered:
        atexit.register(_cleanup_all_instances)
        _atexit_registered = True
    if _fork_registered or not hasattr(os, "register_at_fork"):
        return
    # The child-side hook replaces inherited state, so a child can never double-own a lock that its
    # parent held at fork time.
    os.register_at_fork(after_in_child=_reset_all_after_fork)
    _fork_registered = True
854
855
# Only the lock class is public; every underscore-prefixed name in this module is an implementation detail.
__all__ = [
    "SoftReadWriteLock",
]