Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/filelock/_soft_rw/_sync.py: 25%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

405 statements  

1"""Cross-process and cross-host reader/writer lock built on :class:`SoftFileLock` primitives.""" 

2 

3from __future__ import annotations 

4 

5import atexit 

6import hmac 

7import os 

8import re 

9import secrets 

10import socket 

11import stat 

12import sys 

13import threading 

14import time 

15import uuid 

16from contextlib import contextmanager, suppress 

17from dataclasses import dataclass 

18from pathlib import Path 

19from typing import TYPE_CHECKING, Literal 

20from weakref import WeakValueDictionary 

21 

22from filelock._api import AcquireReturnProxy 

23from filelock._error import Timeout 

24from filelock._soft import SoftFileLock 

25from filelock._util import ensure_directory_exists 

26 

27if TYPE_CHECKING: 

28 from collections.abc import Callable, Generator 

29 

30 

31_Mode = Literal["read", "write"] 

32_BREAK_SUFFIX = ".break" 

33_MAX_MARKER_SIZE = 1024 

34_O_NOFOLLOW = getattr(os, "O_NOFOLLOW", 0) 

35_O_NONBLOCK = getattr(os, "O_NONBLOCK", 0) 

36# dirfd-relative I/O is a Unix-only optimization; Windows cannot ``os.open()`` a directory at all, and 

37# its ``os`` module skips dir_fd support entirely. When disabled, callers fall back to full-path ops. 

38_SUPPORTS_DIR_FD = sys.platform != "win32" and os.open in os.supports_dir_fd 

39 

40_all_instances: WeakValueDictionary[Path, SoftReadWriteLock] = WeakValueDictionary() 

41_all_instances_lock = threading.Lock() 

42_atexit_registered = False 

43_fork_registered = False 

44 

45 

46@dataclass(frozen=True) 

47class _Paths: 

48 state: str 

49 write: str 

50 readers: str 

51 

52 

53@dataclass 

54class _Locks: 

55 internal: threading.Lock 

56 transaction: threading.Lock 

57 state: SoftFileLock 

58 

59 

60@dataclass(frozen=True) 

61class _MarkerInfo: 

62 token: str 

63 pid: int 

64 hostname: str 

65 

66 

67@dataclass 

68class _Hold: 

69 """Everything that exists only while a lock is held; ``None`` when the instance has no lock.""" 

70 

71 level: int 

72 mode: _Mode 

73 write_thread_id: int | None 

74 marker_name: str 

75 is_reader: bool 

76 token: str 

77 heartbeat_thread: _HeartbeatThread 

78 heartbeat_stop: threading.Event 

79 

80 

81class _SoftRWMeta(type): 

82 _instances: WeakValueDictionary[Path, SoftReadWriteLock] 

83 _instances_lock: threading.Lock 

84 

85 def __call__( # noqa: PLR0913 

86 cls, 

87 lock_file: str | os.PathLike[str], 

88 timeout: float = -1, 

89 *, 

90 blocking: bool = True, 

91 is_singleton: bool = True, 

92 heartbeat_interval: float = 30.0, 

93 stale_threshold: float | None = None, 

94 poll_interval: float = 0.25, 

95 ) -> SoftReadWriteLock: 

96 if not is_singleton: 

97 return super().__call__( 

98 lock_file, 

99 timeout, 

100 blocking=blocking, 

101 is_singleton=is_singleton, 

102 heartbeat_interval=heartbeat_interval, 

103 stale_threshold=stale_threshold, 

104 poll_interval=poll_interval, 

105 ) 

106 

107 normalized = Path(lock_file).resolve() 

108 with cls._instances_lock: 

109 instance = cls._instances.get(normalized) 

110 if instance is None: 

111 instance = super().__call__( 

112 lock_file, 

113 timeout, 

114 blocking=blocking, 

115 is_singleton=is_singleton, 

116 heartbeat_interval=heartbeat_interval, 

117 stale_threshold=stale_threshold, 

118 poll_interval=poll_interval, 

119 ) 

120 cls._instances[normalized] = instance 

121 elif instance.timeout != timeout or instance.blocking != blocking: 

122 msg = ( 

123 f"Singleton lock created with timeout={instance.timeout}, blocking={instance.blocking}," 

124 f" cannot be changed to timeout={timeout}, blocking={blocking}" 

125 ) 

126 raise ValueError(msg) 

127 return instance 

128 

129 

