1import copy
2import random
3import string
4from typing import TYPE_CHECKING, Any, Callable, Iterable, List, Mapping, Tuple
5
6import redis
7from redis.typing import ChannelT, KeysT, KeyT
8
9if TYPE_CHECKING:
10 from redis._parsers import Encoder
11
12
def list_or_args(keys: KeysT, args: Tuple[KeyT, ...]) -> List[KeyT]:
    """Merge ``keys`` and ``args`` into one freshly built list.

    ``keys`` may be a single key or an iterable of keys. A ``str`` or
    ``bytes`` value is iterable but is treated as one key, and a
    non-iterable value is wrapped as a single key as well. The entries
    of ``args`` are appended after the keys.
    """
    try:
        # A bare str/bytes means "one key", not "a sequence of characters".
        combined = [keys] if isinstance(keys, (bytes, str)) else list(keys)
    except TypeError:
        # Not iterable at all: a single non-string key.
        combined = [keys]
    if args:
        combined.extend(args)
    return combined
28
29
def nativestr(x):
    """Return ``x`` as text when it is ``bytes``, otherwise ``x`` itself.

    Bytes are decoded as UTF-8 with undecodable bytes replaced. A value
    that compares equal to the string ``"null"`` (after decoding) yields
    ``None``.
    """
    text = x
    if isinstance(x, bytes):
        text = x.decode("utf-8", "replace")
    return None if text == "null" else text
36
37
def delist(x):
    """Apply ``nativestr`` to every element of ``x`` and return the new list.

    ``None`` is passed through unchanged.
    """
    return None if x is None else [nativestr(element) for element in x]
43
44
def parse_to_list(response):
    """Optimistically parse the response to a list.

    Every item is converted independently:

    * ``None`` stays ``None``.
    * ``float`` instances are kept unchanged, so a fractional value is
      never truncated by the integer conversion below.
    * The strings ``"infinity"``, ``"-infinity"`` and ``"nan"`` (compared
      case-insensitively) are kept as strings.
    * Anything else is tried as ``int(item)``, then as ``float(item)``;
      if both fail, the ``nativestr`` form of the item is used.

    Args:
        response: An iterable of reply items, or ``None``.

    Returns:
        A new list with one entry per item; an empty list when
        ``response`` is ``None``.
    """
    if response is None:
        return []

    special_values = {"infinity", "nan", "-infinity"}
    res = []

    for item in response:
        if item is None:
            res.append(None)
            continue

        if isinstance(item, float):
            # int() accepts floats and drops the fractional part
            # (int(1.5) == 1), so values that are already floats must
            # bypass the int-first conversion chain.
            res.append(item)
            continue

        try:
            item_str = nativestr(item)
        except TypeError:
            res.append(None)
            continue

        if isinstance(item_str, str) and item_str.lower() in special_values:
            res.append(item_str)  # Keep as string
            continue

        try:
            res.append(int(item))
        except (ValueError, OverflowError, TypeError):
            try:
                res.append(float(item))
            except (ValueError, TypeError):
                # Neither an int nor a float: fall back to the text form.
                res.append(item_str)

    return res
76
77
def random_string(length=10):
    """
    Build a string of ``length`` randomly chosen lowercase ASCII letters.

    Uses the ``random`` module, so the result is not suitable for
    security-sensitive purposes.
    """
    alphabet = string.ascii_lowercase
    letters = [random.choice(alphabet) for _ in range(length)]  # nosec
    return "".join(letters)
85
86
def decode_dict_keys(obj):
    """Return a shallow copy of ``obj`` whose ``bytes`` keys are UTF-8 decoded.

    Keys of any other type are left untouched, and ``obj`` itself is not
    modified. Each decoded key is re-inserted into the copy, replacing
    its ``bytes`` counterpart.
    """
    newobj = copy.copy(obj)
    bytes_keys = [key for key in obj.keys() if isinstance(key, bytes)]
    for key in bytes_keys:
        # Move the value from the bytes key to its decoded name.
        newobj[key.decode("utf-8")] = newobj.pop(key)
    return newobj
95
96
def get_protocol_version(client):
    """Return the ``"protocol"`` connection kwarg configured on ``client``.

    For ``redis.Redis`` and ``redis.asyncio.Redis`` clients the value is
    read from the connection pool's ``connection_kwargs``; for
    ``redis.cluster.AbstractRedisCluster`` clients it is read from the
    nodes manager's ``connection_kwargs``. Returns ``None`` when the key
    is absent or the client is of any other type.
    """
    if isinstance(client, (redis.Redis, redis.asyncio.Redis)):
        connection_kwargs = client.connection_pool.connection_kwargs
    elif isinstance(client, redis.cluster.AbstractRedisCluster):
        connection_kwargs = client.nodes_manager.connection_kwargs
    else:
        return None
    return connection_kwargs.get("protocol")
102
103
def at_most_one_value_set(iterable: Iterable[Any]):
    """
    Tell whether no more than one value in the iterable is truthy.

    The iterable is always consumed completely; every value is
    truth-tested.

    Args:
        iterable: An iterable of values to check.

    Returns:
        True if zero or one value is truthy, False otherwise.

    Raises:
        Whatever a value's truth test raises, e.g. when its ``__bool__``
        or ``__len__`` method raises an error.
    """
    truthy_count = 0
    for value in iterable:
        if value:
            truthy_count += 1
    return truthy_count <= 1
121
122
123def partition_pubsub_subscriptions_by_handler(
124 subscriptions: Mapping[ChannelT, Callable | None],
125 encoder: "Encoder",
126) -> tuple[list[ChannelT], dict[str, Callable]]:
127 """Partition a PubSub ``{name: handler|None}`` mapping into the positional
128 and keyword arguments expected by ``[s|p]subscribe``.
129
130 For python3, we can't pass bytestrings as keyword arguments, so names
131 with a handler are decoded (keyword args). Names subscribed without a
132 callback are stored with a ``None`` handler and may have binary values
133 that are not valid in the current encoding (e.g. arbitrary bytes that
134 are not valid UTF-8); they are returned as raw keys (positional args)
135 so that no decoding is required.
136 """
137 subscriptions_without_handlers: list[ChannelT] = []
138 subscriptions_with_handlers: dict[str, Callable] = {}
139 for k, v in subscriptions.items():
140 if v is not None:
141 subscriptions_with_handlers[encoder.decode(k, force=True)] = v
142 else:
143 subscriptions_without_handlers.append(k)
144 return subscriptions_without_handlers, subscriptions_with_handlers