// match, then we must be processing a new filter for the first time. We omit this check if we're
if (current_filter != filters.begin() && latest_filter == std::prev(current_filter)->get()) {
// It is possible for trailers to be added during doData(). doData() itself handles continuation
bool ActiveStreamFilterBase::commonHandleAfter1xxHeadersCallback(Filter1xxHeadersStatus status) {
// which is used for both encoding and decoding. When data first comes into our filter pipeline,
bool ActiveStreamFilterBase::commonHandleAfterTrailersCallback(FilterTrailersStatus status) {
// Exposes the stream info of the owning parent_ object; this wrapper holds no stream info of its
// own and simply forwards the reference returned by parent_.streamInfo().
StreamInfo::StreamInfo& ActiveStreamFilterBase::streamInfo() {
  StreamInfo::StreamInfo& parent_stream_info = parent_.streamInfo();
  return parent_stream_info;
}
// Forwards the requested buffer limit unchanged to parent_.setBufferLimit(); no validation or
// clamping is performed at this level.
void ActiveStreamFilterBase::setBufferLimit(uint64_t limit) {
  parent_.setBufferLimit(limit);
}
if (!streamInfo().filterState()->hasData<LocalReplyOwnerObject>(LocalReplyFilterStateKey)) {
return !parent_.state_.encoder_filter_chain_complete_ && !parent_.stopEncoderFilterChain();
// Reports whatever parent_.decoderObservedEndStream() returns; the answer is owned by the parent
// and is not cached on this filter wrapper.
bool ActiveStreamDecoderFilter::observedEndStream() {
  const bool end_stream_observed = parent_.decoderObservedEndStream();
  return end_stream_observed;
}
parent_.decodeHeaders(this, *parent_.filter_manager_callbacks_.requestHeaders(), end_stream);
if (!stoppedAll() && saved_request_metadata_ != nullptr && !getSavedRequestMetadata()->empty()) {
// Returns the load-shedding decision made by parent_.shouldLoadShed(); this wrapper applies no
// additional policy of its own.
bool ActiveStreamDecoderFilter::shouldLoadShed() const {
  const bool shed_load = parent_.shouldLoadShed();
  return shed_load;
}
// here. This avoids the potential situation where Envoy strips Expect: 100-Continue and sends a
parent_.encode1xxHeaders(nullptr, *parent_.filter_manager_callbacks_.informationalHeaders());
// Encoding end_stream by a non-terminal filter (e.g. cache filter) always causes the decoding to
// be stopped even if independent half-close is enabled. For simplicity, independent half-close is
parent_.encodeHeaders(nullptr, *parent_.filter_manager_callbacks_.responseHeaders(), end_stream);
sendLocalReply(Code::PayloadTooLarge, CodeUtility::toString(Code::PayloadTooLarge), nullptr,
void FilterManager::maybeContinueDecoding(StreamDecoderFilters::Iterator continue_data_entry) {
ASSERT(!(status == FilterHeadersStatus::ContinueAndDontEndStream && !(*entry)->end_stream_),
const auto continue_iteration = (*entry)->commonHandleAfterHeadersCallback(status, end_stream);
// in headers by inserting an empty data frame with end_stream set. The empty data frame is sent
if (end_stream && buffered_request_data_ && continue_data_entry == decoder_filters_.end()) {
const bool trailers_exists_at_start = filter_manager_callbacks_.requestTrailers().has_value();
StreamDecoderFilters::Iterator entry = commonDecodePrefix(filter, filter_iteration_start_state);
// is called in decodeData during a previous filter invocation, at which point we communicate to
ENVOY_STREAM_LOG(trace, "decodeData filter iteration aborted due to local reply: filter={}",
terminal_filter_decoded_end_stream = end_stream && std::next(entry) == decoder_filters_.end();
if (!(*entry)->commonHandleAfterDataCallback(status, data, state_.decoder_filters_streaming_) &&
// Returns a reference to the object produced by dereferencing getRequestMetadataMapVector().
// The result is dereferenced without a null check here.
// NOTE(review): presumably getRequestMetadataMapVector() never yields null - confirm.
MetadataMapVector& FilterManager::addDecodedMetadata() {
  MetadataMapVector& request_metadata = *getRequestMetadataMapVector();
  return request_metadata;
}
void FilterManager::decodeTrailers(ActiveStreamDecoderFilter* filter, RequestTrailerMap& trailers) {
void FilterManager::decodeMetadata(ActiveStreamDecoderFilter* filter, MetadataMap& metadata_map) {
ENVOY_STREAM_LOG(trace, "decode metadata called: filter={} status={}, metadata: {}", *this,
// Delegates to filter_manager_callbacks_.disarmRequestTimeout(); the filter manager keeps no timer
// state of its own in this method.
void FilterManager::disarmRequestTimeout() {
  filter_manager_callbacks_.disarmRequestTimeout();
}
ENVOY_STREAM_LOG(trace, "commonEncodePrefix end_stream: {}, isHalfCloseEnabled: {}", *this,
// Stop filter chain iteration if local reply was sent while filter decoding or encoding callbacks
// send local replies immediately rather than queuing them. This ensures proper handling of the
// reversed connection flow and prevents potential issues with connection state and filter chain
prepareLocalReplyViaFilterChain(is_grpc_request, code, body, modify_headers, is_head_request,
sendLocalReplyViaFilterChain(is_grpc_request, code, body, modify_headers, is_head_request,
ENVOY_STREAM_LOG(debug, "Sending local reply with details {} directly to the encoder", *this,
ENVOY_STREAM_LOG(debug, "Resetting stream due to {}. Prior headers have already been sent",
FilterManager::CreateChainResult DownstreamFilterManager::createDownstreamFilterChain() {
Utility::LocalReplyData{state_.is_grpc_request_, code, body, grpc_status, is_head_request});
void FilterManager::maybeContinueEncoding(StreamEncoderFilters::Iterator continue_data_entry) {
void FilterManager::encodeHeaders(ActiveStreamEncoderFilter* filter, ResponseHeaderMap& headers,
FilterHeadersStatus status = (*entry)->handle_->encodeHeaders(headers, (*entry)->end_stream_);
ASSERT(!(status == FilterHeadersStatus::ContinueAndDontEndStream && !(*entry)->end_stream_),
const auto continue_iteration = (*entry)->commonHandleAfterHeadersCallback(status, end_stream);
if (end_stream && buffered_response_data_ && continue_data_entry == encoder_filters_.end()) {
// Check if the filter chain above did not remove critical headers or set malformed header values.
const bool modified_end_stream = (end_stream && continue_data_entry == encoder_filters_.end());
((state_.filter_call_state_ & FilterCallState::EncodeTrailers) && !filter.canIterate())) {
const bool trailers_exists_at_start = filter_manager_callbacks_.responseTrailers().has_value();
// is called in encodeData during a previous filter invocation, at which point we communicate to
ENVOY_STREAM_LOG(trace, "encodeData filter iteration aborted due to local reply: filter={}",
if (!(*entry)->commonHandleAfterDataCallback(status, data, state_.encoder_filters_streaming_)) {
const bool modified_end_stream = end_stream && trailers_added_entry == encoder_filters_.end();
// chain iteration. This case is currently handled the same way as if independent half-close is
// TODO(yanavlasov): to support this case the codec needs to notify HCM that it has closed its low
// 2. Encoder filter chain has completed and decoder filter chain was aborted (i.e. local reply).
bool FilterManager::handleDataIfStopAll(ActiveStreamFilterBase& filter, Buffer::Instance& data,
const Router::RouteEntry::UpgradeMap* upgrade_map = filter_manager_callbacks_.upgradeMap();
ASSERT(std::find(parent_.watermark_callbacks_.begin(), parent_.watermark_callbacks_.end(),
parent_.watermark_callbacks_.emplace(parent_.watermark_callbacks_.end(), &watermark_callbacks);
ASSERT(std::find(parent_.watermark_callbacks_.begin(), parent_.watermark_callbacks_.end(),
Network::Socket::OptionsSharedPtr ActiveStreamDecoderFilter::getUpstreamSocketOptions() const {
parent_.encodeHeaders(this, *parent_.filter_manager_callbacks_.responseHeaders(), end_stream);
bool FilterManager::isTerminalDecoderFilter(const ActiveStreamDecoderFilter& filter) const {
return !decoder_filters_.entries_.empty() && decoder_filters_.entries_.back().get() == &filter;