Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/filelock/_read_write.py: 27%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

171 statements  

1from __future__ import annotations 

2 

3import atexit 

4import logging 

5import os 

6import pathlib 

7import sqlite3 

8import threading 

9import time 

10from contextlib import contextmanager, suppress 

11from typing import TYPE_CHECKING, Literal 

12from weakref import WeakValueDictionary 

13 

14from ._api import AcquireReturnProxy 

15from ._error import Timeout 

16 

17if TYPE_CHECKING: 

18 from collections.abc import Generator 

19 

20_LOGGER = logging.getLogger("filelock") 

21 

22_all_connections: set[sqlite3.Connection] = set() 

23_all_connections_lock = threading.Lock() 

24 

25 

26def _cleanup_connections() -> None: 

27 with _all_connections_lock: 

28 for con in list(_all_connections): 

29 with suppress(Exception): 

30 con.close() 

31 _all_connections.clear() 

32 

33 

34atexit.register(_cleanup_connections) 

35 

36# sqlite3_busy_timeout() accepts a C int, max 2_147_483_647 on 32-bit. Use a lower value to be safe (~23 days). 

37_MAX_SQLITE_TIMEOUT_MS = 2_000_000_000 - 1 

38 

39 

40def timeout_for_sqlite(timeout: float, *, blocking: bool, already_waited: float) -> int: 

41 if blocking is False: 

42 return 0 

43 

44 if timeout == -1: 

45 return _MAX_SQLITE_TIMEOUT_MS 

46 

47 if timeout < 0: 

48 msg = "timeout must be a non-negative number or -1" 

49 raise ValueError(msg) 

50 

51 remaining = max(timeout - already_waited, 0) if timeout > 0 else timeout 

52 timeout_ms = int(remaining * 1000) 

53 if timeout_ms > _MAX_SQLITE_TIMEOUT_MS or timeout_ms < 0: 

54 _LOGGER.warning("timeout %s is too large for SQLite, using %s ms instead", timeout, _MAX_SQLITE_TIMEOUT_MS) 

55 return _MAX_SQLITE_TIMEOUT_MS 

56 return timeout_ms 

57 

58 

59class _ReadWriteLockMeta(type): 

60 """ 

61 Metaclass that handles singleton resolution when is_singleton=True. 

62 

63 Singleton logic lives here rather than in ReadWriteLock.get_lock so that ``ReadWriteLock(path)`` transparently 

64 returns cached instances without a 2-arg ``super()`` call that type checkers cannot verify. 

65 

66 """ 

67 

68 _instances: WeakValueDictionary[pathlib.Path, ReadWriteLock] 

69 _instances_lock: threading.Lock 

70 

71 def __call__( 

72 cls, 

73 lock_file: str | os.PathLike[str], 

74 timeout: float = -1, 

75 *, 

76 blocking: bool = True, 

77 is_singleton: bool = True, 

78 ) -> ReadWriteLock: 

79 if not is_singleton: 

80 return super().__call__(lock_file, timeout, blocking=blocking, is_singleton=is_singleton) 

81 

82 normalized = pathlib.Path(lock_file).resolve() 

83 with cls._instances_lock: 

84 if normalized not in cls._instances: 

85 instance = super().__call__(lock_file, timeout, blocking=blocking, is_singleton=is_singleton) 

86 cls._instances[normalized] = instance 

87 else: 

88 instance = cls._instances[normalized] 

89 

90 if instance.timeout != timeout or instance.blocking != blocking: 

91 msg = ( 

92 f"Singleton lock created with timeout={instance.timeout}, blocking={instance.blocking}," 

93 f" cannot be changed to timeout={timeout}, blocking={blocking}" 

94 ) 

95 raise ValueError(msg) 

96 return instance 

97 

98 

99class ReadWriteLock(metaclass=_ReadWriteLockMeta): 

100 """ 

101 Cross-process read-write lock backed by SQLite. 

102 

103 Allows concurrent shared readers or a single exclusive writer. The lock is reentrant within the same mode (multiple 

104 ``acquire_read`` calls nest, as do multiple ``acquire_write`` calls from the same thread), but upgrading from read 

105 to write or downgrading from write to read raises :class:`RuntimeError`. Write locks are pinned to the thread that 

106 acquired them. 

107 

108 By default, ``is_singleton=True``: calling ``ReadWriteLock(path)`` with the same resolved path returns the same 

109 instance. The lock file must use a ``.db`` extension (SQLite database). 

110 

111 :param lock_file: path to the SQLite database file used as the lock 

112 :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely 

113 :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable 

114 :param is_singleton: if ``True``, reuse existing instances for the same resolved path 

115 

116 .. versionadded:: 3.21.0 

117 

118 """ 

119 

120 _instances: WeakValueDictionary[pathlib.Path, ReadWriteLock] = WeakValueDictionary() 

121 _instances_lock = threading.Lock() 

122 

123 @classmethod 

124 def get_lock( 

125 cls, lock_file: str | os.PathLike[str], timeout: float = -1, *, blocking: bool = True 

126 ) -> ReadWriteLock: 

