Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/redis/client.py: 24%

933 statements

coverage.py v7.2.2, created at 2023-03-26 07:09 +0000

import copy
import datetime
import re
import threading
import time
import warnings
from itertools import chain
from typing import Optional

from redis.commands import (
    CoreCommands,
    RedisModuleCommands,
    SentinelCommands,
    list_or_args,
)
from redis.connection import ConnectionPool, SSLConnection, UnixDomainSocketConnection
from redis.credentials import CredentialProvider
from redis.exceptions import (
    ConnectionError,
    ExecAbortError,
    ModuleError,
    PubSubError,
    RedisError,
    ResponseError,
    TimeoutError,
    WatchError,
)
from redis.lock import Lock
from redis.retry import Retry
from redis.utils import safe_str, str_if_bytes

SYM_EMPTY = b""
EMPTY_RESPONSE = "EMPTY_RESPONSE"

# some responses (i.e. DUMP) are binary and are never meant to be decoded
NEVER_DECODE = "NEVER_DECODE"


def timestamp_to_datetime(response):
    "Converts a unix timestamp to a Python datetime object"
    if not response:
        return None
    try:
        response = int(response)
    except ValueError:
        return None
    return datetime.datetime.fromtimestamp(response)


def string_keys_to_dict(key_string, callback):
    return dict.fromkeys(key_string.split(), callback)

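# A quick illustration (hypothetical callback): string_keys_to_dict maps every
# whitespace-separated command name to the same response callback:
#
#   >>> string_keys_to_dict("GET SET", bool)
#   {'GET': <class 'bool'>, 'SET': <class 'bool'>}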

class CaseInsensitiveDict(dict):
    "Case insensitive dict implementation. Assumes string keys only."

    def __init__(self, data):
        for k, v in data.items():
            self[k.upper()] = v

    def __contains__(self, k):
        return super().__contains__(k.upper())

    def __delitem__(self, k):
        super().__delitem__(k.upper())

    def __getitem__(self, k):
        return super().__getitem__(k.upper())

    def get(self, k, default=None):
        return super().get(k.upper(), default)

    def __setitem__(self, k, v):
        super().__setitem__(k.upper(), v)

    def update(self, data):
        data = CaseInsensitiveDict(data)
        super().update(data)

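# Example (illustrative): lookups are case-insensitive because every key is
# normalized to upper case on the way in:
#
#   >>> d = CaseInsensitiveDict({"get": 1})
#   >>> d["GET"], "get" in d
#   (1, True)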

def parse_debug_object(response):
    "Parse the results of Redis's DEBUG OBJECT command into a Python dict"
    # The 'type' of the object is the first item in the response, but isn't
    # prefixed with a name
    response = str_if_bytes(response)
    response = "type:" + response
    response = dict(kv.split(":") for kv in response.split())

    # parse some expected int values from the string response
    # note: this cmd isn't spec'd so these may not appear in all redis versions
    int_fields = ("refcount", "serializedlength", "lru", "lru_seconds_idle")
    for field in int_fields:
        if field in response:
            response[field] = int(response[field])

    return response


def parse_object(response, infotype):
    """Parse the results of an OBJECT command"""
    if infotype in ("idletime", "refcount"):
        return int_or_none(response)
    return response


def parse_info(response):
    """Parse the result of Redis's INFO command into a Python dict"""
    info = {}
    response = str_if_bytes(response)

    def get_value(value):
        if "," not in value or "=" not in value:
            try:
                if "." in value:
                    return float(value)
                else:
                    return int(value)
            except ValueError:
                return value
        else:
            sub_dict = {}
            for item in value.split(","):
                k, v = item.rsplit("=", 1)
                sub_dict[k] = get_value(v)
            return sub_dict

    for line in response.splitlines():
        if line and not line.startswith("#"):
            if line.find(":") != -1:
                # Split the info field into its key and value.
                # Note that the value may contain ':', but the 'host:'
                # pseudo-command is the only case where the key contains ':'
                key, value = line.split(":", 1)
                if key == "cmdstat_host":
                    key, value = line.rsplit(":", 1)

                if key == "module":
                    # Hardcode a list for key 'modules' since there could be
                    # multiple lines that start with 'module'
                    info.setdefault("modules", []).append(get_value(value))
                else:
                    info[key] = get_value(value)
            else:
                # if the line isn't splittable, append it to the "__raw__" key
                info.setdefault("__raw__", []).append(line)

    return info

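# Example (illustrative): INFO text becomes a dict with numeric coercion where
# possible; comment lines are skipped:
#
#   >>> parse_info(b"# Server\r\nredis_version:7.0.0\r\nconnected_clients:1\r\n")
#   {'redis_version': '7.0.0', 'connected_clients': 1}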

def parse_memory_stats(response, **kwargs):
    """Parse the results of MEMORY STATS"""
    stats = pairs_to_dict(response, decode_keys=True, decode_string_values=True)
    for key, value in stats.items():
        if key.startswith("db."):
            stats[key] = pairs_to_dict(
                value, decode_keys=True, decode_string_values=True
            )
    return stats


SENTINEL_STATE_TYPES = {
    "can-failover-its-master": int,
    "config-epoch": int,
    "down-after-milliseconds": int,
    "failover-timeout": int,
    "info-refresh": int,
    "last-hello-message": int,
    "last-ok-ping-reply": int,
    "last-ping-reply": int,
    "last-ping-sent": int,
    "master-link-down-time": int,
    "master-port": int,
    "num-other-sentinels": int,
    "num-slaves": int,
    "o-down-time": int,
    "pending-commands": int,
    "parallel-syncs": int,
    "port": int,
    "quorum": int,
    "role-reported-time": int,
    "s-down-time": int,
    "slave-priority": int,
    "slave-repl-offset": int,
    "voted-leader-epoch": int,
}


def parse_sentinel_state(item):
    result = pairs_to_dict_typed(item, SENTINEL_STATE_TYPES)
    flags = set(result["flags"].split(","))
    for name, flag in (
        ("is_master", "master"),
        ("is_slave", "slave"),
        ("is_sdown", "s_down"),
        ("is_odown", "o_down"),
        ("is_sentinel", "sentinel"),
        ("is_disconnected", "disconnected"),
        ("is_master_down", "master_down"),
    ):
        result[name] = flag in flags
    return result


def parse_sentinel_master(response):
    return parse_sentinel_state(map(str_if_bytes, response))


def parse_sentinel_masters(response):
    result = {}
    for item in response:
        state = parse_sentinel_state(map(str_if_bytes, item))
        result[state["name"]] = state
    return result


def parse_sentinel_slaves_and_sentinels(response):
    return [parse_sentinel_state(map(str_if_bytes, item)) for item in response]


def parse_sentinel_get_master(response):
    return response and (response[0], int(response[1])) or None


def pairs_to_dict(response, decode_keys=False, decode_string_values=False):
    """Create a dict given a list of key/value pairs"""
    if response is None:
        return {}
    if decode_keys or decode_string_values:
        # the iter form is faster, but I don't know how to make that work
        # with a str_if_bytes() map
        keys = response[::2]
        if decode_keys:
            keys = map(str_if_bytes, keys)
        values = response[1::2]
        if decode_string_values:
            values = map(str_if_bytes, values)
        return dict(zip(keys, values))
    else:
        it = iter(response)
        return dict(zip(it, it))


def pairs_to_dict_typed(response, type_info):
    it = iter(response)
    result = {}
    for key, value in zip(it, it):
        if key in type_info:
            try:
                value = type_info[key](value)
            except Exception:
                # if for some reason the value can't be coerced, just use
                # the string value
                pass
        result[key] = value
    return result

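# Examples (illustrative):
#
#   >>> pairs_to_dict([b"a", b"1", b"b", b"2"])
#   {b'a': b'1', b'b': b'2'}
#   >>> pairs_to_dict_typed(["port", "6379", "name", "m1"], {"port": int})
#   {'port': 6379, 'name': 'm1'}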

def zset_score_pairs(response, **options):
    """
    If ``withscores`` is specified in the options, return the response as
    a list of (value, score) pairs
    """
    if not response or not options.get("withscores"):
        return response
    score_cast_func = options.get("score_cast_func", float)
    it = iter(response)
    return list(zip(it, map(score_cast_func, it)))


def sort_return_tuples(response, **options):
    """
    If ``groups`` is specified, return the response as a list of
    n-element tuples with n being the value found in options['groups']
    """
    if not response or not options.get("groups"):
        return response
    n = options["groups"]
    return list(zip(*[response[i::n] for i in range(n)]))

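# Examples (illustrative):
#
#   >>> zset_score_pairs([b"a", b"1.5", b"b", b"2"], withscores=True)
#   [(b'a', 1.5), (b'b', 2.0)]
#   >>> sort_return_tuples([1, 2, 3, 4], groups=2)
#   [(1, 2), (3, 4)]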

def int_or_none(response):
    if response is None:
        return None
    return int(response)


def parse_stream_list(response):
    if response is None:
        return None
    data = []
    for r in response:
        if r is not None:
            data.append((r[0], pairs_to_dict(r[1])))
        else:
            data.append((None, None))
    return data

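# Example (illustrative): each stream entry becomes an (id, fields) tuple:
#
#   >>> parse_stream_list([[b"1-1", [b"k", b"v"]], None])
#   [(b'1-1', {b'k': b'v'}), (None, None)]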

def pairs_to_dict_with_str_keys(response):
    return pairs_to_dict(response, decode_keys=True)


def parse_list_of_dicts(response):
    return list(map(pairs_to_dict_with_str_keys, response))


def parse_xclaim(response, **options):
    if options.get("parse_justid", False):
        return response
    return parse_stream_list(response)


def parse_xautoclaim(response, **options):
    if options.get("parse_justid", False):
        return response[1]
    response[1] = parse_stream_list(response[1])
    return response


def parse_xinfo_stream(response, **options):
    data = pairs_to_dict(response, decode_keys=True)
    if not options.get("full", False):
        first = data["first-entry"]
        if first is not None:
            data["first-entry"] = (first[0], pairs_to_dict(first[1]))
        last = data["last-entry"]
        if last is not None:
            data["last-entry"] = (last[0], pairs_to_dict(last[1]))
    else:
        data["entries"] = {_id: pairs_to_dict(entry) for _id, entry in data["entries"]}
        data["groups"] = [
            pairs_to_dict(group, decode_keys=True) for group in data["groups"]
        ]
    return data


def parse_xread(response):
    if response is None:
        return []
    return [[r[0], parse_stream_list(r[1])] for r in response]


def parse_xpending(response, **options):
    if options.get("parse_detail", False):
        return parse_xpending_range(response)
    consumers = [{"name": n, "pending": int(p)} for n, p in response[3] or []]
    return {
        "pending": response[0],
        "min": response[1],
        "max": response[2],
        "consumers": consumers,
    }


def parse_xpending_range(response):
    k = ("message_id", "consumer", "time_since_delivered", "times_delivered")
    return [dict(zip(k, r)) for r in response]


def float_or_none(response):
    if response is None:
        return None
    return float(response)


def bool_ok(response):
    return str_if_bytes(response) == "OK"


def parse_zadd(response, **options):
    if response is None:
        return None
    if options.get("as_score"):
        return float(response)
    return int(response)


def parse_client_list(response, **options):
    clients = []
    for c in str_if_bytes(response).splitlines():
        # Values might contain '='
        clients.append(dict(pair.split("=", 1) for pair in c.split(" ")))
    return clients


def parse_config_get(response, **options):
    response = [str_if_bytes(i) if i is not None else None for i in response]
    return response and pairs_to_dict(response) or {}


def parse_scan(response, **options):
    cursor, r = response
    return int(cursor), r


def parse_hscan(response, **options):
    cursor, r = response
    return int(cursor), r and pairs_to_dict(r) or {}


def parse_zscan(response, **options):
    score_cast_func = options.get("score_cast_func", float)
    cursor, r = response
    it = iter(r)
    return int(cursor), list(zip(it, map(score_cast_func, it)))


def parse_zmscore(response, **options):
    # zmscore: list of scores (double precision floating point number) or nil
    return [float(score) if score is not None else None for score in response]


def parse_slowlog_get(response, **options):
    space = " " if options.get("decode_responses", False) else b" "

    def parse_item(item):
        result = {"id": item[0], "start_time": int(item[1]), "duration": int(item[2])}
        # Redis Enterprise injects another entry at index [3], which has
        # the complexity info (i.e. the value N in case the command has
        # an O(N) complexity) instead of the command.
        if isinstance(item[3], list):
            result["command"] = space.join(item[3])
        else:
            result["complexity"] = item[3]
            result["command"] = space.join(item[4])
        return result

    return [parse_item(item) for item in response]


def parse_stralgo(response, **options):
    """
    Parse the response from the `STRALGO` command.
    Without modifiers, the returned value is a string.
    When LEN is given, the command returns the length of the result
    (i.e. an integer).
    When IDX is given, the command returns a dictionary with the LCS
    length and all the ranges in both the strings, start and end
    offset for each string, where there are matches.
    When WITHMATCHLEN is given, each array representing a match will
    also have the length of the match at the beginning of the array.
    """
    if options.get("len", False):
        return int(response)
    if options.get("idx", False):
        if options.get("withmatchlen", False):
            matches = [
                [(int(match[-1]))] + list(map(tuple, match[:-1]))
                for match in response[1]
            ]
        else:
            matches = [list(map(tuple, match)) for match in response[1]]
        return {
            str_if_bytes(response[0]): matches,
            str_if_bytes(response[2]): int(response[3]),
        }
    return str_if_bytes(response)


def parse_cluster_info(response, **options):
    response = str_if_bytes(response)
    return dict(line.split(":") for line in response.splitlines() if line)


def _parse_node_line(line):
    line_items = line.split(" ")
    node_id, addr, flags, master_id, ping, pong, epoch, connected = line_items[:8]
    addr = addr.split("@")[0]
    node_dict = {
        "node_id": node_id,
        "flags": flags,
        "master_id": master_id,
        "last_ping_sent": ping,
        "last_pong_rcvd": pong,
        "epoch": epoch,
        "slots": [],
        "migrations": [],
        "connected": connected == "connected",
    }
    if len(line_items) >= 9:
        slots, migrations = _parse_slots(line_items[8:])
        node_dict["slots"], node_dict["migrations"] = slots, migrations
    return addr, node_dict


def _parse_slots(slot_ranges):
    slots, migrations = [], []
    for s_range in slot_ranges:
        if "->-" in s_range:
            slot_id, dst_node_id = s_range[1:-1].split("->-", 1)
            migrations.append(
                {"slot": slot_id, "node_id": dst_node_id, "state": "migrating"}
            )
        elif "-<-" in s_range:
            slot_id, src_node_id = s_range[1:-1].split("-<-", 1)
            migrations.append(
                {"slot": slot_id, "node_id": src_node_id, "state": "importing"}
            )
        else:
            slots.append(s_range.split("-"))

    return slots, migrations

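# Example (illustrative): a plain range becomes a slot pair, a bracketed range
# becomes a migration record:
#
#   >>> _parse_slots(["0-5460", "[1000->-abcdef]"])
#   ([['0', '5460']], [{'slot': '1000', 'node_id': 'abcdef', 'state': 'migrating'}])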

def parse_cluster_nodes(response, **options):
    """
    @see: https://redis.io/commands/cluster-nodes  # string / bytes
    @see: https://redis.io/commands/cluster-replicas  # list of string / bytes
    """
    if isinstance(response, (str, bytes)):
        response = response.splitlines()
    return dict(_parse_node_line(str_if_bytes(node)) for node in response)


def parse_geosearch_generic(response, **options):
    """
    Parse the response of 'GEOSEARCH', 'GEORADIUS' and 'GEORADIUSBYMEMBER'
    commands according to 'withdist', 'withhash' and 'withcoord' labels.
    """
    if options["store"] or options["store_dist"]:
        # `store` and `store_dist` can't be combined
        # with other command arguments.
        # relevant to 'GEORADIUS' and 'GEORADIUSBYMEMBER'
        return response

    if not isinstance(response, list):
        response_list = [response]
    else:
        response_list = response

    if not options["withdist"] and not options["withcoord"] and not options["withhash"]:
        # just a bunch of places
        return response_list

    cast = {
        "withdist": float,
        "withcoord": lambda ll: (float(ll[0]), float(ll[1])),
        "withhash": int,
    }

    # zip all output results with each casting function to get
    # the properly native Python value.
    f = [lambda x: x]
    f += [cast[o] for o in ["withdist", "withhash", "withcoord"] if options[o]]
    return [list(map(lambda fv: fv[0](fv[1]), zip(f, r))) for r in response_list]


def parse_command(response, **options):
    commands = {}
    for command in response:
        cmd_dict = {}
        cmd_name = str_if_bytes(command[0])
        cmd_dict["name"] = cmd_name
        cmd_dict["arity"] = int(command[1])
        cmd_dict["flags"] = [str_if_bytes(flag) for flag in command[2]]
        cmd_dict["first_key_pos"] = command[3]
        cmd_dict["last_key_pos"] = command[4]
        cmd_dict["step_count"] = command[5]
        if len(command) > 7:
            cmd_dict["tips"] = command[7]
            cmd_dict["key_specifications"] = command[8]
            cmd_dict["subcommands"] = command[9]
        commands[cmd_name] = cmd_dict
    return commands


def parse_pubsub_numsub(response, **options):
    return list(zip(response[0::2], response[1::2]))


def parse_client_kill(response, **options):
    if isinstance(response, int):
        return response
    return str_if_bytes(response) == "OK"


def parse_acl_getuser(response, **options):
    if response is None:
        return None
    data = pairs_to_dict(response, decode_keys=True)

    # convert everything but user-defined data in 'keys' to native strings
    data["flags"] = list(map(str_if_bytes, data["flags"]))
    data["passwords"] = list(map(str_if_bytes, data["passwords"]))
    data["commands"] = str_if_bytes(data["commands"])
    if isinstance(data["keys"], (str, bytes)):
        data["keys"] = list(str_if_bytes(data["keys"]).split(" "))
    if data["keys"] == [""]:
        data["keys"] = []
    if "channels" in data:
        if isinstance(data["channels"], (str, bytes)):
            data["channels"] = list(str_if_bytes(data["channels"]).split(" "))
        if data["channels"] == [""]:
            data["channels"] = []
    if "selectors" in data:
        data["selectors"] = [
            list(map(str_if_bytes, selector)) for selector in data["selectors"]
        ]

    # split 'commands' into separate 'categories' and 'commands' lists
    commands, categories = [], []
    for command in data["commands"].split(" "):
        if "@" in command:
            categories.append(command)
        else:
            commands.append(command)

    data["commands"] = commands
    data["categories"] = categories
    data["enabled"] = "on" in data["flags"]
    return data


def parse_acl_log(response, **options):
    if response is None:
        return None
    if isinstance(response, list):
        data = []
        for log in response:
            log_data = pairs_to_dict(log, True, True)
            client_info = log_data.get("client-info", "")
            log_data["client-info"] = parse_client_info(client_info)

            # float() is lossy compared to the "double" in C
            log_data["age-seconds"] = float(log_data["age-seconds"])
            data.append(log_data)
    else:
        data = bool_ok(response)
    return data


def parse_client_info(value):
    """
    Parse the client-info string from an ACL Log entry, which has the
    following format:
    "key1=value1 key2=value2 key3=value3"
    """
    client_info = {}
    infos = str_if_bytes(value).split(" ")
    for info in infos:
        key, value = info.split("=")
        client_info[key] = value

    # These fields are defined as int in networking.c
    for int_key in {
        "id",
        "age",
        "idle",
        "db",
        "sub",
        "psub",
        "multi",
        "qbuf",
        "qbuf-free",
        "obl",
        "argv-mem",
        "oll",
        "omem",
        "tot-mem",
    }:
        client_info[int_key] = int(client_info[int_key])
    return client_info


def parse_module_result(response):
    if isinstance(response, ModuleError):
        raise response
    return True


def parse_set_result(response, **options):
    """
    Handle the SET result, accounting for the GET argument
    (available since Redis 6.2). The result is parsed into:
    - a bool, by default
    - a string, when the GET argument is used
    """
    if options.get("get"):
        # Redis will return a getCommand result.
        # See `setGenericCommand` in t_string.c
        return response
    return response and str_if_bytes(response) == "OK"


class AbstractRedis:
    RESPONSE_CALLBACKS = {
        **string_keys_to_dict(
            "AUTH COPY EXPIRE EXPIREAT PEXPIRE PEXPIREAT "
            "HEXISTS HMSET MOVE MSETNX PERSIST "
            "PSETEX RENAMENX SISMEMBER SMOVE SETEX SETNX",
            bool,
        ),
        **string_keys_to_dict(
            "BITCOUNT BITPOS DECRBY DEL EXISTS GEOADD GETBIT HDEL HLEN "
            "HSTRLEN INCRBY LINSERT LLEN LPUSHX PFADD PFCOUNT RPUSHX SADD "
            "SCARD SDIFFSTORE SETBIT SETRANGE SINTERSTORE SREM STRLEN "
            "SUNIONSTORE UNLINK XACK XDEL XLEN XTRIM ZCARD ZLEXCOUNT ZREM "
            "ZREMRANGEBYLEX ZREMRANGEBYRANK ZREMRANGEBYSCORE",
            int,
        ),
        **string_keys_to_dict("INCRBYFLOAT HINCRBYFLOAT", float),
        **string_keys_to_dict(
            # these return OK, or int if redis-server is >=1.3.4
            "LPUSH RPUSH",
            lambda r: isinstance(r, int) and r or str_if_bytes(r) == "OK",
        ),
        **string_keys_to_dict("SORT", sort_return_tuples),
        **string_keys_to_dict("ZSCORE ZINCRBY GEODIST", float_or_none),
        **string_keys_to_dict(
            "FLUSHALL FLUSHDB LSET LTRIM MSET PFMERGE ASKING READONLY READWRITE "
            "RENAME SAVE SELECT SHUTDOWN SLAVEOF SWAPDB WATCH UNWATCH",
            bool_ok,
        ),
        **string_keys_to_dict("BLPOP BRPOP", lambda r: r and tuple(r) or None),
        **string_keys_to_dict(
            "SDIFF SINTER SMEMBERS SUNION", lambda r: r and set(r) or set()
        ),
        **string_keys_to_dict(
            "ZPOPMAX ZPOPMIN ZINTER ZDIFF ZUNION ZRANGE ZRANGEBYSCORE "
            "ZREVRANGE ZREVRANGEBYSCORE",
            zset_score_pairs,
        ),
        **string_keys_to_dict(
            "BZPOPMIN BZPOPMAX", lambda r: r and (r[0], r[1], float(r[2])) or None
        ),
        **string_keys_to_dict("ZRANK ZREVRANK", int_or_none),
        **string_keys_to_dict("XREVRANGE XRANGE", parse_stream_list),
        **string_keys_to_dict("XREAD XREADGROUP", parse_xread),
        **string_keys_to_dict("BGREWRITEAOF BGSAVE", lambda r: True),
        "ACL CAT": lambda r: list(map(str_if_bytes, r)),
        "ACL DELUSER": int,
        "ACL GENPASS": str_if_bytes,
        "ACL GETUSER": parse_acl_getuser,
        "ACL HELP": lambda r: list(map(str_if_bytes, r)),
        "ACL LIST": lambda r: list(map(str_if_bytes, r)),
        "ACL LOAD": bool_ok,
        "ACL LOG": parse_acl_log,
        "ACL SAVE": bool_ok,
        "ACL SETUSER": bool_ok,
        "ACL USERS": lambda r: list(map(str_if_bytes, r)),
        "ACL WHOAMI": str_if_bytes,
        "CLIENT GETNAME": str_if_bytes,
        "CLIENT ID": int,
        "CLIENT KILL": parse_client_kill,
        "CLIENT LIST": parse_client_list,
        "CLIENT INFO": parse_client_info,
        "CLIENT SETNAME": bool_ok,
        "CLIENT UNBLOCK": lambda r: r and int(r) == 1 or False,
        "CLIENT PAUSE": bool_ok,
        "CLIENT GETREDIR": int,
        "CLIENT TRACKINGINFO": lambda r: list(map(str_if_bytes, r)),
        "CLUSTER ADDSLOTS": bool_ok,
        "CLUSTER ADDSLOTSRANGE": bool_ok,
        "CLUSTER COUNT-FAILURE-REPORTS": lambda x: int(x),
        "CLUSTER COUNTKEYSINSLOT": lambda x: int(x),
        "CLUSTER DELSLOTS": bool_ok,
        "CLUSTER DELSLOTSRANGE": bool_ok,
        "CLUSTER FAILOVER": bool_ok,
        "CLUSTER FORGET": bool_ok,
        "CLUSTER GETKEYSINSLOT": lambda r: list(map(str_if_bytes, r)),
        "CLUSTER INFO": parse_cluster_info,
        "CLUSTER KEYSLOT": lambda x: int(x),
        "CLUSTER MEET": bool_ok,
        "CLUSTER NODES": parse_cluster_nodes,
        "CLUSTER REPLICAS": parse_cluster_nodes,
        "CLUSTER REPLICATE": bool_ok,
        "CLUSTER RESET": bool_ok,
        "CLUSTER SAVECONFIG": bool_ok,
        "CLUSTER SET-CONFIG-EPOCH": bool_ok,
        "CLUSTER SETSLOT": bool_ok,
        "CLUSTER SLAVES": parse_cluster_nodes,
        "COMMAND": parse_command,
        "COMMAND COUNT": int,
        "COMMAND GETKEYS": lambda r: list(map(str_if_bytes, r)),
        "CONFIG GET": parse_config_get,
        "CONFIG RESETSTAT": bool_ok,
        "CONFIG SET": bool_ok,
        "DEBUG OBJECT": parse_debug_object,
        "FUNCTION DELETE": bool_ok,
        "FUNCTION FLUSH": bool_ok,
        "FUNCTION RESTORE": bool_ok,
        "GEOHASH": lambda r: list(map(str_if_bytes, r)),
        "GEOPOS": lambda r: list(
            map(lambda ll: (float(ll[0]), float(ll[1])) if ll is not None else None, r)
        ),
        "GEOSEARCH": parse_geosearch_generic,
        "GEORADIUS": parse_geosearch_generic,
        "GEORADIUSBYMEMBER": parse_geosearch_generic,
        "HGETALL": lambda r: r and pairs_to_dict(r) or {},
        "HSCAN": parse_hscan,
        "INFO": parse_info,
        "LASTSAVE": timestamp_to_datetime,
        "MEMORY PURGE": bool_ok,
        "MEMORY STATS": parse_memory_stats,
        "MEMORY USAGE": int_or_none,
        "MODULE LOAD": parse_module_result,
        "MODULE UNLOAD": parse_module_result,
        "MODULE LIST": lambda r: [pairs_to_dict(m) for m in r],
        "OBJECT": parse_object,
        "PING": lambda r: str_if_bytes(r) == "PONG",
        "QUIT": bool_ok,
        "STRALGO": parse_stralgo,
        "PUBSUB NUMSUB": parse_pubsub_numsub,
        "RANDOMKEY": lambda r: r and r or None,
        "RESET": str_if_bytes,
        "SCAN": parse_scan,
        "SCRIPT EXISTS": lambda r: list(map(bool, r)),
        "SCRIPT FLUSH": bool_ok,
        "SCRIPT KILL": bool_ok,
        "SCRIPT LOAD": str_if_bytes,
        "SENTINEL CKQUORUM": bool_ok,
        "SENTINEL FAILOVER": bool_ok,
        "SENTINEL FLUSHCONFIG": bool_ok,
        "SENTINEL GET-MASTER-ADDR-BY-NAME": parse_sentinel_get_master,
        "SENTINEL MASTER": parse_sentinel_master,
        "SENTINEL MASTERS": parse_sentinel_masters,
        "SENTINEL MONITOR": bool_ok,
        "SENTINEL RESET": bool_ok,
        "SENTINEL REMOVE": bool_ok,
        "SENTINEL SENTINELS": parse_sentinel_slaves_and_sentinels,
        "SENTINEL SET": bool_ok,
        "SENTINEL SLAVES": parse_sentinel_slaves_and_sentinels,
        "SET": parse_set_result,
        "SLOWLOG GET": parse_slowlog_get,
        "SLOWLOG LEN": int,
        "SLOWLOG RESET": bool_ok,
        "SSCAN": parse_scan,
        "TIME": lambda x: (int(x[0]), int(x[1])),
        "XCLAIM": parse_xclaim,
        "XAUTOCLAIM": parse_xautoclaim,
        "XGROUP CREATE": bool_ok,
        "XGROUP DELCONSUMER": int,
        "XGROUP DESTROY": bool,
        "XGROUP SETID": bool_ok,
        "XINFO CONSUMERS": parse_list_of_dicts,
        "XINFO GROUPS": parse_list_of_dicts,
        "XINFO STREAM": parse_xinfo_stream,
        "XPENDING": parse_xpending,
        "ZADD": parse_zadd,
        "ZSCAN": parse_zscan,
        "ZMSCORE": parse_zmscore,
    }

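# Example (illustrative): the callbacks post-process raw replies; e.g. the
# PING callback turns the server's b"PONG" into a bool:
#
#   >>> AbstractRedis.RESPONSE_CALLBACKS["PING"](b"PONG")
#   True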

class Redis(AbstractRedis, RedisModuleCommands, CoreCommands, SentinelCommands):
    """
    Implementation of the Redis protocol.

    This abstract class provides a Python interface to all Redis commands
    and an implementation of the Redis protocol.

    Pipelines derive from this, implementing how
    the commands are sent to and received from the Redis server. Based on
    configuration, an instance will either use a ConnectionPool, or a
    Connection object to talk to redis.

    It is not safe to pass PubSub or Pipeline objects between threads.
    """

    @classmethod
    def from_url(cls, url, **kwargs):
        """
        Return a Redis client object configured from the given URL

        For example::

            redis://[[username]:[password]]@localhost:6379/0
            rediss://[[username]:[password]]@localhost:6379/0
            unix://[username@]/path/to/socket.sock?db=0[&password=password]

        Three URL schemes are supported:

        - ``redis://`` creates a TCP socket connection. See more at:
          <https://www.iana.org/assignments/uri-schemes/prov/redis>
        - ``rediss://`` creates an SSL-wrapped TCP socket connection. See more at:
          <https://www.iana.org/assignments/uri-schemes/prov/rediss>
        - ``unix://`` creates a Unix Domain Socket connection.

        The username, password, hostname, path and all querystring values
        are passed through urllib.parse.unquote in order to replace any
        percent-encoded values with their corresponding characters.

        There are several ways to specify a database number. The first value
        found will be used:

        1. A ``db`` querystring option, e.g. redis://localhost?db=0
        2. If using the redis:// or rediss:// schemes, the path argument
           of the url, e.g. redis://localhost/0
        3. A ``db`` keyword argument to this function.

        If none of these options are specified, the default db=0 is used.

        All querystring options are cast to their appropriate Python types.
        Boolean arguments can be specified with string values "True"/"False"
        or "Yes"/"No". Values that cannot be properly cast cause a
        ``ValueError`` to be raised. Once parsed, the querystring arguments
        and keyword arguments are passed to the ``ConnectionPool``'s
        class initializer. In the case of conflicting arguments, querystring
        arguments always win.
        """
        connection_pool = ConnectionPool.from_url(url, **kwargs)
        return cls(connection_pool=connection_pool)

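    # Example (illustrative values):
    #
    #   r = Redis.from_url("redis://localhost:6379/0", decode_responses=True)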

    def __init__(
        self,
        host="localhost",
        port=6379,
        db=0,
        password=None,
        socket_timeout=None,
        socket_connect_timeout=None,
        socket_keepalive=None,
        socket_keepalive_options=None,
        connection_pool=None,
        unix_socket_path=None,
        encoding="utf-8",
        encoding_errors="strict",
        charset=None,
        errors=None,
        decode_responses=False,
        retry_on_timeout=False,
        retry_on_error=None,
        ssl=False,
        ssl_keyfile=None,
        ssl_certfile=None,
        ssl_cert_reqs="required",
        ssl_ca_certs=None,
        ssl_ca_path=None,
        ssl_ca_data=None,
        ssl_check_hostname=False,
        ssl_password=None,
        ssl_validate_ocsp=False,
        ssl_validate_ocsp_stapled=False,
        ssl_ocsp_context=None,
        ssl_ocsp_expected_cert=None,
        max_connections=None,
        single_connection_client=False,
        health_check_interval=0,
        client_name=None,
        username=None,
        retry=None,
        redis_connect_func=None,
        credential_provider: Optional[CredentialProvider] = None,
    ):
        """
        Initialize a new Redis client.
        To specify a retry policy for specific errors, first set
        `retry_on_error` to a list of the error/s to retry on, then set
        `retry` to a valid `Retry` object.
        To retry on TimeoutError, `retry_on_timeout` can also be set to `True`.

        Args:

        single_connection_client:
            if `True`, the connection pool is not used. In that case, the
            `Redis` instance is not thread safe.
        """
        if not connection_pool:
            if charset is not None:
                warnings.warn(
                    DeprecationWarning(
                        '"charset" is deprecated. Use "encoding" instead'
                    )
                )
                encoding = charset
            if errors is not None:
                warnings.warn(
                    DeprecationWarning(
                        '"errors" is deprecated. Use "encoding_errors" instead'
                    )
                )
                encoding_errors = errors
            if not retry_on_error:
                retry_on_error = []
            if retry_on_timeout is True:
                retry_on_error.append(TimeoutError)
            kwargs = {
                "db": db,
                "username": username,
                "password": password,
                "socket_timeout": socket_timeout,
                "encoding": encoding,
                "encoding_errors": encoding_errors,
                "decode_responses": decode_responses,
                "retry_on_error": retry_on_error,
                "retry": copy.deepcopy(retry),
                "max_connections": max_connections,
                "health_check_interval": health_check_interval,
                "client_name": client_name,
                "redis_connect_func": redis_connect_func,
                "credential_provider": credential_provider,
            }
            # based on input, set up appropriate connection args
            if unix_socket_path is not None:
                kwargs.update(
                    {
                        "path": unix_socket_path,
                        "connection_class": UnixDomainSocketConnection,
                    }
                )
            else:
                # TCP specific options
                kwargs.update(
                    {
                        "host": host,
                        "port": port,
                        "socket_connect_timeout": socket_connect_timeout,
                        "socket_keepalive": socket_keepalive,
                        "socket_keepalive_options": socket_keepalive_options,
                    }
                )

                if ssl:
                    kwargs.update(
                        {
                            "connection_class": SSLConnection,
                            "ssl_keyfile": ssl_keyfile,
                            "ssl_certfile": ssl_certfile,
                            "ssl_cert_reqs": ssl_cert_reqs,
                            "ssl_ca_certs": ssl_ca_certs,
                            "ssl_ca_data": ssl_ca_data,
                            "ssl_check_hostname": ssl_check_hostname,
                            "ssl_password": ssl_password,
                            "ssl_ca_path": ssl_ca_path,
                            "ssl_validate_ocsp_stapled": ssl_validate_ocsp_stapled,
                            "ssl_validate_ocsp": ssl_validate_ocsp,
                            "ssl_ocsp_context": ssl_ocsp_context,
                            "ssl_ocsp_expected_cert": ssl_ocsp_expected_cert,
                        }
                    )
            connection_pool = ConnectionPool(**kwargs)
        self.connection_pool = connection_pool
        self.connection = None
        if single_connection_client:
            self.connection = self.connection_pool.get_connection("_")

        self.response_callbacks = CaseInsensitiveDict(self.__class__.RESPONSE_CALLBACKS)

    def __repr__(self):
        return f"{type(self).__name__}<{repr(self.connection_pool)}>"

    def get_encoder(self):
        """Get the connection pool's encoder"""
        return self.connection_pool.get_encoder()

    def get_connection_kwargs(self):
        """Get the connection's keyword arguments"""
        return self.connection_pool.connection_kwargs

    def get_retry(self) -> Optional["Retry"]:
        return self.get_connection_kwargs().get("retry")

    def set_retry(self, retry: "Retry") -> None:
        self.get_connection_kwargs().update({"retry": retry})
        self.connection_pool.set_retry(retry)

    def set_response_callback(self, command, callback):
        """Set a custom Response Callback"""
        self.response_callbacks[command] = callback

    def load_external_module(self, funcname, func):
        """
        This function can be used to add externally defined redis modules,
        and their namespaces to the redis client.

        funcname - A string containing the name of the function to create
        func - The function, being added to this class.

        ex: Assume that one has a custom redis module named foomod that
        creates commands named 'foo.dothing' and 'foo.anotherthing' in redis.
        To load those functions into this namespace:

        from redis import Redis
        from foomodule import F
        r = Redis()
        r.load_external_module("foo", F)
        r.foo().dothing('your', 'arguments')

        For a concrete example see the reimport of the redisjson module in
        tests/test_connection.py::test_loading_external_modules
        """
        setattr(self, funcname, func)

    def pipeline(self, transaction=True, shard_hint=None):
        """
        Return a new pipeline object that can queue multiple commands for
        later execution. ``transaction`` indicates whether all commands
        should be executed atomically. Apart from making a group of operations
        atomic, pipelines are useful for reducing the back-and-forth overhead
        between the client and server.
        """
        return Pipeline(
            self.connection_pool, self.response_callbacks, transaction, shard_hint
        )

    def transaction(self, func, *watches, **kwargs):
        """
        Convenience method for executing the callable `func` as a transaction
        while watching all keys specified in `watches`. The `func` callable
        should expect a single argument which is a Pipeline object.
        """
        shard_hint = kwargs.pop("shard_hint", None)
        value_from_callable = kwargs.pop("value_from_callable", False)
        watch_delay = kwargs.pop("watch_delay", None)
        with self.pipeline(True, shard_hint) as pipe:
            while True:
                try:
                    if watches:
                        pipe.watch(*watches)
                    func_value = func(pipe)
                    exec_value = pipe.execute()
                    return func_value if value_from_callable else exec_value
                except WatchError:
                    if watch_delay is not None and watch_delay > 0:
                        time.sleep(watch_delay)
                    continue

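    # Example (illustrative, assuming a client ``r = Redis()``): atomically
    # double a counter while watching it for concurrent modification.
    #
    #   def double(pipe):
    #       value = int(pipe.get("counter") or 0)
    #       pipe.multi()
    #       pipe.set("counter", value * 2)
    #
    #   r.transaction(double, "counter")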

    def lock(
        self,
        name,
        timeout=None,
        sleep=0.1,
        blocking=True,
        blocking_timeout=None,
        lock_class=None,
        thread_local=True,
    ):
        """
        Return a new Lock object using key ``name`` that mimics
        the behavior of threading.Lock.

        If specified, ``timeout`` indicates a maximum life for the lock.
        By default, it will remain locked until release() is called.

        ``sleep`` indicates the amount of time to sleep per loop iteration
        when the lock is in blocking mode and another client is currently
        holding the lock.

        ``blocking`` indicates whether calling ``acquire`` should block until
        the lock has been acquired or to fail immediately, causing ``acquire``
        to return False and the lock not being acquired. Defaults to True.
        Note this value can be overridden by passing a ``blocking``
        argument to ``acquire``.

        ``blocking_timeout`` indicates the maximum amount of time in seconds to
        spend trying to acquire the lock. A value of ``None`` indicates
        continue trying forever. ``blocking_timeout`` can be specified as a
        float or integer, both representing the number of seconds to wait.

        ``lock_class`` forces the specified lock implementation. Note that as
        of redis-py 3.0, the only lock class we implement is ``Lock`` (which is
        a Lua-based lock). So, it's unlikely you'll need this parameter, unless
        you have created your own custom lock class.

        ``thread_local`` indicates whether the lock token is placed in
        thread-local storage. By default, the token is placed in thread local
        storage so that a thread only sees its token, not a token set by
        another thread. Consider the following timeline:

            time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds.
                     thread-1 sets the token to "abc"
            time: 1, thread-2 blocks trying to acquire `my-lock` using the
                     Lock instance.
            time: 5, thread-1 has not yet completed. redis expires the lock
                     key.
            time: 5, thread-2 acquired `my-lock` now that it's available.
                     thread-2 sets the token to "xyz"
            time: 6, thread-1 finishes its work and calls release(). if the
                     token is *not* stored in thread local storage, then
                     thread-1 would see the token value as "xyz" and would be
                     able to successfully release thread-2's lock.

        In some use cases it's necessary to disable thread local storage. For
        example, if you have code where one thread acquires a lock and passes
        that lock instance to a worker thread to release later. If thread
        local storage isn't disabled in this case, the worker thread won't see
        the token set by the thread that acquired the lock. Our assumption
        is that these cases aren't common and as such default to using
        thread local storage.
        """
        if lock_class is None:
            lock_class = Lock
        return lock_class(
            self,
            name,
            timeout=timeout,
            sleep=sleep,
            blocking=blocking,
            blocking_timeout=blocking_timeout,
            thread_local=thread_local,
        )

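    # Example (illustrative, assuming a client ``r``): Lock supports the
    # context-manager protocol, so the lock is released on exit.
    #
    #   with r.lock("my-lock", timeout=5):
    #       ...  # critical section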

    def pubsub(self, **kwargs):
        """
        Return a Publish/Subscribe object. With this object, you can
        subscribe to channels and listen for messages that get published to
        them.
        """
        return PubSub(self.connection_pool, **kwargs)

    def monitor(self):
        return Monitor(self.connection_pool)

    def client(self):
        return self.__class__(
            connection_pool=self.connection_pool, single_connection_client=True
        )

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def __del__(self):
        self.close()

    def close(self):
        # In case a connection property does not yet exist
        # (due to a crash earlier in the Redis() constructor), return
        # immediately as there is nothing to clean-up.
        if not hasattr(self, "connection"):
            return

        conn = self.connection
        if conn:
            self.connection = None
            self.connection_pool.release(conn)

    def _send_command_parse_response(self, conn, command_name, *args, **options):
        """
        Send a command and parse the response
        """
        conn.send_command(*args)
        return self.parse_response(conn, command_name, **options)

    def _disconnect_raise(self, conn, error):
        """
        Close the connection and raise an exception
        if retry_on_error is not set or the error
        is not one of the specified error types
        """
        conn.disconnect()
        if (
            conn.retry_on_error is None
            or isinstance(error, tuple(conn.retry_on_error)) is False
        ):
            raise error

    # COMMAND EXECUTION AND PROTOCOL PARSING
    def execute_command(self, *args, **options):
        """Execute a command and return a parsed response"""
        pool = self.connection_pool
        command_name = args[0]
        conn = self.connection or pool.get_connection(command_name, **options)

        try:
            return conn.retry.call_with_retry(
                lambda: self._send_command_parse_response(
                    conn, command_name, *args, **options
                ),
                lambda error: self._disconnect_raise(conn, error),
            )
        finally:
            if not self.connection:
                pool.release(conn)

    def parse_response(self, connection, command_name, **options):
        """Parses a response from the Redis server"""
        try:
            if NEVER_DECODE in options:
                response = connection.read_response(disable_decoding=True)
                options.pop(NEVER_DECODE)
            else:
                response = connection.read_response()
        except ResponseError:
            if EMPTY_RESPONSE in options:
                return options[EMPTY_RESPONSE]
            raise

        if EMPTY_RESPONSE in options:
            options.pop(EMPTY_RESPONSE)

        if command_name in self.response_callbacks:
            return self.response_callbacks[command_name](response, **options)
        return response


StrictRedis = Redis


class Monitor:
    """
    Monitor wraps the MONITOR command to the Redis server.
    The next_command() method returns one command from the monitor stream;
    the listen() method yields commands from it.
    """

    monitor_re = re.compile(r"\[(\d+) (.*)\] (.*)")
    command_re = re.compile(r'"(.*?)(?<!\\)"')

    def __init__(self, connection_pool):
        self.connection_pool = connection_pool
        self.connection = self.connection_pool.get_connection("MONITOR")

    def __enter__(self):
        self.connection.send_command("MONITOR")
        # check that monitor returns 'OK', but don't return it to user
        response = self.connection.read_response()
        if not bool_ok(response):
            raise RedisError(f"MONITOR failed: {response}")
        return self

    def __exit__(self, *args):
        self.connection.disconnect()
        self.connection_pool.release(self.connection)

    def next_command(self):
        """Parse the response from a monitor command"""
        response = self.connection.read_response()
        if isinstance(response, bytes):
            response = self.connection.encoder.decode(response, force=True)
        command_time, command_data = response.split(" ", 1)
        m = self.monitor_re.match(command_data)
        db_id, client_info, command = m.groups()
        command = " ".join(self.command_re.findall(command))
        # Redis escapes double quotes because each piece of the command
        # string is surrounded by double quotes. We don't have that
        # requirement so remove the escaping and leave the quote.
        command = command.replace('\\"', '"')

        if client_info == "lua":
            client_address = "lua"
            client_port = ""
            client_type = "lua"
        elif client_info.startswith("unix"):
            client_address = "unix"
            client_port = client_info[5:]
            client_type = "unix"
        else:
            # use rsplit as ipv6 addresses contain colons
            client_address, client_port = client_info.rsplit(":", 1)
            client_type = "tcp"
        return {
            "time": float(command_time),
            "db": int(db_id),
            "client_address": client_address,
            "client_port": client_port,
            "client_type": client_type,
            "command": command,
        }

    def listen(self):
        """Listen for commands coming to the server."""
        while True:
            yield self.next_command()

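# Example (illustrative, assuming a client ``r``): stream every command the
# server processes.
#
#   with r.monitor() as m:
#       for command_info in m.listen():
#           print(command_info["command"])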

class PubSub:
    """
    PubSub provides publish, subscribe and listen support to Redis channels.

    After subscribing to one or more channels, the listen() method will block
    until a message arrives on one of the subscribed channels. That message
    will be returned and it's safe to start listening again.
    """

    PUBLISH_MESSAGE_TYPES = ("message", "pmessage")
    UNSUBSCRIBE_MESSAGE_TYPES = ("unsubscribe", "punsubscribe")
    HEALTH_CHECK_MESSAGE = "redis-py-health-check"

    def __init__(
        self,
        connection_pool,
        shard_hint=None,
        ignore_subscribe_messages=False,
        encoder=None,
    ):
        self.connection_pool = connection_pool
        self.shard_hint = shard_hint
        self.ignore_subscribe_messages = ignore_subscribe_messages
        self.connection = None
        self.subscribed_event = threading.Event()
        # we need to know the encoding options for this connection in order
        # to lookup channel and pattern names for callback handlers.
        self.encoder = encoder
        if self.encoder is None:
            self.encoder = self.connection_pool.get_encoder()
        self.health_check_response_b = self.encoder.encode(self.HEALTH_CHECK_MESSAGE)
        if self.encoder.decode_responses:
            self.health_check_response = ["pong", self.HEALTH_CHECK_MESSAGE]
        else:
            self.health_check_response = [b"pong", self.health_check_response_b]
        self.reset()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.reset()

    def __del__(self):
        try:
            # if this object went out of scope prior to shutting down
            # subscriptions, close the connection manually before
            # returning it to the connection pool
            self.reset()
        except Exception:
            pass

    def reset(self):
        if self.connection:
            self.connection.disconnect()
            self.connection.clear_connect_callbacks()
            self.connection_pool.release(self.connection)
            self.connection = None
        self.channels = {}
        self.health_check_response_counter = 0
        self.pending_unsubscribe_channels = set()
        self.patterns = {}
        self.pending_unsubscribe_patterns = set()
        self.subscribed_event.clear()

    def close(self):
        self.reset()

    def on_connect(self, connection):
        "Re-subscribe to any channels and patterns previously subscribed to"
        # NOTE: for python3, we can't pass bytestrings as keyword arguments
        # so we need to decode channel/pattern names back to unicode strings
        # before passing them to [p]subscribe.
        self.pending_unsubscribe_channels.clear()
        self.pending_unsubscribe_patterns.clear()
        if self.channels:
            channels = {}
            for k, v in self.channels.items():
                channels[self.encoder.decode(k, force=True)] = v
            self.subscribe(**channels)
        if self.patterns:
            patterns = {}
            for k, v in self.patterns.items():
                patterns[self.encoder.decode(k, force=True)] = v
            self.psubscribe(**patterns)

    @property
    def subscribed(self):
        """Indicates if there are subscriptions to any channels or patterns"""
        return self.subscribed_event.is_set()

    def execute_command(self, *args):
        """Execute a publish/subscribe command"""

        # NOTE: don't parse the response in this function -- it could pull a
        # legitimate message off the stack if the connection is already
        # subscribed to one or more channels

        if self.connection is None:
            self.connection = self.connection_pool.get_connection(
                "pubsub", self.shard_hint
            )
            # register a callback that re-subscribes to any channels we
            # were listening to when we were disconnected
            self.connection.register_connect_callback(self.on_connect)
        connection = self.connection
        kwargs = {"check_health": not self.subscribed}
        if not self.subscribed:
            self.clean_health_check_responses()
        self._execute(connection, connection.send_command, *args, **kwargs)

    def clean_health_check_responses(self):
        """
        If any health check responses are present, clean them
        """
        ttl = 10
        conn = self.connection
        while self.health_check_response_counter > 0 and ttl > 0:
            if self._execute(conn, conn.can_read, timeout=conn.socket_timeout):
                response = self._execute(conn, conn.read_response)
                if self.is_health_check_response(response):
                    self.health_check_response_counter -= 1
                else:
                    raise PubSubError(
                        "A non health check response was cleaned by "
                        f"execute_command: {response}"
                    )
            ttl -= 1

    def _disconnect_raise_connect(self, conn, error):
        """
        Close the connection and raise an exception
        if retry_on_timeout is not set or the error
        is not a TimeoutError. Otherwise, try to reconnect
        """
        conn.disconnect()
        if not (conn.retry_on_timeout and isinstance(error, TimeoutError)):
            raise error
        conn.connect()

    def _execute(self, conn, command, *args, **kwargs):
        """
        Connect manually upon disconnection. If the Redis server is down,
        this will fail and raise a ConnectionError as desired.
        After reconnection, the ``on_connect`` callback should have been
        called by the connection to resubscribe us to any channels and
        patterns we were previously listening to
        """
        return conn.retry.call_with_retry(
            lambda: command(*args, **kwargs),
            lambda error: self._disconnect_raise_connect(conn, error),
        )

    def parse_response(self, block=True, timeout=0):
        """Parse the response from a publish/subscribe command"""
        conn = self.connection
        if conn is None:
            raise RuntimeError(
                "pubsub connection not set: "
                "did you forget to call subscribe() or psubscribe()?"
            )

        self.check_health()

        def try_read():
            if not block:
                if not conn.can_read(timeout=timeout):
                    return None
            else:
                conn.connect()
            return conn.read_response()

        response = self._execute(conn, try_read)

        if self.is_health_check_response(response):
            # ignore the health check message as user might not expect it
            self.health_check_response_counter -= 1
            return None
        return response

    def is_health_check_response(self, response):
        """
        Check if the response is a health check response.
        If there are no subscriptions, redis responds to the PING command with
        a bulk response, instead of a multi-bulk with "pong" and the response.
        """
        return response in [
            self.health_check_response,  # If there was a subscription
            self.health_check_response_b,  # If there wasn't
        ]

    def check_health(self):
        conn = self.connection
        if conn is None:
            raise RuntimeError(
                "pubsub connection not set: "
                "did you forget to call subscribe() or psubscribe()?"
            )

        if conn.health_check_interval and time.time() > conn.next_health_check:
            conn.send_command("PING", self.HEALTH_CHECK_MESSAGE, check_health=False)
            self.health_check_response_counter += 1

    def _normalize_keys(self, data):
        """
        normalize channel/pattern names to be either bytes or strings
        based on whether responses are automatically decoded. this saves us
        from coercing the value for each message coming in.
        """
        encode = self.encoder.encode
        decode = self.encoder.decode
        return {decode(encode(k)): v for k, v in data.items()}

1572 def psubscribe(self, *args, **kwargs): 

1573 """ 

1574 Subscribe to channel patterns. Patterns supplied as keyword arguments 

1575 expect a pattern name as the key and a callable as the value. A 

1576 pattern's callable will be invoked automatically when a message is 

1577 received on that pattern rather than producing a message via 

1578 ``listen()``. 

1579 """ 

1580 if args: 

1581 args = list_or_args(args[0], args[1:]) 

1582 new_patterns = dict.fromkeys(args) 

1583 new_patterns.update(kwargs) 

1584 ret_val = self.execute_command("PSUBSCRIBE", *new_patterns.keys()) 

1585 # update the patterns dict AFTER we send the command. we don't want to 

1586 # subscribe twice to these patterns, once for the command and again 

1587 # for the reconnection. 

1588 new_patterns = self._normalize_keys(new_patterns) 

1589 self.patterns.update(new_patterns) 

1590 if not self.subscribed: 

1591 # Set the subscribed_event flag to True 

1592 self.subscribed_event.set() 

1593 # Clear the health check counter 

1594 self.health_check_response_counter = 0 

1595 self.pending_unsubscribe_patterns.difference_update(new_patterns) 

1596 return ret_val 

1597 

1598 def punsubscribe(self, *args): 

1599 """ 

1600 Unsubscribe from the supplied patterns. If empty, unsubscribe from 

1601 all patterns. 

1602 """ 

1603 if args: 

1604 args = list_or_args(args[0], args[1:]) 

1605 patterns = self._normalize_keys(dict.fromkeys(args)) 

1606 else: 

1607 patterns = self.patterns 

1608 self.pending_unsubscribe_patterns.update(patterns) 

1609 return self.execute_command("PUNSUBSCRIBE", *args) 

1610 

1611 def subscribe(self, *args, **kwargs): 

1612 """ 

1613 Subscribe to channels. Channels supplied as keyword arguments expect 

1614 a channel name as the key and a callable as the value. A channel's 

1615 callable will be invoked automatically when a message is received on 

1616 that channel rather than producing a message via ``listen()`` or 

1617 ``get_message()``. 

1618 """ 

1619 if args: 

1620 args = list_or_args(args[0], args[1:]) 

1621 new_channels = dict.fromkeys(args) 

1622 new_channels.update(kwargs) 

1623 ret_val = self.execute_command("SUBSCRIBE", *new_channels.keys()) 

1624 # update the channels dict AFTER we send the command. we don't want to 

1625 # subscribe twice to these channels, once for the command and again 

1626 # for the reconnection. 

1627 new_channels = self._normalize_keys(new_channels) 

1628 self.channels.update(new_channels) 

1629 if not self.subscribed: 

1630 # Set the subscribed_event flag to True 

1631 self.subscribed_event.set() 

1632 # Clear the health check counter 

1633 self.health_check_response_counter = 0 

1634 self.pending_unsubscribe_channels.difference_update(new_channels) 

1635 return ret_val 

1636 
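# --- Example (editor's sketch, not part of client.py) -----------------------
# Channel subscription, assuming a local Redis server. A handler-based
# channel never surfaces through listen()/get_message(); its callable is
# invoked instead.
import redis

p = redis.Redis().pubsub()
p.subscribe("alerts")  # messages surface via get_message()/listen()
p.subscribe(metrics=lambda m: print(m["data"]))  # handler invoked instead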

1637 def unsubscribe(self, *args): 

1638 """ 

1639 Unsubscribe from the supplied channels. If empty, unsubscribe from 

1640 all channels. 

1641 """ 

1642 if args: 

1643 args = list_or_args(args[0], args[1:]) 

1644 channels = self._normalize_keys(dict.fromkeys(args)) 

1645 else: 

1646 channels = self.channels 

1647 self.pending_unsubscribe_channels.update(channels) 

1648 return self.execute_command("UNSUBSCRIBE", *args) 

1649 

1650 def listen(self): 

1651 "Listen for messages on channels this client has been subscribed to" 

1652 while self.subscribed: 

1653 response = self.handle_message(self.parse_response(block=True)) 

1654 if response is not None: 

1655 yield response 

1656 
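# --- Example (editor's sketch, not part of client.py) -----------------------
# Draining messages with the listen() generator; it blocks until a message
# arrives and stops iterating once no subscriptions remain.
import redis

p = redis.Redis().pubsub()
p.subscribe("alerts")
for message in p.listen():
    if message["type"] == "message":  # skip the initial "subscribe" confirmation
        print(message["data"])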

1657 def get_message(self, ignore_subscribe_messages=False, timeout=0.0): 

1658 """ 

1659 Get the next message if one is available, otherwise None. 

1660 

1661 If timeout is specified, the caller will wait up to `timeout` seconds 

1662 for a message before returning None. Timeout should be specified as a 

1663 floating point number, or None to wait indefinitely. 

1664 """ 

1665 if not self.subscribed: 

1666 # Wait for subscription 

1667 start_time = time.time() 

1668 if self.subscribed_event.wait(timeout) is True: 

1669 # The connection was subscribed during the timeout time frame. 

1670 # The timeout should be adjusted based on the time spent 

1671 # waiting for the subscription 

1672 time_spent = time.time() - start_time 

1673 timeout = max(0.0, timeout - time_spent) 

1674 else: 

1675 # The connection isn't subscribed to any channels or patterns, 

1676 # so no messages are available 

1677 return None 

1678 

1679 response = self.parse_response(block=(timeout is None), timeout=timeout) 

1680 if response: 

1681 return self.handle_message(response, ignore_subscribe_messages) 

1682 return None 

1683 
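# --- Example (editor's sketch, not part of client.py) -----------------------
# Polling with get_message(). timeout=0.0 returns immediately; a positive
# timeout waits up to that long; None blocks until a message arrives.
import redis

p = redis.Redis().pubsub(ignore_subscribe_messages=True)
p.subscribe("alerts")
while True:
    message = p.get_message(timeout=1.0)  # None if nothing within 1 second
    if message is not None:
        print(message["data"])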

1684 def ping(self, message=None): 

1685 """ 

1686 Ping the Redis server 

1687 """ 

1688 message = "" if message is None else message 

1689 return self.execute_command("PING", message) 

1690 

1691 def handle_message(self, response, ignore_subscribe_messages=False): 

1692 """ 

1693 Parses a pub/sub message. If the channel or pattern was subscribed to 

1694 with a message handler, the handler is invoked instead of a parsed 

1695 message being returned. 

1696 """ 

1697 if response is None: 

1698 return None 

1699 message_type = str_if_bytes(response[0]) 

1700 if message_type == "pmessage": 

1701 message = { 

1702 "type": message_type, 

1703 "pattern": response[1], 

1704 "channel": response[2], 

1705 "data": response[3], 

1706 } 

1707 elif message_type == "pong": 

1708 message = { 

1709 "type": message_type, 

1710 "pattern": None, 

1711 "channel": None, 

1712 "data": response[1], 

1713 } 

1714 else: 

1715 message = { 

1716 "type": message_type, 

1717 "pattern": None, 

1718 "channel": response[1], 

1719 "data": response[2], 

1720 } 

1721 

1722 # if this is an unsubscribe message, remove it from memory 

1723 if message_type in self.UNSUBSCRIBE_MESSAGE_TYPES: 

1724 if message_type == "punsubscribe": 

1725 pattern = response[1] 

1726 if pattern in self.pending_unsubscribe_patterns: 

1727 self.pending_unsubscribe_patterns.remove(pattern) 

1728 self.patterns.pop(pattern, None) 

1729 else: 

1730 channel = response[1] 

1731 if channel in self.pending_unsubscribe_channels: 

1732 self.pending_unsubscribe_channels.remove(channel) 

1733 self.channels.pop(channel, None) 

1734 if not self.channels and not self.patterns: 

1735 # There are no subscriptions anymore, set subscribed_event flag 

1736 # to false 

1737 self.subscribed_event.clear() 

1738 

1739 if message_type in self.PUBLISH_MESSAGE_TYPES: 

1740 # if there's a message handler, invoke it 

1741 if message_type == "pmessage": 

1742 handler = self.patterns.get(message["pattern"], None) 

1743 else: 

1744 handler = self.channels.get(message["channel"], None) 

1745 if handler: 

1746 handler(message) 

1747 return None 

1748 elif message_type != "pong": 

1749 # this is a subscribe/unsubscribe message. ignore if we don't 

1750 # want them 

1751 if ignore_subscribe_messages or self.ignore_subscribe_messages: 

1752 return None 

1753 

1754 return message 

1755 
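# Editor's note: the message dicts built above have a fixed shape, e.g.:
#   {"type": "message",   "pattern": None,      "channel": b"alerts",  "data": b"hi"}
#   {"type": "pmessage",  "pattern": b"news.*", "channel": b"news.uk", "data": b"hi"}
#   {"type": "subscribe", "pattern": None,      "channel": b"alerts",  "data": 1}
# Whether channel/pattern/data are bytes or str depends on the client's
# decode_responses setting (see _normalize_keys above).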

1756 def run_in_thread(self, sleep_time=0, daemon=False, exception_handler=None): 

1757 for channel, handler in self.channels.items(): 

1758 if handler is None: 

1759 raise PubSubError(f"Channel: '{channel}' has no handler registered") 

1760 for pattern, handler in self.patterns.items(): 

1761 if handler is None: 

1762 raise PubSubError(f"Pattern: '{pattern}' has no handler registered") 

1763 

1764 thread = PubSubWorkerThread( 

1765 self, sleep_time, daemon=daemon, exception_handler=exception_handler 

1766 ) 

1767 thread.start() 

1768 return thread 

1769 
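# --- Example (editor's sketch, not part of client.py) -----------------------
# Running the message loop in a background thread. Every channel/pattern
# must have a handler registered, otherwise run_in_thread() raises
# PubSubError.
import redis

p = redis.Redis().pubsub(ignore_subscribe_messages=True)
p.subscribe(alerts=lambda m: print(m["data"]))
worker = p.run_in_thread(sleep_time=0.5, daemon=True)
# ... later: stop the loop; the thread closes the pubsub connection itself.
worker.stop()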

1770 

1771class PubSubWorkerThread(threading.Thread): 

1772 def __init__(self, pubsub, sleep_time, daemon=False, exception_handler=None): 

1773 super().__init__() 

1774 self.daemon = daemon 

1775 self.pubsub = pubsub 

1776 self.sleep_time = sleep_time 

1777 self.exception_handler = exception_handler 

1778 self._running = threading.Event() 

1779 

1780 def run(self): 

1781 if self._running.is_set(): 

1782 return 

1783 self._running.set() 

1784 pubsub = self.pubsub 

1785 sleep_time = self.sleep_time 

1786 while self._running.is_set(): 

1787 try: 

1788 pubsub.get_message(ignore_subscribe_messages=True, timeout=sleep_time) 

1789 except BaseException as e: 

1790 if self.exception_handler is None: 

1791 raise 

1792 self.exception_handler(e, pubsub, self) 

1793 pubsub.close() 

1794 

1795 def stop(self): 

1796 # trip the flag so the run loop exits. the run loop will 

1797 # close the pubsub connection, which disconnects the socket 

1798 # and returns the connection to the pool. 

1799 self._running.clear() 

1800 

1801 

1802class Pipeline(Redis): 

1803 """ 

1804 Pipelines provide a way to transmit multiple commands to the Redis server 

1805 in one transmission. This is convenient for batch processing, such as 

1806 saving all the values in a list to Redis. 

1807 

1808 All commands executed within a pipeline (when running in transactional 

1809 mode, which is the default) are wrapped with MULTI and EXEC calls. This 

1810 guarantees that all commands in the pipeline are executed atomically. 

1811 

1812 Any command raising an exception does *not* halt the execution of 

1813 subsequent commands in the pipeline. Instead, the exception is caught 

1814 and its instance is placed into the response list returned by execute(). 

1815 Code iterating over the response list should be able to deal with an 

1816 instance of an exception as a potential value. In general, these will be 

1817 ResponseError exceptions, such as those raised when issuing a command 

1818 on a key of a different datatype. 

1819 """ 
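# --- Example (editor's sketch, not part of client.py) -----------------------
# Basic pipeline usage, assuming a local Redis server. Commands are
# buffered client-side and sent in one round trip on execute().
import redis

r = redis.Redis()
pipe = r.pipeline()  # transaction=True by default: wrapped in MULTI/EXEC
pipe.set("foo", "bar").incr("counter")
results = pipe.execute()  # e.g. [True, 1]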

1820 

1821 UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"} 

1822 

1823 def __init__(self, connection_pool, response_callbacks, transaction, shard_hint): 

1824 self.connection_pool = connection_pool 

1825 self.connection = None 

1826 self.response_callbacks = response_callbacks 

1827 self.transaction = transaction 

1828 self.shard_hint = shard_hint 

1829 

1830 self.watching = False 

1831 self.reset() 

1832 

1833 def __enter__(self): 

1834 return self 

1835 

1836 def __exit__(self, exc_type, exc_value, traceback): 

1837 self.reset() 

1838 

1839 def __del__(self): 

1840 try: 

1841 self.reset() 

1842 except Exception: 

1843 pass 

1844 

1845 def __len__(self): 

1846 return len(self.command_stack) 

1847 

1848 def __bool__(self): 

1849 """Pipeline instances should always evaluate to True""" 

1850 return True 

1851 

1852 def reset(self): 

1853 self.command_stack = [] 

1854 self.scripts = set() 

1855 # make sure to reset the connection state in the event that we were 

1856 # watching something 

1857 if self.watching and self.connection: 

1858 try: 

1859 # call this manually since our unwatch or 

1860 # immediate_execute_command methods can call reset() 

1861 self.connection.send_command("UNWATCH") 

1862 self.connection.read_response() 

1863 except ConnectionError: 

1864 # disconnect will also remove any previous WATCHes 

1865 self.connection.disconnect() 

1866 # clean up the other instance attributes 

1867 self.watching = False 

1868 self.explicit_transaction = False 

1869 # we can safely return the connection to the pool here since we're 

1870 # sure we're no longer WATCHing anything 

1871 if self.connection: 

1872 self.connection_pool.release(self.connection) 

1873 self.connection = None 

1874 

1875 def multi(self): 

1876 """ 

1877 Start a transactional block of the pipeline after WATCH commands 

1878 are issued. End the transactional block with `execute`. 

1879 """ 

1880 if self.explicit_transaction: 

1881 raise RedisError("Cannot issue nested calls to MULTI") 

1882 if self.command_stack: 

1883 raise RedisError( 

1884 "Commands without an initial WATCH have already been issued" 

1885 ) 

1886 self.explicit_transaction = True 

1887 
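# --- Example (editor's sketch, not part of client.py) -----------------------
# The WATCH -> read -> MULTI -> EXEC pattern. Commands issued after watch()
# run immediately; multi() switches the pipeline back to buffering. A
# WatchError from execute() means a watched key changed, so retry.
import redis
from redis.exceptions import WatchError

r = redis.Redis()
with r.pipeline() as pipe:
    while True:
        try:
            pipe.watch("counter")
            current = int(pipe.get("counter") or 0)  # immediate execution
            pipe.multi()  # start the transactional block
            pipe.set("counter", current + 1)
            pipe.execute()
            break
        except WatchError:
            continue  # key changed concurrently; retry the transaction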

1888 def execute_command(self, *args, **kwargs): 

1889 if (self.watching or args[0] == "WATCH") and not self.explicit_transaction: 

1890 return self.immediate_execute_command(*args, **kwargs) 

1891 return self.pipeline_execute_command(*args, **kwargs) 

1892 

1893 def _disconnect_reset_raise(self, conn, error): 

1894 """ 

1895 Close the connection, reset watching state and 

1896 raise an exception if we were watching, 

1897 retry_on_timeout is not set, 

1898 or the error is not a TimeoutError 

1899 """ 

1900 conn.disconnect() 

1901 # if we were already watching a variable, the watch is no longer 

1902 # valid since this connection has died. raise a WatchError, which 

1903 # indicates the user should retry this transaction. 

1904 if self.watching: 

1905 self.reset() 

1906 raise WatchError( 

1907 "A ConnectionError occurred on while watching one or more keys" 

1908 ) 

1909 # if retry_on_timeout is not set, or the error is not 

1910 # a TimeoutError, raise it 

1911 if not (conn.retry_on_timeout and isinstance(error, TimeoutError)): 

1912 self.reset() 

1913 raise 

1914 

1915 def immediate_execute_command(self, *args, **options): 

1916 """ 

1917 Execute a command immediately, but don't auto-retry on a 

1918 ConnectionError if we're already WATCHing a variable. Used when 

1919 issuing WATCH or subsequent commands that retrieve their values, but 

1920 before MULTI is called. 

1921 """ 

1922 command_name = args[0] 

1923 conn = self.connection 

1924 # if this is the first call, we need a connection 

1925 if not conn: 

1926 conn = self.connection_pool.get_connection(command_name, self.shard_hint) 

1927 self.connection = conn 

1928 

1929 return conn.retry.call_with_retry( 

1930 lambda: self._send_command_parse_response( 

1931 conn, command_name, *args, **options 

1932 ), 

1933 lambda error: self._disconnect_reset_raise(conn, error), 

1934 ) 

1935 

1936 def pipeline_execute_command(self, *args, **options): 

1937 """ 

1938 Stage a command to be executed when execute() is next called. 

1939 

1940 Returns the current Pipeline object so commands can be 

1941 chained together, such as: 

1942 

1943 pipe = pipe.set('foo', 'bar').incr('baz').decr('bang') 

1944 

1945 At some other point, you can then run: pipe.execute(), 

1946 which will execute all commands queued in the pipe. 

1947 """ 

1948 self.command_stack.append((args, options)) 

1949 return self 

1950 

1951 def _execute_transaction(self, connection, commands, raise_on_error): 

1952 cmds = chain([(("MULTI",), {})], commands, [(("EXEC",), {})]) 

1953 all_cmds = connection.pack_commands( 

1954 [args for args, options in cmds if EMPTY_RESPONSE not in options] 

1955 ) 

1956 connection.send_packed_command(all_cmds) 

1957 errors = [] 

1958 

1959 # parse off the response for MULTI 

1960 # NOTE: we need to handle ResponseErrors here and continue 

1961 # so that we read all the additional command messages from 

1962 # the socket 

1963 try: 

1964 self.parse_response(connection, "_") 

1965 except ResponseError as e: 

1966 errors.append((0, e)) 

1967 

1968 # and all the other commands 

1969 for i, command in enumerate(commands): 

1970 if EMPTY_RESPONSE in command[1]: 

1971 errors.append((i, command[1][EMPTY_RESPONSE])) 

1972 else: 

1973 try: 

1974 self.parse_response(connection, "_") 

1975 except ResponseError as e: 

1976 self.annotate_exception(e, i + 1, command[0]) 

1977 errors.append((i, e)) 

1978 

1979 # parse the EXEC. 

1980 try: 

1981 response = self.parse_response(connection, "_") 

1982 except ExecAbortError: 

1983 if errors: 

1984 raise errors[0][1] 

1985 raise 

1986 

1987 # EXEC clears any watched keys 

1988 self.watching = False 

1989 

1990 if response is None: 

1991 raise WatchError("Watched variable changed.") 

1992 

1993 # put any parse errors into the response 

1994 for i, e in errors: 

1995 response.insert(i, e) 

1996 

1997 if len(response) != len(commands): 

1998 self.connection.disconnect() 

1999 raise ResponseError( 

2000 "Wrong number of response items from pipeline execution" 

2001 ) 

2002 

2003 # find any errors in the response and raise if necessary 

2004 if raise_on_error: 

2005 self.raise_first_error(commands, response) 

2006 

2007 # We have to run response callbacks manually 

2008 data = [] 

2009 for r, cmd in zip(response, commands): 

2010 if not isinstance(r, Exception): 

2011 args, options = cmd 

2012 command_name = args[0] 

2013 if command_name in self.response_callbacks: 

2014 r = self.response_callbacks[command_name](r, **options) 

2015 data.append(r) 

2016 return data 

2017 

2018 def _execute_pipeline(self, connection, commands, raise_on_error): 

2019 # build up all commands into a single request to increase network perf 

2020 all_cmds = connection.pack_commands([args for args, _ in commands]) 

2021 connection.send_packed_command(all_cmds) 

2022 

2023 response = [] 

2024 for args, options in commands: 

2025 try: 

2026 response.append(self.parse_response(connection, args[0], **options)) 

2027 except ResponseError as e: 

2028 response.append(e) 

2029 

2030 if raise_on_error: 

2031 self.raise_first_error(commands, response) 

2032 return response 

2033 

2034 def raise_first_error(self, commands, response): 

2035 for i, r in enumerate(response): 

2036 if isinstance(r, ResponseError): 

2037 self.annotate_exception(r, i + 1, commands[i][0]) 

2038 raise r 

2039 

2040 def annotate_exception(self, exception, number, command): 

2041 cmd = " ".join(map(safe_str, command)) 

2042 msg = ( 

2043 f"Command # {number} ({cmd}) of pipeline " 

2044 f"caused error: {exception.args[0]}" 

2045 ) 

2046 exception.args = (msg,) + exception.args[1:] 

2047 
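# Editor's note: after annotation, an error raised from a pipeline reads,
# for example:
#   "Command # 2 (LPUSH mylist x) of pipeline caused error: WRONGTYPE ..."
# where the number is the command's 1-based position in the pipeline.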

2048 def parse_response(self, connection, command_name, **options): 

2049 result = Redis.parse_response(self, connection, command_name, **options) 

2050 if command_name in self.UNWATCH_COMMANDS: 

2051 self.watching = False 

2052 elif command_name == "WATCH": 

2053 self.watching = True 

2054 return result 

2055 

2056 def load_scripts(self): 

2057 # make sure all scripts that are about to be run on this pipeline exist 

2058 scripts = list(self.scripts) 

2059 immediate = self.immediate_execute_command 

2060 shas = [s.sha for s in scripts] 

2061 # we can't use the normal script_* methods because they would just 

2062 # get buffered in the pipeline. 

2063 exists = immediate("SCRIPT EXISTS", *shas) 

2064 if not all(exists): 

2065 for s, exist in zip(scripts, exists): 

2066 if not exist: 

2067 s.sha = immediate("SCRIPT LOAD", s.script) 

2068 

2069 def _disconnect_raise_reset(self, conn, error): 

2070 """ 

2071 Close the connection, raise an exception if we were watching, 

2072 and raise an exception if retry_on_timeout is not set, 

2073 or the error is not a TimeoutError 

2074 """ 

2075 conn.disconnect() 

2076 # if we were watching a variable, the watch is no longer valid 

2077 # since this connection has died. raise a WatchError, which 

2078 # indicates the user should retry this transaction. 

2079 if self.watching: 

2080 raise WatchError( 

2081 "A ConnectionError occurred on while watching one or more keys" 

2082 ) 

2083 # if retry_on_timeout is not set, or the error is not 

2084 # a TimeoutError, raise it 

2085 if not (conn.retry_on_timeout and isinstance(error, TimeoutError)): 

2086 self.reset() 

2087 raise 

2088 

2089 def execute(self, raise_on_error=True): 

2090 """Execute all the commands in the current pipeline""" 

2091 stack = self.command_stack 

2092 if not stack and not self.watching: 

2093 return [] 

2094 if self.scripts: 

2095 self.load_scripts() 

2096 if self.transaction or self.explicit_transaction: 

2097 execute = self._execute_transaction 

2098 else: 

2099 execute = self._execute_pipeline 

2100 

2101 conn = self.connection 

2102 if not conn: 

2103 conn = self.connection_pool.get_connection("MULTI", self.shard_hint) 

2104 # assign to self.connection so reset() releases the connection 

2105 # back to the pool after we're done 

2106 self.connection = conn 

2107 

2108 try: 

2109 return conn.retry.call_with_retry( 

2110 lambda: execute(conn, stack, raise_on_error), 

2111 lambda error: self._disconnect_raise_reset(conn, error), 

2112 ) 

2113 finally: 

2114 self.reset() 

2115 
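# --- Example (editor's sketch, not part of client.py) -----------------------
# Collecting per-command errors instead of raising: with
# raise_on_error=False, failed commands appear in the result list as
# exception instances.
import redis

r = redis.Redis()
r.set("str-key", "not-a-list")
pipe = r.pipeline()
pipe.lpush("str-key", "x")  # wrong type; yields a ResponseError instance
pipe.set("ok-key", "1")
for result in pipe.execute(raise_on_error=False):
    if isinstance(result, Exception):
        print("command failed:", result)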

2116 def discard(self): 

2117 """ 

2118 Flushes all previously queued commands. 

2119 See: https://redis.io/commands/DISCARD 

2120 """ 

2121 self.execute_command("DISCARD") 

2122 

2123 def watch(self, *names): 

2124 """Watches the values at keys ``names``""" 

2125 if self.explicit_transaction: 

2126 raise RedisError("Cannot issue a WATCH after a MULTI") 

2127 return self.execute_command("WATCH", *names) 

2128 

2129 def unwatch(self): 

2130 """Unwatches all previously specified keys""" 

2131 return self.execute_command("UNWATCH") if self.watching else True