Coverage Report

Created: 2023-11-12 09:30

/proc/self/cwd/source/extensions/filters/network/dubbo_proxy/router/router_impl.cc
Line
Count
Source (jump to first uncovered line)
1
#include "source/extensions/filters/network/dubbo_proxy/router/router_impl.h"
2
3
#include "envoy/upstream/cluster_manager.h"
4
#include "envoy/upstream/thread_local_cluster.h"
5
6
#include "source/extensions/filters/network/dubbo_proxy/app_exception.h"
7
#include "source/extensions/filters/network/dubbo_proxy/message_impl.h"
8
9
namespace Envoy {
10
namespace Extensions {
11
namespace NetworkFilters {
12
namespace DubboProxy {
13
namespace Router {
14
15
0
// Tears the filter down: resets any in-flight upstream request (cancelling a
// pending pool request or closing the upstream connection) and then drops it.
void Router::onDestroy() {
16
0
  if (upstream_request_) {
17
0
    upstream_request_->resetStream();
18
0
  }
19
0
  cleanup();
20
0
}
21
22
0
// Stores a non-owning pointer to the decoder callbacks; the other methods in this
// file use it for routing, local replies, logging and stream control.
void Router::setDecoderFilterCallbacks(DubboFilters::DecoderFilterCallbacks& callbacks) {
23
0
  callbacks_ = &callbacks;
24
0
}
25
26
0
// Routes a fully decoded request to an upstream cluster.
//
// Steps, in order:
//   1. Resolve the route; reply ServiceNotFound and abort if there is none.
//   2. Resolve the thread-local cluster; reply ServerError and abort if it is unknown,
//      in maintenance mode, or has no TCP connection pool available.
//   3. Stage the request bytes in upstream_request_buffer_. If the attachment was
//      updated, the body-size field and the attachment are rewritten; otherwise the
//      original message is moved over unchanged.
//   4. Create the UpstreamRequest and start it.
//
// @param metadata decoded message metadata; must carry invocation info (ASSERTed).
// @param ctx      decoding context giving access to the original message bytes and sizes.
// @return AbortIteration on any routing failure (after a local reply has been sent),
//         otherwise the status returned by UpstreamRequest::start().
FilterStatus Router::onMessageDecoded(MessageMetadataSharedPtr metadata, ContextSharedPtr ctx) {
27
0
  ASSERT(metadata->hasInvocationInfo());
28
0
  const auto& invocation = metadata->invocationInfo();
29
30
0
  route_ = callbacks_->route();
31
0
  if (!route_) {
32
0
    // NOTE(review): this log says "no cluster match" while the reply below says
    // "no route"; both describe the same missing-route condition.
    ENVOY_STREAM_LOG(debug, "dubbo router: no cluster match for interface '{}'", *callbacks_,
33
0
                     invocation.serviceName());
34
0
    callbacks_->sendLocalReply(AppException(ResponseStatus::ServiceNotFound,
35
0
                                            fmt::format("dubbo router: no route for interface '{}'",
36
0
                                                        invocation.serviceName())),
37
0
                               false);
38
0
    return FilterStatus::AbortIteration;
39
0
  }
40
41
0
  route_entry_ = route_->routeEntry();
42
43
0
  Upstream::ThreadLocalCluster* cluster =
44
0
      cluster_manager_.getThreadLocalCluster(route_entry_->clusterName());
45
0
  if (!cluster) {
46
0
    ENVOY_STREAM_LOG(debug, "dubbo router: unknown cluster '{}'", *callbacks_,
47
0
                     route_entry_->clusterName());
48
0
    callbacks_->sendLocalReply(
49
0
        AppException(ResponseStatus::ServerError, fmt::format("dubbo router: unknown cluster '{}'",
50
0
                                                              route_entry_->clusterName())),
51
0
        false);
52
0
    return FilterStatus::AbortIteration;
53
0
  }
54
55
0
  cluster_ = cluster->info();
56
0
  ENVOY_STREAM_LOG(debug, "dubbo router: cluster '{}' match for interface '{}'", *callbacks_,
57
0
                   route_entry_->clusterName(), invocation.serviceName());
58
59
0
  if (cluster_->maintenanceMode()) {
60
0
    callbacks_->sendLocalReply(
61
0
        AppException(ResponseStatus::ServerError,
62
0
                     fmt::format("dubbo router: maintenance mode for cluster '{}'",
63
0
                                 route_entry_->clusterName())),
64
0
        false);
65
0
    return FilterStatus::AbortIteration;
66
0
  }
67
68
0
  // `this` is passed as the load-balancer context argument of tcpConnPool().
  auto conn_pool_data = cluster->tcpConnPool(Upstream::ResourcePriority::Default, this);
69
0
  if (!conn_pool_data) {
70
0
    callbacks_->sendLocalReply(
71
0
        AppException(
72
0
            ResponseStatus::ServerError,
73
0
            fmt::format("dubbo router: no healthy upstream for '{}'", route_entry_->clusterName())),
74
0
        false);
75
0
    return FilterStatus::AbortIteration;
76
0
  }
77
78
0
  ENVOY_STREAM_LOG(debug, "dubbo router: decoding request", *callbacks_);
79
80
0
  // The attachment accessors used below are reached through the concrete
  // RpcInvocationImpl type, hence the downcast.
  const auto* invocation_impl = dynamic_cast<const RpcInvocationImpl*>(&invocation);
81
0
  ASSERT(invocation_impl);
82
83
0
  if (invocation_impl->hasAttachment() && invocation_impl->attachment().attachmentUpdated()) {
84
0
    // Width of the body-length field that is dropped and rewritten below.
    constexpr size_t body_length_size = sizeof(uint32_t);
85
86
0
    const size_t attachment_offset = invocation_impl->attachment().attachmentOffset();
87
0
    const size_t request_header_size = ctx->headerSize();
88
89
0
    ASSERT(attachment_offset <= ctx->originMessage().length());
90
91
    // Move the other parts of the request headers except the body size to the upstream request
92
    // buffer.
93
0
    upstream_request_buffer_.move(ctx->originMessage(), request_header_size - body_length_size);
94
    // Discard the old body size.
95
0
    ctx->originMessage().drain(body_length_size);
96
97
    // Re-serialize the updated attachment.
98
0
    Buffer::OwnedImpl attachment_buffer;
99
0
    Hessian2::Encoder encoder(std::make_unique<BufferWriter>(attachment_buffer));
100
0
    encoder.encode(invocation_impl->attachment().attachment());
101
102
0
    // New body = the bytes between the header and the attachment, plus the
    // re-encoded attachment.
    // NOTE(review): assumes attachment_offset >= request_header_size; otherwise this
    // unsigned subtraction wraps. Confirm against the decoder that sets the offset.
    size_t new_body_size = attachment_offset - request_header_size + attachment_buffer.length();
103
104
0
    // NOTE(review): size_t is narrowed to uint32_t here; presumably message sizes
    // are bounded elsewhere. Confirm.
    upstream_request_buffer_.writeBEInt<uint32_t>(new_body_size);
105
0
    upstream_request_buffer_.move(ctx->originMessage(), attachment_offset - request_header_size);
106
0
    upstream_request_buffer_.move(attachment_buffer);
107
108
    // Discard the old attachment.
109
0
    ctx->originMessage().drain(ctx->messageSize() - attachment_offset);
110
0
  } else {
111
0
    // Attachment untouched: forward the original message bytes as-is.
    upstream_request_buffer_.move(ctx->originMessage(), ctx->messageSize());
112
0
  }
113
114
0
  upstream_request_ = std::make_unique<UpstreamRequest>(*this, *conn_pool_data, metadata,
115
0
                                                        callbacks_->serializationType(),
116
0
                                                        callbacks_->protocolType());
117
0
  return upstream_request_->start();
118
0
}
119
120
0
// Stores a non-owning pointer to the encoder callbacks; in this file it is only
// used as the logging context in onMessageEncoded().
void Router::setEncoderFilterCallbacks(DubboFilters::EncoderFilterCallbacks& callbacks) {
121
0
  encoder_callbacks_ = &callbacks;
122
0
}
123
124
0
// Reports the outcome of an encoded response to the upstream host's outlier detector.
//
// Nothing is reported when the metadata has no response status or when there is no
// upstream request. The mapping is:
//   Ok + Exception message type                     -> ExtOriginRequestFailed
//   Ok (other message types)                        -> ExtOriginRequestSuccess
//   ServerTimeout                                   -> LocalOriginTimeout
//   ServiceError / ServerError / ThreadpoolExhausted -> ExtOriginRequestFailed
//   anything else                                   -> not reported
//
// @param metadata metadata of the response being encoded.
// @return always FilterStatus::Continue.
FilterStatus Router::onMessageEncoded(MessageMetadataSharedPtr metadata, ContextSharedPtr) {
125
0
  if (!metadata->hasResponseStatus() || upstream_request_ == nullptr) {
126
0
    return FilterStatus::Continue;
127
0
  }
128
129
0
  ENVOY_STREAM_LOG(trace, "dubbo router: response status: {}", *encoder_callbacks_,
130
0
                   static_cast<int>(metadata->responseStatus()));
131
132
0
  // NOTE(review): every branch dereferences upstream_request_->upstream_host_ without a
  // null check; presumably a host has always been selected by the time a response is
  // encoded. Confirm.
  switch (metadata->responseStatus()) {
133
0
  case ResponseStatus::Ok:
134
0
    if (metadata->messageType() == MessageType::Exception) {
135
0
      upstream_request_->upstream_host_->outlierDetector().putResult(
136
0
          Upstream::Outlier::Result::ExtOriginRequestFailed);
137
0
    } else {
138
0
      upstream_request_->upstream_host_->outlierDetector().putResult(
139
0
          Upstream::Outlier::Result::ExtOriginRequestSuccess);
140
0
    }
141
0
    break;
142
0
  case ResponseStatus::ServerTimeout:
143
0
    upstream_request_->upstream_host_->outlierDetector().putResult(
144
0
        Upstream::Outlier::Result::LocalOriginTimeout);
145
0
    break;
146
0
  case ResponseStatus::ServiceError:
147
0
    FALLTHRU;
148
0
  case ResponseStatus::ServerError:
149
0
    FALLTHRU;
150
0
  case ResponseStatus::ServerThreadpoolExhaustedError:
151
0
    upstream_request_->upstream_host_->outlierDetector().putResult(
152
0
        Upstream::Outlier::Result::ExtOriginRequestFailed);
153
0
    break;
154
0
  default:
155
0
    break;
156
0
  }
157
158
0
  return FilterStatus::Continue;
159
0
}
160
161
0
// Handles bytes received from the upstream connection.
//
// On the first chunk the upstream response is started via the decoder callbacks. The
// data is then handed to callbacks_->upstreamData(), whose status decides what follows:
//   Complete -> mark the response complete and release the upstream request.
//   Reset    -> reset the upstream request only.
//   other    -> wait for more data, unless end_stream is set, in which case the
//               response is treated as truncated and the request is failed.
//
// @param data       buffer holding the received upstream bytes.
// @param end_stream true when no further upstream data will arrive.
void Router::onUpstreamData(Buffer::Instance& data, bool end_stream) {
162
0
  ASSERT(!upstream_request_->response_complete_);
163
164
0
  ENVOY_STREAM_LOG(trace, "dubbo router: reading response: {} bytes", *callbacks_, data.length());
165
166
  // Handle normal response.
167
0
  if (!upstream_request_->response_started_) {
168
0
    callbacks_->startUpstreamResponse();
169
0
    upstream_request_->response_started_ = true;
170
0
  }
171
172
0
  DubboFilters::UpstreamResponseStatus status = callbacks_->upstreamData(data);
173
0
  if (status == DubboFilters::UpstreamResponseStatus::Complete) {
174
0
    ENVOY_STREAM_LOG(debug, "dubbo router: response complete", *callbacks_);
175
0
    upstream_request_->onResponseComplete();
176
0
    cleanup();
177
0
    return;
178
0
  } else if (status == DubboFilters::UpstreamResponseStatus::Reset) {
179
0
    ENVOY_STREAM_LOG(debug, "dubbo router: upstream reset", *callbacks_);
180
    // When the upstreamData function returns Reset,
181
    // the current stream is already released from the upper layer,
182
    // so there is no need to call callbacks_->resetStream() to notify
183
    // the upper layer to release the stream.
184
0
    upstream_request_->resetStream();
185
0
    return;
186
0
  }
187
188
0
  if (end_stream) {
189
    // Response is incomplete, but no more data is coming.
190
0
    ENVOY_STREAM_LOG(debug, "dubbo router: response underflow", *callbacks_);
191
0
    // Reported downstream as a remote connection failure.
    upstream_request_->onResetStream(ConnectionPool::PoolFailureReason::RemoteConnectionFailure);
192
0
    upstream_request_->onResponseComplete();
193
0
    cleanup();
194
0
  }
195
0
}
196
197
0
// Handles upstream connection events.
//
// Events are ignored when there is no upstream request or its response is already
// complete, and when a LocalClose follows a reset issued by this router
// (stream_reset_ set). Otherwise RemoteClose and LocalClose are turned into the
// matching onResetStream() failure; RemoteClose is additionally reported to the
// outlier detector as LocalOriginConnectFailed. Connected events are not expected here.
//
// @param event the connection event being delivered.
void Router::onEvent(Network::ConnectionEvent event) {
198
0
  if (!upstream_request_ || upstream_request_->response_complete_) {
199
0
    ENVOY_BUG(event == Network::ConnectionEvent::RemoteClose ||
200
0
                  event == Network::ConnectionEvent::LocalClose,
201
0
              "Unexpected event");
202
    // Client closed connection after completing response.
203
0
    ENVOY_LOG(debug, "dubbo upstream request: the upstream request had completed");
204
0
    return;
205
0
  }
206
207
0
  // A LocalClose after resetStream() is the close this router requested itself.
  if (upstream_request_->stream_reset_ && event == Network::ConnectionEvent::LocalClose) {
208
0
    ENVOY_LOG(debug, "dubbo upstream request: the stream reset");
209
0
    return;
210
0
  }
211
212
0
  switch (event) {
213
0
  case Network::ConnectionEvent::RemoteClose:
214
0
    // NOTE(review): onResetStream() may call callbacks_->resetStream(); the line after it
    // relies on upstream_request_ still being alive, i.e. on onDestroy() not running
    // synchronously. Confirm.
    upstream_request_->onResetStream(ConnectionPool::PoolFailureReason::RemoteConnectionFailure);
215
0
    upstream_request_->upstream_host_->outlierDetector().putResult(
216
0
        Upstream::Outlier::Result::LocalOriginConnectFailed);
217
0
    break;
218
0
  case Network::ConnectionEvent::LocalClose:
219
0
    upstream_request_->onResetStream(ConnectionPool::PoolFailureReason::LocalConnectionFailure);
220
0
    break;
221
0
  case Network::ConnectionEvent::Connected:
222
0
  case Network::ConnectionEvent::ConnectedZeroRtt:
223
    // Connected is consumed by the connection pool.
224
0
    IS_ENVOY_BUG("unexpected");
225
0
  }
226
0
}
227
228
0
// Returns the metadata match criteria to use for this request.
//
// If the request's dynamic metadata has an ENVOY_LB filter entry, the criteria are
// built from it (merged onto the route's criteria when the route has any) and cached
// in metadata_match_. Without such an entry the route's criteria are returned as-is.
//
// @return the merged or request-only criteria, the route's criteria, or nullptr when
//         neither source provides any.
const Envoy::Router::MetadataMatchCriteria* Router::metadataMatchCriteria() {
229
  // Have we been called before? If so, there's no need to recompute because
230
  // by the time this method is called for the first time, route_entry_ should
231
  // not change anymore
232
0
  if (metadata_match_ != nullptr) {
233
0
    return metadata_match_.get();
234
0
  }
235
236
0
  const Envoy::Router::MetadataMatchCriteria* route_criteria =
237
0
      (route_entry_ != nullptr) ? route_entry_->metadataMatchCriteria() : nullptr;
238
239
  // The request's metadata, if present, takes precedence over the route's.
240
0
  const auto& request_metadata = callbacks_->streamInfo().dynamicMetadata().filter_metadata();
241
0
  const auto filter_it = request_metadata.find(Envoy::Config::MetadataFilters::get().ENVOY_LB);
242
243
0
  // No request-level entry: use the route's criteria directly. This result is not
  // cached, so the lookup above is repeated on the next call.
  if (filter_it == request_metadata.end()) {
244
0
    return route_criteria;
245
0
  }
246
247
0
  if (route_criteria != nullptr) {
248
0
    metadata_match_ = route_criteria->mergeMatchCriteria(filter_it->second);
249
0
  } else {
250
0
    metadata_match_ = std::make_unique<Envoy::Router::MetadataMatchCriteriaImpl>(filter_it->second);
251
0
  }
252
0
  return metadata_match_.get();
253
0
}
254
255
0
// Returns the connection reported by the decoder callbacks, or nullptr when no
// callbacks have been set yet.
const Network::Connection* Router::downstreamConnection() const {
256
0
  return callbacks_ != nullptr ? callbacks_->connection() : nullptr;
257
0
}
258
259
0
// Releases the current upstream request, if any. Safe to call repeatedly.
void Router::cleanup() {
260
0
  if (upstream_request_) {
261
0
    upstream_request_.reset();
262
0
  }
263
0
}
264
265
// Builds the state for one upstream request.
//
// @param parent             the router that owns this request.
// @param pool_data          TCP connection pool data used by start() to request a connection.
// @param metadata           metadata of the request being proxied.
// @param serialization_type serialization type passed to the protocol factory.
// @param protocol_type      selects the protocol factory used to create protocol_.
//
// All progress flags (request complete, response started/complete, stream reset)
// start out false.
Router::UpstreamRequest::UpstreamRequest(Router& parent, Upstream::TcpPoolData& pool_data,
266
                                         MessageMetadataSharedPtr& metadata,
267
                                         SerializationType serialization_type,
268
                                         ProtocolType protocol_type)
269
    : parent_(parent), conn_pool_data_(pool_data), metadata_(metadata),
270
      protocol_(
271
          NamedProtocolConfigFactory::getFactory(protocol_type).createProtocol(serialization_type)),
272
      request_complete_(false), response_started_(false), response_complete_(false),
273
0
      stream_reset_(false) {}
274
275
0
// Defaulted destructor: no explicit teardown happens here; callers use resetStream()
// or onResponseComplete() before the request is destroyed.
Router::UpstreamRequest::~UpstreamRequest() = default;
276
277
0
// Requests an upstream connection from the pool.
//
// @return StopIteration when the pool returned a cancellable handle (the handle is kept
//         in conn_pool_handle_ so the request can be cancelled), otherwise Continue.
FilterStatus Router::UpstreamRequest::start() {
278
0
  Tcp::ConnectionPool::Cancellable* handle = conn_pool_data_.newConnection(*this);
279
0
  if (handle) {
280
    // Pause while we wait for a connection.
281
0
    conn_pool_handle_ = handle;
282
0
    return FilterStatus::StopIteration;
283
0
  }
284
285
0
  // NOTE(review): a null handle presumably means the pool already invoked
  // onPoolReady()/onPoolFailure() synchronously. Confirm against the pool contract.
  return FilterStatus::Continue;
286
0
}
287
288
0
// Aborts the upstream side of the request.
//
// Sets stream_reset_ (which Router::onEvent() checks to ignore the resulting
// LocalClose), then either cancels the pending connection-pool request or closes the
// established upstream connection without flushing. The two ASSERTs document that
// only one of the two states is expected at a time.
void Router::UpstreamRequest::resetStream() {
289
0
  stream_reset_ = true;
290
291
0
  if (conn_pool_handle_) {
292
0
    ASSERT(!conn_data_);
293
0
    conn_pool_handle_->cancel(Tcp::ConnectionPool::CancelPolicy::Default);
294
0
    conn_pool_handle_ = nullptr;
295
0
    ENVOY_LOG(debug, "dubbo upstream request: reset connection pool handler");
296
0
  }
297
298
0
  if (conn_data_) {
299
0
    ASSERT(!conn_pool_handle_);
300
0
    conn_data_->connection().close(Network::ConnectionCloseType::NoFlush);
301
0
    conn_data_.reset();
302
0
    ENVOY_LOG(debug, "dubbo upstream request: reset connection data");
303
0
  }
304
0
}
305
306
0
// Writes the given buffer to the established upstream connection.
//
// Requires a ready connection and no pending pool request (both ASSERTed).
//
// @param data bytes to send upstream; written with end_stream == false.
void Router::UpstreamRequest::encodeData(Buffer::Instance& data) {
307
0
  ASSERT(conn_data_);
308
0
  ASSERT(!conn_pool_handle_);
309
310
0
  ENVOY_STREAM_LOG(trace, "proxying {} bytes", *parent_.callbacks_, data.length());
311
0
  conn_data_->connection().write(data, false);
312
0
}
313
314
// Connection-pool callback invoked when no upstream connection could be obtained.
//
// Clears the pending pool handle, records the host, turns the failure into a
// downstream reply/reset via onResetStream(), and discards the staged request bytes.
// For timeout and connection failures the host's outlier detector is updated and
// decoding is resumed.
//
// @param reason why the pool could not provide a connection.
// @param host   the host involved in the failure.
//
// NOTE(review): the Envoy TCP connection-pool callback contract allows `host` to be
// nullptr when no host was involved (e.g. overflow). onUpstreamHostSelected()
// dereferences `host` inside a debug-level log statement, so that path looks unsafe
// with debug logging enabled. Confirm and guard if so.
void Router::UpstreamRequest::onPoolFailure(ConnectionPool::PoolFailureReason reason,
315
                                            absl::string_view,
316
0
                                            Upstream::HostDescriptionConstSharedPtr host) {
317
0
  conn_pool_handle_ = nullptr;
318
319
  // Mimic an upstream reset.
320
0
  onUpstreamHostSelected(host);
321
0
  onResetStream(reason);
322
323
0
  // The staged request will never be sent; drop it.
  parent_.upstream_request_buffer_.drain(parent_.upstream_request_buffer_.length());
324
325
  // If it is a connection error, it means that the connection pool returned
326
  // the error asynchronously and the upper layer needs to be notified to continue decoding.
327
  // If it is a non-connection error, it is returned synchronously from the connection pool
328
  // and is still in the callback at the current Filter, nothing to do.
329
0
  if (reason == ConnectionPool::PoolFailureReason::Timeout ||
330
0
      reason == ConnectionPool::PoolFailureReason::LocalConnectionFailure ||
331
0
      reason == ConnectionPool::PoolFailureReason::RemoteConnectionFailure) {
332
0
    if (reason == ConnectionPool::PoolFailureReason::Timeout) {
333
0
      host->outlierDetector().putResult(Upstream::Outlier::Result::LocalOriginTimeout);
334
0
    } else {
335
0
      host->outlierDetector().putResult(Upstream::Outlier::Result::LocalOriginConnectFailed);
336
0
    }
337
0
    parent_.callbacks_->continueDecoding();
338
0
  }
339
0
}
340
341
// Connection-pool callback invoked when an upstream connection is available.
//
// Records the host, reports a successful connect to its outlier detector, takes
// ownership of the connection data, registers the router for upstream callbacks, and
// then sends the staged request bytes.
//
// @param conn_data the upstream connection handed over by the pool.
// @param host      the host the connection belongs to.
void Router::UpstreamRequest::onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn_data,
342
0
                                          Upstream::HostDescriptionConstSharedPtr host) {
343
0
  ENVOY_LOG(debug, "dubbo upstream request: tcp connection has ready");
344
345
  // Only invoke continueDecoding if we'd previously stopped the filter chain.
346
0
  // Must be read before conn_pool_handle_ is cleared below.
  bool continue_decoding = conn_pool_handle_ != nullptr;
347
348
0
  onUpstreamHostSelected(host);
349
0
  host->outlierDetector().putResult(Upstream::Outlier::Result::LocalOriginConnectSuccess);
350
351
0
  conn_data_ = std::move(conn_data);
352
0
  conn_data_->addUpstreamCallbacks(parent_);
353
0
  conn_pool_handle_ = nullptr;
354
355
0
  onRequestStart(continue_decoding);
356
0
  encodeData(parent_.upstream_request_buffer_);
357
0
}
358
359
0
// Marks the start of sending the request to the selected upstream host.
//
// @param continue_decoding when true, resumes the decoder filter chain via
//                          callbacks_->continueDecoding().
//
// The request is marked complete here, before the caller (onPoolReady) writes the
// staged bytes to the connection.
void Router::UpstreamRequest::onRequestStart(bool continue_decoding) {
360
0
  ENVOY_LOG(debug, "dubbo upstream request: start sending data to the server {}",
361
0
            upstream_host_->address()->asString());
362
363
0
  if (continue_decoding) {
364
0
    parent_.callbacks_->continueDecoding();
365
0
  }
366
0
  onRequestComplete();
367
0
}
368
369
0
// Records that the request has been handed off; only sets request_complete_.
void Router::UpstreamRequest::onRequestComplete() { request_complete_ = true; }
370
371
0
// Marks the response as complete and drops this request's hold on the upstream
// connection data.
void Router::UpstreamRequest::onResponseComplete() {
372
0
  response_complete_ = true;
373
0
  conn_data_.reset();
374
0
}
375
376
0
// Remembers the upstream host chosen for this request and logs its address.
//
// @param host the selected host; stored in upstream_host_.
//
// NOTE(review): `host` is dereferenced in the log argument. onPoolFailure() forwards
// its `host` here unchecked, and that value may be nullptr for failures not tied to a
// host. Confirm and guard if so.
void Router::UpstreamRequest::onUpstreamHostSelected(Upstream::HostDescriptionConstSharedPtr host) {
377
0
  ENVOY_LOG(debug, "dubbo upstream request: selected upstream {}", host->address()->asString());
378
0
  upstream_host_ = host;
379
0
}
380
381
0
// Translates an upstream failure into a downstream outcome.
//
// Oneway requests get no reply: the downstream stream is reset and the function
// returns. For all other requests a ServerError local reply describing the failure is
// sent. If the parent filter has already completed and the response is not complete,
// the downstream stream is reset as well.
//
// @param reason the failure being reported.
void Router::UpstreamRequest::onResetStream(ConnectionPool::PoolFailureReason reason) {
382
0
  if (metadata_->messageType() == MessageType::Oneway) {
383
    // For oneway requests, we should not attempt a response. Reset the downstream to signal
384
    // an error.
385
0
    ENVOY_LOG(debug, "dubbo upstream request: the request is oneway, reset downstream stream");
386
0
    parent_.callbacks_->resetStream();
387
0
    return;
388
0
  }
389
390
  // When the filter's callback does not end, the sendLocalReply function call
391
  // triggers the release of the current stream at the end of the filter's callback.
392
0
  switch (reason) {
393
0
  case ConnectionPool::PoolFailureReason::Overflow:
394
0
    // Only this case avoids touching upstream_host_.
    parent_.callbacks_->sendLocalReply(
395
0
        AppException(ResponseStatus::ServerError,
396
0
                     fmt::format("dubbo upstream request: too many connections")),
397
0
        false);
398
0
    break;
399
0
  case ConnectionPool::PoolFailureReason::LocalConnectionFailure:
400
    // Should only happen if we closed the connection, due to an error condition, in which case
401
    // we've already handled any possible downstream response.
402
0
    // NOTE(review): despite the comment above, a local reply is still sent here.
    // Confirm which behaviour is intended.
    parent_.callbacks_->sendLocalReply(
403
0
        AppException(ResponseStatus::ServerError,
404
0
                     fmt::format("dubbo upstream request: local connection failure '{}'",
405
0
                                 upstream_host_->address()->asString())),
406
0
        false);
407
0
    break;
408
0
  case ConnectionPool::PoolFailureReason::RemoteConnectionFailure:
409
0
    parent_.callbacks_->sendLocalReply(
410
0
        AppException(ResponseStatus::ServerError,
411
0
                     fmt::format("dubbo upstream request: remote connection failure '{}'",
412
0
                                 upstream_host_->address()->asString())),
413
0
        false);
414
0
    break;
415
0
  case ConnectionPool::PoolFailureReason::Timeout:
416
0
    parent_.callbacks_->sendLocalReply(
417
0
        AppException(ResponseStatus::ServerError,
418
0
                     fmt::format("dubbo upstream request: connection failure '{}' due to timeout",
419
0
                                 upstream_host_->address()->asString())),
420
0
        false);
421
0
    break;
422
0
  }
423
424
0
  if (parent_.filter_complete_ && !response_complete_) {
425
    // When the filter's callback has ended and the reply message has not been processed,
426
    // call resetStream to release the current stream.
427
    // the resetStream eventually triggers the onDestroy function call.
428
0
    parent_.callbacks_->resetStream();
429
0
  }
430
0
}
431
432
} // namespace Router
433
} // namespace DubboProxy
434
} // namespace NetworkFilters
435
} // namespace Extensions
436
} // namespace Envoy