Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/redis/lock.py: 26%

109 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-23 06:16 +0000

1import threading 

2import time as mod_time 

3import uuid 

4from types import SimpleNamespace, TracebackType 

5from typing import Optional, Type 

6 

7from redis.exceptions import LockError, LockNotOwnedError 

8from redis.typing import Number 

9 

10 

11class Lock: 

12 """ 

13 A shared, distributed Lock. Using Redis for locking allows the Lock 

14 to be shared across processes and/or machines. 

15 

16 It's left to the user to resolve deadlock issues and make sure 

17 multiple clients play nicely together. 

18 """ 

19 

20 lua_release = None 

21 lua_extend = None 

22 lua_reacquire = None 

23 

24 # KEYS[1] - lock name 

25 # ARGV[1] - token 

26 # return 1 if the lock was released, otherwise 0 

27 LUA_RELEASE_SCRIPT = """ 

28 local token = redis.call('get', KEYS[1]) 

29 if not token or token ~= ARGV[1] then 

30 return 0 

31 end 

32 redis.call('del', KEYS[1]) 

33 return 1 

34 """ 

35 

36 # KEYS[1] - lock name 

37 # ARGV[1] - token 

38 # ARGV[2] - additional milliseconds 

39 # ARGV[3] - "0" if the additional time should be added to the lock's 

40 # existing ttl or "1" if the existing ttl should be replaced 

41 # return 1 if the locks time was extended, otherwise 0 

42 LUA_EXTEND_SCRIPT = """ 

43 local token = redis.call('get', KEYS[1]) 

44 if not token or token ~= ARGV[1] then 

45 return 0 

46 end 

47 local expiration = redis.call('pttl', KEYS[1]) 

48 if not expiration then 

49 expiration = 0 

50 end 

51 if expiration < 0 then 

52 return 0 

53 end 

54 

55 local newttl = ARGV[2] 

56 if ARGV[3] == "0" then 

57 newttl = ARGV[2] + expiration 

58 end 

59 redis.call('pexpire', KEYS[1], newttl) 

60 return 1 

61 """ 

62 

63 # KEYS[1] - lock name 

64 # ARGV[1] - token 

65 # ARGV[2] - milliseconds 

66 # return 1 if the locks time was reacquired, otherwise 0 

67 LUA_REACQUIRE_SCRIPT = """ 

68 local token = redis.call('get', KEYS[1]) 

69 if not token or token ~= ARGV[1] then 

70 return 0 

71 end 

72 redis.call('pexpire', KEYS[1], ARGV[2]) 

73 return 1 

74 """ 

75 

76 def __init__( 

77 self, 

78 redis, 

79 name: str, 

80 timeout: Optional[Number] = None, 

81 sleep: Number = 0.1, 

82 blocking: bool = True, 

83 blocking_timeout: Optional[Number] = None, 

84 thread_local: bool = True, 

85 ): 

86 """ 

87 Create a new Lock instance named ``name`` using the Redis client 

88 supplied by ``redis``. 

89 

90 ``timeout`` indicates a maximum life for the lock in seconds. 

91 By default, it will remain locked until release() is called. 

92 ``timeout`` can be specified as a float or integer, both representing 

93 the number of seconds to wait. 

94 

95 ``sleep`` indicates the amount of time to sleep in seconds per loop 

96 iteration when the lock is in blocking mode and another client is 

97 currently holding the lock. 

98 

99 ``blocking`` indicates whether calling ``acquire`` should block until 

100 the lock has been acquired or to fail immediately, causing ``acquire`` 

101 to return False and the lock not being acquired. Defaults to True. 

102 Note this value can be overridden by passing a ``blocking`` 

103 argument to ``acquire``. 

104 

105 ``blocking_timeout`` indicates the maximum amount of time in seconds to 

106 spend trying to acquire the lock. A value of ``None`` indicates 

107 continue trying forever. ``blocking_timeout`` can be specified as a 

108 float or integer, both representing the number of seconds to wait. 

109 

110 ``thread_local`` indicates whether the lock token is placed in 

111 thread-local storage. By default, the token is placed in thread local 

112 storage so that a thread only sees its token, not a token set by 

113 another thread. Consider the following timeline: 

114 

115 time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds. 

116 thread-1 sets the token to "abc" 

117 time: 1, thread-2 blocks trying to acquire `my-lock` using the 

118 Lock instance. 

119 time: 5, thread-1 has not yet completed. redis expires the lock 

120 key. 

121 time: 5, thread-2 acquired `my-lock` now that it's available. 

122 thread-2 sets the token to "xyz" 

123 time: 6, thread-1 finishes its work and calls release(). if the 

124 token is *not* stored in thread local storage, then 

125 thread-1 would see the token value as "xyz" and would be 

126 able to successfully release the thread-2's lock. 

127 

128 In some use cases it's necessary to disable thread local storage. For 

129 example, if you have code where one thread acquires a lock and passes 

130 that lock instance to a worker thread to release later. If thread 

131 local storage isn't disabled in this case, the worker thread won't see 

132 the token set by the thread that acquired the lock. Our assumption 

133 is that these cases aren't common and as such default to using 

134 thread local storage. 

135 """ 

