Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/filelock/_soft_rw/_sync.py: 25%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

398 statements  

1"""Cross-process and cross-host reader/writer lock built on :class:`SoftFileLock` primitives.""" 

2 

3from __future__ import annotations 

4 

5import atexit 

6import hmac 

7import os 

8import re 

9import secrets 

10import socket 

11import stat 

12import sys 

13import threading 

14import time 

15import uuid 

16from contextlib import contextmanager, suppress 

17from dataclasses import dataclass 

18from pathlib import Path 

19from typing import TYPE_CHECKING, Literal 

20from weakref import WeakValueDictionary 

21 

22from filelock._api import AcquireReturnProxy 

23from filelock._error import Timeout 

24from filelock._soft import SoftFileLock 

25from filelock._util import ensure_directory_exists 

26 

27if TYPE_CHECKING: 

28 from collections.abc import Callable, Generator 

29 

30 

31_Mode = Literal["read", "write"] 

32_BREAK_SUFFIX = ".break" 

33_MAX_MARKER_SIZE = 1024 

34_O_NOFOLLOW = getattr(os, "O_NOFOLLOW", 0) 

35# dirfd-relative I/O is a Unix-only optimization; Windows cannot ``os.open()`` a directory at all, and 

36# its ``os`` module skips dir_fd support entirely. When disabled, callers fall back to full-path ops. 

37_SUPPORTS_DIR_FD = sys.platform != "win32" and os.open in os.supports_dir_fd 

38 

39_all_instances: WeakValueDictionary[Path, SoftReadWriteLock] = WeakValueDictionary() 

40_all_instances_lock = threading.Lock() 

41_atexit_registered = False 

42_fork_registered = False 

43 

44 

45@dataclass(frozen=True) 

46class _Paths: 

47 state: str 

48 write: str 

49 readers: str 

50 

51 

52@dataclass 

53class _Locks: 

54 internal: threading.Lock 

55 transaction: threading.Lock 

56 state: SoftFileLock 

57 

58 

59@dataclass(frozen=True) 

60class _MarkerInfo: 

61 token: str 

62 pid: int 

63 hostname: str 

64 

65 

66@dataclass 

67class _Hold: 

68 """Everything that exists only while a lock is held; ``None`` when the instance has no lock.""" 

69 

70 level: int 

71 mode: _Mode 

72 write_thread_id: int | None 

73 marker_name: str 

74 is_reader: bool 

75 token: str 

76 heartbeat_thread: _HeartbeatThread 

77 heartbeat_stop: threading.Event 

78 

79 

80class _SoftRWMeta(type): 

81 _instances: WeakValueDictionary[Path, SoftReadWriteLock] 

82 _instances_lock: threading.Lock 

83 

84 def __call__( # noqa: PLR0913 

85 cls, 

86 lock_file: str | os.PathLike[str], 

87 timeout: float = -1, 

88 *, 

89 blocking: bool = True, 

90 is_singleton: bool = True, 

91 heartbeat_interval: float = 30.0, 

92 stale_threshold: float | None = None, 

93 poll_interval: float = 0.25, 

94 ) -> SoftReadWriteLock: 

95 if not is_singleton: 

96 return super().__call__( 

97 lock_file, 

98 timeout, 

99 blocking=blocking, 

100 is_singleton=is_singleton, 

101 heartbeat_interval=heartbeat_interval, 

102 stale_threshold=stale_threshold, 

103 poll_interval=poll_interval, 

104 ) 

105 

106 normalized = Path(lock_file).resolve() 

107 with cls._instances_lock: 

108 instance = cls._instances.get(normalized) 

109 if instance is None: 

110 instance = super().__call__( 

111 lock_file, 

112 timeout, 

113 blocking=blocking, 

114 is_singleton=is_singleton, 

115 heartbeat_interval=heartbeat_interval, 

116 stale_threshold=stale_threshold, 

117 poll_interval=poll_interval, 

118 ) 

119 cls._instances[normalized] = instance 

120 elif instance.timeout != timeout or instance.blocking != blocking: 

121 msg = ( 

122 f"Singleton lock created with timeout={instance.timeout}, blocking={instance.blocking}," 

123 f" cannot be changed to timeout={timeout}, blocking={blocking}" 

124 ) 

125 raise ValueError(msg) 

126 return instance 

127 

128 

129class SoftReadWriteLock(metaclass=_SoftRWMeta): 

