Coverage Report

Created: 2023-11-12 09:30

/proc/self/cwd/source/extensions/filters/network/thrift_proxy/conn_manager.cc
Line
Count
Source (jump to first uncovered line)
1
#include "source/extensions/filters/network/thrift_proxy/conn_manager.h"
2
3
#include "envoy/common/exception.h"
4
#include "envoy/event/dispatcher.h"
5
6
#include "source/extensions/filters/network/thrift_proxy/app_exception_impl.h"
7
#include "source/extensions/filters/network/thrift_proxy/protocol.h"
8
#include "source/extensions/filters/network/thrift_proxy/transport.h"
9
10
namespace Envoy {
11
namespace Extensions {
12
namespace NetworkFilters {
13
namespace ThriftProxy {
14
15
ConnectionManager::ConnectionManager(Config& config, Random::RandomGenerator& random_generator,
16
                                     TimeSource& time_source,
17
                                     const Network::DrainDecision& drain_decision)
18
    : config_(config), stats_(config_.stats()), transport_(config.createTransport()),
19
      protocol_(config.createProtocol()),
20
      decoder_(std::make_unique<Decoder>(*transport_, *protocol_, *this)),
21
      random_generator_(random_generator), time_source_(time_source),
22
0
      drain_decision_(drain_decision) {}
23
24
0
ConnectionManager::~ConnectionManager() = default;
25
26
0
Network::FilterStatus ConnectionManager::onData(Buffer::Instance& data, bool end_stream) {
27
0
  request_buffer_.move(data);
28
0
  dispatch();
29
30
0
  if (end_stream) {
31
0
    ENVOY_CONN_LOG(trace, "downstream half-closed", read_callbacks_->connection());
32
33
    // Downstream has closed. Unless we're waiting for an upstream connection to complete a oneway
34
    // request, close. The special case for oneway requests allows them to complete before the
35
    // ConnectionManager is destroyed.
36
0
    if (stopped_) {
37
0
      ASSERT(!rpcs_.empty());
38
0
      MessageMetadata& metadata = *(*rpcs_.begin())->metadata_;
39
0
      ASSERT(metadata.hasMessageType());
40
0
      if (metadata.messageType() == MessageType::Oneway) {
41
0
        ENVOY_CONN_LOG(trace, "waiting for one-way completion", read_callbacks_->connection());
42
0
        half_closed_ = true;
43
0
        return Network::FilterStatus::StopIteration;
44
0
      }
45
0
    }
46
47
0
    resetAllRpcs(false);
48
0
    read_callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite);
49
0
  }
50
51
0
  return Network::FilterStatus::StopIteration;
52
0
}
53
54
void ConnectionManager::emitLogEntry(const Http::RequestHeaderMap* request_headers,
55
                                     const Http::ResponseHeaderMap* response_headers,
56
0
                                     const StreamInfo::StreamInfo& stream_info) {
57
0
  const Formatter::HttpFormatterContext log_context{request_headers, response_headers};
58
59
0
  for (const auto& access_log : config_.accessLogs()) {
60
0
    access_log->log(log_context, stream_info);
61
0
  }
62
0
}
63
64
0
void ConnectionManager::dispatch() {
65
0
  if (stopped_) {
66
0
    ENVOY_CONN_LOG(debug, "thrift filter stopped", read_callbacks_->connection());
67
0
    return;
68
0
  }
69
70
0
  if (requests_overflow_) {
71
0
    ENVOY_CONN_LOG(debug, "thrift filter requests overflow", read_callbacks_->connection());
72
0
    return;
73
0
  }
74
75
0
  TRY_NEEDS_AUDIT {
76
0
    bool underflow = false;
77
0
    while (!underflow) {
78
0
      FilterStatus status = decoder_->onData(request_buffer_, underflow);
79
0
      if (status == FilterStatus::StopIteration) {
80
0
        stopped_ = true;
81
0
        break;
82
0
      }
83
0
    }
84
85
0
    return;
86
0
  }
87
0
  END_TRY catch (const AppException& ex) {
88
0
    ENVOY_LOG(debug, "thrift application exception: {}", ex.what());
89
0
    if (rpcs_.empty()) {
90
0
      MessageMetadata metadata;
91
0
      sendLocalReply(metadata, ex, true);
92
0
    } else {
93
0
      sendLocalReply(*(*rpcs_.begin())->metadata_, ex, true);
94
0
    }
95
0
  }
96
0
  catch (const EnvoyException& ex) {
97
0
    ENVOY_CONN_LOG(debug, "thrift error: {}", read_callbacks_->connection(), ex.what());
98
99
0
    if (rpcs_.empty()) {
100
      // Transport/protocol mismatch (including errors in automatic detection). Just hang up
101
      // since we don't know how to encode a response.
102
0
      read_callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite);
103
0
    } else {
104
      // Use the current rpc's transport/protocol to send an error downstream.
105
0
      rpcs_.front()->onError(ex.what());
106
0
    }
107
0
  }
108
109
0
  stats_.request_decoding_error_.inc();
110
0
  resetAllRpcs(true);
111
0
}
112
113
absl::optional<DirectResponse::ResponseType>
114
ConnectionManager::sendLocalReply(MessageMetadata& metadata, const DirectResponse& response,
115
0
                                  bool end_stream) {
116
0
  if (read_callbacks_->connection().state() == Network::Connection::State::Closed) {
117
0
    return absl::nullopt;
118
0
  }
119
120
0
  DirectResponse::ResponseType result = DirectResponse::ResponseType::Exception;
121
122
0
  if (!metadata.hasMessageType() || metadata.messageType() != MessageType::Oneway) {
123
0
    Buffer::OwnedImpl buffer;
124
0
    result = response.encode(metadata, *protocol_, buffer);
125
0
    Buffer::OwnedImpl response_buffer;
126
0
    metadata.setProtocol(protocol_->type());
127
0
    transport_->encodeFrame(response_buffer, metadata, buffer);
128
129
0
    read_callbacks_->connection().write(response_buffer, end_stream);
130
0
  }
131
132
0
  if (end_stream) {
133
0
    read_callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite);
134
0
  }
135
136
0
  switch (result) {
137
0
  case DirectResponse::ResponseType::SuccessReply:
138
0
    stats_.response_success_.inc();
139
0
    break;
140
0
  case DirectResponse::ResponseType::ErrorReply:
141
0
    stats_.response_error_.inc();
142
0
    break;
143
0
  case DirectResponse::ResponseType::Exception:
144
0
    stats_.response_exception_.inc();
145
0
    break;
146
0
  }
147
0
  return result;
148
0
}
149
150
0
void ConnectionManager::continueDecoding() {
151
0
  ENVOY_CONN_LOG(debug, "thrift filter continued", read_callbacks_->connection());
152
0
  stopped_ = false;
153
0
  dispatch();
154
155
0
  if (!stopped_ && half_closed_) {
156
    // If we're half closed, but not stopped waiting for an upstream, reset any pending rpcs and
157
    // close the connection.
158
0
    resetAllRpcs(false);
159
0
    read_callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite);
160
0
  }
161
0
}
162
163
0
void ConnectionManager::doDeferredRpcDestroy(ConnectionManager::ActiveRpc& rpc) {
164
0
  if (!rpc.inserted()) {
165
0
    return;
166
0
  }
167
168
0
  read_callbacks_->connection().dispatcher().deferredDelete(rpc.removeFromList(rpcs_));
169
0
  if (requests_overflow_ && rpcs_.empty()) {
170
0
    read_callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite);
171
0
  }
