Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pyrate_limiter/sqlite_bucket.py: 42%
93 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1import sqlite3
2from hashlib import sha1
3from pathlib import Path
4from tempfile import gettempdir
5from threading import RLock
6from typing import Iterable
7from typing import Iterator
8from typing import List
9from typing import Optional
10from typing import Union
12from .bucket import AbstractBucket
TEMP_DIR = Path(gettempdir())
# Default database file shared by all buckets (one table per bucket identity)
DEFAULT_DB_PATH = TEMP_DIR / "pyrate_limiter.sqlite"
# Lock file backing the module-level FILE_LOCK used by FileLockSQLiteBucket
LOCK_PATH = TEMP_DIR / "pyrate_limiter.lock"
# SQLite's default bound-parameter limit; DELETEs are chunked to stay under it
SQLITE_MAX_VARIABLE_NUMBER = 999
class SQLiteBucket(AbstractBucket):
    """Bucket backed by a SQLite database. Will be stored in the system temp directory by default.

    Notes on concurrency:

    * Thread-safe
    * For usage with multiprocessing, see :py:class:`.FileLockSQLiteBucket`.
    * Transactions are locked at the bucket level, but not at the connection or database level.
    * The default isolation level is used (autocommit).
    * Multiple buckets may be used in parallel, but a given bucket will only be used by one
      thread/process at a time.

    Args:
        maxsize: Maximum number of items in the bucket
        identity: Bucket identity, used as the table name
        path: Path to the SQLite database file; defaults to a temp file in the system temp directory
        kwargs: Additional keyword arguments for :py:func:`sqlite3.connect`

    Raises:
        ValueError: If ``identity`` is empty or ``None``
    """

    def __init__(
        self,
        maxsize: int = 0,
        identity: Optional[str] = None,
        path: Union[Path, str] = DEFAULT_DB_PATH,
        **kwargs,
    ):
        super().__init__(maxsize=maxsize)
        # Fail fast: the table name (and everything else) depends on identity
        if not identity:
            raise ValueError("Bucket identity is required")

        # Sharing the connection across threads is safe here because all access
        # is serialized through self._lock
        kwargs.setdefault("check_same_thread", False)
        self.connection_kwargs = kwargs

        self._connection: Optional[sqlite3.Connection] = None
        self._lock = RLock()
        self._path = Path(path)
        # Cached row count; None means "not queried yet"
        self._size: Optional[int] = None

        # Hash identity to use as a table name, to avoid potential issues with user-provided values
        self.table = f"ratelimit_{sha1(identity.encode()).hexdigest()}"

    @property
    def connection(self) -> sqlite3.Connection:
        """Create a database connection and initialize the table, if it hasn't already been done.
        This is safe to leave open, but may be manually closed with :py:meth:`.close`, if needed.
        """
        if not self._connection:
            # check_same_thread is already defaulted in __init__; no need to set it again here
            self._connection = sqlite3.connect(str(self._path), **self.connection_kwargs)
            self._connection.execute(
                f"CREATE TABLE IF NOT EXISTS {self.table} (idx INTEGER PRIMARY KEY AUTOINCREMENT, value REAL)"
            )
        return self._connection

    def lock_acquire(self):
        """Acquire a lock prior to beginning a new transaction"""
        self._lock.acquire()

    def lock_release(self):
        """Release lock following a transaction"""
        self._lock.release()

    def close(self):
        """Close the database connection"""
        if self._connection:
            self._connection.close()
            self._connection = None

    def size(self) -> int:
        """Keep bucket size in memory to avoid some unnecessary reads"""
        if self._size is None:
            self._size = self._query_size()
        return self._size

    def _query_size(self) -> int:
        """Query the current number of rows in the bucket's table from the database"""
        return self.connection.execute(f"SELECT COUNT(*) FROM {self.table}").fetchone()[0]

    def _update_size(self, amount: int):
        # Adjust the cached size by a delta instead of re-querying the database
        self._size = self.size() + amount

    def put(self, item: float) -> int:
        """Put an item in the bucket.
        Return 1 if successful, else 0
        """
        if self.size() < self.maxsize():
            self.connection.execute(f"INSERT INTO {self.table} (value) VALUES (?)", (item,))
            self.connection.commit()
            self._update_size(1)
            return 1
        return 0

    def get(self, number: int = 1) -> int:
        """Get items and remove them from the bucket in the FIFO fashion.
        Return the number of items that have been removed.
        """
        keys = [str(key) for key in self._get_keys(number)]
        # Delete in chunks to stay under SQLite's bound-parameter limit
        for chunk in chunkify(keys, SQLITE_MAX_VARIABLE_NUMBER):
            placeholders = ",".join("?" * len(chunk))
            self.connection.execute(f"DELETE FROM {self.table} WHERE idx IN ({placeholders})", chunk)
        # Commit once after all chunks so the multi-chunk delete is applied atomically
        self.connection.commit()

        self._update_size(-len(keys))
        return len(keys)

    def _get_keys(self, number: int = 1) -> List[float]:
        # Oldest rows first: AUTOINCREMENT idx preserves insertion order (FIFO)
        rows = self.connection.execute(f"SELECT idx FROM {self.table} ORDER BY idx LIMIT ?", (number,)).fetchall()
        return [row[0] for row in rows]

    def all_items(self) -> List[float]:
        """Return a list as copies of all items in the bucket"""
        rows = self.connection.execute(f"SELECT value FROM {self.table} ORDER BY idx").fetchall()
        return [row[0] for row in rows]

    def flush(self):
        """Delete all items in the bucket"""
        self.connection.execute(f"DELETE FROM {self.table}")
        self.connection.commit()
def chunkify(iterable: Iterable, max_size: int) -> Iterator[List]:
    """Break *iterable* into consecutive lists of at most *max_size* items each."""
    buffered = list(iterable)
    for start in range(0, len(buffered), max_size):
        yield buffered[start : start + max_size]
# Create file lock in module scope to reuse across buckets
try:
    from filelock import FileLock

    # Reentrant, shared lock file; serializes bucket access across processes
    FILE_LOCK = FileLock(str(LOCK_PATH))
except ImportError:
    # filelock is optional; FileLockSQLiteBucket.__init__ raises ImportError if it's missing
    pass
class FileLockSQLiteBucket(SQLiteBucket):
    """Bucket backed by a SQLite database and file lock. Suitable for usage from multiple processes
    with no shared state. Requires installing [py-filelock](https://py-filelock.readthedocs.io).

    The file lock is reentrant and shared across buckets, allowing a process to access multiple
    buckets at once.
    """

    def __init__(self, **kwargs):
        # If not installed, raise ImportError at init time instead of at module import time
        from filelock import FileLock  # noqa: F401

        super().__init__(**kwargs)
        # Replace the thread lock set by SQLiteBucket.__init__ with the
        # module-level file lock shared across all buckets and processes
        self._lock = FILE_LOCK

    def size(self) -> int:
        """Query current size from the database for each call instead of keeping in memory"""
        return self._query_size()

    def _update_size(self, _):
        # No-op: the in-memory size cache is disabled because another process
        # may modify the table at any time
        pass