Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/redis/client.py: 24%
933 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 07:09 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 07:09 +0000
1import copy
2import datetime
3import re
4import threading
5import time
6import warnings
7from itertools import chain
8from typing import Optional
10from redis.commands import (
11 CoreCommands,
12 RedisModuleCommands,
13 SentinelCommands,
14 list_or_args,
15)
16from redis.connection import ConnectionPool, SSLConnection, UnixDomainSocketConnection
17from redis.credentials import CredentialProvider
18from redis.exceptions import (
19 ConnectionError,
20 ExecAbortError,
21 ModuleError,
22 PubSubError,
23 RedisError,
24 ResponseError,
25 TimeoutError,
26 WatchError,
27)
28from redis.lock import Lock
29from redis.retry import Retry
30from redis.utils import safe_str, str_if_bytes
32SYM_EMPTY = b""
33EMPTY_RESPONSE = "EMPTY_RESPONSE"
35# some responses (ie. dump) are binary, and just meant to never be decoded
36NEVER_DECODE = "NEVER_DECODE"
39def timestamp_to_datetime(response):
40 "Converts a unix timestamp to a Python datetime object"
41 if not response:
42 return None
43 try:
44 response = int(response)
45 except ValueError:
46 return None
47 return datetime.datetime.fromtimestamp(response)
50def string_keys_to_dict(key_string, callback):
51 return dict.fromkeys(key_string.split(), callback)
54class CaseInsensitiveDict(dict):
55 "Case insensitive dict implementation. Assumes string keys only."
57 def __init__(self, data):
58 for k, v in data.items():
59 self[k.upper()] = v
61 def __contains__(self, k):
62 return super().__contains__(k.upper())
64 def __delitem__(self, k):
65 super().__delitem__(k.upper())
67 def __getitem__(self, k):
68 return super().__getitem__(k.upper())
70 def get(self, k, default=None):
71 return super().get(k.upper(), default)
73 def __setitem__(self, k, v):
74 super().__setitem__(k.upper(), v)
76 def update(self, data):
77 data = CaseInsensitiveDict(data)
78 super().update(data)
81def parse_debug_object(response):
82 "Parse the results of Redis's DEBUG OBJECT command into a Python dict"
83 # The 'type' of the object is the first item in the response, but isn't
84 # prefixed with a name
85 response = str_if_bytes(response)
86 response = "type:" + response
87 response = dict(kv.split(":") for kv in response.split())
89 # parse some expected int values from the string response
90 # note: this cmd isn't spec'd so these may not appear in all redis versions
91 int_fields = ("refcount", "serializedlength", "lru", "lru_seconds_idle")
92 for field in int_fields:
93 if field in response:
94 response[field] = int(response[field])
96 return response
99def parse_object(response, infotype):
100 """Parse the results of an OBJECT command"""
101 if infotype in ("idletime", "refcount"):
102 return int_or_none(response)
103 return response
106def parse_info(response):
107 """Parse the result of Redis's INFO command into a Python dict"""
108 info = {}
109 response = str_if_bytes(response)
111 def get_value(value):
112 if "," not in value or "=" not in value:
113 try:
114 if "." in value:
115 return float(value)
116 else:
117 return int(value)
118 except ValueError:
119 return value
120 else:
121 sub_dict = {}
122 for item in value.split(","):
123 k, v = item.rsplit("=", 1)
124 sub_dict[k] = get_value(v)
125 return sub_dict
127 for line in response.splitlines():
128 if line and not line.startswith("#"):
129 if line.find(":") != -1:
130 # Split, the info fields keys and values.
131 # Note that the value may contain ':'. but the 'host:'
132 # pseudo-command is the only case where the key contains ':'
133 key, value = line.split(":", 1)
134 if key == "cmdstat_host":
135 key, value = line.rsplit(":", 1)
137 if key == "module":
138 # Hardcode a list for key 'modules' since there could be
139 # multiple lines that started with 'module'
140 info.setdefault("modules", []).append(get_value(value))
141 else:
142 info[key] = get_value(value)
143 else:
144 # if the line isn't splittable, append it to the "__raw__" key
145 info.setdefault("__raw__", []).append(line)
147 return info
150def parse_memory_stats(response, **kwargs):
151 """Parse the results of MEMORY STATS"""
152 stats = pairs_to_dict(response, decode_keys=True, decode_string_values=True)
153 for key, value in stats.items():
154 if key.startswith("db."):
155 stats[key] = pairs_to_dict(
156 value, decode_keys=True, decode_string_values=True
157 )
158 return stats
161SENTINEL_STATE_TYPES = {
162 "can-failover-its-master": int,
163 "config-epoch": int,
164 "down-after-milliseconds": int,
165 "failover-timeout": int,
166 "info-refresh": int,
167 "last-hello-message": int,
168 "last-ok-ping-reply": int,
169 "last-ping-reply": int,
170 "last-ping-sent": int,
171 "master-link-down-time": int,
172 "master-port": int,
173 "num-other-sentinels": int,
174 "num-slaves": int,
175 "o-down-time": int,
176 "pending-commands": int,
177 "parallel-syncs": int,
178 "port": int,
179 "quorum": int,
180 "role-reported-time": int,
181 "s-down-time": int,
182 "slave-priority": int,
183 "slave-repl-offset": int,
184 "voted-leader-epoch": int,
185}
188def parse_sentinel_state(item):
189 result = pairs_to_dict_typed(item, SENTINEL_STATE_TYPES)
190 flags = set(result["flags"].split(","))
191 for name, flag in (
192 ("is_master", "master"),
193 ("is_slave", "slave"),
194 ("is_sdown", "s_down"),
195 ("is_odown", "o_down"),
196 ("is_sentinel", "sentinel"),
197 ("is_disconnected", "disconnected"),
198 ("is_master_down", "master_down"),
199 ):
200 result[name] = flag in flags
201 return result
204def parse_sentinel_master(response):
205 return parse_sentinel_state(map(str_if_bytes, response))
208def parse_sentinel_masters(response):
209 result = {}
210 for item in response:
211 state = parse_sentinel_state(map(str_if_bytes, item))
212 result[state["name"]] = state
213 return result
216def parse_sentinel_slaves_and_sentinels(response):
217 return [parse_sentinel_state(map(str_if_bytes, item)) for item in response]
220def parse_sentinel_get_master(response):
221 return response and (response[0], int(response[1])) or None
224def pairs_to_dict(response, decode_keys=False, decode_string_values=False):
225 """Create a dict given a list of key/value pairs"""
226 if response is None:
227 return {}
228 if decode_keys or decode_string_values:
229 # the iter form is faster, but I don't know how to make that work
230 # with a str_if_bytes() map
231 keys = response[::2]
232 if decode_keys:
233 keys = map(str_if_bytes, keys)
234 values = response[1::2]
235 if decode_string_values:
236 values = map(str_if_bytes, values)
237 return dict(zip(keys, values))
238 else:
239 it = iter(response)
240 return dict(zip(it, it))
243def pairs_to_dict_typed(response, type_info):
244 it = iter(response)
245 result = {}
246 for key, value in zip(it, it):
247 if key in type_info:
248 try:
249 value = type_info[key](value)
250 except Exception:
251 # if for some reason the value can't be coerced, just use
252 # the string value
253 pass
254 result[key] = value
255 return result
258def zset_score_pairs(response, **options):
259 """
260 If ``withscores`` is specified in the options, return the response as
261 a list of (value, score) pairs
262 """
263 if not response or not options.get("withscores"):
264 return response
265 score_cast_func = options.get("score_cast_func", float)
266 it = iter(response)
267 return list(zip(it, map(score_cast_func, it)))
270def sort_return_tuples(response, **options):
271 """
272 If ``groups`` is specified, return the response as a list of
273 n-element tuples with n being the value found in options['groups']
274 """
275 if not response or not options.get("groups"):
276 return response
277 n = options["groups"]
278 return list(zip(*[response[i::n] for i in range(n)]))
281def int_or_none(response):
282 if response is None:
283 return None
284 return int(response)
287def parse_stream_list(response):
288 if response is None:
289 return None
290 data = []
291 for r in response:
292 if r is not None:
293 data.append((r[0], pairs_to_dict(r[1])))
294 else:
295 data.append((None, None))
296 return data
299def pairs_to_dict_with_str_keys(response):
300 return pairs_to_dict(response, decode_keys=True)
303def parse_list_of_dicts(response):
304 return list(map(pairs_to_dict_with_str_keys, response))
307def parse_xclaim(response, **options):
308 if options.get("parse_justid", False):
309 return response
310 return parse_stream_list(response)
313def parse_xautoclaim(response, **options):
314 if options.get("parse_justid", False):
315 return response[1]
316 response[1] = parse_stream_list(response[1])
317 return response
320def parse_xinfo_stream(response, **options):
321 data = pairs_to_dict(response, decode_keys=True)
322 if not options.get("full", False):
323 first = data["first-entry"]
324 if first is not None:
325 data["first-entry"] = (first[0], pairs_to_dict(first[1]))
326 last = data["last-entry"]
327 if last is not None:
328 data["last-entry"] = (last[0], pairs_to_dict(last[1]))
329 else:
330 data["entries"] = {_id: pairs_to_dict(entry) for _id, entry in data["entries"]}
331 data["groups"] = [
332 pairs_to_dict(group, decode_keys=True) for group in data["groups"]
333 ]
334 return data
337def parse_xread(response):
338 if response is None:
339 return []
340 return [[r[0], parse_stream_list(r[1])] for r in response]
343def parse_xpending(response, **options):
344 if options.get("parse_detail", False):
345 return parse_xpending_range(response)
346 consumers = [{"name": n, "pending": int(p)} for n, p in response[3] or []]
347 return {
348 "pending": response[0],
349 "min": response[1],
350 "max": response[2],
351 "consumers": consumers,
352 }
355def parse_xpending_range(response):
356 k = ("message_id", "consumer", "time_since_delivered", "times_delivered")
357 return [dict(zip(k, r)) for r in response]
360def float_or_none(response):
361 if response is None:
362 return None
363 return float(response)
366def bool_ok(response):
367 return str_if_bytes(response) == "OK"
370def parse_zadd(response, **options):
371 if response is None:
372 return None
373 if options.get("as_score"):
374 return float(response)
375 return int(response)
378def parse_client_list(response, **options):
379 clients = []
380 for c in str_if_bytes(response).splitlines():
381 # Values might contain '='
382 clients.append(dict(pair.split("=", 1) for pair in c.split(" ")))
383 return clients
386def parse_config_get(response, **options):
387 response = [str_if_bytes(i) if i is not None else None for i in response]
388 return response and pairs_to_dict(response) or {}
391def parse_scan(response, **options):
392 cursor, r = response
393 return int(cursor), r
396def parse_hscan(response, **options):
397 cursor, r = response
398 return int(cursor), r and pairs_to_dict(r) or {}
401def parse_zscan(response, **options):
402 score_cast_func = options.get("score_cast_func", float)
403 cursor, r = response
404 it = iter(r)
405 return int(cursor), list(zip(it, map(score_cast_func, it)))
408def parse_zmscore(response, **options):
409 # zmscore: list of scores (double precision floating point number) or nil
410 return [float(score) if score is not None else None for score in response]
413def parse_slowlog_get(response, **options):
414 space = " " if options.get("decode_responses", False) else b" "
416 def parse_item(item):
417 result = {"id": item[0], "start_time": int(item[1]), "duration": int(item[2])}
418 # Redis Enterprise injects another entry at index [3], which has
419 # the complexity info (i.e. the value N in case the command has
420 # an O(N) complexity) instead of the command.
421 if isinstance(item[3], list):
422 result["command"] = space.join(item[3])
423 else:
424 result["complexity"] = item[3]
425 result["command"] = space.join(item[4])
426 return result
428 return [parse_item(item) for item in response]
431def parse_stralgo(response, **options):
432 """
433 Parse the response from `STRALGO` command.
434 Without modifiers the returned value is string.
435 When LEN is given the command returns the length of the result
436 (i.e integer).
437 When IDX is given the command returns a dictionary with the LCS
438 length and all the ranges in both the strings, start and end
439 offset for each string, where there are matches.
440 When WITHMATCHLEN is given, each array representing a match will
441 also have the length of the match at the beginning of the array.
442 """
443 if options.get("len", False):
444 return int(response)
445 if options.get("idx", False):
446 if options.get("withmatchlen", False):
447 matches = [
448 [(int(match[-1]))] + list(map(tuple, match[:-1]))
449 for match in response[1]
450 ]
451 else:
452 matches = [list(map(tuple, match)) for match in response[1]]
453 return {
454 str_if_bytes(response[0]): matches,
455 str_if_bytes(response[2]): int(response[3]),
456 }
457 return str_if_bytes(response)
460def parse_cluster_info(response, **options):
461 response = str_if_bytes(response)
462 return dict(line.split(":") for line in response.splitlines() if line)
465def _parse_node_line(line):
466 line_items = line.split(" ")
467 node_id, addr, flags, master_id, ping, pong, epoch, connected = line.split(" ")[:8]
468 addr = addr.split("@")[0]
469 node_dict = {
470 "node_id": node_id,
471 "flags": flags,
472 "master_id": master_id,
473 "last_ping_sent": ping,
474 "last_pong_rcvd": pong,
475 "epoch": epoch,
476 "slots": [],
477 "migrations": [],
478 "connected": True if connected == "connected" else False,
479 }
480 if len(line_items) >= 9:
481 slots, migrations = _parse_slots(line_items[8:])
482 node_dict["slots"], node_dict["migrations"] = slots, migrations
483 return addr, node_dict
486def _parse_slots(slot_ranges):
487 slots, migrations = [], []
488 for s_range in slot_ranges:
489 if "->-" in s_range:
490 slot_id, dst_node_id = s_range[1:-1].split("->-", 1)
491 migrations.append(
492 {"slot": slot_id, "node_id": dst_node_id, "state": "migrating"}
493 )
494 elif "-<-" in s_range:
495 slot_id, src_node_id = s_range[1:-1].split("-<-", 1)
496 migrations.append(
497 {"slot": slot_id, "node_id": src_node_id, "state": "importing"}
498 )
499 else:
500 s_range = [sl for sl in s_range.split("-")]
501 slots.append(s_range)
503 return slots, migrations
506def parse_cluster_nodes(response, **options):
507 """
508 @see: https://redis.io/commands/cluster-nodes # string / bytes
509 @see: https://redis.io/commands/cluster-replicas # list of string / bytes
510 """
511 if isinstance(response, (str, bytes)):
512 response = response.splitlines()
513 return dict(_parse_node_line(str_if_bytes(node)) for node in response)
516def parse_geosearch_generic(response, **options):
517 """
518 Parse the response of 'GEOSEARCH', GEORADIUS' and 'GEORADIUSBYMEMBER'
519 commands according to 'withdist', 'withhash' and 'withcoord' labels.
520 """
521 if options["store"] or options["store_dist"]:
522 # `store` and `store_dist` cant be combined
523 # with other command arguments.
524 # relevant to 'GEORADIUS' and 'GEORADIUSBYMEMBER'
525 return response
527 if type(response) != list:
528 response_list = [response]
529 else:
530 response_list = response
532 if not options["withdist"] and not options["withcoord"] and not options["withhash"]:
533 # just a bunch of places
534 return response_list
536 cast = {
537 "withdist": float,
538 "withcoord": lambda ll: (float(ll[0]), float(ll[1])),
539 "withhash": int,
540 }
542 # zip all output results with each casting function to get
543 # the properly native Python value.
544 f = [lambda x: x]
545 f += [cast[o] for o in ["withdist", "withhash", "withcoord"] if options[o]]
546 return [list(map(lambda fv: fv[0](fv[1]), zip(f, r))) for r in response_list]
549def parse_command(response, **options):
550 commands = {}
551 for command in response:
552 cmd_dict = {}
553 cmd_name = str_if_bytes(command[0])
554 cmd_dict["name"] = cmd_name
555 cmd_dict["arity"] = int(command[1])
556 cmd_dict["flags"] = [str_if_bytes(flag) for flag in command[2]]
557 cmd_dict["first_key_pos"] = command[3]
558 cmd_dict["last_key_pos"] = command[4]
559 cmd_dict["step_count"] = command[5]
560 if len(command) > 7:
561 cmd_dict["tips"] = command[7]
562 cmd_dict["key_specifications"] = command[8]
563 cmd_dict["subcommands"] = command[9]
564 commands[cmd_name] = cmd_dict
565 return commands
568def parse_pubsub_numsub(response, **options):
569 return list(zip(response[0::2], response[1::2]))
572def parse_client_kill(response, **options):
573 if isinstance(response, int):
574 return response
575 return str_if_bytes(response) == "OK"
578def parse_acl_getuser(response, **options):
579 if response is None:
580 return None
581 data = pairs_to_dict(response, decode_keys=True)
583 # convert everything but user-defined data in 'keys' to native strings
584 data["flags"] = list(map(str_if_bytes, data["flags"]))
585 data["passwords"] = list(map(str_if_bytes, data["passwords"]))
586 data["commands"] = str_if_bytes(data["commands"])
587 if isinstance(data["keys"], str) or isinstance(data["keys"], bytes):
588 data["keys"] = list(str_if_bytes(data["keys"]).split(" "))
589 if data["keys"] == [""]:
590 data["keys"] = []
591 if "channels" in data:
592 if isinstance(data["channels"], str) or isinstance(data["channels"], bytes):
593 data["channels"] = list(str_if_bytes(data["channels"]).split(" "))
594 if data["channels"] == [""]:
595 data["channels"] = []
596 if "selectors" in data:
597 data["selectors"] = [
598 list(map(str_if_bytes, selector)) for selector in data["selectors"]
599 ]
601 # split 'commands' into separate 'categories' and 'commands' lists
602 commands, categories = [], []
603 for command in data["commands"].split(" "):
604 if "@" in command:
605 categories.append(command)
606 else:
607 commands.append(command)
609 data["commands"] = commands
610 data["categories"] = categories
611 data["enabled"] = "on" in data["flags"]
612 return data
615def parse_acl_log(response, **options):
616 if response is None:
617 return None
618 if isinstance(response, list):
619 data = []
620 for log in response:
621 log_data = pairs_to_dict(log, True, True)
622 client_info = log_data.get("client-info", "")
623 log_data["client-info"] = parse_client_info(client_info)
625 # float() is lossy comparing to the "double" in C
626 log_data["age-seconds"] = float(log_data["age-seconds"])
627 data.append(log_data)
628 else:
629 data = bool_ok(response)
630 return data
633def parse_client_info(value):
634 """
635 Parsing client-info in ACL Log in following format.
636 "key1=value1 key2=value2 key3=value3"
637 """
638 client_info = {}
639 infos = str_if_bytes(value).split(" ")
640 for info in infos:
641 key, value = info.split("=")
642 client_info[key] = value
644 # Those fields are defined as int in networking.c
645 for int_key in {
646 "id",
647 "age",
648 "idle",
649 "db",
650 "sub",
651 "psub",
652 "multi",
653 "qbuf",
654 "qbuf-free",
655 "obl",
656 "argv-mem",
657 "oll",
658 "omem",
659 "tot-mem",
660 }:
661 client_info[int_key] = int(client_info[int_key])
662 return client_info
665def parse_module_result(response):
666 if isinstance(response, ModuleError):
667 raise response
668 return True
671def parse_set_result(response, **options):
672 """
673 Handle SET result since GET argument is available since Redis 6.2.
674 Parsing SET result into:
675 - BOOL
676 - String when GET argument is used
677 """
678 if options.get("get"):
679 # Redis will return a getCommand result.
680 # See `setGenericCommand` in t_string.c
681 return response
682 return response and str_if_bytes(response) == "OK"
685class AbstractRedis:
686 RESPONSE_CALLBACKS = {
687 **string_keys_to_dict(
688 "AUTH COPY EXPIRE EXPIREAT PEXPIRE PEXPIREAT "
689 "HEXISTS HMSET MOVE MSETNX PERSIST "
690 "PSETEX RENAMENX SISMEMBER SMOVE SETEX SETNX",
691 bool,
692 ),
693 **string_keys_to_dict(
694 "BITCOUNT BITPOS DECRBY DEL EXISTS GEOADD GETBIT HDEL HLEN "
695 "HSTRLEN INCRBY LINSERT LLEN LPUSHX PFADD PFCOUNT RPUSHX SADD "
696 "SCARD SDIFFSTORE SETBIT SETRANGE SINTERSTORE SREM STRLEN "
697 "SUNIONSTORE UNLINK XACK XDEL XLEN XTRIM ZCARD ZLEXCOUNT ZREM "
698 "ZREMRANGEBYLEX ZREMRANGEBYRANK ZREMRANGEBYSCORE",
699 int,
700 ),
701 **string_keys_to_dict("INCRBYFLOAT HINCRBYFLOAT", float),
702 **string_keys_to_dict(
703 # these return OK, or int if redis-server is >=1.3.4
704 "LPUSH RPUSH",
705 lambda r: isinstance(r, int) and r or str_if_bytes(r) == "OK",
706 ),
707 **string_keys_to_dict("SORT", sort_return_tuples),
708 **string_keys_to_dict("ZSCORE ZINCRBY GEODIST", float_or_none),
709 **string_keys_to_dict(
710 "FLUSHALL FLUSHDB LSET LTRIM MSET PFMERGE ASKING READONLY READWRITE "
711 "RENAME SAVE SELECT SHUTDOWN SLAVEOF SWAPDB WATCH UNWATCH ",
712 bool_ok,
713 ),
714 **string_keys_to_dict("BLPOP BRPOP", lambda r: r and tuple(r) or None),
715 **string_keys_to_dict(
716 "SDIFF SINTER SMEMBERS SUNION", lambda r: r and set(r) or set()
717 ),
718 **string_keys_to_dict(
719 "ZPOPMAX ZPOPMIN ZINTER ZDIFF ZUNION ZRANGE ZRANGEBYSCORE "
720 "ZREVRANGE ZREVRANGEBYSCORE",
721 zset_score_pairs,
722 ),
723 **string_keys_to_dict(
724 "BZPOPMIN BZPOPMAX", lambda r: r and (r[0], r[1], float(r[2])) or None
725 ),
726 **string_keys_to_dict("ZRANK ZREVRANK", int_or_none),
727 **string_keys_to_dict("XREVRANGE XRANGE", parse_stream_list),
728 **string_keys_to_dict("XREAD XREADGROUP", parse_xread),
729 **string_keys_to_dict("BGREWRITEAOF BGSAVE", lambda r: True),
730 "ACL CAT": lambda r: list(map(str_if_bytes, r)),
731 "ACL DELUSER": int,
732 "ACL GENPASS": str_if_bytes,
733 "ACL GETUSER": parse_acl_getuser,
734 "ACL HELP": lambda r: list(map(str_if_bytes, r)),
735 "ACL LIST": lambda r: list(map(str_if_bytes, r)),
736 "ACL LOAD": bool_ok,
737 "ACL LOG": parse_acl_log,
738 "ACL SAVE": bool_ok,
739 "ACL SETUSER": bool_ok,
740 "ACL USERS": lambda r: list(map(str_if_bytes, r)),
741 "ACL WHOAMI": str_if_bytes,
742 "CLIENT GETNAME": str_if_bytes,
743 "CLIENT ID": int,
744 "CLIENT KILL": parse_client_kill,
745 "CLIENT LIST": parse_client_list,
746 "CLIENT INFO": parse_client_info,
747 "CLIENT SETNAME": bool_ok,
748 "CLIENT UNBLOCK": lambda r: r and int(r) == 1 or False,
749 "CLIENT PAUSE": bool_ok,
750 "CLIENT GETREDIR": int,
751 "CLIENT TRACKINGINFO": lambda r: list(map(str_if_bytes, r)),
752 "CLUSTER ADDSLOTS": bool_ok,
753 "CLUSTER ADDSLOTSRANGE": bool_ok,
754 "CLUSTER COUNT-FAILURE-REPORTS": lambda x: int(x),
755 "CLUSTER COUNTKEYSINSLOT": lambda x: int(x),
756 "CLUSTER DELSLOTS": bool_ok,
757 "CLUSTER DELSLOTSRANGE": bool_ok,
758 "CLUSTER FAILOVER": bool_ok,
759 "CLUSTER FORGET": bool_ok,
760 "CLUSTER GETKEYSINSLOT": lambda r: list(map(str_if_bytes, r)),
761 "CLUSTER INFO": parse_cluster_info,
762 "CLUSTER KEYSLOT": lambda x: int(x),
763 "CLUSTER MEET": bool_ok,
764 "CLUSTER NODES": parse_cluster_nodes,
765 "CLUSTER REPLICAS": parse_cluster_nodes,
766 "CLUSTER REPLICATE": bool_ok,
767 "CLUSTER RESET": bool_ok,
768 "CLUSTER SAVECONFIG": bool_ok,
769 "CLUSTER SET-CONFIG-EPOCH": bool_ok,
770 "CLUSTER SETSLOT": bool_ok,
771 "CLUSTER SLAVES": parse_cluster_nodes,
772 "COMMAND": parse_command,
773 "COMMAND COUNT": int,
774 "COMMAND GETKEYS": lambda r: list(map(str_if_bytes, r)),
775 "CONFIG GET": parse_config_get,
776 "CONFIG RESETSTAT": bool_ok,
777 "CONFIG SET": bool_ok,
778 "DEBUG OBJECT": parse_debug_object,
779 "FUNCTION DELETE": bool_ok,
780 "FUNCTION FLUSH": bool_ok,
781 "FUNCTION RESTORE": bool_ok,
782 "GEOHASH": lambda r: list(map(str_if_bytes, r)),
783 "GEOPOS": lambda r: list(
784 map(lambda ll: (float(ll[0]), float(ll[1])) if ll is not None else None, r)
785 ),
786 "GEOSEARCH": parse_geosearch_generic,
787 "GEORADIUS": parse_geosearch_generic,
788 "GEORADIUSBYMEMBER": parse_geosearch_generic,
789 "HGETALL": lambda r: r and pairs_to_dict(r) or {},
790 "HSCAN": parse_hscan,
791 "INFO": parse_info,
792 "LASTSAVE": timestamp_to_datetime,
793 "MEMORY PURGE": bool_ok,
794 "MEMORY STATS": parse_memory_stats,
795 "MEMORY USAGE": int_or_none,
796 "MODULE LOAD": parse_module_result,
797 "MODULE UNLOAD": parse_module_result,
798 "MODULE LIST": lambda r: [pairs_to_dict(m) for m in r],
799 "OBJECT": parse_object,
800 "PING": lambda r: str_if_bytes(r) == "PONG",
801 "QUIT": bool_ok,
802 "STRALGO": parse_stralgo,
803 "PUBSUB NUMSUB": parse_pubsub_numsub,
804 "RANDOMKEY": lambda r: r and r or None,
805 "RESET": str_if_bytes,
806 "SCAN": parse_scan,
807 "SCRIPT EXISTS": lambda r: list(map(bool, r)),
808 "SCRIPT FLUSH": bool_ok,
809 "SCRIPT KILL": bool_ok,
810 "SCRIPT LOAD": str_if_bytes,
811 "SENTINEL CKQUORUM": bool_ok,
812 "SENTINEL FAILOVER": bool_ok,
813 "SENTINEL FLUSHCONFIG": bool_ok,
814 "SENTINEL GET-MASTER-ADDR-BY-NAME": parse_sentinel_get_master,
815 "SENTINEL MASTER": parse_sentinel_master,
816 "SENTINEL MASTERS": parse_sentinel_masters,
817 "SENTINEL MONITOR": bool_ok,
818 "SENTINEL RESET": bool_ok,
819 "SENTINEL REMOVE": bool_ok,
820 "SENTINEL SENTINELS": parse_sentinel_slaves_and_sentinels,
821 "SENTINEL SET": bool_ok,
822 "SENTINEL SLAVES": parse_sentinel_slaves_and_sentinels,
823 "SET": parse_set_result,
824 "SLOWLOG GET": parse_slowlog_get,
825 "SLOWLOG LEN": int,
826 "SLOWLOG RESET": bool_ok,
827 "SSCAN": parse_scan,
828 "TIME": lambda x: (int(x[0]), int(x[1])),
829 "XCLAIM": parse_xclaim,
830 "XAUTOCLAIM": parse_xautoclaim,
831 "XGROUP CREATE": bool_ok,
832 "XGROUP DELCONSUMER": int,
833 "XGROUP DESTROY": bool,
834 "XGROUP SETID": bool_ok,
835 "XINFO CONSUMERS": parse_list_of_dicts,
836 "XINFO GROUPS": parse_list_of_dicts,
837 "XINFO STREAM": parse_xinfo_stream,
838 "XPENDING": parse_xpending,
839 "ZADD": parse_zadd,
840 "ZSCAN": parse_zscan,
841 "ZMSCORE": parse_zmscore,
842 }
845class Redis(AbstractRedis, RedisModuleCommands, CoreCommands, SentinelCommands):
846 """
847 Implementation of the Redis protocol.
849 This abstract class provides a Python interface to all Redis commands
850 and an implementation of the Redis protocol.
852 Pipelines derive from this, implementing how
853 the commands are sent and received to the Redis server. Based on
854 configuration, an instance will either use a ConnectionPool, or
855 Connection object to talk to redis.
857 It is not safe to pass PubSub or Pipeline objects between threads.
858 """
860 @classmethod
861 def from_url(cls, url, **kwargs):
862 """
863 Return a Redis client object configured from the given URL
865 For example::
867 redis://[[username]:[password]]@localhost:6379/0
868 rediss://[[username]:[password]]@localhost:6379/0
869 unix://[username@]/path/to/socket.sock?db=0[&password=password]
871 Three URL schemes are supported:
873 - `redis://` creates a TCP socket connection. See more at:
874 <https://www.iana.org/assignments/uri-schemes/prov/redis>
875 - `rediss://` creates a SSL wrapped TCP socket connection. See more at:
876 <https://www.iana.org/assignments/uri-schemes/prov/rediss>
877 - ``unix://``: creates a Unix Domain Socket connection.
879 The username, password, hostname, path and all querystring values
880 are passed through urllib.parse.unquote in order to replace any
881 percent-encoded values with their corresponding characters.
883 There are several ways to specify a database number. The first value
884 found will be used:
886 1. A ``db`` querystring option, e.g. redis://localhost?db=0
887 2. If using the redis:// or rediss:// schemes, the path argument
888 of the url, e.g. redis://localhost/0
889 3. A ``db`` keyword argument to this function.
891 If none of these options are specified, the default db=0 is used.
893 All querystring options are cast to their appropriate Python types.
894 Boolean arguments can be specified with string values "True"/"False"
895 or "Yes"/"No". Values that cannot be properly cast cause a
896 ``ValueError`` to be raised. Once parsed, the querystring arguments
897 and keyword arguments are passed to the ``ConnectionPool``'s
898 class initializer. In the case of conflicting arguments, querystring
899 arguments always win.
901 """
902 connection_pool = ConnectionPool.from_url(url, **kwargs)
903 return cls(connection_pool=connection_pool)
905 def __init__(
906 self,
907 host="localhost",
908 port=6379,
909 db=0,
910 password=None,
911 socket_timeout=None,
912 socket_connect_timeout=None,
913 socket_keepalive=None,
914 socket_keepalive_options=None,
915 connection_pool=None,
916 unix_socket_path=None,
917 encoding="utf-8",
918 encoding_errors="strict",
919 charset=None,
920 errors=None,
921 decode_responses=False,
922 retry_on_timeout=False,
923 retry_on_error=None,
924 ssl=False,
925 ssl_keyfile=None,
926 ssl_certfile=None,
927 ssl_cert_reqs="required",
928 ssl_ca_certs=None,
929 ssl_ca_path=None,
930 ssl_ca_data=None,
931 ssl_check_hostname=False,
932 ssl_password=None,
933 ssl_validate_ocsp=False,
934 ssl_validate_ocsp_stapled=False,
935 ssl_ocsp_context=None,
936 ssl_ocsp_expected_cert=None,
937 max_connections=None,
938 single_connection_client=False,
939 health_check_interval=0,
940 client_name=None,
941 username=None,
942 retry=None,
943 redis_connect_func=None,
944 credential_provider: Optional[CredentialProvider] = None,
945 ):
946 """
947 Initialize a new Redis client.
948 To specify a retry policy for specific errors, first set
949 `retry_on_error` to a list of the error/s to retry on, then set
950 `retry` to a valid `Retry` object.
951 To retry on TimeoutError, `retry_on_timeout` can also be set to `True`.
953 Args:
955 single_connection_client:
956 if `True`, connection pool is not used. In that case `Redis`
957 instance use is not thread safe.
958 """
959 if not connection_pool:
960 if charset is not None:
961 warnings.warn(
962 DeprecationWarning(
963 '"charset" is deprecated. Use "encoding" instead'
964 )
965 )
966 encoding = charset
967 if errors is not None:
968 warnings.warn(
969 DeprecationWarning(
970 '"errors" is deprecated. Use "encoding_errors" instead'
971 )
972 )
973 encoding_errors = errors
974 if not retry_on_error:
975 retry_on_error = []
976 if retry_on_timeout is True:
977 retry_on_error.append(TimeoutError)
978 kwargs = {
979 "db": db,
980 "username": username,
981 "password": password,
982 "socket_timeout": socket_timeout,
983 "encoding": encoding,
984 "encoding_errors": encoding_errors,
985 "decode_responses": decode_responses,
986 "retry_on_error": retry_on_error,
987 "retry": copy.deepcopy(retry),
988 "max_connections": max_connections,
989 "health_check_interval": health_check_interval,
990 "client_name": client_name,
991 "redis_connect_func": redis_connect_func,
992 "credential_provider": credential_provider,
993 }
994 # based on input, setup appropriate connection args
995 if unix_socket_path is not None:
996 kwargs.update(
997 {
998 "path": unix_socket_path,
999 "connection_class": UnixDomainSocketConnection,
1000 }
1001 )
1002 else:
1003 # TCP specific options
1004 kwargs.update(
1005 {
1006 "host": host,
1007 "port": port,
1008 "socket_connect_timeout": socket_connect_timeout,
1009 "socket_keepalive": socket_keepalive,
1010 "socket_keepalive_options": socket_keepalive_options,
1011 }
1012 )
1014 if ssl:
1015 kwargs.update(
1016 {
1017 "connection_class": SSLConnection,
1018 "ssl_keyfile": ssl_keyfile,
1019 "ssl_certfile": ssl_certfile,
1020 "ssl_cert_reqs": ssl_cert_reqs,
1021 "ssl_ca_certs": ssl_ca_certs,
1022 "ssl_ca_data": ssl_ca_data,
1023 "ssl_check_hostname": ssl_check_hostname,
1024 "ssl_password": ssl_password,
1025 "ssl_ca_path": ssl_ca_path,
1026 "ssl_validate_ocsp_stapled": ssl_validate_ocsp_stapled,
1027 "ssl_validate_ocsp": ssl_validate_ocsp,
1028 "ssl_ocsp_context": ssl_ocsp_context,
1029 "ssl_ocsp_expected_cert": ssl_ocsp_expected_cert,
1030 }
1031 )
1032 connection_pool = ConnectionPool(**kwargs)
1033 self.connection_pool = connection_pool
1034 self.connection = None
1035 if single_connection_client:
1036 self.connection = self.connection_pool.get_connection("_")
1038 self.response_callbacks = CaseInsensitiveDict(self.__class__.RESPONSE_CALLBACKS)
1040 def __repr__(self):
1041 return f"{type(self).__name__}<{repr(self.connection_pool)}>"
1043 def get_encoder(self):
1044 """Get the connection pool's encoder"""
1045 return self.connection_pool.get_encoder()
1047 def get_connection_kwargs(self):
1048 """Get the connection's key-word arguments"""
1049 return self.connection_pool.connection_kwargs
1051 def get_retry(self) -> Optional["Retry"]:
1052 return self.get_connection_kwargs().get("retry")
1054 def set_retry(self, retry: "Retry") -> None:
1055 self.get_connection_kwargs().update({"retry": retry})
1056 self.connection_pool.set_retry(retry)
1058 def set_response_callback(self, command, callback):
1059 """Set a custom Response Callback"""
1060 self.response_callbacks[command] = callback
1062 def load_external_module(self, funcname, func):
1063 """
1064 This function can be used to add externally defined redis modules,
1065 and their namespaces to the redis client.
1067 funcname - A string containing the name of the function to create
1068 func - The function, being added to this class.
1070 ex: Assume that one has a custom redis module named foomod that
1071 creates command named 'foo.dothing' and 'foo.anotherthing' in redis.
1072 To load function functions into this namespace:
1074 from redis import Redis
1075 from foomodule import F
1076 r = Redis()
1077 r.load_external_module("foo", F)
1078 r.foo().dothing('your', 'arguments')
1080 For a concrete example see the reimport of the redisjson module in
1081 tests/test_connection.py::test_loading_external_modules
1082 """
1083 setattr(self, funcname, func)
1085 def pipeline(self, transaction=True, shard_hint=None):
1086 """
1087 Return a new pipeline object that can queue multiple commands for
1088 later execution. ``transaction`` indicates whether all commands
1089 should be executed atomically. Apart from making a group of operations
1090 atomic, pipelines are useful for reducing the back-and-forth overhead
1091 between the client and server.
1092 """
1093 return Pipeline(
1094 self.connection_pool, self.response_callbacks, transaction, shard_hint
1095 )
1097 def transaction(self, func, *watches, **kwargs):
1098 """
1099 Convenience method for executing the callable `func` as a transaction
1100 while watching all keys specified in `watches`. The 'func' callable
1101 should expect a single argument which is a Pipeline object.
1102 """
1103 shard_hint = kwargs.pop("shard_hint", None)
1104 value_from_callable = kwargs.pop("value_from_callable", False)
1105 watch_delay = kwargs.pop("watch_delay", None)
1106 with self.pipeline(True, shard_hint) as pipe:
1107 while True:
1108 try:
1109 if watches:
1110 pipe.watch(*watches)
1111 func_value = func(pipe)
1112 exec_value = pipe.execute()
1113 return func_value if value_from_callable else exec_value
1114 except WatchError:
1115 if watch_delay is not None and watch_delay > 0:
1116 time.sleep(watch_delay)
1117 continue
1119 def lock(
1120 self,
1121 name,
1122 timeout=None,
1123 sleep=0.1,
1124 blocking=True,
1125 blocking_timeout=None,
1126 lock_class=None,
1127 thread_local=True,
1128 ):
1129 """
1130 Return a new Lock object using key ``name`` that mimics
1131 the behavior of threading.Lock.
1133 If specified, ``timeout`` indicates a maximum life for the lock.
1134 By default, it will remain locked until release() is called.
1136 ``sleep`` indicates the amount of time to sleep per loop iteration
1137 when the lock is in blocking mode and another client is currently
1138 holding the lock.
1140 ``blocking`` indicates whether calling ``acquire`` should block until
1141 the lock has been acquired or to fail immediately, causing ``acquire``
1142 to return False and the lock not being acquired. Defaults to True.
1143 Note this value can be overridden by passing a ``blocking``
1144 argument to ``acquire``.
1146 ``blocking_timeout`` indicates the maximum amount of time in seconds to
1147 spend trying to acquire the lock. A value of ``None`` indicates
1148 continue trying forever. ``blocking_timeout`` can be specified as a
1149 float or integer, both representing the number of seconds to wait.
1151 ``lock_class`` forces the specified lock implementation. Note that as
1152 of redis-py 3.0, the only lock class we implement is ``Lock`` (which is
1153 a Lua-based lock). So, it's unlikely you'll need this parameter, unless
1154 you have created your own custom lock class.
1156 ``thread_local`` indicates whether the lock token is placed in
1157 thread-local storage. By default, the token is placed in thread local
1158 storage so that a thread only sees its token, not a token set by
1159 another thread. Consider the following timeline:
1161 time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds.
1162 thread-1 sets the token to "abc"
1163 time: 1, thread-2 blocks trying to acquire `my-lock` using the
1164 Lock instance.
1165 time: 5, thread-1 has not yet completed. redis expires the lock
1166 key.
1167 time: 5, thread-2 acquired `my-lock` now that it's available.
1168 thread-2 sets the token to "xyz"
1169 time: 6, thread-1 finishes its work and calls release(). if the
1170 token is *not* stored in thread local storage, then
1171 thread-1 would see the token value as "xyz" and would be
1172 able to successfully release the thread-2's lock.
1174 In some use cases it's necessary to disable thread local storage. For
1175 example, if you have code where one thread acquires a lock and passes
1176 that lock instance to a worker thread to release later. If thread
1177 local storage isn't disabled in this case, the worker thread won't see
1178 the token set by the thread that acquired the lock. Our assumption
1179 is that these cases aren't common and as such default to using
1180 thread local storage."""
1181 if lock_class is None:
1182 lock_class = Lock
1183 return lock_class(
1184 self,
1185 name,
1186 timeout=timeout,
1187 sleep=sleep,
1188 blocking=blocking,
1189 blocking_timeout=blocking_timeout,
1190 thread_local=thread_local,
1191 )
1193 def pubsub(self, **kwargs):
1194 """
1195 Return a Publish/Subscribe object. With this object, you can
1196 subscribe to channels and listen for messages that get published to
1197 them.
1198 """
1199 return PubSub(self.connection_pool, **kwargs)
1201 def monitor(self):
1202 return Monitor(self.connection_pool)
1204 def client(self):
1205 return self.__class__(
1206 connection_pool=self.connection_pool, single_connection_client=True
1207 )
1209 def __enter__(self):
1210 return self
1212 def __exit__(self, exc_type, exc_value, traceback):
1213 self.close()
1215 def __del__(self):
1216 self.close()
1218 def close(self):
1219 # In case a connection property does not yet exist
1220 # (due to a crash earlier in the Redis() constructor), return
1221 # immediately as there is nothing to clean-up.
1222 if not hasattr(self, "connection"):
1223 return
1225 conn = self.connection
1226 if conn:
1227 self.connection = None
1228 self.connection_pool.release(conn)
1230 def _send_command_parse_response(self, conn, command_name, *args, **options):
1231 """
1232 Send a command and parse the response
1233 """
1234 conn.send_command(*args)
1235 return self.parse_response(conn, command_name, **options)
1237 def _disconnect_raise(self, conn, error):
1238 """
1239 Close the connection and raise an exception
1240 if retry_on_error is not set or the error
1241 is not one of the specified error types
1242 """
1243 conn.disconnect()
1244 if (
1245 conn.retry_on_error is None
1246 or isinstance(error, tuple(conn.retry_on_error)) is False
1247 ):
1248 raise error
1250 # COMMAND EXECUTION AND PROTOCOL PARSING
1251 def execute_command(self, *args, **options):
1252 """Execute a command and return a parsed response"""
1253 pool = self.connection_pool
1254 command_name = args[0]
1255 conn = self.connection or pool.get_connection(command_name, **options)
1257 try:
1258 return conn.retry.call_with_retry(
1259 lambda: self._send_command_parse_response(
1260 conn, command_name, *args, **options
1261 ),
1262 lambda error: self._disconnect_raise(conn, error),
1263 )
1264 finally:
1265 if not self.connection:
1266 pool.release(conn)
1268 def parse_response(self, connection, command_name, **options):
1269 """Parses a response from the Redis server"""
1270 try:
1271 if NEVER_DECODE in options:
1272 response = connection.read_response(disable_decoding=True)
1273 options.pop(NEVER_DECODE)
1274 else:
1275 response = connection.read_response()
1276 except ResponseError:
1277 if EMPTY_RESPONSE in options:
1278 return options[EMPTY_RESPONSE]
1279 raise
1281 if EMPTY_RESPONSE in options:
1282 options.pop(EMPTY_RESPONSE)
1284 if command_name in self.response_callbacks:
1285 return self.response_callbacks[command_name](response, **options)
1286 return response
1289StrictRedis = Redis
1292class Monitor:
1293 """
1294 Monitor is useful for handling the MONITOR command to the redis server.
1295 next_command() method returns one command from monitor
1296 listen() method yields commands from monitor.
1297 """
1299 monitor_re = re.compile(r"\[(\d+) (.*)\] (.*)")
1300 command_re = re.compile(r'"(.*?)(?<!\\)"')
1302 def __init__(self, connection_pool):
1303 self.connection_pool = connection_pool
1304 self.connection = self.connection_pool.get_connection("MONITOR")
1306 def __enter__(self):
1307 self.connection.send_command("MONITOR")
1308 # check that monitor returns 'OK', but don't return it to user
1309 response = self.connection.read_response()
1310 if not bool_ok(response):
1311 raise RedisError(f"MONITOR failed: {response}")
1312 return self
1314 def __exit__(self, *args):
1315 self.connection.disconnect()
1316 self.connection_pool.release(self.connection)
1318 def next_command(self):
1319 """Parse the response from a monitor command"""
1320 response = self.connection.read_response()
1321 if isinstance(response, bytes):
1322 response = self.connection.encoder.decode(response, force=True)
1323 command_time, command_data = response.split(" ", 1)
1324 m = self.monitor_re.match(command_data)
1325 db_id, client_info, command = m.groups()
1326 command = " ".join(self.command_re.findall(command))
1327 # Redis escapes double quotes because each piece of the command
1328 # string is surrounded by double quotes. We don't have that
1329 # requirement so remove the escaping and leave the quote.
1330 command = command.replace('\\"', '"')
1332 if client_info == "lua":
1333 client_address = "lua"
1334 client_port = ""
1335 client_type = "lua"
1336 elif client_info.startswith("unix"):
1337 client_address = "unix"
1338 client_port = client_info[5:]
1339 client_type = "unix"
1340 else:
1341 # use rsplit as ipv6 addresses contain colons
1342 client_address, client_port = client_info.rsplit(":", 1)
1343 client_type = "tcp"
1344 return {
1345 "time": float(command_time),
1346 "db": int(db_id),
1347 "client_address": client_address,
1348 "client_port": client_port,
1349 "client_type": client_type,
1350 "command": command,
1351 }
1353 def listen(self):
1354 """Listen for commands coming to the server."""
1355 while True:
1356 yield self.next_command()
1359class PubSub:
1360 """
1361 PubSub provides publish, subscribe and listen support to Redis channels.
1363 After subscribing to one or more channels, the listen() method will block
1364 until a message arrives on one of the subscribed channels. That message
1365 will be returned and it's safe to start listening again.
1366 """
1368 PUBLISH_MESSAGE_TYPES = ("message", "pmessage")
1369 UNSUBSCRIBE_MESSAGE_TYPES = ("unsubscribe", "punsubscribe")
1370 HEALTH_CHECK_MESSAGE = "redis-py-health-check"
1372 def __init__(
1373 self,
1374 connection_pool,
1375 shard_hint=None,
1376 ignore_subscribe_messages=False,
1377 encoder=None,
1378 ):
1379 self.connection_pool = connection_pool
1380 self.shard_hint = shard_hint
1381 self.ignore_subscribe_messages = ignore_subscribe_messages
1382 self.connection = None
1383 self.subscribed_event = threading.Event()
1384 # we need to know the encoding options for this connection in order
1385 # to lookup channel and pattern names for callback handlers.
1386 self.encoder = encoder
1387 if self.encoder is None:
1388 self.encoder = self.connection_pool.get_encoder()
1389 self.health_check_response_b = self.encoder.encode(self.HEALTH_CHECK_MESSAGE)
1390 if self.encoder.decode_responses:
1391 self.health_check_response = ["pong", self.HEALTH_CHECK_MESSAGE]
1392 else:
1393 self.health_check_response = [b"pong", self.health_check_response_b]
1394 self.reset()
1396 def __enter__(self):
1397 return self
1399 def __exit__(self, exc_type, exc_value, traceback):
1400 self.reset()
1402 def __del__(self):
1403 try:
1404 # if this object went out of scope prior to shutting down
1405 # subscriptions, close the connection manually before
1406 # returning it to the connection pool
1407 self.reset()
1408 except Exception:
1409 pass
1411 def reset(self):
1412 if self.connection:
1413 self.connection.disconnect()
1414 self.connection.clear_connect_callbacks()
1415 self.connection_pool.release(self.connection)
1416 self.connection = None
1417 self.channels = {}
1418 self.health_check_response_counter = 0
1419 self.pending_unsubscribe_channels = set()
1420 self.patterns = {}
1421 self.pending_unsubscribe_patterns = set()
1422 self.subscribed_event.clear()
1424 def close(self):
1425 self.reset()
1427 def on_connect(self, connection):
1428 "Re-subscribe to any channels and patterns previously subscribed to"
1429 # NOTE: for python3, we can't pass bytestrings as keyword arguments
1430 # so we need to decode channel/pattern names back to unicode strings
1431 # before passing them to [p]subscribe.
1432 self.pending_unsubscribe_channels.clear()
1433 self.pending_unsubscribe_patterns.clear()
1434 if self.channels:
1435 channels = {}
1436 for k, v in self.channels.items():
1437 channels[self.encoder.decode(k, force=True)] = v
1438 self.subscribe(**channels)
1439 if self.patterns:
1440 patterns = {}
1441 for k, v in self.patterns.items():
1442 patterns[self.encoder.decode(k, force=True)] = v
1443 self.psubscribe(**patterns)
1445 @property
1446 def subscribed(self):
1447 """Indicates if there are subscriptions to any channels or patterns"""
1448 return self.subscribed_event.is_set()
1450 def execute_command(self, *args):
1451 """Execute a publish/subscribe command"""
1453 # NOTE: don't parse the response in this function -- it could pull a
1454 # legitimate message off the stack if the connection is already
1455 # subscribed to one or more channels
1457 if self.connection is None:
1458 self.connection = self.connection_pool.get_connection(
1459 "pubsub", self.shard_hint
1460 )
1461 # register a callback that re-subscribes to any channels we
1462 # were listening to when we were disconnected
1463 self.connection.register_connect_callback(self.on_connect)
1464 connection = self.connection
1465 kwargs = {"check_health": not self.subscribed}
1466 if not self.subscribed:
1467 self.clean_health_check_responses()
1468 self._execute(connection, connection.send_command, *args, **kwargs)
1470 def clean_health_check_responses(self):
1471 """
1472 If any health check responses are present, clean them
1473 """
1474 ttl = 10
1475 conn = self.connection
1476 while self.health_check_response_counter > 0 and ttl > 0:
1477 if self._execute(conn, conn.can_read, timeout=conn.socket_timeout):
1478 response = self._execute(conn, conn.read_response)
1479 if self.is_health_check_response(response):
1480 self.health_check_response_counter -= 1
1481 else:
1482 raise PubSubError(
1483 "A non health check response was cleaned by "
1484 "execute_command: {0}".format(response)
1485 )
1486 ttl -= 1
1488 def _disconnect_raise_connect(self, conn, error):
1489 """
1490 Close the connection and raise an exception
1491 if retry_on_timeout is not set or the error
1492 is not a TimeoutError. Otherwise, try to reconnect
1493 """
1494 conn.disconnect()
1495 if not (conn.retry_on_timeout and isinstance(error, TimeoutError)):
1496 raise error
1497 conn.connect()
1499 def _execute(self, conn, command, *args, **kwargs):
1500 """
1501 Connect manually upon disconnection. If the Redis server is down,
1502 this will fail and raise a ConnectionError as desired.
1503 After reconnection, the ``on_connect`` callback should have been
1504 called by the # connection to resubscribe us to any channels and
1505 patterns we were previously listening to
1506 """
1507 return conn.retry.call_with_retry(
1508 lambda: command(*args, **kwargs),
1509 lambda error: self._disconnect_raise_connect(conn, error),
1510 )
1512 def parse_response(self, block=True, timeout=0):
1513 """Parse the response from a publish/subscribe command"""
1514 conn = self.connection
1515 if conn is None:
1516 raise RuntimeError(
1517 "pubsub connection not set: "
1518 "did you forget to call subscribe() or psubscribe()?"
1519 )
1521 self.check_health()
1523 def try_read():
1524 if not block:
1525 if not conn.can_read(timeout=timeout):
1526 return None
1527 else:
1528 conn.connect()
1529 return conn.read_response()
1531 response = self._execute(conn, try_read)
1533 if self.is_health_check_response(response):
1534 # ignore the health check message as user might not expect it
1535 self.health_check_response_counter -= 1
1536 return None
1537 return response
1539 def is_health_check_response(self, response):
1540 """
1541 Check if the response is a health check response.
1542 If there are no subscriptions redis responds to PING command with a
1543 bulk response, instead of a multi-bulk with "pong" and the response.
1544 """
1545 return response in [
1546 self.health_check_response, # If there was a subscription
1547 self.health_check_response_b, # If there wasn't
1548 ]
1550 def check_health(self):
1551 conn = self.connection
1552 if conn is None:
1553 raise RuntimeError(
1554 "pubsub connection not set: "
1555 "did you forget to call subscribe() or psubscribe()?"
1556 )
1558 if conn.health_check_interval and time.time() > conn.next_health_check:
1559 conn.send_command("PING", self.HEALTH_CHECK_MESSAGE, check_health=False)
1560 self.health_check_response_counter += 1
1562 def _normalize_keys(self, data):
1563 """
1564 normalize channel/pattern names to be either bytes or strings
1565 based on whether responses are automatically decoded. this saves us
1566 from coercing the value for each message coming in.
1567 """
1568 encode = self.encoder.encode
1569 decode = self.encoder.decode
1570 return {decode(encode(k)): v for k, v in data.items()}
1572 def psubscribe(self, *args, **kwargs):
1573 """
1574 Subscribe to channel patterns. Patterns supplied as keyword arguments
1575 expect a pattern name as the key and a callable as the value. A
1576 pattern's callable will be invoked automatically when a message is
1577 received on that pattern rather than producing a message via
1578 ``listen()``.
1579 """
1580 if args:
1581 args = list_or_args(args[0], args[1:])
1582 new_patterns = dict.fromkeys(args)
1583 new_patterns.update(kwargs)
1584 ret_val = self.execute_command("PSUBSCRIBE", *new_patterns.keys())
1585 # update the patterns dict AFTER we send the command. we don't want to
1586 # subscribe twice to these patterns, once for the command and again
1587 # for the reconnection.
1588 new_patterns = self._normalize_keys(new_patterns)
1589 self.patterns.update(new_patterns)
1590 if not self.subscribed:
1591 # Set the subscribed_event flag to True
1592 self.subscribed_event.set()
1593 # Clear the health check counter
1594 self.health_check_response_counter = 0
1595 self.pending_unsubscribe_patterns.difference_update(new_patterns)
1596 return ret_val
1598 def punsubscribe(self, *args):
1599 """
1600 Unsubscribe from the supplied patterns. If empty, unsubscribe from
1601 all patterns.
1602 """
1603 if args:
1604 args = list_or_args(args[0], args[1:])
1605 patterns = self._normalize_keys(dict.fromkeys(args))
1606 else:
1607 patterns = self.patterns
1608 self.pending_unsubscribe_patterns.update(patterns)
1609 return self.execute_command("PUNSUBSCRIBE", *args)
1611 def subscribe(self, *args, **kwargs):
1612 """
1613 Subscribe to channels. Channels supplied as keyword arguments expect
1614 a channel name as the key and a callable as the value. A channel's
1615 callable will be invoked automatically when a message is received on
1616 that channel rather than producing a message via ``listen()`` or
1617 ``get_message()``.
1618 """
1619 if args:
1620 args = list_or_args(args[0], args[1:])
1621 new_channels = dict.fromkeys(args)
1622 new_channels.update(kwargs)
1623 ret_val = self.execute_command("SUBSCRIBE", *new_channels.keys())
1624 # update the channels dict AFTER we send the command. we don't want to
1625 # subscribe twice to these channels, once for the command and again
1626 # for the reconnection.
1627 new_channels = self._normalize_keys(new_channels)
1628 self.channels.update(new_channels)
1629 if not self.subscribed:
1630 # Set the subscribed_event flag to True
1631 self.subscribed_event.set()
1632 # Clear the health check counter
1633 self.health_check_response_counter = 0
1634 self.pending_unsubscribe_channels.difference_update(new_channels)
1635 return ret_val
1637 def unsubscribe(self, *args):
1638 """
1639 Unsubscribe from the supplied channels. If empty, unsubscribe from
1640 all channels
1641 """
1642 if args:
1643 args = list_or_args(args[0], args[1:])
1644 channels = self._normalize_keys(dict.fromkeys(args))
1645 else:
1646 channels = self.channels
1647 self.pending_unsubscribe_channels.update(channels)
1648 return self.execute_command("UNSUBSCRIBE", *args)
1650 def listen(self):
1651 "Listen for messages on channels this client has been subscribed to"
1652 while self.subscribed:
1653 response = self.handle_message(self.parse_response(block=True))
1654 if response is not None:
1655 yield response
1657 def get_message(self, ignore_subscribe_messages=False, timeout=0.0):
1658 """
1659 Get the next message if one is available, otherwise None.
1661 If timeout is specified, the system will wait for `timeout` seconds
1662 before returning. Timeout should be specified as a floating point
1663 number, or None, to wait indefinitely.
1664 """
1665 if not self.subscribed:
1666 # Wait for subscription
1667 start_time = time.time()
1668 if self.subscribed_event.wait(timeout) is True:
1669 # The connection was subscribed during the timeout time frame.
1670 # The timeout should be adjusted based on the time spent
1671 # waiting for the subscription
1672 time_spent = time.time() - start_time
1673 timeout = max(0.0, timeout - time_spent)
1674 else:
1675 # The connection isn't subscribed to any channels or patterns,
1676 # so no messages are available
1677 return None
1679 response = self.parse_response(block=(timeout is None), timeout=timeout)
1680 if response:
1681 return self.handle_message(response, ignore_subscribe_messages)
1682 return None
1684 def ping(self, message=None):
1685 """
1686 Ping the Redis server
1687 """
1688 message = "" if message is None else message
1689 return self.execute_command("PING", message)
1691 def handle_message(self, response, ignore_subscribe_messages=False):
1692 """
1693 Parses a pub/sub message. If the channel or pattern was subscribed to
1694 with a message handler, the handler is invoked instead of a parsed
1695 message being returned.
1696 """
1697 if response is None:
1698 return None
1699 message_type = str_if_bytes(response[0])
1700 if message_type == "pmessage":
1701 message = {
1702 "type": message_type,
1703 "pattern": response[1],
1704 "channel": response[2],
1705 "data": response[3],
1706 }
1707 elif message_type == "pong":
1708 message = {
1709 "type": message_type,
1710 "pattern": None,
1711 "channel": None,
1712 "data": response[1],
1713 }
1714 else:
1715 message = {
1716 "type": message_type,
1717 "pattern": None,
1718 "channel": response[1],
1719 "data": response[2],
1720 }
1722 # if this is an unsubscribe message, remove it from memory
1723 if message_type in self.UNSUBSCRIBE_MESSAGE_TYPES:
1724 if message_type == "punsubscribe":
1725 pattern = response[1]
1726 if pattern in self.pending_unsubscribe_patterns:
1727 self.pending_unsubscribe_patterns.remove(pattern)
1728 self.patterns.pop(pattern, None)
1729 else:
1730 channel = response[1]
1731 if channel in self.pending_unsubscribe_channels:
1732 self.pending_unsubscribe_channels.remove(channel)
1733 self.channels.pop(channel, None)
1734 if not self.channels and not self.patterns:
1735 # There are no subscriptions anymore, set subscribed_event flag
1736 # to false
1737 self.subscribed_event.clear()
1739 if message_type in self.PUBLISH_MESSAGE_TYPES:
1740 # if there's a message handler, invoke it
1741 if message_type == "pmessage":
1742 handler = self.patterns.get(message["pattern"], None)
1743 else:
1744 handler = self.channels.get(message["channel"], None)
1745 if handler:
1746 handler(message)
1747 return None
1748 elif message_type != "pong":
1749 # this is a subscribe/unsubscribe message. ignore if we don't
1750 # want them
1751 if ignore_subscribe_messages or self.ignore_subscribe_messages:
1752 return None
1754 return message
1756 def run_in_thread(self, sleep_time=0, daemon=False, exception_handler=None):
1757 for channel, handler in self.channels.items():
1758 if handler is None:
1759 raise PubSubError(f"Channel: '{channel}' has no handler registered")
1760 for pattern, handler in self.patterns.items():
1761 if handler is None:
1762 raise PubSubError(f"Pattern: '{pattern}' has no handler registered")
1764 thread = PubSubWorkerThread(
1765 self, sleep_time, daemon=daemon, exception_handler=exception_handler
1766 )
1767 thread.start()
1768 return thread
1771class PubSubWorkerThread(threading.Thread):
1772 def __init__(self, pubsub, sleep_time, daemon=False, exception_handler=None):
1773 super().__init__()
1774 self.daemon = daemon
1775 self.pubsub = pubsub
1776 self.sleep_time = sleep_time
1777 self.exception_handler = exception_handler
1778 self._running = threading.Event()
1780 def run(self):
1781 if self._running.is_set():
1782 return
1783 self._running.set()
1784 pubsub = self.pubsub
1785 sleep_time = self.sleep_time
1786 while self._running.is_set():
1787 try:
1788 pubsub.get_message(ignore_subscribe_messages=True, timeout=sleep_time)
1789 except BaseException as e:
1790 if self.exception_handler is None:
1791 raise
1792 self.exception_handler(e, pubsub, self)
1793 pubsub.close()
1795 def stop(self):
1796 # trip the flag so the run loop exits. the run loop will
1797 # close the pubsub connection, which disconnects the socket
1798 # and returns the connection to the pool.
1799 self._running.clear()
1802class Pipeline(Redis):
1803 """
1804 Pipelines provide a way to transmit multiple commands to the Redis server
1805 in one transmission. This is convenient for batch processing, such as
1806 saving all the values in a list to Redis.
1808 All commands executed within a pipeline are wrapped with MULTI and EXEC
1809 calls. This guarantees all commands executed in the pipeline will be
1810 executed atomically.
1812 Any command raising an exception does *not* halt the execution of
1813 subsequent commands in the pipeline. Instead, the exception is caught
1814 and its instance is placed into the response list returned by execute().
1815 Code iterating over the response list should be able to deal with an
1816 instance of an exception as a potential value. In general, these will be
1817 ResponseError exceptions, such as those raised when issuing a command
1818 on a key of a different datatype.
1819 """
1821 UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}
1823 def __init__(self, connection_pool, response_callbacks, transaction, shard_hint):
1824 self.connection_pool = connection_pool
1825 self.connection = None
1826 self.response_callbacks = response_callbacks
1827 self.transaction = transaction
1828 self.shard_hint = shard_hint
1830 self.watching = False
1831 self.reset()
1833 def __enter__(self):
1834 return self
1836 def __exit__(self, exc_type, exc_value, traceback):
1837 self.reset()
1839 def __del__(self):
1840 try:
1841 self.reset()
1842 except Exception:
1843 pass
1845 def __len__(self):
1846 return len(self.command_stack)
1848 def __bool__(self):
1849 """Pipeline instances should always evaluate to True"""
1850 return True
1852 def reset(self):
1853 self.command_stack = []
1854 self.scripts = set()
1855 # make sure to reset the connection state in the event that we were
1856 # watching something
1857 if self.watching and self.connection:
1858 try:
1859 # call this manually since our unwatch or
1860 # immediate_execute_command methods can call reset()
1861 self.connection.send_command("UNWATCH")
1862 self.connection.read_response()
1863 except ConnectionError:
1864 # disconnect will also remove any previous WATCHes
1865 self.connection.disconnect()
1866 # clean up the other instance attributes
1867 self.watching = False
1868 self.explicit_transaction = False
1869 # we can safely return the connection to the pool here since we're
1870 # sure we're no longer WATCHing anything
1871 if self.connection:
1872 self.connection_pool.release(self.connection)
1873 self.connection = None
1875 def multi(self):
1876 """
1877 Start a transactional block of the pipeline after WATCH commands
1878 are issued. End the transactional block with `execute`.
1879 """
1880 if self.explicit_transaction:
1881 raise RedisError("Cannot issue nested calls to MULTI")
1882 if self.command_stack:
1883 raise RedisError(
1884 "Commands without an initial WATCH have already been issued"
1885 )
1886 self.explicit_transaction = True
1888 def execute_command(self, *args, **kwargs):
1889 if (self.watching or args[0] == "WATCH") and not self.explicit_transaction:
1890 return self.immediate_execute_command(*args, **kwargs)
1891 return self.pipeline_execute_command(*args, **kwargs)
1893 def _disconnect_reset_raise(self, conn, error):
1894 """
1895 Close the connection, reset watching state and
1896 raise an exception if we were watching,
1897 retry_on_timeout is not set,
1898 or the error is not a TimeoutError
1899 """
1900 conn.disconnect()
1901 # if we were already watching a variable, the watch is no longer
1902 # valid since this connection has died. raise a WatchError, which
1903 # indicates the user should retry this transaction.
1904 if self.watching:
1905 self.reset()
1906 raise WatchError(
1907 "A ConnectionError occurred on while watching one or more keys"
1908 )
1909 # if retry_on_timeout is not set, or the error is not
1910 # a TimeoutError, raise it
1911 if not (conn.retry_on_timeout and isinstance(error, TimeoutError)):
1912 self.reset()
1913 raise
1915 def immediate_execute_command(self, *args, **options):
1916 """
1917 Execute a command immediately, but don't auto-retry on a
1918 ConnectionError if we're already WATCHing a variable. Used when
1919 issuing WATCH or subsequent commands retrieving their values but before
1920 MULTI is called.
1921 """
1922 command_name = args[0]
1923 conn = self.connection
1924 # if this is the first call, we need a connection
1925 if not conn:
1926 conn = self.connection_pool.get_connection(command_name, self.shard_hint)
1927 self.connection = conn
1929 return conn.retry.call_with_retry(
1930 lambda: self._send_command_parse_response(
1931 conn, command_name, *args, **options
1932 ),
1933 lambda error: self._disconnect_reset_raise(conn, error),
1934 )
1936 def pipeline_execute_command(self, *args, **options):
1937 """
1938 Stage a command to be executed when execute() is next called
1940 Returns the current Pipeline object back so commands can be
1941 chained together, such as:
1943 pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')
1945 At some other point, you can then run: pipe.execute(),
1946 which will execute all commands queued in the pipe.
1947 """
1948 self.command_stack.append((args, options))
1949 return self
1951 def _execute_transaction(self, connection, commands, raise_on_error):
1952 cmds = chain([(("MULTI",), {})], commands, [(("EXEC",), {})])
1953 all_cmds = connection.pack_commands(
1954 [args for args, options in cmds if EMPTY_RESPONSE not in options]
1955 )
1956 connection.send_packed_command(all_cmds)
1957 errors = []
1959 # parse off the response for MULTI
1960 # NOTE: we need to handle ResponseErrors here and continue
1961 # so that we read all the additional command messages from
1962 # the socket
1963 try:
1964 self.parse_response(connection, "_")
1965 except ResponseError as e:
1966 errors.append((0, e))
1968 # and all the other commands
1969 for i, command in enumerate(commands):
1970 if EMPTY_RESPONSE in command[1]:
1971 errors.append((i, command[1][EMPTY_RESPONSE]))
1972 else:
1973 try:
1974 self.parse_response(connection, "_")
1975 except ResponseError as e:
1976 self.annotate_exception(e, i + 1, command[0])
1977 errors.append((i, e))
1979 # parse the EXEC.
1980 try:
1981 response = self.parse_response(connection, "_")
1982 except ExecAbortError:
1983 if errors:
1984 raise errors[0][1]
1985 raise
1987 # EXEC clears any watched keys
1988 self.watching = False
1990 if response is None:
1991 raise WatchError("Watched variable changed.")
1993 # put any parse errors into the response
1994 for i, e in errors:
1995 response.insert(i, e)
1997 if len(response) != len(commands):
1998 self.connection.disconnect()
1999 raise ResponseError(
2000 "Wrong number of response items from pipeline execution"
2001 )
2003 # find any errors in the response and raise if necessary
2004 if raise_on_error:
2005 self.raise_first_error(commands, response)
2007 # We have to run response callbacks manually
2008 data = []
2009 for r, cmd in zip(response, commands):
2010 if not isinstance(r, Exception):
2011 args, options = cmd
2012 command_name = args[0]
2013 if command_name in self.response_callbacks:
2014 r = self.response_callbacks[command_name](r, **options)
2015 data.append(r)
2016 return data
2018 def _execute_pipeline(self, connection, commands, raise_on_error):
2019 # build up all commands into a single request to increase network perf
2020 all_cmds = connection.pack_commands([args for args, _ in commands])
2021 connection.send_packed_command(all_cmds)
2023 response = []
2024 for args, options in commands:
2025 try:
2026 response.append(self.parse_response(connection, args[0], **options))
2027 except ResponseError as e:
2028 response.append(e)
2030 if raise_on_error:
2031 self.raise_first_error(commands, response)
2032 return response
2034 def raise_first_error(self, commands, response):
2035 for i, r in enumerate(response):
2036 if isinstance(r, ResponseError):
2037 self.annotate_exception(r, i + 1, commands[i][0])
2038 raise r
2040 def annotate_exception(self, exception, number, command):
2041 cmd = " ".join(map(safe_str, command))
2042 msg = (
2043 f"Command # {number} ({cmd}) of pipeline "
2044 f"caused error: {exception.args[0]}"
2045 )
2046 exception.args = (msg,) + exception.args[1:]
2048 def parse_response(self, connection, command_name, **options):
2049 result = Redis.parse_response(self, connection, command_name, **options)
2050 if command_name in self.UNWATCH_COMMANDS:
2051 self.watching = False
2052 elif command_name == "WATCH":
2053 self.watching = True
2054 return result
2056 def load_scripts(self):
2057 # make sure all scripts that are about to be run on this pipeline exist
2058 scripts = list(self.scripts)
2059 immediate = self.immediate_execute_command
2060 shas = [s.sha for s in scripts]
2061 # we can't use the normal script_* methods because they would just
2062 # get buffered in the pipeline.
2063 exists = immediate("SCRIPT EXISTS", *shas)
2064 if not all(exists):
2065 for s, exist in zip(scripts, exists):
2066 if not exist:
2067 s.sha = immediate("SCRIPT LOAD", s.script)
2069 def _disconnect_raise_reset(self, conn, error):
2070 """
2071 Close the connection, raise an exception if we were watching,
2072 and raise an exception if retry_on_timeout is not set,
2073 or the error is not a TimeoutError
2074 """
2075 conn.disconnect()
2076 # if we were watching a variable, the watch is no longer valid
2077 # since this connection has died. raise a WatchError, which
2078 # indicates the user should retry this transaction.
2079 if self.watching:
2080 raise WatchError(
2081 "A ConnectionError occurred on while watching one or more keys"
2082 )
2083 # if retry_on_timeout is not set, or the error is not
2084 # a TimeoutError, raise it
2085 if not (conn.retry_on_timeout and isinstance(error, TimeoutError)):
2086 self.reset()
2087 raise
2089 def execute(self, raise_on_error=True):
2090 """Execute all the commands in the current pipeline"""
2091 stack = self.command_stack
2092 if not stack and not self.watching:
2093 return []
2094 if self.scripts:
2095 self.load_scripts()
2096 if self.transaction or self.explicit_transaction:
2097 execute = self._execute_transaction
2098 else:
2099 execute = self._execute_pipeline
2101 conn = self.connection
2102 if not conn:
2103 conn = self.connection_pool.get_connection("MULTI", self.shard_hint)
2104 # assign to self.connection so reset() releases the connection
2105 # back to the pool after we're done
2106 self.connection = conn
2108 try:
2109 return conn.retry.call_with_retry(
2110 lambda: execute(conn, stack, raise_on_error),
2111 lambda error: self._disconnect_raise_reset(conn, error),
2112 )
2113 finally:
2114 self.reset()
2116 def discard(self):
2117 """
2118 Flushes all previously queued commands
2119 See: https://redis.io/commands/DISCARD
2120 """
2121 self.execute_command("DISCARD")
2123 def watch(self, *names):
2124 """Watches the values at keys ``names``"""
2125 if self.explicit_transaction:
2126 raise RedisError("Cannot issue a WATCH after a MULTI")
2127 return self.execute_command("WATCH", *names)
2129 def unwatch(self):
2130 """Unwatches all previously specified keys"""
2131 return self.watching and self.execute_command("UNWATCH") or True