130 """ 

131 Cross-process and cross-host reader/writer lock built on :class:`SoftFileLock` primitives. 

132 

133 Use this class instead of :class:`~filelock.ReadWriteLock` when the lock file lives on a network 

134 filesystem (NFS, Lustre with ``-o flock``, HPC cluster shared storage). ``ReadWriteLock`` is backed 

135 by SQLite and cannot run on NFS because SQLite's ``fcntl`` locking is unreliable there. 

136 

137 Layout on disk for a lock at ``foo.lock``: 

138 

139 - ``foo.lock.state`` — a :class:`SoftFileLock` taken only during state transitions (microseconds). 

140 - ``foo.lock.write`` — writer marker; its presence means a writer is claiming or holding the lock. 

141 - ``foo.lock.readers/<host>.<pid>.<uuid>`` — one file per reader. 

142 

143 Each marker stores a random token (``secrets.token_hex(16)``), the holder's pid, and the holder's 

144 hostname. A daemon heartbeat thread refreshes ``mtime`` on every held marker. A marker whose mtime 

145 has not advanced in ``stale_threshold`` seconds may be evicted by any process on any host, giving 

146 correct behavior when a compute node crashes with a lock held. 

147 

148 Writer acquire is two-phase and writer-preferring: phase 1 claims ``.write`` (blocking any new 

149 reader), phase 2 waits for existing readers to drain. Writer starvation is impossible. 

150 

151 Reentrancy, upgrade/downgrade rules, thread pinning, and singleton caching by resolved path match 

152 :class:`~filelock.ReadWriteLock`. 

153 

154 Forking while holding a lock invalidates the inherited instance in the child so the child cannot 

155 double-own the lock with its parent; ``release()`` on a fork-invalidated instance is a no-op, and 

156 the child must re-acquire if it needs a lock. 

157 

158 Trust boundary: protects against same-UID non-cooperating processes (one host or cross-host) and 

159 same-host different-UID users via ``0o600`` / ``0o700`` permissions. Does not protect against root 

160 compromise, NTP tampering on same-UID cross-host nodes, or multi-tenant mounts where hostile 

161 co-tenants share the UID. 

162 

163 :param lock_file: path to the lock file; sidecar state/write/readers live next to it 

164 :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely 

165 :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately on contention 

166 :param is_singleton: if ``True``, reuse existing instances for the same resolved path 

167 :param heartbeat_interval: seconds between heartbeat refreshes; default 30 s 

168 :param stale_threshold: seconds of ``mtime`` inactivity before a marker is stale; defaults to 

169 ``3 * heartbeat_interval``, matching etcd's ``LeaseKeepAlive`` convention 

170 :param poll_interval: seconds between acquire retries under contention; default 0.25 s 

171 

172 .. versionadded:: 3.27.0 

173 

174 """ 

175 

176 _instances: WeakValueDictionary[Path, SoftReadWriteLock] = WeakValueDictionary() 

177 _instances_lock = threading.Lock() 

178 

179 def __init__( # noqa: PLR0913 

180 self, 

181 lock_file: str | os.PathLike[str], 

182 timeout: float = -1, 

183 *, 

184 blocking: bool = True, 

185 is_singleton: bool = True, # noqa: ARG002 

186 heartbeat_interval: float = 30.0, 

187 stale_threshold: float | None = None, 

188 poll_interval: float = 0.25, 

189 ) -> None: 

190 if heartbeat_interval <= 0: 

191 msg = f"heartbeat_interval must be positive, got {heartbeat_interval}" 

192 raise ValueError(msg) 

193 if stale_threshold is None: 

194 stale_threshold = heartbeat_interval * 3 

195 if stale_threshold <= heartbeat_interval: 

196 msg = f"stale_threshold must exceed heartbeat_interval ({stale_threshold} <= {heartbeat_interval})" 

197 raise ValueError(msg) 

198 if poll_interval <= 0: 

199 msg = f"poll_interval must be positive, got {poll_interval}" 

200 raise ValueError(msg) 

201 

202 self.lock_file: str = os.fspath(lock_file) 

203 self.timeout: float = timeout 

204 self.blocking: bool = blocking 

205 self.heartbeat_interval: float = heartbeat_interval 

206 self.stale_threshold: float = stale_threshold 

207 self.poll_interval: float = poll_interval 

208 

209 self._paths = _Paths( 

210 state=f"{self.lock_file}.state", 

211 write=f"{self.lock_file}.write", 

212 readers=f"{self.lock_file}.readers", 

213 ) 

214 ensure_directory_exists(self.lock_file) 

