// Protocol Buffers - Google's data interchange format
// Copyright 2008 Google Inc.  All rights reserved.
// https://developers.google.com/protocol-buffers/
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
//     * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
//     * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
//     * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

// Author: kenton@google.com (Kenton Varda)
//  Based on original Protocol Buffers design by
//  Sanjay Ghemawat, Jeff Dean, and others.

#include "google/protobuf/io/zero_copy_stream_impl_lite.h"

#include <algorithm>
#include <limits>
#include <utility>

#include "google/protobuf/stubs/common.h"
#include "absl/base/casts.h"
#include "absl/log/absl_check.h"
#include "absl/strings/cord.h"
#include "absl/strings/internal/resize_uninitialized.h"

// Must be included last.
#include "google/protobuf/port_def.inc"

namespace google {
namespace protobuf {
namespace io {

namespace {

// Default block size for Copying{In,Out}putStreamAdaptor.
static const int kDefaultBlockSize = 8192;

}  // namespace

// ===================================================================

ArrayInputStream::ArrayInputStream(const void* data, int size, int block_size)
    : data_(reinterpret_cast<const uint8_t*>(data)),
      size_(size),
      block_size_(block_size > 0 ? block_size : size),
      position_(0),
      last_returned_size_(0) {}

bool ArrayInputStream::Next(const void** data, int* size) {
  if (position_ < size_) {
    last_returned_size_ = std::min(block_size_, size_ - position_);
    *data = data_ + position_;
    *size = last_returned_size_;
    position_ += last_returned_size_;
    return true;
  } else {
    // We're at the end of the array.
    last_returned_size_ = 0;  // Don't let caller back up.
    return false;
  }
}

void ArrayInputStream::BackUp(int count) {
  ABSL_CHECK_GT(last_returned_size_, 0)
      << "BackUp() can only be called after a successful Next().";
  ABSL_CHECK_LE(count, last_returned_size_);
  ABSL_CHECK_GE(count, 0);
  position_ -= count;
  last_returned_size_ = 0;  // Don't let caller back up further.
}

bool ArrayInputStream::Skip(int count) {
  ABSL_CHECK_GE(count, 0);
  last_returned_size_ = 0;  // Don't let caller back up.
  if (count > size_ - position_) {
    position_ = size_;
    return false;
  } else {
    position_ += count;
    return true;
  }
}

int64_t ArrayInputStream::ByteCount() const { return position_; }

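// Illustrative sketch (not part of the upstream file): the Next()/BackUp()
// contract of ArrayInputStream. Guarded by a hypothetical macro that is
// never defined, so it does not affect the build; the tiny block size is
// only there to force multiple Next() calls.
#ifdef ZERO_COPY_STREAM_EXAMPLES
static size_t CountBytesWithArrayInputStream() {
  const char kText[] = "hello world";  // 11 bytes; the NUL is not exposed.
  ArrayInputStream input(kText, 11, /*block_size=*/4);
  const void* data;
  int size;
  size_t total = 0;
  while (input.Next(&data, &size)) {
    // Pretend we consumed only half of the first buffer; BackUp() returns
    // the unconsumed suffix so the next Next() call hands it back.
    if (total == 0 && size > 1) {
      input.BackUp(size / 2);
      size -= size / 2;
    }
    total += size;
  }
  return total;  // == 11, matching input.ByteCount().
}
#endif  // ZERO_COPY_STREAM_EXAMPLES
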
// ===================================================================

ArrayOutputStream::ArrayOutputStream(void* data, int size, int block_size)
    : data_(reinterpret_cast<uint8_t*>(data)),
      size_(size),
      block_size_(block_size > 0 ? block_size : size),
      position_(0),
      last_returned_size_(0) {}

bool ArrayOutputStream::Next(void** data, int* size) {
  if (position_ < size_) {
    last_returned_size_ = std::min(block_size_, size_ - position_);
    *data = data_ + position_;
    *size = last_returned_size_;
    position_ += last_returned_size_;
    return true;
  } else {
    // We're at the end of the array.
    last_returned_size_ = 0;  // Don't let caller back up.
    return false;
  }
}

void ArrayOutputStream::BackUp(int count) {
  ABSL_CHECK_LE(count, last_returned_size_)
      << "BackUp() cannot exceed the size of the last Next() call.";
  ABSL_CHECK_GE(count, 0);
  position_ -= count;
  last_returned_size_ -= count;
}

int64_t ArrayOutputStream::ByteCount() const { return position_; }

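// Illustrative sketch (not part of the upstream file): writing into a fixed
// buffer and trimming the unused tail of the final block with BackUp().
// The guard macro (never defined) and function name are hypothetical;
// `buffer_size >= 5` is assumed.
#ifdef ZERO_COPY_STREAM_EXAMPLES
static int FillBufferWithArrayOutputStream(char* buffer, int buffer_size) {
  ArrayOutputStream output(buffer, buffer_size);
  void* data;
  int size;
  if (!output.Next(&data, &size)) return 0;
  // Write 5 bytes, then hand the rest of the block back to the stream.
  std::memcpy(data, "abcde", 5);
  output.BackUp(size - 5);
  return static_cast<int>(output.ByteCount());  // == 5
}
#endif  // ZERO_COPY_STREAM_EXAMPLES
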
// ===================================================================

StringOutputStream::StringOutputStream(std::string* target) : target_(target) {}

bool StringOutputStream::Next(void** data, int* size) {
  ABSL_CHECK(target_ != NULL);
  size_t old_size = target_->size();

  // Grow the string.
  size_t new_size;
  if (old_size < target_->capacity()) {
    // Resize the string to match its capacity, since we can get away
    // without a memory allocation this way.
    new_size = target_->capacity();
  } else {
    // Size has reached capacity, try to double it.
    new_size = old_size * 2;
  }
  // Avoid integer overflow in returned '*size'.
  new_size = std::min(new_size, old_size + std::numeric_limits<int>::max());
  // Increase the size, also make sure that it is at least kMinimumSize.
  absl::strings_internal::STLStringResizeUninitialized(
      target_,
      std::max(new_size,
               kMinimumSize + 0));  // "+ 0" works around GCC4 weirdness.

  *data = mutable_string_data(target_) + old_size;
  *size = target_->size() - old_size;
  return true;
}

void StringOutputStream::BackUp(int count) {
  ABSL_CHECK_GE(count, 0);
  ABSL_CHECK(target_ != NULL);
  ABSL_CHECK_LE(static_cast<size_t>(count), target_->size());
  target_->resize(target_->size() - count);
}

int64_t StringOutputStream::ByteCount() const {
  ABSL_CHECK(target_ != NULL);
  return target_->size();
}

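// Illustrative sketch (not part of the upstream file): StringOutputStream
// grows the target string geometrically, as described above, and BackUp()
// shrinks it so the string ends up holding exactly the bytes written. The
// guard macro (never defined) is hypothetical.
#ifdef ZERO_COPY_STREAM_EXAMPLES
static std::string BuildStringWithStringOutputStream() {
  std::string result;
  StringOutputStream output(&result);
  void* data;
  int size;
  if (output.Next(&data, &size)) {
    // `size` is at least kMinimumSize; keep only the one byte we write.
    static_cast<char*>(data)[0] = 'x';
    output.BackUp(size - 1);
  }
  return result;  // == "x"
}
#endif  // ZERO_COPY_STREAM_EXAMPLES
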
// ===================================================================

int CopyingInputStream::Skip(int count) {
  char junk[4096];
  int skipped = 0;
  while (skipped < count) {
    int bytes = Read(junk, std::min(count - skipped,
                                    absl::implicit_cast<int>(sizeof(junk))));
    if (bytes <= 0) {
      // EOF or read error.
      return skipped;
    }
    skipped += bytes;
  }
  return skipped;
}

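// Illustrative sketch (not part of the upstream file): a CopyingInputStream
// over a C FILE*. Read() is the only required override; overriding Skip()
// lets a seekable source avoid the read-and-discard loop above. The class
// name and the guard macro (never defined) are hypothetical.
#ifdef ZERO_COPY_STREAM_EXAMPLES
#include <cstdio>

class ExampleFileCopyingInputStream : public CopyingInputStream {
 public:
  explicit ExampleFileCopyingInputStream(std::FILE* file) : file_(file) {}

  // Returns bytes read. Note: 0 conflates EOF and error here, while the
  // contract distinguishes them (-1 for error); a real implementation
  // would check ferror().
  int Read(void* buffer, int size) override {
    return static_cast<int>(std::fread(buffer, 1, size, file_));
  }

  int Skip(int count) override {
    // Seek instead of copying; fall back to the default implementation when
    // seeking is not possible (e.g. the FILE* wraps a pipe). A real
    // implementation would also clamp to the remaining file size.
    if (std::fseek(file_, count, SEEK_CUR) == 0) return count;
    return CopyingInputStream::Skip(count);
  }

 private:
  std::FILE* file_;
};
#endif  // ZERO_COPY_STREAM_EXAMPLES
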
CopyingInputStreamAdaptor::CopyingInputStreamAdaptor(
    CopyingInputStream* copying_stream, int block_size)
    : copying_stream_(copying_stream),
      owns_copying_stream_(false),
      failed_(false),
      position_(0),
      buffer_size_(block_size > 0 ? block_size : kDefaultBlockSize),
      buffer_used_(0),
      backup_bytes_(0) {}

CopyingInputStreamAdaptor::~CopyingInputStreamAdaptor() {
  if (owns_copying_stream_) {
    delete copying_stream_;
  }
}

bool CopyingInputStreamAdaptor::Next(const void** data, int* size) {
  if (failed_) {
    // Already failed on a previous read.
    return false;
  }

  AllocateBufferIfNeeded();

  if (backup_bytes_ > 0) {
    // We have data left over from a previous BackUp(), so just return that.
    *data = buffer_.get() + buffer_used_ - backup_bytes_;
    *size = backup_bytes_;
    backup_bytes_ = 0;
    return true;
  }

  // Read new data into the buffer.
  buffer_used_ = copying_stream_->Read(buffer_.get(), buffer_size_);
  if (buffer_used_ <= 0) {
    // EOF or read error.  We don't need the buffer anymore.
    if (buffer_used_ < 0) {
      // Read error (not EOF).
      failed_ = true;
    }
    FreeBuffer();
    return false;
  }
  position_ += buffer_used_;

  *size = buffer_used_;
  *data = buffer_.get();
  return true;
}

void CopyingInputStreamAdaptor::BackUp(int count) {
  ABSL_CHECK(backup_bytes_ == 0 && buffer_.get() != NULL)
      << " BackUp() can only be called after Next().";
  ABSL_CHECK_LE(count, buffer_used_)
      << " Can't back up over more bytes than were returned by the last call"
         " to Next().";
  ABSL_CHECK_GE(count, 0) << " Parameter to BackUp() can't be negative.";

  backup_bytes_ = count;
}

bool CopyingInputStreamAdaptor::Skip(int count) {
  ABSL_CHECK_GE(count, 0);

  if (failed_) {
    // Already failed on a previous read.
    return false;
  }

  // First skip any bytes left over from a previous BackUp().
  if (backup_bytes_ >= count) {
    // We have more data left over than we're trying to skip.  Just chop it.
    backup_bytes_ -= count;
    return true;
  }

  count -= backup_bytes_;
  backup_bytes_ = 0;

  int skipped = copying_stream_->Skip(count);
  position_ += skipped;
  return skipped == count;
}

int64_t CopyingInputStreamAdaptor::ByteCount() const {
  return position_ - backup_bytes_;
}

void CopyingInputStreamAdaptor::AllocateBufferIfNeeded() {
  if (buffer_.get() == NULL) {
    buffer_.reset(new uint8_t[buffer_size_]);
  }
}

void CopyingInputStreamAdaptor::FreeBuffer() {
  ABSL_CHECK_EQ(backup_bytes_, 0);
  buffer_used_ = 0;
  buffer_.reset();
}

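// Illustrative sketch (not part of the upstream file): adapting any
// CopyingInputStream to the zero-copy interface. The adaptor does the
// buffering; SetOwnsCopyingStream(true) would also transfer ownership of
// the wrapped stream. The guard macro (never defined) is hypothetical.
#ifdef ZERO_COPY_STREAM_EXAMPLES
static int64_t DrainAdaptedStream(CopyingInputStream* source) {
  CopyingInputStreamAdaptor adaptor(source, /*block_size=*/4096);
  const void* data;
  int size;
  while (adaptor.Next(&data, &size)) {
    // Consume `size` bytes starting at `data` here; calling BackUp(k)
    // would instead return the last k bytes to the adaptor.
  }
  return adaptor.ByteCount();  // Total bytes handed out and consumed.
}
#endif  // ZERO_COPY_STREAM_EXAMPLES
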
// ===================================================================

CopyingOutputStreamAdaptor::CopyingOutputStreamAdaptor(
    CopyingOutputStream* copying_stream, int block_size)
    : copying_stream_(copying_stream),
      owns_copying_stream_(false),
      failed_(false),
      position_(0),
      buffer_size_(block_size > 0 ? block_size : kDefaultBlockSize),
      buffer_used_(0) {}

CopyingOutputStreamAdaptor::~CopyingOutputStreamAdaptor() {
  WriteBuffer();
  if (owns_copying_stream_) {
    delete copying_stream_;
  }
}

bool CopyingOutputStreamAdaptor::Flush() { return WriteBuffer(); }

bool CopyingOutputStreamAdaptor::Next(void** data, int* size) {
  if (buffer_used_ == buffer_size_) {
    if (!WriteBuffer()) return false;
  }

  AllocateBufferIfNeeded();

  *data = buffer_.get() + buffer_used_;
  *size = buffer_size_ - buffer_used_;
  buffer_used_ = buffer_size_;
  return true;
}

void CopyingOutputStreamAdaptor::BackUp(int count) {
  if (count == 0) {
    Flush();
    return;
  }
  ABSL_CHECK_GE(count, 0);
  ABSL_CHECK_EQ(buffer_used_, buffer_size_)
      << " BackUp() can only be called after Next().";
  ABSL_CHECK_LE(count, buffer_used_)
      << " Can't back up over more bytes than were returned by the last call"
         " to Next().";

  buffer_used_ -= count;
}

int64_t CopyingOutputStreamAdaptor::ByteCount() const {
  return position_ + buffer_used_;
}

bool CopyingOutputStreamAdaptor::WriteAliasedRaw(const void* data, int size) {
  if (size >= buffer_size_) {
    if (!Flush() || !copying_stream_->Write(data, size)) {
      return false;
    }
    ABSL_DCHECK_EQ(buffer_used_, 0);
    position_ += size;
    return true;
  }

  void* out;
  int out_size;
  while (true) {
    if (!Next(&out, &out_size)) {
      return false;
    }

    if (size <= out_size) {
      std::memcpy(out, data, size);
      BackUp(out_size - size);
      return true;
    }

    std::memcpy(out, data, out_size);
    data = static_cast<const char*>(data) + out_size;
    size -= out_size;
  }
  return true;
}

bool CopyingOutputStreamAdaptor::WriteCord(const absl::Cord& cord) {
  for (absl::string_view chunk : cord.Chunks()) {
    if (!WriteAliasedRaw(chunk.data(), chunk.size())) {
      return false;
    }
  }
  return true;
}

bool CopyingOutputStreamAdaptor::WriteBuffer() {
  if (failed_) {
    // Already failed on a previous write.
    return false;
  }

  if (buffer_used_ == 0) return true;

  if (copying_stream_->Write(buffer_.get(), buffer_used_)) {
    position_ += buffer_used_;
    buffer_used_ = 0;
    return true;
  } else {
    failed_ = true;
    FreeBuffer();
    return false;
  }
}

void CopyingOutputStreamAdaptor::AllocateBufferIfNeeded() {
  if (buffer_ == NULL) {
    buffer_.reset(new uint8_t[buffer_size_]);
  }
}

void CopyingOutputStreamAdaptor::FreeBuffer() {
  buffer_used_ = 0;
  buffer_.reset();
}

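// Illustrative sketch (not part of the upstream file): the output-side twin,
// a CopyingOutputStream over a C FILE* plus a hand-rolled write loop through
// the adaptor. Real callers usually let a serializer drive Next()/BackUp().
// Names and the guard macro (never defined) are hypothetical.
#ifdef ZERO_COPY_STREAM_EXAMPLES
#include <cstdio>

class ExampleFileCopyingOutputStream : public CopyingOutputStream {
 public:
  explicit ExampleFileCopyingOutputStream(std::FILE* file) : file_(file) {}
  bool Write(const void* buffer, int size) override {
    return std::fwrite(buffer, 1, size, file_) == static_cast<size_t>(size);
  }

 private:
  std::FILE* file_;
};

static bool WriteAllThroughAdaptor(std::FILE* file, const void* bytes, int n) {
  ExampleFileCopyingOutputStream copying(file);
  CopyingOutputStreamAdaptor adaptor(&copying);
  const char* src = static_cast<const char*>(bytes);
  void* data;
  int size;
  while (n > 0) {
    if (!adaptor.Next(&data, &size)) return false;
    const int chunk = std::min(size, n);
    std::memcpy(data, src, chunk);
    adaptor.BackUp(size - chunk);  // Return the unused tail of the block.
    src += chunk;
    n -= chunk;
  }
  return adaptor.Flush();  // Push the final partially filled buffer.
}
#endif  // ZERO_COPY_STREAM_EXAMPLES
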
// ===================================================================

LimitingInputStream::LimitingInputStream(ZeroCopyInputStream* input,
                                         int64_t limit)
    : input_(input), limit_(limit) {
  prior_bytes_read_ = input_->ByteCount();
}

LimitingInputStream::~LimitingInputStream() {
  // If we overshot the limit, back up.
  if (limit_ < 0) input_->BackUp(-limit_);
}

bool LimitingInputStream::Next(const void** data, int* size) {
  if (limit_ <= 0) return false;
  if (!input_->Next(data, size)) return false;

  limit_ -= *size;
  if (limit_ < 0) {
    // We overshot the limit.  Reduce *size to hide the rest of the buffer.
    *size += limit_;
  }
  return true;
}

void LimitingInputStream::BackUp(int count) {
  if (limit_ < 0) {
    input_->BackUp(count - limit_);
    limit_ = count;
  } else {
    input_->BackUp(count);
    limit_ += count;
  }
}

bool LimitingInputStream::Skip(int count) {
  if (count > limit_) {
    if (limit_ < 0) return false;
    input_->Skip(limit_);
    limit_ = 0;
    return false;
  } else {
    if (!input_->Skip(count)) return false;
    limit_ -= count;
    return true;
  }
}

int64_t LimitingInputStream::ByteCount() const {
  if (limit_ < 0) {
    return input_->ByteCount() + limit_ - prior_bytes_read_;
  } else {
    return input_->ByteCount() - prior_bytes_read_;
  }
}

bool LimitingInputStream::ReadCord(absl::Cord* cord, int count) {
  if (count <= 0) return true;
  if (count <= limit_) {
    if (!input_->ReadCord(cord, count)) return false;
    limit_ -= count;
    return true;
  }
  input_->ReadCord(cord, limit_);
  limit_ = 0;
  return false;
}

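// Illustrative sketch (not part of the upstream file): exposing at most `n`
// bytes of an underlying stream. When the wrapper is destroyed it backs the
// underlying stream up over any overshoot, so `input` ends up positioned
// exactly `consumed` bytes further. The guard macro (never defined) and the
// function name are hypothetical.
#ifdef ZERO_COPY_STREAM_EXAMPLES
static int64_t ConsumeAtMost(ZeroCopyInputStream* input, int64_t n) {
  int64_t consumed = 0;
  {
    LimitingInputStream limited(input, n);
    const void* data;
    int size;
    while (limited.Next(&data, &size)) {
      // At most n bytes total are handed out across all Next() calls.
    }
    consumed = limited.ByteCount();
  }  // Destructor restores any overshoot to `input` here.
  return consumed;
}
#endif  // ZERO_COPY_STREAM_EXAMPLES
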
// ===================================================================

CordInputStream::CordInputStream(const absl::Cord* cord)
    : it_(cord->char_begin()),
      length_(cord->size()),
      bytes_remaining_(length_) {
  LoadChunkData();
}

bool CordInputStream::LoadChunkData() {
  if (bytes_remaining_ != 0) {
    absl::string_view sv = absl::Cord::ChunkRemaining(it_);
    data_ = sv.data();
    size_ = available_ = sv.size();
    return true;
  }
  size_ = available_ = 0;
  return false;
}

bool CordInputStream::NextChunk(size_t skip) {
  // `size_ == 0` indicates we're at EOF.
  if (size_ == 0) return false;

  // The caller consumed 'size_ - available_' bytes that are not yet accounted
  // for in the iterator position to get to the start of the next chunk.
  const size_t distance = size_ - available_ + skip;
  absl::Cord::Advance(&it_, distance);
  bytes_remaining_ -= skip;

  return LoadChunkData();
}

bool CordInputStream::Next(const void** data, int* size) {
  if (available_ > 0 || NextChunk(0)) {
    *data = data_ + size_ - available_;
    *size = available_;
    bytes_remaining_ -= available_;
    available_ = 0;
    return true;
  }
  return false;
}

void CordInputStream::BackUp(int count) {
  // BackUp() is only allowed on the chunk last returned from `Next()`.
  ABSL_CHECK_LE(static_cast<size_t>(count), size_ - available_);

  available_ += count;
  bytes_remaining_ += count;
}

bool CordInputStream::Skip(int count) {
  // Short circuit if we stay inside the current chunk.
  if (static_cast<size_t>(count) <= available_) {
    available_ -= count;
    bytes_remaining_ -= count;
    return true;
  }

  // Sanity check the skip count.
  if (static_cast<size_t>(count) <= bytes_remaining_) {
    // Skip to end: do not return EOF condition: skipping into EOF is ok.
    NextChunk(count);
    return true;
  }
  NextChunk(bytes_remaining_);
  return false;
}

int64_t CordInputStream::ByteCount() const {
  return length_ - bytes_remaining_;
}

bool CordInputStream::ReadCord(absl::Cord* cord, int count) {
  // Advance the iterator to the current position.
  const size_t used = size_ - available_;
  absl::Cord::Advance(&it_, used);

  // Read the cord, adjusting the iterator position.
  // Make sure to cap at available bytes to avoid hard crashes.
  const size_t n = std::min(static_cast<size_t>(count), bytes_remaining_);
  cord->Append(absl::Cord::AdvanceAndRead(&it_, n));

  // Update current chunk data.
  bytes_remaining_ -= n;
  LoadChunkData();

  return n == static_cast<size_t>(count);
}

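// Illustrative sketch (not part of the upstream file): reading a Cord's
// chunks without copying. Each Next() yields a pointer directly into the
// Cord's storage. The guard macro (never defined) is hypothetical.
#ifdef ZERO_COPY_STREAM_EXAMPLES
static size_t SumChunkSizes(const absl::Cord& cord) {
  CordInputStream input(&cord);
  const void* data;
  int size;
  size_t total = 0;
  while (input.Next(&data, &size)) {
    total += static_cast<size_t>(size);  // `data` aliases the Cord's memory.
  }
  return total;  // == cord.size() == input.ByteCount()
}
#endif  // ZERO_COPY_STREAM_EXAMPLES
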
CordOutputStream::CordOutputStream(size_t size_hint) : size_hint_(size_hint) {}

CordOutputStream::CordOutputStream(absl::Cord cord, size_t size_hint)
    : cord_(std::move(cord)),
      size_hint_(size_hint),
      state_(cord_.empty() ? State::kEmpty : State::kSteal) {}

CordOutputStream::CordOutputStream(absl::CordBuffer buffer, size_t size_hint)
    : size_hint_(size_hint),
      state_(buffer.length() < buffer.capacity() ? State::kPartial
                                                 : State::kFull),
      buffer_(std::move(buffer)) {}

CordOutputStream::CordOutputStream(absl::Cord cord, absl::CordBuffer buffer,
                                   size_t size_hint)
    : cord_(std::move(cord)),
      size_hint_(size_hint),
      state_(buffer.length() < buffer.capacity() ? State::kPartial
                                                 : State::kFull),
      buffer_(std::move(buffer)) {}

bool CordOutputStream::Next(void** data, int* size) {
  // Use 128 bytes as a minimum buffer size if we don't have any application
  // provided size hints. This number is picked somewhat arbitrarily as 'small
  // enough to avoid excessive waste on small data, and large enough to not
  // waste CPU and memory on tiny buffer overhead'.
  // It is worth noting that absent size hints, we pick 'current size' as
  // the default buffer size (capped at max flat size), which means we quickly
  // double the buffer size. This is in contrast to `Cord::Append()` functions
  // accepting strings, which use a conservative 10% growth.
  static const size_t kMinBlockSize = 128;

  size_t desired_size, max_size;
  const size_t cord_size = cord_.size() + buffer_.length();
  if (size_hint_ > cord_size) {
    // Try to hit size_hint_ exactly so the caller doesn't receive a larger
    // buffer than indicated, requiring a non-zero call to BackUp() to undo
    // the buffer capacity we returned beyond the indicated size hint.
    desired_size = size_hint_ - cord_size;
    max_size = desired_size;
  } else {
    // We're past the size hint or don't have a size hint.  Try to allocate a
    // block as large as what we have so far, or at least kMinBlockSize bytes.
    // CordBuffer will truncate this to an appropriate size if it is too large.
    desired_size = std::max(cord_size, kMinBlockSize);
    max_size = std::numeric_limits<size_t>::max();
  }

  switch (state_) {
    case State::kSteal:
      // Steal the last buffer from the Cord if available.
      assert(buffer_.length() == 0);
      buffer_ = cord_.GetAppendBuffer(desired_size);
      break;
    case State::kPartial:
      // Use the existing capacity in `buffer_`.
      assert(buffer_.length() < buffer_.capacity());
      break;
    case State::kFull:
      assert(buffer_.length() > 0);
      cord_.Append(std::move(buffer_));
      PROTOBUF_FALLTHROUGH_INTENDED;
    case State::kEmpty:
      assert(buffer_.length() == 0);
      buffer_ = absl::CordBuffer::CreateWithDefaultLimit(desired_size);
      break;
  }

  // Get all available capacity from the buffer.
  absl::Span<char> span = buffer_.available();
  assert(!span.empty());
  *data = span.data();

  // Only hand out up to 'max_size', which is limited if there is a size hint
  // specified, and we have more available than the size hint.
  if (span.size() > max_size) {
    *size = static_cast<int>(max_size);
    buffer_.IncreaseLengthBy(max_size);
    state_ = State::kPartial;
  } else {
    *size = static_cast<int>(span.size());
    buffer_.IncreaseLengthBy(span.size());
    state_ = State::kFull;
  }

  return true;
}

void CordOutputStream::BackUp(int count) {
  // Check if there is something to do, else the state remains unchanged.
  assert(0 <= count && count <= ByteCount());
  if (count == 0) return;

  // BackUp() is not supposed to back up beyond the last Next() call.
  const int buffer_length = static_cast<int>(buffer_.length());
  assert(count <= buffer_length);
  if (count <= buffer_length) {
    buffer_.SetLength(static_cast<size_t>(buffer_length - count));
    state_ = State::kPartial;
  } else {
    buffer_ = {};
    cord_.RemoveSuffix(static_cast<size_t>(count));
    state_ = State::kSteal;
  }
}

int64_t CordOutputStream::ByteCount() const {
  return static_cast<int64_t>(cord_.size() + buffer_.length());
}

bool CordOutputStream::WriteCord(const absl::Cord& cord) {
  cord_.Append(std::move(buffer_));
  cord_.Append(cord);
  state_ = State::kSteal;  // Attempt to utilize existing capacity in `cord_`.
  return true;
}

absl::Cord CordOutputStream::Consume() {
  cord_.Append(std::move(buffer_));
  state_ = State::kEmpty;
  return std::move(cord_);
}

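// Illustrative sketch (not part of the upstream file): building a Cord
// through the stream. An accurate size hint makes Next() hand out at most
// the remaining bytes, so the BackUp() below is purely defensive. The guard
// macro (never defined) and the function name are hypothetical.
#ifdef ZERO_COPY_STREAM_EXAMPLES
static absl::Cord CordFromBytes(absl::string_view payload) {
  CordOutputStream output(/*size_hint=*/payload.size());
  void* data;
  int size;
  size_t offset = 0;
  while (offset < payload.size() && output.Next(&data, &size)) {
    const size_t chunk =
        std::min(static_cast<size_t>(size), payload.size() - offset);
    std::memcpy(data, payload.data() + offset, chunk);
    offset += chunk;
    if (chunk < static_cast<size_t>(size)) {
      output.BackUp(size - static_cast<int>(chunk));  // Trim unused capacity.
    }
  }
  // Consume() appends any pending buffer and hands back the finished Cord.
  return output.Consume();
}
#endif  // ZERO_COPY_STREAM_EXAMPLES
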
}  // namespace io
}  // namespace protobuf
}  // namespace google