1
#include "source/extensions/common/dubbo/message.h"
2

            
3
#include "source/common/common/logger.h"
4
#include "source/extensions/common/dubbo/hessian2_utils.h"
5

            
6
namespace Envoy {
7
namespace Extensions {
8
namespace Common {
9
namespace Dubbo {
10

            
11
12
void RequestContent::initialize(Buffer::Instance& buffer, uint64_t length) {
12
12
  ASSERT(content_buffer_.length() == 0, "content buffer has been initialized");
13

            
14
12
  content_buffer_.move(buffer, length);
15

            
16
  // Clear the types, arguments and attachments.
17
12
  types_.clear();
18
12
  argvs_.clear();
19
12
  attachs_.clear();
20

            
21
  // Set both decoded and updated to false since the content has been initialized
22
  // by raw buffer.
23
12
  decoded_ = false;
24
12
  updated_ = false;
25
12
}
26

            
27
12
void RequestContent::initialize(std::string&& types, ArgumentVec&& argvs, Attachments&& attachs) {
28
12
  ASSERT(content_buffer_.length() == 0, "content buffer has been initialized");
29

            
30
  // Set the types, arguments and attachments.
31
12
  types_ = std::move(types);
32
12
  argvs_ = std::move(argvs);
33
12
  attachs_ = std::move(attachs);
34

            
35
  // Encode the types, arguments and attachments into the content buffer.
36
12
  encodeEverything();
37

            
38
  // Set decoded to true since the content has been initialized by types,
39
  // arguments and attachments.
40
12
  decoded_ = true;
41
12
  updated_ = false;
42
12
}
43

            
44
16
const Buffer::Instance& RequestContent::buffer() {
45
  // Ensure the attachments in the buffer is latest.
46

            
47
16
  if (content_buffer_.length() == 0) {
48
4
    encodeEverything();
49
12
  } else {
50
12
    encodeAttachments();
51
12
  }
52

            
53
16
  return content_buffer_;
54
16
}
55

            
56
4
void RequestContent::bufferMoveTo(Buffer::Instance& buffer) { buffer.move(content_buffer_); }
57

            
58
12
const ArgumentVec& RequestContent::arguments() {
59
12
  lazyDecode();
60

            
61
12
  return argvs_;
62
12
}
63

            
64
24
const Attachments& RequestContent::attachments() {
65
24
  lazyDecode();
66

            
67
24
  return attachs_;
68
24
}
69

            
70
14
void RequestContent::setAttachment(absl::string_view key, absl::string_view val) {
71
14
  lazyDecode();
72

            
73
14
  updated_ = true;
74
14
  attachs_[key] = val;
75
14
}
76

            
77
3
void RequestContent::delAttachment(absl::string_view key) {
78
3
  lazyDecode();
79

            
80
3
  updated_ = true;
81
3
  attachs_.erase(key);
82
3
}
83

            
84
58
void RequestContent::lazyDecode() {
85
58
  if (decoded_) {
86
47
    return;
87
47
  }
88
11
  decoded_ = true;
89

            
90
  // Decode the content buffer into types, arguments and attachments.
91
11
  Hessian2::Decoder decoder(std::make_unique<BufferReader>(content_buffer_));
92

            
93
  // Handle the types and arguments.
94
11
  if (auto element = decoder.decode<Hessian2::Object>(); element != nullptr) {
95
10
    uint32_t number = 0;
96

            
97
10
    if (element->type() == Hessian2::Object::Type::Integer) {
98
4
      ASSERT(element->toInteger().has_value());
99
4
      if (int32_t direct_num = element->toInteger().value(); direct_num == -1) {
100
2
        if (auto types = decoder.decode<std::string>(); types != nullptr) {
101
1
          types_ = *types;
102
1
          number = Hessian2Utils::getParametersNumber(types_);
103
1
        } else {
104
1
          ENVOY_LOG(error, "Cannot parse RpcInvocation parameter types from buffer");
105
1
          handleBrokenValue();
106
1
          return;
107
1
        }
108
2
      } else if (direct_num >= 0) {
109
1
        number = direct_num;
110
1
      } else {
111
1
        ENVOY_LOG(error, "Invalid RpcInvocation parameter number {}", direct_num);
112
1
        handleBrokenValue();
113
1
        return;
114
1
      }
115
7
    } else if (element->type() == Hessian2::Object::Type::String) {
116
6
      ASSERT(element->toString().has_value());
117
6
      types_ = element->toString().value().get();
118
6
      number = Hessian2Utils::getParametersNumber(types_);
119
6
    }
120

            
121
27
    for (uint32_t i = 0; i < number; i++) {
122
20
      if (auto result = decoder.decode<Hessian2::Object>(); result != nullptr) {
123
19
        argvs_.push_back(std::move(result));
124
19
      } else {
125
1
        ENVOY_LOG(error, "Cannot parse RpcInvocation parameter from buffer");
126
1
        handleBrokenValue();
127
1
        return;
128
1
      }
129
20
    }
130
8
  } else {
131
1
    ENVOY_LOG(error, "Cannot parse RpcInvocation from buffer");
132
1
    handleBrokenValue();
133
1
    return;
134
1
  }
135

            
136
  // Record the size of the arguments in the content buffer. This is useful for
137
  // re-encoding the attachments.
138
7
  argvs_size_ = decoder.offset();
139

            
140
  // Handle the attachments.
141
7
  auto map = decoder.decode<Hessian2::Object>();
142
7
  if (map == nullptr || map->type() != Hessian2::Object::Type::UntypedMap) {
143
2
    return;
144
2
  }
145

            
146
8
  for (auto& [key, val] : map->toMutableUntypedMap().value().get()) {
147
8
    if (key->type() != Hessian2::Object::Type::String ||
148
8
        val->type() != Hessian2::Object::Type::String) {
149
1
      continue;
150
1
    }
151
7
    attachs_.emplace(std::move(key->toMutableString().value().get()),
152
7
                     std::move(val->toMutableString().value().get()));
153
7
  }
154
5
}
155

            
156
12
void RequestContent::encodeAttachments() {
157
  // Do nothing if the attachments have not been updated.
158
12
  if (!updated_) {
159
7
    return;
160
7
  }
161

            
162
  // Ensure the content has been decoded before re-encoding it.
163
5
  lazyDecode();
164

            
165
5
  const uint64_t buffer_length = content_buffer_.length();
166
5
  ASSERT(buffer_length > 0, "content buffer is empty");
167

            
168
  // The size of arguments will be set when doing lazyDecode() or encodeEverything().
169
5
  if (buffer_length < argvs_size_) {
170
    ENVOY_LOG(error, "arguments size {} is larger than content buffer {}", argvs_size_,
171
              buffer_length);
172
    handleBrokenValue();
173
    return;
174
  }
175

            
176
  // Create a new buffer to hold the re-encoded content.
177

            
178
5
  Buffer::OwnedImpl new_content_buffer;
179
  // Copy the types and arguments into the new buffer.
180
5
  new_content_buffer.move(content_buffer_, argvs_size_);
181

            
182
  // Encode the attachments into the new buffer.
183
5
  Hessian2::Encoder encoder(std::make_unique<BufferWriter>(new_content_buffer));
184
5
  new_content_buffer.writeByte('H');
185
5
  for (auto& [key, val] : attachs_) {
186
4
    encoder.encode(key);
187
4
    encoder.encode(val);
188
4
  }
189
5
  new_content_buffer.writeByte('Z');
190

            
191
  // Clear the content buffer and move the new buffer into it.
192
5
  content_buffer_.drain(content_buffer_.length());
193
5
  content_buffer_.move(new_content_buffer);
194

            
195
5
  updated_ = false;
196
5
}
197

            
198
20
void RequestContent::encodeEverything() {
199
20
  ASSERT(content_buffer_.length() == 0, "content buffer contains something");
200

            
201
  // Encode the types, arguments and attachments into the content buffer.
202
20
  Hessian2::Encoder encoder(std::make_unique<BufferWriter>(content_buffer_));
203

            
204
  // Encode the types into the content buffer first.
205
20
  if (!types_.empty()) {
206
6
    encoder.encode(types_);
207
15
  } else if (!argvs_.empty()) {
208
1
    encoder.encode(static_cast<int32_t>(argvs_.size()));
209
13
  } else {
210
13
    encoder.encode(types_);
211
13
  }
212

            
213
  // Encode the arguments into the content buffer.
214
22
  for (auto& arg : argvs_) {
215
12
    encoder.encode(*arg);
216
12
  }
217

            
218
  // Record the size of the arguments in the content buffer. This is useful for
219
  // re-encoding the attachments.
220
20
  argvs_size_ = content_buffer_.length();
221

            
222
  // Encode the attachments into the content buffer.
223
20
  content_buffer_.writeByte('H');
224
20
  for (auto& [key, val] : attachs_) {
225
4
    encoder.encode(key);
226
4
    encoder.encode(val);
227
4
  }
228
20
  content_buffer_.writeByte('Z');
229

            
230
20
  updated_ = false;
231
20
}
232

            
233
4
void RequestContent::handleBrokenValue() {
234
  // Because the lazy decoding is used, Envoy cannot reject the message with broken
235
  // content. Instead, it will reset the whole content to an empty state.
236

            
237
  // Clear everything.
238
4
  content_buffer_.drain(content_buffer_.length());
239
4
  types_.clear();
240
4
  argvs_.clear();
241
4
  attachs_.clear();
242

            
243
  // Encode everything.
244
4
  encodeEverything();
245

            
246
4
  decoded_ = true;
247
4
  updated_ = false;
248
4
}
249

            
250
10
void ResponseContent::initialize(Buffer::Instance& buffer, uint64_t length) {
251
10
  ASSERT(content_buffer_.length() == 0, "content buffer has been initialized");
252
10
  content_buffer_.move(buffer, length);
253

            
254
  // Clear the result and attachments.
255
10
  result_ = nullptr;
256
10
  attachs_.clear();
257

            
258
  // Set both decoded and updated to false since the content has been initialized
259
  // by raw buffer.
260
10
  decoded_ = false;
261
10
  updated_ = false;
262
10
}
263

            
264
27
void ResponseContent::initialize(Hessian2::ObjectPtr&& value, Attachments&& attachs) {
265
27
  ASSERT(content_buffer_.length() == 0, "content buffer has been initialized");
266

            
267
  // Set the result and attachments.
268
27
  result_ = std::move(value);
269
27
  attachs_ = std::move(attachs);
270

            
271
  // Encode the result and attachments into the content buffer.
272
27
  encodeEverything();
273

            
274
  // Set decoded to true since the content has been initialized by result and attachments.
275
27
  decoded_ = true;
276
27
  updated_ = false;
277
27
}
278

            
279
7
const Buffer::Instance& ResponseContent::buffer() {
280
  // Ensure the attachments in the buffer is latest.
281
7
  if (content_buffer_.length() == 0) {
282
1
    encodeEverything();
283
6
  } else {
284
6
    encodeAttachments();
285
6
  }
286

            
287
7
  return content_buffer_;
288
7
}
289

            
290
1
void ResponseContent::bufferMoveTo(Buffer::Instance& buffer) { buffer.move(content_buffer_); }
291

            
292
13
const Hessian2::Object* ResponseContent::result() {
293
13
  lazyDecode();
294

            
295
13
  return result_.get();
296
13
}
297

            
298
9
const Attachments& ResponseContent::attachments() {
299
9
  lazyDecode();
300

            
301
9
  return attachs_;
302
9
}
303

            
304
5
void ResponseContent::setAttachment(absl::string_view key, absl::string_view val) {
305
5
  lazyDecode();
306

            
307
5
  updated_ = true;
308
5
  attachs_[key] = val;
309
5
}
310

            
311
1
void ResponseContent::delAttachment(absl::string_view key) {
312
1
  lazyDecode();
313

            
314
1
  updated_ = true;
315
1
  attachs_.erase(key);
316
1
}
317

            
318
31
void ResponseContent::lazyDecode() {
319
31
  if (decoded_) {
320
28
    return;
321
28
  }
322
3
  decoded_ = true;
323

            
324
  // Decode the content buffer into result and attachments.
325
3
  Hessian2::Decoder decoder(std::make_unique<BufferReader>(content_buffer_));
326

            
327
  // Handle the result.
328
3
  result_ = decoder.decode<Hessian2::Object>();
329

            
330
3
  if (result_ == nullptr) {
331
1
    ENVOY_LOG(error, "Cannot parse RpcResult from buffer");
332
1
    handleBrokenValue();
333
1
    return;
334
1
  }
335

            
336
  // Record the size of the result in the content buffer. This is useful for
337
  // re-encoding the attachments.
338
2
  result_size_ = decoder.offset();
339

            
340
  // Handle the attachments.
341
2
  auto map = decoder.decode<Hessian2::Object>();
342
2
  if (map == nullptr || map->type() != Hessian2::Object::Type::UntypedMap) {
343
1
    return;
344
1
  }
345

            
346
2
  for (auto& [key, val] : map->toMutableUntypedMap().value().get()) {
347
2
    if (key->type() != Hessian2::Object::Type::String ||
348
2
        val->type() != Hessian2::Object::Type::String) {
349
1
      continue;
350
1
    }
351
1
    attachs_.emplace(std::move(key->toMutableString().value().get()),
352
1
                     std::move(val->toMutableString().value().get()));
353
1
  }
354
1
}
355

            
356
6
void ResponseContent::encodeAttachments() {
357
6
  if (!updated_) {
358
3
    return;
359
3
  }
360

            
361
  // Ensure the content has been decoded before re-encoding it.
362
3
  lazyDecode();
363

            
364
3
  const uint64_t buffer_length = content_buffer_.length();
365
3
  ASSERT(buffer_length > 0, "content buffer is empty");
366

            
367
3
  if (buffer_length < result_size_) {
368
    ENVOY_LOG(error, "result size {} is larger than content buffer {}", result_size_,
369
              buffer_length);
370
    handleBrokenValue();
371
    return;
372
  }
373

            
374
  // Create a new buffer to hold the re-encoded content.
375
3
  Buffer::OwnedImpl new_content_buffer;
376

            
377
  // Copy the result into the new buffer.
378
3
  new_content_buffer.move(content_buffer_, result_size_);
379

            
380
  // Encode the attachments into the new buffer.
381
3
  Hessian2::Encoder encoder(std::make_unique<BufferWriter>(new_content_buffer));
382
3
  new_content_buffer.writeByte('H');
383
3
  for (auto& [key, val] : attachs_) {
384
2
    encoder.encode(key);
385
2
    encoder.encode(val);
386
2
  }
387
3
  new_content_buffer.writeByte('Z');
388

            
389
  // Clear the content buffer and move the new buffer into it.
390
3
  content_buffer_.drain(content_buffer_.length());
391
3
  content_buffer_.move(new_content_buffer);
392

            
393
3
  updated_ = false;
394
3
}
395

            
396
29
void ResponseContent::encodeEverything() {
397
29
  ASSERT(content_buffer_.length() == 0, "content buffer contains something");
398

            
399
  // Encode the result and attachments into the content buffer.
400
29
  Hessian2::Encoder encoder(std::make_unique<BufferWriter>(content_buffer_));
401
29
  if (result_ == nullptr) {
402
1
    content_buffer_.writeByte('N');
403
28
  } else {
404
28
    encoder.encode(*result_);
405
28
  }
406

            
407
  // Record the size of the result in the content buffer. This is useful for
408
  // re-encoding the attachments.
409
29
  result_size_ = content_buffer_.length();
410

            
411
29
  content_buffer_.writeByte('H');
412
29
  for (auto& [key, val] : attachs_) {
413
1
    encoder.encode(key);
414
1
    encoder.encode(val);
415
1
  }
416
29
  content_buffer_.writeByte('Z');
417

            
418
29
  updated_ = false;
419
29
}
420

            
421
1
void ResponseContent::handleBrokenValue() {
422
  // Because the lazy decoding is used, Envoy cannot reject the message with broken
423
  // content. Instead, it will reset the whole content to an empty state.
424

            
425
  // Clear everything.
426
1
  content_buffer_.drain(content_buffer_.length());
427
1
  result_ = nullptr;
428
1
  attachs_.clear();
429

            
430
  // Encode everything.
431
1
  encodeEverything();
432

            
433
1
  decoded_ = true;
434
1
  updated_ = false;
435
1
}
436

            
437
} // namespace Dubbo
438
} // namespace Common
439
} // namespace Extensions
440
} // namespace Envoy