215 self._locks = _Locks( 

216 internal=threading.Lock(), 

217 transaction=threading.Lock(), 

218 state=SoftFileLock(self._paths.state, timeout=-1), 

219 ) 

220 self._readers_dir_fd: int | None = None 

221 self._hold: _Hold | None = None 

222 self._fork_invalidated: bool = False 

223 self._closed: bool = False 

224 

225 with _all_instances_lock: 

226 _all_instances[Path(self.lock_file).resolve()] = self 

227 _register_hooks() 

228 

229 @contextmanager 

230 def read_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> Generator[None]: 

231 """ 

232 Context manager that acquires and releases a shared read lock. 

233 

234 Falls back to instance defaults for *timeout* and *blocking* when ``None``. 

235 

236 :param timeout: maximum wait time in seconds, or ``None`` to use the instance default 

237 :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default 

238 

239 :raises RuntimeError: if a write lock is already held on this instance 

240 :raises Timeout: if the lock cannot be acquired within *timeout* seconds 

241 

242 """ 

243 self.acquire_read(timeout, blocking=blocking) 

244 try: 

245 yield 

246 finally: 

247 self.release() 

248 

249 @contextmanager 

250 def write_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> Generator[None]: 

251 """ 

252 Context manager that acquires and releases an exclusive write lock. 

253 

254 Falls back to instance defaults for *timeout* and *blocking* when ``None``. 

255 

256 :param timeout: maximum wait time in seconds, or ``None`` to use the instance default 

257 :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default 

258 

259 :raises RuntimeError: if a read lock is already held, or a write lock is held by a different thread 

260 :raises Timeout: if the lock cannot be acquired within *timeout* seconds 

261 

262 """ 

263 self.acquire_write(timeout, blocking=blocking) 

264 try: 

265 yield 

266 finally: 

267 self.release() 

268 

269 def acquire_read(self, timeout: float | None = None, *, blocking: bool | None = None) -> AcquireReturnProxy: 

270 """ 

271 Acquire a shared read lock. 

272 

273 If this instance already holds a read lock, the lock level is incremented (reentrant). Attempting to acquire a 

274 read lock while holding a write lock raises :class:`RuntimeError` (downgrade not allowed). On the 0→1 

275 transition a daemon heartbeat thread is started that refreshes the reader marker's ``mtime`` every 

276 ``heartbeat_interval`` seconds so peers on other hosts do not evict the marker as stale. 

277 

278 :param timeout: maximum wait time in seconds, or ``None`` to use the instance default; ``-1`` means block 

279 indefinitely 

280 :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable; 

281 ``None`` uses the instance default 

282 

283 :returns: a proxy that can be used as a context manager to release the lock 

284 

285 :raises RuntimeError: if a write lock is already held on this instance, if this instance was invalidated by 

286 :func:`os.fork`, or if :meth:`close` was called 

287 :raises Timeout: if the lock cannot be acquired within *timeout* seconds 

288 

289 """ 

290 return self._acquire("read", timeout, blocking=blocking) 

291 

292 def acquire_write(self, timeout: float | None = None, *, blocking: bool | None = None) -> AcquireReturnProxy: 

293 """ 

294 Acquire an exclusive write lock. 

295 

296 If this instance already holds a write lock from the same thread, the lock level is incremented (reentrant). 

297 Attempting to acquire a write lock while holding a read lock raises :class:`RuntimeError` (upgrade not 

298 allowed). Write locks are pinned to the acquiring thread: a different thread trying to re-enter also raises 

299 :class:`RuntimeError`. 

300 

301 Writer acquisition runs in two phases. Phase 1 atomically claims ``<path>.write`` via ``O_CREAT | O_EXCL``, 

302 which immediately blocks any new reader on any host. Phase 2 waits for existing readers to drain. Writer 

303 starvation is impossible: new readers see ``<path>.write`` during phase 2 and wait behind the pending writer. 

304 

305 :param timeout: maximum wait time in seconds, or ``None`` to use the instance default; ``-1`` means block 

306 indefinitely 

307 :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable; 

308 ``None`` uses the instance default 

309 

310 :returns: a proxy that can be used as a context manager to release the lock 

311 

312 :raises RuntimeError: if a read lock is already held, if a write lock is held by a different thread, if this 

313 instance was invalidated by :func:`os.fork`, or if :meth:`close` was called 

314 :raises Timeout: if the lock cannot be acquired within *timeout* seconds 

315 

316 """ 

317 return self._acquire("write", timeout, blocking=blocking) 

