Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/redis/client.py: 24%

941 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 07:16 +0000

1import copy 

2import datetime 

3import re 

4import threading 

5import time 

6import warnings 

7from itertools import chain 

8from typing import Optional 

9 

10from redis.commands import ( 

11 CoreCommands, 

12 RedisModuleCommands, 

13 SentinelCommands, 

14 list_or_args, 

15) 

16from redis.connection import ConnectionPool, SSLConnection, UnixDomainSocketConnection 

17from redis.credentials import CredentialProvider 

18from redis.exceptions import ( 

19 ConnectionError, 

20 ExecAbortError, 

21 ModuleError, 

22 PubSubError, 

23 RedisError, 

24 ResponseError, 

25 TimeoutError, 

26 WatchError, 

27) 

28from redis.lock import Lock 

29from redis.retry import Retry 

30from redis.utils import safe_str, str_if_bytes 

31 

32SYM_EMPTY = b"" 

33EMPTY_RESPONSE = "EMPTY_RESPONSE" 

34 

35# some responses (ie. dump) are binary, and just meant to never be decoded 

36NEVER_DECODE = "NEVER_DECODE" 

37 

38 

39def timestamp_to_datetime(response): 

40 "Converts a unix timestamp to a Python datetime object" 

41 if not response: 

42 return None 

43 try: 

44 response = int(response) 

45 except ValueError: 

46 return None 

47 return datetime.datetime.fromtimestamp(response) 

48 

49 

50def string_keys_to_dict(key_string, callback): 

51 return dict.fromkeys(key_string.split(), callback) 

52 

53 

54class CaseInsensitiveDict(dict): 

55 "Case insensitive dict implementation. Assumes string keys only." 

56 

57 def __init__(self, data): 

58 for k, v in data.items(): 

59 self[k.upper()] = v 

60 

61 def __contains__(self, k): 

62 return super().__contains__(k.upper()) 

63 

64 def __delitem__(self, k): 

65 super().__delitem__(k.upper()) 

66 

67 def __getitem__(self, k): 

68 return super().__getitem__(k.upper()) 

69 

70 def get(self, k, default=None): 

71 return super().get(k.upper(), default) 

72 

73 def __setitem__(self, k, v): 

74 super().__setitem__(k.upper(), v) 

75 

76 def update(self, data): 

77 data = CaseInsensitiveDict(data) 

78 super().update(data) 

79 

80 

81def parse_debug_object(response): 

82 "Parse the results of Redis's DEBUG OBJECT command into a Python dict" 

83 # The 'type' of the object is the first item in the response, but isn't 

84 # prefixed with a name 

85 response = str_if_bytes(response) 

86 response = "type:" + response 

87 response = dict(kv.split(":") for kv in response.split()) 

88 

89 # parse some expected int values from the string response 

90 # note: this cmd isn't spec'd so these may not appear in all redis versions 

91 int_fields = ("refcount", "serializedlength", "lru", "lru_seconds_idle") 

92 for field in int_fields: 

93 if field in response: 

94 response[field] = int(response[field]) 

95 

96 return response 

97 

98 

99def parse_object(response, infotype): 

100 """Parse the results of an OBJECT command""" 

101 if infotype in ("idletime", "refcount"): 

102 return int_or_none(response) 

103 return response 

104 

105 

106def parse_info(response): 

107 """Parse the result of Redis's INFO command into a Python dict""" 

108 info = {} 

109 response = str_if_bytes(response) 

110 

111 def get_value(value): 

112 if "," not in value or "=" not in value: 

113 try: 

114 if "." in value: 

115 return float(value) 

116 else: 

117 return int(value) 

118 except ValueError: 

119 return value 

120 else: 

121 sub_dict = {} 

122 for item in value.split(","): 

123 k, v = item.rsplit("=", 1) 

124 sub_dict[k] = get_value(v) 

125 return sub_dict 

126 

127 for line in response.splitlines(): 

128 if line and not line.startswith("#"): 

129 if line.find(":") != -1: 

130 # Split, the info fields keys and values. 

131 # Note that the value may contain ':'. but the 'host:' 

132 # pseudo-command is the only case where the key contains ':' 

133 key, value = line.split(":", 1) 

134 if key == "cmdstat_host": 

135 key, value = line.rsplit(":", 1) 

136 

137 if key == "module": 

138 # Hardcode a list for key 'modules' since there could be 

139 # multiple lines that started with 'module' 

140 info.setdefault("modules", []).append(get_value(value)) 

141 else: 

142 info[key] = get_value(value) 

143 else: 

144 # if the line isn't splittable, append it to the "__raw__" key 

145 info.setdefault("__raw__", []).append(line) 

146 

147 return info 

148 

149 

150def parse_memory_stats(response, **kwargs): 

151 """Parse the results of MEMORY STATS""" 

152 stats = pairs_to_dict(response, decode_keys=True, decode_string_values=True) 

153 for key, value in stats.items(): 

154 if key.startswith("db."): 

155 stats[key] = pairs_to_dict( 

156 value, decode_keys=True, decode_string_values=True 

157 ) 

158 return stats 

159 

160 

161SENTINEL_STATE_TYPES = { 

162 "can-failover-its-master": int, 

163 "config-epoch": int, 

164 "down-after-milliseconds": int, 

165 "failover-timeout": int, 

166 "info-refresh": int, 

167 "last-hello-message": int, 

168 "last-ok-ping-reply": int, 

169 "last-ping-reply": int, 

170 "last-ping-sent": int, 

171 "master-link-down-time": int, 

172 "master-port": int, 

173 "num-other-sentinels": int, 

174 "num-slaves": int, 

175 "o-down-time": int, 

176 "pending-commands": int, 

177 "parallel-syncs": int, 

178 "port": int, 

179 "quorum": int, 

180 "role-reported-time": int, 

181 "s-down-time": int, 

182 "slave-priority": int, 

183 "slave-repl-offset": int, 

184 "voted-leader-epoch": int, 

185} 

186 

187 

188def parse_sentinel_state(item): 

189 result = pairs_to_dict_typed(item, SENTINEL_STATE_TYPES) 

190 flags = set(result["flags"].split(",")) 

191 for name, flag in ( 

192 ("is_master", "master"), 

193 ("is_slave", "slave"), 

194 ("is_sdown", "s_down"), 

195 ("is_odown", "o_down"), 

196 ("is_sentinel", "sentinel"), 

197 ("is_disconnected", "disconnected"), 

198 ("is_master_down", "master_down"), 

199 ): 

200 result[name] = flag in flags 

201 return result 

202 

203 

204def parse_sentinel_master(response): 

205 return parse_sentinel_state(map(str_if_bytes, response)) 

206 

207 

208def parse_sentinel_masters(response): 

209 result = {} 

210 for item in response: 

211 state = parse_sentinel_state(map(str_if_bytes, item)) 

212 result[state["name"]] = state 

213 return result 

214 

215 

216def parse_sentinel_slaves_and_sentinels(response): 

217 return [parse_sentinel_state(map(str_if_bytes, item)) for item in response] 

218 

219 

220def parse_sentinel_get_master(response): 

221 return response and (response[0], int(response[1])) or None 

222 

223 

224def pairs_to_dict(response, decode_keys=False, decode_string_values=False): 

225 """Create a dict given a list of key/value pairs""" 

226 if response is None: 

227 return {} 

228 if decode_keys or decode_string_values: 

229 # the iter form is faster, but I don't know how to make that work 

230 # with a str_if_bytes() map 

231 keys = response[::2] 

232 if decode_keys: 

233 keys = map(str_if_bytes, keys) 

234 values = response[1::2] 

235 if decode_string_values: 

236 values = map(str_if_bytes, values) 

237 return dict(zip(keys, values)) 

238 else: 

239 it = iter(response) 

240 return dict(zip(it, it)) 

241 

242 

243def pairs_to_dict_typed(response, type_info): 

244 it = iter(response) 

245 result = {} 

246 for key, value in zip(it, it): 

247 if key in type_info: 

248 try: 

249 value = type_info[key](value) 

250 except Exception: 

251 # if for some reason the value can't be coerced, just use 

252 # the string value 

253 pass 

254 result[key] = value 

255 return result 

256 

257 

258def zset_score_pairs(response, **options): 

259 """ 

260 If ``withscores`` is specified in the options, return the response as 

261 a list of (value, score) pairs 

262 """ 

263 if not response or not options.get("withscores"): 

264 return response 

265 score_cast_func = options.get("score_cast_func", float) 

266 it = iter(response) 

267 return list(zip(it, map(score_cast_func, it))) 

268 

269 

270def sort_return_tuples(response, **options): 

271 """ 

272 If ``groups`` is specified, return the response as a list of 

273 n-element tuples with n being the value found in options['groups'] 

274 """ 

275 if not response or not options.get("groups"): 

276 return response 

277 n = options["groups"] 

278 return list(zip(*[response[i::n] for i in range(n)])) 

279 

280 

281def int_or_none(response): 

282 if response is None: 

283 return None 

284 return int(response) 

285 

286 

287def parse_stream_list(response): 

288 if response is None: 

289 return None 

290 data = [] 

291 for r in response: 

292 if r is not None: 

293 data.append((r[0], pairs_to_dict(r[1]))) 

294 else: 

295 data.append((None, None)) 

296 return data 

297 

298 

299def pairs_to_dict_with_str_keys(response): 

300 return pairs_to_dict(response, decode_keys=True) 

301 

302 

303def parse_list_of_dicts(response): 

304 return list(map(pairs_to_dict_with_str_keys, response)) 

305 

306 

307def parse_xclaim(response, **options): 

308 if options.get("parse_justid", False): 

309 return response 

310 return parse_stream_list(response) 

311 

312 

313def parse_xautoclaim(response, **options): 

314 if options.get("parse_justid", False): 

315 return response[1] 

316 response[1] = parse_stream_list(response[1]) 

317 return response 

318 

319 

320def parse_xinfo_stream(response, **options): 

321 data = pairs_to_dict(response, decode_keys=True) 

322 if not options.get("full", False): 

323 first = data["first-entry"] 

324 if first is not None: 

325 data["first-entry"] = (first[0], pairs_to_dict(first[1])) 

326 last = data["last-entry"] 

327 if last is not None: 

