/proc/self/cwd/source/extensions/filters/network/thrift_proxy/conn_manager.cc
Line | Count | Source (jump to first uncovered line) |
1 | | #include "source/extensions/filters/network/thrift_proxy/conn_manager.h" |
2 | | |
3 | | #include "envoy/common/exception.h" |
4 | | #include "envoy/event/dispatcher.h" |
5 | | |
6 | | #include "source/extensions/filters/network/thrift_proxy/app_exception_impl.h" |
7 | | #include "source/extensions/filters/network/thrift_proxy/protocol.h" |
8 | | #include "source/extensions/filters/network/thrift_proxy/transport.h" |
9 | | |
10 | | namespace Envoy { |
11 | | namespace Extensions { |
12 | | namespace NetworkFilters { |
13 | | namespace ThriftProxy { |
14 | | |
15 | | ConnectionManager::ConnectionManager(Config& config, Random::RandomGenerator& random_generator, |
16 | | TimeSource& time_source, |
17 | | const Network::DrainDecision& drain_decision) |
18 | | : config_(config), stats_(config_.stats()), transport_(config.createTransport()), |
19 | | protocol_(config.createProtocol()), |
20 | | decoder_(std::make_unique<Decoder>(*transport_, *protocol_, *this)), |
21 | | random_generator_(random_generator), time_source_(time_source), |
22 | 0 | drain_decision_(drain_decision) {} |
23 | | |
24 | 0 | ConnectionManager::~ConnectionManager() = default; |
25 | | |
26 | 0 | Network::FilterStatus ConnectionManager::onData(Buffer::Instance& data, bool end_stream) { |
27 | 0 | request_buffer_.move(data); |
28 | 0 | dispatch(); |
29 | |
|
30 | 0 | if (end_stream) { |
31 | 0 | ENVOY_CONN_LOG(trace, "downstream half-closed", read_callbacks_->connection()); |
32 | | |
33 | | // Downstream has closed. Unless we're waiting for an upstream connection to complete a oneway |
34 | | // request, close. The special case for oneway requests allows them to complete before the |
35 | | // ConnectionManager is destroyed. |
36 | 0 | if (stopped_) { |
37 | 0 | ASSERT(!rpcs_.empty()); |
38 | 0 | MessageMetadata& metadata = *(*rpcs_.begin())->metadata_; |
39 | 0 | ASSERT(metadata.hasMessageType()); |
40 | 0 | if (metadata.messageType() == MessageType::Oneway) { |
41 | 0 | ENVOY_CONN_LOG(trace, "waiting for one-way completion", read_callbacks_->connection()); |
42 | 0 | half_closed_ = true; |
43 | 0 | return Network::FilterStatus::StopIteration; |
44 | 0 | } |
45 | 0 | } |
46 | | |
47 | 0 | resetAllRpcs(false); |
48 | 0 | read_callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite); |
49 | 0 | } |
50 | | |
51 | 0 | return Network::FilterStatus::StopIteration; |
52 | 0 | } |
53 | | |
54 | | void ConnectionManager::emitLogEntry(const Http::RequestHeaderMap* request_headers, |
55 | | const Http::ResponseHeaderMap* response_headers, |
56 | 0 | const StreamInfo::StreamInfo& stream_info) { |
57 | 0 | const Formatter::HttpFormatterContext log_context{request_headers, response_headers}; |
58 | |
|
59 | 0 | for (const auto& access_log : config_.accessLogs()) { |
60 | 0 | access_log->log(log_context, stream_info); |
61 | 0 | } |
62 | 0 | } |
63 | | |
64 | 0 | void ConnectionManager::dispatch() { |
65 | 0 | if (stopped_) { |
66 | 0 | ENVOY_CONN_LOG(debug, "thrift filter stopped", read_callbacks_->connection()); |
67 | 0 | return; |
68 | 0 | } |
69 | | |
70 | 0 | if (requests_overflow_) { |
71 | 0 | ENVOY_CONN_LOG(debug, "thrift filter requests overflow", read_callbacks_->connection()); |
72 | 0 | return; |
73 | 0 | } |
74 | | |
75 | 0 | TRY_NEEDS_AUDIT { |
76 | 0 | bool underflow = false; |
77 | 0 | while (!underflow) { |
78 | 0 | FilterStatus status = decoder_->onData(request_buffer_, underflow); |
79 | 0 | if (status == FilterStatus::StopIteration) { |
80 | 0 | stopped_ = true; |
81 | 0 | break; |
82 | 0 | } |
83 | 0 | } |
84 | |
|
85 | 0 | return; |
86 | 0 | } |
87 | 0 | END_TRY catch (const AppException& ex) { |
88 | 0 | ENVOY_LOG(debug, "thrift application exception: {}", ex.what()); |
89 | 0 | if (rpcs_.empty()) { |
90 | 0 | MessageMetadata metadata; |
91 | 0 | sendLocalReply(metadata, ex, true); |
92 | 0 | } else { |
93 | 0 | sendLocalReply(*(*rpcs_.begin())->metadata_, ex, true); |
94 | 0 | } |
95 | 0 | } |
96 | 0 | catch (const EnvoyException& ex) { |
97 | 0 | ENVOY_CONN_LOG(debug, "thrift error: {}", read_callbacks_->connection(), ex.what()); |
98 | |
|
99 | 0 | if (rpcs_.empty()) { |
100 | | // Transport/protocol mismatch (including errors in automatic detection). Just hang up |
101 | | // since we don't know how to encode a response. |
102 | 0 | read_callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite); |
103 | 0 | } else { |
104 | | // Use the current rpc's transport/protocol to send an error downstream. |
105 | 0 | rpcs_.front()->onError(ex.what()); |
106 | 0 | } |
107 | 0 | } |
108 | | |
109 | 0 | stats_.request_decoding_error_.inc(); |
110 | 0 | resetAllRpcs(true); |
111 | 0 | } |
112 | | |
113 | | absl::optional<DirectResponse::ResponseType> |
114 | | ConnectionManager::sendLocalReply(MessageMetadata& metadata, const DirectResponse& response, |
115 | 0 | bool end_stream) { |
116 | 0 | if (read_callbacks_->connection().state() == Network::Connection::State::Closed) { |
117 | 0 | return absl::nullopt; |
118 | 0 | } |
119 | | |
120 | 0 | DirectResponse::ResponseType result = DirectResponse::ResponseType::Exception; |
121 | |
|
122 | 0 | if (!metadata.hasMessageType() || metadata.messageType() != MessageType::Oneway) { |
123 | 0 | Buffer::OwnedImpl buffer; |
124 | 0 | result = response.encode(metadata, *protocol_, buffer); |
125 | 0 | Buffer::OwnedImpl response_buffer; |
126 | 0 | metadata.setProtocol(protocol_->type()); |
127 | 0 | transport_->encodeFrame(response_buffer, metadata, buffer); |
128 | |
|
129 | 0 | read_callbacks_->connection().write(response_buffer, end_stream); |
130 | 0 | } |
131 | |
|
132 | 0 | if (end_stream) { |
133 | 0 | read_callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite); |
134 | 0 | } |
135 | |
|
136 | 0 | switch (result) { |
137 | 0 | case DirectResponse::ResponseType::SuccessReply: |
138 | 0 | stats_.response_success_.inc(); |
139 | 0 | break; |
140 | 0 | case DirectResponse::ResponseType::ErrorReply: |
141 | 0 | stats_.response_error_.inc(); |
142 | 0 | break; |
143 | 0 | case DirectResponse::ResponseType::Exception: |
144 | 0 | stats_.response_exception_.inc(); |
145 | 0 | break; |
146 | 0 | } |
147 | 0 | return result; |
148 | 0 | } |
149 | | |
150 | 0 | void ConnectionManager::continueDecoding() { |
151 | 0 | ENVOY_CONN_LOG(debug, "thrift filter continued", read_callbacks_->connection()); |
152 | 0 | stopped_ = false; |
153 | 0 | dispatch(); |
154 | |
|
155 | 0 | if (!stopped_ && half_closed_) { |
156 | | // If we're half closed, but not stopped waiting for an upstream, reset any pending rpcs and |
157 | | // close the connection. |
158 | 0 | resetAllRpcs(false); |
159 | 0 | read_callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite); |
160 | 0 | } |
161 | 0 | } |
162 | | |
163 | 0 | void ConnectionManager::doDeferredRpcDestroy(ConnectionManager::ActiveRpc& rpc) { |
164 | 0 | if (!rpc.inserted()) { |
165 | 0 | return; |
166 | 0 | } |
167 | | |
168 | 0 | read_callbacks_->connection().dispatcher().deferredDelete(rpc.removeFromList(rpcs_)); |
169 | 0 | if (requests_overflow_ && rpcs_.empty()) { |
170 | 0 | read_callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite); |
171 | 0 | } |
172 | 0 | } |
173 | | |
174 | 0 | void ConnectionManager::resetAllRpcs(bool local_reset) { |
175 | 0 | while (!rpcs_.empty()) { |
176 | 0 | if (local_reset) { |
177 | 0 | ENVOY_CONN_LOG(debug, "local close with active request", read_callbacks_->connection()); |
178 | 0 | stats_.cx_destroy_local_with_active_rq_.inc(); |
179 | 0 | } else { |
180 | 0 | ENVOY_CONN_LOG(debug, "remote close with active request", read_callbacks_->connection()); |
181 | 0 | stats_.cx_destroy_remote_with_active_rq_.inc(); |
182 | 0 | } |
183 | |
|
184 | 0 | rpcs_.front()->onReset(); |
185 | 0 | } |
186 | 0 | } |
187 | | |
188 | 0 | void ConnectionManager::initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) { |
189 | 0 | read_callbacks_ = &callbacks; |
190 | |
|
191 | 0 | read_callbacks_->connection().addConnectionCallbacks(*this); |
192 | 0 | read_callbacks_->connection().enableHalfClose(true); |
193 | 0 | } |
194 | | |
195 | 0 | void ConnectionManager::onEvent(Network::ConnectionEvent event) { |
196 | 0 | if (event != Network::ConnectionEvent::LocalClose && |
197 | 0 | event != Network::ConnectionEvent::RemoteClose) { |
198 | 0 | return; |
199 | 0 | } |
200 | 0 | resetAllRpcs(event == Network::ConnectionEvent::LocalClose); |
201 | 0 | } |
202 | | |
203 | 0 | DecoderEventHandler& ConnectionManager::newDecoderEventHandler() { |
204 | 0 | ENVOY_LOG(trace, "new decoder filter"); |
205 | |
|
206 | 0 | ActiveRpcPtr new_rpc(new ActiveRpc(*this)); |
207 | 0 | new_rpc->createFilterChain(); |
208 | 0 | LinkedList::moveIntoList(std::move(new_rpc), rpcs_); |
209 | |
|
210 | 0 | return **rpcs_.begin(); |
211 | 0 | } |
212 | | |
213 | 0 | bool ConnectionManager::ResponseDecoder::passthroughEnabled() const { |
214 | 0 | return parent_.parent_.passthroughEnabled(); |
215 | 0 | } |
216 | | |
217 | 0 | bool ConnectionManager::passthroughEnabled() const { |
218 | 0 | if (!config_.payloadPassthrough()) { |
219 | 0 | return false; |
220 | 0 | } |
221 | | |
222 | | // If the rpcs list is empty, a local response happened. |
223 | | // |
224 | | // TODO(rgs1): we actually could still enable passthrough for local |
225 | | // responses as long as the transport is framed and the protocol is |
226 | | // not Twitter. |
227 | 0 | if (rpcs_.empty()) { |
228 | 0 | return false; |
229 | 0 | } |
230 | | |
231 | 0 | return (*rpcs_.begin())->passthroughSupported(); |
232 | 0 | } |
233 | | |
234 | 0 | bool ConnectionManager::headerKeysPreserveCase() const { return config_.headerKeysPreserveCase(); } |
235 | | |
236 | 0 | bool ConnectionManager::ResponseDecoder::onData(Buffer::Instance& data) { |
237 | 0 | upstream_buffer_.move(data); |
238 | |
|
239 | 0 | bool underflow = false; |
240 | 0 | decoder_->onData(upstream_buffer_, underflow); |
241 | 0 | ASSERT(complete_ || underflow); |
242 | 0 | return complete_; |
243 | 0 | } |
244 | | |
245 | 0 | FilterStatus ConnectionManager::ResponseDecoder::transportBegin(MessageMetadataSharedPtr metadata) { |
246 | 0 | return parent_.applyEncoderFilters(DecoderEvent::TransportBegin, metadata, protocol_converter_); |
247 | 0 | } |
248 | | |
249 | 0 | FilterStatus ConnectionManager::ResponseDecoder::transportEnd() { |
250 | 0 | ASSERT(metadata_ != nullptr); |
251 | | |
252 | 0 | FilterStatus status = parent_.applyEncoderFilters(DecoderEvent::TransportEnd, absl::monostate(), |
253 | 0 | protocol_converter_); |
254 | | // Currently we don't support returning FilterStatus::StopIteration from encoder filters. |
255 | | // Hence, this if-statement is always false. |
256 | 0 | ASSERT(status == FilterStatus::Continue); |
257 | 0 | if (status == FilterStatus::StopIteration) { |
258 | 0 | pending_transport_end_ = true; |
259 | 0 | return FilterStatus::StopIteration; |
260 | 0 | } |
261 | | |
262 | 0 | finalizeResponse(); |
263 | 0 | return FilterStatus::Continue; |
264 | 0 | } |
265 | | |
266 | 0 | void ConnectionManager::ResponseDecoder::finalizeResponse() { |
267 | 0 | pending_transport_end_ = false; |
268 | 0 | ConnectionManager& cm = parent_.parent_; |
269 | |
|
270 | 0 | if (cm.read_callbacks_->connection().state() == Network::Connection::State::Closed) { |
271 | 0 | complete_ = true; |
272 | 0 | throw EnvoyException("downstream connection is closed"); |
273 | 0 | } |
274 | | |
275 | 0 | Buffer::OwnedImpl buffer; |
276 | | |
277 | | // Use the factory to get the concrete transport from the decoder transport (as opposed to |
278 | | // potentially pre-detection auto transport). |
279 | 0 | TransportPtr transport = |
280 | 0 | NamedTransportConfigFactory::getFactory(cm.decoder_->transportType()).createTransport(); |
281 | |
|
282 | 0 | metadata_->setProtocol(cm.decoder_->protocolType()); |
283 | 0 | transport->encodeFrame(buffer, *metadata_, parent_.response_buffer_); |
284 | 0 | complete_ = true; |
285 | |
|
286 | 0 | cm.read_callbacks_->connection().write(buffer, false); |
287 | |
|
288 | 0 | cm.stats_.response_.inc(); |
289 | 0 | if (passthrough_) { |
290 | 0 | cm.stats_.response_passthrough_.inc(); |
291 | 0 | } |
292 | |
|
293 | 0 | switch (metadata_->messageType()) { |
294 | 0 | case MessageType::Reply: |
295 | 0 | cm.stats_.response_reply_.inc(); |
296 | 0 | if (success_) { |
297 | 0 | if (success_.value()) { |
298 | 0 | cm.stats_.response_success_.inc(); |
299 | 0 | } else { |
300 | 0 | cm.stats_.response_error_.inc(); |
301 | 0 | } |
302 | 0 | } |
303 | |
|
304 | 0 | break; |
305 | | |
306 | 0 | case MessageType::Exception: |
307 | 0 | cm.stats_.response_exception_.inc(); |
308 | 0 | break; |
309 | | |
310 | 0 | default: |
311 | 0 | cm.stats_.response_invalid_type_.inc(); |
312 | 0 | break; |
313 | 0 | } |
314 | 0 | } |
315 | | |
316 | 0 | FilterStatus ConnectionManager::ResponseDecoder::passthroughData(Buffer::Instance& data) { |
317 | 0 | passthrough_ = true; |
318 | |
|
319 | 0 | return parent_.applyEncoderFilters(DecoderEvent::PassthroughData, &data, protocol_converter_); |
320 | 0 | } |
321 | | |
322 | 0 | FilterStatus ConnectionManager::ResponseDecoder::messageBegin(MessageMetadataSharedPtr metadata) { |
323 | 0 | metadata_ = metadata; |
324 | 0 | metadata_->setSequenceId(parent_.original_sequence_id_); |
325 | |
|
326 | 0 | if (metadata->hasReplyType()) { |
327 | | // TODO(kuochunghsu): the status of success could be altered by filters |
328 | 0 | success_ = metadata->replyType() == ReplyType::Success; |
329 | 0 | } |
330 | |
|
331 | 0 | ConnectionManager& cm = parent_.parent_; |
332 | |
|
333 | 0 | ENVOY_STREAM_LOG( |
334 | 0 | trace, "Response message_type: {}, seq_id: {}, method: {}, frame size: {}, headers:\n{}", |
335 | 0 | parent_, |
336 | 0 | metadata->hasMessageType() ? MessageTypeNames::get().fromType(metadata->messageType()) : "-", |
337 | 0 | metadata_->sequenceId(), metadata->hasMethodName() ? metadata->methodName() : "-", |
338 | 0 | metadata->hasFrameSize() ? metadata->frameSize() : -1, metadata->responseHeaders()); |
339 | | |
340 | | // Check if the upstream host is draining. |
341 | | // |
342 | | // Note: the drain header needs to be checked here in messageBegin, and not transportBegin, so |
343 | | // that we can support the header in TTwitter protocol, which reads/adds response headers to |
344 | | // metadata in messageBegin when reading the response from upstream. Therefore detecting a drain |
345 | | // should happen here. |
346 | 0 | if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.thrift_connection_draining")) { |
347 | 0 | metadata_->setDraining(!metadata->responseHeaders().get(Headers::get().Drain).empty()); |
348 | 0 | metadata->responseHeaders().remove(Headers::get().Drain); |
349 | | |
350 | | // Check if this host itself is draining. |
351 | | // |
352 | | // Note: Similarly as above, the response is buffered until transportEnd. Therefore metadata |
353 | | // should be set before the encodeFrame() call. It should be set at or after the messageBegin |
354 | | // call so that the header is added after all upstream headers passed, due to messageBegin |
355 | | // possibly not getting headers in transportBegin. |
356 | 0 | if (cm.drain_decision_.drainClose()) { |
357 | 0 | ENVOY_STREAM_LOG(debug, "propogate Drain header for drain close decision", parent_); |
358 | | // TODO(rgs1): should the key value contain something useful (e.g.: minutes til drain is |
359 | | // over)? |
360 | 0 | metadata->responseHeaders().addReferenceKey(Headers::get().Drain, "true"); |
361 | 0 | cm.stats_.downstream_response_drain_close_.inc(); |
362 | 0 | } |
363 | 0 | } |
364 | |
|
365 | 0 | parent_.recordResponseAccessLog(metadata); |
366 | |
|
367 | 0 | return parent_.applyEncoderFilters(DecoderEvent::MessageBegin, metadata, protocol_converter_); |
368 | 0 | } |
369 | | |
370 | 0 | FilterStatus ConnectionManager::ResponseDecoder::messageEnd() { |
371 | 0 | return parent_.applyEncoderFilters(DecoderEvent::MessageEnd, absl::monostate(), |
372 | 0 | protocol_converter_); |
373 | 0 | } |
374 | | |
375 | 0 | FilterStatus ConnectionManager::ResponseDecoder::structBegin(absl::string_view name) { |
376 | 0 | return parent_.applyEncoderFilters(DecoderEvent::StructBegin, std::string(name), |
377 | 0 | protocol_converter_); |
378 | 0 | } |
379 | | |
380 | 0 | FilterStatus ConnectionManager::ResponseDecoder::structEnd() { |
381 | 0 | return parent_.applyEncoderFilters(DecoderEvent::StructEnd, absl::monostate(), |
382 | 0 | protocol_converter_); |
383 | 0 | } |
384 | | |
385 | | FilterStatus ConnectionManager::ResponseDecoder::fieldBegin(absl::string_view name, |
386 | | FieldType& field_type, |
387 | 0 | int16_t& field_id) { |
388 | 0 | return parent_.applyEncoderFilters(DecoderEvent::FieldBegin, |
389 | 0 | std::make_tuple(std::string(name), field_type, field_id), |
390 | 0 | protocol_converter_); |
391 | 0 | } |
392 | | |
393 | 0 | FilterStatus ConnectionManager::ResponseDecoder::fieldEnd() { |
394 | 0 | return parent_.applyEncoderFilters(DecoderEvent::FieldEnd, absl::monostate(), |
395 | 0 | protocol_converter_); |
396 | 0 | } |
397 | | |
398 | 0 | FilterStatus ConnectionManager::ResponseDecoder::boolValue(bool& value) { |
399 | 0 | return parent_.applyEncoderFilters(DecoderEvent::BoolValue, value, protocol_converter_); |
400 | 0 | } |
401 | | |
402 | 0 | FilterStatus ConnectionManager::ResponseDecoder::byteValue(uint8_t& value) { |
403 | 0 | return parent_.applyEncoderFilters(DecoderEvent::ByteValue, value, protocol_converter_); |
404 | 0 | } |
405 | | |
406 | 0 | FilterStatus ConnectionManager::ResponseDecoder::int16Value(int16_t& value) { |
407 | 0 | return parent_.applyEncoderFilters(DecoderEvent::Int16Value, value, protocol_converter_); |
408 | 0 | } |
409 | | |
410 | 0 | FilterStatus ConnectionManager::ResponseDecoder::int32Value(int32_t& value) { |
411 | 0 | return parent_.applyEncoderFilters(DecoderEvent::Int32Value, value, protocol_converter_); |
412 | 0 | } |
413 | | |
414 | 0 | FilterStatus ConnectionManager::ResponseDecoder::int64Value(int64_t& value) { |
415 | 0 | return parent_.applyEncoderFilters(DecoderEvent::Int64Value, value, protocol_converter_); |
416 | 0 | } |
417 | | |
418 | 0 | FilterStatus ConnectionManager::ResponseDecoder::doubleValue(double& value) { |
419 | 0 | return parent_.applyEncoderFilters(DecoderEvent::DoubleValue, value, protocol_converter_); |
420 | 0 | } |
421 | | |
422 | 0 | FilterStatus ConnectionManager::ResponseDecoder::stringValue(absl::string_view value) { |
423 | 0 | return parent_.applyEncoderFilters(DecoderEvent::StringValue, std::string(value), |
424 | 0 | protocol_converter_); |
425 | 0 | } |
426 | | |
427 | | FilterStatus ConnectionManager::ResponseDecoder::mapBegin(FieldType& key_type, |
428 | 0 | FieldType& value_type, uint32_t& size) { |
429 | 0 | return parent_.applyEncoderFilters( |
430 | 0 | DecoderEvent::MapBegin, std::make_tuple(key_type, value_type, size), protocol_converter_); |
431 | 0 | } |
432 | | |
433 | 0 | FilterStatus ConnectionManager::ResponseDecoder::mapEnd() { |
434 | 0 | return parent_.applyEncoderFilters(DecoderEvent::MapEnd, absl::monostate(), protocol_converter_); |
435 | 0 | } |
436 | | |
437 | 0 | FilterStatus ConnectionManager::ResponseDecoder::listBegin(FieldType& elem_type, uint32_t& size) { |
438 | 0 | return parent_.applyEncoderFilters(DecoderEvent::ListBegin, std::make_tuple(elem_type, size), |
439 | 0 | protocol_converter_); |
440 | 0 | } |
441 | | |
442 | 0 | FilterStatus ConnectionManager::ResponseDecoder::listEnd() { |
443 | 0 | return parent_.applyEncoderFilters(DecoderEvent::ListEnd, absl::monostate(), protocol_converter_); |
444 | 0 | } |
445 | | |
446 | 0 | FilterStatus ConnectionManager::ResponseDecoder::setBegin(FieldType& elem_type, uint32_t& size) { |
447 | 0 | return parent_.applyEncoderFilters(DecoderEvent::SetBegin, std::make_tuple(elem_type, size), |
448 | 0 | protocol_converter_); |
449 | 0 | } |
450 | | |
451 | 0 | FilterStatus ConnectionManager::ResponseDecoder::setEnd() { |
452 | 0 | return parent_.applyEncoderFilters(DecoderEvent::SetEnd, absl::monostate(), protocol_converter_); |
453 | 0 | } |
454 | | |
455 | 0 | bool ConnectionManager::ResponseDecoder::headerKeysPreserveCase() const { |
456 | 0 | return parent_.parent_.headerKeysPreserveCase(); |
457 | 0 | } |
458 | | |
459 | 0 | void ConnectionManager::ActiveRpcDecoderFilter::continueDecoding() { |
460 | 0 | const FilterStatus status = |
461 | 0 | parent_.applyDecoderFilters(DecoderEvent::ContinueDecode, absl::monostate(), this); |
462 | 0 | if (status == FilterStatus::Continue) { |
463 | | // All filters have been executed for the current decoder state. |
464 | 0 | if (parent_.pending_transport_end_) { |
465 | | // If the filter stack was paused during transportEnd, handle end-of-request details. |
466 | 0 | parent_.finalizeRequest(); |
467 | 0 | } |
468 | |
|
469 | 0 | parent_.continueDecoding(); |
470 | 0 | } |
471 | 0 | } |
472 | | |
473 | 0 | void ConnectionManager::ActiveRpcEncoderFilter::continueEncoding() { |
474 | | // Not supported. |
475 | 0 | ASSERT(false); |
476 | 0 | } |
477 | | |
478 | | FilterStatus ConnectionManager::ActiveRpc::applyDecoderFilters(DecoderEvent state, |
479 | | FilterContext&& data, |
480 | 0 | ActiveRpcDecoderFilter* filter) { |
481 | 0 | ASSERT(filter_action_ == nullptr || state == DecoderEvent::ContinueDecode); |
482 | 0 | prepareFilterAction(state, std::move(data)); |
483 | |
|
484 | 0 | if (local_response_sent_) { |
485 | 0 | filter_action_ = nullptr; |
486 | 0 | return FilterStatus::Continue; |
487 | 0 | } |
488 | | |
489 | 0 | if (upgrade_handler_) { |
490 | | // Divert events to the current protocol upgrade handler. |
491 | 0 | const FilterStatus status = filter_action_(upgrade_handler_.get()); |
492 | 0 | filter_action_ = nullptr; |
493 | 0 | return status; |
494 | 0 | } |
495 | | |
496 | 0 | return applyFilters<ActiveRpcDecoderFilter>(filter, decoder_filters_); |
497 | 0 | } |
498 | | |
499 | | FilterStatus |
500 | | ConnectionManager::ActiveRpc::applyEncoderFilters(DecoderEvent state, FilterContext&& data, |
501 | | ProtocolConverterSharedPtr protocol_converter, |
502 | 0 | ActiveRpcEncoderFilter* filter) { |
503 | 0 | ASSERT(filter_action_ == nullptr || state == DecoderEvent::ContinueDecode); |
504 | 0 | prepareFilterAction(state, std::move(data)); |
505 | |
|
506 | 0 | FilterStatus status = |
507 | 0 | applyFilters<ActiveRpcEncoderFilter>(filter, encoder_filters_, protocol_converter); |
508 | | // FilterStatus::StopIteration is currently not supported. |
509 | 0 | ASSERT(status == FilterStatus::Continue); |
510 | | |
511 | 0 | return status; |
512 | 0 | } |
513 | | |
514 | | template <typename FilterType> |
515 | | FilterStatus |
516 | | ConnectionManager::ActiveRpc::applyFilters(FilterType* filter, |
517 | | std::list<std::unique_ptr<FilterType>>& filter_list, |
518 | 0 | ProtocolConverterSharedPtr protocol_converter) { |
519 | |
|
520 | 0 | typename std::list<std::unique_ptr<FilterType>>::iterator entry = |
521 | 0 | !filter ? filter_list.begin() : std::next(filter->entry()); |
522 | 0 | for (; entry != filter_list.end(); entry++) { |
523 | 0 | const FilterStatus status = filter_action_((*entry)->decodeEventHandler()); |
524 | 0 | ENVOY_STREAM_LOG(trace, "apply filter called: filter={} status={}", *this, |
525 | 0 | static_cast<const void*>((*entry).get()), static_cast<uint64_t>(status)); |
526 | 0 | if (local_response_sent_) { |
527 | | // The filter called sendLocalReply but _did not_ close the connection. |
528 | | // We return FilterStatus::Continue irrespective of the current result, |
529 | | // which is fine because subsequent calls to this method will skip |
530 | | // filters anyway. |
531 | | // |
532 | | // Note: we need to return FilterStatus::Continue here, in order for decoding |
533 | | // to proceed. This is important because as noted above, the connection remains |
534 | | // open so we need to consume the remaining bytes. |
535 | 0 | break; |
536 | 0 | } |
537 | | |
538 | 0 | if (status != FilterStatus::Continue) { |
539 | | // If we got FilterStatus::StopIteration and a local reply happened but |
540 | | // local_response_sent_ was not set, the connection was closed. |
541 | | // |
542 | | // In this case, either resetAllRpcs() gets called via onEvent(LocalClose) or |
543 | | // dispatch() stops the processing. |
544 | | // |
545 | | // In other words, after a local reply closes the connection and StopIteration |
546 | | // is returned we are done. |
547 | 0 | return status; |
548 | 0 | } |
549 | 0 | } |
550 | | |
551 | | // The protocol converter writes the data to a buffer for response. |
552 | 0 | if (protocol_converter) { |
553 | 0 | filter_action_(protocol_converter.get()); |
554 | 0 | } |
555 | |
|
556 | 0 | filter_action_ = nullptr; |
557 | |
|
558 | 0 | return FilterStatus::Continue; |
559 | 0 | } Unexecuted instantiation: Envoy::Extensions::NetworkFilters::ThriftProxy::FilterStatus Envoy::Extensions::NetworkFilters::ThriftProxy::ConnectionManager::ActiveRpc::applyFilters<Envoy::Extensions::NetworkFilters::ThriftProxy::ConnectionManager::ActiveRpcDecoderFilter>(Envoy::Extensions::NetworkFilters::ThriftProxy::ConnectionManager::ActiveRpcDecoderFilter*, std::__1::list<std::__1::unique_ptr<Envoy::Extensions::NetworkFilters::ThriftProxy::ConnectionManager::ActiveRpcDecoderFilter, std::__1::default_delete<Envoy::Extensions::NetworkFilters::ThriftProxy::ConnectionManager::ActiveRpcDecoderFilter> >, std::__1::allocator<std::__1::unique_ptr<Envoy::Extensions::NetworkFilters::ThriftProxy::ConnectionManager::ActiveRpcDecoderFilter, std::__1::default_delete<Envoy::Extensions::NetworkFilters::ThriftProxy::ConnectionManager::ActiveRpcDecoderFilter> > > >&, std::__1::shared_ptr<Envoy::Extensions::NetworkFilters::ThriftProxy::ProtocolConverter>) Unexecuted instantiation: Envoy::Extensions::NetworkFilters::ThriftProxy::FilterStatus Envoy::Extensions::NetworkFilters::ThriftProxy::ConnectionManager::ActiveRpc::applyFilters<Envoy::Extensions::NetworkFilters::ThriftProxy::ConnectionManager::ActiveRpcEncoderFilter>(Envoy::Extensions::NetworkFilters::ThriftProxy::ConnectionManager::ActiveRpcEncoderFilter*, std::__1::list<std::__1::unique_ptr<Envoy::Extensions::NetworkFilters::ThriftProxy::ConnectionManager::ActiveRpcEncoderFilter, std::__1::default_delete<Envoy::Extensions::NetworkFilters::ThriftProxy::ConnectionManager::ActiveRpcEncoderFilter> >, std::__1::allocator<std::__1::unique_ptr<Envoy::Extensions::NetworkFilters::ThriftProxy::ConnectionManager::ActiveRpcEncoderFilter, std::__1::default_delete<Envoy::Extensions::NetworkFilters::ThriftProxy::ConnectionManager::ActiveRpcEncoderFilter> > > >&, std::__1::shared_ptr<Envoy::Extensions::NetworkFilters::ThriftProxy::ProtocolConverter>) |
560 | | |
561 | 0 | void ConnectionManager::ActiveRpc::prepareFilterAction(DecoderEvent event, FilterContext&& data) { |
562 | | // DecoderEvent::ContinueDecode indicates we're handling previous filter action with the |
563 | | // filter chain. Therefore, we should not reset filter_action_. |
564 | 0 | if (event == DecoderEvent::ContinueDecode) { |
565 | 0 | return; |
566 | 0 | } |
567 | | |
568 | 0 | switch (event) { |
569 | 0 | case DecoderEvent::TransportBegin: |
570 | 0 | filter_action_ = [filter_context = |
571 | 0 | std::move(data)](DecoderEventHandler* filter) -> FilterStatus { |
572 | 0 | MessageMetadataSharedPtr metadata = absl::get<MessageMetadataSharedPtr>(filter_context); |
573 | 0 | return filter->transportBegin(metadata); |
574 | 0 | }; |
575 | 0 | break; |
576 | 0 | case DecoderEvent::TransportEnd: |
577 | 0 | filter_action_ = [](DecoderEventHandler* filter) -> FilterStatus { |
578 | 0 | return filter->transportEnd(); |
579 | 0 | }; |
580 | 0 | break; |
581 | 0 | case DecoderEvent::PassthroughData: |
582 | 0 | filter_action_ = [filter_context = |
583 | 0 | std::move(data)](DecoderEventHandler* filter) -> FilterStatus { |
584 | 0 | Buffer::Instance* data = absl::get<Buffer::Instance*>(filter_context); |
585 | 0 | return filter->passthroughData(*data); |
586 | 0 | }; |
587 | 0 | break; |
588 | 0 | case DecoderEvent::MessageBegin: |
589 | 0 | filter_action_ = [filter_context = |
590 | 0 | std::move(data)](DecoderEventHandler* filter) -> FilterStatus { |
591 | 0 | MessageMetadataSharedPtr metadata = absl::get<MessageMetadataSharedPtr>(filter_context); |
592 | 0 | return filter->messageBegin(metadata); |
593 | 0 | }; |
594 | 0 | break; |
595 | 0 | case DecoderEvent::MessageEnd: |
596 | 0 | filter_action_ = [](DecoderEventHandler* filter) -> FilterStatus { |
597 | 0 | return filter->messageEnd(); |
598 | 0 | }; |
599 | 0 | break; |
600 | 0 | case DecoderEvent::StructBegin: |
601 | 0 | filter_action_ = [filter_context = |
602 | 0 | std::move(data)](DecoderEventHandler* filter) mutable -> FilterStatus { |
603 | 0 | std::string& name = absl::get<std::string>(filter_context); |
604 | 0 | return filter->structBegin(name); |
605 | 0 | }; |
606 | 0 | break; |
607 | 0 | case DecoderEvent::StructEnd: |
608 | 0 | filter_action_ = [](DecoderEventHandler* filter) -> FilterStatus { |
609 | 0 | return filter->structEnd(); |
610 | 0 | }; |
611 | 0 | break; |
612 | 0 | case DecoderEvent::FieldBegin: |
613 | 0 | filter_action_ = [filter_context = |
614 | 0 | std::move(data)](DecoderEventHandler* filter) mutable -> FilterStatus { |
615 | 0 | std::tuple<std::string, FieldType, int16_t>& t = |
616 | 0 | absl::get<std::tuple<std::string, FieldType, int16_t>>(filter_context); |
617 | 0 | std::string& name = std::get<0>(t); |
618 | 0 | FieldType& field_type = std::get<1>(t); |
619 | 0 | int16_t& field_id = std::get<2>(t); |
620 | 0 | return filter->fieldBegin(name, field_type, field_id); |
621 | 0 | }; |
622 | 0 | break; |
623 | 0 | case DecoderEvent::FieldEnd: |
624 | 0 | filter_action_ = [](DecoderEventHandler* filter) -> FilterStatus { return filter->fieldEnd(); }; |
625 | 0 | break; |
626 | 0 | case DecoderEvent::BoolValue: |
627 | 0 | filter_action_ = [filter_context = |
628 | 0 | std::move(data)](DecoderEventHandler* filter) mutable -> FilterStatus { |
629 | 0 | bool& value = absl::get<bool>(filter_context); |
630 | 0 | return filter->boolValue(value); |
631 | 0 | }; |
632 | 0 | break; |
633 | 0 | case DecoderEvent::ByteValue: |
634 | 0 | filter_action_ = [filter_context = |
635 | 0 | std::move(data)](DecoderEventHandler* filter) mutable -> FilterStatus { |
636 | 0 | uint8_t& value = absl::get<uint8_t>(filter_context); |
637 | 0 | return filter->byteValue(value); |
638 | 0 | }; |
639 | 0 | break; |
640 | 0 | case DecoderEvent::Int16Value: |
641 | 0 | filter_action_ = [filter_context = |
642 | 0 | std::move(data)](DecoderEventHandler* filter) mutable -> FilterStatus { |
643 | 0 | int16_t& value = absl::get<int16_t>(filter_context); |
644 | 0 | return filter->int16Value(value); |
645 | 0 | }; |
646 | 0 | break; |
647 | 0 | case DecoderEvent::Int32Value: |
648 | 0 | filter_action_ = [filter_context = |
649 | 0 | std::move(data)](DecoderEventHandler* filter) mutable -> FilterStatus { |
650 | 0 | int32_t& value = absl::get<int32_t>(filter_context); |
651 | 0 | return filter->int32Value(value); |
652 | 0 | }; |
653 | 0 | break; |
654 | 0 | case DecoderEvent::Int64Value: |
655 | 0 | filter_action_ = [filter_context = |
656 | 0 | std::move(data)](DecoderEventHandler* filter) mutable -> FilterStatus { |
657 | 0 | int64_t& value = absl::get<int64_t>(filter_context); |
658 | 0 | return filter->int64Value(value); |
659 | 0 | }; |
660 | 0 | break; |
661 | 0 | case DecoderEvent::DoubleValue: |
662 | 0 | filter_action_ = [filter_context = |
663 | 0 | std::move(data)](DecoderEventHandler* filter) mutable -> FilterStatus { |
664 | 0 | double& value = absl::get<double>(filter_context); |
665 | 0 | return filter->doubleValue(value); |
666 | 0 | }; |
667 | 0 | break; |
668 | 0 | case DecoderEvent::StringValue: |
669 | 0 | filter_action_ = [filter_context = |
670 | 0 | std::move(data)](DecoderEventHandler* filter) mutable -> FilterStatus { |
671 | 0 | std::string& value = absl::get<std::string>(filter_context); |
672 | 0 | return filter->stringValue(value); |
673 | 0 | }; |
674 | 0 | break; |
675 | 0 | case DecoderEvent::MapBegin: |
676 | 0 | filter_action_ = [filter_context = |
677 | 0 | std::move(data)](DecoderEventHandler* filter) mutable -> FilterStatus { |
678 | 0 | std::tuple<FieldType, FieldType, uint32_t>& t = |
679 | 0 | absl::get<std::tuple<FieldType, FieldType, uint32_t>>(filter_context); |
680 | 0 | FieldType& key_type = std::get<0>(t); |
681 | 0 | FieldType& value_type = std::get<1>(t); |
682 | 0 | uint32_t& size = std::get<2>(t); |
683 | 0 | return filter->mapBegin(key_type, value_type, size); |
684 | 0 | }; |
685 | 0 | break; |
686 | 0 | case DecoderEvent::MapEnd: |
687 | 0 | filter_action_ = [](DecoderEventHandler* filter) -> FilterStatus { return filter->mapEnd(); }; |
688 | 0 | break; |
689 | 0 | case DecoderEvent::ListBegin: |
690 | 0 | filter_action_ = [filter_context = |
691 | 0 | std::move(data)](DecoderEventHandler* filter) mutable -> FilterStatus { |
692 | 0 | std::tuple<FieldType, uint32_t>& t = |
693 | 0 | absl::get<std::tuple<FieldType, uint32_t>>(filter_context); |
694 | 0 | FieldType& elem_type = std::get<0>(t); |
695 | 0 | uint32_t& size = std::get<1>(t); |
696 | 0 | return filter->listBegin(elem_type, size); |
697 | 0 | }; |
698 | 0 | break; |
699 | 0 | case DecoderEvent::ListEnd: |
700 | 0 | filter_action_ = [](DecoderEventHandler* filter) -> FilterStatus { return filter->listEnd(); }; |
701 | 0 | break; |
702 | 0 | case DecoderEvent::SetBegin: |
703 | 0 | filter_action_ = [filter_context = |
704 | 0 | std::move(data)](DecoderEventHandler* filter) mutable -> FilterStatus { |
705 | 0 | std::tuple<FieldType, uint32_t>& t = |
706 | 0 | absl::get<std::tuple<FieldType, uint32_t>>(filter_context); |
707 | 0 | FieldType& elem_type = std::get<0>(t); |
708 | 0 | uint32_t& size = std::get<1>(t); |
709 | 0 | return filter->setBegin(elem_type, size); |
710 | 0 | }; |
711 | 0 | break; |
712 | 0 | case DecoderEvent::SetEnd: |
713 | 0 | filter_action_ = [](DecoderEventHandler* filter) -> FilterStatus { return filter->setEnd(); }; |
714 | 0 | break; |
715 | 0 | default: |
716 | 0 | PANIC_DUE_TO_CORRUPT_ENUM; |
717 | 0 | } |
718 | 0 | } |
719 | | |
720 | 0 | FilterStatus ConnectionManager::ActiveRpc::transportBegin(MessageMetadataSharedPtr metadata) { |
721 | 0 | return applyDecoderFilters(DecoderEvent::TransportBegin, metadata); |
722 | 0 | } |
723 | | |
724 | 0 | FilterStatus ConnectionManager::ActiveRpc::transportEnd() { |
725 | 0 | ASSERT(metadata_ != nullptr); |
726 | | |
727 | 0 | FilterStatus status; |
728 | 0 | if (upgrade_handler_) { |
729 | 0 | status = upgrade_handler_->transportEnd(); |
730 | |
|
731 | 0 | if (metadata_->isProtocolUpgradeMessage()) { |
732 | 0 | ENVOY_CONN_LOG(error, "thrift: sending protocol upgrade response", |
733 | 0 | parent_.read_callbacks_->connection()); |
734 | 0 | sendLocalReply(*parent_.protocol_->upgradeResponse(*upgrade_handler_), false); |
735 | 0 | } |
736 | 0 | } else { |
737 | 0 | status = applyDecoderFilters(DecoderEvent::TransportEnd, absl::monostate()); |
738 | 0 | if (status == FilterStatus::StopIteration) { |
739 | 0 | pending_transport_end_ = true; |
740 | 0 | return status; |
741 | 0 | } |
742 | 0 | } |
743 | | |
744 | 0 | finalizeRequest(); |
745 | |
|
746 | 0 | return status; |
747 | 0 | } |
748 | | |
749 | 0 | void ConnectionManager::ActiveRpc::finalizeRequest() { |
750 | 0 | pending_transport_end_ = false; |
751 | |
|
752 | 0 | parent_.stats_.request_.inc(); |
753 | |
|
754 | 0 | parent_.accumulated_requests_++; |
755 | 0 | if (parent_.config_.maxRequestsPerConnection() > 0 && |
756 | 0 | parent_.accumulated_requests_ >= parent_.config_.maxRequestsPerConnection()) { |
757 | 0 | parent_.read_callbacks_->connection().readDisable(true); |
758 | 0 | parent_.requests_overflow_ = true; |
759 | 0 | parent_.stats_.downstream_cx_max_requests_.inc(); |
760 | 0 | } |
761 | |
|
762 | 0 | if (passthrough_) { |
763 | 0 | parent_.stats_.request_passthrough_.inc(); |
764 | 0 | } |
765 | |
|
766 | 0 | bool destroy_rpc = false; |
767 | 0 | switch (original_msg_type_) { |
768 | 0 | case MessageType::Call: |
769 | 0 | parent_.stats_.request_call_.inc(); |
770 | | |
771 | | // Local response or protocol upgrade mean we don't wait for an upstream response. |
772 | 0 | destroy_rpc = local_response_sent_ || (upgrade_handler_ != nullptr); |
773 | 0 | break; |
774 | | |
775 | 0 | case MessageType::Oneway: |
776 | 0 | parent_.stats_.request_oneway_.inc(); |
777 | | |
778 | | // No response forthcoming, we're done. |
779 | 0 | destroy_rpc = true; |
780 | 0 | break; |
781 | | |
782 | 0 | default: |
783 | 0 | parent_.stats_.request_invalid_type_.inc(); |
784 | | |
785 | | // Invalid request, implies no response. |
786 | 0 | destroy_rpc = true; |
787 | 0 | break; |
788 | 0 | } |
789 | | |
790 | 0 | if (destroy_rpc) { |
791 | 0 | parent_.doDeferredRpcDestroy(*this); |
792 | 0 | } |
793 | 0 | } |
794 | | |
795 | | // TODO(kuochunghsu): passthroughSupported for decoder/encoder filters with more flexibility. |
796 | | // That is, supporting passthrough data for decoder filters if all decoder filters agree, |
797 | | // and supporting passthrough data for encoder filters if all encoder filters agree. |
798 | 0 | bool ConnectionManager::ActiveRpc::passthroughSupported() const { |
799 | 0 | for (auto& entry : decoder_filters_) { |
800 | 0 | if (!entry->decoder_handle_->passthroughSupported()) { |
801 | 0 | return false; |
802 | 0 | } |
803 | 0 | } |
804 | 0 | for (auto& entry : encoder_filters_) { |
805 | 0 | if (!entry->encoder_handle_->passthroughSupported()) { |
806 | 0 | return false; |
807 | 0 | } |
808 | 0 | } |
809 | 0 | return true; |
810 | 0 | } |
811 | | |
812 | | void ConnectionManager::ActiveRpc::recordResponseAccessLog( |
813 | 0 | DirectResponse::ResponseType direct_response_type, const MessageMetadataSharedPtr& metadata) { |
814 | 0 | if (direct_response_type == DirectResponse::ResponseType::Exception) { |
815 | 0 | recordResponseAccessLog(MessageTypeNames::get().fromType(MessageType::Exception), "-"); |
816 | 0 | return; |
817 | 0 | } |
818 | 0 | const std::string& message_type_name = |
819 | 0 | metadata->hasMessageType() ? MessageTypeNames::get().fromType(metadata->messageType()) : "-"; |
820 | 0 | const std::string& reply_type_name = ReplyTypeNames::get().fromType( |
821 | 0 | direct_response_type == DirectResponse::ResponseType::SuccessReply ? ReplyType::Success |
822 | 0 | : ReplyType::Error); |
823 | 0 | recordResponseAccessLog(message_type_name, reply_type_name); |
824 | 0 | } |
825 | | |
826 | | void ConnectionManager::ActiveRpc::recordResponseAccessLog( |
827 | 0 | const MessageMetadataSharedPtr& metadata) { |
828 | 0 | recordResponseAccessLog( |
829 | 0 | metadata->hasMessageType() ? MessageTypeNames::get().fromType(metadata->messageType()) : "-", |
830 | 0 | metadata->hasReplyType() ? ReplyTypeNames::get().fromType(metadata->replyType()) : "-"); |
831 | 0 | } |
832 | | |
833 | | void ConnectionManager::ActiveRpc::recordResponseAccessLog(const std::string& message_type, |
834 | 0 | const std::string& reply_type) { |
835 | 0 | ProtobufWkt::Struct stats_obj; |
836 | 0 | auto& fields_map = *stats_obj.mutable_fields(); |
837 | 0 | auto& response_fields_map = *fields_map["response"].mutable_struct_value()->mutable_fields(); |
838 | |
|
839 | 0 | response_fields_map["transport_type"] = |
840 | 0 | ValueUtil::stringValue(TransportNames::get().fromType(downstreamTransportType())); |
841 | 0 | response_fields_map["protocol_type"] = |
842 | 0 | ValueUtil::stringValue(ProtocolNames::get().fromType(downstreamProtocolType())); |
843 | 0 | response_fields_map["message_type"] = ValueUtil::stringValue(message_type); |
844 | 0 | response_fields_map["reply_type"] = ValueUtil::stringValue(reply_type); |
845 | |
|
846 | 0 | streamInfo().setDynamicMetadata("thrift.proxy", stats_obj); |
847 | 0 | } |
848 | | |
849 | 0 | FilterStatus ConnectionManager::ActiveRpc::passthroughData(Buffer::Instance& data) { |
850 | 0 | passthrough_ = true; |
851 | 0 | return applyDecoderFilters(DecoderEvent::PassthroughData, &data); |
852 | 0 | } |
853 | | |
854 | 0 | FilterStatus ConnectionManager::ActiveRpc::messageBegin(MessageMetadataSharedPtr metadata) { |
855 | 0 | ASSERT(metadata->hasSequenceId()); |
856 | 0 | ASSERT(metadata->hasMessageType()); |
857 | | |
858 | 0 | metadata_ = metadata; |
859 | 0 | original_sequence_id_ = metadata_->sequenceId(); |
860 | 0 | original_msg_type_ = metadata_->messageType(); |
861 | |
|
862 | 0 | auto& connection = parent_.read_callbacks_->connection(); |
863 | |
|
864 | 0 | if (metadata_->isProtocolUpgradeMessage()) { |
865 | 0 | ASSERT(parent_.protocol_->supportsUpgrade()); |
866 | | |
867 | 0 | ENVOY_CONN_LOG(debug, "thrift: decoding protocol upgrade request", connection); |
868 | 0 | upgrade_handler_ = parent_.protocol_->upgradeRequestDecoder(); |
869 | 0 | ASSERT(upgrade_handler_ != nullptr); |
870 | 0 | } |
871 | | |
872 | 0 | FilterStatus result = FilterStatus::StopIteration; |
873 | 0 | absl::optional<std::string> error; |
874 | 0 | TRY_NEEDS_AUDIT { result = applyDecoderFilters(DecoderEvent::MessageBegin, metadata); } |
875 | 0 | END_TRY catch (const std::bad_function_call& e) { error = std::string(e.what()); } |
876 | |
|
877 | 0 | const auto& route_ptr = route(); |
878 | 0 | const std::string& cluster_name = route_ptr ? route_ptr->routeEntry()->clusterName() : "-"; |
879 | 0 | const std::string& method = metadata->hasMethodName() ? metadata->methodName() : "-"; |
880 | 0 | const int32_t frame_size = |
881 | 0 | metadata->hasFrameSize() ? static_cast<int32_t>(metadata->frameSize()) : -1; |
882 | |
|
883 | 0 | if (error.has_value()) { |
884 | 0 | parent_.stats_.request_internal_error_.inc(); |
885 | 0 | std::ostringstream oss; |
886 | 0 | parent_.read_callbacks_->connection().dumpState(oss, 0); |
887 | 0 | ENVOY_STREAM_LOG(error, |
888 | 0 | "Catch exception: {}. Request seq_id: {}, method: {}, frame size: {}, cluster " |
889 | 0 | "name: {}, downstream connection state {}, headers:\n{}", |
890 | 0 | *this, error.value(), metadata_->sequenceId(), method, frame_size, |
891 | 0 | cluster_name, oss.str(), metadata->requestHeaders()); |
892 | 0 | throw EnvoyException(error.value()); |
893 | 0 | } |
894 | | |
895 | 0 | ProtobufWkt::Struct stats_obj; |
896 | 0 | auto& fields_map = *stats_obj.mutable_fields(); |
897 | 0 | fields_map["cluster"] = ValueUtil::stringValue(cluster_name); |
898 | 0 | fields_map["method"] = ValueUtil::stringValue(method); |
899 | 0 | fields_map["passthrough"] = ValueUtil::stringValue(passthroughSupported() ? "true" : "false"); |
900 | |
|
901 | 0 | auto& request_fields_map = *fields_map["request"].mutable_struct_value()->mutable_fields(); |
902 | 0 | request_fields_map["transport_type"] = |
903 | 0 | ValueUtil::stringValue(TransportNames::get().fromType(downstreamTransportType())); |
904 | 0 | request_fields_map["protocol_type"] = |
905 | 0 | ValueUtil::stringValue(ProtocolNames::get().fromType(downstreamProtocolType())); |
906 | 0 | request_fields_map["message_type"] = ValueUtil::stringValue( |
907 | 0 | metadata->hasMessageType() ? MessageTypeNames::get().fromType(metadata->messageType()) : "-"); |
908 | |
|
909 | 0 | streamInfo().setDynamicMetadata("thrift.proxy", stats_obj); |
910 | 0 | ENVOY_STREAM_LOG(trace, "Request seq_id: {}, method: {}, frame size: {}, headers:\n{}", *this, |
911 | 0 | metadata_->sequenceId(), method, frame_size, metadata->requestHeaders()); |
912 | |
|
913 | 0 | return result; |
914 | 0 | } |
915 | | |
916 | 0 | FilterStatus ConnectionManager::ActiveRpc::messageEnd() { |
917 | 0 | return applyDecoderFilters(DecoderEvent::MessageEnd, absl::monostate()); |
918 | 0 | } |
919 | | |
920 | 0 | FilterStatus ConnectionManager::ActiveRpc::structBegin(absl::string_view name) { |
921 | 0 | return applyDecoderFilters(DecoderEvent::StructBegin, std::string(name)); |
922 | 0 | } |
923 | | |
924 | 0 | FilterStatus ConnectionManager::ActiveRpc::structEnd() { |
925 | 0 | return applyDecoderFilters(DecoderEvent::StructEnd, absl::monostate()); |
926 | 0 | } |
927 | | |
928 | | FilterStatus ConnectionManager::ActiveRpc::fieldBegin(absl::string_view name, FieldType& field_type, |
929 | 0 | int16_t& field_id) { |
930 | 0 | return applyDecoderFilters(DecoderEvent::FieldBegin, |
931 | 0 | std::make_tuple(std::string(name), field_type, field_id)); |
932 | 0 | } |
933 | | |
934 | 0 | FilterStatus ConnectionManager::ActiveRpc::fieldEnd() { |
935 | 0 | return applyDecoderFilters(DecoderEvent::FieldEnd, absl::monostate()); |
936 | 0 | } |
937 | | |
938 | 0 | FilterStatus ConnectionManager::ActiveRpc::boolValue(bool& value) { |
939 | 0 | return applyDecoderFilters(DecoderEvent::BoolValue, value); |
940 | 0 | } |
941 | | |
942 | 0 | FilterStatus ConnectionManager::ActiveRpc::byteValue(uint8_t& value) { |
943 | 0 | return applyDecoderFilters(DecoderEvent::ByteValue, value); |
944 | 0 | } |
945 | | |
946 | 0 | FilterStatus ConnectionManager::ActiveRpc::int16Value(int16_t& value) { |
947 | 0 | return applyDecoderFilters(DecoderEvent::Int16Value, value); |
948 | 0 | } |
949 | | |
950 | 0 | FilterStatus ConnectionManager::ActiveRpc::int32Value(int32_t& value) { |
951 | 0 | return applyDecoderFilters(DecoderEvent::Int32Value, value); |
952 | 0 | } |
953 | | |
954 | 0 | FilterStatus ConnectionManager::ActiveRpc::int64Value(int64_t& value) { |
955 | 0 | return applyDecoderFilters(DecoderEvent::Int64Value, value); |
956 | 0 | } |
957 | | |
958 | 0 | FilterStatus ConnectionManager::ActiveRpc::doubleValue(double& value) { |
959 | 0 | return applyDecoderFilters(DecoderEvent::DoubleValue, value); |
960 | 0 | } |
961 | | |
962 | 0 | FilterStatus ConnectionManager::ActiveRpc::stringValue(absl::string_view value) { |
963 | 0 | return applyDecoderFilters(DecoderEvent::StringValue, std::string(value)); |
964 | 0 | } |
965 | | |
966 | | FilterStatus ConnectionManager::ActiveRpc::mapBegin(FieldType& key_type, FieldType& value_type, |
967 | 0 | uint32_t& size) { |
968 | 0 | return applyDecoderFilters(DecoderEvent::MapBegin, std::make_tuple(key_type, value_type, size)); |
969 | 0 | } |
970 | | |
971 | 0 | FilterStatus ConnectionManager::ActiveRpc::mapEnd() { |
972 | 0 | return applyDecoderFilters(DecoderEvent::MapEnd, absl::monostate()); |
973 | 0 | } |
974 | | |
975 | 0 | FilterStatus ConnectionManager::ActiveRpc::listBegin(FieldType& value_type, uint32_t& size) { |
976 | 0 | return applyDecoderFilters(DecoderEvent::ListBegin, std::make_tuple(value_type, size)); |
977 | 0 | } |
978 | | |
979 | 0 | FilterStatus ConnectionManager::ActiveRpc::listEnd() { |
980 | 0 | return applyDecoderFilters(DecoderEvent::ListEnd, absl::monostate()); |
981 | 0 | } |
982 | | |
983 | 0 | FilterStatus ConnectionManager::ActiveRpc::setBegin(FieldType& value_type, uint32_t& size) { |
984 | 0 | return applyDecoderFilters(DecoderEvent::SetBegin, std::make_tuple(value_type, size)); |
985 | 0 | } |
986 | | |
987 | 0 | FilterStatus ConnectionManager::ActiveRpc::setEnd() { |
988 | 0 | return applyDecoderFilters(DecoderEvent::SetEnd, absl::monostate()); |
989 | 0 | } |
990 | | |
991 | 0 | void ConnectionManager::ActiveRpc::createFilterChain() { |
992 | 0 | parent_.config_.filterFactory().createFilterChain(*this); |
993 | 0 | } |
994 | | |
995 | 0 | void ConnectionManager::ActiveRpc::onReset() { |
996 | | // TODO(zuercher): e.g., parent_.stats_.named_.downstream_rq_rx_reset_.inc(); |
997 | 0 | parent_.doDeferredRpcDestroy(*this); |
998 | 0 | } |
999 | | |
1000 | 0 | void ConnectionManager::ActiveRpc::onError(const std::string& what) { |
1001 | 0 | if (metadata_) { |
1002 | 0 | sendLocalReply(AppException(AppExceptionType::ProtocolError, what), true); |
1003 | 0 | return; |
1004 | 0 | } |
1005 | | |
1006 | | // Transport or protocol error happened before (or during message begin) parsing. It's not |
1007 | | // possible to provide a valid response, so don't try. |
1008 | | |
1009 | 0 | parent_.doDeferredRpcDestroy(*this); |
1010 | 0 | parent_.read_callbacks_->connection().close(Network::ConnectionCloseType::NoFlush); |
1011 | 0 | } |
1012 | | |
1013 | 0 | const Network::Connection* ConnectionManager::ActiveRpc::connection() const { |
1014 | 0 | return &parent_.read_callbacks_->connection(); |
1015 | 0 | } |
1016 | | |
1017 | 0 | Router::RouteConstSharedPtr ConnectionManager::ActiveRpc::route() { |
1018 | 0 | if (!cached_route_) { |
1019 | 0 | if (metadata_ != nullptr) { |
1020 | 0 | Router::RouteConstSharedPtr route = |
1021 | 0 | parent_.config_.routerConfig().route(*metadata_, stream_id_); |
1022 | 0 | cached_route_ = std::move(route); |
1023 | 0 | } else { |
1024 | 0 | cached_route_ = absl::nullopt; |
1025 | 0 | } |
1026 | 0 | } |
1027 | |
|
1028 | 0 | return cached_route_.value(); |
1029 | 0 | } |
1030 | | |
1031 | 0 | void ConnectionManager::ActiveRpc::onLocalReply(const MessageMetadata& metadata, bool end_stream) { |
1032 | 0 | under_on_local_reply_ = true; |
1033 | 0 | for (auto& filter : base_filters_) { |
1034 | 0 | filter->onLocalReply(metadata, end_stream); |
1035 | 0 | } |
1036 | 0 | under_on_local_reply_ = false; |
1037 | 0 | } |
1038 | | |
1039 | 0 | void ConnectionManager::ActiveRpc::sendLocalReply(const DirectResponse& response, bool end_stream) { |
1040 | 0 | ASSERT(!under_on_local_reply_); |
1041 | 0 | ENVOY_STREAM_LOG(debug, "Sending local reply, end_stream: {}, seq_id: {}", *this, end_stream, |
1042 | 0 | original_sequence_id_); |
1043 | 0 | localReplyMetadata_ = metadata_->createResponseMetadata(); |
1044 | 0 | localReplyMetadata_->setSequenceId(original_sequence_id_); |
1045 | |
|
1046 | 0 | onLocalReply(*localReplyMetadata_, end_stream); |
1047 | |
|
1048 | 0 | if (end_stream && |
1049 | 0 | Runtime::runtimeFeatureEnabled("envoy.reloadable_features.thrift_connection_draining")) { |
1050 | 0 | localReplyMetadata_->responseHeaders().addReferenceKey(Headers::get().Drain, "true"); |
1051 | 0 | ConnectionManager& cm = parent_; |
1052 | 0 | cm.stats_.downstream_response_drain_close_.inc(); |
1053 | 0 | } |
1054 | |
|
1055 | 0 | auto result = parent_.sendLocalReply(*localReplyMetadata_, response, end_stream); |
1056 | | // Only report while the local reply is successfully written. |
1057 | 0 | if (result.has_value()) { |
1058 | 0 | recordResponseAccessLog(*result, localReplyMetadata_); |
1059 | 0 | } |
1060 | |
|
1061 | 0 | if (end_stream) { |
1062 | 0 | return; |
1063 | 0 | } |
1064 | | |
1065 | 0 | if (!upgrade_handler_) { |
1066 | | // Consume any remaining request data from the downstream. |
1067 | 0 | local_response_sent_ = true; |
1068 | 0 | } |
1069 | 0 | } |
1070 | | |
1071 | 0 | void ConnectionManager::ActiveRpc::startUpstreamResponse(Transport& transport, Protocol& protocol) { |
1072 | 0 | ASSERT(response_decoder_ == nullptr); |
1073 | | |
1074 | 0 | response_decoder_ = std::make_unique<ResponseDecoder>(*this, transport, protocol); |
1075 | 0 | } |
1076 | | |
1077 | 0 | ThriftFilters::ResponseStatus ConnectionManager::ActiveRpc::upstreamData(Buffer::Instance& buffer) { |
1078 | 0 | ASSERT(response_decoder_ != nullptr); |
1079 | | |
1080 | 0 | TRY_NEEDS_AUDIT { |
1081 | 0 | if (response_decoder_->onData(buffer)) { |
1082 | | // Completed upstream response. |
1083 | 0 | parent_.doDeferredRpcDestroy(*this); |
1084 | 0 | return ThriftFilters::ResponseStatus::Complete; |
1085 | 0 | } |
1086 | 0 | return ThriftFilters::ResponseStatus::MoreData; |
1087 | 0 | } |
1088 | 0 | END_TRY catch (const AppException& ex) { |
1089 | 0 | ENVOY_LOG(error, "thrift response application error: {}", ex.what()); |
1090 | 0 | parent_.stats_.response_decoding_error_.inc(); |
1091 | |
|
1092 | 0 | sendLocalReply(ex, true); |
1093 | 0 | return ThriftFilters::ResponseStatus::Reset; |
1094 | 0 | } |
1095 | 0 | catch (const EnvoyException& ex) { |
1096 | 0 | ENVOY_CONN_LOG(error, "thrift response error: {}", parent_.read_callbacks_->connection(), |
1097 | 0 | ex.what()); |
1098 | 0 | parent_.stats_.response_decoding_error_.inc(); |
1099 | |
|
1100 | 0 | onError(ex.what()); |
1101 | 0 | return ThriftFilters::ResponseStatus::Reset; |
1102 | 0 | } |
1103 | 0 | } |
1104 | | |
1105 | 0 | void ConnectionManager::ActiveRpc::clearRouteCache() { cached_route_ = absl::nullopt; } |
1106 | | |
1107 | 0 | void ConnectionManager::ActiveRpc::resetDownstreamConnection() { |
1108 | 0 | ENVOY_CONN_LOG(debug, "resetting downstream connection", parent_.read_callbacks_->connection()); |
1109 | 0 | parent_.read_callbacks_->connection().close(Network::ConnectionCloseType::NoFlush); |
1110 | 0 | } |
1111 | | |
1112 | | } // namespace ThriftProxy |
1113 | | } // namespace NetworkFilters |
1114 | | } // namespace Extensions |
1115 | | } // namespace Envoy |