Coverage Report

Created: 2023-11-12 09:30

/proc/self/cwd/source/extensions/filters/network/dubbo_proxy/active_message.h
Line
Count
Source (jump to first uncovered line)
1
#pragma once
2
3
#include "envoy/event/deferred_deletable.h"
4
#include "envoy/network/connection.h"
5
#include "envoy/network/filter.h"
6
#include "envoy/stats/timespan.h"
7
8
#include "source/common/buffer/buffer_impl.h"
9
#include "source/common/common/linked_object.h"
10
#include "source/common/common/logger.h"
11
#include "source/common/stream_info/stream_info_impl.h"
12
#include "source/extensions/filters/network/dubbo_proxy/decoder.h"
13
#include "source/extensions/filters/network/dubbo_proxy/decoder_event_handler.h"
14
#include "source/extensions/filters/network/dubbo_proxy/filters/filter.h"
15
#include "source/extensions/filters/network/dubbo_proxy/metadata.h"
16
#include "source/extensions/filters/network/dubbo_proxy/router/router.h"
17
#include "source/extensions/filters/network/dubbo_proxy/stats.h"
18
19
#include "absl/types/optional.h"
20
21
namespace Envoy {
22
namespace Extensions {
23
namespace NetworkFilters {
24
namespace DubboProxy {
25
26
// Forward declarations. The classes in this header refer to these types only through references
// (e.g. `ActiveMessage& parent_`, `ConnectionManager& parent_`), so a complete definition is not
// needed at this point. ActiveMessage itself is defined further down in this header.
class ConnectionManager;
27
class ActiveMessage;
28
29
// Response-side helper held by ActiveMessage (see ActiveMessage::response_decoder_).
//
// It implements both ResponseDecoderCallbacks and StreamHandler: newStream() returns *this, so
// this object is itself the StreamHandler for every stream requested through the callbacks
// interface. Logger::Loggable is inherited privately (default access for `class`).
class ActiveResponseDecoder : public ResponseDecoderCallbacks,
30
                              public StreamHandler,
31
                              Logger::Loggable<Logger::Id::dubbo> {
32
public:
33
  // `protocol` is taken by rvalue reference. The definition lives outside this header.
  // NOTE(review): presumably the arguments are stored in parent_, stats_, response_connection_
  // and protocol_ respectively -- confirm in the .cc file.
  ActiveResponseDecoder(ActiveMessage& parent, DubboFilterStats& stats,
34
                        Network::Connection& connection, ProtocolPtr&& protocol);
35
0
  ~ActiveResponseDecoder() override = default;
36
37
  // Takes a buffer of response bytes and returns an UpstreamResponseStatus.
  // NOTE(review): presumably feeds `data` to decoder_ and returns response_status_ -- the
  // definition is not visible in this header; confirm.
  DubboFilters::UpstreamResponseStatus onData(Buffer::Instance& data);
38
39
  // StreamHandler
40
  void onStreamDecoded(MessageMetadataSharedPtr metadata, ContextSharedPtr ctx) override;
41
42
  // ResponseDecoderCallbacks
43
0
  // This object handles the stream itself rather than creating a separate handler.
  StreamHandler& newStream() override { return *this; }
44
0
  // Intentionally a no-op: the metadata argument is unnamed and ignored.
  void onHeartbeat(MessageMetadataSharedPtr) override {}
45
46
0
  // Returns the request id stored in metadata_, or 0 when metadata_ is null.
  uint64_t requestId() const { return metadata_ ? metadata_->requestId() : 0; }
47
48
private:
49
  // NOTE(review): judging by the name, runs the parent's encoder filter chain for a decoded
  // response -- defined outside this header; confirm.
  FilterStatus applyMessageEncodedFilters(MessageMetadataSharedPtr metadata, ContextSharedPtr ctx);
50
51
  ActiveMessage& parent_;
52
  DubboFilterStats& stats_;
53
  Network::Connection& response_connection_;
54
  ProtocolPtr protocol_;
55
  ResponseDecoderPtr decoder_;
56
  // May be null; requestId() checks for that.
  MessageMetadataSharedPtr metadata_;
57
  // One-bit flag with no in-class initializer.
  // NOTE(review): confirm the constructor initializes it before it is read.
  bool complete_ : 1;
58
  // Defaults to MoreData.
  DubboFilters::UpstreamResponseStatus response_status_{
59
      DubboFilters::UpstreamResponseStatus::MoreData};
60
};
61
62
// Owning (std::unique_ptr) handle to an ActiveResponseDecoder.
using ActiveResponseDecoderPtr = std::unique_ptr<ActiveResponseDecoder>;
63
64
// Common base of the decoder/encoder filter wrappers declared in this header. It implements the
// DubboFilters::FilterCallbacksBase interface and keeps a reference to the owning ActiveMessage.
//
// FilterCallbacksBase is inherited virtually.
// NOTE(review): presumably because the derived wrappers also inherit
// DecoderFilterCallbacks/EncoderFilterCallbacks, which share that base -- confirm against
// filters/filter.h.
class ActiveMessageFilterBase : public virtual DubboFilters::FilterCallbacksBase {
65
public:
66
  // Stores the parent reference and the dual_filter flag; does nothing else.
  ActiveMessageFilterBase(ActiveMessage& parent, bool dual_filter)
67
0
      : parent_(parent), dual_filter_(dual_filter) {}
68
0
  ~ActiveMessageFilterBase() override = default;
69
70
  // DubboFilters::FilterCallbacksBase
  // NOTE(review): ActiveMessage declares methods with the same names, so these overrides
  // presumably forward to parent_ -- the definitions are outside this header; confirm.
71
  uint64_t requestId() const override;
72
  uint64_t streamId() const override;
73
  const Network::Connection* connection() const override;
74
  DubboProxy::Router::RouteConstSharedPtr route() override;
75
  SerializationType serializationType() const override;
76
  ProtocolType protocolType() const override;
77
  StreamInfo::StreamInfo& streamInfo() override;
78
  Event::Dispatcher& dispatcher() override;
79
  void resetStream() override;
80
81
protected:
82
  ActiveMessage& parent_;
83
  // One-bit const flag fixed at construction.
  // NOTE(review): presumably true when the wrapped filter was registered as both a decoder and
  // an encoder filter (see ActiveMessage::addFilter) -- confirm.
  const bool dual_filter_ : 1;
84
};
85
86
// Wraps a DecoderFilter and acts as the DecoderFilterCallbacks for the filter, enabling filter
87
// chain continuation. Instances are linked-list entries (LinkedObject) and share the
// FilterCallbacksBase implementation provided by ActiveMessageFilterBase.
88
class ActiveMessageDecoderFilter : public DubboFilters::DecoderFilterCallbacks,
89
                                   public ActiveMessageFilterBase,
90
                                   public LinkedObject<ActiveMessageDecoderFilter>,
91
                                   Logger::Loggable<Logger::Id::dubbo> {
92
public:
93
  // `filter` is the wrapped decoder filter; `parent` and `dual_filter` have the same meaning as
  // in ActiveMessageFilterBase. Defined outside this header.
  ActiveMessageDecoderFilter(ActiveMessage& parent, DubboFilters::DecoderFilterSharedPtr filter,
94
                             bool dual_filter);
95
0
  ~ActiveMessageDecoderFilter() override = default;
96
97
  // DubboFilters::DecoderFilterCallbacks
  // NOTE(review): ActiveMessage declares sendLocalReply/startUpstreamResponse/upstreamData/
  // resetDownstreamConnection with matching signatures, so these presumably forward to the
  // parent -- confirm in the .cc file.
  void continueDecoding() override;
98
  void sendLocalReply(const DubboFilters::DirectResponse& response, bool end_stream) override;
99
  void startUpstreamResponse() override;
100
  DubboFilters::UpstreamResponseStatus upstreamData(Buffer::Instance& buffer) override;
101
  void resetDownstreamConnection() override;
102
103
0
  // Returns the wrapped filter (handle_ is returned by value).
  DubboFilters::DecoderFilterSharedPtr handler() { return handle_; }
104
105
private:
106
  // The wrapped decoder filter.
  DubboFilters::DecoderFilterSharedPtr handle_;
107
};
108
109
// Owning (std::unique_ptr) handle to a decoder filter wrapper; ActiveMessage stores these in
// its decoder_filters_ list.
using ActiveMessageDecoderFilterPtr = std::unique_ptr<ActiveMessageDecoderFilter>;
110
111
// Wraps an EncoderFilter and acts as the EncoderFilterCallbacks for the filter, enabling filter
112
// chain continuation. Instances are linked-list entries (LinkedObject) and share the
// FilterCallbacksBase implementation provided by ActiveMessageFilterBase.
113
class ActiveMessageEncoderFilter : public ActiveMessageFilterBase,
114
                                   public DubboFilters::EncoderFilterCallbacks,
115
                                   public LinkedObject<ActiveMessageEncoderFilter>,
116
                                   Logger::Loggable<Logger::Id::dubbo> {
117
public:
118
  // `filter` is the wrapped encoder filter; `parent` and `dual_filter` have the same meaning as
  // in ActiveMessageFilterBase. Defined outside this header.
  ActiveMessageEncoderFilter(ActiveMessage& parent, DubboFilters::EncoderFilterSharedPtr filter,
119
                             bool dual_filter);
120
0
  ~ActiveMessageEncoderFilter() override = default;
121
122
  // DubboFilters::EncoderFilterCallbacks
  void continueEncoding() override;
123
0
  // Returns the wrapped filter (handle_ is returned by value).
  DubboFilters::EncoderFilterSharedPtr handler() { return handle_; }
124
125
private:
126
  // The wrapped encoder filter.
  DubboFilters::EncoderFilterSharedPtr handle_;
127
128
  // Gives ActiveMessage direct access to the private members above.
  friend class ActiveMessage;
129
};
130
131
// Owning (std::unique_ptr) handle to an encoder filter wrapper; ActiveMessage stores these in
// its encoder_filters_ list.
using ActiveMessageEncoderFilterPtr = std::unique_ptr<ActiveMessageEncoderFilter>;
132
133
// ActiveMessage tracks downstream requests for which no response has been received.
//
// It owns the decoder and encoder filter wrappers for the request (decoder_filters_ /
// encoder_filters_), acts as the StreamHandler for the decoded request, and is the
// FilterChainFactoryCallbacks target through which filters are added. It is a linked-list entry
// (LinkedObject) and is DeferredDeletable.
134
class ActiveMessage : public LinkedObject<ActiveMessage>,
135
                      public Event::DeferredDeletable,
136
                      public StreamHandler,
137
                      public DubboFilters::FilterChainFactoryCallbacks,
138
                      Logger::Loggable<Logger::Id::dubbo> {
139
public:
140
  // NOTE(review): this single-argument constructor is not `explicit`, so a ConnectionManager&
  // converts implicitly to an ActiveMessage; consider `explicit` if no caller relies on that.
  ActiveMessage(ConnectionManager& parent);
141
  ~ActiveMessage() override;
142
143
  // Indicates which filter to start the iteration with.
144
  enum class FilterIterationStartState { AlwaysStartFromNext, CanStartFromCurrent };
145
146
  // Returns the encoder filter to start iteration with.
147
  std::list<ActiveMessageEncoderFilterPtr>::iterator
148
  commonEncodePrefix(ActiveMessageEncoderFilter* filter, FilterIterationStartState state);
149
  // Returns the decoder filter to start iteration with.
150
  std::list<ActiveMessageDecoderFilterPtr>::iterator
151
  commonDecodePrefix(ActiveMessageDecoderFilter* filter, FilterIterationStartState state);
152
153
  // DubboFilters::FilterChainFactoryCallbacks
154
  void addDecoderFilter(DubboFilters::DecoderFilterSharedPtr filter) override;
155
  void addEncoderFilter(DubboFilters::EncoderFilterSharedPtr filter) override;
156
  // Takes a CodecFilter.
  // NOTE(review): presumably registers it on both chains with dual_filter = true (see the
  // private add*FilterWorker helpers) -- confirm in the .cc file.
  void addFilter(DubboFilters::CodecFilterSharedPtr filter) override;
157
158
  // StreamHandler
159
  void onStreamDecoded(MessageMetadataSharedPtr metadata, ContextSharedPtr ctx) override;
160
161
  // Non-virtual operations whose names and signatures mirror the filter callback interfaces
  // implemented by ActiveMessageFilterBase and ActiveMessageDecoderFilter.
  uint64_t requestId() const;
162
  uint64_t streamId() const;
163
  const Network::Connection* connection() const;
164
  SerializationType serializationType() const;
165
  ProtocolType protocolType() const;
166
  StreamInfo::StreamInfo& streamInfo();
167
  Router::RouteConstSharedPtr route();
168
  void sendLocalReply(const DubboFilters::DirectResponse& response, bool end_stream);
169
  void startUpstreamResponse();
170
  DubboFilters::UpstreamResponseStatus upstreamData(Buffer::Instance& buffer);
171
  void resetDownstreamConnection();
172
  Event::Dispatcher& dispatcher();
173
  void resetStream();
174
175
  // Filter-chain construction and iteration, plus request lifecycle hooks. All are defined
  // outside this header.
  void createFilterChain();
176
  FilterStatus applyDecoderFilters(ActiveMessageDecoderFilter* filter,
177
                                   FilterIterationStartState state);
178
  FilterStatus applyEncoderFilters(ActiveMessageEncoderFilter* filter,
179
                                   FilterIterationStartState state);
180
  void finalizeRequest();
181
  void onReset();
182
  void onError(const std::string& what);
183
0
  // Simple accessors; the shared pointers are returned by value.
  MessageMetadataSharedPtr metadata() const { return metadata_; }
184
0
  ContextSharedPtr context() const { return context_; }
185
0
  bool pendingStreamDecoded() const { return pending_stream_decoded_; }
186
187
private:
188
  // Helpers that take the dual_filter flag passed on to the filter wrapper constructors.
  void addDecoderFilterWorker(DubboFilters::DecoderFilterSharedPtr filter, bool dual_filter);
189
  void addEncoderFilterWorker(DubboFilters::EncoderFilterSharedPtr, bool dual_filter);
190
191
  ConnectionManager& parent_;
192
193
  ContextSharedPtr context_;
194
  MessageMetadataSharedPtr metadata_;
195
  Stats::TimespanPtr request_timer_;
196
  ActiveResponseDecoderPtr response_decoder_;
197
198
  // NOTE(review): an optional wrapping a shared pointer -- presumably "unset" means route()
  // has not been resolved yet, while "set but null" means no route matched; confirm.
  absl::optional<Router::RouteConstSharedPtr> cached_route_;
199
200
  std::list<ActiveMessageDecoderFilterPtr> decoder_filters_;
201
  // Callable applied to a decoder filter; returns a FilterStatus.
  std::function<FilterStatus(DubboFilters::DecoderFilter*)> filter_action_;
202
203
  std::list<ActiveMessageEncoderFilterPtr> encoder_filters_;
204
  // Callable applied to an encoder filter; returns a FilterStatus.
  std::function<FilterStatus(DubboFilters::EncoderFilter*)> encoder_filter_action_;
205
206
  // This value is used in the calculation of the weighted cluster.
207
  uint64_t stream_id_;
208
  StreamInfo::StreamInfoImpl stream_info_;
209
210
  Buffer::OwnedImpl response_buffer_;
211
212
  // One-bit flags with no in-class initializers.
  // NOTE(review): confirm the constructor initializes both before they are read.
  bool pending_stream_decoded_ : 1;
213
  bool local_response_sent_ : 1;
214
215
  // ActiveResponseDecoder accesses this class's private members directly.
  friend class ActiveResponseDecoder;
216
};
217
218
// Owning (std::unique_ptr) handle to an ActiveMessage.
using ActiveMessagePtr = std::unique_ptr<ActiveMessage>;
219
220
} // namespace DubboProxy
221
} // namespace NetworkFilters
222
} // namespace Extensions
223
} // namespace Envoy