328 data["last-entry"] = (last[0], pairs_to_dict(last[1])) 

329 else: 

330 data["entries"] = {_id: pairs_to_dict(entry) for _id, entry in data["entries"]} 

331 data["groups"] = [ 

332 pairs_to_dict(group, decode_keys=True) for group in data["groups"] 

333 ] 

334 return data 

335 

336 

337def parse_xread(response): 

338 if response is None: 

339 return [] 

340 return [[r[0], parse_stream_list(r[1])] for r in response] 

341 

342 

343def parse_xpending(response, **options): 

344 if options.get("parse_detail", False): 

345 return parse_xpending_range(response) 

346 consumers = [{"name": n, "pending": int(p)} for n, p in response[3] or []] 

347 return { 

348 "pending": response[0], 

349 "min": response[1], 

350 "max": response[2], 

351 "consumers": consumers, 

352 } 

353 

354 

355def parse_xpending_range(response): 

356 k = ("message_id", "consumer", "time_since_delivered", "times_delivered") 

357 return [dict(zip(k, r)) for r in response] 

358 

359 

360def float_or_none(response): 

361 if response is None: 

362 return None 

363 return float(response) 

364 

365 

366def bool_ok(response): 

367 return str_if_bytes(response) == "OK" 

368 

369 

370def parse_zadd(response, **options): 

371 if response is None: 

372 return None 

373 if options.get("as_score"): 

374 return float(response) 

375 return int(response) 

376 

377 

378def parse_client_list(response, **options): 

379 clients = [] 

380 for c in str_if_bytes(response).splitlines(): 

381 # Values might contain '=' 

382 clients.append(dict(pair.split("=", 1) for pair in c.split(" "))) 

383 return clients 

384 

385 

386def parse_config_get(response, **options): 

387 response = [str_if_bytes(i) if i is not None else None for i in response] 

388 return response and pairs_to_dict(response) or {} 

389 

390 

391def parse_scan(response, **options): 

392 cursor, r = response 

393 return int(cursor), r 

394 

395 

396def parse_hscan(response, **options): 

397 cursor, r = response 

398 return int(cursor), r and pairs_to_dict(r) or {} 

399 

400 

401def parse_zscan(response, **options): 

402 score_cast_func = options.get("score_cast_func", float) 

403 cursor, r = response 

404 it = iter(r) 

405 return int(cursor), list(zip(it, map(score_cast_func, it))) 

406 

407 

408def parse_zmscore(response, **options): 

409 # zmscore: list of scores (double precision floating point number) or nil 

410 return [float(score) if score is not None else None for score in response] 

411 

412 

413def parse_slowlog_get(response, **options): 

414 space = " " if options.get("decode_responses", False) else b" " 

415 

416 def parse_item(item): 

417 result = {"id": item[0], "start_time": int(item[1]), "duration": int(item[2])} 

418 # Redis Enterprise injects another entry at index [3], which has 

419 # the complexity info (i.e. the value N in case the command has 

420 # an O(N) complexity) instead of the command. 

421 if isinstance(item[3], list): 

422 result["command"] = space.join(item[3]) 

423 result["client_address"] = item[4] 

424 result["client_name"] = item[5] 

425 else: 

426 result["complexity"] = item[3] 

427 result["command"] = space.join(item[4]) 

428 result["client_address"] = item[5] 

429 result["client_name"] = item[6] 

430 return result 

431 

432 return [parse_item(item) for item in response] 

433 

434 

435def parse_stralgo(response, **options): 

436 """ 

437 Parse the response from `STRALGO` command. 

438 Without modifiers the returned value is string. 

439 When LEN is given the command returns the length of the result 

440 (i.e integer). 

441 When IDX is given the command returns a dictionary with the LCS 

442 length and all the ranges in both the strings, start and end 

443 offset for each string, where there are matches. 

444 When WITHMATCHLEN is given, each array representing a match will 

445 also have the length of the match at the beginning of the array. 

446 """ 

447 if options.get("len", False): 

448 return int(response) 

449 if options.get("idx", False): 

450 if options.get("withmatchlen", False): 

451 matches = [ 

452 [(int(match[-1]))] + list(map(tuple, match[:-1])) 

453 for match in response[1] 

454 ] 

455 else: 

456 matches = [list(map(tuple, match)) for match in response[1]] 

457 return { 

458 str_if_bytes(response[0]): matches, 

459 str_if_bytes(response[2]): int(response[3]), 

460 } 

461 return str_if_bytes(response) 

462 

463 

464def parse_cluster_info(response, **options): 

465 response = str_if_bytes(response) 

466 return dict(line.split(":") for line in response.splitlines() if line) 

467 

468 

469def _parse_node_line(line): 

470 line_items = line.split(" ") 

471 node_id, addr, flags, master_id, ping, pong, epoch, connected = line.split(" ")[:8] 

472 addr = addr.split("@")[0] 

473 node_dict = { 

474 "node_id": node_id, 

475 "flags": flags, 

476 "master_id": master_id, 

477 "last_ping_sent": ping, 

478 "last_pong_rcvd": pong, 

479 "epoch": epoch, 

480 "slots": [], 

481 "migrations": [], 

482 "connected": True if connected == "connected" else False, 

483 } 

484 if len(line_items) >= 9: 

485 slots, migrations = _parse_slots(line_items[8:]) 

486 node_dict["slots"], node_dict["migrations"] = slots, migrations 

487 return addr, node_dict 

488 

489 

490def _parse_slots(slot_ranges): 

491 slots, migrations = [], [] 

492 for s_range in slot_ranges: 

493 if "->-" in s_range: 

494 slot_id, dst_node_id = s_range[1:-1].split("->-", 1) 

495 migrations.append( 

496 {"slot": slot_id, "node_id": dst_node_id, "state": "migrating"} 

497 ) 

498 elif "-<-" in s_range: 

499 slot_id, src_node_id = s_range[1:-1].split("-<-", 1) 

500 migrations.append( 

501 {"slot": slot_id, "node_id": src_node_id, "state": "importing"} 

502 ) 

503 else: 

504 s_range = [sl for sl in s_range.split("-")] 

505 slots.append(s_range) 

506 

507 return slots, migrations 

508 

509 

510def parse_cluster_nodes(response, **options): 

511 """ 

512 @see: https://redis.io/commands/cluster-nodes # string / bytes 

513 @see: https://redis.io/commands/cluster-replicas # list of string / bytes 

514 """ 

515 if isinstance(response, (str, bytes)): 

516 response = response.splitlines() 

517 return dict(_parse_node_line(str_if_bytes(node)) for node in response) 

518 

519 

520def parse_geosearch_generic(response, **options): 

521 """ 

522 Parse the response of 'GEOSEARCH', GEORADIUS' and 'GEORADIUSBYMEMBER' 

523 commands according to 'withdist', 'withhash' and 'withcoord' labels. 

524 """ 

525 try: 

526 if options["store"] or options["store_dist"]: 

527 # `store` and `store_dist` cant be combined 

528 # with other command arguments. 

529 # relevant to 'GEORADIUS' and 'GEORADIUSBYMEMBER' 

530 return response 

531 except KeyError: # it means the command was sent via execute_command 

532 return response 

533 

534 if type(response) != list: 

535 response_list = [response] 

536 else: 

537 response_list = response 

538 

539 if not options["withdist"] and not options["withcoord"] and not options["withhash"]: 

540 # just a bunch of places 

541 return response_list 

542 

543 cast = { 

544 "withdist": float, 

545 "withcoord": lambda ll: (float(ll[0]), float(ll[1])), 

546 "withhash": int, 

547 } 

548 

549 # zip all output results with each casting function to get 

550 # the properly native Python value. 

551 f = [lambda x: x] 

552 f += [cast[o] for o in ["withdist", "withhash", "withcoord"] if options[o]] 

553 return [list(map(lambda fv: fv[0](fv[1]), zip(f, r))) for r in response_list] 

554 

555 

556def parse_command(response, **options): 

557 commands = {} 

558 for command in response: 

559 cmd_dict = {} 

560 cmd_name = str_if_bytes(command[0]) 

561 cmd_dict["name"] = cmd_name 

562 cmd_dict["arity"] = int(command[1]) 

563 cmd_dict["flags"] = [str_if_bytes(flag) for flag in command[2]] 

564 cmd_dict["first_key_pos"] = command[3] 

565 cmd_dict["last_key_pos"] = command[4] 

566 cmd_dict["step_count"] = command[5] 

567 if len(command) > 7: 

568 cmd_dict["tips"] = command[7] 

569 cmd_dict["key_specifications"] = command[8] 

570 cmd_dict["subcommands"] = command[9] 

571 commands[cmd_name] = cmd_dict 

572 return commands 

573 

574 

575def parse_pubsub_numsub(response, **options): 

576 return list(zip(response[0::2], response[1::2])) 

577 

578 

579def parse_client_kill(response, **options): 

580 if isinstance(response, int): 

581 return response 

582 return str_if_bytes(response) == "OK" 

583 

584 

585def parse_acl_getuser(response, **options): 

586 if response is None: 

587 return None 

588 data = pairs_to_dict(response, decode_keys=True) 

589 

590 # convert everything but user-defined data in 'keys' to native strings 

591 data["flags"] = list(map(str_if_bytes, data["flags"])) 

592 data["passwords"] = list(map(str_if_bytes, data["passwords"])) 

593 data["commands"] = str_if_bytes(data["commands"]) 

594 if isinstance(data["keys"], str) or isinstance(data["keys"], bytes): 

595 data["keys"] = list(str_if_bytes(data["keys"]).split(" ")) 

596 if data["keys"] == [""]: 

597 data["keys"] = [] 

598 if "channels" in data: 

599 if isinstance(data["channels"], str) or isinstance(data["channels"], bytes): 

600 data["channels"] = list(str_if_bytes(data["channels"]).split(" ")) 

601 if data["channels"] == [""]: 

602 data["channels"] = [] 

603 if "selectors" in data: 

604 data["selectors"] = [ 

605 list(map(str_if_bytes, selector)) for selector in data["selectors"] 

606 ] 

607 

608 # split 'commands' into separate 'categories' and 'commands' lists 

609 commands, categories = [], [] 

610 for command in data["commands"].split(" "): 

