Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/redis/lock.py: 26%
109 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-23 06:16 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-23 06:16 +0000
1import threading
2import time as mod_time
3import uuid
4from types import SimpleNamespace, TracebackType
5from typing import Optional, Type
7from redis.exceptions import LockError, LockNotOwnedError
8from redis.typing import Number
class Lock:
    """
    A shared, distributed Lock. Using Redis for locking allows the Lock
    to be shared across processes and/or machines.

    It's left to the user to resolve deadlock issues and make sure
    multiple clients play nicely together.
    """

    # Script handles returned by ``register_script``. They are filled in
    # lazily by ``register_scripts`` and stored on the class, so every
    # instance shares them.
    lua_release = None
    lua_extend = None
    lua_reacquire = None

    # KEYS[1] - lock name
    # ARGV[1] - token
    # return 1 if the lock was released, otherwise 0
    LUA_RELEASE_SCRIPT = """
        local token = redis.call('get', KEYS[1])
        if not token or token ~= ARGV[1] then
            return 0
        end
        redis.call('del', KEYS[1])
        return 1
    """

    # KEYS[1] - lock name
    # ARGV[1] - token
    # ARGV[2] - additional milliseconds
    # ARGV[3] - "0" if the additional time should be added to the lock's
    #           existing ttl or "1" if the existing ttl should be replaced
    # return 1 if the locks time was extended, otherwise 0
    LUA_EXTEND_SCRIPT = """
        local token = redis.call('get', KEYS[1])
        if not token or token ~= ARGV[1] then
            return 0
        end
        local expiration = redis.call('pttl', KEYS[1])
        if not expiration then
            expiration = 0
        end
        if expiration < 0 then
            return 0
        end

        local newttl = ARGV[2]
        if ARGV[3] == "0" then
            newttl = ARGV[2] + expiration
        end
        redis.call('pexpire', KEYS[1], newttl)
        return 1
    """

    # KEYS[1] - lock name
    # ARGV[1] - token
    # ARGV[2] - milliseconds
    # return 1 if the locks time was reacquired, otherwise 0
    LUA_REACQUIRE_SCRIPT = """
        local token = redis.call('get', KEYS[1])
        if not token or token ~= ARGV[1] then
            return 0
        end
        redis.call('pexpire', KEYS[1], ARGV[2])
        return 1
    """

    def __init__(
        self,
        redis,
        name: str,
        timeout: Optional[Number] = None,
        sleep: Number = 0.1,
        blocking: bool = True,
        blocking_timeout: Optional[Number] = None,
        thread_local: bool = True,
    ):
        """
        Create a new Lock instance named ``name`` using the Redis client
        supplied by ``redis``.

        ``timeout`` indicates a maximum life for the lock in seconds.
        By default, it will remain locked until release() is called.
        ``timeout`` can be specified as a float or integer, both representing
        the lock's lifetime in seconds. A falsy value (``None`` or ``0``)
        stores the lock without any expiry.

        ``sleep`` indicates the amount of time to sleep in seconds per loop
        iteration when the lock is in blocking mode and another client is
        currently holding the lock.

        ``blocking`` indicates whether calling ``acquire`` should block until
        the lock has been acquired or to fail immediately, causing ``acquire``
        to return False and the lock not being acquired. Defaults to True.
        Note this value can be overridden by passing a ``blocking``
        argument to ``acquire``.

        ``blocking_timeout`` indicates the maximum amount of time in seconds to
        spend trying to acquire the lock. A value of ``None`` indicates
        continue trying forever. ``blocking_timeout`` can be specified as a
        float or integer, both representing the number of seconds to wait.

        ``thread_local`` indicates whether the lock token is placed in
        thread-local storage. By default, the token is placed in thread local
        storage so that a thread only sees its token, not a token set by
        another thread. Consider the following timeline:

            time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds.
                     thread-1 sets the token to "abc"
            time: 1, thread-2 blocks trying to acquire `my-lock` using the
                     Lock instance.
            time: 5, thread-1 has not yet completed. redis expires the lock
                     key.
            time: 5, thread-2 acquired `my-lock` now that it's available.
                     thread-2 sets the token to "xyz"
            time: 6, thread-1 finishes its work and calls release(). if the
                     token is *not* stored in thread local storage, then
                     thread-1 would see the token value as "xyz" and would be
                     able to successfully release the thread-2's lock.

        In some use cases it's necessary to disable thread local storage. For
        example, if you have code where one thread acquires a lock and passes
        that lock instance to a worker thread to release later. If thread
        local storage isn't disabled in this case, the worker thread won't see
        the token set by the thread that acquired the lock. Our assumption
        is that these cases aren't common and as such default to using
        thread local storage.
        """
        self.redis = redis
        self.name = name
        self.timeout = timeout
        self.sleep = sleep
        self.blocking = blocking
        self.blocking_timeout = blocking_timeout
        self.thread_local = bool(thread_local)
        # Same attribute interface either way; only the visibility of
        # ``token`` across threads differs.
        self.local = threading.local() if self.thread_local else SimpleNamespace()
        self.local.token = None
        self.register_scripts()

    def register_scripts(self) -> None:
        """
        Register the release/extend/reacquire Lua scripts with the client.

        Each handle is stored on the class and only created while the
        corresponding class attribute is still ``None``.
        """
        cls = self.__class__
        client = self.redis
        if cls.lua_release is None:
            cls.lua_release = client.register_script(cls.LUA_RELEASE_SCRIPT)
        if cls.lua_extend is None:
            cls.lua_extend = client.register_script(cls.LUA_EXTEND_SCRIPT)
        if cls.lua_reacquire is None:
            cls.lua_reacquire = client.register_script(cls.LUA_REACQUIRE_SCRIPT)

    def __enter__(self) -> "Lock":
        """
        Acquire the lock using the instance defaults and return ``self``.

        Raises ``LockError`` if ``acquire`` returns a falsy value.
        """
        if self.acquire():
            return self
        raise LockError(
            "Unable to acquire lock within the time specified",
            lock_name=self.name,
        )

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> None:
        """
        Release the lock when leaving the ``with`` block.

        Any error raised by ``release`` propagates to the caller.
        """
        self.release()

    def acquire(
        self,
        sleep: Optional[Number] = None,
        blocking: Optional[bool] = None,
        blocking_timeout: Optional[Number] = None,
        token: Optional[str] = None,
    ) -> bool:
        """
        Use Redis to hold a shared, distributed lock named ``name``.
        Returns True once the lock is acquired.

        ``sleep`` overrides the instance's ``sleep`` interval for this call.

        If ``blocking`` is False, always return immediately. If the lock
        was acquired, return True, otherwise return False.

        ``blocking_timeout`` specifies the maximum number of seconds to
        wait trying to acquire the lock.

        ``token`` specifies the token value to be used. If provided, token
        must be a bytes object or a string that can be encoded to a bytes
        object with the default encoding. If a token isn't specified, a UUID
        will be generated.
        """
        if sleep is None:
            sleep = self.sleep
        if token is None:
            token = uuid.uuid1().hex.encode()
        else:
            encoder = self.redis.get_encoder()
            token = encoder.encode(token)
        if blocking is None:
            blocking = self.blocking
        if blocking_timeout is None:
            blocking_timeout = self.blocking_timeout

        stop_trying_at = None
        if blocking_timeout is not None:
            stop_trying_at = mod_time.monotonic() + blocking_timeout

        while True:
            if self.do_acquire(token):
                self.local.token = token
                return True
            if not blocking:
                return False
            # Give up now if the next attempt would start after the deadline,
            # instead of sleeping past it.
            next_try_at = mod_time.monotonic() + sleep
            if stop_trying_at is not None and next_try_at > stop_trying_at:
                return False
            mod_time.sleep(sleep)

    def do_acquire(self, token: str) -> bool:
        """
        Try once to store ``token`` under the lock name if the key is absent.

        Returns True when the ``set`` call reports success, otherwise False.
        """
        # A falsy timeout (None or 0) stores the key without an expiry;
        # otherwise the timeout is converted from seconds to milliseconds.
        timeout = int(self.timeout * 1000) if self.timeout else None
        return bool(self.redis.set(self.name, token, nx=True, px=timeout))

    def locked(self) -> bool:
        """
        Returns True if this key is locked by any process, otherwise False.
        """
        return self.redis.get(self.name) is not None

    def owned(self) -> bool:
        """
        Returns True if this key is locked by this lock, otherwise False.
        """
        stored_token = self.redis.get(self.name)
        # need to always compare bytes to bytes
        # TODO: this can be simplified when the context manager is finished
        if stored_token and not isinstance(stored_token, bytes):
            encoder = self.redis.get_encoder()
            stored_token = encoder.encode(stored_token)
        return self.local.token is not None and stored_token == self.local.token

    def release(self) -> None:
        """
        Releases the already acquired lock.

        Raises ``LockError`` when no token is held locally. The local token
        is cleared before the release script runs.
        """
        expected_token = self.local.token
        if expected_token is None:
            raise LockError("Cannot release an unlocked lock", lock_name=self.name)
        self.local.token = None
        self.do_release(expected_token)

    def do_release(self, expected_token: str) -> None:
        """
        Run the release script for ``expected_token``.

        Raises ``LockNotOwnedError`` when the script returns a falsy result,
        i.e. the key is missing or holds a different token.
        """
        released = self.lua_release(
            keys=[self.name], args=[expected_token], client=self.redis
        )
        if not released:
            raise LockNotOwnedError(
                "Cannot release a lock that's no longer owned",
                lock_name=self.name,
            )

    def extend(self, additional_time: Number, replace_ttl: bool = False) -> bool:
        """
        Adds more time to an already acquired lock.

        ``additional_time`` can be specified as an integer or a float, both
        representing the number of seconds to add.

        ``replace_ttl`` if False (the default), add `additional_time` to
        the lock's existing ttl. If True, replace the lock's ttl with
        `additional_time`.

        Raises ``LockError`` if no token is held locally or the lock was
        created with ``timeout=None``.
        """
        if self.local.token is None:
            raise LockError("Cannot extend an unlocked lock", lock_name=self.name)
        # A timeout of 0 passes this check, but the key was stored without a
        # ttl, so the extend script returns 0 and ``do_extend`` raises. The
        # check is left as-is so callers see the same exception type.
        if self.timeout is None:
            raise LockError("Cannot extend a lock with no timeout", lock_name=self.name)
        return self.do_extend(additional_time, replace_ttl)

    def do_extend(self, additional_time: Number, replace_ttl: bool) -> bool:
        """
        Run the extend script with ``additional_time`` converted to
        milliseconds.

        Returns True on success; raises ``LockNotOwnedError`` when the script
        returns a falsy result.
        """
        additional_ms = int(additional_time * 1000)
        extended = self.lua_extend(
            keys=[self.name],
            args=[self.local.token, additional_ms, "1" if replace_ttl else "0"],
            client=self.redis,
        )
        if not extended:
            raise LockNotOwnedError(
                "Cannot extend a lock that's no longer owned",
                lock_name=self.name,
            )
        return True

    def reacquire(self) -> bool:
        """
        Resets a TTL of an already acquired lock back to a timeout value.

        Raises ``LockError`` if no token is held locally or the lock has no
        timeout (``None`` or ``0``).
        """
        if self.local.token is None:
            raise LockError("Cannot reacquire an unlocked lock", lock_name=self.name)
        # ``do_acquire`` treats every falsy timeout as "no expiry". Testing
        # only for None would let a timeout of 0 through and ask the script
        # to PEXPIRE the key with 0 milliseconds.
        if not self.timeout:
            raise LockError(
                "Cannot reacquire a lock with no timeout",
                lock_name=self.name,
            )
        return self.do_reacquire()

    def do_reacquire(self) -> bool:
        """
        Run the reacquire script with ``self.timeout`` in milliseconds.

        Returns True on success; raises ``LockNotOwnedError`` when the script
        returns a falsy result.
        """
        timeout = int(self.timeout * 1000)
        reacquired = self.lua_reacquire(
            keys=[self.name], args=[self.local.token, timeout], client=self.redis
        )
        if not reacquired:
            raise LockNotOwnedError(
                "Cannot reacquire a lock that's no longer owned",
                lock_name=self.name,
            )
        return True