1
#include "source/extensions/common/dubbo/codec.h"
2

            
3
#include <cstdint>
4
#include <memory>
5

            
6
#include "envoy/registry/registry.h"
7

            
8
#include "source/common/common/assert.h"
9
#include "source/extensions/common/dubbo/hessian2_serializer_impl.h"
10
#include "source/extensions/common/dubbo/message.h"
11
#include "source/extensions/common/dubbo/metadata.h"
12

            
13
namespace Envoy {
14
namespace Extensions {
15
namespace Common {
16
namespace Dubbo {
17

            
18
namespace {
19

            
20
// Layout of the fixed-size Dubbo header as read and written by this file:
//   bytes [0, 2)   magic number (big endian)
//   byte  2        flag byte: message kind bits plus the serialize type
//   byte  3        response status
//   bytes [4, 12)  request id (big endian)
//   bytes [12, 16) body size (big endian)
constexpr uint16_t MagicNumber{0xdabb};

// Bits of the flag byte.
constexpr uint8_t MessageTypeMask{0x80};   // Set on requests, clear on responses.
constexpr uint8_t TwoWayMask{0x40};        // Set on requests that are not one-way.
constexpr uint8_t EventMask{0x20};         // Set on heartbeat requests and responses.
constexpr uint8_t SerializeTypeMask{0x1f}; // Low five bits hold the serialize type.

// Byte offsets of the header fields that follow the magic number.
constexpr uint64_t FlagOffset{2};
constexpr uint64_t StatusOffset{3};
constexpr uint64_t RequestIDOffset{4};
constexpr uint64_t BodySizeOffset{12};
29

            
30
10
// Writes the Dubbo header for the message described by `context` into `buffer`: magic number,
// flag byte, status byte, request id and body size, in that order.
//
// `body_size` is written as the body length field instead of the size stored in the context,
// because the context holds the size of the original request or response and the body may have
// been changed by filters since.
void encodeHeader(Buffer::Instance& buffer, Context& context, uint32_t body_size) {
  // Flag-byte bits contributed by each kind of message.
  constexpr uint8_t RequestBits = MessageTypeMask | TwoWayMask;
  constexpr uint8_t OnewayBits = MessageTypeMask;
  constexpr uint8_t HeartbeatRequestBits = MessageTypeMask | TwoWayMask | EventMask;
  constexpr uint8_t HeartbeatResponseBits = EventMask;

  // Magic number.
  buffer.writeBEInt<uint16_t>(MagicNumber);

  // Message kind bits of the flag byte.
  uint8_t kind_bits = 0;
  switch (context.messageType()) {
  case MessageType::Response:
  case MessageType::Exception:
    // Normal and exception responses set none of the kind bits.
    break;
  case MessageType::Request:
    kind_bits = RequestBits;
    break;
  case MessageType::Oneway:
    kind_bits = OnewayBits;
    break;
  case MessageType::HeartbeatRequest:
    kind_bits = HeartbeatRequestBits;
    break;
  case MessageType::HeartbeatResponse:
    kind_bits = HeartbeatResponseBits;
    break;
  default:
    PANIC_DUE_TO_CORRUPT_ENUM;
  }

  // The serialize type written is always Hessian2. The kind bits are toggled (XOR) on top of it;
  // that is the same as OR-ing them in as long as the serialize type value stays within
  // SerializeTypeMask.
  const auto serialize_bits = static_cast<uint8_t>(SerializeType::Hessian2);
  buffer.writeByte(static_cast<uint8_t>(serialize_bits ^ kind_bits));

  // Response status, or zero when the context carries none.
  uint8_t status_byte = 0x00;
  if (context.hasResponseStatus()) {
    status_byte = static_cast<uint8_t>(context.responseStatus());
  }
  buffer.writeByte(status_byte);

  // Request id.
  buffer.writeBEInt<uint64_t>(context.requestId());

  // Explicitly specified body size.
  buffer.writeBEInt<uint32_t>(body_size);
}
79

            
80
} // namespace
81

            
82
// Consistent with the SerializeType
83
15
bool isValidSerializeType(SerializeType type) {
84
15
  switch (type) {
85
14
  case SerializeType::Hessian2:
86
14
    break;
87
1
  default:
88
1
    return false;
89
15
  }
90
14
  return true;
91
15
}
92

            
93
// Consistent with the ResponseStatus
94
6
bool isValidResponseStatus(ResponseStatus status) {
95
6
  switch (status) {
96
4
  case ResponseStatus::Ok:
97
4
  case ResponseStatus::ClientTimeout:
98
4
  case ResponseStatus::ServerTimeout:
99
5
  case ResponseStatus::BadRequest:
100
5
  case ResponseStatus::BadResponse:
101
5
  case ResponseStatus::ServiceNotFound:
102
5
  case ResponseStatus::ServiceError:
103
5
  case ResponseStatus::ServerError:
104
5
  case ResponseStatus::ClientError:
105
5
  case ResponseStatus::ServerThreadpoolExhaustedError:
106
5
    return true;
107
6
  }
108
1
  return false;
109
6
}
110

            
111
7
// Refines the message type of a request whose header is at the front of `data`: a request
// without the two-way flag becomes a one-way request, unless it is a heartbeat request.
// `data` must hold at least a full header; nothing is drained from it.
void parseRequestInfoFromBuffer(Buffer::Instance& data, Context& context) {
  ASSERT(data.length() >= DubboCodec::HeadersSize);

  const uint8_t flag = data.peekInt<uint8_t>(FlagOffset);
  if ((flag & TwoWayMask) != 0) {
    // Two-way request, the type already set on the context stands.
    return;
  }

  // Heartbeat requests keep their type even without the two-way flag.
  if (context.messageType() == MessageType::HeartbeatRequest) {
    return;
  }

  context.setMessageType(MessageType::Oneway);
}
121

            
122
6
// Reads the response status from the header at the front of `buffer` and stores it in `context`.
// Any status other than Ok also switches the message type to Exception.
// `buffer` must hold at least a full header; nothing is drained from it.
// Throws EnvoyException when the status byte is not a known ResponseStatus.
void parseResponseInfoFromBuffer(Buffer::Instance& buffer, Context& context) {
  using RawStatus = std::underlying_type<ResponseStatus>::type;

  ASSERT(buffer.length() >= DubboCodec::HeadersSize);

  const auto status = static_cast<ResponseStatus>(buffer.peekInt<uint8_t>(StatusOffset));
  if (!isValidResponseStatus(status)) {
    throw EnvoyException(
        absl::StrCat("invalid dubbo message response status ", static_cast<RawStatus>(status)));
  }
  context.setResponseStatus(status);

  if (status == ResponseStatus::Ok) {
    return;
  }
  context.setMessageType(MessageType::Exception);
}
136

            
137
3
// Creates a codec for the given serialize type. Only Hessian2 is supported (asserted), and the
// returned codec is always initialized with a Hessian2 serializer.
DubboCodecPtr DubboCodec::codecFromSerializeType(SerializeType type) {
  ASSERT(type == SerializeType::Hessian2);

  auto hessian2_serializer = std::make_unique<Hessian2SerializerImpl>();

  auto codec = std::make_unique<DubboCodec>();
  codec->initilize(std::move(hessian2_serializer));
  return codec;
}
144

            
145
26
// Decodes the Dubbo header at the front of `buffer` into a new context stored on `metadata`.
//
// Returns DecodeStatus::Waiting, leaving `buffer` untouched, while fewer than HeadersSize bytes
// are available. On success the header bytes are drained from `buffer` and
// DecodeStatus::Success is returned.
//
// Throws EnvoyException on a wrong magic number, an unsupported serialize type, a body size
// that is negative or larger than MaxBodySize, or (for responses) an unknown response status.
DecodeStatus DubboCodec::decodeHeader(Buffer::Instance& buffer, MessageMetadata& metadata) {
  // The metadata must not have been populated yet.
  ASSERT(!metadata.hasContext());

  if (buffer.length() < DubboCodec::HeadersSize) {
    return DecodeStatus::Waiting;
  }

  const auto magic_number = buffer.peekBEInt<uint16_t>();
  if (magic_number != MagicNumber) {
    throw EnvoyException(absl::StrCat("invalid dubbo message magic number ", magic_number));
  }

  const auto flag = buffer.peekInt<uint8_t>(FlagOffset);

  // Serialize type, taken from the low bits of the flag byte.
  const auto serialize_type = static_cast<SerializeType>(flag & SerializeTypeMask);
  if (!isValidSerializeType(serialize_type)) {
    throw EnvoyException(
        absl::StrCat("invalid dubbo message serialization type ",
                     static_cast<std::underlying_type<SerializeType>::type>(serialize_type)));
  }

  const auto request_id = buffer.peekBEInt<int64_t>(RequestIDOffset);

  // Reject body sizes outside of [0, MaxBodySize].
  const auto body_size = buffer.peekBEInt<int32_t>(BodySizeOffset);
  if (body_size < 0 || body_size > MaxBodySize) {
    throw EnvoyException(absl::StrCat("invalid dubbo message size ", body_size));
  }

  const bool is_request = (flag & MessageTypeMask) != 0;
  const bool is_event = (flag & EventMask) != 0;

  auto context = std::make_unique<Context>();
  context->setRequestId(request_id);

  // Set the basic message type first, then let the request/response specific parsing refine it
  // (one-way requests, exception responses).
  if (is_request) {
    context->setMessageType(is_event ? MessageType::HeartbeatRequest : MessageType::Request);
    parseRequestInfoFromBuffer(buffer, *context);
  } else {
    context->setMessageType(is_event ? MessageType::HeartbeatResponse : MessageType::Response);
    parseResponseInfoFromBuffer(buffer, *context);
  }

  context->setBodySize(body_size);
  metadata.setContext(std::move(context));

  // The header is fully consumed, drop its bytes.
  buffer.drain(DubboCodec::HeadersSize);
  return DecodeStatus::Success;
}
209

            
210
11
// Decodes the message body from `buffer` using the context already stored on `metadata`.
//
// Returns DecodeStatus::Waiting while `buffer` holds fewer bytes than the body size recorded in
// the context. Otherwise the serializer deserializes the body as a response or a request,
// depending on the message type, the result is stored on `metadata` and DecodeStatus::Success
// is returned.
DecodeStatus DubboCodec::decodeData(Buffer::Instance& buffer, MessageMetadata& metadata) {
  ASSERT(metadata.hasContext());
  ASSERT(serializer_ != nullptr);

  auto& context = metadata.mutableContext();
  if (buffer.length() < context.bodySize()) {
    return DecodeStatus::Waiting;
  }

  const MessageType message_type = context.messageType();
  const bool response_like = message_type == MessageType::Response ||
                             message_type == MessageType::Exception ||
                             message_type == MessageType::HeartbeatResponse;
  const bool request_like = message_type == MessageType::Request ||
                            message_type == MessageType::Oneway ||
                            message_type == MessageType::HeartbeatRequest;

  if (response_like) {
    metadata.setResponse(serializer_->deserializeRpcResponse(buffer, context));
  } else if (request_like) {
    metadata.setRequest(serializer_->deserializeRpcRequest(buffer, context));
  } else {
    PANIC_DUE_TO_CORRUPT_ENUM;
  }

  return DecodeStatus::Success;
}
239

            
240
10
// Encodes the message described by `metadata` into `buffer`: the body is serialized into a
// temporary buffer first, so that the header can be written with the actual body length, and
// is then moved behind the header.
void DubboCodec::encode(Buffer::Instance& buffer, MessageMetadata& metadata) {
  ASSERT(metadata.hasContext());
  ASSERT(serializer_);

  auto& context = metadata.mutableContext();
  const MessageType message_type = context.messageType();

  Buffer::OwnedImpl body_buffer;
  if (message_type == MessageType::Response || message_type == MessageType::Exception ||
      message_type == MessageType::HeartbeatResponse) {
    serializer_->serializeRpcResponse(body_buffer, metadata);
  } else if (message_type == MessageType::Request || message_type == MessageType::Oneway ||
             message_type == MessageType::HeartbeatRequest) {
    serializer_->serializeRpcRequest(body_buffer, metadata);
  } else {
    PANIC_DUE_TO_CORRUPT_ENUM;
  }

  // The header carries the length of the freshly serialized body (narrowed to 32 bits).
  const auto encoded_body_size = static_cast<uint32_t>(body_buffer.length());
  encodeHeader(buffer, context, encoded_body_size);
  buffer.move(body_buffer);
}
266

            
267
// Test helper: writes only the header for `context` into `buffer`, using the body size stored
// in the context as the body length field.
void DubboCodec::encodeHeaderForTest(Buffer::Instance& buffer, Context& context) {
  const auto context_body_size = context.bodySize();
  encodeHeader(buffer, context, context_body_size);
}
270

            
271
1
// Builds the metadata of a heartbeat response for `heartbeat_request`: message type
// HeartbeatResponse, status Ok and the request id of the request. The request must have a
// context and be a heartbeat request (both asserted). No response body is attached.
MessageMetadataSharedPtr DirectResponseUtil::heartbeatResponse(MessageMetadata& heartbeat_request) {
  ASSERT(heartbeat_request.hasContext());
  ASSERT(heartbeat_request.messageType() == MessageType::HeartbeatRequest);

  auto response_context = std::make_unique<Context>();
  response_context->setMessageType(MessageType::HeartbeatResponse);
  response_context->setResponseStatus(ResponseStatus::Ok);
  // Echo the request id so the response can be matched to the request.
  response_context->setRequestId(heartbeat_request.context().requestId());

  auto response_metadata = std::make_shared<MessageMetadata>();
  response_metadata->setContext(std::move(response_context));
  return response_metadata;
}
286

            
287
// Builds the metadata of a locally generated response to `request`.
//
// The response context carries `status` and the request id of `request`; if `request` has no
// context yet, an empty one is created on it first. The message type is Exception when `status`
// is not Ok or when `type` asks for one of the exception response types, and Response
// otherwise. The response type is only set for Ok responses (defaulting to ResponseWithValue),
// and `content` becomes the response content as a Hessian2 string.
MessageMetadataSharedPtr DirectResponseUtil::localResponse(MessageMetadata& request,
                                                           ResponseStatus status,
                                                           absl::optional<RpcResponseType> type,
                                                           absl::string_view content) {
  // Make sure there is a request context to take the request id from.
  if (!request.hasContext()) {
    request.setContext(std::make_unique<Context>());
  }

  const bool status_ok = status == ResponseStatus::Ok;
  const bool exception_type_requested =
      type.has_value() && (*type == RpcResponseType::ResponseWithException ||
                           *type == RpcResponseType::ResponseWithExceptionWithAttachments);
  const bool is_exception = !status_ok || exception_type_requested;

  // Response context.
  auto response_context = std::make_unique<Context>();
  response_context->setMessageType(is_exception ? MessageType::Exception : MessageType::Response);
  response_context->setResponseStatus(status);
  response_context->setRequestId(request.context().requestId());

  // Response body. No response type is set for non-Ok responses.
  auto response = std::make_unique<RpcResponse>();
  if (status_ok) {
    response->setResponseType(type.value_or(RpcResponseType::ResponseWithValue));
  }
  response->content().initialize(std::make_unique<Hessian2::StringObject>(content), {});

  auto response_metadata = std::make_shared<MessageMetadata>();
  response_metadata->setContext(std::move(response_context));
  response_metadata->setResponse(std::move(response));
  return response_metadata;
}
326

            
327
} // namespace Dubbo
328
} // namespace Common
329
} // namespace Extensions
330
} // namespace Envoy