611 if "@" in command: 

612 categories.append(command) 

613 else: 

614 commands.append(command) 

615 

616 data["commands"] = commands 

617 data["categories"] = categories 

618 data["enabled"] = "on" in data["flags"] 

619 return data 

620 

621 

622def parse_acl_log(response, **options): 

623 if response is None: 

624 return None 

625 if isinstance(response, list): 

626 data = [] 

627 for log in response: 

628 log_data = pairs_to_dict(log, True, True) 

629 client_info = log_data.get("client-info", "") 

630 log_data["client-info"] = parse_client_info(client_info) 

631 

632 # float() is lossy comparing to the "double" in C 

633 log_data["age-seconds"] = float(log_data["age-seconds"]) 

634 data.append(log_data) 

635 else: 

636 data = bool_ok(response) 

637 return data 

638 

639 

640def parse_client_info(value): 

641 """ 

642 Parsing client-info in ACL Log in following format. 

643 "key1=value1 key2=value2 key3=value3" 

644 """ 

645 client_info = {} 

646 infos = str_if_bytes(value).split(" ") 

647 for info in infos: 

648 key, value = info.split("=") 

649 client_info[key] = value 

650 

651 # Those fields are defined as int in networking.c 

652 for int_key in { 

653 "id", 

654 "age", 

655 "idle", 

656 "db", 

657 "sub", 

658 "psub", 

659 "multi", 

660 "qbuf", 

661 "qbuf-free", 

662 "obl", 

663 "argv-mem", 

664 "oll", 

665 "omem", 

666 "tot-mem", 

667 }: 

668 client_info[int_key] = int(client_info[int_key]) 

669 return client_info 

670 

671 

672def parse_module_result(response): 

673 if isinstance(response, ModuleError): 

674 raise response 

675 return True 

676 

677 

678def parse_set_result(response, **options): 

679 """ 

680 Handle SET result since GET argument is available since Redis 6.2. 

681 Parsing SET result into: 

682 - BOOL 

683 - String when GET argument is used 

684 """ 

685 if options.get("get"): 

686 # Redis will return a getCommand result. 

687 # See `setGenericCommand` in t_string.c 

688 return response 

689 return response and str_if_bytes(response) == "OK" 

690 

691 

692class AbstractRedis: 

693 RESPONSE_CALLBACKS = { 

694 **string_keys_to_dict( 

695 "AUTH COPY EXPIRE EXPIREAT PEXPIRE PEXPIREAT " 

696 "HEXISTS HMSET MOVE MSETNX PERSIST " 

697 "PSETEX RENAMENX SISMEMBER SMOVE SETEX SETNX", 

698 bool, 

699 ), 

700 **string_keys_to_dict( 

701 "BITCOUNT BITPOS DECRBY DEL EXISTS GEOADD GETBIT HDEL HLEN " 

702 "HSTRLEN INCRBY LINSERT LLEN LPUSHX PFADD PFCOUNT RPUSHX SADD " 

703 "SCARD SDIFFSTORE SETBIT SETRANGE SINTERSTORE SREM STRLEN " 

704 "SUNIONSTORE UNLINK XACK XDEL XLEN XTRIM ZCARD ZLEXCOUNT ZREM " 

705 "ZREMRANGEBYLEX ZREMRANGEBYRANK ZREMRANGEBYSCORE", 

706 int, 

707 ), 

708 **string_keys_to_dict("INCRBYFLOAT HINCRBYFLOAT", float), 

709 **string_keys_to_dict( 

710 # these return OK, or int if redis-server is >=1.3.4 

711 "LPUSH RPUSH", 

712 lambda r: isinstance(r, int) and r or str_if_bytes(r) == "OK", 

713 ), 

714 **string_keys_to_dict("SORT", sort_return_tuples), 

715 **string_keys_to_dict("ZSCORE ZINCRBY GEODIST", float_or_none), 

716 **string_keys_to_dict( 

717 "FLUSHALL FLUSHDB LSET LTRIM MSET PFMERGE ASKING READONLY READWRITE " 

718 "RENAME SAVE SELECT SHUTDOWN SLAVEOF SWAPDB WATCH UNWATCH ", 

719 bool_ok, 

720 ), 

721 **string_keys_to_dict("BLPOP BRPOP", lambda r: r and tuple(r) or None), 

722 **string_keys_to_dict( 

723 "SDIFF SINTER SMEMBERS SUNION", lambda r: r and set(r) or set() 

724 ), 

725 **string_keys_to_dict( 

726 "ZPOPMAX ZPOPMIN ZINTER ZDIFF ZUNION ZRANGE ZRANGEBYSCORE " 

727 "ZREVRANGE ZREVRANGEBYSCORE", 

728 zset_score_pairs, 

729 ), 

730 **string_keys_to_dict( 

731 "BZPOPMIN BZPOPMAX", lambda r: r and (r[0], r[1], float(r[2])) or None 

732 ), 

733 **string_keys_to_dict("ZRANK ZREVRANK", int_or_none), 

734 **string_keys_to_dict("XREVRANGE XRANGE", parse_stream_list), 

735 **string_keys_to_dict("XREAD XREADGROUP", parse_xread), 

736 **string_keys_to_dict("BGREWRITEAOF BGSAVE", lambda r: True), 

737 "ACL CAT": lambda r: list(map(str_if_bytes, r)), 

738 "ACL DELUSER": int, 

739 "ACL GENPASS": str_if_bytes, 

740 "ACL GETUSER": parse_acl_getuser, 

741 "ACL HELP": lambda r: list(map(str_if_bytes, r)), 

742 "ACL LIST": lambda r: list(map(str_if_bytes, r)), 

743 "ACL LOAD": bool_ok, 

744 "ACL LOG": parse_acl_log, 

745 "ACL SAVE": bool_ok, 

746 "ACL SETUSER": bool_ok, 

747 "ACL USERS": lambda r: list(map(str_if_bytes, r)), 

748 "ACL WHOAMI": str_if_bytes, 

749 "CLIENT GETNAME": str_if_bytes, 

750 "CLIENT ID": int, 

751 "CLIENT KILL": parse_client_kill, 

752 "CLIENT LIST": parse_client_list, 

753 "CLIENT INFO": parse_client_info, 

754 "CLIENT SETNAME": bool_ok, 

755 "CLIENT UNBLOCK": lambda r: r and int(r) == 1 or False, 

756 "CLIENT PAUSE": bool_ok, 

757 "CLIENT GETREDIR": int, 

758 "CLIENT TRACKINGINFO": lambda r: list(map(str_if_bytes, r)), 

759 "CLUSTER ADDSLOTS": bool_ok, 

760 "CLUSTER ADDSLOTSRANGE": bool_ok, 

761 "CLUSTER COUNT-FAILURE-REPORTS": lambda x: int(x), 

762 "CLUSTER COUNTKEYSINSLOT": lambda x: int(x), 

763 "CLUSTER DELSLOTS": bool_ok, 

764 "CLUSTER DELSLOTSRANGE": bool_ok, 

765 "CLUSTER FAILOVER": bool_ok, 

766 "CLUSTER FORGET": bool_ok, 

767 "CLUSTER GETKEYSINSLOT": lambda r: list(map(str_if_bytes, r)), 

768 "CLUSTER INFO": parse_cluster_info, 

769 "CLUSTER KEYSLOT": lambda x: int(x), 

770 "CLUSTER MEET": bool_ok, 

771 "CLUSTER NODES": parse_cluster_nodes, 

772 "CLUSTER REPLICAS": parse_cluster_nodes, 

773 "CLUSTER REPLICATE": bool_ok, 

774 "CLUSTER RESET": bool_ok, 

775 "CLUSTER SAVECONFIG": bool_ok, 

776 "CLUSTER SET-CONFIG-EPOCH": bool_ok, 

777 "CLUSTER SETSLOT": bool_ok, 

778 "CLUSTER SLAVES": parse_cluster_nodes, 

779 "COMMAND": parse_command, 

780 "COMMAND COUNT": int, 

781 "COMMAND GETKEYS": lambda r: list(map(str_if_bytes, r)), 

782 "CONFIG GET": parse_config_get, 

783 "CONFIG RESETSTAT": bool_ok, 

784 "CONFIG SET": bool_ok, 

785 "DEBUG OBJECT": parse_debug_object, 

786 "FUNCTION DELETE": bool_ok, 

787 "FUNCTION FLUSH": bool_ok, 

788 "FUNCTION RESTORE": bool_ok, 

789 "GEOHASH": lambda r: list(map(str_if_bytes, r)), 

790 "GEOPOS": lambda r: list( 

791 map(lambda ll: (float(ll[0]), float(ll[1])) if ll is not None else None, r) 

792 ), 

793 "GEOSEARCH": parse_geosearch_generic, 

794 "GEORADIUS": parse_geosearch_generic, 

795 "GEORADIUSBYMEMBER": parse_geosearch_generic, 

796 "HGETALL": lambda r: r and pairs_to_dict(r) or {}, 

797 "HSCAN": parse_hscan, 

798 "INFO": parse_info, 

799 "LASTSAVE": timestamp_to_datetime, 

800 "MEMORY PURGE": bool_ok, 

801 "MEMORY STATS": parse_memory_stats, 

802 "MEMORY USAGE": int_or_none, 

803 "MODULE LOAD": parse_module_result, 

804 "MODULE UNLOAD": parse_module_result, 

805 "MODULE LIST": lambda r: [pairs_to_dict(m) for m in r], 

806 "OBJECT": parse_object, 

807 "PING": lambda r: str_if_bytes(r) == "PONG", 

808 "QUIT": bool_ok, 

809 "STRALGO": parse_stralgo, 

810 "PUBSUB NUMSUB": parse_pubsub_numsub, 

811 "RANDOMKEY": lambda r: r and r or None, 

812 "RESET": str_if_bytes, 

813 "SCAN": parse_scan, 

814 "SCRIPT EXISTS": lambda r: list(map(bool, r)), 

815 "SCRIPT FLUSH": bool_ok, 

816 "SCRIPT KILL": bool_ok, 

817 "SCRIPT LOAD": str_if_bytes, 

818 "SENTINEL CKQUORUM": bool_ok, 

819 "SENTINEL FAILOVER": bool_ok, 

820 "SENTINEL FLUSHCONFIG": bool_ok, 

821 "SENTINEL GET-MASTER-ADDR-BY-NAME": parse_sentinel_get_master, 

822 "SENTINEL MASTER": parse_sentinel_master, 

823 "SENTINEL MASTERS": parse_sentinel_masters, 

824 "SENTINEL MONITOR": bool_ok, 

825 "SENTINEL RESET": bool_ok, 

826 "SENTINEL REMOVE": bool_ok, 

827 "SENTINEL SENTINELS": parse_sentinel_slaves_and_sentinels, 

828 "SENTINEL SET": bool_ok, 

829 "SENTINEL SLAVES": parse_sentinel_slaves_and_sentinels, 

830 "SET": parse_set_result, 

831 "SLOWLOG GET": parse_slowlog_get, 

832 "SLOWLOG LEN": int, 

833 "SLOWLOG RESET": bool_ok, 

834 "SSCAN": parse_scan, 

835 "TIME": lambda x: (int(x[0]), int(x[1])), 

836 "XCLAIM": parse_xclaim, 

837 "XAUTOCLAIM": parse_xautoclaim, 

838 "XGROUP CREATE": bool_ok, 

839 "XGROUP DELCONSUMER": int, 

840 "XGROUP DESTROY": bool, 

841 "XGROUP SETID": bool_ok, 

842 "XINFO CONSUMERS": parse_list_of_dicts, 

843 "XINFO GROUPS": parse_list_of_dicts, 

844 "XINFO STREAM": parse_xinfo_stream, 

845 "XPENDING": parse_xpending, 

846 "ZADD": parse_zadd, 

847 "ZSCAN": parse_zscan, 

848 "ZMSCORE": parse_zmscore, 

849 } 

