Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/redis/commands/helpers.py: 82%

107 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-23 06:16 +0000

1import copy 

2import random 

3import string 

4from typing import List, Tuple 

5 

6import redis 

7from redis.typing import KeysT, KeyT 

8 

9 

def list_or_args(keys: KeysT, args: Tuple[KeyT, ...]) -> List[KeyT]:
    """Return a new list holding ``keys`` followed by every item of ``args``.

    ``keys`` may be a single value or an iterable of values. A ``str`` or
    ``bytes`` instance is iterable, but it is treated as one key rather than
    as a sequence of characters. The caller's ``keys`` object is never
    mutated: the result is always a freshly built list.
    """
    if isinstance(keys, (bytes, str)):
        combined = [keys]
    else:
        try:
            combined = list(keys)
        except TypeError:
            # Not iterable at all, so it is a lone non-string key.
            combined = [keys]
    if args:
        combined.extend(args)
    return combined

25 

26 

def nativestr(x):
    """Return ``x`` as text, decoding ``bytes`` input as UTF-8.

    Bytes that cannot be decoded are substituted instead of raising (the
    ``"replace"`` error handler). Any non-bytes value is passed through
    unchanged. A result equal to the string ``"null"`` is reported as
    ``None``.
    """
    if isinstance(x, bytes):
        text = x.decode("utf-8", "replace")
    else:
        text = x
    return None if text == "null" else text

33 

34 

35def delist(x): 

36 """Given a list of binaries, return the stringified version.""" 

37 if x is None: 

38 return x 

39 return [nativestr(obj) for obj in x] 

40 

41 

def parse_to_list(response):
    """Optimistically convert every item of ``response`` to a native value.

    Each item is tried as an ``int`` first, then as a ``float``; if both
    conversions fail with ``ValueError`` the item is passed through
    ``nativestr``. An item for which ``int()`` raises ``TypeError`` becomes
    ``None``. A ``None`` response yields an empty list.
    """
    if response is None:
        return []

    def _coerce(item):
        # Same fallback ladder for every element: int -> float -> text.
        try:
            return int(item)
        except ValueError:
            pass
        except TypeError:
            return None
        try:
            return float(item)
        except ValueError:
            return nativestr(item)

    return [_coerce(item) for item in response]

60 

61 

def parse_list_to_dict(response):
    """Convert a flat ``[key, value, key, value, ...]`` list into a dict.

    The list is walked two elements at a time:

    * When the element in key position is itself a list, it is parsed
      recursively and appended to ``res["Child iterators"]``; if its
      partner is also a list, that one is appended as well.
    * When the value is a list, it is parsed recursively and stored as a
      one-element list under ``"Child iterators"``.
    * Otherwise the value is stored under its key, converted to ``float``
      when possible and kept as-is when not.

    A trailing key that has no value (odd-length input) is skipped instead
    of raising ``IndexError``.

    :param response: indexable sequence of alternating keys and values.
    :return: dict built from the pairs.
    """
    res = {}
    size = len(response)
    for i in range(0, size, 2):
        key = response[i]
        has_value = i + 1 < size
        value = response[i + 1] if has_value else None

        if isinstance(key, list):
            # setdefault: a nested list may show up before any
            # "Child iterators" entry has been created.
            children = res.setdefault("Child iterators", [])
            children.append(parse_list_to_dict(key))
            if has_value and isinstance(value, list):
                children.append(parse_list_to_dict(value))
        elif not has_value:
            # Dangling key without a value: nothing to record.
            continue
        elif isinstance(value, list):
            res["Child iterators"] = [parse_list_to_dict(value)]
        else:
            try:
                res[key] = float(value)
            except (TypeError, ValueError):
                res[key] = value
    return res

80 

81 

