Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/filelock/_read_write.py: 27%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

175 statements  

1from __future__ import annotations 

2 

3import atexit 

4import logging 

5import os 

6import pathlib 

7import sqlite3 

8import threading 

9import time 

10from contextlib import contextmanager, suppress 

11from typing import TYPE_CHECKING, Literal 

12from weakref import WeakValueDictionary 

13 

14from ._api import AcquireReturnProxy 

15from ._error import Timeout 

16 

17if TYPE_CHECKING: 

18 from collections.abc import Generator 

19 

20_LOGGER = logging.getLogger("filelock") 

21 

# Registry of every SQLite connection opened by a lock in this process, plus the mutex guarding it.
_all_connections: set[sqlite3.Connection] = set()
_all_connections_lock = threading.Lock()


def _cleanup_connections() -> None:
    """
    Close every registered SQLite connection and empty the registry.

    Registered below as an ``atexit`` hook. An exception raised while closing one connection is ignored so that the
    remaining connections are still closed.

    """
    with _all_connections_lock:
        # Iterate over a snapshot so the registry itself is only mutated by the final clear().
        for connection in tuple(_all_connections):
            with suppress(Exception):
                connection.close()
        _all_connections.clear()


atexit.register(_cleanup_connections)

35 

# sqlite3_busy_timeout() accepts a C int, max 2_147_483_647 on 32-bit. Use a lower value to be safe (~23 days).
_MAX_SQLITE_TIMEOUT_MS = 2_000_000_000 - 1


def timeout_for_sqlite(timeout: float, *, blocking: bool, already_waited: float) -> int:
    """
    Convert a lock timeout in seconds into a millisecond value for SQLite's ``busy_timeout`` pragma.

    :param timeout: maximum wait time in seconds; ``-1`` means wait as long as SQLite allows
    :param blocking: if ``False``, the result is always ``0`` (fail immediately when the database is busy)
    :param already_waited: seconds already spent waiting, subtracted from a positive *timeout*

    :returns: the remaining wait in whole milliseconds, clamped to ``_MAX_SQLITE_TIMEOUT_MS``

    :raises ValueError: if *timeout* is negative and not ``-1``

    """
    if blocking is False:
        return 0

    if timeout == -1:
        return _MAX_SQLITE_TIMEOUT_MS

    if timeout < 0:
        msg = "timeout must be a non-negative number or -1"
        raise ValueError(msg)

    remaining = max(timeout - already_waited, 0) if timeout > 0 else timeout
    remaining_ms = remaining * 1000
    # Decide on clamping *before* converting to int: int(float("inf")) raises OverflowError, so an infinite timeout
    # would otherwise crash instead of being clamped like any other oversized value. ``>= max + 1`` is the same
    # threshold as ``int(remaining_ms) > max`` for finite values. (remaining is never negative at this point.)
    if remaining_ms >= _MAX_SQLITE_TIMEOUT_MS + 1:
        _LOGGER.warning("timeout %s is too large for SQLite, using %s ms instead", timeout, _MAX_SQLITE_TIMEOUT_MS)
        return _MAX_SQLITE_TIMEOUT_MS
    return int(remaining_ms)

57 

58 

