Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/redis/lock.py: 26%
109 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 07:16 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 07:16 +0000
1import threading
2import time as mod_time
3import uuid
4from types import SimpleNamespace, TracebackType
5from typing import Optional, Type
7from redis.exceptions import LockError, LockNotOwnedError
8from redis.typing import Number
11class Lock:
12 """
13 A shared, distributed Lock. Using Redis for locking allows the Lock
14 to be shared across processes and/or machines.
16 It's left to the user to resolve deadlock issues and make sure
17 multiple clients play nicely together.
18 """
20 lua_release = None
21 lua_extend = None
22 lua_reacquire = None
24 # KEYS[1] - lock name
25 # ARGV[1] - token
26 # return 1 if the lock was released, otherwise 0
27 LUA_RELEASE_SCRIPT = """
28 local token = redis.call('get', KEYS[1])
29 if not token or token ~= ARGV[1] then
30 return 0
31 end
32 redis.call('del', KEYS[1])
33 return 1
34 """
36 # KEYS[1] - lock name
37 # ARGV[1] - token
38 # ARGV[2] - additional milliseconds
39 # ARGV[3] - "0" if the additional time should be added to the lock's
40 # existing ttl or "1" if the existing ttl should be replaced
41 # return 1 if the locks time was extended, otherwise 0
42 LUA_EXTEND_SCRIPT = """
43 local token = redis.call('get', KEYS[1])
44 if not token or token ~= ARGV[1] then
45 return 0
46 end
47 local expiration = redis.call('pttl', KEYS[1])
48 if not expiration then
49 expiration = 0
50 end
51 if expiration < 0 then
52 return 0
53 end
55 local newttl = ARGV[2]
56 if ARGV[3] == "0" then
57 newttl = ARGV[2] + expiration
58 end
59 redis.call('pexpire', KEYS[1], newttl)
60 return 1
61 """
63 # KEYS[1] - lock name
64 # ARGV[1] - token
65 # ARGV[2] - milliseconds
66 # return 1 if the locks time was reacquired, otherwise 0
67 LUA_REACQUIRE_SCRIPT = """
68 local token = redis.call('get', KEYS[1])
69 if not token or token ~= ARGV[1] then
70 return 0
71 end
72 redis.call('pexpire', KEYS[1], ARGV[2])
73 return 1
74 """
76 def __init__(
77 self,
78 redis,
79 name: str,
80 timeout: Optional[Number] = None,
81 sleep: Number = 0.1,
82 blocking: bool = True,
83 blocking_timeout: Optional[Number] = None,
84 thread_local: bool = True,
85 ):
86 """
87 Create a new Lock instance named ``name`` using the Redis client
88 supplied by ``redis``.
90 ``timeout`` indicates a maximum life for the lock in seconds.
91 By default, it will remain locked until release() is called.
92 ``timeout`` can be specified as a float or integer, both representing
93 the number of seconds to wait.
95 ``sleep`` indicates the amount of time to sleep in seconds per loop
96 iteration when the lock is in blocking mode and another client is
97 currently holding the lock.
99 ``blocking`` indicates whether calling ``acquire`` should block until
100 the lock has been acquired or to fail immediately, causing ``acquire``
101 to return False and the lock not being acquired. Defaults to True.
102 Note this value can be overridden by passing a ``blocking``
103 argument to ``acquire``.
105 ``blocking_timeout`` indicates the maximum amount of time in seconds to
106 spend trying to acquire the lock. A value of ``None`` indicates
107 continue trying forever. ``blocking_timeout`` can be specified as a
108 float or integer, both representing the number of seconds to wait.
110 ``thread_local`` indicates whether the lock token is placed in
111 thread-local storage. By default, the token is placed in thread local
112 storage so that a thread only sees its token, not a token set by
113 another thread. Consider the following timeline:
115 time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds.
116 thread-1 sets the token to "abc"
117 time: 1, thread-2 blocks trying to acquire `my-lock` using the
118 Lock instance.
119 time: 5, thread-1 has not yet completed. redis expires the lock
120 key.
121 time: 5, thread-2 acquired `my-lock` now that it's available.
122 thread-2 sets the token to "xyz"
123 time: 6, thread-1 finishes its work and calls release(). if the
124 token is *not* stored in thread local storage, then
125 thread-1 would see the token value as "xyz" and would be
126 able to successfully release the thread-2's lock.
128 In some use cases it's necessary to disable thread local storage. For
129 example, if you have code where one thread acquires a lock and passes
130 that lock instance to a worker thread to release later. If thread
131 local storage isn't disabled in this case, the worker thread won't see
132 the token set by the thread that acquired the lock. Our assumption
133 is that these cases aren't common and as such default to using
134 thread local storage.
135 """
136 self.redis = redis
137 self.name = name
138 self.timeout = timeout
139 self.sleep = sleep
140 self.blocking = blocking
141 self.blocking_timeout = blocking_timeout
142 self.thread_local = bool(thread_local)
143 self.local = threading.local() if self.thread_local else SimpleNamespace()
144 self.local.token = None
145 self.register_scripts()
147 def register_scripts(self) -> None:
148 cls = self.__class__
149 client = self.redis
150 if cls.lua_release is None:
151 cls.lua_release = client.register_script(cls.LUA_RELEASE_SCRIPT)
152 if cls.lua_extend is None:
153 cls.lua_extend = client.register_script(cls.LUA_EXTEND_SCRIPT)
154 if cls.lua_reacquire is None:
155 cls.lua_reacquire = client.register_script(cls.LUA_REACQUIRE_SCRIPT)
157 def __enter__(self) -> "Lock":
158 if self.acquire():
159 return self
160 raise LockError("Unable to acquire lock within the time specified")
162 def __exit__(
163 self,
164 exc_type: Optional[Type[BaseException]],
165 exc_value: Optional[BaseException],
166 traceback: Optional[TracebackType],
167 ) -> None:
168 self.release()
170 def acquire(
171 self,
172 sleep: Optional[Number] = None,
173 blocking: Optional[bool] = None,
174 blocking_timeout: Optional[Number] = None,
175 token: Optional[str] = None,
176 ):
177 """
178 Use Redis to hold a shared, distributed lock named ``name``.
179 Returns True once the lock is acquired.
181 If ``blocking`` is False, always return immediately. If the lock
182 was acquired, return True, otherwise return False.
184 ``blocking_timeout`` specifies the maximum number of seconds to
185 wait trying to acquire the lock.
187 ``token`` specifies the token value to be used. If provided, token
188 must be a bytes object or a string that can be encoded to a bytes
189 object with the default encoding. If a token isn't specified, a UUID
190 will be generated.
191 """
192 if sleep is None:
193 sleep = self.sleep
194 if token is None:
195 token = uuid.uuid1().hex.encode()
196 else:
197 encoder = self.redis.get_encoder()
198 token = encoder.encode(token)
199 if blocking is None:
200 blocking = self.blocking
201 if blocking_timeout is None:
202 blocking_timeout = self.blocking_timeout
203 stop_trying_at = None
204 if blocking_timeout is not None:
205 stop_trying_at = mod_time.monotonic() + blocking_timeout
206 while True:
207 if self.do_acquire(token):
208 self.local.token = token
209 return True
210 if not blocking:
211 return False
212 next_try_at = mod_time.monotonic() + sleep
213 if stop_trying_at is not None and next_try_at > stop_trying_at:
214 return False
215 mod_time.sleep(sleep)
217 def do_acquire(self, token: str) -> bool:
218 if self.timeout:
219 # convert to milliseconds
220 timeout = int(self.timeout * 1000)
221 else:
222 timeout = None
223 if self.redis.set(self.name, token, nx=True, px=timeout):
224 return True
225 return False
227 def locked(self) -> bool:
228 """
229 Returns True if this key is locked by any process, otherwise False.
230 """
231 return self.redis.get(self.name) is not None
233 def owned(self) -> bool:
234 """
235 Returns True if this key is locked by this lock, otherwise False.
236 """
237 stored_token = self.redis.get(self.name)
238 # need to always compare bytes to bytes
239 # TODO: this can be simplified when the context manager is finished
240 if stored_token and not isinstance(stored_token, bytes):
241 encoder = self.redis.get_encoder()
242 stored_token = encoder.encode(stored_token)
243 return self.local.token is not None and stored_token == self.local.token
245 def release(self) -> None:
246 """
247 Releases the already acquired lock
248 """
249 expected_token = self.local.token
250 if expected_token is None:
251 raise LockError("Cannot release an unlocked lock")
252 self.local.token = None
253 self.do_release(expected_token)
255 def do_release(self, expected_token: str) -> None:
256 if not bool(
257 self.lua_release(keys=[self.name], args=[expected_token], client=self.redis)
258 ):
259 raise LockNotOwnedError("Cannot release a lock that's no longer owned")
261 def extend(self, additional_time: int, replace_ttl: bool = False) -> bool:
262 """
263 Adds more time to an already acquired lock.
265 ``additional_time`` can be specified as an integer or a float, both
266 representing the number of seconds to add.
268 ``replace_ttl`` if False (the default), add `additional_time` to
269 the lock's existing ttl. If True, replace the lock's ttl with
270 `additional_time`.
271 """
272 if self.local.token is None:
273 raise LockError("Cannot extend an unlocked lock")
274 if self.timeout is None:
275 raise LockError("Cannot extend a lock with no timeout")
276 return self.do_extend(additional_time, replace_ttl)
278 def do_extend(self, additional_time: int, replace_ttl: bool) -> bool:
279 additional_time = int(additional_time * 1000)
280 if not bool(
281 self.lua_extend(
282 keys=[self.name],
283 args=[self.local.token, additional_time, "1" if replace_ttl else "0"],
284 client=self.redis,
285 )
286 ):
287 raise LockNotOwnedError("Cannot extend a lock that's no longer owned")
288 return True
290 def reacquire(self) -> bool:
291 """
292 Resets a TTL of an already acquired lock back to a timeout value.
293 """
294 if self.local.token is None:
295 raise LockError("Cannot reacquire an unlocked lock")
296 if self.timeout is None:
297 raise LockError("Cannot reacquire a lock with no timeout")
298 return self.do_reacquire()
300 def do_reacquire(self) -> bool:
301 timeout = int(self.timeout * 1000)
302 if not bool(
303 self.lua_reacquire(
304 keys=[self.name], args=[self.local.token, timeout], client=self.redis
305 )
306 ):
307 raise LockNotOwnedError("Cannot reacquire a lock that's no longer owned")
308 return True