1
#include "source/common/grpc/common.h"
2

            
3
#include <atomic>
4
#include <cstdint>
5
#include <cstring>
6
#include <string>
7

            
8
#include "source/common/buffer/buffer_impl.h"
9
#include "source/common/buffer/zero_copy_input_stream_impl.h"
10
#include "source/common/common/assert.h"
11
#include "source/common/common/base64.h"
12
#include "source/common/common/empty_string.h"
13
#include "source/common/common/enum_to_int.h"
14
#include "source/common/common/fmt.h"
15
#include "source/common/common/macros.h"
16
#include "source/common/common/safe_memcpy.h"
17
#include "source/common/common/utility.h"
18
#include "source/common/grpc/codec.h"
19
#include "source/common/http/header_utility.h"
20
#include "source/common/http/headers.h"
21
#include "source/common/http/message_impl.h"
22
#include "source/common/http/utility.h"
23
#include "source/common/protobuf/protobuf.h"
24

            
25
#include "absl/container/fixed_array.h"
26
#include "absl/strings/match.h"
27
#include "absl/strings/str_cat.h"
28

            
29
namespace Envoy {
30
namespace Grpc {
31

            
32
294434
bool Common::hasGrpcContentType(const Http::RequestOrResponseHeaderMap& headers) {
33
294434
  const absl::string_view content_type = headers.getContentTypeValue();
34
  // Content type is gRPC if it is exactly "application/grpc" or starts with
35
  // "application/grpc+". Specifically, something like application/grpc-web is not gRPC.
36
294434
  return absl::StartsWith(content_type, Http::Headers::get().ContentTypeValues.Grpc) &&
37
294434
         (content_type.size() == Http::Headers::get().ContentTypeValues.Grpc.size() ||
38
11439
          content_type[Http::Headers::get().ContentTypeValues.Grpc.size()] == '+');
39
294434
}
40

            
41
19
bool Common::hasConnectProtocolVersionHeader(const Http::RequestOrResponseHeaderMap& headers) {
42
19
  return !headers.get(Http::CustomHeaders::get().ConnectProtocolVersion).empty();
43
19
}
44

            
45
37
bool Common::hasConnectStreamingContentType(const Http::RequestOrResponseHeaderMap& headers) {
46
  // Consider the request a connect request if the content type starts with "application/connect+".
47
37
  static constexpr absl::string_view connect_prefix{"application/connect+"};
48
37
  const absl::string_view content_type = headers.getContentTypeValue();
49
37
  return absl::StartsWith(content_type, connect_prefix);
50
37
}
51

            
52
13
bool Common::hasProtobufContentType(const Http::RequestOrResponseHeaderMap& headers) {
53
13
  return headers.getContentTypeValue() == Http::Headers::get().ContentTypeValues.Protobuf;
54
13
}
55

            
56
296865
bool Common::isGrpcRequestHeaders(const Http::RequestHeaderMap& headers) {
57
296865
  if (!headers.Path()) {
58
3114
    return false;
59
3114
  }
60
293751
  return hasGrpcContentType(headers);
61
296865
}
62

            
63
19
bool Common::isConnectRequestHeaders(const Http::RequestHeaderMap& headers) {
64
19
  if (!headers.Path()) {
65
    return false;
66
  }
67
19
  return hasConnectProtocolVersionHeader(headers);
68
19
}
69

            
70
19
bool Common::isConnectStreamingRequestHeaders(const Http::RequestHeaderMap& headers) {
71
19
  if (!headers.Path()) {
72
    return false;
73
  }
74
19
  return hasConnectStreamingContentType(headers);
75
19
}
76

            
77
16
bool Common::isProtobufRequestHeaders(const Http::RequestHeaderMap& headers) {
78
16
  if (!headers.Path()) {
79
3
    return false;
80
3
  }
81
13
  return hasProtobufContentType(headers);
82
16
}
83

            
84
806
bool Common::isGrpcResponseHeaders(const Http::ResponseHeaderMap& headers, bool end_stream) {
85
806
  if (end_stream) {
86
    // Trailers-only response, only grpc-status is required.
87
73
    return headers.GrpcStatus() != nullptr;
88
73
  }
89
733
  if (Http::Utility::getResponseStatus(headers) != enumToInt(Http::Code::OK)) {
90
78
    return false;
91
78
  }
92
655
  return hasGrpcContentType(headers);
93
733
}
94

            
95
19
bool Common::isConnectStreamingResponseHeaders(const Http::ResponseHeaderMap& headers) {
96
19
  if (Http::Utility::getResponseStatus(headers) != enumToInt(Http::Code::OK)) {
97
1
    return false;
98
1
  }
99
18
  return hasConnectStreamingContentType(headers);
100
19
}
101

            
102
absl::optional<Status::GrpcStatus>
103
6422
Common::getGrpcStatus(const Http::ResponseHeaderOrTrailerMap& trailers, bool allow_user_defined) {
104
6422
  const absl::string_view grpc_status_header = trailers.getGrpcStatusValue();
105
6422
  uint64_t grpc_status_code;
106

            
107
6422
  if (grpc_status_header.empty()) {
108
4191
    return absl::nullopt;
109
4191
  }
110
2231
  if (!absl::SimpleAtoi(grpc_status_header, &grpc_status_code) ||
111
2231
      (grpc_status_code > Status::WellKnownGrpcStatus::MaximumKnown && !allow_user_defined)) {
112
25
    return {Status::WellKnownGrpcStatus::InvalidCode};
113
25
  }
114
2206
  return {static_cast<Status::GrpcStatus>(grpc_status_code)};
115
2231
}
116

            
117
absl::optional<Status::GrpcStatus> Common::getGrpcStatus(const Http::ResponseTrailerMap& trailers,
118
                                                         const Http::ResponseHeaderMap& headers,
119
                                                         const StreamInfo::StreamInfo& info,
120
206
                                                         bool allow_user_defined) {
121
  // The gRPC specification does not guarantee a gRPC status code will be returned from a gRPC
122
  // request. When it is returned, it will be in the response trailers. With that said, Envoy will
123
  // treat a trailers-only response as a headers-only response, so we have to check the following
124
  // in order:
125
  //   1. trailers gRPC status, if it exists.
126
  //   2. headers gRPC status, if it exists.
127
  //   3. Inferred from info HTTP status, if it exists.
128
206
  absl::optional<Grpc::Status::GrpcStatus> optional_status;
129
206
  optional_status = Grpc::Common::getGrpcStatus(trailers, allow_user_defined);
130
206
  if (optional_status.has_value()) {
131
158
    return optional_status;
132
158
  }
133
48
  optional_status = Grpc::Common::getGrpcStatus(headers, allow_user_defined);
134
48
  if (optional_status.has_value()) {
135
27
    return optional_status;
136
27
  }
137
21
  return info.responseCode() ? absl::optional<Grpc::Status::GrpcStatus>(
138
11
                                   Grpc::Utility::httpToGrpcStatus(info.responseCode().value()))
139
21
                             : absl::nullopt;
140
48
}
141

            
142
852
std::string Common::getGrpcMessage(const Http::ResponseHeaderOrTrailerMap& trailers) {
143
852
  const auto entry = trailers.GrpcMessage();
144
852
  return entry ? std::string(entry->value().getStringView()) : EMPTY_STRING;
145
852
}
146

            
147
absl::optional<google::rpc::Status>
148
16
Common::getGrpcStatusDetailsBin(const Http::HeaderMap& trailers) {
149
16
  const auto details_header = trailers.get(Http::Headers::get().GrpcStatusDetailsBin);
150
16
  if (details_header.empty()) {
151
8
    return absl::nullopt;
152
8
  }
153

            
154
  // Some implementations use non-padded base64 encoding for grpc-status-details-bin.
155
  // This is effectively a trusted header so using the first value is fine.
156
8
  auto decoded_value = Base64::decodeWithoutPadding(details_header[0]->value().getStringView());
157
8
  if (decoded_value.empty()) {
158
1
    return absl::nullopt;
159
1
  }
160

            
161
7
  google::rpc::Status status;
162
7
  if (!status.ParseFromString(decoded_value)) {
163
    return absl::nullopt;
164
  }
165

            
166
7
  return {std::move(status)};
167
7
}
168

            
169
99670
Buffer::InstancePtr Common::serializeToGrpcFrame(const Protobuf::Message& message) {
170
  // http://www.grpc.io/docs/guides/wire.html
171
  // Reserve enough space for the entire message and the 5 byte header.
172
  // NB: we do not use prependGrpcFrameHeader because that would add another BufferFragment and this
173
  // (using a single BufferFragment) is more efficient.
174
99670
  Buffer::InstancePtr body(new Buffer::OwnedImpl());
175
99670
  const uint32_t size = message.ByteSize();
176
99670
  const uint32_t alloc_size = size + 5;
177
99670
  auto reservation = body->reserveSingleSlice(alloc_size);
178
99670
  ASSERT(reservation.slice().len_ >= alloc_size);
179
99670
  uint8_t* current = reinterpret_cast<uint8_t*>(reservation.slice().mem_);
180
99670
  *current++ = 0; // flags
181
99670
  const uint32_t nsize = htonl(size);
182
99670
  safeMemcpyUnsafeDst(current, &nsize);
183
99670
  current += sizeof(uint32_t);
184
99670
  Protobuf::io::ArrayOutputStream stream(current, size, -1);
185
99670
  Protobuf::io::CodedOutputStream codec_stream(&stream);
186
99670
  message.SerializeWithCachedSizes(&codec_stream);
187
99670
  reservation.commit(alloc_size);
188
99670
  return body;
189
99670
}
190

            
191
138771
Buffer::InstancePtr Common::serializeMessage(const Protobuf::Message& message) {
192
138771
  auto body = std::make_unique<Buffer::OwnedImpl>();
193
138771
  const uint32_t size = message.ByteSize();
194
138771
  auto reservation = body->reserveSingleSlice(size);
195
138771
  ASSERT(reservation.slice().len_ >= size);
196
138771
  uint8_t* current = reinterpret_cast<uint8_t*>(reservation.slice().mem_);
197
138771
  Protobuf::io::ArrayOutputStream stream(current, size, -1);
198
138771
  Protobuf::io::CodedOutputStream codec_stream(&stream);
199
138771
  message.SerializeWithCachedSizes(&codec_stream);
200
138771
  reservation.commit(size);
201
138771
  return body;
202
138771
}
203

            
204
absl::optional<std::chrono::milliseconds>
205
88005
Common::getGrpcTimeout(const Http::RequestHeaderMap& request_headers) {
206
88005
  const Http::HeaderEntry* header_grpc_timeout_entry = request_headers.GrpcTimeout();
207
88005
  std::chrono::milliseconds timeout;
208
88005
  if (header_grpc_timeout_entry) {
209
47
    int64_t grpc_timeout;
210
47
    absl::string_view timeout_entry = header_grpc_timeout_entry->value().getStringView();
211
47
    if (timeout_entry.empty()) {
212
      // Must be of the form TimeoutValue TimeoutUnit. See
213
      // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests.
214
1
      return absl::nullopt;
215
1
    }
216
    // TimeoutValue must be a positive integer of at most 8 digits.
217
46
    if (absl::SimpleAtoi(timeout_entry.substr(0, timeout_entry.size() - 1), &grpc_timeout) &&
218
46
        grpc_timeout >= 0 && static_cast<uint64_t>(grpc_timeout) <= MAX_GRPC_TIMEOUT_VALUE) {
219
41
      const char unit = timeout_entry[timeout_entry.size() - 1];
220
41
      switch (unit) {
221
2
      case 'H':
222
2
        return std::chrono::hours(grpc_timeout);
223
9
      case 'M':
224
9
        return std::chrono::minutes(grpc_timeout);
225
8
      case 'S':
226
8
        return std::chrono::seconds(grpc_timeout);
227
18
      case 'm':
228
18
        return std::chrono::milliseconds(grpc_timeout);
229
        break;
230
1
      case 'u':
231
1
        timeout = std::chrono::duration_cast<std::chrono::milliseconds>(
232
1
            std::chrono::microseconds(grpc_timeout));
233
1
        if (timeout < std::chrono::microseconds(grpc_timeout)) {
234
1
          timeout++;
235
1
        }
236
1
        return timeout;
237
1
      case 'n':
238
1
        timeout = std::chrono::duration_cast<std::chrono::milliseconds>(
239
1
            std::chrono::nanoseconds(grpc_timeout));
240
1
        if (timeout < std::chrono::nanoseconds(grpc_timeout)) {
241
1
          timeout++;
242
1
        }
243
1
        return timeout;
244
41
      }
245
41
    }
246
46
  }
247
87965
  return absl::nullopt;
248
88005
}
249

            
250
void Common::toGrpcTimeout(const std::chrono::milliseconds& timeout,
251
275
                           Http::RequestHeaderMap& headers) {
252
275
  uint64_t time = timeout.count();
253
275
  static const char units[] = "mSMH";
254
275
  const char* unit = units; // start with milliseconds
255
275
  if (time > MAX_GRPC_TIMEOUT_VALUE) {
256
7
    time /= 1000; // Convert from milliseconds to seconds
257
7
    unit++;
258
7
  }
259
288
  while (time > MAX_GRPC_TIMEOUT_VALUE) {
260
13
    if (*unit == 'H') {
261
3
      time = MAX_GRPC_TIMEOUT_VALUE; // No bigger unit available, clip to max 8 digit hours.
262
10
    } else {
263
10
      time /= 60; // Convert from seconds to minutes to hours
264
10
      unit++;
265
10
    }
266
13
  }
267
275
  headers.setGrpcTimeout(absl::StrCat(time, absl::string_view(unit, 1)));
268
275
}
269

            
270
Http::RequestMessagePtr
271
Common::prepareHeaders(absl::string_view host_name, absl::string_view service_full_name,
272
                       absl::string_view method_name,
273
2458
                       const absl::optional<std::chrono::milliseconds>& timeout) {
274
2458
  Http::RequestMessagePtr message(new Http::RequestMessageImpl());
275
2458
  message->headers().setReferenceMethod(Http::Headers::get().MethodValues.Post);
276
2458
  message->headers().setPath(absl::StrCat("/", service_full_name, "/", method_name));
277
2458
  message->headers().setHost(host_name);
278
  // According to https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md TE should appear
279
  // before Timeout and ContentType.
280
2458
  message->headers().setReferenceTE(Http::Headers::get().TEValues.Trailers);
281
2458
  if (timeout) {
282
157
    toGrpcTimeout(timeout.value(), message->headers());
283
157
  }
284
2458
  message->headers().setReferenceContentType(Http::Headers::get().ContentTypeValues.Grpc);
285

            
286
2458
  return message;
287
2458
}
288

            
289
26014
const std::string& Common::typeUrlPrefix() {
290
26014
  CONSTRUCT_ON_FIRST_USE(std::string, "type.googleapis.com");
291
26014
}
292

            
293
22586
std::string Common::typeUrl(absl::string_view qualified_name) {
294
22586
  return absl::StrCat(typeUrlPrefix(), "/", qualified_name);
295
22586
}
296

            
297
69599
void Common::prependGrpcFrameHeader(Buffer::Instance& buffer) {
298
69599
  std::array<char, 5> header;
299
69599
  header[0] = GRPC_FH_DEFAULT; // flags
300
69599
  const uint32_t nsize = htonl(buffer.length());
301
69599
  safeMemcpyUnsafeDst(&header[1], &nsize);
302
69599
  buffer.prepend(absl::string_view(&header[0], 5));
303
69599
}
304

            
305
28
bool Common::parseBufferInstance(Buffer::InstancePtr&& buffer, Protobuf::Message& proto) {
306
28
  Buffer::ZeroCopyInputStreamImpl stream(std::move(buffer));
307
28
  return proto.ParseFromZeroCopyStream(&stream);
308
28
}
309

            
310
absl::optional<Common::RequestNames>
311
179
Common::resolveServiceAndMethod(const Http::HeaderEntry* path) {
312
179
  absl::optional<RequestNames> request_names;
313
179
  if (path == nullptr) {
314
    return request_names;
315
  }
316
179
  absl::string_view str = path->value().getStringView();
317
179
  str = str.substr(0, str.find('?'));
318
179
  const auto parts = StringUtil::splitToken(str, "/");
319
179
  if (parts.size() != 2) {
320
108
    return request_names;
321
108
  }
322
71
  request_names = RequestNames{parts[0], parts[1]};
323
71
  return request_names;
324
179
}
325

            
326
} // namespace Grpc
327
} // namespace Envoy