class SoftReadWriteLock(metaclass=_SoftRWMeta):
    """
    Cross-process and cross-host reader/writer lock built on :class:`SoftFileLock` primitives.

    Use this class instead of :class:`~filelock.ReadWriteLock` when the lock file lives on a network
    filesystem (NFS, Lustre with ``-o flock``, HPC cluster shared storage). ``ReadWriteLock`` is backed
    by SQLite and cannot run on NFS because SQLite's ``fcntl`` locking is unreliable there.

    Layout on disk for a lock at ``foo.lock``:

    - ``foo.lock.state`` — a :class:`SoftFileLock` taken only during state transitions (microseconds).
    - ``foo.lock.write`` — writer marker; its presence means a writer is claiming or holding the lock.
    - ``foo.lock.readers/<uuid>.<pid>`` — one file per reader.

    Each marker stores a random token (``secrets.token_hex(16)``), the holder's pid, and the holder's
    hostname. A daemon heartbeat thread refreshes ``mtime`` on every held marker. A marker whose mtime
    has not advanced in ``stale_threshold`` seconds may be evicted by any process on any host, giving
    correct behavior when a compute node crashes with a lock held.

    Writer acquire is two-phase and writer-preferring: phase 1 claims ``.write`` (blocking any new
    reader), phase 2 waits for existing readers to drain. Writer starvation is impossible.

    Reentrancy, upgrade/downgrade rules, thread pinning, and singleton caching by resolved path match
    :class:`~filelock.ReadWriteLock`.

    Forking while holding a lock invalidates the inherited instance in the child so the child cannot
    double-own the lock with its parent; ``release()`` on a fork-invalidated instance is a no-op, and
    the child must re-acquire if it needs a lock.

    Trust boundary: protects against same-UID non-cooperating processes (one host or cross-host) and
    same-host different-UID users via ``0o600`` / ``0o700`` permissions. Does not protect against root
    compromise, NTP tampering on same-UID cross-host nodes, or multi-tenant mounts where hostile
    co-tenants share the UID.

    :param lock_file: path to the lock file; sidecar state/write/readers live next to it
    :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
    :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately on contention
    :param is_singleton: if ``True``, reuse existing instances for the same resolved path
    :param heartbeat_interval: seconds between heartbeat refreshes; default 30 s
    :param stale_threshold: seconds of ``mtime`` inactivity before a marker is stale; defaults to
        ``3 * heartbeat_interval``, matching etcd's ``LeaseKeepAlive`` convention
    :param poll_interval: seconds between acquire retries under contention; default 0.25 s

    .. versionadded:: 3.27.0

    """

    _instances: WeakValueDictionary[Path, SoftReadWriteLock] = WeakValueDictionary()
    _instances_lock = threading.Lock()

    def __init__(  # noqa: PLR0913
        self,
        lock_file: str | os.PathLike[str],
        timeout: float = -1,
        *,
        blocking: bool = True,
        is_singleton: bool = True,  # noqa: ARG002
        heartbeat_interval: float = 30.0,
        stale_threshold: float | None = None,
        poll_interval: float = 0.25,
    ) -> None:
        if heartbeat_interval <= 0:
            msg = f"heartbeat_interval must be positive, got {heartbeat_interval}"
            raise ValueError(msg)
        if stale_threshold is None:
            stale_threshold = heartbeat_interval * 3
        if stale_threshold <= heartbeat_interval:
            msg = f"stale_threshold must exceed heartbeat_interval ({stale_threshold} <= {heartbeat_interval})"
            raise ValueError(msg)
        if poll_interval <= 0:
            msg = f"poll_interval must be positive, got {poll_interval}"
            raise ValueError(msg)

        self.lock_file: str = os.fspath(lock_file)
        self.timeout: float = timeout
        self.blocking: bool = blocking
        self.heartbeat_interval: float = heartbeat_interval
        self.stale_threshold: float = stale_threshold
        self.poll_interval: float = poll_interval

        self._paths = _Paths(
            state=f"{self.lock_file}.state",
            write=f"{self.lock_file}.write",
            readers=f"{self.lock_file}.readers",
        )
        ensure_directory_exists(self.lock_file)
        self._locks = _Locks(
            internal=threading.Lock(),
            transaction=threading.Lock(),
            state=SoftFileLock(self._paths.state, timeout=-1),
        )
        self._readers_dir_fd: int | None = None
        self._hold: _Hold | None = None
        self._fork_invalidated: bool = False
        self._closed: bool = False

        # NOTE(review): the registry is keyed by resolved path, so a second non-singleton instance for the
        # same path replaces the first one there and the first is then no longer visited by the fork and
        # atexit hooks — confirm whether that is intended.
        with _all_instances_lock:
            _all_instances[Path(self.lock_file).resolve()] = self
            _register_hooks()

    @contextmanager
    def read_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> Generator[None]:
        """
        Context manager that acquires and releases a shared read lock.

        Falls back to instance defaults for *timeout* and *blocking* when ``None``.

        :param timeout: maximum wait time in seconds, or ``None`` to use the instance default
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default

        :raises RuntimeError: if a write lock is already held on this instance
        :raises Timeout: if the lock cannot be acquired within *timeout* seconds

        """
        self.acquire_read(timeout, blocking=blocking)
        try:
            yield
        finally:
            self.release()

    @contextmanager
    def write_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> Generator[None]:
        """
        Context manager that acquires and releases an exclusive write lock.

        Falls back to instance defaults for *timeout* and *blocking* when ``None``.

        :param timeout: maximum wait time in seconds, or ``None`` to use the instance default
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default

        :raises RuntimeError: if a read lock is already held, or a write lock is held by a different thread
        :raises Timeout: if the lock cannot be acquired within *timeout* seconds

        """
        self.acquire_write(timeout, blocking=blocking)
        try:
            yield
        finally:
            self.release()

    def acquire_read(self, timeout: float | None = None, *, blocking: bool | None = None) -> AcquireReturnProxy:
        """
        Acquire a shared read lock.

        If this instance already holds a read lock, the lock level is incremented (reentrant). Attempting to acquire a
        read lock while holding a write lock raises :class:`RuntimeError` (downgrade not allowed). On the 0→1
        transition a daemon heartbeat thread is started that refreshes the reader marker's ``mtime`` every
        ``heartbeat_interval`` seconds so peers on other hosts do not evict the marker as stale.

        :param timeout: maximum wait time in seconds, or ``None`` to use the instance default; ``-1`` means block
            indefinitely
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable;
            ``None`` uses the instance default

        :returns: a proxy that can be used as a context manager to release the lock

        :raises RuntimeError: if a write lock is already held on this instance, if this instance was invalidated by
            :func:`os.fork`, or if :meth:`close` was called
        :raises Timeout: if the lock cannot be acquired within *timeout* seconds

        """
        return self._acquire("read", timeout, blocking=blocking)

    def acquire_write(self, timeout: float | None = None, *, blocking: bool | None = None) -> AcquireReturnProxy:
        """
        Acquire an exclusive write lock.

        If this instance already holds a write lock from the same thread, the lock level is incremented (reentrant).
        Attempting to acquire a write lock while holding a read lock raises :class:`RuntimeError` (upgrade not
        allowed). Write locks are pinned to the acquiring thread: a different thread trying to re-enter also raises
        :class:`RuntimeError`.

        Writer acquisition runs in two phases. Phase 1 atomically claims ``<path>.write`` via ``O_CREAT | O_EXCL``,
        which immediately blocks any new reader on any host. Phase 2 waits for existing readers to drain. Writer
        starvation is impossible: new readers see ``<path>.write`` during phase 2 and wait behind the pending writer.

        :param timeout: maximum wait time in seconds, or ``None`` to use the instance default; ``-1`` means block
            indefinitely
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable;
            ``None`` uses the instance default

        :returns: a proxy that can be used as a context manager to release the lock

        :raises RuntimeError: if a read lock is already held, if a write lock is held by a different thread, if this
            instance was invalidated by :func:`os.fork`, or if :meth:`close` was called
        :raises Timeout: if the lock cannot be acquired within *timeout* seconds

        """
        return self._acquire("write", timeout, blocking=blocking)

    def close(self) -> None:
        """
        Release any held lock and release internal filesystem resources.

        Idempotent. After calling this method the instance can no longer acquire locks — subsequent acquires raise
        :class:`RuntimeError`. A fork-invalidated instance is closed without raising.
        """
        self.release(force=True)
        with self._locks.internal:
            if self._closed:
                return
            self._closed = True
            if self._readers_dir_fd is not None:
                with suppress(OSError):
                    os.close(self._readers_dir_fd)
                self._readers_dir_fd = None

    def release(self, *, force: bool = False) -> None:
        """
        Release one level of the current lock.

        When the lock level reaches zero the heartbeat thread is stopped and the held marker file is unlinked. On a
        fork-invalidated instance (that is, the child of a :func:`os.fork` call made while the parent held a lock)
        this method is a no-op so inherited ``with`` blocks can unwind cleanly in the child.

        A writer marker that now carries a different token is left in place: it belongs to a peer that evicted
        ours as stale and claimed the lock for itself.

        :param force: if ``True``, release the lock completely regardless of the current lock level

        :raises RuntimeError: if no lock is currently held and *force* is ``False``

        """
        with self._locks.internal:
            if self._fork_invalidated:
                # Inherited state from the parent is meaningless in the child; clear any counters and return.
                self._hold = None
                return
            hold = self._hold
            if hold is None:
                if force:
                    return
                msg = f"Cannot release a lock on {self.lock_file} (lock id: {id(self)}) that is not held"
                raise RuntimeError(msg)
            if force:
                hold.level = 0
            else:
                hold.level -= 1
            if hold.level > 0:
                return
            self._hold = None

        # Order matters: signal → join → unlink. A late tick on a deleted marker is harmless, and the
        # token check in the heartbeat callback would catch any re-acquisition race, but joining first
        # removes even that theoretical race. The join happens outside the internal lock because the
        # heartbeat callback takes that lock itself.
        hold.heartbeat_stop.set()
        hold.heartbeat_thread.join(timeout=self.heartbeat_interval + 1.0)
        if hold.is_reader:
            # Reader marker names are unique per acquisition, so nobody else can own this name.
            _unlink(hold.marker_name, dir_fd=self._readers_dir_fd)
        elif not self._writer_marker_is_foreign(hold.marker_name, hold.token):
            # The writer marker lives at a fixed path; only remove it while it is still (or may still be) ours.
            _unlink(hold.marker_name)

    @classmethod
    def get_lock(
        cls,
        lock_file: str | os.PathLike[str],
        timeout: float = -1,
        *,
        blocking: bool = True,
    ) -> SoftReadWriteLock:
        """
        Return the singleton :class:`SoftReadWriteLock` for *lock_file*.

        :param lock_file: path to the lock file; sidecar state/write/readers live next to it
        :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable

        :returns: the singleton lock instance

        :raises ValueError: if an instance already exists for this path with different *timeout* or *blocking* values

        """
        return cls(lock_file, timeout, blocking=blocking)

    @staticmethod
    def _writer_marker_is_foreign(marker_name: str, token: str) -> bool:
        """
        Tell whether the writer marker currently on disk was written by somebody else.

        Returns ``True`` only when the marker can be read and parsed and carries a different token. A missing,
        unreadable or malformed marker yields ``False`` so the caller keeps its unconditional cleanup.
        """
        read_result = _read_marker(marker_name)
        if read_result is None:
            return False
        info, _mtime = read_result
        return info is not None and not hmac.compare_digest(info.token, token)

    def _acquire(
        self,
        mode: _Mode,
        timeout: float | None,
        *,
        blocking: bool | None,
    ) -> AcquireReturnProxy:
        """Shared entry point of :meth:`acquire_read` and :meth:`acquire_write`; resolves defaults and serializes."""
        timeout = self.timeout if timeout is None else timeout
        blocking = self.blocking if blocking is None else blocking

        with self._locks.internal:
            if self._fork_invalidated:
                msg = f"SoftReadWriteLock on {self.lock_file} was invalidated by fork(); construct a new instance"
                raise RuntimeError(msg)
            if self._closed:
                msg = f"SoftReadWriteLock on {self.lock_file} has been closed"
                raise RuntimeError(msg)
            if self._hold is not None:
                return self._validate_reentrant(mode)

        # The clock starts before waiting on the transaction lock so that wait counts against the timeout.
        start = time.perf_counter()
        if not blocking:
            acquired = self._locks.transaction.acquire(blocking=False)
        elif timeout == -1:
            acquired = self._locks.transaction.acquire(blocking=True)
        else:
            acquired = self._locks.transaction.acquire(blocking=True, timeout=timeout)
        if not acquired:
            raise Timeout(self.lock_file) from None
        try:
            return self._do_acquire_inner(mode, timeout, start, blocking=blocking)
        finally:
            self._locks.transaction.release()

    def _do_acquire_inner(
        self,
        mode: _Mode,
        effective_timeout: float,
        start: float,
        *,
        blocking: bool,
    ) -> AcquireReturnProxy:
        """Claim a marker on disk, start its heartbeat and publish the hold; runs under the transaction lock."""
        with self._locks.internal:
            # Another thread may have completed its acquire while we waited for the transaction lock.
            if self._hold is not None:
                return self._validate_reentrant(mode)
        deadline = None if effective_timeout == -1 else start + effective_timeout
        token = secrets.token_hex(16)
        if mode == "write":
            marker_name, is_reader = self._acquire_writer_slot(token, deadline=deadline, blocking=blocking)
        else:
            marker_name, is_reader = self._acquire_reader_slot(token, deadline=deadline, blocking=blocking)
        stop_event = threading.Event()
        heartbeat = _HeartbeatThread(
            refresh=self._refresh_marker,
            interval=self.heartbeat_interval,
            stop_event=stop_event,
            name=f"filelock-heartbeat-{id(self):x}",
        )
        try:
            with self._locks.internal:
                # Start the thread before publishing the hold, and do both under the internal lock, so that
                # release() can never observe a hold whose heartbeat thread was not started (joining an
                # unstarted thread raises). The first tick only fires after one interval.
                heartbeat.start()
                self._hold = _Hold(
                    level=1,
                    mode=mode,
                    write_thread_id=threading.get_ident() if mode == "write" else None,
                    marker_name=marker_name,
                    is_reader=is_reader,
                    token=token,
                    heartbeat_thread=heartbeat,
                    heartbeat_stop=stop_event,
                )
        except BaseException:
            # Could not start the heartbeat (for example the OS refused a new thread): give the marker back
            # instead of leaving it on disk until it goes stale.
            stop_event.set()
            _unlink(marker_name, dir_fd=self._readers_dir_fd if is_reader else None)
            raise
        return AcquireReturnProxy(lock=self)

    def _validate_reentrant(self, mode: _Mode) -> AcquireReturnProxy:
        """Bump the hold level for a nested acquire, rejecting mode changes and cross-thread write re-entry."""
        hold = self._hold
        assert hold is not None  # noqa: S101
        if hold.mode != mode:
            opposite = "write" if mode == "read" else "read"
            direction = "downgrade" if mode == "read" else "upgrade"
            msg = (
                f"Cannot acquire {mode} lock on {self.lock_file} (lock id: {id(self)}): "
                f"already holding a {opposite} lock ({direction} not allowed)"
            )
            raise RuntimeError(msg)
        if mode == "write" and (cur := threading.get_ident()) != hold.write_thread_id:
            msg = (
                f"Cannot acquire write lock on {self.lock_file} (lock id: {id(self)}) "
                f"from thread {cur} while it is held by thread {hold.write_thread_id}"
            )
            raise RuntimeError(msg)
        hold.level += 1
        return AcquireReturnProxy(lock=self)

    def _acquire_writer_slot(
        self,
        token: str,
        *,
        deadline: float | None,
        blocking: bool,
    ) -> tuple[str, bool]:
        """
        Claim the writer marker, then wait until no reader marker remains.

        :returns: ``(marker_name, is_reader)`` with the writer marker path and ``False``

        :raises Timeout: if either phase does not complete in time; the claim is withdrawn first
        """
        # Phase 2 scans readers/ via dirfd (where supported), so we need it open even though writers never
        # create files inside.
        self._open_readers_dir()

        def try_claim_writer() -> bool:
            with self._locks.state:
                _break_stale_marker(self._paths.write, stale_threshold=self.stale_threshold, now=time.time())
                if _file_exists(self._paths.write):
                    return False
                try:
                    _atomic_create_marker(self._paths.write, token)
                except FileExistsError:
                    return False
                return True

        def readers_drained_touching() -> bool:
            with self._locks.state:
                # Refresh our writer marker on every scan iteration. Otherwise phase 2 can exceed
                # ``stale_threshold`` under contention and a peer would treat us as stale and evict us.
                with suppress(OSError):
                    _touch(self._paths.write)
                self._break_stale_readers(time.time())
                return not self._any_readers()

        self._wait_for(try_claim_writer, deadline=deadline, blocking=blocking)
        try:
            self._wait_for(readers_drained_touching, deadline=deadline, blocking=blocking)
        except BaseException:
            # Give up our writer claim so readers can make progress again. This covers Timeout as well as
            # any other failure (an OSError from the reader scan, KeyboardInterrupt while sleeping): no
            # heartbeat exists yet, so a leftover claim would block every peer until it goes stale.
            _unlink(self._paths.write)
            raise
        return self._paths.write, False

    def _acquire_reader_slot(
        self,
        token: str,
        *,
        deadline: float | None,
        blocking: bool,
    ) -> tuple[str, bool]:
        """
        Create a reader marker once no writer marker is present.

        :returns: ``(marker_name, is_reader)``; the name is relative to the readers dirfd when one is in use,
            otherwise a full path, and ``is_reader`` is ``True``

        :raises Timeout: if a writer marker stays in place past the deadline
        """
        self._open_readers_dir()
        reader_name = f"{uuid.uuid4().hex}.{os.getpid()}"
        dir_fd = self._readers_dir_fd
        full_reader_path = str(Path(self._paths.readers) / reader_name)

        def try_claim_reader() -> bool:
            with self._locks.state:
                _break_stale_marker(self._paths.write, stale_threshold=self.stale_threshold, now=time.time())
                if _file_exists(self._paths.write):
                    return False
                if dir_fd is not None:
                    _atomic_create_marker(reader_name, token, dir_fd=dir_fd)
                else:  # pragma: win32 cover
                    _atomic_create_marker(full_reader_path, token)
                return True

        self._wait_for(try_claim_reader, deadline=deadline, blocking=blocking)
        return (reader_name if dir_fd is not None else full_reader_path), True

    def _wait_for(
        self,
        predicate: Callable[[], bool],
        *,
        deadline: float | None,
        blocking: bool,
    ) -> None:
        """
        Poll *predicate* until it returns ``True``.

        :param predicate: callable evaluated at least once
        :param deadline: ``time.perf_counter()`` value after which to give up, or ``None`` for no limit
        :param blocking: if ``False``, give up after the first failed attempt

        :raises Timeout: if *predicate* is still false when giving up
        """
        while True:
            if predicate():
                return
            now = time.perf_counter()
            if not blocking:
                raise Timeout(self.lock_file)
            if deadline is not None and now >= deadline:
                raise Timeout(self.lock_file)
            sleep_for = self.poll_interval
            if deadline is not None:
                # Never sleep past the deadline.
                sleep_for = min(sleep_for, max(deadline - now, 0.0))
            time.sleep(sleep_for)

    def _open_readers_dir(self) -> None:
        """
        Make sure the readers directory exists and, where supported, keep a directory fd open on it.

        :raises RuntimeError: if the readers path exists but is a symlink or not a directory
        """
        readers_path = Path(self._paths.readers)
        with suppress(FileExistsError):
            readers_path.mkdir(mode=0o700)
        # mkdir has no O_NOFOLLOW, so verify via lstat that we did not land on an attacker-placed symlink
        # or a regular file before we open or scan inside.
        st = os.lstat(self._paths.readers)
        if stat.S_ISLNK(st.st_mode) or not stat.S_ISDIR(st.st_mode):
            msg = f"{self._paths.readers} exists but is not a directory or is a symlink; refusing to use it"
            raise RuntimeError(msg)
        if self._readers_dir_fd is None and _SUPPORTS_DIR_FD:
            flags = os.O_RDONLY | getattr(os, "O_DIRECTORY", 0) | _O_NOFOLLOW
            self._readers_dir_fd = os.open(self._paths.readers, flags)

    def _any_readers(self) -> bool:
        """Return ``True`` if at least one reader marker is present."""
        for _ in self._iter_reader_entries():
            return True
        return False

    def _iter_reader_entries(self) -> Generator[tuple[str, bool]]:
        """
        Yield ``(name, dirfd_relative)`` pairs for every live reader marker.

        ``dirfd_relative`` is ``True`` when *name* should be passed to ``dir_fd=``-aware syscalls; ``False``
        when *name* is a full path because dirfd-relative I/O is unavailable on this platform.
        """
        if self._readers_dir_fd is not None:
            with os.scandir(self._readers_dir_fd) as it:
                for entry in it:
                    if not _is_housekeeping_name(entry.name):
                        yield entry.name, True
            return
        readers_path = Path(self._paths.readers)  # pragma: win32 cover
        with os.scandir(readers_path) as it:  # pragma: win32 cover
            for entry in it:  # pragma: win32 cover
                if not _is_housekeeping_name(entry.name):  # pragma: win32 cover
                    yield str(readers_path / entry.name), False  # pragma: win32 cover

    def _break_stale_readers(self, now: float) -> None:
        """Evict every reader marker whose ``mtime`` is older than ``stale_threshold`` relative to *now*."""
        # Snapshot the names first so markers are not renamed or removed while the directory is being scanned.
        try:
            names: list[tuple[str, int | None]] = [
                (name, self._readers_dir_fd if dirfd_relative else None)
                for name, dirfd_relative in self._iter_reader_entries()
            ]
        except OSError:  # pragma: no cover - transient NFS scandir hiccup
            return
        for name, fd in names:
            _break_stale_marker(name, stale_threshold=self.stale_threshold, now=now, dir_fd=fd)

    def _refresh_marker(self) -> bool:
        """
        Heartbeat callback: bump the ``mtime`` of the held marker.

        :returns: ``True`` to keep the heartbeat running, ``False`` once the marker is gone or no longer ours
        """
        with self._locks.internal:
            hold = self._hold
            if hold is None:  # pragma: no cover - race between stop_event.set and join
                return False
            marker_name = hold.marker_name
            token = hold.token
            dir_fd = self._readers_dir_fd if hold.is_reader else None

        read_result = _read_marker(marker_name, dir_fd=dir_fd)
        if read_result is None:
            return False
        info, _mtime = read_result
        # Token mismatch means another process already evicted our marker and created its own; stop the
        # thread so it does not touch a stranger's file.
        if info is None or not hmac.compare_digest(info.token, token):
            return False
        # A transient touch failure (ESTALE / EIO on the NFS-style filesystems this lock targets) must not
        # kill the heartbeat thread: the read above just confirmed the marker is still ours, so swallow the
        # error and retry on the next tick rather than letting the lease lapse while we still believe we
        # hold the lock. FileNotFoundError is different in kind -- the marker we just read has since been
        # unlinked, i.e. a peer evicted us -- so stop the heartbeat at once instead of waiting a tick.
        try:
            _touch(marker_name, dir_fd=dir_fd)
        except FileNotFoundError:
            return False
        except OSError:
            pass
        return True

    def _reset_after_fork_in_child(self) -> None:  # pragma: no cover - fork child not tracked
        """Invalidate this instance in a forked child and replace every lock it owns."""
        # Replace every lock this instance owns with a fresh one; the inherited locks may still be held
        # by threads that no longer exist in the child. The readers dirfd and the SoftFileLock state
        # mutex both get dropped for the same reason — the child re-creates them on its next acquire.
        self._locks = _Locks(
            internal=threading.Lock(),
            transaction=threading.Lock(),
            state=SoftFileLock(self._paths.state, timeout=-1),
        )
        self._hold = None
        if self._readers_dir_fd is not None:
            # The child owns its own copy of the descriptor; close it instead of leaking it. The parent's
            # descriptor is unaffected.
            with suppress(OSError):
                os.close(self._readers_dir_fd)
        self._readers_dir_fd = None
        self._fork_invalidated = True

