Coverage Report

Created: 2026-05-31 07:45

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/rocksdb/util/compression.h
Line
Count
Source
1
// Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
//
6
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7
// Use of this source code is governed by a BSD-style license that can be
8
// found in the LICENSE file. See the AUTHORS file for names of contributors.
9
//
10
#pragma once
11
12
#include <algorithm>
13
14
#include "memory/memory_allocator_impl.h"
15
#include "rocksdb/advanced_compression.h"
16
#include "rocksdb/options.h"
17
#include "table/block_based/block_type.h"
18
#include "util/aligned_buffer.h"
19
#include "util/coding.h"
20
#include "util/compression_context_cache.h"
21
22
#ifdef ZSTD
23
#include <zstd.h>
24
#include <zstd_errors.h>
25
// ZSTD_compress2(), ZSTD_compressStream2() and frame parameters all belong to
26
// advanced APIs and require v1.4.0+, which is from April 2019.
27
// https://github.com/facebook/zstd/blob/eb9f881eb810f2242f1ef36b3f3e7014eecb8fa6/lib/zstd.h#L297C40-L297C45
28
// To avoid a rat's nest of #ifdefs, we now require v1.4.0+ for ZSTD support.
29
#if ZSTD_VERSION_NUMBER < 10400
30
#error "ZSTD support requires version >= 1.4.0 (libzstd-devel)"
31
#endif  // ZSTD_VERSION_NUMBER
32
// The above release also includes digested dictionary support, but some
33
// required functions (ZSTD_createDDict_byReference) are still only exported
34
// with ZSTD_STATIC_LINKING_ONLY defined.
35
#if defined(ZSTD_STATIC_LINKING_ONLY)
36
#define ROCKSDB_ZSTD_DDICT
37
#endif  // defined(ZSTD_STATIC_LINKING_ONLY)
38
//  For ZDICT_* functions
39
#include <zdict.h>
40
// ZDICT_finalizeDictionary API is exported and stable since v1.4.5
41
#if ZSTD_VERSION_NUMBER >= 10405
42
#define ROCKSDB_ZDICT_FINALIZE
43
#endif  // ZSTD_VERSION_NUMBER >= 10405
44
#endif  // ZSTD
45
46
namespace ROCKSDB_NAMESPACE {
47
// Need this for the context allocation override
48
// On windows we need to do this explicitly
49
#if defined(ZSTD) && defined(ROCKSDB_JEMALLOC) && defined(OS_WIN) && \
50
    defined(ZSTD_STATIC_LINKING_ONLY)
51
#define ROCKSDB_ZSTD_CUSTOM_MEM
52
namespace port {
53
ZSTD_customMem GetJeZstdAllocationOverrides();
54
}  // namespace port
55
#endif  // defined(ZSTD) && defined(ROCKSDB_JEMALLOC) && defined(OS_WIN) &&
56
        // defined(ZSTD_STATIC_LINKING_ONLY)
57
58
// Cached data represents a portion that can be re-used
59
// If, in the future we have more than one native context to
60
// cache we can arrange this as a tuple
61
class ZSTDUncompressCachedData {
 public:
  // Native handle type: a ZSTD decompression context when ZSTD is compiled
  // in, otherwise an opaque placeholder pointer.
#if defined(ZSTD)
  using ZSTDNativeContext = ZSTD_DCtx*;
#else
  using ZSTDNativeContext = void*;
#endif  // ZSTD

  // Starts without a context. cache_idx_ == -1 marks the instance as the
  // owner of whatever context it later holds.
  ZSTDUncompressCachedData() {}

  // Not copyable.
  ZSTDUncompressCachedData(const ZSTDUncompressCachedData& o) = delete;
  ZSTDUncompressCachedData& operator=(const ZSTDUncompressCachedData&) = delete;

  // Move construction: default-construct, then delegate to move assignment.
  ZSTDUncompressCachedData(ZSTDUncompressCachedData&& o) noexcept
      : ZSTDUncompressCachedData() {
    *this = std::move(o);
  }