127 """ 

128 Return the singleton :class:`ReadWriteLock` for *lock_file*. 

129 

130 :param lock_file: path to the SQLite database file used as the lock 

131 :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely 

132 :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable 

133 

134 :returns: the singleton lock instance 

135 

136 :raises ValueError: if an instance already exists for this path with different *timeout* or *blocking* values 

137 

138 """ 

139 return cls(lock_file, timeout, blocking=blocking) 

140 

141 def __init__( 

142 self, 

143 lock_file: str | os.PathLike[str], 

144 timeout: float = -1, 

145 *, 

146 blocking: bool = True, 

147 is_singleton: bool = True, # noqa: ARG002 # consumed by _ReadWriteLockMeta.__call__ 

148 ) -> None: 

149 self.lock_file = os.fspath(lock_file) 

150 self.timeout = timeout 

151 self.blocking = blocking 

152 self._transaction_lock = threading.Lock() # serializes the (possibly blocking) SQLite transaction work 

153 self._internal_lock = threading.Lock() # protects _lock_level / _current_mode updates and rollback 

154 self._lock_level = 0 

155 self._current_mode: Literal["read", "write"] | None = None 

156 self._write_thread_id: int | None = None 

157 self._con = sqlite3.connect(self.lock_file, check_same_thread=False) 

158 with _all_connections_lock: 

159 _all_connections.add(self._con) 

160 

161 def _acquire_transaction_lock(self, *, blocking: bool, timeout: float) -> None: 

162 if timeout == -1: 

163 # blocking=True with no timeout means wait indefinitely per threading.Lock.acquire semantics 

164 acquired = self._transaction_lock.acquire(blocking) 

165 else: 

166 acquired = self._transaction_lock.acquire(blocking, timeout) 

167 if not acquired: 

168 raise Timeout(self.lock_file) from None 

169 

170 def _validate_reentrant(self, mode: Literal["read", "write"], opposite: str, direction: str) -> AcquireReturnProxy: 

171 if self._current_mode != mode: 

172 msg = ( 

173 f"Cannot acquire {mode} lock on {self.lock_file} (lock id: {id(self)}): " 

174 f"already holding a {opposite} lock ({direction} not allowed)" 

175 ) 

176 raise RuntimeError(msg) 

177 if mode == "write" and (cur := threading.get_ident()) != self._write_thread_id: 

178 msg = ( 

179 f"Cannot acquire write lock on {self.lock_file} (lock id: {id(self)}) " 

180 f"from thread {cur} while it is held by thread {self._write_thread_id}" 

181 ) 

182 raise RuntimeError(msg) 

183 self._lock_level += 1 

184 return AcquireReturnProxy(lock=self) 

185 

186 def _configure_and_begin( 

187 self, mode: Literal["read", "write"], timeout: float, *, blocking: bool, start_time: float 

188 ) -> None: 

189 waited = time.perf_counter() - start_time 

190 timeout_ms = timeout_for_sqlite(timeout, blocking=blocking, already_waited=waited) 

191 self._con.execute(f"PRAGMA busy_timeout={timeout_ms};").close() 

192 # Use legacy journal mode (not WAL) because WAL does not block readers when a concurrent EXCLUSIVE 

193 # write transaction is active, making read-write locking impossible without modifying table data. 

194 # MEMORY is safe here since no actual writes happen — crashes cannot corrupt the DB. 

195 # See https://sqlite.org/lang_transaction.html#deferred_immediate_and_exclusive_transactions 

196 # 

197 # Set here (not in __init__) because this pragma itself may block on a locked database, 

198 # so it must run after busy_timeout is configured above. 

199 self._con.execute("PRAGMA journal_mode=MEMORY;").close() 

200 # Recompute remaining timeout after the potentially blocking journal_mode pragma. 

201 waited = time.perf_counter() - start_time 

202 if (recomputed := timeout_for_sqlite(timeout, blocking=blocking, already_waited=waited)) != timeout_ms: 

203 self._con.execute(f"PRAGMA busy_timeout={recomputed};").close() 

204 stmt = "BEGIN EXCLUSIVE TRANSACTION;" if mode == "write" else "BEGIN TRANSACTION;" 

205 self._con.execute(stmt).close() 

206 if mode == "read": 

207 # A SELECT is needed to force SQLite to actually acquire the SHARED lock on the database. 

208 # https://www.sqlite.org/lockingv3.html#transaction_control 

209 self._con.execute("SELECT name FROM sqlite_schema LIMIT 1;").close() 

210 

211 def _acquire(self, mode: Literal["read", "write"], timeout: float, *, blocking: bool) -> AcquireReturnProxy: 

212 opposite = "write" if mode == "read" else "read" 

213 direction = "downgrade" if mode == "read" else "upgrade" 

214 

215 with self._internal_lock: 

216 if self._lock_level > 0: 

217 return self._validate_reentrant(mode, opposite, direction) 

218 

219 start_time = time.perf_counter() 

