/proc/self/cwd/source/extensions/filters/network/redis_proxy/router_impl.cc
Line | Count | Source (jump to first uncovered line) |
1 | | #include "source/extensions/filters/network/redis_proxy/router_impl.h" |
2 | | |
3 | | #include "envoy/extensions/filters/network/redis_proxy/v3/redis_proxy.pb.h" |
4 | | #include "envoy/type/v3/percent.pb.h" |
5 | | |
6 | | #include "source/common/formatter/substitution_formatter.h" |
7 | | #include "source/common/http/header_map_impl.h" |
8 | | |
9 | | #include "absl/strings/str_replace.h" |
10 | | |
11 | | namespace Envoy { |
12 | | namespace Extensions { |
13 | | namespace NetworkFilters { |
14 | | namespace RedisProxy { |
15 | | |
16 | | MirrorPolicyImpl::MirrorPolicyImpl(const envoy::extensions::filters::network::redis_proxy::v3:: |
17 | | RedisProxy::PrefixRoutes::Route::RequestMirrorPolicy& config, |
18 | | const ConnPool::InstanceSharedPtr upstream, |
19 | | Runtime::Loader& runtime) |
20 | | : runtime_key_(config.runtime_fraction().runtime_key()), |
21 | | default_value_(config.has_runtime_fraction() |
22 | | ? absl::optional<envoy::type::v3::FractionalPercent>( |
23 | | config.runtime_fraction().default_value()) |
24 | | : absl::nullopt), |
25 | | exclude_read_commands_(config.exclude_read_commands()), upstream_(upstream), |
26 | 0 | runtime_(runtime) {} |
27 | | |
28 | 0 | bool MirrorPolicyImpl::shouldMirror(const std::string& command) const { |
29 | 0 | if (!upstream_) { |
30 | 0 | return false; |
31 | 0 | } |
32 | | |
33 | 0 | std::string to_lower_string = absl::AsciiStrToLower(command); |
34 | |
|
35 | 0 | if (exclude_read_commands_ && Common::Redis::SupportedCommands::isReadCommand(to_lower_string)) { |
36 | 0 | return false; |
37 | 0 | } |
38 | | |
39 | 0 | if (default_value_.has_value()) { |
40 | 0 | return runtime_.snapshot().featureEnabled(runtime_key_, default_value_.value()); |
41 | 0 | } |
42 | | |
43 | 0 | return true; |
44 | 0 | } |
45 | | |
46 | | Prefix::Prefix( |
47 | | const envoy::extensions::filters::network::redis_proxy::v3::RedisProxy::PrefixRoutes::Route |
48 | | route, |
49 | | Upstreams& upstreams, Runtime::Loader& runtime) |
50 | | : prefix_(route.prefix()), key_formatter_(route.key_formatter()), |
51 | 0 | remove_prefix_(route.remove_prefix()), upstream_(upstreams.at(route.cluster())) { |
52 | 0 | for (auto const& mirror_policy : route.request_mirror_policy()) { |
53 | 0 | mirror_policies_.emplace_back(std::make_shared<MirrorPolicyImpl>( |
54 | 0 | mirror_policy, upstreams.at(mirror_policy.cluster()), runtime)); |
55 | 0 | } |
56 | 0 | if (route.has_read_command_policy()) { |
57 | 0 | read_upstream_ = upstreams.at(route.read_command_policy().cluster()); |
58 | 0 | } |
59 | 0 | } |
60 | | |
61 | 0 | ConnPool::InstanceSharedPtr Prefix::upstream(const std::string& command) const { |
62 | |
|
63 | 0 | if (read_upstream_) { |
64 | 0 | std::string to_lower_string = absl::AsciiStrToLower(command); |
65 | 0 | if (Common::Redis::SupportedCommands::isReadCommand(to_lower_string)) { |
66 | 0 | return read_upstream_; |
67 | 0 | } |
68 | 0 | } |
69 | | |
70 | 0 | return upstream_; |
71 | 0 | } |
72 | | |
73 | | PrefixRoutes::PrefixRoutes( |
74 | | const envoy::extensions::filters::network::redis_proxy::v3::RedisProxy::PrefixRoutes& config, |
75 | | Upstreams&& upstreams, Runtime::Loader& runtime) |
76 | | : case_insensitive_(config.case_insensitive()), upstreams_(std::move(upstreams)), |
77 | | catch_all_route_(config.has_catch_all_route() |
78 | | ? std::make_shared<Prefix>(config.catch_all_route(), upstreams_, runtime) |
79 | 0 | : nullptr) { |
80 | |
|
81 | 0 | for (auto const& route : config.routes()) { |
82 | 0 | std::string copy(route.prefix()); |
83 | |
|
84 | 0 | if (case_insensitive_) { |
85 | 0 | absl::AsciiStrToLower(©); |
86 | 0 | } |
87 | |
|
88 | 0 | auto success = prefix_lookup_table_.add( |
89 | 0 | copy.c_str(), std::make_shared<Prefix>(route, upstreams_, runtime), false); |
90 | 0 | if (!success) { |
91 | 0 | throw EnvoyException(fmt::format("prefix `{}` already exists.", route.prefix())); |
92 | 0 | } |
93 | 0 | } |
94 | 0 | } |
95 | | |
96 | | RouteSharedPtr PrefixRoutes::upstreamPool(std::string& key, |
97 | 0 | const StreamInfo::StreamInfo& stream_info) { |
98 | 0 | PrefixSharedPtr value = nullptr; |
99 | 0 | if (case_insensitive_) { |
100 | 0 | std::string copy = absl::AsciiStrToLower(key); |
101 | 0 | value = prefix_lookup_table_.findLongestPrefix(copy.c_str()); |
102 | 0 | } else { |
103 | 0 | value = prefix_lookup_table_.findLongestPrefix(key.c_str()); |
104 | 0 | } |
105 | |
|
106 | 0 | if (value == nullptr) { |
107 | | // prefix route not found, default to catch all route. |
108 | 0 | value = catch_all_route_; |
109 | | // prefix route not found, check if catch_all_route is defined to fallback to. |
110 | 0 | if (catch_all_route_ != nullptr) { |
111 | 0 | value = catch_all_route_; |
112 | 0 | } else { |
113 | | // no route found. |
114 | 0 | return value; |
115 | 0 | } |
116 | 0 | } |
117 | | |
118 | 0 | if (value->removePrefix()) { |
119 | 0 | key.erase(0, value->prefix().length()); |
120 | 0 | } |
121 | 0 | if (!value->keyFormatter().empty()) { |
122 | 0 | formatKey(key, value->keyFormatter(), stream_info); |
123 | 0 | } |
124 | 0 | return value; |
125 | 0 | } |
126 | | |
127 | | void PrefixRoutes::formatKey(std::string& key, std::string redis_key_formatter, |
128 | 0 | const StreamInfo::StreamInfo& stream_info) { |
129 | | // If key_formatter defines %KEY% command, then do a direct string replacement. |
130 | | // TODO(deveshkandpal24121990) - Possibly define a RedisKeyFormatter as a SubstitutionFormatter. |
131 | | // There is a possibility that key might have a '%' character in it, which will be incorrectly |
132 | | // processed by SubstitutionFormatter. To avoid that, replace the '%KEY%' command with |
133 | | // '~REPLACED_KEY~` place holder. After SubstitutionFormatter is done, replace the |
134 | | // '~REPLACED_KEY~' with the actual key. |
135 | 0 | if (redis_key_formatter.find(redis_key_formatter_command_) != std::string::npos) { |
136 | 0 | redis_key_formatter = absl::StrReplaceAll( |
137 | 0 | redis_key_formatter, {{redis_key_formatter_command_, redis_key_to_be_replaced_}}); |
138 | 0 | } |
139 | 0 | auto providers = Formatter::SubstitutionFormatParser::parse(redis_key_formatter); |
140 | 0 | std::string formatted_key; |
141 | 0 | for (Formatter::FormatterProviderPtr& provider : providers) { |
142 | 0 | auto provider_formatted_key = provider->formatValueWithContext({}, stream_info); |
143 | 0 | if (provider_formatted_key.has_string_value()) { |
144 | 0 | formatted_key = formatted_key + provider_formatted_key.string_value(); |
145 | 0 | } |
146 | 0 | } |
147 | 0 | if (formatted_key.find(redis_key_to_be_replaced_) != std::string::npos) { |
148 | 0 | formatted_key = absl::StrReplaceAll(formatted_key, {{redis_key_to_be_replaced_, key}}); |
149 | 0 | } |
150 | 0 | if (!formatted_key.empty()) { |
151 | 0 | key = formatted_key; |
152 | 0 | } |
153 | 0 | ENVOY_LOG(trace, "formatted key is {}", key); |
154 | 0 | } |
155 | | |
156 | | } // namespace RedisProxy |
157 | | } // namespace NetworkFilters |
158 | | } // namespace Extensions |
159 | | } // namespace Envoy |