Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/lock.py: 25%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
import logging
import threading
import time as mod_time
import uuid
from types import SimpleNamespace, TracebackType
from typing import Optional, Type, Union

from redis.exceptions import LockError, LockNotOwnedError
from redis.typing import Number
# Module-level logger; used by Lock.__exit__ to report a suppressed release error.
logger = logging.getLogger(__name__)
class Lock:
    """
    A shared, distributed Lock. Using Redis for locking allows the Lock
    to be shared across processes and/or machines.

    It's left to the user to resolve deadlock issues and make sure
    multiple clients play nicely together.
    """

    # Registered script handles. They are cached on the class by
    # ``register_scripts`` and invoked with an explicit ``client=`` argument.
    lua_release = None
    lua_extend = None
    lua_reacquire = None

    # KEYS[1] - lock name
    # ARGV[1] - token
    # return 1 if the lock was released, otherwise 0
    LUA_RELEASE_SCRIPT = """
        local token = redis.call('get', KEYS[1])
        if not token or token ~= ARGV[1] then
            return 0
        end
        redis.call('del', KEYS[1])
        return 1
    """

    # KEYS[1] - lock name
    # ARGV[1] - token
    # ARGV[2] - additional milliseconds
    # ARGV[3] - "0" if the additional time should be added to the lock's
    #           existing ttl or "1" if the existing ttl should be replaced
    # return 1 if the lock's time was extended, otherwise 0
    LUA_EXTEND_SCRIPT = """
        local token = redis.call('get', KEYS[1])
        if not token or token ~= ARGV[1] then
            return 0
        end
        local expiration = redis.call('pttl', KEYS[1])
        if not expiration then
            expiration = 0
        end
        if expiration < 0 then
            return 0
        end

        local newttl = ARGV[2]
        if ARGV[3] == "0" then
            newttl = ARGV[2] + expiration
        end
        redis.call('pexpire', KEYS[1], newttl)
        return 1
    """

    # KEYS[1] - lock name
    # ARGV[1] - token
    # ARGV[2] - milliseconds
    # return 1 if the lock's time was reacquired, otherwise 0
    LUA_REACQUIRE_SCRIPT = """
        local token = redis.call('get', KEYS[1])
        if not token or token ~= ARGV[1] then
            return 0
        end
        redis.call('pexpire', KEYS[1], ARGV[2])
        return 1
    """

    def __init__(
        self,
        redis,
        name: str,
        timeout: Optional[Number] = None,
        sleep: Number = 0.1,
        blocking: bool = True,
        blocking_timeout: Optional[Number] = None,
        thread_local: bool = True,
        raise_on_release_error: bool = True,
    ):
        """
        Create a new Lock instance named ``name`` using the Redis client
        supplied by ``redis``.

        ``timeout`` indicates a maximum life for the lock in seconds.
        By default, it will remain locked until release() is called.
        ``timeout`` can be specified as a float or integer, both representing
        the number of seconds to wait.

        ``sleep`` indicates the amount of time to sleep in seconds per loop
        iteration when the lock is in blocking mode and another client is
        currently holding the lock.

        ``blocking`` indicates whether calling ``acquire`` should block until
        the lock has been acquired or to fail immediately, causing ``acquire``
        to return False and the lock not being acquired. Defaults to True.
        Note this value can be overridden by passing a ``blocking``
        argument to ``acquire``.

        ``blocking_timeout`` indicates the maximum amount of time in seconds to
        spend trying to acquire the lock. A value of ``None`` indicates
        continue trying forever. ``blocking_timeout`` can be specified as a
        float or integer, both representing the number of seconds to wait.

        ``thread_local`` indicates whether the lock token is placed in
        thread-local storage. By default, the token is placed in thread local
        storage so that a thread only sees its token, not a token set by
        another thread. Consider the following timeline:

            time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds.
                     thread-1 sets the token to "abc"
            time: 1, thread-2 blocks trying to acquire `my-lock` using the
                     Lock instance.
            time: 5, thread-1 has not yet completed. redis expires the lock
                     key.
            time: 5, thread-2 acquired `my-lock` now that it's available.
                     thread-2 sets the token to "xyz"
            time: 6, thread-1 finishes its work and calls release(). if the
                     token is *not* stored in thread local storage, then
                     thread-1 would see the token value as "xyz" and would be
                     able to successfully release the thread-2's lock.

        In some use cases it's necessary to disable thread local storage. For
        example, if you have code where one thread acquires a lock and passes
        that lock instance to a worker thread to release later. If thread
        local storage isn't disabled in this case, the worker thread won't see
        the token set by the thread that acquired the lock. Our assumption
        is that these cases aren't common and as such default to using
        thread local storage.

        ``raise_on_release_error`` indicates whether to raise an exception when
        the lock is no longer owned when exiting the context manager. By default,
        this is True, meaning an exception will be raised. If False, the warning
        will be logged and the exception will be suppressed.
        """
        self.redis = redis
        self.name = name
        self.timeout = timeout
        self.sleep = sleep
        self.blocking = blocking
        self.blocking_timeout = blocking_timeout
        self.thread_local = bool(thread_local)
        self.raise_on_release_error = raise_on_release_error
        # The token lives either in per-thread storage or in a plain
        # namespace shared by every thread using this instance.
        self.local = threading.local() if self.thread_local else SimpleNamespace()
        self.local.token = None
        self.register_scripts()

    def register_scripts(self) -> None:
        """
        Register the Lua scripts with the client, caching the resulting
        handles on the class so each script is registered only once.
        """
        cls = self.__class__
        client = self.redis
        if cls.lua_release is None:
            cls.lua_release = client.register_script(cls.LUA_RELEASE_SCRIPT)
        if cls.lua_extend is None:
            cls.lua_extend = client.register_script(cls.LUA_EXTEND_SCRIPT)
        if cls.lua_reacquire is None:
            cls.lua_reacquire = client.register_script(cls.LUA_REACQUIRE_SCRIPT)

    def __enter__(self) -> "Lock":
        """
        Acquire the lock using the instance defaults and return ``self``.
        Raises LockError if the lock could not be acquired.
        """
        if self.acquire():
            return self
        raise LockError(
            "Unable to acquire lock within the time specified",
            lock_name=self.name,
        )

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> None:
        """
        Release the lock. A LockError raised by the release is re-raised when
        ``raise_on_release_error`` is true; otherwise it is logged as a
        warning and suppressed.
        """
        try:
            self.release()
        except LockError:
            if self.raise_on_release_error:
                raise
            logger.warning(
                "Lock was unlocked or no longer owned when exiting context manager."
            )

    def acquire(
        self,
        sleep: Optional[Number] = None,
        blocking: Optional[bool] = None,
        blocking_timeout: Optional[Number] = None,
        token: Optional[Union[str, bytes]] = None,
    ) -> bool:
        """
        Use Redis to hold a shared, distributed lock named ``name``.
        Returns True once the lock is acquired.

        ``sleep`` overrides the per-iteration sleep configured on the
        instance.

        If ``blocking`` is False, always return immediately. If the lock
        was acquired, return True, otherwise return False.

        ``blocking_timeout`` specifies the maximum number of seconds to
        wait trying to acquire the lock.

        ``token`` specifies the token value to be used. If provided, token
        must be a bytes object or a string that can be encoded to a bytes
        object with the default encoding. If a token isn't specified, a UUID
        will be generated.
        """
        if sleep is None:
            sleep = self.sleep
        if token is None:
            token = uuid.uuid1().hex.encode()
        else:
            encoder = self.redis.get_encoder()
            token = encoder.encode(token)
        if blocking is None:
            blocking = self.blocking
        if blocking_timeout is None:
            blocking_timeout = self.blocking_timeout
        stop_trying_at = None
        if blocking_timeout is not None:
            stop_trying_at = mod_time.monotonic() + blocking_timeout
        while True:
            if self.do_acquire(token):
                self.local.token = token
                return True
            if not blocking:
                return False
            # Give up now if the next attempt would land past the deadline,
            # rather than sleeping first and then giving up.
            next_try_at = mod_time.monotonic() + sleep
            if stop_trying_at is not None and next_try_at > stop_trying_at:
                return False
            mod_time.sleep(sleep)

    def do_acquire(self, token: Union[str, bytes]) -> bool:
        """
        Try once to set the lock key to ``token`` if it does not already
        exist. Returns True if the key was set, otherwise False.
        """
        if self.timeout:
            # convert to milliseconds
            timeout = int(self.timeout * 1000)
        else:
            # A falsy timeout (None or 0) means the key gets no expiry.
            timeout = None
        if self.redis.set(self.name, token, nx=True, px=timeout):
            return True
        return False

    def locked(self) -> bool:
        """
        Returns True if this key is locked by any process, otherwise False.
        """
        return self.redis.get(self.name) is not None

    def owned(self) -> bool:
        """
        Returns True if this key is locked by this lock, otherwise False.
        """
        stored_token = self.redis.get(self.name)
        # need to always compare bytes to bytes
        # TODO: this can be simplified when the context manager is finished
        if stored_token and not isinstance(stored_token, bytes):
            encoder = self.redis.get_encoder()
            stored_token = encoder.encode(stored_token)
        return self.local.token is not None and stored_token == self.local.token

    def release(self) -> None:
        """
        Releases the already acquired lock.

        Raises LockError if this instance holds no token, and
        LockNotOwnedError if the stored token no longer matches.
        """
        expected_token = self.local.token
        if expected_token is None:
            raise LockError(
                "Cannot release a lock that's not owned or is already unlocked.",
                lock_name=self.name,
            )
        # The local token is cleared before the release is attempted.
        self.local.token = None
        self.do_release(expected_token)

    def do_release(self, expected_token: Union[str, bytes]) -> None:
        """
        Run the release script; raise LockNotOwnedError when it reports that
        the key was not released.
        """
        if not bool(
            self.lua_release(keys=[self.name], args=[expected_token], client=self.redis)
        ):
            raise LockNotOwnedError(
                "Cannot release a lock that's no longer owned",
                lock_name=self.name,
            )

    def extend(self, additional_time: Number, replace_ttl: bool = False) -> bool:
        """
        Adds more time to an already acquired lock.

        ``additional_time`` can be specified as an integer or a float, both
        representing the number of seconds to add.

        ``replace_ttl`` if False (the default), add `additional_time` to
        the lock's existing ttl. If True, replace the lock's ttl with
        `additional_time`.

        Raises LockError if the lock is not held by this instance or was
        created without a timeout, and LockNotOwnedError if the stored token
        no longer matches.
        """
        if self.local.token is None:
            raise LockError("Cannot extend an unlocked lock", lock_name=self.name)
        # do_acquire() gives the key no expiry for any falsy timeout (None or
        # 0), so use the same test here instead of only checking for None.
        if not self.timeout:
            raise LockError("Cannot extend a lock with no timeout", lock_name=self.name)
        return self.do_extend(additional_time, replace_ttl)

    def do_extend(self, additional_time: Number, replace_ttl: bool) -> bool:
        """
        Run the extend script with ``additional_time`` converted to
        milliseconds. Returns True, or raises LockNotOwnedError when the
        script reports that the ttl was not extended.
        """
        additional_time = int(additional_time * 1000)
        if not bool(
            self.lua_extend(
                keys=[self.name],
                args=[self.local.token, additional_time, "1" if replace_ttl else "0"],
                client=self.redis,
            )
        ):
            raise LockNotOwnedError(
                "Cannot extend a lock that's no longer owned",
                lock_name=self.name,
            )
        return True

    def reacquire(self) -> bool:
        """
        Resets a TTL of an already acquired lock back to a timeout value.

        Raises LockError if the lock is not held by this instance or was
        created without a timeout, and LockNotOwnedError if the stored token
        no longer matches.
        """
        if self.local.token is None:
            raise LockError("Cannot reacquire an unlocked lock", lock_name=self.name)
        # A timeout of 0 must be rejected like None: PEXPIRE with a zero ttl
        # would delete the lock key instead of refreshing it.
        if not self.timeout:
            raise LockError(
                "Cannot reacquire a lock with no timeout",
                lock_name=self.name,
            )
        return self.do_reacquire()

    def do_reacquire(self) -> bool:
        """
        Run the reacquire script with the configured timeout in milliseconds.
        Returns True, or raises LockNotOwnedError when the script reports
        that the ttl was not reset.
        """
        timeout = int(self.timeout * 1000)
        if not bool(
            self.lua_reacquire(
                keys=[self.name], args=[self.local.token, timeout], client=self.redis
            )
        ):
            raise LockNotOwnedError(
                "Cannot reacquire a lock that's no longer owned",
                lock_name=self.name,
            )
        return True