669 

670 

671class _HeartbeatThread(threading.Thread): 

672 def __init__( 

673 self, 

674 refresh: Callable[[], bool], 

675 interval: float, 

676 stop_event: threading.Event, 

677 name: str, 

678 ) -> None: 

679 super().__init__(name=name, daemon=True) 

680 self._refresh = refresh 

681 self._interval = interval 

682 self._stop_event = stop_event 

683 

684 def run(self) -> None: 

685 while not self._stop_event.wait(self._interval): 

686 if not self._refresh(): 

687 self._stop_event.set() 

688 return 

689 

690 

def _atomic_create_marker(name: str, token: str, *, dir_fd: int | None = None) -> None:
    """
    Create a marker file exclusively and write the holder's token, pid and hostname into it.

    :param name: marker path, or a name relative to *dir_fd* when that is given
    :param token: token identifying the holder
    :param dir_fd: directory file descriptor to resolve *name* against, where supported

    :raises FileExistsError: if the marker already exists
    :raises OSError: if the marker cannot be created or written
    :raises UnicodeEncodeError: if the hostname is not pure ASCII; nothing is created in that case
    """
    # Build the payload before touching the filesystem: an encoding failure must not leave an empty marker
    # behind, because peers would treat it as a held lock until it goes stale.
    content = f"{token}\n{os.getpid()}\n{socket.gethostname()}\n".encode("ascii")
    # O_NOFOLLOW blocks the symlink-overwrite attack where an attacker pre-creates the marker path as a
    # symlink pointing at a victim file. Mode 0o600 keeps the token unreadable to other users.
    flags = os.O_CREAT | os.O_EXCL | os.O_WRONLY | _O_NOFOLLOW
    use_dir_fd = _SUPPORTS_DIR_FD and dir_fd is not None
    if use_dir_fd:
        fd = os.open(name, flags, 0o600, dir_fd=dir_fd)
    else:
        fd = os.open(name, flags, 0o600)
    written = False
    try:
        os.write(fd, content)
        written = True
    finally:
        os.close(fd)
        if not written:
            # O_EXCL guarantees the file is the one we just created, so removing it is safe. Done after
            # close because an open file cannot be unlinked on every platform.
            with suppress(OSError):
                if use_dir_fd:
                    os.unlink(name, dir_fd=dir_fd)
                else:
                    os.unlink(name)