class _ReadWriteLockMeta(type):
    """
    Metaclass that handles singleton resolution when is_singleton=True.

    Singleton logic lives here rather than in ReadWriteLock.get_lock so that ``ReadWriteLock(path)`` transparently
    returns cached instances without a 2-arg ``super()`` call that type checkers cannot verify.

    """

    # Both attributes are expected to be provided by the class that uses this metaclass.
    _instances: WeakValueDictionary[pathlib.Path, ReadWriteLock]
    _instances_lock: threading.Lock

    def __call__(
        cls,
        lock_file: str | os.PathLike[str],
        timeout: float = -1,
        *,
        blocking: bool = True,
        is_singleton: bool = True,
    ) -> ReadWriteLock:
        """
        Create a lock instance, or return the cached one for the resolved *lock_file* path.

        :param lock_file: path of the lock file; resolved with :meth:`pathlib.Path.resolve` to form the cache key
        :param timeout: forwarded to the class constructor and compared against a cached instance
        :param blocking: forwarded to the class constructor and compared against a cached instance
        :param is_singleton: if ``False``, always construct a fresh, uncached instance

        :returns: the new or cached instance

        :raises ValueError: if a cached instance exists with a different *timeout* or *blocking*

        """
        if not is_singleton:
            return super().__call__(lock_file, timeout, blocking=blocking, is_singleton=is_singleton)

        normalized = pathlib.Path(lock_file).resolve()
        with cls._instances_lock:
            # One .get() instead of "in" followed by indexing: entries of a WeakValueDictionary disappear as soon as
            # the last strong reference is dropped (possibly by another thread), so the entry could vanish between
            # the membership test and the lookup and raise KeyError. Holding the result keeps the instance alive.
            instance = cls._instances.get(normalized)
            if instance is None:
                instance = super().__call__(lock_file, timeout, blocking=blocking, is_singleton=is_singleton)
                cls._instances[normalized] = instance

            if instance.timeout != timeout or instance.blocking != blocking:
                msg = (
                    f"Singleton lock created with timeout={instance.timeout}, blocking={instance.blocking},"
                    f" cannot be changed to timeout={timeout}, blocking={blocking}"
                )
                raise ValueError(msg)
            return instance

97 

98 

99class ReadWriteLock(metaclass=_ReadWriteLockMeta): 

100 """ 

101 Cross-process read-write lock backed by SQLite. 

102 

103 Allows concurrent shared readers or a single exclusive writer. The lock is reentrant within the same mode (multiple 

104 ``acquire_read`` calls nest, as do multiple ``acquire_write`` calls from the same thread), but upgrading from read 

105 to write or downgrading from write to read raises :class:`RuntimeError`. Write locks are pinned to the thread that 

106 acquired them. 

107 

108 By default, ``is_singleton=True``: calling ``ReadWriteLock(path)`` with the same resolved path returns the same 

109 instance. The lock file must use a ``.db`` extension (SQLite database). 

110 

111 :param lock_file: path to the SQLite database file used as the lock 

112 :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely 

113 :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable 

114 :param is_singleton: if ``True``, reuse existing instances for the same resolved path 

115 

116 .. versionadded:: 3.21.0 

117 

118 """ 

119 

120 _instances: WeakValueDictionary[pathlib.Path, ReadWriteLock] = WeakValueDictionary() 

121 _instances_lock = threading.Lock() 

122 

@classmethod
def get_lock(
    cls, lock_file: str | os.PathLike[str], timeout: float = -1, *, blocking: bool = True
) -> ReadWriteLock:
    """
    Fetch the singleton :class:`ReadWriteLock` associated with *lock_file*.

    Equivalent to calling the class directly with the default ``is_singleton=True``.

    :param lock_file: path to the SQLite database file used as the lock
    :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
    :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable

    :returns: the singleton lock instance

    :raises ValueError: if an instance already exists for this path with different *timeout* or *blocking* values

    """
    singleton = cls(lock_file, timeout, blocking=blocking)
    return singleton

140 

def __init__(
    self,
    lock_file: str | os.PathLike[str],
    timeout: float = -1,
    *,
    blocking: bool = True,
    is_singleton: bool = True,  # noqa: ARG002  # consumed by _ReadWriteLockMeta.__call__
) -> None:
    """
    Store the configuration, reset the reentrancy state and open the SQLite connection.

    :param lock_file: path to the SQLite database file used as the lock
    :param timeout: default maximum wait time in seconds
    :param blocking: default blocking behaviour
    :param is_singleton: unused here; it is consumed by the metaclass

    """
    # Caller-supplied configuration.
    self.lock_file = os.fspath(lock_file)
    self.timeout, self.blocking = timeout, blocking

    # Reentrancy bookkeeping: nothing is held yet.
    self._lock_level = 0
    self._current_mode: Literal["read", "write"] | None = None
    self._write_thread_id: int | None = None

    # _transaction_lock serializes the (possibly blocking) SQLite transaction work;
    # _internal_lock protects _lock_level / _current_mode updates and rollback.
    self._transaction_lock = threading.Lock()
    self._internal_lock = threading.Lock()

    # check_same_thread=False lets the connection be used from threads other than the one that created it.
    connection = sqlite3.connect(self.lock_file, check_same_thread=False)
    self._con = connection
    with _all_connections_lock:
        _all_connections.add(connection)