def parse_to_dict(response):
    """Convert a sequence of ``[key, value]`` entries into a dict.

    * A list value is expanded with ``parse_list_to_dict``.
    * Any other value is converted to ``float`` when possible and stored
      unchanged otherwise.
    * An entry that carries no value (fewer than two elements) is skipped.

    :param response: iterable of indexable entries, or ``None``.
    :return: dict built from the entries; empty when ``response`` is ``None``.
    """
    if response is None:
        return {}

    res = {}
    for det in response:
        # An attribute may be provided without a value. Read the value
        # first so that such an entry is skipped instead of raising.
        try:
            value = det[1]
        except IndexError:
            continue
        key = det[0]

        if isinstance(value, list):
            res[key] = parse_list_to_dict(value)
            continue
        try:
            res[key] = float(value)
        except (TypeError, ValueError):
            res[key] = value
    return res

99 

100 

def random_string(length=10):
    """
    Return a string made of ``length`` randomly chosen lowercase ASCII letters.
    """
    alphabet = string.ascii_lowercase
    picked = [random.choice(alphabet) for _ in range(length)]  # nosec
    return "".join(picked)

108 

109 

def quote_string(v):
    """
    RedisGraph strings must be quoted; wrap ``v`` in double quotes when it
    is text.

    ``bytes`` input is decoded first. A value that is neither ``str`` nor
    ``bytes`` is returned unchanged. Backslashes and double quotes inside
    the text are escaped with a backslash.
    """
    if isinstance(v, bytes):
        text = v.decode()
    elif isinstance(v, str):
        text = v
    else:
        return v

    if not text:
        return '""'

    # One pass: a backslash becomes two, a double quote gets a backslash.
    escapes = str.maketrans({"\\": "\\\\", '"': '\\"'})
    escaped = text.translate(escapes)
    return f'"{escaped}"'

128 

129 

def decode_dict_keys(obj):
    """Return a shallow copy of ``obj`` with its ``bytes`` keys decoded as UTF-8.

    Keys that are not ``bytes`` are left in place; ``obj`` itself is not
    modified.
    """
    decoded = copy.copy(obj)
    byte_keys = [key for key in obj.keys() if isinstance(key, bytes)]
    for key in byte_keys:
        name = key.decode("utf-8")
        decoded[name] = decoded.pop(key)
    return decoded

138 

139 

def stringify_param_value(value):
    """
    Render a parameter value as text for the params header of a Cypher
    command.

    Per the original contract, any value accepted by `json.dumps()` may be
    passed.

    How the result differs from `str()`:
        * Strings are quoted (via `quote_string`).
        * None --> "null".
        * Lists and tuples become `[...]` with no space after commas.
        * In dictionaries, keys are _not_ quoted.

    :param value: The parameter value to be turned into a string.
    :return: string
    """
    if value is None:
        return "null"
    if isinstance(value, str):
        return quote_string(value)
    if isinstance(value, (list, tuple)):
        items = ",".join(stringify_param_value(item) for item in value)
        return f"[{items}]"
    if isinstance(value, dict):
        pairs = ",".join(
            f"{key}:{stringify_param_value(val)}" for key, val in value.items()
        )
        return f"{{{pairs}}}"
    return str(value)

165 

166 

def get_protocol_version(client):
    """Return the ``"protocol"`` entry of the client's connection kwargs.

    For ``redis.Redis`` / ``redis.asyncio.Redis`` clients the kwargs are read
    from ``client.connection_pool``; for ``redis.cluster.AbstractRedisCluster``
    clients they are read from ``client.nodes_manager``. Any other client
    type yields ``None``, as does a missing ``"protocol"`` entry.
    """
    is_standalone = isinstance(client, redis.Redis) or isinstance(
        client, redis.asyncio.Redis
    )
    if is_standalone:
        connection_kwargs = client.connection_pool.connection_kwargs
    elif isinstance(client, redis.cluster.AbstractRedisCluster):
        connection_kwargs = client.nodes_manager.connection_kwargs
    else:
        return None
    return connection_kwargs.get("protocol")