704 

705 

def _read_marker(name: str, *, dir_fd: int | None = None) -> tuple[_MarkerInfo | None, float] | None:
    """
    Read a marker file and its modification time.

    :param name: marker path, or a name relative to *dir_fd* when that is given
    :param dir_fd: directory file descriptor to resolve *name* against, where supported

    :returns: ``None`` if the file cannot be opened or read; otherwise ``(info, mtime)`` where ``info`` is
        ``None`` when the content is not a well-formed marker
    """
    # The file is ours; these guard a hostile mid-flight swap. O_NOFOLLOW rejects a symlink; O_NONBLOCK keeps
    # a real FIFO from blocking the open forever, so it reads as a malformed marker instead of wedging a peer
    # that holds the state lock.
    flags = os.O_RDONLY | _O_NOFOLLOW | _O_NONBLOCK
    try:
        if _SUPPORTS_DIR_FD and dir_fd is not None:
            fd = os.open(name, flags, dir_fd=dir_fd)
        else:
            fd = os.open(name, flags)
    except OSError:
        return None
    try:
        st = os.fstat(fd)
        # One byte more than the limit so oversized content is detectable by the parser.
        data = os.read(fd, _MAX_MARKER_SIZE + 1)
    except OSError:  # pragma: no cover - e.g. EAGAIN from a hostile FIFO that has a writer attached
        return None
    finally:
        os.close(fd)
    return _parse_marker_bytes(data), st.st_mtime

