1
#include "source/extensions/common/aws/eventstream/eventstream_parser.h"
2

            
3
#include <zlib.h>
4

            
5
#include "source/common/common/safe_memcpy.h"
6

            
7
#include "absl/base/internal/endian.h"
8
#include "absl/status/status.h"
9

            
10
namespace Envoy {
11
namespace Extensions {
12
namespace Common {
13
namespace Aws {
14
namespace Eventstream {
15

            
16
32
// Tries to decode a single event stream message from the front of `buffer`.
//
// Returns:
//  - ParseResult{absl::nullopt, 0} when `buffer` does not yet contain the full prelude or the
//    full message announced by the prelude (more bytes are needed);
//  - ParseResult{message, total_length} when a complete, CRC-verified message was decoded;
//  - an error status when a length field is out of bounds, a CRC does not match, or the
//    headers section cannot be decoded.
absl::StatusOr<ParseResult> EventstreamParser::parseMessage(absl::string_view buffer) {
  // The prelude carries both length fields and their CRC; nothing can be checked without it.
  if (buffer.size() < PRELUDE_SIZE) {
    return ParseResult{absl::nullopt, 0};
  }

  const auto* bytes = reinterpret_cast<const uint8_t*>(buffer.data());

  // Both length fields are big-endian 32-bit integers.
  const uint32_t total_length = absl::big_endian::Load32(bytes + TOTAL_LENGTH_OFFSET);
  const uint32_t headers_length = absl::big_endian::Load32(bytes + HEADERS_LENGTH_OFFSET);

  // Bound the announced message size first so a peer cannot make us buffer without limit.
  if (total_length < MIN_MESSAGE_SIZE) {
    return absl::InvalidArgumentError("Invalid message length");
  }
  if (total_length > MAX_TOTAL_LENGTH) {
    return absl::ResourceExhaustedError("Message length exceeds maximum");
  }

  // Whatever is left after the prelude and the trailer is shared by headers and payload.
  const auto headers_capacity = total_length - PRELUDE_SIZE - TRAILER_SIZE;
  if (headers_length > headers_capacity) {
    return absl::InvalidArgumentError("Headers length exceeds message size");
  }
  if (headers_length > MAX_HEADERS_SIZE) {
    return absl::ResourceExhaustedError("Headers length exceeds maximum");
  }

  // The payload is what remains once the headers are accounted for.
  const uint32_t payload_length = total_length - PRELUDE_SIZE - headers_length - TRAILER_SIZE;
  if (payload_length > MAX_PAYLOAD_SIZE) {
    return absl::ResourceExhaustedError("Payload exceeds maximum size");
  }

  // The prelude CRC covers the bytes preceding it (the two length fields).
  const uint32_t expected_prelude_crc = absl::big_endian::Load32(bytes + PRELUDE_CRC_OFFSET);
  if (computeCrc32(buffer.substr(0, PRELUDE_CRC_OFFSET)) != expected_prelude_crc) {
    return absl::DataLossError("Prelude CRC mismatch");
  }

  // The prelude is trustworthy now; wait until the whole message has been buffered.
  if (buffer.size() < total_length) {
    return ParseResult{absl::nullopt, 0};
  }

  // The trailing CRC covers every byte of the message except the CRC field itself.
  const uint32_t expected_message_crc =
      absl::big_endian::Load32(bytes + total_length - TRAILER_SIZE);
  const absl::string_view checksummed = buffer.substr(0, total_length - TRAILER_SIZE);
  if (computeCrc32(checksummed) != expected_message_crc) {
    return absl::DataLossError("Message CRC mismatch");
  }

  // Headers sit immediately after the prelude.
  auto decoded_headers = parseHeaders(buffer.substr(PRELUDE_SIZE, headers_length));
  if (!decoded_headers.ok()) {
    return decoded_headers.status();
  }

  // The payload follows the headers; copy it so the result does not alias `buffer`.
  ParsedMessage message;
  message.payload_bytes = std::string(buffer.substr(PRELUDE_SIZE + headers_length, payload_length));
  message.headers = std::move(decoded_headers).value();

  return ParseResult{std::move(message), total_length};
}
86

            
87
// Decodes the headers section of an event stream message.
//
// Each header is laid out as: a one-byte name length, the name bytes, a one-byte value type,
// and then a value whose size depends on the type. Returns the decoded headers in wire order
// (an empty vector for an empty section), or InvalidArgumentError when a header is truncated,
// has an empty name, has an unknown value type, or carries an empty or over-long
// string/byte-array value.
absl::StatusOr<std::vector<Header>>
EventstreamParser::parseHeaders(absl::string_view headers_bytes) {
  std::vector<Header> parsed_headers;

  // `unread` always views the bytes that have not been decoded yet. An empty section never
  // enters the loop and yields an empty vector.
  absl::string_view unread = headers_bytes;
  while (!unread.empty()) {
    const auto* cursor = reinterpret_cast<const uint8_t*>(unread.data());
    const size_t available = unread.size();

    const uint8_t name_size = cursor[0];
    if (name_size == 0) {
      return absl::InvalidArgumentError("Invalid header name length");
    }

    // The value begins after the length byte, the name bytes and the type byte.
    const size_t value_start = NAME_LENGTH_SIZE + name_size + TYPE_SIZE;
    if (available < value_start) {
      return absl::InvalidArgumentError("Header truncated: missing name or type");
    }

    // True when fewer than `value_size` bytes are available for this header's value.
    const auto lacks_value_bytes = [available, value_start](size_t value_size) {
      return available < value_start + value_size;
    };

    Header entry;
    entry.name = std::string(unread.substr(NAME_LENGTH_SIZE, name_size));

    auto& slot = entry.value;
    slot.type = static_cast<HeaderValueType>(cursor[NAME_LENGTH_SIZE + name_size]);

    // Total number of bytes this header occupies; set by every successful case below.
    size_t header_size = 0;

    switch (slot.type) {
    // Booleans are encoded entirely in the type byte and carry no value bytes.
    case HeaderValueType::BoolTrue:
      slot.value = true;
      header_size = value_start;
      break;

    case HeaderValueType::BoolFalse:
      slot.value = false;
      header_size = value_start;
      break;

    case HeaderValueType::Byte:
      if (lacks_value_bytes(BYTE_VALUE_SIZE)) {
        return absl::InvalidArgumentError("Header truncated: missing byte value");
      }
      slot.value = static_cast<int8_t>(cursor[value_start]);
      header_size = value_start + BYTE_VALUE_SIZE;
      break;

    case HeaderValueType::Short:
      if (lacks_value_bytes(SHORT_VALUE_SIZE)) {
        return absl::InvalidArgumentError("Header truncated: missing short value");
      }
      slot.value = static_cast<int16_t>(absl::big_endian::Load16(cursor + value_start));
      header_size = value_start + SHORT_VALUE_SIZE;
      break;

    case HeaderValueType::Int32:
      if (lacks_value_bytes(INT32_VALUE_SIZE)) {
        return absl::InvalidArgumentError("Header truncated: missing int32 value");
      }
      slot.value = static_cast<int32_t>(absl::big_endian::Load32(cursor + value_start));
      header_size = value_start + INT32_VALUE_SIZE;
      break;

    // Timestamps share the 64-bit integer encoding.
    case HeaderValueType::Int64:
    case HeaderValueType::Timestamp:
      if (lacks_value_bytes(INT64_VALUE_SIZE)) {
        return absl::InvalidArgumentError("Header truncated: missing int64/timestamp value");
      }
      slot.value = static_cast<int64_t>(absl::big_endian::Load64(cursor + value_start));
      header_size = value_start + INT64_VALUE_SIZE;
      break;

    // Strings and byte arrays are both a big-endian 16-bit length followed by that many bytes.
    case HeaderValueType::ByteArray:
    case HeaderValueType::String: {
      if (lacks_value_bytes(STRING_LENGTH_SIZE)) {
        return absl::InvalidArgumentError("Header truncated: missing string/bytes length");
      }
      const uint16_t text_size = absl::big_endian::Load16(cursor + value_start);
      if (text_size == 0) {
        return absl::InvalidArgumentError("Header string/bytes value must not be empty");
      }
      if (text_size > MAX_HEADER_STRING_LENGTH) {
        return absl::InvalidArgumentError("Header value too long");
      }
      if (lacks_value_bytes(STRING_LENGTH_SIZE + text_size)) {
        return absl::InvalidArgumentError("Header truncated: missing string/bytes data");
      }
      const size_t text_start = value_start + STRING_LENGTH_SIZE;
      slot.value = std::string(unread.substr(text_start, text_size));
      header_size = text_start + text_size;
      break;
    }

    case HeaderValueType::Uuid: {
      if (lacks_value_bytes(UUID_VALUE_SIZE)) {
        return absl::InvalidArgumentError("Header truncated: missing uuid value");
      }
      std::array<uint8_t, 16> uuid_bytes;
      // Copies sizeof(uuid_bytes) == 16 bytes out of the buffer.
      safeMemcpyUnsafeSrc(&uuid_bytes, cursor + value_start);
      slot.value = uuid_bytes;
      header_size = value_start + UUID_VALUE_SIZE;
      break;
    }

    default:
      return absl::InvalidArgumentError("Unknown header value type");
    }

    parsed_headers.push_back(std::move(entry));
    // Every successful case verified that at least `header_size` bytes were available.
    unread.remove_prefix(header_size);
  }

  return parsed_headers;
}
202

            
203
49
// Computes the CRC-32 of `data` with zlib, continuing from `initial_crc` so that a checksum
// can be accumulated over several consecutive pieces.
//
// Empty input returns `initial_crc` unchanged. zlib is deliberately not called in that case:
// crc32() returns the initial value 0 whenever its buffer pointer is null, which would discard
// a running CRC when `data` is an empty view whose data() is null.
//
// Input is fed to zlib in chunks that fit its `uInt` length parameter, so views larger than
// `uInt` can represent are checksummed in full rather than silently truncated.
uint32_t EventstreamParser::computeCrc32(absl::string_view data, uint32_t initial_crc) {
  // Largest length a single crc32() call can accept.
  constexpr uInt max_chunk = ~static_cast<uInt>(0);

  uLong running_crc = initial_crc;
  const auto* cursor = reinterpret_cast<const Bytef*>(data.data());
  size_t remaining = data.size();

  while (remaining > 0) {
    const uInt chunk = remaining > max_chunk ? max_chunk : static_cast<uInt>(remaining);
    running_crc = crc32(running_crc, cursor, chunk);
    cursor += chunk;
    remaining -= chunk;
  }

  return static_cast<uint32_t>(running_crc);
}
207

            
208
} // namespace Eventstream
209
} // namespace Aws
210
} // namespace Common
211
} // namespace Extensions
212
} // namespace Envoy