Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pyrate_limiter/sqlite_bucket.py: 42%

93 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1import sqlite3 

2from hashlib import sha1 

3from pathlib import Path 

4from tempfile import gettempdir 

5from threading import RLock 

6from typing import Iterable 

7from typing import Iterator 

8from typing import List 

9from typing import Optional 

10from typing import Union 

11 

12from .bucket import AbstractBucket 

13 

# Default on-disk locations for the database and the inter-process lock file,
# both placed in the platform's temporary directory.
TEMP_DIR = Path(gettempdir())
DEFAULT_DB_PATH = TEMP_DIR.joinpath("pyrate_limiter.sqlite")
LOCK_PATH = TEMP_DIR.joinpath("pyrate_limiter.lock")
# Maximum number of "?" placeholders used in a single statement; 999 was SQLite's
# compile-time default before 3.32.0, so it is a conservative portable bound.
SQLITE_MAX_VARIABLE_NUMBER = 999



class SQLiteBucket(AbstractBucket):
    """Bucket backed by a SQLite database. Will be stored in the system temp directory by default.

    Notes on concurrency:

    * Thread-safe
    * For usage with multiprocessing, see :py:class:`.FileLockSQLiteBucket`.
    * Transactions are locked at the bucket level, but not at the connection or database level.
    * The default isolation level is used; every write is committed explicitly right after it
      is executed.
    * Multiple buckets may be used in parallel, but a given bucket will only be used by one
      thread/process at a time.

    Args:
        maxsize: Maximum number of items in the bucket
        identity: Bucket identity, used as the table name
        path: Path to the SQLite database file; defaults to a temp file in the system temp directory
        kwargs: Additional keyword arguments for :py:func:`sqlite3.connect`

    Raises:
        ValueError: If ``identity`` is empty or not given
    """

    def __init__(
        self,
        maxsize: int = 0,
        identity: Optional[str] = None,
        path: Union[Path, str] = DEFAULT_DB_PATH,
        **kwargs,
    ):
        super().__init__(maxsize=maxsize)
        # The connection may be used from a thread other than the one that created it;
        # callers are expected to serialize access via lock_acquire()/lock_release().
        kwargs.setdefault("check_same_thread", False)
        self.connection_kwargs = kwargs

        self._connection: Optional[sqlite3.Connection] = None
        self._lock = RLock()
        self._path = Path(path)
        # Cached row count; None means it has not been read from the database yet
        self._size: Optional[int] = None

        if not identity:
            raise ValueError("Bucket identity is required")

        # Hash identity to use as a table name, to avoid potential issues with user-provided values
        self.table = f"ratelimit_{sha1(identity.encode()).hexdigest()}"

    @property
    def connection(self) -> sqlite3.Connection:
        """Create a database connection and initialize the table, if it hasn't already been done.
        This is safe to leave open, but may be manually closed with :py:meth:`.close`, if needed.
        """
        if not self._connection:
            self.connection_kwargs.setdefault("check_same_thread", False)
            self._connection = sqlite3.connect(str(self._path), **self.connection_kwargs)
            self._connection.execute(
                f"CREATE TABLE IF NOT EXISTS {self.table} (idx INTEGER PRIMARY KEY AUTOINCREMENT, value REAL)"
            )
        return self._connection

    def lock_acquire(self):
        """Acquire a lock prior to beginning a new transaction"""
        self._lock.acquire()

    def lock_release(self):
        """Release lock following a transaction"""
        self._lock.release()

    def close(self):
        """Close the database connection, if one is open"""
        if self._connection:
            self._connection.close()
            self._connection = None

    def size(self) -> int:
        """Return the number of items, keeping it cached in memory to avoid unnecessary reads"""
        if self._size is None:
            self._size = self._query_size()
        return self._size

    def _query_size(self) -> int:
        """Count the rows currently stored in this bucket's table"""
        return self.connection.execute(f"SELECT COUNT(*) FROM {self.table}").fetchone()[0]

    def _update_size(self, amount: int):
        """Adjust the cached size by ``amount`` (negative for removals)"""
        self._size = self.size() + amount

    def put(self, item: float) -> int:
        """Put an item in the bucket.
        Return 1 if successful, else 0
        """
        if self.size() < self.maxsize():
            self.connection.execute(f"INSERT INTO {self.table} (value) VALUES (?)", (item,))
            self.connection.commit()
            self._update_size(1)
            return 1
        return 0

    def get(self, number: int = 1) -> int:
        """Get items and remove them from the bucket in the FIFO fashion.
        Return the number of items that have been removed.

        A non-positive ``number`` removes nothing. This is checked up front because SQLite
        treats a negative ``LIMIT`` as "no limit", which would otherwise empty the bucket.
        """
        if number <= 0:
            return 0

        keys = self._get_keys(number)
        # Delete in chunks so no single statement exceeds SQLite's bound-parameter limit
        for chunk in chunkify(keys, SQLITE_MAX_VARIABLE_NUMBER):
            placeholders = ",".join("?" * len(chunk))
            self.connection.execute(f"DELETE FROM {self.table} WHERE idx IN ({placeholders})", chunk)
        self.connection.commit()

        self._update_size(-len(keys))
        return len(keys)

    def _get_keys(self, number: int = 1) -> List[int]:
        """Return the ``idx`` keys of the oldest ``number`` rows, oldest first"""
        rows = self.connection.execute(f"SELECT idx FROM {self.table} ORDER BY idx LIMIT ?", (number,)).fetchall()
        return [row[0] for row in rows]

    def all_items(self) -> List[float]:
        """Return a list as copies of all items in the bucket"""
        rows = self.connection.execute(f"SELECT value FROM {self.table} ORDER BY idx").fetchall()
        return [row[0] for row in rows]

    def flush(self):
        """Remove all items from the bucket"""
        self.connection.execute(f"DELETE FROM {self.table}")
        self.connection.commit()
        # Reset the cached size; otherwise put() keeps comparing against the pre-flush
        # count and may reject items even though the bucket is now empty.
        self._size = 0