724 

725 

def _parse_marker_bytes(data: bytes) -> _MarkerInfo | None:
    """
    Parse raw marker content of the form ``<token>\\n<pid>\\n<hostname>\\n``.

    :param data: bytes read from a marker file

    :returns: the parsed marker, or ``None`` if *data* is empty, too large, not ASCII, or not in that form
    """
    # Trust nothing about attacker-controlled markers; any deviation returns None so callers fall through
    # to stale cleanup. ``re.match`` caches compiled patterns internally, so the regex is built only once
    # despite being defined inline.
    if not data or len(data) > _MAX_MARKER_SIZE:
        return None
    try:
        text = data.decode("ascii")
    except UnicodeDecodeError:
        return None
    match = re.match(
        r"""
        \A                                  # start of string
        (?P<token>    [0-9a-f]{32}     ) \n # 128-bit hex token
        (?P<pid>      [1-9][0-9]{0,9}  ) \n # decimal pid: no leading zero, ≤ 10 digits
        (?P<hostname> [\x21-\x7e]{1,253})   # printable non-whitespace ASCII (RFC 1123 hostname limit)
        \n*                                 # tolerate sloppy writers that append extra newlines
        \Z                                  # end of string
        """,
        text,
        re.VERBOSE,
    )
    if match is None:
        return None
    pid = int(match["pid"], 10)
    # Ten digits can still exceed a signed 32-bit pid; reject those.
    if pid > 2**31 - 1:
        return None
    return _MarkerInfo(token=match["token"], pid=pid, hostname=match["hostname"])

