Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/filelock/_read_write.py: 27%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

173 statements  

1from __future__ import annotations 

2 

3import atexit 

4import logging 

5import os 

6import pathlib 

7import sqlite3 

8import threading 

9import time 

10from contextlib import contextmanager, suppress 

11from typing import TYPE_CHECKING, Literal 

12from weakref import WeakValueDictionary 

13 

14from ._api import AcquireReturnProxy 

15from ._error import Timeout 

16 

17if TYPE_CHECKING: 

18 from collections.abc import Generator 

19 

_LOGGER = logging.getLogger("filelock")

# Registry of SQLite connections opened by locks in this module, so they can be closed at interpreter exit.
_all_connections: set[sqlite3.Connection] = set()
# Guards every read and mutation of ``_all_connections``.
_all_connections_lock = threading.Lock()


@atexit.register
def _cleanup_connections() -> None:
    """Close every tracked SQLite connection and empty the registry; registered to run at interpreter exit."""
    with _all_connections_lock:
        # Iterate over a snapshot so the registry itself is only touched once, by the final ``clear()``.
        for connection in tuple(_all_connections):
            # Best effort: one connection failing to close must not stop the others from being closed.
            with suppress(Exception):
                connection.close()
        _all_connections.clear()

35 

# sqlite3_busy_timeout() accepts a C int, max 2_147_483_647 on 32-bit. Use a lower value to be safe (~23 days).
_MAX_SQLITE_TIMEOUT_MS = 2_000_000_000 - 1


def timeout_for_sqlite(timeout: float, *, blocking: bool, already_waited: float) -> int:
    """
    Convert a lock timeout in seconds into a value usable for SQLite's ``busy_timeout`` pragma (milliseconds).

    :param timeout: maximum wait time in seconds; ``-1`` means wait as long as SQLite allows
    :param blocking: if ``False``, the result is always ``0`` (fail immediately)
    :param already_waited: seconds already spent waiting, subtracted from a positive *timeout*

    :returns: the remaining wait in whole milliseconds, capped at ``_MAX_SQLITE_TIMEOUT_MS``

    :raises ValueError: if *timeout* is NaN or negative and not ``-1``

    """
    if blocking is False:
        return 0

    if timeout == -1:
        return _MAX_SQLITE_TIMEOUT_MS

    # Written as ``not >=`` so that NaN (for which every comparison is False) is rejected here with a clear
    # message instead of failing later inside ``int()``.
    if not timeout >= 0:
        msg = "timeout must be a non-negative number or -1"
        raise ValueError(msg)

    remaining = max(timeout - already_waited, 0) if timeout > 0 else timeout
    remaining_ms = remaining * 1000
    # Compare as a float *before* converting: ``int(float("inf"))`` raises OverflowError, and an infinite or
    # astronomically large timeout should simply be clamped. ``>= MAX + 1`` matches ``int(x) > MAX`` exactly.
    if remaining_ms >= _MAX_SQLITE_TIMEOUT_MS + 1:
        _LOGGER.warning("timeout %s is too large for SQLite, using %s ms instead", timeout, _MAX_SQLITE_TIMEOUT_MS)
        return _MAX_SQLITE_TIMEOUT_MS
    return int(remaining_ms)

57 

58 