  // Move assignment exchanges both fields with `o`. The destination is
  // expected to hold no context; this is only checked by an assert.
  ZSTDUncompressCachedData& operator=(ZSTDUncompressCachedData&& o) noexcept {
    assert(zstd_ctx_ == nullptr);
    std::swap(cache_idx_, o.cache_idx_);
    std::swap(zstd_ctx_, o.zstd_ctx_);
    return *this;
  }

  // Accessors for the held context and the cache slot index.
  ZSTDNativeContext Get() const { return zstd_ctx_; }
  int64_t GetCacheIndex() const { return cache_idx_; }

  // If no context is held, creates one (when ZSTD is compiled in) and marks
  // this instance as its owner. Does nothing when a context already exists.
  void CreateIfNeeded() {
    if (zstd_ctx_ != nullptr) {
      return;
    }
#if defined(ZSTD) && defined(ROCKSDB_ZSTD_CUSTOM_MEM)
    zstd_ctx_ = ZSTD_createDCtx_advanced(port::GetJeZstdAllocationOverrides());
#elif defined(ZSTD)
    zstd_ctx_ = ZSTD_createDCtx();
#endif
    // Without ZSTD the context simply stays nullptr.
    cache_idx_ = -1;
  }

  // Borrows the context pointer held by `o` and records `idx` as the cache
  // slot it came from. A non -1 index means the destructor will not free it.
  void InitFromCache(const ZSTDUncompressCachedData& o, int64_t idx) {
    cache_idx_ = idx;
    zstd_ctx_ = o.zstd_ctx_;
  }

  // Frees the context only when this instance owns it (cache_idx_ == -1).
  ~ZSTDUncompressCachedData() {
#if defined(ZSTD)
    const bool owns_context = (cache_idx_ == -1);
    if (owns_context && zstd_ctx_ != nullptr) {
      ZSTD_freeDCtx(zstd_ctx_);
    }
#endif  // ZSTD
  }