754 

755 

def _break_stale_marker(  # noqa: PLR0911
    name: str,
    *,
    stale_threshold: float,
    now: float,
    dir_fd: int | None = None,
) -> bool:
    """
    Remove a marker whose ``mtime`` is older than *stale_threshold* seconds relative to *now*.

    :param name: marker path, or a name relative to *dir_fd* when that is given
    :param stale_threshold: age in seconds beyond which the marker counts as stale
    :param now: current wall-clock time, compared against the marker's ``mtime``
    :param dir_fd: directory file descriptor to resolve *name* against, where supported

    :returns: ``True`` if a marker was removed, ``False`` if it was missing, fresh, or the break was aborted
    """
    # Atomic break pattern: read → rename to unique break-name → re-verify → unlink. The rename gives us a
    # private name nobody else can touch; if the re-verify sees a newer mtime or a different token, the
    # legitimate holder's heartbeat fired between read and rename and we must abort (leaving the .break.*
    # file behind rather than rollback-renaming, because rollback is itself racy).
    read_result = _read_marker(name, dir_fd=dir_fd)
    if read_result is None:
        return False
    info_before, mtime_before = read_result
    if now - mtime_before <= stale_threshold:
        return False
    if info_before is None:
        # Stale and malformed: there is no token to re-verify, so remove it directly.
        _unlink(name, dir_fd=dir_fd)
        return True

    break_name = f"{name}{_BREAK_SUFFIX}.{os.getpid()}.{secrets.token_hex(16)}"
    try:
        if _SUPPORTS_DIR_FD and dir_fd is not None:
            os.rename(name, break_name, src_dir_fd=dir_fd, dst_dir_fd=dir_fd)
        else:
            Path(name).rename(break_name)
    except OSError:  # pragma: no cover - race where the marker vanishes between read and rename
        return False

    read_after = _read_marker(break_name, dir_fd=dir_fd)
    if read_after is None:  # pragma: no cover - race where a peer unlinks the break-name file
        return False
    info_after, mtime_after = read_after
    if info_after is None:  # pragma: no cover - content replaced post-rename by a racing peer
        _unlink(break_name, dir_fd=dir_fd)
        return True
    if not hmac.compare_digest(info_before.token, info_after.token):  # pragma: no cover - race only
        return False
    if mtime_after > mtime_before:  # pragma: no cover - heartbeat raced our rename
        return False
    _unlink(break_name, dir_fd=dir_fd)
    return True