172
0
}
173
174
0
void ConnectionManager::resetAllRpcs(bool local_reset) {
175
0
  while (!rpcs_.empty()) {
176
0
    if (local_reset) {
177
0
      ENVOY_CONN_LOG(debug, "local close with active request", read_callbacks_->connection());
178
0
      stats_.cx_destroy_local_with_active_rq_.inc();
179
0
    } else {
180
0
      ENVOY_CONN_LOG(debug, "remote close with active request", read_callbacks_->connection());
181
0
      stats_.cx_destroy_remote_with_active_rq_.inc();
182
0
    }
183
184
0
    rpcs_.front()->onReset();
185
0
  }
186
0
}
187
188
0
void ConnectionManager::initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) {
189
0
  read_callbacks_ = &callbacks;
190
191
0
  read_callbacks_->connection().addConnectionCallbacks(*this);
192
0
  read_callbacks_->connection().enableHalfClose(true);
193
0
}
194
195
0
void ConnectionManager::onEvent(Network::ConnectionEvent event) {
196
0
  if (event != Network::ConnectionEvent::LocalClose &&
197
0
      event != Network::ConnectionEvent::RemoteClose) {
198
0
    return;
199
0
  }
200
0
  resetAllRpcs(event == Network::ConnectionEvent::LocalClose);
201
0
}
202
203
0
DecoderEventHandler& ConnectionManager::newDecoderEventHandler() {
204
0
  ENVOY_LOG(trace, "new decoder filter");
205
206
0
  ActiveRpcPtr new_rpc(new ActiveRpc(*this));
207
0
  new_rpc->createFilterChain();
208
0
  LinkedList::moveIntoList(std::move(new_rpc), rpcs_);
209
210
0
  return **rpcs_.begin();
211
0
}
212
213
0
bool ConnectionManager::ResponseDecoder::passthroughEnabled() const {
214
0
  return parent_.parent_.passthroughEnabled();
215
0
}
216
217
0
bool ConnectionManager::passthroughEnabled() const {
218
0
  if (!config_.payloadPassthrough()) {
219
0
    return false;
220
0
  }
221
222
  // If the rpcs list is empty, a local response happened.
223
  //
224
  // TODO(rgs1): we actually could still enable passthrough for local
225
  // responses as long as the transport is framed and the protocol is
226
  // not Twitter.
227
0
  if (rpcs_.empty()) {
228
0
    return false;
229
0
  }
230
231
0
  return (*rpcs_.begin())->passthroughSupported();
232
0
}
233
234
0
bool ConnectionManager::headerKeysPreserveCase() const { return config_.headerKeysPreserveCase(); }
235
236
0
bool ConnectionManager::ResponseDecoder::onData(Buffer::Instance& data) {
237
0
  upstream_buffer_.move(data);
238
239
0
  bool underflow = false;
240
0
  decoder_->onData(upstream_buffer_, underflow);
241
0
  ASSERT(complete_ || underflow);
242
0
  return complete_;
243
0
}
244
245
0
FilterStatus ConnectionManager::ResponseDecoder::transportBegin(MessageMetadataSharedPtr metadata) {
246
0
  return parent_.applyEncoderFilters(DecoderEvent::TransportBegin, metadata, protocol_converter_);
247
0
}
248
249
0
FilterStatus ConnectionManager::ResponseDecoder::transportEnd() {
250
0
  ASSERT(metadata_ != nullptr);
251
252
0
  FilterStatus status = parent_.applyEncoderFilters(DecoderEvent::TransportEnd, absl::monostate(),
253
0
                                                    protocol_converter_);
254
  // Currently we don't support returning FilterStatus::StopIteration from encoder filters.
255
  // Hence, this if-statement is always false.
256
0
  ASSERT(status == FilterStatus::Continue);
257
0
  if (status == FilterStatus::StopIteration) {
258
0
    pending_transport_end_ = true;
259
0
    return FilterStatus::StopIteration;
260
0
  }
261
262
0
  finalizeResponse();
263
0
  return FilterStatus::Continue;
264
0
}
265
266
0
void ConnectionManager::ResponseDecoder::finalizeResponse() {
267
0
  pending_transport_end_ = false;
268
0
  ConnectionManager& cm = parent_.parent_;
269
270
0
  if (cm.read_callbacks_->connection().state() == Network::Connection::State::Closed) {
271
0
    complete_ = true;
272
0
    throw EnvoyException("downstream connection is closed");
273
0
  }
274
275
0
  Buffer::OwnedImpl buffer;
276
277
  // Use the factory to get the concrete transport from the decoder transport (as opposed to
278
  // potentially pre-detection auto transport).
279
0
  TransportPtr transport =
280
0
      NamedTransportConfigFactory::getFactory(cm.decoder_->transportType()).createTransport();
281
282
0
  metadata_->setProtocol(cm.decoder_->protocolType());
283
0
  transport->encodeFrame(buffer, *metadata_, parent_.response_buffer_);
284
0
  complete_ = true;
285
286
0
  cm.read_callbacks_->connection().write(buffer, false);
287
288
0
  cm.stats_.response_.inc();
289
0
  if (passthrough_) {
290
0
    cm.stats_.response_passthrough_.inc();
291
0
  }
292
293
0
  switch (metadata_->messageType()) {
294
0
  case MessageType::Reply:
295
0
    cm.stats_.response_reply_.inc();
296
0
    if (success_) {
297
0
      if (success_.value()) {
298
0
        cm.stats_.response_success_.inc();
299
0
      } else {
300
0
        cm.stats_.response_error_.inc();
301
0
      }
302
0
    }
303
304
0
    break;
305
306
0
  case MessageType::Exception:
307
0
    cm.stats_.response_exception_.inc();
308
0
    break;
309
310
0
  default:
311
0
    cm.stats_.response_invalid_type_.inc();
312
0
    break;
313
0
  }
314
0
}
315
316
0
FilterStatus ConnectionManager::ResponseDecoder::passthroughData(Buffer::Instance& data) {
317
0
  passthrough_ = true;
318
319
0
  return parent_.applyEncoderFilters(DecoderEvent::PassthroughData, &data, protocol_converter_);
320
0
}
321
322
0
FilterStatus ConnectionManager::ResponseDecoder::messageBegin(MessageMetadataSharedPtr metadata) {
323
0
  metadata_ = metadata;
324
0
  metadata_->setSequenceId(parent_.original_sequence_id_);
325
326
0
  if (metadata->hasReplyType()) {
327
    // TODO(kuochunghsu): the status of success could be altered by filters
328
0
    success_ = metadata->replyType() == ReplyType::Success;
329
0
  }
330
331
0
  ConnectionManager& cm = parent_.parent_;
332
333
0
  ENVOY_STREAM_LOG(
334
0
      trace, "Response message_type: {}, seq_id: {}, method: {}, frame size: {}, headers:\n{}",
335
0
      parent_,
336
0
      metadata->hasMessageType() ? MessageTypeNames::get().fromType(metadata->messageType()) : "-",
337
0
      metadata_->sequenceId(), metadata->hasMethodName() ? metadata->methodName() : "-",
338
0
      metadata->hasFrameSize() ? metadata->frameSize() : -1, metadata->responseHeaders());
339
340
  // Check if the upstream host is draining.
341
  //
342
  // Note: the drain header needs to be checked here in messageBegin, and not transportBegin, so
343
  // that we can support the header in TTwitter protocol, which reads/adds response headers to
344
  // metadata in messageBegin when reading the response from upstream. Therefore detecting a drain
345
  // should happen here.
346
0
  if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.thrift_connection_draining")) {
347
0
    metadata_->setDraining(!metadata->responseHeaders().get(Headers::get().Drain).empty());
348
0
    metadata->responseHeaders().remove(Headers::get().Drain);
349
350
    // Check if this host itself is draining.
351
    //
352
    // Note: Similarly as above, the response is buffered until transportEnd. Therefore metadata
353
    // should be set before the encodeFrame() call. It should be set at or after the messageBegin
354
    // call so that the header is added after all upstream headers passed, due to messageBegin
355
    // possibly not getting headers in transportBegin.
356
0
    if (cm.drain_decision_.drainClose()) {
357
0
      ENVOY_STREAM_LOG(debug, "propogate Drain header for drain close decision", parent_);
358
      // TODO(rgs1): should the key value contain something useful (e.g.: minutes til drain is
359
      // over)?
360
0
      metadata->responseHeaders().addReferenceKey(Headers::get().Drain, "true");
361
0
      cm.stats_.downstream_response_drain_close_.inc();
362
0
    }
363
0
  }