850 

851 

852class Redis(AbstractRedis, RedisModuleCommands, CoreCommands, SentinelCommands): 

853 """ 

854 Implementation of the Redis protocol. 

855 

856 This abstract class provides a Python interface to all Redis commands 

857 and an implementation of the Redis protocol. 

858 

859 Pipelines derive from this, implementing how 

860 the commands are sent and received to the Redis server. Based on 

861 configuration, an instance will either use a ConnectionPool, or 

862 Connection object to talk to redis. 

863 

864 It is not safe to pass PubSub or Pipeline objects between threads. 

865 """ 

866 

867 @classmethod 

868 def from_url(cls, url, **kwargs): 

869 """ 

870 Return a Redis client object configured from the given URL 

871 

872 For example:: 

873 

874 redis://[[username]:[password]]@localhost:6379/0 

875 rediss://[[username]:[password]]@localhost:6379/0 

876 unix://[username@]/path/to/socket.sock?db=0[&password=password] 

877 

878 Three URL schemes are supported: 

879 

880 - `redis://` creates a TCP socket connection. See more at: 

881 <https://www.iana.org/assignments/uri-schemes/prov/redis> 

882 - `rediss://` creates a SSL wrapped TCP socket connection. See more at: 

883 <https://www.iana.org/assignments/uri-schemes/prov/rediss> 

884 - ``unix://``: creates a Unix Domain Socket connection. 

885 

886 The username, password, hostname, path and all querystring values 

887 are passed through urllib.parse.unquote in order to replace any 

888 percent-encoded values with their corresponding characters. 

889 

890 There are several ways to specify a database number. The first value 

891 found will be used: 

892 

893 1. A ``db`` querystring option, e.g. redis://localhost?db=0 

894 2. If using the redis:// or rediss:// schemes, the path argument 

895 of the url, e.g. redis://localhost/0 

896 3. A ``db`` keyword argument to this function. 

897 

898 If none of these options are specified, the default db=0 is used. 

899 

900 All querystring options are cast to their appropriate Python types. 

901 Boolean arguments can be specified with string values "True"/"False" 

902 or "Yes"/"No". Values that cannot be properly cast cause a 

903 ``ValueError`` to be raised. Once parsed, the querystring arguments 

904 and keyword arguments are passed to the ``ConnectionPool``'s 

905 class initializer. In the case of conflicting arguments, querystring 

906 arguments always win. 

907 

908 """ 

909 single_connection_client = kwargs.pop("single_connection_client", False) 

910 connection_pool = ConnectionPool.from_url(url, **kwargs) 

911 return cls( 

912 connection_pool=connection_pool, 

913 single_connection_client=single_connection_client, 

914 ) 

915 

916 def __init__( 

917 self, 

918 host="localhost", 

919 port=6379, 

920 db=0, 

921 password=None, 

922 socket_timeout=None, 

923 socket_connect_timeout=None, 

924 socket_keepalive=None, 

925 socket_keepalive_options=None, 

926 connection_pool=None, 

927 unix_socket_path=None, 

928 encoding="utf-8", 

929 encoding_errors="strict", 

930 charset=None, 

931 errors=None, 

932 decode_responses=False, 

933 retry_on_timeout=False, 

934 retry_on_error=None, 

935 ssl=False, 

936 ssl_keyfile=None, 

937 ssl_certfile=None, 

938 ssl_cert_reqs="required", 

939 ssl_ca_certs=None, 

940 ssl_ca_path=None, 

941 ssl_ca_data=None, 

942 ssl_check_hostname=False, 

943 ssl_password=None, 

944 ssl_validate_ocsp=False, 

945 ssl_validate_ocsp_stapled=False, 

946 ssl_ocsp_context=None, 

947 ssl_ocsp_expected_cert=None, 

948 max_connections=None, 

949 single_connection_client=False, 

950 health_check_interval=0, 

951 client_name=None, 

952 username=None, 

953 retry=None, 

954 redis_connect_func=None, 

955 credential_provider: Optional[CredentialProvider] = None, 

956 ): 

957 """ 

958 Initialize a new Redis client. 

959 To specify a retry policy for specific errors, first set 

960 `retry_on_error` to a list of the error/s to retry on, then set 

961 `retry` to a valid `Retry` object. 

962 To retry on TimeoutError, `retry_on_timeout` can also be set to `True`. 

963 

964 Args: 

965 

966 single_connection_client: 

967 if `True`, connection pool is not used. In that case `Redis` 

968 instance use is not thread safe. 

969 """ 

970 if not connection_pool: 

971 if charset is not None: 

972 warnings.warn( 

973 DeprecationWarning( 

974 '"charset" is deprecated. Use "encoding" instead' 

975 ) 

976 ) 

977 encoding = charset 

978 if errors is not None: 

979 warnings.warn( 

980 DeprecationWarning( 

981 '"errors" is deprecated. Use "encoding_errors" instead' 

982 ) 

983 ) 

984 encoding_errors = errors 

985 if not retry_on_error: 

986 retry_on_error = [] 

987 if retry_on_timeout is True: 

988 retry_on_error.append(TimeoutError) 

989 kwargs = { 

990 "db": db, 

991 "username": username, 

992 "password": password, 

993 "socket_timeout": socket_timeout, 

994 "encoding": encoding, 

995 "encoding_errors": encoding_errors, 

996 "decode_responses": decode_responses, 

997 "retry_on_error": retry_on_error, 

998 "retry": copy.deepcopy(retry), 

999 "max_connections": max_connections, 

1000 "health_check_interval": health_check_interval, 

1001 "client_name": client_name, 

1002 "redis_connect_func": redis_connect_func, 

1003 "credential_provider": credential_provider, 

1004 } 

1005 # based on input, setup appropriate connection args 

1006 if unix_socket_path is not None: 

1007 kwargs.update( 

1008 { 

1009 "path": unix_socket_path, 

1010 "connection_class": UnixDomainSocketConnection, 

1011 } 

1012 ) 

1013 else: 

1014 # TCP specific options 

1015 kwargs.update( 

1016 { 

1017 "host": host, 

1018 "port": port, 

1019 "socket_connect_timeout": socket_connect_timeout, 

1020 "socket_keepalive": socket_keepalive, 

1021 "socket_keepalive_options": socket_keepalive_options, 

1022 } 

1023 ) 

1024 

1025 if ssl: 

1026 kwargs.update( 

1027 { 

1028 "connection_class": SSLConnection, 

1029 "ssl_keyfile": ssl_keyfile, 

1030 "ssl_certfile": ssl_certfile, 

1031 "ssl_cert_reqs": ssl_cert_reqs, 

1032 "ssl_ca_certs": ssl_ca_certs, 

1033 "ssl_ca_data": ssl_ca_data, 

1034 "ssl_check_hostname": ssl_check_hostname, 

1035 "ssl_password": ssl_password, 

1036 "ssl_ca_path": ssl_ca_path, 

1037 "ssl_validate_ocsp_stapled": ssl_validate_ocsp_stapled, 

1038 "ssl_validate_ocsp": ssl_validate_ocsp, 

1039 "ssl_ocsp_context": ssl_ocsp_context, 

1040 "ssl_ocsp_expected_cert": ssl_ocsp_expected_cert, 

1041 } 

1042 ) 

1043 connection_pool = ConnectionPool(**kwargs) 

1044 self.connection_pool = connection_pool 

1045 self.connection = None 

1046 if single_connection_client: 

1047 self.connection = self.connection_pool.get_connection("_") 

1048 

1049 self.response_callbacks = CaseInsensitiveDict(self.__class__.RESPONSE_CALLBACKS) 

1050 

1051 def __repr__(self): 

1052 return f"{type(self).__name__}<{repr(self.connection_pool)}>" 

1053 

1054 def get_encoder(self): 

1055 """Get the connection pool's encoder""" 

1056 return self.connection_pool.get_encoder() 

1057 

1058 def get_connection_kwargs(self): 

1059 """Get the connection's key-word arguments""" 

1060 return self.connection_pool.connection_kwargs 

1061 

1062 def get_retry(self) -> Optional["Retry"]: 

1063 return self.get_connection_kwargs().get("retry") 

1064 

1065 def set_retry(self, retry: "Retry") -> None: 

1066 self.get_connection_kwargs().update({"retry": retry}) 

1067 self.connection_pool.set_retry(retry) 

1068 

1069 def set_response_callback(self, command, callback): 

1070 """Set a custom Response Callback""" 

1071 self.response_callbacks[command] = callback 

1072 

1073 def load_external_module(self, funcname, func): 

