1
#pragma once
2

            
3
#include <functional>
4
#include <memory>
5

            
6
#include "envoy/buffer/buffer.h"
7
#include "envoy/common/optref.h"
8
#include "envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.pb.h"
9
#include "envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.pb.validate.h"
10
#include "envoy/http/filter.h"
11
#include "envoy/http/header_map.h"
12
#include "envoy/matcher/matcher.h"
13
#include "envoy/network/socket.h"
14
#include "envoy/protobuf/message_validator.h"
15

            
16
#include "source/common/buffer/watermark_buffer.h"
17
#include "source/common/common/dump_state_utils.h"
18
#include "source/common/common/linked_object.h"
19
#include "source/common/common/logger.h"
20
#include "source/common/grpc/common.h"
21
#include "source/common/http/header_utility.h"
22
#include "source/common/http/headers.h"
23
#include "source/common/http/matching/data_impl.h"
24
#include "source/common/http/utility.h"
25
#include "source/common/local_reply/local_reply.h"
26
#include "source/common/matcher/matcher.h"
27
#include "source/common/protobuf/utility.h"
28
#include "source/common/runtime/runtime_features.h"
29
#include "source/common/stream_info/stream_info_impl.h"
30

            
31
namespace Envoy {
32
namespace Http {
33

            
34
// Forward declarations so the types below can refer to each other before they are defined.
class FilterManager;
class DownstreamFilterManager;

struct ActiveStreamFilterBase;
struct ActiveStreamDecoderFilter;
struct ActiveStreamEncoderFilter;
// Owning pointers to the per-stream decoder/encoder filter wrappers.
using ActiveStreamDecoderFilterPtr = std::unique_ptr<ActiveStreamDecoderFilter>;
using ActiveStreamEncoderFilterPtr = std::unique_ptr<ActiveStreamEncoderFilter>;
42

            
43
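// Filter state key. NOTE(review): judging by its value, this is the key under which a
// LocalReplyOwnerObject is stored -- confirm against the code that sets it.
constexpr absl::string_view LocalReplyFilterStateKey =
    "envoy.filters.network.http_connection_manager.local_reply_owner";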
45
class LocalReplyOwnerObject : public StreamInfo::FilterState::Object {
46
public:
47
  LocalReplyOwnerObject(const std::string& filter_config_name)
48
7247
      : filter_config_name_(filter_config_name) {}
49

            
50
6
  ProtobufTypes::MessagePtr serializeAsProto() const override {
51
6
    auto message = std::make_unique<Protobuf::StringValue>();
52
6
    message->set_value(filter_config_name_);
53
6
    return message;
54
6
  }
55

            
56
6
  absl::optional<std::string> serializeAsString() const override { return filter_config_name_; }
57

            
58
private:
59
  const std::string filter_config_name_;
60
};
61

            
62
// TODO(wbpcode): Rather than allocating every filter with a unique pointer, we could
// construct the filters in place in the vector. This should reduce heap allocations and
// memory fragmentation.
65

            
66
// HTTP decoder filters. If filters are configured in the following order (assume all three
67
// filters are both decoder/encoder filters):
68
//   http_filters:
69
//     - A
70
//     - B
71
//     - C
72
// The decoder filter chain will iterate through filters A, B, C.
73
struct StreamDecoderFilters {
74
  using Element = ActiveStreamDecoderFilter;
75
  using Iterator = std::vector<ActiveStreamDecoderFilterPtr>::iterator;
76

            
77
1197638
  Iterator begin() { return entries_.begin(); }
78
3188912
  Iterator end() { return entries_.end(); }
79

            
80
  std::vector<ActiveStreamDecoderFilterPtr> entries_;
81
};
82

            
83
// HTTP encoder filters. If filters are configured in the following order (assume all three
84
// filters are both decoder/encoder filters):
85
//   http_filters:
86
//     - A
87
//     - B
88
//     - C
89
// Unlike the decoder filter, the encoder filter chain will iterate with the
90
// reverse order of the configured filters, i.e., C, B, A. This is why we use reverse_iterator
91
// here.
92
struct StreamEncoderFilters {
93
  using Element = ActiveStreamEncoderFilter;
94
  using Iterator = std::vector<ActiveStreamEncoderFilterPtr>::reverse_iterator;
95

            
96
795539
  Iterator begin() { return entries_.rbegin(); }
97
1941495
  Iterator end() { return entries_.rend(); }
98

            
99
  std::vector<ActiveStreamEncoderFilterPtr> entries_;
100
};
101

            
102
/**
103
 * Base class wrapper for both stream encoder and decoder filters.
104
 *
105
 * This class is responsible for performing matching and updating match data when a match tree is
106
 * configured for the associated filter. When not using a match tree, only minimal overhead (i.e.
107
 * memory overhead of unused fields) should apply.
108
 */
109
struct ActiveStreamFilterBase : public virtual StreamFilterCallbacks,
110
                                Logger::Loggable<Logger::Id::http> {
111
  ActiveStreamFilterBase(FilterManager& parent, absl::string_view filter_config_name)
112
160278
      : parent_(parent), iteration_state_(IterationState::Continue),
113
160278
        filter_context_(filter_config_name) {}
114

            
115
  // Functions in the following block are called after the filter finishes processing
116
  // corresponding data. Those functions handle state updates and data storage (if needed)
117
  // according to the status returned by filter's callback functions.
118
  bool commonHandleAfter1xxHeadersCallback(Filter1xxHeadersStatus status);
119
  bool commonHandleAfterHeadersCallback(FilterHeadersStatus status, bool& end_stream);
120
  bool commonHandleAfterDataCallback(FilterDataStatus status, Buffer::Instance& provided_data,
121
                                     bool& buffer_was_streaming);
122
  bool commonHandleAfterTrailersCallback(FilterTrailersStatus status);
123

            
124
  // Buffers provided_data.
125
  void commonHandleBufferData(Buffer::Instance& provided_data);
126

            
127
  // If iteration has stopped for all frame types, calls this function to buffer the data before
128
  // the filter processes data. The function also updates streaming state.
129
  void commonBufferDataIfStopAll(Buffer::Instance& provided_data, bool& buffer_was_streaming);
130

            
131
  void commonContinue();
132
  virtual bool canContinue() PURE;
133
  virtual Buffer::InstancePtr createBuffer() PURE;
134
  virtual Buffer::InstancePtr& bufferedData() PURE;
135
  virtual bool observedEndStream() PURE;
136
  virtual bool has1xxHeaders() PURE;
137
  virtual void do1xxHeaders() PURE;
138
  virtual void doHeaders(bool end_stream) PURE;
139
  virtual void doData(bool end_stream) PURE;
140
  virtual void doTrailers() PURE;
141
  virtual bool hasTrailers() PURE;
142
  virtual void doMetadata() PURE;
143
  // TODO(soya3129): make this pure when adding impl to encoder filter.
144
  virtual void handleMetadataAfterHeadersCallback() PURE;
145

            
146
  // Http::StreamFilterCallbacks
147
  OptRef<const Network::Connection> connection() override;
148
  Event::Dispatcher& dispatcher() override;
149
  Router::RouteConstSharedPtr route() override;
150
  void resetStream(Http::StreamResetReason reset_reason,
151
                   absl::string_view transport_failure_reason) override;
152
  Upstream::ClusterInfoConstSharedPtr clusterInfo() override;
153
  uint64_t streamId() const override;
154
  StreamInfo::StreamInfo& streamInfo() override;
155
  Tracing::Span& activeSpan() override;
156
  OptRef<const Tracing::Config> tracingConfig() const override;
157
  const ScopeTrackedObject& scope() override;
158
  void restoreContextOnContinue(ScopeTrackedObjectStack& tracked_object_stack) override;
159
  void resetIdleTimer() override;
160
  const Router::RouteSpecificFilterConfig* mostSpecificPerFilterConfig() const override;
161
  Router::RouteSpecificFilterConfigs perFilterConfigs() const override;
162
  Http1StreamEncoderOptionsOptRef http1StreamEncoderOptions() override;
163
  OptRef<DownstreamStreamFilterCallbacks> downstreamCallbacks() override;
164
  OptRef<UpstreamStreamFilterCallbacks> upstreamCallbacks() override;
165
3708
  absl::string_view filterConfigName() const override { return filter_context_.config_name; }
166
  RequestHeaderMapOptRef requestHeaders() override;
167
  RequestTrailerMapOptRef requestTrailers() override;
168
  ResponseHeaderMapOptRef informationalHeaders() override;
169
  ResponseHeaderMapOptRef responseHeaders() override;
170
  ResponseTrailerMapOptRef responseTrailers() override;
171
  void setBufferLimit(uint64_t limit) override;
172
  uint64_t bufferLimit() override;
173

            
174
  // Functions to set or get iteration state.
175
774
  bool canIterate() { return iteration_state_ == IterationState::Continue; }
176
749057
  bool stoppedAll() {
177
749057
    return iteration_state_ == IterationState::StopAllBuffer ||
178
749057
           iteration_state_ == IterationState::StopAllWatermark;
179
749057
  }
180
44234
  void allowIteration() {
181
44234
    ASSERT(iteration_state_ != IterationState::Continue);
182
44234
    iteration_state_ = IterationState::Continue;
183
44234
  }
184
7328
  MetadataMapVector* getSavedRequestMetadata() {
185
7328
    if (saved_request_metadata_ == nullptr) {
186
6506
      saved_request_metadata_ = std::make_unique<MetadataMapVector>();
187
6506
    }
188
7328
    return saved_request_metadata_.get();
189
7328
  }
190
142
  MetadataMapVector* getSavedResponseMetadata() {
191
142
    if (saved_response_metadata_ == nullptr) {
192
100
      saved_response_metadata_ = std::make_unique<MetadataMapVector>();
193
100
    }
194
142
    return saved_response_metadata_.get();
195
142
  }
196

            
197
  Router::RouteConstSharedPtr getRoute() const;
198

            
199
  void sendLocalReply(Code code, absl::string_view body,
200
                      std::function<void(ResponseHeaderMap& headers)> modify_headers,
201
                      const absl::optional<Grpc::Status::GrpcStatus> grpc_status,
202
                      absl::string_view details);
203

            
204
  // A vector to save metadata when the current filter's [de|en]codeMetadata() can not be called,
205
  // either because [de|en]codeHeaders() of the current filter returns StopAllIteration or because
206
  // [de|en]codeHeaders() adds new metadata to [de|en]code, but we don't know
207
  // [de|en]codeHeaders()'s return value yet. The storage is created on demand.
208
  std::unique_ptr<MetadataMapVector> saved_request_metadata_{nullptr};
209
  std::unique_ptr<MetadataMapVector> saved_response_metadata_{nullptr};
210
  // The state of iteration.
211
  enum class IterationState : uint8_t {
212
    Continue,            // Iteration has not stopped for any frame type.
213
    StopSingleIteration, // Iteration has stopped for headers, 100-continue, or data.
214
    StopAllBuffer,       // Iteration has stopped for all frame types, and following data should
215
                         // be buffered.
216
    StopAllWatermark,    // Iteration has stopped for all frame types, and following data should
217
                         // be buffered until high watermark is reached.
218
  };
219
  FilterManager& parent_;
220
  IterationState iteration_state_{};
221

            
222
  const FilterContext filter_context_;
223

            
224
  // If the filter resumes iteration from a StopAllBuffer/Watermark state, the current filter
225
  // hasn't parsed data and trailers. As a result, the filter iteration should start with the
226
  // current filter instead of the next one. If true, filter iteration starts with the current
227
  // filter. Otherwise, starts with the next filter in the chain.
228
  bool iterate_from_current_filter_{};
229
  bool headers_continued_{};
230
  bool continued_1xx_headers_{};
231
  // If true, end_stream is called for this filter.
232
  bool end_stream_{};
233
  // If true, the filter has processed headers.
234
  bool processed_headers_{};
235
};
236

            
237
/**
238
 * Wrapper for a stream decoder filter.
239
 */
240
struct ActiveStreamDecoderFilter : public ActiveStreamFilterBase,
241
                                   public StreamDecoderFilterCallbacks {
242
  ActiveStreamDecoderFilter(FilterManager& parent, StreamDecoderFilterSharedPtr filter,
243
                            absl::string_view filter_config_name)
244
150672
      : ActiveStreamFilterBase(parent, filter_config_name), handle_(std::move(filter)) {
245
150672
    handle_->setDecoderFilterCallbacks(*this);
246
150672
  }
247

            
248
  // ActiveStreamFilterBase
249
  bool canContinue() override;
250
  Buffer::InstancePtr createBuffer() override;
251
  Buffer::InstancePtr& bufferedData() override;
252
  bool observedEndStream() override;
253
43457
  bool has1xxHeaders() override { return false; }
254
  void do1xxHeaders() override { IS_ENVOY_BUG("unexpected 1xx headers"); }
255
  void doHeaders(bool end_stream) override;
256
  void doData(bool end_stream) override;
257
43375
  void doMetadata() override {
258
43375
    if (saved_request_metadata_ != nullptr) {
259
111
      drainSavedRequestMetadata();
260
111
    }
261
43375
  }
262
  void doTrailers() override;
263
  bool hasTrailers() override;
264

            
265
  void drainSavedRequestMetadata();
266
  // This function is called after the filter calls decodeHeaders() to drain accumulated metadata.
267
  void handleMetadataAfterHeadersCallback() override;
268

            
269
  // Http::StreamDecoderFilterCallbacks
270
  void addDecodedData(Buffer::Instance& data, bool streaming) override;
271
  void injectDecodedDataToFilterChain(Buffer::Instance& data, bool end_stream) override;
272
  RequestTrailerMap& addDecodedTrailers() override;
273
  MetadataMapVector& addDecodedMetadata() override;
274
  void continueDecoding() override;
275
  const Buffer::Instance* decodingBuffer() override;
276

            
277
  void modifyDecodingBuffer(std::function<void(Buffer::Instance&)> callback) override;
278

            
279
  void sendLocalReply(Code code, absl::string_view body,
280
                      std::function<void(ResponseHeaderMap& headers)> modify_headers,
281
                      const absl::optional<Grpc::Status::GrpcStatus> grpc_status,
282
                      absl::string_view details) override;
283
  void encode1xxHeaders(ResponseHeaderMapPtr&& headers) override;
284
  void encodeHeaders(ResponseHeaderMapPtr&& headers, bool end_stream,
285
                     absl::string_view details) override;
286
  void encodeData(Buffer::Instance& data, bool end_stream) override;
287
  void encodeTrailers(ResponseTrailerMapPtr&& trailers) override;
288
  void encodeMetadata(MetadataMapPtr&& metadata_map_ptr) override;
289
  void onDecoderFilterAboveWriteBufferHighWatermark() override;
290
  void onDecoderFilterBelowWriteBufferLowWatermark() override;
291
  void addDownstreamWatermarkCallbacks(DownstreamWatermarkCallbacks& watermark_callbacks) override;
292
  void
293
  removeDownstreamWatermarkCallbacks(DownstreamWatermarkCallbacks& watermark_callbacks) override;
294
  bool recreateStream(const Http::ResponseHeaderMap* original_response_headers) override;
295

            
296
  void addUpstreamSocketOptions(const Network::Socket::OptionsSharedPtr& options) override;
297

            
298
  Network::Socket::OptionsSharedPtr getUpstreamSocketOptions() const override;
299
  Buffer::BufferMemoryAccountSharedPtr account() const override;
300
  void setUpstreamOverrideHost(Upstream::LoadBalancerContext::OverrideHost) override;
301
  absl::optional<Upstream::LoadBalancerContext::OverrideHost> upstreamOverrideHost() const override;
302
  bool shouldLoadShed() const override;
303
  void sendGoAwayAndClose(bool graceful = false) override;
304

            
305
  // Each decoder filter instance checks if the request passed to the filter is gRPC
306
  // so that we can issue gRPC local responses to gRPC requests. Filter's decodeHeaders()
307
  // called here may change the content type, so we must check it before the call.
308
104401
  FilterHeadersStatus decodeHeaders(RequestHeaderMap& headers, bool end_stream) {
309
104401
    is_grpc_request_ = Grpc::Common::isGrpcRequestHeaders(headers);
310
104401
    FilterHeadersStatus status = handle_->decodeHeaders(headers, end_stream);
311
104401
    return status;
312
104401
  }
313

            
314
  void requestDataTooLarge();
315
  void requestDataDrained();
316
  // Encoding end_stream by a non-terminal filters (i.e. cache filter) always causes the decoding to
317
  // be stopped even if independent half-close is enabled. For simplicity, independent half-close is
318
  // enabled only when the terminal filter is encoding the response.
319
  void stopDecodingIfNonTerminalFilterEncodedEndStream(bool encoded_end_stream);
320
75773
  StreamDecoderFilters::Iterator entry() const { return entry_; }
321

            
322
  StreamDecoderFilterSharedPtr handle_;
323
  StreamDecoderFilters::Iterator entry_;
324
  bool is_grpc_request_{};
325
};
326

            
327
/**
328
 * Wrapper for a stream encoder filter.
329
 */
330
struct ActiveStreamEncoderFilter : public ActiveStreamFilterBase,
331
                                   public StreamEncoderFilterCallbacks {
332
  ActiveStreamEncoderFilter(FilterManager& parent, StreamEncoderFilterSharedPtr filter,
333
                            absl::string_view filter_config_name)
334
9606
      : ActiveStreamFilterBase(parent, filter_config_name), handle_(std::move(filter)) {
335
9606
    handle_->setEncoderFilterCallbacks(*this);
336
9606
  }
337

            
338
  // ActiveStreamFilterBase
339
  bool canContinue() override;
340
  Buffer::InstancePtr createBuffer() override;
341
  Buffer::InstancePtr& bufferedData() override;
342
  bool observedEndStream() override;
343
  bool has1xxHeaders() override;
344
  void do1xxHeaders() override;
345
  void doHeaders(bool end_stream) override;
346
  void doData(bool end_stream) override;
347
  void drainSavedResponseMetadata();
348
  void handleMetadataAfterHeadersCallback() override;
349

            
350
752
  void doMetadata() override {
351
752
    if (saved_response_metadata_ != nullptr) {
352
      drainSavedResponseMetadata();
353
    }
354
752
  }
355
  void doTrailers() override;
356
  bool hasTrailers() override;
357

            
358
  // Http::StreamEncoderFilterCallbacks
359
  void addEncodedData(Buffer::Instance& data, bool streaming) override;
360
  void injectEncodedDataToFilterChain(Buffer::Instance& data, bool end_stream) override;
361
  ResponseTrailerMap& addEncodedTrailers() override;
362
  void addEncodedMetadata(MetadataMapPtr&& metadata_map) override;
363
  void onEncoderFilterAboveWriteBufferHighWatermark() override;
364
  void onEncoderFilterBelowWriteBufferLowWatermark() override;
365
  void continueEncoding() override;
366
  const Buffer::Instance* encodingBuffer() override;
367
  void modifyEncodingBuffer(std::function<void(Buffer::Instance&)> callback) override;
368
  void sendLocalReply(Code code, absl::string_view body,
369
                      std::function<void(ResponseHeaderMap& headers)> modify_headers,
370
                      const absl::optional<Grpc::Status::GrpcStatus> grpc_status,
371
                      absl::string_view details) override;
372

            
373
  void responseDataTooLarge();
374
  void responseDataDrained();
375
7662
  StreamEncoderFilters::Iterator entry() const { return entry_; }
376

            
377
  StreamEncoderFilterSharedPtr handle_;
378
  StreamEncoderFilters::Iterator entry_;
379
};
380

            
381
/**
382
 * Callbacks invoked by the FilterManager to pass filter data/events back to the caller.
383
 */
384
class FilterManagerCallbacks {
385
public:
386
141588
  virtual ~FilterManagerCallbacks() = default;
387

            
388
  /**
389
   * Called when the provided headers have been encoded by all the filters in the chain.
390
   * @param response_headers the encoded headers.
391
   * @param end_stream whether this is a header only response.
392
   */
393
  virtual void encodeHeaders(ResponseHeaderMap& response_headers, bool end_stream) PURE;
394

            
395
  /**
396
   * Called when the provided 100 Continue headers have been encoded by all the filters in the
397
   * chain.
398
   * @param response_headers the encoded headers.
399
   */
400
  virtual void encode1xxHeaders(ResponseHeaderMap& response_headers) PURE;
401

            
402
  /**
403
   * Called when the provided data has been encoded by all filters in the chain.
404
   * @param data the encoded data.
405
   * @param end_stream whether this is the end of the response.
406
   */
407
  virtual void encodeData(Buffer::Instance& data, bool end_stream) PURE;
408

            
409
  /**
410
   * Called when the provided trailers have been encoded by all filters in the chain.
411
   * @param trailers the encoded trailers.
412
   */
413
  virtual void encodeTrailers(ResponseTrailerMap& trailers) PURE;
414

            
415
  /**
416
   * Called when the provided metadata has been encoded by all filters in the chain.
417
   * @param trailers the encoded trailers.
418
   */
419
  virtual void encodeMetadata(MetadataMapPtr&& metadata) PURE;
420

            
421
  /**
422
   * Injects request trailers into a stream that originally did not have request trailers.
423
   */
424
  virtual void setRequestTrailers(RequestTrailerMapPtr&& request_trailers) PURE;
425

            
426
  /**
427
   * Passes ownership of received informational headers to the parent. This may be called multiple
428
   * times in the case of multiple upstream calls.
429
   */
430
  virtual void setInformationalHeaders(ResponseHeaderMapPtr&& response_headers) PURE;
431

            
432
  /**
433
   * Passes ownership of received response headers to the parent. This may be called multiple times
434
   * in the case of multiple upstream calls.
435
   */
436
  virtual void setResponseHeaders(ResponseHeaderMapPtr&& response_headers) PURE;
437

            
438
  /**
439
   * Passes ownership of received response trailers to the parent. This may be called multiple times
440
   * in the case of multiple upstream calls.
441
   */
442
  virtual void setResponseTrailers(ResponseTrailerMapPtr&& response_trailers) PURE;
443

            
444
  /**
445
   * Optionally updates response code stats based on the details in the headers.
446
   */
447
  virtual void chargeStats(const ResponseHeaderMap& /*headers*/) {}
448

            
449
  // TODO(snowp): We should consider moving filter access to headers/trailers to happen via the
450
  // callbacks instead of via the encode/decode callbacks on the filters.
451

            
452
  /**
453
   * The downstream request headers if set.
454
   */
455
  virtual RequestHeaderMapOptRef requestHeaders() PURE;
456

            
457
  /**
458
   * The downstream request trailers if present.
459
   */
460
  virtual RequestTrailerMapOptRef requestTrailers() PURE;
461

            
462
  /**
463
   * Retrieves a pointer to the continue headers set via the call to setInformationalHeaders.
464
   */
465
  virtual ResponseHeaderMapOptRef informationalHeaders() PURE;
466

            
467
  /**
468
   * Retrieves a pointer to the response headers set via the last call to setResponseHeaders.
469
   * Note that response headers might be set multiple times (e.g. if a local reply is issued after
470
   * headers have been received but before headers have been encoded), so it is not safe in general
471
   * to assume that any set of headers will be valid for the duration of a stream.
472
   */
473
  virtual ResponseHeaderMapOptRef responseHeaders() PURE;
474

            
475
  /**
476
   * Retrieves a pointer to the last response trailers set via setResponseTrailers.
477
   * Note that response trailers might be set multiple times, so it is not safe in general to assume
478
   * that any set of trailers will be valid for the duration of the stream.
479
   */
480
  virtual ResponseTrailerMapOptRef responseTrailers() PURE;
481

            
482
  /**
483
   * Called after encoding has completed.
484
   */
485
  virtual void endStream() PURE;
486

            
487
  /**
488
   * Attempt to send GOAWAY and close the connection.
489
   */
490
  virtual void sendGoAwayAndClose(bool graceful = false) PURE;
491

            
492
  /**
493
   * Called when the stream write buffer is no longer above the low watermark.
494
   */
495
  virtual void onDecoderFilterBelowWriteBufferLowWatermark() PURE;
496

            
497
  /**
498
   * Called when the stream write buffer is above above the high watermark.
499
   */
500
  virtual void onDecoderFilterAboveWriteBufferHighWatermark() PURE;
501

            
502
  /**
503
   * Called when request activity indicates that the request timeout should be disarmed.
504
   */
505
  virtual void disarmRequestTimeout() PURE;
506

            
507
  /**
508
   * Called when stream activity indicates that the stream idle timeout should be reset.
509
   */
510
  virtual void resetIdleTimer() PURE;
511

            
512
  /**
513
   * Called when the stream should be re-created, e.g. for an internal redirect.
514
   */
515
  virtual void recreateStream(StreamInfo::FilterStateSharedPtr filter_state) PURE;
516

            
517
  /**
518
   * Called when the stream should be reset.
519
   */
520
  virtual void
521
  resetStream(Http::StreamResetReason reset_reason = Http::StreamResetReason::LocalReset,
522
              absl::string_view transport_failure_reason = "") PURE;
523

            
524
  /**
525
   * Returns the upgrade map for the current route entry.
526
   */
527
  virtual const Router::RouteEntry::UpgradeMap* upgradeMap() PURE;
528

            
529
  /**
530
   * Returns the cluster info for the current route entry.
531
   */
532
  virtual Upstream::ClusterInfoConstSharedPtr clusterInfo() PURE;
533

            
534
  /**
535
   * Returns the current active span.
536
   */
537
  virtual Tracing::Span& activeSpan() PURE;
538

            
539
  // TODO(snowp): It might make more sense to pass (optional?) counters to the FM instead of
540
  // calling back out to the AS to record them.
541
  /**
542
   * Called when a stream fails due to the response data being too large.
543
   */
544
  virtual void onResponseDataTooLarge() PURE;
545

            
546
  /**
547
   * Called when a stream fails due to the request data being too large.
548
   */
549
  virtual void onRequestDataTooLarge() PURE;
550

            
551
  /**
552
   * Returns the Http1StreamEncoderOptions associated with the response encoder.
553
   */
554
  virtual Http1StreamEncoderOptionsOptRef http1StreamEncoderOptions() PURE;
555

            
556
  /**
557
   * Returns the UpstreamStreamFilterCallbacks for upstream HTTP filters.
558
   */
559
222
  virtual OptRef<UpstreamStreamFilterCallbacks> upstreamCallbacks() { return {}; }
560

            
561
  /**
562
   * Called when a local reply is made by the filter manager.
563
   * @param code the response code of the local reply.
564
   */
565
  virtual void onLocalReply(Code code) PURE;
566

            
567
  /**
568
   * Returns the tracing configuration to use for this stream.
569
   */
570
  virtual OptRef<const Tracing::Config> tracingConfig() const PURE;
571

            
572
  /**
573
   * Returns the tracked scope to use for this stream.
574
   */
575
  virtual const ScopeTrackedObject& scope() PURE;
576

            
577
  /**
578
   * Returns the DownstreamStreamFilterCallbacks for downstream HTTP filters.
579
   */
580
141076
  virtual OptRef<DownstreamStreamFilterCallbacks> downstreamCallbacks() { return {}; }
581
  /**
582
   * Returns if close from the upstream is to be handled with half-close semantics.
583
   * This is used for HTTP/1.1 codec.
584
   */
585
  virtual bool isHalfCloseEnabled() PURE;
586
};
587

            
588
/**
589
 * This class allows the remote address to be overridden for HTTP stream info. This is used for
590
 * XFF handling. This is required to avoid providing stream info with a non-const connection info
591
 * provider. Private inheritance from ConnectionInfoProvider is used to make sure users get the
592
 * address provider via the normal getter.
593
 */
594
class OverridableRemoteConnectionInfoSetterStreamInfo : public StreamInfo::StreamInfoImpl,
595
                                                        private Network::ConnectionInfoProvider {
596
public:
597
  using StreamInfoImpl::StreamInfoImpl;
598

            
599
  void setDownstreamRemoteAddress(
600
90026
      const Network::Address::InstanceConstSharedPtr& downstream_remote_address) {
601
    // TODO(rgs1): we should assert overridden_downstream_remote_address_ is nullptr,
602
    // but we are currently relaxing this as a workaround to:
603
    //
604
    // https://github.com/envoyproxy/envoy/pull/14432#issuecomment-758167614
605
90026
    overridden_downstream_remote_address_ = downstream_remote_address;
606
90026
  }
607

            
608
  // StreamInfo::StreamInfo
609
44643
  const Network::ConnectionInfoProvider& downstreamAddressProvider() const override {
610
44643
    return *this;
611
44643
  }
612

            
613
  // Network::ConnectionInfoProvider
614
68
  const Network::Address::InstanceConstSharedPtr& localAddress() const override {
615
68
    return StreamInfoImpl::downstreamAddressProvider().localAddress();
616
68
  }
617
24
  const Network::Address::InstanceConstSharedPtr& directLocalAddress() const override {
618
24
    return StreamInfoImpl::downstreamAddressProvider().directLocalAddress();
619
24
  }
620
  bool localAddressRestored() const override {
621
    return StreamInfoImpl::downstreamAddressProvider().localAddressRestored();
622
  }
623
90
  const Network::Address::InstanceConstSharedPtr& remoteAddress() const override {
624
90
    return overridden_downstream_remote_address_ != nullptr
625
90
               ? overridden_downstream_remote_address_
626
90
               : StreamInfoImpl::downstreamAddressProvider().remoteAddress();
627
90
  }
628
118
  const Network::Address::InstanceConstSharedPtr& directRemoteAddress() const override {
629
118
    return StreamInfoImpl::downstreamAddressProvider().directRemoteAddress();
630
118
  }
631
29
  absl::string_view requestedServerName() const override {
632
29
    return StreamInfoImpl::downstreamAddressProvider().requestedServerName();
633
29
  }
634
1
  const std::vector<std::string>& requestedApplicationProtocols() const override {
635
1
    return StreamInfoImpl::downstreamAddressProvider().requestedApplicationProtocols();
636
1
  }
637
3
  absl::optional<uint64_t> connectionID() const override {
638
3
    return StreamInfoImpl::downstreamAddressProvider().connectionID();
639
3
  }
640
  absl::optional<absl::string_view> interfaceName() const override {
641
    return StreamInfoImpl::downstreamAddressProvider().interfaceName();
642
  }
643
43596
  Ssl::ConnectionInfoConstSharedPtr sslConnection() const override {
644
43596
    return StreamInfoImpl::downstreamAddressProvider().sslConnection();
645
43596
  }
646
2
  void dumpState(std::ostream& os, int indent_level) const override {
647
2
    StreamInfoImpl::dumpState(os, indent_level);
648

            
649
2
    const char* spaces = spacesForLevel(indent_level);
650
2
    os << spaces << "OverridableRemoteConnectionInfoSetterStreamInfo " << this
651
2
       << DUMP_MEMBER_AS(remoteAddress(), remoteAddress()->asStringView())
652
2
       << DUMP_MEMBER_AS(directRemoteAddress(), directRemoteAddress()->asStringView())
653
2
       << DUMP_MEMBER_AS(localAddress(), localAddress()->asStringView()) << "\n";
654
2
  }
655
20
  absl::string_view ja3Hash() const override {
656
20
    return StreamInfoImpl::downstreamAddressProvider().ja3Hash();
657
20
  }
658
4
  absl::string_view ja4Hash() const override {
659
4
    return StreamInfoImpl::downstreamAddressProvider().ja4Hash();
660
4
  }
661
719
  const absl::optional<std::chrono::milliseconds>& roundTripTime() const override {
662
719
    return StreamInfoImpl::downstreamAddressProvider().roundTripTime();
663
719
  }
664
  OptRef<const Network::FilterChainInfo> filterChainInfo() const override {
665
    return StreamInfoImpl::downstreamAddressProvider().filterChainInfo();
666
  }
667
9
  OptRef<const Network::ListenerInfo> listenerInfo() const override {
668
9
    return StreamInfoImpl::downstreamAddressProvider().listenerInfo();
669
9
  }
670

            
671
private:
672
  Network::Address::InstanceConstSharedPtr overridden_downstream_remote_address_;
673
};
674

            
675
/**
676
 * FilterManager manages decoding a request through a series of decoding filter and the encoding
677
 * of the resulting response.
678
 */
679
class FilterManager : public ScopeTrackedObject, Logger::Loggable<Logger::Id::http> {
public:
  /**
   * @param filter_manager_callbacks callbacks used to reach request/response headers and
   *        trailers and to forward stream-level actions such as sendGoAwayAndClose().
   * @param dispatcher the dispatcher handed out to filter factories via the factory callbacks.
   * @param connection the downstream connection; may be unset (see connection_).
   * @param stream_id identifier of the stream this manager serves.
   * @param account buffer memory account stored for this stream.
   * @param proxy_100_continue stored as proxy_100_continue_.
   * @param buffer_limit initial value of the buffer limit.
   */
  FilterManager(FilterManagerCallbacks& filter_manager_callbacks, Event::Dispatcher& dispatcher,
                OptRef<const Network::Connection> connection, uint64_t stream_id,
                Buffer::BufferMemoryAccountSharedPtr account, bool proxy_100_continue,
                uint64_t buffer_limit)
      : filter_manager_callbacks_(filter_manager_callbacks), dispatcher_(dispatcher),
        connection_(connection), stream_id_(stream_id), account_(std::move(account)),
        proxy_100_continue_(proxy_100_continue), buffer_limit_(buffer_limit) {}

  // destroyFilters() must have been called, and no filter call may be in progress, by the time
  // the manager is destroyed.
  ~FilterManager() override {
    ASSERT(state_.destroyed_);
    ASSERT(state_.filter_call_state_ == 0);
  }

  // ScopeTrackedObject
  OptRef<const StreamInfo::StreamInfo> trackedStream() const override { return streamInfo(); }
  void dumpState(std::ostream& os, int indent_level = 0) const override {
    const char* spaces = spacesForLevel(indent_level);
    os << spaces << "FilterManager " << this << DUMP_MEMBER(state_.has_1xx_headers_)
       << DUMP_MEMBER(state_.decoder_filter_chain_complete_)
       << DUMP_MEMBER(state_.encoder_filter_chain_complete_)
       << DUMP_MEMBER(state_.observed_decode_end_stream_)
       << DUMP_MEMBER(state_.observed_encode_end_stream_) << "\n";

    DUMP_DETAILS(filter_manager_callbacks_.requestHeaders());
    DUMP_DETAILS(filter_manager_callbacks_.requestTrailers());
    DUMP_DETAILS(filter_manager_callbacks_.responseHeaders());
    DUMP_DETAILS(filter_manager_callbacks_.responseTrailers());
    DUMP_DETAILS(&streamInfo());
  }

  /**
   * Invokes every registered access log handler with the given context and this stream's
   * StreamInfo.
   * @param log_context the formatter context passed to each handler. Taken by const reference
   *        so the context is not copied on every call.
   */
  void log(const Formatter::Context& log_context) {
    for (const auto& log_handler : access_log_handlers_) {
      log_handler->log(log_context, streamInfo());
    }
  }

  /**
   * @return the access log handlers registered through the filter chain factory callbacks.
   */
  const AccessLog::InstanceSharedPtrVector& accessLogHandlers() const {
    return access_log_handlers_;
  }

  /**
   * Calls onStreamComplete() on every filter, in the order the filters were added.
   */
  void onStreamComplete() {
    for (auto* filter : filters_) {
      filter->onStreamComplete();
    }
  }

  /**
   * Marks the manager as destroyed and calls onDestroy() on every filter, in the order the
   * filters were added. The destroyed flag is set before any filter is notified.
   */
  void destroyFilters() {
    state_.destroyed_ = true;

    for (auto* filter : filters_) {
      filter->onDestroy();
    }
  }

  /**
   * Decodes the provided headers starting at the first filter in the chain.
   * @param headers the headers to decode.
   * @param end_stream whether the request is header only.
   */
  void decodeHeaders(RequestHeaderMap& headers, bool end_stream) {
    state_.observed_decode_end_stream_ = end_stream;
    decodeHeaders(nullptr, headers, end_stream);
  }

  /**
   * Decodes the provided data starting at the first filter in the chain.
   * @param data the data to decode.
   * @param end_stream whether this data is the end of the request.
   */
  void decodeData(Buffer::Instance& data, bool end_stream) {
    state_.observed_decode_end_stream_ = end_stream;
    decodeData(nullptr, data, end_stream, FilterIterationStartState::CanStartFromCurrent);
  }

  /**
   * Decodes the provided trailers starting at the first filter in the chain.
   * @param trailers the trailers to decode.
   */
  void decodeTrailers(RequestTrailerMap& trailers) {
    // Trailers are always the last frame of the request.
    state_.observed_decode_end_stream_ = true;
    decodeTrailers(nullptr, trailers);
  }

  /**
   * Decodes the provided metadata starting at the first filter in the chain.
   * @param metadata_map the metadata to decode.
   */
  void decodeMetadata(MetadataMap& metadata_map) { decodeMetadata(nullptr, metadata_map); }

  void disarmRequestTimeout();

  /**
   * If end_stream is true, marks encoding as complete. This is a noop if end_stream is false.
   * @param end_stream whether encoding is complete.
   */
  void maybeEndEncode(bool end_stream);

  /**
   * If terminal_filter_decoded_end_stream is true, marks decoding as complete. This is a noop if
   * terminal_filter_decoded_end_stream is false.
   * @param terminal_filter_decoded_end_stream whether decoding is complete.
   */
  void maybeEndDecode(bool terminal_filter_decoded_end_stream);

  void checkAndCloseStreamIfFullyClosed();

  virtual void sendLocalReply(Code code, absl::string_view body,
                              const std::function<void(ResponseHeaderMap& headers)>& modify_headers,
                              const absl::optional<Grpc::Status::GrpcStatus> grpc_status,
                              absl::string_view details) PURE;

  void resetStream(StreamResetReason reason, absl::string_view transport_failure_reason);

  /**
   * Executes a prepared, but not yet propagated, local reply.
   * Prepared local replies can occur in the decoder filter chain iteration.
   */
  virtual void executeLocalReplyIfPrepared() PURE;

  // Possibly increases buffer_limit_ to the value of limit.
  void setBufferLimit(uint64_t limit);

  /**
   * @return uint64_t the current buffer limit.
   */
  uint64_t bufferLimit() const { return buffer_limit_; }

  /**
   * @return bool whether any above high watermark triggers are currently active
   */
  bool aboveHighWatermark() const { return high_watermark_count_ != 0; }

  // Pass on watermark callbacks to watermark subscribers. This boils down to passing watermark
  // events for this stream and the downstream connection to the router filter.
  void callHighWatermarkCallbacks();
  void callLowWatermarkCallbacks();

  /**
   * Latches whether the request is a HEAD request and whether it is a gRPC request, based on
   * the request headers currently exposed by the filter manager callbacks. The request headers
   * are dereferenced unconditionally, so they must be present when this is called.
   */
  void requestHeadersInitialized() {
    // Fetch the headers once; both checks below inspect the same header map.
    const auto request_headers = filter_manager_callbacks_.requestHeaders();
    if (Http::Headers::get().MethodValues.Head == request_headers->getMethodValue()) {
      state_.is_head_request_ = true;
    }
    state_.is_grpc_request_ = Grpc::Common::isGrpcRequestHeaders(request_headers.ref());
  }

  /**
   * Marks local processing as complete.
   * TODO(yanvlasov): deprecate and decommission this function.
   */
  void setLocalComplete() {
    state_.observed_encode_end_stream_ = true;
    state_.decoder_filter_chain_aborted_ = true;
  }

  /**
   * Whether the filters have been destroyed.
   */
  bool destroyed() const { return state_.destroyed_; }

  /**
   * Whether remote processing has been marked as complete.
   */
  virtual bool decoderObservedEndStream() const { return state_.observed_decode_end_stream_; }

  /**
   * Instructs the FilterManager to not create a filter chain. This makes it possible to issue
   * a local reply without the overhead of creating and traversing the filters.
   */
  void skipFilterChainCreation() {
    ASSERT(!state_.create_chain_result_.created());
    // Record the chain as "created" so that no filter chain is built later.
    state_.create_chain_result_ = CreateChainResult(true);
  }

  virtual StreamInfo::StreamInfo& streamInfo() PURE;
  virtual const StreamInfo::StreamInfo& streamInfo() const PURE;

  enum class UpgradeResult : uint8_t { UpgradeUnneeded, UpgradeAccepted, UpgradeRejected };

  /**
   * Filter chain creation result.
   */
  class CreateChainResult {
  public:
    CreateChainResult() = default;

    /**
     * @param created whether the filter chain was created.
     * @param upgrade the upgrade result.
     */
    CreateChainResult(bool created, UpgradeResult upgrade = UpgradeResult::UpgradeUnneeded)
        : created_(created), upgrade_(upgrade) {}

    /**
     * @return whether the filter chain was created.
     */
    bool created() const { return created_; }
    /**
     * @return whether the upgrade was accepted.
     */
    bool upgradeAccepted() const { return upgrade_ == UpgradeResult::UpgradeAccepted; }
    /**
     * @return whether the upgrade was rejected.
     */
    bool upgradeRejected() const { return upgrade_ == UpgradeResult::UpgradeRejected; }

  private:
    bool created_ = false;
    UpgradeResult upgrade_ = UpgradeResult::UpgradeUnneeded;
  };

  /**
   * Set up the Encoder/Decoder filter chain.
   * @param filter_chain_factory the factory to create the filter chain.
   */
  CreateChainResult createFilterChain(const FilterChainFactory& filter_chain_factory);

  OptRef<const Network::Connection> connection() const { return connection_; }

  uint64_t streamId() const { return stream_id_; }
  Buffer::BufferMemoryAccountSharedPtr account() const { return account_; }

  Buffer::InstancePtr& bufferedRequestData() { return buffered_request_data_; }

  void contextOnContinue(ScopeTrackedObjectStack& tracked_object_stack);

  // Records that the downstream reset the stream; see stopDecoderFilterChain().
  void onDownstreamReset() { state_.saw_downstream_reset_ = true; }
  bool sawDownstreamReset() const { return state_.saw_downstream_reset_; }

  // The base implementation never sheds load; subclasses may override.
  virtual bool shouldLoadShed() { return false; }

  /**
   * Aborts the filter chain that is currently being iterated (decoder takes precedence over
   * encoder when both call states are set) and forwards the request to the filter manager
   * callbacks.
   * @param graceful forwarded unchanged to FilterManagerCallbacks::sendGoAwayAndClose().
   */
  void sendGoAwayAndClose(bool graceful = false) {
    // Stop filter chain iteration by checking encoder or decoder chain.
    if (state_.filter_call_state_ & FilterCallState::IsDecodingMask) {
      state_.decoder_filter_chain_aborted_ = true;
    } else if (state_.filter_call_state_ & FilterCallState::IsEncodingMask) {
      state_.encoder_filter_chain_aborted_ = true;
    }
    filter_manager_callbacks_.sendGoAwayAndClose(graceful);
  }

protected:
  struct State {
    State() = default;
    // Bitmask of FilterCallState flags for the filter calls currently in progress.
    uint32_t filter_call_state_{0};

    // Set after decoder filter chain has completed iteration. Prevents further calls to decoder
    // filters. This flag is used to determine stream completion when the independent half-close is
    // enabled.
    bool decoder_filter_chain_complete_{};

    // Set after encoder filter chain has completed iteration. Prevents further calls to encoder
    // filters. This flag is used to determine stream completion when the independent half-close is
    // enabled.
    bool encoder_filter_chain_complete_{};

    // Set `true` when the filter manager observes end stream on the decoder path (from downstream
    // client) before iteration of the decoder filter chain begins. This flag is used for setting
    // end_stream value when resuming decoder filter chain iteration.
    bool observed_decode_end_stream_{};
    // Set `true` when the filter manager observes end stream on the encoder path (from upstream
    // server or Envoy's local reply) before iteration of the encoder filter chain begins. This flag
    // is used for setting end_stream value when resuming encoder filter chain iteration.
    bool observed_encode_end_stream_{};

    // By default, we will assume there are no 1xx. If encode1xxHeaders
    // is ever called, this is set to true so commonContinue resumes processing the 1xx.
    bool has_1xx_headers_{};
    // These two are latched on initial header read, to determine if the original headers
    // constituted a HEAD or gRPC request, respectively.
    bool is_head_request_{};
    bool is_grpc_request_{};
    // Tracks if headers other than 100-Continue have been encoded to the codec.
    bool non_100_response_headers_encoded_{};
    // True under the stack of onLocalReply, false otherwise.
    bool under_on_local_reply_{};
    // True when the filter chain iteration was aborted with local reply.
    bool decoder_filter_chain_aborted_{};
    bool encoder_filter_chain_aborted_{};
    bool saw_downstream_reset_{};
    // True when the stream was recreated.
    bool recreated_stream_{};

    // The following 3 members are booleans rather than part of the space-saving bitfield as they
    // are passed as arguments to functions expecting bools. Extend State using the bitfield
    // where possible.
    bool encoder_filters_streaming_{true};
    bool decoder_filters_streaming_{true};
    bool destroyed_{false};

    // Result of filter chain creation.
    CreateChainResult create_chain_result_;

    // Used to track which filter is the latest filter that has received data.
    ActiveStreamEncoderFilter* latest_data_encoding_filter_{};
    ActiveStreamDecoderFilter* latest_data_decoding_filter_{};
  };

  State& state() { return state_; }

private:
  friend class DownstreamFilterManager;

  // Callbacks handed to filter factories; each add* call registers the new filter with the
  // owning FilterManager.
  class FilterChainFactoryCallbacksImpl : public Http::FilterChainFactoryCallbacks {
  public:
    FilterChainFactoryCallbacksImpl(FilterManager& manager)
        : manager_(manager), route_(manager_.streamInfo().route()) {}

    void addStreamDecoderFilter(Http::StreamDecoderFilterSharedPtr filter) override {
      // Grab the raw pointer before ownership is moved into the wrapper below.
      manager_.filters_.push_back(filter.get());

      manager_.decoder_filters_.entries_.emplace_back(std::make_unique<ActiveStreamDecoderFilter>(
          manager_, std::move(filter), filter_config_name_));
    }

    void addStreamEncoderFilter(Http::StreamEncoderFilterSharedPtr filter) override {
      // Grab the raw pointer before ownership is moved into the wrapper below.
      manager_.filters_.push_back(filter.get());

      manager_.encoder_filters_.entries_.emplace_back(std::make_unique<ActiveStreamEncoderFilter>(
          manager_, std::move(filter), filter_config_name_));
    }

    void addStreamFilter(Http::StreamFilterSharedPtr filter) override {
      // A dual filter is registered once in filters_ but wrapped on both the decoder and the
      // encoder side; the decoder wrapper takes a copy so the encoder wrapper can take the move.
      manager_.filters_.push_back(filter.get());
      manager_.decoder_filters_.entries_.emplace_back(
          std::make_unique<ActiveStreamDecoderFilter>(manager_, filter, filter_config_name_));
      manager_.encoder_filters_.entries_.emplace_back(std::make_unique<ActiveStreamEncoderFilter>(
          manager_, std::move(filter), filter_config_name_));
    }

    void addAccessLogHandler(AccessLog::InstanceSharedPtr handler) override {
      manager_.access_log_handlers_.push_back(std::move(handler));
    }

    Event::Dispatcher& dispatcher() override { return manager_.dispatcher_; }
    absl::string_view filterConfigName() const override { return filter_config_name_; }
    void setFilterConfigName(absl::string_view name) override { filter_config_name_ = name; }
    OptRef<const Router::Route> route() const override { return makeOptRefFromPtr(route_.get()); }
    absl::optional<bool> filterDisabled(absl::string_view config_name) const override {
      // Without a route there is no per-route override to consult.
      return route_ != nullptr ? route_->filterDisabled(config_name) : absl::nullopt;
    }
    const StreamInfo::StreamInfo& streamInfo() const override { return manager_.streamInfo(); }
    RequestHeaderMapOptRef requestHeaders() const override {
      return manager_.filter_manager_callbacks_.requestHeaders();
    }

  private:
    FilterManager& manager_;
    // Non-owning view of the name most recently passed to setFilterConfigName().
    // NOTE(review): the referenced string must outlive every use of this view - confirm
    // against callers.
    absl::string_view filter_config_name_;
    // Route captured from the stream info when the callbacks were constructed.
    Router::RouteConstSharedPtr route_;
  };

  // Indicates which filter to start the iteration with.
  enum class FilterIterationStartState { AlwaysStartFromNext, CanStartFromCurrent };

  UpgradeResult createUpgradeFilterChain(const FilterChainFactory& filter_chain_factory,
                                         FilterChainFactoryCallbacksImpl& callbacks);

  // Returns the encoder filter to start iteration with.
  StreamEncoderFilters::Iterator
  commonEncodePrefix(ActiveStreamEncoderFilter* filter, bool end_stream,
                     FilterIterationStartState filter_iteration_start_state);
  // Returns the decoder filter to start iteration with.
  StreamDecoderFilters::Iterator
  commonDecodePrefix(ActiveStreamDecoderFilter* filter,
                     FilterIterationStartState filter_iteration_start_state);

  void addDecodedData(ActiveStreamDecoderFilter& filter, Buffer::Instance& data, bool streaming);
  RequestTrailerMap& addDecodedTrailers();
  MetadataMapVector& addDecodedMetadata();
  // Helper function for the case where we have a header only request, but a filter adds a body
  // to it.
  void maybeContinueDecoding(StreamDecoderFilters::Iterator maybe_continue_data_entry);
  void decodeHeaders(ActiveStreamDecoderFilter* filter, RequestHeaderMap& headers, bool end_stream);
  // Sends data through decoding filter chains. filter_iteration_start_state indicates which
  // filter to start the iteration with.
  void decodeData(ActiveStreamDecoderFilter* filter, Buffer::Instance& data, bool end_stream,
                  FilterIterationStartState filter_iteration_start_state);
  void decodeTrailers(ActiveStreamDecoderFilter* filter, RequestTrailerMap& trailers);
  void decodeMetadata(ActiveStreamDecoderFilter* filter, MetadataMap& metadata_map);

  void addEncodedData(ActiveStreamEncoderFilter& filter, Buffer::Instance& data, bool streaming);
  ResponseTrailerMap& addEncodedTrailers();
  void encode1xxHeaders(ActiveStreamEncoderFilter* filter, ResponseHeaderMap& headers);
  void maybeContinueEncoding(StreamEncoderFilters::Iterator maybe_continue_data_entry);
  // As with most of the encode functions, this runs encodeHeaders on various
  // filters before calling encodeHeadersInternal which does final header munging and passes the
  // headers to the encoder.
  void encodeHeaders(ActiveStreamEncoderFilter* filter, ResponseHeaderMap& headers,
                     bool end_stream);
  // Sends data through encoding filter chains. filter_iteration_start_state indicates which
  // filter to start the iteration with, and finally calls encodeDataInternal
  // to update stats, do end stream bookkeeping, and send the data to encoder.
  void encodeData(ActiveStreamEncoderFilter* filter, Buffer::Instance& data, bool end_stream,
                  FilterIterationStartState filter_iteration_start_state);
  void encodeTrailers(ActiveStreamEncoderFilter* filter, ResponseTrailerMap& trailers);
  void encodeMetadata(ActiveStreamEncoderFilter* filter, MetadataMapPtr&& metadata_map_ptr);

  // Returns true if new metadata is decoded. Otherwise, returns false.
  bool processNewlyAddedMetadata();

  // Returns true if filter has stopped iteration for all frame types. Otherwise, returns false.
  // filter_streaming is the variable to indicate if stream is streaming, and its value may be
  // changed by the function.
  bool handleDataIfStopAll(ActiveStreamFilterBase& filter, Buffer::Instance& data,
                           bool& filter_streaming);

  // Returns the request metadata storage, creating it on first use. Never returns nullptr.
  MetadataMapVector* getRequestMetadataMapVector() {
    if (request_metadata_map_vector_ == nullptr) {
      request_metadata_map_vector_ = std::make_unique<MetadataMapVector>();
    }
    return request_metadata_map_vector_.get();
  }

  // Returns true if the decoder filter chain should not process any more frames.
  // This includes cases where the chain was explicitly aborted (e.g., local reply)
  // or where the downstream connection has been reset.
  bool stopDecoderFilterChain() const {
    return state_.decoder_filter_chain_aborted_ || state_.saw_downstream_reset_;
  }

  // Returns true if the encoder filter chain was explicitly aborted.
  bool stopEncoderFilterChain() const { return state_.encoder_filter_chain_aborted_; }

  bool isTerminalDecoderFilter(const ActiveStreamDecoderFilter& filter) const;

  FilterManagerCallbacks& filter_manager_callbacks_;
  Event::Dispatcher& dispatcher_;
  // This is unset if there is no downstream connection, e.g. for health check or
  // async requests.
  OptRef<const Network::Connection> connection_;
  const uint64_t stream_id_;
  Buffer::BufferMemoryAccountSharedPtr account_;
  const bool proxy_100_continue_;

  StreamDecoderFilters decoder_filters_;
  StreamEncoderFilters encoder_filters_;
  // Non-owning pointers to every added filter, in insertion order; used for onStreamComplete()
  // and onDestroy() notifications.
  std::vector<StreamFilterBase*> filters_;
  AccessLog::InstanceSharedPtrVector access_log_handlers_;

  // Stores metadata added in the decoding filter that is being processed. Will be cleared before
  // processing the next filter. The storage is created on demand. We need to store metadata
  // temporarily in the filter in case the filter has stopped all while processing headers.
  std::unique_ptr<MetadataMapVector> request_metadata_map_vector_;
  Buffer::InstancePtr buffered_response_data_;
  Buffer::InstancePtr buffered_request_data_;
  uint64_t buffer_limit_{0};
  uint32_t high_watermark_count_{0};
  std::list<DownstreamWatermarkCallbacks*> watermark_callbacks_;
  Network::Socket::OptionsSharedPtr upstream_options_ =
      std::make_shared<Network::Socket::Options>();
  std::pair<std::string, bool> upstream_override_host_;

  // TODO(snowp): Once FM has been moved to its own file we'll make these private classes of FM,
  // at which point they no longer need to be friends.
  friend ActiveStreamFilterBase;
  friend ActiveStreamDecoderFilter;
  friend ActiveStreamEncoderFilter;

  /**
   * Flags that keep track of which filter calls are currently in progress.
   */
  // clang-format off
    struct FilterCallState {
      static constexpr uint32_t DecodeHeaders   = 0x01;
      static constexpr uint32_t DecodeData      = 0x02;
      static constexpr uint32_t DecodeMetadata  = 0x04;
      static constexpr uint32_t DecodeTrailers  = 0x08;
      static constexpr uint32_t EncodeHeaders   = 0x10;
      static constexpr uint32_t EncodeData      = 0x20;
      static constexpr uint32_t EncodeMetadata  = 0x40;
      static constexpr uint32_t EncodeTrailers  = 0x80;
      // Encode1xxHeaders is a bit of a special state as 1xx
      // headers may be sent during request processing. This state is only used
      // to verify we do not encode1xx headers more than once per
      // filter.
      static constexpr uint32_t Encode1xxHeaders  = 0x100;
      // Used to indicate that one of the following conditions is true:
      // 1. The filter manager is processing the final [En|De]codeData frame.
      // 2. The filter manager is in [En|De]codeHeaders and is processing a
      //    header-only request or response.
      static constexpr uint32_t EndOfStream = 0x200;
      // Masks for filter call state.
      static constexpr uint32_t IsDecodingMask = DecodeHeaders | DecodeData | DecodeMetadata | DecodeTrailers;
      static constexpr uint32_t IsEncodingMask = EncodeHeaders | Encode1xxHeaders | EncodeData | EncodeMetadata | EncodeTrailers;
    };
  // clang-format on

  State state_;
};
// The DownstreamFilterManager has explicit handling to send local replies.
// The UpstreamFilterManager will not, and will instead defer local reply
// management to the DownstreamFilterManager.
class DownstreamFilterManager : public FilterManager {
public:
  DownstreamFilterManager(FilterManagerCallbacks& filter_manager_callbacks,
                          Event::Dispatcher& dispatcher, const Network::Connection& connection,
                          uint64_t stream_id, Buffer::BufferMemoryAccountSharedPtr account,
                          bool proxy_100_continue, uint32_t buffer_limit,
                          FilterChainFactory& filter_chain_factory,
                          const LocalReply::LocalReply& local_reply, Http::Protocol protocol,
                          TimeSource& time_source,
                          StreamInfo::FilterStateSharedPtr parent_filter_state,
                          Server::OverloadManager& overload_manager)
94090
      : FilterManager(filter_manager_callbacks, dispatcher, connection, stream_id, account,
94090
                      proxy_100_continue, buffer_limit),
94090
        stream_info_(protocol, time_source, connection.connectionInfoProviderSharedPtr(),
94090
                     StreamInfo::FilterState::LifeSpan::FilterChain,
94090
                     std::move(parent_filter_state)),
94090
        local_reply_(local_reply), filter_chain_factory_(filter_chain_factory),
94090
        downstream_filter_load_shed_point_(overload_manager.getLoadShedPoint(
94090
            Server::LoadShedPointName::get().HttpDownstreamFilterCheck)) {
94090
    ENVOY_LOG_ONCE_IF(
94090
        trace, downstream_filter_load_shed_point_ == nullptr,
94090
        "LoadShedPoint envoy.load_shed_points.http_downstream_filter_check is not found. "
94090
        "Is it configured?");
94090
  }
94090
  ~DownstreamFilterManager() override {
94090
    ASSERT(prepared_local_reply_ == nullptr,
94090
           "Filter Manager destroyed without executing prepared local reply");
94090
  }
  // TODO(snowp): This should probably return a StreamInfo instead of the impl.
4012143
  StreamInfo::StreamInfoImpl& streamInfo() override { return stream_info_; }
759361
  const StreamInfo::StreamInfoImpl& streamInfo() const override { return stream_info_; }
  void setDownstreamRemoteAddress(
90026
      const Network::Address::InstanceConstSharedPtr& downstream_remote_address) {
90026
    stream_info_.setDownstreamRemoteAddress(downstream_remote_address);
90026
  }
  CreateChainResult createDownstreamFilterChain();
  /**
   * Called before local reply is made by the filter manager.
   * @param data the data associated with the local reply.
   */
  void onLocalReply(StreamFilterBase::LocalReplyData& data);
  void sendLocalReply(Code code, absl::string_view body,
                      const std::function<void(ResponseHeaderMap& headers)>& modify_headers,
                      const absl::optional<Grpc::Status::GrpcStatus> grpc_status,
                      absl::string_view details) override;
  /**
   * Whether downstream has observed end_stream.
   */
259125
  bool decoderObservedEndStream() const override { return state_.observed_decode_end_stream_; }
  /**
   * Return true if the timestamp of the downstream end_stream was recorded.
   * For the HCM to handle the case of internal redirects, timeout error replies
   * and stream resets on premature upstream response.
   */
321090
  bool hasLastDownstreamByteReceived() const {
321090
    return streamInfo().downstreamTiming() &&
321090
           streamInfo().downstreamTiming()->lastDownstreamRxByteReceived().has_value();
321090
  }
43500
  bool shouldLoadShed() override {
43500
    return downstream_filter_load_shed_point_ != nullptr &&
43500
           downstream_filter_load_shed_point_->shouldShedLoad();
43500
  }
private:
  /**
   * Sends a local reply by constructing a response and passing it through all the encoder
   * filters. The resulting response will be passed out via the FilterManagerCallbacks.
   * This will be deprecated in favor of prepareLocalReplyViaFilterChain.
   */
  void sendLocalReplyViaFilterChain(
      bool is_grpc_request, Code code, absl::string_view body,
      const std::function<void(ResponseHeaderMap& headers)>& modify_headers, bool is_head_request,
      const absl::optional<Grpc::Status::GrpcStatus> grpc_status, absl::string_view details);
  /**
   * Prepares a local reply that will be sent along the encoder filters in
   * executeLocalReplyViaFilterChain.
   */
  void prepareLocalReplyViaFilterChain(
      bool is_grpc_request, Code code, absl::string_view body,
      const std::function<void(ResponseHeaderMap& headers)>& modify_headers, bool is_head_request,
      const absl::optional<Grpc::Status::GrpcStatus> grpc_status, absl::string_view details);
  /**
   * Executes a prepared local reply along the encoder filters.
   */
  void executeLocalReplyIfPrepared() override;
  /**
   * Sends a local reply by constructing a response and skipping the encoder filters. The
   * resulting response will be passed out via the FilterManagerCallbacks.
   */
  void sendDirectLocalReply(Code code, absl::string_view body,
                            const std::function<void(ResponseHeaderMap& headers)>& modify_headers,
                            bool is_head_request,
                            const absl::optional<Grpc::Status::GrpcStatus> grpc_status);
private:
  OverridableRemoteConnectionInfoSetterStreamInfo stream_info_;
  const LocalReply::LocalReply& local_reply_;
  const FilterChainFactory& filter_chain_factory_;
  Utility::PreparedLocalReplyPtr prepared_local_reply_{nullptr};
  Server::LoadShedPoint* downstream_filter_load_shed_point_{nullptr};
};
} // namespace Http
} // namespace Envoy