1
#include "source/extensions/common/dubbo/hessian2_serializer_impl.h"
2

            
3
#include <cstddef>
4

            
5
#include "envoy/common/exception.h"
6

            
7
#include "source/common/common/assert.h"
8
#include "source/common/common/macros.h"
9
#include "source/extensions/common/dubbo/hessian2_utils.h"
10
#include "source/extensions/common/dubbo/message.h"
11
#include "source/extensions/common/dubbo/metadata.h"
12

            
13
#include "hessian2/object.hpp"
14

            
15
namespace Envoy {
16
namespace Extensions {
17
namespace Common {
18
namespace Dubbo {
19

            
20
RpcRequestPtr Hessian2SerializerImpl::deserializeRpcRequest(Buffer::Instance& buffer,
21
7
                                                            Context& context) {
22
7
  ASSERT(context.bodySize() <= buffer.length());
23

            
24
  // Handle heartbeat.
25
7
  if (context.heartbeat()) {
26
1
    buffer.drain(context.bodySize());
27
1
    return nullptr;
28
1
  }
29

            
30
  // Handle normal request or oneway request.
31
6
  ASSERT(context.messageType() == MessageType::Request ||
32
6
         context.messageType() == MessageType::Oneway);
33
6
  ASSERT(context.bodySize() <= buffer.length());
34

            
35
6
  Hessian2::Decoder decoder(std::make_unique<BufferReader>(buffer));
36

            
37
  // TODO(zyfjeff): Add format checker
38
6
  auto dubbo_version = decoder.decode<std::string>();
39
6
  auto service_name = decoder.decode<std::string>();
40
6
  auto service_version = decoder.decode<std::string>();
41
6
  auto method_name = decoder.decode<std::string>();
42

            
43
6
  const auto decoded_size = decoder.offset();
44

            
45
6
  if (context.bodySize() < decoded_size) {
46
1
    throw EnvoyException(fmt::format("RpcRequest size({}) larger than body size({})", decoded_size,
47
1
                                     context.bodySize()));
48
1
  }
49

            
50
5
  if (dubbo_version == nullptr || service_name == nullptr || service_version == nullptr ||
51
5
      method_name == nullptr) {
52
1
    throw EnvoyException(fmt::format("RpcRequest has no request metadata"));
53
1
  }
54

            
55
4
  buffer.drain(decoded_size);
56

            
57
4
  auto request = std::make_unique<RpcRequest>(std::move(*dubbo_version), std::move(*service_name),
58
4
                                              std::move(*service_version), std::move(*method_name));
59
4
  request->content().initialize(buffer, context.bodySize() - decoded_size);
60

            
61
4
  return request;
62
5
}
63

            
64
RpcResponsePtr Hessian2SerializerImpl::deserializeRpcResponse(Buffer::Instance& buffer,
65
10
                                                              Context& context) {
66
10
  ASSERT(context.bodySize() <= buffer.length());
67

            
68
  // Handle heartbeat.
69
10
  if (context.heartbeat()) {
70
1
    buffer.drain(context.bodySize());
71
1
    return nullptr;
72
1
  }
73

            
74
9
  ASSERT(context.hasResponseStatus());
75

            
76
  // Handle normal response or exception response.
77
9
  ASSERT(context.messageType() == MessageType::Response ||
78
9
         context.messageType() == MessageType::Exception);
79

            
80
9
  ASSERT(context.hasResponseStatus());
81
9
  auto response = std::make_unique<RpcResponse>();
82

            
83
  // Non `Ok` response body has no response type info and skip deserialization.
84
9
  if (context.messageType() == MessageType::Exception) {
85
1
    ASSERT(context.responseStatus() != ResponseStatus::Ok);
86
1
    response->content().initialize(buffer, context.bodySize());
87
1
    return response;
88
1
  }
89

            
90
8
  Hessian2::Decoder decoder(std::make_unique<BufferReader>(buffer));
91
8
  auto type_value = decoder.decode<int32_t>();
92
8
  if (type_value == nullptr) {
93
1
    throw EnvoyException(fmt::format("Cannot parse RpcResponse type from buffer"));
94
1
  }
95

            
96
7
  const RpcResponseType type = static_cast<RpcResponseType>(*type_value);
97

            
98
7
  switch (type) {
99
1
  case RpcResponseType::ResponseWithException:
100
2
  case RpcResponseType::ResponseWithExceptionWithAttachments:
101
2
    context.setMessageType(MessageType::Exception);
102
2
    break;
103
  case RpcResponseType::ResponseWithNullValue:
104
1
  case RpcResponseType::ResponseNullValueWithAttachments:
105
2
  case RpcResponseType::ResponseWithValue:
106
4
  case RpcResponseType::ResponseValueWithAttachments:
107
4
    break;
108
1
  default:
109
1
    throw EnvoyException(fmt::format("not supported return type {}", static_cast<uint8_t>(type)));
110
7
  }
111

            
112
6
  const auto decoded_size = decoder.offset();
113

            
114
6
  if (context.bodySize() < decoded_size) {
115
1
    throw EnvoyException(fmt::format("RpcResponse size({}) large than body size({})", decoded_size,
116
1
                                     context.bodySize()));
117
1
  }
118

            
119
5
  buffer.drain(decoded_size);
120

            
121
5
  response->setResponseType(type);
122
5
  response->content().initialize(buffer, context.bodySize() - decoded_size);
123
5
  return response;
124
6
}
125

            
126
void Hessian2SerializerImpl::serializeRpcResponse(Buffer::Instance& buffer,
127
2
                                                  MessageMetadata& metadata) {
128
2
  ASSERT(metadata.hasContext());
129
2
  ASSERT(metadata.context().hasResponseStatus());
130

            
131
2
  const auto& context = metadata.context();
132

            
133
2
  if (context.heartbeat()) {
134
1
    buffer.writeByte('N');
135
1
    return;
136
1
  }
137

            
138
1
  ASSERT(metadata.hasResponse());
139
1
  if (auto type = metadata.response().responseType(); type.has_value()) {
140
1
    ASSERT(metadata.context().responseStatus() == ResponseStatus::Ok);
141
1
    buffer.writeByte(0x90 + static_cast<uint8_t>(type.value()));
142
1
  }
143

            
144
1
  buffer.add(metadata.response().content().buffer());
145
1
}
146

            
147
void Hessian2SerializerImpl::serializeRpcRequest(Buffer::Instance& buffer,
148
3
                                                 MessageMetadata& metadata) {
149

            
150
3
  ASSERT(metadata.hasContext());
151
3
  const auto& context = metadata.context();
152

            
153
3
  if (context.heartbeat()) {
154
1
    buffer.writeByte('N');
155
1
    return;
156
1
  }
157

            
158
2
  ASSERT(metadata.hasRequest());
159
2
  ASSERT(metadata.context().messageType() == MessageType::Request ||
160
2
         metadata.context().messageType() == MessageType::Oneway);
161

            
162
2
  Hessian2::Encoder encoder(std::make_unique<BufferWriter>(buffer));
163

            
164
2
  encoder.encode<absl::string_view>(metadata.request().version());
165
2
  encoder.encode<absl::string_view>(metadata.request().service());
166
2
  encoder.encode<absl::string_view>(metadata.request().serviceVersion());
167
2
  encoder.encode<absl::string_view>(metadata.request().method());
168

            
169
2
  buffer.add(metadata.request().content().buffer());
170
2
}
171

            
172
} // namespace Dubbo
173
} // namespace Common
174
} // namespace Extensions
175
} // namespace Envoy