Coverage Report

Created: 2023-11-12 09:30

/proc/self/cwd/source/extensions/filters/network/dubbo_proxy/active_message.cc
Line
Count
Source (jump to first uncovered line)
1
#include "source/extensions/filters/network/dubbo_proxy/active_message.h"
2
3
#include "source/common/stats/timespan_impl.h"
4
#include "source/extensions/filters/network/dubbo_proxy/app_exception.h"
5
#include "source/extensions/filters/network/dubbo_proxy/conn_manager.h"
6
7
namespace Envoy {
8
namespace Extensions {
9
namespace NetworkFilters {
10
namespace DubboProxy {
11
12
// class ActiveResponseDecoder
13
ActiveResponseDecoder::ActiveResponseDecoder(ActiveMessage& parent, DubboFilterStats& stats,
14
                                             Network::Connection& connection,
15
                                             ProtocolPtr&& protocol)
16
    : parent_(parent), stats_(stats), response_connection_(connection),
17
      protocol_(std::move(protocol)),
18
0
      decoder_(std::make_unique<ResponseDecoder>(*protocol_, *this)), complete_(false) {}
19
20
0
DubboFilters::UpstreamResponseStatus ActiveResponseDecoder::onData(Buffer::Instance& data) {
21
0
  ENVOY_LOG(debug, "dubbo response: the received reply data length is {}", data.length());
22
23
0
  bool underflow = false;
24
0
  decoder_->onData(data, underflow);
25
0
  ASSERT(complete_ || underflow);
26
27
0
  return response_status_;
28
0
}
29
30
void ActiveResponseDecoder::onStreamDecoded(MessageMetadataSharedPtr metadata,
31
0
                                            ContextSharedPtr ctx) {
32
0
  ASSERT(metadata->messageType() == MessageType::Response ||
33
0
         metadata->messageType() == MessageType::Exception);
34
0
  ASSERT(metadata->hasResponseStatus());
35
36
0
  metadata_ = metadata;
37
0
  if (applyMessageEncodedFilters(metadata, ctx) != FilterStatus::Continue) {
38
0
    response_status_ = DubboFilters::UpstreamResponseStatus::Complete;
39
0
    return;
40
0
  }
41
42
0
  if (response_connection_.state() != Network::Connection::State::Open) {
43
0
    throw DownstreamConnectionCloseException("Downstream has closed or closing");
44
0
  }
45
46
0
  response_connection_.write(ctx->originMessage(), false);
47
0
  ENVOY_LOG(debug,
48
0
            "dubbo response: the upstream response message has been forwarded to the downstream");
49
50
0
  stats_.response_.inc();
51
0
  stats_.response_decoding_success_.inc();
52
0
  if (metadata->messageType() == MessageType::Exception) {
53
0
    stats_.response_business_exception_.inc();
54
0
  }
55
56
0
  switch (metadata->responseStatus()) {
57
0
  case ResponseStatus::Ok:
58
0
    stats_.response_success_.inc();
59
0
    break;
60
0
  default:
61
0
    stats_.response_error_.inc();
62
0
    ENVOY_LOG(error, "dubbo response status: {}", static_cast<uint8_t>(metadata->responseStatus()));
63
0
    break;
64
0
  }
65
66
0
  complete_ = true;
67
0
  response_status_ = DubboFilters::UpstreamResponseStatus::Complete;
68
69
0
  ENVOY_LOG(debug, "dubbo response: complete processing of upstream response messages, id is {}",
70
0
            metadata->requestId());
71
0
}
72
73
FilterStatus ActiveResponseDecoder::applyMessageEncodedFilters(MessageMetadataSharedPtr metadata,
74
0
                                                               ContextSharedPtr ctx) {
75
0
  parent_.encoder_filter_action_ = [metadata,
76
0
                                    ctx](DubboFilters::EncoderFilter* filter) -> FilterStatus {
77
0
    return filter->onMessageEncoded(metadata, ctx);
78
0
  };
79
80
0
  auto status = parent_.applyEncoderFilters(
81
0
      nullptr, ActiveMessage::FilterIterationStartState::CanStartFromCurrent);
82
0
  switch (status) {
83
0
  case FilterStatus::StopIteration:
84
0
    break;
85
0
  default:
86
0
    ASSERT(FilterStatus::Continue == status);
87
0
    break;
88
0
  }
89
90
0
  return status;
91
0
}
92
93
// class ActiveMessageFilterBase
94
0
uint64_t ActiveMessageFilterBase::requestId() const { return parent_.requestId(); }
95
96
0
uint64_t ActiveMessageFilterBase::streamId() const { return parent_.streamId(); }
97
98
0
const Network::Connection* ActiveMessageFilterBase::connection() const {
99
0
  return parent_.connection();
100
0
}
101
102
0
Router::RouteConstSharedPtr ActiveMessageFilterBase::route() { return parent_.route(); }
103
104
0
SerializationType ActiveMessageFilterBase::serializationType() const {
105
0
  return parent_.serializationType();
106
0
}
107
108
0
ProtocolType ActiveMessageFilterBase::protocolType() const { return parent_.protocolType(); }
109
110
0
Event::Dispatcher& ActiveMessageFilterBase::dispatcher() { return parent_.dispatcher(); }
111
112
0
void ActiveMessageFilterBase::resetStream() { parent_.resetStream(); }
113
114
0
StreamInfo::StreamInfo& ActiveMessageFilterBase::streamInfo() { return parent_.streamInfo(); }
115
116
// class ActiveMessageDecoderFilter
117
ActiveMessageDecoderFilter::ActiveMessageDecoderFilter(ActiveMessage& parent,
118
                                                       DubboFilters::DecoderFilterSharedPtr filter,
119
                                                       bool dual_filter)
120
0
    : ActiveMessageFilterBase(parent, dual_filter), handle_(filter) {}
Unexecuted instantiation: Envoy::Extensions::NetworkFilters::DubboProxy::ActiveMessageDecoderFilter::ActiveMessageDecoderFilter(Envoy::Extensions::NetworkFilters::DubboProxy::ActiveMessage&, std::__1::shared_ptr<Envoy::Extensions::NetworkFilters::DubboProxy::DubboFilters::DecoderFilter>, bool)
Unexecuted instantiation: Envoy::Extensions::NetworkFilters::DubboProxy::ActiveMessageDecoderFilter::ActiveMessageDecoderFilter(Envoy::Extensions::NetworkFilters::DubboProxy::ActiveMessage&, std::__1::shared_ptr<Envoy::Extensions::NetworkFilters::DubboProxy::DubboFilters::DecoderFilter>, bool)
121
122
0
void ActiveMessageDecoderFilter::continueDecoding() {
123
0
  ASSERT(parent_.context());
124
0
  auto state = ActiveMessage::FilterIterationStartState::AlwaysStartFromNext;
125
0
  if (0 != parent_.context()->originMessage().length()) {
126
0
    state = ActiveMessage::FilterIterationStartState::CanStartFromCurrent;
127
0
    ENVOY_LOG(warn, "The original message data is not consumed, triggering the decoder filter from "
128
0
                    "the current location");
129
0
  }
130
0
  const FilterStatus status = parent_.applyDecoderFilters(this, state);
131
0
  if (status == FilterStatus::Continue) {
132
0
    ENVOY_LOG(debug, "dubbo response: start upstream");
133
    // All filters have been executed for the current decoder state.
134
0
    if (parent_.pendingStreamDecoded()) {
135
      // If the filter stack was paused during messageEnd, handle end-of-request details.
136
0
      parent_.finalizeRequest();
137
0
    }
138
0
  }
139
0
}
140
141
void ActiveMessageDecoderFilter::sendLocalReply(const DubboFilters::DirectResponse& response,
142
0
                                                bool end_stream) {
143
0
  parent_.sendLocalReply(response, end_stream);
144
0
}
145
146
0
void ActiveMessageDecoderFilter::startUpstreamResponse() { parent_.startUpstreamResponse(); }
147
148
DubboFilters::UpstreamResponseStatus
149
0
ActiveMessageDecoderFilter::upstreamData(Buffer::Instance& buffer) {
150
0
  return parent_.upstreamData(buffer);
151
0
}
152
153
0
void ActiveMessageDecoderFilter::resetDownstreamConnection() {
154
0
  parent_.resetDownstreamConnection();
155
0
}
156
157
// class ActiveMessageEncoderFilter
158
ActiveMessageEncoderFilter::ActiveMessageEncoderFilter(ActiveMessage& parent,
159
                                                       DubboFilters::EncoderFilterSharedPtr filter,
160
                                                       bool dual_filter)
161
0
    : ActiveMessageFilterBase(parent, dual_filter), handle_(filter) {}
Unexecuted instantiation: Envoy::Extensions::NetworkFilters::DubboProxy::ActiveMessageEncoderFilter::ActiveMessageEncoderFilter(Envoy::Extensions::NetworkFilters::DubboProxy::ActiveMessage&, std::__1::shared_ptr<Envoy::Extensions::NetworkFilters::DubboProxy::DubboFilters::EncoderFilter>, bool)
Unexecuted instantiation: Envoy::Extensions::NetworkFilters::DubboProxy::ActiveMessageEncoderFilter::ActiveMessageEncoderFilter(Envoy::Extensions::NetworkFilters::DubboProxy::ActiveMessage&, std::__1::shared_ptr<Envoy::Extensions::NetworkFilters::DubboProxy::DubboFilters::EncoderFilter>, bool)
162
163
0
void ActiveMessageEncoderFilter::continueEncoding() {
164
0
  ASSERT(parent_.context());
165
0
  auto state = ActiveMessage::FilterIterationStartState::AlwaysStartFromNext;
166
0
  if (0 != parent_.context()->originMessage().length()) {
167
0
    state = ActiveMessage::FilterIterationStartState::CanStartFromCurrent;
168
0
    ENVOY_LOG(warn, "The original message data is not consumed, triggering the encoder filter from "
169
0
                    "the current location");
170
0
  }
171
0
  const FilterStatus status = parent_.applyEncoderFilters(this, state);
172
0
  if (FilterStatus::Continue == status) {
173
0
    ENVOY_LOG(debug, "All encoding filters have been executed");
174
0
  }
175
0
}
176
177
// class ActiveMessage
178
ActiveMessage::ActiveMessage(ConnectionManager& parent)
179
    : parent_(parent), request_timer_(std::make_unique<Stats::HistogramCompletableTimespanImpl>(
180
                           parent_.stats().request_time_ms_, parent.timeSystem())),
181
      stream_id_(parent.randomGenerator().random()),
182
      stream_info_(parent.timeSystem(), parent_.connection().connectionInfoProviderSharedPtr()),
183
0
      pending_stream_decoded_(false), local_response_sent_(false) {
184
0
  parent_.stats().request_active_.inc();
185
0
}
186
187
0
ActiveMessage::~ActiveMessage() {
188
0
  parent_.stats().request_active_.dec();
189
0
  request_timer_->complete();
190
0
  for (auto& filter : decoder_filters_) {
191
0
    ENVOY_LOG(debug, "destroy decoder filter");
192
0
    filter->handler()->onDestroy();
193
0
  }
194
195
0
  for (auto& filter : encoder_filters_) {
196
    // Do not call on destroy twice for dual registered filters.
197
0
    if (!filter->dual_filter_) {
198
0
      ENVOY_LOG(debug, "destroy encoder filter");
199
0
      filter->handler()->onDestroy();
200
0
    }
201
0
  }
202
0
}
203
204
std::list<ActiveMessageEncoderFilterPtr>::iterator
205
ActiveMessage::commonEncodePrefix(ActiveMessageEncoderFilter* filter,
206
0
                                  FilterIterationStartState state) {
207
  // Only do base state setting on the initial call. Subsequent calls for filtering do not touch
208
  // the base state.
209
0
  if (filter == nullptr) {
210
    // ASSERT(!state_.local_complete_);
211
    // state_.local_complete_ = end_stream;
212
0
    return encoder_filters_.begin();
213
0
  }
214
215
0
  if (state == FilterIterationStartState::CanStartFromCurrent) {
216
    // The filter iteration has been stopped for all frame types, and now the iteration continues.
217
    // The current filter's encoding callback has not be called. Call it now.
218
0
    return filter->entry();
219
0
  }
220
0
  return std::next(filter->entry());
221
0
}
222
223
std::list<ActiveMessageDecoderFilterPtr>::iterator
224
ActiveMessage::commonDecodePrefix(ActiveMessageDecoderFilter* filter,
225
0
                                  FilterIterationStartState state) {
226
0
  if (!filter) {
227
0
    return decoder_filters_.begin();
228
0
  }
229
0
  if (state == FilterIterationStartState::CanStartFromCurrent) {
230
    // The filter iteration has been stopped for all frame types, and now the iteration continues.
231
    // The current filter's callback function has not been called. Call it now.
232
0
    return filter->entry();
233
0
  }
234
0
  return std::next(filter->entry());
235
0
}
236
237
0
void ActiveMessage::onStreamDecoded(MessageMetadataSharedPtr metadata, ContextSharedPtr ctx) {
238
0
  parent_.stats().request_decoding_success_.inc();
239
240
0
  metadata_ = metadata;
241
0
  context_ = ctx;
242
0
  filter_action_ = [metadata, ctx](DubboFilters::DecoderFilter* filter) -> FilterStatus {
243
0
    return filter->onMessageDecoded(metadata, ctx);
244
0
  };
245
246
0
  auto status = applyDecoderFilters(nullptr, FilterIterationStartState::CanStartFromCurrent);
247
0
  switch (status) {
248
0
  case FilterStatus::StopIteration:
249
0
    ENVOY_LOG(debug, "dubbo request: pause calling decoder filter, id is {}",
250
0
              metadata->requestId());
251
0
    pending_stream_decoded_ = true;
252
0
    return;
253
0
  case FilterStatus::AbortIteration:
254
0
    ENVOY_LOG(debug, "dubbo request: abort calling decoder filter, id is {}",
255
0
              metadata->requestId());
256
0
    parent_.deferredMessage(*this);
257
0
    return;
258
0
  case FilterStatus::Continue:
259
0
    ENVOY_LOG(debug, "dubbo request: complete processing of downstream request messages, id is {}",
260
0
              metadata->requestId());
261
0
    finalizeRequest();
262
0
    return;
263
0
  }
264
0
  PANIC_DUE_TO_CORRUPT_ENUM
265
0
}
266
267
0
void ActiveMessage::finalizeRequest() {
268
0
  pending_stream_decoded_ = false;
269
0
  parent_.stats().request_.inc();
270
0
  bool is_one_way = false;
271
0
  switch (metadata_->messageType()) {
272
0
  case MessageType::Request:
273
0
    parent_.stats().request_twoway_.inc();
274
0
    break;
275
0
  case MessageType::Oneway:
276
0
    parent_.stats().request_oneway_.inc();
277
0
    is_one_way = true;
278
0
    break;
279
0
  default:
280
0
    break;
281
0
  }
282
283
0
  if (local_response_sent_ || is_one_way) {
284
0
    parent_.deferredMessage(*this);
285
0
  }
286
0
}
287
288
0
void ActiveMessage::createFilterChain() {
289
0
  parent_.config().filterFactory().createFilterChain(*this);
290
0
}
291
292
0
DubboProxy::Router::RouteConstSharedPtr ActiveMessage::route() {
293
0
  if (cached_route_) {
294
0
    return cached_route_.value();
295
0
  }
296
297
0
  if (metadata_ != nullptr) {
298
0
    DubboProxy::Router::RouteConstSharedPtr route =
299
0
        parent_.config().routerConfig().route(*metadata_, stream_id_);
300
0
    cached_route_ = route;
301
0
    return cached_route_.value();
302
0
  }
303
304
0
  return nullptr;
305
0
}
306
307
FilterStatus ActiveMessage::applyDecoderFilters(ActiveMessageDecoderFilter* filter,
308
0
                                                FilterIterationStartState state) {
309
0
  ASSERT(filter_action_ != nullptr);
310
0
  if (!local_response_sent_) {
311
0
    for (auto entry = commonDecodePrefix(filter, state); entry != decoder_filters_.end(); entry++) {
312
0
      const FilterStatus status = filter_action_((*entry)->handler().get());
313
0
      if (local_response_sent_) {
314
0
        break;
315
0
      }
316
317
0
      if (status != FilterStatus::Continue) {
318
0
        return status;
319
0
      }
320
0
    }
321
0
  }
322
323
0
  filter_action_ = nullptr;
324
325
0
  return FilterStatus::Continue;
326
0
}
327
328
FilterStatus ActiveMessage::applyEncoderFilters(ActiveMessageEncoderFilter* filter,
329
0
                                                FilterIterationStartState state) {
330
0
  ASSERT(encoder_filter_action_ != nullptr);
331
332
0
  if (!local_response_sent_) {
333
0
    for (auto entry = commonEncodePrefix(filter, state); entry != encoder_filters_.end(); entry++) {
334
0
      const FilterStatus status = encoder_filter_action_((*entry)->handler().get());
335
0
      if (local_response_sent_) {
336
0
        break;
337
0
      }
338
339
0
      if (status != FilterStatus::Continue) {
340
0
        return status;
341
0
      }
342
0
    }
343
0
  }
344
345
0
  encoder_filter_action_ = nullptr;
346
347
0
  return FilterStatus::Continue;
348
0
}
349
350
0
void ActiveMessage::sendLocalReply(const DubboFilters::DirectResponse& response, bool end_stream) {
351
0
  ASSERT(metadata_);
352
0
  parent_.sendLocalReply(*metadata_, response, end_stream);
353
354
0
  if (end_stream) {
355
0
    return;
356
0
  }
357
358
0
  local_response_sent_ = true;
359
0
}
360
361
0
void ActiveMessage::startUpstreamResponse() {
362
0
  ENVOY_LOG(debug, "dubbo response: start upstream");
363
364
0
  ASSERT(response_decoder_ == nullptr);
365
366
0
  auto protocol =
367
0
      NamedProtocolConfigFactory::getFactory(protocolType()).createProtocol(serializationType());
368
369
  // Create a response message decoder.
370
0
  response_decoder_ = std::make_unique<ActiveResponseDecoder>(
371
0
      *this, parent_.stats(), parent_.connection(), std::move(protocol));
372
0
}
373
374
0
DubboFilters::UpstreamResponseStatus ActiveMessage::upstreamData(Buffer::Instance& buffer) {
375
0
  ASSERT(response_decoder_ != nullptr);
376
377
0
  TRY_NEEDS_AUDIT {
378
0
    auto status = response_decoder_->onData(buffer);
379
0
    if (status == DubboFilters::UpstreamResponseStatus::Complete) {
380
0
      if (requestId() != response_decoder_->requestId()) {
381
0
        throw EnvoyException(fmt::format("dubbo response: request ID is not equal, {}:{}",
382
0
                                         requestId(), response_decoder_->requestId()));
383
0
      }
384
385
      // Completed upstream response.
386
0
      parent_.deferredMessage(*this);
387
0
    } else if (status == DubboFilters::UpstreamResponseStatus::Retry) {
388
0
      response_decoder_.reset();
389
0
    }
390
391
0
    return status;
392
0
  }
393
0
  END_TRY catch (const DownstreamConnectionCloseException& ex) {
394
0
    ENVOY_CONN_LOG(error, "dubbo response: exception ({})", parent_.connection(), ex.what());
395
0
    onReset();
396
0
    parent_.stats().response_error_caused_connection_close_.inc();
397
0
    return DubboFilters::UpstreamResponseStatus::Reset;
398
0
  }
399
0
  catch (const EnvoyException& ex) {
400
0
    ENVOY_CONN_LOG(error, "dubbo response: exception ({})", parent_.connection(), ex.what());
401
0
    parent_.stats().response_decoding_error_.inc();
402
403
0
    onError(ex.what());
404
0
    return DubboFilters::UpstreamResponseStatus::Reset;
405
0
  }
406
0
}
407
408
0
void ActiveMessage::resetDownstreamConnection() {
409
0
  parent_.connection().close(Network::ConnectionCloseType::NoFlush);
410
0
}
411
412
0
void ActiveMessage::resetStream() { parent_.deferredMessage(*this); }
413
414
0
uint64_t ActiveMessage::requestId() const {
415
0
  return metadata_ != nullptr ? metadata_->requestId() : 0;
416
0
}
417
418
0
uint64_t ActiveMessage::streamId() const { return stream_id_; }
419
420
0
SerializationType ActiveMessage::serializationType() const {
421
0
  return parent_.downstreamSerializationType();
422
0
}
423
424
0
ProtocolType ActiveMessage::protocolType() const { return parent_.downstreamProtocolType(); }
425
426
0
StreamInfo::StreamInfo& ActiveMessage::streamInfo() { return stream_info_; }
427
428
0
Event::Dispatcher& ActiveMessage::dispatcher() { return parent_.connection().dispatcher(); }
429
430
0
const Network::Connection* ActiveMessage::connection() const { return &parent_.connection(); }
431
432
0
void ActiveMessage::addDecoderFilter(DubboFilters::DecoderFilterSharedPtr filter) {
433
0
  addDecoderFilterWorker(filter, false);
434
0
}
435
436
0
void ActiveMessage::addEncoderFilter(DubboFilters::EncoderFilterSharedPtr filter) {
437
0
  addEncoderFilterWorker(filter, false);
438
0
}
439
440
0
void ActiveMessage::addFilter(DubboFilters::CodecFilterSharedPtr filter) {
441
0
  addDecoderFilterWorker(filter, true);
442
0
  addEncoderFilterWorker(filter, true);
443
0
}
444
445
void ActiveMessage::addDecoderFilterWorker(DubboFilters::DecoderFilterSharedPtr filter,
446
0
                                           bool dual_filter) {
447
0
  ActiveMessageDecoderFilterPtr wrapper =
448
0
      std::make_unique<ActiveMessageDecoderFilter>(*this, filter, dual_filter);
449
0
  filter->setDecoderFilterCallbacks(*wrapper);
450
0
  LinkedList::moveIntoListBack(std::move(wrapper), decoder_filters_);
451
0
}
452
void ActiveMessage::addEncoderFilterWorker(DubboFilters::EncoderFilterSharedPtr filter,
453
0
                                           bool dual_filter) {
454
0
  ActiveMessageEncoderFilterPtr wrapper =
455
0
      std::make_unique<ActiveMessageEncoderFilter>(*this, filter, dual_filter);
456
0
  filter->setEncoderFilterCallbacks(*wrapper);
457
0
  LinkedList::moveIntoListBack(std::move(wrapper), encoder_filters_);
458
0
}
459
460
0
void ActiveMessage::onReset() { parent_.deferredMessage(*this); }
461
462
0
void ActiveMessage::onError(const std::string& what) {
463
0
  if (!metadata_) {
464
    // It's possible that an error occurred before the decoder generated metadata,
465
    // and a metadata object needs to be created in order to generate a local reply.
466
0
    metadata_ = std::make_shared<MessageMetadata>();
467
0
  }
468
469
0
  ASSERT(metadata_);
470
0
  ENVOY_LOG(error, "Bad response: {}", what);
471
0
  sendLocalReply(AppException(ResponseStatus::BadResponse, what), false);
472
0
  parent_.deferredMessage(*this);
473
0
}
474
475
} // namespace DubboProxy
476
} // namespace NetworkFilters
477
} // namespace Extensions
478
} // namespace Envoy