/proc/self/cwd/source/extensions/filters/network/thrift_proxy/router/upstream_request.cc
Line | Count | Source (jump to first uncovered line) |
1 | | #include "source/extensions/filters/network/thrift_proxy/router/upstream_request.h" |
2 | | |
3 | | #include "source/extensions/filters/network/thrift_proxy/app_exception_impl.h" |
4 | | |
5 | | namespace Envoy { |
6 | | namespace Extensions { |
7 | | namespace NetworkFilters { |
8 | | namespace ThriftProxy { |
9 | | namespace Router { |
10 | | |
// Builds a request bound to a specific upstream TCP connection pool. Fresh
// transport/protocol instances are created per request from the named
// factories, so no encoding state is shared between requests.
UpstreamRequest::UpstreamRequest(RequestOwner& parent, Upstream::TcpPoolData& pool_data,
                                 MessageMetadataSharedPtr& metadata, TransportType transport_type,
                                 ProtocolType protocol_type, bool close_downstream_on_error)
    : parent_(parent), stats_(parent.stats()), conn_pool_data_(pool_data), metadata_(metadata),
      transport_(NamedTransportConfigFactory::getFactory(transport_type).createTransport()),
      protocol_(NamedProtocolConfigFactory::getFactory(protocol_type).createProtocol()),
      request_complete_(false), response_underflow_(false), charged_response_timing_(false),
      close_downstream_on_error_(close_downstream_on_error) {}
19 | | |
20 | 0 | UpstreamRequest::~UpstreamRequest() { |
21 | 0 | if (conn_pool_handle_) { |
22 | 0 | conn_pool_handle_->cancel(Tcp::ConnectionPool::CancelPolicy::Default); |
23 | 0 | } |
24 | 0 | } |
25 | | |
// Begins acquiring an upstream connection. Note that newConnection() may
// complete synchronously, in which case onPoolReady()/onPoolFailure() has
// already run before it returns and `handle` is nullptr.
FilterStatus UpstreamRequest::start() {
  Tcp::ConnectionPool::Cancellable* handle = conn_pool_data_.newConnection(*this);
  if (handle) {
    // Pause while we wait for a connection.
    conn_pool_handle_ = handle;
    return FilterStatus::StopIteration;
  }

  if (upgrade_response_ != nullptr) {
    // Pause while we wait for an upgrade response.
    return FilterStatus::StopIteration;
  }

  if (upstream_host_ == nullptr) {
    // No host was selected — presumably a synchronous pool failure already
    // handled via onPoolFailure(); stop the filter chain. TODO(review): confirm
    // this is the only path that leaves upstream_host_ unset here.
    return FilterStatus::StopIteration;
  }

  return FilterStatus::Continue;
}
45 | | |
46 | 0 | void UpstreamRequest::releaseConnection(const bool close) { |
47 | 0 | ENVOY_LOG(debug, "releasing connection, close: {}", close); |
48 | 0 | if (conn_pool_handle_) { |
49 | 0 | conn_pool_handle_->cancel(Tcp::ConnectionPool::CancelPolicy::Default); |
50 | 0 | conn_pool_handle_ = nullptr; |
51 | 0 | } |
52 | |
|
53 | 0 | conn_state_ = nullptr; |
54 | | |
55 | | // The event triggered by close will also release this connection so clear conn_data_ before |
56 | | // closing. |
57 | 0 | auto conn_data = std::move(conn_data_); |
58 | 0 | if (close && conn_data != nullptr) { |
59 | 0 | conn_data->connection().close(Network::ConnectionCloseType::NoFlush); |
60 | 0 | } |
61 | 0 | } |
62 | | |
63 | 0 | void UpstreamRequest::resetStream() { |
64 | 0 | ENVOY_LOG(debug, "reset stream"); |
65 | 0 | releaseConnection(true); |
66 | 0 | } |
67 | | |
// Connection pool failure callback. The host is recorded first so that the
// reset path can report stats/outlier results against it.
void UpstreamRequest::onPoolFailure(ConnectionPool::PoolFailureReason reason, absl::string_view,
                                    Upstream::HostDescriptionConstSharedPtr host) {
  ENVOY_LOG(debug, "on pool failure");
  conn_pool_handle_ = nullptr;

  // Mimic an upstream reset.
  onUpstreamHostSelected(host);
  if (!onResetStream(reason)) {
    // Downstream stays open; resume the paused filter chain.
    parent_.continueDecoding();
  }
}
79 | | |
// Connection pool success callback: adopts the pooled connection, attaches
// per-connection Thrift state, and either starts a protocol upgrade handshake
// or begins sending the request.
void UpstreamRequest::onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn_data,
                                  Upstream::HostDescriptionConstSharedPtr host) {
  // Only invoke continueDecoding if we'd previously stopped the filter chain.
  bool continue_decoding = conn_pool_handle_ != nullptr;

  onUpstreamHostSelected(host);
  host->outlierDetector().putResult(Upstream::Outlier::Result::LocalOriginConnectSuccess);

  conn_data_ = std::move(conn_data);
  conn_data_->addUpstreamCallbacks(parent_.upstreamCallbacks());
  conn_pool_handle_ = nullptr;

  // Reuse ThriftConnectionState already attached by a prior request on this
  // pooled connection; otherwise create and attach a fresh one.
  conn_state_ = conn_data_->connectionStateTyped<ThriftConnectionState>();
  if (conn_state_ == nullptr) {
    conn_data_->setConnectionState(std::make_unique<ThriftConnectionState>());
    conn_state_ = conn_data_->connectionStateTyped<ThriftConnectionState>();
  }

  if (protocol_->supportsUpgrade()) {
    auto& buffer = parent_.buffer();
    upgrade_response_ = protocol_->attemptUpgrade(*transport_, *conn_state_, buffer);
    if (upgrade_response_ != nullptr) {
      // An upgrade request was written into `buffer`; send it and defer the
      // actual request until the upgrade response arrives
      // (see handleUpgradeResponse()).
      parent_.addSize(buffer.length());
      conn_data_->connection().write(buffer, false);
      return;
    }
  }

  onRequestStart(continue_decoding);
}
110 | | |
111 | 0 | void UpstreamRequest::handleUpgradeResponse(Buffer::Instance& data) { |
112 | 0 | ENVOY_LOG(trace, "reading upgrade response: {} bytes", data.length()); |
113 | 0 | if (!upgrade_response_->onData(data)) { |
114 | | // Wait for more data. |
115 | 0 | return; |
116 | 0 | } |
117 | | |
118 | 0 | ENVOY_LOG(debug, "upgrade response complete"); |
119 | 0 | protocol_->completeUpgrade(*conn_state_, *upgrade_response_); |
120 | 0 | upgrade_response_.reset(); |
121 | 0 | onRequestStart(true); |
122 | 0 | } |
123 | | |
// Decodes regular (non-upgrade) response bytes via the response callbacks and
// performs the per-outcome bookkeeping: outlier-detection results, response
// stats, drain handling, and stream reset on decode errors. Returns the
// decoder status so the caller knows whether more data is expected.
ThriftFilters::ResponseStatus
UpstreamRequest::handleRegularResponse(Buffer::Instance& data,
                                       UpstreamResponseCallbacks& callbacks) {
  ENVOY_LOG(trace, "reading response: {} bytes", data.length());

  // Lazily start the upstream response on the first byte seen.
  if (response_state_ == ResponseState::None) {
    callbacks.startUpstreamResponse(*transport_, *protocol_);
    response_state_ = ResponseState::Started;
  }

  const auto& cluster = parent_.cluster();

  const auto status = callbacks.upstreamData(data);
  if (status == ThriftFilters::ResponseStatus::Complete) {

    stats_.recordUpstreamResponseSize(cluster, response_size_);

    // Classify the completed response for stats and outlier detection.
    switch (callbacks.responseMetadata()->messageType()) {
    case MessageType::Reply:
      if (callbacks.responseSuccess()) {
        upstream_host_->outlierDetector().putResult(
            Upstream::Outlier::Result::ExtOriginRequestSuccess);
        stats_.incResponseReplySuccess(cluster, upstream_host_);
      } else {
        upstream_host_->outlierDetector().putResult(
            Upstream::Outlier::Result::ExtOriginRequestFailed);
        stats_.incResponseReplyError(cluster, upstream_host_);
      }
      break;

    case MessageType::Exception:
      upstream_host_->outlierDetector().putResult(
          Upstream::Outlier::Result::ExtOriginRequestFailed);
      stats_.incResponseRemoteException(cluster, upstream_host_);
      break;

    default:
      stats_.incResponseInvalidType(cluster, upstream_host_);
      break;
    }

    response_state_ = ResponseState::Completed;
    if (callbacks.responseMetadata()->isDraining()) {
      ENVOY_LOG(debug, "got draining signal");
      stats_.incCloseDrain(cluster);
      // ResetStream triggers a local connection failure. However, we want to
      // keep the downstream connection after the upstream connection, i.e.,
      // `conn_data->connection()`, is closed. Therefore, introduce a new state
      // ResponseState::Completed before ResponseState::ConnectionReleased to
      // hint that got all the response and not to close the downstream
      // connection, especially while we got a draining signal.
      resetStream();
    }
    onResponseComplete();
  } else if (status == ThriftFilters::ResponseStatus::Reset) {
    // Note: invalid responses are not accounted in the response size histogram.
    ENVOY_LOG(debug, "upstream reset");
    upstream_host_->outlierDetector().putResult(Upstream::Outlier::Result::ExtOriginRequestFailed);
    stats_.incResponseDecodingError(cluster, upstream_host_);
    resetStream();
  }

  return status;
}
188 | | |
// Consumes response bytes from the upstream connection. Returns true when this
// request is finished with the connection (response complete, decode error, or
// underflow at end-of-stream) and false when more data is expected.
bool UpstreamRequest::handleUpstreamData(Buffer::Instance& data, bool end_stream,
                                         UpstreamResponseCallbacks& callbacks) {
  ASSERT(response_state_ < ResponseState::Completed);

  response_size_ += data.length();

  if (upgrade_response_ != nullptr) {
    // Still inside the protocol-upgrade handshake; no application response yet.
    handleUpgradeResponse(data);
  } else {
    const auto status = handleRegularResponse(data, callbacks);
    if (status != ThriftFilters::ResponseStatus::MoreData) {
      return true;
    }
  }

  if (end_stream) {
    // Response is incomplete, but no more data is coming.
    ENVOY_LOG(debug, "response underflow");
    onResponseComplete();
    response_underflow_ = true;
    onResetStream(ConnectionPool::PoolFailureReason::RemoteConnectionFailure);
    return true;
  }

  return false;
}
215 | | |
// Handles close events on the upstream connection. A close is treated as a
// stream reset; afterwards the connection is released without re-closing it.
// If the reset left the downstream open and the request had fully completed,
// the parent is told to reset its callbacks.
void UpstreamRequest::onEvent(Network::ConnectionEvent event) {
  ASSERT(response_state_ != ResponseState::ConnectionReleased);
  bool end_downstream = true;

  switch (event) {
  case Network::ConnectionEvent::RemoteClose:
    ENVOY_LOG(debug, "upstream remote close");
    end_downstream = onResetStream(ConnectionPool::PoolFailureReason::RemoteConnectionFailure);
    break;
  case Network::ConnectionEvent::LocalClose:
    ENVOY_LOG(debug, "upstream local close");
    end_downstream = onResetStream(ConnectionPool::PoolFailureReason::LocalConnectionFailure);
    break;
  case Network::ConnectionEvent::Connected:
  case Network::ConnectionEvent::ConnectedZeroRtt:
    // Connected is consumed by the connection pool.
    IS_ENVOY_BUG("reached unexpectedly");
  }

  // Connection already closed by the peer/local event; release without close.
  releaseConnection(false);
  if (!end_downstream && request_complete_) {
    ENVOY_LOG(debug, "reset parent callbacks");
    parent_.onReset();
  }
}
241 | | |
242 | 0 | uint64_t UpstreamRequest::encodeAndWrite(Buffer::OwnedImpl& request_buffer) { |
243 | 0 | Buffer::OwnedImpl transport_buffer; |
244 | |
|
245 | 0 | metadata_->setProtocol(protocol_->type()); |
246 | 0 | transport_->encodeFrame(transport_buffer, *metadata_, request_buffer); |
247 | |
|
248 | 0 | uint64_t size = transport_buffer.length(); |
249 | |
|
250 | 0 | conn_data_->connection().write(transport_buffer, false); |
251 | |
|
252 | 0 | return size; |
253 | 0 | } |
254 | | |
255 | 0 | void UpstreamRequest::onRequestStart(bool continue_decoding) { |
256 | 0 | auto& buffer = parent_.buffer(); |
257 | 0 | parent_.initProtocolConverter(*protocol_, buffer); |
258 | |
|
259 | 0 | metadata_->setSequenceId(conn_state_->nextSequenceId()); |
260 | 0 | parent_.convertMessageBegin(metadata_); |
261 | |
|
262 | 0 | if (continue_decoding) { |
263 | 0 | parent_.continueDecoding(); |
264 | 0 | } |
265 | 0 | } |
266 | | |
267 | 0 | void UpstreamRequest::onRequestComplete() { |
268 | 0 | ENVOY_LOG(debug, "on request complete"); |
269 | 0 | Event::Dispatcher& dispatcher = parent_.dispatcher(); |
270 | 0 | downstream_request_complete_time_ = dispatcher.timeSource().monotonicTime(); |
271 | 0 | request_complete_ = true; |
272 | 0 | } |
273 | | |
274 | 0 | void UpstreamRequest::onResponseComplete() { |
275 | 0 | ENVOY_LOG(debug, "on response complete"); |
276 | 0 | chargeResponseTiming(); |
277 | 0 | response_state_ = ResponseState::ConnectionReleased; |
278 | 0 | conn_state_ = nullptr; |
279 | 0 | conn_data_.reset(); |
280 | 0 | } |
281 | | |
282 | 0 | void UpstreamRequest::onUpstreamHostSelected(Upstream::HostDescriptionConstSharedPtr host) { |
283 | 0 | upstream_host_ = host; |
284 | 0 | } |
285 | | |
286 | | static Upstream::Outlier::Result |
287 | 0 | poolFailureReasonToResult(ConnectionPool::PoolFailureReason reason) { |
288 | 0 | switch (reason) { |
289 | 0 | case ConnectionPool::PoolFailureReason::LocalConnectionFailure: |
290 | 0 | FALLTHRU; |
291 | 0 | case ConnectionPool::PoolFailureReason::RemoteConnectionFailure: |
292 | 0 | FALLTHRU; |
293 | 0 | case ConnectionPool::PoolFailureReason::Overflow: |
294 | 0 | return Upstream::Outlier::Result::LocalOriginConnectFailed; |
295 | 0 | case ConnectionPool::PoolFailureReason::Timeout: |
296 | 0 | return Upstream::Outlier::Result::LocalOriginTimeout; |
297 | 0 | } |
298 | 0 | PANIC_DUE_TO_CORRUPT_ENUM; |
299 | 0 | } |
300 | | |
// Handles an upstream reset (pool failure or connection close). Decides
// whether the downstream connection must be closed and, for failures before a
// complete response, sends a local error reply. Returns true when the caller
// should end the downstream.
bool UpstreamRequest::onResetStream(ConnectionPool::PoolFailureReason reason) {
  bool close_downstream = true;

  chargeResponseTiming();

  switch (reason) {
  case ConnectionPool::PoolFailureReason::Overflow:
    stats_.incResponseLocalException(parent_.cluster());
    parent_.sendLocalReply(AppException(AppExceptionType::InternalError,
                                        "thrift upstream request: too many connections"),
                           false /* Don't close the downstream connection. */);
    close_downstream = false;
    break;
  case ConnectionPool::PoolFailureReason::LocalConnectionFailure:
    FALLTHRU;
  case ConnectionPool::PoolFailureReason::RemoteConnectionFailure:
    FALLTHRU;
  case ConnectionPool::PoolFailureReason::Timeout:
    upstream_host_->outlierDetector().putResult(poolFailureReasonToResult(reason));

    // Error occurred after an underflow response, propagate the reset to the downstream.
    if (response_underflow_) {
      ENVOY_LOG(debug, "reset downstream connection for an underflow response");
      stats_.incCloseUnderflowResponse(parent_.cluster());
      parent_.resetDownstreamConnection();
    } else if (response_state_ <= ResponseState::Started) {
      // Failure before the response completed: whether the downstream closes
      // is governed by configuration (close_downstream_on_error_).
      close_downstream = close_downstream_on_error_;
      if (response_state_ == ResponseState::Started) {
        stats_.incClosePartialResponse(parent_.cluster());
      }
      stats_.incResponseLocalException(parent_.cluster());
      parent_.sendLocalReply(
          AppException(AppExceptionType::InternalError,
                       fmt::format("connection failure before response {}: {} '{}'",
                                   response_state_ == ResponseState::None ? "start" : "complete",
                                   PoolFailureReasonNames::get().fromReason(reason),
                                   (upstream_host_ && upstream_host_->address())
                                       ? upstream_host_->address()->asString()
                                       : "to upstream")),
          close_downstream);
    }
    // Note: a reset after ResponseState::Completed (e.g. drain-triggered)
    // intentionally sends no reply and leaves close_downstream == true.
    break;
  }

  ENVOY_LOG(debug,
            "upstream reset complete. reason={}, close_downstream={}, response_state={}, "
            "response_underflow={}",
            PoolFailureReasonNames::get().fromReason(reason), close_downstream,
            static_cast<uint8_t>(response_state_), static_cast<bool>(response_underflow_));
  return close_downstream;
}
352 | | |
353 | 0 | void UpstreamRequest::chargeResponseTiming() { |
354 | 0 | if (charged_response_timing_ || !downstream_request_complete_time_.has_value()) { |
355 | 0 | return; |
356 | 0 | } |
357 | 0 | charged_response_timing_ = true; |
358 | 0 | Event::Dispatcher& dispatcher = parent_.dispatcher(); |
359 | 0 | const std::chrono::milliseconds response_time = |
360 | 0 | std::chrono::duration_cast<std::chrono::milliseconds>( |
361 | 0 | dispatcher.timeSource().monotonicTime() - downstream_request_complete_time_.value()); |
362 | 0 | stats_.recordUpstreamResponseTime(parent_.cluster(), upstream_host_, response_time.count()); |
363 | 0 | } |
364 | | |
365 | | } // namespace Router |
366 | | } // namespace ThriftProxy |
367 | | } // namespace NetworkFilters |
368 | | } // namespace Extensions |
369 | | } // namespace Envoy |