Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/filelock/_read_write.py: 26%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import atexit
4import logging
5import os
6import pathlib
7import sqlite3
8import threading
9import time
10from contextlib import contextmanager, suppress
11from typing import TYPE_CHECKING, Literal
12from weakref import WeakValueDictionary
14from ._api import AcquireReturnProxy
15from ._error import Timeout
17if TYPE_CHECKING:
18 from collections.abc import Generator
_LOGGER = logging.getLogger("filelock")

# Every SQLite connection opened by ReadWriteLock is registered here (and discarded again in ReadWriteLock.close)
# so that connections still open at interpreter shutdown can be closed by the atexit hook below.
_all_connections: set[sqlite3.Connection] = set()
_all_connections_lock = threading.Lock()  # guards every access to _all_connections


def _cleanup_connections() -> None:
    """Close every registered SQLite connection; registered with :mod:`atexit` to run at interpreter shutdown."""
    with _all_connections_lock:
        # Walk a snapshot of the registry; the connections themselves stay registered after being closed.
        for connection in tuple(_all_connections):
            try:
                connection.close()
            except sqlite3.ProgrammingError:
                # Best-effort cleanup: a connection that refuses to close must not stop the remaining ones.
                continue


atexit.register(_cleanup_connections)
# sqlite3_busy_timeout() accepts a C int, max 2_147_483_647 on 32-bit. Use a lower value to be safe (~23 days).
_MAX_SQLITE_TIMEOUT_MS = 2_000_000_000 - 1


def timeout_for_sqlite(timeout: float, *, blocking: bool, already_waited: float) -> int:
    """
    Convert a lock timeout in seconds into a value suitable for SQLite's ``busy_timeout`` pragma (milliseconds).

    :param timeout: total wait budget in seconds; ``-1`` means wait as long as SQLite allows
    :param blocking: if ``False``, the result is always ``0`` (do not wait at all)
    :param already_waited: seconds of the budget that have already been spent; subtracted from *timeout*
    :raises ValueError: if *timeout* is negative (other than ``-1``) or NaN
    :return: the remaining wait in whole milliseconds, capped at ``_MAX_SQLITE_TIMEOUT_MS``
    """
    if blocking is False:
        return 0

    if timeout == -1:
        return _MAX_SQLITE_TIMEOUT_MS

    # ``not timeout >= 0`` is true for negative numbers *and* for NaN, so NaN is rejected here with a clear
    # message instead of failing later inside ``int()``.
    if not timeout >= 0:
        msg = "timeout must be a non-negative number or -1"
        raise ValueError(msg)

    remaining = max(timeout - already_waited, 0) if timeout > 0 else timeout
    # Clamp while the value is still a float: ``int(float("inf"))`` raises OverflowError, so an infinite (or
    # absurdly large) timeout must be reduced to "just above the maximum" before the integer conversion.
    timeout_ms = int(min(remaining * 1000, _MAX_SQLITE_TIMEOUT_MS + 1))
    if timeout_ms > _MAX_SQLITE_TIMEOUT_MS:
        _LOGGER.warning("timeout %s is too large for SQLite, using %s ms instead", timeout, _MAX_SQLITE_TIMEOUT_MS)
        return _MAX_SQLITE_TIMEOUT_MS
    return timeout_ms
class _ReadWriteLockMeta(type):
    """
    Metaclass that handles singleton resolution when is_singleton=True.

    Singleton logic lives here rather than in ReadWriteLock.get_lock so that ``ReadWriteLock(path)`` transparently
    returns cached instances without a 2-arg ``super()`` call that type checkers cannot verify.
    """

    # Both attributes are declared here for type checkers only; the class using this metaclass provides the values.
    _instances: WeakValueDictionary[pathlib.Path, ReadWriteLock]
    _instances_lock: threading.Lock

    def __call__(
        cls,
        lock_file: str | os.PathLike[str],
        timeout: float = -1,
        *,
        blocking: bool = True,
        is_singleton: bool = True,
    ) -> ReadWriteLock:
        """
        Create a lock instance, or return the cached one for the same resolved path.

        :param lock_file: path to the SQLite database file used as the lock
        :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
        :param blocking: if ``False``, the lock raises instead of waiting when it is unavailable
        :param is_singleton: if ``False``, bypass the cache and always build a fresh instance
        :raises ValueError: if a cached instance exists with a different *timeout* or *blocking* value
        :return: the new or cached lock instance
        """
        if not is_singleton:
            return super().__call__(lock_file, timeout, blocking=blocking, is_singleton=is_singleton)

        # Cache key is the fully resolved path, so different spellings of one file share an instance.
        normalized = pathlib.Path(lock_file).resolve()
        with cls._instances_lock:
            # A single .get() hands back a strong reference (or None). Checking ``in`` and then indexing is racy
            # on a WeakValueDictionary: the entry can be garbage-collected between the two steps -> KeyError.
            instance = cls._instances.get(normalized)
            if instance is None:
                instance = super().__call__(lock_file, timeout, blocking=blocking, is_singleton=is_singleton)
                cls._instances[normalized] = instance

            if instance.timeout != timeout or instance.blocking != blocking:
                msg = (
                    f"Singleton lock created with timeout={instance.timeout}, blocking={instance.blocking},"
                    f" cannot be changed to timeout={timeout}, blocking={blocking}"
                )
                raise ValueError(msg)
            return instance
