Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/lock.py: 25%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

118 statements  

1import logging 

2import threading 

3import time as mod_time 

4import uuid 

5from types import SimpleNamespace, TracebackType 

6from typing import Optional, Type 

7 

8from redis.exceptions import LockError, LockNotOwnedError 

9from redis.typing import Number 

10 

11logger = logging.getLogger(__name__) 

12 

13 

14class Lock: 

15 """ 

16 A shared, distributed Lock. Using Redis for locking allows the Lock 

17 to be shared across processes and/or machines. 

18 

19 It's left to the user to resolve deadlock issues and make sure 

20 multiple clients play nicely together. 

21 """ 

22 

23 lua_release = None 

24 lua_extend = None 

25 lua_reacquire = None 

26 

27 # KEYS[1] - lock name 

28 # ARGV[1] - token 

29 # return 1 if the lock was released, otherwise 0 

30 LUA_RELEASE_SCRIPT = """ 

31 local token = redis.call('get', KEYS[1]) 

32 if not token or token ~= ARGV[1] then 

33 return 0 

34 end 

35 redis.call('del', KEYS[1]) 

36 return 1 

37 """ 

38 

39 # KEYS[1] - lock name 

40 # ARGV[1] - token 

41 # ARGV[2] - additional milliseconds 

42 # ARGV[3] - "0" if the additional time should be added to the lock's 

43 # existing ttl or "1" if the existing ttl should be replaced 

44 # return 1 if the locks time was extended, otherwise 0 

45 LUA_EXTEND_SCRIPT = """ 

46 local token = redis.call('get', KEYS[1]) 

47 if not token or token ~= ARGV[1] then 

48 return 0 

49 end 

50 local expiration = redis.call('pttl', KEYS[1]) 

51 if not expiration then 

52 expiration = 0 

53 end 

54 if expiration < 0 then 

55 return 0 

56 end 

57 

58 local newttl = ARGV[2] 

59 if ARGV[3] == "0" then 

60 newttl = ARGV[2] + expiration 

61 end 

62 redis.call('pexpire', KEYS[1], newttl) 

63 return 1 

64 """ 

65 

66 # KEYS[1] - lock name 

67 # ARGV[1] - token 

68 # ARGV[2] - milliseconds 

69 # return 1 if the locks time was reacquired, otherwise 0 

70 LUA_REACQUIRE_SCRIPT = """ 

71 local token = redis.call('get', KEYS[1]) 

72 if not token or token ~= ARGV[1] then 

73 return 0 

74 end 

75 redis.call('pexpire', KEYS[1], ARGV[2]) 

76 return 1 

77 """ 

78 

79 def __init__( 

80 self, 

81 redis, 

82 name: str, 

83 timeout: Optional[Number] = None, 

84 sleep: Number = 0.1, 

85 blocking: bool = True, 

86 blocking_timeout: Optional[Number] = None, 

87 thread_local: bool = True, 

88 raise_on_release_error: bool = True, 

89 ): 

90 """ 

91 Create a new Lock instance named ``name`` using the Redis client 

92 supplied by ``redis``. 

93 

94 ``timeout`` indicates a maximum life for the lock in seconds. 

95 By default, it will remain locked until release() is called. 

96 ``timeout`` can be specified as a float or integer, both representing 

97 the number of seconds to wait. 

98 

99 ``sleep`` indicates the amount of time to sleep in seconds per loop 

100 iteration when the lock is in blocking mode and another client is 

101 currently holding the lock. 

102 

103 ``blocking`` indicates whether calling ``acquire`` should block until 

104 the lock has been acquired or to fail immediately, causing ``acquire`` 

105 to return False and the lock not being acquired. Defaults to True. 

106 Note this value can be overridden by passing a ``blocking`` 

107 argument to ``acquire``. 

108 

109 ``blocking_timeout`` indicates the maximum amount of time in seconds to 

110 spend trying to acquire the lock. A value of ``None`` indicates 

111 continue trying forever. ``blocking_timeout`` can be specified as a 

112 float or integer, both representing the number of seconds to wait. 

113 

114 ``thread_local`` indicates whether the lock token is placed in 

115 thread-local storage. By default, the token is placed in thread local 

116 storage so that a thread only sees its token, not a token set by 

117 another thread. Consider the following timeline: 

118 

119 time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds. 

120 thread-1 sets the token to "abc" 

121 time: 1, thread-2 blocks trying to acquire `my-lock` using the 

122 Lock instance. 

123 time: 5, thread-1 has not yet completed. redis expires the lock 

124 key. 

125 time: 5, thread-2 acquired `my-lock` now that it's available. 

126 thread-2 sets the token to "xyz" 

127 time: 6, thread-1 finishes its work and calls release(). if the 

128 token is *not* stored in thread local storage, then 

129 thread-1 would see the token value as "xyz" and would be 

130 able to successfully release the thread-2's lock. 

131 

132 ``raise_on_release_error`` indicates whether to raise an exception when 

133 the lock is no longer owned when exiting the context manager. By default, 

134 this is True, meaning an exception will be raised. If False, the warning 

135 will be logged and the exception will be suppressed. 

136 

137 In some use cases it's necessary to disable thread local storage. For 

138 example, if you have code where one thread acquires a lock and passes 

139 that lock instance to a worker thread to release later. If thread 

140 local storage isn't disabled in this case, the worker thread won't see 

141 the token set by the thread that acquired the lock. Our assumption 

142 is that these cases aren't common and as such default to using 

143 thread local storage. 

144 """ 