class _ReadWriteLockMeta(type):
    """
    Metaclass that handles singleton resolution when is_singleton=True.

    Singleton logic lives here rather than in ReadWriteLock.get_lock so that ``ReadWriteLock(path)`` transparently
    returns cached instances without a 2-arg ``super()`` call that type checkers cannot verify.

    """

    # Both attributes are provided by the class that uses this metaclass.
    _instances: WeakValueDictionary[pathlib.Path, ReadWriteLock]
    _instances_lock: threading.Lock

    def __call__(
        cls,
        lock_file: str | os.PathLike[str],
        timeout: float = -1,
        *,
        blocking: bool = True,
        is_singleton: bool = True,
    ) -> ReadWriteLock:
        """
        Create a lock instance, or return the cached one for the same resolved path.

        :param lock_file: path of the lock file; resolved to an absolute path to form the cache key
        :param timeout: default timeout forwarded to the constructor
        :param blocking: default blocking flag forwarded to the constructor
        :param is_singleton: if ``False``, bypass the cache and always build a new instance

        :returns: a new or cached instance

        :raises ValueError: if a cached instance exists with a different *timeout* or *blocking*

        """
        if not is_singleton:
            return super().__call__(lock_file, timeout, blocking=blocking, is_singleton=is_singleton)

        normalized = pathlib.Path(lock_file).resolve()
        with cls._instances_lock:
            # One ``get`` instead of ``in`` followed by ``[]``: the values are weakly referenced, so an entry that
            # passes a membership test can be garbage collected before a second lookup and raise KeyError.
            instance = cls._instances.get(normalized)
            if instance is None:
                instance = super().__call__(lock_file, timeout, blocking=blocking, is_singleton=is_singleton)
                cls._instances[normalized] = instance

        if instance.timeout != timeout or instance.blocking != blocking:
            msg = (
                f"Singleton lock created with timeout={instance.timeout}, blocking={instance.blocking},"
                f" cannot be changed to timeout={timeout}, blocking={blocking}"
            )
            raise ValueError(msg)
        return instance

97 

98 

