/proc/self/cwd/source/common/router/upstream_codec_filter.cc
Line | Count | Source (jump to first uncovered line) |
1 | | #include "source/common/router/upstream_codec_filter.h" |
2 | | |
3 | | #include <chrono> |
4 | | #include <cstdint> |
5 | | #include <functional> |
6 | | #include <memory> |
7 | | #include <string> |
8 | | |
9 | | #include "envoy/event/dispatcher.h" |
10 | | #include "envoy/event/timer.h" |
11 | | #include "envoy/grpc/status.h" |
12 | | #include "envoy/http/header_map.h" |
13 | | |
14 | | #include "source/common/common/assert.h" |
15 | | #include "source/common/common/dump_state_utils.h" |
16 | | #include "source/common/common/empty_string.h" |
17 | | #include "source/common/common/enum_to_int.h" |
18 | | #include "source/common/common/scope_tracker.h" |
19 | | #include "source/common/common/utility.h" |
20 | | #include "source/common/grpc/common.h" |
21 | | #include "source/common/http/codes.h" |
22 | | #include "source/common/http/header_map_impl.h" |
23 | | #include "source/common/http/headers.h" |
24 | | #include "source/common/http/message_impl.h" |
25 | | #include "source/common/http/utility.h" |
26 | | |
27 | | namespace Envoy { |
28 | | namespace Router { |
29 | | |
30 | 0 | void UpstreamCodecFilter::onBelowWriteBufferLowWatermark() { |
31 | 0 | callbacks_->clusterInfo()->trafficStats()->upstream_flow_control_resumed_reading_total_.inc(); |
32 | 0 | callbacks_->upstreamCallbacks()->upstream()->readDisable(false); |
33 | 0 | } |
34 | | |
35 | 0 | void UpstreamCodecFilter::onAboveWriteBufferHighWatermark() { |
36 | 0 | callbacks_->clusterInfo()->trafficStats()->upstream_flow_control_paused_reading_total_.inc(); |
37 | 0 | callbacks_->upstreamCallbacks()->upstream()->readDisable(true); |
38 | 0 | } |
39 | | |
40 | 2.32k | void UpstreamCodecFilter::onUpstreamConnectionEstablished() { |
41 | 2.32k | if (latched_end_stream_.has_value()) { |
42 | 1.28k | const bool end_stream = *latched_end_stream_; |
43 | 1.28k | latched_end_stream_.reset(); |
44 | 1.28k | Http::FilterHeadersStatus status = decodeHeaders(*latched_headers_, end_stream); |
45 | 1.28k | if (status == Http::FilterHeadersStatus::Continue) { |
46 | 1.28k | callbacks_->continueDecoding(); |
47 | 1.28k | } |
48 | 1.28k | } |
49 | 2.32k | } |
50 | | |
51 | | // This is the last stop in the filter chain: take the headers and ship them to the codec. |
52 | | Http::FilterHeadersStatus UpstreamCodecFilter::decodeHeaders(Http::RequestHeaderMap& headers, |
53 | 3.68k | bool end_stream) { |
54 | 3.68k | ASSERT(callbacks_->upstreamCallbacks()); |
55 | 3.68k | if (!callbacks_->upstreamCallbacks()->upstream()) { |
56 | 1.35k | latched_headers_ = headers; |
57 | 1.35k | latched_end_stream_ = end_stream; |
58 | 1.35k | return Http::FilterHeadersStatus::StopAllIterationAndWatermark; |
59 | 1.35k | } |
60 | | |
61 | 2.32k | ENVOY_STREAM_LOG(trace, "proxying headers", *callbacks_); |
62 | 2.32k | calling_encode_headers_ = true; |
63 | 2.32k | const Http::Status status = |
64 | 2.32k | callbacks_->upstreamCallbacks()->upstream()->encodeHeaders(headers, end_stream); |
65 | | |
66 | 2.32k | calling_encode_headers_ = false; |
67 | 2.32k | if (!status.ok() || deferred_reset_) { |
68 | 0 | deferred_reset_ = false; |
69 | | // It is possible that encodeHeaders() fails. This can happen if filters or other extensions |
70 | | // erroneously remove required headers. |
71 | 0 | callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::DownstreamProtocolError); |
72 | 0 | const std::string details = |
73 | 0 | absl::StrCat(StreamInfo::ResponseCodeDetails::get().FilterRemovedRequiredRequestHeaders, |
74 | 0 | "{", StringUtil::replaceAllEmptySpace(status.message()), "}"); |
75 | 0 | callbacks_->sendLocalReply(Http::Code::ServiceUnavailable, status.message(), nullptr, |
76 | 0 | absl::nullopt, details); |
77 | 0 | return Http::FilterHeadersStatus::StopIteration; |
78 | 0 | } |
79 | 2.32k | upstreamTiming().onFirstUpstreamTxByteSent(callbacks_->dispatcher().timeSource()); |
80 | | |
81 | 2.32k | if (end_stream) { |
82 | 805 | upstreamTiming().onLastUpstreamTxByteSent(callbacks_->dispatcher().timeSource()); |
83 | 805 | } |
84 | 2.32k | if (callbacks_->upstreamCallbacks()->pausedForConnect()) { |
85 | 0 | return Http::FilterHeadersStatus::StopAllIterationAndWatermark; |
86 | 0 | } |
87 | 2.32k | return Http::FilterHeadersStatus::Continue; |
88 | 2.32k | } |
89 | | |
90 | | // This is the last stop in the filter chain: take the data and ship it to the codec. |
91 | 4.36k | Http::FilterDataStatus UpstreamCodecFilter::decodeData(Buffer::Instance& data, bool end_stream) { |
92 | 4.36k | ASSERT(!callbacks_->upstreamCallbacks()->pausedForConnect()); |
93 | 4.36k | ENVOY_STREAM_LOG(trace, "proxying {} bytes", *callbacks_, data.length()); |
94 | 4.36k | callbacks_->upstreamCallbacks()->upstreamStreamInfo().addBytesSent(data.length()); |
95 | | // TODO(alyssawilk) test intermediate filters calling continue. |
96 | 4.36k | callbacks_->upstreamCallbacks()->upstream()->encodeData(data, end_stream); |
97 | 4.36k | if (end_stream) { |
98 | 1.35k | upstreamTiming().onLastUpstreamTxByteSent(callbacks_->dispatcher().timeSource()); |
99 | 1.35k | } |
100 | 4.36k | return Http::FilterDataStatus::Continue; |
101 | 4.36k | } |
102 | | |
103 | | // This is the last stop in the filter chain: take the trailers and ship them to the codec. |
104 | 0 | Http::FilterTrailersStatus UpstreamCodecFilter::decodeTrailers(Http::RequestTrailerMap& trailers) { |
105 | 0 | ASSERT(!callbacks_->upstreamCallbacks()->pausedForConnect()); |
106 | 0 | ENVOY_STREAM_LOG(trace, "proxying trailers", *callbacks_); |
107 | 0 | callbacks_->upstreamCallbacks()->upstream()->encodeTrailers(trailers); |
108 | 0 | upstreamTiming().onLastUpstreamTxByteSent(callbacks_->dispatcher().timeSource()); |
109 | 0 | return Http::FilterTrailersStatus::Continue; |
110 | 0 | } |
111 | | |
112 | | // This is the last stop in the filter chain: take the metadata and ship them to the codec. |
113 | 4 | Http::FilterMetadataStatus UpstreamCodecFilter::decodeMetadata(Http::MetadataMap& metadata_map) { |
114 | 4 | ASSERT(!callbacks_->upstreamCallbacks()->pausedForConnect()); |
115 | 4 | ENVOY_STREAM_LOG(trace, "proxying metadata", *callbacks_); |
116 | 4 | Http::MetadataMapVector metadata_map_vector; |
117 | 4 | metadata_map_vector.emplace_back(std::make_unique<Http::MetadataMap>(metadata_map)); |
118 | 4 | callbacks_->upstreamCallbacks()->upstream()->encodeMetadata(metadata_map_vector); |
119 | 4 | return Http::FilterMetadataStatus::Continue; |
120 | 4 | } |
121 | | |
122 | | // Store the callbacks from the UpstreamFilterManager, for sending the response to. |
123 | 2.40k | void UpstreamCodecFilter::setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) { |
124 | 2.40k | callbacks_ = &callbacks; |
125 | 2.40k | callbacks_->addDownstreamWatermarkCallbacks(*this); |
126 | 2.40k | callbacks_->upstreamCallbacks()->addUpstreamCallbacks(*this); |
127 | 2.40k | callbacks_->upstreamCallbacks()->setUpstreamToDownstream(bridge_); |
128 | 2.40k | } |
129 | | |
130 | | // This is the response 1xx headers arriving from the codec. Send them through the filter manager. |
131 | 1 | void UpstreamCodecFilter::CodecBridge::decode1xxHeaders(Http::ResponseHeaderMapPtr&& headers) { |
132 | | // The filter manager can not handle more than 1 1xx header, so only forward |
133 | | // the first one. |
134 | 1 | if (!seen_1xx_headers_) { |
135 | 1 | seen_1xx_headers_ = true; |
136 | 1 | filter_.callbacks_->encode1xxHeaders(std::move(headers)); |
137 | 1 | } |
138 | 1 | } |
139 | | |
140 | | // This is the response headers arriving from the codec. Send them through the filter manager. |
141 | | void UpstreamCodecFilter::CodecBridge::decodeHeaders(Http::ResponseHeaderMapPtr&& headers, |
142 | 2.22k | bool end_stream) { |
143 | | // TODO(rodaine): This is actually measuring after the headers are parsed and not the first |
144 | | // byte. |
145 | 2.22k | filter_.upstreamTiming().onFirstUpstreamRxByteReceived( |
146 | 2.22k | filter_.callbacks_->dispatcher().timeSource()); |
147 | | |
148 | 2.22k | if (filter_.callbacks_->upstreamCallbacks()->pausedForConnect() && |
149 | 2.22k | ((Http::Utility::getResponseStatus(*headers) == 200) || |
150 | 0 | ((Runtime::runtimeFeatureEnabled( |
151 | 0 | "envoy.reloadable_features.upstream_allow_connect_with_2xx")) && |
152 | 0 | (Http::CodeUtility::is2xx(Http::Utility::getResponseStatus(*headers)))))) { |
153 | 0 | filter_.callbacks_->upstreamCallbacks()->setPausedForConnect(false); |
154 | 0 | filter_.callbacks_->continueDecoding(); |
155 | 0 | } |
156 | | |
157 | 2.22k | maybeEndDecode(end_stream); |
158 | 2.22k | filter_.callbacks_->encodeHeaders(std::move(headers), end_stream, |
159 | 2.22k | StreamInfo::ResponseCodeDetails::get().ViaUpstream); |
160 | 2.22k | } |
161 | | |
162 | | // This is response data arriving from the codec. Send it through the filter manager. |
163 | 4.28k | void UpstreamCodecFilter::CodecBridge::decodeData(Buffer::Instance& data, bool end_stream) { |
164 | 4.28k | maybeEndDecode(end_stream); |
165 | 4.28k | filter_.callbacks_->encodeData(data, end_stream); |
166 | 4.28k | } |
167 | | |
168 | | // This is response trailers arriving from the codec. Send them through the filter manager. |
169 | 3 | void UpstreamCodecFilter::CodecBridge::decodeTrailers(Http::ResponseTrailerMapPtr&& trailers) { |
170 | 3 | maybeEndDecode(true); |
171 | 3 | filter_.callbacks_->encodeTrailers(std::move(trailers)); |
172 | 3 | } |
173 | | |
174 | | // This is response metadata arriving from the codec. Send it through the filter manager. |
175 | 181 | void UpstreamCodecFilter::CodecBridge::decodeMetadata(Http::MetadataMapPtr&& metadata_map) { |
176 | 181 | filter_.callbacks_->encodeMetadata(std::move(metadata_map)); |
177 | 181 | } |
178 | | |
179 | 0 | void UpstreamCodecFilter::CodecBridge::dumpState(std::ostream& os, int indent_level) const { |
180 | 0 | filter_.callbacks_->upstreamCallbacks()->dumpState(os, indent_level); |
181 | 0 | } |
182 | | |
183 | 6.51k | void UpstreamCodecFilter::CodecBridge::maybeEndDecode(bool end_stream) { |
184 | 6.51k | if (end_stream) { |
185 | 1.18k | filter_.upstreamTiming().onLastUpstreamRxByteReceived( |
186 | 1.18k | filter_.callbacks_->dispatcher().timeSource()); |
187 | 1.18k | } |
188 | 6.51k | } |
189 | | |
190 | | REGISTER_FACTORY(UpstreamCodecFilterFactory, |
191 | | Server::Configuration::UpstreamHttpFilterConfigFactory); |
192 | | |
193 | | } // namespace Router |
194 | | } // namespace Envoy |