160 

def _acquire_transaction_lock(self, *, blocking: bool, timeout: float) -> None:
    """
    Acquire ``self._transaction_lock``, honouring *blocking* and *timeout*.

    :param blocking: if ``False``, try once without waiting
    :param timeout: maximum wait in seconds; ``-1`` waits without a time limit

    :raises Timeout: if the mutex could not be acquired

    """
    if not blocking:
        acquired = self._transaction_lock.acquire(blocking=False)
    elif timeout == -1:
        acquired = self._transaction_lock.acquire(blocking=True)
    else:
        # Lock.acquire raises OverflowError for timeouts above threading.TIMEOUT_MAX (including infinity).
        # Clamp instead of crashing. Negative values still reach acquire() and are rejected there as before.
        bounded = min(timeout, threading.TIMEOUT_MAX)
        acquired = self._transaction_lock.acquire(blocking=True, timeout=bounded)
    if not acquired:
        raise Timeout(self.lock_file) from None

170 

def _validate_reentrant(self, mode: Literal["read", "write"]) -> AcquireReturnProxy:
    """
    Check that a nested acquire is legal and, if so, bump the lock level.

    :param mode: the mode being requested while a lock is already held

    :returns: a proxy wrapping this lock

    :raises RuntimeError: if *mode* differs from the held mode, or a write lock is requested from a thread other
        than the one recorded in ``_write_thread_id``

    """
    if self._current_mode != mode:
        # The held mode is the other one: requesting "read" is a downgrade, requesting "write" an upgrade.
        if mode == "read":
            opposite, direction = "write", "downgrade"
        else:
            opposite, direction = "read", "upgrade"
        msg = (
            f"Cannot acquire {mode} lock on {self.lock_file} (lock id: {id(self)}): "
            f"already holding a {opposite} lock ({direction} not allowed)"
        )
        raise RuntimeError(msg)

    if mode == "write":
        cur = threading.get_ident()
        if cur != self._write_thread_id:
            msg = (
                f"Cannot acquire write lock on {self.lock_file} (lock id: {id(self)}) "
                f"from thread {cur} while it is held by thread {self._write_thread_id}"
            )
            raise RuntimeError(msg)

    self._lock_level += 1
    return AcquireReturnProxy(lock=self)

188 