136 self.redis = redis 

137 self.name = name 

138 self.timeout = timeout 

139 self.sleep = sleep 

140 self.blocking = blocking 

141 self.blocking_timeout = blocking_timeout 

142 self.thread_local = bool(thread_local) 

143 self.local = threading.local() if self.thread_local else SimpleNamespace() 

144 self.local.token = None 

145 self.register_scripts() 

146 

147 def register_scripts(self) -> None: 

148 cls = self.__class__ 

149 client = self.redis 

150 if cls.lua_release is None: 

151 cls.lua_release = client.register_script(cls.LUA_RELEASE_SCRIPT) 

152 if cls.lua_extend is None: 

153 cls.lua_extend = client.register_script(cls.LUA_EXTEND_SCRIPT) 

154 if cls.lua_reacquire is None: 

155 cls.lua_reacquire = client.register_script(cls.LUA_REACQUIRE_SCRIPT) 

156 

157 def __enter__(self) -> "Lock": 

158 if self.acquire(): 

159 return self 

160 raise LockError( 

161 "Unable to acquire lock within the time specified", 

162 lock_name=self.name, 

163 ) 

164 

165 def __exit__( 

166 self, 

167 exc_type: Optional[Type[BaseException]], 

168 exc_value: Optional[BaseException], 

169 traceback: Optional[TracebackType], 

170 ) -> None: 

171 self.release() 

172 

173 def acquire( 

174 self, 

175 sleep: Optional[Number] = None, 

176 blocking: Optional[bool] = None, 

177 blocking_timeout: Optional[Number] = None, 

178 token: Optional[str] = None, 

179 ): 

180 """ 

181 Use Redis to hold a shared, distributed lock named ``name``. 

182 Returns True once the lock is acquired. 

183 

184 If ``blocking`` is False, always return immediately. If the lock 

185 was acquired, return True, otherwise return False. 

186 

187 ``blocking_timeout`` specifies the maximum number of seconds to 

188 wait trying to acquire the lock. 

189 

190 ``token`` specifies the token value to be used. If provided, token 

191 must be a bytes object or a string that can be encoded to a bytes 

192 object with the default encoding. If a token isn't specified, a UUID 

193 will be generated. 

194 """ 

195 if sleep is None: 

196 sleep = self.sleep 

197 if token is None: 

198 token = uuid.uuid1().hex.encode() 

199 else: 

200 encoder = self.redis.get_encoder() 

201 token = encoder.encode(token) 

202 if blocking is None: 

203 blocking = self.blocking 

204 if blocking_timeout is None: 

205 blocking_timeout = self.blocking_timeout 

206 stop_trying_at = None 

207 if blocking_timeout is not None: 

208 stop_trying_at = mod_time.monotonic() + blocking_timeout 

209 while True: 

210 if self.do_acquire(token): 

211 self.local.token = token 

212 return True 

213 if not blocking: 

