Coverage Report

Created: 2024-09-19 09:45

/proc/self/cwd/source/extensions/common/dubbo/message.h
Line
Count
Source (jump to first uncovered line)
1
#pragma once
2
3
#include <algorithm>
4
#include <memory>
5
#include <string>
6
7
#include "envoy/common/pure.h"
8
9
#include "source/common/buffer/buffer_impl.h"
10
#include "source/extensions/common/dubbo/hessian2_utils.h"
11
12
#include "absl/container/node_hash_map.h"
13
#include "absl/types/optional.h"
14
15
namespace Envoy {
16
namespace Extensions {
17
namespace Common {
18
namespace Dubbo {
19
20
/**
21
 * Stream reset reasons.
22
 */
23
enum class StreamResetReason : uint8_t {
24
  // If a local codec level reset was sent on the stream.
25
  LocalReset,
26
  // If a local codec level refused stream reset was sent on the stream (allowing for retry).
27
  LocalRefusedStreamReset,
28
  // If a remote codec level reset was received on the stream.
29
  RemoteReset,
30
  // If a remote codec level refused stream reset was received on the stream (allowing for retry).
31
  RemoteRefusedStreamReset,
32
  // If the stream was locally reset by a connection pool due to an initial connection failure.
33
  ConnectionFailure,
34
  // If the stream was locally reset due to connection termination.
35
  ConnectionTermination,
36
  // The stream was reset because of a resource overflow.
37
  Overflow
38
};
39
40
// Supported serialize type
41
enum class SerializeType : uint8_t {
42
  Hessian2 = 2,
43
};
44
45
// Message Type
46
enum class MessageType : uint8_t {
47
  Response = 0,
48
  Request = 1,
49
  // Special request without two-way flag.
50
  Oneway = 2,
51
  // Special response with non-Ok response status or exception response.
52
  Exception = 3,
53
  // Special request with event flag.
54
  HeartbeatRequest = 4,
55
  // Special response with event flag.
56
  HeartbeatResponse = 5,
57
58
  // ATTENTION: MAKE SURE THIS REMAINS EQUAL TO THE LAST MESSAGE TYPE
59
  LastMessageType = HeartbeatResponse,
60
};
61
62
/**
63
 * Dubbo protocol response status types.
64
 * See org.apache.dubbo.remoting.exchange
65
 */
66
enum class ResponseStatus : uint8_t {
67
  Ok = 20,
68
  ClientTimeout = 30,
69
  ServerTimeout = 31,
70
  BadRequest = 40,
71
  BadResponse = 50,
72
  ServiceNotFound = 60,
73
  ServiceError = 70,
74
  ServerError = 80,
75
  ClientError = 90,
76
  ServerThreadpoolExhaustedError = 100,
77
};
78
79
enum class RpcResponseType : uint8_t {
80
  ResponseWithException = 0,
81
  ResponseWithValue = 1,
82
  ResponseWithNullValue = 2,
83
  ResponseWithExceptionWithAttachments = 3,
84
  ResponseValueWithAttachments = 4,
85
  ResponseNullValueWithAttachments = 5,
86
};
87
88
using Attachments = absl::flat_hash_map<std::string, std::string>;
89
using ArgumentVec = absl::InlinedVector<Hessian2::ObjectPtr, 4>;
90
91
class RequestContent : Envoy::Logger::Loggable<Envoy::Logger::Id::dubbo> {
92
public:
93
  // Initialize the content buffer with the given buffer and length.
94
  void initialize(Buffer::Instance& buffer, uint64_t length);
95
96
  // Initialize the content buffer with the given types and arguments and attachments.
97
  // The initialize() call will also encode these types and arguments into the
98
  // content buffer.
99
  void initialize(std::string&& types, ArgumentVec&& argvs, Attachments&& attachs);
100
101
  // Underlying content buffer. This may re-encode the result and attachments into the
102
  // content buffer to ensure the returned buffer is up-to-date.
103
  const Buffer::Instance& buffer();
104
105
  // Move the content buffer to the given buffer. This only does the move and does not
106
  // re-encode the result and attachments.
107
  void bufferMoveTo(Buffer::Instance& buffer);
108
109
  // Get all the arguments of the request.
110
  const ArgumentVec& arguments();
111
112
  // Get all the attachments of the request.
113
  const Attachments& attachments();
114
115
  // Set the attachment with the given key and value. If the key already exists, the
116
  // value will be updated. Otherwise, a new key-value pair will be added.
117
  void setAttachment(absl::string_view key, absl::string_view val);
118
119
  // Remove the attachment with the given key.
120
  void delAttachment(absl::string_view key);
121
122
private:
123
  // Decode the content buffer into types, arguments and attachments. The decoding is
124
  // lazy and will be triggered when the content is accessed.
125
  void lazyDecode();
126
127
  // Re-encode the attachments into the content buffer.
128
  void encodeAttachments();
129
130
  // Re-encode the types, arguments and attachments into the content buffer.
131
  void encodeEverything();
132
133
  // Called when the content is broken. The whole content will be reset to an empty
134
  // state.
135
  void handleBrokenValue();
136
137
  Buffer::OwnedImpl content_buffer_;
138
139
  // If the content has been decoded. This ensures the decoding is only performed once.
140
  bool decoded_{false};
141
142
  // If the attachments has been updated. This ensures the re-encoding is only
143
  // when the attachment has been modified.
144
  bool updated_{false};
145
146
  uint64_t argvs_size_{0};
147
148
  std::string types_;
149
  ArgumentVec argvs_;
150
  Attachments attachs_;
151
};
152
153
/**
154
 * RpcRequest represent an rpc call.
155
 */
156
class RpcRequest {
157
public:
158
  RpcRequest(std::string&& version, std::string&& service, std::string&& service_version,
159
             std::string&& method)
160
      : version_(std::move(version)), service_(std::move(service)),
161
0
        service_version_(std::move(service_version)), method_(std::move(method)) {}
162
163
0
  absl::string_view version() const { return version_; }
164
0
  absl::string_view service() const { return service_; }
165
0
  absl::string_view serviceVersion() const { return service_version_; }
166
0
  absl::string_view method() const { return method_; }
167
168
0
  RequestContent& content() const { return content_; }
169
170
private:
171
  std::string version_;
172
  std::string service_;
173
  std::string service_version_;
174
  std::string method_;
175
176
  mutable RequestContent content_;
177
};
178
179
using RpcRequestPtr = std::unique_ptr<RpcRequest>;
180
181
class ResponseContent : public Envoy::Logger::Loggable<Envoy::Logger::Id::dubbo> {
182
public:
183
  // Initialize the content buffer with the given buffer and length.
184
  void initialize(Buffer::Instance& buffer, uint64_t length);
185
186
  // Initialize the content buffer with the given result and attachments. The initialize()
187
  // call will also encode the result and attachments into the content buffer.
188
  void initialize(Hessian2::ObjectPtr&& value, Attachments&& attachs);
189
190
  // Underlying content buffer. This may re-encode the result and attachments into the
191
  // content buffer to ensure the returned buffer is up-to-date.
192
  const Buffer::Instance& buffer();
193
194
  // Move the content buffer to the given buffer. This only does the move and does not
195
  // re-encode the result and attachments.
196
  void bufferMoveTo(Buffer::Instance& buffer);
197
198
  // Get the result of the response. If the content has not been decoded, the decoding
199
  // will be triggered.
200
  const Hessian2::Object* result();
201
202
  // Get all the attachments of the response.
203
  const Attachments& attachments();
204
205
  // Set the attachment with the given key and value. If the key already exists, the
206
  // value will be updated. Otherwise, a new key-value pair will be added.
207
  void setAttachment(absl::string_view key, absl::string_view val);
208
209
  // Remove the attachment with the given key.
210
  void delAttachment(absl::string_view key);
211
212
private:
213
  // Decode the content buffer into value and attachments. The decoding is lazy and will
214
  // be triggered when the content is accessed.
215
  void lazyDecode();
216
217
  // Re-encode the attachments into the content buffer.
218
  void encodeAttachments();
219
220
  // Re-encode the result and attachments into the content buffer.
221
  void encodeEverything();
222
223
  // Called when the content is broken. The whole content will be reset to an empty
224
  // state.
225
  void handleBrokenValue();
226
227
  Buffer::OwnedImpl content_buffer_;
228
229
  // If the content has been decoded. This ensures the decoding is only performed once.
230
  bool decoded_{false};
231
232
  // If the attachments has been updated. This ensures the re-encoding is only
233
  // when the attachment has been modified.
234
  bool updated_{false};
235
236
  uint64_t result_size_{0};
237
238
  Hessian2::ObjectPtr result_;
239
  Attachments attachs_;
240
};
241
242
/**
243
 * RpcResponse represent the result of an rpc call.
244
 */
245
class RpcResponse {
246
public:
247
0
  RpcResponse() = default;
248
249
0
  void setResponseType(RpcResponseType response_type) { response_type_ = response_type; }
250
0
  absl::optional<RpcResponseType> responseType() const { return response_type_; }
251
252
0
  ResponseContent& content() const { return content_; }
253
254
private:
255
  absl::optional<RpcResponseType> response_type_{};
256
  mutable ResponseContent content_;
257
};
258
259
using RpcResponsePtr = std::unique_ptr<RpcResponse>;
260
261
} // namespace Dubbo
262
} // namespace Common
263
} // namespace Extensions
264
} // namespace Envoy