1074 """ 

1075 This function can be used to add externally defined redis modules, 

1076 and their namespaces to the redis client. 

1077 

1078 funcname - A string containing the name of the function to create 

1079 func - The function, being added to this class. 

1080 

1081 ex: Assume that one has a custom redis module named foomod that 

1082 creates command named 'foo.dothing' and 'foo.anotherthing' in redis. 

1083 To load function functions into this namespace: 

1084 

1085 from redis import Redis 

1086 from foomodule import F 

1087 r = Redis() 

1088 r.load_external_module("foo", F) 

1089 r.foo().dothing('your', 'arguments') 

1090 

1091 For a concrete example see the reimport of the redisjson module in 

1092 tests/test_connection.py::test_loading_external_modules 

1093 """ 

1094 setattr(self, funcname, func) 

1095 

1096 def pipeline(self, transaction=True, shard_hint=None): 

1097 """ 

1098 Return a new pipeline object that can queue multiple commands for 

1099 later execution. ``transaction`` indicates whether all commands 

1100 should be executed atomically. Apart from making a group of operations 

1101 atomic, pipelines are useful for reducing the back-and-forth overhead 

1102 between the client and server. 

1103 """ 

1104 return Pipeline( 

1105 self.connection_pool, self.response_callbacks, transaction, shard_hint 

1106 ) 

1107 

1108 def transaction(self, func, *watches, **kwargs): 

1109 """ 

1110 Convenience method for executing the callable `func` as a transaction 

1111 while watching all keys specified in `watches`. The 'func' callable 

1112 should expect a single argument which is a Pipeline object. 

1113 """ 

1114 shard_hint = kwargs.pop("shard_hint", None) 

1115 value_from_callable = kwargs.pop("value_from_callable", False) 

1116 watch_delay = kwargs.pop("watch_delay", None) 

1117 with self.pipeline(True, shard_hint) as pipe: 

1118 while True: 

1119 try: 

1120 if watches: 

1121 pipe.watch(*watches) 

1122 func_value = func(pipe) 

1123 exec_value = pipe.execute() 

1124 return func_value if value_from_callable else exec_value 

1125 except WatchError: 

1126 if watch_delay is not None and watch_delay > 0: 

1127 time.sleep(watch_delay) 

1128 continue 

1129 

1130 def lock( 

1131 self, 

1132 name, 

1133 timeout=None, 

1134 sleep=0.1, 

1135 blocking=True, 

1136 blocking_timeout=None, 

1137 lock_class=None, 

1138 thread_local=True, 

1139 ): 

1140 """ 

1141 Return a new Lock object using key ``name`` that mimics 

1142 the behavior of threading.Lock. 

1143 

1144 If specified, ``timeout`` indicates a maximum life for the lock. 

1145 By default, it will remain locked until release() is called. 

1146 

1147 ``sleep`` indicates the amount of time to sleep per loop iteration 

1148 when the lock is in blocking mode and another client is currently 

1149 holding the lock. 

1150 

1151 ``blocking`` indicates whether calling ``acquire`` should block until 

1152 the lock has been acquired or to fail immediately, causing ``acquire`` 

1153 to return False and the lock not being acquired. Defaults to True. 

1154 Note this value can be overridden by passing a ``blocking`` 

1155 argument to ``acquire``. 

1156 

1157 ``blocking_timeout`` indicates the maximum amount of time in seconds to 

1158 spend trying to acquire the lock. A value of ``None`` indicates 

1159 continue trying forever. ``blocking_timeout`` can be specified as a 

1160 float or integer, both representing the number of seconds to wait. 

1161 

1162 ``lock_class`` forces the specified lock implementation. Note that as 

1163 of redis-py 3.0, the only lock class we implement is ``Lock`` (which is 

1164 a Lua-based lock). So, it's unlikely you'll need this parameter, unless 

1165 you have created your own custom lock class. 

1166 

1167 ``thread_local`` indicates whether the lock token is placed in 

1168 thread-local storage. By default, the token is placed in thread local 

1169 storage so that a thread only sees its token, not a token set by 

1170 another thread. Consider the following timeline: 

1171 

1172 time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds. 

1173 thread-1 sets the token to "abc" 

1174 time: 1, thread-2 blocks trying to acquire `my-lock` using the 

1175 Lock instance. 

1176 time: 5, thread-1 has not yet completed. redis expires the lock 

1177 key. 

1178 time: 5, thread-2 acquired `my-lock` now that it's available. 

1179 thread-2 sets the token to "xyz" 

1180 time: 6, thread-1 finishes its work and calls release(). if the 

1181 token is *not* stored in thread local storage, then 

1182 thread-1 would see the token value as "xyz" and would be 

1183 able to successfully release the thread-2's lock. 

1184 

1185 In some use cases it's necessary to disable thread local storage. For 

1186 example, if you have code where one thread acquires a lock and passes 

1187 that lock instance to a worker thread to release later. If thread 

1188 local storage isn't disabled in this case, the worker thread won't see 

1189 the token set by the thread that acquired the lock. Our assumption 

1190 is that these cases aren't common and as such default to using 

1191 thread local storage.""" 

1192 if lock_class is None: 

1193 lock_class = Lock 

1194 return lock_class( 

1195 self, 

1196 name, 

1197 timeout=timeout, 

1198 sleep=sleep, 

1199 blocking=blocking, 

1200 blocking_timeout=blocking_timeout, 

1201 thread_local=thread_local, 

1202 ) 

1203 

1204 def pubsub(self, **kwargs): 

1205 """ 

1206 Return a Publish/Subscribe object. With this object, you can 

1207 subscribe to channels and listen for messages that get published to 

1208 them. 

1209 """ 

1210 return PubSub(self.connection_pool, **kwargs) 

1211 

1212 def monitor(self): 

1213 return Monitor(self.connection_pool) 

1214 

1215 def client(self): 

1216 return self.__class__( 

1217 connection_pool=self.connection_pool, single_connection_client=True 

1218 ) 

1219 

1220 def __enter__(self): 

1221 return self 

1222 

1223 def __exit__(self, exc_type, exc_value, traceback): 

1224 self.close() 

1225 

1226 def __del__(self): 

1227 self.close() 

1228 

1229 def close(self): 

1230 # In case a connection property does not yet exist 

1231 # (due to a crash earlier in the Redis() constructor), return 

1232 # immediately as there is nothing to clean-up. 

1233 if not hasattr(self, "connection"): 

1234 return 

1235 

1236 conn = self.connection 

1237 if conn: 

1238 self.connection = None 

1239 self.connection_pool.release(conn) 

1240 

1241 def _send_command_parse_response(self, conn, command_name, *args, **options): 

1242 """ 

1243 Send a command and parse the response 

1244 """ 

1245 conn.send_command(*args) 

1246 return self.parse_response(conn, command_name, **options) 

1247 

1248 def _disconnect_raise(self, conn, error): 

1249 """ 

1250 Close the connection and raise an exception 

1251 if retry_on_error is not set or the error 

1252 is not one of the specified error types 

1253 """ 

1254 conn.disconnect() 

1255 if ( 

1256 conn.retry_on_error is None 

1257 or isinstance(error, tuple(conn.retry_on_error)) is False 

1258 ): 

1259 raise error 

1260 

1261 # COMMAND EXECUTION AND PROTOCOL PARSING 

1262 def execute_command(self, *args, **options): 

1263 """Execute a command and return a parsed response""" 

1264 pool = self.connection_pool 

1265 command_name = args[0] 

1266 conn = self.connection or pool.get_connection(command_name, **options) 

1267 

1268 try: 

1269 return conn.retry.call_with_retry( 

1270 lambda: self._send_command_parse_response( 

1271 conn, command_name, *args, **options 

1272 ), 

1273 lambda error: self._disconnect_raise(conn, error), 

1274 ) 

1275 finally: 

1276 if not self.connection: 

1277 pool.release(conn) 

1278 

1279 def parse_response(self, connection, command_name, **options): 

1280 """Parses a response from the Redis server""" 

1281 try: 

1282 if NEVER_DECODE in options: 

1283 response = connection.read_response(disable_decoding=True) 

1284 options.pop(NEVER_DECODE) 

1285 else: 

1286 response = connection.read_response() 

1287 except ResponseError: 

1288 if EMPTY_RESPONSE in options: 

1289 return options[EMPTY_RESPONSE] 

1290 raise 

1291 

1292 if EMPTY_RESPONSE in options: 

1293 options.pop(EMPTY_RESPONSE) 

1294 

1295 if command_name in self.response_callbacks: 

1296 return self.response_callbacks[command_name](response, **options) 

1297 return response 

1298 

1299 

1300StrictRedis = Redis 

1301 

1302 

1303class Monitor: 

1304 """ 

1305 Monitor is useful for handling the MONITOR command to the redis server. 

1306 next_command() method returns one command from monitor 

1307 listen() method yields commands from monitor. 

1308 """ 

1309 

1310 monitor_re = re.compile(r"\[(\d+) (.*)\] (.*)") 

1311 command_re = re.compile(r'"(.*?)(?<!\\)"') 

1312 

1313 def __init__(self, connection_pool): 

1314 self.connection_pool = connection_pool 

1315 self.connection = self.connection_pool.get_connection("MONITOR") 

1316 

1317 def __enter__(self): 

1318 self.connection.send_command("MONITOR") 

1319 # check that monitor returns 'OK', but don't return it to user 

1320 response = self.connection.read_response() 

1321 if not bool_ok(response): 

1322 raise RedisError(f"MONITOR failed: {response}") 

1323 return self 

1324 

1325 def __exit__(self, *args): 

1326 self.connection.disconnect() 

1327 self.connection_pool.release(self.connection) 

1328 

1329 def next_command(self): 

1330 """Parse the response from a monitor command""" 

1331 response = self.connection.read_response() 

1332 if isinstance(response, bytes): 

1333 response = self.connection.encoder.decode(response, force=True) 

1334 command_time, command_data = response.split(" ", 1) 

1335 m = self.monitor_re.match(command_data) 

1336 db_id, client_info, command = m.groups() 

1337 command = " ".join(self.command_re.findall(command)) 

1338 # Redis escapes double quotes because each piece of the command 

1339 # string is surrounded by double quotes. We don't have that 

