Coverage Report

Created: 2023-11-12 09:30

/proc/self/cwd/source/extensions/filters/network/thrift_proxy/router/shadow_writer_impl.cc
Line
Count
Source (jump to first uncovered line)
1
#include "source/extensions/filters/network/thrift_proxy/router/shadow_writer_impl.h"
2
3
#include <memory>
4
5
#include "envoy/upstream/cluster_manager.h"
6
#include "envoy/upstream/thread_local_cluster.h"
7
8
#include "source/common/common/utility.h"
9
#include "source/extensions/filters/network/well_known_names.h"
10
11
namespace Envoy {
12
namespace Extensions {
13
namespace NetworkFilters {
14
namespace ThriftProxy {
15
namespace Router {
16
17
// Builds a ShadowRouterImpl for `cluster_name` and asks it to create its upstream request.
// On success the router is moved into the active router list obtained from tls_ and a reference
// to that list's front element is returned. If the upstream request could not be created, or
// tls_ holds no value, shadow_request_submit_failure_ is incremented and absl::nullopt is
// returned.
OptRef<ShadowRouterHandle> ShadowWriterImpl::submit(const std::string& cluster_name,
                                                    MessageMetadataSharedPtr metadata,
                                                    TransportType original_transport,
                                                    ProtocolType original_protocol) {
  auto router = std::make_unique<ShadowRouterImpl>(*this, cluster_name, metadata,
                                                   original_transport, original_protocol);

  // The upstream request is attempted first; tls_ is only consulted when that succeeded.
  if (router->createUpstreamRequest() && tls_.get().has_value()) {
    auto& routers = tls_->activeRouters();
    LinkedList::moveIntoList(std::move(router), routers);
    return *routers.front();
  }

  stats_.named_.shadow_request_submit_failure_.inc();
  return absl::nullopt;
}
34
35
// Constructs a shadow router owned by `parent`. The incoming metadata is cloned, the transport
// and protocol objects are created through their named config factories, and a
// NullResponseDecoder plus the ShadowUpstreamResponseCallbacksImpl referring to it are set up.
ShadowRouterImpl::ShadowRouterImpl(ShadowWriterImpl& parent, const std::string& cluster_name,
                                   MessageMetadataSharedPtr& metadata, TransportType transport_type,
                                   ProtocolType protocol_type)
    : RequestOwner(parent.clusterManager(), parent.stats()),
      parent_(parent),
      cluster_name_(cluster_name),
      metadata_(metadata->clone()),
      transport_type_(transport_type),
      protocol_type_(protocol_type),
      transport_(NamedTransportConfigFactory::getFactory(transport_type).createTransport()),
      protocol_(NamedProtocolConfigFactory::getFactory(protocol_type).createProtocol()) {
  // The callbacks object keeps a reference to the decoder, so the decoder is created first and
  // handed to the member afterwards; the referenced heap object stays the same.
  auto decoder = std::make_unique<NullResponseDecoder>(*transport_, *protocol_);
  upstream_response_callbacks_ = std::make_unique<ShadowUpstreamResponseCallbacksImpl>(*decoder);
  response_decoder_ = std::move(decoder);
}
Unexecuted instantiation: Envoy::Extensions::NetworkFilters::ThriftProxy::Router::ShadowRouterImpl::ShadowRouterImpl(Envoy::Extensions::NetworkFilters::ThriftProxy::Router::ShadowWriterImpl&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<Envoy::Extensions::NetworkFilters::ThriftProxy::MessageMetadata>&, Envoy::Extensions::NetworkFilters::ThriftProxy::TransportType, Envoy::Extensions::NetworkFilters::ThriftProxy::ProtocolType)
Unexecuted instantiation: Envoy::Extensions::NetworkFilters::ThriftProxy::Router::ShadowRouterImpl::ShadowRouterImpl(Envoy::Extensions::NetworkFilters::ThriftProxy::Router::ShadowWriterImpl&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<Envoy::Extensions::NetworkFilters::ThriftProxy::MessageMetadata>&, Envoy::Extensions::NetworkFilters::ThriftProxy::TransportType, Envoy::Extensions::NetworkFilters::ThriftProxy::ProtocolType)
47
48
0
// Returns the dispatcher exposed by the parent shadow writer.
Event::Dispatcher& ShadowRouterImpl::dispatcher() {
  Event::Dispatcher& parent_dispatcher = parent_.dispatcher();
  return parent_dispatcher;
}
49
50
0
bool ShadowRouterImpl::createUpstreamRequest() {
51
0
  auto prepare_result =
52
0
      prepareUpstreamRequest(cluster_name_, metadata_, transport_type_, protocol_type_, this);
53
0
  if (prepare_result.exception.has_value()) {
54
0
    return false;
55
0
  }
56
57
0
  auto& upstream_req_info = prepare_result.upstream_request_info.value();
58
59
0
  upstream_request_ = std::make_unique<UpstreamRequest>(*this, *upstream_req_info.conn_pool_data,
60
0
                                                        metadata_, upstream_req_info.transport,
61
0
                                                        upstream_req_info.protocol, true);
62
0
  upstream_request_->start();
63
0
  return true;
64
0
}
65
66
0
bool ShadowRouterImpl::requestStarted() const {
67
0
  return upstream_request_->conn_data_ != nullptr &&
68
0
         upstream_request_->upgrade_response_ == nullptr;
69
0
}
70
71
0
void ShadowRouterImpl::flushPendingCallbacks() {
72
0
  if (pending_callbacks_.empty()) {
73
0
    return;
74
0
  }
75
76
0
  for (auto& cb : pending_callbacks_) {
77
0
    cb();
78
0
  }
79
80
0
  pending_callbacks_.clear();
81
0
}
82
83
// Runs `cb` immediately when the request has started and returns its status. Otherwise `cb` is
// appended to the pending callbacks, `on_save` is invoked if it is non-empty, and
// FilterStatus::Continue is returned.
FilterStatus ShadowRouterImpl::runOrSave(std::function<FilterStatus()>&& cb,
                                         const std::function<void()>& on_save) {
  if (!requestStarted()) {
    // Not started yet: keep the callback for later.
    pending_callbacks_.push_back(std::move(cb));
    if (on_save) {
      on_save();
    }
    return FilterStatus::Continue;
  }

  return cb();
}
97
98
0
// Forwards passthrough data to ProtocolConverter once the request has started. Before that, an
// OwnedImpl is constructed from `data` and a callback forwarding that buffer is queued;
// FilterStatus::Continue is returned in the deferred case.
FilterStatus ShadowRouterImpl::passthroughData(Buffer::Instance& data) {
  if (!requestStarted()) {
    pending_callbacks_.push_back(
        [buffered = std::make_shared<Buffer::OwnedImpl>(data), this]() mutable -> FilterStatus {
          return ProtocolConverter::passthroughData(*buffered);
        });
    return FilterStatus::Continue;
  }

  return ProtocolConverter::passthroughData(data);
}
111
112
0
// Forwards structBegin to ProtocolConverter once the request has started. Otherwise the name is
// copied into a std::string owned by a queued callback and FilterStatus::Continue is returned.
FilterStatus ShadowRouterImpl::structBegin(absl::string_view name) {
  if (!requestStarted()) {
    pending_callbacks_.push_back([owned_name = std::string(name), this]() -> FilterStatus {
      return ProtocolConverter::structBegin(absl::string_view(owned_name));
    });
    return FilterStatus::Continue;
  }

  return ProtocolConverter::structBegin(name);
}
124
125
0
// Runs or defers ProtocolConverter::structEnd via runOrSave.
FilterStatus ShadowRouterImpl::structEnd() {
  auto forward = [this]() -> FilterStatus { return ProtocolConverter::structEnd(); };
  return runOrSave(std::move(forward));
}
128
129
// Forwards fieldBegin to ProtocolConverter once the request has started. Otherwise the name,
// field type and field id are copied into a queued callback and FilterStatus::Continue is
// returned.
FilterStatus ShadowRouterImpl::fieldBegin(absl::string_view name, FieldType& field_type,
                                          int16_t& field_id) {
  if (!requestStarted()) {
    pending_callbacks_.push_back(
        [owned_name = std::string(name), field_type, field_id, this]() mutable -> FilterStatus {
          return ProtocolConverter::fieldBegin(absl::string_view(owned_name), field_type,
                                               field_id);
        });
    return FilterStatus::Continue;
  }

  return ProtocolConverter::fieldBegin(name, field_type, field_id);
}
142
143
0
// Runs or defers ProtocolConverter::fieldEnd via runOrSave.
FilterStatus ShadowRouterImpl::fieldEnd() {
  auto forward = [this]() -> FilterStatus { return ProtocolConverter::fieldEnd(); };
  return runOrSave(std::move(forward));
}
146
147
0
// Captures `value` by copy and runs or defers ProtocolConverter::boolValue via runOrSave.
FilterStatus ShadowRouterImpl::boolValue(bool& value) {
  auto forward = [value, this]() mutable -> FilterStatus {
    return ProtocolConverter::boolValue(value);
  };
  return runOrSave(std::move(forward));
}
151
152
0
// Captures `value` by copy and runs or defers ProtocolConverter::byteValue via runOrSave.
FilterStatus ShadowRouterImpl::byteValue(uint8_t& value) {
  auto forward = [value, this]() mutable -> FilterStatus {
    return ProtocolConverter::byteValue(value);
  };
  return runOrSave(std::move(forward));
}
156
157
0
// Captures `value` by copy and runs or defers ProtocolConverter::int16Value via runOrSave.
FilterStatus ShadowRouterImpl::int16Value(int16_t& value) {
  auto forward = [value, this]() mutable -> FilterStatus {
    return ProtocolConverter::int16Value(value);
  };
  return runOrSave(std::move(forward));
}
161
162
0
// Captures `value` by copy and runs or defers ProtocolConverter::int32Value via runOrSave.
FilterStatus ShadowRouterImpl::int32Value(int32_t& value) {
  auto forward = [value, this]() mutable -> FilterStatus {
    return ProtocolConverter::int32Value(value);
  };
  return runOrSave(std::move(forward));
}
166
167
0
// Captures `value` by copy and runs or defers ProtocolConverter::int64Value via runOrSave.
FilterStatus ShadowRouterImpl::int64Value(int64_t& value) {
  auto forward = [value, this]() mutable -> FilterStatus {
    return ProtocolConverter::int64Value(value);
  };
  return runOrSave(std::move(forward));
}
171
172
0
// Captures `value` by copy and runs or defers ProtocolConverter::doubleValue via runOrSave.
FilterStatus ShadowRouterImpl::doubleValue(double& value) {
  auto forward = [value, this]() mutable -> FilterStatus {
    return ProtocolConverter::doubleValue(value);
  };
  return runOrSave(std::move(forward));
}
176
177
0
// Forwards stringValue to ProtocolConverter once the request has started. Otherwise the value is
// copied into a std::string owned by a queued callback and FilterStatus::Continue is returned.
FilterStatus ShadowRouterImpl::stringValue(absl::string_view value) {
  if (!requestStarted()) {
    pending_callbacks_.push_back([owned_value = std::string(value), this]() -> FilterStatus {
      return ProtocolConverter::stringValue(absl::string_view(owned_value));
    });
    return FilterStatus::Continue;
  }

  return ProtocolConverter::stringValue(value);
}
189
190
// Captures the key type, value type and size by copy and runs or defers
// ProtocolConverter::mapBegin via runOrSave.
FilterStatus ShadowRouterImpl::mapBegin(FieldType& key_type, FieldType& value_type,
                                        uint32_t& size) {
  auto forward = [key_type, value_type, size, this]() mutable -> FilterStatus {
    return ProtocolConverter::mapBegin(key_type, value_type, size);
  };
  return runOrSave(std::move(forward));
}
196
197
0
// Runs or defers ProtocolConverter::mapEnd via runOrSave.
FilterStatus ShadowRouterImpl::mapEnd() {
  auto forward = [this]() -> FilterStatus { return ProtocolConverter::mapEnd(); };
  return runOrSave(std::move(forward));
}
200
201
0
// Captures the element type and size by copy and runs or defers ProtocolConverter::listBegin
// via runOrSave.
FilterStatus ShadowRouterImpl::listBegin(FieldType& elem_type, uint32_t& size) {
  auto forward = [elem_type, size, this]() mutable -> FilterStatus {
    return ProtocolConverter::listBegin(elem_type, size);
  };
  return runOrSave(std::move(forward));
}
206
207
0
// Runs or defers ProtocolConverter::listEnd via runOrSave.
FilterStatus ShadowRouterImpl::listEnd() {
  auto forward = [this]() -> FilterStatus { return ProtocolConverter::listEnd(); };
  return runOrSave(std::move(forward));
}
210
211
0
// Captures the element type and size by copy and runs or defers ProtocolConverter::setBegin
// via runOrSave.
FilterStatus ShadowRouterImpl::setBegin(FieldType& elem_type, uint32_t& size) {
  auto forward = [elem_type, size, this]() mutable -> FilterStatus {
    return ProtocolConverter::setBegin(elem_type, size);
  };
  return runOrSave(std::move(forward));
}
216
217
0
// Runs or defers ProtocolConverter::setEnd via runOrSave.
FilterStatus ShadowRouterImpl::setEnd() {
  auto forward = [this]() -> FilterStatus { return ProtocolConverter::setEnd(); };
  return runOrSave(std::move(forward));
}
220
221
0
// Finishes the shadowed message. When run, the callback ends the message through
// ProtocolConverter, encodes and writes the request buffer upstream, records the request size,
// marks the request as sent and, for Oneway messages, releases the connection.
// If the request has not started, the callback is saved and request_ready_ is set instead.
FilterStatus ShadowRouterImpl::messageEnd() {
  auto finish_request = [this]() -> FilterStatus {
    ASSERT(upstream_request_->conn_data_ != nullptr);

    ProtocolConverter::messageEnd();
    addSize(upstream_request_->encodeAndWrite(upstream_request_buffer_));
    stats().recordUpstreamRequestSize(*cluster_, request_size_);

    request_sent_ = true;

    const bool is_oneway = metadata_->messageType() == MessageType::Oneway;
    if (is_oneway) {
      upstream_request_->releaseConnection(false);
    }

    return FilterStatus::Continue;
  };

  // Only invoked by runOrSave when the callback above had to be deferred.
  auto mark_ready = [this]() -> void { request_ready_ = true; };

  return runOrSave(std::move(finish_request), mark_ready);
}
241
242
0
bool ShadowRouterImpl::requestInProgress() {
243
0
  const bool connection_open = upstream_request_->conn_data_ != nullptr;
244
0
  const bool connection_waiting = upstream_request_->conn_pool_handle_ != nullptr;
245
246
  // Connection open and message sent.
247
0
  const bool message_sent = connection_open && request_sent_;
248
249
  // Request ready to go and connection ready or almost ready.
250
0
  const bool message_ready = request_ready_ && (connection_open || connection_waiting);
251
252
0
  return message_sent || message_ready;
253
0
}
254
255
0
void ShadowRouterImpl::onRouterDestroy() {
256
0
  ASSERT(!deferred_deleting_);
257
258
  // Mark the shadow request to be destroyed when the response gets back
259
  // or the upstream connection finally fails.
260
0
  router_destroyed_ = true;
261
262
0
  if (!requestInProgress()) {
263
0
    maybeCleanup();
264
0
  }
265
0
}
266
267
0
bool ShadowRouterImpl::waitingForConnection() const {
268
0
  return upstream_request_->conn_pool_handle_ != nullptr;
269
0
}
270
271
0
void ShadowRouterImpl::maybeCleanup() {
272
0
  ENVOY_LOG(debug, "maybeCleanup, removed: {}, router_destroyed: {}", removed_, router_destroyed_);
273
0
  if (removed_) {
274
0
    return;
275
0
  }
276
277
0
  ASSERT(!deferred_deleting_);
278
279
0
  if (router_destroyed_) {
280
0
    removed_ = true;
281
0
    upstream_request_->resetStream();
282
0
    parent_.remove(*this);
283
0
  }
284
0
}
285
286
0
// Hands upstream data to the upstream request; when it reports completion, cleanup is attempted.
void ShadowRouterImpl::onUpstreamData(Buffer::Instance& data, bool end_stream) {
  if (upstream_request_->handleUpstreamData(data, end_stream, *upstream_response_callbacks_)) {
    maybeCleanup();
  }
}
293
294
0
// Passes the connection event on to the upstream request, then attempts cleanup.
void ShadowRouterImpl::onEvent(Network::ConnectionEvent event) {
  // The upstream request sees the event first; cleanup runs afterwards regardless of the event.
  upstream_request_->onEvent(event);
  maybeCleanup();
}
298
299
} // namespace Router
300
} // namespace ThriftProxy
301
} // namespace NetworkFilters
302
} // namespace Extensions
303
} // namespace Envoy