def _configure_and_begin(
    self, mode: Literal["read", "write"], timeout: float, *, blocking: bool, start_time: float
) -> None:
    """
    Configure the connection's busy timeout and open the SQLite transaction that represents the lock.

    If opening the transaction fails, the connection is rolled back before the exception propagates, so a failed
    attempt never leaves a transaction open on the connection.

    :param mode: ``"write"`` begins an EXCLUSIVE transaction; ``"read"`` begins a plain transaction and runs a SELECT
    :param timeout: total wait budget in seconds (``-1`` for no limit)
    :param blocking: if ``False``, the busy timeout is set to zero
    :param start_time: ``time.perf_counter()`` value taken when the acquire started, used to deduct elapsed time

    """
    waited = time.perf_counter() - start_time
    timeout_ms = timeout_for_sqlite(timeout, blocking=blocking, already_waited=waited)
    self._con.execute(f"PRAGMA busy_timeout={timeout_ms};").close()
    # Use legacy journal mode (not WAL) because WAL does not block readers when a concurrent EXCLUSIVE
    # write transaction is active, making read-write locking impossible without modifying table data.
    # MEMORY is safe here since no actual writes happen — crashes cannot corrupt the DB.
    # See https://sqlite.org/lang_transaction.html#deferred_immediate_and_exclusive_transactions
    #
    # Set here (not in __init__) because this pragma itself may block on a locked database,
    # so it must run after busy_timeout is configured above.
    self._con.execute("PRAGMA journal_mode=MEMORY;").close()
    # Recompute remaining timeout after the potentially blocking journal_mode pragma.
    waited = time.perf_counter() - start_time
    if (recomputed := timeout_for_sqlite(timeout, blocking=blocking, already_waited=waited)) != timeout_ms:
        self._con.execute(f"PRAGMA busy_timeout={recomputed};").close()
    stmt = "BEGIN EXCLUSIVE TRANSACTION;" if mode == "write" else "BEGIN TRANSACTION;"
    try:
        self._con.execute(stmt).close()
        if mode == "read":
            # A SELECT is needed to force SQLite to actually acquire the SHARED lock on the database.
            # https://www.sqlite.org/lockingv3.html#transaction_control
            self._con.execute("SELECT name FROM sqlite_schema LIMIT 1;").close()
    except BaseException:
        # A plain BEGIN succeeds without taking any lock, so a busy SELECT fails *inside* an open transaction.
        # Left open, that transaction makes the next BEGIN fail with "cannot start a transaction within a
        # transaction". Roll back (a no-op when nothing is open) and let the original error propagate.
        with suppress(Exception):
            self._con.rollback()
        raise

213 

def _acquire(self, mode: Literal["read", "write"], timeout: float, *, blocking: bool) -> AcquireReturnProxy:
    """
    Shared implementation behind :meth:`acquire_read` and :meth:`acquire_write`.

    A lock that is already held is handled as a reentrant acquire. Otherwise the transaction mutex is taken, the
    SQLite transaction is opened, and the mutex is released again whatever the outcome.

    :param mode: ``"read"`` or ``"write"``
    :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
    :param blocking: if ``False``, do not wait for the lock

    :returns: a proxy wrapping this lock

    :raises Timeout: if SQLite reports that the database is locked, or the transaction mutex cannot be acquired

    """
    with self._internal_lock:
        if self._lock_level > 0:
            return self._validate_reentrant(mode)

    began = time.perf_counter()
    self._acquire_transaction_lock(blocking=blocking, timeout=timeout)
    try:
        return self._do_acquire_inner(mode, timeout, blocking=blocking, start_time=began)
    except sqlite3.OperationalError as exc:
        # Only the "database is locked" failure is translated into Timeout; anything else propagates untouched.
        if "database is locked" in str(exc):
            raise Timeout(self.lock_file) from None
        raise
    finally:
        self._transaction_lock.release()

229 

def _do_acquire_inner(
    self,
    mode: Literal["read", "write"],
    timeout: float,
    *,
    blocking: bool,
    start_time: float,
) -> AcquireReturnProxy:
    """
    Open the SQLite transaction for a first-level acquire and record the new lock state.

    :param mode: ``"read"`` or ``"write"``
    :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
    :param blocking: if ``False``, do not wait for the lock
    :param start_time: ``time.perf_counter()`` value taken when the acquire started

    :returns: a proxy wrapping this lock

    """
    # Double-check: another thread may have acquired the lock while we waited on _transaction_lock.
    with self._internal_lock:
        held = self._lock_level > 0
        if held:
            return self._validate_reentrant(mode)

    self._configure_and_begin(mode, timeout, blocking=blocking, start_time=start_time)

    with self._internal_lock:
        self._lock_level = 1
        self._current_mode = mode
        if mode == "write":
            # Write locks are pinned to the thread that acquired them.
            self._write_thread_id = threading.get_ident()
    return AcquireReturnProxy(lock=self)

249 

