Coverage Report

Created: 2023-11-12 09:30

/proc/self/cwd/source/extensions/filters/network/thrift_proxy/router/upstream_request.cc
Line
Count
Source (jump to first uncovered line)
1
#include "source/extensions/filters/network/thrift_proxy/router/upstream_request.h"
2
3
#include "source/extensions/filters/network/thrift_proxy/app_exception_impl.h"
4
5
namespace Envoy {
6
namespace Extensions {
7
namespace NetworkFilters {
8
namespace ThriftProxy {
9
namespace Router {
10
11
UpstreamRequest::UpstreamRequest(RequestOwner& parent, Upstream::TcpPoolData& pool_data,
12
                                 MessageMetadataSharedPtr& metadata, TransportType transport_type,
13
                                 ProtocolType protocol_type, bool close_downstream_on_error)
14
    : parent_(parent), stats_(parent.stats()), conn_pool_data_(pool_data), metadata_(metadata),
15
      transport_(NamedTransportConfigFactory::getFactory(transport_type).createTransport()),
16
      protocol_(NamedProtocolConfigFactory::getFactory(protocol_type).createProtocol()),
17
      request_complete_(false), response_underflow_(false), charged_response_timing_(false),
18
0
      close_downstream_on_error_(close_downstream_on_error) {}
19
20
0
UpstreamRequest::~UpstreamRequest() {
21
0
  if (conn_pool_handle_) {
22
0
    conn_pool_handle_->cancel(Tcp::ConnectionPool::CancelPolicy::Default);
23
0
  }
24
0
}
25
26
0
FilterStatus UpstreamRequest::start() {
27
0
  Tcp::ConnectionPool::Cancellable* handle = conn_pool_data_.newConnection(*this);
28
0
  if (handle) {
29
    // Pause while we wait for a connection.
30
0
    conn_pool_handle_ = handle;
31
0
    return FilterStatus::StopIteration;
32
0
  }
33
34
0
  if (upgrade_response_ != nullptr) {
35
    // Pause while we wait for an upgrade response.
36
0
    return FilterStatus::StopIteration;
37
0
  }
38
39
0
  if (upstream_host_ == nullptr) {
40
0
    return FilterStatus::StopIteration;
41
0
  }
42
43
0
  return FilterStatus::Continue;
44
0
}
45
46
0
void UpstreamRequest::releaseConnection(const bool close) {
47
0
  ENVOY_LOG(debug, "releasing connection, close: {}", close);
48
0
  if (conn_pool_handle_) {
49
0
    conn_pool_handle_->cancel(Tcp::ConnectionPool::CancelPolicy::Default);
50
0
    conn_pool_handle_ = nullptr;
51
0
  }
52
53
0
  conn_state_ = nullptr;
54
55
  // The event triggered by close will also release this connection so clear conn_data_ before
56
  // closing.
57
0
  auto conn_data = std::move(conn_data_);
58
0
  if (close && conn_data != nullptr) {
59
0
    conn_data->connection().close(Network::ConnectionCloseType::NoFlush);
60
0
  }
61
0
}
62
63
0
void UpstreamRequest::resetStream() {
64
0
  ENVOY_LOG(debug, "reset stream");
65
0
  releaseConnection(true);
66
0
}
67
68
void UpstreamRequest::onPoolFailure(ConnectionPool::PoolFailureReason reason, absl::string_view,
69
0
                                    Upstream::HostDescriptionConstSharedPtr host) {
70
0
  ENVOY_LOG(debug, "on pool failure");
71
0
  conn_pool_handle_ = nullptr;
72
73
  // Mimic an upstream reset.
74
0
  onUpstreamHostSelected(host);
75
0
  if (!onResetStream(reason)) {
76
0
    parent_.continueDecoding();
77
0
  }
78
0
}
79
80
void UpstreamRequest::onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn_data,
81
0
                                  Upstream::HostDescriptionConstSharedPtr host) {
82
  // Only invoke continueDecoding if we'd previously stopped the filter chain.
83
0
  bool continue_decoding = conn_pool_handle_ != nullptr;
84
85
0
  onUpstreamHostSelected(host);
86
0
  host->outlierDetector().putResult(Upstream::Outlier::Result::LocalOriginConnectSuccess);
87
88
0
  conn_data_ = std::move(conn_data);
89
0
  conn_data_->addUpstreamCallbacks(parent_.upstreamCallbacks());
90
0
  conn_pool_handle_ = nullptr;
91
92
0
  conn_state_ = conn_data_->connectionStateTyped<ThriftConnectionState>();
93
0
  if (conn_state_ == nullptr) {
94
0
    conn_data_->setConnectionState(std::make_unique<ThriftConnectionState>());
95
0
    conn_state_ = conn_data_->connectionStateTyped<ThriftConnectionState>();
96
0
  }
97
98
0
  if (protocol_->supportsUpgrade()) {
99
0
    auto& buffer = parent_.buffer();
100
0
    upgrade_response_ = protocol_->attemptUpgrade(*transport_, *conn_state_, buffer);
101
0
    if (upgrade_response_ != nullptr) {
102
0
      parent_.addSize(buffer.length());
103
0
      conn_data_->connection().write(buffer, false);
104
0
      return;
105
0
    }
106
0
  }
107
108
0
  onRequestStart(continue_decoding);
109
0
}
110
111
0
void UpstreamRequest::handleUpgradeResponse(Buffer::Instance& data) {
112
0
  ENVOY_LOG(trace, "reading upgrade response: {} bytes", data.length());
113
0
  if (!upgrade_response_->onData(data)) {
114
    // Wait for more data.
115
0
    return;
116
0
  }
117
118
0
  ENVOY_LOG(debug, "upgrade response complete");
119
0
  protocol_->completeUpgrade(*conn_state_, *upgrade_response_);
120
0
  upgrade_response_.reset();
121
0
  onRequestStart(true);
122
0
}
123
124
ThriftFilters::ResponseStatus
125
UpstreamRequest::handleRegularResponse(Buffer::Instance& data,
126
0
                                       UpstreamResponseCallbacks& callbacks) {
127
0
  ENVOY_LOG(trace, "reading response: {} bytes", data.length());
128
129
0
  if (response_state_ == ResponseState::None) {
130
0
    callbacks.startUpstreamResponse(*transport_, *protocol_);
131
0
    response_state_ = ResponseState::Started;
132
0
  }
133
134
0
  const auto& cluster = parent_.cluster();
135
136
0
  const auto status = callbacks.upstreamData(data);
137
0
  if (status == ThriftFilters::ResponseStatus::Complete) {
138
139
0
    stats_.recordUpstreamResponseSize(cluster, response_size_);
140
141
0
    switch (callbacks.responseMetadata()->messageType()) {
142
0
    case MessageType::Reply:
143
0
      if (callbacks.responseSuccess()) {
144
0
        upstream_host_->outlierDetector().putResult(
145
0
            Upstream::Outlier::Result::ExtOriginRequestSuccess);
146
0
        stats_.incResponseReplySuccess(cluster, upstream_host_);
147
0
      } else {
148
0
        upstream_host_->outlierDetector().putResult(
149
0
            Upstream::Outlier::Result::ExtOriginRequestFailed);
150
0
        stats_.incResponseReplyError(cluster, upstream_host_);
151
0
      }
152
0
      break;
153
154
0
    case MessageType::Exception:
155
0
      upstream_host_->outlierDetector().putResult(
156
0
          Upstream::Outlier::Result::ExtOriginRequestFailed);
157
0
      stats_.incResponseRemoteException(cluster, upstream_host_);
158
0
      break;
159
160
0
    default:
161
0
      stats_.incResponseInvalidType(cluster, upstream_host_);
162
0
      break;
163
0
    }
164
165
0
    response_state_ = ResponseState::Completed;
166
0
    if (callbacks.responseMetadata()->isDraining()) {
167
0
      ENVOY_LOG(debug, "got draining signal");
168
0
      stats_.incCloseDrain(cluster);
169
      // ResetStream triggers a local connection failure. However, we want to
170
      // keep the downstream connection after the upstream connection, i.e.,
171
      // `conn_data->connection()`, is closed. Therefore, introduce a new state
172
      // ResponseState::Completed before ResponseState::ConnectionReleased to
173
      // hint that got all the response and not to close the downstream
174
      // connection, especially while we got a draining signal.
175
0
      resetStream();
176
0
    }
177
0
    onResponseComplete();
178
0
  } else if (status == ThriftFilters::ResponseStatus::Reset) {
179
    // Note: invalid responses are not accounted in the response size histogram.
180
0
    ENVOY_LOG(debug, "upstream reset");
181
0
    upstream_host_->outlierDetector().putResult(Upstream::Outlier::Result::ExtOriginRequestFailed);
182
0
    stats_.incResponseDecodingError(cluster, upstream_host_);
183
0
    resetStream();
184
0
  }
185
186
0
  return status;
187
0
}
188
189
bool UpstreamRequest::handleUpstreamData(Buffer::Instance& data, bool end_stream,
190
0
                                         UpstreamResponseCallbacks& callbacks) {
191
0
  ASSERT(response_state_ < ResponseState::Completed);
192
193
0
  response_size_ += data.length();
194
195
0
  if (upgrade_response_ != nullptr) {
196
0
    handleUpgradeResponse(data);
197
0
  } else {
198
0
    const auto status = handleRegularResponse(data, callbacks);
199
0
    if (status != ThriftFilters::ResponseStatus::MoreData) {
200
0
      return true;
201
0
    }
202
0
  }
203
204
0
  if (end_stream) {
205
    // Response is incomplete, but no more data is coming.
206
0
    ENVOY_LOG(debug, "response underflow");
207
0
    onResponseComplete();
208
0
    response_underflow_ = true;
209
0
    onResetStream(ConnectionPool::PoolFailureReason::RemoteConnectionFailure);
210
0
    return true;
211
0
  }
212
213
0
  return false;
214
0
}
215
216
0
void UpstreamRequest::onEvent(Network::ConnectionEvent event) {
217
0
  ASSERT(response_state_ != ResponseState::ConnectionReleased);
218
0
  bool end_downstream = true;
219
220
0
  switch (event) {
221
0
  case Network::ConnectionEvent::RemoteClose:
222
0
    ENVOY_LOG(debug, "upstream remote close");
223
0
    end_downstream = onResetStream(ConnectionPool::PoolFailureReason::RemoteConnectionFailure);
224
0
    break;
225
0
  case Network::ConnectionEvent::LocalClose:
226
0
    ENVOY_LOG(debug, "upstream local close");
227
0
    end_downstream = onResetStream(ConnectionPool::PoolFailureReason::LocalConnectionFailure);
228
0
    break;
229
0
  case Network::ConnectionEvent::Connected:
230
0
  case Network::ConnectionEvent::ConnectedZeroRtt:
231
    // Connected is consumed by the connection pool.
232
0
    IS_ENVOY_BUG("reached unexpectedly");
233
0
  }
234
235
0
  releaseConnection(false);
236
0
  if (!end_downstream && request_complete_) {
237
0
    ENVOY_LOG(debug, "reset parent callbacks");
238
0
    parent_.onReset();
239
0
  }
240
0
}
241
242
0
uint64_t UpstreamRequest::encodeAndWrite(Buffer::OwnedImpl& request_buffer) {
243
0
  Buffer::OwnedImpl transport_buffer;
244
245
0
  metadata_->setProtocol(protocol_->type());
246
0
  transport_->encodeFrame(transport_buffer, *metadata_, request_buffer);
247
248
0
  uint64_t size = transport_buffer.length();
249
250
0
  conn_data_->connection().write(transport_buffer, false);
251
252
0
  return size;
253
0
}
254
255
0
void UpstreamRequest::onRequestStart(bool continue_decoding) {
256
0
  auto& buffer = parent_.buffer();
257
0
  parent_.initProtocolConverter(*protocol_, buffer);
258
259
0
  metadata_->setSequenceId(conn_state_->nextSequenceId());
260
0
  parent_.convertMessageBegin(metadata_);
261
262
0
  if (continue_decoding) {
263
0
    parent_.continueDecoding();
264
0
  }
265
0
}
266
267
0
void UpstreamRequest::onRequestComplete() {
268
0
  ENVOY_LOG(debug, "on request complete");
269
0
  Event::Dispatcher& dispatcher = parent_.dispatcher();
270
0
  downstream_request_complete_time_ = dispatcher.timeSource().monotonicTime();
271
0
  request_complete_ = true;
272
0
}
273
274
0
void UpstreamRequest::onResponseComplete() {
275
0
  ENVOY_LOG(debug, "on response complete");
276
0
  chargeResponseTiming();
277
0
  response_state_ = ResponseState::ConnectionReleased;
278
0
  conn_state_ = nullptr;
279
0
  conn_data_.reset();
280
0
}
281
282
0
void UpstreamRequest::onUpstreamHostSelected(Upstream::HostDescriptionConstSharedPtr host) {
283
0
  upstream_host_ = host;
284
0
}
285
286
static Upstream::Outlier::Result
287
0
poolFailureReasonToResult(ConnectionPool::PoolFailureReason reason) {
288
0
  switch (reason) {
289
0
  case ConnectionPool::PoolFailureReason::LocalConnectionFailure:
290
0
    FALLTHRU;
291
0
  case ConnectionPool::PoolFailureReason::RemoteConnectionFailure:
292
0
    FALLTHRU;
293
0
  case ConnectionPool::PoolFailureReason::Overflow:
294
0
    return Upstream::Outlier::Result::LocalOriginConnectFailed;
295
0
  case ConnectionPool::PoolFailureReason::Timeout:
296
0
    return Upstream::Outlier::Result::LocalOriginTimeout;
297
0
  }
298
0
  PANIC_DUE_TO_CORRUPT_ENUM;
299
0
}
300
301
0
bool UpstreamRequest::onResetStream(ConnectionPool::PoolFailureReason reason) {
302
0
  bool close_downstream = true;
303
304
0
  chargeResponseTiming();
305
306
0
  switch (reason) {
307
0
  case ConnectionPool::PoolFailureReason::Overflow:
308
0
    stats_.incResponseLocalException(parent_.cluster());
309
0
    parent_.sendLocalReply(AppException(AppExceptionType::InternalError,
310
0
                                        "thrift upstream request: too many connections"),
311
0
                           false /* Don't close the downstream connection. */);
312
0
    close_downstream = false;
313
0
    break;
314
0
  case ConnectionPool::PoolFailureReason::LocalConnectionFailure:
315
0
    FALLTHRU;
316
0
  case ConnectionPool::PoolFailureReason::RemoteConnectionFailure:
317
0
    FALLTHRU;
318
0
  case ConnectionPool::PoolFailureReason::Timeout:
319
0
    upstream_host_->outlierDetector().putResult(poolFailureReasonToResult(reason));
320
321
    // Error occurred after an underflow response, propagate the reset to the downstream.
322
0
    if (response_underflow_) {
323
0
      ENVOY_LOG(debug, "reset downstream connection for an underflow response");
324
0
      stats_.incCloseUnderflowResponse(parent_.cluster());
325
0
      parent_.resetDownstreamConnection();
326
0
    } else if (response_state_ <= ResponseState::Started) {
327
0
      close_downstream = close_downstream_on_error_;
328
0
      if (response_state_ == ResponseState::Started) {
329
0
        stats_.incClosePartialResponse(parent_.cluster());
330
0
      }
331
0
      stats_.incResponseLocalException(parent_.cluster());
332
0
      parent_.sendLocalReply(
333
0
          AppException(AppExceptionType::InternalError,
334
0
                       fmt::format("connection failure before response {}: {} '{}'",
335
0
                                   response_state_ == ResponseState::None ? "start" : "complete",
336
0
                                   PoolFailureReasonNames::get().fromReason(reason),
337
0
                                   (upstream_host_ && upstream_host_->address())
338
0
                                       ? upstream_host_->address()->asString()
339
0
                                       : "to upstream")),
340
0
          close_downstream);
341
0
    }
342
0
    break;
343
0
  }
344
345
0
  ENVOY_LOG(debug,
346
0
            "upstream reset complete. reason={}, close_downstream={}, response_state={}, "
347
0
            "response_underflow={}",
348
0
            PoolFailureReasonNames::get().fromReason(reason), close_downstream,
349
0
            static_cast<uint8_t>(response_state_), static_cast<bool>(response_underflow_));
350
0
  return close_downstream;
351
0
}
352
353
0
void UpstreamRequest::chargeResponseTiming() {
354
0
  if (charged_response_timing_ || !downstream_request_complete_time_.has_value()) {
355
0
    return;
356
0
  }
357
0
  charged_response_timing_ = true;
358
0
  Event::Dispatcher& dispatcher = parent_.dispatcher();
359
0
  const std::chrono::milliseconds response_time =
360
0
      std::chrono::duration_cast<std::chrono::milliseconds>(
361
0
          dispatcher.timeSource().monotonicTime() - downstream_request_complete_time_.value());
362
0
  stats_.recordUpstreamResponseTime(parent_.cluster(), upstream_host_, response_time.count());
363
0
}
364
365
} // namespace Router
366
} // namespace ThriftProxy
367
} // namespace NetworkFilters
368
} // namespace Extensions
369
} // namespace Envoy