220 self._acquire_transaction_lock(blocking=blocking, timeout=timeout) 

221 try: 

222 # Double-check: another thread may have acquired the lock while we waited on _transaction_lock. 

223 with self._internal_lock: 

224 if self._lock_level > 0: 

225 return self._validate_reentrant(mode, opposite, direction) 

226 

227 self._configure_and_begin(mode, timeout, blocking=blocking, start_time=start_time) 

228 

229 with self._internal_lock: 

230 self._current_mode = mode 

231 self._lock_level = 1 

232 if mode == "write": 

233 self._write_thread_id = threading.get_ident() 

234 

235 return AcquireReturnProxy(lock=self) 

236 

237 except sqlite3.OperationalError as exc: 

238 if "database is locked" not in str(exc): 

239 raise 

240 raise Timeout(self.lock_file) from None 

241 finally: 

242 self._transaction_lock.release() 

243 

244 def acquire_read(self, timeout: float = -1, *, blocking: bool = True) -> AcquireReturnProxy: 

245 """ 

246 Acquire a shared read lock. 

247 

248 If this instance already holds a read lock, the lock level is incremented (reentrant). Attempting to acquire a 

249 read lock while holding a write lock raises :class:`RuntimeError` (downgrade not allowed). 

250 

251 :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely 

252 :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable 

253 

254 :returns: a proxy that can be used as a context manager to release the lock 

255 

256 :raises RuntimeError: if a write lock is already held on this instance 

257 :raises Timeout: if the lock cannot be acquired within *timeout* seconds 

258 

259 """ 

260 return self._acquire("read", timeout, blocking=blocking) 

261 

262 def acquire_write(self, timeout: float = -1, *, blocking: bool = True) -> AcquireReturnProxy: 

263 """ 

264 Acquire an exclusive write lock. 

265 

266 If this instance already holds a write lock from the same thread, the lock level is incremented (reentrant). 

267 Attempting to acquire a write lock while holding a read lock raises :class:`RuntimeError` (upgrade not allowed). 

268 Write locks are pinned to the acquiring thread: a different thread trying to re-enter also raises 

269 :class:`RuntimeError`. 

270 

271 :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely 

272 :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable 

273 

274 :returns: a proxy that can be used as a context manager to release the lock 

275 

276 :raises RuntimeError: if a read lock is already held, or a write lock is held by a different thread 

277 :raises Timeout: if the lock cannot be acquired within *timeout* seconds 

278 

279 """ 

280 return self._acquire("write", timeout, blocking=blocking) 

281 

282 def release(self, *, force: bool = False) -> None: 

283 """ 

284 Release one level of the current lock. 

285 

286 When the lock level reaches zero the underlying SQLite transaction is rolled back, releasing the database lock. 

287 

288 :param force: if ``True``, release the lock completely regardless of the current lock level 

289 

290 :raises RuntimeError: if no lock is currently held and *force* is ``False`` 

291 

292 """ 

293 should_rollback = False 

294 with self._internal_lock: 

295 if self._lock_level == 0: 

296 if force: 

297 return 

298 msg = f"Cannot release a lock on {self.lock_file} (lock id: {id(self)}) that is not held" 

299 raise RuntimeError(msg) 

300 if force: 

301 self._lock_level = 0 

302 else: 

303 self._lock_level -= 1 

304 if self._lock_level == 0: 

305 self._current_mode = None 

306 self._write_thread_id = None 

307 should_rollback = True 

308 if should_rollback: 

309 self._con.rollback() 

310 

311 @contextmanager 

312 def read_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> Generator[None]: 

313 """ 

314 Context manager that acquires and releases a shared read lock. 

315 

316 Falls back to instance defaults for *timeout* and *blocking* when ``None``. 

317 

318 :param timeout: maximum wait time in seconds, or ``None`` to use the instance default 

319 :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default 

320 

321 """ 

322 if timeout is None: 

323 timeout = self.timeout 

324 if blocking is None: 

325 blocking = self.blocking 

326 self.acquire_read(timeout, blocking=blocking) 

327 try: 

328 yield 

329 finally: 

330 self.release() 

331 

332 @contextmanager 

333 def write_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> Generator[None]: 

334 """ 

335 Context manager that acquires and releases an exclusive write lock. 

336 

337 Falls back to instance defaults for *timeout* and *blocking* when ``None``. 

338 

339 :param timeout: maximum wait time in seconds, or ``None`` to use the instance default 

340 :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default 

341 

342 """ 

343 if timeout is None: 

344 timeout = self.timeout 

345 if blocking is None: 

346 blocking = self.blocking 

347 self.acquire_write(timeout, blocking=blocking) 

348 try: 

349 yield 

350 finally: 

351 self.release() 

352 

353 def close(self) -> None: 

354 """ 

355 Release the lock (if held) and close the underlying SQLite connection. 

356 

357 After calling this method, the lock instance is no longer usable. 

358 

359 """ 

360 self.release(force=True) 

361 self._con.close() 

362 with _all_connections_lock: 

363 _all_connections.discard(self._con)