import copy
import datetime
import re
import threading
import time
import warnings
from itertools import chain
from typing import Optional

from redis.commands import (
    CoreCommands,
    RedisModuleCommands,
    SentinelCommands,
    list_or_args,
)
from redis.connection import ConnectionPool, SSLConnection, UnixDomainSocketConnection
from redis.credentials import CredentialProvider
from redis.exceptions import (
    ConnectionError,
    ExecAbortError,
    ModuleError,
    PubSubError,
    RedisError,
    ResponseError,
    TimeoutError,
    WatchError,
)
from redis.lock import Lock
from redis.retry import Retry
from redis.utils import safe_str, str_if_bytes

SYM_EMPTY = b""
EMPTY_RESPONSE = "EMPTY_RESPONSE"

# some responses (i.e. DUMP) are binary and are never meant to be decoded
NEVER_DECODE = "NEVER_DECODE"


def timestamp_to_datetime(response):
    "Converts a unix timestamp to a Python datetime object"
    if not response:
        return None
    try:
        response = int(response)
    except ValueError:
        return None
    return datetime.datetime.fromtimestamp(response)


def string_keys_to_dict(key_string, callback):
    return dict.fromkeys(key_string.split(), callback)
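
# For illustration, a doctest-style sketch of what string_keys_to_dict()
# produces (derived directly from the dict.fromkeys() call above):
# >>> string_keys_to_dict("LPUSH RPUSH", int)
# {'LPUSH': <class 'int'>, 'RPUSH': <class 'int'>}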


class CaseInsensitiveDict(dict):
    "Case insensitive dict implementation. Assumes string keys only."

    def __init__(self, data):
        for k, v in data.items():
            self[k.upper()] = v

    def __contains__(self, k):
        return super().__contains__(k.upper())

    def __delitem__(self, k):
        super().__delitem__(k.upper())

    def __getitem__(self, k):
        return super().__getitem__(k.upper())

    def get(self, k, default=None):
        return super().get(k.upper(), default)

    def __setitem__(self, k, v):
        super().__setitem__(k.upper(), v)

    def update(self, data):
        data = CaseInsensitiveDict(data)
        super().update(data)
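
# A small sketch of the case-insensitive behaviour (doctest-style): keys are
# upper-cased on the way in and on every lookup, so any casing matches.
# >>> d = CaseInsensitiveDict({"Get": 1})
# >>> d["get"], d["GET"]
# (1, 1)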


def parse_debug_object(response):
    "Parse the results of Redis's DEBUG OBJECT command into a Python dict"
    # The 'type' of the object is the first item in the response, but isn't
    # prefixed with a name
    response = str_if_bytes(response)
    response = "type:" + response
    response = dict(kv.split(":") for kv in response.split())

    # parse some expected int values from the string response
    # note: this cmd isn't spec'd so these may not appear in all redis versions
    int_fields = ("refcount", "serializedlength", "lru", "lru_seconds_idle")
    for field in int_fields:
        if field in response:
            response[field] = int(response[field])

    return response


def parse_object(response, infotype):
    """Parse the results of an OBJECT command"""
    if infotype in ("idletime", "refcount"):
        return int_or_none(response)
    return response


def parse_info(response):
    """Parse the result of Redis's INFO command into a Python dict"""
    info = {}
    response = str_if_bytes(response)

    def get_value(value):
        if "," not in value or "=" not in value:
            try:
                if "." in value:
                    return float(value)
                else:
                    return int(value)
            except ValueError:
                return value
        else:
            sub_dict = {}
            for item in value.split(","):
                k, v = item.rsplit("=", 1)
                sub_dict[k] = get_value(v)
            return sub_dict

    for line in response.splitlines():
        if line and not line.startswith("#"):
            if line.find(":") != -1:
                # Split the info field into its key and value.
                # Note that the value may contain ':', but the 'host:'
                # pseudo-command is the only case where the key contains ':'
                key, value = line.split(":", 1)
                if key == "cmdstat_host":
                    key, value = line.rsplit(":", 1)

                if key == "module":
                    # Hardcode a list for the 'modules' key, since there may
                    # be multiple lines that start with 'module'
                    info.setdefault("modules", []).append(get_value(value))
                else:
                    info[key] = get_value(value)
            else:
                # if the line isn't splittable, append it to the "__raw__" key
                info.setdefault("__raw__", []).append(line)

    return info
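
# A hedged sketch of the resulting shape, given a typical INFO payload
# (doctest-style; the exact fields depend on the server version):
# >>> parse_info(b"# Server\r\nredis_version:7.0.11\r\nuptime_in_seconds:42\r\n")
# {'redis_version': '7.0.11', 'uptime_in_seconds': 42}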


def parse_memory_stats(response, **kwargs):
    """Parse the results of MEMORY STATS"""
    stats = pairs_to_dict(response, decode_keys=True, decode_string_values=True)
    for key, value in stats.items():
        if key.startswith("db."):
            stats[key] = pairs_to_dict(
                value, decode_keys=True, decode_string_values=True
            )
    return stats


SENTINEL_STATE_TYPES = {
    "can-failover-its-master": int,
    "config-epoch": int,
    "down-after-milliseconds": int,
    "failover-timeout": int,
    "info-refresh": int,
    "last-hello-message": int,
    "last-ok-ping-reply": int,
    "last-ping-reply": int,
    "last-ping-sent": int,
    "master-link-down-time": int,
    "master-port": int,
    "num-other-sentinels": int,
    "num-slaves": int,
    "o-down-time": int,
    "pending-commands": int,
    "parallel-syncs": int,
    "port": int,
    "quorum": int,
    "role-reported-time": int,
    "s-down-time": int,
    "slave-priority": int,
    "slave-repl-offset": int,
    "voted-leader-epoch": int,
}


def parse_sentinel_state(item):
    result = pairs_to_dict_typed(item, SENTINEL_STATE_TYPES)
    flags = set(result["flags"].split(","))
    for name, flag in (
        ("is_master", "master"),
        ("is_slave", "slave"),
        ("is_sdown", "s_down"),
        ("is_odown", "o_down"),
        ("is_sentinel", "sentinel"),
        ("is_disconnected", "disconnected"),
        ("is_master_down", "master_down"),
    ):
        result[name] = flag in flags
    return result


def parse_sentinel_master(response):
    return parse_sentinel_state(map(str_if_bytes, response))


def parse_sentinel_masters(response):
    result = {}
    for item in response:
        state = parse_sentinel_state(map(str_if_bytes, item))
        result[state["name"]] = state
    return result


def parse_sentinel_slaves_and_sentinels(response):
    return [parse_sentinel_state(map(str_if_bytes, item)) for item in response]


def parse_sentinel_get_master(response):
    return response and (response[0], int(response[1])) or None


def pairs_to_dict(response, decode_keys=False, decode_string_values=False):
    """Create a dict given a list of key/value pairs"""
    if response is None:
        return {}
    if decode_keys or decode_string_values:
        # the iter form is faster, but I don't know how to make that work
        # with a str_if_bytes() map
        keys = response[::2]
        if decode_keys:
            keys = map(str_if_bytes, keys)
        values = response[1::2]
        if decode_string_values:
            values = map(str_if_bytes, values)
        return dict(zip(keys, values))
    else:
        it = iter(response)
        return dict(zip(it, it))
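
# A brief sketch of the flat-list-to-dict conversion (doctest-style): only
# the keys are decoded here, so the values stay as raw bytes.
# >>> pairs_to_dict([b"k1", b"v1", b"k2", b"v2"], decode_keys=True)
# {'k1': b'v1', 'k2': b'v2'}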


def pairs_to_dict_typed(response, type_info):
    it = iter(response)
    result = {}
    for key, value in zip(it, it):
        if key in type_info:
            try:
                value = type_info[key](value)
            except Exception:
                # if for some reason the value can't be coerced, just use
                # the string value
                pass
        result[key] = value
    return result


def zset_score_pairs(response, **options):
    """
    If ``withscores`` is specified in the options, return the response as
    a list of (value, score) pairs
    """
    if not response or not options.get("withscores"):
        return response
    score_cast_func = options.get("score_cast_func", float)
    it = iter(response)
    return list(zip(it, map(score_cast_func, it)))


def sort_return_tuples(response, **options):
    """
    If ``groups`` is specified, return the response as a list of
    n-element tuples with n being the value found in options['groups']
    """
    if not response or not options.get("groups"):
        return response
    n = options["groups"]
    return list(zip(*[response[i::n] for i in range(n)]))
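
# For illustration: with groups=2, the slicing trick above regroups a flat
# response into 2-tuples (doctest-style):
# >>> sort_return_tuples([b"a", b"1", b"b", b"2"], groups=2)
# [(b'a', b'1'), (b'b', b'2')]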


def int_or_none(response):
    if response is None:
        return None
    return int(response)


def parse_stream_list(response):
    if response is None:
        return None
    data = []
    for r in response:
        if r is not None:
            data.append((r[0], pairs_to_dict(r[1])))
        else:
            data.append((None, None))
    return data


def pairs_to_dict_with_str_keys(response):
    return pairs_to_dict(response, decode_keys=True)


def parse_list_of_dicts(response):
    return list(map(pairs_to_dict_with_str_keys, response))


def parse_xclaim(response, **options):
    if options.get("parse_justid", False):
        return response
    return parse_stream_list(response)


def parse_xautoclaim(response, **options):
    if options.get("parse_justid", False):
        return response[1]
    response[1] = parse_stream_list(response[1])
    return response


def parse_xinfo_stream(response, **options):
    data = pairs_to_dict(response, decode_keys=True)
    if not options.get("full", False):
        first = data["first-entry"]
        if first is not None:
            data["first-entry"] = (first[0], pairs_to_dict(first[1]))
        last = data["last-entry"]
        if last is not None:
            data["last-entry"] = (last[0], pairs_to_dict(last[1]))
    else:
        data["entries"] = {_id: pairs_to_dict(entry) for _id, entry in data["entries"]}
        data["groups"] = [
            pairs_to_dict(group, decode_keys=True) for group in data["groups"]
        ]
    return data


def parse_xread(response):
    if response is None:
        return []
    return [[r[0], parse_stream_list(r[1])] for r in response]


def parse_xpending(response, **options):
    if options.get("parse_detail", False):
        return parse_xpending_range(response)
    consumers = [{"name": n, "pending": int(p)} for n, p in response[3] or []]
    return {
        "pending": response[0],
        "min": response[1],
        "max": response[2],
        "consumers": consumers,
    }


def parse_xpending_range(response):
    k = ("message_id", "consumer", "time_since_delivered", "times_delivered")
    return [dict(zip(k, r)) for r in response]


def float_or_none(response):
    if response is None:
        return None
    return float(response)


def bool_ok(response):
    return str_if_bytes(response) == "OK"


def parse_zadd(response, **options):
    if response is None:
        return None
    if options.get("as_score"):
        return float(response)
    return int(response)


def parse_client_list(response, **options):
    clients = []
    for c in str_if_bytes(response).splitlines():
        # Values might contain '='
        clients.append(dict(pair.split("=", 1) for pair in c.split(" ")))
    return clients


def parse_config_get(response, **options):
    response = [str_if_bytes(i) if i is not None else None for i in response]
    return response and pairs_to_dict(response) or {}


def parse_scan(response, **options):
    cursor, r = response
    return int(cursor), r


def parse_hscan(response, **options):
    cursor, r = response
    return int(cursor), r and pairs_to_dict(r) or {}


def parse_zscan(response, **options):
    score_cast_func = options.get("score_cast_func", float)
    cursor, r = response
    it = iter(r)
    return int(cursor), list(zip(it, map(score_cast_func, it)))


def parse_zmscore(response, **options):
    # zmscore: list of scores (double precision floating point number) or nil
    return [float(score) if score is not None else None for score in response]


def parse_slowlog_get(response, **options):
    space = " " if options.get("decode_responses", False) else b" "

    def parse_item(item):
        result = {"id": item[0], "start_time": int(item[1]), "duration": int(item[2])}
        # Redis Enterprise injects another entry at index [3], which has
        # the complexity info (i.e. the value N in case the command has
        # an O(N) complexity) instead of the command.
        if isinstance(item[3], list):
            result["command"] = space.join(item[3])
            result["client_address"] = item[4]
            result["client_name"] = item[5]
        else:
            result["complexity"] = item[3]
            result["command"] = space.join(item[4])
            result["client_address"] = item[5]
            result["client_name"] = item[6]
        return result

    return [parse_item(item) for item in response]


def parse_stralgo(response, **options):
    """
    Parse the response from the `STRALGO` command.
    Without modifiers, the returned value is a string.
    When LEN is given, the command returns the length of the result
    (i.e. an integer).
    When IDX is given, the command returns a dictionary with the LCS
    length and all the ranges in both strings (the start and end
    offset for each string) where there are matches.
    When WITHMATCHLEN is given, each array representing a match will
    also have the length of the match at the beginning of the array.
    """
    if options.get("len", False):
        return int(response)
    if options.get("idx", False):
        if options.get("withmatchlen", False):
            matches = [
                [(int(match[-1]))] + list(map(tuple, match[:-1]))
                for match in response[1]
            ]
        else:
            matches = [list(map(tuple, match)) for match in response[1]]
        return {
            str_if_bytes(response[0]): matches,
            str_if_bytes(response[2]): int(response[3]),
        }
    return str_if_bytes(response)


def parse_cluster_info(response, **options):
    response = str_if_bytes(response)
    return dict(line.split(":") for line in response.splitlines() if line)


def _parse_node_line(line):
    line_items = line.split(" ")
    node_id, addr, flags, master_id, ping, pong, epoch, connected = line_items[:8]
    addr = addr.split("@")[0]
    node_dict = {
        "node_id": node_id,
        "flags": flags,
        "master_id": master_id,
        "last_ping_sent": ping,
        "last_pong_rcvd": pong,
        "epoch": epoch,
        "slots": [],
        "migrations": [],
        "connected": connected == "connected",
    }
    if len(line_items) >= 9:
        slots, migrations = _parse_slots(line_items[8:])
        node_dict["slots"], node_dict["migrations"] = slots, migrations
    return addr, node_dict


def _parse_slots(slot_ranges):
    slots, migrations = [], []
    for s_range in slot_ranges:
        if "->-" in s_range:
            slot_id, dst_node_id = s_range[1:-1].split("->-", 1)
            migrations.append(
                {"slot": slot_id, "node_id": dst_node_id, "state": "migrating"}
            )
        elif "-<-" in s_range:
            slot_id, src_node_id = s_range[1:-1].split("-<-", 1)
            migrations.append(
                {"slot": slot_id, "node_id": src_node_id, "state": "importing"}
            )
        else:
            slots.append(s_range.split("-"))

    return slots, migrations


def parse_cluster_nodes(response, **options):
    """
    @see: https://redis.io/commands/cluster-nodes  # string / bytes
    @see: https://redis.io/commands/cluster-replicas  # list of string / bytes
    """
    if isinstance(response, (str, bytes)):
        response = response.splitlines()
    return dict(_parse_node_line(str_if_bytes(node)) for node in response)


def parse_geosearch_generic(response, **options):
    """
    Parse the response of the 'GEOSEARCH', 'GEORADIUS' and 'GEORADIUSBYMEMBER'
    commands according to the 'withdist', 'withhash' and 'withcoord' labels.
    """
    try:
        if options["store"] or options["store_dist"]:
            # `store` and `store_dist` can't be combined
            # with other command arguments.
            # relevant to 'GEORADIUS' and 'GEORADIUSBYMEMBER'
            return response
    except KeyError:  # it means the command was sent via execute_command
        return response

    if not isinstance(response, list):
        response_list = [response]
    else:
        response_list = response

    if not options["withdist"] and not options["withcoord"] and not options["withhash"]:
        # just a bunch of places
        return response_list

    cast = {
        "withdist": float,
        "withcoord": lambda ll: (float(ll[0]), float(ll[1])),
        "withhash": int,
    }

    # zip each output result with its casting function to get
    # the proper native Python value.
    f = [lambda x: x]
    f += [cast[o] for o in ["withdist", "withhash", "withcoord"] if options[o]]
    return [list(map(lambda fv: fv[0](fv[1]), zip(f, r))) for r in response_list]


def parse_command(response, **options):
    commands = {}
    for command in response:
        cmd_dict = {}
        cmd_name = str_if_bytes(command[0])
        cmd_dict["name"] = cmd_name
        cmd_dict["arity"] = int(command[1])
        cmd_dict["flags"] = [str_if_bytes(flag) for flag in command[2]]
        cmd_dict["first_key_pos"] = command[3]
        cmd_dict["last_key_pos"] = command[4]
        cmd_dict["step_count"] = command[5]
        if len(command) > 7:
            cmd_dict["tips"] = command[7]
            cmd_dict["key_specifications"] = command[8]
            cmd_dict["subcommands"] = command[9]
        commands[cmd_name] = cmd_dict
    return commands


def parse_pubsub_numsub(response, **options):
    return list(zip(response[0::2], response[1::2]))


def parse_client_kill(response, **options):
    if isinstance(response, int):
        return response
    return str_if_bytes(response) == "OK"


def parse_acl_getuser(response, **options):
    if response is None:
        return None
    data = pairs_to_dict(response, decode_keys=True)

    # convert everything but user-defined data in 'keys' to native strings
    data["flags"] = list(map(str_if_bytes, data["flags"]))
    data["passwords"] = list(map(str_if_bytes, data["passwords"]))
    data["commands"] = str_if_bytes(data["commands"])
    if isinstance(data["keys"], (str, bytes)):
        data["keys"] = list(str_if_bytes(data["keys"]).split(" "))
    if data["keys"] == [""]:
        data["keys"] = []
    if "channels" in data:
        if isinstance(data["channels"], (str, bytes)):
            data["channels"] = list(str_if_bytes(data["channels"]).split(" "))
        if data["channels"] == [""]:
            data["channels"] = []
    if "selectors" in data:
        data["selectors"] = [
            list(map(str_if_bytes, selector)) for selector in data["selectors"]
        ]

    # split 'commands' into separate 'categories' and 'commands' lists
    commands, categories = [], []
    for command in data["commands"].split(" "):
        if "@" in command:
            categories.append(command)
        else:
            commands.append(command)

    data["commands"] = commands
    data["categories"] = categories
    data["enabled"] = "on" in data["flags"]
    return data


def parse_acl_log(response, **options):
    if response is None:
        return None
    if isinstance(response, list):
        data = []
        for log in response:
            log_data = pairs_to_dict(log, True, True)
            client_info = log_data.get("client-info", "")
            log_data["client-info"] = parse_client_info(client_info)

            # float() is lossy compared to the "double" in C
            log_data["age-seconds"] = float(log_data["age-seconds"])
            data.append(log_data)
    else:
        data = bool_ok(response)
    return data


def parse_client_info(value):
    """
    Parse the client-info field from the ACL LOG, which has the format:
    "key1=value1 key2=value2 key3=value3"
    """
    client_info = {}
    infos = str_if_bytes(value).split(" ")
    for info in infos:
        key, value = info.split("=")
        client_info[key] = value

    # These fields are defined as int in networking.c
    for int_key in {
        "id",
        "age",
        "idle",
        "db",
        "sub",
        "psub",
        "multi",
        "qbuf",
        "qbuf-free",
        "obl",
        "argv-mem",
        "oll",
        "omem",
        "tot-mem",
    }:
        client_info[int_key] = int(client_info[int_key])
    return client_info


def parse_module_result(response):
    if isinstance(response, ModuleError):
        raise response
    return True


def parse_set_result(response, **options):
    """
    Handle the SET result, since the GET argument is available from Redis 6.2.
    The SET result is parsed as:
    - a bool
    - a string, when the GET argument is used
    """
    if options.get("get"):
        # Redis will return a getCommand result.
        # See `setGenericCommand` in t_string.c
        return response
    return response and str_if_bytes(response) == "OK"


class AbstractRedis:
    RESPONSE_CALLBACKS = {
        **string_keys_to_dict(
            "AUTH COPY EXPIRE EXPIREAT PEXPIRE PEXPIREAT "
            "HEXISTS HMSET MOVE MSETNX PERSIST "
            "PSETEX RENAMENX SISMEMBER SMOVE SETEX SETNX",
            bool,
        ),
        **string_keys_to_dict(
            "BITCOUNT BITPOS DECRBY DEL EXISTS GEOADD GETBIT HDEL HLEN "
            "HSTRLEN INCRBY LINSERT LLEN LPUSHX PFADD PFCOUNT RPUSHX SADD "
            "SCARD SDIFFSTORE SETBIT SETRANGE SINTERSTORE SREM STRLEN "
            "SUNIONSTORE UNLINK XACK XDEL XLEN XTRIM ZCARD ZLEXCOUNT ZREM "
            "ZREMRANGEBYLEX ZREMRANGEBYRANK ZREMRANGEBYSCORE",
            int,
        ),
        **string_keys_to_dict("INCRBYFLOAT HINCRBYFLOAT", float),
        **string_keys_to_dict(
            # these return OK, or int if redis-server is >=1.3.4
            "LPUSH RPUSH",
            lambda r: isinstance(r, int) and r or str_if_bytes(r) == "OK",
        ),
        **string_keys_to_dict("SORT", sort_return_tuples),
        **string_keys_to_dict("ZSCORE ZINCRBY GEODIST", float_or_none),
        **string_keys_to_dict(
            "FLUSHALL FLUSHDB LSET LTRIM MSET PFMERGE ASKING READONLY READWRITE "
            "RENAME SAVE SELECT SHUTDOWN SLAVEOF SWAPDB WATCH UNWATCH",
            bool_ok,
        ),
        **string_keys_to_dict("BLPOP BRPOP", lambda r: r and tuple(r) or None),
        **string_keys_to_dict(
            "SDIFF SINTER SMEMBERS SUNION", lambda r: r and set(r) or set()
        ),
        **string_keys_to_dict(
            "ZPOPMAX ZPOPMIN ZINTER ZDIFF ZUNION ZRANGE ZRANGEBYSCORE "
            "ZREVRANGE ZREVRANGEBYSCORE",
            zset_score_pairs,
        ),
        **string_keys_to_dict(
            "BZPOPMIN BZPOPMAX", lambda r: r and (r[0], r[1], float(r[2])) or None
        ),
        **string_keys_to_dict("ZRANK ZREVRANK", int_or_none),
        **string_keys_to_dict("XREVRANGE XRANGE", parse_stream_list),
        **string_keys_to_dict("XREAD XREADGROUP", parse_xread),
        **string_keys_to_dict("BGREWRITEAOF BGSAVE", lambda r: True),
        "ACL CAT": lambda r: list(map(str_if_bytes, r)),
        "ACL DELUSER": int,
        "ACL GENPASS": str_if_bytes,
        "ACL GETUSER": parse_acl_getuser,
        "ACL HELP": lambda r: list(map(str_if_bytes, r)),
        "ACL LIST": lambda r: list(map(str_if_bytes, r)),
        "ACL LOAD": bool_ok,
        "ACL LOG": parse_acl_log,
        "ACL SAVE": bool_ok,
        "ACL SETUSER": bool_ok,
        "ACL USERS": lambda r: list(map(str_if_bytes, r)),
        "ACL WHOAMI": str_if_bytes,
        "CLIENT GETNAME": str_if_bytes,
        "CLIENT ID": int,
        "CLIENT KILL": parse_client_kill,
        "CLIENT LIST": parse_client_list,
        "CLIENT INFO": parse_client_info,
        "CLIENT SETNAME": bool_ok,
        "CLIENT UNBLOCK": lambda r: r and int(r) == 1 or False,
        "CLIENT PAUSE": bool_ok,
        "CLIENT GETREDIR": int,
        "CLIENT TRACKINGINFO": lambda r: list(map(str_if_bytes, r)),
        "CLUSTER ADDSLOTS": bool_ok,
        "CLUSTER ADDSLOTSRANGE": bool_ok,
        "CLUSTER COUNT-FAILURE-REPORTS": lambda x: int(x),
        "CLUSTER COUNTKEYSINSLOT": lambda x: int(x),
        "CLUSTER DELSLOTS": bool_ok,
        "CLUSTER DELSLOTSRANGE": bool_ok,
        "CLUSTER FAILOVER": bool_ok,
        "CLUSTER FORGET": bool_ok,
        "CLUSTER GETKEYSINSLOT": lambda r: list(map(str_if_bytes, r)),
        "CLUSTER INFO": parse_cluster_info,
        "CLUSTER KEYSLOT": lambda x: int(x),
        "CLUSTER MEET": bool_ok,
        "CLUSTER NODES": parse_cluster_nodes,
        "CLUSTER REPLICAS": parse_cluster_nodes,
        "CLUSTER REPLICATE": bool_ok,
        "CLUSTER RESET": bool_ok,
        "CLUSTER SAVECONFIG": bool_ok,
        "CLUSTER SET-CONFIG-EPOCH": bool_ok,
        "CLUSTER SETSLOT": bool_ok,
        "CLUSTER SLAVES": parse_cluster_nodes,
        "COMMAND": parse_command,
        "COMMAND COUNT": int,
        "COMMAND GETKEYS": lambda r: list(map(str_if_bytes, r)),
        "CONFIG GET": parse_config_get,
        "CONFIG RESETSTAT": bool_ok,
        "CONFIG SET": bool_ok,
        "DEBUG OBJECT": parse_debug_object,
        "FUNCTION DELETE": bool_ok,
        "FUNCTION FLUSH": bool_ok,
        "FUNCTION RESTORE": bool_ok,
        "GEOHASH": lambda r: list(map(str_if_bytes, r)),
        "GEOPOS": lambda r: list(
            map(lambda ll: (float(ll[0]), float(ll[1])) if ll is not None else None, r)
        ),
        "GEOSEARCH": parse_geosearch_generic,
        "GEORADIUS": parse_geosearch_generic,
        "GEORADIUSBYMEMBER": parse_geosearch_generic,
        "HGETALL": lambda r: r and pairs_to_dict(r) or {},
        "HSCAN": parse_hscan,
        "INFO": parse_info,
        "LASTSAVE": timestamp_to_datetime,
        "MEMORY PURGE": bool_ok,
        "MEMORY STATS": parse_memory_stats,
        "MEMORY USAGE": int_or_none,
        "MODULE LOAD": parse_module_result,
        "MODULE UNLOAD": parse_module_result,
        "MODULE LIST": lambda r: [pairs_to_dict(m) for m in r],
        "OBJECT": parse_object,
        "PING": lambda r: str_if_bytes(r) == "PONG",
        "QUIT": bool_ok,
        "STRALGO": parse_stralgo,
        "PUBSUB NUMSUB": parse_pubsub_numsub,
        "RANDOMKEY": lambda r: r and r or None,
        "RESET": str_if_bytes,
        "SCAN": parse_scan,
        "SCRIPT EXISTS": lambda r: list(map(bool, r)),
        "SCRIPT FLUSH": bool_ok,
        "SCRIPT KILL": bool_ok,
        "SCRIPT LOAD": str_if_bytes,
        "SENTINEL CKQUORUM": bool_ok,
        "SENTINEL FAILOVER": bool_ok,
        "SENTINEL FLUSHCONFIG": bool_ok,
        "SENTINEL GET-MASTER-ADDR-BY-NAME": parse_sentinel_get_master,
        "SENTINEL MASTER": parse_sentinel_master,
        "SENTINEL MASTERS": parse_sentinel_masters,
        "SENTINEL MONITOR": bool_ok,
        "SENTINEL RESET": bool_ok,
        "SENTINEL REMOVE": bool_ok,
        "SENTINEL SENTINELS": parse_sentinel_slaves_and_sentinels,
        "SENTINEL SET": bool_ok,
        "SENTINEL SLAVES": parse_sentinel_slaves_and_sentinels,
        "SET": parse_set_result,
        "SLOWLOG GET": parse_slowlog_get,
        "SLOWLOG LEN": int,
        "SLOWLOG RESET": bool_ok,
        "SSCAN": parse_scan,
        "TIME": lambda x: (int(x[0]), int(x[1])),
        "XCLAIM": parse_xclaim,
        "XAUTOCLAIM": parse_xautoclaim,
        "XGROUP CREATE": bool_ok,
        "XGROUP DELCONSUMER": int,
        "XGROUP DESTROY": bool,
        "XGROUP SETID": bool_ok,
        "XINFO CONSUMERS": parse_list_of_dicts,
        "XINFO GROUPS": parse_list_of_dicts,
        "XINFO STREAM": parse_xinfo_stream,
        "XPENDING": parse_xpending,
        "ZADD": parse_zadd,
        "ZSCAN": parse_zscan,
        "ZMSCORE": parse_zmscore,
    }


class Redis(AbstractRedis, RedisModuleCommands, CoreCommands, SentinelCommands):
    """
    Implementation of the Redis protocol.

    This abstract class provides a Python interface to all Redis commands
    and an implementation of the Redis protocol.

    Pipelines derive from this, implementing how the commands are sent
    to and received from the Redis server. Based on configuration, an
    instance will either use a ConnectionPool, or a Connection object to
    talk to redis.

    It is not safe to pass PubSub or Pipeline objects between threads.
    """

    @classmethod
    def from_url(cls, url, **kwargs):
        """
        Return a Redis client object configured from the given URL

        For example::

            redis://[[username]:[password]]@localhost:6379/0
            rediss://[[username]:[password]]@localhost:6379/0
            unix://[username@]/path/to/socket.sock?db=0[&password=password]

        Three URL schemes are supported:

        - ``redis://`` creates a TCP socket connection. See more at:
          <https://www.iana.org/assignments/uri-schemes/prov/redis>
        - ``rediss://`` creates an SSL-wrapped TCP socket connection. See more at:
          <https://www.iana.org/assignments/uri-schemes/prov/rediss>
        - ``unix://`` creates a Unix Domain Socket connection.

        The username, password, hostname, path and all querystring values
        are passed through urllib.parse.unquote in order to replace any
        percent-encoded values with their corresponding characters.

        There are several ways to specify a database number. The first value
        found will be used:

        1. A ``db`` querystring option, e.g. redis://localhost?db=0
        2. If using the redis:// or rediss:// schemes, the path argument
           of the url, e.g. redis://localhost/0
        3. A ``db`` keyword argument to this function.

        If none of these options are specified, the default db=0 is used.

        All querystring options are cast to their appropriate Python types.
        Boolean arguments can be specified with string values "True"/"False"
        or "Yes"/"No". Values that cannot be properly cast cause a
        ``ValueError`` to be raised. Once parsed, the querystring arguments
        and keyword arguments are passed to the ``ConnectionPool``'s
        class initializer. In the case of conflicting arguments, querystring
        arguments always win.
        """
        single_connection_client = kwargs.pop("single_connection_client", False)
        connection_pool = ConnectionPool.from_url(url, **kwargs)
        return cls(
            connection_pool=connection_pool,
            single_connection_client=single_connection_client,
        )
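
    # A minimal usage sketch (assuming a local server on the default port;
    # extra keyword arguments are forwarded to ConnectionPool.from_url):
    # >>> r = Redis.from_url("redis://localhost:6379/0", decode_responses=True)
    # >>> r.ping()
    # True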

    def __init__(
        self,
        host="localhost",
        port=6379,
        db=0,
        password=None,
        socket_timeout=None,
        socket_connect_timeout=None,
        socket_keepalive=None,
        socket_keepalive_options=None,
        connection_pool=None,
        unix_socket_path=None,
        encoding="utf-8",
        encoding_errors="strict",
        charset=None,
        errors=None,
        decode_responses=False,
        retry_on_timeout=False,
        retry_on_error=None,
        ssl=False,
        ssl_keyfile=None,
        ssl_certfile=None,
        ssl_cert_reqs="required",
        ssl_ca_certs=None,
        ssl_ca_path=None,
        ssl_ca_data=None,
        ssl_check_hostname=False,
        ssl_password=None,
        ssl_validate_ocsp=False,
        ssl_validate_ocsp_stapled=False,
        ssl_ocsp_context=None,
        ssl_ocsp_expected_cert=None,
        max_connections=None,
        single_connection_client=False,
        health_check_interval=0,
        client_name=None,
        username=None,
        retry=None,
        redis_connect_func=None,
        credential_provider: Optional[CredentialProvider] = None,
    ):
957 """
958 Initialize a new Redis client.
959 To specify a retry policy for specific errors, first set
960 `retry_on_error` to a list of the error/s to retry on, then set
961 `retry` to a valid `Retry` object.
962 To retry on TimeoutError, `retry_on_timeout` can also be set to `True`.
964 Args:
966 single_connection_client:
967 if `True`, connection pool is not used. In that case `Redis`
968 instance use is not thread safe.
969 """
        if not connection_pool:
            if charset is not None:
                warnings.warn(
                    DeprecationWarning(
                        '"charset" is deprecated. Use "encoding" instead'
                    )
                )
                encoding = charset
            if errors is not None:
                warnings.warn(
                    DeprecationWarning(
                        '"errors" is deprecated. Use "encoding_errors" instead'
                    )
                )
                encoding_errors = errors
            if not retry_on_error:
                retry_on_error = []
            if retry_on_timeout is True:
                retry_on_error.append(TimeoutError)
            kwargs = {
                "db": db,
                "username": username,
                "password": password,
                "socket_timeout": socket_timeout,
                "encoding": encoding,
                "encoding_errors": encoding_errors,
                "decode_responses": decode_responses,
                "retry_on_error": retry_on_error,
                "retry": copy.deepcopy(retry),
                "max_connections": max_connections,
                "health_check_interval": health_check_interval,
                "client_name": client_name,
                "redis_connect_func": redis_connect_func,
                "credential_provider": credential_provider,
            }
            # based on input, setup appropriate connection args
            if unix_socket_path is not None:
                kwargs.update(
                    {
                        "path": unix_socket_path,
                        "connection_class": UnixDomainSocketConnection,
                    }
                )
            else:
                # TCP specific options
                kwargs.update(
                    {
                        "host": host,
                        "port": port,
                        "socket_connect_timeout": socket_connect_timeout,
                        "socket_keepalive": socket_keepalive,
                        "socket_keepalive_options": socket_keepalive_options,
                    }
                )

                if ssl:
                    kwargs.update(
                        {
                            "connection_class": SSLConnection,
                            "ssl_keyfile": ssl_keyfile,
                            "ssl_certfile": ssl_certfile,
                            "ssl_cert_reqs": ssl_cert_reqs,
                            "ssl_ca_certs": ssl_ca_certs,
                            "ssl_ca_data": ssl_ca_data,
                            "ssl_check_hostname": ssl_check_hostname,
                            "ssl_password": ssl_password,
                            "ssl_ca_path": ssl_ca_path,
                            "ssl_validate_ocsp_stapled": ssl_validate_ocsp_stapled,
                            "ssl_validate_ocsp": ssl_validate_ocsp,
                            "ssl_ocsp_context": ssl_ocsp_context,
                            "ssl_ocsp_expected_cert": ssl_ocsp_expected_cert,
                        }
                    )
            connection_pool = ConnectionPool(**kwargs)
        self.connection_pool = connection_pool
        self.connection = None
        if single_connection_client:
            self.connection = self.connection_pool.get_connection("_")

        self.response_callbacks = CaseInsensitiveDict(self.__class__.RESPONSE_CALLBACKS)

    def __repr__(self):
        return f"{type(self).__name__}<{repr(self.connection_pool)}>"

    def get_encoder(self):
        """Get the connection pool's encoder"""
        return self.connection_pool.get_encoder()

    def get_connection_kwargs(self):
        """Get the connection's keyword arguments"""
        return self.connection_pool.connection_kwargs

    def get_retry(self) -> Optional["Retry"]:
        return self.get_connection_kwargs().get("retry")

    def set_retry(self, retry: "Retry") -> None:
        self.get_connection_kwargs().update({"retry": retry})
        self.connection_pool.set_retry(retry)

    def set_response_callback(self, command, callback):
        """Set a custom Response Callback"""
        self.response_callbacks[command] = callback

    def load_external_module(self, funcname, func):
        """
        This function can be used to add externally defined redis modules,
        and their namespaces, to the redis client.

        funcname - A string containing the name of the function to create
        func - The function, being added to this class.

        ex: Assume that one has a custom redis module named foomod that
        creates commands named 'foo.dothing' and 'foo.anotherthing' in redis.
        To load these functions into this namespace:

            from redis import Redis
            from foomodule import F
            r = Redis()
            r.load_external_module("foo", F)
            r.foo().dothing('your', 'arguments')

        For a concrete example see the reimport of the redisjson module in
        tests/test_connection.py::test_loading_external_modules
        """
        setattr(self, funcname, func)

    def pipeline(self, transaction=True, shard_hint=None):
        """
        Return a new pipeline object that can queue multiple commands for
        later execution. ``transaction`` indicates whether all commands
        should be executed atomically. Apart from making a group of operations
        atomic, pipelines are useful for reducing the back-and-forth overhead
        between the client and server.
        """
        return Pipeline(
            self.connection_pool, self.response_callbacks, transaction, shard_hint
        )
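
    # A brief pipeline sketch (assuming `r` is a connected client; command
    # methods return the pipeline, so calls can be chained before execute()):
    # >>> pipe = r.pipeline()
    # >>> pipe.set("k", 1).incr("k")
    # >>> pipe.execute()
    # [True, 2]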

    def transaction(self, func, *watches, **kwargs):
        """
        Convenience method for executing the callable `func` as a transaction
        while watching all keys specified in `watches`. The 'func' callable
        should expect a single argument which is a Pipeline object.
        """
        shard_hint = kwargs.pop("shard_hint", None)
        value_from_callable = kwargs.pop("value_from_callable", False)
        watch_delay = kwargs.pop("watch_delay", None)
        with self.pipeline(True, shard_hint) as pipe:
            while True:
                try:
                    if watches:
                        pipe.watch(*watches)
                    func_value = func(pipe)
                    exec_value = pipe.execute()
                    return func_value if value_from_callable else exec_value
                except WatchError:
                    if watch_delay is not None and watch_delay > 0:
                        time.sleep(watch_delay)
                    continue
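
    # For illustration, an optimistic-locking increment via transaction(),
    # assuming `r` is a connected client and "counter" holds an integer.
    # While WATCHing, the pipeline executes reads immediately; pipe.multi()
    # switches it to buffered mode, and transaction() retries on WatchError:
    # >>> def increment(pipe):
    # ...     current = int(pipe.get("counter") or 0)
    # ...     pipe.multi()
    # ...     pipe.set("counter", current + 1)
    # >>> r.transaction(increment, "counter")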

    def lock(
        self,
        name,
        timeout=None,
        sleep=0.1,
        blocking=True,
        blocking_timeout=None,
        lock_class=None,
        thread_local=True,
    ):
        """
        Return a new Lock object using key ``name`` that mimics
        the behavior of threading.Lock.

        If specified, ``timeout`` indicates a maximum life for the lock.
        By default, it will remain locked until release() is called.

        ``sleep`` indicates the amount of time to sleep per loop iteration
        when the lock is in blocking mode and another client is currently
        holding the lock.

        ``blocking`` indicates whether calling ``acquire`` should block until
        the lock has been acquired, or fail immediately, causing ``acquire``
        to return False and the lock not being acquired. Defaults to True.
        Note this value can be overridden by passing a ``blocking``
        argument to ``acquire``.

        ``blocking_timeout`` indicates the maximum amount of time in seconds to
        spend trying to acquire the lock. A value of ``None`` indicates
        continue trying forever. ``blocking_timeout`` can be specified as a
        float or integer, both representing the number of seconds to wait.

        ``lock_class`` forces the specified lock implementation. Note that as
        of redis-py 3.0, the only lock class we implement is ``Lock`` (which is
        a Lua-based lock). So, it's unlikely you'll need this parameter, unless
        you have created your own custom lock class.

        ``thread_local`` indicates whether the lock token is placed in
        thread-local storage. By default, the token is placed in thread local
        storage so that a thread only sees its token, not a token set by
        another thread. Consider the following timeline:

            time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds.
                     thread-1 sets the token to "abc"
            time: 1, thread-2 blocks trying to acquire `my-lock` using the
                     Lock instance.
            time: 5, thread-1 has not yet completed. redis expires the lock
                     key.
            time: 5, thread-2 acquired `my-lock` now that it's available.
                     thread-2 sets the token to "xyz"
            time: 6, thread-1 finishes its work and calls release(). if the
                     token is *not* stored in thread local storage, then
                     thread-1 would see the token value as "xyz" and would be
                     able to successfully release thread-2's lock.

        In some use cases it's necessary to disable thread local storage. For
        example, if you have code where one thread acquires a lock and passes
        that lock instance to a worker thread to release later. If thread
        local storage isn't disabled in this case, the worker thread won't see
        the token set by the thread that acquired the lock. Our assumption
        is that these cases aren't common and as such default to using
        thread local storage.
        """
        if lock_class is None:
            lock_class = Lock
        return lock_class(
            self,
            name,
            timeout=timeout,
            sleep=sleep,
            blocking=blocking,
            blocking_timeout=blocking_timeout,
            thread_local=thread_local,
        )
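
    # A minimal locking sketch (assuming `r` is a connected client; Lock
    # supports the context-manager protocol, and `do_critical_work` is a
    # hypothetical placeholder for the protected section):
    # >>> with r.lock("my-lock", timeout=5, blocking_timeout=3):
    # ...     do_critical_work()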

    def pubsub(self, **kwargs):
        """
        Return a Publish/Subscribe object. With this object, you can
        subscribe to channels and listen for messages that get published to
        them.
        """
        return PubSub(self.connection_pool, **kwargs)
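
    # A short subscribe-and-read sketch (assuming `r` is a connected client
    # without decode_responses, so channel names come back as bytes):
    # >>> p = r.pubsub()
    # >>> p.subscribe("news")
    # >>> p.get_message(timeout=1.0)  # first message confirms the subscription
    # {'type': 'subscribe', 'pattern': None, 'channel': b'news', 'data': 1}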

    def monitor(self):
        return Monitor(self.connection_pool)

    def client(self):
        return self.__class__(
            connection_pool=self.connection_pool, single_connection_client=True
        )

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def __del__(self):
        self.close()

    def close(self):
        # In case a connection property does not yet exist
        # (due to a crash earlier in the Redis() constructor), return
        # immediately as there is nothing to clean-up.
        if not hasattr(self, "connection"):
            return

        conn = self.connection
        if conn:
            self.connection = None
            self.connection_pool.release(conn)

    def _send_command_parse_response(self, conn, command_name, *args, **options):
        """
        Send a command and parse the response
        """
        conn.send_command(*args)
        return self.parse_response(conn, command_name, **options)

    def _disconnect_raise(self, conn, error):
        """
        Close the connection and raise an exception
        if retry_on_error is not set or the error
        is not one of the specified error types
        """
        conn.disconnect()
        if (
            conn.retry_on_error is None
            or isinstance(error, tuple(conn.retry_on_error)) is False
        ):
            raise error

    # COMMAND EXECUTION AND PROTOCOL PARSING
    def execute_command(self, *args, **options):
        """Execute a command and return a parsed response"""
        pool = self.connection_pool
        command_name = args[0]
        conn = self.connection or pool.get_connection(command_name, **options)

        try:
            return conn.retry.call_with_retry(
                lambda: self._send_command_parse_response(
                    conn, command_name, *args, **options
                ),
                lambda error: self._disconnect_raise(conn, error),
            )
        finally:
            if not self.connection:
                pool.release(conn)

    def parse_response(self, connection, command_name, **options):
        """Parses a response from the Redis server"""
        try:
            if NEVER_DECODE in options:
                response = connection.read_response(disable_decoding=True)
                options.pop(NEVER_DECODE)
            else:
                response = connection.read_response()
        except ResponseError:
            if EMPTY_RESPONSE in options:
                return options[EMPTY_RESPONSE]
            raise

        if EMPTY_RESPONSE in options:
            options.pop(EMPTY_RESPONSE)

        if command_name in self.response_callbacks:
            return self.response_callbacks[command_name](response, **options)
        return response


StrictRedis = Redis


class Monitor:
    """
    Monitor is useful for handling the MONITOR command to the redis server.
    The next_command() method returns one command from monitor, while
    listen() yields commands from monitor.
    """

    monitor_re = re.compile(r"\[(\d+) (.*)\] (.*)")
    command_re = re.compile(r'"(.*?)(?<!\\)"')

    def __init__(self, connection_pool):
        self.connection_pool = connection_pool
        self.connection = self.connection_pool.get_connection("MONITOR")

    def __enter__(self):
        self.connection.send_command("MONITOR")
        # check that monitor returns 'OK', but don't return it to user
        response = self.connection.read_response()
        if not bool_ok(response):
            raise RedisError(f"MONITOR failed: {response}")
        return self

    def __exit__(self, *args):
        self.connection.disconnect()
        self.connection_pool.release(self.connection)

    def next_command(self):
        """Parse the response from a monitor command"""
        response = self.connection.read_response()
        if isinstance(response, bytes):
            response = self.connection.encoder.decode(response, force=True)
        command_time, command_data = response.split(" ", 1)
        m = self.monitor_re.match(command_data)
        db_id, client_info, command = m.groups()
        command = " ".join(self.command_re.findall(command))
        # Redis escapes double quotes because each piece of the command
        # string is surrounded by double quotes. We don't have that
        # requirement so remove the escaping and leave the quote.
        command = command.replace('\\"', '"')

        if client_info == "lua":
            client_address = "lua"
            client_port = ""
            client_type = "lua"
        elif client_info.startswith("unix"):
            client_address = "unix"
            client_port = client_info[5:]
            client_type = "unix"
        else:
            # use rsplit as ipv6 addresses contain colons
            client_address, client_port = client_info.rsplit(":", 1)
            client_type = "tcp"
        return {
            "time": float(command_time),
            "db": int(db_id),
            "client_address": client_address,
            "client_port": client_port,
            "client_type": client_type,
            "command": command,
        }

    def listen(self):
        """Listen for commands coming to the server."""
        while True:
            yield self.next_command()


class PubSub:
    """
    PubSub provides publish, subscribe and listen support to Redis channels.

    After subscribing to one or more channels, the listen() method will block
    until a message arrives on one of the subscribed channels. That message
    will be returned and it's safe to start listening again.
    """

    PUBLISH_MESSAGE_TYPES = ("message", "pmessage")
    UNSUBSCRIBE_MESSAGE_TYPES = ("unsubscribe", "punsubscribe")
    HEALTH_CHECK_MESSAGE = "redis-py-health-check"

    def __init__(
        self,
        connection_pool,
        shard_hint=None,
        ignore_subscribe_messages=False,
        encoder=None,
    ):
        self.connection_pool = connection_pool
        self.shard_hint = shard_hint
        self.ignore_subscribe_messages = ignore_subscribe_messages
        self.connection = None
        self.subscribed_event = threading.Event()
        # we need to know the encoding options for this connection in order
        # to look up channel and pattern names for callback handlers.
        self.encoder = encoder
        if self.encoder is None:
            self.encoder = self.connection_pool.get_encoder()
        self.health_check_response_b = self.encoder.encode(self.HEALTH_CHECK_MESSAGE)
        if self.encoder.decode_responses:
            self.health_check_response = ["pong", self.HEALTH_CHECK_MESSAGE]
        else:
            self.health_check_response = [b"pong", self.health_check_response_b]
        self.reset()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.reset()

    def __del__(self):
        try:
            # if this object went out of scope prior to shutting down
            # subscriptions, close the connection manually before
            # returning it to the connection pool
            self.reset()
        except Exception:
            pass

    def reset(self):
        if self.connection:
            self.connection.disconnect()
            self.connection.clear_connect_callbacks()
            self.connection_pool.release(self.connection)
            self.connection = None
        self.channels = {}
        self.health_check_response_counter = 0
        self.pending_unsubscribe_channels = set()
        self.patterns = {}
        self.pending_unsubscribe_patterns = set()
        self.subscribed_event.clear()

    def close(self):
        self.reset()

    def on_connect(self, connection):
        "Re-subscribe to any channels and patterns previously subscribed to"
        # NOTE: for python3, we can't pass bytestrings as keyword arguments
        # so we need to decode channel/pattern names back to unicode strings
        # before passing them to [p]subscribe.
        self.pending_unsubscribe_channels.clear()
        self.pending_unsubscribe_patterns.clear()
        if self.channels:
            channels = {}
            for k, v in self.channels.items():
                channels[self.encoder.decode(k, force=True)] = v
            self.subscribe(**channels)
        if self.patterns:
            patterns = {}
            for k, v in self.patterns.items():
                patterns[self.encoder.decode(k, force=True)] = v
            self.psubscribe(**patterns)

    @property
    def subscribed(self):
        """Indicates if there are subscriptions to any channels or patterns"""
        return self.subscribed_event.is_set()

    def execute_command(self, *args):
        """Execute a publish/subscribe command"""

        # NOTE: don't parse the response in this function -- it could pull a
        # legitimate message off the stack if the connection is already
        # subscribed to one or more channels

        if self.connection is None:
            self.connection = self.connection_pool.get_connection(
                "pubsub", self.shard_hint
            )
            # register a callback that re-subscribes to any channels we
            # were listening to when we were disconnected
            self.connection.register_connect_callback(self.on_connect)
        connection = self.connection
        kwargs = {"check_health": not self.subscribed}
        if not self.subscribed:
            self.clean_health_check_responses()
        self._execute(connection, connection.send_command, *args, **kwargs)

    def clean_health_check_responses(self):
        """
        If any health check responses are present, clean them
        """
        ttl = 10
        conn = self.connection
        while self.health_check_response_counter > 0 and ttl > 0:
            if self._execute(conn, conn.can_read, timeout=conn.socket_timeout):
                response = self._execute(conn, conn.read_response)
                if self.is_health_check_response(response):
                    self.health_check_response_counter -= 1
                else:
                    raise PubSubError(
                        "A non health check response was cleaned by "
                        "execute_command: {0}".format(response)
                    )
            ttl -= 1

    def _disconnect_raise_connect(self, conn, error):
        """
        Close the connection and raise an exception
        if retry_on_timeout is not set or the error
        is not a TimeoutError. Otherwise, try to reconnect.
        """
        conn.disconnect()
        if not (conn.retry_on_timeout and isinstance(error, TimeoutError)):
            raise error
        conn.connect()

    def _execute(self, conn, command, *args, **kwargs):
        """
        Connect manually upon disconnection. If the Redis server is down,
        this will fail and raise a ConnectionError as desired.
        After reconnection, the ``on_connect`` callback should have been
        called by the connection to resubscribe us to any channels and
        patterns we were previously listening to.
        """
        return conn.retry.call_with_retry(
            lambda: command(*args, **kwargs),
            lambda error: self._disconnect_raise_connect(conn, error),
        )

    def parse_response(self, block=True, timeout=0):
        """Parse the response from a publish/subscribe command"""
        conn = self.connection
        if conn is None:
            raise RuntimeError(
                "pubsub connection not set: "
                "did you forget to call subscribe() or psubscribe()?"
            )

        self.check_health()

        def try_read():
            if not block:
                if not conn.can_read(timeout=timeout):
                    return None
            else:
                conn.connect()
            return conn.read_response(disconnect_on_error=False)

        response = self._execute(conn, try_read)

        if self.is_health_check_response(response):
            # ignore the health check message as user might not expect it
            self.health_check_response_counter -= 1
            return None
        return response

    def is_health_check_response(self, response):
        """
        Check if the response is a health check response.
        If there are no subscriptions, redis responds to the PING command with
        a bulk response, instead of a multi-bulk with "pong" and the response.
        """
        return response in [
            self.health_check_response,  # If there was a subscription
            self.health_check_response_b,  # If there wasn't
        ]

    def check_health(self):
        conn = self.connection
        if conn is None:
            raise RuntimeError(
                "pubsub connection not set: "
                "did you forget to call subscribe() or psubscribe()?"
            )

        if conn.health_check_interval and time.time() > conn.next_health_check:
            conn.send_command("PING", self.HEALTH_CHECK_MESSAGE, check_health=False)
            self.health_check_response_counter += 1

    def _normalize_keys(self, data):
        """
        Normalize channel/pattern names to be either bytes or strings,
        based on whether responses are automatically decoded. This saves us
        from coercing the value for each message coming in.
        """
        encode = self.encoder.encode
        decode = self.encoder.decode
        return {decode(encode(k)): v for k, v in data.items()}

    def psubscribe(self, *args, **kwargs):
        """
        Subscribe to channel patterns. Patterns supplied as keyword arguments
        expect a pattern name as the key and a callable as the value. A
        pattern's callable will be invoked automatically when a message is
        received on that pattern rather than producing a message via
        ``listen()``.
        """
        if args:
            args = list_or_args(args[0], args[1:])
        new_patterns = dict.fromkeys(args)
        new_patterns.update(kwargs)
        ret_val = self.execute_command("PSUBSCRIBE", *new_patterns.keys())
        # update the patterns dict AFTER we send the command. we don't want to
        # subscribe twice to these patterns, once for the command and again
        # for the reconnection.
        new_patterns = self._normalize_keys(new_patterns)
        self.patterns.update(new_patterns)
        if not self.subscribed:
            # Set the subscribed_event flag to True
            self.subscribed_event.set()
            # Clear the health check counter
            self.health_check_response_counter = 0
        self.pending_unsubscribe_patterns.difference_update(new_patterns)
        return ret_val

    def punsubscribe(self, *args):
        """
        Unsubscribe from the supplied patterns. If empty, unsubscribe from
        all patterns.
        """
        if args:
            args = list_or_args(args[0], args[1:])
            patterns = self._normalize_keys(dict.fromkeys(args))
        else:
            patterns = self.patterns
        self.pending_unsubscribe_patterns.update(patterns)
        return self.execute_command("PUNSUBSCRIBE", *args)

    def subscribe(self, *args, **kwargs):
        """
        Subscribe to channels. Channels supplied as keyword arguments expect
        a channel name as the key and a callable as the value. A channel's
        callable will be invoked automatically when a message is received on
        that channel rather than producing a message via ``listen()`` or
        ``get_message()``.
        """
        if args:
            args = list_or_args(args[0], args[1:])
        new_channels = dict.fromkeys(args)
        new_channels.update(kwargs)
        ret_val = self.execute_command("SUBSCRIBE", *new_channels.keys())
        # update the channels dict AFTER we send the command. we don't want to
        # subscribe twice to these channels, once for the command and again
        # for the reconnection.
        new_channels = self._normalize_keys(new_channels)
        self.channels.update(new_channels)
        if not self.subscribed:
            # Set the subscribed_event flag to True
            self.subscribed_event.set()
            # Clear the health check counter
            self.health_check_response_counter = 0
        self.pending_unsubscribe_channels.difference_update(new_channels)
        return ret_val

    def unsubscribe(self, *args):
        """
        Unsubscribe from the supplied channels. If empty, unsubscribe from
        all channels.
        """
        if args:
            args = list_or_args(args[0], args[1:])
            channels = self._normalize_keys(dict.fromkeys(args))
        else:
            channels = self.channels
        self.pending_unsubscribe_channels.update(channels)
        return self.execute_command("UNSUBSCRIBE", *args)

    def listen(self):
        "Listen for messages on channels this client has been subscribed to"
        while self.subscribed:
            response = self.handle_message(self.parse_response(block=True))
            if response is not None:
                yield response

    def get_message(self, ignore_subscribe_messages=False, timeout=0.0):
        """
        Get the next message if one is available, otherwise None.

        If timeout is specified, the system will wait for `timeout` seconds
        before returning. Timeout should be specified as a floating point
        number, or None, to wait indefinitely.
        """
        if not self.subscribed:
            # Wait for subscription
            start_time = time.time()
            if self.subscribed_event.wait(timeout) is True:
                # The connection was subscribed during the timeout time frame.
                # The timeout should be adjusted based on the time spent
                # waiting for the subscription
                time_spent = time.time() - start_time
                timeout = max(0.0, timeout - time_spent)
            else:
                # The connection isn't subscribed to any channels or patterns,
                # so no messages are available
                return None

        response = self.parse_response(block=(timeout is None), timeout=timeout)
        if response:
            return self.handle_message(response, ignore_subscribe_messages)
        return None
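
    # A simple polling-loop sketch (assuming `p` is an already-subscribed
    # PubSub instance; get_message() returns None when nothing arrives):
    # >>> while True:
    # ...     message = p.get_message(ignore_subscribe_messages=True, timeout=1.0)
    # ...     if message:
    # ...         print(message["channel"], message["data"])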

    def ping(self, message=None):
        """
        Ping the Redis server
        """
        message = "" if message is None else message
        return self.execute_command("PING", message)
1702 def handle_message(self, response, ignore_subscribe_messages=False):
1703 """
1704 Parses a pub/sub message. If the channel or pattern was subscribed to
1705 with a message handler, the handler is invoked instead of a parsed
1706 message being returned.
1707 """
1708 if response is None:
1709 return None
1710 message_type = str_if_bytes(response[0])
1711 if message_type == "pmessage":
1712 message = {
1713 "type": message_type,
1714 "pattern": response[1],
1715 "channel": response[2],
1716 "data": response[3],
1717 }
1718 elif message_type == "pong":
1719 message = {
1720 "type": message_type,
1721 "pattern": None,
1722 "channel": None,
1723 "data": response[1],
1724 }
1725 else:
1726 message = {
1727 "type": message_type,
1728 "pattern": None,
1729 "channel": response[1],
1730 "data": response[2],
1731 }
1733 # if this is an unsubscribe message, remove it from memory
1734 if message_type in self.UNSUBSCRIBE_MESSAGE_TYPES:
1735 if message_type == "punsubscribe":
1736 pattern = response[1]
1737 if pattern in self.pending_unsubscribe_patterns:
1738 self.pending_unsubscribe_patterns.remove(pattern)
1739 self.patterns.pop(pattern, None)
1740 else:
1741 channel = response[1]
1742 if channel in self.pending_unsubscribe_channels:
1743 self.pending_unsubscribe_channels.remove(channel)
1744 self.channels.pop(channel, None)
1745 if not self.channels and not self.patterns:
1746 # There are no subscriptions anymore, set subscribed_event flag
1747 # to false
1748 self.subscribed_event.clear()
1750 if message_type in self.PUBLISH_MESSAGE_TYPES:
1751 # if there's a message handler, invoke it
1752 if message_type == "pmessage":
1753 handler = self.patterns.get(message["pattern"], None)
1754 else:
1755 handler = self.channels.get(message["channel"], None)
1756 if handler:
1757 handler(message)
1758 return None
1759 elif message_type != "pong":
1760 # this is a subscribe/unsubscribe message. ignore if we don't
1761 # want them
1762 if ignore_subscribe_messages or self.ignore_subscribe_messages:
1763 return None
1765 return message
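# For reference, the dicts built above take these shapes (channel,
# pattern and data values are bytes unless the connection was created
# with decode_responses=True; for (p)subscribe/(p)unsubscribe messages,
# "data" is the current subscription count):
#
#     {"type": "message", "pattern": None, "channel": b"chan", "data": b"hi"}
#     {"type": "pmessage", "pattern": b"ch*", "channel": b"chan", "data": b"hi"}
#     {"type": "subscribe", "pattern": None, "channel": b"chan", "data": 1}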
1767 def run_in_thread(self, sleep_time=0, daemon=False, exception_handler=None):
1768 for channel, handler in self.channels.items():
1769 if handler is None:
1770 raise PubSubError(f"Channel: '{channel}' has no handler registered")
1771 for pattern, handler in self.patterns.items():
1772 if handler is None:
1773 raise PubSubError(f"Pattern: '{pattern}' has no handler registered")
1775 thread = PubSubWorkerThread(
1776 self, sleep_time, daemon=daemon, exception_handler=exception_handler
1777 )
1778 thread.start()
1779 return thread
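# Illustrative usage sketch (not part of the library source): every
# subscription must have a handler before run_in_thread() is called.
# Handlers receive the parsed message dict; whatever they return is
# discarded by handle_message() above:
#
#     p = redis.Redis().pubsub()
#     p.subscribe(**{"my-channel": lambda m: print(m["data"])})
#     worker = p.run_in_thread(sleep_time=0.001)
#     # ... later, from any thread:
#     worker.stop()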
1782class PubSubWorkerThread(threading.Thread):
1783 def __init__(self, pubsub, sleep_time, daemon=False, exception_handler=None):
1784 super().__init__()
1785 self.daemon = daemon
1786 self.pubsub = pubsub
1787 self.sleep_time = sleep_time
1788 self.exception_handler = exception_handler
1789 self._running = threading.Event()
1791 def run(self):
1792 if self._running.is_set():
1793 return
1794 self._running.set()
1795 pubsub = self.pubsub
1796 sleep_time = self.sleep_time
1797 while self._running.is_set():
1798 try:
1799 pubsub.get_message(ignore_subscribe_messages=True, timeout=sleep_time)
1800 except BaseException as e:
1801 if self.exception_handler is None:
1802 raise
1803 self.exception_handler(e, pubsub, self)
1804 pubsub.close()
1806 def stop(self):
1807 # trip the flag so the run loop exits. the run loop will
1808 # close the pubsub connection, which disconnects the socket
1809 # and returns the connection to the pool.
1810 self._running.clear()
1813class Pipeline(Redis):
1814 """
1815 Pipelines provide a way to transmit multiple commands to the Redis server
1816 in one transmission. This is convenient for batch processing, such as
1817 saving all the values in a list to Redis.
1819 By default, all commands executed within a pipeline are wrapped with
1820 MULTI and EXEC calls, which guarantees they are executed atomically.
1821 Pass transaction=False to Redis.pipeline() to disable this wrapping.
1823 Any command raising an exception does *not* halt the execution of
1824 subsequent commands in the pipeline. Instead, the exception is caught
1825 and its instance is placed into the response list returned by execute().
1826 Code iterating over the response list should be able to deal with an
1827 instance of an exception as a potential value. In general, these will be
1828 ResponseError exceptions, such as those raised when issuing a command
1829 on a key of a different datatype.
1830 """
1832 UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}
1834 def __init__(self, connection_pool, response_callbacks, transaction, shard_hint):
1835 self.connection_pool = connection_pool
1836 self.connection = None
1837 self.response_callbacks = response_callbacks
1838 self.transaction = transaction
1839 self.shard_hint = shard_hint
1841 self.watching = False
1842 self.reset()
1844 def __enter__(self):
1845 return self
1847 def __exit__(self, exc_type, exc_value, traceback):
1848 self.reset()
1850 def __del__(self):
1851 try:
1852 self.reset()
1853 except Exception:
1854 pass
1856 def __len__(self):
1857 return len(self.command_stack)
1859 def __bool__(self):
1860 """Pipeline instances should always evaluate to True"""
1861 return True
1863 def reset(self):
1864 self.command_stack = []
1865 self.scripts = set()
1866 # make sure to reset the connection state in the event that we were
1867 # watching something
1868 if self.watching and self.connection:
1869 try:
1870 # call this manually since our unwatch or
1871 # immediate_execute_command methods can call reset()
1872 self.connection.send_command("UNWATCH")
1873 self.connection.read_response()
1874 except ConnectionError:
1875 # disconnect will also remove any previous WATCHes
1876 self.connection.disconnect()
1877 # clean up the other instance attributes
1878 self.watching = False
1879 self.explicit_transaction = False
1880 # we can safely return the connection to the pool here since we're
1881 # sure we're no longer WATCHing anything
1882 if self.connection:
1883 self.connection_pool.release(self.connection)
1884 self.connection = None
1886 def multi(self):
1887 """
1888 Start a transactional block of the pipeline after WATCH commands
1889 are issued. End the transactional block with `execute`.
1890 """
1891 if self.explicit_transaction:
1892 raise RedisError("Cannot issue nested calls to MULTI")
1893 if self.command_stack:
1894 raise RedisError(
1895 "Commands without an initial WATCH have already been issued"
1896 )
1897 self.explicit_transaction = True
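# Illustrative usage sketch (not part of the library source): the
# WATCH / multi() / execute() check-and-set pattern. Between watch()
# and multi(), commands run immediately so their values can be read;
# if a watched key changes before EXEC, execute() raises WatchError
# and the caller retries:
#
#     with redis.Redis().pipeline() as pipe:
#         while True:
#             try:
#                 pipe.watch("balance")
#                 current = int(pipe.get("balance") or 0)
#                 pipe.multi()
#                 pipe.set("balance", current + 1)
#                 pipe.execute()
#                 break
#             except WatchError:
#                 continue  # "balance" changed under us; retry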
1899 def execute_command(self, *args, **kwargs):
1900 if (self.watching or args[0] == "WATCH") and not self.explicit_transaction:
1901 return self.immediate_execute_command(*args, **kwargs)
1902 return self.pipeline_execute_command(*args, **kwargs)
1904 def _disconnect_reset_raise(self, conn, error):
1905 """
1906 Close the connection and reset the watching state. Raise a
1907 WatchError if we were watching; otherwise re-raise the original
1908 error unless retry_on_timeout is set and the error is a
1909 TimeoutError, in which case the retry wrapper may try again.
1910 """
1911 conn.disconnect()
1912 # if we were already watching a variable, the watch is no longer
1913 # valid since this connection has died. raise a WatchError, which
1914 # indicates the user should retry this transaction.
1915 if self.watching:
1916 self.reset()
1917 raise WatchError(
1918 "A ConnectionError occurred on while watching one or more keys"
1919 )
1920 # if retry_on_timeout is not set, or the error is not
1921 # a TimeoutError, raise it
1922 if not (conn.retry_on_timeout and isinstance(error, TimeoutError)):
1923 self.reset()
1924 raise
1926 def immediate_execute_command(self, *args, **options):
1927 """
1928 Execute a command immediately, but don't auto-retry on a
1929 ConnectionError if we're already WATCHing a variable. Used when
1930 issuing WATCH or subsequent commands that retrieve the watched
1931 values, before MULTI is called.
1932 """
1933 command_name = args[0]
1934 conn = self.connection
1935 # if this is the first call, we need a connection
1936 if not conn:
1937 conn = self.connection_pool.get_connection(command_name, self.shard_hint)
1938 self.connection = conn
1940 return conn.retry.call_with_retry(
1941 lambda: self._send_command_parse_response(
1942 conn, command_name, *args, **options
1943 ),
1944 lambda error: self._disconnect_reset_raise(conn, error),
1945 )
1947 def pipeline_execute_command(self, *args, **options):
1948 """
1949 Stage a command to be executed when execute() is next called
1951 Returns the current Pipeline object so that commands can be
1952 chained together, such as:
1954 pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')
1956 At some other point, you can then run: pipe.execute(),
1957 which will execute all commands queued in the pipe.
1958 """
1959 self.command_stack.append((args, options))
1960 return self
1962 def _execute_transaction(self, connection, commands, raise_on_error):
1963 cmds = chain([(("MULTI",), {})], commands, [(("EXEC",), {})])
1964 all_cmds = connection.pack_commands(
1965 [args for args, options in cmds if EMPTY_RESPONSE not in options]
1966 )
1967 connection.send_packed_command(all_cmds)
1968 errors = []
1970 # parse off the response for MULTI
1971 # NOTE: we need to handle ResponseErrors here and continue
1972 # so that we read all the additional command messages from
1973 # the socket
1974 try:
1975 self.parse_response(connection, "_")
1976 except ResponseError as e:
1977 errors.append((0, e))
1979 # and all the other commands
1980 for i, command in enumerate(commands):
1981 if EMPTY_RESPONSE in command[1]:
1982 errors.append((i, command[1][EMPTY_RESPONSE]))
1983 else:
1984 try:
1985 self.parse_response(connection, "_")
1986 except ResponseError as e:
1987 self.annotate_exception(e, i + 1, command[0])
1988 errors.append((i, e))
1990 # parse the EXEC.
1991 try:
1992 response = self.parse_response(connection, "_")
1993 except ExecAbortError:
1994 if errors:
1995 raise errors[0][1]
1996 raise
1998 # EXEC clears any watched keys
1999 self.watching = False
2001 if response is None:
2002 raise WatchError("Watched variable changed.")
2004 # put any parse errors into the response
2005 for i, e in errors:
2006 response.insert(i, e)
2008 if len(response) != len(commands):
2009 connection.disconnect()
2010 raise ResponseError(
2011 "Wrong number of response items from pipeline execution"
2012 )
2014 # find any errors in the response and raise if necessary
2015 if raise_on_error:
2016 self.raise_first_error(commands, response)
2018 # We have to run response callbacks manually
2019 data = []
2020 for r, cmd in zip(response, commands):
2021 if not isinstance(r, Exception):
2022 args, options = cmd
2023 command_name = args[0]
2024 if command_name in self.response_callbacks:
2025 r = self.response_callbacks[command_name](r, **options)
2026 data.append(r)
2027 return data
2029 def _execute_pipeline(self, connection, commands, raise_on_error):
2030 # build up all commands into a single request to increase network perf
2031 all_cmds = connection.pack_commands([args for args, _ in commands])
2032 connection.send_packed_command(all_cmds)
2034 response = []
2035 for args, options in commands:
2036 try:
2037 response.append(self.parse_response(connection, args[0], **options))
2038 except ResponseError as e:
2039 response.append(e)
2041 if raise_on_error:
2042 self.raise_first_error(commands, response)
2043 return response
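# A non-transactional pipeline (Redis.pipeline(transaction=False))
# takes this path: the commands still travel in a single round trip,
# but without the MULTI/EXEC wrapper they are not atomic. A minimal
# sketch:
#
#     pipe = redis.Redis().pipeline(transaction=False)
#     pipe.ping()
#     pipe.dbsize()
#     print(pipe.execute())  # e.g. [True, 42]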
2045 def raise_first_error(self, commands, response):
2046 for i, r in enumerate(response):
2047 if isinstance(r, ResponseError):
2048 self.annotate_exception(r, i + 1, commands[i][0])
2049 raise r
2051 def annotate_exception(self, exception, number, command):
2052 cmd = " ".join(map(safe_str, command))
2053 msg = (
2054 f"Command # {number} ({cmd}) of pipeline "
2055 f"caused error: {exception.args[0]}"
2056 )
2057 exception.args = (msg,) + exception.args[1:]
2059 def parse_response(self, connection, command_name, **options):
2060 result = Redis.parse_response(self, connection, command_name, **options)
2061 if command_name in self.UNWATCH_COMMANDS:
2062 self.watching = False
2063 elif command_name == "WATCH":
2064 self.watching = True
2065 return result
2067 def load_scripts(self):
2068 # make sure all scripts that are about to be run on this pipeline exist
2069 scripts = list(self.scripts)
2070 immediate = self.immediate_execute_command
2071 shas = [s.sha for s in scripts]
2072 # we can't use the normal script_* methods because they would just
2073 # get buffered in the pipeline.
2074 exists = immediate("SCRIPT EXISTS", *shas)
2075 if not all(exists):
2076 for s, exist in zip(scripts, exists):
2077 if not exist:
2078 s.sha = immediate("SCRIPT LOAD", s.script)
2080 def _disconnect_raise_reset(self, conn, error):
2081 """
2082 Close the connection. Raise a WatchError if we were watching;
2083 otherwise re-raise the original error unless retry_on_timeout is
2084 set and the error is a TimeoutError.
2085 """
2086 conn.disconnect()
2087 # if we were watching a variable, the watch is no longer valid
2088 # since this connection has died. raise a WatchError, which
2089 # indicates the user should retry this transaction.
2090 if self.watching:
2091 raise WatchError(
2092 "A ConnectionError occurred on while watching one or more keys"
2093 )
2094 # if retry_on_timeout is not set, or the error is not
2095 # a TimeoutError, raise it
2096 if not (conn.retry_on_timeout and isinstance(error, TimeoutError)):
2097 self.reset()
2098 raise
2100 def execute(self, raise_on_error=True):
2101 """Execute all the commands in the current pipeline"""
2102 stack = self.command_stack
2103 if not stack and not self.watching:
2104 return []
2105 if self.scripts:
2106 self.load_scripts()
2107 if self.transaction or self.explicit_transaction:
2108 execute = self._execute_transaction
2109 else:
2110 execute = self._execute_pipeline
2112 conn = self.connection
2113 if not conn:
2114 conn = self.connection_pool.get_connection("MULTI", self.shard_hint)
2115 # assign to self.connection so reset() releases the connection
2116 # back to the pool after we're done
2117 self.connection = conn
2119 try:
2120 return conn.retry.call_with_retry(
2121 lambda: execute(conn, stack, raise_on_error),
2122 lambda error: self._disconnect_raise_reset(conn, error),
2123 )
2124 finally:
2125 self.reset()
2127 def discard(self):
2128 """
2129 Flushes all previously queued commands.
2130 See: https://redis.io/commands/DISCARD
2131 """
2132 self.execute_command("DISCARD")
2134 def watch(self, *names):
2135 """Watches the values at keys ``names``"""
2136 if self.explicit_transaction:
2137 raise RedisError("Cannot issue a WATCH after a MULTI")
2138 return self.execute_command("WATCH", *names)
2140 def unwatch(self):
2141 """Unwatches all previously specified keys"""
2142 return self.execute_command("UNWATCH") if self.watching else True