Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/filelock/_read_write.py: 27%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import atexit
4import logging
5import os
6import pathlib
7import sqlite3
8import threading
9import time
10from contextlib import contextmanager, suppress
11from typing import TYPE_CHECKING, Literal
12from weakref import WeakValueDictionary
14from ._api import AcquireReturnProxy
15from ._error import Timeout
17if TYPE_CHECKING:
18 from collections.abc import Generator
# Logger registered under the name ``filelock``.
_LOGGER = logging.getLogger("filelock")

# Registry of SQLite connections opened by this module; entries are added when a lock is
# constructed and dropped again when it is closed.
_all_connections: set[sqlite3.Connection] = set()
# Must be held for every read or write of ``_all_connections``.
_all_connections_lock = threading.Lock()


def _cleanup_connections() -> None:
    """
    Close every connection still present in the registry, then empty the registry.

    Closing is best effort: an error raised while closing one connection is ignored so that the remaining
    connections are still closed.

    """
    with _all_connections_lock:
        # Iterate over a snapshot so the registry itself is only touched by the final ``clear()``.
        for connection in tuple(_all_connections):
            try:
                connection.close()
            except Exception:  # noqa: BLE001, S110 - deliberate best-effort cleanup
                pass
        _all_connections.clear()


# Run the cleanup automatically when the interpreter shuts down.
atexit.register(_cleanup_connections)
# sqlite3_busy_timeout() accepts a C int, max 2_147_483_647 on 32-bit. Use a lower value to be safe (~23 days).
_MAX_SQLITE_TIMEOUT_MS = 2_000_000_000 - 1


def timeout_for_sqlite(timeout: float, *, blocking: bool, already_waited: float) -> int:
    """
    Convert a lock timeout in seconds into a value suitable for SQLite's ``busy_timeout`` pragma (milliseconds).

    :param timeout: maximum wait time in seconds; ``-1`` or positive infinity mean "wait as long as SQLite allows"
    :param blocking: if ``False`` the result is always ``0`` (fail immediately), regardless of *timeout*
    :param already_waited: seconds already spent waiting, subtracted from a positive *timeout*

    :returns: the remaining wait in whole milliseconds, clamped to ``_MAX_SQLITE_TIMEOUT_MS``

    :raises ValueError: if *timeout* is NaN or negative (other than the ``-1`` sentinel)

    """
    if blocking is False:
        return 0

    # -1 is the "block indefinitely" sentinel; an infinite timeout asks for exactly the same thing, and
    # converting infinity to int below would raise OverflowError.
    if timeout == -1 or timeout == float("inf"):
        return _MAX_SQLITE_TIMEOUT_MS

    # ``not timeout >= 0`` (rather than ``timeout < 0``) also rejects NaN, for which every comparison is False.
    if not timeout >= 0:
        msg = "timeout must be a non-negative number or -1"
        raise ValueError(msg)

    remaining = max(timeout - already_waited, 0) if timeout > 0 else timeout
    # Compare as a float before converting: for huge finite timeouts the product overflows to infinity,
    # which ``int()`` cannot represent.
    remaining_ms = remaining * 1000
    if remaining_ms > _MAX_SQLITE_TIMEOUT_MS:
        _LOGGER.warning("timeout %s is too large for SQLite, using %s ms instead", timeout, _MAX_SQLITE_TIMEOUT_MS)
        return _MAX_SQLITE_TIMEOUT_MS
    return int(remaining_ms)
