Coverage Report

Created: 2021-09-27 09:58

/proc/self/cwd/src/message_reader.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2016 Google Inc. All Rights Reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
//
15
////////////////////////////////////////////////////////////////////////////////
16
//
17
#include "grpc_transcoding/message_reader.h"
18
19
#include <memory>
20
21
#include "google/protobuf/io/zero_copy_stream_impl.h"
22
23
namespace google {
24
namespace grpc {
25
26
namespace transcoding {
27
28
namespace pb = ::google::protobuf;
29
namespace pbio = ::google::protobuf::io;
30
31
MessageReader::MessageReader(TranscoderInputStream* in)
32
    : in_(in),
33
      current_message_size_(0),
34
      have_current_message_size_(false),
35
571
      finished_(false) {}
36
37
namespace {
38
39
// A helper function that reads the given number of bytes from a
40
// ZeroCopyInputStream and copies it to the given buffer
41
bool ReadStream(pbio::ZeroCopyInputStream* stream, unsigned char* buffer,
42
201k
                int size) {
43
201k
  int size_in = 0;
44
201k
  const void* data_in = nullptr;
45
  // While we have bytes to read
46
593k
  while (size > 0) {
47
392k
    if (!stream->Next(&data_in, &size_in)) {
48
0
      return false;
49
0
    }
50
392k
    int to_copy = std::min(size, size_in);
51
392k
    memcpy(buffer, data_in, to_copy);
52
    // Advance buffer and subtract the size to reflect the number of bytes left
53
392k
    buffer += to_copy;
54
392k
    size -= to_copy;
55
    // Keep track of uncopied bytes
56
392k
    size_in -= to_copy;
57
392k
  }
58
  // Return the uncopied bytes
59
201k
  stream->BackUp(size_in);
60
201k
  return true;
61
201k
}
62
63
// A helper function to extract the size from a gRPC wire format message
64
// delimiter - see http://www.grpc.io/docs/guides/wire.html.
65
201k
uint32_t DelimiterToSize(const unsigned char* delimiter) {
66
201k
  unsigned size = 0;
67
  // Bytes 1-4 are big-endian 32-bit message size
68
201k
  size = size | static_cast<unsigned>(delimiter[1]);
69
201k
  size <<= 8;
70
201k
  size = size | static_cast<unsigned>(delimiter[2]);
71
201k
  size <<= 8;
72
201k
  size = size | static_cast<unsigned>(delimiter[3]);
73
201k
  size <<= 8;
74
201k
  size = size | static_cast<unsigned>(delimiter[4]);
75
201k
  return size;
76
201k
}
77
78
}  // namespace
79
80
3.68M
std::unique_ptr<pbio::ZeroCopyInputStream> MessageReader::NextMessage() {
81
3.68M
  if (Finished()) {
82
    // The stream has ended
83
0
    return nullptr;
84
0
  }
85
86
  // Check if we have the current message size. If not try to read it.
87
3.68M
  if (!have_current_message_size_) {
88
2.96M
    if (in_->BytesAvailable()
89
2.96M
        < static_cast<pb::int64>(kGrpcDelimiterByteSize)) {
90
      // We don't have 5 bytes available to read the length of the message.
91
      // Find out whether the stream is finished and return false.
92
2.76M
      finished_ = in_->Finished();
93
2.76M
      if (finished_ && in_->BytesAvailable() != 0) {
94
0
        status_ = google::protobuf::util::Status(
95
0
            google::protobuf::util::StatusCode::kInternal,
96
0
            "Incomplete gRPC frame header received");
97
0
      }
98
2.76M
      return nullptr;
99
2.76M
    }
100
101
    // Try to read the delimiter.
102
201k
    memset(delimiter_, 0, kGrpcDelimiterByteSize);
103
201k
    if (!ReadStream(in_, delimiter_, kGrpcDelimiterByteSize)) {
104
0
      finished_ = true;
105
0
      return nullptr;
106
0
    }
107
108
201k
    if (delimiter_[0] != 0) {
109
85
      status_ = google::protobuf::util::Status(
110
85
          google::protobuf::util::StatusCode::kInternal,
111
85
          "Unsupported gRPC frame flag: " + std::to_string(delimiter_[0]));
112
85
      return nullptr;
113
85
    }
114
115
201k
    current_message_size_ = DelimiterToSize(delimiter_);
116
201k
    have_current_message_size_ = true;
117
201k
  }
118
119
923k
  if (in_->BytesAvailable() < static_cast<pb::int64>(current_message_size_)) {
120
722k
    if (in_->Finished()) {
121
0
      status_ = google::protobuf::util::Status(
122
0
          google::protobuf::util::StatusCode::kInternal,
123
0
          "Incomplete gRPC frame expected size: " +
124
0
              std::to_string(current_message_size_) + " actual size: " +
125
0
              std::to_string(in_->BytesAvailable()));
126
0
    }
127
    // We don't have a full message
128
722k
    return nullptr;
129
722k
  }
130
131
  // Reset the have_current_message_size_ for the next message
132
200k
  have_current_message_size_ = false;
133
134
  // We have a message! Use LimitingInputStream to wrap the input stream and
135
  // limit it to current_message_size_ bytes to cover only the current message.
136
200k
  return std::unique_ptr<pbio::ZeroCopyInputStream>(
137
200k
      new pbio::LimitingInputStream(in_, current_message_size_));
138
923k
}
139
140
3.68M
MessageAndGrpcFrame MessageReader::NextMessageAndGrpcFrame() {
141
3.68M
  MessageAndGrpcFrame out;
142
3.68M
  out.message = NextMessage();
143
3.68M
  memcpy(out.grpc_frame, delimiter_, kGrpcDelimiterByteSize);
144
3.68M
  return out;
145
3.68M
}
146
147
}  // namespace transcoding
148
149
}  // namespace grpc
150
}  // namespace google