1import copy
2import random
3import string
4from typing import List, Tuple
5
6import redis
7from redis.typing import KeysT, KeyT
8
9
def list_or_args(keys: KeysT, args: Tuple[KeyT, ...]) -> List[KeyT]:
    """Merge ``keys`` and ``args`` into one freshly created list.

    ``keys`` may be a single key or an iterable of keys. A ``str`` or
    ``bytes`` value is iterable but is still treated as one key; any
    non-iterable value is likewise wrapped as a single key. The elements
    of ``args`` (when non-empty) are appended after the keys.
    """
    if isinstance(keys, (bytes, str)):
        # Iterable, but it stands for exactly one key.
        combined = [keys]
    else:
        try:
            combined = list(keys)
        except TypeError:
            # Not iterable at all: a single non-string key.
            combined = [keys]
    if args:
        combined.extend(args)
    return combined
25
26
def nativestr(x):
    """Decode ``bytes`` to ``str``; pass any other value through.

    Bytes are decoded as UTF-8 with undecodable sequences replaced.
    A result equal to the text ``"null"`` is reported as ``None``.
    """
    if isinstance(x, bytes):
        x = x.decode("utf-8", "replace")
    return None if x == "null" else x
33
34
def delist(x):
    """Apply ``nativestr`` to every element of ``x`` and return the new list.

    ``None`` is passed through unchanged.
    """
    if x is None:
        return None
    converted = []
    for element in x:
        converted.append(nativestr(element))
    return converted
40
41
def parse_to_list(response):
    """Optimistically parse the response to a list.

    Each item is converted independently:

    * ``None`` and ``float`` items are kept unchanged;
    * the texts ``"infinity"``, ``"-infinity"`` and ``"nan"`` (any letter
      case) are kept as decoded strings;
    * anything else is tried as ``int``, then as ``float``, and falls back
      to its ``nativestr`` form when neither conversion succeeds;
    * items for which ``int`` raises ``TypeError`` (e.g. a nested list)
      become ``None``.

    Returns an empty list when ``response`` is ``None``.
    """
    res = []
    if response is None:
        return res

    special_values = {"infinity", "nan", "-infinity"}

    for item in response:
        if item is None or isinstance(item, float):
            # Floats are already parsed: int() would silently truncate
            # them and raises OverflowError for infinities.
            res.append(item)
            continue

        item_str = nativestr(item)
        if isinstance(item_str, str) and item_str.lower() in special_values:
            res.append(item_str)  # Keep as string
            continue

        try:
            res.append(int(item))
        except ValueError:
            try:
                res.append(float(item))
            except ValueError:
                res.append(item_str)
        except TypeError:
            # int() rejects the item's type outright (non-scalar entry).
            res.append(None)

    return res
73
74
def parse_list_to_dict(response):
    """Convert a flat ``[key, value, key, value, ...]`` list into a dict.

    The list is consumed two elements at a time:

    * a non-list key followed by a non-list value is stored under that
      key, with the value converted to ``float`` when possible and kept
      as-is otherwise;
    * a non-list key followed by a list stores the recursively parsed list
      as the only entry of ``res["Child iterators"]`` (the key itself is
      not used);
    * a list in key position is parsed recursively and appended to
      ``res["Child iterators"]``; a list right after it is appended the
      same way.

    Raises:
        IndexError: if a non-list key is the last element, so that no
            value follows it.
    """
    res = {}
    for i in range(0, len(response), 2):
        if isinstance(response[i], list):
            # A nested list can appear before any "Child iterators" entry
            # exists; setdefault avoids the KeyError that direct indexing
            # would raise in that case.
            children = res.setdefault("Child iterators", [])
            children.append(parse_list_to_dict(response[i]))
            try:
                if isinstance(response[i + 1], list):
                    children.append(parse_list_to_dict(response[i + 1]))
            except IndexError:
                # Nothing follows the nested list (odd-length input).
                pass
        elif isinstance(response[i + 1], list):
            res["Child iterators"] = [parse_list_to_dict(response[i + 1])]
        else:
            try:
                res[response[i]] = float(response[i + 1])
            except (TypeError, ValueError):
                # Not numeric: keep the raw value.
                res[response[i]] = response[i + 1]
    return res
93
94
def random_string(length=10):
    """
    Build a string of ``length`` lowercase ASCII letters, each drawn
    independently with ``random.choice``.
    """
    alphabet = string.ascii_lowercase
    letters = [random.choice(alphabet) for _ in range(length)]  # nosec
    return "".join(letters)
102
103
def decode_dict_keys(obj):
    """Return a shallow copy of ``obj`` with every ``bytes`` key replaced
    by its UTF-8 decoded ``str`` form. Values and non-bytes keys are left
    untouched; ``obj`` itself is not modified.
    """
    decoded = copy.copy(obj)
    for key in obj.keys():
        if not isinstance(key, bytes):
            continue
        # Move the value from the bytes key to its decoded counterpart.
        decoded[key.decode("utf-8")] = decoded.pop(key)
    return decoded
112
113
def get_protocol_version(client):
    """Look up the ``"protocol"`` entry in the client's connection kwargs.

    For ``redis.Redis`` / ``redis.asyncio.Redis`` instances the kwargs are
    read from ``client.connection_pool``; for
    ``redis.cluster.AbstractRedisCluster`` instances they are read from
    ``client.nodes_manager``. Any other client type yields ``None``, as
    does a missing ``"protocol"`` entry.
    """
    is_standalone = isinstance(client, redis.Redis) or isinstance(
        client, redis.asyncio.Redis
    )
    if is_standalone:
        connection_kwargs = client.connection_pool.connection_kwargs
    elif isinstance(client, redis.cluster.AbstractRedisCluster):
        connection_kwargs = client.nodes_manager.connection_kwargs
    else:
        return None
    return connection_kwargs.get("protocol")