318 

319 def close(self) -> None: 

320 """ 

321 Release any held lock and release internal filesystem resources. 

322 

323 Idempotent. After calling this method the instance can no longer acquire locks — subsequent acquires raise 

324 :class:`RuntimeError`. A fork-invalidated instance is closed without raising. 

325 """ 

326 self.release(force=True) 

327 with self._locks.internal: 

328 if self._closed: 

329 return 

330 self._closed = True 

331 if self._readers_dir_fd is not None: 

332 with suppress(OSError): 

333 os.close(self._readers_dir_fd) 

334 self._readers_dir_fd = None 

335 

336 def release(self, *, force: bool = False) -> None: 

337 """ 

338 Release one level of the current lock. 

339 

340 When the lock level reaches zero the heartbeat thread is stopped and the held marker file is unlinked. On a 

341 fork-invalidated instance (that is, the child of a :func:`os.fork` call made while the parent held a lock) 

342 this method is a no-op so inherited ``with`` blocks can unwind cleanly in the child. 

343 

344 :param force: if ``True``, release the lock completely regardless of the current lock level 

345 

346 :raises RuntimeError: if no lock is currently held and *force* is ``False`` 

347 

348 """ 

349 with self._locks.internal: 

350 if self._fork_invalidated: 

351 # Inherited state from the parent is meaningless in the child; clear any counters and return. 

352 self._hold = None 

353 return 

354 hold = self._hold 

355 if hold is None: 

356 if force: 

357 return 

358 msg = f"Cannot release a lock on {self.lock_file} (lock id: {id(self)}) that is not held" 

359 raise RuntimeError(msg) 

360 if force: 

361 hold.level = 0 

362 else: 

363 hold.level -= 1 

364 if hold.level > 0: 

365 return 

366 self._hold = None 

367 

368 # Order matters: signal → join → unlink. A late tick on a deleted marker is harmless, and the 

369 # token check in the heartbeat callback would catch any re-acquisition race, but joining first 

370 # removes even that theoretical race. 

371 hold.heartbeat_stop.set() 

372 hold.heartbeat_thread.join(timeout=self.heartbeat_interval + 1.0) 

373 if hold.is_reader: 

374 _unlink(hold.marker_name, dir_fd=self._readers_dir_fd) 

375 else: 

376 _unlink(hold.marker_name) 

377 

378 @classmethod 

379 def get_lock( 

380 cls, 

381 lock_file: str | os.PathLike[str], 

382 timeout: float = -1, 

383 *, 

384 blocking: bool = True, 

385 ) -> SoftReadWriteLock: 

386 """ 

387 Return the singleton :class:`SoftReadWriteLock` for *lock_file*. 

388 

389 :param lock_file: path to the lock file; sidecar state/write/readers live next to it 

390 :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely 

391 :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable 

392 

393 :returns: the singleton lock instance 

394 

395 :raises ValueError: if an instance already exists for this path with different *timeout* or *blocking* values 

396 

397 """ 

398 return cls(lock_file, timeout, blocking=blocking) 

399 

400 def _acquire( 

401 self, 

402 mode: _Mode, 

403 timeout: float | None, 

404 *, 

405 blocking: bool | None, 

406 ) -> AcquireReturnProxy: 

407 effective_timeout = self.timeout if timeout is None else timeout 

408 effective_blocking = self.blocking if blocking is None else blocking 

409 

410 with self._locks.internal: 

411 if self._fork_invalidated: 

412 msg = f"SoftReadWriteLock on {self.lock_file} was invalidated by fork(); construct a new instance" 

413 raise RuntimeError(msg) 

414 if self._closed: 

415 msg = f"SoftReadWriteLock on {self.lock_file} has been closed" 

416 raise RuntimeError(msg) 

417 if self._hold is not None: 

418 return self._validate_reentrant(mode) 

419 

420 start = time.perf_counter() 

421 if not effective_blocking: 

422 acquired = self._locks.transaction.acquire(blocking=False) 

423 elif effective_timeout == -1: 

424 acquired = self._locks.transaction.acquire(blocking=True) 

425 else: 

426 acquired = self._locks.transaction.acquire(blocking=True, timeout=effective_timeout) 

427 if not acquired: 

428 raise Timeout(self.lock_file) from None 

429 try: 

430 with self._locks.internal: 

431 if self._hold is not None: 

432 return self._validate_reentrant(mode) 

433 

434 deadline = None if effective_timeout == -1 else start + effective_timeout 

