Coverage Report

Created: 2025-09-08 06:52

/proc/self/cwd/src/message_reader.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2016 Google Inc. All Rights Reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
//
15
////////////////////////////////////////////////////////////////////////////////
16
//
17
#include "grpc_transcoding/message_reader.h"
18
19
#include <memory>
20
21
#include "google/protobuf/io/zero_copy_stream_impl.h"
22
23
namespace google {
24
namespace grpc {
25
26
namespace transcoding {
27
28
namespace pb = ::google::protobuf;
29
namespace pbio = ::google::protobuf::io;
30
31
// Constructs a reader that pulls gRPC frames from `in`. The reader starts
// with no pending message size and is not finished.
MessageReader::MessageReader(TranscoderInputStream* in)
    : in_(in),
      current_message_size_(0),
      have_current_message_size_(false),
      finished_(false) {
  // NextMessageAndGrpcFrame() copies delimiter_ out on every call, including
  // calls made before any frame header has been read. Zero the buffer here so
  // that copy never exposes indeterminate bytes (harmless if the class
  // declaration already default-initializes it).
  memset(delimiter_, 0, kGrpcDelimiterByteSize);
}
36
37
namespace {
38
39
// A helper function that reads the given number of bytes from a
40
// ZeroCopyInputStream and copies it to the given buffer
41
bool ReadStream(pbio::ZeroCopyInputStream* stream, unsigned char* buffer,
42
95.3k
                int size) {
43
95.3k
  int size_in = 0;
44
95.3k
  const void* data_in = nullptr;
45
  // While we have bytes to read
46
511k
  while (size > 0) {
47
416k
    if (!stream->Next(&data_in, &size_in)) {
48
0
      return false;
49
0
    }
50
416k
    int to_copy = std::min(size, size_in);
51
416k
    memcpy(buffer, data_in, to_copy);
52
    // Advance buffer and subtract the size to reflect the number of bytes left
53
416k
    buffer += to_copy;
54
416k
    size -= to_copy;
55
    // Keep track of uncopied bytes
56
416k
    size_in -= to_copy;
57
416k
  }
58
  // Return the uncopied bytes
59
95.3k
  stream->BackUp(size_in);
60
95.3k
  return true;
61
95.3k
}
62
63
// Decodes the message length stored in a gRPC wire format message delimiter -
// see http://www.grpc.io/docs/guides/wire.html.
//
// `delimiter` must point to at least 5 readable bytes. Byte 0 is not
// inspected here; bytes 1-4 are combined as a big-endian 32-bit value.
uint32_t DelimiterToSize(const unsigned char* delimiter) {
  unsigned value = 0;
  // Fold bytes 1..4 in, most significant byte first.
  for (int index = 1; index <= 4; ++index) {
    value = (value << 8) | static_cast<unsigned>(delimiter[index]);
  }
  return value;
}
77
78
}  // namespace
79
80
1.82M
std::unique_ptr<pbio::ZeroCopyInputStream> MessageReader::NextMessage() {
81
1.82M
  if (Finished()) {
82
    // The stream has ended
83
0
    return nullptr;
84
0
  }
85
86
  // Check if we have the current message size. If not try to read it.
87
1.82M
  if (!have_current_message_size_) {
88
1.22M
    if (in_->BytesAvailable() <
89
1.22M
        static_cast<pb::int64>(kGrpcDelimiterByteSize)) {
90
      // We don't have 5 bytes available to read the length of the message.
91
      // Find out whether the stream is finished and return false.
92
1.12M
      finished_ = in_->Finished();
93
1.12M
      if (finished_ && in_->BytesAvailable() != 0) {
94
0
        status_ = absl::Status(absl::StatusCode::kInternal,
95
0
                               "Incomplete gRPC frame header received");
96
0
      }
97
1.12M
      return nullptr;
98
1.12M
    }
99
100
    // Try to read the delimiter.
101
95.3k
    memset(delimiter_, 0, kGrpcDelimiterByteSize);
102
95.3k
    if (!ReadStream(in_, delimiter_, kGrpcDelimiterByteSize)) {
103
0
      finished_ = true;
104
0
      return nullptr;
105
0
    }
106
107
95.3k
    if (delimiter_[0] != 0) {
108
144
      status_ = absl::Status(
109
144
          absl::StatusCode::kInternal,
110
144
          "Unsupported gRPC frame flag: " + std::to_string(delimiter_[0]));
111
144
      return nullptr;
112
144
    }
113
114
95.2k
    current_message_size_ = DelimiterToSize(delimiter_);
115
95.2k
    have_current_message_size_ = true;
116
95.2k
  }
117
118
696k
  if (in_->BytesAvailable() < static_cast<pb::int64>(current_message_size_)) {
119
601k
    if (in_->Finished()) {
120
0
      status_ = absl::Status(
121
0
          absl::StatusCode::kInternal,
122
0
          "Incomplete gRPC frame expected size: " +
123
0
              std::to_string(current_message_size_) +
124
0
              " actual size: " + std::to_string(in_->BytesAvailable()));
125
0
    }
126
    // We don't have a full message
127
601k
    return nullptr;
128
601k
  }
129
130
  // Reset the have_current_message_size_ for the next message
131
94.9k
  have_current_message_size_ = false;
132
133
  // We have a message! Use LimitingInputStream to wrap the input stream and
134
  // limit it to current_message_size_ bytes to cover only the current message.
135
94.9k
  return std::unique_ptr<pbio::ZeroCopyInputStream>(
136
94.9k
      new pbio::LimitingInputStream(in_, current_message_size_));
137
696k
}
138
139
1.82M
// Runs NextMessage() and returns its result together with a copy of the
// reader's current delimiter bytes and message size.
//
// The delimiter and size are copied regardless of whether NextMessage()
// produced a message, so callers should check `message` before relying on the
// other fields.
MessageAndGrpcFrame MessageReader::NextMessageAndGrpcFrame() {
  MessageAndGrpcFrame result;
  // NextMessage() may update delimiter_ and current_message_size_, so it must
  // run before those members are captured below.
  result.message = NextMessage();
  result.message_size = current_message_size_;
  memcpy(result.grpc_frame, delimiter_, kGrpcDelimiterByteSize);
  return result;
}
146
147
}  // namespace transcoding
148
149
}  // namespace grpc
150
}  // namespace google