1340 # requirement so remove the escaping and leave the quote. 

1341 command = command.replace('\\"', '"') 

1342 

1343 if client_info == "lua": 

1344 client_address = "lua" 

1345 client_port = "" 

1346 client_type = "lua" 

1347 elif client_info.startswith("unix"): 

1348 client_address = "unix" 

1349 client_port = client_info[5:] 

1350 client_type = "unix" 

1351 else: 

1352 # use rsplit as ipv6 addresses contain colons 

1353 client_address, client_port = client_info.rsplit(":", 1) 

1354 client_type = "tcp" 

1355 return { 

1356 "time": float(command_time), 

1357 "db": int(db_id), 

1358 "client_address": client_address, 

1359 "client_port": client_port, 

1360 "client_type": client_type, 

1361 "command": command, 

1362 } 

1363 

1364 def listen(self): 

1365 """Listen for commands coming to the server.""" 

1366 while True: 

1367 yield self.next_command() 

1368 

1369 

1370class PubSub: 

1371 """ 

1372 PubSub provides publish, subscribe and listen support to Redis channels. 

1373 

1374 After subscribing to one or more channels, the listen() method will block 

1375 until a message arrives on one of the subscribed channels. That message 

1376 will be returned and it's safe to start listening again. 

1377 """ 

1378 

1379 PUBLISH_MESSAGE_TYPES = ("message", "pmessage") 

1380 UNSUBSCRIBE_MESSAGE_TYPES = ("unsubscribe", "punsubscribe") 

1381 HEALTH_CHECK_MESSAGE = "redis-py-health-check" 

1382 

1383 def __init__( 

1384 self, 

1385 connection_pool, 

1386 shard_hint=None, 

1387 ignore_subscribe_messages=False, 

1388 encoder=None, 

1389 ): 

1390 self.connection_pool = connection_pool 

1391 self.shard_hint = shard_hint 

1392 self.ignore_subscribe_messages = ignore_subscribe_messages 

1393 self.connection = None 

1394 self.subscribed_event = threading.Event() 

1395 # we need to know the encoding options for this connection in order 

1396 # to lookup channel and pattern names for callback handlers. 

1397 self.encoder = encoder 

1398 if self.encoder is None: 

1399 self.encoder = self.connection_pool.get_encoder() 

1400 self.health_check_response_b = self.encoder.encode(self.HEALTH_CHECK_MESSAGE) 

1401 if self.encoder.decode_responses: 

1402 self.health_check_response = ["pong", self.HEALTH_CHECK_MESSAGE] 

1403 else: 

1404 self.health_check_response = [b"pong", self.health_check_response_b] 

1405 self.reset() 

1406 

1407 def __enter__(self): 

1408 return self 

1409 

1410 def __exit__(self, exc_type, exc_value, traceback): 

1411 self.reset() 

1412 

1413 def __del__(self): 

1414 try: 

1415 # if this object went out of scope prior to shutting down 

1416 # subscriptions, close the connection manually before 

1417 # returning it to the connection pool 

1418 self.reset() 

1419 except Exception: 

1420 pass 

1421 

1422 def reset(self): 

1423 if self.connection: 

1424 self.connection.disconnect() 

1425 self.connection.clear_connect_callbacks() 

1426 self.connection_pool.release(self.connection) 

1427 self.connection = None 

1428 self.channels = {} 

1429 self.health_check_response_counter = 0 

1430 self.pending_unsubscribe_channels = set() 

1431 self.patterns = {} 

1432 self.pending_unsubscribe_patterns = set() 

1433 self.subscribed_event.clear() 

1434 

1435 def close(self): 

1436 self.reset() 

1437 

1438 def on_connect(self, connection): 

1439 "Re-subscribe to any channels and patterns previously subscribed to" 

1440 # NOTE: for python3, we can't pass bytestrings as keyword arguments 

1441 # so we need to decode channel/pattern names back to unicode strings 

1442 # before passing them to [p]subscribe. 

1443 self.pending_unsubscribe_channels.clear() 

1444 self.pending_unsubscribe_patterns.clear() 

1445 if self.channels: 

1446 channels = {} 

1447 for k, v in self.channels.items(): 

1448 channels[self.encoder.decode(k, force=True)] = v 

1449 self.subscribe(**channels) 

1450 if self.patterns: 

1451 patterns = {} 

1452 for k, v in self.patterns.items(): 

1453 patterns[self.encoder.decode(k, force=True)] = v 

1454 self.psubscribe(**patterns) 

1455 

1456 @property 

1457 def subscribed(self): 

1458 """Indicates if there are subscriptions to any channels or patterns""" 

1459 return self.subscribed_event.is_set() 

1460 

1461 def execute_command(self, *args): 

1462 """Execute a publish/subscribe command""" 

1463 

1464 # NOTE: don't parse the response in this function -- it could pull a 

1465 # legitimate message off the stack if the connection is already 

1466 # subscribed to one or more channels 

1467 

1468 if self.connection is None: 

1469 self.connection = self.connection_pool.get_connection( 

1470 "pubsub", self.shard_hint 

1471 ) 

1472 # register a callback that re-subscribes to any channels we 

1473 # were listening to when we were disconnected 

1474 self.connection.register_connect_callback(self.on_connect) 

1475 connection = self.connection 

1476 kwargs = {"check_health": not self.subscribed} 

1477 if not self.subscribed: 

1478 self.clean_health_check_responses() 

1479 self._execute(connection, connection.send_command, *args, **kwargs) 

1480 

1481 def clean_health_check_responses(self): 

1482 """ 

1483 If any health check responses are present, clean them 

1484 """ 

1485 ttl = 10 

1486 conn = self.connection 

1487 while self.health_check_response_counter > 0 and ttl > 0: 

1488 if self._execute(conn, conn.can_read, timeout=conn.socket_timeout): 

1489 response = self._execute(conn, conn.read_response) 

1490 if self.is_health_check_response(response): 

1491 self.health_check_response_counter -= 1 

1492 else: 

1493 raise PubSubError( 

1494 "A non health check response was cleaned by " 

1495 "execute_command: {0}".format(response) 

1496 ) 

1497 ttl -= 1 

1498 

1499 def _disconnect_raise_connect(self, conn, error): 

1500 """ 

1501 Close the connection and raise an exception 

1502 if retry_on_timeout is not set or the error 

1503 is not a TimeoutError. Otherwise, try to reconnect 

1504 """ 

1505 conn.disconnect() 

1506 if not (conn.retry_on_timeout and isinstance(error, TimeoutError)): 

1507 raise error 

1508 conn.connect() 

1509 

1510 def _execute(self, conn, command, *args, **kwargs): 

1511 """ 

1512 Connect manually upon disconnection. If the Redis server is down, 

1513 this will fail and raise a ConnectionError as desired. 

1514 After reconnection, the ``on_connect`` callback should have been 

1515 called by the # connection to resubscribe us to any channels and 

1516 patterns we were previously listening to 

1517 """ 

1518 return conn.retry.call_with_retry( 

1519 lambda: command(*args, **kwargs), 

1520 lambda error: self._disconnect_raise_connect(conn, error), 

1521 ) 

1522 

1523 def parse_response(self, block=True, timeout=0): 

1524 """Parse the response from a publish/subscribe command""" 

1525 conn = self.connection 

1526 if conn is None: 

1527 raise RuntimeError( 

1528 "pubsub connection not set: " 

1529 "did you forget to call subscribe() or psubscribe()?" 

1530 ) 

1531 

1532 self.check_health() 

1533 

1534 def try_read(): 

1535 if not block: 

1536 if not conn.can_read(timeout=timeout): 

1537 return None 

1538 else: 

1539 conn.connect() 

1540 return conn.read_response(disconnect_on_error=False) 

1541 

1542 response = self._execute(conn, try_read) 

1543 

1544 if self.is_health_check_response(response): 

1545 # ignore the health check message as user might not expect it 

1546 self.health_check_response_counter -= 1 

1547 return None 

1548 return response 

1549 

1550 def is_health_check_response(self, response): 

1551 """ 

1552 Check if the response is a health check response. 

1553 If there are no subscriptions redis responds to PING command with a 

1554 bulk response, instead of a multi-bulk with "pong" and the response. 

1555 """ 

1556 return response in [ 

1557 self.health_check_response, # If there was a subscription 

1558 self.health_check_response_b, # If there wasn't 

1559 ] 

1560 

1561 def check_health(self): 

1562 conn = self.connection 

1563 if conn is None: 

1564 raise RuntimeError( 

1565 "pubsub connection not set: " 

1566 "did you forget to call subscribe() or psubscribe()?" 

1567 ) 

1568 

1569 if conn.health_check_interval and time.time() > conn.next_health_check: 

1570 conn.send_command("PING", self.HEALTH_CHECK_MESSAGE, check_health=False) 

1571 self.health_check_response_counter += 1 

1572 

1573 def _normalize_keys(self, data): 

1574 """ 

1575 normalize channel/pattern names to be either bytes or strings 

1576 based on whether responses are automatically decoded. this saves us 

1577 from coercing the value for each message coming in. 

1578 """ 

1579 encode = self.encoder.encode 

1580 decode = self.encoder.decode 

1581 return {decode(encode(k)): v for k, v in data.items()} 

1582 

1583 def psubscribe(self, *args, **kwargs): 

1584 """ 

1585 Subscribe to channel patterns. Patterns supplied as keyword arguments 

1586 expect a pattern name as the key and a callable as the value. A 

1587 pattern's callable will be invoked automatically when a message is 

1588 received on that pattern rather than producing a message via 

1589 ``listen()``. 

1590 """ 

1591 if args: 

1592 args = list_or_args(args[0], args[1:]) 

1593 new_patterns = dict.fromkeys(args) 

1594 new_patterns.update(kwargs) 

1595 ret_val = self.execute_command("PSUBSCRIBE", *new_patterns.keys()) 

1596 # update the patterns dict AFTER we send the command. we don't want to 

1597 # subscribe twice to these patterns, once for the command and again 

1598 # for the reconnection. 

1599 new_patterns = self._normalize_keys(new_patterns) 

1600 self.patterns.update(new_patterns) 

1601 if not self.subscribed: 

1602 # Set the subscribed_event flag to True 