435 token = secrets.token_hex(16) 

436 if mode == "write": 

437 marker_name, is_reader = self._acquire_writer_slot( 

438 token, deadline=deadline, blocking=effective_blocking 

439 ) 

440 else: 

441 marker_name, is_reader = self._acquire_reader_slot( 

442 token, deadline=deadline, blocking=effective_blocking 

443 ) 

444 

445 stop_event = threading.Event() 

446 heartbeat = _HeartbeatThread( 

447 refresh=self._refresh_marker, 

448 interval=self.heartbeat_interval, 

449 stop_event=stop_event, 

450 name=f"filelock-heartbeat-{id(self):x}", 

451 ) 

452 

453 with self._locks.internal: 

454 self._hold = _Hold( 

455 level=1, 

456 mode=mode, 

457 write_thread_id=threading.get_ident() if mode == "write" else None, 

458 marker_name=marker_name, 

459 is_reader=is_reader, 

460 token=token, 

461 heartbeat_thread=heartbeat, 

462 heartbeat_stop=stop_event, 

463 ) 

464 

465 heartbeat.start() 

466 return AcquireReturnProxy(lock=self) 

467 finally: 

468 self._locks.transaction.release() 

469 

470 def _validate_reentrant(self, mode: _Mode) -> AcquireReturnProxy: 

471 hold = self._hold 

472 assert hold is not None # noqa: S101 

473 if hold.mode != mode: 

474 opposite = "write" if mode == "read" else "read" 

475 direction = "downgrade" if mode == "read" else "upgrade" 

476 msg = ( 

477 f"Cannot acquire {mode} lock on {self.lock_file} (lock id: {id(self)}): " 

478 f"already holding a {opposite} lock ({direction} not allowed)" 

479 ) 

480 raise RuntimeError(msg) 

481 if mode == "write" and (cur := threading.get_ident()) != hold.write_thread_id: 

482 msg = ( 

483 f"Cannot acquire write lock on {self.lock_file} (lock id: {id(self)}) " 

484 f"from thread {cur} while it is held by thread {hold.write_thread_id}" 

485 ) 

486 raise RuntimeError(msg) 

487 hold.level += 1 

488 return AcquireReturnProxy(lock=self) 

489 

490 def _acquire_writer_slot( 

491 self, 

492 token: str, 

493 *, 

494 deadline: float | None, 

495 blocking: bool, 

496 ) -> tuple[str, bool]: 

497 # Phase 2 scans readers/ via dirfd (where supported), so we need it open even though writers never 

498 # create files inside. 

499 self._open_readers_dir() 

500 

501 def try_claim_writer() -> bool: 

502 with self._locks.state: 

503 _break_stale_marker(self._paths.write, stale_threshold=self.stale_threshold, now=time.time()) 

504 if _file_exists(self._paths.write): 

505 return False 

506 try: 

507 _atomic_create_marker(self._paths.write, token) 

508 except FileExistsError: 

509 return False 

510 return True 

511 

512 def readers_drained_touching() -> bool: 

513 with self._locks.state: 

514 # Refresh our writer marker on every scan iteration. Otherwise phase 2 can exceed 

515 # ``stale_threshold`` under contention and a peer would treat us as stale and evict us. 

516 with suppress(OSError): 

517 _touch(self._paths.write) 

518 self._break_stale_readers(time.time()) 

519 return not self._any_readers() 

520 

521 self._wait_for(try_claim_writer, deadline=deadline, blocking=blocking) 

522 try: 

523 self._wait_for(readers_drained_touching, deadline=deadline, blocking=blocking) 

524 except Timeout: 

525 # Give up our writer claim so readers can make progress again. 

526 _unlink(self._paths.write) 

527 raise 

528 return self._paths.write, False 

529 

530 def _acquire_reader_slot( 

531 self, 

532 token: str, 

533 *, 

534 deadline: float | None, 

535 blocking: bool, 

536 ) -> tuple[str, bool]: 

537 self._open_readers_dir() 

538 reader_name = f"{uuid.uuid4().hex}.{os.getpid()}" 

539 dir_fd = self._readers_dir_fd 

540 full_reader_path = str(Path(self._paths.readers) / reader_name) 

541 

542 def try_claim_reader() -> bool: 

543 with self._locks.state: 

544 _break_stale_marker(self._paths.write, stale_threshold=self.stale_threshold, now=time.time()) 

545 if _file_exists(self._paths.write): 

546 return False 

547 if dir_fd is not None: 

