/proc/self/cwd/source/extensions/filters/network/redis_proxy/config.cc
Line | Count | Source (jump to first uncovered line) |
1 | | #include "source/extensions/filters/network/redis_proxy/config.h" |
2 | | |
3 | | #include "envoy/extensions/filters/network/redis_proxy/v3/redis_proxy.pb.h" |
4 | | #include "envoy/extensions/filters/network/redis_proxy/v3/redis_proxy.pb.validate.h" |
5 | | |
6 | | #include "source/extensions/common/dynamic_forward_proxy/dns_cache_manager_impl.h" |
7 | | #include "source/extensions/common/redis/cluster_refresh_manager_impl.h" |
8 | | #include "source/extensions/filters/network/common/redis/client_impl.h" |
9 | | #include "source/extensions/filters/network/common/redis/fault_impl.h" |
10 | | #include "source/extensions/filters/network/redis_proxy/command_splitter_impl.h" |
11 | | #include "source/extensions/filters/network/redis_proxy/proxy_filter.h" |
12 | | #include "source/extensions/filters/network/redis_proxy/router_impl.h" |
13 | | |
14 | | #include "absl/container/flat_hash_set.h" |
15 | | |
16 | | namespace Envoy { |
17 | | namespace Extensions { |
18 | | namespace NetworkFilters { |
19 | | namespace RedisProxy { |
20 | | |
21 | | namespace { |
22 | | inline void addUniqueClusters( |
23 | | absl::flat_hash_set<std::string>& clusters, |
24 | | const envoy::extensions::filters::network::redis_proxy::v3::RedisProxy::PrefixRoutes::Route& |
25 | 0 | route) { |
26 | 0 | clusters.emplace(route.cluster()); |
27 | 0 | for (auto& mirror : route.request_mirror_policy()) { |
28 | 0 | clusters.emplace(mirror.cluster()); |
29 | 0 | } |
30 | 0 | if (route.has_read_command_policy()) { |
31 | 0 | clusters.emplace(route.read_command_policy().cluster()); |
32 | 0 | } |
33 | 0 | } |
34 | | } // namespace |
35 | | |
36 | | Network::FilterFactoryCb RedisProxyFilterConfigFactory::createFilterFactoryFromProtoTyped( |
37 | | const envoy::extensions::filters::network::redis_proxy::v3::RedisProxy& proto_config, |
38 | 0 | Server::Configuration::FactoryContext& context) { |
39 | |
|
40 | 0 | ASSERT(!proto_config.stat_prefix().empty()); |
41 | 0 | ASSERT(proto_config.has_settings()); |
42 | | |
43 | 0 | Extensions::Common::Redis::ClusterRefreshManagerSharedPtr refresh_manager = |
44 | 0 | Extensions::Common::Redis::getClusterRefreshManager( |
45 | 0 | context.singletonManager(), context.mainThreadDispatcher(), context.clusterManager(), |
46 | 0 | context.timeSource()); |
47 | |
|
48 | 0 | Extensions::Common::DynamicForwardProxy::DnsCacheManagerFactoryImpl cache_manager_factory( |
49 | 0 | context); |
50 | 0 | auto filter_config = |
51 | 0 | std::make_shared<ProxyFilterConfig>(proto_config, context.scope(), context.drainDecision(), |
52 | 0 | context.runtime(), context.api(), cache_manager_factory); |
53 | |
|
54 | 0 | envoy::extensions::filters::network::redis_proxy::v3::RedisProxy::PrefixRoutes prefix_routes( |
55 | 0 | proto_config.prefix_routes()); |
56 | | |
57 | | // Set the catch-all route from the settings parameters. |
58 | 0 | if (prefix_routes.routes_size() == 0 && !prefix_routes.has_catch_all_route()) { |
59 | 0 | throw EnvoyException("cannot configure a redis-proxy without any upstream"); |
60 | 0 | } |
61 | | |
62 | 0 | absl::flat_hash_set<std::string> unique_clusters; |
63 | 0 | for (auto& route : prefix_routes.routes()) { |
64 | 0 | addUniqueClusters(unique_clusters, route); |
65 | 0 | } |
66 | 0 | addUniqueClusters(unique_clusters, prefix_routes.catch_all_route()); |
67 | |
|
68 | 0 | auto redis_command_stats = |
69 | 0 | Common::Redis::RedisCommandStats::createRedisCommandStats(context.scope().symbolTable()); |
70 | |
|
71 | 0 | Upstreams upstreams; |
72 | 0 | for (auto& cluster : unique_clusters) { |
73 | 0 | Stats::ScopeSharedPtr stats_scope = |
74 | 0 | context.scope().createScope(fmt::format("cluster.{}.redis_cluster", cluster)); |
75 | 0 | auto conn_pool_ptr = std::make_shared<ConnPool::InstanceImpl>( |
76 | 0 | cluster, context.clusterManager(), Common::Redis::Client::ClientFactoryImpl::instance_, |
77 | 0 | context.threadLocal(), proto_config.settings(), context.api(), std::move(stats_scope), |
78 | 0 | redis_command_stats, refresh_manager, filter_config->dns_cache_); |
79 | 0 | conn_pool_ptr->init(); |
80 | 0 | upstreams.emplace(cluster, conn_pool_ptr); |
81 | 0 | } |
82 | |
|
83 | 0 | auto router = |
84 | 0 | std::make_unique<PrefixRoutes>(prefix_routes, std::move(upstreams), context.runtime()); |
85 | |
|
86 | 0 | auto fault_manager = std::make_unique<Common::Redis::FaultManagerImpl>( |
87 | 0 | context.api().randomGenerator(), context.runtime(), proto_config.faults()); |
88 | |
|
89 | 0 | std::shared_ptr<CommandSplitter::Instance> splitter = |
90 | 0 | std::make_shared<CommandSplitter::InstanceImpl>( |
91 | 0 | std::move(router), context.scope(), filter_config->stat_prefix_, context.timeSource(), |
92 | 0 | proto_config.latency_in_micros(), std::move(fault_manager)); |
93 | 0 | return [splitter, filter_config](Network::FilterManager& filter_manager) -> void { |
94 | 0 | Common::Redis::DecoderFactoryImpl factory; |
95 | 0 | filter_manager.addReadFilter(std::make_shared<ProxyFilter>( |
96 | 0 | factory, Common::Redis::EncoderPtr{new Common::Redis::EncoderImpl()}, *splitter, |
97 | 0 | filter_config)); |
98 | 0 | }; |
99 | 0 | } |
100 | | |
101 | | /** |
102 | | * Static registration for the redis filter. @see RegisterFactory. |
 | | * Registers RedisProxyFilterConfigFactory under the |
 | | * Server::Configuration::NamedNetworkFilterConfigFactory category. |
 | | * NOTE(review): the envoy.redis_proxy string is presumably the deprecated legacy filter |
 | | * name kept for backward compatibility; confirm against the macro definition. |
103 | | */ |
104 | | LEGACY_REGISTER_FACTORY(RedisProxyFilterConfigFactory, |
105 | | Server::Configuration::NamedNetworkFilterConfigFactory, |
106 | | "envoy.redis_proxy"); |
107 | | |
108 | | } // namespace RedisProxy |
109 | | } // namespace NetworkFilters |
110 | | } // namespace Extensions |
111 | | } // namespace Envoy |