Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/filelock/_read_write.py: 27%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import atexit
4import logging
5import os
6import pathlib
7import sqlite3
8import threading
9import time
10from contextlib import contextmanager, suppress
11from typing import TYPE_CHECKING, Literal
12from weakref import WeakValueDictionary
14from ._api import AcquireReturnProxy
15from ._error import Timeout
17if TYPE_CHECKING:
18 from collections.abc import Generator
20_LOGGER = logging.getLogger("filelock")
22_all_connections: set[sqlite3.Connection] = set()
23_all_connections_lock = threading.Lock()
26def _cleanup_connections() -> None:
27 with _all_connections_lock:
28 for con in list(_all_connections):
29 with suppress(Exception):
30 con.close()
31 _all_connections.clear()
34atexit.register(_cleanup_connections)
# sqlite3_busy_timeout() accepts a C int, max 2_147_483_647 on 32-bit. Use a lower value to be safe (~23 days).
_MAX_SQLITE_TIMEOUT_MS = 2_000_000_000 - 1


def timeout_for_sqlite(timeout: float, *, blocking: bool, already_waited: float) -> int:
    """
    Translate filelock timeout semantics into a millisecond budget for ``PRAGMA busy_timeout``.

    :param timeout: seconds to wait; ``-1`` means wait indefinitely
    :param blocking: ``False`` requests an immediate (0 ms) attempt
    :param already_waited: seconds already spent waiting, deducted from *timeout*

    :returns: non-negative milliseconds, capped at ``_MAX_SQLITE_TIMEOUT_MS``

    :raises ValueError: for negative timeouts other than ``-1``

    """
    if blocking is False:
        return 0
    if timeout == -1:
        return _MAX_SQLITE_TIMEOUT_MS
    if timeout < 0:
        msg = "timeout must be a non-negative number or -1"
        raise ValueError(msg)

    # Deduct time already spent waiting on in-process locks; a zero timeout stays zero.
    left = timeout if timeout <= 0 else max(timeout - already_waited, 0)
    budget_ms = int(left * 1000)
    if not 0 <= budget_ms <= _MAX_SQLITE_TIMEOUT_MS:
        _LOGGER.warning("timeout %s is too large for SQLite, using %s ms instead", timeout, _MAX_SQLITE_TIMEOUT_MS)
        return _MAX_SQLITE_TIMEOUT_MS
    return budget_ms