145 self.redis = redis 

146 self.name = name 

147 self.timeout = timeout 

148 self.sleep = sleep 

149 self.blocking = blocking 

150 self.blocking_timeout = blocking_timeout 

151 self.thread_local = bool(thread_local) 

152 self.raise_on_release_error = raise_on_release_error 

153 self.local = threading.local() if self.thread_local else SimpleNamespace() 

154 self.local.token = None 

155 self.register_scripts() 

156 

157 def register_scripts(self) -> None: 

158 cls = self.__class__ 

159 client = self.redis 

160 if cls.lua_release is None: 

161 cls.lua_release = client.register_script(cls.LUA_RELEASE_SCRIPT) 

162 if cls.lua_extend is None: 

163 cls.lua_extend = client.register_script(cls.LUA_EXTEND_SCRIPT) 

164 if cls.lua_reacquire is None: 

165 cls.lua_reacquire = client.register_script(cls.LUA_REACQUIRE_SCRIPT) 

166 

167 def __enter__(self) -> "Lock": 

168 if self.acquire(): 

169 return self 

170 raise LockError( 

171 "Unable to acquire lock within the time specified", 

172 lock_name=self.name, 

173 ) 

174 

175 def __exit__( 

176 self, 

177 exc_type: Optional[Type[BaseException]], 

178 exc_value: Optional[BaseException], 

179 traceback: Optional[TracebackType], 

180 ) -> None: 

181 try: 

182 self.release() 

183 except LockError: 

184 if self.raise_on_release_error: 

185 raise 

186 logger.warning( 

187 "Lock was unlocked or no longer owned when exiting context manager." 

188 ) 

189 

190 def acquire( 

191 self, 

192 sleep: Optional[Number] = None, 

193 blocking: Optional[bool] = None, 

194 blocking_timeout: Optional[Number] = None, 

195 token: Optional[str] = None, 

196 ): 

197 """ 

198 Use Redis to hold a shared, distributed lock named ``name``. 

199 Returns True once the lock is acquired. 

200 

201 If ``blocking`` is False, always return immediately. If the lock 

202 was acquired, return True, otherwise return False. 

203 

204 ``blocking_timeout`` specifies the maximum number of seconds to 

205 wait trying to acquire the lock. 

206 

207 ``token`` specifies the token value to be used. If provided, token 

208 must be a bytes object or a string that can be encoded to a bytes 

209 object with the default encoding. If a token isn't specified, a UUID 

210 will be generated. 

211 """ 

212 if sleep is None: 

213 sleep = self.sleep 

214 if token is None: 

215 token = uuid.uuid1().hex.encode() 

216 else: 

217 encoder = self.redis.get_encoder() 

218 token = encoder.encode(token) 

219 if blocking is None: 

220 blocking = self.blocking 

221 if blocking_timeout is None: 

222 blocking_timeout = self.blocking_timeout 

223 stop_trying_at = None 

224 if blocking_timeout is not None: 

225 stop_trying_at = mod_time.monotonic() + blocking_timeout 

