Coverage Report

Created: 2023-11-12 09:30

/proc/self/cwd/source/extensions/filters/network/thrift_proxy/router/shadow_writer_impl.h
Line
Count
Source (jump to first uncovered line)
1
#pragma once
2
3
#include <memory>
4
5
#include "envoy/event/dispatcher.h"
6
#include "envoy/router/router.h"
7
#include "envoy/tcp/conn_pool.h"
8
#include "envoy/upstream/load_balancer.h"
9
10
#include "source/common/common/linked_object.h"
11
#include "source/common/common/logger.h"
12
#include "source/common/upstream/load_balancer_impl.h"
13
#include "source/extensions/filters/network/thrift_proxy/app_exception_impl.h"
14
#include "source/extensions/filters/network/thrift_proxy/conn_manager.h"
15
#include "source/extensions/filters/network/thrift_proxy/router/router.h"
16
#include "source/extensions/filters/network/thrift_proxy/router/upstream_request.h"
17
18
namespace Envoy {
19
namespace Extensions {
20
namespace NetworkFilters {
21
namespace ThriftProxy {
22
namespace Router {
23
24
struct NullResponseDecoder : public DecoderCallbacks, public ProtocolConverter {
25
  NullResponseDecoder(Transport& transport, Protocol& protocol)
26
0
      : decoder_(std::make_unique<Decoder>(transport, protocol, *this)) {
27
0
    initProtocolConverter(protocol, response_buffer_);
28
0
  }
Unexecuted instantiation: Envoy::Extensions::NetworkFilters::ThriftProxy::Router::NullResponseDecoder::NullResponseDecoder(Envoy::Extensions::NetworkFilters::ThriftProxy::Transport&, Envoy::Extensions::NetworkFilters::ThriftProxy::Protocol&)
Unexecuted instantiation: Envoy::Extensions::NetworkFilters::ThriftProxy::Router::NullResponseDecoder::NullResponseDecoder(Envoy::Extensions::NetworkFilters::ThriftProxy::Transport&, Envoy::Extensions::NetworkFilters::ThriftProxy::Protocol&)
29
30
0
  virtual ThriftFilters::ResponseStatus upstreamData(Buffer::Instance& data) {
31
0
    upstream_buffer_.move(data);
32
33
0
    bool underflow = false;
34
0
    TRY_NEEDS_AUDIT { underflow = onData(); }
35
0
    END_TRY catch (const AppException&) { return ThriftFilters::ResponseStatus::Reset; }
36
0
    catch (const EnvoyException&) {
37
0
      return ThriftFilters::ResponseStatus::Reset;
38
0
    }
39
40
0
    ASSERT(complete_ || underflow);
41
0
    return complete_ ? ThriftFilters::ResponseStatus::Complete
42
0
                     : ThriftFilters::ResponseStatus::MoreData;
43
0
  }
44
0
  virtual bool onData() {
45
0
    bool underflow = false;
46
0
    decoder_->onData(upstream_buffer_, underflow);
47
0
    return underflow;
48
0
  }
49
0
  MessageMetadataSharedPtr& responseMetadata() {
50
0
    ASSERT(metadata_ != nullptr);
51
0
    return metadata_;
52
0
  }
53
0
  bool responseSuccess() { return success_.value_or(false); }
54
55
  // ProtocolConverter
56
0
  FilterStatus messageBegin(MessageMetadataSharedPtr metadata) override {
57
0
    metadata_ = metadata;
58
0
    if (metadata_->hasReplyType()) {
59
0
      success_ = metadata_->replyType() == ReplyType::Success;
60
0
    }
61
0
    return FilterStatus::Continue;
62
0
  }
63
0
  FilterStatus transportBegin(MessageMetadataSharedPtr metadata) override {
64
0
    UNREFERENCED_PARAMETER(metadata);
65
0
    return FilterStatus::Continue;
66
0
  }
67
0
  FilterStatus transportEnd() override {
68
0
    ASSERT(metadata_ != nullptr);
69
0
    complete_ = true;
70
0
    return FilterStatus::Continue;
71
0
  }
72
73
  // DecoderCallbacks
74
0
  DecoderEventHandler& newDecoderEventHandler() override { return *this; }
75
0
  bool passthroughEnabled() const override { return true; }
76
0
  bool isRequest() const override { return false; }
77
0
  bool headerKeysPreserveCase() const override { return false; }
78
79
  DecoderPtr decoder_;
80
  Buffer::OwnedImpl response_buffer_;
81
  Buffer::OwnedImpl upstream_buffer_;
82
  MessageMetadataSharedPtr metadata_;
83
  absl::optional<bool> success_;
84
  bool complete_ : 1;
85
};
86
using NullResponseDecoderPtr = std::unique_ptr<NullResponseDecoder>;
87
88
// Adapter from NullResponseDecoder to UpstreamResponseCallbacks.
89
class ShadowUpstreamResponseCallbacksImpl : public UpstreamResponseCallbacks {
90
public:
91
  ShadowUpstreamResponseCallbacksImpl(NullResponseDecoder& response_decoder)
92
0
      : response_decoder_(response_decoder) {}
93
94
0
  void startUpstreamResponse(Transport&, Protocol&) override {}
95
0
  ThriftFilters::ResponseStatus upstreamData(Buffer::Instance& buffer) override {
96
0
    return response_decoder_.upstreamData(buffer);
97
0
  }
98
0
  MessageMetadataSharedPtr responseMetadata() override {
99
0
    return response_decoder_.responseMetadata();
100
0
  }
101
0
  bool responseSuccess() override { return response_decoder_.responseSuccess(); }
102
103
private:
104
  NullResponseDecoder& response_decoder_;
105
};
106
using ShadowUpstreamResponseCallbacksImplPtr = std::unique_ptr<ShadowUpstreamResponseCallbacksImpl>;
107
108
class ShadowWriterImpl;
109
110
class ShadowRouterImpl : public ShadowRouterHandle,
111
                         public RequestOwner,
112
                         public Tcp::ConnectionPool::UpstreamCallbacks,
113
                         public Upstream::LoadBalancerContextBase,
114
                         public Event::DeferredDeletable,
115
                         public LinkedObject<ShadowRouterImpl> {
116
public:
117
  ShadowRouterImpl(ShadowWriterImpl& parent, const std::string& cluster_name,
118
                   MessageMetadataSharedPtr& metadata, TransportType transport_type,
119
                   ProtocolType protocol_type);
120
0
  ~ShadowRouterImpl() override = default;
121
122
  bool createUpstreamRequest();
123
  void maybeCleanup();
124
0
  void resetStream() {
125
0
    if (upstream_request_ != nullptr) {
126
0
      upstream_request_->releaseConnection(true);
127
0
    }
128
0
  }
129
130
  // ShadowRouterHandle
131
  void onRouterDestroy() override;
132
  bool waitingForConnection() const override;
133
0
  RequestOwner& requestOwner() override { return *this; }
134
135
  // RequestOwner
136
0
  Tcp::ConnectionPool::UpstreamCallbacks& upstreamCallbacks() override { return *this; }
137
0
  Buffer::OwnedImpl& buffer() override { return upstream_request_buffer_; }
138
  Event::Dispatcher& dispatcher() override;
139
0
  void addSize(uint64_t size) override { request_size_ += size; }
140
0
  void continueDecoding() override { flushPendingCallbacks(); }
141
0
  void resetDownstreamConnection() override {}
142
0
  void sendLocalReply(const ThriftProxy::DirectResponse&, bool) override {}
143
144
  // RequestOwner::ProtocolConverter
145
0
  FilterStatus transportBegin(MessageMetadataSharedPtr) override { return FilterStatus::Continue; }
146
0
  FilterStatus transportEnd() override { return FilterStatus::Continue; }
147
  FilterStatus messageEnd() override;
148
  FilterStatus passthroughData(Buffer::Instance& data) override;
149
  FilterStatus structBegin(absl::string_view name) override;
150
  FilterStatus structEnd() override;
151
  FilterStatus fieldBegin(absl::string_view name, FieldType& field_type,
152
                          int16_t& field_id) override;
153
  FilterStatus fieldEnd() override;
154
  FilterStatus boolValue(bool& value) override;
155
  FilterStatus byteValue(uint8_t& value) override;
156
  FilterStatus int16Value(int16_t& value) override;
157
  FilterStatus int32Value(int32_t& value) override;
158
  FilterStatus int64Value(int64_t& value) override;
159
  FilterStatus doubleValue(double& value) override;
160
  FilterStatus stringValue(absl::string_view value) override;
161
  FilterStatus mapBegin(FieldType& key_type, FieldType& value_type, uint32_t& size) override;
162
  FilterStatus mapEnd() override;
163
  FilterStatus listBegin(FieldType& elem_type, uint32_t& size) override;
164
  FilterStatus listEnd() override;
165
  FilterStatus setBegin(FieldType& elem_type, uint32_t& size) override;
166
  FilterStatus setEnd() override;
167
168
  // Tcp::ConnectionPool::UpstreamCallbacks
169
  void onUpstreamData(Buffer::Instance& data, bool end_stream) override;
170
  void onEvent(Network::ConnectionEvent event) override;
171
0
  void onAboveWriteBufferHighWatermark() override {}
172
0
  void onBelowWriteBufferLowWatermark() override {}
173
174
  // Upstream::LoadBalancerContextBase
175
0
  const Network::Connection* downstreamConnection() const override { return nullptr; }
176
0
  const Envoy::Router::MetadataMatchCriteria* metadataMatchCriteria() override { return nullptr; }
177
178
  // Event::DeferredDeletable
179
0
  void deleteIsPending() override { deferred_deleting_ = true; }
180
181
private:
182
  friend class ShadowWriterTest;
183
  using ConverterCallback = std::function<FilterStatus()>;
184
185
  void writeRequest();
186
  bool requestInProgress();
187
  bool requestStarted() const;
188
  void flushPendingCallbacks();
189
  FilterStatus runOrSave(std::function<FilterStatus()>&& cb,
190
                         const std::function<void()>& on_save = {});
191
192
  ShadowWriterImpl& parent_;
193
  const std::string cluster_name_;
194
  MessageMetadataSharedPtr metadata_;
195
  const TransportType transport_type_;
196
  const ProtocolType protocol_type_;
197
  TransportPtr transport_;
198
  ProtocolPtr protocol_;
199
  NullResponseDecoderPtr response_decoder_;
200
  ShadowUpstreamResponseCallbacksImplPtr upstream_response_callbacks_;
201
  bool router_destroyed_{};
202
  bool request_sent_{};
203
  Buffer::OwnedImpl upstream_request_buffer_;
204
  std::unique_ptr<UpstreamRequest> upstream_request_;
205
  uint64_t request_size_{};
206
  uint64_t response_size_{};
207
  bool request_ready_ : 1;
208
209
  std::list<ConverterCallback> pending_callbacks_;
210
  bool removed_{};
211
  bool deferred_deleting_{};
212
};
213
214
class ActiveRouters : public ThreadLocal::ThreadLocalObject {
215
public:
216
0
  ActiveRouters(Event::Dispatcher& dispatcher) : dispatcher_(dispatcher) {}
217
0
  ~ActiveRouters() override {
218
0
    while (!active_routers_.empty()) {
219
0
      auto& router = active_routers_.front();
220
0
      router->resetStream();
221
0
      remove(*router);
222
0
    }
223
0
  }
224
225
0
  std::list<std::unique_ptr<ShadowRouterImpl>>& activeRouters() { return active_routers_; }
226
227
0
  void remove(ShadowRouterImpl& router) {
228
0
    dispatcher_.deferredDelete(router.removeFromList(active_routers_));
229
0
  }
230
231
private:
232
  Event::Dispatcher& dispatcher_;
233
  std::list<std::unique_ptr<ShadowRouterImpl>> active_routers_;
234
};
235
236
class ShadowWriterImpl : public ShadowWriter, Logger::Loggable<Logger::Id::thrift> {
237
public:
238
  ShadowWriterImpl(Upstream::ClusterManager& cm, const RouterStats& stats,
239
                   Event::Dispatcher& dispatcher, ThreadLocal::SlotAllocator& tls)
240
0
      : cm_(cm), stats_(stats), dispatcher_(dispatcher), tls_(tls) {
241
0
    tls_.set(
242
0
        [](Event::Dispatcher& dispatcher) { return std::make_shared<ActiveRouters>(dispatcher); });
243
0
  }
244
245
0
  ~ShadowWriterImpl() override = default;
246
247
0
  void remove(ShadowRouterImpl& router) {
248
    // While router is destroyed but the request is in progress. The cleanup
249
    // is possibly deferred to tls shutdown, thus leading us to check if
250
    // the storage is valid.
251
0
    if (tls_.get().has_value()) {
252
0
      tls_->remove(router);
253
0
    }
254
0
  }
255
0
  const RouterStats& stats() { return stats_; }
256
257
  // Router::ShadowWriter
258
0
  Upstream::ClusterManager& clusterManager() override { return cm_; }
259
0
  Event::Dispatcher& dispatcher() override { return dispatcher_; }
260
  OptRef<ShadowRouterHandle> submit(const std::string& cluster_name,
261
                                    MessageMetadataSharedPtr metadata,
262
                                    TransportType original_transport,
263
                                    ProtocolType original_protocol) override;
264
265
private:
266
  friend class ShadowRouterImpl;
267
  friend class ShadowWriterTest;
268
269
  Upstream::ClusterManager& cm_;
270
  const RouterStats& stats_;
271
  Event::Dispatcher& dispatcher_;
272
  ThreadLocal::TypedSlot<ActiveRouters> tls_;
273
};
274
275
} // namespace Router
276
} // namespace ThriftProxy
277
} // namespace NetworkFilters
278
} // namespace Extensions
279
} // namespace Envoy