class _ReadWriteLockMeta(type):
    """
    Metaclass that handles singleton resolution when is_singleton=True.

    Singleton logic lives here rather than in ReadWriteLock.get_lock so that ``ReadWriteLock(path)`` transparently
    returns cached instances without a 2-arg ``super()`` call that type checkers cannot verify.

    """

    # Both attributes are expected to be supplied by the class that uses this metaclass.
    _instances: WeakValueDictionary[pathlib.Path, ReadWriteLock]
    _instances_lock: threading.Lock

    def __call__(
        cls,
        lock_file: str | os.PathLike[str],
        timeout: float = -1,
        *,
        blocking: bool = True,
        is_singleton: bool = True,
    ) -> ReadWriteLock:
        """
        Create a lock, or return the cached one for the same resolved path.

        :param lock_file: path of the lock file; resolved to an absolute path to form the cache key
        :param timeout: default timeout forwarded to the constructor
        :param blocking: default blocking flag forwarded to the constructor
        :param is_singleton: if ``False``, bypass the cache and always build a new instance

        :returns: the new or cached instance

        :raises ValueError: if the cached instance was created with a different *timeout* or *blocking*

        """
        if not is_singleton:
            return super().__call__(lock_file, timeout, blocking=blocking, is_singleton=is_singleton)

        normalized = pathlib.Path(lock_file).resolve()
        with cls._instances_lock:
            # A single ``get`` hands back a strong reference (or None). A separate ``in`` check followed by
            # ``[]`` could raise KeyError if the weakly referenced instance is collected between the two steps.
            instance = cls._instances.get(normalized)
            if instance is None:
                instance = super().__call__(lock_file, timeout, blocking=blocking, is_singleton=is_singleton)
                cls._instances[normalized] = instance

            if instance.timeout != timeout or instance.blocking != blocking:
                msg = (
                    f"Singleton lock created with timeout={instance.timeout}, blocking={instance.blocking},"
                    f" cannot be changed to timeout={timeout}, blocking={blocking}"
                )
                raise ValueError(msg)
            return instance
