Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/redis/lock.py: 26%

109 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 07:16 +0000

1import threading 

2import time as mod_time 

3import uuid 

4from types import SimpleNamespace, TracebackType 

5from typing import Optional, Type 

6 

7from redis.exceptions import LockError, LockNotOwnedError 

8from redis.typing import Number 

9 

10 

11class Lock: 

12 """ 

13 A shared, distributed Lock. Using Redis for locking allows the Lock 

14 to be shared across processes and/or machines. 

15 

16 It's left to the user to resolve deadlock issues and make sure 

17 multiple clients play nicely together. 

18 """ 

19 

20 lua_release = None 

21 lua_extend = None 

22 lua_reacquire = None 

23 

24 # KEYS[1] - lock name 

25 # ARGV[1] - token 

26 # return 1 if the lock was released, otherwise 0 

27 LUA_RELEASE_SCRIPT = """ 

28 local token = redis.call('get', KEYS[1]) 

29 if not token or token ~= ARGV[1] then 

30 return 0 

31 end 

32 redis.call('del', KEYS[1]) 

33 return 1 

34 """ 

35 

36 # KEYS[1] - lock name 

37 # ARGV[1] - token 

38 # ARGV[2] - additional milliseconds 

39 # ARGV[3] - "0" if the additional time should be added to the lock's 

40 # existing ttl or "1" if the existing ttl should be replaced 

41 # return 1 if the locks time was extended, otherwise 0 

42 LUA_EXTEND_SCRIPT = """ 

43 local token = redis.call('get', KEYS[1]) 

44 if not token or token ~= ARGV[1] then 

45 return 0 

46 end 

47 local expiration = redis.call('pttl', KEYS[1]) 

48 if not expiration then 

49 expiration = 0 

50 end 

51 if expiration < 0 then 

52 return 0 

53 end 

54 

55 local newttl = ARGV[2] 

56 if ARGV[3] == "0" then 

57 newttl = ARGV[2] + expiration 

58 end 

59 redis.call('pexpire', KEYS[1], newttl) 

60 return 1 

61 """ 

62 

63 # KEYS[1] - lock name 

64 # ARGV[1] - token 

65 # ARGV[2] - milliseconds 

66 # return 1 if the locks time was reacquired, otherwise 0 

67 LUA_REACQUIRE_SCRIPT = """ 

68 local token = redis.call('get', KEYS[1]) 

69 if not token or token ~= ARGV[1] then 

70 return 0 

71 end 

72 redis.call('pexpire', KEYS[1], ARGV[2]) 

73 return 1 

74 """ 

75 

76 def __init__( 

77 self, 

78 redis, 

79 name: str, 

80 timeout: Optional[Number] = None, 

81 sleep: Number = 0.1, 

82 blocking: bool = True, 

83 blocking_timeout: Optional[Number] = None, 

84 thread_local: bool = True, 

85 ): 