class ReadWriteLock(metaclass=_ReadWriteLockMeta):
    """
    Cross-process read-write lock backed by SQLite.

    Allows concurrent shared readers or a single exclusive writer. The lock is reentrant within the same mode
    (multiple ``acquire_read`` calls nest, as do multiple ``acquire_write`` calls from the same thread), but
    upgrading from read to write or downgrading from write to read raises :class:`RuntimeError`. Write locks are
    pinned to the thread that acquired them.

    By default, ``is_singleton=True``: calling ``ReadWriteLock(path)`` with the same resolved path returns the same
    instance. The lock file must use a ``.db`` extension (SQLite database).

    :param lock_file: path to the SQLite database file used as the lock
    :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
    :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable
    :param is_singleton: if ``True``, reuse existing instances for the same resolved path

    .. versionadded:: 3.21.0
    """

    # Singleton cache consulted by _ReadWriteLockMeta.__call__; weak values let unused locks be collected.
    _instances: WeakValueDictionary[pathlib.Path, ReadWriteLock] = WeakValueDictionary()
    _instances_lock = threading.Lock()

    @classmethod
    def get_lock(
        cls, lock_file: str | os.PathLike[str], timeout: float = -1, *, blocking: bool = True
    ) -> ReadWriteLock:
        """
        Return the singleton :class:`ReadWriteLock` for *lock_file*.

        :param lock_file: path to the SQLite database file used as the lock
        :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable
        :raises ValueError: if an instance already exists for this path with different *timeout* or *blocking* values
        :return: the singleton lock instance
        """
        return cls(lock_file, timeout, blocking=blocking)

    def __init__(
        self,
        lock_file: str | os.PathLike[str],
        timeout: float = -1,
        *,
        blocking: bool = True,
        is_singleton: bool = True,  # noqa: ARG002  # consumed by _ReadWriteLockMeta.__call__
    ) -> None:
        self.lock_file = os.fspath(lock_file)
        self.timeout = timeout
        self.blocking = blocking
        self._transaction_lock = threading.Lock()  # serializes the (possibly blocking) SQLite transaction work
        self._internal_lock = threading.Lock()  # protects _lock_level / _current_mode updates and rollback
        self._lock_level = 0  # reentrancy depth; 0 means the lock is not held
        self._current_mode: Literal["read", "write"] | None = None
        self._write_thread_id: int | None = None
        # check_same_thread=False: the one connection is shared by every thread that uses this instance.
        self._con = sqlite3.connect(self.lock_file, check_same_thread=False)
        with _all_connections_lock:
            _all_connections.add(self._con)

    def _acquire_transaction_lock(self, *, blocking: bool, timeout: float) -> None:
        """
        Take ``_transaction_lock``, honouring *blocking* and *timeout*.

        :param blocking: if ``False``, try once and never wait (``timeout`` is then ignored)
        :param timeout: maximum wait in seconds; ``-1`` means wait indefinitely
        :raises Timeout: if the lock could not be taken
        """
        if not blocking:
            # threading.Lock.acquire rejects a timeout combined with blocking=False
            # ("can't specify a timeout for a non-blocking call"), so the timeout must not be forwarded.
            acquired = self._transaction_lock.acquire(False)  # noqa: FBT003
        elif timeout == -1:
            # blocking=True with no timeout means wait indefinitely per threading.Lock.acquire semantics
            acquired = self._transaction_lock.acquire(True)  # noqa: FBT003
        else:
            # Lock.acquire raises OverflowError for timeouts above TIMEOUT_MAX; clamp instead of crashing.
            acquired = self._transaction_lock.acquire(True, min(timeout, threading.TIMEOUT_MAX))  # noqa: FBT003
        if not acquired:
            raise Timeout(self.lock_file) from None

    def _validate_reentrant(self, mode: Literal["read", "write"], opposite: str, direction: str) -> AcquireReturnProxy:
        """
        Handle an acquire request made while the lock is already held, bumping the lock level when it is legal.

        The callers in this class invoke it with ``_internal_lock`` held.

        :param mode: the mode being requested
        :param opposite: name of the other mode, used in the error message
        :param direction: ``"upgrade"`` or ``"downgrade"``, used in the error message
        :raises RuntimeError: if the held mode differs from *mode*, or a write lock is held by another thread
        :return: a proxy that releases one level of the lock
        """
        if self._current_mode != mode:
            msg = (
                f"Cannot acquire {mode} lock on {self.lock_file} (lock id: {id(self)}): "
                f"already holding a {opposite} lock ({direction} not allowed)"
            )
            raise RuntimeError(msg)
        if mode == "write" and (cur := threading.get_ident()) != self._write_thread_id:
            msg = (
                f"Cannot acquire write lock on {self.lock_file} (lock id: {id(self)}) "
                f"from thread {cur} while it is held by thread {self._write_thread_id}"
            )
            raise RuntimeError(msg)
        self._lock_level += 1
        return AcquireReturnProxy(lock=self)

    def _configure_and_begin(
        self, mode: Literal["read", "write"], timeout: float, *, blocking: bool, start_time: float
    ) -> None:
        """
        Configure the connection's busy timeout and open the SQLite transaction that represents the lock.

        :param mode: ``"write"`` opens an EXCLUSIVE transaction; ``"read"`` opens a plain one and forces a read
        :param timeout: total wait budget in seconds; ``-1`` means block indefinitely
        :param blocking: if ``False``, SQLite is told not to wait at all
        :param start_time: ``time.perf_counter()`` value taken when the acquire began, used to shrink the budget
        """
        waited = time.perf_counter() - start_time
        timeout_ms = timeout_for_sqlite(timeout, blocking=blocking, already_waited=waited)
        self._con.execute(f"PRAGMA busy_timeout={timeout_ms};")
        # Use legacy journal mode (not WAL) because WAL does not block readers when a concurrent EXCLUSIVE
        # write transaction is active, making read-write locking impossible without modifying table data.
        # MEMORY is safe here since no actual writes happen — crashes cannot corrupt the DB.
        # See https://sqlite.org/lang_transaction.html#deferred_immediate_and_exclusive_transactions
        #
        # Set here (not in __init__) because this pragma itself may block on a locked database,
        # so it must run after busy_timeout is configured above.
        self._con.execute("PRAGMA journal_mode=MEMORY;")
        # Recompute remaining timeout after the potentially blocking journal_mode pragma.
        waited = time.perf_counter() - start_time
        if (recomputed := timeout_for_sqlite(timeout, blocking=blocking, already_waited=waited)) != timeout_ms:
            self._con.execute(f"PRAGMA busy_timeout={recomputed};")
        stmt = "BEGIN EXCLUSIVE TRANSACTION;" if mode == "write" else "BEGIN TRANSACTION;"
        self._con.execute(stmt)
        if mode == "read":
            # A SELECT is needed to force SQLite to actually acquire the SHARED lock on the database.
            # https://www.sqlite.org/lockingv3.html#transaction_control
            self._con.execute("SELECT name FROM sqlite_schema LIMIT 1;")

    def _acquire(self, mode: Literal["read", "write"], timeout: float, *, blocking: bool) -> AcquireReturnProxy:
        """
        Shared implementation of :meth:`acquire_read` and :meth:`acquire_write`.

        :param mode: which kind of lock to take
        :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable
        :raises RuntimeError: on an illegal reentrant request (see :meth:`_validate_reentrant`)
        :raises Timeout: if the lock cannot be acquired in time
        :return: a proxy that can be used as a context manager to release the lock
        """
        opposite = "write" if mode == "read" else "read"
        direction = "downgrade" if mode == "read" else "upgrade"

        with self._internal_lock:
            if self._lock_level > 0:
                return self._validate_reentrant(mode, opposite, direction)

        start_time = time.perf_counter()
        self._acquire_transaction_lock(blocking=blocking, timeout=timeout)
        try:
            # Double-check: another thread may have acquired the lock while we waited on _transaction_lock.
            with self._internal_lock:
                if self._lock_level > 0:
                    return self._validate_reentrant(mode, opposite, direction)

            try:
                self._configure_and_begin(mode, timeout, blocking=blocking, start_time=start_time)
            except sqlite3.Error:
                # BEGIN may have succeeded before a later statement failed (e.g. the read-mode SELECT hitting a
                # locked database). Leaving that transaction open would make the next attempt fail with
                # "cannot start a transaction within a transaction", so always roll back a failed attempt.
                with suppress(sqlite3.Error):
                    self._con.rollback()
                raise

            with self._internal_lock:
                self._current_mode = mode
                self._lock_level = 1
                if mode == "write":
                    self._write_thread_id = threading.get_ident()

            return AcquireReturnProxy(lock=self)

        except sqlite3.OperationalError as exc:
            # Only SQLite's busy/locked condition is a timeout; anything else is a genuine error.
            if "database is locked" not in str(exc):
                raise
            raise Timeout(self.lock_file) from None
        finally:
            self._transaction_lock.release()

    def acquire_read(self, timeout: float = -1, *, blocking: bool = True) -> AcquireReturnProxy:
        """
        Acquire a shared read lock.

        If this instance already holds a read lock, the lock level is incremented (reentrant). Attempting to acquire
        a read lock while holding a write lock raises :class:`RuntimeError` (downgrade not allowed).

        :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable
        :raises RuntimeError: if a write lock is already held on this instance
        :raises Timeout: if the lock cannot be acquired within *timeout* seconds
        :return: a proxy that can be used as a context manager to release the lock
        """
        return self._acquire("read", timeout, blocking=blocking)

    def acquire_write(self, timeout: float = -1, *, blocking: bool = True) -> AcquireReturnProxy:
        """
        Acquire an exclusive write lock.

        If this instance already holds a write lock from the same thread, the lock level is incremented (reentrant).
        Attempting to acquire a write lock while holding a read lock raises :class:`RuntimeError` (upgrade not
        allowed). Write locks are pinned to the acquiring thread: a different thread trying to re-enter also raises
        :class:`RuntimeError`.

        :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable
        :raises RuntimeError: if a read lock is already held, or a write lock is held by a different thread
        :raises Timeout: if the lock cannot be acquired within *timeout* seconds
        :return: a proxy that can be used as a context manager to release the lock
        """
        return self._acquire("write", timeout, blocking=blocking)

    def release(self, *, force: bool = False) -> None:
        """
        Release one level of the current lock.

        When the lock level reaches zero the underlying SQLite transaction is rolled back, releasing the database lock.

        :param force: if ``True``, release the lock completely regardless of the current lock level
        :raises RuntimeError: if no lock is currently held and *force* is ``False``
        """
        with self._internal_lock:
            if self._lock_level == 0:
                if force:
                    return
                msg = f"Cannot release a lock on {self.lock_file} (lock id: {id(self)}) that is not held"
                raise RuntimeError(msg)
            if force:
                self._lock_level = 0
            else:
                self._lock_level -= 1
            if self._lock_level == 0:
                self._current_mode = None
                self._write_thread_id = None
                # Roll back while still holding _internal_lock. Otherwise another thread could observe
                # _lock_level == 0, begin a fresh transaction on the shared connection, and then have it
                # silently cancelled by this (late) rollback.
                self._con.rollback()

    @contextmanager
    def read_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> Generator[None]:
        """
        Context manager that acquires and releases a shared read lock.

        Falls back to instance defaults for *timeout* and *blocking* when ``None``.

        :param timeout: maximum wait time in seconds, or ``None`` to use the instance default
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default
        """
        if timeout is None:
            timeout = self.timeout
        if blocking is None:
            blocking = self.blocking
        self.acquire_read(timeout, blocking=blocking)
        try:
            yield
        finally:
            self.release()

    @contextmanager
    def write_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> Generator[None]:
        """
        Context manager that acquires and releases an exclusive write lock.

        Falls back to instance defaults for *timeout* and *blocking* when ``None``.

        :param timeout: maximum wait time in seconds, or ``None`` to use the instance default
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default
        """
        if timeout is None:
            timeout = self.timeout
        if blocking is None:
            blocking = self.blocking
        self.acquire_write(timeout, blocking=blocking)
        try:
            yield
        finally:
            self.release()

    def close(self) -> None:
        """
        Release the lock (if held) and close the underlying SQLite connection.

        After calling this method, the lock instance is no longer usable.
        """
        self.release(force=True)
        self._con.close()
        # The connection is closed already, so the atexit cleanup no longer needs to track it.
        with _all_connections_lock:
            _all_connections.discard(self._con)