class ReadWriteLock(metaclass=_ReadWriteLockMeta):
    """
    Cross-process read-write lock backed by SQLite.

    Allows concurrent shared readers or a single exclusive writer. The lock is reentrant within the same mode (multiple
    ``acquire_read`` calls nest, as do multiple ``acquire_write`` calls from the same thread), but upgrading from read
    to write or downgrading from write to read raises :class:`RuntimeError`. Write locks are pinned to the thread that
    acquired them.

    By default, ``is_singleton=True``: calling ``ReadWriteLock(path)`` with the same resolved path returns the same
    instance. The lock file must use a ``.db`` extension (SQLite database).

    :param lock_file: path to the SQLite database file used as the lock
    :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
    :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable
    :param is_singleton: if ``True``, reuse existing instances for the same resolved path

    .. versionadded:: 3.21.0

    """

    # Cache of singleton instances keyed by resolved path; consulted by the metaclass.
    _instances: WeakValueDictionary[pathlib.Path, ReadWriteLock] = WeakValueDictionary()
    # Guards lookups and insertions in ``_instances``.
    _instances_lock = threading.Lock()

    @classmethod
    def get_lock(
        cls, lock_file: str | os.PathLike[str], timeout: float = -1, *, blocking: bool = True
    ) -> ReadWriteLock:
        """
        Return the singleton :class:`ReadWriteLock` for *lock_file*.

        :param lock_file: path to the SQLite database file used as the lock
        :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable

        :returns: the singleton lock instance

        :raises ValueError: if an instance already exists for this path with different *timeout* or *blocking* values

        """
        return cls(lock_file, timeout, blocking=blocking)

    def __init__(
        self,
        lock_file: str | os.PathLike[str],
        timeout: float = -1,
        *,
        blocking: bool = True,
        is_singleton: bool = True,  # noqa: ARG002  # consumed by _ReadWriteLockMeta.__call__
    ) -> None:
        self.lock_file = os.fspath(lock_file)
        self.timeout = timeout
        self.blocking = blocking
        self._transaction_lock = threading.Lock()  # serializes the (possibly blocking) SQLite transaction work
        self._internal_lock = threading.Lock()  # protects _lock_level / _current_mode updates and rollback
        self._lock_level = 0
        self._current_mode: Literal["read", "write"] | None = None
        self._write_thread_id: int | None = None
        # check_same_thread=False: the connection is shared by every thread that uses this instance.
        self._con = sqlite3.connect(self.lock_file, check_same_thread=False)
        with _all_connections_lock:
            _all_connections.add(self._con)

    def _acquire_transaction_lock(self, *, blocking: bool, timeout: float) -> None:
        """
        Take ``_transaction_lock``, honouring *blocking* and *timeout*.

        :raises Timeout: if the lock could not be taken (immediately when non-blocking, else within *timeout*)

        """
        if not blocking:
            acquired = self._transaction_lock.acquire(blocking=False)
        elif timeout == -1:
            acquired = self._transaction_lock.acquire(blocking=True)
        else:
            acquired = self._transaction_lock.acquire(blocking=True, timeout=timeout)
        if not acquired:
            raise Timeout(self.lock_file) from None

    def _validate_reentrant(self, mode: Literal["read", "write"], opposite: str, direction: str) -> AcquireReturnProxy:
        """
        Handle an acquire request while a lock is already held, bumping the nesting level when it is allowed.

        Called with ``_internal_lock`` held.

        :param mode: the mode being requested
        :param opposite: name of the other mode, used only in the error message
        :param direction: ``"upgrade"`` or ``"downgrade"``, used only in the error message

        :raises RuntimeError: if the held mode differs from *mode*, or a write lock is held by another thread

        """
        if self._current_mode != mode:
            msg = (
                f"Cannot acquire {mode} lock on {self.lock_file} (lock id: {id(self)}): "
                f"already holding a {opposite} lock ({direction} not allowed)"
            )
            raise RuntimeError(msg)
        if mode == "write" and (cur := threading.get_ident()) != self._write_thread_id:
            msg = (
                f"Cannot acquire write lock on {self.lock_file} (lock id: {id(self)}) "
                f"from thread {cur} while it is held by thread {self._write_thread_id}"
            )
            raise RuntimeError(msg)
        self._lock_level += 1
        return AcquireReturnProxy(lock=self)

    def _configure_and_begin(
        self, mode: Literal["read", "write"], timeout: float, *, blocking: bool, start_time: float
    ) -> None:
        """
        Configure the connection's busy timeout and journal mode, then open the SQLite transaction for *mode*.

        :param start_time: ``time.perf_counter()`` value taken when the acquire started; time elapsed since then is
            deducted from *timeout*

        """
        waited = time.perf_counter() - start_time
        timeout_ms = timeout_for_sqlite(timeout, blocking=blocking, already_waited=waited)
        self._con.execute(f"PRAGMA busy_timeout={timeout_ms};").close()
        # Use legacy journal mode (not WAL) because WAL does not block readers when a concurrent EXCLUSIVE
        # write transaction is active, making read-write locking impossible without modifying table data.
        # MEMORY is safe here since no actual writes happen — crashes cannot corrupt the DB.
        # See https://sqlite.org/lang_transaction.html#deferred_immediate_and_exclusive_transactions
        #
        # Set here (not in __init__) because this pragma itself may block on a locked database,
        # so it must run after busy_timeout is configured above.
        self._con.execute("PRAGMA journal_mode=MEMORY;").close()
        # Recompute remaining timeout after the potentially blocking journal_mode pragma.
        waited = time.perf_counter() - start_time
        if (recomputed := timeout_for_sqlite(timeout, blocking=blocking, already_waited=waited)) != timeout_ms:
            self._con.execute(f"PRAGMA busy_timeout={recomputed};").close()
        stmt = "BEGIN EXCLUSIVE TRANSACTION;" if mode == "write" else "BEGIN TRANSACTION;"
        self._con.execute(stmt).close()
        if mode == "read":
            # A SELECT is needed to force SQLite to actually acquire the SHARED lock on the database.
            # https://www.sqlite.org/lockingv3.html#transaction_control
            self._con.execute("SELECT name FROM sqlite_schema LIMIT 1;").close()

    def _acquire(self, mode: Literal["read", "write"], timeout: float, *, blocking: bool) -> AcquireReturnProxy:
        """
        Shared implementation of :meth:`acquire_read` and :meth:`acquire_write`.

        :raises RuntimeError: on a forbidden upgrade/downgrade or cross-thread write re-entry
        :raises Timeout: if the lock cannot be acquired in time

        """
        opposite = "write" if mode == "read" else "read"
        direction = "downgrade" if mode == "read" else "upgrade"

        with self._internal_lock:
            if self._lock_level > 0:
                return self._validate_reentrant(mode, opposite, direction)

        start_time = time.perf_counter()
        self._acquire_transaction_lock(blocking=blocking, timeout=timeout)
        try:
            # Double-check: another thread may have acquired the lock while we waited on _transaction_lock.
            with self._internal_lock:
                if self._lock_level > 0:
                    return self._validate_reentrant(mode, opposite, direction)

            try:
                self._configure_and_begin(mode, timeout, blocking=blocking, start_time=start_time)
            except BaseException:
                # A read acquire can fail after its deferred BEGIN already succeeded (the SELECT that takes
                # the SHARED lock hits "database is locked"). Without a rollback the connection stays inside
                # that transaction and every later BEGIN fails with "cannot start a transaction within a
                # transaction". rollback() does nothing when no transaction is open.
                with suppress(Exception):
                    self._con.rollback()
                raise

            with self._internal_lock:
                self._current_mode = mode
                self._lock_level = 1
                if mode == "write":
                    self._write_thread_id = threading.get_ident()

            return AcquireReturnProxy(lock=self)

        except sqlite3.OperationalError as exc:
            if "database is locked" not in str(exc):
                raise
            raise Timeout(self.lock_file) from None
        finally:
            self._transaction_lock.release()

    def acquire_read(self, timeout: float = -1, *, blocking: bool = True) -> AcquireReturnProxy:
        """
        Acquire a shared read lock.

        If this instance already holds a read lock, the lock level is incremented (reentrant). Attempting to acquire a
        read lock while holding a write lock raises :class:`RuntimeError` (downgrade not allowed).

        :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable

        :returns: a proxy that can be used as a context manager to release the lock

        :raises RuntimeError: if a write lock is already held on this instance
        :raises Timeout: if the lock cannot be acquired within *timeout* seconds

        """
        return self._acquire("read", timeout, blocking=blocking)

    def acquire_write(self, timeout: float = -1, *, blocking: bool = True) -> AcquireReturnProxy:
        """
        Acquire an exclusive write lock.

        If this instance already holds a write lock from the same thread, the lock level is incremented (reentrant).
        Attempting to acquire a write lock while holding a read lock raises :class:`RuntimeError` (upgrade not allowed).
        Write locks are pinned to the acquiring thread: a different thread trying to re-enter also raises
        :class:`RuntimeError`.

        :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable

        :returns: a proxy that can be used as a context manager to release the lock

        :raises RuntimeError: if a read lock is already held, or a write lock is held by a different thread
        :raises Timeout: if the lock cannot be acquired within *timeout* seconds

        """
        return self._acquire("write", timeout, blocking=blocking)

    def release(self, *, force: bool = False) -> None:
        """
        Release one level of the current lock.

        When the lock level reaches zero the underlying SQLite transaction is rolled back, releasing the database lock.

        :param force: if ``True``, release the lock completely regardless of the current lock level

        :raises RuntimeError: if no lock is currently held and *force* is ``False``

        """
        with self._internal_lock:
            if self._lock_level == 0:
                if force:
                    return
                msg = f"Cannot release a lock on {self.lock_file} (lock id: {id(self)}) that is not held"
                raise RuntimeError(msg)
            if force:
                self._lock_level = 0
            else:
                self._lock_level -= 1
            if self._lock_level == 0:
                self._current_mode = None
                self._write_thread_id = None
                # Roll back while still holding _internal_lock: once another thread can observe level 0 it may
                # begin a new transaction on this shared connection, and a rollback issued after that point
                # would silently drop the lock that thread just acquired.
                self._con.rollback()

    @contextmanager
    def read_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> Generator[None]:
        """
        Context manager that acquires and releases a shared read lock.

        Falls back to instance defaults for *timeout* and *blocking* when ``None``.

        :param timeout: maximum wait time in seconds, or ``None`` to use the instance default
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default

        """
        if timeout is None:
            timeout = self.timeout
        if blocking is None:
            blocking = self.blocking
        self.acquire_read(timeout, blocking=blocking)
        try:
            yield
        finally:
            self.release()

    @contextmanager
    def write_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> Generator[None]:
        """
        Context manager that acquires and releases an exclusive write lock.

        Falls back to instance defaults for *timeout* and *blocking* when ``None``.

        :param timeout: maximum wait time in seconds, or ``None`` to use the instance default
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default

        """
        if timeout is None:
            timeout = self.timeout
        if blocking is None:
            blocking = self.blocking
        self.acquire_write(timeout, blocking=blocking)
        try:
            yield
        finally:
            self.release()

    def close(self) -> None:
        """
        Release the lock (if held) and close the underlying SQLite connection.

        After calling this method, the lock instance is no longer usable.

        """
        self.release(force=True)
        self._con.close()
        with _all_connections_lock:
            _all_connections.discard(self._con)