86 """ 

87 Create a new Lock instance named ``name`` using the Redis client 

88 supplied by ``redis``. 

89 

90 ``timeout`` indicates a maximum life for the lock in seconds. 

91 By default, it will remain locked until release() is called. 

92 ``timeout`` can be specified as a float or integer, both representing 

93 the number of seconds to wait. 

94 

95 ``sleep`` indicates the amount of time to sleep in seconds per loop 

96 iteration when the lock is in blocking mode and another client is 

97 currently holding the lock. 

98 

99 ``blocking`` indicates whether calling ``acquire`` should block until 

100 the lock has been acquired or to fail immediately, causing ``acquire`` 

101 to return False and the lock not being acquired. Defaults to True. 

102 Note this value can be overridden by passing a ``blocking`` 

103 argument to ``acquire``. 

104 

105 ``blocking_timeout`` indicates the maximum amount of time in seconds to 

106 spend trying to acquire the lock. A value of ``None`` indicates 

107 continue trying forever. ``blocking_timeout`` can be specified as a 

108 float or integer, both representing the number of seconds to wait. 

109 

110 ``thread_local`` indicates whether the lock token is placed in 

111 thread-local storage. By default, the token is placed in thread local 

112 storage so that a thread only sees its token, not a token set by 

113 another thread. Consider the following timeline: 

114 

115 time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds. 

116 thread-1 sets the token to "abc" 

117 time: 1, thread-2 blocks trying to acquire `my-lock` using the 

118 Lock instance. 

119 time: 5, thread-1 has not yet completed. redis expires the lock 

120 key. 

121 time: 5, thread-2 acquired `my-lock` now that it's available. 

122 thread-2 sets the token to "xyz" 

123 time: 6, thread-1 finishes its work and calls release(). if the 

124 token is *not* stored in thread local storage, then 

125 thread-1 would see the token value as "xyz" and would be 

126 able to successfully release the thread-2's lock. 

127 

128 In some use cases it's necessary to disable thread local storage. For 

129 example, if you have code where one thread acquires a lock and passes 

130 that lock instance to a worker thread to release later. If thread 

131 local storage isn't disabled in this case, the worker thread won't see 

132 the token set by the thread that acquired the lock. Our assumption 

133 is that these cases aren't common and as such default to using 

134 thread local storage. 

135 """ 

136 self.redis = redis 

137 self.name = name 

138 self.timeout = timeout 

139 self.sleep = sleep 

140 self.blocking = blocking 

141 self.blocking_timeout = blocking_timeout 

142 self.thread_local = bool(thread_local) 

143 self.local = threading.local() if self.thread_local else SimpleNamespace() 

144 self.local.token = None 

145 self.register_scripts() 

146 

147 def register_scripts(self) -> None: 

148 cls = self.__class__ 

149 client = self.redis 

150 if cls.lua_release is None: 

151 cls.lua_release = client.register_script(cls.LUA_RELEASE_SCRIPT) 

152 if cls.lua_extend is None: 

153 cls.lua_extend = client.register_script(cls.LUA_EXTEND_SCRIPT) 

154 if cls.lua_reacquire is None: 

155 cls.lua_reacquire = client.register_script(cls.LUA_REACQUIRE_SCRIPT) 

156 

157 def __enter__(self) -> "Lock": 

158 if self.acquire(): 

159 return self 

160 raise LockError("Unable to acquire lock within the time specified") 

161 

162 def __exit__( 

163 self, 

164 exc_type: Optional[Type[BaseException]], 

165 exc_value: Optional[BaseException], 

166 traceback: Optional[TracebackType], 

167 ) -> None: 

168 self.release() 

169 

170 def acquire( 

171 self, 

172 sleep: Optional[Number] = None, 

173 blocking: Optional[bool] = None, 

174 blocking_timeout: Optional[Number] = None, 

175 token: Optional[str] = None, 

176 ): 

177 """ 

178 Use Redis to hold a shared, distributed lock named ``name``. 

179 Returns True once the lock is acquired. 

180 

181 If ``blocking`` is False, always return immediately. If the lock 

182 was acquired, return True, otherwise return False. 

183 

184 ``blocking_timeout`` specifies the maximum number of seconds to 

185 wait trying to acquire the lock. 

186 

187 ``token`` specifies the token value to be used. If provided, token 

188 must be a bytes object or a string that can be encoded to a bytes 

189 object with the default encoding. If a token isn't specified, a UUID 

190 will be generated. 

191 """ 

192 if sleep is None: 

193 sleep = self.sleep 

194 if token is None: 

195 token = uuid.uuid1().hex.encode() 

196 else: 

197 encoder = self.redis.get_encoder() 

198 token = encoder.encode(token) 

199 if blocking is None: 

200 blocking = self.blocking 

201 if blocking_timeout is None: 

202 blocking_timeout = self.blocking_timeout 

203 stop_trying_at = None 

204 if blocking_timeout is not None: 