def chunkify(iterable: Iterable, max_size: int) -> Iterator[List]:
    """Yield successive lists of at most ``max_size`` elements taken from ``iterable``.

    The input is materialized into a list first, so one-shot iterators are accepted.
    The last chunk may be shorter than ``max_size``.
    """
    materialized = list(iterable)
    offsets = range(0, len(materialized), max_size)
    yield from (materialized[offset : offset + max_size] for offset in offsets)



# Build one file lock at module scope so every FileLockSQLiteBucket shares it.
# filelock is an optional dependency: when it is missing, FILE_LOCK is simply left
# undefined, and FileLockSQLiteBucket raises ImportError when it is instantiated.
try:
    from filelock import FileLock
except ImportError:
    pass
else:
    FILE_LOCK = FileLock(str(LOCK_PATH))



class FileLockSQLiteBucket(SQLiteBucket):
    """Bucket backed by a SQLite database and file lock. Suitable for usage from multiple processes
    with no shared state. Requires installing [py-filelock](https://py-filelock.readthedocs.io).

    The file lock is reentrant and shared across buckets, allowing a process to access multiple
    buckets at once.

    Accepts the same arguments as :py:class:`SQLiteBucket`. Unlike the parent, the bucket size
    is read from the database on every call rather than cached in memory.

    Raises:
        ImportError: If ``filelock`` is not installed
    """

    def __init__(self, *args, **kwargs):
        # If not installed, raise ImportError at init time instead of at module import time
        from filelock import FileLock  # noqa: F401

        # Forward positional arguments too, so the signature matches SQLiteBucket's
        super().__init__(*args, **kwargs)
        # Replace the per-instance RLock with the module-level file lock shared by all buckets
        self._lock = FILE_LOCK

    def size(self) -> int:
        """Query current size from the database for each call instead of keeping in memory"""
        return self._query_size()

    def _update_size(self, _):
        """No-op: the size is never cached, so there is nothing to adjust"""