548 _atomic_create_marker(reader_name, token, dir_fd=dir_fd) 

549 else: # pragma: win32 cover 

550 _atomic_create_marker(full_reader_path, token) 

551 return True 

552 

553 self._wait_for(try_claim_reader, deadline=deadline, blocking=blocking) 

554 return (reader_name if dir_fd is not None else full_reader_path), True 

555 

556 def _wait_for( 

557 self, 

558 predicate: Callable[[], bool], 

559 *, 

560 deadline: float | None, 

561 blocking: bool, 

562 ) -> None: 

563 while True: 

564 if predicate(): 

565 return 

566 now = time.perf_counter() 

567 if not blocking: 

568 raise Timeout(self.lock_file) 

569 if deadline is not None and now >= deadline: 

570 raise Timeout(self.lock_file) 

571 sleep_for = self.poll_interval 

572 if deadline is not None: 

573 sleep_for = min(sleep_for, max(deadline - now, 0.0)) 

574 time.sleep(sleep_for) 

575 

576 def _open_readers_dir(self) -> None: 

577 readers_path = Path(self._paths.readers) 

578 with suppress(FileExistsError): 

579 readers_path.mkdir(mode=0o700) 

580 # mkdir has no O_NOFOLLOW, so verify via lstat that we did not land on an attacker-placed symlink 

581 # or a regular file before we open or scan inside. 

582 st = os.lstat(self._paths.readers) 

583 if stat.S_ISLNK(st.st_mode) or not stat.S_ISDIR(st.st_mode): 

584 msg = f"{self._paths.readers} exists but is not a directory or is a symlink; refusing to use it" 

585 raise RuntimeError(msg) 

586 if self._readers_dir_fd is None and _SUPPORTS_DIR_FD: 

587 flags = os.O_RDONLY | getattr(os, "O_DIRECTORY", 0) | _O_NOFOLLOW 

588 self._readers_dir_fd = os.open(self._paths.readers, flags) 

589 

590 def _any_readers(self) -> bool: 

591 for _ in self._iter_reader_entries(): 

592 return True 

593 return False 

594 

595 def _iter_reader_entries(self) -> Generator[tuple[str, bool]]: 

596 """ 

597 Yield ``(name, dirfd_relative)`` pairs for every live reader marker. 

598 

599 ``dirfd_relative`` is ``True`` when *name* should be passed to ``dir_fd=``-aware syscalls; ``False`` 

600 when *name* is a full path because dirfd-relative I/O is unavailable on this platform. 

601 """ 

602 if self._readers_dir_fd is not None: 

603 with os.scandir(self._readers_dir_fd) as it: 

604 for entry in it: 

605 if not _is_housekeeping_name(entry.name): 

606 yield entry.name, True 

607 return 

608 readers_path = Path(self._paths.readers) # pragma: win32 cover 

609 with os.scandir(readers_path) as it: # pragma: win32 cover 

610 for entry in it: # pragma: win32 cover 

611 if not _is_housekeeping_name(entry.name): # pragma: win32 cover 

612 yield str(readers_path / entry.name), False # pragma: win32 cover 

613 

614 def _break_stale_readers(self, now: float) -> None: 

615 names: list[tuple[str, int | None]] = [] 

616 try: 

617 for name, dirfd_relative in self._iter_reader_entries(): 

618 names.append((name, self._readers_dir_fd if dirfd_relative else None)) 

619 except OSError: # pragma: no cover - transient NFS scandir hiccup 

620 return 

621 for name, fd in names: 

622 _break_stale_marker(name, stale_threshold=self.stale_threshold, now=now, dir_fd=fd) 

623 

624 def _refresh_marker(self) -> bool: 

625 with self._locks.internal: 

626 hold = self._hold 

627 if hold is None: # pragma: no cover - race between stop_event.set and join 

628 return False 

629 marker_name = hold.marker_name 

630 token = hold.token 

631 dir_fd = self._readers_dir_fd if hold.is_reader else None 

632 

633 read_result = _read_marker(marker_name, dir_fd=dir_fd) 

634 if read_result is None: 

635 return False 

636 info, _mtime = read_result 

637 # Token mismatch means another process already evicted our marker and created its own; stop the 

638 # thread so it does not touch a stranger's file. 

639 if info is None or not hmac.compare_digest(info.token, token): 

640 return False 

641 try: 

642 _touch(marker_name, dir_fd=dir_fd) 

643 except FileNotFoundError: # pragma: no cover - race between successful read and touch 