214 return False 

215 next_try_at = mod_time.monotonic() + sleep 

216 if stop_trying_at is not None and next_try_at > stop_trying_at: 

217 return False 

218 mod_time.sleep(sleep) 

219 

220 def do_acquire(self, token: str) -> bool: 

221 if self.timeout: 

222 # convert to milliseconds 

223 timeout = int(self.timeout * 1000) 

224 else: 

225 timeout = None 

226 if self.redis.set(self.name, token, nx=True, px=timeout): 

227 return True 

228 return False 

229 

230 def locked(self) -> bool: 

231 """ 

232 Returns True if this key is locked by any process, otherwise False. 

233 """ 

234 return self.redis.get(self.name) is not None 

235 

236 def owned(self) -> bool: 

237 """ 

238 Returns True if this key is locked by this lock, otherwise False. 

239 """ 

240 stored_token = self.redis.get(self.name) 

241 # need to always compare bytes to bytes 

242 # TODO: this can be simplified when the context manager is finished 

243 if stored_token and not isinstance(stored_token, bytes): 

244 encoder = self.redis.get_encoder() 

245 stored_token = encoder.encode(stored_token) 

246 return self.local.token is not None and stored_token == self.local.token 

247 

248 def release(self) -> None: 

249 """ 

250 Releases the already acquired lock 

251 """ 

252 expected_token = self.local.token 

253 if expected_token is None: 

254 raise LockError("Cannot release an unlocked lock", lock_name=self.name) 

255 self.local.token = None 

256 self.do_release(expected_token) 

257 

258 def do_release(self, expected_token: str) -> None: 

259 if not bool( 

260 self.lua_release(keys=[self.name], args=[expected_token], client=self.redis) 

261 ): 

262 raise LockNotOwnedError( 

263 "Cannot release a lock that's no longer owned", 

264 lock_name=self.name, 

265 ) 

266 

267 def extend(self, additional_time: int, replace_ttl: bool = False) -> bool: 

268 """ 

269 Adds more time to an already acquired lock. 

270 

271 ``additional_time`` can be specified as an integer or a float, both 

272 representing the number of seconds to add. 

273 

274 ``replace_ttl`` if False (the default), add `additional_time` to 

275 the lock's existing ttl. If True, replace the lock's ttl with 

276 `additional_time`. 

277 """ 

278 if self.local.token is None: 

279 raise LockError("Cannot extend an unlocked lock", lock_name=self.name) 

280 if self.timeout is None: 

281 raise LockError("Cannot extend a lock with no timeout", lock_name=self.name) 

282 return self.do_extend(additional_time, replace_ttl) 

283 

284 def do_extend(self, additional_time: int, replace_ttl: bool) -> bool: 

285 additional_time = int(additional_time * 1000) 

286 if not bool( 

287 self.lua_extend( 

288 keys=[self.name], 

289 args=[self.local.token, additional_time, "1" if replace_ttl else "0"], 

290 client=self.redis, 

291 ) 

292 ): 

293 raise LockNotOwnedError( 

294 "Cannot extend a lock that's no longer owned", 

295 lock_name=self.name, 

296 ) 

297 return True 

298 

299 def reacquire(self) -> bool: 

300 """ 

301 Resets a TTL of an already acquired lock back to a timeout value. 

302 """ 

303 if self.local.token is None: 

304 raise LockError("Cannot reacquire an unlocked lock", lock_name=self.name) 

305 if self.timeout is None: 

306 raise LockError( 

307 "Cannot reacquire a lock with no timeout", 

308 lock_name=self.name, 

309 ) 

310 return self.do_reacquire() 

311 

312 def do_reacquire(self) -> bool: 

313 timeout = int(self.timeout * 1000) 

314 if not bool( 

315 self.lua_reacquire( 

316 keys=[self.name], args=[self.local.token, timeout], client=self.redis 

317 ) 

318 ): 

319 raise LockNotOwnedError( 

320 "Cannot reacquire a lock that's no longer owned", 

321 lock_name=self.name, 

322 ) 

323 return True