205 stop_trying_at = mod_time.monotonic() + blocking_timeout 

206 while True: 

207 if self.do_acquire(token): 

208 self.local.token = token 

209 return True 

210 if not blocking: 

211 return False 

212 next_try_at = mod_time.monotonic() + sleep 

213 if stop_trying_at is not None and next_try_at > stop_trying_at: 

214 return False 

215 mod_time.sleep(sleep) 

216 

217 def do_acquire(self, token: str) -> bool: 

218 if self.timeout: 

219 # convert to milliseconds 

220 timeout = int(self.timeout * 1000) 

221 else: 

222 timeout = None 

223 if self.redis.set(self.name, token, nx=True, px=timeout): 

224 return True 

225 return False 

226 

227 def locked(self) -> bool: 

228 """ 

229 Returns True if this key is locked by any process, otherwise False. 

230 """ 

231 return self.redis.get(self.name) is not None 

232 

233 def owned(self) -> bool: 

234 """ 

235 Returns True if this key is locked by this lock, otherwise False. 

236 """ 

237 stored_token = self.redis.get(self.name) 

238 # need to always compare bytes to bytes 

239 # TODO: this can be simplified when the context manager is finished 

240 if stored_token and not isinstance(stored_token, bytes): 

241 encoder = self.redis.get_encoder() 

242 stored_token = encoder.encode(stored_token) 

243 return self.local.token is not None and stored_token == self.local.token 

244 

245 def release(self) -> None: 

246 """ 

247 Releases the already acquired lock 

248 """ 

249 expected_token = self.local.token 

250 if expected_token is None: 

251 raise LockError("Cannot release an unlocked lock") 

252 self.local.token = None 

253 self.do_release(expected_token) 

254 

255 def do_release(self, expected_token: str) -> None: 

256 if not bool( 

257 self.lua_release(keys=[self.name], args=[expected_token], client=self.redis) 

258 ): 

259 raise LockNotOwnedError("Cannot release a lock that's no longer owned") 

260 

261 def extend(self, additional_time: int, replace_ttl: bool = False) -> bool: 

262 """ 

263 Adds more time to an already acquired lock. 

264 

265 ``additional_time`` can be specified as an integer or a float, both 

266 representing the number of seconds to add. 

267 

268 ``replace_ttl`` if False (the default), add `additional_time` to 

269 the lock's existing ttl. If True, replace the lock's ttl with 

270 `additional_time`. 

271 """ 

272 if self.local.token is None: 

273 raise LockError("Cannot extend an unlocked lock") 

274 if self.timeout is None: 

275 raise LockError("Cannot extend a lock with no timeout") 

276 return self.do_extend(additional_time, replace_ttl) 

277 

278 def do_extend(self, additional_time: int, replace_ttl: bool) -> bool: 

279 additional_time = int(additional_time * 1000) 

280 if not bool( 

281 self.lua_extend( 

282 keys=[self.name], 

283 args=[self.local.token, additional_time, "1" if replace_ttl else "0"], 

284 client=self.redis, 

285 ) 

286 ): 

287 raise LockNotOwnedError("Cannot extend a lock that's no longer owned") 

288 return True 

289 

290 def reacquire(self) -> bool: 

291 """ 

292 Resets a TTL of an already acquired lock back to a timeout value. 

293 """ 

294 if self.local.token is None: 

295 raise LockError("Cannot reacquire an unlocked lock") 

296 if self.timeout is None: 

297 raise LockError("Cannot reacquire a lock with no timeout") 

298 return self.do_reacquire() 

299 

300 def do_reacquire(self) -> bool: 

301 timeout = int(self.timeout * 1000) 

302 if not bool( 

303 self.lua_reacquire( 

304 keys=[self.name], args=[self.local.token, timeout], client=self.redis 

305 ) 

306 ): 

307 raise LockNotOwnedError("Cannot reacquire a lock that's no longer owned") 

308 return True