226 while True: 

227 if self.do_acquire(token): 

228 self.local.token = token 

229 return True 

230 if not blocking: 

231 return False 

232 next_try_at = mod_time.monotonic() + sleep 

233 if stop_trying_at is not None and next_try_at > stop_trying_at: 

234 return False 

235 mod_time.sleep(sleep) 

236 

237 def do_acquire(self, token: str) -> bool: 

238 if self.timeout: 

239 # convert to milliseconds 

240 timeout = int(self.timeout * 1000) 

241 else: 

242 timeout = None 

243 if self.redis.set(self.name, token, nx=True, px=timeout): 

244 return True 

245 return False 

246 

247 def locked(self) -> bool: 

248 """ 

249 Returns True if this key is locked by any process, otherwise False. 

250 """ 

251 return self.redis.get(self.name) is not None 

252 

253 def owned(self) -> bool: 

254 """ 

255 Returns True if this key is locked by this lock, otherwise False. 

256 """ 

257 stored_token = self.redis.get(self.name) 

258 # need to always compare bytes to bytes 

259 # TODO: this can be simplified when the context manager is finished 

260 if stored_token and not isinstance(stored_token, bytes): 

261 encoder = self.redis.get_encoder() 

262 stored_token = encoder.encode(stored_token) 

263 return self.local.token is not None and stored_token == self.local.token 

264 

265 def release(self) -> None: 

266 """ 

267 Releases the already acquired lock 

268 """ 

269 expected_token = self.local.token 

270 if expected_token is None: 

271 raise LockError( 

272 "Cannot release a lock that's not owned or is already unlocked.", 

273 lock_name=self.name, 

274 ) 

275 self.local.token = None 

276 self.do_release(expected_token) 

277 

278 def do_release(self, expected_token: str) -> None: 

279 if not bool( 

280 self.lua_release(keys=[self.name], args=[expected_token], client=self.redis) 

281 ): 

282 raise LockNotOwnedError( 

283 "Cannot release a lock that's no longer owned", 

284 lock_name=self.name, 

285 ) 

286 

287 def extend(self, additional_time: Number, replace_ttl: bool = False) -> bool: 

288 """ 

289 Adds more time to an already acquired lock. 

290 

291 ``additional_time`` can be specified as an integer or a float, both 

292 representing the number of seconds to add. 

293 

294 ``replace_ttl`` if False (the default), add `additional_time` to 

295 the lock's existing ttl. If True, replace the lock's ttl with 

296 `additional_time`. 

297 """ 

298 if self.local.token is None: 

299 raise LockError("Cannot extend an unlocked lock", lock_name=self.name) 

300 if self.timeout is None: 

301 raise LockError("Cannot extend a lock with no timeout", lock_name=self.name) 

302 return self.do_extend(additional_time, replace_ttl) 

303 

304 def do_extend(self, additional_time: Number, replace_ttl: bool) -> bool: 

305 additional_time = int(additional_time * 1000) 

306 if not bool( 

307 self.lua_extend( 

308 keys=[self.name], 

309 args=[self.local.token, additional_time, "1" if replace_ttl else "0"], 

310 client=self.redis, 

311 ) 

312 ): 

313 raise LockNotOwnedError( 

314 "Cannot extend a lock that's no longer owned", 

315 lock_name=self.name, 

316 ) 

317 return True 

318 

319 def reacquire(self) -> bool: 

320 """ 

321 Resets a TTL of an already acquired lock back to a timeout value. 

322 """ 

323 if self.local.token is None: 

324 raise LockError("Cannot reacquire an unlocked lock", lock_name=self.name) 

325 if self.timeout is None: 

326 raise LockError( 

327 "Cannot reacquire a lock with no timeout", 

328 lock_name=self.name, 

329 ) 

330 return self.do_reacquire() 

331 

332 def do_reacquire(self) -> bool: 

333 timeout = int(self.timeout * 1000) 

334 if not bool( 

335 self.lua_reacquire( 

336 keys=[self.name], args=[self.local.token, timeout], client=self.redis 

337 ) 

338 ): 

339 raise LockNotOwnedError( 

340 "Cannot reacquire a lock that's no longer owned", 

341 lock_name=self.name, 

342 ) 

343 return True