1from __future__ import annotations
2
3from json import JSONDecoder, JSONEncoder
4from typing import TYPE_CHECKING
5
6if TYPE_CHECKING:
7 from .bf import BFBloom, CFBloom, CMSBloom, TDigestBloom, TOPKBloom
8 from .json import JSON
9 from .search import AsyncSearch, Search
10 from .timeseries import TimeSeries
11 from .vectorset import VectorSet
12
13
class RedisModuleCommands:
    """Expose the supported redis module wrappers through the command
    namespace.

    Every accessor imports its wrapper class on first use inside the method
    body and returns a new wrapper instance built with ``client=self``.
    """

    def json(self, encoder=JSONEncoder(), decoder=JSONDecoder()) -> JSON:
        """Return a ``JSON`` wrapper bound to this client.

        ``encoder`` and ``decoder`` are forwarded unchanged to ``JSON``.
        The default instances are created once, when this method is defined,
        and are therefore shared by every call that omits them.
        """
        from .json import JSON

        return JSON(client=self, encoder=encoder, decoder=decoder)

    def ft(self, index_name="idx") -> Search:
        """Return a ``Search`` wrapper bound to this client.

        ``index_name`` is forwarded unchanged to ``Search`` and defaults
        to ``"idx"``.
        """
        from .search import Search

        return Search(client=self, index_name=index_name)

    def ts(self) -> TimeSeries:
        """Return a ``TimeSeries`` wrapper bound to this client."""
        from .timeseries import TimeSeries

        return TimeSeries(client=self)

    def bf(self) -> BFBloom:
        """Return a ``BFBloom`` wrapper bound to this client."""
        from .bf import BFBloom

        return BFBloom(client=self)

    def cf(self) -> CFBloom:
        """Return a ``CFBloom`` wrapper bound to this client."""
        from .bf import CFBloom

        return CFBloom(client=self)

    def cms(self) -> CMSBloom:
        """Return a ``CMSBloom`` wrapper bound to this client."""
        from .bf import CMSBloom

        return CMSBloom(client=self)

    def topk(self) -> TOPKBloom:
        """Return a ``TOPKBloom`` wrapper bound to this client."""
        from .bf import TOPKBloom

        return TOPKBloom(client=self)

    def tdigest(self) -> TDigestBloom:
        """Return a ``TDigestBloom`` wrapper bound to this client."""
        from .bf import TDigestBloom

        return TDigestBloom(client=self)

    def vset(self) -> VectorSet:
        """Return a ``VectorSet`` wrapper bound to this client."""
        from .vectorset import VectorSet

        return VectorSet(client=self)
92
93
class AsyncRedisModuleCommands(RedisModuleCommands):
    """Variant of ``RedisModuleCommands`` whose ``ft`` accessor returns an
    ``AsyncSearch`` wrapper; every other accessor is inherited as-is.
    """

    def ft(self, index_name="idx") -> AsyncSearch:
        """Return an ``AsyncSearch`` wrapper bound to this client.

        ``index_name`` is forwarded unchanged to ``AsyncSearch`` and
        defaults to ``"idx"``.
        """
        from .search import AsyncSearch

        return AsyncSearch(client=self, index_name=index_name)