799 

800 

def _unlink(name: str, *, dir_fd: int | None = None) -> None:
    """
    Remove a file, treating an already-missing file as success.

    :param name: path, or a name relative to *dir_fd* when that is given
    :param dir_fd: directory file descriptor to resolve *name* against, where supported
    """
    with suppress(FileNotFoundError):
        if _SUPPORTS_DIR_FD and dir_fd is not None:
            # Path.unlink has no dir_fd support, so we stay on os.unlink for the dirfd path.
            os.unlink(name, dir_fd=dir_fd)
        else:
            Path(name).unlink()

808 

809 

def _touch(name: str, *, dir_fd: int | None = None) -> None:
    """
    Set the access and modification times of an existing file to the current time.

    :param name: path, or a name relative to *dir_fd* when that is given
    :param dir_fd: directory file descriptor to resolve *name* against, where supported

    :raises OSError: if the times cannot be updated, e.g. because the file does not exist
    """
    if _SUPPORTS_DIR_FD and dir_fd is not None:
        os.utime(name, None, dir_fd=dir_fd)
    else:
        os.utime(name, None)

815 

816 

817def _file_exists(path: str) -> bool: 

818 try: 

819 st = os.lstat(path) 

820 except FileNotFoundError: 

821 return False 

822 return stat.S_ISREG(st.st_mode) 

