Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/filelock/_read_write.py: 26%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

170 statements  

1from __future__ import annotations 

2 

3import atexit 

4import logging 

5import os 

6import pathlib 

7import sqlite3 

8import threading 

9import time 

10from contextlib import contextmanager, suppress 

11from typing import TYPE_CHECKING, Literal 

12from weakref import WeakValueDictionary 

13 

14from ._api import AcquireReturnProxy 

15from ._error import Timeout 

16 

17if TYPE_CHECKING: 

18 from collections.abc import Generator 

19 

20_LOGGER = logging.getLogger("filelock") 

21 

# Registry of SQLite connections to close at interpreter exit; guarded by the lock below.
_all_connections: set[sqlite3.Connection] = set()
_all_connections_lock = threading.Lock()


def _cleanup_connections() -> None:
    """
    Close every connection currently held in ``_all_connections``.

    A :class:`sqlite3.ProgrammingError` raised by ``close()`` is ignored so that one failing connection does not
    prevent the remaining ones from being closed. Connections are left in the registry after being closed.
    """
    with _all_connections_lock:
        # Iterate over a snapshot rather than the live set.
        snapshot = tuple(_all_connections)
        for connection in snapshot:
            try:
                connection.close()
            except sqlite3.ProgrammingError:
                pass


# Run the cleanup automatically when the interpreter shuts down.
atexit.register(_cleanup_connections)

34 

# sqlite3_busy_timeout() accepts a C int, whose maximum is 2_147_483_647 when int is 32 bits wide.
# Use a lower value to be safe (~23 days).
_MAX_SQLITE_TIMEOUT_MS = 2_000_000_000 - 1


def timeout_for_sqlite(timeout: float, *, blocking: bool, already_waited: float) -> int:
    """
    Convert a lock timeout in seconds into a millisecond value for SQLite's ``busy_timeout`` pragma.

    :param timeout: maximum wait time in seconds; ``-1`` means wait as long as SQLite allows
    :param blocking: if ``False`` the result is always ``0`` (do not wait), regardless of *timeout*
    :param already_waited: seconds already spent waiting, subtracted from a positive *timeout*
    :raises ValueError: if *timeout* is negative (other than ``-1``) or NaN
    :return: the remaining timeout in whole milliseconds, capped at ``_MAX_SQLITE_TIMEOUT_MS``
    """
    if blocking is False:
        return 0

    if timeout == -1:
        return _MAX_SQLITE_TIMEOUT_MS

    # ``timeout != timeout`` is true only for NaN, which would otherwise slip past the sign check and fail later
    # inside int() with an unrelated message.
    if timeout < 0 or timeout != timeout:
        msg = "timeout must be a non-negative number or -1"
        raise ValueError(msg)

    remaining = max(timeout - already_waited, 0) if timeout > 0 else timeout
    # Clamp to just above the cap before converting: int(float("inf")) raises OverflowError, while any value
    # above the cap takes the "too large" branch below anyway.
    timeout_ms = int(min(remaining * 1000, _MAX_SQLITE_TIMEOUT_MS + 1))
    if timeout_ms > _MAX_SQLITE_TIMEOUT_MS or timeout_ms < 0:
        _LOGGER.warning("timeout %s is too large for SQLite, using %s ms instead", timeout, _MAX_SQLITE_TIMEOUT_MS)
        return _MAX_SQLITE_TIMEOUT_MS
    return timeout_ms

56 

57 

class _ReadWriteLockMeta(type):
    """
    Metaclass that handles singleton resolution when is_singleton=True.

    Singleton logic lives here rather than in ReadWriteLock.get_lock so that ``ReadWriteLock(path)`` transparently
    returns cached instances without a 2-arg ``super()`` call that type checkers cannot verify.
    """

    # Both attributes are supplied by the class that uses this metaclass.
    _instances: WeakValueDictionary[pathlib.Path, ReadWriteLock]
    _instances_lock: threading.Lock

    def __call__(
        cls,
        lock_file: str | os.PathLike[str],
        timeout: float = -1,
        *,
        blocking: bool = True,
        is_singleton: bool = True,
    ) -> ReadWriteLock:
        """
        Create a lock instance, or return the cached one for the resolved path when *is_singleton* is true.

        :param lock_file: path to the SQLite database file used as the lock
        :param timeout: default timeout in seconds stored on the instance
        :param blocking: default blocking flag stored on the instance
        :param is_singleton: if ``False``, always build a new, uncached instance
        :raises ValueError: if a cached instance exists with a different *timeout* or *blocking* value
        :return: the new or cached lock instance
        """
        if not is_singleton:
            return super().__call__(lock_file, timeout, blocking=blocking, is_singleton=is_singleton)

        normalized = pathlib.Path(lock_file).resolve()
        with cls._instances_lock:
            # One ``.get()`` instead of ``in`` followed by ``[]``: the values are weakly referenced, so an entry
            # could disappear between a membership test and the lookup and raise KeyError.
            instance = cls._instances.get(normalized)
            if instance is None:
                instance = super().__call__(lock_file, timeout, blocking=blocking, is_singleton=is_singleton)
                cls._instances[normalized] = instance

            if instance.timeout != timeout or instance.blocking != blocking:
                msg = (
                    f"Singleton lock created with timeout={instance.timeout}, blocking={instance.blocking},"
                    f" cannot be changed to timeout={timeout}, blocking={blocking}"
                )
                raise ValueError(msg)
            return instance