59class _ReadWriteLockMeta(type):
60 """
61 Metaclass that handles singleton resolution when is_singleton=True.
63 Singleton logic lives here rather than in ReadWriteLock.get_lock so that ``ReadWriteLock(path)`` transparently
64 returns cached instances without a 2-arg ``super()`` call that type checkers cannot verify.
66 """
68 _instances: WeakValueDictionary[pathlib.Path, ReadWriteLock]
69 _instances_lock: threading.Lock
71 def __call__(
72 cls,
73 lock_file: str | os.PathLike[str],
74 timeout: float = -1,
75 *,
76 blocking: bool = True,
77 is_singleton: bool = True,
78 ) -> ReadWriteLock:
79 if not is_singleton:
80 return super().__call__(lock_file, timeout, blocking=blocking, is_singleton=is_singleton)
82 normalized = pathlib.Path(lock_file).resolve()
83 with cls._instances_lock:
84 if normalized not in cls._instances:
85 instance = super().__call__(lock_file, timeout, blocking=blocking, is_singleton=is_singleton)
86 cls._instances[normalized] = instance
87 else:
88 instance = cls._instances[normalized]
90 if instance.timeout != timeout or instance.blocking != blocking:
91 msg = (
92 f"Singleton lock created with timeout={instance.timeout}, blocking={instance.blocking},"
93 f" cannot be changed to timeout={timeout}, blocking={blocking}"
94 )
95 raise ValueError(msg)
96 return instance
class ReadWriteLock(metaclass=_ReadWriteLockMeta):
    """
    Cross-process read-write lock backed by SQLite.

    Allows concurrent shared readers or a single exclusive writer. The lock is reentrant within the same mode (multiple
    ``acquire_read`` calls nest, as do multiple ``acquire_write`` calls from the same thread), but upgrading from read
    to write or downgrading from write to read raises :class:`RuntimeError`. Write locks are pinned to the thread that
    acquired them.

    By default, ``is_singleton=True``: calling ``ReadWriteLock(path)`` with the same resolved path returns the same
    instance. The lock file must use a ``.db`` extension (SQLite database).

    :param lock_file: path to the SQLite database file used as the lock
    :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
    :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable
    :param is_singleton: if ``True``, reuse existing instances for the same resolved path

    .. versionadded:: 3.21.0

    """

    # Singleton cache consumed by _ReadWriteLockMeta.__call__; weak values let unreferenced locks be collected.
    _instances: WeakValueDictionary[pathlib.Path, ReadWriteLock] = WeakValueDictionary()
    _instances_lock = threading.Lock()

    @classmethod
    def get_lock(
        cls, lock_file: str | os.PathLike[str], timeout: float = -1, *, blocking: bool = True
    ) -> ReadWriteLock:
        """
        Return the singleton :class:`ReadWriteLock` for *lock_file*.

        :param lock_file: path to the SQLite database file used as the lock
        :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable

        :returns: the singleton lock instance

        :raises ValueError: if an instance already exists for this path with different *timeout* or *blocking* values

        """
        # Plain construction routes through _ReadWriteLockMeta.__call__, which performs the caching.
        return cls(lock_file, timeout, blocking=blocking)

    def __init__(
        self,
        lock_file: str | os.PathLike[str],
        timeout: float = -1,
        *,
        blocking: bool = True,
        is_singleton: bool = True,  # noqa: ARG002 # consumed by _ReadWriteLockMeta.__call__
    ) -> None:
        self.lock_file = os.fspath(lock_file)
        self.timeout = timeout
        self.blocking = blocking
        self._transaction_lock = threading.Lock()  # serializes the (possibly blocking) SQLite transaction work
        self._internal_lock = threading.Lock()  # protects _lock_level / _current_mode updates and rollback
        self._lock_level = 0  # reentrancy depth; 0 means the lock is not held
        self._current_mode: Literal["read", "write"] | None = None  # mode currently held, if any
        self._write_thread_id: int | None = None  # owner thread of the write lock, if any
        # check_same_thread=False: the connection is shared across threads, guarded by the locks above.
        self._con = sqlite3.connect(self.lock_file, check_same_thread=False)
        with _all_connections_lock:
            _all_connections.add(self._con)

    def _acquire_transaction_lock(self, *, blocking: bool, timeout: float) -> None:
        """
        Acquire the in-process lock that serializes SQLite transaction work across threads.

        :param blocking: if ``False``, try once without waiting
        :param timeout: maximum wait time in seconds; ``-1`` means wait indefinitely

        :raises Timeout: if the lock could not be acquired under the given constraints

        """
        if not blocking:
            # threading.Lock.acquire raises ValueError when a timeout is combined with blocking=False,
            # so a non-blocking request must be issued without the timeout argument.
            acquired = self._transaction_lock.acquire(False)
        elif timeout == -1:
            # blocking=True with no timeout means wait indefinitely per threading.Lock.acquire semantics
            acquired = self._transaction_lock.acquire(True)
        else:
            acquired = self._transaction_lock.acquire(True, timeout)
        if not acquired:
            raise Timeout(self.lock_file) from None

    def _validate_reentrant(self, mode: Literal["read", "write"], opposite: str, direction: str) -> AcquireReturnProxy:
        """
        Handle a reentrant acquisition: bump the lock level if the request matches the held lock.

        Caller must hold ``_internal_lock`` and have verified ``_lock_level > 0``.

        :raises RuntimeError: if the requested mode differs from the held mode (up/downgrade), or a
            write re-entry comes from a thread other than the owner

        """
        if self._current_mode != mode:
            msg = (
                f"Cannot acquire {mode} lock on {self.lock_file} (lock id: {id(self)}): "
                f"already holding a {opposite} lock ({direction} not allowed)"
            )
            raise RuntimeError(msg)
        if mode == "write" and (cur := threading.get_ident()) != self._write_thread_id:
            msg = (
                f"Cannot acquire write lock on {self.lock_file} (lock id: {id(self)}) "
                f"from thread {cur} while it is held by thread {self._write_thread_id}"
            )
            raise RuntimeError(msg)
        self._lock_level += 1
        return AcquireReturnProxy(lock=self)

    def _configure_and_begin(
        self, mode: Literal["read", "write"], timeout: float, *, blocking: bool, start_time: float
    ) -> None:
        """Configure busy_timeout/journal_mode pragmas and open the SQLite transaction for *mode*."""
        waited = time.perf_counter() - start_time
        timeout_ms = timeout_for_sqlite(timeout, blocking=blocking, already_waited=waited)
        self._con.execute(f"PRAGMA busy_timeout={timeout_ms};").close()
        # Use legacy journal mode (not WAL) because WAL does not block readers when a concurrent EXCLUSIVE
        # write transaction is active, making read-write locking impossible without modifying table data.
        # MEMORY is safe here since no actual writes happen — crashes cannot corrupt the DB.
        # See https://sqlite.org/lang_transaction.html#deferred_immediate_and_exclusive_transactions
        #
        # Set here (not in __init__) because this pragma itself may block on a locked database,
        # so it must run after busy_timeout is configured above.
        self._con.execute("PRAGMA journal_mode=MEMORY;").close()
        # Recompute remaining timeout after the potentially blocking journal_mode pragma.
        waited = time.perf_counter() - start_time
        if (recomputed := timeout_for_sqlite(timeout, blocking=blocking, already_waited=waited)) != timeout_ms:
            self._con.execute(f"PRAGMA busy_timeout={recomputed};").close()
        stmt = "BEGIN EXCLUSIVE TRANSACTION;" if mode == "write" else "BEGIN TRANSACTION;"
        self._con.execute(stmt).close()
        if mode == "read":
            # A SELECT is needed to force SQLite to actually acquire the SHARED lock on the database.
            # https://www.sqlite.org/lockingv3.html#transaction_control
            self._con.execute("SELECT name FROM sqlite_schema LIMIT 1;").close()

    def _acquire(self, mode: Literal["read", "write"], timeout: float, *, blocking: bool) -> AcquireReturnProxy:
        """
        Shared implementation behind :meth:`acquire_read` and :meth:`acquire_write`.

        :param mode: ``"read"`` for a shared lock, ``"write"`` for an exclusive lock
        :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable

        :returns: a proxy that can be used as a context manager to release the lock

        :raises RuntimeError: on invalid reentrancy (mode mismatch or wrong thread)
        :raises Timeout: if the lock cannot be acquired within *timeout* seconds

        """
        opposite = "write" if mode == "read" else "read"
        direction = "downgrade" if mode == "read" else "upgrade"

        with self._internal_lock:
            if self._lock_level > 0:
                return self._validate_reentrant(mode, opposite, direction)

        start_time = time.perf_counter()
        self._acquire_transaction_lock(blocking=blocking, timeout=timeout)
        try:
            # Double-check: another thread may have acquired the lock while we waited on _transaction_lock.
            with self._internal_lock:
                if self._lock_level > 0:
                    return self._validate_reentrant(mode, opposite, direction)

            self._configure_and_begin(mode, timeout, blocking=blocking, start_time=start_time)

            with self._internal_lock:
                self._current_mode = mode
                self._lock_level = 1
                if mode == "write":
                    self._write_thread_id = threading.get_ident()

            return AcquireReturnProxy(lock=self)

        except sqlite3.OperationalError as exc:
            # Only lock contention maps to Timeout; any other SQLite error propagates unchanged.
            if "database is locked" not in str(exc):
                raise
            raise Timeout(self.lock_file) from None
        finally:
            self._transaction_lock.release()

    def acquire_read(self, timeout: float = -1, *, blocking: bool = True) -> AcquireReturnProxy:
        """
        Acquire a shared read lock.

        If this instance already holds a read lock, the lock level is incremented (reentrant). Attempting to acquire a
        read lock while holding a write lock raises :class:`RuntimeError` (downgrade not allowed).

        :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable

        :returns: a proxy that can be used as a context manager to release the lock

        :raises RuntimeError: if a write lock is already held on this instance
        :raises Timeout: if the lock cannot be acquired within *timeout* seconds

        """
        return self._acquire("read", timeout, blocking=blocking)

    def acquire_write(self, timeout: float = -1, *, blocking: bool = True) -> AcquireReturnProxy:
        """
        Acquire an exclusive write lock.

        If this instance already holds a write lock from the same thread, the lock level is incremented (reentrant).
        Attempting to acquire a write lock while holding a read lock raises :class:`RuntimeError` (upgrade not allowed).
        Write locks are pinned to the acquiring thread: a different thread trying to re-enter also raises
        :class:`RuntimeError`.

        :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable

        :returns: a proxy that can be used as a context manager to release the lock

        :raises RuntimeError: if a read lock is already held, or a write lock is held by a different thread
        :raises Timeout: if the lock cannot be acquired within *timeout* seconds

        """
        return self._acquire("write", timeout, blocking=blocking)

    def release(self, *, force: bool = False) -> None:
        """
        Release one level of the current lock.

        When the lock level reaches zero the underlying SQLite transaction is rolled back, releasing the database lock.

        :param force: if ``True``, release the lock completely regardless of the current lock level

        :raises RuntimeError: if no lock is currently held and *force* is ``False``

        """
        should_rollback = False
        with self._internal_lock:
            if self._lock_level == 0:
                if force:
                    return
                msg = f"Cannot release a lock on {self.lock_file} (lock id: {id(self)}) that is not held"
                raise RuntimeError(msg)
            if force:
                self._lock_level = 0
            else:
                self._lock_level -= 1
            if self._lock_level == 0:
                self._current_mode = None
                self._write_thread_id = None
                should_rollback = True
        # Roll back outside _internal_lock so the (potentially slow) SQLite call is not done under it.
        if should_rollback:
            self._con.rollback()

    @contextmanager
    def read_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> Generator[None]:
        """
        Context manager that acquires and releases a shared read lock.

        Falls back to instance defaults for *timeout* and *blocking* when ``None``.

        :param timeout: maximum wait time in seconds, or ``None`` to use the instance default
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default

        """
        if timeout is None:
            timeout = self.timeout
        if blocking is None:
            blocking = self.blocking
        self.acquire_read(timeout, blocking=blocking)
        try:
            yield
        finally:
            self.release()

    @contextmanager
    def write_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> Generator[None]:
        """
        Context manager that acquires and releases an exclusive write lock.

        Falls back to instance defaults for *timeout* and *blocking* when ``None``.

        :param timeout: maximum wait time in seconds, or ``None`` to use the instance default
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default

        """
        if timeout is None:
            timeout = self.timeout
        if blocking is None:
            blocking = self.blocking
        self.acquire_write(timeout, blocking=blocking)
        try:
            yield
        finally:
            self.release()

    def close(self) -> None:
        """
        Release the lock (if held) and close the underlying SQLite connection.

        After calling this method, the lock instance is no longer usable.

        """
        self.release(force=True)
        self._con.close()
        with _all_connections_lock:
            _all_connections.discard(self._con)