def acquire_read(self, timeout: float = -1, *, blocking: bool = True) -> AcquireReturnProxy:
    """
    Take a shared read lock.

    Reentrant: when this instance already holds a read lock, only the lock level is incremented. Requesting a read
    lock while a write lock is held raises :class:`RuntimeError` (downgrade not allowed).

    :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
    :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable

    :returns: a proxy that can be used as a context manager to release the lock

    :raises RuntimeError: if a write lock is already held on this instance
    :raises Timeout: if the lock cannot be acquired within *timeout* seconds

    """
    proxy = self._acquire("read", timeout, blocking=blocking)
    return proxy

267 

def acquire_write(self, timeout: float = -1, *, blocking: bool = True) -> AcquireReturnProxy:
    """
    Take an exclusive write lock.

    Reentrant from the owning thread: when this instance already holds a write lock acquired by the calling thread,
    only the lock level is incremented. Requesting a write lock while a read lock is held raises
    :class:`RuntimeError` (upgrade not allowed), as does re-entering from a different thread, because write locks
    are pinned to the thread that acquired them.

    :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
    :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable

    :returns: a proxy that can be used as a context manager to release the lock

    :raises RuntimeError: if a read lock is already held, or a write lock is held by a different thread
    :raises Timeout: if the lock cannot be acquired within *timeout* seconds

    """
    proxy = self._acquire("write", timeout, blocking=blocking)
    return proxy

287 

def release(self, *, force: bool = False) -> None:
    """
    Release one level of the current lock.

    When the lock level reaches zero the underlying SQLite transaction is rolled back, releasing the database lock.
    The rollback is performed while ``_internal_lock`` is still held, so another thread cannot observe the lock as
    free and begin a new transaction on the shared connection before the old one has been rolled back.

    :param force: if ``True``, release the lock completely regardless of the current lock level

    :raises RuntimeError: if no lock is currently held and *force* is ``False``

    """
    with self._internal_lock:
        if self._lock_level == 0:
            if force:
                return
            msg = f"Cannot release a lock on {self.lock_file} (lock id: {id(self)}) that is not held"
            raise RuntimeError(msg)

        self._lock_level = 0 if force else self._lock_level - 1
        if self._lock_level > 0:
            # Still held by an outer acquire; keep the transaction open.
            return

        self._current_mode = None
        self._write_thread_id = None
        # Must happen before _internal_lock is released: an acquirer that sees level 0 goes straight to BEGIN on
        # this same connection, which fails while the previous transaction is still open.
        self._con.rollback()

316 

@contextmanager
def read_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> Generator[None]:
    """
    Hold a shared read lock for the duration of a ``with`` block.

    Arguments left as ``None`` fall back to the values stored on the instance.

    :param timeout: maximum wait time in seconds, or ``None`` to use the instance default
    :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default

    """
    effective_timeout = self.timeout if timeout is None else timeout
    effective_blocking = self.blocking if blocking is None else blocking
    self.acquire_read(effective_timeout, blocking=effective_blocking)
    try:
        yield
    finally:
        # Released even when the body of the ``with`` block raises.
        self.release()

337 

@contextmanager
def write_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> Generator[None]:
    """
    Hold an exclusive write lock for the duration of a ``with`` block.

    Arguments left as ``None`` fall back to the values stored on the instance.

    :param timeout: maximum wait time in seconds, or ``None`` to use the instance default
    :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default

    """
    effective_timeout = self.timeout if timeout is None else timeout
    effective_blocking = self.blocking if blocking is None else blocking
    self.acquire_write(effective_timeout, blocking=effective_blocking)
    try:
        yield
    finally:
        # Released even when the body of the ``with`` block raises.
        self.release()

358 

def close(self) -> None:
    """
    Release the lock (if held) and close the underlying SQLite connection.

    The connection is closed and removed from the module-level connection registry even if releasing the lock
    raises; in that case the exception propagates after the cleanup.

    After calling this method, the lock instance is no longer usable.

    """
    try:
        self.release(force=True)
    finally:
        # Without the finally, a failing rollback inside release() would leave the connection open and registered.
        self._con.close()
        with _all_connections_lock:
            _all_connections.discard(self._con)