1603 self.subscribed_event.set() 

1604 # Clear the health check counter 

1605 self.health_check_response_counter = 0 

1606 self.pending_unsubscribe_patterns.difference_update(new_patterns) 

1607 return ret_val 

1608 

1609 def punsubscribe(self, *args): 

1610 """ 

1611 Unsubscribe from the supplied patterns. If empty, unsubscribe from 

1612 all patterns. 

1613 """ 

1614 if args: 

1615 args = list_or_args(args[0], args[1:]) 

1616 patterns = self._normalize_keys(dict.fromkeys(args)) 

1617 else: 

1618 patterns = self.patterns 

1619 self.pending_unsubscribe_patterns.update(patterns) 

1620 return self.execute_command("PUNSUBSCRIBE", *args) 

1621 

1622 def subscribe(self, *args, **kwargs): 

1623 """ 

1624 Subscribe to channels. Channels supplied as keyword arguments expect 

1625 a channel name as the key and a callable as the value. A channel's 

1626 callable will be invoked automatically when a message is received on 

1627 that channel rather than producing a message via ``listen()`` or 

1628 ``get_message()``. 

1629 """ 

1630 if args: 

1631 args = list_or_args(args[0], args[1:]) 

1632 new_channels = dict.fromkeys(args) 

1633 new_channels.update(kwargs) 

1634 ret_val = self.execute_command("SUBSCRIBE", *new_channels.keys()) 

1635 # update the channels dict AFTER we send the command. we don't want to 

1636 # subscribe twice to these channels, once for the command and again 

1637 # for the reconnection. 

1638 new_channels = self._normalize_keys(new_channels) 

1639 self.channels.update(new_channels) 

1640 if not self.subscribed: 

1641 # Set the subscribed_event flag to True 

1642 self.subscribed_event.set() 

1643 # Clear the health check counter 

1644 self.health_check_response_counter = 0 

1645 self.pending_unsubscribe_channels.difference_update(new_channels) 

1646 return ret_val 

1647 

1648 def unsubscribe(self, *args): 

1649 """ 

1650 Unsubscribe from the supplied channels. If empty, unsubscribe from 

1651 all channels 

1652 """ 

1653 if args: 

1654 args = list_or_args(args[0], args[1:]) 

1655 channels = self._normalize_keys(dict.fromkeys(args)) 

1656 else: 

1657 channels = self.channels 

1658 self.pending_unsubscribe_channels.update(channels) 

1659 return self.execute_command("UNSUBSCRIBE", *args) 

1660 

1661 def listen(self): 

1662 "Listen for messages on channels this client has been subscribed to" 

1663 while self.subscribed: 

1664 response = self.handle_message(self.parse_response(block=True)) 

1665 if response is not None: 

1666 yield response 

1667 

1668 def get_message(self, ignore_subscribe_messages=False, timeout=0.0): 

1669 """ 

1670 Get the next message if one is available, otherwise None. 

1671 

1672 If timeout is specified, the system will wait for `timeout` seconds 

1673 before returning. Timeout should be specified as a floating point 

1674 number, or None, to wait indefinitely. 

1675 """ 

1676 if not self.subscribed: 

1677 # Wait for subscription 

1678 start_time = time.time() 

1679 if self.subscribed_event.wait(timeout) is True: 

1680 # The connection was subscribed during the timeout time frame. 

1681 # The timeout should be adjusted based on the time spent 

1682 # waiting for the subscription 

1683 time_spent = time.time() - start_time 

1684 timeout = max(0.0, timeout - time_spent) 

1685 else: 

1686 # The connection isn't subscribed to any channels or patterns, 

1687 # so no messages are available 

1688 return None 

1689 

1690 response = self.parse_response(block=(timeout is None), timeout=timeout) 

1691 if response: 

1692 return self.handle_message(response, ignore_subscribe_messages) 

1693 return None 

1694 

1695 def ping(self, message=None): 

1696 """ 

1697 Ping the Redis server 

1698 """ 

1699 message = "" if message is None else message 

1700 return self.execute_command("PING", message) 

1701 

1702 def handle_message(self, response, ignore_subscribe_messages=False): 

1703 """ 

1704 Parses a pub/sub message. If the channel or pattern was subscribed to 

1705 with a message handler, the handler is invoked instead of a parsed 

1706 message being returned. 

1707 """ 

1708 if response is None: 

1709 return None 

1710 message_type = str_if_bytes(response[0]) 

1711 if message_type == "pmessage": 

1712 message = { 

1713 "type": message_type, 

1714 "pattern": response[1], 

1715 "channel": response[2], 

1716 "data": response[3], 

1717 } 

1718 elif message_type == "pong": 

1719 message = { 

1720 "type": message_type, 

1721 "pattern": None, 

1722 "channel": None, 

1723 "data": response[1], 

1724 } 

1725 else: 

1726 message = { 

1727 "type": message_type, 

1728 "pattern": None, 

1729 "channel": response[1], 

1730 "data": response[2], 

1731 } 

1732 

1733 # if this is an unsubscribe message, remove it from memory 

1734 if message_type in self.UNSUBSCRIBE_MESSAGE_TYPES: 

1735 if message_type == "punsubscribe": 

1736 pattern = response[1] 

1737 if pattern in self.pending_unsubscribe_patterns: 

1738 self.pending_unsubscribe_patterns.remove(pattern) 

1739 self.patterns.pop(pattern, None) 

1740 else: 

1741 channel = response[1] 

1742 if channel in self.pending_unsubscribe_channels: 

1743 self.pending_unsubscribe_channels.remove(channel) 

1744 self.channels.pop(channel, None) 

1745 if not self.channels and not self.patterns: 

1746 # There are no subscriptions anymore, set subscribed_event flag 

1747 # to false 

1748 self.subscribed_event.clear() 

1749 

1750 if message_type in self.PUBLISH_MESSAGE_TYPES: 

1751 # if there's a message handler, invoke it 

1752 if message_type == "pmessage": 

1753 handler = self.patterns.get(message["pattern"], None) 

1754 else: 

1755 handler = self.channels.get(message["channel"], None) 

1756 if handler: 

1757 handler(message) 

1758 return None 

1759 elif message_type != "pong": 

1760 # this is a subscribe/unsubscribe message. ignore if we don't 

1761 # want them 

1762 if ignore_subscribe_messages or self.ignore_subscribe_messages: 

1763 return None 

1764 

1765 return message 

1766 

1767 def run_in_thread(self, sleep_time=0, daemon=False, exception_handler=None): 

1768 for channel, handler in self.channels.items(): 

1769 if handler is None: 

1770 raise PubSubError(f"Channel: '{channel}' has no handler registered") 

1771 for pattern, handler in self.patterns.items(): 

1772 if handler is None: 

1773 raise PubSubError(f"Pattern: '{pattern}' has no handler registered") 

1774 

1775 thread = PubSubWorkerThread( 

1776 self, sleep_time, daemon=daemon, exception_handler=exception_handler 

1777 ) 

1778 thread.start() 

1779 return thread 

1780 

1781 

1782class PubSubWorkerThread(threading.Thread): 

1783 def __init__(self, pubsub, sleep_time, daemon=False, exception_handler=None): 

1784 super().__init__() 

1785 self.daemon = daemon 

1786 self.pubsub = pubsub 

1787 self.sleep_time = sleep_time 

1788 self.exception_handler = exception_handler 

1789 self._running = threading.Event() 

1790 

1791 def run(self): 

1792 if self._running.is_set(): 

1793 return 

1794 self._running.set() 

1795 pubsub = self.pubsub 

1796 sleep_time = self.sleep_time 

1797 while self._running.is_set(): 

1798 try: 

1799 pubsub.get_message(ignore_subscribe_messages=True, timeout=sleep_time) 

1800 except BaseException as e: 

1801 if self.exception_handler is None: 

1802 raise 

1803 self.exception_handler(e, pubsub, self) 

1804 pubsub.close() 

1805 

1806 def stop(self): 

1807 # trip the flag so the run loop exits. the run loop will 

1808 # close the pubsub connection, which disconnects the socket 

1809 # and returns the connection to the pool. 

1810 self._running.clear() 

1811 

1812 

1813class Pipeline(Redis): 

1814 """ 

1815 Pipelines provide a way to transmit multiple commands to the Redis server 

1816 in one transmission. This is convenient for batch processing, such as 

1817 saving all the values in a list to Redis. 

1818 

1819 All commands executed within a pipeline are wrapped with MULTI and EXEC 

1820 calls. This guarantees all commands executed in the pipeline will be 

1821 executed atomically. 

1822 

1823 Any command raising an exception does *not* halt the execution of 

1824 subsequent commands in the pipeline. Instead, the exception is caught 

1825 and its instance is placed into the response list returned by execute(). 

1826 Code iterating over the response list should be able to deal with an 

1827 instance of an exception as a potential value. In general, these will be 

1828 ResponseError exceptions, such as those raised when issuing a command 

1829 on a key of a different datatype. 

1830 """ 

1831 

1832 UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"} 

1833 

1834 def __init__(self, connection_pool, response_callbacks, transaction, shard_hint): 

1835 self.connection_pool = connection_pool 

1836 self.connection = None 

1837 self.response_callbacks = response_callbacks 

1838 self.transaction = transaction 

1839 self.shard_hint = shard_hint 

1840 

1841 self.watching = False 

1842 self.reset() 

1843 

1844 def __enter__(self): 

1845 return self 

1846 

1847 def __exit__(self, exc_type, exc_value, traceback): 

1848 self.reset() 

1849 

1850 def __del__(self): 

1851 try: 

1852 self.reset() 

1853 except Exception: 

1854 pass 

1855 

1856 def __len__(self): 

1857 return len(self.command_stack) 

1858 

1859 def __bool__(self): 

1860 """Pipeline instances should always evaluate to True""" 

1861 return True 

1862 

1863 def reset(self): 

1864 self.command_stack = [] 

1865 self.scripts = set() 

1866 # make sure to reset the connection state in the event that we were 

1867 # watching something 

1868 if self.watching and self.connection: 

1869 try: 

1870 # call this manually since our unwatch or 

1871 # immediate_execute_command methods can call reset() 

1872 self.connection.send_command("UNWATCH") 

1873 self.connection.read_response() 

