1
#include "source/common/http/filter_manager.h"
2

            
3
#include <functional>
4

            
5
#include "envoy/http/header_map.h"
6
#include "envoy/matcher/matcher.h"
7

            
8
#include "source/common/common/enum_to_int.h"
9
#include "source/common/common/scope_tracked_object_stack.h"
10
#include "source/common/common/scope_tracker.h"
11
#include "source/common/http/codes.h"
12
#include "source/common/http/header_map_impl.h"
13
#include "source/common/http/header_utility.h"
14
#include "source/common/http/utility.h"
15
#include "source/common/runtime/runtime_features.h"
16

            
17
#include "matching/data_impl.h"
18

            
19
namespace Envoy {
20
namespace Http {
21

            
22
namespace {
23

            
24
// Shared helper for recording the latest filter used.
25
template <class Filters>
26
void recordLatestDataFilter(typename Filters::Iterator current_filter,
27
462236
                            typename Filters::Element*& latest_filter, Filters& filters) {
28
  // If this is the first time we're calling onData, just record the current filter.
29
462236
  if (latest_filter == nullptr) {
30
21243
    latest_filter = current_filter->get();
31
21243
    return;
32
21243
  }
33

            
34
  // We want to keep this pointing at the latest filter in the filter list that has received the
35
  // onData callback. To do so, we compare the current latest with the *previous* filter. If they
36
  // match, then we must be processing a new filter for the first time. We omit this check if we're
37
  // the first filter, since the above check handles that case.
38
  //
39
  // We compare against the previous filter to avoid multiple filter iterations from resetting the
40
  // pointer: If we just set latest to current, then the first onData filter iteration would
41
  // correctly iterate over the filters and set latest, but on subsequent onData iterations
42
  // we'd start from the beginning again, potentially allowing filter N to modify the buffer even
43
  // though filter M > N was the filter that inserted data into the buffer.
44
440993
  if (current_filter != filters.begin() && latest_filter == std::prev(current_filter)->get()) {
45
2819
    latest_filter = current_filter->get();
46
2819
  }
47
440993
}
48

            
49
void finalizeHeaders(FilterManagerCallbacks& callbacks, StreamInfo::StreamInfo& stream_info,
50
9841
                     ResponseHeaderMap& headers) {
51
9841
  const Router::RouteConstSharedPtr& route = stream_info.route();
52
9841
  if (route != nullptr && route->routeEntry() != nullptr) {
53
4947
    const Formatter::Context formatter_context{
54
4947
        callbacks.requestHeaders().ptr(), &headers, {}, {}, {}, &callbacks.activeSpan()};
55
4947
    route->routeEntry()->finalizeResponseHeaders(headers, formatter_context, stream_info);
56
4947
  }
57
9841
}
58

            
59
} // namespace
60

            
61
44335
void ActiveStreamFilterBase::commonContinue() {
62
44335
  if (!canContinue()) {
63
101
    ENVOY_STREAM_LOG(trace, "cannot continue filter chain: filter={}", *this,
64
101
                     static_cast<const void*>(this));
65
101
    return;
66
101
  }
67

            
68
  // Set ScopeTrackerScopeState if there's no existing crash context.
69
44234
  ScopeTrackedObjectStack encapsulated_object;
70
44234
  absl::optional<ScopeTrackerScopeState> state;
71
44234
  if (parent_.dispatcher_.trackedObjectStackIsEmpty()) {
72
13948
    restoreContextOnContinue(encapsulated_object);
73
13948
    state.emplace(&encapsulated_object, parent_.dispatcher_);
74
13948
  }
75

            
76
44234
  ENVOY_STREAM_LOG(trace, "continuing filter chain: filter={}", *this,
77
44234
                   static_cast<const void*>(this));
78
44234
  ASSERT(!canIterate(),
79
44234
         "Attempting to continue iteration while the IterationState is already Continue");
80
  // If iteration has stopped for all frame types, set iterate_from_current_filter_ to true so the
81
  // filter iteration starts with the current filter instead of the next one.
82
44234
  if (stoppedAll()) {
83
41675
    iterate_from_current_filter_ = true;
84
41675
  }
85
44234
  allowIteration();
86

            
87
  // Only resume with do1xxHeaders() if we've actually seen 1xx headers.
88
44234
  if (has1xxHeaders()) {
89
1
    continued_1xx_headers_ = true;
90
1
    do1xxHeaders();
91
    // If the response headers have not yet come in, don't continue on with
92
    // headers and body. doHeaders expects request headers to exist.
93
1
    if (!parent_.filter_manager_callbacks_.responseHeaders()) {
94
1
      return;
95
1
    }
96
1
  }
97

            
98
44233
  if (!canContinue()) {
99
    ENVOY_STREAM_LOG(trace, "cannot continue filter chain: filter={}", *this,
100
                     static_cast<const void*>(this));
101
    return;
102
  }
103

            
104
  // Make sure that we handle the zero byte data frame case. We make no effort to optimize this
105
  // case in terms of merging it into a header only request/response. This could be done in the
106
  // future.
107
44233
  if (!headers_continued_) {
108
43814
    headers_continued_ = true;
109
43814
    doHeaders(observedEndStream() && !bufferedData() && !hasTrailers());
110
43814
  }
111

            
112
44233
  if (!canContinue()) {
113
106
    ENVOY_STREAM_LOG(trace, "cannot continue filter chain: filter={}", *this,
114
106
                     static_cast<const void*>(this));
115
106
    return;
116
106
  }
117

            
118
44127
  doMetadata();
119

            
120
44127
  if (!canContinue()) {
121
    ENVOY_STREAM_LOG(trace, "cannot continue filter chain: filter={}", *this,
122
                     static_cast<const void*>(this));
123
    return;
124
  }
125

            
126
  // It is possible for trailers to be added during doData(). doData() itself handles continuation
127
  // of trailers for the non-continuation case. Thus, we must keep track of whether we had
128
  // trailers prior to calling doData(). If we do, then we continue them here, otherwise we rely
129
  // on doData() to do so.
130
44127
  const bool had_trailers_before_data = hasTrailers();
131
44127
  if (bufferedData()) {
132
10335
    doData(observedEndStream() && !had_trailers_before_data);
133
10335
  }
134

            
135
44127
  if (!canContinue()) {
136
603
    ENVOY_STREAM_LOG(trace, "cannot continue filter chain: filter={}", *this,
137
603
                     static_cast<const void*>(this));
138
603
    return;
139
603
  }
140

            
141
43524
  if (had_trailers_before_data) {
142
516
    doTrailers();
143
516
  }
144

            
145
43524
  iterate_from_current_filter_ = false;
146
43524
}
147

            
148
18
bool ActiveStreamFilterBase::commonHandleAfter1xxHeadersCallback(Filter1xxHeadersStatus status) {
149
18
  ASSERT(parent_.state_.has_1xx_headers_);
150
18
  ASSERT(!continued_1xx_headers_);
151
18
  ASSERT(canIterate());
152

            
153
18
  switch (status) {
154
17
  case Filter1xxHeadersStatus::Continue:
155
17
    continued_1xx_headers_ = true;
156
17
    return true;
157
1
  case Filter1xxHeadersStatus::StopIteration:
158
1
    iteration_state_ = IterationState::StopSingleIteration;
159
1
    return false;
160
18
  }
161

            
162
  PANIC_DUE_TO_CORRUPT_ENUM;
163
}
164

            
165
bool ActiveStreamFilterBase::commonHandleAfterHeadersCallback(FilterHeadersStatus status,
166
113241
                                                              bool& end_stream) {
167
113241
  ASSERT(!headers_continued_);
168
113241
  ASSERT(canIterate());
169

            
170
113241
  switch (status) {
171
39798
  case FilterHeadersStatus::StopIteration:
172
39798
    iteration_state_ = IterationState::StopSingleIteration;
173
39798
    break;
174
114
  case FilterHeadersStatus::StopAllIterationAndBuffer:
175
114
    iteration_state_ = IterationState::StopAllBuffer;
176
114
    break;
177
43187
  case FilterHeadersStatus::StopAllIterationAndWatermark:
178
43187
    iteration_state_ = IterationState::StopAllWatermark;
179
43187
    break;
180
31
  case FilterHeadersStatus::ContinueAndDontEndStream:
181
31
    end_stream = false;
182
31
    headers_continued_ = true;
183
31
    ENVOY_STREAM_LOG(debug, "converting to headers and body (body not available yet)", parent_);
184
31
    break;
185
30111
  case FilterHeadersStatus::Continue:
186
30111
    headers_continued_ = true;
187
30111
    break;
188
113241
  }
189

            
190
113241
  handleMetadataAfterHeadersCallback();
191

            
192
113241
  if (stoppedAll() || status == FilterHeadersStatus::StopIteration) {
193
83099
    return false;
194
85313
  } else {
195
30142
    return true;
196
30142
  }
197
113241
}
198

            
199
30664
void ActiveStreamFilterBase::commonHandleBufferData(Buffer::Instance& provided_data) {
200

            
201
  // The way we do buffering is a little complicated which is why we have this common function
202
  // which is used for both encoding and decoding. When data first comes into our filter pipeline,
203
  // we send it through. Any filter can choose to stop iteration and buffer or not. If we then
204
  // continue iteration in the future, we use the buffered data. A future filter can stop and
205
  // buffer again. In this case, since we are already operating on buffered data, we don't
206
  // rebuffer, because we assume the filter has modified the buffer as it wishes in place.
207
30664
  if (bufferedData().get() != &provided_data) {
208
30154
    if (!bufferedData()) {
209
10635
      bufferedData() = createBuffer();
210
10635
    }
211
30154
    bufferedData()->move(provided_data);
212
30154
  }
213
30664
}
214

            
215
bool ActiveStreamFilterBase::commonHandleAfterDataCallback(FilterDataStatus status,
216
                                                           Buffer::Instance& provided_data,
217
462092
                                                           bool& buffer_was_streaming) {
218

            
219
462092
  if (status == FilterDataStatus::Continue) {
220
210842
    if (iteration_state_ == IterationState::StopSingleIteration) {
221
619
      commonHandleBufferData(provided_data);
222
619
      commonContinue();
223
619
      return false;
224
210234
    } else {
225
210223
      ASSERT(headers_continued_);
226
210223
    }
227
281976
  } else {
228
251250
    iteration_state_ = IterationState::StopSingleIteration;
229
251250
    if (status == FilterDataStatus::StopIterationAndBuffer ||
230
251250
        status == FilterDataStatus::StopIterationAndWatermark) {
231
10198
      buffer_was_streaming = status == FilterDataStatus::StopIterationAndWatermark;
232
10198
      commonHandleBufferData(provided_data);
233
247282
    } else if (observedEndStream() && !hasTrailers() && !bufferedData() &&
234
               // If the stream is destroyed, no need to handle the data buffer or trailers.
235
               // This can occur if the filter calls sendLocalReply.
236
241052
               !parent_.state_.destroyed_) {
237
      // If this filter is doing StopIterationNoBuffer and this stream is terminated with a zero
238
      // byte data frame, we need to create an empty buffer to make sure that when commonContinue
239
      // is called, the pipeline resumes with an empty data frame with end_stream = true
240
3424
      ASSERT(end_stream_);
241
3424
      bufferedData() = createBuffer();
242
3424
    }
243

            
244
251250
    return false;
245
251250
  }
246

            
247
210223
  return true;
248
462092
}
249

            
250
1714
bool ActiveStreamFilterBase::commonHandleAfterTrailersCallback(FilterTrailersStatus status) {
251

            
252
1714
  if (status == FilterTrailersStatus::Continue) {
253
1127
    if (iteration_state_ == IterationState::StopSingleIteration) {
254
91
      commonContinue();
255
91
      return false;
256
1061
    } else {
257
1036
      ASSERT(headers_continued_);
258
1036
    }
259
1240
  } else if (status == FilterTrailersStatus::StopIteration) {
260
587
    if (canIterate()) {
261
22
      iteration_state_ = IterationState::StopSingleIteration;
262
22
    }
263
587
    return false;
264
587
  }
265

            
266
1036
  return true;
267
1714
}
268

            
269
270875
OptRef<const Network::Connection> ActiveStreamFilterBase::connection() {
270
270875
  return parent_.connection();
271
270875
}
272

            
273
991390
Event::Dispatcher& ActiveStreamFilterBase::dispatcher() { return parent_.dispatcher_; }
274

            
275
1558117
StreamInfo::StreamInfo& ActiveStreamFilterBase::streamInfo() { return parent_.streamInfo(); }
276

            
277
126313
Tracing::Span& ActiveStreamFilterBase::activeSpan() {
278
126313
  return parent_.filter_manager_callbacks_.activeSpan();
279
126313
}
280

            
281
151005
const ScopeTrackedObject& ActiveStreamFilterBase::scope() {
282
151005
  return parent_.filter_manager_callbacks_.scope();
283
151005
}
284

            
285
void ActiveStreamFilterBase::restoreContextOnContinue(
286
13948
    ScopeTrackedObjectStack& tracked_object_stack) {
287
13948
  parent_.contextOnContinue(tracked_object_stack);
288
13948
}
289

            
290
43677
OptRef<const Tracing::Config> ActiveStreamFilterBase::tracingConfig() const {
291
43677
  return parent_.filter_manager_callbacks_.tracingConfig();
292
43677
}
293

            
294
694
Upstream::ClusterInfoConstSharedPtr ActiveStreamFilterBase::clusterInfo() {
295
694
  return parent_.filter_manager_callbacks_.clusterInfo();
296
694
}
297

            
298
135754
Router::RouteConstSharedPtr ActiveStreamFilterBase::route() { return getRoute(); }
299

            
300
142731
Router::RouteConstSharedPtr ActiveStreamFilterBase::getRoute() const {
301
142731
  if (parent_.filter_manager_callbacks_.downstreamCallbacks()) {
302
141643
    return parent_.filter_manager_callbacks_.downstreamCallbacks()->route(nullptr);
303
141643
  }
304
1088
  return parent_.streamInfo().route();
305
142731
}
306

            
307
12
void ActiveStreamFilterBase::resetIdleTimer() {
308
12
  parent_.filter_manager_callbacks_.resetIdleTimer();
309
12
}
310

            
311
const Router::RouteSpecificFilterConfig*
312
5399
ActiveStreamFilterBase::mostSpecificPerFilterConfig() const {
313
5399
  auto current_route = getRoute();
314
5399
  if (current_route == nullptr) {
315
140
    return nullptr;
316
140
  }
317
5259
  return current_route->mostSpecificPerFilterConfig(filter_context_.config_name);
318
5399
}
319

            
320
1578
Router::RouteSpecificFilterConfigs ActiveStreamFilterBase::perFilterConfigs() const {
321
1578
  Router::RouteConstSharedPtr current_route = getRoute();
322
1578
  if (current_route == nullptr) {
323
11
    return {};
324
11
  }
325

            
326
1567
  return current_route->perFilterConfigs(filter_context_.config_name);
327
1578
}
328

            
329
2
Http1StreamEncoderOptionsOptRef ActiveStreamFilterBase::http1StreamEncoderOptions() {
330
  // TODO(mattklein123): At some point we might want to actually wrap this interface but for now
331
  // we give the filter direct access to the encoder options.
332
2
  return parent_.filter_manager_callbacks_.http1StreamEncoderOptions();
333
2
}
334

            
335
1899
OptRef<DownstreamStreamFilterCallbacks> ActiveStreamFilterBase::downstreamCallbacks() {
336
1899
  return parent_.filter_manager_callbacks_.downstreamCallbacks();
337
1899
}
338

            
339
995012
OptRef<UpstreamStreamFilterCallbacks> ActiveStreamFilterBase::upstreamCallbacks() {
340
995012
  return parent_.filter_manager_callbacks_.upstreamCallbacks();
341
995012
}
342

            
343
369
RequestHeaderMapOptRef ActiveStreamFilterBase::requestHeaders() {
344
369
  return parent_.filter_manager_callbacks_.requestHeaders();
345
369
}
346
215
RequestTrailerMapOptRef ActiveStreamFilterBase::requestTrailers() {
347
215
  return parent_.filter_manager_callbacks_.requestTrailers();
348
215
}
349
3
ResponseHeaderMapOptRef ActiveStreamFilterBase::informationalHeaders() {
350
3
  return parent_.filter_manager_callbacks_.informationalHeaders();
351
3
}
352
262
ResponseHeaderMapOptRef ActiveStreamFilterBase::responseHeaders() {
353
262
  return parent_.filter_manager_callbacks_.responseHeaders();
354
262
}
355
122
ResponseTrailerMapOptRef ActiveStreamFilterBase::responseTrailers() {
356
122
  return parent_.filter_manager_callbacks_.responseTrailers();
357
122
}
358

            
359
309
void ActiveStreamFilterBase::setBufferLimit(uint64_t limit) { parent_.setBufferLimit(limit); }
360

            
361
249499
uint64_t ActiveStreamFilterBase::bufferLimit() { return parent_.buffer_limit_; }
362

            
363
void ActiveStreamFilterBase::sendLocalReply(
364
    Code code, absl::string_view body,
365
    std::function<void(ResponseHeaderMap& headers)> modify_headers,
366
7966
    const absl::optional<Grpc::Status::GrpcStatus> grpc_status, absl::string_view details) {
367
7966
  if (!streamInfo().filterState()->hasData<LocalReplyOwnerObject>(LocalReplyFilterStateKey)) {
368
7247
    streamInfo().filterState()->setData(
369
7247
        LocalReplyFilterStateKey,
370
7247
        std::make_shared<LocalReplyOwnerObject>(filter_context_.config_name),
371
7247
        StreamInfo::FilterState::StateType::ReadOnly,
372
7247
        StreamInfo::FilterState::LifeSpan::FilterChain);
373
7247
  }
374

            
375
7966
  parent_.sendLocalReply(code, body, modify_headers, grpc_status, details);
376
7966
}
377

            
378
217164
bool ActiveStreamDecoderFilter::canContinue() {
379
  // It is possible for the connection manager to respond directly to a request even while
380
  // a filter is trying to continue. If a response has already happened, we should not
381
  // continue to further filters. A concrete example of this is a filter buffering data, the
382
  // last data frame comes in and the filter continues, but the final buffering takes the stream
383
  // over the high watermark such that a 413 is returned.
384
217164
  return !parent_.stopDecoderFilterChain();
385
217164
}
386

            
387
3891
bool ActiveStreamEncoderFilter::canContinue() {
388
  // As with ActiveStreamDecoderFilter::canContinue() make sure we do not
389
  // continue if a local reply has been sent or ActiveStreamDecoderFilter::recreateStream() is
390
  // called, etc.
391
3891
  return !parent_.state_.encoder_filter_chain_complete_ && !parent_.stopEncoderFilterChain();
392
3891
}
393

            
394
13164
Buffer::InstancePtr ActiveStreamDecoderFilter::createBuffer() {
395
13164
  auto buffer = dispatcher().getWatermarkFactory().createBuffer(
396
13164
      [this]() -> void { this->requestDataDrained(); },
397
13164
      [this]() -> void { this->requestDataTooLarge(); },
398
13164
      []() -> void { /* TODO(adisuissa): Handle overflow watermark */ });
399
13164
  buffer->setWatermarks(parent_.buffer_limit_);
400
13164
  return buffer;
401
13164
}
402

            
403
166910
Buffer::InstancePtr& ActiveStreamDecoderFilter::bufferedData() {
404
166910
  return parent_.buffered_request_data_;
405
166910
}
406

            
407
293692
bool ActiveStreamDecoderFilter::observedEndStream() { return parent_.decoderObservedEndStream(); }
408

            
409
43270
void ActiveStreamDecoderFilter::doHeaders(bool end_stream) {
410
43270
  parent_.decodeHeaders(this, *parent_.filter_manager_callbacks_.requestHeaders(), end_stream);
411
43270
}
412

            
413
9614
void ActiveStreamDecoderFilter::doData(bool end_stream) {
414
9614
  parent_.decodeData(this, *parent_.buffered_request_data_, end_stream,
415
9614
                     FilterManager::FilterIterationStartState::CanStartFromCurrent);
416
9614
}
417

            
418
409
void ActiveStreamDecoderFilter::doTrailers() {
419
409
  parent_.decodeTrailers(this, *parent_.filter_manager_callbacks_.requestTrailers());
420
409
}
421
76078
bool ActiveStreamDecoderFilter::hasTrailers() {
422
76078
  return parent_.filter_manager_callbacks_.requestTrailers().has_value();
423
76078
}
424

            
425
152
void ActiveStreamDecoderFilter::drainSavedRequestMetadata() {
426
152
  ASSERT(saved_request_metadata_ != nullptr);
427
459
  for (auto& metadata_map : *getSavedRequestMetadata()) {
428
459
    parent_.decodeMetadata(this, *metadata_map);
429
459
  }
430
152
  getSavedRequestMetadata()->clear();
431
152
}
432

            
433
104401
void ActiveStreamDecoderFilter::handleMetadataAfterHeadersCallback() {
434
104401
  if (parent_.state_.decoder_filter_chain_aborted_) {
435
    // The decoder filter chain has been aborted, possibly due to a local reply. In this case,
436
    // there's no reason to decode saved metadata.
437
6353
    getSavedRequestMetadata()->clear();
438
6353
    return;
439
6353
  }
440
  // If we drain accumulated metadata, the iteration must start with the current filter.
441
98048
  const bool saved_state = iterate_from_current_filter_;
442
98048
  iterate_from_current_filter_ = true;
443
  // If decodeHeaders() returns StopAllIteration, we should skip draining metadata, and wait
444
  // for doMetadata() to drain the metadata after iteration continues.
445
98048
  if (!stoppedAll() && saved_request_metadata_ != nullptr && !getSavedRequestMetadata()->empty()) {
446
41
    drainSavedRequestMetadata();
447
41
  }
448
  // Restores the original value of iterate_from_current_filter_.
449
98048
  iterate_from_current_filter_ = saved_state;
450
98048
}
451

            
452
30
RequestTrailerMap& ActiveStreamDecoderFilter::addDecodedTrailers() {
453
30
  return parent_.addDecodedTrailers();
454
30
}
455

            
456
1798
void ActiveStreamDecoderFilter::addDecodedData(Buffer::Instance& data, bool streaming) {
457
1798
  parent_.addDecodedData(*this, data, streaming);
458
1798
}
459

            
460
458
MetadataMapVector& ActiveStreamDecoderFilter::addDecodedMetadata() {
461
458
  return parent_.addDecodedMetadata();
462
458
}
463

            
464
void ActiveStreamDecoderFilter::injectDecodedDataToFilterChain(Buffer::Instance& data,
465
5750
                                                               bool end_stream) {
466
5750
  if (!headers_continued_) {
467
139
    headers_continued_ = true;
468
139
    doHeaders(false);
469
139
  }
470
5750
  if (Runtime::runtimeFeatureEnabled(
471
5750
          "envoy.reloadable_features.ext_proc_inject_data_with_state_update")) {
472
5750
    parent_.state().observed_decode_end_stream_ = end_stream;
473
5750
  }
474
5750
  parent_.decodeData(this, data, end_stream,
475
5750
                     FilterManager::FilterIterationStartState::CanStartFromCurrent);
476
5750
}
477

            
478
43042
void ActiveStreamDecoderFilter::continueDecoding() { commonContinue(); }
479
117123
const Buffer::Instance* ActiveStreamDecoderFilter::decodingBuffer() {
480
117123
  return parent_.buffered_request_data_.get();
481
117123
}
482

            
483
43500
bool ActiveStreamDecoderFilter::shouldLoadShed() const { return parent_.shouldLoadShed(); }
484

            
485
void ActiveStreamDecoderFilter::modifyDecodingBuffer(
486
123
    std::function<void(Buffer::Instance&)> callback) {
487
123
  ASSERT(parent_.state_.latest_data_decoding_filter_ == this);
488
123
  callback(*parent_.buffered_request_data_.get());
489
123
}
490

            
491
void ActiveStreamDecoderFilter::sendLocalReply(
492
    Code code, absl::string_view body,
493
    std::function<void(ResponseHeaderMap& headers)> modify_headers,
494
7713
    const absl::optional<Grpc::Status::GrpcStatus> grpc_status, absl::string_view details) {
495
7713
  ActiveStreamFilterBase::sendLocalReply(code, body, modify_headers, grpc_status, details);
496
7713
}
497

            
498
18
void ActiveStreamDecoderFilter::sendGoAwayAndClose(bool graceful) {
499
18
  parent_.sendGoAwayAndClose(graceful);
500
18
}
501

            
502
302
void ActiveStreamDecoderFilter::encode1xxHeaders(ResponseHeaderMapPtr&& headers) {
503
  // If Envoy is not configured to proxy 100-Continue responses, swallow the 100 Continue
504
  // here. This avoids the potential situation where Envoy strips Expect: 100-Continue and sends a
505
  // 100-Continue, then proxies a duplicate 100 Continue from upstream.
506
302
  if (parent_.proxy_100_continue_) {
507
282
    parent_.filter_manager_callbacks_.setInformationalHeaders(std::move(headers));
508
282
    parent_.encode1xxHeaders(nullptr, *parent_.filter_manager_callbacks_.informationalHeaders());
509
282
  }
510
302
}
511

            
512
void ActiveStreamDecoderFilter::stopDecodingIfNonTerminalFilterEncodedEndStream(
513
533722
    bool encoded_end_stream) {
514
  // Encoding end_stream by a non-terminal filters (i.e. cache filter) always causes the decoding to
515
  // be stopped even if independent half-close is enabled. For simplicity, independent half-close is
516
  // allowed only when the terminal (router) filter is encoding the response.
517
533722
  if (encoded_end_stream && !parent_.isTerminalDecoderFilter(*this) &&
518
533722
      !parent_.state_.decoder_filter_chain_complete_) {
519
399
    parent_.state_.decoder_filter_chain_aborted_ = true;
520
399
  }
521
533722
}
522

            
523
void ActiveStreamDecoderFilter::encodeHeaders(ResponseHeaderMapPtr&& headers, bool end_stream,
524
77947
                                              absl::string_view details) {
525
77947
  stopDecodingIfNonTerminalFilterEncodedEndStream(end_stream);
526
77947
  parent_.streamInfo().setResponseCodeDetails(details);
527
77947
  parent_.filter_manager_callbacks_.setResponseHeaders(std::move(headers));
528
77947
  parent_.encodeHeaders(nullptr, *parent_.filter_manager_callbacks_.responseHeaders(), end_stream);
529
77947
}
530

            
531
454579
void ActiveStreamDecoderFilter::encodeData(Buffer::Instance& data, bool end_stream) {
532
454579
  stopDecodingIfNonTerminalFilterEncodedEndStream(end_stream);
533
454579
  parent_.encodeData(nullptr, data, end_stream,
534
454579
                     FilterManager::FilterIterationStartState::CanStartFromCurrent);
535
454579
}
536

            
537
1196
void ActiveStreamDecoderFilter::encodeTrailers(ResponseTrailerMapPtr&& trailers) {
538
1196
  stopDecodingIfNonTerminalFilterEncodedEndStream(true);
539
1196
  parent_.filter_manager_callbacks_.setResponseTrailers(std::move(trailers));
540
1196
  parent_.encodeTrailers(nullptr, *parent_.filter_manager_callbacks_.responseTrailers());
541
1196
}
542

            
543
1894
void ActiveStreamDecoderFilter::encodeMetadata(MetadataMapPtr&& metadata_map_ptr) {
544
1894
  parent_.encodeMetadata(nullptr, std::move(metadata_map_ptr));
545
1894
}
546

            
547
449659
void ActiveStreamDecoderFilter::onDecoderFilterAboveWriteBufferHighWatermark() {
548
449659
  parent_.filter_manager_callbacks_.onDecoderFilterAboveWriteBufferHighWatermark();
549
449659
}
550

            
551
247
void ActiveStreamDecoderFilter::requestDataTooLarge() {
552
247
  ENVOY_STREAM_LOG(debug, "request data too large watermark exceeded", parent_);
553
247
  if (parent_.state_.decoder_filters_streaming_) {
554
135
    onDecoderFilterAboveWriteBufferHighWatermark();
555
220
  } else {
556
112
    parent_.filter_manager_callbacks_.onRequestDataTooLarge();
557
112
    sendLocalReply(Code::PayloadTooLarge, CodeUtility::toString(Code::PayloadTooLarge), nullptr,
558
112
                   absl::nullopt, StreamInfo::ResponseCodeDetails::get().RequestPayloadTooLarge);
559
112
  }
560
247
}
561

            
562
141000
void FilterManager::maybeContinueDecoding(StreamDecoderFilters::Iterator continue_data_entry) {
563
141000
  if (continue_data_entry != decoder_filters_.end()) {
564
    // We use the continueDecoding() code since it will correctly handle not calling
565
    // decodeHeaders() again. Fake setting StopSingleIteration since the continueDecoding() code
566
    // expects it.
567
121
    ASSERT(buffered_request_data_);
568
121
    (*continue_data_entry)->iteration_state_ =
569
121
        ActiveStreamFilterBase::IterationState::StopSingleIteration;
570
121
    (*continue_data_entry)->continueDecoding();
571
121
  }
572
141000
}
573

            
574
void FilterManager::decodeHeaders(ActiveStreamDecoderFilter* filter, RequestHeaderMap& headers,
575
141001
                                  bool end_stream) {
576
  // If the stream has been reset, do not process any more frames.
577
141001
  if (stopDecoderFilterChain()) {
578
1
    return;
579
1
  }
580

            
581
  // Headers filter iteration should always start with the next filter if available.
582
141000
  StreamDecoderFilters::Iterator entry =
583
141000
      commonDecodePrefix(filter, FilterIterationStartState::AlwaysStartFromNext);
584
141000
  StreamDecoderFilters::Iterator continue_data_entry = decoder_filters_.end();
585
141000
  bool terminal_filter_decoded_end_stream = false;
586
141000
  ASSERT(!state_.decoder_filter_chain_complete_ || entry == decoder_filters_.end() ||
587
141000
         (*entry)->end_stream_);
588

            
589
239934
  for (; entry != decoder_filters_.end(); entry++) {
590
104401
    ENVOY_EXECUTION_SCOPE(trackedStream(), &(*entry)->filter_context_);
591
104401
    ASSERT(!(state_.filter_call_state_ & FilterCallState::DecodeHeaders));
592
104401
    state_.filter_call_state_ |= FilterCallState::DecodeHeaders;
593
104401
    (*entry)->end_stream_ = (end_stream && continue_data_entry == decoder_filters_.end());
594
104401
    if ((*entry)->end_stream_) {
595
73670
      state_.filter_call_state_ |= FilterCallState::EndOfStream;
596
73670
    }
597
104401
    FilterHeadersStatus status = (*entry)->decodeHeaders(headers, (*entry)->end_stream_);
598
104401
    state_.filter_call_state_ &= ~FilterCallState::DecodeHeaders;
599
104401
    if ((*entry)->end_stream_) {
600
73670
      state_.filter_call_state_ &= ~FilterCallState::EndOfStream;
601
73670
    }
602
104401
    if (state_.decoder_filter_chain_aborted_) {
603
6353
      executeLocalReplyIfPrepared();
604
6353
      ENVOY_STREAM_LOG(trace,
605
6353
                       "decodeHeaders filter iteration aborted due to local reply: filter={}",
606
6353
                       *this, (*entry)->filter_context_.config_name);
607
6353
      status = FilterHeadersStatus::StopIteration;
608
6353
    }
609

            
610
104401
    ASSERT(!(status == FilterHeadersStatus::ContinueAndDontEndStream && !(*entry)->end_stream_),
611
104401
           "Filters should not return FilterHeadersStatus::ContinueAndDontEndStream from "
612
104401
           "decodeHeaders when end_stream is already false");
613

            
614
104401
    ENVOY_STREAM_LOG(trace, "decode headers called: filter={} status={}", *this,
615
104401
                     (*entry)->filter_context_.config_name, static_cast<uint64_t>(status));
616

            
617
104401
    (*entry)->processed_headers_ = true;
618

            
619
104401
    const auto continue_iteration = (*entry)->commonHandleAfterHeadersCallback(status, end_stream);
620
104401
    ENVOY_BUG(!continue_iteration || !stopDecoderFilterChain(),
621
104401
              fmt::format(
622
104401
                  "filter={} did not return StopAll or StopIteration after sending a local reply.",
623
104401
                  (*entry)->filter_context_.config_name));
624

            
625
    // If this filter ended the stream, decodeComplete() should be called for it.
626
104401
    if ((*entry)->end_stream_) {
627
73670
      (*entry)->handle_->decodeComplete();
628
73670
    }
629

            
630
    // Skip processing metadata after sending local reply
631
104401
    if (stopDecoderFilterChain() && std::next(entry) != decoder_filters_.end()) {
632
2828
      maybeContinueDecoding(continue_data_entry);
633
2828
      return;
634
2828
    }
635

            
636
101573
    const bool new_metadata_added = processNewlyAddedMetadata();
637
    // If end_stream is set in headers, and a filter adds new metadata, we need to delay end_stream
638
    // in headers by inserting an empty data frame with end_stream set. The empty data frame is sent
639
    // after the new metadata.
640
101573
    if ((*entry)->end_stream_ && new_metadata_added && !buffered_request_data_) {
641
10
      Buffer::OwnedImpl empty_data("");
642
10
      ENVOY_STREAM_LOG(
643
10
          trace, "inserting an empty data frame for end_stream due metadata being added.", *this);
644
      // Metadata frame doesn't carry end of stream bit. We need an empty data frame to end the
645
      // stream.
646
10
      addDecodedData(*(*entry), empty_data, true);
647
10
    }
648

            
649
101573
    if (!continue_iteration && std::next(entry) != decoder_filters_.end()) {
650
      // Stop iteration IFF this is not the last filter. If it is the last filter, continue with
651
      // processing since we need to handle the case where a terminal filter wants to buffer, but
652
      // a previous filter has added body.
653
2639
      maybeContinueDecoding(continue_data_entry);
654
2639
      return;
655
2639
    }
656

            
657
    // Here we handle the case where we have a header only request, but a filter adds a body
658
    // to it. We need to not raise end_stream = true to further filters during inline iteration.
659
98934
    if (end_stream && buffered_request_data_ && continue_data_entry == decoder_filters_.end()) {
660
121
      continue_data_entry = entry;
661
121
    }
662
    // The decoder filter chain is finished here if the following is true:
663
    // 1. the last filter has observed the end_stream AND
664
    // 2. no filter, including the last filter, has injected body during header iteration.
665
    // If body was injected the end_stream will be processed in the `decodeData()` method below.
666
98934
    const bool no_body_was_injected = continue_data_entry == decoder_filters_.end();
667
98934
    terminal_filter_decoded_end_stream =
668
98934
        (std::next(entry) == decoder_filters_.end() && (*entry)->end_stream_) &&
669
98934
        no_body_was_injected;
670
98934
  }
671

            
672
135533
  maybeContinueDecoding(continue_data_entry);
673

            
674
135533
  if (end_stream) {
675
92094
    disarmRequestTimeout();
676
92094
  }
677
135533
  maybeEndDecode(terminal_filter_decoded_end_stream);
678
135533
}
679

            
680
void FilterManager::decodeData(ActiveStreamDecoderFilter* filter, Buffer::Instance& data,
681
                               bool end_stream,
682
452365
                               FilterIterationStartState filter_iteration_start_state) {
683
452365
  ScopeTrackerScopeState scope(this, dispatcher_);
684
452365
  filter_manager_callbacks_.resetIdleTimer();
685

            
686
  // If a response is complete or a reset has been sent, filters do not care about further body
687
  // data. Just drop it.
688
452365
  if (stopDecoderFilterChain()) {
689
63
    return;
690
63
  }
691

            
692
452302
  auto trailers_added_entry = decoder_filters_.end();
693
452302
  const bool trailers_exists_at_start = filter_manager_callbacks_.requestTrailers().has_value();
694
  // Filter iteration may start at the current filter.
695
452302
  StreamDecoderFilters::Iterator entry = commonDecodePrefix(filter, filter_iteration_start_state);
696
452302
  bool terminal_filter_decoded_end_stream = false;
697
452302
  ASSERT(!state_.decoder_filter_chain_complete_ || entry == decoder_filters_.end() ||
698
452302
         (*entry)->end_stream_);
699

            
700
764966
  for (; entry != decoder_filters_.end(); entry++) {
701
459931
    ENVOY_EXECUTION_SCOPE(trackedStream(), &(*entry)->filter_context_);
702
    // If the filter pointed by entry has stopped for all frame types, return now.
703
459931
    if (handleDataIfStopAll(**entry, data, state_.decoder_filters_streaming_)) {
704
15696
      return;
705
15696
    }
706
    // If end_stream_ is marked for a filter, the data is not for this filter and filters after.
707
    //
708
    // In following case, ActiveStreamFilterBase::commonContinue() could be called recursively and
709
    // its doData() is called with wrong data.
710
    //
711
    //  There are 3 decode filters and "wrapper" refers to ActiveStreamFilter object.
712
    //
713
    //  filter0->decodeHeaders(_, true)
714
    //    return STOP
715
    //  filter0->continueDecoding()
716
    //    wrapper0->commonContinue()
717
    //      wrapper0->decodeHeaders(_, _, true)
718
    //        filter1->decodeHeaders(_, true)
719
    //          filter1->addDecodeData()
720
    //          return CONTINUE
721
    //        filter2->decodeHeaders(_, false)
722
    //          return CONTINUE
723
    //        wrapper1->commonContinue() // Detects data is added.
724
    //          wrapper1->doData()
725
    //            wrapper1->decodeData()
726
    //              filter2->decodeData(_, true)
727
    //                 return CONTINUE
728
    //      wrapper0->doData() // This should not be called
729
    //        wrapper0->decodeData()
730
    //          filter1->decodeData(_, true)  // It will cause assertions.
731
    //
732
    // One way to solve this problem is to mark end_stream_ for each filter.
733
    // If a filter is already marked as end_stream_ when decodeData() is called, bails out the
734
    // whole function. If just skip the filter, the codes after the loop will be called with
735
    // wrong data. For encodeData, the response_encoder->encode() will be called.
736
444235
    if ((*entry)->end_stream_) {
737
105
      return;
738
105
    }
739
444130
    ASSERT(!(state_.filter_call_state_ & FilterCallState::DecodeData));
740

            
741
    // We check the request_trailers_ pointer here in case addDecodedTrailers
742
    // is called in decodeData during a previous filter invocation, at which point we communicate to
743
    // the current and future filters that the stream has not yet ended.
744
444130
    if (end_stream) {
745
15759
      state_.filter_call_state_ |= FilterCallState::EndOfStream;
746
15759
    }
747

            
748
444130
    recordLatestDataFilter(entry, state_.latest_data_decoding_filter_, decoder_filters_);
749

            
750
444130
    state_.filter_call_state_ |= FilterCallState::DecodeData;
751
444130
    (*entry)->end_stream_ = end_stream && !filter_manager_callbacks_.requestTrailers();
752
444130
    FilterDataStatus status = (*entry)->handle_->decodeData(data, (*entry)->end_stream_);
753
444130
    if ((*entry)->end_stream_) {
754
15759
      (*entry)->handle_->decodeComplete();
755
15759
    }
756
444130
    state_.filter_call_state_ &= ~FilterCallState::DecodeData;
757
444130
    if (end_stream) {
758
15759
      state_.filter_call_state_ &= ~FilterCallState::EndOfStream;
759
15759
    }
760
444130
    ENVOY_STREAM_LOG(trace, "decode data called: filter={} status={}", *this,
761
444130
                     (*entry)->filter_context_.config_name, static_cast<uint64_t>(status));
762
444130
    if (state_.decoder_filter_chain_aborted_) {
763
144
      executeLocalReplyIfPrepared();
764
144
      ENVOY_STREAM_LOG(trace, "decodeData filter iteration aborted due to local reply: filter={}",
765
144
                       *this, (*entry)->filter_context_.config_name);
766
144
      return;
767
144
    }
768

            
769
443986
    processNewlyAddedMetadata();
770

            
771
443986
    if (!trailers_exists_at_start && filter_manager_callbacks_.requestTrailers() &&
772
443986
        trailers_added_entry == decoder_filters_.end()) {
773
26
      end_stream = false;
774
26
      trailers_added_entry = entry;
775
26
    }
776

            
777
    // The decoder filter chain is finished here if the following is true:
778
    // 1. the last filter has observed the end_stream AND
779
    // 2. no filter, including the last filter, has injected trailers during header iteration.
780
    //    NOTE: end_stream is set to false above if a filter injected trailers.
781
    // If trailers were injected the end_stream will be processed in the `decodeTrailers()` method
782
    // below.
783
443986
    terminal_filter_decoded_end_stream = end_stream && std::next(entry) == decoder_filters_.end();
784

            
785
443986
    if (!(*entry)->commonHandleAfterDataCallback(status, data, state_.decoder_filters_streaming_) &&
786
443986
        std::next(entry) != decoder_filters_.end()) {
787
      // Stop iteration IFF this is not the last filter. If it is the last filter, continue with
788
      // processing since we need to handle the case where a terminal filter wants to buffer, but
789
      // a previous filter has added trailers.
790
131322
      break;
791
131322
    }
792
443986
  }
793

            
794
  // If trailers were adding during decodeData we need to trigger decodeTrailers in order
795
  // to allow filters to process the trailers.
796
436357
  if (trailers_added_entry != decoder_filters_.end()) {
797
10
    decodeTrailers(trailers_added_entry->get(), *filter_manager_callbacks_.requestTrailers());
798
10
  }
799

            
800
436357
  if (end_stream) {
801
14264
    disarmRequestTimeout();
802
14264
  }
803
436357
  maybeEndDecode(terminal_filter_decoded_end_stream);
804
436357
}
805

            
806
30
RequestTrailerMap& FilterManager::addDecodedTrailers() {
807
  // Trailers can only be added during the last data frame (i.e. end_stream = true).
808
30
  ASSERT(state_.filter_call_state_ & FilterCallState::EndOfStream);
809

            
810
30
  filter_manager_callbacks_.setRequestTrailers(RequestTrailerMapImpl::create());
811
30
  return *filter_manager_callbacks_.requestTrailers();
812
30
}
813

            
814
void FilterManager::addDecodedData(ActiveStreamDecoderFilter& filter, Buffer::Instance& data,
815
1808
                                   bool streaming) {
816
1808
  if (state_.filter_call_state_ == 0 ||
817
1808
      (state_.filter_call_state_ & FilterCallState::DecodeHeaders) ||
818
1808
      (state_.filter_call_state_ & FilterCallState::DecodeData) ||
819
1808
      ((state_.filter_call_state_ & FilterCallState::DecodeTrailers) && !filter.canIterate())) {
820
    // Make sure if this triggers watermarks, the correct action is taken.
821
1760
    state_.decoder_filters_streaming_ = streaming;
822
    // If no call is happening or we are in the decode headers/data callback, buffer the data.
823
    // Inline processing happens in the decodeHeaders() callback if necessary.
824
1760
    filter.commonHandleBufferData(data);
825
1760
  } else if (state_.filter_call_state_ & FilterCallState::DecodeTrailers) {
826
    // In this case we need to inline dispatch the data to further filters. If those filters
827
    // choose to buffer/stop iteration that's fine.
828
37
    decodeData(&filter, data, false, FilterIterationStartState::AlwaysStartFromNext);
829
42
  } else {
830
11
    IS_ENVOY_BUG("Invalid request data");
831
11
    sendLocalReply(Http::Code::BadGateway, "Filter error", nullptr, absl::nullopt,
832
11
                   StreamInfo::ResponseCodeDetails::get().FilterAddedInvalidRequestData);
833
11
  }
834
1808
}
835

            
836
458
MetadataMapVector& FilterManager::addDecodedMetadata() { return *getRequestMetadataMapVector(); }
837

            
838
1374
void FilterManager::decodeTrailers(ActiveStreamDecoderFilter* filter, RequestTrailerMap& trailers) {
839
  // If a response is complete or a reset has been sent, filters do not care about further body
840
  // data. Just drop it.
841
1374
  if (stopDecoderFilterChain()) {
842
9
    return;
843
9
  }
844

            
845
  // Filter iteration may start at the current filter.
846
1365
  StreamDecoderFilters::Iterator entry =
847
1365
      commonDecodePrefix(filter, FilterIterationStartState::CanStartFromCurrent);
848
1365
  bool terminal_filter_reached = false;
849
1365
  ASSERT(!state_.decoder_filter_chain_complete_ || entry == decoder_filters_.end());
850

            
851
2592
  for (; entry != decoder_filters_.end(); entry++) {
852
1630
    ENVOY_EXECUTION_SCOPE(trackedStream(), &(*entry)->filter_context_);
853
    // If the filter pointed by entry has stopped for all frame type, return now.
854
1630
    if ((*entry)->stoppedAll()) {
855
303
      return;
856
303
    }
857
1327
    ASSERT(!(state_.filter_call_state_ & FilterCallState::DecodeTrailers));
858
1327
    state_.filter_call_state_ |= FilterCallState::DecodeTrailers;
859
1327
    FilterTrailersStatus status = (*entry)->handle_->decodeTrailers(trailers);
860
1327
    (*entry)->handle_->decodeComplete();
861
1327
    (*entry)->end_stream_ = true;
862
1327
    state_.filter_call_state_ &= ~FilterCallState::DecodeTrailers;
863
1327
    ENVOY_STREAM_LOG(trace, "decode trailers called: filter={} status={}", *this,
864
1327
                     (*entry)->filter_context_.config_name, static_cast<uint64_t>(status));
865
1327
    if (state_.decoder_filter_chain_aborted_) {
866
11
      executeLocalReplyIfPrepared();
867
11
      ENVOY_STREAM_LOG(trace,
868
11
                       "decodeTrailers filter iteration aborted due to local reply: filter={}",
869
11
                       *this, (*entry)->filter_context_.config_name);
870
11
      status = FilterTrailersStatus::StopIteration;
871
11
    }
872

            
873
1327
    processNewlyAddedMetadata();
874
    // Check if the last filter has processed trailers
875
1327
    terminal_filter_reached = std::next(entry) == decoder_filters_.end();
876

            
877
1327
    if (!(*entry)->commonHandleAfterTrailersCallback(status) && !terminal_filter_reached) {
878
100
      return;
879
100
    }
880
1327
  }
881
962
  disarmRequestTimeout();
882
962
  maybeEndDecode(terminal_filter_reached);
883
962
}
884

            
885
3057
void FilterManager::decodeMetadata(ActiveStreamDecoderFilter* filter, MetadataMap& metadata_map) {
886
3057
  ScopeTrackerScopeState scope(&*this, dispatcher_);
887
3057
  filter_manager_callbacks_.resetIdleTimer();
888

            
889
  // If the stream has been reset, do not process any more frames.
890
3057
  if (stopDecoderFilterChain()) {
891
2
    return;
892
2
  }
893

            
894
  // Filter iteration may start at the current filter.
895
3055
  StreamDecoderFilters::Iterator entry =
896
3055
      commonDecodePrefix(filter, FilterIterationStartState::CanStartFromCurrent);
897

            
898
3055
  ASSERT(!(state_.filter_call_state_ & FilterCallState::DecodeMetadata));
899

            
900
6040
  for (; entry != decoder_filters_.end(); entry++) {
901
3622
    ENVOY_EXECUTION_SCOPE(trackedStream(), &(*entry)->filter_context_);
902
    // If the filter pointed by entry has stopped for all frame type, stores metadata and returns.
903
    // If the filter pointed by entry hasn't returned from decodeHeaders, stores newly added
904
    // metadata in case decodeHeaders returns StopAllIteration. The latter can happen when headers
905
    // callbacks generate new metadata.
906
3622
    if (!(*entry)->processed_headers_ || (*entry)->stoppedAll()) {
907
576
      Http::MetadataMapPtr metadata_map_ptr = std::make_unique<Http::MetadataMap>(metadata_map);
908
576
      (*entry)->getSavedRequestMetadata()->emplace_back(std::move(metadata_map_ptr));
909
576
      return;
910
576
    }
911
3046
    state_.filter_call_state_ |= FilterCallState::DecodeMetadata;
912
3046
    FilterMetadataStatus status = (*entry)->handle_->decodeMetadata(metadata_map);
913
3046
    state_.filter_call_state_ &= ~FilterCallState::DecodeMetadata;
914

            
915
3046
    ENVOY_STREAM_LOG(trace, "decode metadata called: filter={} status={}, metadata: {}", *this,
916
3046
                     (*entry)->filter_context_.config_name, static_cast<uint64_t>(status),
917
3046
                     metadata_map);
918
3046
    if (state_.decoder_filter_chain_aborted_) {
919
      // If the decoder filter chain has been aborted, then either:
920
      // 1. This filter has sent a local reply or GoAway from decode metadata.
921
      // 2. This filter is the terminal http filter, and an upstream HTTP filter has sent a local
922
      // reply.
923
7
      ASSERT((status == FilterMetadataStatus::StopIterationForLocalReply) ||
924
7
             (std::next(entry) == decoder_filters_.end()));
925
7
      executeLocalReplyIfPrepared();
926
7
      ENVOY_STREAM_LOG(trace,
927
7
                       "decodeMetadata filter iteration aborted due to local reply: filter={}",
928
7
                       *this, (*entry)->filter_context_.config_name);
929
7
      return;
930
7
    }
931

            
932
    // Add the processed metadata to the next entry.
933
3039
    if (status == FilterMetadataStatus::ContinueAll && !(*entry)->canIterate()) {
934
54
      if (std::next(entry) != decoder_filters_.end()) {
935
54
        (*std::next(entry))
936
54
            ->getSavedRequestMetadata()
937
54
            ->emplace_back(std::make_unique<MetadataMap>(metadata_map));
938
54
      }
939
54
      (*entry)->commonContinue();
940
54
      return;
941
54
    }
942
3039
  }
943
3055
}
944

            
945
292670
void FilterManager::disarmRequestTimeout() { filter_manager_callbacks_.disarmRequestTimeout(); }
946

            
947
StreamEncoderFilters::Iterator
948
FilterManager::commonEncodePrefix(ActiveStreamEncoderFilter* filter, bool end_stream,
949
553263
                                  FilterIterationStartState filter_iteration_start_state) {
950
  // Only do base state setting on the initial call. Subsequent calls for filtering do not touch
951
  // the base state.
952
553263
  ENVOY_STREAM_LOG(trace, "commonEncodePrefix end_stream: {}, isHalfCloseEnabled: {}", *this,
953
553263
                   end_stream, filter_manager_callbacks_.isHalfCloseEnabled());
954
553263
  if (filter == nullptr) {
955
549040
    if (end_stream) {
956
84311
      ASSERT(!state_.observed_encode_end_stream_);
957
84311
      state_.observed_encode_end_stream_ = true;
958

            
959
      // When half close semantics are disabled, receiving end stream from the upstream causes
960
      // decoder filter to stop, as neither filters nor upstream is interested in downstream data.
961
      // half close is enabled in case tcp proxying is done with http1 encoder. In this case, we
962
      // should not stop decoder filter chain when end_stream is true, as it will cause any data
963
      // sent in the upstream direction to be
964
      // dropped.
965
84311
      if (!filter_manager_callbacks_.isHalfCloseEnabled()) {
966
84022
        state_.decoder_filter_chain_aborted_ = true;
967
84022
      }
968
84311
    }
969
549040
    return encoder_filters_.begin();
970
549040
  }
971

            
972
4223
  if (filter_iteration_start_state == FilterIterationStartState::CanStartFromCurrent &&
973
4223
      (*(filter->entry()))->iterate_from_current_filter_) {
974
    // The filter iteration has been stopped for all frame types, and now the iteration continues.
975
    // The current filter's encoding callback has not be called. Call it now.
976
130
    return filter->entry();
977
130
  }
978
4093
  return std::next(filter->entry());
979
4223
}
980

            
981
StreamDecoderFilters::Iterator
982
FilterManager::commonDecodePrefix(ActiveStreamDecoderFilter* filter,
983
597722
                                  FilterIterationStartState filter_iteration_start_state) {
984
597722
  if (!filter) {
985
538182
    return decoder_filters_.begin();
986
538182
  }
987
59540
  if (filter_iteration_start_state == FilterIterationStartState::CanStartFromCurrent &&
988
59540
      (*(filter->entry()))->iterate_from_current_filter_) {
989
    // The filter iteration has been stopped for all frame types, and now the iteration continues.
990
    // The current filter's callback function has not been called. Call it now.
991
9511
    return filter->entry();
992
9511
  }
993
50029
  return std::next(filter->entry());
994
59540
}
995

            
996
10142
void DownstreamFilterManager::onLocalReply(StreamFilterBase::LocalReplyData& data) {
997
10142
  state_.under_on_local_reply_ = true;
998
10142
  filter_manager_callbacks_.onLocalReply(data.code_);
999

            
13652
  for (auto entry : filters_) {
11269
    if (entry->onLocalReply(data) == LocalErrorStatus::ContinueAndResetStream) {
3
      data.reset_imminent_ = true;
3
    }
11269
  }
10142
  state_.under_on_local_reply_ = false;
10142
}
void DownstreamFilterManager::sendLocalReply(
    Code code, absl::string_view body,
    const std::function<void(ResponseHeaderMap& headers)>& modify_headers,
10142
    const absl::optional<Grpc::Status::GrpcStatus> grpc_status, absl::string_view details) {
10142
  ASSERT(!state_.under_on_local_reply_);
10142
  const bool is_head_request = state_.is_head_request_;
10142
  const bool is_grpc_request = state_.is_grpc_request_;
  // Stop filter chain iteration if local reply was sent while filter decoding or encoding callbacks
  // are running.
10142
  if (state_.filter_call_state_ & FilterCallState::IsDecodingMask) {
5384
    state_.decoder_filter_chain_aborted_ = true;
9647
  } else if (state_.filter_call_state_ & FilterCallState::IsEncodingMask) {
158
    state_.encoder_filter_chain_aborted_ = true;
158
  }
  // When independent half-close is enabled local reply always stops decoder filter chain.
10142
  if (filter_manager_callbacks_.isHalfCloseEnabled()) {
8
    state_.decoder_filter_chain_aborted_ = true;
8
  }
10142
  streamInfo().setResponseCodeDetails(details);
10142
  StreamFilterBase::LocalReplyData data{code, grpc_status, details, false};
10142
  onLocalReply(data);
10142
  if (data.reset_imminent_) {
3
    ENVOY_STREAM_LOG(debug, "Resetting stream due to {}. onLocalReply requested reset.", *this,
3
                     details);
3
    filter_manager_callbacks_.resetStream(Http::StreamResetReason::LocalReset, "");
3
    return;
3
  }
10139
  if (!filter_manager_callbacks_.responseHeaders().has_value() &&
10139
      (!filter_manager_callbacks_.informationalHeaders().has_value() ||
9605
       !(state_.filter_call_state_ & FilterCallState::IsEncodingMask))) {
    // If the response has not started at all, or if the only response so far is an informational
    // 1xx that has already been fully processed, send the response through the filter chain.
9602
    if (auto cb = filter_manager_callbacks_.downstreamCallbacks(); cb.has_value()) {
      // The initial route maybe never be set or the cached route maybe cleared by the filters.
      // This will force route refreshment if there is not a cached route to avoid potential
      // route refreshment in the response filter chain.
9591
      cb->route(nullptr);
9591
    }
    // We only prepare a local reply to execute later if we're actively invoking filters to avoid
    // re-entrant in filters.
    //
    // For reverse connections (where upstream initiates the connection to downstream), we need to
    // send local replies immediately rather than queuing them. This ensures proper handling of the
    // reversed connection flow and prevents potential issues with connection state and filter chain
    // processing.
9602
    if (!Runtime::runtimeFeatureEnabled(
9602
            "envoy.reloadable_features.reverse_conn_force_local_reply") &&
9602
        (state_.filter_call_state_ & FilterCallState::IsDecodingMask)) {
5381
      prepareLocalReplyViaFilterChain(is_grpc_request, code, body, modify_headers, is_head_request,
5381
                                      grpc_status, details);
9218
    } else {
4221
      sendLocalReplyViaFilterChain(is_grpc_request, code, body, modify_headers, is_head_request,
4221
                                   grpc_status, details);
4221
    }
9691
  } else if (!state_.non_100_response_headers_encoded_) {
241
    ENVOY_STREAM_LOG(debug, "Sending local reply with details {} directly to the encoder", *this,
241
                     details);
    // In this case, at least the header and possibly the body has started
    // processing through the filter chain, but no non-informational headers
    // have been sent downstream. To ensure that filters don't get their
    // state machine screwed up, bypass the filter chain and send the local
    // reply directly to the codec.
    //
241
    sendDirectLocalReply(code, body, modify_headers, state_.is_head_request_, grpc_status);
490
  } else {
    // If we land in this branch, response headers have already been sent to the client.
    // All we can do at this point is reset the stream.
296
    ENVOY_STREAM_LOG(debug, "Resetting stream due to {}. Prior headers have already been sent",
296
                     *this, details);
    // TODO(snowp): This means we increment the tx_reset stat which we weren't doing previously.
    // Intended?
296
    filter_manager_callbacks_.resetStream();
296
  }
10139
}
void DownstreamFilterManager::prepareLocalReplyViaFilterChain(
    bool is_grpc_request, Code code, absl::string_view body,
    const std::function<void(ResponseHeaderMap& headers)>& modify_headers, bool is_head_request,
5381
    const absl::optional<Grpc::Status::GrpcStatus> grpc_status, absl::string_view details) {
5381
  ENVOY_STREAM_LOG(debug, "Preparing local reply with details {}", *this, details);
5381
  ASSERT(!filter_manager_callbacks_.responseHeaders().has_value());
  // For early error handling, do a best-effort attempt to create a filter chain
  // to ensure access logging. If the filter chain already exists this will be
  // a no-op.
5381
  createDownstreamFilterChain();
5381
  if (prepared_local_reply_) {
2
    return;
2
  }
5379
  prepared_local_reply_ = Utility::prepareLocalReply(
5379
      Utility::EncodeFunctions{
5379
          [this, modify_headers](ResponseHeaderMap& headers) -> void {
5379
            finalizeHeaders(filter_manager_callbacks_, streamInfo(), headers);
5379
            if (modify_headers) {
712
              modify_headers(headers);
712
            }
5379
          },
5379
          [this](ResponseHeaderMap& response_headers, Code& code, std::string& body,
5379
                 absl::string_view& content_type) -> void {
5379
            local_reply_.rewrite(filter_manager_callbacks_.requestHeaders().ptr(), response_headers,
5379
                                 streamInfo(), code, body, content_type);
5379
          },
5379
          [this](ResponseHeaderMapPtr&& headers, bool end_stream) -> void {
5379
            filter_manager_callbacks_.setResponseHeaders(std::move(headers));
5379
            encodeHeaders(nullptr, filter_manager_callbacks_.responseHeaders().ref(), end_stream);
5379
          },
5379
          [this](Buffer::Instance& data, bool end_stream) -> void {
651
            encodeData(nullptr, data, end_stream,
651
                       FilterManager::FilterIterationStartState::CanStartFromCurrent);
651
          }},
5379
      Utility::LocalReplyData{is_grpc_request, code, body, grpc_status, is_head_request});
5379
}
5840
void DownstreamFilterManager::executeLocalReplyIfPrepared() {
5840
  if (!prepared_local_reply_) {
461
    return;
461
  }
5379
  ENVOY_STREAM_LOG(debug, "Executing sending local reply.", *this);
5379
  Utility::encodeLocalReply(state_.destroyed_, std::move(prepared_local_reply_));
5379
}
99948
FilterManager::CreateChainResult DownstreamFilterManager::createDownstreamFilterChain() {
99948
  return createFilterChain(filter_chain_factory_);
99948
}
void DownstreamFilterManager::sendLocalReplyViaFilterChain(
    bool is_grpc_request, Code code, absl::string_view body,
    const std::function<void(ResponseHeaderMap& headers)>& modify_headers, bool is_head_request,
4221
    const absl::optional<Grpc::Status::GrpcStatus> grpc_status, absl::string_view details) {
4221
  ENVOY_STREAM_LOG(debug, "Sending local reply with details {}", *this, details);
4221
  ASSERT(!filter_manager_callbacks_.responseHeaders().has_value());
  // For early error handling, do a best-effort attempt to create a filter chain
  // to ensure access logging. If the filter chain already exists this will be
  // a no-op.
4221
  createDownstreamFilterChain();
4221
  Utility::sendLocalReply(
4221
      state_.destroyed_,
4221
      Utility::EncodeFunctions{
4221
          [this, modify_headers](ResponseHeaderMap& headers) -> void {
4221
            finalizeHeaders(filter_manager_callbacks_, streamInfo(), headers);
4221
            if (modify_headers) {
1414
              modify_headers(headers);
1414
            }
4221
          },
4221
          [this](ResponseHeaderMap& response_headers, Code& code, std::string& body,
4221
                 absl::string_view& content_type) -> void {
4221
            local_reply_.rewrite(filter_manager_callbacks_.requestHeaders().ptr(), response_headers,
4221
                                 streamInfo(), code, body, content_type);
4221
          },
4221
          [this](ResponseHeaderMapPtr&& headers, bool end_stream) -> void {
4221
            filter_manager_callbacks_.setResponseHeaders(std::move(headers));
4221
            encodeHeaders(nullptr, filter_manager_callbacks_.responseHeaders().ref(), end_stream);
4221
          },
4221
          [this](Buffer::Instance& data, bool end_stream) -> void {
2891
            encodeData(nullptr, data, end_stream,
2891
                       FilterManager::FilterIterationStartState::CanStartFromCurrent);
2891
          }},
4221
      Utility::LocalReplyData{is_grpc_request, code, body, grpc_status, is_head_request});
4221
}
void DownstreamFilterManager::sendDirectLocalReply(
    Code code, absl::string_view body,
    const std::function<void(ResponseHeaderMap&)>& modify_headers, bool is_head_request,
241
    const absl::optional<Grpc::Status::GrpcStatus> grpc_status) {
  // Make sure we won't end up with nested watermark calls from the body buffer.
241
  state_.encoder_filters_streaming_ = true;
241
  Http::Utility::sendLocalReply(
241
      state_.destroyed_,
241
      Utility::EncodeFunctions{
241
          [this, modify_headers](ResponseHeaderMap& headers) -> void {
241
            finalizeHeaders(filter_manager_callbacks_, streamInfo(), headers);
241
            if (modify_headers) {
73
              modify_headers(headers);
73
            }
241
          },
241
          [&](ResponseHeaderMap& response_headers, Code& code, std::string& body,
241
              absl::string_view& content_type) -> void {
241
            local_reply_.rewrite(filter_manager_callbacks_.requestHeaders().ptr(), response_headers,
241
                                 streamInfo(), code, body, content_type);
241
          },
241
          [&](ResponseHeaderMapPtr&& response_headers, bool end_stream) -> void {
            // Move the response headers into the FilterManager to make sure they're visible to
            // access logs.
241
            filter_manager_callbacks_.setResponseHeaders(std::move(response_headers));
241
            state_.non_100_response_headers_encoded_ = true;
241
            filter_manager_callbacks_.encodeHeaders(*filter_manager_callbacks_.responseHeaders(),
241
                                                    end_stream);
241
            if (state_.saw_downstream_reset_) {
              return;
            }
241
            maybeEndEncode(end_stream);
241
          },
241
          [&](Buffer::Instance& data, bool end_stream) -> void {
171
            filter_manager_callbacks_.encodeData(data, end_stream);
171
            if (state_.saw_downstream_reset_) {
1
              return;
1
            }
170
            maybeEndEncode(end_stream);
170
          }},
241
      Utility::LocalReplyData{state_.is_grpc_request_, code, body, grpc_status, is_head_request});
241
}
void FilterManager::encode1xxHeaders(ActiveStreamEncoderFilter* filter,
283
                                     ResponseHeaderMap& headers) {
283
  filter_manager_callbacks_.resetIdleTimer();
283
  ASSERT(proxy_100_continue_);
  // The caller must guarantee that encode1xxHeaders() is invoked at most once.
283
  ASSERT(!state_.has_1xx_headers_ || filter != nullptr);
  // Make sure commonContinue continues encode1xxHeaders.
283
  state_.has_1xx_headers_ = true;
  // Similar to the block in encodeHeaders, run encode1xxHeaders on each
  // filter. This is simpler than that case because 100 continue implies no
  // end-stream, and because there are normal headers coming there's no need for
  // complex continuation logic.
  // 100-continue filter iteration should always start with the next filter if available.
283
  StreamEncoderFilters::Iterator entry =
283
      commonEncodePrefix(filter, false, FilterIterationStartState::AlwaysStartFromNext);
300
  for (; entry != encoder_filters_.end(); entry++) {
21
    ENVOY_EXECUTION_SCOPE(trackedStream(), &(*entry)->filter_context_);
21
    ASSERT(!(state_.filter_call_state_ & FilterCallState::Encode1xxHeaders));
21
    state_.filter_call_state_ |= FilterCallState::Encode1xxHeaders;
21
    const Filter1xxHeadersStatus status = (*entry)->handle_->encode1xxHeaders(headers);
21
    state_.filter_call_state_ &= ~FilterCallState::Encode1xxHeaders;
21
    ENVOY_STREAM_LOG(trace, "encode 1xx continue headers called: filter={} status={}", *this,
21
                     (*entry)->filter_context_.config_name, static_cast<uint64_t>(status));
21
    if (state_.encoder_filter_chain_aborted_) {
3
      ENVOY_STREAM_LOG(trace,
3
                       "encode1xxHeaders filter iteration aborted due to local reply: filter={}",
3
                       *this, (*entry)->filter_context_.config_name);
3
      return;
3
    }
18
    if (!(*entry)->commonHandleAfter1xxHeadersCallback(status)) {
1
      return;
1
    }
18
  }
279
  filter_manager_callbacks_.encode1xxHeaders(headers);
279
}
33883
void FilterManager::maybeContinueEncoding(StreamEncoderFilters::Iterator continue_data_entry) {
33883
  if (continue_data_entry != encoder_filters_.end()) {
    // We use the continueEncoding() code since it will correctly handle not calling
    // encodeHeaders() again. Fake setting StopSingleIteration since the continueEncoding() code
    // expects it.
49
    ASSERT(buffered_response_data_);
49
    (*continue_data_entry)->iteration_state_ =
49
        ActiveStreamFilterBase::IterationState::StopSingleIteration;
49
    (*continue_data_entry)->continueEncoding();
49
  }
33883
}
void FilterManager::encodeHeaders(ActiveStreamEncoderFilter* filter, ResponseHeaderMap& headers,
88289
                                  bool end_stream) {
  // See encodeHeaders() comments in envoy/http/filter.h for why the 1xx precondition holds.
88289
  ASSERT(!CodeUtility::is1xx(Utility::getResponseStatus(headers)) ||
88289
         Utility::getResponseStatus(headers) == enumToInt(Http::Code::SwitchingProtocols));
88289
  filter_manager_callbacks_.resetIdleTimer();
88289
  disarmRequestTimeout();
  // Headers filter iteration should always start with the next filter if available.
88289
  StreamEncoderFilters::Iterator entry =
88289
      commonEncodePrefix(filter, end_stream, FilterIterationStartState::AlwaysStartFromNext);
88289
  StreamEncoderFilters::Iterator continue_data_entry = encoder_filters_.end();
96050
  for (; entry != encoder_filters_.end(); entry++) {
8840
    ENVOY_EXECUTION_SCOPE(trackedStream(), &(*entry)->filter_context_);
8840
    ASSERT(!(state_.filter_call_state_ & FilterCallState::EncodeHeaders));
8840
    state_.filter_call_state_ |= FilterCallState::EncodeHeaders;
8840
    (*entry)->end_stream_ = (end_stream && continue_data_entry == encoder_filters_.end());
8840
    if ((*entry)->end_stream_) {
4158
      state_.filter_call_state_ |= FilterCallState::EndOfStream;
4158
    }
8840
    FilterHeadersStatus status = (*entry)->handle_->encodeHeaders(headers, (*entry)->end_stream_);
8840
    if (state_.encoder_filter_chain_aborted_) {
242
      ENVOY_STREAM_LOG(trace,
242
                       "encodeHeaders filter iteration aborted due to local reply: filter={}",
242
                       *this, (*entry)->filter_context_.config_name);
242
      status = FilterHeadersStatus::StopIteration;
242
    }
8840
    ASSERT(!(status == FilterHeadersStatus::ContinueAndDontEndStream && !(*entry)->end_stream_),
8840
           "Filters should not return FilterHeadersStatus::ContinueAndDontEndStream from "
8840
           "encodeHeaders when end_stream is already false");
8840
    state_.filter_call_state_ &= ~FilterCallState::EncodeHeaders;
8840
    if ((*entry)->end_stream_) {
4158
      state_.filter_call_state_ &= ~FilterCallState::EndOfStream;
4158
    }
8840
    ENVOY_STREAM_LOG(trace, "encode headers called: filter={} status={}", *this,
8840
                     (*entry)->filter_context_.config_name, static_cast<uint64_t>(status));
8840
    (*entry)->processed_headers_ = true;
8840
    const auto continue_iteration = (*entry)->commonHandleAfterHeadersCallback(status, end_stream);
    // If this filter ended the stream, encodeComplete() should be called for it.
8840
    if ((*entry)->end_stream_) {
4158
      (*entry)->handle_->encodeComplete();
4158
    }
8840
    if (!continue_iteration) {
1079
      if (!(*entry)->end_stream_) {
835
        maybeContinueEncoding(continue_data_entry);
835
      }
1079
      return;
1079
    }
    // Here we handle the case where we have a header only response, but a filter adds a body
    // to it. We need to not raise end_stream = true to further filters during inline iteration.
7761
    if (end_stream && buffered_response_data_ && continue_data_entry == encoder_filters_.end()) {
49
      continue_data_entry = entry;
49
    }
7761
  }
  // Check if the filter chain above did not remove critical headers or set malformed header values.
  // We could do this at the codec in order to prevent other places than the filter chain from
  // removing critical headers, but it will come with the implementation complexity.
  // See the previous attempt (#15658) for detail, and for now we choose to protect only against
  // filter chains.
87210
  const auto status = HeaderUtility::checkRequiredResponseHeaders(headers);
87210
  if (!status.ok()) {
    // If the check failed, then we reply with BadGateway, and stop the further processing.
35
    sendLocalReply(
35
        Http::Code::BadGateway, status.message(), nullptr, absl::nullopt,
35
        absl::StrCat(StreamInfo::ResponseCodeDetails::get().FilterRemovedRequiredResponseHeaders,
35
                     "{", StringUtil::replaceAllEmptySpace(status.message()), "}"));
35
    return;
35
  }
87175
  const bool modified_end_stream = (end_stream && continue_data_entry == encoder_filters_.end());
87175
  state_.non_100_response_headers_encoded_ = true;
87175
  filter_manager_callbacks_.encodeHeaders(headers, modified_end_stream);
87175
  if (state_.saw_downstream_reset_) {
2
    return;
2
  }
87173
  maybeEndEncode(modified_end_stream);
87173
  if (!modified_end_stream) {
33048
    maybeContinueEncoding(continue_data_entry);
33048
  }
87173
}
void FilterManager::encodeMetadata(ActiveStreamEncoderFilter* filter,
2725
                                   MetadataMapPtr&& metadata_map_ptr) {
2725
  filter_manager_callbacks_.resetIdleTimer();
2725
  StreamEncoderFilters::Iterator entry =
2725
      commonEncodePrefix(filter, false, FilterIterationStartState::CanStartFromCurrent);
2797
  for (; entry != encoder_filters_.end(); entry++) {
98
    ENVOY_EXECUTION_SCOPE(trackedStream(), &(*entry)->filter_context_);
    // If the filter pointed by entry has stopped for all frame type, stores metadata and returns.
    // If the filter pointed by entry hasn't returned from encodeHeaders, stores newly added
    // metadata in case encodeHeaders returns StopAllIteration. The latter can happen when headers
    // callbacks generate new metadata.
98
    if (!(*entry)->processed_headers_ || (*entry)->stoppedAll()) {
15
      (*entry)->getSavedResponseMetadata()->emplace_back(std::move(metadata_map_ptr));
15
      return;
15
    }
83
    state_.filter_call_state_ |= FilterCallState::EncodeMetadata;
83
    FilterMetadataStatus status = (*entry)->handle_->encodeMetadata(*metadata_map_ptr);
83
    state_.filter_call_state_ &= ~FilterCallState::EncodeMetadata;
83
    ENVOY_STREAM_LOG(trace, "encode metadata called: filter={} status={} metadata={}", *this,
83
                     (*entry)->filter_context_.config_name, static_cast<uint64_t>(status),
83
                     *metadata_map_ptr);
83
    if (status == FilterMetadataStatus::StopIterationForLocalReply) {
      // In case of a local reply, we do not continue encoding metadata. Metadata is not sent on to
      // the client.
5
      return;
5
    }
78
    if (status == FilterMetadataStatus::ContinueAll && !(*entry)->canIterate()) {
6
      if (std::next(entry) != encoder_filters_.end()) {
1
        (*std::next(entry))->getSavedResponseMetadata()->emplace_back(std::move(metadata_map_ptr));
6
      } else {
5
        filter_manager_callbacks_.encodeMetadata(std::move(metadata_map_ptr));
5
      }
6
      (*entry)->commonContinue();
6
      return;
6
    }
78
  }
  // TODO(soya3129): update stats with metadata.
  // Now encode metadata via the codec.
2699
  if (!metadata_map_ptr->empty()) {
2699
    filter_manager_callbacks_.encodeMetadata(std::move(metadata_map_ptr));
2699
  }
2699
}
35
ResponseTrailerMap& FilterManager::addEncodedTrailers() {
  // Trailers can only be added during the last data frame (i.e. end_stream = true).
35
  ASSERT(state_.filter_call_state_ & FilterCallState::EndOfStream);
  // Trailers can only be added once.
35
  ASSERT(!filter_manager_callbacks_.responseTrailers());
35
  filter_manager_callbacks_.setResponseTrailers(ResponseTrailerMapImpl::create());
35
  return *filter_manager_callbacks_.responseTrailers();
35
}
void FilterManager::addEncodedData(ActiveStreamEncoderFilter& filter, Buffer::Instance& data,
1392
                                   bool streaming) {
1392
  if (state_.filter_call_state_ == 0 ||
1392
      (state_.filter_call_state_ & FilterCallState::EncodeHeaders) ||
1392
      (state_.filter_call_state_ & FilterCallState::EncodeData) ||
1392
      ((state_.filter_call_state_ & FilterCallState::EncodeTrailers) && !filter.canIterate())) {
    // Make sure if this triggers watermarks, the correct action is taken.
1344
    state_.encoder_filters_streaming_ = streaming;
    // If no call is happening or we are in the decode headers/data callback, buffer the data.
    // Inline processing happens in the decodeHeaders() callback if necessary.
1344
    filter.commonHandleBufferData(data);
1351
  } else if (state_.filter_call_state_ & FilterCallState::EncodeTrailers) {
    // In this case we need to inline dispatch the data to further filters. If those filters
    // choose to buffer/stop iteration that's fine.
41
    encodeData(&filter, data, false, FilterIterationStartState::AlwaysStartFromNext);
48
  } else {
7
    IS_ENVOY_BUG("Invalid response data");
7
    sendLocalReply(Http::Code::BadGateway, "Filter error", nullptr, absl::nullopt,
7
                   StreamInfo::ResponseCodeDetails::get().FilterAddedInvalidResponseData);
7
  }
1392
}
void FilterManager::encodeData(ActiveStreamEncoderFilter* filter, Buffer::Instance& data,
                               bool end_stream,
460632
                               FilterIterationStartState filter_iteration_start_state) {
460632
  filter_manager_callbacks_.resetIdleTimer();
  // Filter iteration may start at the current filter.
460632
  StreamEncoderFilters::Iterator entry =
460632
      commonEncodePrefix(filter, end_stream, filter_iteration_start_state);
460632
  StreamEncoderFilters::Iterator trailers_added_entry = encoder_filters_.end();
460632
  const bool trailers_exists_at_start = filter_manager_callbacks_.responseTrailers().has_value();
473912
  for (; entry != encoder_filters_.end(); entry++) {
19155
    ENVOY_EXECUTION_SCOPE(trackedStream(), &(*entry)->filter_context_);
    // If the filter pointed by entry has stopped for all frame type, return now.
19155
    if (handleDataIfStopAll(**entry, data, state_.encoder_filters_streaming_)) {
1047
      return;
1047
    }
    // If end_stream_ is marked for a filter, the data is not for this filter and filters after.
    // For details, please see the comment in the ActiveStream::decodeData() function.
18108
    if ((*entry)->end_stream_) {
2
      return;
2
    }
18106
    ASSERT(!(state_.filter_call_state_ & FilterCallState::EncodeData));
    // We check the response_trailers_ pointer here in case addEncodedTrailers
    // is called in encodeData during a previous filter invocation, at which point we communicate to
    // the current and future filters that the stream has not yet ended.
18106
    state_.filter_call_state_ |= FilterCallState::EncodeData;
18106
    if (end_stream) {
4045
      state_.filter_call_state_ |= FilterCallState::EndOfStream;
4045
    }
18106
    recordLatestDataFilter(entry, state_.latest_data_encoding_filter_, encoder_filters_);
18106
    (*entry)->end_stream_ = end_stream && !filter_manager_callbacks_.responseTrailers();
18106
    FilterDataStatus status = (*entry)->handle_->encodeData(data, (*entry)->end_stream_);
18106
    if (state_.encoder_filter_chain_aborted_) {
35
      ENVOY_STREAM_LOG(trace, "encodeData filter iteration aborted due to local reply: filter={}",
35
                       *this, (*entry)->filter_context_.config_name);
35
      status = FilterDataStatus::StopIterationNoBuffer;
35
    }
18106
    if ((*entry)->end_stream_) {
4044
      (*entry)->handle_->encodeComplete();
4044
    }
18106
    state_.filter_call_state_ &= ~FilterCallState::EncodeData;
18106
    if (end_stream) {
4045
      state_.filter_call_state_ &= ~FilterCallState::EndOfStream;
4045
    }
18106
    ENVOY_STREAM_LOG(trace, "encode data called: filter={} status={}", *this,
18106
                     (*entry)->filter_context_.config_name, static_cast<uint64_t>(status));
18106
    if (!trailers_exists_at_start && filter_manager_callbacks_.responseTrailers() &&
18106
        trailers_added_entry == encoder_filters_.end()) {
31
      trailers_added_entry = entry;
31
    }
18106
    if (!(*entry)->commonHandleAfterDataCallback(status, data, state_.encoder_filters_streaming_)) {
4826
      return;
4826
    }
18106
  }
454757
  const bool modified_end_stream = end_stream && trailers_added_entry == encoder_filters_.end();
454757
  filter_manager_callbacks_.encodeData(data, modified_end_stream);
454757
  if (state_.saw_downstream_reset_) {
1
    return;
1
  }
454756
  maybeEndEncode(modified_end_stream);
  // If trailers were adding during encodeData we need to trigger decodeTrailers in order
  // to allow filters to process the trailers.
454756
  if (trailers_added_entry != encoder_filters_.end()) {
31
    encodeTrailers(trailers_added_entry->get(), *filter_manager_callbacks_.responseTrailers());
31
  }
454756
}
void FilterManager::encodeTrailers(ActiveStreamEncoderFilter* filter,
1334
                                   ResponseTrailerMap& trailers) {
1334
  filter_manager_callbacks_.resetIdleTimer();
  // Filter iteration may start at the current filter.
1334
  StreamEncoderFilters::Iterator entry =
1334
      commonEncodePrefix(filter, true, FilterIterationStartState::CanStartFromCurrent);
1629
  for (; entry != encoder_filters_.end(); entry++) {
400
    ENVOY_EXECUTION_SCOPE(trackedStream(), &(*entry)->filter_context_);
    // If the filter pointed by entry has stopped for all frame type, return now.
400
    if ((*entry)->stoppedAll()) {
13
      return;
13
    }
387
    ASSERT(!(state_.filter_call_state_ & FilterCallState::EncodeTrailers));
387
    state_.filter_call_state_ |= FilterCallState::EncodeTrailers;
387
    FilterTrailersStatus status = (*entry)->handle_->encodeTrailers(trailers);
387
    (*entry)->handle_->encodeComplete();
387
    (*entry)->end_stream_ = true;
387
    state_.filter_call_state_ &= ~FilterCallState::EncodeTrailers;
387
    ENVOY_STREAM_LOG(trace, "encode trailers called: filter={} status={}", *this,
387
                     (*entry)->filter_context_.config_name, static_cast<uint64_t>(status));
387
    if (!(*entry)->commonHandleAfterTrailersCallback(status)) {
92
      return;
92
    }
387
  }
1229
  filter_manager_callbacks_.encodeTrailers(trailers);
1229
  if (state_.saw_downstream_reset_) {
1
    return;
1
  }
1228
  maybeEndEncode(true);
1228
}
543568
void FilterManager::maybeEndEncode(bool end_stream) {
543568
  if (end_stream) {
84286
    ASSERT(!state_.encoder_filter_chain_complete_);
84286
    state_.encoder_filter_chain_complete_ = true;
84286
    if (filter_manager_callbacks_.isHalfCloseEnabled()) {
      // If independent half close is enabled the stream is closed when both decoder and encoder
      // filter chains has completed or were aborted.
289
      checkAndCloseStreamIfFullyClosed();
84130
    } else {
      // Otherwise encoding end_stream always closes the stream (and resets it if request was not
      // complete yet).
83997
      filter_manager_callbacks_.endStream();
83997
    }
84286
  }
543568
}
572852
void FilterManager::maybeEndDecode(bool terminal_filter_decoded_end_stream) {
572852
  if (terminal_filter_decoded_end_stream) {
81092
    ASSERT(!state_.decoder_filter_chain_complete_);
81092
    state_.decoder_filter_chain_complete_ = true;
    // If the decoder filter chain was aborted (i.e. due to local reply)
    // we rely on the encoding of end_stream to close the stream.
81092
    if (filter_manager_callbacks_.isHalfCloseEnabled() && !stopDecoderFilterChain()) {
238
      checkAndCloseStreamIfFullyClosed();
238
    }
81092
  }
572852
}
527
void FilterManager::checkAndCloseStreamIfFullyClosed() {
527
  ASSERT(filter_manager_callbacks_.isHalfCloseEnabled());
  // When the independent half close is enabled the stream is always closed on error responses
  // from the server.
527
  if (filter_manager_callbacks_.responseHeaders().has_value()) {
432
    const uint64_t response_status =
432
        Http::Utility::getResponseStatus(filter_manager_callbacks_.responseHeaders().ref());
432
    const bool error_response =
432
        !(Http::CodeUtility::is2xx(response_status) || Http::CodeUtility::is1xx(response_status));
    // Abort the decoder filter if it has not yet been completed.
432
    if (error_response && !state_.decoder_filter_chain_complete_) {
24
      state_.decoder_filter_chain_aborted_ = true;
24
    }
432
  }
  // Handle the case where the end_stream was received from both downstream client and upstream
  // server but a decoder filter added either request body or trailers AND paused decoder filter
  // chain iteration. This case is currently handled the same way as if independent half-close is
  // disabled, i.e. proxying is stopped as soon as encoding has finished.
  // TODO(yanavlasov): to support this case the codec needs to notify HCM that it has closed its low
  // level stream
  //                   to avoid use-after-free when HCM cleans-up its ActiveStream
527
  const bool downstream_client_sent_end_stream = decoderObservedEndStream();
527
  const bool decoder_filter_chain_paused =
527
      !state_.decoder_filter_chain_complete_ && !state_.decoder_filter_chain_aborted_;
527
  if (state_.encoder_filter_chain_complete_ && downstream_client_sent_end_stream &&
527
      decoder_filter_chain_paused) {
2
    state_.decoder_filter_chain_aborted_ = true;
2
  }
  // If independent half close is enabled then close the stream when
  // 1. Both encoder and decoder filter chains has completed.
  // 2. Encoder filter chain has completed and decoder filter chain was aborted (i.e. local reply).
  // There is no need to check for aborted encoder filter chain as the filter will either be
  // completed or stream is reset.
527
  if (state_.encoder_filter_chain_complete_ &&
527
      (state_.decoder_filter_chain_complete_ || state_.decoder_filter_chain_aborted_)) {
217
    ENVOY_STREAM_LOG(trace, "closing stream", *this);
217
    filter_manager_callbacks_.endStream();
217
  }
527
}
546886
bool FilterManager::processNewlyAddedMetadata() {
546886
  if (request_metadata_map_vector_ == nullptr) {
545978
    return false;
545978
  }
908
  for (const auto& metadata_map : *getRequestMetadataMapVector()) {
447
    decodeMetadata(nullptr, *metadata_map);
447
  }
908
  getRequestMetadataMapVector()->clear();
908
  return true;
546886
}
bool FilterManager::handleDataIfStopAll(ActiveStreamFilterBase& filter, Buffer::Instance& data,
479086
                                        bool& filter_streaming) {
479086
  if (filter.stoppedAll()) {
16743
    ASSERT(!filter.canIterate());
16743
    filter_streaming =
16743
        filter.iteration_state_ == ActiveStreamFilterBase::IterationState::StopAllWatermark;
16743
    filter.commonHandleBufferData(data);
16743
    return true;
16743
  }
462343
  return false;
479086
}
81357
void FilterManager::callHighWatermarkCallbacks() {
81357
  ++high_watermark_count_;
81364
  for (auto watermark_callbacks : watermark_callbacks_) {
19602
    watermark_callbacks->onAboveWriteBufferHighWatermark();
19602
  }
81357
}
81199
void FilterManager::callLowWatermarkCallbacks() {
81199
  ASSERT(high_watermark_count_ > 0);
81199
  --high_watermark_count_;
81205
  for (auto watermark_callbacks : watermark_callbacks_) {
19474
    watermark_callbacks->onBelowWriteBufferLowWatermark();
19474
  }
81199
}
314
void FilterManager::setBufferLimit(uint64_t new_limit) {
314
  ENVOY_STREAM_LOG(debug, "setting buffer limit to {}", *this, new_limit);
314
  buffer_limit_ = new_limit;
314
  if (buffered_request_data_) {
53
    buffered_request_data_->setWatermarks(buffer_limit_);
53
  }
314
  if (buffered_response_data_) {
1
    buffered_response_data_->setWatermarks(buffer_limit_);
1
  }
314
}
13948
void FilterManager::contextOnContinue(ScopeTrackedObjectStack& tracked_object_stack) {
13948
  if (connection_.has_value()) {
13948
    tracked_object_stack.add(*connection_);
13948
  }
13948
  tracked_object_stack.add(filter_manager_callbacks_.scope());
13948
}
FilterManager::UpgradeResult
FilterManager::createUpgradeFilterChain(const FilterChainFactory& filter_chain_factory,
92779
                                        FilterChainFactoryCallbacksImpl& callbacks) {
92779
  const HeaderEntry* upgrade = nullptr;
92779
  if (filter_manager_callbacks_.requestHeaders()) {
91430
    upgrade = filter_manager_callbacks_.requestHeaders()->Upgrade();
    // Treat CONNECT requests as a special upgrade case.
91430
    if (!upgrade && HeaderUtility::isConnect(*filter_manager_callbacks_.requestHeaders())) {
451
      upgrade = filter_manager_callbacks_.requestHeaders()->Method();
451
    }
91430
  }
92779
  if (upgrade == nullptr) {
    // No upgrade header, no upgrade filter chain.
92127
    return UpgradeResult::UpgradeUnneeded;
92127
  }
652
  const Router::RouteEntry::UpgradeMap* upgrade_map = filter_manager_callbacks_.upgradeMap();
652
  return filter_chain_factory.createUpgradeFilterChain(upgrade->value().getStringView(),
652
                                                       upgrade_map, callbacks)
652
             ? UpgradeResult::UpgradeAccepted
652
             : UpgradeResult::UpgradeRejected;
92779
}
FilterManager::CreateChainResult
239626
FilterManager::createFilterChain(const FilterChainFactory& filter_chain_factory) {
239626
  if (state_.create_chain_result_.created()) {
7145
    return state_.create_chain_result_;
7145
  }
  // After the filter chain creation is completed, the filter chain containers will not be
  // modified. So, we can safely complete the initialization of the filter chain now.
232481
  Cleanup initialize_filter_chain([this]() {
383153
    for (auto iter = decoder_filters_.begin(); iter != decoder_filters_.end(); ++iter) {
150672
      (*iter)->entry_ = iter;
150672
    }
242087
    for (auto iter = encoder_filters_.begin(); iter != encoder_filters_.end(); ++iter) {
9606
      (*iter)->entry_ = iter;
9606
    }
232481
  });
  // TODO(wbpcode): reserve memory for filters to avoid frequent reallocation.
232481
  OptRef<DownstreamStreamFilterCallbacks> downstream_callbacks =
232481
      filter_manager_callbacks_.downstreamCallbacks();
232481
  FilterChainFactoryCallbacksImpl callbacks(*this);
232481
  UpgradeResult upgrade = UpgradeResult::UpgradeUnneeded;
  // Only try the upgrade filter chain for downstream filter chains.
232481
  if (downstream_callbacks.has_value()) {
92779
    upgrade = createUpgradeFilterChain(filter_chain_factory, callbacks);
92779
    if (upgrade == UpgradeResult::UpgradeAccepted) {
      // Upgrade filter chain is created. Return the result directly.
618
      state_.create_chain_result_ = CreateChainResult(true, upgrade);
618
      return state_.create_chain_result_;
618
    }
    // If the upgrade is unnecessary or the upgrade filter chain is rejected, fall through to
    // create the default filter chain.
92779
  }
231863
  state_.create_chain_result_ =
231863
      CreateChainResult(filter_chain_factory.createFilterChain(callbacks), upgrade);
231863
  return state_.create_chain_result_;
232481
}
133
void ActiveStreamDecoderFilter::requestDataDrained() {
  // If this is called it means the call to requestDataTooLarge() was a
  // streaming call, or a 413 would have been sent.
133
  onDecoderFilterBelowWriteBufferLowWatermark();
133
}
449628
void ActiveStreamDecoderFilter::onDecoderFilterBelowWriteBufferLowWatermark() {
449628
  parent_.filter_manager_callbacks_.onDecoderFilterBelowWriteBufferLowWatermark();
449628
}
void ActiveStreamDecoderFilter::addDownstreamWatermarkCallbacks(
91010
    DownstreamWatermarkCallbacks& watermark_callbacks) {
  // This is called exactly once per upstream-stream, by the router filter. Therefore, we
  // expect the same callbacks to not be registered twice.
91010
  ASSERT(std::find(parent_.watermark_callbacks_.begin(), parent_.watermark_callbacks_.end(),
91010
                   &watermark_callbacks) == parent_.watermark_callbacks_.end());
91010
  parent_.watermark_callbacks_.emplace(parent_.watermark_callbacks_.end(), &watermark_callbacks);
91015
  for (uint32_t i = 0; i < parent_.high_watermark_count_; ++i) {
5
    watermark_callbacks.onAboveWriteBufferHighWatermark();
5
  }
91010
}
void ActiveStreamDecoderFilter::removeDownstreamWatermarkCallbacks(
90727
    DownstreamWatermarkCallbacks& watermark_callbacks) {
90727
  ASSERT(std::find(parent_.watermark_callbacks_.begin(), parent_.watermark_callbacks_.end(),
90727
                   &watermark_callbacks) != parent_.watermark_callbacks_.end());
90727
  parent_.watermark_callbacks_.remove(&watermark_callbacks);
90727
}
296
bool ActiveStreamDecoderFilter::recreateStream(const ResponseHeaderMap* headers) {
  // Because the filter's and the HCM view of if the stream has a body and if
  // the stream is complete may differ, re-check bytesReceived() to make sure
  // there was no body from the HCM's point of view.
296
  if (!observedEndStream()) {
    return false;
  }
296
  parent_.state_.decoder_filter_chain_aborted_ = true;
296
  parent_.state_.encoder_filter_chain_aborted_ = true;
296
  parent_.state_.recreated_stream_ = true;
296
  parent_.streamInfo().setResponseCodeDetails(
296
      StreamInfo::ResponseCodeDetails::get().InternalRedirect);
296
  if (headers != nullptr) {
    // The call to setResponseHeaders is needed to ensure that the headers are properly logged in
    // access logs before the stream is destroyed. Since the function expects a
    // ResponseHeaderPtr&&, ownership of the headers must be passed. This cannot happen earlier in
    // the flow (such as in the call to setupRedirect) because at that point it is still possible
    // for the headers to be used in a different logical branch. We work around this by creating a
    // copy and passing ownership of the copy instead.
151
    ResponseHeaderMapPtr headers_copy = createHeaderMap<ResponseHeaderMapImpl>(*headers);
151
    parent_.filter_manager_callbacks_.setResponseHeaders(std::move(headers_copy));
151
    parent_.filter_manager_callbacks_.chargeStats(*headers);
151
  }
296
  parent_.filter_manager_callbacks_.recreateStream(parent_.streamInfo().filterState());
296
  return true;
296
}
void ActiveStreamDecoderFilter::addUpstreamSocketOptions(
    const Network::Socket::OptionsSharedPtr& options) {
  Network::Socket::appendOptions(parent_.upstream_options_, options);
}
43690
Network::Socket::OptionsSharedPtr ActiveStreamDecoderFilter::getUpstreamSocketOptions() const {
43690
  return parent_.upstream_options_;
43690
}
895
Buffer::InstancePtr ActiveStreamEncoderFilter::createBuffer() {
895
  auto buffer = dispatcher().getWatermarkFactory().createBuffer(
895
      [this]() -> void { this->responseDataDrained(); },
895
      [this]() -> void { this->responseDataTooLarge(); },
895
      []() -> void { /* TODO(adisuissa): Handle overflow watermark */ });
895
  buffer->setWatermarks(parent_.buffer_limit_);
895
  return buffer;
895
}
22781
Buffer::InstancePtr& ActiveStreamEncoderFilter::bufferedData() {
22781
  return parent_.buffered_response_data_;
22781
}
1805
bool ActiveStreamEncoderFilter::observedEndStream() {
1805
  return parent_.state_.observed_encode_end_stream_;
1805
}
777
bool ActiveStreamEncoderFilter::has1xxHeaders() {
777
  return parent_.state_.has_1xx_headers_ && !continued_1xx_headers_;
777
}
1
void ActiveStreamEncoderFilter::do1xxHeaders() {
1
  parent_.encode1xxHeaders(this, *parent_.filter_manager_callbacks_.informationalHeaders());
1
}
742
void ActiveStreamEncoderFilter::doHeaders(bool end_stream) {
742
  parent_.encodeHeaders(this, *parent_.filter_manager_callbacks_.responseHeaders(), end_stream);
742
}
721
void ActiveStreamEncoderFilter::doData(bool end_stream) {
721
  parent_.encodeData(this, *parent_.buffered_response_data_, end_stream,
721
                     FilterManager::FilterIterationStartState::CanStartFromCurrent);
721
}
13
void ActiveStreamEncoderFilter::drainSavedResponseMetadata() {
13
  ASSERT(saved_response_metadata_ != nullptr);
14
  for (auto& metadata_map : *getSavedResponseMetadata()) {
14
    parent_.encodeMetadata(this, std::move(metadata_map));
14
  }
13
  getSavedResponseMetadata()->clear();
13
}
8840
void ActiveStreamEncoderFilter::handleMetadataAfterHeadersCallback() {
8840
  if (parent_.state_.recreated_stream_) {
    // The stream has been recreated. In this case, there's no reason to encode saved metadata.
87
    getSavedResponseMetadata()->clear();
87
    return;
87
  }
  // If we drain accumulated metadata, the iteration must start with the current filter.
8753
  const bool saved_state = iterate_from_current_filter_;
8753
  iterate_from_current_filter_ = true;
  // If encodeHeaders() returns StopAllIteration, we should skip draining metadata, and wait
  // for doMetadata() to drain the metadata after iteration continues.
8753
  if (!stoppedAll() && saved_response_metadata_ != nullptr &&
8753
      !getSavedResponseMetadata()->empty()) {
13
    drainSavedResponseMetadata();
13
  }
  // Restores the original value of iterate_from_current_filter_.
8753
  iterate_from_current_filter_ = saved_state;
8753
}
107
void ActiveStreamEncoderFilter::doTrailers() {
107
  parent_.encodeTrailers(this, *parent_.filter_manager_callbacks_.responseTrailers());
107
}
876
bool ActiveStreamEncoderFilter::hasTrailers() {
876
  return parent_.filter_manager_callbacks_.responseTrailers().has_value();
876
}
1392
void ActiveStreamEncoderFilter::addEncodedData(Buffer::Instance& data, bool streaming) {
1392
  return parent_.addEncodedData(*this, data, streaming);
1392
}
void ActiveStreamEncoderFilter::injectEncodedDataToFilterChain(Buffer::Instance& data,
1749
                                                               bool end_stream) {
1749
  if (!headers_continued_) {
59
    headers_continued_ = true;
59
    doHeaders(false);
59
  }
1749
  if (Runtime::runtimeFeatureEnabled(
1749
          "envoy.reloadable_features.ext_proc_inject_data_with_state_update")) {
1749
    parent_.state_.observed_encode_end_stream_ = end_stream;
1749
  }
1749
  parent_.encodeData(this, data, end_stream,
1749
                     FilterManager::FilterIterationStartState::CanStartFromCurrent);
1749
}
35
ResponseTrailerMap& ActiveStreamEncoderFilter::addEncodedTrailers() {
35
  return parent_.addEncodedTrailers();
35
}
817
void ActiveStreamEncoderFilter::addEncodedMetadata(MetadataMapPtr&& metadata_map_ptr) {
817
  return parent_.encodeMetadata(this, std::move(metadata_map_ptr));
817
}
62016
void ActiveStreamEncoderFilter::onEncoderFilterAboveWriteBufferHighWatermark() {
62016
  ENVOY_STREAM_LOG(debug, "Disabling upstream stream due to filter callbacks.", parent_);
62016
  parent_.callHighWatermarkCallbacks();
62016
}
62010
void ActiveStreamEncoderFilter::onEncoderFilterBelowWriteBufferLowWatermark() {
62010
  ENVOY_STREAM_LOG(debug, "Enabling upstream stream due to filter callbacks.", parent_);
62010
  parent_.callLowWatermarkCallbacks();
62010
}
523
void ActiveStreamEncoderFilter::continueEncoding() { commonContinue(); }
798
const Buffer::Instance* ActiveStreamEncoderFilter::encodingBuffer() {
798
  return parent_.buffered_response_data_.get();
798
}
void ActiveStreamEncoderFilter::modifyEncodingBuffer(
112
    std::function<void(Buffer::Instance&)> callback) {
112
  ASSERT(parent_.state_.latest_data_encoding_filter_ == this);
112
  callback(*parent_.buffered_response_data_.get());
112
}
void ActiveStreamEncoderFilter::sendLocalReply(
    Code code, absl::string_view body,
    std::function<void(ResponseHeaderMap& headers)> modify_headers,
253
    const absl::optional<Grpc::Status::GrpcStatus> grpc_status, absl::string_view details) {
253
  ActiveStreamFilterBase::sendLocalReply(code, body, modify_headers, grpc_status, details);
253
}
125
void ActiveStreamEncoderFilter::responseDataTooLarge() {
125
  ENVOY_STREAM_LOG(debug, "response data too large watermark exceeded", parent_);
125
  if (parent_.state_.encoder_filters_streaming_) {
33
    onEncoderFilterAboveWriteBufferHighWatermark();
98
  } else {
92
    parent_.filter_manager_callbacks_.onResponseDataTooLarge();
    // In this case, sendLocalReply will either send a response directly to the encoder, or
    // reset the stream.
92
    parent_.sendLocalReply(
92
        Http::Code::InternalServerError, CodeUtility::toString(Http::Code::InternalServerError),
92
        nullptr, absl::nullopt, StreamInfo::ResponseCodeDetails::get().ResponsePayloadTooLarge);
92
  }
125
}
30
void ActiveStreamEncoderFilter::responseDataDrained() {
30
  onEncoderFilterBelowWriteBufferLowWatermark();
30
}
void FilterManager::resetStream(StreamResetReason reason,
2522
                                absl::string_view transport_failure_reason) {
  // Stop filter chain iteration if stream is reset while filter decoding or encoding callbacks
  // are running.
2522
  if (state_.filter_call_state_ & FilterCallState::IsDecodingMask) {
46
    state_.decoder_filter_chain_aborted_ = true;
2476
  } else if (state_.filter_call_state_ & FilterCallState::IsEncodingMask) {
8
    state_.encoder_filter_chain_aborted_ = true;
8
  }
2522
  filter_manager_callbacks_.resetStream(reason, transport_failure_reason);
2522
}
74711
bool FilterManager::isTerminalDecoderFilter(const ActiveStreamDecoderFilter& filter) const {
74711
  return !decoder_filters_.entries_.empty() && decoder_filters_.entries_.back().get() == &filter;
74711
}
void ActiveStreamFilterBase::resetStream(Http::StreamResetReason reset_reason,
2522
                                         absl::string_view transport_failure_reason) {
2522
  parent_.resetStream(reset_reason, transport_failure_reason);
2522
}
172098
uint64_t ActiveStreamFilterBase::streamId() const { return parent_.streamId(); }
86871
Buffer::BufferMemoryAccountSharedPtr ActiveStreamDecoderFilter::account() const {
86871
  return parent_.account();
86871
}
void ActiveStreamDecoderFilter::setUpstreamOverrideHost(
19
    Upstream::LoadBalancerContext::OverrideHost upstream_override_host) {
19
  parent_.upstream_override_host_.first.assign(upstream_override_host.first);
19
  parent_.upstream_override_host_.second = upstream_override_host.second;
19
}
absl::optional<Upstream::LoadBalancerContext::OverrideHost>
44102
ActiveStreamDecoderFilter::upstreamOverrideHost() const {
44102
  if (parent_.upstream_override_host_.first.empty()) {
44083
    return absl::nullopt;
44083
  }
19
  return Upstream::LoadBalancerContext::OverrideHost{
19
      absl::string_view(parent_.upstream_override_host_.first),
19
      parent_.upstream_override_host_.second};
44102
}
} // namespace Http
} // namespace Envoy