644 return False 

645 return True 

646 

647 def _reset_after_fork_in_child(self) -> None: # pragma: no cover - fork child not tracked 

648 # Replace every lock this instance owns with a fresh one; the inherited locks may still be held 

649 # by threads that no longer exist in the child. The readers dirfd and the SoftFileLock state 

650 # mutex both get dropped for the same reason — the child re-creates them on its next acquire. 

651 self._locks = _Locks( 

652 internal=threading.Lock(), 

653 transaction=threading.Lock(), 

654 state=SoftFileLock(self._paths.state, timeout=-1), 

655 ) 

656 self._hold = None 

657 self._readers_dir_fd = None 

658 self._fork_invalidated = True 

659 

660 

661class _HeartbeatThread(threading.Thread): 

662 def __init__( 

663 self, 

664 refresh: Callable[[], bool], 

665 interval: float, 

666 stop_event: threading.Event, 

667 name: str, 

668 ) -> None: 

669 super().__init__(name=name, daemon=True) 

670 self._refresh = refresh 

671 self._interval = interval 

672 self._stop_event = stop_event 

673 

674 def run(self) -> None: 

675 while not self._stop_event.wait(self._interval): 

676 if not self._refresh(): 

677 self._stop_event.set() 

678 return 

679 

680 

681def _atomic_create_marker(name: str, token: str, *, dir_fd: int | None = None) -> None: 

682 # O_NOFOLLOW blocks the symlink-overwrite attack where an attacker pre-creates the marker path as a 

683 # symlink pointing at a victim file. Mode 0o600 keeps the token unreadable to other users. 

684 flags = os.O_CREAT | os.O_EXCL | os.O_WRONLY | _O_NOFOLLOW 

685 if _SUPPORTS_DIR_FD and dir_fd is not None: 

686 fd = os.open(name, flags, 0o600, dir_fd=dir_fd) 

687 else: 

688 fd = os.open(name, flags, 0o600) 

689 try: 

690 content = f"{token}\n{os.getpid()}\n{socket.gethostname()}\n".encode("ascii") 

691 os.write(fd, content) 

692 finally: 

693 os.close(fd) 

694 

695 

696def _read_marker(name: str, *, dir_fd: int | None = None) -> tuple[_MarkerInfo | None, float] | None: 

697 # O_NOFOLLOW is defense in depth: we already created this file, but a hostile replacement by symlink 

698 # between create and read would be caught here. 

699 flags = os.O_RDONLY | _O_NOFOLLOW 

700 try: 

701 fd = os.open(name, flags, dir_fd=dir_fd) if _SUPPORTS_DIR_FD and dir_fd is not None else os.open(name, flags) 

702 except OSError: 

703 return None 

704 try: 

705 try: 

706 st = os.fstat(fd) 

707 data = os.read(fd, _MAX_MARKER_SIZE + 1) 

708 except OSError: # pragma: no cover - fstat/read after a successful open is hard to provoke 

709 return None 

710 finally: 

711 os.close(fd) 

712 return _parse_marker_bytes(data), st.st_mtime 

713 

714 

715def _parse_marker_bytes(data: bytes) -> _MarkerInfo | None: 

716 # Trust nothing about attacker-controlled markers; any deviation returns None so callers fall through 

717 # to stale cleanup. ``re.match`` caches compiled patterns internally, so the regex is built only once 

718 # despite being defined inline. 

719 if not data or len(data) > _MAX_MARKER_SIZE: 

720 return None 

721 try: 

722 text = data.decode("ascii") 

723 except UnicodeDecodeError: 

724 return None 

725 match = re.match( 

726 r""" 

727 \A # start of string 

728 (?P<token> [0-9a-f]{32} ) \n # 128-bit hex token 

729 (?P<pid> [1-9][0-9]{0,9} ) \n # decimal pid: no leading zero, ≤ 10 digits 

730 (?P<hostname> [\x21-\x7e]{1,253}) # printable non-whitespace ASCII (RFC 1123 hostname limit) 

731 \n* # tolerate sloppy writers that append extra newlines 

732 \Z # end of string 

733 """, 

734 text, 

735 re.VERBOSE, 

736 ) 

737 if match is None: 

738 return None 

739 pid = int(match["pid"], 10) 

740 if pid > 2**31 - 1: 

741 return None 

742 return _MarkerInfo(token=match["token"], pid=pid, hostname=match["hostname"]) 

743 

744 