class ReadWriteLock(metaclass=_ReadWriteLockMeta):
    """
    Cross-process read-write lock backed by SQLite.

    Allows concurrent shared readers or a single exclusive writer. The lock is reentrant within the same mode
    (multiple ``acquire_read`` calls nest, as do multiple ``acquire_write`` calls from the same thread), but
    upgrading from read to write or downgrading from write to read raises :class:`RuntimeError`. Write locks are
    pinned to the thread that acquired them.

    By default, ``is_singleton=True``: calling ``ReadWriteLock(path)`` with the same resolved path returns the same
    instance. The lock file must use a ``.db`` extension (SQLite database).

    :param lock_file: path to the SQLite database file used as the lock
    :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
    :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable
    :param is_singleton: if ``True``, reuse existing instances for the same resolved path

    .. versionadded:: 3.21.0
    """

    # Singleton cache keyed by resolved path; values are weak so unused locks can be garbage-collected.
    _instances: WeakValueDictionary[pathlib.Path, ReadWriteLock] = WeakValueDictionary()
    _instances_lock = threading.Lock()

    @classmethod
    def get_lock(
        cls, lock_file: str | os.PathLike[str], timeout: float = -1, *, blocking: bool = True
    ) -> ReadWriteLock:
        """
        Return the singleton :class:`ReadWriteLock` for *lock_file*.

        :param lock_file: path to the SQLite database file used as the lock
        :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable
        :raises ValueError: if an instance already exists for this path with different *timeout* or *blocking* values
        :return: the singleton lock instance
        """
        return cls(lock_file, timeout, blocking=blocking)

    def __init__(
        self,
        lock_file: str | os.PathLike[str],
        timeout: float = -1,
        *,
        blocking: bool = True,
        is_singleton: bool = True,  # noqa: ARG002  # consumed by _ReadWriteLockMeta.__call__
    ) -> None:
        """Store the defaults, open the SQLite connection and register it for cleanup at interpreter exit."""
        self.lock_file = os.fspath(lock_file)
        self.timeout = timeout
        self.blocking = blocking
        self._transaction_lock = threading.Lock()  # serializes the (possibly blocking) SQLite transaction work
        self._internal_lock = threading.Lock()  # protects _lock_level / _current_mode updates and rollback
        self._lock_level = 0
        self._current_mode: Literal["read", "write"] | None = None
        self._write_thread_id: int | None = None
        # check_same_thread=False: the connection is used from whichever thread acquires or releases.
        self._con = sqlite3.connect(self.lock_file, check_same_thread=False)
        with _all_connections_lock:
            _all_connections.add(self._con)

    def _acquire_transaction_lock(self, *, blocking: bool, timeout: float) -> None:
        """
        Take ``_transaction_lock``, honouring *blocking* and *timeout*.

        :param blocking: if ``False``, try once without waiting and ignore *timeout*
        :param timeout: maximum wait time in seconds; ``-1`` means wait indefinitely
        :raises Timeout: if the lock could not be taken
        """
        if not blocking:
            # threading.Lock.acquire raises ValueError when a timeout is combined with blocking=False, so the
            # timeout must not be forwarded for a non-blocking attempt.
            acquired = self._transaction_lock.acquire(blocking=False)
        elif timeout == -1:
            # blocking=True with no timeout means wait indefinitely per threading.Lock.acquire semantics
            acquired = self._transaction_lock.acquire()
        else:
            acquired = self._transaction_lock.acquire(timeout=timeout)
        if not acquired:
            raise Timeout(self.lock_file) from None

    def _validate_reentrant(self, mode: Literal["read", "write"], opposite: str, direction: str) -> AcquireReturnProxy:
        """
        Handle an acquire request made while this instance already holds a lock.

        Must be called with ``_internal_lock`` held. Increments the lock level when the request is compatible.

        :param mode: the requested mode
        :param opposite: name of the other mode, used in the error message
        :param direction: ``"upgrade"`` or ``"downgrade"``, used in the error message
        :raises RuntimeError: if the held mode differs from *mode*, or a write lock is held by another thread
        :return: a proxy that releases one level when used as a context manager
        """
        if self._current_mode != mode:
            msg = (
                f"Cannot acquire {mode} lock on {self.lock_file} (lock id: {id(self)}): "
                f"already holding a {opposite} lock ({direction} not allowed)"
            )
            raise RuntimeError(msg)
        if mode == "write" and (cur := threading.get_ident()) != self._write_thread_id:
            msg = (
                f"Cannot acquire write lock on {self.lock_file} (lock id: {id(self)}) "
                f"from thread {cur} while it is held by thread {self._write_thread_id}"
            )
            raise RuntimeError(msg)
        self._lock_level += 1
        return AcquireReturnProxy(lock=self)

    def _configure_and_begin(
        self, mode: Literal["read", "write"], timeout: float, *, blocking: bool, start_time: float
    ) -> None:
        """
        Configure the connection's busy timeout and open the SQLite transaction that represents the lock.

        :param mode: ``"write"`` begins an EXCLUSIVE transaction; ``"read"`` begins a plain one and runs a SELECT
        :param timeout: total timeout in seconds for the whole acquire call
        :param blocking: forwarded to :func:`timeout_for_sqlite`
        :param start_time: ``time.perf_counter()`` value taken when the acquire call started
        """
        waited = time.perf_counter() - start_time
        timeout_ms = timeout_for_sqlite(timeout, blocking=blocking, already_waited=waited)
        self._con.execute(f"PRAGMA busy_timeout={timeout_ms};")
        # Use legacy journal mode (not WAL) because WAL does not block readers when a concurrent EXCLUSIVE
        # write transaction is active, making read-write locking impossible without modifying table data.
        # MEMORY is safe here since no actual writes happen — crashes cannot corrupt the DB.
        # See https://sqlite.org/lang_transaction.html#deferred_immediate_and_exclusive_transactions
        #
        # Set here (not in __init__) because this pragma itself may block on a locked database,
        # so it must run after busy_timeout is configured above.
        self._con.execute("PRAGMA journal_mode=MEMORY;")
        # Recompute remaining timeout after the potentially blocking journal_mode pragma.
        waited = time.perf_counter() - start_time
        if (recomputed := timeout_for_sqlite(timeout, blocking=blocking, already_waited=waited)) != timeout_ms:
            self._con.execute(f"PRAGMA busy_timeout={recomputed};")
        stmt = "BEGIN EXCLUSIVE TRANSACTION;" if mode == "write" else "BEGIN TRANSACTION;"
        self._con.execute(stmt)
        if mode == "read":
            # A SELECT is needed to force SQLite to actually acquire the SHARED lock on the database.
            # https://www.sqlite.org/lockingv3.html#transaction_control
            self._con.execute("SELECT name FROM sqlite_schema LIMIT 1;")

    def _acquire(self, mode: Literal["read", "write"], timeout: float, *, blocking: bool) -> AcquireReturnProxy:
        """
        Shared implementation of :meth:`acquire_read` and :meth:`acquire_write`.

        :param mode: which lock to take
        :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
        :param blocking: if ``False``, fail immediately instead of waiting
        :raises RuntimeError: on an upgrade/downgrade attempt or a write re-entry from another thread
        :raises Timeout: if the lock cannot be acquired in time
        :return: a proxy that can be used as a context manager to release the lock
        """
        opposite = "write" if mode == "read" else "read"
        direction = "downgrade" if mode == "read" else "upgrade"

        # Fast path: the lock is already held by this instance, so only the nesting level changes.
        with self._internal_lock:
            if self._lock_level > 0:
                return self._validate_reentrant(mode, opposite, direction)

        start_time = time.perf_counter()
        self._acquire_transaction_lock(blocking=blocking, timeout=timeout)
        try:
            # Double-check: another thread may have acquired the lock while we waited on _transaction_lock.
            with self._internal_lock:
                if self._lock_level > 0:
                    return self._validate_reentrant(mode, opposite, direction)

            try:
                self._configure_and_begin(mode, timeout, blocking=blocking, start_time=start_time)
            except BaseException:
                # BEGIN may already have succeeded when a later statement failed (e.g. the read-mode SELECT hit
                # "database is locked"). Without a rollback the connection would stay inside that transaction
                # and every later acquire would fail with "cannot start a transaction within a transaction".
                with suppress(sqlite3.Error):
                    self._con.rollback()
                raise

            with self._internal_lock:
                self._current_mode = mode
                self._lock_level = 1
                if mode == "write":
                    self._write_thread_id = threading.get_ident()

            return AcquireReturnProxy(lock=self)

        except sqlite3.OperationalError as exc:
            # Only a busy database is reported as a timeout; any other SQLite failure propagates unchanged.
            if "database is locked" not in str(exc):
                raise
            raise Timeout(self.lock_file) from None
        finally:
            self._transaction_lock.release()

    def acquire_read(self, timeout: float = -1, *, blocking: bool = True) -> AcquireReturnProxy:
        """
        Acquire a shared read lock.

        If this instance already holds a read lock, the lock level is incremented (reentrant). Attempting to acquire
        a read lock while holding a write lock raises :class:`RuntimeError` (downgrade not allowed).

        :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable
        :raises RuntimeError: if a write lock is already held on this instance
        :raises Timeout: if the lock cannot be acquired within *timeout* seconds
        :return: a proxy that can be used as a context manager to release the lock
        """
        return self._acquire("read", timeout, blocking=blocking)

    def acquire_write(self, timeout: float = -1, *, blocking: bool = True) -> AcquireReturnProxy:
        """
        Acquire an exclusive write lock.

        If this instance already holds a write lock from the same thread, the lock level is incremented (reentrant).
        Attempting to acquire a write lock while holding a read lock raises :class:`RuntimeError` (upgrade not
        allowed). Write locks are pinned to the acquiring thread: a different thread trying to re-enter also raises
        :class:`RuntimeError`.

        :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable
        :raises RuntimeError: if a read lock is already held, or a write lock is held by a different thread
        :raises Timeout: if the lock cannot be acquired within *timeout* seconds
        :return: a proxy that can be used as a context manager to release the lock
        """
        return self._acquire("write", timeout, blocking=blocking)

    def release(self, *, force: bool = False) -> None:
        """
        Release one level of the current lock.

        When the lock level reaches zero the underlying SQLite transaction is rolled back, releasing the database lock.

        :param force: if ``True``, release the lock completely regardless of the current lock level
        :raises RuntimeError: if no lock is currently held and *force* is ``False``
        """
        with self._internal_lock:
            if self._lock_level == 0:
                if force:
                    return
                msg = f"Cannot release a lock on {self.lock_file} (lock id: {id(self)}) that is not held"
                raise RuntimeError(msg)
            if force:
                self._lock_level = 0
            else:
                self._lock_level -= 1
            if self._lock_level == 0:
                self._current_mode = None
                self._write_thread_id = None
                # Roll back while still holding _internal_lock: every acquire first takes this lock, so no other
                # thread can see level 0 and issue BEGIN on the shared connection before the rollback has run.
                self._con.rollback()

    @contextmanager
    def read_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> Generator[None]:
        """
        Context manager that acquires and releases a shared read lock.

        Falls back to instance defaults for *timeout* and *blocking* when ``None``.

        :param timeout: maximum wait time in seconds, or ``None`` to use the instance default
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default
        """
        if timeout is None:
            timeout = self.timeout
        if blocking is None:
            blocking = self.blocking
        self.acquire_read(timeout, blocking=blocking)
        try:
            yield
        finally:
            self.release()

    @contextmanager
    def write_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> Generator[None]:
        """
        Context manager that acquires and releases an exclusive write lock.

        Falls back to instance defaults for *timeout* and *blocking* when ``None``.

        :param timeout: maximum wait time in seconds, or ``None`` to use the instance default
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default
        """
        if timeout is None:
            timeout = self.timeout
        if blocking is None:
            blocking = self.blocking
        self.acquire_write(timeout, blocking=blocking)
        try:
            yield
        finally:
            self.release()

    def close(self) -> None:
        """
        Release the lock (if held) and close the underlying SQLite connection.

        After calling this method, the lock instance is no longer usable. It is also removed from the singleton
        cache, so a later ``ReadWriteLock(path)`` call builds a fresh instance instead of returning this closed one.
        """
        self.release(force=True)
        self._con.close()
        with _all_connections_lock:
            _all_connections.discard(self._con)
        owner = type(self)
        with owner._instances_lock:
            # Match by identity rather than re-resolving the path: the working directory may have changed since
            # construction, and non-singleton instances are not in the cache at all.
            for key, cached in list(owner._instances.items()):
                if cached is self:
                    del owner._instances[key]