364
365
0
  parent_.recordResponseAccessLog(metadata);
366
367
0
  return parent_.applyEncoderFilters(DecoderEvent::MessageBegin, metadata, protocol_converter_);
368
0
}
369
370
0
FilterStatus ConnectionManager::ResponseDecoder::messageEnd() {
371
0
  return parent_.applyEncoderFilters(DecoderEvent::MessageEnd, absl::monostate(),
372
0
                                     protocol_converter_);
373
0
}
374
375
0
FilterStatus ConnectionManager::ResponseDecoder::structBegin(absl::string_view name) {
376
0
  return parent_.applyEncoderFilters(DecoderEvent::StructBegin, std::string(name),
377
0
                                     protocol_converter_);
378
0
}
379
380
0
FilterStatus ConnectionManager::ResponseDecoder::structEnd() {
381
0
  return parent_.applyEncoderFilters(DecoderEvent::StructEnd, absl::monostate(),
382
0
                                     protocol_converter_);
383
0
}
384
385
FilterStatus ConnectionManager::ResponseDecoder::fieldBegin(absl::string_view name,
386
                                                            FieldType& field_type,
387
0
                                                            int16_t& field_id) {
388
0
  return parent_.applyEncoderFilters(DecoderEvent::FieldBegin,
389
0
                                     std::make_tuple(std::string(name), field_type, field_id),
390
0
                                     protocol_converter_);
391
0
}
392
393
0
FilterStatus ConnectionManager::ResponseDecoder::fieldEnd() {
394
0
  return parent_.applyEncoderFilters(DecoderEvent::FieldEnd, absl::monostate(),
395
0
                                     protocol_converter_);
396
0
}
397
398
0
FilterStatus ConnectionManager::ResponseDecoder::boolValue(bool& value) {
399
0
  return parent_.applyEncoderFilters(DecoderEvent::BoolValue, value, protocol_converter_);
400
0
}
401
402
0
FilterStatus ConnectionManager::ResponseDecoder::byteValue(uint8_t& value) {
403
0
  return parent_.applyEncoderFilters(DecoderEvent::ByteValue, value, protocol_converter_);
404
0
}
405
406
0
FilterStatus ConnectionManager::ResponseDecoder::int16Value(int16_t& value) {
407
0
  return parent_.applyEncoderFilters(DecoderEvent::Int16Value, value, protocol_converter_);
408
0
}
409
410
0
FilterStatus ConnectionManager::ResponseDecoder::int32Value(int32_t& value) {
411
0
  return parent_.applyEncoderFilters(DecoderEvent::Int32Value, value, protocol_converter_);
412
0
}
413
414
0
FilterStatus ConnectionManager::ResponseDecoder::int64Value(int64_t& value) {
415
0
  return parent_.applyEncoderFilters(DecoderEvent::Int64Value, value, protocol_converter_);
416
0
}
417
418
0
FilterStatus ConnectionManager::ResponseDecoder::doubleValue(double& value) {
419
0
  return parent_.applyEncoderFilters(DecoderEvent::DoubleValue, value, protocol_converter_);
420
0
}
421
422
0
FilterStatus ConnectionManager::ResponseDecoder::stringValue(absl::string_view value) {
423
0
  return parent_.applyEncoderFilters(DecoderEvent::StringValue, std::string(value),
424
0
                                     protocol_converter_);
425
0
}
426
427
FilterStatus ConnectionManager::ResponseDecoder::mapBegin(FieldType& key_type,
428
0
                                                          FieldType& value_type, uint32_t& size) {
429
0
  return parent_.applyEncoderFilters(
430
0
      DecoderEvent::MapBegin, std::make_tuple(key_type, value_type, size), protocol_converter_);
431
0
}
432
433
0
FilterStatus ConnectionManager::ResponseDecoder::mapEnd() {
434
0
  return parent_.applyEncoderFilters(DecoderEvent::MapEnd, absl::monostate(), protocol_converter_);
435
0
}
436
437
0
FilterStatus ConnectionManager::ResponseDecoder::listBegin(FieldType& elem_type, uint32_t& size) {
438
0
  return parent_.applyEncoderFilters(DecoderEvent::ListBegin, std::make_tuple(elem_type, size),
439
0
                                     protocol_converter_);
440
0
}
441
442
0
FilterStatus ConnectionManager::ResponseDecoder::listEnd() {
443
0
  return parent_.applyEncoderFilters(DecoderEvent::ListEnd, absl::monostate(), protocol_converter_);
444
0
}
445
446
0
FilterStatus ConnectionManager::ResponseDecoder::setBegin(FieldType& elem_type, uint32_t& size) {
447
0
  return parent_.applyEncoderFilters(DecoderEvent::SetBegin, std::make_tuple(elem_type, size),
448
0
                                     protocol_converter_);
449
0
}
450
451
0
FilterStatus ConnectionManager::ResponseDecoder::setEnd() {
452
0
  return parent_.applyEncoderFilters(DecoderEvent::SetEnd, absl::monostate(), protocol_converter_);
453
0
}
454
455
0
bool ConnectionManager::ResponseDecoder::headerKeysPreserveCase() const {
456
0
  return parent_.parent_.headerKeysPreserveCase();
457
0
}
458
459
0
void ConnectionManager::ActiveRpcDecoderFilter::continueDecoding() {
460
0
  const FilterStatus status =
461
0
      parent_.applyDecoderFilters(DecoderEvent::ContinueDecode, absl::monostate(), this);
462
0
  if (status == FilterStatus::Continue) {
463
    // All filters have been executed for the current decoder state.
464
0
    if (parent_.pending_transport_end_) {
465
      // If the filter stack was paused during transportEnd, handle end-of-request details.
466
0
      parent_.finalizeRequest();
467
0
    }
468
469
0
    parent_.continueDecoding();
470
0
  }
471
0
}
472
473
0
void ConnectionManager::ActiveRpcEncoderFilter::continueEncoding() {
474
  // Not supported.
475
0
  ASSERT(false);
476
0
}
477
478
FilterStatus ConnectionManager::ActiveRpc::applyDecoderFilters(DecoderEvent state,
479
                                                               FilterContext&& data,
480
0
                                                               ActiveRpcDecoderFilter* filter) {
481
0
  ASSERT(filter_action_ == nullptr || state == DecoderEvent::ContinueDecode);
482
0
  prepareFilterAction(state, std::move(data));
483
484
0
  if (local_response_sent_) {
485
0
    filter_action_ = nullptr;
486
0
    return FilterStatus::Continue;
487
0
  }
488
489
0
  if (upgrade_handler_) {
490
    // Divert events to the current protocol upgrade handler.
491
0
    const FilterStatus status = filter_action_(upgrade_handler_.get());
492
0
    filter_action_ = nullptr;
493
0
    return status;
494
0
  }
495
496
0
  return applyFilters<ActiveRpcDecoderFilter>(filter, decoder_filters_);
497
0
}
498
499
FilterStatus
500
ConnectionManager::ActiveRpc::applyEncoderFilters(DecoderEvent state, FilterContext&& data,
501
                                                  ProtocolConverterSharedPtr protocol_converter,
502
0
                                                  ActiveRpcEncoderFilter* filter) {
503
0
  ASSERT(filter_action_ == nullptr || state == DecoderEvent::ContinueDecode);
504
0
  prepareFilterAction(state, std::move(data));
505
506
0
  FilterStatus status =
507
0
      applyFilters<ActiveRpcEncoderFilter>(filter, encoder_filters_, protocol_converter);
508
  // FilterStatus::StopIteration is currently not supported.
509
0
  ASSERT(status == FilterStatus::Continue);
510
511
0
  return status;
512
0
}
513
514
template <typename FilterType>
515
FilterStatus
516
ConnectionManager::ActiveRpc::applyFilters(FilterType* filter,
517
                                           std::list<std::unique_ptr<FilterType>>& filter_list,
518
0
                                           ProtocolConverterSharedPtr protocol_converter) {
519
520
0
  typename std::list<std::unique_ptr<FilterType>>::iterator entry =
521
0
      !filter ? filter_list.begin() : std::next(filter->entry());
522
0
  for (; entry != filter_list.end(); entry++) {
523
0
    const FilterStatus status = filter_action_((*entry)->decodeEventHandler());
524
0
    ENVOY_STREAM_LOG(trace, "apply filter called: filter={} status={}", *this,
525
0
                     static_cast<const void*>((*entry).get()), static_cast<uint64_t>(status));
526
0
    if (local_response_sent_) {
527
      // The filter called sendLocalReply but _did not_ close the connection.
528
      // We return FilterStatus::Continue irrespective of the current result,
529
      // which is fine because subsequent calls to this method will skip
530
      // filters anyway.
531
      //
532
      // Note: we need to return FilterStatus::Continue here, in order for decoding
533
      // to proceed. This is important because as noted above, the connection remains
534
      // open so we need to consume the remaining bytes.
535
0
      break;
536
0
    }
537
538
0
    if (status != FilterStatus::Continue) {
539
      // If we got FilterStatus::StopIteration and a local reply happened but
540
      // local_response_sent_ was not set, the connection was closed.
541
      //
542
      // In this case, either resetAllRpcs() gets called via onEvent(LocalClose) or
543
      // dispatch() stops the processing.
544
      //
545
      // In other words, after a local reply closes the connection and StopIteration
546
      // is returned we are done.
547
0
      return status;
548
0
    }
549
0
  }
550
551
  // The protocol converter writes the data to a buffer for response.
552
0
  if (protocol_converter) {
553
0
    filter_action_(protocol_converter.get());
554
0
  }
555
556
0
  filter_action_ = nullptr;
557
558
0
  return FilterStatus::Continue;
559
0
}
Unexecuted instantiation: Envoy::Extensions::NetworkFilters::ThriftProxy::FilterStatus Envoy::Extensions::NetworkFilters::ThriftProxy::ConnectionManager::ActiveRpc::applyFilters<Envoy::Extensions::NetworkFilters::ThriftProxy::ConnectionManager::ActiveRpcDecoderFilter>(Envoy::Extensions::NetworkFilters::ThriftProxy::ConnectionManager::ActiveRpcDecoderFilter*, std::__1::list<std::__1::unique_ptr<Envoy::Extensions::NetworkFilters::ThriftProxy::ConnectionManager::ActiveRpcDecoderFilter, std::__1::default_delete<Envoy::Extensions::NetworkFilters::ThriftProxy::ConnectionManager::ActiveRpcDecoderFilter> >, std::__1::allocator<std::__1::unique_ptr<Envoy::Extensions::NetworkFilters::ThriftProxy::ConnectionManager::ActiveRpcDecoderFilter, std::__1::default_delete<Envoy::Extensions::NetworkFilters::ThriftProxy::ConnectionManager::ActiveRpcDecoderFilter> > > >&, std::__1::shared_ptr<Envoy::Extensions::NetworkFilters::ThriftProxy::ProtocolConverter>)
Unexecuted instantiation: Envoy::Extensions::NetworkFilters::ThriftProxy::FilterStatus Envoy::Extensions::NetworkFilters::ThriftProxy::ConnectionManager::ActiveRpc::applyFilters<Envoy::Extensions::NetworkFilters::ThriftProxy::ConnectionManager::ActiveRpcEncoderFilter>(Envoy::Extensions::NetworkFilters::ThriftProxy::ConnectionManager::ActiveRpcEncoderFilter*, std::__1::list<std::__1::unique_ptr<Envoy::Extensions::NetworkFilters::ThriftProxy::ConnectionManager::ActiveRpcEncoderFilter, std::__1::default_delete<Envoy::Extensions::NetworkFilters::ThriftProxy::ConnectionManager::ActiveRpcEncoderFilter> >, std::__1::allocator<std::__1::unique_ptr<Envoy::Extensions::NetworkFilters::ThriftProxy::ConnectionManager::ActiveRpcEncoderFilter, std::__1::default_delete<Envoy::Extensions::NetworkFilters::ThriftProxy::ConnectionManager::ActiveRpcEncoderFilter> > > >&, std::__1::shared_ptr<Envoy::Extensions::NetworkFilters::ThriftProxy::ProtocolConverter>)
560
561
0
void ConnectionManager::ActiveRpc::prepareFilterAction(DecoderEvent event, FilterContext&& data) {
562
  // DecoderEvent::ContinueDecode indicates we're handling previous filter action with the
563
  // filter chain. Therefore, we should not reset filter_action_.
564
0
  if (event == DecoderEvent::ContinueDecode) {
565
0
    return;
566
0
  }
567
568
0
  switch (event) {
569
0
  case DecoderEvent::TransportBegin:
570
0
    filter_action_ = [filter_context =
571
0
                          std::move(data)](DecoderEventHandler* filter) -> FilterStatus {
572
0
      MessageMetadataSharedPtr metadata = absl::get<MessageMetadataSharedPtr>(filter_context);
573
0
      return filter->transportBegin(metadata);
574
0
    };
575
0
    break;
576
0
  case DecoderEvent::TransportEnd:
577
0
    filter_action_ = [](DecoderEventHandler* filter) -> FilterStatus {
578
0
      return filter->transportEnd();
579
0
    };
580
0
    break;
581
0
  case DecoderEvent::PassthroughData:
582
0
    filter_action_ = [filter_context =
583
0
                          std::move(data)](DecoderEventHandler* filter) -> FilterStatus {
584
0
      Buffer::Instance* data = absl::get<Buffer::Instance*>(filter_context);
585
0
      return filter->passthroughData(*data);
586
0
    };
587
0
    break;
588
0
  case DecoderEvent::MessageBegin:
589
0
    filter_action_ = [filter_context =
590
0
                          std::move(data)](DecoderEventHandler* filter) -> FilterStatus {
591
0
      MessageMetadataSharedPtr metadata = absl::get<MessageMetadataSharedPtr>(filter_context);
592
0
      return filter->messageBegin(metadata);
593
0
    };
594
0
    break;
595
0
  case DecoderEvent::MessageEnd:
596
0
    filter_action_ = [](DecoderEventHandler* filter) -> FilterStatus {
597
0
      return filter->messageEnd();
598
0
    };
599
0
    break;
600
0
  case DecoderEvent::StructBegin:
601
0
    filter_action_ = [filter_context =
602
0
                          std::move(data)](DecoderEventHandler* filter) mutable -> FilterStatus {
603
0
      std::string& name = absl::get<std::string>(filter_context);
604
0
      return filter->structBegin(name);
605
0
    };
606
0
    break;
607
0
  case DecoderEvent::StructEnd:
608
0
    filter_action_ = [](DecoderEventHandler* filter) -> FilterStatus {
609
0
      return filter->structEnd();
610
0
    };
611
0
    break;
612
0
  case DecoderEvent::FieldBegin:
613
0
    filter_action_ = [filter_context =
614
0
                          std::move(data)](DecoderEventHandler* filter) mutable -> FilterStatus {
615
0
      std::tuple<std::string, FieldType, int16_t>& t =
616
0
          absl::get<std::tuple<std::string, FieldType, int16_t>>(filter_context);
617
0
      std::string& name = std::get<0>(t);
618
0
      FieldType& field_type = std::get<1>(t);
619
0
      int16_t& field_id = std::get<2>(t);
620
0
      return filter->fieldBegin(name, field_type, field_id);
621
0
    };
622
0
    break;
623
0
  case DecoderEvent::FieldEnd:
624
0
    filter_action_ = [](DecoderEventHandler* filter) -> FilterStatus { return filter->fieldEnd(); };
625
0
    break;
626
0
  case DecoderEvent::BoolValue:
627
0
    filter_action_ = [filter_context =
628
0
                          std::move(data)](DecoderEventHandler* filter) mutable -> FilterStatus {
629
0
      bool& value = absl::get<bool>(filter_context);
630
0
      return filter->boolValue(value);
631
0
    };
632
0
    break;
633
0
  case DecoderEvent::ByteValue:
634
0
    filter_action_ = [filter_context =
635
0
                          std::move(data)](DecoderEventHandler* filter) mutable -> FilterStatus {
636
0
      uint8_t& value = absl::get<uint8_t>(filter_context);
637
0
      return filter->byteValue(value);
638
0
    };
639
0
    break;
640
0
  case DecoderEvent::Int16Value:
641
0
    filter_action_ = [filter_context =
642
0
                          std::move(data)](DecoderEventHandler* filter) mutable -> FilterStatus {
643
0
      int16_t& value = absl::get<int16_t>(filter_context);
644
0
      return filter->int16Value(value);
645
0
    };
646
0
    break;
647
0
  case DecoderEvent::Int32Value:
648
0
    filter_action_ = [filter_context =
649
0
                          std::move(data)](DecoderEventHandler* filter) mutable -> FilterStatus {
650
0
      int32_t& value = absl::get<int32_t>(filter_context);
651
0
      return filter->int32Value(value);
652
0
    };
653
0
    break;
654
0
  case DecoderEvent::Int64Value:
655
0
    filter_action_ = [filter_context =
656
0
                          std::move(data)](DecoderEventHandler* filter) mutable -> FilterStatus {
657
0
      int64_t& value = absl::get<int64_t>(filter_context);
658
0
      return filter->int64Value(value);
659
0
    };
660
0
    break;
661
0
  case DecoderEvent::DoubleValue:
662
0
    filter_action_ = [filter_context =
663
0
                          std::move(data)](DecoderEventHandler* filter) mutable -> FilterStatus {
664
0
      double& value = absl::get<double>(filter_context);
665
0
      return filter->doubleValue(value);
666
0
    };
667
0
    break;
668
0
  case DecoderEvent::StringValue:
669
0
    filter_action_ = [filter_context =
670
0
                          std::move(data)](DecoderEventHandler* filter) mutable -> FilterStatus {
671
0
      std::string& value = absl::get<std::string>(filter_context);
672
0
      return filter->stringValue(value);
673
0
    };
674
0
    break;
675
0
  case DecoderEvent::MapBegin:
676
0
    filter_action_ = [filter_context =
677
0
                          std::move(data)](DecoderEventHandler* filter) mutable -> FilterStatus {
678
0
      std::tuple<FieldType, FieldType, uint32_t>& t =
679
0
          absl::get<std::tuple<FieldType, FieldType, uint32_t>>(filter_context);
680
0
      FieldType& key_type = std::get<0>(t);
681
0
      FieldType& value_type = std::get<1>(t);
682
0
      uint32_t& size = std::get<2>(t);
683
0
      return filter->mapBegin(key_type, value_type, size);
684
0
    };
685
0
    break;
686
0
  case DecoderEvent::MapEnd:
687
0
    filter_action_ = [](DecoderEventHandler* filter) -> FilterStatus { return filter->mapEnd(); };
688
0
    break;
689
0
  case DecoderEvent::ListBegin:
690
0
    filter_action_ = [filter_context =
691
0
                          std::move(data)](DecoderEventHandler* filter) mutable -> FilterStatus {
692
0
      std::tuple<FieldType, uint32_t>& t =
693
0
          absl::get<std::tuple<FieldType, uint32_t>>(filter_context);
694
0
      FieldType& elem_type = std::get<0>(t);
695
0
      uint32_t& size = std::get<1>(t);
696
0
      return filter->listBegin(elem_type, size);
697
0
    };
698
0
    break;
699
0
  case DecoderEvent::ListEnd:
700
0
    filter_action_ = [](DecoderEventHandler* filter) -> FilterStatus { return filter->listEnd(); };
701
0
    break;
702
0
  case DecoderEvent::SetBegin:
703
0
    filter_action_ = [filter_context =
704
0
                          std::move(data)](DecoderEventHandler* filter) mutable -> FilterStatus {
705
0
      std::tuple<FieldType, uint32_t>& t =
706
0
          absl::get<std::tuple<FieldType, uint32_t>>(filter_context);
707
0
      FieldType& elem_type = std::get<0>(t);
708
0
      uint32_t& size = std::get<1>(t);
709
0
      return filter->setBegin(elem_type, size);
710
0
    };
711
0
    break;
712
0
  case DecoderEvent::SetEnd:
713
0
    filter_action_ = [](DecoderEventHandler* filter) -> FilterStatus { return filter->setEnd(); };
714
0
    break;
715
0
  default:
716
0
    PANIC_DUE_TO_CORRUPT_ENUM;
717
0
  }
718
0
}
719
720
0
FilterStatus ConnectionManager::ActiveRpc::transportBegin(MessageMetadataSharedPtr metadata) {
721
0
  return applyDecoderFilters(DecoderEvent::TransportBegin, metadata);
722
0
}
723
724
0
FilterStatus ConnectionManager::ActiveRpc::transportEnd() {
725
0
  ASSERT(metadata_ != nullptr);
726
727
0
  FilterStatus status;
728
0
  if (upgrade_handler_) {
729
0
    status = upgrade_handler_->transportEnd();
730
731
0
    if (metadata_->isProtocolUpgradeMessage()) {
732
0
      ENVOY_CONN_LOG(error, "thrift: sending protocol upgrade response",
733
0
                     parent_.read_callbacks_->connection());
734
0
      sendLocalReply(*parent_.protocol_->upgradeResponse(*upgrade_handler_), false);
735
0
    }
736
0
  } else {
737
0
    status = applyDecoderFilters(DecoderEvent::TransportEnd, absl::monostate());
738
0
    if (status == FilterStatus::StopIteration) {
739
0
      pending_transport_end_ = true;
740
0
      return status;
741
0
    }
742
0
  }
743
744
0
  finalizeRequest();
745
746
0
  return status;
747
0
}
748
749
0
void ConnectionManager::ActiveRpc::finalizeRequest() {
750
0
  pending_transport_end_ = false;
751
752
0
  parent_.stats_.request_.inc();
753
754
0
  parent_.accumulated_requests_++;
755
0
  if (parent_.config_.maxRequestsPerConnection() > 0 &&
756
0
      parent_.accumulated_requests_ >= parent_.config_.maxRequestsPerConnection()) {
757
0
    parent_.read_callbacks_->connection().readDisable(true);
758
0
    parent_.requests_overflow_ = true;
759
0
    parent_.stats_.downstream_cx_max_requests_.inc();
760
0
  }
761
762
0
  if (passthrough_) {
763
0
    parent_.stats_.request_passthrough_.inc();
764
0
  }
765
766
0
  bool destroy_rpc = false;
767
0
  switch (original_msg_type_) {
768
0
  case MessageType::Call:
769
0
    parent_.stats_.request_call_.inc();
770
771
    // Local response or protocol upgrade mean we don't wait for an upstream response.
772
0
    destroy_rpc = local_response_sent_ || (upgrade_handler_ != nullptr);
773
0
    break;
774
775
0
  case MessageType::Oneway:
776
0
    parent_.stats_.request_oneway_.inc();
777
778
    // No response forthcoming, we're done.
779
0
    destroy_rpc = true;
780
0
    break;
781
782
0
  default:
783
0
    parent_.stats_.request_invalid_type_.inc();
784
785
    // Invalid request, implies no response.
786
0
    destroy_rpc = true;
787
0
    break;
788
0
  }
789
790
0
  if (destroy_rpc) {
791
0
    parent_.doDeferredRpcDestroy(*this);
792
0
  }
793
0
}
794
795
// TODO(kuochunghsu): passthroughSupported for decoder/encoder filters with more flexibility.
796
// That is, supporting passthrough data for decoder filters if all  decoder filters agree,
797
// and supporting passthrough data for encoder filters if all encoder filters agree.
798
0
bool ConnectionManager::ActiveRpc::passthroughSupported() const {
799
0
  for (auto& entry : decoder_filters_) {
800
0
    if (!entry->decoder_handle_->passthroughSupported()) {
801
0
      return false;
802
0
    }
803
0
  }
804
0
  for (auto& entry : encoder_filters_) {
805
0
    if (!entry->encoder_handle_->passthroughSupported()) {
806
0
      return false;
807
0
    }
808
0
  }
809
0
  return true;
810
0
}
811
812
void ConnectionManager::ActiveRpc::recordResponseAccessLog(
813
0
    DirectResponse::ResponseType direct_response_type, const MessageMetadataSharedPtr& metadata) {
814
0
  if (direct_response_type == DirectResponse::ResponseType::Exception) {
815
0
    recordResponseAccessLog(MessageTypeNames::get().fromType(MessageType::Exception), "-");
816
0
    return;
817
0
  }
818
0
  const std::string& message_type_name =
819
0
      metadata->hasMessageType() ? MessageTypeNames::get().fromType(metadata->messageType()) : "-";
820
0
  const std::string& reply_type_name = ReplyTypeNames::get().fromType(
821
0
      direct_response_type == DirectResponse::ResponseType::SuccessReply ? ReplyType::Success
822
0
                                                                         : ReplyType::Error);
823
0
  recordResponseAccessLog(message_type_name, reply_type_name);
824
0
}
825
826
void ConnectionManager::ActiveRpc::recordResponseAccessLog(
827
0
    const MessageMetadataSharedPtr& metadata) {
828
0
  recordResponseAccessLog(
829
0
      metadata->hasMessageType() ? MessageTypeNames::get().fromType(metadata->messageType()) : "-",
830
0
      metadata->hasReplyType() ? ReplyTypeNames::get().fromType(metadata->replyType()) : "-");
831
0
}
832
833
void ConnectionManager::ActiveRpc::recordResponseAccessLog(const std::string& message_type,
834
0
                                                           const std::string& reply_type) {
835
0
  ProtobufWkt::Struct stats_obj;
836
0
  auto& fields_map = *stats_obj.mutable_fields();
837
0
  auto& response_fields_map = *fields_map["response"].mutable_struct_value()->mutable_fields();
838
839
0
  response_fields_map["transport_type"] =
840
0
      ValueUtil::stringValue(TransportNames::get().fromType(downstreamTransportType()));
841
0
  response_fields_map["protocol_type"] =
842
0
      ValueUtil::stringValue(ProtocolNames::get().fromType(downstreamProtocolType()));
843
0
  response_fields_map["message_type"] = ValueUtil::stringValue(message_type);
844
0
  response_fields_map["reply_type"] = ValueUtil::stringValue(reply_type);
845
846
0
  streamInfo().setDynamicMetadata("thrift.proxy", stats_obj);
847
0
}
848
849
0
FilterStatus ConnectionManager::ActiveRpc::passthroughData(Buffer::Instance& data) {
850
0
  passthrough_ = true;
851
0
  return applyDecoderFilters(DecoderEvent::PassthroughData, &data);
852
0
}
853
854
0
FilterStatus ConnectionManager::ActiveRpc::messageBegin(MessageMetadataSharedPtr metadata) {
855
0
  ASSERT(metadata->hasSequenceId());
856
0
  ASSERT(metadata->hasMessageType());
857
858
0
  metadata_ = metadata;
859
0
  original_sequence_id_ = metadata_->sequenceId();
860
0
  original_msg_type_ = metadata_->messageType();
861
862
0
  auto& connection = parent_.read_callbacks_->connection();
863
864
0
  if (metadata_->isProtocolUpgradeMessage()) {
865
0
    ASSERT(parent_.protocol_->supportsUpgrade());
866
867
0
    ENVOY_CONN_LOG(debug, "thrift: decoding protocol upgrade request", connection);
868
0
    upgrade_handler_ = parent_.protocol_->upgradeRequestDecoder();
869
0
    ASSERT(upgrade_handler_ != nullptr);
870
0
  }
871
872
0
  FilterStatus result = FilterStatus::StopIteration;
873
0
  absl::optional<std::string> error;
874
0
  TRY_NEEDS_AUDIT { result = applyDecoderFilters(DecoderEvent::MessageBegin, metadata); }
875
0
  END_TRY catch (const std::bad_function_call& e) { error = std::string(e.what()); }
876
877
0
  const auto& route_ptr = route();
878
0
  const std::string& cluster_name = route_ptr ? route_ptr->routeEntry()->clusterName() : "-";
879
0
  const std::string& method = metadata->hasMethodName() ? metadata->methodName() : "-";
880
0
  const int32_t frame_size =
881
0
      metadata->hasFrameSize() ? static_cast<int32_t>(metadata->frameSize()) : -1;
882
883
0
  if (error.has_value()) {
884
0
    parent_.stats_.request_internal_error_.inc();
885
0
    std::ostringstream oss;
886
0
    parent_.read_callbacks_->connection().dumpState(oss, 0);
887
0
    ENVOY_STREAM_LOG(error,
888
0
                     "Catch exception: {}. Request seq_id: {}, method: {}, frame size: {}, cluster "
889
0
                     "name: {}, downstream connection state {}, headers:\n{}",
890
0
                     *this, error.value(), metadata_->sequenceId(), method, frame_size,
891
0
                     cluster_name, oss.str(), metadata->requestHeaders());
892
0
    throw EnvoyException(error.value());
893
0
  }
894
895
0
  ProtobufWkt::Struct stats_obj;
896
0
  auto& fields_map = *stats_obj.mutable_fields();
897
0
  fields_map["cluster"] = ValueUtil::stringValue(cluster_name);
898
0
  fields_map["method"] = ValueUtil::stringValue(method);
899
0
  fields_map["passthrough"] = ValueUtil::stringValue(passthroughSupported() ? "true" : "false");
900
901
0
  auto& request_fields_map = *fields_map["request"].mutable_struct_value()->mutable_fields();
902
0
  request_fields_map["transport_type"] =
903
0
      ValueUtil::stringValue(TransportNames::get().fromType(downstreamTransportType()));
904
0
  request_fields_map["protocol_type"] =
905
0
      ValueUtil::stringValue(ProtocolNames::get().fromType(downstreamProtocolType()));
906
0
  request_fields_map["message_type"] = ValueUtil::stringValue(
907
0
      metadata->hasMessageType() ? MessageTypeNames::get().fromType(metadata->messageType()) : "-");
908
909
0
  streamInfo().setDynamicMetadata("thrift.proxy", stats_obj);
910
0
  ENVOY_STREAM_LOG(trace, "Request seq_id: {}, method: {}, frame size: {}, headers:\n{}", *this,
911
0
                   metadata_->sequenceId(), method, frame_size, metadata->requestHeaders());
912
913
0
  return result;
914
0
}
915
916
0
FilterStatus ConnectionManager::ActiveRpc::messageEnd() {
917
0
  return applyDecoderFilters(DecoderEvent::MessageEnd, absl::monostate());
918
0
}
919
920
0
FilterStatus ConnectionManager::ActiveRpc::structBegin(absl::string_view name) {
921
0
  return applyDecoderFilters(DecoderEvent::StructBegin, std::string(name));
922
0
}
923
924
0
FilterStatus ConnectionManager::ActiveRpc::structEnd() {
925
0
  return applyDecoderFilters(DecoderEvent::StructEnd, absl::monostate());
926
0
}
927
928
FilterStatus ConnectionManager::ActiveRpc::fieldBegin(absl::string_view name, FieldType& field_type,
929
0
                                                      int16_t& field_id) {
930
0
  return applyDecoderFilters(DecoderEvent::FieldBegin,
931
0
                             std::make_tuple(std::string(name), field_type, field_id));
932
0
}
933
934
0
FilterStatus ConnectionManager::ActiveRpc::fieldEnd() {
935
0
  return applyDecoderFilters(DecoderEvent::FieldEnd, absl::monostate());
936
0
}
937
938
0
FilterStatus ConnectionManager::ActiveRpc::boolValue(bool& value) {
939
0
  return applyDecoderFilters(DecoderEvent::BoolValue, value);
940
0
}
941
942
0
FilterStatus ConnectionManager::ActiveRpc::byteValue(uint8_t& value) {
943
0
  return applyDecoderFilters(DecoderEvent::ByteValue, value);
944
0
}
945
946
0
FilterStatus ConnectionManager::ActiveRpc::int16Value(int16_t& value) {
947
0
  return applyDecoderFilters(DecoderEvent::Int16Value, value);
948
0
}
949
950
0
FilterStatus ConnectionManager::ActiveRpc::int32Value(int32_t& value) {
951
0
  return applyDecoderFilters(DecoderEvent::Int32Value, value);
952
0
}
953
954
0
FilterStatus ConnectionManager::ActiveRpc::int64Value(int64_t& value) {
955
0
  return applyDecoderFilters(DecoderEvent::Int64Value, value);
956
0
}
957
958
0
FilterStatus ConnectionManager::ActiveRpc::doubleValue(double& value) {
959
0
  return applyDecoderFilters(DecoderEvent::DoubleValue, value);
960
0
}
961
962
0
FilterStatus ConnectionManager::ActiveRpc::stringValue(absl::string_view value) {
963
0
  return applyDecoderFilters(DecoderEvent::StringValue, std::string(value));
964
0
}
965
966
FilterStatus ConnectionManager::ActiveRpc::mapBegin(FieldType& key_type, FieldType& value_type,
967
0
                                                    uint32_t& size) {
968
0
  return applyDecoderFilters(DecoderEvent::MapBegin, std::make_tuple(key_type, value_type, size));
969
0
}
970
971
0
FilterStatus ConnectionManager::ActiveRpc::mapEnd() {
972
0
  return applyDecoderFilters(DecoderEvent::MapEnd, absl::monostate());
973
0
}
974
975
0
FilterStatus ConnectionManager::ActiveRpc::listBegin(FieldType& value_type, uint32_t& size) {
976
0
  return applyDecoderFilters(DecoderEvent::ListBegin, std::make_tuple(value_type, size));
977
0
}
978
979
0
FilterStatus ConnectionManager::ActiveRpc::listEnd() {
980
0
  return applyDecoderFilters(DecoderEvent::ListEnd, absl::monostate());
981
0
}
982
983
0
FilterStatus ConnectionManager::ActiveRpc::setBegin(FieldType& value_type, uint32_t& size) {
984
0
  return applyDecoderFilters(DecoderEvent::SetBegin, std::make_tuple(value_type, size));
985
0
}
986
987
0
FilterStatus ConnectionManager::ActiveRpc::setEnd() {
988
0
  return applyDecoderFilters(DecoderEvent::SetEnd, absl::monostate());
989
0
}
990
991
0
void ConnectionManager::ActiveRpc::createFilterChain() {
992
0
  parent_.config_.filterFactory().createFilterChain(*this);
993
0
}
994
995
0
void ConnectionManager::ActiveRpc::onReset() {
996
  // TODO(zuercher): e.g., parent_.stats_.named_.downstream_rq_rx_reset_.inc();
997
0
  parent_.doDeferredRpcDestroy(*this);
998
0
}
999
1000
0
void ConnectionManager::ActiveRpc::onError(const std::string& what) {
1001
0
  if (metadata_) {
1002
0
    sendLocalReply(AppException(AppExceptionType::ProtocolError, what), true);
1003
0
    return;
1004
0
  }
1005
1006
  // Transport or protocol error happened before (or during message begin) parsing. It's not
1007
  // possible to provide a valid response, so don't try.
1008
1009
0
  parent_.doDeferredRpcDestroy(*this);
1010
0
  parent_.read_callbacks_->connection().close(Network::ConnectionCloseType::NoFlush);
1011
0
}
1012
1013
0
const Network::Connection* ConnectionManager::ActiveRpc::connection() const {
1014
0
  return &parent_.read_callbacks_->connection();
1015
0
}
1016
1017
0
Router::RouteConstSharedPtr ConnectionManager::ActiveRpc::route() {
1018
0
  if (!cached_route_) {
1019
0
    if (metadata_ != nullptr) {
1020
0
      Router::RouteConstSharedPtr route =
1021
0
          parent_.config_.routerConfig().route(*metadata_, stream_id_);
1022
0
      cached_route_ = std::move(route);
1023
0
    } else {
1024
0
      cached_route_ = absl::nullopt;
1025
0
    }
1026
0
  }
1027
1028
0
  return cached_route_.value();
1029
0
}
1030
1031
0
void ConnectionManager::ActiveRpc::onLocalReply(const MessageMetadata& metadata, bool end_stream) {
1032
0
  under_on_local_reply_ = true;
1033
0
  for (auto& filter : base_filters_) {
1034
0
    filter->onLocalReply(metadata, end_stream);
1035
0
  }
1036
0
  under_on_local_reply_ = false;
1037
0
}
1038
1039
0
void ConnectionManager::ActiveRpc::sendLocalReply(const DirectResponse& response, bool end_stream) {
1040
0
  ASSERT(!under_on_local_reply_);
1041
0
  ENVOY_STREAM_LOG(debug, "Sending local reply, end_stream: {}, seq_id: {}", *this, end_stream,
1042
0
                   original_sequence_id_);
1043
0
  localReplyMetadata_ = metadata_->createResponseMetadata();
1044
0
  localReplyMetadata_->setSequenceId(original_sequence_id_);
1045
1046
0
  onLocalReply(*localReplyMetadata_, end_stream);
1047
1048
0
  if (end_stream &&
1049
0
      Runtime::runtimeFeatureEnabled("envoy.reloadable_features.thrift_connection_draining")) {
1050
0
    localReplyMetadata_->responseHeaders().addReferenceKey(Headers::get().Drain, "true");
1051
0
    ConnectionManager& cm = parent_;
1052
0
    cm.stats_.downstream_response_drain_close_.inc();
1053
0
  }
1054
1055
0
  auto result = parent_.sendLocalReply(*localReplyMetadata_, response, end_stream);
1056
  // Only report while the local reply is successfully written.
1057
0
  if (result.has_value()) {
1058
0
    recordResponseAccessLog(*result, localReplyMetadata_);
1059
0
  }
1060
1061
0
  if (end_stream) {
1062
0
    return;
1063
0
  }
1064
1065
0
  if (!upgrade_handler_) {
1066
    // Consume any remaining request data from the downstream.
1067
0
    local_response_sent_ = true;
1068
0
  }
1069
0
}
1070
1071
0
void ConnectionManager::ActiveRpc::startUpstreamResponse(Transport& transport, Protocol& protocol) {
1072
0
  ASSERT(response_decoder_ == nullptr);
1073
1074
0
  response_decoder_ = std::make_unique<ResponseDecoder>(*this, transport, protocol);
1075
0
}
1076
1077
0
ThriftFilters::ResponseStatus ConnectionManager::ActiveRpc::upstreamData(Buffer::Instance& buffer) {
1078
0
  ASSERT(response_decoder_ != nullptr);
1079
1080
0
  TRY_NEEDS_AUDIT {
1081
0
    if (response_decoder_->onData(buffer)) {
1082
      // Completed upstream response.
1083
0
      parent_.doDeferredRpcDestroy(*this);
1084
0
      return ThriftFilters::ResponseStatus::Complete;
1085
0
    }
1086
0
    return ThriftFilters::ResponseStatus::MoreData;
1087
0
  }
1088
0
  END_TRY catch (const AppException& ex) {
1089
0
    ENVOY_LOG(error, "thrift response application error: {}", ex.what());
1090
0
    parent_.stats_.response_decoding_error_.inc();
1091
1092
0
    sendLocalReply(ex, true);
1093
0
    return ThriftFilters::ResponseStatus::Reset;
1094
0
  }
1095
0
  catch (const EnvoyException& ex) {
1096
0
    ENVOY_CONN_LOG(error, "thrift response error: {}", parent_.read_callbacks_->connection(),
1097
0
                   ex.what());
1098
0
    parent_.stats_.response_decoding_error_.inc();
1099
1100
0
    onError(ex.what());
1101
0
    return ThriftFilters::ResponseStatus::Reset;
1102
0
  }
1103
0
}
1104
1105
0
void ConnectionManager::ActiveRpc::clearRouteCache() { cached_route_ = absl::nullopt; }
1106
1107
0
void ConnectionManager::ActiveRpc::resetDownstreamConnection() {
1108
0
  ENVOY_CONN_LOG(debug, "resetting downstream connection", parent_.read_callbacks_->connection());
1109
0
  parent_.read_callbacks_->connection().close(Network::ConnectionCloseType::NoFlush);
1110
0
}
1111
1112
} // namespace ThriftProxy
1113
} // namespace NetworkFilters
1114
} // namespace Extensions
1115
} // namespace Envoy