Coverage Report

Created: 2023-11-12 09:30

/proc/self/cwd/source/common/router/upstream_codec_filter.cc
Line
Count
Source (jump to first uncovered line)
1
#include "source/common/router/upstream_codec_filter.h"
2
3
#include <chrono>
4
#include <cstdint>
5
#include <functional>
6
#include <memory>
7
#include <string>
8
9
#include "envoy/event/dispatcher.h"
10
#include "envoy/event/timer.h"
11
#include "envoy/grpc/status.h"
12
#include "envoy/http/header_map.h"
13
14
#include "source/common/common/assert.h"
15
#include "source/common/common/dump_state_utils.h"
16
#include "source/common/common/empty_string.h"
17
#include "source/common/common/enum_to_int.h"
18
#include "source/common/common/scope_tracker.h"
19
#include "source/common/common/utility.h"
20
#include "source/common/grpc/common.h"
21
#include "source/common/http/codes.h"
22
#include "source/common/http/header_map_impl.h"
23
#include "source/common/http/headers.h"
24
#include "source/common/http/message_impl.h"
25
#include "source/common/http/utility.h"
26
27
namespace Envoy {
28
namespace Router {
29
30
0
void UpstreamCodecFilter::onBelowWriteBufferLowWatermark() {
31
0
  callbacks_->clusterInfo()->trafficStats()->upstream_flow_control_resumed_reading_total_.inc();
32
0
  callbacks_->upstreamCallbacks()->upstream()->readDisable(false);
33
0
}
34
35
0
void UpstreamCodecFilter::onAboveWriteBufferHighWatermark() {
36
0
  callbacks_->clusterInfo()->trafficStats()->upstream_flow_control_paused_reading_total_.inc();
37
0
  callbacks_->upstreamCallbacks()->upstream()->readDisable(true);
38
0
}
39
40
2.32k
void UpstreamCodecFilter::onUpstreamConnectionEstablished() {
41
2.32k
  if (latched_end_stream_.has_value()) {
42
1.28k
    const bool end_stream = *latched_end_stream_;
43
1.28k
    latched_end_stream_.reset();
44
1.28k
    Http::FilterHeadersStatus status = decodeHeaders(*latched_headers_, end_stream);
45
1.28k
    if (status == Http::FilterHeadersStatus::Continue) {
46
1.28k
      callbacks_->continueDecoding();
47
1.28k
    }
48
1.28k
  }
49
2.32k
}
50
51
// This is the last stop in the filter chain: take the headers and ship them to the codec.
52
Http::FilterHeadersStatus UpstreamCodecFilter::decodeHeaders(Http::RequestHeaderMap& headers,
53
3.68k
                                                             bool end_stream) {
54
3.68k
  ASSERT(callbacks_->upstreamCallbacks());
55
3.68k
  if (!callbacks_->upstreamCallbacks()->upstream()) {
56
1.35k
    latched_headers_ = headers;
57
1.35k
    latched_end_stream_ = end_stream;
58
1.35k
    return Http::FilterHeadersStatus::StopAllIterationAndWatermark;
59
1.35k
  }
60
61
2.32k
  ENVOY_STREAM_LOG(trace, "proxying headers", *callbacks_);
62
2.32k
  calling_encode_headers_ = true;
63
2.32k
  const Http::Status status =
64
2.32k
      callbacks_->upstreamCallbacks()->upstream()->encodeHeaders(headers, end_stream);
65
66
2.32k
  calling_encode_headers_ = false;
67
2.32k
  if (!status.ok() || deferred_reset_) {
68
0
    deferred_reset_ = false;
69
    // It is possible that encodeHeaders() fails. This can happen if filters or other extensions
70
    // erroneously remove required headers.
71
0
    callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::DownstreamProtocolError);
72
0
    const std::string details =
73
0
        absl::StrCat(StreamInfo::ResponseCodeDetails::get().FilterRemovedRequiredRequestHeaders,
74
0
                     "{", StringUtil::replaceAllEmptySpace(status.message()), "}");
75
0
    callbacks_->sendLocalReply(Http::Code::ServiceUnavailable, status.message(), nullptr,
76
0
                               absl::nullopt, details);
77
0
    return Http::FilterHeadersStatus::StopIteration;
78
0
  }
79
2.32k
  upstreamTiming().onFirstUpstreamTxByteSent(callbacks_->dispatcher().timeSource());
80
81
2.32k
  if (end_stream) {
82
805
    upstreamTiming().onLastUpstreamTxByteSent(callbacks_->dispatcher().timeSource());
83
805
  }
84
2.32k
  if (callbacks_->upstreamCallbacks()->pausedForConnect()) {
85
0
    return Http::FilterHeadersStatus::StopAllIterationAndWatermark;
86
0
  }
87
2.32k
  return Http::FilterHeadersStatus::Continue;
88
2.32k
}
89
90
// This is the last stop in the filter chain: take the data and ship it to the codec.
91
4.36k
Http::FilterDataStatus UpstreamCodecFilter::decodeData(Buffer::Instance& data, bool end_stream) {
92
4.36k
  ASSERT(!callbacks_->upstreamCallbacks()->pausedForConnect());
93
4.36k
  ENVOY_STREAM_LOG(trace, "proxying {} bytes", *callbacks_, data.length());
94
4.36k
  callbacks_->upstreamCallbacks()->upstreamStreamInfo().addBytesSent(data.length());
95
  // TODO(alyssawilk) test intermediate filters calling continue.
96
4.36k
  callbacks_->upstreamCallbacks()->upstream()->encodeData(data, end_stream);
97
4.36k
  if (end_stream) {
98
1.35k
    upstreamTiming().onLastUpstreamTxByteSent(callbacks_->dispatcher().timeSource());
99
1.35k
  }
100
4.36k
  return Http::FilterDataStatus::Continue;
101
4.36k
}
102
103
// This is the last stop in the filter chain: take the trailers and ship them to the codec.
104
0
Http::FilterTrailersStatus UpstreamCodecFilter::decodeTrailers(Http::RequestTrailerMap& trailers) {
105
0
  ASSERT(!callbacks_->upstreamCallbacks()->pausedForConnect());
106
0
  ENVOY_STREAM_LOG(trace, "proxying trailers", *callbacks_);
107
0
  callbacks_->upstreamCallbacks()->upstream()->encodeTrailers(trailers);
108
0
  upstreamTiming().onLastUpstreamTxByteSent(callbacks_->dispatcher().timeSource());
109
0
  return Http::FilterTrailersStatus::Continue;
110
0
}
111
112
// This is the last stop in the filter chain: take the metadata and ship them to the codec.
113
4
Http::FilterMetadataStatus UpstreamCodecFilter::decodeMetadata(Http::MetadataMap& metadata_map) {
114
4
  ASSERT(!callbacks_->upstreamCallbacks()->pausedForConnect());
115
4
  ENVOY_STREAM_LOG(trace, "proxying metadata", *callbacks_);
116
4
  Http::MetadataMapVector metadata_map_vector;
117
4
  metadata_map_vector.emplace_back(std::make_unique<Http::MetadataMap>(metadata_map));
118
4
  callbacks_->upstreamCallbacks()->upstream()->encodeMetadata(metadata_map_vector);
119
4
  return Http::FilterMetadataStatus::Continue;
120
4
}
121
122
// Store the callbacks from the UpstreamFilterManager, for sending the response to.
123
2.40k
void UpstreamCodecFilter::setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) {
124
2.40k
  callbacks_ = &callbacks;
125
2.40k
  callbacks_->addDownstreamWatermarkCallbacks(*this);
126
2.40k
  callbacks_->upstreamCallbacks()->addUpstreamCallbacks(*this);
127
2.40k
  callbacks_->upstreamCallbacks()->setUpstreamToDownstream(bridge_);
128
2.40k
}
129
130
// This is the response 1xx headers arriving from the codec. Send them through the filter manager.
131
1
void UpstreamCodecFilter::CodecBridge::decode1xxHeaders(Http::ResponseHeaderMapPtr&& headers) {
132
  // The filter manager can not handle more than 1 1xx header, so only forward
133
  // the first one.
134
1
  if (!seen_1xx_headers_) {
135
1
    seen_1xx_headers_ = true;
136
1
    filter_.callbacks_->encode1xxHeaders(std::move(headers));
137
1
  }
138
1
}
139
140
// This is the response headers arriving from the codec. Send them through the filter manager.
141
void UpstreamCodecFilter::CodecBridge::decodeHeaders(Http::ResponseHeaderMapPtr&& headers,
142
2.22k
                                                     bool end_stream) {
143
  // TODO(rodaine): This is actually measuring after the headers are parsed and not the first
144
  // byte.
145
2.22k
  filter_.upstreamTiming().onFirstUpstreamRxByteReceived(
146
2.22k
      filter_.callbacks_->dispatcher().timeSource());
147
148
2.22k
  if (filter_.callbacks_->upstreamCallbacks()->pausedForConnect() &&
149
2.22k
      ((Http::Utility::getResponseStatus(*headers) == 200) ||
150
0
       ((Runtime::runtimeFeatureEnabled(
151
0
            "envoy.reloadable_features.upstream_allow_connect_with_2xx")) &&
152
0
        (Http::CodeUtility::is2xx(Http::Utility::getResponseStatus(*headers)))))) {
153
0
    filter_.callbacks_->upstreamCallbacks()->setPausedForConnect(false);
154
0
    filter_.callbacks_->continueDecoding();
155
0
  }
156
157
2.22k
  maybeEndDecode(end_stream);
158
2.22k
  filter_.callbacks_->encodeHeaders(std::move(headers), end_stream,
159
2.22k
                                    StreamInfo::ResponseCodeDetails::get().ViaUpstream);
160
2.22k
}
161
162
// This is response data arriving from the codec. Send it through the filter manager.
163
4.28k
void UpstreamCodecFilter::CodecBridge::decodeData(Buffer::Instance& data, bool end_stream) {
164
4.28k
  maybeEndDecode(end_stream);
165
4.28k
  filter_.callbacks_->encodeData(data, end_stream);
166
4.28k
}
167
168
// This is response trailers arriving from the codec. Send them through the filter manager.
169
3
void UpstreamCodecFilter::CodecBridge::decodeTrailers(Http::ResponseTrailerMapPtr&& trailers) {
170
3
  maybeEndDecode(true);
171
3
  filter_.callbacks_->encodeTrailers(std::move(trailers));
172
3
}
173
174
// This is response metadata arriving from the codec. Send it through the filter manager.
175
181
void UpstreamCodecFilter::CodecBridge::decodeMetadata(Http::MetadataMapPtr&& metadata_map) {
176
181
  filter_.callbacks_->encodeMetadata(std::move(metadata_map));
177
181
}
178
179
0
void UpstreamCodecFilter::CodecBridge::dumpState(std::ostream& os, int indent_level) const {
180
0
  filter_.callbacks_->upstreamCallbacks()->dumpState(os, indent_level);
181
0
}
182
183
6.51k
void UpstreamCodecFilter::CodecBridge::maybeEndDecode(bool end_stream) {
184
6.51k
  if (end_stream) {
185
1.18k
    filter_.upstreamTiming().onLastUpstreamRxByteReceived(
186
1.18k
        filter_.callbacks_->dispatcher().timeSource());
187
1.18k
  }
188
6.51k
}
189
190
REGISTER_FACTORY(UpstreamCodecFilterFactory,
191
                 Server::Configuration::UpstreamHttpFilterConfigFactory);
192
193
} // namespace Router
194
} // namespace Envoy