Coverage Report

Created: 2023-11-12 09:30

/proc/self/cwd/source/extensions/filters/network/thrift_proxy/router/router_impl.cc
Line
Count
Source (jump to first uncovered line)
1
#include "source/extensions/filters/network/thrift_proxy/router/router_impl.h"
2
3
#include <memory>
4
5
#include "envoy/extensions/filters/network/thrift_proxy/v3/route.pb.h"
6
#include "envoy/upstream/cluster_manager.h"
7
#include "envoy/upstream/thread_local_cluster.h"
8
9
#include "source/common/common/utility.h"
10
#include "source/common/router/metadatamatchcriteria_impl.h"
11
#include "source/extensions/filters/network/thrift_proxy/app_exception_impl.h"
12
13
#include "absl/strings/match.h"
14
15
namespace Envoy {
16
namespace Extensions {
17
namespace NetworkFilters {
18
namespace ThriftProxy {
19
namespace Router {
20
21
RouteEntryImplBase::RouteEntryImplBase(
22
    const envoy::extensions::filters::network::thrift_proxy::v3::Route& route)
23
    : cluster_name_(route.route().cluster()),
24
      config_headers_(Http::HeaderUtility::buildHeaderDataVector(route.match().headers())),
25
      rate_limit_policy_(route.route().rate_limits()),
26
      strip_service_name_(route.route().strip_service_name()),
27
      cluster_header_(route.route().cluster_header()),
28
0
      mirror_policies_(buildMirrorPolicies(route.route())) {
29
0
  if (route.route().has_metadata_match()) {
30
0
    const auto filter_it = route.route().metadata_match().filter_metadata().find(
31
0
        Envoy::Config::MetadataFilters::get().ENVOY_LB);
32
0
    if (filter_it != route.route().metadata_match().filter_metadata().end()) {
33
0
      metadata_match_criteria_ =
34
0
          std::make_unique<Envoy::Router::MetadataMatchCriteriaImpl>(filter_it->second);
35
0
    }
36
0
  }
37
38
0
  if (route.route().cluster_specifier_case() ==
39
0
      envoy::extensions::filters::network::thrift_proxy::v3::RouteAction::ClusterSpecifierCase::
40
0
          kWeightedClusters) {
41
42
0
    total_cluster_weight_ = 0UL;
43
0
    for (const auto& cluster : route.route().weighted_clusters().clusters()) {
44
0
      std::unique_ptr<WeightedClusterEntry> cluster_entry(new WeightedClusterEntry(*this, cluster));
45
0
      weighted_clusters_.emplace_back(std::move(cluster_entry));
46
0
      total_cluster_weight_ += weighted_clusters_.back()->clusterWeight();
47
0
    }
48
0
  }
49
0
}
50
51
// Similar validation procedure with Envoy::Router::RouteEntryImplBase::validateCluster
52
void RouteEntryImplBase::validateClusters(
53
0
    const Upstream::ClusterManager::ClusterInfoMaps& cluster_info_maps) const {
54
  // Currently, we verify that the cluster exists in the CM if we have an explicit cluster or
55
  // weighted cluster rule. We obviously do not verify a cluster_header rule. This means that
56
  // trying to use all CDS clusters with a static route table will not work. In the upcoming RDS
57
  // change we will make it so that dynamically loaded route tables do *not* perform CM checks.
58
  // In the future we might decide to also have a config option that turns off checks for static
59
  // route tables. This would enable the all CDS with static route table case.
60
0
  if (!cluster_name_.empty()) {
61
0
    if (!cluster_info_maps.hasCluster(cluster_name_)) {
62
0
      throw EnvoyException(fmt::format("route: unknown thrift cluster '{}'", cluster_name_));
63
0
    }
64
0
  } else if (!weighted_clusters_.empty()) {
65
0
    for (const WeightedClusterEntrySharedPtr& cluster : weighted_clusters_) {
66
0
      if (!cluster_info_maps.hasCluster(cluster->clusterName())) {
67
0
        throw EnvoyException(
68
0
            fmt::format("route: unknown thrift weighted cluster '{}'", cluster->clusterName()));
69
0
      }
70
0
    }
71
0
  }
72
0
}
73
74
std::vector<std::shared_ptr<RequestMirrorPolicy>> RouteEntryImplBase::buildMirrorPolicies(
75
0
    const envoy::extensions::filters::network::thrift_proxy::v3::RouteAction& route) {
76
0
  std::vector<std::shared_ptr<RequestMirrorPolicy>> policies{};
77
78
0
  const auto& proto_policies = route.request_mirror_policies();
79
0
  policies.reserve(proto_policies.size());
80
0
  for (const auto& policy : proto_policies) {
81
0
    policies.push_back(std::make_shared<RequestMirrorPolicyImpl>(
82
0
        policy.cluster(), policy.runtime_fraction().runtime_key(),
83
0
        policy.runtime_fraction().default_value()));
84
0
  }
85
86
0
  return policies;
87
0
}
88
89
0
const std::string& RouteEntryImplBase::clusterName() const { return cluster_name_; }
90
91
0
const RouteEntry* RouteEntryImplBase::routeEntry() const { return this; }
92
93
RouteConstSharedPtr RouteEntryImplBase::clusterEntry(uint64_t random_value,
94
0
                                                     const MessageMetadata& metadata) const {
95
0
  if (!weighted_clusters_.empty()) {
96
0
    return WeightedClusterUtil::pickCluster(weighted_clusters_, total_cluster_weight_, random_value,
97
0
                                            false);
98
0
  }
99
100
0
  const auto& cluster_header = clusterHeader();
101
0
  if (!cluster_header.get().empty()) {
102
0
    const auto& headers = metadata.requestHeaders();
103
0
    const auto entry = headers.get(cluster_header);
104
0
    if (!entry.empty()) {
105
      // This is an implicitly untrusted header, so per the API documentation only the first
106
      // value is used.
107
0
      return std::make_shared<DynamicRouteEntry>(*this, entry[0]->value().getStringView());
108
0
    }
109
110
0
    return nullptr;
111
0
  }
112
113
0
  return shared_from_this();
114
0
}
115
116
0
bool RouteEntryImplBase::headersMatch(const Http::HeaderMap& headers) const {
117
0
  return Http::HeaderUtility::matchHeaders(headers, config_headers_);
118
0
}
119
120
RouteEntryImplBase::WeightedClusterEntry::WeightedClusterEntry(
121
    const RouteEntryImplBase& parent,
122
    const envoy::extensions::filters::network::thrift_proxy::v3::WeightedCluster::ClusterWeight&
123
        cluster)
124
    : parent_(parent), cluster_name_(cluster.name()),
125
0
      cluster_weight_(PROTOBUF_GET_WRAPPED_REQUIRED(cluster, weight)) {
126
0
  if (cluster.has_metadata_match()) {
127
0
    const auto filter_it = cluster.metadata_match().filter_metadata().find(
128
0
        Envoy::Config::MetadataFilters::get().ENVOY_LB);
129
0
    if (filter_it != cluster.metadata_match().filter_metadata().end()) {
130
0
      if (parent.metadata_match_criteria_) {
131
0
        metadata_match_criteria_ =
132
0
            parent.metadata_match_criteria_->mergeMatchCriteria(filter_it->second);
133
0
      } else {
134
0
        metadata_match_criteria_ =
135
0
            std::make_unique<Envoy::Router::MetadataMatchCriteriaImpl>(filter_it->second);
136
0
      }
137
0
    }
138
0
  }
139
0
}
140
141
MethodNameRouteEntryImpl::MethodNameRouteEntryImpl(
142
    const envoy::extensions::filters::network::thrift_proxy::v3::Route& route)
143
    : RouteEntryImplBase(route), method_name_(route.match().method_name()),
144
0
      invert_(route.match().invert()) {
145
0
  if (method_name_.empty() && invert_) {
146
0
    throw EnvoyException("Cannot have an empty method name with inversion enabled");
147
0
  }
148
0
}
149
150
RouteConstSharedPtr MethodNameRouteEntryImpl::matches(const MessageMetadata& metadata,
151
0
                                                      uint64_t random_value) const {
152
0
  if (RouteEntryImplBase::headersMatch(metadata.requestHeaders())) {
153
0
    bool matches =
154
0
        method_name_.empty() || (metadata.hasMethodName() && metadata.methodName() == method_name_);
155
156
0
    if (matches ^ invert_) {
157
0
      return clusterEntry(random_value, metadata);
158
0
    }
159
0
  }
160
161
0
  return nullptr;
162
0
}
163
164
ServiceNameRouteEntryImpl::ServiceNameRouteEntryImpl(
165
    const envoy::extensions::filters::network::thrift_proxy::v3::Route& route)
166
0
    : RouteEntryImplBase(route), invert_(route.match().invert()) {
167
0
  const std::string service_name = route.match().service_name();
168
0
  if (service_name.empty() && invert_) {
169
0
    throw EnvoyException("Cannot have an empty service name with inversion enabled");
170
0
  }
171
172
0
  if (!service_name.empty() && !absl::EndsWith(service_name, ":")) {
173
0
    service_name_ = service_name + ":";
174
0
  } else {
175
0
    service_name_ = service_name;
176
0
  }
177
0
}
178
179
RouteConstSharedPtr ServiceNameRouteEntryImpl::matches(const MessageMetadata& metadata,
180
0
                                                       uint64_t random_value) const {
181
0
  if (RouteEntryImplBase::headersMatch(metadata.requestHeaders())) {
182
0
    bool matches =
183
0
        service_name_.empty() ||
184
0
        (metadata.hasMethodName() && absl::StartsWith(metadata.methodName(), service_name_));
185
186
0
    if (matches ^ invert_) {
187
0
      return clusterEntry(random_value, metadata);
188
0
    }
189
0
  }
190
191
0
  return nullptr;
192
0
}
193
194
RouteMatcher::RouteMatcher(
195
    const envoy::extensions::filters::network::thrift_proxy::v3::RouteConfiguration& config,
196
0
    const absl::optional<Upstream::ClusterManager::ClusterInfoMaps>& validation_clusters) {
197
0
  using envoy::extensions::filters::network::thrift_proxy::v3::RouteMatch;
198
199
0
  for (const auto& route : config.routes()) {
200
0
    RouteEntryImplBaseConstSharedPtr route_entry;
201
0
    switch (route.match().match_specifier_case()) {
202
0
    case RouteMatch::MatchSpecifierCase::kMethodName:
203
0
      route_entry = std::make_shared<MethodNameRouteEntryImpl>(route);
204
0
      break;
205
0
    case RouteMatch::MatchSpecifierCase::kServiceName:
206
0
      route_entry = std::make_shared<ServiceNameRouteEntryImpl>(route);
207
0
      break;
208
0
    case RouteMatch::MatchSpecifierCase::MATCH_SPECIFIER_NOT_SET:
209
0
      PANIC_DUE_TO_CORRUPT_ENUM;
210
0
    }
211
212
0
    if (validation_clusters) {
213
      // Throw exception for unknown clusters.
214
0
      route_entry->validateClusters(*validation_clusters);
215
216
0
      for (const auto& mirror_policy : route_entry->requestMirrorPolicies()) {
217
0
        if (!validation_clusters->hasCluster(mirror_policy->clusterName())) {
218
0
          throw EnvoyException(fmt::format("route: unknown thrift shadow cluster '{}'",
219
0
                                           mirror_policy->clusterName()));
220
0
        }
221
0
      }
222
0
    }
223
224
    // Now we pass the validation. Add the route to the table.
225
0
    routes_.emplace_back(route_entry);
226
0
  }
227
0
}
228
229
RouteConstSharedPtr RouteMatcher::route(const MessageMetadata& metadata,
230
0
                                        uint64_t random_value) const {
231
0
  for (const auto& route : routes_) {
232
0
    RouteConstSharedPtr route_entry = route->matches(metadata, random_value);
233
0
    if (nullptr != route_entry) {
234
0
      return route_entry;
235
0
    }
236
0
  }
237
238
0
  return nullptr;
239
0
}
240
241
0
void Router::onDestroy() {
242
0
  if (upstream_request_ != nullptr) {
243
0
    ENVOY_LOG(debug, "router on destroy reset stream");
244
0
    upstream_request_->resetStream();
245
0
    cleanup();
246
0
  }
247
248
0
  for (auto& shadow_router : shadow_routers_) {
249
0
    shadow_router.get().onRouterDestroy();
250
0
  }
251
252
0
  shadow_routers_.clear();
253
0
}
254
255
0
void Router::setDecoderFilterCallbacks(ThriftFilters::DecoderFilterCallbacks& callbacks) {
256
0
  callbacks_ = &callbacks;
257
0
  upstream_response_callbacks_ = std::make_unique<UpstreamResponseCallbacksImpl>(callbacks_);
258
259
  // TODO(zuercher): handle buffer limits
260
0
}
261
262
0
FilterStatus Router::transportBegin(MessageMetadataSharedPtr metadata) {
263
0
  UNREFERENCED_PARAMETER(metadata);
264
0
  return FilterStatus::Continue;
265
0
}
266
267
0
FilterStatus Router::transportEnd() {
268
0
  upstream_request_->onRequestComplete();
269
0
  if (upstream_request_->metadata_->messageType() == MessageType::Oneway) {
270
    // No response expected
271
0
    upstream_request_->onResponseComplete();
272
0
    cleanup();
273
0
  }
274
0
  return FilterStatus::Continue;
275
0
}
276
277
0
FilterStatus Router::messageBegin(MessageMetadataSharedPtr metadata) {
278
0
  route_ = callbacks_->route();
279
0
  if (!route_) {
280
0
    ENVOY_STREAM_LOG(debug, "no route match for method '{}'", *callbacks_, metadata->methodName());
281
0
    stats().named_.route_missing_.inc();
282
0
    callbacks_->sendLocalReply(
283
0
        AppException(AppExceptionType::UnknownMethod,
284
0
                     fmt::format("no route for method '{}'", metadata->methodName())),
285
0
        close_downstream_on_error_);
286
0
    return FilterStatus::StopIteration;
287
0
  }
288
289
0
  route_entry_ = route_->routeEntry();
290
0
  const std::string& cluster_name = route_entry_->clusterName();
291
292
0
  auto prepare_result =
293
0
      prepareUpstreamRequest(cluster_name, metadata, callbacks_->downstreamTransportType(),
294
0
                             callbacks_->downstreamProtocolType(), this);
295
0
  if (prepare_result.exception.has_value()) {
296
0
    callbacks_->sendLocalReply(prepare_result.exception.value(), close_downstream_on_error_);
297
0
    return FilterStatus::StopIteration;
298
0
  }
299
300
0
  ENVOY_STREAM_LOG(debug, "router decoding request", *callbacks_);
301
302
0
  if (route_entry_->stripServiceName()) {
303
0
    const auto& method = metadata->methodName();
304
0
    const auto pos = method.find(':');
305
0
    if (pos != std::string::npos) {
306
0
      metadata->setMethodName(method.substr(pos + 1));
307
0
    }
308
0
  }
309
310
0
  auto& upstream_req_info = prepare_result.upstream_request_info.value();
311
0
  passthrough_supported_ = upstream_req_info.passthrough_supported;
312
313
  // Prepare connections for shadow routers, if there are mirror policies configured and currently
314
  // enabled.
315
0
  const auto& policies = route_entry_->requestMirrorPolicies();
316
0
  if (!policies.empty()) {
317
0
    for (const auto& policy : policies) {
318
0
      if (policy->enabled(runtime_)) {
319
0
        auto shadow_router =
320
0
            shadow_writer_.submit(policy->clusterName(), metadata, upstream_req_info.transport,
321
0
                                  upstream_req_info.protocol);
322
0
        if (shadow_router.has_value()) {
323
0
          shadow_routers_.push_back(shadow_router.value());
324
0
        }
325
0
      }
326
0
    }
327
0
  }
328
329
0
  upstream_request_ = std::make_unique<UpstreamRequest>(
330
0
      *this, *upstream_req_info.conn_pool_data, metadata, upstream_req_info.transport,
331
0
      upstream_req_info.protocol, close_downstream_on_error_);
332
0
  return upstream_request_->start();
333
0
}
334
335
0
FilterStatus Router::messageEnd() {
336
0
  ProtocolConverter::messageEnd();
337
0
  const auto encode_size = upstream_request_->encodeAndWrite(upstream_request_buffer_);
338
0
  addSize(encode_size);
339
0
  stats().recordUpstreamRequestSize(*cluster_, request_size_);
340
0
  callbacks_->streamInfo().addBytesReceived(request_size_);
341
342
  // Dispatch shadow requests, if any.
343
  // Note: if connections aren't ready, the write will happen when appropriate.
344
0
  for (auto& shadow_router : shadow_routers_) {
345
0
    auto& router = shadow_router.get();
346
0
    router.requestOwner().messageEnd();
347
0
  }
348
349
0
  return FilterStatus::Continue;
350
0
}
351
352
0
FilterStatus Router::passthroughData(Buffer::Instance& data) {
353
0
  for (auto& shadow_router : shadow_routers_) {
354
0
    Buffer::OwnedImpl shadow_data;
355
0
    shadow_data.add(data);
356
0
    shadow_router.get().requestOwner().passthroughData(shadow_data);
357
0
  }
358
359
0
  return ProtocolConverter::passthroughData(data);
360
0
}
361
362
0
FilterStatus Router::structBegin(absl::string_view name) {
363
0
  for (auto& shadow_router : shadow_routers_) {
364
0
    shadow_router.get().requestOwner().structBegin(name);
365
0
  }
366
367
0
  return ProtocolConverter::structBegin(name);
368
0
}
369
370
0
FilterStatus Router::structEnd() {
371
0
  for (auto& shadow_router : shadow_routers_) {
372
0
    shadow_router.get().requestOwner().structEnd();
373
0
  }
374
375
0
  return ProtocolConverter::structEnd();
376
0
}
377
378
0
FilterStatus Router::fieldBegin(absl::string_view name, FieldType& field_type, int16_t& field_id) {
379
0
  for (auto& shadow_router : shadow_routers_) {
380
0
    shadow_router.get().requestOwner().fieldBegin(name, field_type, field_id);
381
0
  }
382
383
0
  return ProtocolConverter::fieldBegin(name, field_type, field_id);
384
0
}
385
386
0
FilterStatus Router::fieldEnd() {
387
0
  for (auto& shadow_router : shadow_routers_) {
388
0
    shadow_router.get().requestOwner().fieldEnd();
389
0
  }
390
391
0
  return ProtocolConverter::fieldEnd();
392
0
}
393
394
0
FilterStatus Router::boolValue(bool& value) {
395
0
  for (auto& shadow_router : shadow_routers_) {
396
0
    shadow_router.get().requestOwner().boolValue(value);
397
0
  }
398
399
0
  return ProtocolConverter::boolValue(value);
400
0
}
401
402
0
FilterStatus Router::byteValue(uint8_t& value) {
403
0
  for (auto& shadow_router : shadow_routers_) {
404
0
    shadow_router.get().requestOwner().byteValue(value);
405
0
  }
406
407
0
  return ProtocolConverter::byteValue(value);
408
0
}
409
410
0
FilterStatus Router::int16Value(int16_t& value) {
411
0
  for (auto& shadow_router : shadow_routers_) {
412
0
    shadow_router.get().requestOwner().int16Value(value);
413
0
  }
414
415
0
  return ProtocolConverter::int16Value(value);
416
0
}
417
418
0
FilterStatus Router::int32Value(int32_t& value) {
419
0
  for (auto& shadow_router : shadow_routers_) {
420
0
    shadow_router.get().requestOwner().int32Value(value);
421
0
  }
422
423
0
  return ProtocolConverter::int32Value(value);
424
0
}
425
426
0
FilterStatus Router::int64Value(int64_t& value) {
427
0
  for (auto& shadow_router : shadow_routers_) {
428
0
    shadow_router.get().requestOwner().int64Value(value);
429
0
  }
430
431
0
  return ProtocolConverter::int64Value(value);
432
0
}
433
434
0
FilterStatus Router::doubleValue(double& value) {
435
0
  for (auto& shadow_router : shadow_routers_) {
436
0
    shadow_router.get().requestOwner().doubleValue(value);
437
0
  }
438
439
0
  return ProtocolConverter::doubleValue(value);
440
0
}
441
442
0
FilterStatus Router::stringValue(absl::string_view value) {
443
0
  for (auto& shadow_router : shadow_routers_) {
444
0
    shadow_router.get().requestOwner().stringValue(value);
445
0
  }
446
447
0
  return ProtocolConverter::stringValue(value);
448
0
}
449
450
0
FilterStatus Router::mapBegin(FieldType& key_type, FieldType& value_type, uint32_t& size) {
451
0
  for (auto& shadow_router : shadow_routers_) {
452
0
    shadow_router.get().requestOwner().mapBegin(key_type, value_type, size);
453
0
  }
454
455
0
  return ProtocolConverter::mapBegin(key_type, value_type, size);
456
0
}
457
458
0
FilterStatus Router::mapEnd() {
459
0
  for (auto& shadow_router : shadow_routers_) {
460
0
    shadow_router.get().requestOwner().mapEnd();
461
0
  }
462
463
0
  return ProtocolConverter::mapEnd();
464
0
}
465
466
0
FilterStatus Router::listBegin(FieldType& elem_type, uint32_t& size) {
467
0
  for (auto& shadow_router : shadow_routers_) {
468
0
    shadow_router.get().requestOwner().listBegin(elem_type, size);
469
0
  }
470
471
0
  return ProtocolConverter::listBegin(elem_type, size);
472
0
}
473
474
0
FilterStatus Router::listEnd() {
475
0
  for (auto& shadow_router : shadow_routers_) {
476
0
    shadow_router.get().requestOwner().listEnd();
477
0
  }
478
479
0
  return ProtocolConverter::listEnd();
480
0
}
481
482
0
FilterStatus Router::setBegin(FieldType& elem_type, uint32_t& size) {
483
0
  for (auto& shadow_router : shadow_routers_) {
484
0
    shadow_router.get().requestOwner().setBegin(elem_type, size);
485
0
  }
486
487
0
  return ProtocolConverter::setBegin(elem_type, size);
488
0
}
489
490
0
FilterStatus Router::setEnd() {
491
0
  for (auto& shadow_router : shadow_routers_) {
492
0
    shadow_router.get().requestOwner().setEnd();
493
0
  }
494
495
0
  return ProtocolConverter::setEnd();
496
0
}
497
498
0
void Router::onUpstreamData(Buffer::Instance& data, bool end_stream) {
499
0
  const bool done =
500
0
      upstream_request_->handleUpstreamData(data, end_stream, *upstream_response_callbacks_);
501
0
  if (done) {
502
0
    cleanup();
503
0
  }
504
0
}
505
506
0
void Router::onEvent(Network::ConnectionEvent event) { upstream_request_->onEvent(event); }
507
508
0
const Network::Connection* Router::downstreamConnection() const {
509
0
  if (callbacks_ != nullptr) {
510
0
    return callbacks_->connection();
511
0
  }
512
513
0
  return nullptr;
514
0
}
515
516
0
void Router::cleanup() { upstream_request_.reset(); }
517
518
} // namespace Router
519
} // namespace ThriftProxy
520
} // namespace NetworkFilters
521
} // namespace Extensions
522
} // namespace Envoy