Coverage Report

Created: 2025-02-02 06:38

/proc/self/cwd/src/message_reader.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2016 Google Inc. All Rights Reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
//
15
////////////////////////////////////////////////////////////////////////////////
16
//
17
#include "grpc_transcoding/message_reader.h"
18
19
#include <memory>
20
21
#include "google/protobuf/io/zero_copy_stream_impl.h"
22
23
namespace google {
24
namespace grpc {
25
26
namespace transcoding {
27
28
namespace pb = ::google::protobuf;
29
namespace pbio = ::google::protobuf::io;
30
31
MessageReader::MessageReader(TranscoderInputStream* in)
32
577
    : in_(in),
33
577
      current_message_size_(0),
34
577
      have_current_message_size_(false),
35
577
      finished_(false) {}
36
37
namespace {
38
39
// A helper function that reads the given number of bytes from a
40
// ZeroCopyInputStream and copies it to the given buffer
41
bool ReadStream(pbio::ZeroCopyInputStream* stream, unsigned char* buffer,
42
87.8k
                int size) {
43
87.8k
  int size_in = 0;
44
87.8k
  const void* data_in = nullptr;
45
  // While we have bytes to read
46
452k
  while (size > 0) {
47
364k
    if (!stream->Next(&data_in, &size_in)) {
48
0
      return false;
49
0
    }
50
364k
    int to_copy = std::min(size, size_in);
51
364k
    memcpy(buffer, data_in, to_copy);
52
    // Advance buffer and subtract the size to reflect the number of bytes left
53
364k
    buffer += to_copy;
54
364k
    size -= to_copy;
55
    // Keep track of uncopied bytes
56
364k
    size_in -= to_copy;
57
364k
  }
58
  // Return the uncopied bytes
59
87.8k
  stream->BackUp(size_in);
60
87.8k
  return true;
61
87.8k
}
62
63
// Extracts the message size from a gRPC wire format message delimiter - see
// http://www.grpc.io/docs/guides/wire.html.
//
// `delimiter` must point to at least 5 readable bytes. Byte 0 is not examined
// here; bytes 1-4 are decoded as a big-endian 32-bit unsigned size.
uint32_t DelimiterToSize(const unsigned char* delimiter) {
  // Accumulate in uint32_t (not plain `unsigned`, which is only guaranteed to
  // hold 16 bits) so that all four bytes always fit.
  uint32_t size = 0;
  for (int i = 1; i <= 4; ++i) {
    // Shift the bytes gathered so far up and append the next, less
    // significant, byte.
    size = (size << 8) | static_cast<uint32_t>(delimiter[i]);
  }
  return size;
}
77
78
}  // namespace
79
80
1.64M
std::unique_ptr<pbio::ZeroCopyInputStream> MessageReader::NextMessage() {
81
1.64M
  if (Finished()) {
82
    // The stream has ended
83
0
    return nullptr;
84
0
  }
85
86
  // Check if we have the current message size. If not try to read it.
87
1.64M
  if (!have_current_message_size_) {
88
1.17M
    if (in_->BytesAvailable() <
89
1.17M
        static_cast<pb::int64>(kGrpcDelimiterByteSize)) {
90
      // We don't have 5 bytes available to read the length of the message.
91
      // Find out whether the stream is finished and return false.
92
1.08M
      finished_ = in_->Finished();
93
1.08M
      if (finished_ && in_->BytesAvailable() != 0) {
94
0
        status_ = absl::Status(absl::StatusCode::kInternal,
95
0
                               "Incomplete gRPC frame header received");
96
0
      }
97
1.08M
      return nullptr;
98
1.08M
    }
99
100
    // Try to read the delimiter.
101
87.8k
    memset(delimiter_, 0, kGrpcDelimiterByteSize);
102
87.8k
    if (!ReadStream(in_, delimiter_, kGrpcDelimiterByteSize)) {
103
0
      finished_ = true;
104
0
      return nullptr;
105
0
    }
106
107
87.8k
    if (delimiter_[0] != 0) {
108
127
      status_ = absl::Status(
109
127
          absl::StatusCode::kInternal,
110
127
          "Unsupported gRPC frame flag: " + std::to_string(delimiter_[0]));
111
127
      return nullptr;
112
127
    }
113
114
87.7k
    current_message_size_ = DelimiterToSize(delimiter_);
115
87.7k
    have_current_message_size_ = true;
116
87.7k
  }
117
118
552k
  if (in_->BytesAvailable() < static_cast<pb::int64>(current_message_size_)) {
119
465k
    if (in_->Finished()) {
120
0
      status_ = absl::Status(
121
0
          absl::StatusCode::kInternal,
122
0
          "Incomplete gRPC frame expected size: " +
123
0
              std::to_string(current_message_size_) +
124
0
              " actual size: " + std::to_string(in_->BytesAvailable()));
125
0
    }
126
    // We don't have a full message
127
465k
    return nullptr;
128
465k
  }
129
130
  // Reset the have_current_message_size_ for the next message
131
87.4k
  have_current_message_size_ = false;
132
133
  // We have a message! Use LimitingInputStream to wrap the input stream and
134
  // limit it to current_message_size_ bytes to cover only the current message.
135
87.4k
  return std::unique_ptr<pbio::ZeroCopyInputStream>(
136
87.4k
      new pbio::LimitingInputStream(in_, current_message_size_));
137
552k
}
138
139
1.64M
// Same as NextMessage(), but bundles the result with the raw frame header
// bytes and the decoded message size currently held by the reader.
//
// The header bytes and the size are copied unconditionally after the
// NextMessage() call. When the returned `message` is null they therefore
// reflect whatever the reader holds at that moment, not a newly produced
// message.
MessageAndGrpcFrame MessageReader::NextMessageAndGrpcFrame() {
  MessageAndGrpcFrame result;
  // Must run first: NextMessage() may update delimiter_ and
  // current_message_size_, which are copied out below.
  result.message = NextMessage();
  result.message_size = current_message_size_;
  memcpy(result.grpc_frame, delimiter_, kGrpcDelimiterByteSize);
  return result;
}
146
147
}  // namespace transcoding
148
149
}  // namespace grpc
150
}  // namespace google