745def _break_stale_marker( # noqa: PLR0911 

746 name: str, 

747 *, 

748 stale_threshold: float, 

749 now: float, 

750 dir_fd: int | None = None, 

751) -> bool: 

752 # Atomic break pattern: read → rename to unique break-name → re-verify → unlink. The rename gives us a 

753 # private name nobody else can touch; if the re-verify sees a newer mtime or a different token, the 

754 # legitimate holder's heartbeat fired between read and rename and we must abort (leaving the .break.* 

755 # file behind rather than rollback-renaming, because rollback is itself racy). 

756 read_result = _read_marker(name, dir_fd=dir_fd) 

757 if read_result is None: 

758 return False 

759 info_before, mtime_before = read_result 

760 if now - mtime_before <= stale_threshold: 

761 return False 

762 if info_before is None: 

763 _unlink(name, dir_fd=dir_fd) 

764 return True 

765 

766 break_name = f"{name}{_BREAK_SUFFIX}.{os.getpid()}.{secrets.token_hex(16)}" 

767 try: 

768 if _SUPPORTS_DIR_FD and dir_fd is not None: 

769 os.rename(name, break_name, src_dir_fd=dir_fd, dst_dir_fd=dir_fd) 

770 else: 

771 Path(name).rename(break_name) 

772 except OSError: # pragma: no cover - race where the marker vanishes between read and rename 

773 return False 

774 

775 read_after = _read_marker(break_name, dir_fd=dir_fd) 

776 if read_after is None: # pragma: no cover - race where a peer unlinks the break-name file 

777 return False 

778 info_after, mtime_after = read_after 

779 if info_after is None: # pragma: no cover - content replaced post-rename by a racing peer 

780 _unlink(break_name, dir_fd=dir_fd) 

781 return True 

782 if not hmac.compare_digest(info_before.token, info_after.token): # pragma: no cover - race only 

783 return False 

784 if mtime_after > mtime_before: # pragma: no cover - heartbeat raced our rename 

785 return False 

786 _unlink(break_name, dir_fd=dir_fd) 

787 return True 

788 

789 

790def _unlink(name: str, *, dir_fd: int | None = None) -> None: 

791 with suppress(FileNotFoundError): 

792 if _SUPPORTS_DIR_FD and dir_fd is not None: 

793 # Path.unlink has no dir_fd support, so we stay on os.unlink for the dirfd path. 

794 os.unlink(name, dir_fd=dir_fd) 

795 else: 

796 Path(name).unlink() 

797 

798 

799def _touch(name: str, *, dir_fd: int | None = None) -> None: 

800 if _SUPPORTS_DIR_FD and dir_fd is not None: 

801 os.utime(name, None, dir_fd=dir_fd) 

802 else: 

803 os.utime(name, None) 

804 

805 

806def _file_exists(path: str) -> bool: 

807 try: 

808 st = os.lstat(path) 

809 except FileNotFoundError: 

810 return False 

811 return stat.S_ISREG(st.st_mode) 

812 

813 

814def _is_housekeeping_name(name: str) -> bool: 

815 return name.startswith(".") or _BREAK_SUFFIX in name 

816 

817 

818def _reset_all_after_fork() -> None: # pragma: no cover - fork child, not tracked by coverage 

819 global _all_instances_lock # noqa: PLW0603 

820 # User-created threading locks do not auto-reset across fork: any lock held by a parent thread stays 

821 # locked in the child with no owner to release it. Replace the module-level lock and every instance's 

822 # locks with fresh ones; the child is single-threaded at this point so no synchronization is needed. 

823 _all_instances_lock = threading.Lock() 

824 for instance in list(_all_instances.values()): 

825 instance._reset_after_fork_in_child() # noqa: SLF001 

826 

827 

828def _cleanup_all_instances() -> None: # pragma: no cover - runs from atexit at interpreter shutdown 

829 for instance in list(_all_instances.values()): 

830 with suppress(Exception): 

831 instance.release(force=True) 

832 

833 

834def _register_hooks() -> None: 

835 global _atexit_registered, _fork_registered # noqa: PLW0603 

836 if not _atexit_registered: 

837 atexit.register(_cleanup_all_instances) 

838 _atexit_registered = True 

839 # after_in_child replaces inherited state so the child cannot double-own any lock the parent held. 

840 if not _fork_registered and hasattr(os, "register_at_fork"): 

841 os.register_at_fork(after_in_child=_reset_all_after_fork) 

842 _fork_registered = True 

843 

844 

845__all__ = [ 

846 "SoftReadWriteLock", 

847]