823 

824 

def _is_housekeeping_name(name: str) -> bool:
    """Return ``True`` for directory entries that are not reader markers: dot-files and ``.break`` leftovers."""
    return name.startswith(".") or _BREAK_SUFFIX in name

827 

828 

def _reset_all_after_fork() -> None:  # pragma: no cover - fork child, not tracked by coverage
    """Fork hook run in the child: replace the registry lock and reset every registered instance."""
    global _all_instances_lock  # noqa: PLW0603
    # User-created threading locks do not auto-reset across fork: any lock held by a parent thread stays
    # locked in the child with no owner to release it. Replace the module-level lock and every instance's
    # locks with fresh ones; the child is single-threaded at this point so no synchronization is needed.
    _all_instances_lock = threading.Lock()
    # Iterate over a snapshot: entries of the weak-value registry can disappear at any time.
    for instance in list(_all_instances.values()):
        instance._reset_after_fork_in_child()  # noqa: SLF001

837 

838 

def _cleanup_all_instances() -> None:  # pragma: no cover - runs from atexit at interpreter shutdown
    """Exit hook: force-release every registered instance so no marker outlives the process."""
    # Iterate over a snapshot: entries of the weak-value registry can disappear at any time.
    for instance in list(_all_instances.values()):
        # Best effort at shutdown: one failing instance must not prevent the others from being released.
        with suppress(Exception):
            instance.release(force=True)

843 

844 

def _register_hooks() -> None:
    """Register the exit hook and, where the platform offers it, the fork hook; each at most once."""
    global _atexit_registered, _fork_registered  # noqa: PLW0603
    if not _atexit_registered:
        atexit.register(_cleanup_all_instances)
        _atexit_registered = True
    # after_in_child replaces inherited state so the child cannot double-own any lock the parent held.
    if not _fork_registered and hasattr(os, "register_at_fork"):
        os.register_at_fork(after_in_child=_reset_all_after_fork)
        _fork_registered = True

854 

855 

856__all__ = [ 

857 "SoftReadWriteLock", 

858]