1from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, Union
2
3from redis.exceptions import RedisError, ResponseError
4from redis.utils import str_if_bytes
5
6if TYPE_CHECKING:
7 from redis.asyncio.cluster import ClusterNode
8
9
10class AbstractCommandsParser:
11 def _get_pubsub_keys(self, *args):
12 """
13 Get the keys from pubsub command.
14 Although PubSub commands have predetermined key locations, they are not
15 supported in the 'COMMAND's output, so the key positions are hardcoded
16 in this method
17 """
18 if len(args) < 2:
19 # The command has no keys in it
20 return None
21 args = [str_if_bytes(arg) for arg in args]
22 command = args[0].upper()
23 keys = None
24 if command == "PUBSUB":
25 # the second argument is a part of the command name, e.g.
26 # ['PUBSUB', 'NUMSUB', 'foo'].
27 pubsub_type = args[1].upper()
28 if pubsub_type in ["CHANNELS", "NUMSUB", "SHARDCHANNELS", "SHARDNUMSUB"]:
29 keys = args[2:]
30 elif command in ["SUBSCRIBE", "PSUBSCRIBE", "UNSUBSCRIBE", "PUNSUBSCRIBE"]:
31 # format example:
32 # SUBSCRIBE channel [channel ...]
33 keys = list(args[1:])
34 elif command in ["PUBLISH", "SPUBLISH"]:
35 # format example:
36 # PUBLISH channel message
37 keys = [args[1]]
38 return keys
39
40 def parse_subcommand(self, command, **options):
41 cmd_dict = {}
42 cmd_name = str_if_bytes(command[0])
43 cmd_dict["name"] = cmd_name
44 cmd_dict["arity"] = int(command[1])
45 cmd_dict["flags"] = [str_if_bytes(flag) for flag in command[2]]
46 cmd_dict["first_key_pos"] = command[3]
47 cmd_dict["last_key_pos"] = command[4]
48 cmd_dict["step_count"] = command[5]
49 if len(command) > 7:
50 cmd_dict["tips"] = command[7]
51 cmd_dict["key_specifications"] = command[8]
52 cmd_dict["subcommands"] = command[9]
53 return cmd_dict
54
55
56class CommandsParser(AbstractCommandsParser):
57 """
58 Parses Redis commands to get command keys.
59 COMMAND output is used to determine key locations.
60 Commands that do not have a predefined key location are flagged with
61 'movablekeys', and these commands' keys are determined by the command
62 'COMMAND GETKEYS'.
63 """
64
65 def __init__(self, redis_connection):
66 self.commands = {}
67 self.initialize(redis_connection)
68
69 def initialize(self, r):
70 commands = r.command()
71 uppercase_commands = []
72 for cmd in commands:
73 if any(x.isupper() for x in cmd):
74 uppercase_commands.append(cmd)
75 for cmd in uppercase_commands:
76 commands[cmd.lower()] = commands.pop(cmd)
77 self.commands = commands
78
79 # As soon as this PR is merged into Redis, we should reimplement
80 # our logic to use COMMAND INFO changes to determine the key positions
81 # https://github.com/redis/redis/pull/8324
82 def get_keys(self, redis_conn, *args):
83 """
84 Get the keys from the passed command.
85
86 NOTE: Due to a bug in redis<7.0, this function does not work properly
87 for EVAL or EVALSHA when the `numkeys` arg is 0.
88 - issue: https://github.com/redis/redis/issues/9493
89 - fix: https://github.com/redis/redis/pull/9733
90
91 So, don't use this function with EVAL or EVALSHA.
92 """
93 if len(args) < 2:
94 # The command has no keys in it
95 return None
96
97 cmd_name = args[0].lower()
98 if cmd_name not in self.commands:
99 # try to split the command name and to take only the main command,
100 # e.g. 'memory' for 'memory usage'
101 cmd_name_split = cmd_name.split()
102 cmd_name = cmd_name_split[0]
103 if cmd_name in self.commands:
104 # save the splitted command to args
105 args = cmd_name_split + list(args[1:])
106 else:
107 # We'll try to reinitialize the commands cache, if the engine
108 # version has changed, the commands may not be current
109 self.initialize(redis_conn)
110 if cmd_name not in self.commands:
111 raise RedisError(
112 f"{cmd_name.upper()} command doesn't exist in Redis commands"
113 )
114
115 command = self.commands.get(cmd_name)
116 if "movablekeys" in command["flags"]:
117 keys = self._get_moveable_keys(redis_conn, *args)
118 elif "pubsub" in command["flags"] or command["name"] == "pubsub":
119 keys = self._get_pubsub_keys(*args)
120 else:
121 if (
122 command["step_count"] == 0
123 and command["first_key_pos"] == 0
124 and command["last_key_pos"] == 0
125 ):
126 is_subcmd = False
127 if "subcommands" in command:
128 subcmd_name = f"{cmd_name}|{args[1].lower()}"
129 for subcmd in command["subcommands"]:
130 if str_if_bytes(subcmd[0]) == subcmd_name:
131 command = self.parse_subcommand(subcmd)
132 is_subcmd = True
133
134 # The command doesn't have keys in it
135 if not is_subcmd:
136 return None
137 last_key_pos = command["last_key_pos"]
138 if last_key_pos < 0:
139 last_key_pos = len(args) - abs(last_key_pos)
140 keys_pos = list(
141 range(command["first_key_pos"], last_key_pos + 1, command["step_count"])
142 )
143 keys = [args[pos] for pos in keys_pos]
144
145 return keys
146
147 def _get_moveable_keys(self, redis_conn, *args):
148 """
149 NOTE: Due to a bug in redis<7.0, this function does not work properly
150 for EVAL or EVALSHA when the `numkeys` arg is 0.
151 - issue: https://github.com/redis/redis/issues/9493
152 - fix: https://github.com/redis/redis/pull/9733
153
154 So, don't use this function with EVAL or EVALSHA.
155 """
156 # The command name should be splitted into separate arguments,
157 # e.g. 'MEMORY USAGE' will be splitted into ['MEMORY', 'USAGE']
158 pieces = args[0].split() + list(args[1:])
159 try:
160 keys = redis_conn.execute_command("COMMAND GETKEYS", *pieces)
161 except ResponseError as e:
162 message = e.__str__()
163 if (
164 "Invalid arguments" in message
165 or "The command has no key arguments" in message
166 ):
167 return None
168 else:
169 raise e
170 return keys
171
172
173class AsyncCommandsParser(AbstractCommandsParser):
174 """
175 Parses Redis commands to get command keys.
176
177 COMMAND output is used to determine key locations.
178 Commands that do not have a predefined key location are flagged with 'movablekeys',
179 and these commands' keys are determined by the command 'COMMAND GETKEYS'.
180
181 NOTE: Due to a bug in redis<7.0, this does not work properly
182 for EVAL or EVALSHA when the `numkeys` arg is 0.
183 - issue: https://github.com/redis/redis/issues/9493
184 - fix: https://github.com/redis/redis/pull/9733
185
186 So, don't use this with EVAL or EVALSHA.
187 """
188
189 __slots__ = ("commands", "node")
190
191 def __init__(self) -> None:
192 self.commands: Dict[str, Union[int, Dict[str, Any]]] = {}
193
194 async def initialize(self, node: Optional["ClusterNode"] = None) -> None:
195 if node:
196 self.node = node
197
198 commands = await self.node.execute_command("COMMAND")
199 self.commands = {cmd.lower(): command for cmd, command in commands.items()}
200
201 # As soon as this PR is merged into Redis, we should reimplement
202 # our logic to use COMMAND INFO changes to determine the key positions
203 # https://github.com/redis/redis/pull/8324
204 async def get_keys(self, *args: Any) -> Optional[Tuple[str, ...]]:
205 """
206 Get the keys from the passed command.
207
208 NOTE: Due to a bug in redis<7.0, this function does not work properly
209 for EVAL or EVALSHA when the `numkeys` arg is 0.
210 - issue: https://github.com/redis/redis/issues/9493
211 - fix: https://github.com/redis/redis/pull/9733
212
213 So, don't use this function with EVAL or EVALSHA.
214 """
215 if len(args) < 2:
216 # The command has no keys in it
217 return None
218
219 cmd_name = args[0].lower()
220 if cmd_name not in self.commands:
221 # try to split the command name and to take only the main command,
222 # e.g. 'memory' for 'memory usage'
223 cmd_name_split = cmd_name.split()
224 cmd_name = cmd_name_split[0]
225 if cmd_name in self.commands:
226 # save the splitted command to args
227 args = cmd_name_split + list(args[1:])
228 else:
229 # We'll try to reinitialize the commands cache, if the engine
230 # version has changed, the commands may not be current
231 await self.initialize()
232 if cmd_name not in self.commands:
233 raise RedisError(
234 f"{cmd_name.upper()} command doesn't exist in Redis commands"
235 )
236
237 command = self.commands.get(cmd_name)
238 if "movablekeys" in command["flags"]:
239 keys = await self._get_moveable_keys(*args)
240 elif "pubsub" in command["flags"] or command["name"] == "pubsub":
241 keys = self._get_pubsub_keys(*args)
242 else:
243 if (
244 command["step_count"] == 0
245 and command["first_key_pos"] == 0
246 and command["last_key_pos"] == 0
247 ):
248 is_subcmd = False
249 if "subcommands" in command:
250 subcmd_name = f"{cmd_name}|{args[1].lower()}"
251 for subcmd in command["subcommands"]:
252 if str_if_bytes(subcmd[0]) == subcmd_name:
253 command = self.parse_subcommand(subcmd)
254 is_subcmd = True
255
256 # The command doesn't have keys in it
257 if not is_subcmd:
258 return None
259 last_key_pos = command["last_key_pos"]
260 if last_key_pos < 0:
261 last_key_pos = len(args) - abs(last_key_pos)
262 keys_pos = list(
263 range(command["first_key_pos"], last_key_pos + 1, command["step_count"])
264 )
265 keys = [args[pos] for pos in keys_pos]
266
267 return keys
268
269 async def _get_moveable_keys(self, *args: Any) -> Optional[Tuple[str, ...]]:
270 try:
271 keys = await self.node.execute_command("COMMAND GETKEYS", *args)
272 except ResponseError as e:
273 message = e.__str__()
274 if (
275 "Invalid arguments" in message
276 or "The command has no key arguments" in message
277 ):
278 return None
279 else:
280 raise e
281 return keys