1
#include "source/common/http/filter_manager.h"
2

            
3
#include <functional>
4

            
5
#include "envoy/http/header_map.h"
6
#include "envoy/matcher/matcher.h"
7

            
8
#include "source/common/common/enum_to_int.h"
9
#include "source/common/common/scope_tracked_object_stack.h"
10
#include "source/common/common/scope_tracker.h"
11
#include "source/common/http/codes.h"
12
#include "source/common/http/header_map_impl.h"
13
#include "source/common/http/header_utility.h"
14
#include "source/common/http/utility.h"
15
#include "source/common/runtime/runtime_features.h"
16

            
17
#include "matching/data_impl.h"
18

            
19
namespace Envoy {
20
namespace Http {
21

            
22
namespace {
23

            
24
// Shared helper that keeps `latest_filter` pointing at the furthest filter in `filters` that has
// received the onData callback.
//
// `current_filter` is the filter about to receive data. `latest_filter` is updated in place.
template <class Filters>
void recordLatestDataFilter(typename Filters::Iterator current_filter,
                            typename Filters::Element*& latest_filter, Filters& filters) {
  // Nothing has been recorded yet, so this is the first onData call: record the current filter.
  if (latest_filter == nullptr) {
    latest_filter = current_filter->get();
    return;
  }

  // The first filter has no predecessor to compare against; the check above already covers it.
  if (current_filter == filters.begin()) {
    return;
  }

  // Only advance when the recorded filter is the one immediately *before* the current filter,
  // which means the current filter is seeing data for the first time.
  //
  // Simply assigning latest = current would be wrong: the first onData iteration would walk the
  // filters and set latest correctly, but later onData iterations restart from the beginning and
  // would move the pointer backwards, potentially allowing filter N to modify the buffer even
  // though filter M > N was the filter that inserted data into the buffer.
  const bool previous_is_latest = (std::prev(current_filter)->get() == latest_filter);
  if (previous_is_latest) {
    latest_filter = current_filter->get();
  }
}
48

            
49
void finalizeHeaders(FilterManagerCallbacks& callbacks, StreamInfo::StreamInfo& stream_info,
50
9731
                     ResponseHeaderMap& headers) {
51
9731
  const Router::RouteConstSharedPtr& route = stream_info.route();
52
9731
  if (route != nullptr && route->routeEntry() != nullptr) {
53
4831
    const Formatter::Context formatter_context{
54
4831
        callbacks.requestHeaders().ptr(), &headers, {}, {}, {}, &callbacks.activeSpan()};
55
4831
    route->routeEntry()->finalizeResponseHeaders(headers, formatter_context, stream_info);
56
4831
  }
57
9731
}
58

            
59
} // namespace
60

            
61
44588
void ActiveStreamFilterBase::commonContinue() {
62
44588
  if (!canContinue()) {
63
101
    ENVOY_STREAM_LOG(trace, "cannot continue filter chain: filter={}", *this,
64
101
                     static_cast<const void*>(this));
65
101
    return;
66
101
  }
67

            
68
  // Set ScopeTrackerScopeState if there's no existing crash context.
69
44487
  ScopeTrackedObjectStack encapsulated_object;
70
44487
  absl::optional<ScopeTrackerScopeState> state;
71
44487
  if (parent_.dispatcher_.trackedObjectStackIsEmpty()) {
72
14047
    restoreContextOnContinue(encapsulated_object);
73
14047
    state.emplace(&encapsulated_object, parent_.dispatcher_);
74
14047
  }
75

            
76
44487
  ENVOY_STREAM_LOG(trace, "continuing filter chain: filter={}", *this,
77
44487
                   static_cast<const void*>(this));
78
44487
  ASSERT(!canIterate(),
79
44487
         "Attempting to continue iteration while the IterationState is already Continue");
80
  // If iteration has stopped for all frame types, set iterate_from_current_filter_ to true so the
81
  // filter iteration starts with the current filter instead of the next one.
82
44487
  if (stoppedAll()) {
83
41928
    iterate_from_current_filter_ = true;
84
41928
  }
85
44487
  allowIteration();
86

            
87
  // Only resume with do1xxHeaders() if we've actually seen 1xx headers.
88
44487
  if (has1xxHeaders()) {
89
1
    continued_1xx_headers_ = true;
90
1
    do1xxHeaders();
91
    // If the response headers have not yet come in, don't continue on with
92
    // headers and body. doHeaders expects request headers to exist.
93
1
    if (!parent_.filter_manager_callbacks_.responseHeaders()) {
94
1
      return;
95
1
    }
96
1
  }
97

            
98
44486
  if (!canContinue()) {
99
    ENVOY_STREAM_LOG(trace, "cannot continue filter chain: filter={}", *this,
100
                     static_cast<const void*>(this));
101
    return;
102
  }
103

            
104
  // Make sure that we handle the zero byte data frame case. We make no effort to optimize this
105
  // case in terms of merging it into a header only request/response. This could be done in the
106
  // future.
107
44486
  if (!headers_continued_) {
108
44066
    headers_continued_ = true;
109
44066
    doHeaders(observedEndStream() && !bufferedData() && !hasTrailers());
110
44066
  }
111

            
112
44486
  if (!canContinue()) {
113
112
    ENVOY_STREAM_LOG(trace, "cannot continue filter chain: filter={}", *this,
114
112
                     static_cast<const void*>(this));
115
112
    return;
116
112
  }
117

            
118
44374
  doMetadata();
119

            
120
44374
  if (!canContinue()) {
121
1
    ENVOY_STREAM_LOG(trace, "cannot continue filter chain: filter={}", *this,
122
1
                     static_cast<const void*>(this));
123
1
    return;
124
1
  }
125

            
126
  // It is possible for trailers to be added during doData(). doData() itself handles continuation
127
  // of trailers for the non-continuation case. Thus, we must keep track of whether we had
128
  // trailers prior to calling doData(). If we do, then we continue them here, otherwise we rely
129
  // on doData() to do so.
130
44373
  const bool had_trailers_before_data = hasTrailers();
131
44373
  if (bufferedData()) {
132
10360
    doData(observedEndStream() && !had_trailers_before_data);
133
10360
  }
134

            
135
44373
  if (!canContinue()) {
136
602
    ENVOY_STREAM_LOG(trace, "cannot continue filter chain: filter={}", *this,
137
602
                     static_cast<const void*>(this));
138
602
    return;
139
602
  }
140

            
141
43771
  if (had_trailers_before_data) {
142
511
    doTrailers();
143
511
  }
144

            
145
43771
  iterate_from_current_filter_ = false;
146
43771
}
147

            
148
18
// Applies the status a filter returned from its 1xx headers callback.
//
// Returns true when iteration continues (and marks 1xx headers as continued), false when the
// filter asked to stop, in which case the iteration state becomes StopSingleIteration.
bool ActiveStreamFilterBase::commonHandleAfter1xxHeadersCallback(Filter1xxHeadersStatus status) {
  ASSERT(parent_.state_.has_1xx_headers_);
  ASSERT(!continued_1xx_headers_);
  ASSERT(canIterate());

  switch (status) {
  case Filter1xxHeadersStatus::Continue: {
    continued_1xx_headers_ = true;
    return true;
  }
  case Filter1xxHeadersStatus::StopIteration: {
    iteration_state_ = IterationState::StopSingleIteration;
    return false;
  }
  }

  // Reached only when `status` holds a value outside the enum.
  PANIC_DUE_TO_CORRUPT_ENUM;
}
164

            
165
bool ActiveStreamFilterBase::commonHandleAfterHeadersCallback(FilterHeadersStatus status,
166
113349
                                                              bool& end_stream) {
167
113349
  ASSERT(!headers_continued_);
168
113349
  ASSERT(canIterate());
169

            
170
113349
  switch (status) {
171
39739
  case FilterHeadersStatus::StopIteration:
172
39739
    iteration_state_ = IterationState::StopSingleIteration;
173
39739
    break;
174
114
  case FilterHeadersStatus::StopAllIterationAndBuffer:
175
114
    iteration_state_ = IterationState::StopAllBuffer;
176
114
    break;
177
43443
  case FilterHeadersStatus::StopAllIterationAndWatermark:
178
43443
    iteration_state_ = IterationState::StopAllWatermark;
179
43443
    break;
180
31
  case FilterHeadersStatus::ContinueAndDontEndStream:
181
31
    end_stream = false;
182
31
    headers_continued_ = true;
183
31
    ENVOY_STREAM_LOG(debug, "converting to headers and body (body not available yet)", parent_);
184
31
    break;
185
30022
  case FilterHeadersStatus::Continue:
186
30022
    headers_continued_ = true;
187
30022
    break;
188
113349
  }
189

            
190
113349
  handleMetadataAfterHeadersCallback();
191

            
192
113349
  if (stoppedAll() || status == FilterHeadersStatus::StopIteration) {
193
83296
    return false;
194
85509
  } else {
195
30053
    return true;
196
30053
  }
197
113349
}
198

            
199
30581
// Moves `provided_data` into the stream's buffered data, creating the buffer on first use.
//
// This is shared by encoding and decoding because the buffering scheme is a little involved.
// Data entering the filter pipeline is sent through; any filter may stop iteration and choose
// to buffer. When iteration later continues, the buffered data is what gets used. A later filter
// may stop and buffer again, and in that case the data being handled already *is* the buffer, so
// nothing is re-buffered: the filter is assumed to have modified the buffer in place as it
// wished.
void ActiveStreamFilterBase::commonHandleBufferData(Buffer::Instance& provided_data) {
  // Already operating on the buffered data: nothing to move.
  if (bufferedData().get() == &provided_data) {
    return;
  }

  if (!bufferedData()) {
    bufferedData() = createBuffer();
  }
  bufferedData()->move(provided_data);
}
214

            
215
bool ActiveStreamFilterBase::commonHandleAfterDataCallback(FilterDataStatus status,
216
                                                           Buffer::Instance& provided_data,
217
461232
                                                           bool& buffer_was_streaming) {
218

            
219
461232
  if (status == FilterDataStatus::Continue) {
220
210574
    if (iteration_state_ == IterationState::StopSingleIteration) {
221
618
      commonHandleBufferData(provided_data);
222
618
      commonContinue();
223
618
      return false;
224
209967
    } else {
225
209956
      ASSERT(headers_continued_);
226
209956
    }
227
281669
  } else {
228
250658
    iteration_state_ = IterationState::StopSingleIteration;
229
250658
    if (status == FilterDataStatus::StopIterationAndBuffer ||
230
250658
        status == FilterDataStatus::StopIterationAndWatermark) {
231
10164
      buffer_was_streaming = status == FilterDataStatus::StopIterationAndWatermark;
232
10164
      commonHandleBufferData(provided_data);
233
246688
    } else if (observedEndStream() && !hasTrailers() && !bufferedData() &&
234
               // If the stream is destroyed, no need to handle the data buffer or trailers.
235
               // This can occur if the filter calls sendLocalReply.
236
240494
               !parent_.state_.destroyed_) {
237
      // If this filter is doing StopIterationNoBuffer and this stream is terminated with a zero
238
      // byte data frame, we need to create an empty buffer to make sure that when commonContinue
239
      // is called, the pipeline resumes with an empty data frame with end_stream = true
240
3326
      ASSERT(end_stream_);
241
3326
      bufferedData() = createBuffer();
242
3326
    }
243

            
244
250658
    return false;
245
250658
  }
246

            
247
209956
  return true;
248
461232
}
249

            
250
1715
// Applies the status a filter returned from its trailers callback.
//
// Returns true when iteration proceeds to the next filter. Returns false when the filter asked
// to stop, or when it returned Continue while iteration was stopped (in which case the chain is
// resumed through commonContinue() instead).
bool ActiveStreamFilterBase::commonHandleAfterTrailersCallback(FilterTrailersStatus status) {
  if (status == FilterTrailersStatus::Continue) {
    if (iteration_state_ == IterationState::StopSingleIteration) {
      commonContinue();
      return false;
    }
    ASSERT(headers_continued_);
    return true;
  }

  if (status == FilterTrailersStatus::StopIteration) {
    // Only downgrade to StopSingleIteration when iteration is currently allowed.
    if (canIterate()) {
      iteration_state_ = IterationState::StopSingleIteration;
    }
    return false;
  }

  return true;
}
268

            
269
271646
// Returns the connection reported by the parent filter manager.
OptRef<const Network::Connection> ActiveStreamFilterBase::connection() {
  auto& manager = parent_;
  return manager.connection();
}
272

            
273
994295
// Returns the parent filter manager's dispatcher.
Event::Dispatcher& ActiveStreamFilterBase::dispatcher() {
  return parent_.dispatcher_;
}
274

            
275
1561118
// Returns the parent filter manager's stream info.
StreamInfo::StreamInfo& ActiveStreamFilterBase::streamInfo() {
  return parent_.streamInfo();
}
276

            
277
126612
// Returns the active span reported by the filter manager callbacks.
Tracing::Span& ActiveStreamFilterBase::activeSpan() {
  auto& callbacks = parent_.filter_manager_callbacks_;
  return callbacks.activeSpan();
}
280

            
281
151504
// Returns the tracked-object scope reported by the filter manager callbacks.
const ScopeTrackedObject& ActiveStreamFilterBase::scope() {
  auto& callbacks = parent_.filter_manager_callbacks_;
  return callbacks.scope();
}
284

            
285
void ActiveStreamFilterBase::restoreContextOnContinue(
286
14047
    ScopeTrackedObjectStack& tracked_object_stack) {
287
14047
  parent_.contextOnContinue(tracked_object_stack);
288
14047
}
289

            
290
43806
// Returns the tracing config reported by the filter manager callbacks.
OptRef<const Tracing::Config> ActiveStreamFilterBase::tracingConfig() const {
  auto& callbacks = parent_.filter_manager_callbacks_;
  return callbacks.tracingConfig();
}
293

            
294
694
// Returns the cluster info reported by the filter manager callbacks.
Upstream::ClusterInfoConstSharedPtr ActiveStreamFilterBase::clusterInfo() {
  auto& callbacks = parent_.filter_manager_callbacks_;
  return callbacks.clusterInfo();
}
297

            
298
136248
// Returns the stream's route; delegates to getRoute().
Router::RouteConstSharedPtr ActiveStreamFilterBase::route() {
  return getRoute();
}
299

            
300
143151
// Resolves the stream's route. When downstream callbacks are available the route is requested
// from them (passing a null callback); otherwise the route stored on the stream info is used.
Router::RouteConstSharedPtr ActiveStreamFilterBase::getRoute() const {
  auto& callbacks = parent_.filter_manager_callbacks_;
  if (!callbacks.downstreamCallbacks()) {
    return parent_.streamInfo().route();
  }
  return callbacks.downstreamCallbacks()->route(nullptr);
}
306

            
307
12
void ActiveStreamFilterBase::resetIdleTimer() {
308
12
  parent_.filter_manager_callbacks_.resetIdleTimer();
309
12
}
310

            
311
// Looks up the most specific per-filter config on the current route, keyed by this filter's
// config name. Returns nullptr when there is no current route.
const Router::RouteSpecificFilterConfig*
ActiveStreamFilterBase::mostSpecificPerFilterConfig() const {
  const Router::RouteConstSharedPtr active_route = getRoute();
  if (active_route != nullptr) {
    return active_route->mostSpecificPerFilterConfig(filter_context_.config_name);
  }
  return nullptr;
}
319

            
320
1578
// Collects the per-filter configs the current route holds for this filter's config name.
// Returns an empty result when there is no current route.
Router::RouteSpecificFilterConfigs ActiveStreamFilterBase::perFilterConfigs() const {
  const Router::RouteConstSharedPtr active_route = getRoute();
  if (active_route != nullptr) {
    return active_route->perFilterConfigs(filter_context_.config_name);
  }
  return {};
}
328

            
329
2
// Returns the HTTP/1 stream encoder options reported by the filter manager callbacks.
Http1StreamEncoderOptionsOptRef ActiveStreamFilterBase::http1StreamEncoderOptions() {
  // TODO(mattklein123): This interface may eventually deserve a wrapper; for now the filter is
  // given direct access to the encoder options.
  auto& callbacks = parent_.filter_manager_callbacks_;
  return callbacks.http1StreamEncoderOptions();
}
334

            
335
1922
// Returns the downstream callbacks reported by the filter manager callbacks.
OptRef<DownstreamStreamFilterCallbacks> ActiveStreamFilterBase::downstreamCallbacks() {
  auto& callbacks = parent_.filter_manager_callbacks_;
  return callbacks.downstreamCallbacks();
}
338

            
339
997070
// Returns the upstream callbacks reported by the filter manager callbacks.
OptRef<UpstreamStreamFilterCallbacks> ActiveStreamFilterBase::upstreamCallbacks() {
  auto& callbacks = parent_.filter_manager_callbacks_;
  return callbacks.upstreamCallbacks();
}
342

            
343
310
// Returns the request headers held by the filter manager callbacks.
RequestHeaderMapOptRef ActiveStreamFilterBase::requestHeaders() {
  auto& callbacks = parent_.filter_manager_callbacks_;
  return callbacks.requestHeaders();
}
346
156
// Returns the request trailers held by the filter manager callbacks.
RequestTrailerMapOptRef ActiveStreamFilterBase::requestTrailers() {
  auto& callbacks = parent_.filter_manager_callbacks_;
  return callbacks.requestTrailers();
}
349
3
// Returns the informational (1xx) headers held by the filter manager callbacks.
ResponseHeaderMapOptRef ActiveStreamFilterBase::informationalHeaders() {
  auto& callbacks = parent_.filter_manager_callbacks_;
  return callbacks.informationalHeaders();
}
352
202
// Returns the response headers held by the filter manager callbacks.
ResponseHeaderMapOptRef ActiveStreamFilterBase::responseHeaders() {
  auto& callbacks = parent_.filter_manager_callbacks_;
  return callbacks.responseHeaders();
}
355
122
// Returns the response trailers held by the filter manager callbacks.
ResponseTrailerMapOptRef ActiveStreamFilterBase::responseTrailers() {
  auto& callbacks = parent_.filter_manager_callbacks_;
  return callbacks.responseTrailers();
}
358

            
359
309
// Forwards the new buffer `limit` to the parent filter manager.
void ActiveStreamFilterBase::setBufferLimit(uint64_t limit) {
  parent_.setBufferLimit(limit);
}
360

            
361
249276
// Returns the parent filter manager's current buffer limit.
uint64_t ActiveStreamFilterBase::bufferLimit() {
  return parent_.buffer_limit_;
}
362

            
363
void ActiveStreamFilterBase::sendLocalReply(
364
    Code code, absl::string_view body,
365
    std::function<void(ResponseHeaderMap& headers)> modify_headers,
366
7853
    const absl::optional<Grpc::Status::GrpcStatus> grpc_status, absl::string_view details) {
367
7853
  if (!streamInfo().filterState()->hasData<LocalReplyOwnerObject>(LocalReplyFilterStateKey)) {
368
7138
    streamInfo().filterState()->setData(
369
7138
        LocalReplyFilterStateKey,
370
7138
        std::make_shared<LocalReplyOwnerObject>(filter_context_.config_name),
371
7138
        StreamInfo::FilterState::StateType::ReadOnly,
372
7138
        StreamInfo::FilterState::LifeSpan::FilterChain);
373
7138
  }
374

            
375
7853
  parent_.sendLocalReply(code, body, modify_headers, grpc_status, details);
376
7853
}
377

            
378
218421
bool ActiveStreamDecoderFilter::canContinue() {
379
  // It is possible for the connection manager to respond directly to a request even while
380
  // a filter is trying to continue. If a response has already happened, we should not
381
  // continue to further filters. A concrete example of this is a filter buffering data, the
382
  // last data frame comes in and the filter continues, but the final buffering takes the stream
383
  // over the high watermark such that a 413 is returned.
384
218421
  return !parent_.stopDecoderFilterChain();
385
218421
}
386

            
387
3886
bool ActiveStreamEncoderFilter::canContinue() {
388
  // As with ActiveStreamDecoderFilter::canContinue() make sure we do not
389
  // continue if a local reply has been sent or ActiveStreamDecoderFilter::recreateStream() is
390
  // called, etc.
391
3886
  return !parent_.state_.encoder_filter_chain_complete_ && !parent_.stopEncoderFilterChain();
392
3886
}
393

            
394
13102
// Creates a watermark buffer for request data. Its first two callbacks invoke
// requestDataDrained() and requestDataTooLarge() on this filter; the third is currently a no-op.
// The buffer's watermarks are set from the parent's buffer limit.
Buffer::InstancePtr ActiveStreamDecoderFilter::createBuffer() {
  auto request_buffer = dispatcher().getWatermarkFactory().createBuffer(
      [this]() { requestDataDrained(); }, [this]() { requestDataTooLarge(); },
      []() { /* TODO(adisuissa): Handle overflow watermark */ });
  request_buffer->setWatermarks(parent_.buffer_limit_);
  return request_buffer;
}
402

            
403
166883
// Returns a reference to the parent's buffered request data pointer (which may be null).
Buffer::InstancePtr& ActiveStreamDecoderFilter::bufferedData() {
  auto& manager = parent_;
  return manager.buffered_request_data_;
}
406

            
407
293431
// Returns the parent's view of whether the decoder has observed end of stream.
bool ActiveStreamDecoderFilter::observedEndStream() {
  return parent_.decoderObservedEndStream();
}
408

            
409
43523
void ActiveStreamDecoderFilter::doHeaders(bool end_stream) {
410
43523
  parent_.decodeHeaders(this, *parent_.filter_manager_callbacks_.requestHeaders(), end_stream);
411
43523
}
412

            
413
9640
void ActiveStreamDecoderFilter::doData(bool end_stream) {
414
9640
  parent_.decodeData(this, *parent_.buffered_request_data_, end_stream,
415
9640
                     FilterManager::FilterIterationStartState::CanStartFromCurrent);
416
9640
}
417

            
418
404
void ActiveStreamDecoderFilter::doTrailers() {
419
404
  parent_.decodeTrailers(this, *parent_.filter_manager_callbacks_.requestTrailers());
420
404
}
421
76458
bool ActiveStreamDecoderFilter::hasTrailers() {
422
76458
  return parent_.filter_manager_callbacks_.requestTrailers().has_value();
423
76458
}
424

            
425
152
void ActiveStreamDecoderFilter::drainSavedRequestMetadata() {
426
152
  ASSERT(saved_request_metadata_ != nullptr);
427
444
  for (auto& metadata_map : *getSavedRequestMetadata()) {
428
444
    parent_.decodeMetadata(this, *metadata_map);
429
444
  }
430
152
  getSavedRequestMetadata()->clear();
431
152
}
432

            
433
104655
void ActiveStreamDecoderFilter::handleMetadataAfterHeadersCallback() {
434
104655
  if (parent_.state_.decoder_filter_chain_aborted_) {
435
    // The decoder filter chain has been aborted, possibly due to a local reply. In this case,
436
    // there's no reason to decode saved metadata.
437
6240
    getSavedRequestMetadata()->clear();
438
6240
    return;
439
6240
  }
440
  // If we drain accumulated metadata, the iteration must start with the current filter.
441
98415
  const bool saved_state = iterate_from_current_filter_;
442
98415
  iterate_from_current_filter_ = true;
443
  // If decodeHeaders() returns StopAllIteration, we should skip draining metadata, and wait
444
  // for doMetadata() to drain the metadata after iteration continues.
445
98415
  if (!stoppedAll() && saved_request_metadata_ != nullptr && !getSavedRequestMetadata()->empty()) {
446
41
    drainSavedRequestMetadata();
447
41
  }
448
  // Restores the original value of iterate_from_current_filter_.
449
98415
  iterate_from_current_filter_ = saved_state;
450
98415
}
451

            
452
30
// Forwards to the parent filter manager's addDecodedTrailers() and returns its result.
RequestTrailerMap& ActiveStreamDecoderFilter::addDecodedTrailers() {
  auto& manager = parent_;
  return manager.addDecodedTrailers();
}
455

            
456
1799
// Forwards `data` and the `streaming` flag to the parent filter manager's addDecodedData(),
// identifying this filter as the caller.
void ActiveStreamDecoderFilter::addDecodedData(Buffer::Instance& data, bool streaming) {
  auto& manager = parent_;
  manager.addDecodedData(*this, data, streaming);
}
459

            
460
469
// Forwards to the parent filter manager's addDecodedMetadata() and returns its result.
MetadataMapVector& ActiveStreamDecoderFilter::addDecodedMetadata() {
  auto& manager = parent_;
  return manager.addDecodedMetadata();
}
463

            
464
void ActiveStreamDecoderFilter::injectDecodedDataToFilterChain(Buffer::Instance& data,
465
5739
                                                               bool end_stream) {
466
5739
  if (!headers_continued_) {
467
139
    headers_continued_ = true;
468
139
    doHeaders(false);
469
139
  }
470
5739
  if (Runtime::runtimeFeatureEnabled(
471
5739
          "envoy.reloadable_features.ext_proc_inject_data_with_state_update")) {
472
5739
    parent_.state().observed_decode_end_stream_ = end_stream;
473
5739
  }
474
5739
  parent_.decodeData(this, data, end_stream,
475
5739
                     FilterManager::FilterIterationStartState::CanStartFromCurrent);
476
5739
}
477

            
478
43296
// Resumes decoder filter iteration from this filter; delegates to commonContinue().
void ActiveStreamDecoderFilter::continueDecoding() {
  commonContinue();
}
479
116567
// Returns a raw pointer to the parent's buffered request data (null when nothing is buffered).
const Buffer::Instance* ActiveStreamDecoderFilter::decodingBuffer() {
  const auto& buffered_request = parent_.buffered_request_data_;
  return buffered_request.get();
}
482

            
483
43629
// Returns the parent filter manager's load shedding decision.
bool ActiveStreamDecoderFilter::shouldLoadShed() const {
  return parent_.shouldLoadShed();
}
484

            
485
void ActiveStreamDecoderFilter::modifyDecodingBuffer(
486
125
    std::function<void(Buffer::Instance&)> callback) {
487
125
  ASSERT(parent_.state_.latest_data_decoding_filter_ == this);
488
125
  callback(*parent_.buffered_request_data_.get());
489
125
}
490

            
491
void ActiveStreamDecoderFilter::sendLocalReply(
492
    Code code, absl::string_view body,
493
    std::function<void(ResponseHeaderMap& headers)> modify_headers,
494
7600
    const absl::optional<Grpc::Status::GrpcStatus> grpc_status, absl::string_view details) {
495
7600
  ActiveStreamFilterBase::sendLocalReply(code, body, modify_headers, grpc_status, details);
496
7600
}
497

            
498
18
void ActiveStreamDecoderFilter::sendGoAwayAndClose(bool graceful) {
499
18
  parent_.sendGoAwayAndClose(graceful);
500
18
}
501

            
502
302
// Stores `headers` as the informational headers and starts 1xx header encoding from the
// beginning of the encoder chain, but only when proxying of 100-Continue is enabled.
void ActiveStreamDecoderFilter::encode1xxHeaders(ResponseHeaderMapPtr&& headers) {
  // When Envoy is not configured to proxy 100-Continue responses the 100 Continue is swallowed
  // here. That avoids the situation where Envoy strips Expect: 100-Continue and sends a
  // 100-Continue itself, then proxies a duplicate 100 Continue from upstream.
  if (!parent_.proxy_100_continue_) {
    return;
  }

  auto& callbacks = parent_.filter_manager_callbacks_;
  callbacks.setInformationalHeaders(std::move(headers));
  parent_.encode1xxHeaders(nullptr, *callbacks.informationalHeaders());
}
511

            
512
void ActiveStreamDecoderFilter::stopDecodingIfNonTerminalFilterEncodedEndStream(
513
534293
    bool encoded_end_stream) {
514
  // Encoding end_stream by a non-terminal filters (i.e. cache filter) always causes the decoding to
515
  // be stopped even if independent half-close is enabled. For simplicity, independent half-close is
516
  // allowed only when the terminal (router) filter is encoding the response.
517
534293
  if (encoded_end_stream && !parent_.isTerminalDecoderFilter(*this) &&
518
534293
      !parent_.state_.decoder_filter_chain_complete_) {
519
399
    parent_.state_.decoder_filter_chain_aborted_ = true;
520
399
  }
521
534293
}
522

            
523
void ActiveStreamDecoderFilter::encodeHeaders(ResponseHeaderMapPtr&& headers, bool end_stream,
524
78259
                                              absl::string_view details) {
525
78259
  stopDecodingIfNonTerminalFilterEncodedEndStream(end_stream);
526
78259
  parent_.streamInfo().setResponseCodeDetails(details);
527
78259
  parent_.filter_manager_callbacks_.setResponseHeaders(std::move(headers));
528
78259
  parent_.encodeHeaders(nullptr, *parent_.filter_manager_callbacks_.responseHeaders(), end_stream);
529
78259
}
530

            
531
454836
// Starts response data encoding from the beginning of the encoder chain, after checking whether
// this end_stream (if any) must abort decoding.
void ActiveStreamDecoderFilter::encodeData(Buffer::Instance& data, bool end_stream) {
  stopDecodingIfNonTerminalFilterEncodedEndStream(end_stream);

  constexpr auto start_state = FilterManager::FilterIterationStartState::CanStartFromCurrent;
  parent_.encodeData(nullptr, data, end_stream, start_state);
}
536

            
537
1198
// Starts response trailer encoding from the beginning of the encoder chain. Trailers always end
// the stream, so the end-of-stream abort check is made with `true`. Ownership of `trailers` is
// handed to the filter manager callbacks first.
void ActiveStreamDecoderFilter::encodeTrailers(ResponseTrailerMapPtr&& trailers) {
  stopDecodingIfNonTerminalFilterEncodedEndStream(true);

  auto& callbacks = parent_.filter_manager_callbacks_;
  callbacks.setResponseTrailers(std::move(trailers));
  parent_.encodeTrailers(nullptr, *callbacks.responseTrailers());
}
542

            
543
1893
// Passes ownership of `metadata_map_ptr` to the parent for response metadata encoding, starting
// from the beginning of the encoder chain.
void ActiveStreamDecoderFilter::encodeMetadata(MetadataMapPtr&& metadata_map_ptr) {
  auto& manager = parent_;
  manager.encodeMetadata(nullptr, std::move(metadata_map_ptr));
}
546

            
547
446344
void ActiveStreamDecoderFilter::onDecoderFilterAboveWriteBufferHighWatermark() {
548
446344
  parent_.filter_manager_callbacks_.onDecoderFilterAboveWriteBufferHighWatermark();
549
446344
}
550

            
551
247
void ActiveStreamDecoderFilter::requestDataTooLarge() {
552
247
  ENVOY_STREAM_LOG(debug, "request data too large watermark exceeded", parent_);
553
247
  if (parent_.state_.decoder_filters_streaming_) {
554
135
    onDecoderFilterAboveWriteBufferHighWatermark();
555
220
  } else {
556
112
    parent_.filter_manager_callbacks_.onRequestDataTooLarge();
557
112
    sendLocalReply(Code::PayloadTooLarge, CodeUtility::toString(Code::PayloadTooLarge), nullptr,
558
112
                   absl::nullopt, StreamInfo::ResponseCodeDetails::get().RequestPayloadTooLarge);
559
112
  }
560
247
}
561

            
562
141481
// Resumes decoding from `continue_data_entry` when it refers to a filter; a value of
// decoder_filters_.end() means there is nothing to continue.
void FilterManager::maybeContinueDecoding(StreamDecoderFilters::Iterator continue_data_entry) {
  if (continue_data_entry == decoder_filters_.end()) {
    return;
  }

  // continueDecoding() is reused because it correctly avoids calling decodeHeaders() again. It
  // expects StopSingleIteration, so that state is faked on the filter before continuing.
  ASSERT(buffered_request_data_);
  (*continue_data_entry)->iteration_state_ =
      ActiveStreamFilterBase::IterationState::StopSingleIteration;
  (*continue_data_entry)->continueDecoding();
}
573

            
574
void FilterManager::decodeHeaders(ActiveStreamDecoderFilter* filter, RequestHeaderMap& headers,
575
141482
                                  bool end_stream) {
576
  // If the stream has been reset, do not process any more frames.
577
141482
  if (stopDecoderFilterChain()) {
578
1
    return;
579
1
  }
580

            
581
  // Headers filter iteration should always start with the next filter if available.
582
141481
  StreamDecoderFilters::Iterator entry =
583
141481
      commonDecodePrefix(filter, FilterIterationStartState::AlwaysStartFromNext);
584
141481
  StreamDecoderFilters::Iterator continue_data_entry = decoder_filters_.end();
585
141481
  bool terminal_filter_decoded_end_stream = false;
586
141481
  ASSERT(!state_.decoder_filter_chain_complete_ || entry == decoder_filters_.end() ||
587
141481
         (*entry)->end_stream_);
588

            
589
240763
  for (; entry != decoder_filters_.end(); entry++) {
590
104655
    ENVOY_EXECUTION_SCOPE(trackedStream(), &(*entry)->filter_context_);
591
104655
    ASSERT(!(state_.filter_call_state_ & FilterCallState::DecodeHeaders));
592
104655
    state_.filter_call_state_ |= FilterCallState::DecodeHeaders;
593
104655
    (*entry)->end_stream_ = (end_stream && continue_data_entry == decoder_filters_.end());
594
104655
    if ((*entry)->end_stream_) {
595
73925
      state_.filter_call_state_ |= FilterCallState::EndOfStream;
596
73925
    }
597
104655
    FilterHeadersStatus status = (*entry)->decodeHeaders(headers, (*entry)->end_stream_);
598
104655
    state_.filter_call_state_ &= ~FilterCallState::DecodeHeaders;
599
104655
    if ((*entry)->end_stream_) {
600
73925
      state_.filter_call_state_ &= ~FilterCallState::EndOfStream;
601
73925
    }
602
104655
    if (state_.decoder_filter_chain_aborted_) {
603
6240
      executeLocalReplyIfPrepared();
604
6240
      ENVOY_STREAM_LOG(trace,
605
6240
                       "decodeHeaders filter iteration aborted due to local reply: filter={}",
606
6240
                       *this, (*entry)->filter_context_.config_name);
607
6240
      status = FilterHeadersStatus::StopIteration;
608
6240
    }
609

            
610
104655
    ASSERT(!(status == FilterHeadersStatus::ContinueAndDontEndStream && !(*entry)->end_stream_),
611
104655
           "Filters should not return FilterHeadersStatus::ContinueAndDontEndStream from "
612
104655
           "decodeHeaders when end_stream is already false");
613

            
614
104655
    ENVOY_STREAM_LOG(trace, "decode headers called: filter={} status={}", *this,
615
104655
                     (*entry)->filter_context_.config_name, static_cast<uint64_t>(status));
616

            
617
104655
    (*entry)->processed_headers_ = true;
618

            
619
104655
    const auto continue_iteration = (*entry)->commonHandleAfterHeadersCallback(status, end_stream);
620
104655
    ENVOY_BUG(!continue_iteration || !stopDecoderFilterChain(),
621
104655
              fmt::format(
622
104655
                  "filter={} did not return StopAll or StopIteration after sending a local reply.",
623
104655
                  (*entry)->filter_context_.config_name));
624

            
625
    // If this filter ended the stream, decodeComplete() should be called for it.
626
104655
    if ((*entry)->end_stream_) {
627
73925
      (*entry)->handle_->decodeComplete();
628
73925
    }
629

            
630
    // Skip processing metadata after sending local reply
631
104655
    if (stopDecoderFilterChain() && std::next(entry) != decoder_filters_.end()) {
632
2716
      maybeContinueDecoding(continue_data_entry);
633
2716
      return;
634
2716
    }
635

            
636
101939
    const bool new_metadata_added = processNewlyAddedMetadata();
637
    // If end_stream is set in headers, and a filter adds new metadata, we need to delay end_stream
638
    // in headers by inserting an empty data frame with end_stream set. The empty data frame is sent
639
    // after the new metadata.
640
101939
    if ((*entry)->end_stream_ && new_metadata_added && !buffered_request_data_) {
641
10
      Buffer::OwnedImpl empty_data("");
642
10
      ENVOY_STREAM_LOG(
643
10
          trace, "inserting an empty data frame for end_stream due metadata being added.", *this);
644
      // Metadata frame doesn't carry end of stream bit. We need an empty data frame to end the
645
      // stream.
646
10
      addDecodedData(*(*entry), empty_data, true);
647
10
    }
648

            
649
101939
    if (!continue_iteration && std::next(entry) != decoder_filters_.end()) {
650
      // Stop iteration IFF this is not the last filter. If it is the last filter, continue with
651
      // processing since we need to handle the case where a terminal filter wants to buffer, but
652
      // a previous filter has added body.
653
2657
      maybeContinueDecoding(continue_data_entry);
654
2657
      return;
655
2657
    }
656

            
657
    // Here we handle the case where we have a header only request, but a filter adds a body
658
    // to it. We need to not raise end_stream = true to further filters during inline iteration.
659
99282
    if (end_stream && buffered_request_data_ && continue_data_entry == decoder_filters_.end()) {
660
121
      continue_data_entry = entry;
661
121
    }
662
    // The decoder filter chain is finished here if the following is true:
663
    // 1. the last filter has observed the end_stream AND
664
    // 2. no filter, including the last filter, has injected body during header iteration.
665
    // If body was injected the end_stream will be processed in the `decodeData()` method below.
666
99282
    const bool no_body_was_injected = continue_data_entry == decoder_filters_.end();
667
99282
    terminal_filter_decoded_end_stream =
668
99282
        (std::next(entry) == decoder_filters_.end() && (*entry)->end_stream_) &&
669
99282
        no_body_was_injected;
670
99282
  }
671

            
672
136108
  maybeContinueDecoding(continue_data_entry);
673

            
674
136108
  if (end_stream) {
675
92532
    disarmRequestTimeout();
676
92532
  }
677
136108
  maybeEndDecode(terminal_filter_decoded_end_stream);
678
136108
}
679

            
680
void FilterManager::decodeData(ActiveStreamDecoderFilter* filter, Buffer::Instance& data,
681
                               bool end_stream,
682
451411
                               FilterIterationStartState filter_iteration_start_state) {
683
451411
  ScopeTrackerScopeState scope(this, dispatcher_);
684
451411
  filter_manager_callbacks_.resetIdleTimer();
685

            
686
  // If a response is complete or a reset has been sent, filters do not care about further body
687
  // data. Just drop it.
688
451411
  if (stopDecoderFilterChain()) {
689
62
    return;
690
62
  }
691

            
692
451349
  auto trailers_added_entry = decoder_filters_.end();
693
451349
  const bool trailers_exists_at_start = filter_manager_callbacks_.requestTrailers().has_value();
694
  // Filter iteration may start at the current filter.
695
451349
  StreamDecoderFilters::Iterator entry = commonDecodePrefix(filter, filter_iteration_start_state);
696
451349
  bool terminal_filter_decoded_end_stream = false;
697
451349
  ASSERT(!state_.decoder_filter_chain_complete_ || entry == decoder_filters_.end() ||
698
451349
         (*entry)->end_stream_);
699

            
700
763187
  for (; entry != decoder_filters_.end(); entry++) {
701
458984
    ENVOY_EXECUTION_SCOPE(trackedStream(), &(*entry)->filter_context_);
702
    // If the filter pointed by entry has stopped for all frame types, return now.
703
458984
    if (handleDataIfStopAll(**entry, data, state_.decoder_filters_streaming_)) {
704
15689
      return;
705
15689
    }
706
    // If end_stream_ is marked for a filter, the data is not for this filter and filters after.
707
    //
708
    // In following case, ActiveStreamFilterBase::commonContinue() could be called recursively and
709
    // its doData() is called with wrong data.
710
    //
711
    //  There are 3 decode filters and "wrapper" refers to ActiveStreamFilter object.
712
    //
713
    //  filter0->decodeHeaders(_, true)
714
    //    return STOP
715
    //  filter0->continueDecoding()
716
    //    wrapper0->commonContinue()
717
    //      wrapper0->decodeHeaders(_, _, true)
718
    //        filter1->decodeHeaders(_, true)
719
    //          filter1->addDecodeData()
720
    //          return CONTINUE
721
    //        filter2->decodeHeaders(_, false)
722
    //          return CONTINUE
723
    //        wrapper1->commonContinue() // Detects data is added.
724
    //          wrapper1->doData()
725
    //            wrapper1->decodeData()
726
    //              filter2->decodeData(_, true)
727
    //                 return CONTINUE
728
    //      wrapper0->doData() // This should not be called
729
    //        wrapper0->decodeData()
730
    //          filter1->decodeData(_, true)  // It will cause assertions.
731
    //
732
    // One way to solve this problem is to mark end_stream_ for each filter.
733
    // If a filter is already marked as end_stream_ when decodeData() is called, bails out the
734
    // whole function. If just skip the filter, the codes after the loop will be called with
735
    // wrong data. For encodeData, the response_encoder->encode() will be called.
736
443295
    if ((*entry)->end_stream_) {
737
106
      return;
738
106
    }
739
443189
    ASSERT(!(state_.filter_call_state_ & FilterCallState::DecodeData));
740

            
741
    // We check the request_trailers_ pointer here in case addDecodedTrailers
742
    // is called in decodeData during a previous filter invocation, at which point we communicate to
743
    // the current and future filters that the stream has not yet ended.
744
443189
    if (end_stream) {
745
15718
      state_.filter_call_state_ |= FilterCallState::EndOfStream;
746
15718
    }
747

            
748
443189
    recordLatestDataFilter(entry, state_.latest_data_decoding_filter_, decoder_filters_);
749

            
750
443189
    state_.filter_call_state_ |= FilterCallState::DecodeData;
751
443189
    (*entry)->end_stream_ = end_stream && !filter_manager_callbacks_.requestTrailers();
752
443189
    FilterDataStatus status = (*entry)->handle_->decodeData(data, (*entry)->end_stream_);
753
443189
    if ((*entry)->end_stream_) {
754
15718
      (*entry)->handle_->decodeComplete();
755
15718
    }
756
443189
    state_.filter_call_state_ &= ~FilterCallState::DecodeData;
757
443189
    if (end_stream) {
758
15718
      state_.filter_call_state_ &= ~FilterCallState::EndOfStream;
759
15718
    }
760
443189
    ENVOY_STREAM_LOG(trace, "decode data called: filter={} status={}", *this,
761
443189
                     (*entry)->filter_context_.config_name, static_cast<uint64_t>(status));
762
443189
    if (state_.decoder_filter_chain_aborted_) {
763
149
      executeLocalReplyIfPrepared();
764
149
      ENVOY_STREAM_LOG(trace, "decodeData filter iteration aborted due to local reply: filter={}",
765
149
                       *this, (*entry)->filter_context_.config_name);
766
149
      return;
767
149
    }
768

            
769
443040
    processNewlyAddedMetadata();
770

            
771
443040
    if (!trailers_exists_at_start && filter_manager_callbacks_.requestTrailers() &&
772
443040
        trailers_added_entry == decoder_filters_.end()) {
773
26
      end_stream = false;
774
26
      trailers_added_entry = entry;
775
26
    }
776

            
777
    // The decoder filter chain is finished here if the following is true:
778
    // 1. the last filter has observed the end_stream AND
779
    // 2. no filter, including the last filter, has injected trailers during header iteration.
780
    //    NOTE: end_stream is set to false above if a filter injected trailers.
781
    // If trailers were injected the end_stream will be processed in the `decodeTrailers()` method
782
    // below.
783
443040
    terminal_filter_decoded_end_stream = end_stream && std::next(entry) == decoder_filters_.end();
784

            
785
443040
    if (!(*entry)->commonHandleAfterDataCallback(status, data, state_.decoder_filters_streaming_) &&
786
443040
        std::next(entry) != decoder_filters_.end()) {
787
      // Stop iteration IFF this is not the last filter. If it is the last filter, continue with
788
      // processing since we need to handle the case where a terminal filter wants to buffer, but
789
      // a previous filter has added trailers.
790
131202
      break;
791
131202
    }
792
443040
  }
793

            
794
  // If trailers were adding during decodeData we need to trigger decodeTrailers in order
795
  // to allow filters to process the trailers.
796
435405
  if (trailers_added_entry != decoder_filters_.end()) {
797
10
    decodeTrailers(trailers_added_entry->get(), *filter_manager_callbacks_.requestTrailers());
798
10
  }
799

            
800
435405
  if (end_stream) {
801
14235
    disarmRequestTimeout();
802
14235
  }
803
435405
  maybeEndDecode(terminal_filter_decoded_end_stream);
804
435405
}
805

            
806
30
// Creates an empty request trailer map, installs it through the filter manager callbacks and
// returns a reference to the installed map.
RequestTrailerMap& FilterManager::addDecodedTrailers() {
  // Trailers may only be added while the last data frame (end_stream = true) is being processed.
  ASSERT(state_.filter_call_state_ & FilterCallState::EndOfStream);

  auto new_trailers = RequestTrailerMapImpl::create();
  filter_manager_callbacks_.setRequestTrailers(std::move(new_trailers));
  return *filter_manager_callbacks_.requestTrailers();
}
813

            
814
void FilterManager::addDecodedData(ActiveStreamDecoderFilter& filter, Buffer::Instance& data,
815
1809
                                   bool streaming) {
816
1809
  if (state_.filter_call_state_ == 0 ||
817
1809
      (state_.filter_call_state_ & FilterCallState::DecodeHeaders) ||
818
1809
      (state_.filter_call_state_ & FilterCallState::DecodeData) ||
819
1809
      ((state_.filter_call_state_ & FilterCallState::DecodeTrailers) && !filter.canIterate())) {
820
    // Make sure if this triggers watermarks, the correct action is taken.
821
1761
    state_.decoder_filters_streaming_ = streaming;
822
    // If no call is happening or we are in the decode headers/data callback, buffer the data.
823
    // Inline processing happens in the decodeHeaders() callback if necessary.
824
1761
    filter.commonHandleBufferData(data);
825
1761
  } else if (state_.filter_call_state_ & FilterCallState::DecodeTrailers) {
826
    // In this case we need to inline dispatch the data to further filters. If those filters
827
    // choose to buffer/stop iteration that's fine.
828
37
    decodeData(&filter, data, false, FilterIterationStartState::AlwaysStartFromNext);
829
42
  } else {
830
11
    IS_ENVOY_BUG("Invalid request data");
831
11
    sendLocalReply(Http::Code::BadGateway, "Filter error", nullptr, absl::nullopt,
832
11
                   StreamInfo::ResponseCodeDetails::get().FilterAddedInvalidRequestData);
833
11
  }
834
1809
}
835

            
836
469
// Returns the request metadata map vector obtained from getRequestMetadataMapVector().
MetadataMapVector& FilterManager::addDecodedMetadata() {
  return *getRequestMetadataMapVector();
}
837

            
838
1370
// Runs the decodeTrailers() callback on the decoder filters, starting at the position chosen by
// commonDecodePrefix(), and completes the decode path once the terminal filter has been reached.
void FilterManager::decodeTrailers(ActiveStreamDecoderFilter* filter, RequestTrailerMap& trailers) {
  // Once a response is complete or a reset has been sent, filters no longer care about further
  // request frames, so the trailers are dropped.
  if (stopDecoderFilterChain()) {
    return;
  }

  // Iteration may begin at the current filter rather than the one after it.
  StreamDecoderFilters::Iterator it =
      commonDecodePrefix(filter, FilterIterationStartState::CanStartFromCurrent);
  bool reached_last_filter = false;
  ASSERT(!state_.decoder_filter_chain_complete_ || it == decoder_filters_.end());

  for (; it != decoder_filters_.end(); ++it) {
    auto& active = **it;
    ENVOY_EXECUTION_SCOPE(trackedStream(), &active.filter_context_);
    // A filter that has stopped iteration for all frame types ends this pass immediately.
    if (active.stoppedAll()) {
      return;
    }

    ASSERT(!(state_.filter_call_state_ & FilterCallState::DecodeTrailers));
    state_.filter_call_state_ |= FilterCallState::DecodeTrailers;
    FilterTrailersStatus status = active.handle_->decodeTrailers(trailers);
    active.handle_->decodeComplete();
    active.end_stream_ = true;
    state_.filter_call_state_ &= ~FilterCallState::DecodeTrailers;
    ENVOY_STREAM_LOG(trace, "decode trailers called: filter={} status={}", *this,
                     active.filter_context_.config_name, static_cast<uint64_t>(status));

    if (state_.decoder_filter_chain_aborted_) {
      executeLocalReplyIfPrepared();
      ENVOY_STREAM_LOG(trace,
                       "decodeTrailers filter iteration aborted due to local reply: filter={}",
                       *this, active.filter_context_.config_name);
      // Treat the abort as a request to stop iterating.
      status = FilterTrailersStatus::StopIteration;
    }

    processNewlyAddedMetadata();
    // Record whether the filter that just processed the trailers is the last one in the chain.
    reached_last_filter = std::next(it) == decoder_filters_.end();

    const bool keep_iterating = active.commonHandleAfterTrailersCallback(status);
    if (!keep_iterating && !reached_last_filter) {
      return;
    }
  }

  disarmRequestTimeout();
  maybeEndDecode(reached_last_filter);
}
884

            
885
3064
// Runs the decodeMetadata() callback on the decoder filters, starting at the position chosen by
// commonDecodePrefix(). Metadata that cannot be delivered to a filter yet is saved on that filter
// instead.
void FilterManager::decodeMetadata(ActiveStreamDecoderFilter* filter, MetadataMap& metadata_map) {
  ScopeTrackerScopeState scope(this, dispatcher_);
  filter_manager_callbacks_.resetIdleTimer();

  // Do not process any more frames once the stream has been reset.
  if (stopDecoderFilterChain()) {
    return;
  }

  // Iteration may begin at the current filter rather than the one after it.
  StreamDecoderFilters::Iterator it =
      commonDecodePrefix(filter, FilterIterationStartState::CanStartFromCurrent);

  ASSERT(!(state_.filter_call_state_ & FilterCallState::DecodeMetadata));

  for (; it != decoder_filters_.end(); ++it) {
    auto& active = **it;
    ENVOY_EXECUTION_SCOPE(trackedStream(), &active.filter_context_);

    // Save a copy of the metadata and return when either:
    // - the filter has stopped iteration for all frame types, or
    // - the filter has not returned from decodeHeaders yet. Newly added metadata is kept in case
    //   decodeHeaders returns StopAllIteration, which can happen when header callbacks generate
    //   new metadata.
    if (!active.processed_headers_ || active.stoppedAll()) {
      Http::MetadataMapPtr saved_copy = std::make_unique<Http::MetadataMap>(metadata_map);
      active.getSavedRequestMetadata()->emplace_back(std::move(saved_copy));
      return;
    }

    state_.filter_call_state_ |= FilterCallState::DecodeMetadata;
    FilterMetadataStatus status = active.handle_->decodeMetadata(metadata_map);
    state_.filter_call_state_ &= ~FilterCallState::DecodeMetadata;

    ENVOY_STREAM_LOG(trace, "decode metadata called: filter={} status={}, metadata: {}", *this,
                     active.filter_context_.config_name, static_cast<uint64_t>(status),
                     metadata_map);

    if (state_.decoder_filter_chain_aborted_) {
      // An aborted decoder filter chain means either:
      // 1. this filter sent a local reply or GoAway from decode metadata, or
      // 2. this filter is the terminal HTTP filter and an upstream HTTP filter sent a local
      //    reply.
      ASSERT((status == FilterMetadataStatus::StopIterationForLocalReply) ||
             (std::next(it) == decoder_filters_.end()));
      executeLocalReplyIfPrepared();
      ENVOY_STREAM_LOG(trace,
                       "decodeMetadata filter iteration aborted due to local reply: filter={}",
                       *this, active.filter_context_.config_name);
      return;
    }

    // On ContinueAll from a filter that cannot iterate, hand a copy of the processed metadata to
    // the next filter (if there is one) and resume iteration through commonContinue().
    if (status == FilterMetadataStatus::ContinueAll && !active.canIterate()) {
      const auto following = std::next(it);
      if (following != decoder_filters_.end()) {
        (*following)
            ->getSavedRequestMetadata()
            ->emplace_back(std::make_unique<MetadataMap>(metadata_map));
      }
      active.commonContinue();
      return;
    }
  }
}
944

            
945
293298
// Forwards the request-timeout disarm to the filter manager callbacks.
void FilterManager::disarmRequestTimeout() {
  filter_manager_callbacks_.disarmRequestTimeout();
}
946

            
947
// Picks the encoder filter at which iteration should start. A null `filter` marks the initial
// call: end-of-stream state is recorded and iteration starts at the first encoder filter.
StreamEncoderFilters::Iterator
FilterManager::commonEncodePrefix(ActiveStreamEncoderFilter* filter, bool end_stream,
                                  FilterIterationStartState filter_iteration_start_state) {
  ENVOY_STREAM_LOG(trace, "commonEncodePrefix end_stream: {}, isHalfCloseEnabled: {}", *this,
                   end_stream, filter_manager_callbacks_.isHalfCloseEnabled());

  if (filter != nullptr) {
    if (filter_iteration_start_state == FilterIterationStartState::CanStartFromCurrent &&
        (*(filter->entry()))->iterate_from_current_filter_) {
      // Iteration had been stopped for all frame types and is now continuing. The current
      // filter's encoding callback has not been called yet, so start with it.
      return filter->entry();
    }
    return std::next(filter->entry());
  }

  // Base state is only set on the initial call; subsequent calls made while filtering leave it
  // untouched.
  if (end_stream) {
    ASSERT(!state_.observed_encode_end_stream_);
    state_.observed_encode_end_stream_ = true;

    // With half-close semantics disabled, end of stream from the upstream stops the decoder
    // filter chain, since neither the filters nor the upstream are interested in downstream data
    // any more. Half close is enabled when TCP proxying is done with the HTTP/1 encoder; in that
    // case the decoder filter chain must keep running when end_stream is true, otherwise data
    // sent in the upstream direction would be dropped.
    if (!filter_manager_callbacks_.isHalfCloseEnabled()) {
      state_.decoder_filter_chain_aborted_ = true;
    }
  }
  return encoder_filters_.begin();
}
980

            
981
// Picks the decoder filter at which iteration should start: the first filter when `filter` is
// null, otherwise `filter` itself or the one after it.
StreamDecoderFilters::Iterator
FilterManager::commonDecodePrefix(ActiveStreamDecoderFilter* filter,
                                  FilterIterationStartState filter_iteration_start_state) {
  if (filter == nullptr) {
    return decoder_filters_.begin();
  }

  const bool may_resume_at_current =
      filter_iteration_start_state == FilterIterationStartState::CanStartFromCurrent;
  if (may_resume_at_current && (*(filter->entry()))->iterate_from_current_filter_) {
    // Iteration had been stopped for all frame types and is now continuing. The current filter's
    // callback has not been called yet, so start with it.
    return filter->entry();
  }
  return std::next(filter->entry());
}
995

            
996
10033
// Notifies the filter manager callbacks and every entry in filters_ about a local reply. Sets
// data.reset_imminent_ if any filter answers with ContinueAndResetStream.
void DownstreamFilterManager::onLocalReply(StreamFilterBase::LocalReplyData& data) {
  // Flag that onLocalReply callbacks are running; sendLocalReply() asserts on this flag.
  state_.under_on_local_reply_ = true;
  filter_manager_callbacks_.onLocalReply(data.code_);

  for (auto stream_filter : filters_) {
    const auto filter_verdict = stream_filter->onLocalReply(data);
    if (filter_verdict == LocalErrorStatus::ContinueAndResetStream) {
      data.reset_imminent_ = true;
    }
  }

  state_.under_on_local_reply_ = false;
}
// Sends a locally generated response. Depending on how far the response has progressed, the reply
// is sent (or prepared) through the filter chain, sent directly to the encoder, or replaced by a
// stream reset.
void DownstreamFilterManager::sendLocalReply(
    Code code, absl::string_view body,
    const std::function<void(ResponseHeaderMap& headers)>& modify_headers,
    const absl::optional<Grpc::Status::GrpcStatus> grpc_status, absl::string_view details) {
  ASSERT(!state_.under_on_local_reply_);
  const bool is_head_request = state_.is_head_request_;
  const bool is_grpc_request = state_.is_grpc_request_;

  // Stop filter chain iteration if the local reply was sent while filter decoding or encoding
  // callbacks are running.
  if (state_.filter_call_state_ & FilterCallState::IsDecodingMask) {
    state_.decoder_filter_chain_aborted_ = true;
  } else if (state_.filter_call_state_ & FilterCallState::IsEncodingMask) {
    state_.encoder_filter_chain_aborted_ = true;
  }

  // With independent half-close enabled, a local reply always stops the decoder filter chain.
  if (filter_manager_callbacks_.isHalfCloseEnabled()) {
    state_.decoder_filter_chain_aborted_ = true;
  }

  streamInfo().setResponseCodeDetails(details);
  StreamFilterBase::LocalReplyData data{code, grpc_status, details, false};
  onLocalReply(data);
  if (data.reset_imminent_) {
    ENVOY_STREAM_LOG(debug, "Resetting stream due to {}. onLocalReply requested reset.", *this,
                     details);
    filter_manager_callbacks_.resetStream(Http::StreamResetReason::LocalReset, "");
    return;
  }

  // True when the response has not started at all, or when the only response so far is an
  // informational 1xx that has already been fully processed.
  const bool can_use_filter_chain =
      !filter_manager_callbacks_.responseHeaders().has_value() &&
      (!filter_manager_callbacks_.informationalHeaders().has_value() ||
       !(state_.filter_call_state_ & FilterCallState::IsEncodingMask));

  if (can_use_filter_chain) {
    if (auto cb = filter_manager_callbacks_.downstreamCallbacks(); cb.has_value()) {
      // The initial route may never have been set, or the cached route may have been cleared by
      // the filters. This forces a route refresh when there is no cached route, so that the
      // refresh does not happen later in the response filter chain.
      cb->route(nullptr);
    }

    // A local reply is only prepared for later execution while filters are actively being
    // invoked, to avoid re-entering them.
    //
    // For reverse connections (where the upstream initiates the connection to the downstream),
    // local replies must be sent immediately rather than queued. This ensures proper handling of
    // the reversed connection flow and prevents potential issues with connection state and filter
    // chain processing.
    const bool defer_reply =
        !Runtime::runtimeFeatureEnabled(
            "envoy.reloadable_features.reverse_conn_force_local_reply") &&
        (state_.filter_call_state_ & FilterCallState::IsDecodingMask);
    if (defer_reply) {
      prepareLocalReplyViaFilterChain(is_grpc_request, code, body, modify_headers, is_head_request,
                                      grpc_status, details);
    } else {
      sendLocalReplyViaFilterChain(is_grpc_request, code, body, modify_headers, is_head_request,
                                   grpc_status, details);
    }
    return;
  }

  if (!state_.non_100_response_headers_encoded_) {
    ENVOY_STREAM_LOG(debug, "Sending local reply with details {} directly to the encoder", *this,
                     details);
    // At least the headers, and possibly the body, have started processing through the filter
    // chain, but no non-informational headers have been sent downstream. To keep the filters'
    // state machines intact, bypass the filter chain and send the local reply directly to the
    // codec.
    sendDirectLocalReply(code, body, modify_headers, state_.is_head_request_, grpc_status);
    return;
  }

  // Response headers have already been sent to the client, so all that can be done at this point
  // is to reset the stream.
  ENVOY_STREAM_LOG(debug, "Resetting stream due to {}. Prior headers have already been sent",
                   *this, details);
  // TODO(snowp): This means we increment the tx_reset stat which we weren't doing previously.
  // Intended?
  filter_manager_callbacks_.resetStream();
}
void DownstreamFilterManager::prepareLocalReplyViaFilterChain(
    bool is_grpc_request, Code code, absl::string_view body,
    const std::function<void(ResponseHeaderMap& headers)>& modify_headers, bool is_head_request,
5271
    const absl::optional<Grpc::Status::GrpcStatus> grpc_status, absl::string_view details) {
5271
  ENVOY_STREAM_LOG(debug, "Preparing local reply with details {}", *this, details);
5271
  ASSERT(!filter_manager_callbacks_.responseHeaders().has_value());
  // For early error handling, do a best-effort attempt to create a filter chain
  // to ensure access logging. If the filter chain already exists this will be
  // a no-op.
5271
  createDownstreamFilterChain();
5271
  if (prepared_local_reply_) {
2
    return;
2
  }
5269
  prepared_local_reply_ = Utility::prepareLocalReply(
5269
      Utility::EncodeFunctions{
5269
          [this, modify_headers](ResponseHeaderMap& headers) -> void {
5269
            finalizeHeaders(filter_manager_callbacks_, streamInfo(), headers);
5269
            if (modify_headers) {
617
              modify_headers(headers);
617
            }
5269
          },
5269
          [this](ResponseHeaderMap& response_headers, Code& code, std::string& body,
5269
                 absl::string_view& content_type) -> void {
5269
            local_reply_.rewrite(filter_manager_callbacks_.requestHeaders().ptr(), response_headers,
5269
                                 streamInfo(), code, body, content_type);
5269
          },
5269
          [this](ResponseHeaderMapPtr&& headers, bool end_stream) -> void {
5269
            filter_manager_callbacks_.setResponseHeaders(std::move(headers));
5269
            encodeHeaders(nullptr, filter_manager_callbacks_.responseHeaders().ref(), end_stream);
5269
          },
5269
          [this](Buffer::Instance& data, bool end_stream) -> void {
650
            encodeData(nullptr, data, end_stream,
650
                       FilterManager::FilterIterationStartState::CanStartFromCurrent);
650
          }},
5269
      Utility::LocalReplyData{is_grpc_request, code, body, grpc_status, is_head_request});
5269
}
5730
void DownstreamFilterManager::executeLocalReplyIfPrepared() {
5730
  if (!prepared_local_reply_) {
461
    return;
461
  }
5269
  ENVOY_STREAM_LOG(debug, "Executing sending local reply.", *this);
5269
  Utility::encodeLocalReply(state_.destroyed_, std::move(prepared_local_reply_));
5269
}
99880
// Creates the filter chain using this manager's filter_chain_factory_ and returns the result.
FilterManager::CreateChainResult DownstreamFilterManager::createDownstreamFilterChain() {
  auto chain_result = createFilterChain(filter_chain_factory_);
  return chain_result;
}
void DownstreamFilterManager::sendLocalReplyViaFilterChain(
    bool is_grpc_request, Code code, absl::string_view body,
    const std::function<void(ResponseHeaderMap& headers)>& modify_headers, bool is_head_request,
4221
    const absl::optional<Grpc::Status::GrpcStatus> grpc_status, absl::string_view details) {
4221
  ENVOY_STREAM_LOG(debug, "Sending local reply with details {}", *this, details);
4221
  ASSERT(!filter_manager_callbacks_.responseHeaders().has_value());
  // For early error handling, do a best-effort attempt to create a filter chain
  // to ensure access logging. If the filter chain already exists this will be
  // a no-op.
4221
  createDownstreamFilterChain();
4221
  Utility::sendLocalReply(
4221
      state_.destroyed_,
4221
      Utility::EncodeFunctions{
4221
          [this, modify_headers](ResponseHeaderMap& headers) -> void {
4221
            finalizeHeaders(filter_manager_callbacks_, streamInfo(), headers);
4221
            if (modify_headers) {
1414
              modify_headers(headers);
1414
            }
4221
          },
4221
          [this](ResponseHeaderMap& response_headers, Code& code, std::string& body,
4221
                 absl::string_view& content_type) -> void {
4221
            local_reply_.rewrite(filter_manager_callbacks_.requestHeaders().ptr(), response_headers,
4221
                                 streamInfo(), code, body, content_type);
4221
          },
4221
          [this](ResponseHeaderMapPtr&& headers, bool end_stream) -> void {
4221
            filter_manager_callbacks_.setResponseHeaders(std::move(headers));
4221
            encodeHeaders(nullptr, filter_manager_callbacks_.responseHeaders().ref(), end_stream);
4221
          },
4221
          [this](Buffer::Instance& data, bool end_stream) -> void {
2891
            encodeData(nullptr, data, end_stream,
2891
                       FilterManager::FilterIterationStartState::CanStartFromCurrent);
2891
          }},
4221
      Utility::LocalReplyData{is_grpc_request, code, body, grpc_status, is_head_request});
4221
}
// Sends a local reply straight to the filter manager callbacks' encode methods, bypassing the
// encoder filter chain.
void DownstreamFilterManager::sendDirectLocalReply(
    Code code, absl::string_view body,
    const std::function<void(ResponseHeaderMap&)>& modify_headers, bool is_head_request,
    const absl::optional<Grpc::Status::GrpcStatus> grpc_status) {
  // Make sure we won't end up with nested watermark calls from the body buffer.
  state_.encoder_filters_streaming_ = true;

  // Finalizes the headers, then applies the caller supplied modifier if there is one.
  auto finalize_and_modify = [this, modify_headers](ResponseHeaderMap& headers) -> void {
    finalizeHeaders(filter_manager_callbacks_, streamInfo(), headers);
    if (modify_headers) {
      modify_headers(headers);
    }
  };
  // Lets local_reply_ rewrite the response.
  auto rewrite_reply = [&](ResponseHeaderMap& response_headers, Code& code, std::string& body,
                           absl::string_view& content_type) -> void {
    local_reply_.rewrite(filter_manager_callbacks_.requestHeaders().ptr(), response_headers,
                         streamInfo(), code, body, content_type);
  };
  auto encode_reply_headers = [&](ResponseHeaderMapPtr&& response_headers,
                                  bool end_stream) -> void {
    // Move the response headers into the FilterManager to make sure they're visible to access
    // logs.
    filter_manager_callbacks_.setResponseHeaders(std::move(response_headers));
    state_.non_100_response_headers_encoded_ = true;
    filter_manager_callbacks_.encodeHeaders(*filter_manager_callbacks_.responseHeaders(),
                                            end_stream);
    // Skip end-of-encode handling if a downstream reset was seen.
    if (!state_.saw_downstream_reset_) {
      maybeEndEncode(end_stream);
    }
  };
  auto encode_reply_data = [&](Buffer::Instance& data, bool end_stream) -> void {
    filter_manager_callbacks_.encodeData(data, end_stream);
    // Skip end-of-encode handling if a downstream reset was seen.
    if (!state_.saw_downstream_reset_) {
      maybeEndEncode(end_stream);
    }
  };

  Http::Utility::sendLocalReply(
      state_.destroyed_,
      Utility::EncodeFunctions{std::move(finalize_and_modify), std::move(rewrite_reply),
                               std::move(encode_reply_headers), std::move(encode_reply_data)},
      Utility::LocalReplyData{state_.is_grpc_request_, code, body, grpc_status, is_head_request});
}
void FilterManager::encode1xxHeaders(ActiveStreamEncoderFilter* filter,
283
                                     ResponseHeaderMap& headers) {
283
  filter_manager_callbacks_.resetIdleTimer();
283
  ASSERT(proxy_100_continue_);
  // The caller must guarantee that encode1xxHeaders() is invoked at most once.
283
  ASSERT(!state_.has_1xx_headers_ || filter != nullptr);
  // Make sure commonContinue continues encode1xxHeaders.
283
  state_.has_1xx_headers_ = true;
  // Similar to the block in encodeHeaders, run encode1xxHeaders on each
  // filter. This is simpler than that case because 100 continue implies no
  // end-stream, and because there are normal headers coming there's no need for
  // complex continuation logic.
  // 100-continue filter iteration should always start with the next filter if available.
283
  StreamEncoderFilters::Iterator entry =
283
      commonEncodePrefix(filter, false, FilterIterationStartState::AlwaysStartFromNext);
300
  for (; entry != encoder_filters_.end(); entry++) {
21
    ENVOY_EXECUTION_SCOPE(trackedStream(), &(*entry)->filter_context_);
21
    ASSERT(!(state_.filter_call_state_ & FilterCallState::Encode1xxHeaders));
21
    state_.filter_call_state_ |= FilterCallState::Encode1xxHeaders;
21
    const Filter1xxHeadersStatus status = (*entry)->handle_->encode1xxHeaders(headers);
21
    state_.filter_call_state_ &= ~FilterCallState::Encode1xxHeaders;
21
    ENVOY_STREAM_LOG(trace, "encode 1xx continue headers called: filter={} status={}", *this,
21
                     (*entry)->filter_context_.config_name, static_cast<uint64_t>(status));
21
    if (state_.encoder_filter_chain_aborted_) {
3
      ENVOY_STREAM_LOG(trace,
3
                       "encode1xxHeaders filter iteration aborted due to local reply: filter={}",
3
                       *this, (*entry)->filter_context_.config_name);
3
      return;
3
    }
18
    if (!(*entry)->commonHandleAfter1xxHeadersCallback(status)) {
1
      return;
1
    }
18
  }
279
  filter_manager_callbacks_.encode1xxHeaders(headers);
279
}
34111
// Resumes encoding from `continue_data_entry` when it refers to an actual encoder filter; does
// nothing when it is the end iterator.
void FilterManager::maybeContinueEncoding(StreamEncoderFilters::Iterator continue_data_entry) {
  if (continue_data_entry == encoder_filters_.end()) {
    return;
  }

  // continueEncoding() is reused because it correctly avoids calling encodeHeaders() again. It
  // expects StopSingleIteration, so that state is faked here first.
  ASSERT(buffered_response_data_);
  auto& resumed_filter = **continue_data_entry;
  resumed_filter.iteration_state_ = ActiveStreamFilterBase::IterationState::StopSingleIteration;
  resumed_filter.continueEncoding();
}
void FilterManager::encodeHeaders(ActiveStreamEncoderFilter* filter, ResponseHeaderMap& headers,
88490
                                  bool end_stream) {
  // See encodeHeaders() comments in envoy/http/filter.h for why the 1xx precondition holds.
88490
  ASSERT(!CodeUtility::is1xx(Utility::getResponseStatus(headers)) ||
88490
         Utility::getResponseStatus(headers) == enumToInt(Http::Code::SwitchingProtocols));
88490
  filter_manager_callbacks_.resetIdleTimer();
88490
  disarmRequestTimeout();
  // Headers filter iteration should always start with the next filter if available.
88490
  StreamEncoderFilters::Iterator entry =
88490
      commonEncodePrefix(filter, end_stream, FilterIterationStartState::AlwaysStartFromNext);
88490
  StreamEncoderFilters::Iterator continue_data_entry = encoder_filters_.end();
96106
  for (; entry != encoder_filters_.end(); entry++) {
8694
    ENVOY_EXECUTION_SCOPE(trackedStream(), &(*entry)->filter_context_);
8694
    ASSERT(!(state_.filter_call_state_ & FilterCallState::EncodeHeaders));
8694
    state_.filter_call_state_ |= FilterCallState::EncodeHeaders;
8694
    (*entry)->end_stream_ = (end_stream && continue_data_entry == encoder_filters_.end());
8694
    if ((*entry)->end_stream_) {
4026
      state_.filter_call_state_ |= FilterCallState::EndOfStream;
4026
    }
8694
    FilterHeadersStatus status = (*entry)->handle_->encodeHeaders(headers, (*entry)->end_stream_);
8694
    if (state_.encoder_filter_chain_aborted_) {
242
      ENVOY_STREAM_LOG(trace,
242
                       "encodeHeaders filter iteration aborted due to local reply: filter={}",
242
                       *this, (*entry)->filter_context_.config_name);
242
      status = FilterHeadersStatus::StopIteration;
242
    }
8694
    ASSERT(!(status == FilterHeadersStatus::ContinueAndDontEndStream && !(*entry)->end_stream_),
8694
           "Filters should not return FilterHeadersStatus::ContinueAndDontEndStream from "
8694
           "encodeHeaders when end_stream is already false");
8694
    state_.filter_call_state_ &= ~FilterCallState::EncodeHeaders;
8694
    if ((*entry)->end_stream_) {
4026
      state_.filter_call_state_ &= ~FilterCallState::EndOfStream;
4026
    }
8694
    ENVOY_STREAM_LOG(trace, "encode headers called: filter={} status={}", *this,
8694
                     (*entry)->filter_context_.config_name, static_cast<uint64_t>(status));
8694
    (*entry)->processed_headers_ = true;
8694
    const auto continue_iteration = (*entry)->commonHandleAfterHeadersCallback(status, end_stream);
    // If this filter ended the stream, encodeComplete() should be called for it.
8694
    if ((*entry)->end_stream_) {
4026
      (*entry)->handle_->encodeComplete();
4026
    }
8694
    if (!continue_iteration) {
1078
      if (!(*entry)->end_stream_) {
834
        maybeContinueEncoding(continue_data_entry);
834
      }
1078
      return;
1078
    }
    // Here we handle the case where we have a header only response, but a filter adds a body
    // to it. We need to not raise end_stream = true to further filters during inline iteration.
7616
    if (end_stream && buffered_response_data_ && continue_data_entry == encoder_filters_.end()) {
49
      continue_data_entry = entry;
49
    }
7616
  }
  // Check if the filter chain above did not remove critical headers or set malformed header values.
  // We could do this at the codec in order to prevent other places than the filter chain from
  // removing critical headers, but it will come with the implementation complexity.
  // See the previous attempt (#15658) for detail, and for now we choose to protect only against
  // filter chains.
87412
  const auto status = HeaderUtility::checkRequiredResponseHeaders(headers);
87412
  if (!status.ok()) {
    // If the check failed, then we reply with BadGateway, and stop the further processing.
35
    sendLocalReply(
35
        Http::Code::BadGateway, status.message(), nullptr, absl::nullopt,
35
        absl::StrCat(StreamInfo::ResponseCodeDetails::get().FilterRemovedRequiredResponseHeaders,
35
                     "{", StringUtil::replaceAllEmptySpace(status.message()), "}"));
35
    return;
35
  }
87377
  const bool modified_end_stream = (end_stream && continue_data_entry == encoder_filters_.end());
87377
  state_.non_100_response_headers_encoded_ = true;
87377
  filter_manager_callbacks_.encodeHeaders(headers, modified_end_stream);
87377
  if (state_.saw_downstream_reset_) {
2
    return;
2
  }
87375
  maybeEndEncode(modified_end_stream);
87375
  if (!modified_end_stream) {
33277
    maybeContinueEncoding(continue_data_entry);
33277
  }
87375
}
void FilterManager::encodeMetadata(ActiveStreamEncoderFilter* filter,
2723
                                   MetadataMapPtr&& metadata_map_ptr) {
2723
  filter_manager_callbacks_.resetIdleTimer();
2723
  StreamEncoderFilters::Iterator entry =
2723
      commonEncodePrefix(filter, false, FilterIterationStartState::CanStartFromCurrent);
2795
  for (; entry != encoder_filters_.end(); entry++) {
98
    ENVOY_EXECUTION_SCOPE(trackedStream(), &(*entry)->filter_context_);
    // If the filter pointed by entry has stopped for all frame type, stores metadata and returns.
    // If the filter pointed by entry hasn't returned from encodeHeaders, stores newly added
    // metadata in case encodeHeaders returns StopAllIteration. The latter can happen when headers
    // callbacks generate new metadata.
98
    if (!(*entry)->processed_headers_ || (*entry)->stoppedAll()) {
15
      (*entry)->getSavedResponseMetadata()->emplace_back(std::move(metadata_map_ptr));
15
      return;
15
    }
83
    state_.filter_call_state_ |= FilterCallState::EncodeMetadata;
83
    FilterMetadataStatus status = (*entry)->handle_->encodeMetadata(*metadata_map_ptr);
83
    state_.filter_call_state_ &= ~FilterCallState::EncodeMetadata;
83
    ENVOY_STREAM_LOG(trace, "encode metadata called: filter={} status={} metadata={}", *this,
83
                     (*entry)->filter_context_.config_name, static_cast<uint64_t>(status),
83
                     *metadata_map_ptr);
83
    if (status == FilterMetadataStatus::StopIterationForLocalReply) {
      // In case of a local reply, we do not continue encoding metadata. Metadata is not sent on to
      // the client.
5
      return;
5
    }
78
    if (status == FilterMetadataStatus::ContinueAll && !(*entry)->canIterate()) {
6
      if (std::next(entry) != encoder_filters_.end()) {
1
        (*std::next(entry))->getSavedResponseMetadata()->emplace_back(std::move(metadata_map_ptr));
6
      } else {
5
        filter_manager_callbacks_.encodeMetadata(std::move(metadata_map_ptr));
5
      }
6
      (*entry)->commonContinue();
6
      return;
6
    }
78
  }
  // TODO(soya3129): update stats with metadata.
  // Now encode metadata via the codec.
2697
  if (!metadata_map_ptr->empty()) {
2697
    filter_manager_callbacks_.encodeMetadata(std::move(metadata_map_ptr));
2697
  }
2697
}
35
// Creates an empty response trailer map, installs it via the filter manager callbacks and
// returns a reference to the installed map.
ResponseTrailerMap& FilterManager::addEncodedTrailers() {
  // Adding trailers is only legal while the final frame (end_stream = true) is being processed.
  ASSERT(state_.filter_call_state_ & FilterCallState::EndOfStream);

  // And only once per response.
  ASSERT(!filter_manager_callbacks_.responseTrailers());

  filter_manager_callbacks_.setResponseTrailers(ResponseTrailerMapImpl::create());
  return *filter_manager_callbacks_.responseTrailers();
}
void FilterManager::addEncodedData(ActiveStreamEncoderFilter& filter, Buffer::Instance& data,
1391
                                   bool streaming) {
1391
  if (state_.filter_call_state_ == 0 ||
1391
      (state_.filter_call_state_ & FilterCallState::EncodeHeaders) ||
1391
      (state_.filter_call_state_ & FilterCallState::EncodeData) ||
1391
      ((state_.filter_call_state_ & FilterCallState::EncodeTrailers) && !filter.canIterate())) {
    // Make sure if this triggers watermarks, the correct action is taken.
1343
    state_.encoder_filters_streaming_ = streaming;
    // If no call is happening or we are in the decode headers/data callback, buffer the data.
    // Inline processing happens in the decodeHeaders() callback if necessary.
1343
    filter.commonHandleBufferData(data);
1350
  } else if (state_.filter_call_state_ & FilterCallState::EncodeTrailers) {
    // In this case we need to inline dispatch the data to further filters. If those filters
    // choose to buffer/stop iteration that's fine.
41
    encodeData(&filter, data, false, FilterIterationStartState::AlwaysStartFromNext);
48
  } else {
7
    IS_ENVOY_BUG("Invalid response data");
7
    sendLocalReply(Http::Code::BadGateway, "Filter error", nullptr, absl::nullopt,
7
                   StreamInfo::ResponseCodeDetails::get().FilterAddedInvalidResponseData);
7
  }
1391
}
void FilterManager::encodeData(ActiveStreamEncoderFilter* filter, Buffer::Instance& data,
                               bool end_stream,
460887
                               FilterIterationStartState filter_iteration_start_state) {
460887
  filter_manager_callbacks_.resetIdleTimer();
  // Filter iteration may start at the current filter.
460887
  StreamEncoderFilters::Iterator entry =
460887
      commonEncodePrefix(filter, end_stream, filter_iteration_start_state);
460887
  StreamEncoderFilters::Iterator trailers_added_entry = encoder_filters_.end();
460887
  const bool trailers_exists_at_start = filter_manager_callbacks_.responseTrailers().has_value();
474176
  for (; entry != encoder_filters_.end(); entry++) {
19200
    ENVOY_EXECUTION_SCOPE(trackedStream(), &(*entry)->filter_context_);
    // If the filter pointed by entry has stopped for all frame type, return now.
19200
    if (handleDataIfStopAll(**entry, data, state_.encoder_filters_streaming_)) {
1006
      return;
1006
    }
    // If end_stream_ is marked for a filter, the data is not for this filter and filters after.
    // For details, please see the comment in the ActiveStream::decodeData() function.
18194
    if ((*entry)->end_stream_) {
2
      return;
2
    }
18192
    ASSERT(!(state_.filter_call_state_ & FilterCallState::EncodeData));
    // We check the response_trailers_ pointer here in case addEncodedTrailers
    // is called in encodeData during a previous filter invocation, at which point we communicate to
    // the current and future filters that the stream has not yet ended.
18192
    state_.filter_call_state_ |= FilterCallState::EncodeData;
18192
    if (end_stream) {
4031
      state_.filter_call_state_ |= FilterCallState::EndOfStream;
4031
    }
18192
    recordLatestDataFilter(entry, state_.latest_data_encoding_filter_, encoder_filters_);
18192
    (*entry)->end_stream_ = end_stream && !filter_manager_callbacks_.responseTrailers();
18192
    FilterDataStatus status = (*entry)->handle_->encodeData(data, (*entry)->end_stream_);
18192
    if (state_.encoder_filter_chain_aborted_) {
35
      ENVOY_STREAM_LOG(trace, "encodeData filter iteration aborted due to local reply: filter={}",
35
                       *this, (*entry)->filter_context_.config_name);
35
      status = FilterDataStatus::StopIterationNoBuffer;
35
    }
18192
    if ((*entry)->end_stream_) {
4030
      (*entry)->handle_->encodeComplete();
4030
    }
18192
    state_.filter_call_state_ &= ~FilterCallState::EncodeData;
18192
    if (end_stream) {
4031
      state_.filter_call_state_ &= ~FilterCallState::EndOfStream;
4031
    }
18192
    ENVOY_STREAM_LOG(trace, "encode data called: filter={} status={}", *this,
18192
                     (*entry)->filter_context_.config_name, static_cast<uint64_t>(status));
18192
    if (!trailers_exists_at_start && filter_manager_callbacks_.responseTrailers() &&
18192
        trailers_added_entry == encoder_filters_.end()) {
31
      trailers_added_entry = entry;
31
    }
18192
    if (!(*entry)->commonHandleAfterDataCallback(status, data, state_.encoder_filters_streaming_)) {
4903
      return;
4903
    }
18192
  }
454976
  const bool modified_end_stream = end_stream && trailers_added_entry == encoder_filters_.end();
454976
  filter_manager_callbacks_.encodeData(data, modified_end_stream);
454976
  if (state_.saw_downstream_reset_) {
1
    return;
1
  }
454975
  maybeEndEncode(modified_end_stream);
  // If trailers were adding during encodeData we need to trigger decodeTrailers in order
  // to allow filters to process the trailers.
454975
  if (trailers_added_entry != encoder_filters_.end()) {
31
    encodeTrailers(trailers_added_entry->get(), *filter_manager_callbacks_.responseTrailers());
31
  }
454975
}
void FilterManager::encodeTrailers(ActiveStreamEncoderFilter* filter,
1336
                                   ResponseTrailerMap& trailers) {
1336
  filter_manager_callbacks_.resetIdleTimer();
  // Filter iteration may start at the current filter.
1336
  StreamEncoderFilters::Iterator entry =
1336
      commonEncodePrefix(filter, true, FilterIterationStartState::CanStartFromCurrent);
1631
  for (; entry != encoder_filters_.end(); entry++) {
400
    ENVOY_EXECUTION_SCOPE(trackedStream(), &(*entry)->filter_context_);
    // If the filter pointed by entry has stopped for all frame type, return now.
400
    if ((*entry)->stoppedAll()) {
13
      return;
13
    }
387
    ASSERT(!(state_.filter_call_state_ & FilterCallState::EncodeTrailers));
387
    state_.filter_call_state_ |= FilterCallState::EncodeTrailers;
387
    FilterTrailersStatus status = (*entry)->handle_->encodeTrailers(trailers);
387
    (*entry)->handle_->encodeComplete();
387
    (*entry)->end_stream_ = true;
387
    state_.filter_call_state_ &= ~FilterCallState::EncodeTrailers;
387
    ENVOY_STREAM_LOG(trace, "encode trailers called: filter={} status={}", *this,
387
                     (*entry)->filter_context_.config_name, static_cast<uint64_t>(status));
387
    if (!(*entry)->commonHandleAfterTrailersCallback(status)) {
92
      return;
92
    }
387
  }
1231
  filter_manager_callbacks_.encodeTrailers(trailers);
1231
  if (state_.saw_downstream_reset_) {
1
    return;
1
  }
1230
  maybeEndEncode(true);
1230
}
543991
void FilterManager::maybeEndEncode(bool end_stream) {
543991
  if (end_stream) {
84443
    ASSERT(!state_.encoder_filter_chain_complete_);
84443
    state_.encoder_filter_chain_complete_ = true;
84443
    if (filter_manager_callbacks_.isHalfCloseEnabled()) {
      // If independent half close is enabled the stream is closed when both decoder and encoder
      // filter chains has completed or were aborted.
289
      checkAndCloseStreamIfFullyClosed();
84287
    } else {
      // Otherwise encoding end_stream always closes the stream (and resets it if request was not
      // complete yet).
84154
      filter_manager_callbacks_.endStream();
84154
    }
84443
  }
543991
}
572476
void FilterManager::maybeEndDecode(bool terminal_filter_decoded_end_stream) {
572476
  if (terminal_filter_decoded_end_stream) {
81347
    ASSERT(!state_.decoder_filter_chain_complete_);
81347
    state_.decoder_filter_chain_complete_ = true;
    // If the decoder filter chain was aborted (i.e. due to local reply)
    // we rely on the encoding of end_stream to close the stream.
81347
    if (filter_manager_callbacks_.isHalfCloseEnabled() && !stopDecoderFilterChain()) {
231
      checkAndCloseStreamIfFullyClosed();
231
    }
81347
  }
572476
}
520
void FilterManager::checkAndCloseStreamIfFullyClosed() {
520
  ASSERT(filter_manager_callbacks_.isHalfCloseEnabled());
  // When the independent half close is enabled the stream is always closed on error responses
  // from the server.
520
  if (filter_manager_callbacks_.responseHeaders().has_value()) {
428
    const uint64_t response_status =
428
        Http::Utility::getResponseStatus(filter_manager_callbacks_.responseHeaders().ref());
428
    const bool error_response =
428
        !(Http::CodeUtility::is2xx(response_status) || Http::CodeUtility::is1xx(response_status));
    // Abort the decoder filter if it has not yet been completed.
428
    if (error_response && !state_.decoder_filter_chain_complete_) {
24
      state_.decoder_filter_chain_aborted_ = true;
24
    }
428
  }
  // Handle the case where the end_stream was received from both downstream client and upstream
  // server but a decoder filter added either request body or trailers AND paused decoder filter
  // chain iteration. This case is currently handled the same way as if independent half-close is
  // disabled, i.e. proxying is stopped as soon as encoding has finished.
  // TODO(yanavlasov): to support this case the codec needs to notify HCM that it has closed its low
  // level stream
  //                   to avoid use-after-free when HCM cleans-up its ActiveStream
520
  const bool downstream_client_sent_end_stream = decoderObservedEndStream();
520
  const bool decoder_filter_chain_paused =
520
      !state_.decoder_filter_chain_complete_ && !state_.decoder_filter_chain_aborted_;
520
  if (state_.encoder_filter_chain_complete_ && downstream_client_sent_end_stream &&
520
      decoder_filter_chain_paused) {
2
    state_.decoder_filter_chain_aborted_ = true;
2
  }
  // If independent half close is enabled then close the stream when
  // 1. Both encoder and decoder filter chains has completed.
  // 2. Encoder filter chain has completed and decoder filter chain was aborted (i.e. local reply).
  // There is no need to check for aborted encoder filter chain as the filter will either be
  // completed or stream is reset.
520
  if (state_.encoder_filter_chain_complete_ &&
520
      (state_.decoder_filter_chain_complete_ || state_.decoder_filter_chain_aborted_)) {
217
    ENVOY_STREAM_LOG(trace, "closing stream", *this);
217
    filter_manager_callbacks_.endStream();
217
  }
520
}
546307
bool FilterManager::processNewlyAddedMetadata() {
546307
  if (request_metadata_map_vector_ == nullptr) {
545383
    return false;
545383
  }
924
  for (const auto& metadata_map : *getRequestMetadataMapVector()) {
458
    decodeMetadata(nullptr, *metadata_map);
458
  }
924
  getRequestMetadataMapVector()->clear();
924
  return true;
546307
}
bool FilterManager::handleDataIfStopAll(ActiveStreamFilterBase& filter, Buffer::Instance& data,
478184
                                        bool& filter_streaming) {
478184
  if (filter.stoppedAll()) {
16695
    ASSERT(!filter.canIterate());
16695
    filter_streaming =
16695
        filter.iteration_state_ == ActiveStreamFilterBase::IterationState::StopAllWatermark;
16695
    filter.commonHandleBufferData(data);
16695
    return true;
16695
  }
461489
  return false;
478184
}
79034
void FilterManager::callHighWatermarkCallbacks() {
79034
  ++high_watermark_count_;
79041
  for (auto watermark_callbacks : watermark_callbacks_) {
17251
    watermark_callbacks->onAboveWriteBufferHighWatermark();
17251
  }
79034
}
78886
void FilterManager::callLowWatermarkCallbacks() {
78886
  ASSERT(high_watermark_count_ > 0);
78886
  --high_watermark_count_;
78892
  for (auto watermark_callbacks : watermark_callbacks_) {
17132
    watermark_callbacks->onBelowWriteBufferLowWatermark();
17132
  }
78886
}
314
void FilterManager::setBufferLimit(uint64_t new_limit) {
314
  ENVOY_STREAM_LOG(debug, "setting buffer limit to {}", *this, new_limit);
314
  buffer_limit_ = new_limit;
314
  if (buffered_request_data_) {
53
    buffered_request_data_->setWatermarks(buffer_limit_);
53
  }
314
  if (buffered_response_data_) {
1
    buffered_response_data_->setWatermarks(buffer_limit_);
1
  }
314
}
14047
// Adds the objects to track while a filter continues iteration: the connection, when one is
// present, followed by the scope supplied by the filter manager callbacks.
void FilterManager::contextOnContinue(ScopeTrackedObjectStack& tracked_object_stack) {
  const bool has_connection = connection_.has_value();
  if (has_connection) {
    tracked_object_stack.add(*connection_);
  }

  tracked_object_stack.add(filter_manager_callbacks_.scope());
}
// Tries to create an upgrade filter chain for the current request.
//
// @param filter_chain_factory factory asked to build the upgrade filter chain.
// @param callbacks callbacks passed through to the factory.
// @return UpgradeUnneeded when the request carries neither an Upgrade header nor the CONNECT
//         method, otherwise UpgradeAccepted or UpgradeRejected according to the factory.
FilterManager::UpgradeResult
FilterManager::createUpgradeFilterChain(const FilterChainFactory& filter_chain_factory,
                                        FilterChainFactoryCallbacksImpl& callbacks) {
  const HeaderEntry* upgrade_header = nullptr;
  if (filter_manager_callbacks_.requestHeaders()) {
    upgrade_header = filter_manager_callbacks_.requestHeaders()->Upgrade();

    // CONNECT requests are treated as a special upgrade case, keyed by the method header.
    if (!upgrade_header && HeaderUtility::isConnect(*filter_manager_callbacks_.requestHeaders())) {
      upgrade_header = filter_manager_callbacks_.requestHeaders()->Method();
    }
  }

  if (upgrade_header == nullptr) {
    // Without an upgrade header there is no upgrade filter chain.
    return UpgradeResult::UpgradeUnneeded;
  }

  const Router::RouteEntry::UpgradeMap* upgrade_map = filter_manager_callbacks_.upgradeMap();
  const bool accepted = filter_chain_factory.createUpgradeFilterChain(
      upgrade_header->value().getStringView(), upgrade_map, callbacks);
  if (accepted) {
    return UpgradeResult::UpgradeAccepted;
  }
  return UpgradeResult::UpgradeRejected;
}
// Creates the filter chain once and caches the outcome in state_.create_chain_result_.
//
// @param filter_chain_factory factory used to build either the upgrade or the default chain.
// @return the cached result if a chain was already created, otherwise the new result.
FilterManager::CreateChainResult
FilterManager::createFilterChain(const FilterChainFactory& filter_chain_factory) {
  if (state_.create_chain_result_.created()) {
    return state_.create_chain_result_;
  }

  // Once filter chain creation has finished, the filter containers are not modified any more,
  // so each filter can safely be given the iterator to its own position. The Cleanup object is
  // handed this work here; the iterators are assigned when it invokes the lambda.
  Cleanup initialize_filter_chain([this]() {
    const auto bind_entries = [](auto& filters) {
      for (auto iter = filters.begin(); iter != filters.end(); ++iter) {
        (*iter)->entry_ = iter;
      }
    };
    bind_entries(decoder_filters_);
    bind_entries(encoder_filters_);
  });

  // TODO(wbpcode): reserve memory for filters to avoid frequent reallocation.

  OptRef<DownstreamStreamFilterCallbacks> downstream_callbacks =
      filter_manager_callbacks_.downstreamCallbacks();

  FilterChainFactoryCallbacksImpl callbacks(*this);

  UpgradeResult upgrade = UpgradeResult::UpgradeUnneeded;

  // The upgrade filter chain is only attempted for downstream filter chains.
  if (downstream_callbacks.has_value()) {
    upgrade = createUpgradeFilterChain(filter_chain_factory, callbacks);
    if (upgrade == UpgradeResult::UpgradeAccepted) {
      // The upgrade filter chain was created; report that directly.
      state_.create_chain_result_ = CreateChainResult(true, upgrade);
      return state_.create_chain_result_;
    }
    // Upgrade unnecessary or rejected: fall through and create the default filter chain.
  }

  state_.create_chain_result_ =
      CreateChainResult(filter_chain_factory.createFilterChain(callbacks), upgrade);
  return state_.create_chain_result_;
}
133
void ActiveStreamDecoderFilter::requestDataDrained() {
  // If this is called it means the call to requestDataTooLarge() was a
  // streaming call, or a 413 would have been sent.
133
  onDecoderFilterBelowWriteBufferLowWatermark();
133
}
446316
void ActiveStreamDecoderFilter::onDecoderFilterBelowWriteBufferLowWatermark() {
446316
  parent_.filter_manager_callbacks_.onDecoderFilterBelowWriteBufferLowWatermark();
446316
}
void ActiveStreamDecoderFilter::addDownstreamWatermarkCallbacks(
91323
    DownstreamWatermarkCallbacks& watermark_callbacks) {
  // This is called exactly once per upstream-stream, by the router filter. Therefore, we
  // expect the same callbacks to not be registered twice.
91323
  ASSERT(std::find(parent_.watermark_callbacks_.begin(), parent_.watermark_callbacks_.end(),
91323
                   &watermark_callbacks) == parent_.watermark_callbacks_.end());
91323
  parent_.watermark_callbacks_.emplace(parent_.watermark_callbacks_.end(), &watermark_callbacks);
91326
  for (uint32_t i = 0; i < parent_.high_watermark_count_; ++i) {
3
    watermark_callbacks.onAboveWriteBufferHighWatermark();
3
  }
91323
}
void ActiveStreamDecoderFilter::removeDownstreamWatermarkCallbacks(
91040
    DownstreamWatermarkCallbacks& watermark_callbacks) {
91040
  ASSERT(std::find(parent_.watermark_callbacks_.begin(), parent_.watermark_callbacks_.end(),
91040
                   &watermark_callbacks) != parent_.watermark_callbacks_.end());
91040
  parent_.watermark_callbacks_.remove(&watermark_callbacks);
91040
}
314
bool ActiveStreamDecoderFilter::recreateStream(const ResponseHeaderMap* headers) {
  // Because the filter's and the HCM view of if the stream has a body and if
  // the stream is complete may differ, re-check bytesReceived() to make sure
  // there was no body from the HCM's point of view.
314
  if (!observedEndStream()) {
    return false;
  }
314
  parent_.state_.decoder_filter_chain_aborted_ = true;
314
  parent_.state_.encoder_filter_chain_aborted_ = true;
314
  parent_.state_.recreated_stream_ = true;
314
  parent_.streamInfo().setResponseCodeDetails(
314
      StreamInfo::ResponseCodeDetails::get().InternalRedirect);
314
  if (headers != nullptr) {
    // The call to setResponseHeaders is needed to ensure that the headers are properly logged in
    // access logs before the stream is destroyed. Since the function expects a
    // ResponseHeaderPtr&&, ownership of the headers must be passed. This cannot happen earlier in
    // the flow (such as in the call to setupRedirect) because at that point it is still possible
    // for the headers to be used in a different logical branch. We work around this by creating a
    // copy and passing ownership of the copy instead.
151
    ResponseHeaderMapPtr headers_copy = createHeaderMap<ResponseHeaderMapImpl>(*headers);
151
    parent_.filter_manager_callbacks_.setResponseHeaders(std::move(headers_copy));
151
    parent_.filter_manager_callbacks_.chargeStats(*headers);
151
  }
314
  parent_.filter_manager_callbacks_.recreateStream(parent_.streamInfo().filterState());
314
  return true;
314
}
// Appends `options` to the upstream socket options held by the parent filter manager.
void ActiveStreamDecoderFilter::addUpstreamSocketOptions(
    const Network::Socket::OptionsSharedPtr& options) {
  auto& accumulated_options = parent_.upstream_options_;
  Network::Socket::appendOptions(accumulated_options, options);
}
43822
// Returns the upstream socket options held by the parent filter manager.
Network::Socket::OptionsSharedPtr ActiveStreamDecoderFilter::getUpstreamSocketOptions() const {
  const auto& accumulated_options = parent_.upstream_options_;
  return accumulated_options;
}
894
// Creates a buffer through the dispatcher's watermark factory and sets its watermarks from the
// parent's buffer limit. The first two callbacks passed to the factory invoke
// responseDataDrained() and responseDataTooLarge() respectively; the third does nothing.
Buffer::InstancePtr ActiveStreamEncoderFilter::createBuffer() {
  auto response_buffer = dispatcher().getWatermarkFactory().createBuffer(
      [this] { responseDataDrained(); }, [this] { responseDataTooLarge(); },
      [] { /* TODO(adisuissa): Handle overflow watermark */ });

  response_buffer->setWatermarks(parent_.buffer_limit_);
  return response_buffer;
}
22883
// Returns a reference to the parent's buffered response data pointer.
Buffer::InstancePtr& ActiveStreamEncoderFilter::bufferedData() {
  Buffer::InstancePtr& response_buffer = parent_.buffered_response_data_;
  return response_buffer;
}
1803
bool ActiveStreamEncoderFilter::observedEndStream() {
1803
  return parent_.state_.observed_encode_end_stream_;
1803
}
776
bool ActiveStreamEncoderFilter::has1xxHeaders() {
776
  return parent_.state_.has_1xx_headers_ && !continued_1xx_headers_;
776
}
1
void ActiveStreamEncoderFilter::do1xxHeaders() {
1
  parent_.encode1xxHeaders(this, *parent_.filter_manager_callbacks_.informationalHeaders());
1
}
741
void ActiveStreamEncoderFilter::doHeaders(bool end_stream) {
741
  parent_.encodeHeaders(this, *parent_.filter_manager_callbacks_.responseHeaders(), end_stream);
741
}
720
void ActiveStreamEncoderFilter::doData(bool end_stream) {
720
  parent_.encodeData(this, *parent_.buffered_response_data_, end_stream,
720
                     FilterManager::FilterIterationStartState::CanStartFromCurrent);
720
}
13
void ActiveStreamEncoderFilter::drainSavedResponseMetadata() {
13
  ASSERT(saved_response_metadata_ != nullptr);
14
  for (auto& metadata_map : *getSavedResponseMetadata()) {
14
    parent_.encodeMetadata(this, std::move(metadata_map));
14
  }
13
  getSavedResponseMetadata()->clear();
13
}
8694
void ActiveStreamEncoderFilter::handleMetadataAfterHeadersCallback() {
8694
  if (parent_.state_.recreated_stream_) {
    // The stream has been recreated. In this case, there's no reason to encode saved metadata.
87
    getSavedResponseMetadata()->clear();
87
    return;
87
  }
  // If we drain accumulated metadata, the iteration must start with the current filter.
8607
  const bool saved_state = iterate_from_current_filter_;
8607
  iterate_from_current_filter_ = true;
  // If encodeHeaders() returns StopAllIteration, we should skip draining metadata, and wait
  // for doMetadata() to drain the metadata after iteration continues.
8607
  if (!stoppedAll() && saved_response_metadata_ != nullptr &&
8607
      !getSavedResponseMetadata()->empty()) {
13
    drainSavedResponseMetadata();
13
  }
  // Restores the original value of iterate_from_current_filter_.
8607
  iterate_from_current_filter_ = saved_state;
8607
}
107
void ActiveStreamEncoderFilter::doTrailers() {
107
  parent_.encodeTrailers(this, *parent_.filter_manager_callbacks_.responseTrailers());
107
}
875
bool ActiveStreamEncoderFilter::hasTrailers() {
875
  return parent_.filter_manager_callbacks_.responseTrailers().has_value();
875
}
1391
// Adds response data through the parent filter manager on behalf of this filter.
void ActiveStreamEncoderFilter::addEncodedData(Buffer::Instance& data, bool streaming) {
  parent_.addEncodedData(*this, data, streaming);
}
void ActiveStreamEncoderFilter::injectEncodedDataToFilterChain(Buffer::Instance& data,
1749
                                                               bool end_stream) {
1749
  if (!headers_continued_) {
59
    headers_continued_ = true;
59
    doHeaders(false);
59
  }
1749
  if (Runtime::runtimeFeatureEnabled(
1749
          "envoy.reloadable_features.ext_proc_inject_data_with_state_update")) {
1749
    parent_.state_.observed_encode_end_stream_ = end_stream;
1749
  }
1749
  parent_.encodeData(this, data, end_stream,
1749
                     FilterManager::FilterIterationStartState::CanStartFromCurrent);
1749
}
35
// Asks the parent filter manager to add response trailers and returns the map it provides.
ResponseTrailerMap& ActiveStreamEncoderFilter::addEncodedTrailers() {
  ResponseTrailerMap& added_trailers = parent_.addEncodedTrailers();
  return added_trailers;
}
816
// Encodes `metadata_map_ptr` through the parent filter manager, with this filter as the
// initiating filter.
void ActiveStreamEncoderFilter::addEncodedMetadata(MetadataMapPtr&& metadata_map_ptr) {
  parent_.encodeMetadata(this, std::move(metadata_map_ptr));
}
62040
void ActiveStreamEncoderFilter::onEncoderFilterAboveWriteBufferHighWatermark() {
62040
  ENVOY_STREAM_LOG(debug, "Disabling upstream stream due to filter callbacks.", parent_);
62040
  parent_.callHighWatermarkCallbacks();
62040
}
62034
void ActiveStreamEncoderFilter::onEncoderFilterBelowWriteBufferLowWatermark() {
62034
  ENVOY_STREAM_LOG(debug, "Enabling upstream stream due to filter callbacks.", parent_);
62034
  parent_.callLowWatermarkCallbacks();
62034
}
523
// Continues encoding; simply delegates to commonContinue().
void ActiveStreamEncoderFilter::continueEncoding() {
  commonContinue();
}
798
// Returns a raw pointer to the parent's buffered response data (whatever the owning pointer
// currently holds).
const Buffer::Instance* ActiveStreamEncoderFilter::encodingBuffer() {
  const Buffer::Instance* response_buffer = parent_.buffered_response_data_.get();
  return response_buffer;
}
void ActiveStreamEncoderFilter::modifyEncodingBuffer(
112
    std::function<void(Buffer::Instance&)> callback) {
112
  ASSERT(parent_.state_.latest_data_encoding_filter_ == this);
112
  callback(*parent_.buffered_response_data_.get());
112
}
void ActiveStreamEncoderFilter::sendLocalReply(
    Code code, absl::string_view body,
    std::function<void(ResponseHeaderMap& headers)> modify_headers,
253
    const absl::optional<Grpc::Status::GrpcStatus> grpc_status, absl::string_view details) {
253
  ActiveStreamFilterBase::sendLocalReply(code, body, modify_headers, grpc_status, details);
253
}
125
void ActiveStreamEncoderFilter::responseDataTooLarge() {
125
  ENVOY_STREAM_LOG(debug, "response data too large watermark exceeded", parent_);
125
  if (parent_.state_.encoder_filters_streaming_) {
33
    onEncoderFilterAboveWriteBufferHighWatermark();
98
  } else {
92
    parent_.filter_manager_callbacks_.onResponseDataTooLarge();
    // In this case, sendLocalReply will either send a response directly to the encoder, or
    // reset the stream.
92
    parent_.sendLocalReply(
92
        Http::Code::InternalServerError, CodeUtility::toString(Http::Code::InternalServerError),
92
        nullptr, absl::nullopt, StreamInfo::ResponseCodeDetails::get().ResponsePayloadTooLarge);
92
  }
125
}
30
void ActiveStreamEncoderFilter::responseDataDrained() {
30
  onEncoderFilterBelowWriteBufferLowWatermark();
30
}
void FilterManager::resetStream(StreamResetReason reason,
2578
                                absl::string_view transport_failure_reason) {
  // Stop filter chain iteration if stream is reset while filter decoding or encoding callbacks
  // are running.
2578
  if (state_.filter_call_state_ & FilterCallState::IsDecodingMask) {
52
    state_.decoder_filter_chain_aborted_ = true;
2526
  } else if (state_.filter_call_state_ & FilterCallState::IsEncodingMask) {
8
    state_.encoder_filter_chain_aborted_ = true;
8
  }
2578
  filter_manager_callbacks_.resetStream(reason, transport_failure_reason);
2578
}
74978
bool FilterManager::isTerminalDecoderFilter(const ActiveStreamDecoderFilter& filter) const {
74978
  return !decoder_filters_.entries_.empty() && decoder_filters_.entries_.back().get() == &filter;
74978
}
void ActiveStreamFilterBase::resetStream(Http::StreamResetReason reset_reason,
2578
                                         absl::string_view transport_failure_reason) {
2578
  parent_.resetStream(reset_reason, transport_failure_reason);
2578
}
172209
// Returns the stream id reported by the owning filter manager.
uint64_t ActiveStreamFilterBase::streamId() const {
  return parent_.streamId();
}
87128
// Returns the buffer memory account held by the owning filter manager.
Buffer::BufferMemoryAccountSharedPtr ActiveStreamDecoderFilter::account() const {
  auto& manager = parent_;
  return manager.account();
}
void ActiveStreamDecoderFilter::setUpstreamOverrideHost(
19
    Upstream::LoadBalancerContext::OverrideHost upstream_override_host) {
19
  parent_.upstream_override_host_.first.assign(upstream_override_host.first);
19
  parent_.upstream_override_host_.second = upstream_override_host.second;
19
}
// Returns the upstream override host stored on the owning filter manager, if any.
//
// @return absl::nullopt when the stored host is empty; otherwise an OverrideHost built from a
//         string view over the stored host together with the stored second element.
absl::optional<Upstream::LoadBalancerContext::OverrideHost>
ActiveStreamDecoderFilter::upstreamOverrideHost() const {
  const auto& stored = parent_.upstream_override_host_;

  if (!stored.first.empty()) {
    // The view refers to the manager's stored host rather than a copy of it.
    return Upstream::LoadBalancerContext::OverrideHost{absl::string_view(stored.first),
                                                       stored.second};
  }

  // An empty host means no override has been set.
  return absl::nullopt;
}
} // namespace Http
} // namespace Envoy