Coverage Report

Created: 2023-11-12 09:30

/proc/self/cwd/source/extensions/filters/network/thrift_proxy/router/router_impl.h
Line
Count
Source (jump to first uncovered line)
1
#pragma once
2
3
#include <memory>
4
#include <string>
5
#include <vector>
6
7
#include "envoy/extensions/filters/network/thrift_proxy/v3/route.pb.h"
8
#include "envoy/router/router.h"
9
#include "envoy/tcp/conn_pool.h"
10
#include "envoy/upstream/load_balancer.h"
11
12
#include "source/common/http/header_utility.h"
13
#include "source/common/router/metadatamatchcriteria_impl.h"
14
#include "source/common/upstream/load_balancer_impl.h"
15
#include "source/extensions/filters/network/thrift_proxy/conn_manager.h"
16
#include "source/extensions/filters/network/thrift_proxy/filters/filter.h"
17
#include "source/extensions/filters/network/thrift_proxy/router/router.h"
18
#include "source/extensions/filters/network/thrift_proxy/router/router_ratelimit_impl.h"
19
#include "source/extensions/filters/network/thrift_proxy/router/upstream_request.h"
20
#include "source/extensions/filters/network/thrift_proxy/thrift_object.h"
21
22
#include "absl/types/optional.h"
23
24
namespace Envoy {
25
namespace Extensions {
26
namespace NetworkFilters {
27
namespace ThriftProxy {
28
namespace Router {
29
30
// RequestMirrorPolicy implementation that stores a cluster name, a runtime key and a default
// fractional percentage. All three are copied at construction and are immutable afterwards.
class RequestMirrorPolicyImpl : public RequestMirrorPolicy {
31
public:
32
  // @param cluster_name value later returned by clusterName().
  // @param runtime_key runtime key consulted by enabled(); an empty key means "always enabled".
  // @param default_value forwarded to the runtime snapshot by enabled() together with the key.
  RequestMirrorPolicyImpl(const std::string& cluster_name, const std::string& runtime_key,
33
                          const envoy::type::v3::FractionalPercent& default_value)
34
0
      : cluster_name_(cluster_name), runtime_key_(runtime_key), default_value_(default_value) {}
35
36
  // Router::RequestMirrorPolicy
37
0
  // @return the cluster name supplied at construction.
  const std::string& clusterName() const override { return cluster_name_; }
38
0
  // @return true when no runtime key is configured; otherwise the result of the runtime
  //         snapshot's featureEnabled() for the stored key and default value.
  bool enabled(Runtime::Loader& runtime) const override {
39
0
    return runtime_key_.empty() ? true
40
0
                                : runtime.snapshot().featureEnabled(runtime_key_, default_value_);
41
0
  }
42
43
private:
44
  const std::string cluster_name_;
45
  const std::string runtime_key_;
46
  const envoy::type::v3::FractionalPercent default_value_;
47
};
48
49
// Common base for thrift route entries. It implements both RouteEntry and Route, stores the
// per-route state exposed through the RouteEntry accessors below, and leaves the actual request
// matching to subclasses through the pure virtual matches().
class RouteEntryImplBase : public RouteEntry,
50
                           public Route,
51
                           public std::enable_shared_from_this<RouteEntryImplBase> {
52
public:
53
  // Constructs the entry from a Route proto. Defined out of line.
  RouteEntryImplBase(const envoy::extensions::filters::network::thrift_proxy::v3::Route& route);
54
55
  // Validates this route against the supplied cluster info maps. Defined out of line.
  // NOTE(review): the failure behavior is not visible in this header -- confirm in the .cc file.
  void validateClusters(const Upstream::ClusterManager::ClusterInfoMaps& cluster_info_maps) const;
56
  // Router::RouteEntry
57
  const std::string& clusterName() const override;
58
0
  // @return the stored metadata match criteria, or nullptr when none is held.
  const Envoy::Router::MetadataMatchCriteria* metadataMatchCriteria() const override {
59
0
    return metadata_match_criteria_.get();
60
0
  }
61
0
  const RateLimitPolicy& rateLimitPolicy() const override { return rate_limit_policy_; }
62
0
  bool stripServiceName() const override { return strip_service_name_; }
63
0
  const Http::LowerCaseString& clusterHeader() const override { return cluster_header_; }
64
0
  const std::vector<std::shared_ptr<RequestMirrorPolicy>>& requestMirrorPolicies() const override {
65
0
    return mirror_policies_;
66
0
  }
67
68
  // Router::Route
69
  const RouteEntry* routeEntry() const override;
70
71
  // Implemented by subclasses: returns the route to use for the given message metadata and
  // random value.
  // NOTE(review): presumably returns nullptr when the request does not match -- confirm.
  virtual RouteConstSharedPtr matches(const MessageMetadata& metadata,
72
                                      uint64_t random_value) const PURE;
73
74
protected:
75
  // Helpers available to subclasses; both are defined out of line.
  RouteConstSharedPtr clusterEntry(uint64_t random_value, const MessageMetadata& metadata) const;
76
  bool headersMatch(const Http::HeaderMap& headers) const;
77
78
private:
79
  // Route entry for a single weighted cluster. It carries its own cluster name and weight and
  // may carry its own metadata match criteria; every other RouteEntry property is read from the
  // parent route. The parent is held by reference, so it must outlive this entry.
  class WeightedClusterEntry : public RouteEntry, public Route {
80
  public:
81
    WeightedClusterEntry(
82
        const RouteEntryImplBase& parent,
83
        const envoy::extensions::filters::network::thrift_proxy::v3::WeightedCluster::ClusterWeight&
84
            cluster);
85
86
0
    uint64_t clusterWeight() const { return cluster_weight_; }
87
88
    // Router::RouteEntry
89
0
    const std::string& clusterName() const override { return cluster_name_; }
90
0
    // Prefers this entry's own criteria and falls back to the parent's when it has none.
    const Envoy::Router::MetadataMatchCriteria* metadataMatchCriteria() const override {
91
0
      if (metadata_match_criteria_) {
92
0
        return metadata_match_criteria_.get();
93
0
      }
94
95
0
      return parent_.metadataMatchCriteria();
96
0
    }
97
0
    const RateLimitPolicy& rateLimitPolicy() const override { return parent_.rateLimitPolicy(); }
98
0
    bool stripServiceName() const override { return parent_.stripServiceName(); }
99
0
    const Http::LowerCaseString& clusterHeader() const override {
100
      // Weighted cluster entries don't have a cluster header based on proto.
101
0
      ASSERT(parent_.clusterHeader().get().empty());
102
0
      return parent_.clusterHeader();
103
0
    }
104
    const std::vector<std::shared_ptr<RequestMirrorPolicy>>&
105
0
    requestMirrorPolicies() const override {
106
0
      return parent_.requestMirrorPolicies();
107
0
    }
108
109
    // Router::Route
110
0
    const RouteEntry* routeEntry() const override { return this; }
111
112
  private:
113
    const RouteEntryImplBase& parent_;
114
    const std::string cluster_name_;
115
    const uint64_t cluster_weight_;
116
    Envoy::Router::MetadataMatchCriteriaConstPtr metadata_match_criteria_;
117
  };
118
  using WeightedClusterEntrySharedPtr = std::shared_ptr<WeightedClusterEntry>;
119
120
  // Route entry whose cluster name is supplied at construction and copied into an owned
  // string; all other RouteEntry properties are read from the parent route. The parent is held
  // by reference, so it must outlive this entry.
  class DynamicRouteEntry : public RouteEntry, public Route {
121
  public:
122
    DynamicRouteEntry(const RouteEntryImplBase& parent, absl::string_view cluster_name)
123
0
        : parent_(parent), cluster_name_(std::string(cluster_name)) {}
124
125
    // Router::RouteEntry
126
0
    const std::string& clusterName() const override { return cluster_name_; }
127
0
    const Envoy::Router::MetadataMatchCriteria* metadataMatchCriteria() const override {
128
0
      return parent_.metadataMatchCriteria();
129
0
    }
130
0
    const RateLimitPolicy& rateLimitPolicy() const override { return parent_.rateLimitPolicy(); }
131
0
    bool stripServiceName() const override { return parent_.stripServiceName(); }
132
0
    const Http::LowerCaseString& clusterHeader() const override { return parent_.clusterHeader(); }
133
    const std::vector<std::shared_ptr<RequestMirrorPolicy>>&
134
0
    requestMirrorPolicies() const override {
135
0
      return parent_.requestMirrorPolicies();
136
0
    }
137
138
    // Router::Route
139
0
    const RouteEntry* routeEntry() const override { return this; }
140
141
  private:
142
    const RouteEntryImplBase& parent_;
143
    const std::string cluster_name_;
144
  };
145
146
  // Defined out of line.
  // NOTE(review): presumably builds the mirror policy list from the route action -- confirm.
  static std::vector<std::shared_ptr<RequestMirrorPolicy>> buildMirrorPolicies(
147
      const envoy::extensions::filters::network::thrift_proxy::v3::RouteAction& route);
148
149
  const std::string cluster_name_;
150
  const std::vector<Http::HeaderUtility::HeaderDataPtr> config_headers_;
151
  std::vector<WeightedClusterEntrySharedPtr> weighted_clusters_;
152
  // No in-class initializer here.
  // NOTE(review): presumably always set by the out-of-line constructor -- confirm.
  uint64_t total_cluster_weight_;
153
  Envoy::Router::MetadataMatchCriteriaConstPtr metadata_match_criteria_;
154
  const RateLimitPolicyImpl rate_limit_policy_;
155
  const bool strip_service_name_;
156
  const Http::LowerCaseString cluster_header_;
157
  const std::vector<std::shared_ptr<RequestMirrorPolicy>> mirror_policies_;
158
};
159
160
// Shared pointer to an immutable RouteEntryImplBase.
using RouteEntryImplBaseConstSharedPtr = std::shared_ptr<const RouteEntryImplBase>;
161
162
// Route entry that stores a method name and an invert flag alongside the base route state.
// The constructor and matches() are defined out of line.
// NOTE(review): presumably matches() compares the request's method name with method_name_ and
// invert_ flips the outcome -- confirm against the implementation.
class MethodNameRouteEntryImpl : public RouteEntryImplBase {
163
public:
164
  MethodNameRouteEntryImpl(
165
      const envoy::extensions::filters::network::thrift_proxy::v3::Route& route);
166
167
  // RouteEntryImplBase
168
  RouteConstSharedPtr matches(const MessageMetadata& metadata,
169
                              uint64_t random_value) const override;
170
171
private:
172
  const std::string method_name_;
173
  const bool invert_;
174
};
175
176
// Route entry that stores a service name and an invert flag alongside the base route state.
// The constructor and matches() are defined out of line.
// NOTE(review): presumably matches() compares the request's service name with service_name_ and
// invert_ flips the outcome -- confirm against the implementation.
class ServiceNameRouteEntryImpl : public RouteEntryImplBase {
177
public:
178
  ServiceNameRouteEntryImpl(
179
      const envoy::extensions::filters::network::thrift_proxy::v3::Route& route);
180
181
  // RouteEntryImplBase
182
  RouteConstSharedPtr matches(const MessageMetadata& metadata,
183
                              uint64_t random_value) const override;
184
185
private:
186
  // Unlike the sibling members this one is not const.
  // NOTE(review): presumably the constructor adjusts it after initialization -- confirm.
  std::string service_name_;
187
  const bool invert_;
188
};
189
190
// Holds the list of route entries created for a RouteConfiguration and selects a route for a
// request through route(). Both the constructor and route() are defined out of line.
class RouteMatcher {
191
public:
192
  // validation_clusters = absl::nullopt means that clusters are not validated.
193
  RouteMatcher(
194
      const envoy::extensions::filters::network::thrift_proxy::v3::RouteConfiguration& config,
195
      const absl::optional<Upstream::ClusterManager::ClusterInfoMaps>& validation_clusters);
196
197
  // @param metadata metadata of the message being routed.
  // @param random_value random value made available to the route selection.
  // NOTE(review): presumably returns nullptr when no stored route matches -- confirm.
  RouteConstSharedPtr route(const MessageMetadata& metadata, uint64_t random_value) const;
198
199
private:
200
  std::vector<RouteEntryImplBaseConstSharedPtr> routes_;
201
};
202
203
// Adapter from DecoderFilterCallbacks to UpstreamResponseCallbacks.
204
// Forwards every UpstreamResponseCallbacks call to the wrapped DecoderFilterCallbacks pointer.
// The pointer is stored as-is and dereferenced without a null check, so it must be non-null and
// remain valid for as long as this adapter is used.
class UpstreamResponseCallbacksImpl : public UpstreamResponseCallbacks {
205
public:
206
  UpstreamResponseCallbacksImpl(ThriftFilters::DecoderFilterCallbacks* callbacks)
207
0
      : callbacks_(callbacks) {}
208
209
0
  void startUpstreamResponse(Transport& transport, Protocol& protocol) override {
210
0
    callbacks_->startUpstreamResponse(transport, protocol);
211
0
  }
212
0
  // Adds the buffer's current length to the stream info's bytes-sent counter before handing the
  // buffer to the wrapped callbacks.
  ThriftFilters::ResponseStatus upstreamData(Buffer::Instance& buffer) override {
213
0
    callbacks_->streamInfo().addBytesSent(buffer.length());
214
0
    return callbacks_->upstreamData(buffer);
215
0
  }
216
0
  MessageMetadataSharedPtr responseMetadata() override { return callbacks_->responseMetadata(); }
217
0
  bool responseSuccess() override { return callbacks_->responseSuccess(); }
218
219
private:
220
  ThriftFilters::DecoderFilterCallbacks* callbacks_{};
221
};
222
223
// Thrift router filter. It is a decoder filter, a RequestOwner, a load balancer context and the
// receiver of upstream connection callbacks. Most RequestOwner hooks below simply forward to the
// decoder filter callbacks installed through setDecoderFilterCallbacks().
class Router : public Tcp::ConnectionPool::UpstreamCallbacks,
224
               public Upstream::LoadBalancerContextBase,
225
               public RequestOwner,
226
               public ThriftFilters::DecoderFilter {
227
public:
228
  // Stores references to the runtime loader and shadow writer, so both must outlive the router.
  // passthrough_supported_ starts out false.
  Router(Upstream::ClusterManager& cluster_manager, const RouterStats& stats,
229
         Runtime::Loader& runtime, ShadowWriter& shadow_writer, bool close_downstream_on_error)
230
      : RequestOwner(cluster_manager, stats), passthrough_supported_(false), runtime_(runtime),
231
0
        shadow_writer_(shadow_writer), close_downstream_on_error_(close_downstream_on_error) {}
Unexecuted instantiation: Envoy::Extensions::NetworkFilters::ThriftProxy::Router::Router::Router(Envoy::Upstream::ClusterManager&, Envoy::Extensions::NetworkFilters::ThriftProxy::Router::RouterStats const&, Envoy::Runtime::Loader&, Envoy::Extensions::NetworkFilters::ThriftProxy::Router::ShadowWriter&, bool)
Unexecuted instantiation: Envoy::Extensions::NetworkFilters::ThriftProxy::Router::Router::Router(Envoy::Upstream::ClusterManager&, Envoy::Extensions::NetworkFilters::ThriftProxy::Router::RouterStats const&, Envoy::Runtime::Loader&, Envoy::Extensions::NetworkFilters::ThriftProxy::Router::ShadowWriter&, bool)
232
233
0
  ~Router() override = default;
234
235
  // ThriftFilters::DecoderFilter
236
  void onDestroy() override;
237
  void setDecoderFilterCallbacks(ThriftFilters::DecoderFilterCallbacks& callbacks) override;
238
0
  bool passthroughSupported() const override { return passthrough_supported_; }
239
240
  // RequestOwner
241
0
  // Requires both the filter callbacks and the upstream request to be set (asserted). Records
  // the upstream request's host in a freshly created UpstreamInfoImpl, attaches it to the
  // stream info, and returns this router as the upstream callbacks.
  Tcp::ConnectionPool::UpstreamCallbacks& upstreamCallbacks() override {
242
0
    ASSERT(callbacks_ != nullptr);
243
0
    ASSERT(upstream_request_ != nullptr);
244
245
0
    auto upstream_info = std::make_shared<StreamInfo::UpstreamInfoImpl>();
246
0
    upstream_info->setUpstreamHost(upstream_request_->upstream_host_);
247
0
    callbacks_->streamInfo().setUpstreamInfo(std::move(upstream_info));
248
249
0
    return *this;
250
0
  }
251
0
  Buffer::OwnedImpl& buffer() override { return upstream_request_buffer_; }
252
0
  Event::Dispatcher& dispatcher() override { return callbacks_->dispatcher(); }
253
0
  // Accumulates into request_size_.
  void addSize(uint64_t size) override { request_size_ += size; }
254
0
  void continueDecoding() override { callbacks_->continueDecoding(); }
255
0
  void resetDownstreamConnection() override { callbacks_->resetDownstreamConnection(); }
256
0
  void sendLocalReply(const ThriftProxy::DirectResponse& response, bool end_stream) override {
257
0
    callbacks_->sendLocalReply(response, end_stream);
258
0
  }
259
0
  void onReset() override { callbacks_->onReset(); }
260
261
  // RequestOwner::ProtocolConverter
  // All of the handlers in this group are defined out of line.
262
  FilterStatus transportBegin(MessageMetadataSharedPtr metadata) override;
263
  FilterStatus transportEnd() override;
264
  FilterStatus messageBegin(MessageMetadataSharedPtr metadata) override;
265
  FilterStatus messageEnd() override;
266
  FilterStatus passthroughData(Buffer::Instance& data) override;
267
  FilterStatus structBegin(absl::string_view name) override;
268
  FilterStatus structEnd() override;
269
  FilterStatus fieldBegin(absl::string_view name, FieldType& field_type,
270
                          int16_t& field_id) override;
271
  FilterStatus fieldEnd() override;
272
  FilterStatus boolValue(bool& value) override;
273
  FilterStatus byteValue(uint8_t& value) override;
274
  FilterStatus int16Value(int16_t& value) override;
275
  FilterStatus int32Value(int32_t& value) override;
276
  FilterStatus int64Value(int64_t& value) override;
277
  FilterStatus doubleValue(double& value) override;
278
  FilterStatus stringValue(absl::string_view value) override;
279
  FilterStatus mapBegin(FieldType& key_type, FieldType& value_type, uint32_t& size) override;
280
  FilterStatus mapEnd() override;
281
  FilterStatus listBegin(FieldType& elem_type, uint32_t& size) override;
282
  FilterStatus listEnd() override;
283
  FilterStatus setBegin(FieldType& elem_type, uint32_t& size) override;
284
  FilterStatus setEnd() override;
285
286
  // Upstream::LoadBalancerContext
287
  const Network::Connection* downstreamConnection() const override;
288
0
  // Returns the metadata match criteria to use for load balancing:
  //  - the route entry's criteria (or nullptr when there is no route entry) if the request's
  //    dynamic metadata has no entry under the ENVOY_LB filter key;
  //  - otherwise the route criteria merged with that entry, or criteria built from the entry
  //    alone when the route has none.
  // In the second case the result is stored in metadata_match_criteria_ and the returned pointer
  // refers to that member, which is reassigned on each such call.
  // callbacks_ is dereferenced without a null check, so the filter callbacks must be set.
  const Envoy::Router::MetadataMatchCriteria* metadataMatchCriteria() override {
289
0
    const Envoy::Router::MetadataMatchCriteria* route_criteria =
290
0
        (route_entry_ != nullptr) ? route_entry_->metadataMatchCriteria() : nullptr;
291
292
    // Support getting metadata match criteria from thrift request.
293
0
    const auto& request_metadata = callbacks_->streamInfo().dynamicMetadata().filter_metadata();
294
0
    const auto filter_it = request_metadata.find(Envoy::Config::MetadataFilters::get().ENVOY_LB);
295
296
0
    if (filter_it == request_metadata.end()) {
297
0
      return route_criteria;
298
0
    }
299
300
0
    if (route_criteria != nullptr) {
301
0
      metadata_match_criteria_ = route_criteria->mergeMatchCriteria(filter_it->second);
302
0
    } else {
303
0
      metadata_match_criteria_ =
304
0
          std::make_unique<Envoy::Router::MetadataMatchCriteriaImpl>(filter_it->second);
305
0
    }
306
307
0
    return metadata_match_criteria_.get();
308
0
  }
309
310
  // Tcp::ConnectionPool::UpstreamCallbacks
311
  void onUpstreamData(Buffer::Instance& data, bool end_stream) override;
312
  void onEvent(Network::ConnectionEvent event) override;
313
0
  // The two watermark callbacks are intentionally empty.
  void onAboveWriteBufferHighWatermark() override {}
314
0
  void onBelowWriteBufferLowWatermark() override {}
315
316
private:
317
  void cleanup();
318
319
  // Non-owning; null until assigned.
  // NOTE(review): presumably assigned in setDecoderFilterCallbacks() -- confirm in the .cc file.
  ThriftFilters::DecoderFilterCallbacks* callbacks_{};
320
  std::unique_ptr<UpstreamResponseCallbacksImpl> upstream_response_callbacks_{};
321
  RouteConstSharedPtr route_{};
322
  const RouteEntry* route_entry_{};
323
  // Written only by metadataMatchCriteria() within this header; backs the pointer it returns.
  Envoy::Router::MetadataMatchCriteriaConstPtr metadata_match_criteria_;
324
325
  std::unique_ptr<UpstreamRequest> upstream_request_;
326
  Buffer::OwnedImpl upstream_request_buffer_;
327
328
  // One-bit bit-field, set to false by the constructor.
  bool passthrough_supported_ : 1;
329
  // Running total maintained by addSize().
  uint64_t request_size_{};
330
  Runtime::Loader& runtime_;
331
  ShadowWriter& shadow_writer_;
332
  std::vector<std::reference_wrapper<ShadowRouterHandle>> shadow_routers_{};
333
334
  bool close_downstream_on_error_;
335
};
336
337
} // namespace Router
338
} // namespace ThriftProxy
339
} // namespace NetworkFilters
340
} // namespace Extensions
341
} // namespace Envoy