Coverage Report

Created: 2024-02-25 06:31

/proc/self/cwd/src/message_reader.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2016 Google Inc. All Rights Reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
//
15
////////////////////////////////////////////////////////////////////////////////
16
//
17
#include "grpc_transcoding/message_reader.h"
18
19
#include <memory>
20
21
#include "google/protobuf/io/zero_copy_stream_impl.h"
22
23
namespace google {
24
namespace grpc {
25
26
namespace transcoding {
27
28
namespace pb = ::google::protobuf;
29
namespace pbio = ::google::protobuf::io;
30
31
MessageReader::MessageReader(TranscoderInputStream* in)
32
    : in_(in),
33
      current_message_size_(0),
34
      have_current_message_size_(false),
35
615
      finished_(false) {}
36
37
namespace {
38
39
// A helper function that reads the given number of bytes from a
40
// ZeroCopyInputStream and copies it to the given buffer
41
bool ReadStream(pbio::ZeroCopyInputStream* stream, unsigned char* buffer,
42
57.0k
                int size) {
43
57.0k
  int size_in = 0;
44
57.0k
  const void* data_in = nullptr;
45
  // While we have bytes to read
46
286k
  while (size > 0) {
47
229k
    if (!stream->Next(&data_in, &size_in)) {
48
0
      return false;
49
0
    }
50
229k
    int to_copy = std::min(size, size_in);
51
229k
    memcpy(buffer, data_in, to_copy);
52
    // Advance buffer and subtract the size to reflect the number of bytes left
53
229k
    buffer += to_copy;
54
229k
    size -= to_copy;
55
    // Keep track of uncopied bytes
56
229k
    size_in -= to_copy;
57
229k
  }
58
  // Return the uncopied bytes
59
57.0k
  stream->BackUp(size_in);
60
57.0k
  return true;
61
57.0k
}
62
63
// A helper function to extract the size from a gRPC wire format message
64
// delimiter - see http://www.grpc.io/docs/guides/wire.html.
65
56.9k
uint32_t DelimiterToSize(const unsigned char* delimiter) {
66
56.9k
  unsigned size = 0;
67
  // Bytes 1-4 are big-endian 32-bit message size
68
56.9k
  size = size | static_cast<unsigned>(delimiter[1]);
69
56.9k
  size <<= 8;
70
56.9k
  size = size | static_cast<unsigned>(delimiter[2]);
71
56.9k
  size <<= 8;
72
56.9k
  size = size | static_cast<unsigned>(delimiter[3]);
73
56.9k
  size <<= 8;
74
56.9k
  size = size | static_cast<unsigned>(delimiter[4]);
75
56.9k
  return size;
76
56.9k
}
77
78
}  // namespace
79
80
2.62M
std::unique_ptr<pbio::ZeroCopyInputStream> MessageReader::NextMessage() {
81
2.62M
  if (Finished()) {
82
    // The stream has ended
83
0
    return nullptr;
84
0
  }
85
86
  // Check if we have the current message size. If not try to read it.
87
2.62M
  if (!have_current_message_size_) {
88
2.15M
    if (in_->BytesAvailable() <
89
2.15M
        static_cast<pb::int64>(kGrpcDelimiterByteSize)) {
90
      // We don't have 5 bytes available to read the length of the message.
91
      // Find out whether the stream is finished and return false.
92
2.09M
      finished_ = in_->Finished();
93
2.09M
      if (finished_ && in_->BytesAvailable() != 0) {
94
0
        status_ = absl::Status(
95
0
            absl::StatusCode::kInternal,
96
0
            "Incomplete gRPC frame header received");
97
0
      }
98
2.09M
      return nullptr;
99
2.09M
    }
100
101
    // Try to read the delimiter.
102
57.0k
    memset(delimiter_, 0, kGrpcDelimiterByteSize);
103
57.0k
    if (!ReadStream(in_, delimiter_, kGrpcDelimiterByteSize)) {
104
0
      finished_ = true;
105
0
      return nullptr;
106
0
    }
107
108
57.0k
    if (delimiter_[0] != 0) {
109
132
      status_ = absl::Status(
110
132
          absl::StatusCode::kInternal,
111
132
          "Unsupported gRPC frame flag: " + std::to_string(delimiter_[0]));
112
132
      return nullptr;
113
132
    }
114
115
56.9k
    current_message_size_ = DelimiterToSize(delimiter_);
116
56.9k
    have_current_message_size_ = true;
117
56.9k
  }
118
119
525k
  if (in_->BytesAvailable() < static_cast<pb::int64>(current_message_size_)) {
120
469k
    if (in_->Finished()) {
121
0
      status_ = absl::Status(
122
0
          absl::StatusCode::kInternal,
123
0
          "Incomplete gRPC frame expected size: " +
124
0
              std::to_string(current_message_size_) + " actual size: " +
125
0
              std::to_string(in_->BytesAvailable()));
126
0
    }
127
    // We don't have a full message
128
469k
    return nullptr;
129
469k
  }
130
131
  // Reset the have_current_message_size_ for the next message
132
56.6k
  have_current_message_size_ = false;
133
134
  // We have a message! Use LimitingInputStream to wrap the input stream and
135
  // limit it to current_message_size_ bytes to cover only the current message.
136
56.6k
  return std::unique_ptr<pbio::ZeroCopyInputStream>(
137
56.6k
      new pbio::LimitingInputStream(in_, current_message_size_));
138
525k
}
139
140
2.62M
MessageAndGrpcFrame MessageReader::NextMessageAndGrpcFrame() {
141
2.62M
  MessageAndGrpcFrame out;
142
2.62M
  out.message = NextMessage();
143
2.62M
  memcpy(out.grpc_frame, delimiter_, kGrpcDelimiterByteSize);
144
2.62M
  out.message_size = current_message_size_;
145
2.62M
  return out;
146
2.62M
}
147
148
}  // namespace transcoding
149
150
}  // namespace grpc
151
}  // namespace google