Coverage Report

Created: 2023-11-12 09:30

/proc/self/cwd/source/extensions/filters/network/thrift_proxy/router/router.h
Line
Count
Source (jump to first uncovered line)
1
#pragma once
2
3
#include <memory>
4
#include <string>
5
#include <vector>
6
7
#include "envoy/buffer/buffer.h"
8
#include "envoy/common/optref.h"
9
#include "envoy/local_info/local_info.h"
10
#include "envoy/rds/config.h"
11
#include "envoy/router/router.h"
12
#include "envoy/tcp/conn_pool.h"
13
14
#include "source/common/buffer/buffer_impl.h"
15
#include "source/common/common/logger.h"
16
#include "source/extensions/filters/network/thrift_proxy/app_exception_impl.h"
17
#include "source/extensions/filters/network/thrift_proxy/metadata.h"
18
#include "source/extensions/filters/network/thrift_proxy/protocol_converter.h"
19
#include "source/extensions/filters/network/thrift_proxy/protocol_options_config.h"
20
#include "source/extensions/filters/network/well_known_names.h"
21
22
namespace Envoy {
23
namespace Extensions {
24
namespace NetworkFilters {
25
namespace ThriftProxy {
26
namespace Router {
27
28
class RateLimitPolicy;
29
class RequestMirrorPolicy;
30
31
/**
32
 * RouteEntry is an individual resolved route entry.
33
 */
34
class RouteEntry {
35
public:
36
0
  virtual ~RouteEntry() = default;
37
38
  /**
39
   * @return const std::string& the upstream cluster that owns the route.
40
   */
41
  virtual const std::string& clusterName() const PURE;
42
43
  /**
44
   * @return const Envoy::Router::MetadataMatchCriteria* the metadata that a subset load balancer should match when
45
   * selecting an upstream host
46
   */
47
  virtual const Envoy::Router::MetadataMatchCriteria* metadataMatchCriteria() const PURE;
48
49
  /**
50
   * @return const RateLimitPolicy& the rate limit policy for the route.
51
   */
52
  virtual const RateLimitPolicy& rateLimitPolicy() const PURE;
53
54
  /**
55
   * @return bool should the service name prefix be stripped from the method.
56
   */
57
  virtual bool stripServiceName() const PURE;
58
59
  /**
60
   * @return const Http::LowerCaseString& the header used to determine the cluster.
61
   */
62
  virtual const Http::LowerCaseString& clusterHeader() const PURE;
63
64
  /**
65
   * @return const std::vector<std::shared_ptr<RequestMirrorPolicy>>& the mirror policies associated with this route,
66
   * if any.
67
   */
68
  virtual const std::vector<std::shared_ptr<RequestMirrorPolicy>>&
69
  requestMirrorPolicies() const PURE;
70
};
71
72
/**
73
 * Route holds the RouteEntry for a request.
74
 */
75
class Route {
76
public:
77
0
  virtual ~Route() = default;
78
79
  /**
80
   * @return the route entry or nullptr if there is no matching route for the request.
81
   */
82
  virtual const RouteEntry* routeEntry() const PURE;
83
};
84
85
using RouteConstSharedPtr = std::shared_ptr<const Route>;
86
87
/**
88
 * The router configuration.
89
 */
90
class Config : public Rds::Config {
91
public:
92
  /**
93
   * Based on the incoming Thrift request transport and/or protocol data, determine the target
94
   * route for the request.
95
   * @param metadata MessageMetadata for the message to route
96
   * @param random_value uint64_t used to select cluster affinity
97
   * @return the route or nullptr if there is no matching route for the request.
98
   */
99
  virtual RouteConstSharedPtr route(const MessageMetadata& metadata,
100
                                    uint64_t random_value) const PURE;
101
};
102
103
using ConfigConstSharedPtr = std::shared_ptr<const Config>;
104
105
#define ALL_THRIFT_ROUTER_STATS(COUNTER, GAUGE, HISTOGRAM)                                         \
106
0
  COUNTER(route_missing)                                                                           \
107
0
  COUNTER(unknown_cluster)                                                                         \
108
0
  COUNTER(upstream_rq_maintenance_mode)                                                            \
109
0
  COUNTER(no_healthy_upstream)                                                                     \
110
0
  COUNTER(shadow_request_submit_failure)
111
112
/**
113
 * Struct containing named stats for the router.
114
 */
115
struct RouterNamedStats {
116
  ALL_THRIFT_ROUTER_STATS(GENERATE_COUNTER_STRUCT, GENERATE_GAUGE_STRUCT, GENERATE_HISTOGRAM_STRUCT)
117
118
0
  static RouterNamedStats generateStats(const std::string& prefix, Stats::Scope& scope) {
119
0
    return RouterNamedStats{ALL_THRIFT_ROUTER_STATS(POOL_COUNTER_PREFIX(scope, prefix),
120
0
                                                    POOL_GAUGE_PREFIX(scope, prefix),
121
0
                                                    POOL_HISTOGRAM_PREFIX(scope, prefix))};
122
0
  }
123
};
124
125
/**
126
 * Stats for use in the router.
127
 */
128
class RouterStats {
129
public:
130
  /**
   * Builds the named router stats and registers the stat names used for per-cluster stats.
   *
   * The named stats (RouterNamedStats) are generated in `scope` under `stat_prefix`. All other
   * stat names are added to a StatNameSet created from the scope's symbol table; they are later
   * resolved against a cluster's stats scope by the inc and record methods of this class.
   *
   * @param stat_prefix const std::string& prefix passed to RouterNamedStats::generateStats
   * @param scope Stats::Scope& scope that owns the named stats and supplies the symbol table
   * @param local_info const LocalInfo::LocalInfo& source of the local zone stat name
   */
  RouterStats(const std::string& stat_prefix, Stats::Scope& scope,
131
              const LocalInfo::LocalInfo& local_info)
132
      : named_(RouterNamedStats::generateStats(stat_prefix, scope)),
133
        stat_name_set_(scope.symbolTable().makeSet("thrift_proxy")),
134
        symbol_table_(scope.symbolTable()),
135
        upstream_rq_call_(stat_name_set_->add("thrift.upstream_rq_call")),
136
        upstream_rq_oneway_(stat_name_set_->add("thrift.upstream_rq_oneway")),
137
        upstream_rq_invalid_type_(stat_name_set_->add("thrift.upstream_rq_invalid_type")),
138
        upstream_resp_reply_(stat_name_set_->add("thrift.upstream_resp_reply")),
139
        upstream_resp_reply_success_(stat_name_set_->add("thrift.upstream_resp_success")),
140
        upstream_resp_reply_error_(stat_name_set_->add("thrift.upstream_resp_error")),
141
        upstream_resp_exception_(stat_name_set_->add("thrift.upstream_resp_exception")),
142
        upstream_resp_exception_local_(stat_name_set_->add("thrift.upstream_resp_exception_local")),
143
        upstream_resp_exception_remote_(
144
            stat_name_set_->add("thrift.upstream_resp_exception_remote")),
145
        upstream_resp_invalid_type_(stat_name_set_->add("thrift.upstream_resp_invalid_type")),
146
        upstream_resp_decoding_error_(stat_name_set_->add("thrift.upstream_resp_decoding_error")),
147
        upstream_rq_time_(stat_name_set_->add("thrift.upstream_rq_time")),
148
        upstream_rq_size_(stat_name_set_->add("thrift.upstream_rq_size")),
149
        upstream_resp_size_(stat_name_set_->add("thrift.upstream_resp_size")),
150
        zone_(stat_name_set_->add("zone")), local_zone_name_(local_info.zoneStatName()),
151
        upstream_cx_drain_close_(stat_name_set_->add("thrift.upstream_cx_drain_close")),
152
        downstream_cx_partial_response_close_(
153
            stat_name_set_->add("thrift.downstream_cx_partial_response_close")),
154
        downstream_cx_underflow_response_close_(
155
0
            stat_name_set_->add("thrift.downstream_cx_underflow_response_close")) {}
156
157
  /**
158
   * Increment counter for request calls.
159
   * @param cluster Upstream::ClusterInfo& describing the upstream cluster
160
   */
161
0
  void incRequestCall(const Upstream::ClusterInfo& cluster) const {
162
0
    incClusterScopeCounter(cluster, nullptr, upstream_rq_call_);
163
0
  }
164
165
  /**
166
   * Increment counter for requests that are one way only.
167
   * @param cluster Upstream::ClusterInfo& describing the upstream cluster
168
   */
169
0
  void incRequestOneWay(const Upstream::ClusterInfo& cluster) const {
170
0
    incClusterScopeCounter(cluster, nullptr, upstream_rq_oneway_);
171
0
  }
172
173
  /**
174
   * Increment counter for requests with an invalid message type.
175
   * @param cluster Upstream::ClusterInfo& describing the upstream cluster
176
   */
177
0
  void incRequestInvalid(const Upstream::ClusterInfo& cluster) const {
178
0
    incClusterScopeCounter(cluster, nullptr, upstream_rq_invalid_type_);
179
0
  }
180
181
  /**
182
   * Increment counter for connections that were closed due to draining.
183
   * @param cluster Upstream::ClusterInfo& describing the upstream cluster
184
   */
185
0
  void incCloseDrain(const Upstream::ClusterInfo& cluster) const {
186
0
    incClusterScopeCounter(cluster, nullptr, upstream_cx_drain_close_);
187
0
  }
188
189
  /**
190
   * Increment counter for downstream connections that were closed due to partial responses.
191
   * @param cluster Upstream::ClusterInfo& describing the upstream cluster
192
   */
193
0
  void incClosePartialResponse(const Upstream::ClusterInfo& cluster) const {
194
0
    incClusterScopeCounter(cluster, nullptr, downstream_cx_partial_response_close_);
195
0
  }
196
197
  /**
198
   * Increment counter for downstream connections that were closed due to underflow responses.
199
   * @param cluster Upstream::ClusterInfo& describing the upstream cluster
200
   */
201
0
  void incCloseUnderflowResponse(const Upstream::ClusterInfo& cluster) const {
202
0
    incClusterScopeCounter(cluster, nullptr, downstream_cx_underflow_response_close_);
203
0
  }
204
205
  /**
206
   * Increment counters for received responses that are successful replies.
207
   * @param cluster Upstream::ClusterInfo& describing the upstream cluster
208
   * @param upstream_host Upstream::HostDescriptionConstSharedPtr describing the upstream host
209
   */
210
  void incResponseReplySuccess(const Upstream::ClusterInfo& cluster,
211
0
                               Upstream::HostDescriptionConstSharedPtr upstream_host) const {
212
0
    incClusterScopeCounter(cluster, upstream_host, upstream_resp_reply_);
213
0
    incClusterScopeCounter(cluster, upstream_host, upstream_resp_reply_success_);
214
0
    ASSERT(upstream_host != nullptr);
215
0
    upstream_host->stats().rq_success_.inc();
216
0
  }
217
218
  /**
219
   * Increment counters for received responses that are error replies.
220
   * @param cluster Upstream::ClusterInfo& describing the upstream cluster
221
   * @param upstream_host Upstream::HostDescriptionConstSharedPtr describing the upstream host
222
   */
223
  void incResponseReplyError(const Upstream::ClusterInfo& cluster,
224
0
                             Upstream::HostDescriptionConstSharedPtr upstream_host) const {
225
0
    incClusterScopeCounter(cluster, upstream_host, upstream_resp_reply_);
226
0
    incClusterScopeCounter(cluster, upstream_host, upstream_resp_reply_error_);
227
0
    ASSERT(upstream_host != nullptr);
228
    // Currently IDL exceptions are always considered endpoint error but it's possible for an error
229
    // to have semantics matching HTTP 4xx, rather than 5xx. rq_error classification chosen
230
    // here to match outlier detection external failure in upstream_request.cc.
231
0
    upstream_host->stats().rq_error_.inc();
232
0
  }
233
234
  /**
235
   * Increment counter for received remote responses that are exceptions.
236
   * @param cluster Upstream::ClusterInfo& describing the upstream cluster
237
   * @param upstream_host Upstream::HostDescriptionConstSharedPtr describing the upstream host
238
   */
239
  void incResponseRemoteException(const Upstream::ClusterInfo& cluster,
240
0
                                  Upstream::HostDescriptionConstSharedPtr upstream_host) const {
241
0
    incClusterScopeCounter(cluster, upstream_host, upstream_resp_exception_);
242
0
    ASSERT(upstream_host != nullptr);
243
0
    incClusterScopeCounter(cluster, nullptr, upstream_resp_exception_remote_);
244
0
    upstream_host->stats().rq_error_.inc();
245
0
  }
246
247
  /**
248
   * Increment counter for responses that are local exceptions, without forwarding a request
249
   * upstream.
250
   * @param cluster Upstream::ClusterInfo& describing the upstream cluster
251
   */
252
0
  void incResponseLocalException(const Upstream::ClusterInfo& cluster) const {
253
0
    incClusterScopeCounter(cluster, nullptr, upstream_resp_exception_);
254
0
    incClusterScopeCounter(cluster, nullptr, upstream_resp_exception_local_);
255
0
  }
256
257
  /**
258
   * Increment counter for received responses with an invalid message type.
259
   * @param cluster Upstream::ClusterInfo& describing the upstream cluster
260
   * @param upstream_host Upstream::HostDescriptionConstSharedPtr describing the upstream host
261
   */
262
  void incResponseInvalidType(const Upstream::ClusterInfo& cluster,
263
0
                              Upstream::HostDescriptionConstSharedPtr upstream_host) const {
264
0
    incClusterScopeCounter(cluster, upstream_host, upstream_resp_invalid_type_);
265
0
    ASSERT(upstream_host != nullptr);
266
0
    upstream_host->stats().rq_error_.inc();
267
0
  }
268
269
  /**
270
   * Increment counter for decoding errors during responses.
271
   * @param cluster Upstream::ClusterInfo& describing the upstream cluster
272
   * @param upstream_host Upstream::HostDescriptionConstSharedPtr describing the upstream host
273
   */
274
  void incResponseDecodingError(const Upstream::ClusterInfo& cluster,
275
0
                                Upstream::HostDescriptionConstSharedPtr upstream_host) const {
276
0
    incClusterScopeCounter(cluster, upstream_host, upstream_resp_decoding_error_);
277
0
    ASSERT(upstream_host != nullptr);
278
0
    upstream_host->stats().rq_error_.inc();
279
0
  }
280
281
  /**
282
   * Record a value for the request size histogram.
283
   * @param cluster Upstream::ClusterInfo& describing the upstream cluster
284
   * @param value uint64_t size in bytes of the full request
285
   */
286
0
  void recordUpstreamRequestSize(const Upstream::ClusterInfo& cluster, uint64_t value) const {
287
0
    recordClusterScopeHistogram(cluster, nullptr, upstream_rq_size_, Stats::Histogram::Unit::Bytes,
288
0
                                value);
289
0
  }
290
291
  /**
292
   * Record a value for the response size histogram.
293
   * @param cluster Upstream::ClusterInfo& describing the upstream cluster
294
   * @param value uint64_t size in bytes of the full response
295
   */
296
0
  void recordUpstreamResponseSize(const Upstream::ClusterInfo& cluster, uint64_t value) const {
297
0
    recordClusterScopeHistogram(cluster, nullptr, upstream_resp_size_,
298
0
                                Stats::Histogram::Unit::Bytes, value);
299
0
  }
300
301
  /**
302
   * Record a value for the response time duration histogram.
303
   * @param cluster Upstream::ClusterInfo& describing the upstream cluster
304
   * @param upstream_host Upstream::HostDescriptionConstSharedPtr describing the upstream host
305
   * @param value uint64_t duration in milliseconds to receive the complete response
306
   */
307
  void recordUpstreamResponseTime(const Upstream::ClusterInfo& cluster,
308
                                  Upstream::HostDescriptionConstSharedPtr upstream_host,
309
0
                                  uint64_t value) const {
310
0
    recordClusterScopeHistogram(cluster, upstream_host, upstream_rq_time_,
311
0
                                Stats::Histogram::Unit::Milliseconds, value);
312
0
  }
313
314
  const RouterNamedStats named_;
315
316
private:
317
  /**
   * Increment the counter named `stat_name` in the cluster's stats scope and, when
   * upstreamZoneStatName() produces a zone-qualified name for `upstream_host`, increment that
   * zone-qualified counter in the same scope as well.
   * @param cluster Upstream::ClusterInfo& whose stats scope holds the counters
   * @param upstream_host Upstream::HostDescriptionConstSharedPtr the upstream host, or nullptr to
   * update only the cluster-level counter
   * @param stat_name Stats::StatName& name of the counter to increment
   */
  void incClusterScopeCounter(const Upstream::ClusterInfo& cluster,
318
                              Upstream::HostDescriptionConstSharedPtr upstream_host,
319
0
                              const Stats::StatName& stat_name) const {
320
0
    const Stats::SymbolTable::StoragePtr stat_name_storage = symbol_table_.join({stat_name});
321
0
    cluster.statsScope().counterFromStatName(Stats::StatName(stat_name_storage.get())).inc();
322
0
    const Stats::SymbolTable::StoragePtr zone_stat_name_storage =
323
0
        upstreamZoneStatName(upstream_host, stat_name);
324
0
    if (zone_stat_name_storage) {
325
0
      cluster.statsScope().counterFromStatName(Stats::StatName(zone_stat_name_storage.get())).inc();
326
0
    }
327
0
  }
328
329
  /**
   * Record `value` in the histogram named `stat_name` in the cluster's stats scope and, when
   * upstreamZoneStatName() produces a zone-qualified name for `upstream_host`, record the same
   * value in that zone-qualified histogram as well.
   * @param cluster Upstream::ClusterInfo& whose stats scope holds the histograms
   * @param upstream_host Upstream::HostDescriptionConstSharedPtr the upstream host, or nullptr to
   * update only the cluster-level histogram
   * @param stat_name Stats::StatName& name of the histogram
   * @param unit Stats::Histogram::Unit unit passed to histogramFromStatName for both histograms
   * @param value uint64_t the value to record
   */
  void recordClusterScopeHistogram(const Upstream::ClusterInfo& cluster,
330
                                   Upstream::HostDescriptionConstSharedPtr upstream_host,
331
                                   const Stats::StatName& stat_name, Stats::Histogram::Unit unit,
332
0
                                   uint64_t value) const {
333
0
    const Stats::SymbolTable::StoragePtr stat_name_storage = symbol_table_.join({stat_name});
334
0
    cluster.statsScope()
335
0
        .histogramFromStatName(Stats::StatName(stat_name_storage.get()), unit)
336
0
        .recordValue(value);
337
0
    const Stats::SymbolTable::StoragePtr zone_stat_name_storage =
338
0
        upstreamZoneStatName(upstream_host, stat_name);
339
0
    if (zone_stat_name_storage) {
340
0
      cluster.statsScope()
341
0
          .histogramFromStatName(Stats::StatName(zone_stat_name_storage.get()), unit)
342
0
          .recordValue(value);
343
0
    }
344
0
  }
345
346
  /**
   * Build the zone-qualified variant of `stat_name` by joining, in order, the "zone" stat name,
   * the local zone name, the upstream host's locality zone name and `stat_name`.
   * @param upstream_host Upstream::HostDescriptionConstSharedPtr the upstream host, may be nullptr
   * @param stat_name Stats::StatName& the stat name to qualify
   * @return Stats::SymbolTable::StoragePtr storage for the joined name, or nullptr when there is
   * no upstream host, the local zone name is empty, or the upstream zone name is empty.
   */
  Stats::SymbolTable::StoragePtr
347
  upstreamZoneStatName(Upstream::HostDescriptionConstSharedPtr upstream_host,
348
0
                       const Stats::StatName& stat_name) const {
349
0
    if (!upstream_host || local_zone_name_.empty()) {
350
0
      return nullptr;
351
0
    }
352
0
    const auto& upstream_zone_name = upstream_host->localityZoneStatName();
353
0
    if (upstream_zone_name.empty()) {
354
0
      return nullptr;
355
0
    }
356
0
    return symbol_table_.join({zone_, local_zone_name_, upstream_zone_name, stat_name});
357
0
  }
358
359
  Stats::StatNameSetPtr stat_name_set_;
360
  Stats::SymbolTable& symbol_table_;
361
  const Stats::StatName upstream_rq_call_;
362
  const Stats::StatName upstream_rq_oneway_;
363
  const Stats::StatName upstream_rq_invalid_type_;
364
  const Stats::StatName upstream_resp_reply_;
365
  const Stats::StatName upstream_resp_reply_success_;
366
  const Stats::StatName upstream_resp_reply_error_;
367
  const Stats::StatName upstream_resp_exception_;
368
  const Stats::StatName upstream_resp_exception_local_;
369
  const Stats::StatName upstream_resp_exception_remote_;
370
  const Stats::StatName upstream_resp_invalid_type_;
371
  const Stats::StatName upstream_resp_decoding_error_;
372
  const Stats::StatName upstream_rq_time_;
373
  const Stats::StatName upstream_rq_size_;
374
  const Stats::StatName upstream_resp_size_;
375
  const Stats::StatName zone_;
376
  const Stats::StatName local_zone_name_;
377
  const Stats::StatName upstream_cx_drain_close_;
378
  const Stats::StatName downstream_cx_partial_response_close_;
379
  const Stats::StatName downstream_cx_underflow_response_close_;
380
};
381
382
/**
383
 * This interface is used by an upstream request to communicate its state.
384
 */
385
class RequestOwner : public ProtocolConverter, public Logger::Loggable<Logger::Id::thrift> {
386
public:
387
  RequestOwner(Upstream::ClusterManager& cluster_manager, const RouterStats& stats)
388
0
      : cluster_manager_(cluster_manager), stats_(stats) {}
389
0
  ~RequestOwner() override = default;
390
391
  /**
392
   * @return ConnectionPool::UpstreamCallbacks& the handler for upstream data.
393
   */
394
  virtual Tcp::ConnectionPool::UpstreamCallbacks& upstreamCallbacks() PURE;
395
396
  /**
397
   * @return Buffer::OwnedImpl& the buffer used to serialize the upstream request.
398
   */
399
  virtual Buffer::OwnedImpl& buffer() PURE;
400
401
  /**
402
   * @return Event::Dispatcher& the dispatcher used for timers, etc.
403
   */
404
  virtual Event::Dispatcher& dispatcher() PURE;
405
406
  /**
407
   * Converts message begin into the right protocol.
408
   */
409
0
  void convertMessageBegin(MessageMetadataSharedPtr metadata) {
410
0
    ProtocolConverter::messageBegin(metadata);
411
0
  }
412
413
  /**
414
   * Used to update the request size every time bytes are pushed out.
415
   *
416
   * @param size uint64_t the value of the increment.
417
   */
418
  virtual void addSize(uint64_t size) PURE;
419
420
  /**
421
   * Used to continue decoding if it was previously stopped.
422
   */
423
  virtual void continueDecoding() PURE;
424
425
  /**
426
   * Used to reset the downstream connection after an error.
427
   */
428
  virtual void resetDownstreamConnection() PURE;
429
430
  /**
431
   * Sends a locally generated response using the provided response object.
432
   *
433
   * @param response DirectResponse the response to send to the downstream client
434
   * @param end_stream if true, the downstream connection should be closed after this response
435
   */
436
  virtual void sendLocalReply(const ThriftProxy::DirectResponse& response, bool end_stream) PURE;
437
438
  /**
439
   * @return Upstream::ClusterManager& the cluster manager.
440
   */
441
0
  Upstream::ClusterManager& clusterManager() { return cluster_manager_; }
442
443
  /**
444
   * @return const Upstream::ClusterInfo& the upstream cluster associated with the request.
445
   */
446
0
  const Upstream::ClusterInfo& cluster() const { return *cluster_; }
447
448
  /**
449
   * @return const RouterStats& the common router stats.
450
   */
451
0
  const RouterStats& stats() { return stats_; }
452
453
0
  /**
   * Hook with an empty default implementation; it is virtual so subclasses may override it.
   * NOTE(review): presumably invoked when the request is reset -- confirm against callers.
   */
  virtual void onReset() {}
454
455
protected:
456
  /**
   * Values resolved by prepareUpstreamRequest() when an upstream request can proceed:
   * whether passthrough is supported, the final transport and protocol to use towards the
   * upstream, and the TCP connection pool data obtained from the cluster.
   */
  struct UpstreamRequestInfo {
457
    bool passthrough_supported;
458
    TransportType transport;
459
    ProtocolType protocol;
460
    absl::optional<Upstream::TcpPoolData> conn_pool_data;
461
  };
462
463
  /**
   * Outcome of prepareUpstreamRequest(). Every return path of that method sets exactly one
   * member: `exception` on failure, `upstream_request_info` on success.
   */
  struct PrepareUpstreamRequestResult {
464
    absl::optional<AppException> exception;
465
    absl::optional<UpstreamRequestInfo> upstream_request_info;
466
  };
467
468
  /**
   * Resolves the upstream cluster, updates request stats and obtains a TCP connection pool for
   * an upstream request.
   *
   * Steps, in order:
   *  - Look up the thread local cluster for `cluster_name`. If there is none, increment
   *    `unknown_cluster` and return an InternalError exception; `cluster_` is left unchanged.
   *  - Store the cluster's info in `cluster_` and increment the per-cluster request counter that
   *    matches the message type (call, oneway, or invalid type for anything else).
   *  - If the cluster is in maintenance mode, increment `upstream_rq_maintenance_mode` (and the
   *    local exception counters for Call messages) and return an InternalError exception.
   *  - Determine the final transport and protocol: taken from the cluster's ProtocolOptionsConfig
   *    when the cluster has one, otherwise `transport` and `protocol` are used as given. Neither
   *    may resolve to Auto (asserted).
   *  - Request a TCP connection pool at default priority. If none is returned, increment
   *    `no_healthy_upstream` (and the local exception counters for Call messages) and return an
   *    InternalError exception.
   *  - Passthrough is reported as supported only when both `transport` and the final transport
   *    are Framed or Header, the protocol is unchanged, and the final protocol is not Twitter.
   *
   * @param cluster_name const std::string& name of the upstream cluster to resolve
   * @param metadata MessageMetadataSharedPtr& request metadata; only its method name and message
   * type are read here
   * @param transport TransportType the requested transport, before cluster protocol options
   * @param protocol ProtocolType the requested protocol, before cluster protocol options
   * @param lb_context Upstream::LoadBalancerContext* forwarded to the connection pool lookup
   * @return PrepareUpstreamRequestResult holding either the exception to report or the resolved
   * upstream request info.
   */
  PrepareUpstreamRequestResult prepareUpstreamRequest(const std::string& cluster_name,
469
                                                      MessageMetadataSharedPtr& metadata,
470
                                                      TransportType transport,
471
                                                      ProtocolType protocol,
472
0
                                                      Upstream::LoadBalancerContext* lb_context) {
473
0
    Upstream::ThreadLocalCluster* cluster = clusterManager().getThreadLocalCluster(cluster_name);
474
0
    if (!cluster) {
475
0
      ENVOY_LOG(debug, "unknown cluster '{}'", cluster_name);
476
0
      stats().named_.unknown_cluster_.inc();
477
0
      return {AppException(AppExceptionType::InternalError,
478
0
                           fmt::format("unknown cluster '{}'", cluster_name)),
479
0
              absl::nullopt};
480
0
    }
481
482
0
    cluster_ = cluster->info();
483
0
    ENVOY_LOG(debug, "cluster '{}' match for method '{}'", cluster_name, metadata->methodName());
484
485
0
    switch (metadata->messageType()) {
486
0
    case MessageType::Call:
487
0
      stats().incRequestCall(*cluster_);
488
0
      break;
489
490
0
    case MessageType::Oneway:
491
0
      stats().incRequestOneWay(*cluster_);
492
0
      break;
493
494
0
    default:
495
0
      stats().incRequestInvalid(*cluster_);
496
0
      break;
497
0
    }
498
499
0
    if (cluster_->maintenanceMode()) {
500
0
      stats().named_.upstream_rq_maintenance_mode_.inc();
501
0
      if (metadata->messageType() == MessageType::Call) {
502
0
        stats().incResponseLocalException(*cluster_);
503
0
      }
504
0
      return {AppException(AppExceptionType::InternalError,
505
0
                           fmt::format("maintenance mode for cluster '{}'", cluster_name)),
506
0
              absl::nullopt};
507
0
    }
508
509
0
    const std::shared_ptr<const ProtocolOptionsConfig> options =
510
0
        cluster_->extensionProtocolOptionsTyped<ProtocolOptionsConfig>(
511
0
            NetworkFilterNames::get().ThriftProxy);
512
513
0
    const TransportType final_transport = options ? options->transport(transport) : transport;
514
0
    ASSERT(final_transport != TransportType::Auto);
515
516
0
    const ProtocolType final_protocol = options ? options->protocol(protocol) : protocol;
517
0
    ASSERT(final_protocol != ProtocolType::Auto);
518
519
0
    auto conn_pool_data = cluster->tcpConnPool(Upstream::ResourcePriority::Default, lb_context);
520
0
    if (!conn_pool_data) {
521
0
      stats().named_.no_healthy_upstream_.inc();
522
0
      if (metadata->messageType() == MessageType::Call) {
523
0
        stats().incResponseLocalException(*cluster_);
524
0
      }
525
0
      return {AppException(AppExceptionType::InternalError,
526
0
                           fmt::format("no healthy upstream for '{}'", cluster_name)),
527
0
              absl::nullopt};
528
0
    }
529
530
0
    const auto passthrough_supported =
531
0
        (transport == TransportType::Framed || transport == TransportType::Header) &&
532
0
        (final_transport == TransportType::Framed || final_transport == TransportType::Header) &&
533
0
        protocol == final_protocol && final_protocol != ProtocolType::Twitter;
534
0
    UpstreamRequestInfo result = {passthrough_supported, final_transport, final_protocol,
535
0
                                  conn_pool_data};
536
0
    return {absl::nullopt, result};
537
0
  }
538
539
  Upstream::ClusterInfoConstSharedPtr cluster_;
540
541
private:
542
  Upstream::ClusterManager& cluster_manager_;
543
  const RouterStats& stats_;
544
};
545
546
/**
547
 * RequestMirrorPolicy is an individual mirroring rule for a route entry.
548
 */
549
class RequestMirrorPolicy {
550
public:
551
0
  virtual ~RequestMirrorPolicy() = default;
552
553
  /**
554
   * @return const std::string& the upstream cluster that should be used for the mirrored request.
555
   */
556
  virtual const std::string& clusterName() const PURE;
557
558
  /**
559
   * @return bool whether this policy is currently enabled.
560
   */
561
  virtual bool enabled(Runtime::Loader& runtime) const PURE;
562
};
563
564
/**
565
 * ShadowRouterHandle is used to write a request or release a connection early if needed.
566
 */
567
class ShadowRouterHandle {
568
public:
569
0
  virtual ~ShadowRouterHandle() = default;
570
571
  /**
572
   * Called after the Router is destroyed.
573
   */
574
  virtual void onRouterDestroy() PURE;
575
576
  /**
577
   * Checks if the request is currently waiting for an upstream connection to become available.
578
   */
579
  virtual bool waitingForConnection() const PURE;
580
581
  /**
582
   * @return RequestOwner& the interface associated with this ShadowRouter.
583
   */
584
  virtual RequestOwner& requestOwner() PURE;
585
};
586
587
/**
588
 * ShadowWriter is used for submitting requests and ignoring the response.
589
 */
590
class ShadowWriter {
591
public:
592
0
  virtual ~ShadowWriter() = default;
593
594
  /**
595
   * @return Upstream::ClusterManager& the cluster manager.
596
   */
597
  virtual Upstream::ClusterManager& clusterManager() PURE;
598
599
  /**
600
   * @return Dispatcher& the dispatcher.
601
   */
602
  virtual Event::Dispatcher& dispatcher() PURE;
603
604
  /**
605
   * Starts the shadow request by requesting an upstream connection.
606
   */
607
  virtual OptRef<ShadowRouterHandle> submit(const std::string& cluster_name,
608
                                            MessageMetadataSharedPtr metadata,
609
                                            TransportType original_transport,
610
                                            ProtocolType original_protocol) PURE;
611
};
612
613
} // namespace Router
614
} // namespace ThriftProxy
615
} // namespace NetworkFilters
616
} // namespace Extensions
617
} // namespace Envoy