class ReadWriteLock(metaclass=_ReadWriteLockMeta):
    """
    Cross-process read-write lock backed by SQLite.

    Allows concurrent shared readers or a single exclusive writer. The lock is reentrant within the same mode (multiple
    ``acquire_read`` calls nest, as do multiple ``acquire_write`` calls from the same thread), but upgrading from read
    to write or downgrading from write to read raises :class:`RuntimeError`. Write locks are pinned to the thread that
    acquired them.

    By default, ``is_singleton=True``: calling ``ReadWriteLock(path)`` with the same resolved path returns the same
    instance. The lock file must use a ``.db`` extension (SQLite database).

    :param lock_file: path to the SQLite database file used as the lock
    :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
    :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable
    :param is_singleton: if ``True``, reuse existing instances for the same resolved path

    .. versionadded:: 3.21.0

    """

    # Singleton cache (resolved path -> instance) and the lock guarding it; both are used by the metaclass.
    _instances: WeakValueDictionary[pathlib.Path, ReadWriteLock] = WeakValueDictionary()
    _instances_lock = threading.Lock()

    @classmethod
    def get_lock(
        cls, lock_file: str | os.PathLike[str], timeout: float = -1, *, blocking: bool = True
    ) -> ReadWriteLock:
        """
        Return the singleton :class:`ReadWriteLock` for *lock_file*.

        :param lock_file: path to the SQLite database file used as the lock
        :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable

        :returns: the singleton lock instance

        :raises ValueError: if an instance already exists for this path with different *timeout* or *blocking* values

        """
        return cls(lock_file, timeout, blocking=blocking)

    def __init__(
        self,
        lock_file: str | os.PathLike[str],
        timeout: float = -1,
        *,
        blocking: bool = True,
        is_singleton: bool = True,  # noqa: ARG002  # consumed by _ReadWriteLockMeta.__call__
    ) -> None:
        self.lock_file = os.fspath(lock_file)
        self.timeout = timeout
        self.blocking = blocking
        self._transaction_lock = threading.Lock()  # serializes the (possibly blocking) SQLite transaction work
        self._internal_lock = threading.Lock()  # protects _lock_level / _current_mode updates and rollback
        self._lock_level = 0
        self._current_mode: Literal["read", "write"] | None = None
        self._write_thread_id: int | None = None
        # check_same_thread=False: the connection is used from whichever thread acquires or releases the lock.
        self._con = sqlite3.connect(self.lock_file, check_same_thread=False)
        with _all_connections_lock:
            _all_connections.add(self._con)

    def _acquire_transaction_lock(self, *, blocking: bool, timeout: float) -> None:
        """
        Take ``_transaction_lock``, honouring *blocking* and *timeout*.

        :raises Timeout: if the in-process lock could not be obtained

        """
        if not blocking:
            acquired = self._transaction_lock.acquire(blocking=False)
        elif timeout == -1:
            acquired = self._transaction_lock.acquire(blocking=True)
        else:
            acquired = self._transaction_lock.acquire(blocking=True, timeout=timeout)
        if not acquired:
            raise Timeout(self.lock_file) from None

    def _validate_reentrant(self, mode: Literal["read", "write"], opposite: str, direction: str) -> AcquireReturnProxy:
        """
        Handle an acquisition while the lock is already held: bump the level or reject the request.

        Callers invoke this while holding ``_internal_lock``.

        :raises RuntimeError: on a mode change (upgrade/downgrade) or a write re-entry from another thread

        """
        if self._current_mode != mode:
            msg = (
                f"Cannot acquire {mode} lock on {self.lock_file} (lock id: {id(self)}): "
                f"already holding a {opposite} lock ({direction} not allowed)"
            )
            raise RuntimeError(msg)
        if mode == "write" and (cur := threading.get_ident()) != self._write_thread_id:
            msg = (
                f"Cannot acquire write lock on {self.lock_file} (lock id: {id(self)}) "
                f"from thread {cur} while it is held by thread {self._write_thread_id}"
            )
            raise RuntimeError(msg)
        self._lock_level += 1
        return AcquireReturnProxy(lock=self)

    def _configure_and_begin(
        self, mode: Literal["read", "write"], timeout: float, *, blocking: bool, start_time: float
    ) -> None:
        """
        Configure the connection's busy timeout and journal mode, then open the locking transaction.

        :param start_time: ``time.perf_counter()`` value taken when the acquisition started, used to compute how much
            of *timeout* is left

        """
        waited = time.perf_counter() - start_time
        timeout_ms = timeout_for_sqlite(timeout, blocking=blocking, already_waited=waited)
        self._con.execute(f"PRAGMA busy_timeout={timeout_ms};").close()
        # Use legacy journal mode (not WAL) because WAL does not block readers when a concurrent EXCLUSIVE
        # write transaction is active, making read-write locking impossible without modifying table data.
        # MEMORY is safe here since no actual writes happen — crashes cannot corrupt the DB.
        # See https://sqlite.org/lang_transaction.html#deferred_immediate_and_exclusive_transactions
        #
        # Set here (not in __init__) because this pragma itself may block on a locked database,
        # so it must run after busy_timeout is configured above.
        self._con.execute("PRAGMA journal_mode=MEMORY;").close()
        # Recompute remaining timeout after the potentially blocking journal_mode pragma.
        waited = time.perf_counter() - start_time
        if (recomputed := timeout_for_sqlite(timeout, blocking=blocking, already_waited=waited)) != timeout_ms:
            self._con.execute(f"PRAGMA busy_timeout={recomputed};").close()
        stmt = "BEGIN EXCLUSIVE TRANSACTION;" if mode == "write" else "BEGIN TRANSACTION;"
        self._con.execute(stmt).close()
        if mode == "read":
            # A SELECT is needed to force SQLite to actually acquire the SHARED lock on the database.
            # https://www.sqlite.org/lockingv3.html#transaction_control
            self._con.execute("SELECT name FROM sqlite_schema LIMIT 1;").close()

    def _acquire(self, mode: Literal["read", "write"], timeout: float, *, blocking: bool) -> AcquireReturnProxy:
        """
        Shared implementation of :meth:`acquire_read` and :meth:`acquire_write`.

        :raises RuntimeError: if the lock is already held in the other mode, or in write mode by another thread
        :raises Timeout: if the in-process lock or the SQLite lock cannot be obtained in time

        """
        opposite = "write" if mode == "read" else "read"
        direction = "downgrade" if mode == "read" else "upgrade"

        with self._internal_lock:
            if self._lock_level > 0:
                return self._validate_reentrant(mode, opposite, direction)

        start_time = time.perf_counter()
        self._acquire_transaction_lock(blocking=blocking, timeout=timeout)
        try:
            # Double-check: another thread may have acquired the lock while we waited on _transaction_lock.
            with self._internal_lock:
                if self._lock_level > 0:
                    return self._validate_reentrant(mode, opposite, direction)

            try:
                self._configure_and_begin(mode, timeout, blocking=blocking, start_time=start_time)
            except BaseException:
                # In read mode the deferred BEGIN succeeds before the SELECT can fail with "database is locked",
                # which would leave a transaction open and make the next BEGIN fail with "cannot start a
                # transaction within a transaction". Roll back so the connection is reusable; this is a no-op when
                # no transaction was opened.
                with suppress(sqlite3.Error):
                    self._con.rollback()
                raise

            with self._internal_lock:
                self._current_mode = mode
                self._lock_level = 1
                if mode == "write":
                    self._write_thread_id = threading.get_ident()

            return AcquireReturnProxy(lock=self)

        except sqlite3.OperationalError as exc:
            # Only a busy database maps to Timeout; any other SQLite failure is propagated unchanged.
            if "database is locked" not in str(exc):
                raise
            raise Timeout(self.lock_file) from None
        finally:
            self._transaction_lock.release()

    def acquire_read(self, timeout: float = -1, *, blocking: bool = True) -> AcquireReturnProxy:
        """
        Acquire a shared read lock.

        If this instance already holds a read lock, the lock level is incremented (reentrant). Attempting to acquire a
        read lock while holding a write lock raises :class:`RuntimeError` (downgrade not allowed).

        :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable

        :returns: a proxy that can be used as a context manager to release the lock

        :raises RuntimeError: if a write lock is already held on this instance
        :raises Timeout: if the lock cannot be acquired within *timeout* seconds

        """
        return self._acquire("read", timeout, blocking=blocking)

    def acquire_write(self, timeout: float = -1, *, blocking: bool = True) -> AcquireReturnProxy:
        """
        Acquire an exclusive write lock.

        If this instance already holds a write lock from the same thread, the lock level is incremented (reentrant).
        Attempting to acquire a write lock while holding a read lock raises :class:`RuntimeError` (upgrade not allowed).
        Write locks are pinned to the acquiring thread: a different thread trying to re-enter also raises
        :class:`RuntimeError`.

        :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable

        :returns: a proxy that can be used as a context manager to release the lock

        :raises RuntimeError: if a read lock is already held, or a write lock is held by a different thread
        :raises Timeout: if the lock cannot be acquired within *timeout* seconds

        """
        return self._acquire("write", timeout, blocking=blocking)

    def release(self, *, force: bool = False) -> None:
        """
        Release one level of the current lock.

        When the lock level reaches zero the underlying SQLite transaction is rolled back, releasing the database lock.

        :param force: if ``True``, release the lock completely regardless of the current lock level

        :raises RuntimeError: if no lock is currently held and *force* is ``False``

        """
        with self._internal_lock:
            if self._lock_level == 0:
                if force:
                    return
                msg = f"Cannot release a lock on {self.lock_file} (lock id: {id(self)}) that is not held"
                raise RuntimeError(msg)
            if force:
                self._lock_level = 0
            else:
                self._lock_level -= 1
            if self._lock_level == 0:
                self._current_mode = None
                self._write_thread_id = None
                # Roll back while still holding _internal_lock: once the level reads 0 another thread may start a
                # new acquisition on this same connection, and its BEGIN must not race with this rollback.
                self._con.rollback()

    @contextmanager
    def read_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> Generator[None]:
        """
        Context manager that acquires and releases a shared read lock.

        Falls back to instance defaults for *timeout* and *blocking* when ``None``.

        :param timeout: maximum wait time in seconds, or ``None`` to use the instance default
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default

        """
        if timeout is None:
            timeout = self.timeout
        if blocking is None:
            blocking = self.blocking
        self.acquire_read(timeout, blocking=blocking)
        try:
            yield
        finally:
            self.release()

    @contextmanager
    def write_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> Generator[None]:
        """
        Context manager that acquires and releases an exclusive write lock.

        Falls back to instance defaults for *timeout* and *blocking* when ``None``.

        :param timeout: maximum wait time in seconds, or ``None`` to use the instance default
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default

        """
        if timeout is None:
            timeout = self.timeout
        if blocking is None:
            blocking = self.blocking
        self.acquire_write(timeout, blocking=blocking)
        try:
            yield
        finally:
            self.release()

    def close(self) -> None:
        """
        Release the lock (if held) and close the underlying SQLite connection.

        After calling this method, the lock instance is no longer usable.

        """
        # Nested try/finally so the connection is closed and dropped from the registry even if an earlier step
        # raises.
        try:
            self.release(force=True)
        finally:
            try:
                self._con.close()
            finally:
                with _all_connections_lock:
                    _all_connections.discard(self._con)