1874 except ConnectionError: 

1875 # disconnect will also remove any previous WATCHes 

1876 self.connection.disconnect() 

1877 # clean up the other instance attributes 

1878 self.watching = False 

1879 self.explicit_transaction = False 

1880 # we can safely return the connection to the pool here since we're 

1881 # sure we're no longer WATCHing anything 

1882 if self.connection: 

1883 self.connection_pool.release(self.connection) 

1884 self.connection = None 

1885 

1886 def multi(self): 

1887 """ 

1888 Start a transactional block of the pipeline after WATCH commands 

1889 are issued. End the transactional block with `execute`. 

1890 """ 

1891 if self.explicit_transaction: 

1892 raise RedisError("Cannot issue nested calls to MULTI") 

1893 if self.command_stack: 

1894 raise RedisError( 

1895 "Commands without an initial WATCH have already been issued" 

1896 ) 

1897 self.explicit_transaction = True 

1898 

1899 def execute_command(self, *args, **kwargs): 

1900 if (self.watching or args[0] == "WATCH") and not self.explicit_transaction: 

1901 return self.immediate_execute_command(*args, **kwargs) 

1902 return self.pipeline_execute_command(*args, **kwargs) 

1903 

1904 def _disconnect_reset_raise(self, conn, error): 

1905 """ 

1906 Close the connection, reset watching state and 

1907 raise an exception if we were watching, 

1908 retry_on_timeout is not set, 

1909 or the error is not a TimeoutError 

1910 """ 

1911 conn.disconnect() 

1912 # if we were already watching a variable, the watch is no longer 

1913 # valid since this connection has died. raise a WatchError, which 

1914 # indicates the user should retry this transaction. 

1915 if self.watching: 

1916 self.reset() 

1917 raise WatchError( 

1918 "A ConnectionError occurred on while watching one or more keys" 

1919 ) 

1920 # if retry_on_timeout is not set, or the error is not 

1921 # a TimeoutError, raise it 

1922 if not (conn.retry_on_timeout and isinstance(error, TimeoutError)): 

1923 self.reset() 

1924 raise 

1925 

1926 def immediate_execute_command(self, *args, **options): 

1927 """ 

1928 Execute a command immediately, but don't auto-retry on a 

1929 ConnectionError if we're already WATCHing a variable. Used when 

1930 issuing WATCH or subsequent commands retrieving their values but before 

1931 MULTI is called. 

1932 """ 

1933 command_name = args[0] 

1934 conn = self.connection 

1935 # if this is the first call, we need a connection 

1936 if not conn: 

1937 conn = self.connection_pool.get_connection(command_name, self.shard_hint) 

1938 self.connection = conn 

1939 

1940 return conn.retry.call_with_retry( 

1941 lambda: self._send_command_parse_response( 

1942 conn, command_name, *args, **options 

1943 ), 

1944 lambda error: self._disconnect_reset_raise(conn, error), 

1945 ) 

1946 

1947 def pipeline_execute_command(self, *args, **options): 

1948 """ 

1949 Stage a command to be executed when execute() is next called 

1950 

1951 Returns the current Pipeline object back so commands can be 

1952 chained together, such as: 

1953 

1954 pipe = pipe.set('foo', 'bar').incr('baz').decr('bang') 

1955 

1956 At some other point, you can then run: pipe.execute(), 

1957 which will execute all commands queued in the pipe. 

1958 """ 

1959 self.command_stack.append((args, options)) 

1960 return self 

1961 

1962 def _execute_transaction(self, connection, commands, raise_on_error): 

1963 cmds = chain([(("MULTI",), {})], commands, [(("EXEC",), {})]) 

1964 all_cmds = connection.pack_commands( 

1965 [args for args, options in cmds if EMPTY_RESPONSE not in options] 

1966 ) 

1967 connection.send_packed_command(all_cmds) 

1968 errors = [] 

1969 

1970 # parse off the response for MULTI 

1971 # NOTE: we need to handle ResponseErrors here and continue 

1972 # so that we read all the additional command messages from 

1973 # the socket 

1974 try: 

1975 self.parse_response(connection, "_") 

1976 except ResponseError as e: 

1977 errors.append((0, e)) 

1978 

1979 # and all the other commands 

1980 for i, command in enumerate(commands): 

1981 if EMPTY_RESPONSE in command[1]: 

1982 errors.append((i, command[1][EMPTY_RESPONSE])) 

1983 else: 

1984 try: 

1985 self.parse_response(connection, "_") 

1986 except ResponseError as e: 

1987 self.annotate_exception(e, i + 1, command[0]) 

1988 errors.append((i, e)) 

1989 

1990 # parse the EXEC. 

1991 try: 

1992 response = self.parse_response(connection, "_") 

1993 except ExecAbortError: 

1994 if errors: 

1995 raise errors[0][1] 

1996 raise 

1997 

1998 # EXEC clears any watched keys 

1999 self.watching = False 

2000 

2001 if response is None: 

2002 raise WatchError("Watched variable changed.") 

2003 

2004 # put any parse errors into the response 

2005 for i, e in errors: 

2006 response.insert(i, e) 

2007 

2008 if len(response) != len(commands): 

2009 self.connection.disconnect() 

2010 raise ResponseError( 

2011 "Wrong number of response items from pipeline execution" 

2012 ) 

2013 

2014 # find any errors in the response and raise if necessary 

2015 if raise_on_error: 

2016 self.raise_first_error(commands, response) 

2017 

2018 # We have to run response callbacks manually 

2019 data = [] 

2020 for r, cmd in zip(response, commands): 

2021 if not isinstance(r, Exception): 

2022 args, options = cmd 

2023 command_name = args[0] 

2024 if command_name in self.response_callbacks: 

2025 r = self.response_callbacks[command_name](r, **options) 

2026 data.append(r) 

2027 return data 

2028 

2029 def _execute_pipeline(self, connection, commands, raise_on_error): 

2030 # build up all commands into a single request to increase network perf 

2031 all_cmds = connection.pack_commands([args for args, _ in commands]) 

2032 connection.send_packed_command(all_cmds) 

2033 

2034 response = [] 

2035 for args, options in commands: 

2036 try: 

2037 response.append(self.parse_response(connection, args[0], **options)) 

2038 except ResponseError as e: 

2039 response.append(e) 

2040 

2041 if raise_on_error: 

2042 self.raise_first_error(commands, response) 

2043 return response 

2044 

2045 def raise_first_error(self, commands, response): 

2046 for i, r in enumerate(response): 

2047 if isinstance(r, ResponseError): 

2048 self.annotate_exception(r, i + 1, commands[i][0]) 

2049 raise r 

2050 

2051 def annotate_exception(self, exception, number, command): 

2052 cmd = " ".join(map(safe_str, command)) 

2053 msg = ( 

2054 f"Command # {number} ({cmd}) of pipeline " 

2055 f"caused error: {exception.args[0]}" 

2056 ) 

2057 exception.args = (msg,) + exception.args[1:] 

2058 

2059 def parse_response(self, connection, command_name, **options): 

2060 result = Redis.parse_response(self, connection, command_name, **options) 

2061 if command_name in self.UNWATCH_COMMANDS: 

2062 self.watching = False 

2063 elif command_name == "WATCH": 

2064 self.watching = True 

2065 return result 

2066 

2067 def load_scripts(self): 

2068 # make sure all scripts that are about to be run on this pipeline exist 

2069 scripts = list(self.scripts) 

2070 immediate = self.immediate_execute_command 

2071 shas = [s.sha for s in scripts] 

2072 # we can't use the normal script_* methods because they would just 

2073 # get buffered in the pipeline. 

2074 exists = immediate("SCRIPT EXISTS", *shas) 

2075 if not all(exists): 

2076 for s, exist in zip(scripts, exists): 

2077 if not exist: 

2078 s.sha = immediate("SCRIPT LOAD", s.script) 

2079 

2080 def _disconnect_raise_reset(self, conn, error): 

2081 """ 

2082 Close the connection, raise an exception if we were watching, 

2083 and raise an exception if retry_on_timeout is not set, 

2084 or the error is not a TimeoutError 

2085 """ 

2086 conn.disconnect() 

2087 # if we were watching a variable, the watch is no longer valid 

2088 # since this connection has died. raise a WatchError, which 

2089 # indicates the user should retry this transaction. 

2090 if self.watching: 

2091 raise WatchError( 

2092 "A ConnectionError occurred on while watching one or more keys" 

2093 ) 

2094 # if retry_on_timeout is not set, or the error is not 

2095 # a TimeoutError, raise it 

2096 if not (conn.retry_on_timeout and isinstance(error, TimeoutError)): 

2097 self.reset() 

2098 raise 

2099 

2100 def execute(self, raise_on_error=True): 

2101 """Execute all the commands in the current pipeline""" 

2102 stack = self.command_stack 

2103 if not stack and not self.watching: 

2104 return [] 

2105 if self.scripts: 

2106 self.load_scripts() 

2107 if self.transaction or self.explicit_transaction: 

2108 execute = self._execute_transaction 

2109 else: 

2110 execute = self._execute_pipeline 

2111 

2112 conn = self.connection 

2113 if not conn: 

2114 conn = self.connection_pool.get_connection("MULTI", self.shard_hint) 

2115 # assign to self.connection so reset() releases the connection 

2116 # back to the pool after we're done 

2117 self.connection = conn 

2118 

2119 try: 

2120 return conn.retry.call_with_retry( 

2121 lambda: execute(conn, stack, raise_on_error), 

2122 lambda error: self._disconnect_raise_reset(conn, error), 

2123 ) 

2124 finally: 

2125 self.reset() 

2126 

2127 def discard(self): 

2128 """ 

2129 Flushes all previously queued commands 

2130 See: https://redis.io/commands/DISCARD 

2131 """ 

2132 self.execute_command("DISCARD") 

2133 

2134 def watch(self, *names): 

2135 """Watches the values at keys ``names``""" 

2136 if self.explicit_transaction: 

2137 raise RedisError("Cannot issue a WATCH after a MULTI") 

2138 return self.execute_command("WATCH", *names) 

2139 

2140 def unwatch(self): 

2141 """Unwatches all previously specified keys""" 

2142 return self.watching and self.execute_command("UNWATCH") or True