Coverage Report

Created: 2024-09-19 09:45

/proc/self/cwd/source/extensions/common/dubbo/codec.cc
Line
Count
Source (jump to first uncovered line)
1
#include "source/extensions/common/dubbo/codec.h"
2
3
#include <cstdint>
4
#include <memory>
5
6
#include "envoy/registry/registry.h"
7
8
#include "source/common/common/assert.h"
9
#include "source/extensions/common/dubbo/hessian2_serializer_impl.h"
10
#include "source/extensions/common/dubbo/message.h"
11
#include "source/extensions/common/dubbo/metadata.h"
12
13
namespace Envoy {
14
namespace Extensions {
15
namespace Common {
16
namespace Dubbo {
17
18
namespace {
19
20
// Value expected in the first two bytes (big endian) of every Dubbo header.
constexpr uint16_t MagicNumber{0xdabb};

// Bit masks applied to the flag byte that lives at FlagOffset.
constexpr uint8_t MessageTypeMask{0x80};   // Set for requests, clear for responses.
constexpr uint8_t TwoWayMask{0x40};        // Set on requests that are not one way.
constexpr uint8_t EventMask{0x20};         // Set on event (heartbeat) messages.
constexpr uint8_t SerializeTypeMask{0x1f}; // Low bits that carry the serialize type.

// Byte offsets of the individual header fields, counted from the header start.
constexpr uint64_t FlagOffset{2};
constexpr uint64_t StatusOffset{3};
constexpr uint64_t RequestIDOffset{4};
constexpr uint64_t BodySizeOffset{12};
29
30
0
// Appends a Dubbo header to `buffer`, in this order: magic number, flag byte, response
// status byte, request id and body size.
//
// `body_size` is passed explicitly instead of being read from `context`, because the
// size stored in the context is the size of the original message and the body may have
// been changed by filters since it was decoded.
void encodeHeader(Buffer::Instance& buffer, Context& context, uint32_t body_size) {
  buffer.writeBEInt<uint16_t>(MagicNumber);

  // Work out which message-kind bits accompany the serialize type in the flag byte.
  uint8_t kind_bits = 0;
  switch (context.messageType()) {
  case MessageType::Response:
  case MessageType::Exception:
    // Normal and exception responses add no bits to the flag.
    break;
  case MessageType::Request:
    // Normal (two way) request.
    kind_bits = MessageTypeMask | TwoWayMask;
    break;
  case MessageType::Oneway:
    // One way request: request bit only.
    kind_bits = MessageTypeMask;
    break;
  case MessageType::HeartbeatRequest:
    // Event request.
    kind_bits = MessageTypeMask | TwoWayMask | EventMask;
    break;
  case MessageType::HeartbeatResponse:
    // Event response.
    kind_bits = EventMask;
    break;
  default:
    PANIC_DUE_TO_CORRUPT_ENUM;
  }
  const uint8_t flag =
      static_cast<uint8_t>(static_cast<uint8_t>(SerializeType::Hessian2) ^ kind_bits);
  buffer.writeByte(flag);

  // The status byte is always written; it is zero when the context has no response status.
  uint8_t status_byte = 0x00;
  if (context.hasResponseStatus()) {
    status_byte = static_cast<uint8_t>(context.responseStatus());
  }
  buffer.writeByte(status_byte);

  buffer.writeBEInt<uint64_t>(context.requestId());
  buffer.writeBEInt<uint32_t>(body_size);
}
79
80
} // namespace
81
82
// Consistent with the SerializeType
83
0
// Returns true only for SerializeType::Hessian2, the single serialize type accepted here.
bool isValidSerializeType(SerializeType type) { return type == SerializeType::Hessian2; }
92
93
// Consistent with the ResponseStatus
94
0
// Returns true when `status` is one of the enumerators listed below and false for any
// other value (for example a raw byte that was cast to ResponseStatus).
bool isValidResponseStatus(ResponseStatus status) {
  bool recognized = false;

  // Deliberately no `default` label, so every accepted enumerator is spelled out.
  switch (status) {
  case ResponseStatus::Ok:
  case ResponseStatus::ClientTimeout:
  case ResponseStatus::ServerTimeout:
  case ResponseStatus::BadRequest:
  case ResponseStatus::BadResponse:
  case ResponseStatus::ServiceNotFound:
  case ResponseStatus::ServiceError:
  case ResponseStatus::ServerError:
  case ResponseStatus::ClientError:
  case ResponseStatus::ServerThreadpoolExhaustedError:
    recognized = true;
    break;
  }

  return recognized;
}
110
111
0
// Refines the message type of a request from the flag byte of the header in `data`:
// a request without the two way bit is marked as Oneway, unless the context already
// says it is a heartbeat request. The buffer is only peeked, never drained.
void parseRequestInfoFromBuffer(Buffer::Instance& data, Context& context) {
  ASSERT(data.length() >= DubboCodec::HeadersSize);

  const uint8_t flag_byte = data.peekInt<uint8_t>(FlagOffset);
  const bool two_way = (flag_byte & TwoWayMask) != 0;
  if (two_way) {
    // Two way requests keep whatever type the context already has.
    return;
  }

  if (context.messageType() == MessageType::HeartbeatRequest) {
    // Heartbeat requests are never downgraded to Oneway.
    return;
  }

  context.setMessageType(MessageType::Oneway);
}
121
122
0
// Reads the status byte from the header in `buffer` and stores it in `context`.
// Throws EnvoyException when the byte is not a valid ResponseStatus. Any status other
// than Ok also switches the message type of the context to Exception. The buffer is
// only peeked, never drained.
void parseResponseInfoFromBuffer(Buffer::Instance& buffer, Context& context) {
  ASSERT(buffer.length() >= DubboCodec::HeadersSize);

  const auto status = static_cast<ResponseStatus>(buffer.peekInt<uint8_t>(StatusOffset));
  const bool status_known = isValidResponseStatus(status);
  if (!status_known) {
    throw EnvoyException(
        absl::StrCat("invalid dubbo message response status ",
                     static_cast<std::underlying_type<ResponseStatus>::type>(status)));
  }

  context.setResponseStatus(status);
  if (status == ResponseStatus::Ok) {
    return;
  }

  // Every non-Ok status is surfaced as an exception response.
  context.setMessageType(MessageType::Exception);
}
136
137
0
// Creates a codec that is initialized with a Hessian2 serializer. `type` is only checked
// by the assertion below; Hessian2 is the only value callers are expected to pass.
DubboCodecPtr DubboCodec::codecFromSerializeType(SerializeType type) {
  ASSERT(type == SerializeType::Hessian2);

  auto hessian2_serializer = std::make_unique<Hessian2SerializerImpl>();
  auto new_codec = std::make_unique<DubboCodec>();
  new_codec->initilize(std::move(hessian2_serializer));
  return new_codec;
}
144
145
0
// Decodes one Dubbo header from the front of `buffer`.
//
// Returns DecodeStatus::Waiting, leaving `buffer` and `metadata` untouched, while fewer
// than HeadersSize bytes are available. On success a new Context holding the request id,
// message type, response status (responses only) and body size is attached to
// `metadata`, the header bytes are drained from `buffer` and DecodeStatus::Success is
// returned.
//
// Throws EnvoyException on a wrong magic number, an unsupported serialize type, an
// invalid response status or a body size outside [0, MaxBodySize].
DecodeStatus DubboCodec::decodeHeader(Buffer::Instance& buffer, MessageMetadata& metadata) {
  // The metadata must not carry a context yet.
  ASSERT(!metadata.hasContext());

  const bool header_complete = buffer.length() >= DubboCodec::HeadersSize;
  if (!header_complete) {
    return DecodeStatus::Waiting;
  }

  const uint16_t magic = buffer.peekBEInt<uint16_t>();
  if (magic != MagicNumber) {
    throw EnvoyException(absl::StrCat("invalid dubbo message magic number ", magic));
  }

  auto context = std::make_unique<Context>();

  const uint8_t flag_byte = buffer.peekInt<uint8_t>(FlagOffset);

  // The serialize type occupies the low bits of the flag byte.
  const auto serialize_type = static_cast<SerializeType>(flag_byte & SerializeTypeMask);
  if (!isValidSerializeType(serialize_type)) {
    throw EnvoyException(
        absl::StrCat("invalid dubbo message serialization type ",
                     static_cast<std::underlying_type<SerializeType>::type>(serialize_type)));
  }

  const bool is_request = (flag_byte & MessageTypeMask) != 0;
  const bool is_event = (flag_byte & EventMask) != 0;

  const int64_t request_id = buffer.peekBEInt<int64_t>(RequestIDOffset);
  const int32_t body_size = buffer.peekBEInt<int32_t>(BodySizeOffset);

  // Zero is a legal size: the body of a heartbeat message is empty.
  if (body_size < 0 || body_size > MaxBodySize) {
    throw EnvoyException(absl::StrCat("invalid dubbo message size ", body_size));
  }

  context->setRequestId(request_id);

  // Set the basic type first, then let the request/response helper refine it from the
  // remaining header fields.
  if (is_request) {
    context->setMessageType(is_event ? MessageType::HeartbeatRequest : MessageType::Request);
    parseRequestInfoFromBuffer(buffer, *context);
  } else {
    context->setMessageType(is_event ? MessageType::HeartbeatResponse : MessageType::Response);
    parseResponseInfoFromBuffer(buffer, *context);
  }

  context->setBodySize(body_size);
  metadata.setContext(std::move(context));

  // The header has been consumed completely; remove it from the buffer.
  buffer.drain(DubboCodec::HeadersSize);
  return DecodeStatus::Success;
}
209
210
0
// Decodes the message body that follows an already decoded header.
//
// Requires `metadata` to hold a context and the codec to have a serializer. Returns
// DecodeStatus::Waiting until `buffer` holds at least the body size recorded in the
// context. Otherwise the serializer deserializes the body as a response or a request,
// depending on the message type, the result is stored in `metadata` and
// DecodeStatus::Success is returned.
DecodeStatus DubboCodec::decodeData(Buffer::Instance& buffer, MessageMetadata& metadata) {
  ASSERT(metadata.hasContext());
  ASSERT(serializer_ != nullptr);

  auto& msg_context = metadata.mutableContext();

  const bool body_complete = !(buffer.length() < msg_context.bodySize());
  if (!body_complete) {
    return DecodeStatus::Waiting;
  }

  const auto msg_type = msg_context.messageType();
  const bool response_like = msg_type == MessageType::Response ||
                             msg_type == MessageType::Exception ||
                             msg_type == MessageType::HeartbeatResponse;
  const bool request_like = msg_type == MessageType::Request ||
                            msg_type == MessageType::Oneway ||
                            msg_type == MessageType::HeartbeatRequest;

  if (response_like) {
    metadata.setResponse(serializer_->deserializeRpcResponse(buffer, msg_context));
  } else if (request_like) {
    metadata.setRequest(serializer_->deserializeRpcRequest(buffer, msg_context));
  } else {
    // Any other value means the enum itself is corrupt.
    PANIC_DUE_TO_CORRUPT_ENUM;
  }

  return DecodeStatus::Success;
}
239
240
0
// Encodes the message described by `metadata` and appends it to `buffer`.
//
// The body is serialized into a scratch buffer first, so that the header can be written
// with the real length of the serialized body; the body is then moved behind the header.
void DubboCodec::encode(Buffer::Instance& buffer, MessageMetadata& metadata) {
  ASSERT(metadata.hasContext());
  ASSERT(serializer_);

  auto& msg_context = metadata.mutableContext();
  Buffer::OwnedImpl serialized_body;

  switch (msg_context.messageType()) {
  // Response side.
  case MessageType::Response:
  case MessageType::Exception:
  case MessageType::HeartbeatResponse:
    serializer_->serializeRpcResponse(serialized_body, metadata);
    break;
  // Request side.
  case MessageType::Request:
  case MessageType::Oneway:
  case MessageType::HeartbeatRequest:
    serializer_->serializeRpcRequest(serialized_body, metadata);
    break;
  default:
    PANIC_DUE_TO_CORRUPT_ENUM;
  }

  const auto serialized_length = serialized_body.length();
  encodeHeader(buffer, msg_context, serialized_length);
  buffer.move(serialized_body);
}
266
267
0
// Test hook: writes only a header to `buffer`, using the body size stored in `context`.
void DubboCodec::encodeHeaderForTest(Buffer::Instance& buffer, Context& context) {
  const auto recorded_body_size = context.bodySize();
  encodeHeader(buffer, context, recorded_body_size);
}
270
271
0
// Builds the metadata of a heartbeat response for `heartbeat_request`: the new context
// has message type HeartbeatResponse, status Ok and the request id of the request. The
// request must have a context and must be a heartbeat request.
MessageMetadataSharedPtr DirectResponseUtil::heartbeatResponse(MessageMetadata& heartbeat_request) {
  ASSERT(heartbeat_request.hasContext());
  ASSERT(heartbeat_request.messageType() == MessageType::HeartbeatRequest);

  auto response_context = std::make_unique<Context>();
  response_context->setMessageType(MessageType::HeartbeatResponse);
  response_context->setResponseStatus(ResponseStatus::Ok);
  // Reuse the request id of the heartbeat request.
  response_context->setRequestId(heartbeat_request.context().requestId());

  auto response_metadata = std::make_shared<MessageMetadata>();
  response_metadata->setContext(std::move(response_context));
  return response_metadata;
}
286
287
// Builds the metadata of a locally generated response to `request`.
//
// If `request` has no context, an empty one is attached to it first. The response
// context copies the request id and stores `status`. Its message type is Exception when
// `status` is not Ok or when `type` names one of the two exception response types, and
// Response otherwise. `content` becomes the response content as a Hessian2 string. The
// response type is only set for an Ok status, defaulting to ResponseWithValue.
MessageMetadataSharedPtr DirectResponseUtil::localResponse(MessageMetadata& request,
                                                           ResponseStatus status,
                                                           absl::optional<RpcResponseType> type,
                                                           absl::string_view content) {
  // Make sure there is a context to read the request id from.
  if (!request.hasContext()) {
    request.setContext(std::make_unique<Context>());
  }
  const auto& request_context = request.context();

  const bool status_ok = status == ResponseStatus::Ok;
  const bool exception_type =
      type.has_value() &&
      (type.value() == RpcResponseType::ResponseWithException ||
       type.value() == RpcResponseType::ResponseWithExceptionWithAttachments);
  const bool as_exception = !status_ok || exception_type;

  // Response context.
  auto response_context = std::make_unique<Context>();
  response_context->setMessageType(as_exception ? MessageType::Exception
                                                : MessageType::Response);
  response_context->setResponseStatus(status);
  response_context->setRequestId(request_context.requestId());

  // Response body.
  auto rpc_response = std::make_unique<RpcResponse>();
  if (status_ok) {
    // Only Ok responses get a response type.
    rpc_response->setResponseType(type.value_or(RpcResponseType::ResponseWithValue));
  }
  rpc_response->content().initialize(std::make_unique<Hessian2::StringObject>(content), {});

  auto response_metadata = std::make_shared<MessageMetadata>();
  response_metadata->setContext(std::move(response_context));
  response_metadata->setResponse(std::move(rpc_response));
  return response_metadata;
}
326
327
} // namespace Dubbo
328
} // namespace Common
329
} // namespace Extensions
330
} // namespace Envoy