 private:
  ZSTDNativeContext zstd_ctx_ = nullptr;
  int64_t cache_idx_ = -1;  // -1 means this instance owns the context
};
113
}  // namespace ROCKSDB_NAMESPACE
114
115
#if defined(XPRESS)
116
#include "port/xpress.h"
117
#endif
118
119
namespace ROCKSDB_NAMESPACE {
120
121
class FailureDecompressor : public Decompressor {
122
 public:
123
0
  explicit FailureDecompressor(Status&& status) : status_(std::move(status)) {
124
0
    assert(!status_.ok());
125
0
  }
126
0
  ~FailureDecompressor() override { status_.PermitUncheckedError(); }
127
128
0
  const char* Name() const override { return "FailureDecompressor"; }
129
130
0
  Status ExtractUncompressedSize(Args& /*args*/) override { return status_; }
131
132
  Status DecompressBlock(const Args& /*args*/,
133
0
                         char* /*uncompressed_output*/) override {
134
0
    return status_;
135
0
  }
136
137
 protected:
138
  Status status_;
139
};
140
141
// Owns a decompression dictionary, and associated Decompressor, for storing
142
// in the block cache.
143
//
144
// Justification: for a "processed" dictionary to be saved in block cache, we
145
// also need a reference to the decompressor that processed it, to ensure it
146
// is recognized properly. At that point, we might as well have the dictionary
147
// part of the decompressor identity and track an associated decompressor along
148
// with a decompression dictionary in the block cache, and the decompressor
149
// hides potential details of processing the dictionary.
150
struct DecompressorDict {
151
  // Block containing the data for the compression dictionary in case the
152
  // constructor that takes a string parameter is used.
153
  std::string dict_str_;
154
155
  // Block containing the data for the compression dictionary in case the
156
  // constructor that takes a Slice parameter is used and the passed in
157
  // CacheAllocationPtr is not nullptr.
158
  CacheAllocationPtr dict_allocation_;
159
160
  // A Decompressor referencing and using the dictionary owned by this.
161
  std::unique_ptr<Decompressor> decompressor_;
162
163
  // Approximate owned memory usage
164
  size_t memory_usage_;
165
166
  DecompressorDict(std::string&& dict, Decompressor& from_decompressor)
167
0
      : dict_str_(std::move(dict)) {
168
0
    Populate(from_decompressor, dict_str_);
169
0
  }
170
171
  DecompressorDict(Slice slice, CacheAllocationPtr&& allocation,
172
                   Decompressor& from_decompressor)
173
0
      : dict_allocation_(std::move(allocation)) {
174
0
    Populate(from_decompressor, slice);
175
0
  }
176
177
  DecompressorDict(DecompressorDict&& rhs) noexcept
178
      : dict_str_(std::move(rhs.dict_str_)),
179
        dict_allocation_(std::move(rhs.dict_allocation_)),
180
        decompressor_(std::move(rhs.decompressor_)),
181
0
        memory_usage_(std::move(rhs.memory_usage_)) {}
182
183
0
  DecompressorDict& operator=(DecompressorDict&& rhs) noexcept {
184
0
    if (this == &rhs) {
185
0
      return *this;
186
0
    }
187
0
    dict_str_ = std::move(rhs.dict_str_);
188
0
    dict_allocation_ = std::move(rhs.dict_allocation_);
189
0
    decompressor_ = std::move(rhs.decompressor_);
190
0
    return *this;
191
0
  }
192
  // Disable copy
193
  DecompressorDict(const DecompressorDict&) = delete;
194
  DecompressorDict& operator=(const DecompressorDict&) = delete;
195
196
  // The object is self-contained if the string constructor is used, or the
197
  // Slice constructor is invoked with a non-null allocation. Otherwise, it
198
  // is the caller's responsibility to ensure that the underlying storage
199
  // outlives this object.
200
0
  bool own_bytes() const { return !dict_str_.empty() || dict_allocation_; }
201
202
0
  const Slice& GetRawDict() const { return decompressor_->GetSerializedDict(); }
203
204
  // For TypedCacheInterface
205
0
  const Slice& ContentSlice() const { return GetRawDict(); }
206
  static constexpr CacheEntryRole kCacheEntryRole = CacheEntryRole::kOtherBlock;
207
  static constexpr BlockType kBlockType = BlockType::kCompressionDictionary;
208
209
0
  size_t ApproximateMemoryUsage() const { return memory_usage_; }
210
211
 private:
212
  void Populate(Decompressor& from_decompressor, Slice dict);
213
};
214
215
// Holds dictionary and related data, like ZSTD's digested compression
216
// dictionary.
217
struct CompressionDict {
218
#ifdef ZSTD
219
  ZSTD_CDict* zstd_cdict_ = nullptr;
220
#endif  // ZSTD
221
  std::string dict_;
222
223
 public:
224
0
  CompressionDict() = default;
225
0
  CompressionDict(std::string&& dict, CompressionType type, int level) {
226
0
    dict_ = std::move(dict);
227
#ifdef ZSTD
228
    zstd_cdict_ = nullptr;
229
    if (!dict_.empty() && type == kZSTD) {
230
      if (level == CompressionOptions::kDefaultCompressionLevel) {
231
        // NB: ZSTD_CLEVEL_DEFAULT is historically == 3
232
        level = ZSTD_CLEVEL_DEFAULT;
233
      }
234
      // Should be safe (but slower) if below call fails as we'll use the
235
      // raw dictionary to compress.
236
      zstd_cdict_ = ZSTD_createCDict(dict_.data(), dict_.size(), level);
237
      assert(zstd_cdict_ != nullptr);
238
    }
239
#else
240
0
    (void)type;
241
0
    (void)level;
242
0
#endif  // ZSTD
243
0
  }
244
245
0
  CompressionDict(CompressionDict&& other) {
246
#ifdef ZSTD
247
    zstd_cdict_ = other.zstd_cdict_;
248
    other.zstd_cdict_ = nullptr;
249
#endif  // ZSTD
250
0
    dict_ = std::move(other.dict_);
251
0
  }
252
0
  CompressionDict& operator=(CompressionDict&& other) {
253
0
    if (this == &other) {
254
0
      return *this;
255
0
    }
256
0
#ifdef ZSTD
257
0
    zstd_cdict_ = other.zstd_cdict_;
258
0
    other.zstd_cdict_ = nullptr;
259
0
#endif  // ZSTD
260
0
    dict_ = std::move(other.dict_);
261
0
    return *this;
262
0
  }
263
264
0
  ~CompressionDict() {
265
#ifdef ZSTD
266
    size_t res = 0;
267
    if (zstd_cdict_ != nullptr) {
268
      res = ZSTD_freeCDict(zstd_cdict_);
269
    }
270
    assert(res == 0);  // Last I checked they can't fail
271
    (void)res;         // prevent unused var warning
272
#endif                 // ZSTD
273
0
  }
274
275
#ifdef ZSTD
276
  const ZSTD_CDict* GetDigestedZstdCDict() const { return zstd_cdict_; }
277
#endif  // ZSTD
278
279
0
  Slice GetRawDict() const { return dict_; }
280
0
  bool empty() const { return dict_.empty(); }
281
282
0
  static const CompressionDict& GetEmptyDict() {
283
0
    static CompressionDict empty_dict{};
284
0
    return empty_dict;
285
0
  }
286
287
  // Disable copy
288
  CompressionDict(const CompressionDict&) = delete;
289
  CompressionDict& operator=(const CompressionDict&) = delete;
290
};
291
292
class CompressionContext : public Compressor::WorkingArea {
293
 private:
294
#ifdef ZSTD
295
  ZSTD_CCtx* zstd_ctx_ = nullptr;
296
297
  ZSTD_CCtx* CreateZSTDContext() {
298
#ifdef ROCKSDB_ZSTD_CUSTOM_MEM
299
    return ZSTD_createCCtx_advanced(port::GetJeZstdAllocationOverrides());
300
#else   // ROCKSDB_ZSTD_CUSTOM_MEM
301
    return ZSTD_createCCtx();
302
#endif  // ROCKSDB_ZSTD_CUSTOM_MEM
303
  }
304
305
 public:
306
  // callable inside ZSTD_Compress
307
  ZSTD_CCtx* ZSTDPreallocCtx() const {
308
    assert(zstd_ctx_ != nullptr);
309
    return zstd_ctx_;
310
  }
311
312
 private:
313
#endif  // ZSTD
314
315
0
  void CreateNativeContext(CompressionType type, int level, bool checksum) {
316
0
#ifdef ZSTD
317
0
    if (type == kZSTD) {
318
0
      zstd_ctx_ = CreateZSTDContext();
319
0
      if (level == CompressionOptions::kDefaultCompressionLevel) {
320
0
        // NB: ZSTD_CLEVEL_DEFAULT is historically == 3
321
0
        level = ZSTD_CLEVEL_DEFAULT;
322
0
      }
323
0
      size_t err =
324
0
          ZSTD_CCtx_setParameter(zstd_ctx_, ZSTD_c_compressionLevel, level);
325
0
      if (ZSTD_isError(err)) {
326
0
        assert(false);
327
0
        ZSTD_freeCCtx(zstd_ctx_);
328
0
        zstd_ctx_ = CreateZSTDContext();
329
0
      }
330
0
      if (checksum) {
331
0
        err = ZSTD_CCtx_setParameter(zstd_ctx_, ZSTD_c_checksumFlag, 1);
332
0
        if (ZSTD_isError(err)) {
333
0
          assert(false);
334
0
          ZSTD_freeCCtx(zstd_ctx_);
335
0
          zstd_ctx_ = CreateZSTDContext();
336
0
        }
337
0
      }
338
0
    }
339
0
#else
340
0
    (void)type;
341
0
    (void)level;
342
0
    (void)checksum;
343
0
#endif  // ZSTD
344
0
  }
345
0
  void DestroyNativeContext() {
346
0
#ifdef ZSTD
347
0
    if (zstd_ctx_ != nullptr) {
348
0
      ZSTD_freeCCtx(zstd_ctx_);
349
0
    }
350
0
#endif  // ZSTD
351
0
  }
352
353
 public:
354
  explicit CompressionContext(CompressionType type,
355
0
                              const CompressionOptions& options) {
356
0
    CreateNativeContext(type, options.level, options.checksum);
357
0
  }
358
0
  ~CompressionContext() { DestroyNativeContext(); }
359
  CompressionContext(const CompressionContext&) = delete;
360
  CompressionContext& operator=(const CompressionContext&) = delete;
361
};
362
363
// This is like a working area, reusable for different dicts, etc.
364
// TODO: refactor / consolidate
365
class UncompressionContext : public Decompressor::WorkingArea {
366
 private:
367
  CompressionContextCache* ctx_cache_ = nullptr;
368
  ZSTDUncompressCachedData uncomp_cached_data_;
369
370
 public:
371
0
  explicit UncompressionContext(CompressionType type) {
372
0
    if (type == kZSTD) {
373
0
      ctx_cache_ = CompressionContextCache::Instance();
374
0
      uncomp_cached_data_ = ctx_cache_->GetCachedZSTDUncompressData();
375
0
    }
376
0
  }
377
0
  ~UncompressionContext() {
378
0
    if (uncomp_cached_data_.GetCacheIndex() != -1) {
379
0
      assert(ctx_cache_ != nullptr);
380
0
      ctx_cache_->ReturnCachedZSTDUncompressData(
381
0
          uncomp_cached_data_.GetCacheIndex());
382
0
    }
383
0
  }
384
  UncompressionContext(const UncompressionContext&) = delete;
385
  UncompressionContext& operator=(const UncompressionContext&) = delete;
386
387
0
  ZSTDUncompressCachedData::ZSTDNativeContext GetZSTDContext() const {
388
0
    return uncomp_cached_data_.Get();
389
0
  }
390
};
391
392
1.17M
// True when this translation unit was compiled with SNAPPY defined.
inline bool Snappy_Supported() {
#ifdef SNAPPY
  constexpr bool kCompiledIn = true;
#else
  constexpr bool kCompiledIn = false;
#endif
  return kCompiledIn;
}
399
400
49.7k
// True when this translation unit was compiled with ZLIB defined.
inline bool Zlib_Supported() {
#ifdef ZLIB
  constexpr bool kCompiledIn = true;
#else
  constexpr bool kCompiledIn = false;
#endif
  return kCompiledIn;
}
407
408
49.7k
// True when this translation unit was compiled with BZIP2 defined.
inline bool BZip2_Supported() {
#ifdef BZIP2
  constexpr bool kCompiledIn = true;
#else
  constexpr bool kCompiledIn = false;
#endif
  return kCompiledIn;
}
415
416
99.4k
// True when this translation unit was compiled with LZ4 defined.
inline bool LZ4_Supported() {
#ifdef LZ4
  constexpr bool kCompiledIn = true;
#else
  constexpr bool kCompiledIn = false;
#endif
  return kCompiledIn;
}
423
424
49.7k
// True when this translation unit was compiled with XPRESS defined.
inline bool XPRESS_Supported() {
#ifdef XPRESS
  constexpr bool kCompiledIn = true;
#else
  constexpr bool kCompiledIn = false;
#endif
  return kCompiledIn;
}
431
432
49.7k
// True when this translation unit was compiled with ZSTD defined.
// NB: the ZSTD format is finalized since version 0.8.0; the minimum library
// version is enforced by the ZSTD_VERSION_NUMBER check near the top of the
// file.
inline bool ZSTD_Supported() {
#ifdef ZSTD
  constexpr bool kCompiledIn = true;
#else
  constexpr bool kCompiledIn = false;
#endif
  return kCompiledIn;
}
441
442
0
// True when ZSTD is compiled in; streaming support is tied to the same macro.
inline bool ZSTD_Streaming_Supported() {
#if defined(ZSTD)
  constexpr bool kCompiledIn = true;
#else
  constexpr bool kCompiledIn = false;
#endif
  return kCompiledIn;
}
449
450
inline bool StreamingCompressionTypeSupported(
451
57.3k
    CompressionType compression_type) {
452
57.3k
  switch (compression_type) {
453
57.3k
    case kNoCompression:
454
57.3k
      return true;
455
0
    case kZSTD:
456
0
      return ZSTD_Streaming_Supported();
457
0
    default:
458
0
      return false;
459
57.3k
  }
460
57.3k
}
461
462
6.86M
// Reports whether the built-in `compression_type` is usable in this build.
// Unknown values, including custom compression types, are unsupported.
inline bool CompressionTypeSupported(CompressionType compression_type) {
  bool supported = false;  // default covers custom/unknown types
  switch (compression_type) {
    case kNoCompression:
      supported = true;
      break;
    case kSnappyCompression:
      supported = Snappy_Supported();
      break;
    case kZlibCompression:
      supported = Zlib_Supported();
      break;
    case kBZip2Compression:
      supported = BZip2_Supported();
      break;
    case kLZ4Compression:
    case kLZ4HCCompression:
      // Both LZ4 variants depend on the same library.
      supported = LZ4_Supported();
      break;
    case kXpressCompression:
      supported = XPRESS_Supported();
      break;
    case kZSTD:
      supported = ZSTD_Supported();
      break;
    default:
      break;
  }
  return supported;
}
484
485
0
// Reports whether dictionary compression is available for
// `compression_type`. Only zlib, LZ4/LZ4HC (with a new enough LZ4) and ZSTD
// can qualify; every other type, including custom ones, returns false.
inline bool DictCompressionTypeSupported(CompressionType compression_type) {
  switch (compression_type) {
    case kZlibCompression:
      return Zlib_Supported();
    case kLZ4Compression:
    case kLZ4HCCompression:
#if LZ4_VERSION_NUMBER >= 10400  // r124+
      return LZ4_Supported();
#else
      return false;
#endif
    case kZSTD:
      // NB: dictionary supported since 0.5.0. See ZSTD_VERSION_NUMBER check
      // near the top of the file.
      return ZSTD_Supported();
    default:
      // kNoCompression, kSnappyCompression, kBZip2Compression,
      // kXpressCompression and custom compression types.
      return false;
  }
}
512
513
// WART: does not match OptionsHelper::compression_type_string_map
514
std::string CompressionTypeToString(CompressionType compression_type);
515
516
// WART: does not match OptionsHelper::compression_type_string_map
517
CompressionType CompressionTypeFromString(std::string compression_type_str);
518
519
std::string CompressionOptionsToString(
520
    const CompressionOptions& compression_options);
521
522
0
// True when ZSTD is compiled in.
// NB: the dictionary trainer is available since v0.6.1 for static linking but
// not for dynamic linking until v1.1.3; the ZSTD_VERSION_NUMBER check near
// the top of the file already requires a newer library.
inline bool ZSTD_TrainDictionarySupported() {
#ifdef ZSTD
  constexpr bool kAvailable = true;
#else
  constexpr bool kAvailable = false;
#endif
  return kAvailable;
}
532
533
0
// True when ROCKSDB_ZDICT_FINALIZE is defined for this build.
inline bool ZSTD_FinalizeDictionarySupported() {
#ifdef ROCKSDB_ZDICT_FINALIZE
  constexpr bool kAvailable = true;
#else
  constexpr bool kAvailable = false;
#endif
  return kAvailable;
}
540
541
// The new compression APIs intentionally make it difficult to generate
542
// compressed data larger than the original. (It is better to store the
543
// uncompressed version in that case.) For legacy cases that must store
544
// compressed data even when larger than the uncompressed, this is a convenient
545
// wrapper to support that, with a compressor from BuiltinCompressionManager and
546
// a GrowableBuffer.
547
Status LegacyForceBuiltinCompression(
548
    Compressor& builtin_compressor,
549
    Compressor::ManagedWorkingArea* working_area, Slice from,
550
    GrowableBuffer* to);
551
552
// Records the compression type for subsequent WAL records.
553
class CompressionTypeRecord {
554
 public:
555
  explicit CompressionTypeRecord(CompressionType compression_type)
556
0
      : compression_type_(compression_type) {}
557
558
0
  CompressionType GetCompressionType() const { return compression_type_; }
559
560
0
  inline void EncodeTo(std::string* dst) const {
561
0
    assert(dst != nullptr);
562
0
    PutFixed32(dst, compression_type_);
563
0
  }
564
565
0
  inline Status DecodeFrom(Slice* src) {
566
0
    constexpr char class_name[] = "CompressionTypeRecord";
567
568
0
    uint32_t val;
569
0
    if (!GetFixed32(src, &val)) {
570
0
      return Status::Corruption(class_name,
571
0
                                "Error decoding WAL compression type");
572
0
    }
573
0
    CompressionType compression_type = static_cast<CompressionType>(val);
574
0
    if (!StreamingCompressionTypeSupported(compression_type)) {
575
0
      return Status::Corruption(class_name,
576
0
                                "WAL compression type not supported");
577
0
    }
578
0
    compression_type_ = compression_type;
579
0
    return Status::OK();
580
0
  }
581
582
0
  inline std::string DebugString() const {
583
0
    return "compression_type: " + CompressionTypeToString(compression_type_);
584
0
  }
585
586
 private:
587
  CompressionType compression_type_;
588
};
589
590
// Base class to implement compression for a stream of buffers.
591
// Instantiate an implementation of the class using Create() with the
592
// compression type and use Compress() repeatedly.
593
// The output buffer needs to be at least max_output_len.
594
// Call Reset() in between frame boundaries or in case of an error.
595
// NOTE: This class is not thread safe.
596
class StreamingCompress {
597
 public:
598
  StreamingCompress(CompressionType compression_type,
599
                    const CompressionOptions& opts,
600
                    uint32_t compress_format_version, size_t max_output_len)
601
0
      : compression_type_(compression_type),
602
0
        opts_(opts),
603
0
        compress_format_version_(compress_format_version),
604
0
        max_output_len_(max_output_len) {}
605
0
  virtual ~StreamingCompress() = default;
606
  // compress should be called repeatedly with the same input till the method
607
  // returns 0
608
  // Parameters:
609
  // input - buffer to compress
610
  // input_size - size of input buffer
611
  // output - compressed buffer allocated by caller, should be at least
612
  // max_output_len
613
  // output_size - size of the output buffer
614
  // Returns -1 for errors, the remaining size of the input buffer that needs
615
  // to be compressed
616
  virtual int Compress(const char* input, size_t input_size, char* output,
617
                       size_t* output_pos) = 0;
618
  // static method to create object of a class inherited from
619
  // StreamingCompress based on the actual compression type.
620
  static std::unique_ptr<StreamingCompress> Create(
621
      CompressionType compression_type, const CompressionOptions& opts,
622
      uint32_t compress_format_version, size_t max_output_len);
623
  virtual void Reset() = 0;
624
625
 protected:
626
  const CompressionType compression_type_;
627
  const CompressionOptions opts_;
628
  const uint32_t compress_format_version_;
629
  const size_t max_output_len_;
630
};
631
632
// Base class to uncompress a stream of compressed buffers.
633
// Instantiate an implementation of the class using Create() with the
634
// compression type and use Uncompress() repeatedly.
635
// The output buffer needs to be at least max_output_len.
636
// Call Reset() in between frame boundaries or in case of an error.
637
// NOTE: This class is not thread safe.
638
class StreamingUncompress {
639
 public:
640
  StreamingUncompress(CompressionType compression_type,
641
                      uint32_t compress_format_version, size_t max_output_len)
642
0
      : compression_type_(compression_type),
643
0
        compress_format_version_(compress_format_version),
644
0
        max_output_len_(max_output_len) {}
645
0
  virtual ~StreamingUncompress() = default;
646
  // Uncompress can be called repeatedly to progressively process the same
647
  // input buffer, or can be called with a new input buffer. When the input
648
  // buffer is not fully consumed, the return value is > 0 or output_size
649
  // == max_output_len. When calling uncompress to continue processing the
650
  // same input buffer, the input argument should be nullptr.
651
  // Parameters:
652
  // input - buffer to uncompress
653
  // input_size - size of input buffer
654
  // output - uncompressed buffer allocated by caller, should be at least
655
  // max_output_len
656
  // output_size - size of the output buffer
657
  // Returns -1 for errors, remaining input to be processed otherwise.
658
  virtual int Uncompress(const char* input, size_t input_size, char* output,
659
                         size_t* output_pos) = 0;
660
  static std::unique_ptr<StreamingUncompress> Create(
661
      CompressionType compression_type, uint32_t compress_format_version,
662
      size_t max_output_len);
663
  virtual void Reset() = 0;
664
665
 protected:
666
  CompressionType compression_type_;
667
  uint32_t compress_format_version_;
668
  size_t max_output_len_;
669
};
670
671
class ZSTDStreamingCompress final : public StreamingCompress {
672
 public:
673
  explicit ZSTDStreamingCompress(const CompressionOptions& opts,
674
                                 uint32_t compress_format_version,
675
                                 size_t max_output_len)
676
0
      : StreamingCompress(kZSTD, opts, compress_format_version,
677
0
                          max_output_len) {
678
#ifdef ZSTD
679
    cctx_ = ZSTD_createCCtx();
680
    // Each compressed frame will have a checksum
681
    ZSTD_CCtx_setParameter(cctx_, ZSTD_c_checksumFlag, 1);
682
    assert(cctx_ != nullptr);
683
    input_buffer_ = {/*src=*/nullptr, /*size=*/0, /*pos=*/0};
684
#endif
685
0
  }
686
0
  ~ZSTDStreamingCompress() override {
687
0
#ifdef ZSTD
688
0
    ZSTD_freeCCtx(cctx_);
689
0
#endif
690
0
  }
691
  int Compress(const char* input, size_t input_size, char* output,
692
               size_t* output_pos) override;
693
  void Reset() override;
694
#ifdef ZSTD
695
  ZSTD_CCtx* cctx_;
696
  ZSTD_inBuffer input_buffer_;
697
#endif
698
};
699
700
class ZSTDStreamingUncompress final : public StreamingUncompress {
701
 public:
702
  explicit ZSTDStreamingUncompress(uint32_t compress_format_version,
703
                                   size_t max_output_len)
704
0
      : StreamingUncompress(kZSTD, compress_format_version, max_output_len) {
705
#ifdef ZSTD
706
    dctx_ = ZSTD_createDCtx();
707
    assert(dctx_ != nullptr);
708
    input_buffer_ = {/*src=*/nullptr, /*size=*/0, /*pos=*/0};
709
#endif
710
0
  }
711
0
  ~ZSTDStreamingUncompress() override {
712
0
#ifdef ZSTD
713
0
    ZSTD_freeDCtx(dctx_);
714
0
#endif
715
0
  }
716
  int Uncompress(const char* input, size_t input_size, char* output,
717
                 size_t* output_size) override;
718
  void Reset() override;
719
720
 private:
721
#ifdef ZSTD
722
  ZSTD_DCtx* dctx_;
723
  ZSTD_inBuffer input_buffer_;
724
#endif
725
};
726
727
}  // namespace ROCKSDB_NAMESPACE