Coverage Report

Created: 2024-07-27 06:53

/src/rocksdb/util/compression.h
Line
Count
Source
1
// Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
//
6
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7
// Use of this source code is governed by a BSD-style license that can be
8
// found in the LICENSE file. See the AUTHORS file for names of contributors.
9
//
10
#pragma once
11
12
#include <algorithm>
13
#include <limits>
14
#ifdef ROCKSDB_MALLOC_USABLE_SIZE
15
#ifdef OS_FREEBSD
16
#include <malloc_np.h>
17
#else  // OS_FREEBSD
18
#include <malloc.h>
19
#endif  // OS_FREEBSD
20
#endif  // ROCKSDB_MALLOC_USABLE_SIZE
21
#include <string>
22
23
#include "memory/memory_allocator_impl.h"
24
#include "rocksdb/options.h"
25
#include "rocksdb/table.h"
26
#include "table/block_based/block_type.h"
27
#include "test_util/sync_point.h"
28
#include "util/coding.h"
29
#include "util/compression_context_cache.h"
30
#include "util/string_util.h"
31
32
#ifdef SNAPPY
33
#include <snappy.h>
34
#endif
35
36
#ifdef ZLIB
37
#include <zlib.h>
38
#endif
39
40
#ifdef BZIP2
41
#include <bzlib.h>
42
#endif
43
44
#if defined(LZ4)
45
#include <lz4.h>
46
#include <lz4hc.h>
47
#endif
48
49
#if defined(ZSTD)
50
#include <zstd.h>
51
// v1.1.3+
52
#if ZSTD_VERSION_NUMBER >= 10103
53
#include <zdict.h>
54
#endif  // ZSTD_VERSION_NUMBER >= 10103
55
// v1.4.0+
56
// ZSTD_compress2(), ZSTD_compressStream2() and frame parameters all belong to
57
// advanced APIs and require v1.4.0+.
58
// https://github.com/facebook/zstd/blob/eb9f881eb810f2242f1ef36b3f3e7014eecb8fa6/lib/zstd.h#L297C40-L297C45
59
#if ZSTD_VERSION_NUMBER >= 10400
60
#define ZSTD_ADVANCED
61
#endif  // ZSTD_VERSION_NUMBER >= 10400
62
namespace ROCKSDB_NAMESPACE {
63
// Need this for the context allocation override
64
// On Windows we need to do this explicitly
65
#if (ZSTD_VERSION_NUMBER >= 500)
66
#if defined(ROCKSDB_JEMALLOC) && defined(OS_WIN) && \
67
    defined(ZSTD_STATIC_LINKING_ONLY)
68
#define ROCKSDB_ZSTD_CUSTOM_MEM
69
namespace port {
70
ZSTD_customMem GetJeZstdAllocationOverrides();
71
}  // namespace port
72
#endif  // defined(ROCKSDB_JEMALLOC) && defined(OS_WIN) &&
73
        // defined(ZSTD_STATIC_LINKING_ONLY)
74
75
// We require `ZSTD_sizeof_DDict` and `ZSTD_createDDict_byReference` to use
76
// `ZSTD_DDict`. The former was introduced in v1.0.0 and the latter was
77
// introduced in v1.1.3. But an important bug fix for `ZSTD_sizeof_DDict` came
78
// in v1.1.4, so that is the version we require. As of today's latest version
79
// (v1.3.8), they are both still in the experimental API, which means they are
80
// only exported when the compiler flag `ZSTD_STATIC_LINKING_ONLY` is set.
81
#if defined(ZSTD_STATIC_LINKING_ONLY) && ZSTD_VERSION_NUMBER >= 10104
82
#define ROCKSDB_ZSTD_DDICT
83
#endif  // defined(ZSTD_STATIC_LINKING_ONLY) && ZSTD_VERSION_NUMBER >= 10104
84
85
// Cached data represents a portion that can be re-used.
86
// If, in the future, we have more than one native context to
87
// cache, we can arrange this as a tuple.
88
class ZSTDUncompressCachedData {
89
 public:
90
  using ZSTDNativeContext = ZSTD_DCtx*;
91
  ZSTDUncompressCachedData() {}
92
  // Init from cache
93
  ZSTDUncompressCachedData(const ZSTDUncompressCachedData& o) = delete;
94
  ZSTDUncompressCachedData& operator=(const ZSTDUncompressCachedData&) = delete;
95
  ZSTDUncompressCachedData(ZSTDUncompressCachedData&& o) noexcept
96
      : ZSTDUncompressCachedData() {
97
    *this = std::move(o);
98
  }
99
  ZSTDUncompressCachedData& operator=(ZSTDUncompressCachedData&& o) noexcept {
100
    assert(zstd_ctx_ == nullptr);
101
    std::swap(zstd_ctx_, o.zstd_ctx_);
102
    std::swap(cache_idx_, o.cache_idx_);
103
    return *this;
104
  }
105
  ZSTDNativeContext Get() const { return zstd_ctx_; }
106
  int64_t GetCacheIndex() const { return cache_idx_; }
107
  void CreateIfNeeded() {
108
    if (zstd_ctx_ == nullptr) {
109
#ifdef ROCKSDB_ZSTD_CUSTOM_MEM
110
      zstd_ctx_ =
111
          ZSTD_createDCtx_advanced(port::GetJeZstdAllocationOverrides());
112
#else   // ROCKSDB_ZSTD_CUSTOM_MEM
113
      zstd_ctx_ = ZSTD_createDCtx();
114
#endif  // ROCKSDB_ZSTD_CUSTOM_MEM
115
      cache_idx_ = -1;
116
    }
117
  }
118
  void InitFromCache(const ZSTDUncompressCachedData& o, int64_t idx) {
119
    zstd_ctx_ = o.zstd_ctx_;
120
    cache_idx_ = idx;
121
  }
122
  ~ZSTDUncompressCachedData() {
123
    if (zstd_ctx_ != nullptr && cache_idx_ == -1) {
124
      ZSTD_freeDCtx(zstd_ctx_);
125
    }
126
  }
127
128
 private:
129
  ZSTDNativeContext zstd_ctx_ = nullptr;
130
  int64_t cache_idx_ = -1;  // -1 means this instance owns the context
131
};
132
#endif  // (ZSTD_VERSION_NUMBER >= 500)
133
}  // namespace ROCKSDB_NAMESPACE
134
#endif  // ZSTD
135
136
#if !(defined ZSTD) || !(ZSTD_VERSION_NUMBER >= 500)
137
namespace ROCKSDB_NAMESPACE {
138
class ZSTDUncompressCachedData {
139
  void* padding;  // unused
140
 public:
141
  using ZSTDNativeContext = void*;
142
32
  ZSTDUncompressCachedData() {}
143
0
  ZSTDUncompressCachedData(const ZSTDUncompressCachedData&) {}
144
  ZSTDUncompressCachedData& operator=(const ZSTDUncompressCachedData&) = delete;
145
  ZSTDUncompressCachedData(ZSTDUncompressCachedData&&) noexcept = default;
146
  ZSTDUncompressCachedData& operator=(ZSTDUncompressCachedData&&) noexcept =
147
      default;
148
0
  ZSTDNativeContext Get() const { return nullptr; }
149
0
  int64_t GetCacheIndex() const { return -1; }
150
0
  void CreateIfNeeded() {}
151
0
  void InitFromCache(const ZSTDUncompressCachedData&, int64_t) {}
152
153
 private:
154
0
  void ignore_padding__() { padding = nullptr; }
155
};
156
}  // namespace ROCKSDB_NAMESPACE
157
#endif
158
159
#if defined(XPRESS)
160
#include "port/xpress.h"
161
#endif
162
163
namespace ROCKSDB_NAMESPACE {
164
165
// Holds dictionary and related data, like ZSTD's digested compression
166
// dictionary.
167
struct CompressionDict {
168
#if ZSTD_VERSION_NUMBER >= 700
169
  ZSTD_CDict* zstd_cdict_ = nullptr;
170
#endif  // ZSTD_VERSION_NUMBER >= 700
171
  std::string dict_;
172
173
 public:
174
#if ZSTD_VERSION_NUMBER >= 700
175
  CompressionDict(std::string dict, CompressionType type, int level) {
176
#else   // ZSTD_VERSION_NUMBER >= 700
177
0
  CompressionDict(std::string dict, CompressionType /*type*/, int /*level*/) {
178
0
#endif  // ZSTD_VERSION_NUMBER >= 700
179
0
    dict_ = std::move(dict);
180
#if ZSTD_VERSION_NUMBER >= 700
181
    zstd_cdict_ = nullptr;
182
    if (!dict_.empty() && (type == kZSTD || type == kZSTDNotFinalCompression)) {
183
      if (level == CompressionOptions::kDefaultCompressionLevel) {
184
        // 3 is the value of ZSTD_CLEVEL_DEFAULT (not exposed publicly), see
185
        // https://github.com/facebook/zstd/issues/1148
186
        // TODO(cbi): ZSTD_CLEVEL_DEFAULT is exposed after
187
        //  https://github.com/facebook/zstd/pull/1174. Use ZSTD_CLEVEL_DEFAULT
188
        //  instead of hardcoding 3.
189
        level = 3;
190
      }
191
      // Should be safe (but slower) if the call below fails, as we'll use the
192
      // raw dictionary to compress.
193
      zstd_cdict_ = ZSTD_createCDict(dict_.data(), dict_.size(), level);
194
      assert(zstd_cdict_ != nullptr);
195
    }
196
#endif  // ZSTD_VERSION_NUMBER >= 700
197
0
  }
198
199
1
  ~CompressionDict() {
200
#if ZSTD_VERSION_NUMBER >= 700
201
    size_t res = 0;
202
    if (zstd_cdict_ != nullptr) {
203
      res = ZSTD_freeCDict(zstd_cdict_);
204
    }
205
    assert(res == 0);  // Last I checked they can't fail
206
    (void)res;         // prevent unused var warning
207
#endif                 // ZSTD_VERSION_NUMBER >= 700
208
1
  }
209
210
#if ZSTD_VERSION_NUMBER >= 700
211
  const ZSTD_CDict* GetDigestedZstdCDict() const { return zstd_cdict_; }
212
#endif  // ZSTD_VERSION_NUMBER >= 700
213
214
0
  Slice GetRawDict() const { return dict_; }
215
216
13.3k
  static const CompressionDict& GetEmptyDict() {
217
13.3k
    static CompressionDict empty_dict{};
218
13.3k
    return empty_dict;
219
13.3k
  }
220
221
1
  CompressionDict() = default;
222
  // Disable copy/move
223
  CompressionDict(const CompressionDict&) = delete;
224
  CompressionDict& operator=(const CompressionDict&) = delete;
225
  CompressionDict(CompressionDict&&) = delete;
226
  CompressionDict& operator=(CompressionDict&&) = delete;
227
};
228
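A minimal usage sketch of the CompressionDict class above (an editorial illustration, not part of compression.h; it assumes an in-tree build with this header included and code living inside ROCKSDB_NAMESPACE). The raw dictionary bytes are always reachable via GetRawDict(); on ZSTD v0.7.0+ builds a digested ZSTD_CDict is prepared once so it can be reused across blocks:

void CompressionDictSketch(std::string dict_bytes) {
  // Build a dictionary for ZSTD at the default compression level.
  CompressionDict dict(std::move(dict_bytes), kZSTD,
                       CompressionOptions::kDefaultCompressionLevel);
  Slice raw = dict.GetRawDict();  // raw bytes, usable by any compressor
  (void)raw;
#if ZSTD_VERSION_NUMBER >= 700
  // May be nullptr if the dictionary is empty or digestion failed.
  const ZSTD_CDict* digested = dict.GetDigestedZstdCDict();
  (void)digested;
#endif
}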
229
// Holds dictionary and related data, like ZSTD's digested uncompression
230
// dictionary.
231
struct UncompressionDict {
232
  // Block containing the data for the compression dictionary in case the
233
  // constructor that takes a string parameter is used.
234
  std::string dict_;
235
236
  // Block containing the data for the compression dictionary in case the
237
  // constructor that takes a Slice parameter is used and the passed in
238
  // CacheAllocationPtr is not nullptr.
239
  CacheAllocationPtr allocation_;
240
241
  // Slice pointing to the compression dictionary data. Can point to
242
  // dict_, allocation_, or some other memory location, depending on how
243
  // the object was constructed.
244
  Slice slice_;
245
246
#ifdef ROCKSDB_ZSTD_DDICT
247
  // Processed version of the contents of slice_ for ZSTD compression.
248
  ZSTD_DDict* zstd_ddict_ = nullptr;
249
#endif  // ROCKSDB_ZSTD_DDICT
250
251
#ifdef ROCKSDB_ZSTD_DDICT
252
  UncompressionDict(std::string dict, bool using_zstd)
253
#else   // ROCKSDB_ZSTD_DDICT
254
  UncompressionDict(std::string dict, bool /* using_zstd */)
255
#endif  // ROCKSDB_ZSTD_DDICT
256
0
      : dict_(std::move(dict)), slice_(dict_) {
257
#ifdef ROCKSDB_ZSTD_DDICT
258
    if (!slice_.empty() && using_zstd) {
259
      zstd_ddict_ = ZSTD_createDDict_byReference(slice_.data(), slice_.size());
260
      assert(zstd_ddict_ != nullptr);
261
    }
262
#endif  // ROCKSDB_ZSTD_DDICT
263
0
  }
264
265
#ifdef ROCKSDB_ZSTD_DDICT
266
  UncompressionDict(Slice slice, CacheAllocationPtr&& allocation,
267
                    bool using_zstd)
268
#else   // ROCKSDB_ZSTD_DDICT
269
  UncompressionDict(Slice slice, CacheAllocationPtr&& allocation,
270
                    bool /* using_zstd */)
271
#endif  // ROCKSDB_ZSTD_DDICT
272
0
      : allocation_(std::move(allocation)), slice_(std::move(slice)) {
273
#ifdef ROCKSDB_ZSTD_DDICT
274
    if (!slice_.empty() && using_zstd) {
275
      zstd_ddict_ = ZSTD_createDDict_byReference(slice_.data(), slice_.size());
276
      assert(zstd_ddict_ != nullptr);
277
    }
278
#endif  // ROCKSDB_ZSTD_DDICT
279
0
  }
280
281
  UncompressionDict(UncompressionDict&& rhs)
282
      : dict_(std::move(rhs.dict_)),
283
        allocation_(std::move(rhs.allocation_)),
284
        slice_(std::move(rhs.slice_))
285
#ifdef ROCKSDB_ZSTD_DDICT
286
        ,
287
        zstd_ddict_(rhs.zstd_ddict_)
288
#endif
289
0
  {
290
0
#ifdef ROCKSDB_ZSTD_DDICT
291
0
    rhs.zstd_ddict_ = nullptr;
292
0
#endif
293
0
  }
294
295
1
  ~UncompressionDict() {
296
#ifdef ROCKSDB_ZSTD_DDICT
297
    size_t res = 0;
298
    if (zstd_ddict_ != nullptr) {
299
      res = ZSTD_freeDDict(zstd_ddict_);
300
    }
301
    assert(res == 0);  // Last I checked they can't fail
302
    (void)res;         // prevent unused var warning
303
#endif                 // ROCKSDB_ZSTD_DDICT
304
1
  }
305
306
0
  UncompressionDict& operator=(UncompressionDict&& rhs) {
307
0
    if (this == &rhs) {
308
0
      return *this;
309
0
    }
310
0
311
0
    dict_ = std::move(rhs.dict_);
312
0
    allocation_ = std::move(rhs.allocation_);
313
0
    slice_ = std::move(rhs.slice_);
314
0
315
0
#ifdef ROCKSDB_ZSTD_DDICT
316
0
    zstd_ddict_ = rhs.zstd_ddict_;
317
0
    rhs.zstd_ddict_ = nullptr;
318
0
#endif
319
0
320
0
    return *this;
321
0
  }
322
323
  // The object is self-contained if the string constructor is used, or the
324
  // Slice constructor is invoked with a non-null allocation. Otherwise, it
325
  // is the caller's responsibility to ensure that the underlying storage
326
  // outlives this object.
327
0
  bool own_bytes() const { return !dict_.empty() || allocation_; }
328
329
0
  const Slice& GetRawDict() const { return slice_; }
330
331
  // For TypedCacheInterface
332
0
  const Slice& ContentSlice() const { return slice_; }
333
  static constexpr CacheEntryRole kCacheEntryRole = CacheEntryRole::kOtherBlock;
334
  static constexpr BlockType kBlockType = BlockType::kCompressionDictionary;
335
336
#ifdef ROCKSDB_ZSTD_DDICT
337
  const ZSTD_DDict* GetDigestedZstdDDict() const { return zstd_ddict_; }
338
#endif  // ROCKSDB_ZSTD_DDICT
339
340
25.2k
  static const UncompressionDict& GetEmptyDict() {
341
25.2k
    static UncompressionDict empty_dict{};
342
25.2k
    return empty_dict;
343
25.2k
  }
344
345
0
  size_t ApproximateMemoryUsage() const {
346
0
    size_t usage = sizeof(struct UncompressionDict);
347
0
    usage += dict_.size();
348
0
    if (allocation_) {
349
0
      auto allocator = allocation_.get_deleter().allocator;
350
0
      if (allocator) {
351
0
        usage += allocator->UsableSize(allocation_.get(), slice_.size());
352
0
      } else {
353
0
        usage += slice_.size();
354
0
      }
355
0
    }
356
#ifdef ROCKSDB_ZSTD_DDICT
357
    usage += ZSTD_sizeof_DDict(zstd_ddict_);
358
#endif  // ROCKSDB_ZSTD_DDICT
359
0
    return usage;
360
0
  }
361
362
1
  UncompressionDict() = default;
363
  // Disable copy
364
  UncompressionDict(const CompressionDict&) = delete;
365
  UncompressionDict& operator=(const CompressionDict&) = delete;
366
};
367
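A small sketch of the two ownership modes described above (editorial, not part of compression.h; assumes an in-tree build): the string constructor copies and owns the dictionary bytes, while the Slice constructor with an empty CacheAllocationPtr only borrows them, so the caller must keep the backing buffer alive:

void UncompressionDictSketch(const std::string& dict_bytes) {
  // Owning mode: dict_ holds a copy of the bytes.
  UncompressionDict owned(dict_bytes, /*using_zstd=*/false);
  assert(dict_bytes.empty() || owned.own_bytes());

  // Borrowing mode: slice_ points into dict_bytes and no allocation is
  // attached, so dict_bytes must outlive `borrowed`.
  UncompressionDict borrowed(Slice(dict_bytes), CacheAllocationPtr(),
                             /*using_zstd=*/false);
  assert(!borrowed.own_bytes());
}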
368
class CompressionContext {
369
 private:
370
#if defined(ZSTD) && (ZSTD_VERSION_NUMBER >= 500)
371
  ZSTD_CCtx* zstd_ctx_ = nullptr;
372
373
  ZSTD_CCtx* CreateZSTDContext() {
374
#ifdef ROCKSDB_ZSTD_CUSTOM_MEM
375
    return ZSTD_createCCtx_advanced(port::GetJeZstdAllocationOverrides());
376
#else   // ROCKSDB_ZSTD_CUSTOM_MEM
377
    return ZSTD_createCCtx();
378
#endif  // ROCKSDB_ZSTD_CUSTOM_MEM
379
  }
380
381
  void CreateNativeContext(CompressionType type, int level, bool checksum) {
382
    if (type == kZSTD || type == kZSTDNotFinalCompression) {
383
      zstd_ctx_ = CreateZSTDContext();
384
#ifdef ZSTD_ADVANCED
385
      if (level == CompressionOptions::kDefaultCompressionLevel) {
386
        // 3 is the value of ZSTD_CLEVEL_DEFAULT (not exposed publicly), see
387
        // https://github.com/facebook/zstd/issues/1148
388
        level = 3;
389
      }
390
      size_t err =
391
          ZSTD_CCtx_setParameter(zstd_ctx_, ZSTD_c_compressionLevel, level);
392
      if (ZSTD_isError(err)) {
393
        assert(false);
394
        ZSTD_freeCCtx(zstd_ctx_);
395
        zstd_ctx_ = CreateZSTDContext();
396
      }
397
      if (checksum) {
398
        err = ZSTD_CCtx_setParameter(zstd_ctx_, ZSTD_c_checksumFlag, 1);
399
        if (ZSTD_isError(err)) {
400
          assert(false);
401
          ZSTD_freeCCtx(zstd_ctx_);
402
          zstd_ctx_ = CreateZSTDContext();
403
        }
404
      }
405
#else
406
      (void)level;
407
      (void)checksum;
408
#endif
409
    }
410
  }
411
  void DestroyNativeContext() {
412
    if (zstd_ctx_ != nullptr) {
413
      ZSTD_freeCCtx(zstd_ctx_);
414
    }
415
  }
416
417
 public:
418
  // callable inside ZSTD_Compress
419
  ZSTD_CCtx* ZSTDPreallocCtx() const {
420
    assert(zstd_ctx_ != nullptr);
421
    return zstd_ctx_;
422
  }
423
424
#else   // ZSTD && (ZSTD_VERSION_NUMBER >= 500)
425
 private:
426
  void CreateNativeContext(CompressionType /* type */, int /* level */,
427
4.48k
                           bool /* checksum */) {}
428
4.48k
  void DestroyNativeContext() {}
429
#endif  // ZSTD && (ZSTD_VERSION_NUMBER >= 500)
430
 public:
431
  explicit CompressionContext(CompressionType type,
432
4.48k
                              const CompressionOptions& options) {
433
4.48k
    CreateNativeContext(type, options.level, options.checksum);
434
4.48k
  }
435
4.48k
  ~CompressionContext() { DestroyNativeContext(); }
436
  CompressionContext(const CompressionContext&) = delete;
437
  CompressionContext& operator=(const CompressionContext&) = delete;
438
};
439
440
class CompressionInfo {
441
  const CompressionOptions& opts_;
442
  const CompressionContext& context_;
443
  const CompressionDict& dict_;
444
  const CompressionType type_;
445
  const uint64_t sample_for_compression_;
446
447
 public:
448
  CompressionInfo(const CompressionOptions& _opts,
449
                  const CompressionContext& _context,
450
                  const CompressionDict& _dict, CompressionType _type,
451
                  uint64_t _sample_for_compression)
452
      : opts_(_opts),
453
        context_(_context),
454
        dict_(_dict),
455
        type_(_type),
456
13.3k
        sample_for_compression_(_sample_for_compression) {}
457
458
13.3k
  const CompressionOptions& options() const { return opts_; }
459
0
  const CompressionContext& context() const { return context_; }
460
0
  const CompressionDict& dict() const { return dict_; }
461
13.3k
  CompressionType type() const { return type_; }
462
8.85k
  uint64_t SampleForCompression() const { return sample_for_compression_; }
463
};
464
465
class UncompressionContext {
466
 private:
467
  CompressionContextCache* ctx_cache_ = nullptr;
468
  ZSTDUncompressCachedData uncomp_cached_data_;
469
470
 public:
471
0
  explicit UncompressionContext(CompressionType type) {
472
0
    if (type == kZSTD || type == kZSTDNotFinalCompression) {
473
0
      ctx_cache_ = CompressionContextCache::Instance();
474
0
      uncomp_cached_data_ = ctx_cache_->GetCachedZSTDUncompressData();
475
0
    }
476
0
  }
477
0
  ~UncompressionContext() {
478
0
    if (uncomp_cached_data_.GetCacheIndex() != -1) {
479
0
      assert(ctx_cache_ != nullptr);
480
0
      ctx_cache_->ReturnCachedZSTDUncompressData(
481
0
          uncomp_cached_data_.GetCacheIndex());
482
0
    }
483
0
  }
484
  UncompressionContext(const UncompressionContext&) = delete;
485
  UncompressionContext& operator=(const UncompressionContext&) = delete;
486
487
0
  ZSTDUncompressCachedData::ZSTDNativeContext GetZSTDContext() const {
488
0
    return uncomp_cached_data_.Get();
489
0
  }
490
};
491
492
class UncompressionInfo {
493
  const UncompressionContext& context_;
494
  const UncompressionDict& dict_;
495
  const CompressionType type_;
496
497
 public:
498
  UncompressionInfo(const UncompressionContext& _context,
499
                    const UncompressionDict& _dict, CompressionType _type)
500
0
      : context_(_context), dict_(_dict), type_(_type) {}
501
502
0
  const UncompressionContext& context() const { return context_; }
503
0
  const UncompressionDict& dict() const { return dict_; }
504
0
  CompressionType type() const { return type_; }
505
};
506
507
148k
inline bool Snappy_Supported() {
508
#ifdef SNAPPY
509
  return true;
510
#else
511
148k
  return false;
512
148k
#endif
513
148k
}
514
515
9.02k
inline bool Zlib_Supported() {
516
9.02k
#ifdef ZLIB
517
9.02k
  return true;
518
#else
519
  return false;
520
#endif
521
9.02k
}
522
523
9.02k
inline bool BZip2_Supported() {
524
9.02k
#ifdef BZIP2
525
9.02k
  return true;
526
#else
527
  return false;
528
#endif
529
9.02k
}
530
531
18.0k
inline bool LZ4_Supported() {
532
#ifdef LZ4
533
  return true;
534
#else
535
18.0k
  return false;
536
18.0k
#endif
537
18.0k
}
538
539
9.02k
inline bool XPRESS_Supported() {
540
#ifdef XPRESS
541
  return true;
542
#else
543
9.02k
  return false;
544
9.02k
#endif
545
9.02k
}
546
547
9.02k
inline bool ZSTD_Supported() {
548
#ifdef ZSTD
549
  // The ZSTD format has been finalized since version 0.8.0.
550
  return (ZSTD_versionNumber() >= 800);
551
#else
552
9.02k
  return false;
553
9.02k
#endif
554
9.02k
}
555
556
9.02k
inline bool ZSTDNotFinal_Supported() {
557
#ifdef ZSTD
558
  return true;
559
#else
560
9.02k
  return false;
561
9.02k
#endif
562
9.02k
}
563
564
0
inline bool ZSTD_Streaming_Supported() {
565
#if defined(ZSTD_ADVANCED)
566
  return true;
567
#else
568
0
  return false;
569
0
#endif
570
0
}
571
572
inline bool StreamingCompressionTypeSupported(
573
13.5k
    CompressionType compression_type) {
574
13.5k
  switch (compression_type) {
575
13.5k
    case kNoCompression:
576
13.5k
      return true;
577
0
    case kZSTD:
578
0
      return ZSTD_Streaming_Supported();
579
0
    default:
580
0
      return false;
581
13.5k
  }
582
13.5k
}
583
584
90.2k
inline bool CompressionTypeSupported(CompressionType compression_type) {
585
90.2k
  switch (compression_type) {
586
18.0k
    case kNoCompression:
587
18.0k
      return true;
588
9.02k
    case kSnappyCompression:
589
9.02k
      return Snappy_Supported();
590
9.02k
    case kZlibCompression:
591
9.02k
      return Zlib_Supported();
592
9.02k
    case kBZip2Compression:
593
9.02k
      return BZip2_Supported();
594
9.02k
    case kLZ4Compression:
595
9.02k
      return LZ4_Supported();
596
9.02k
    case kLZ4HCCompression:
597
9.02k
      return LZ4_Supported();
598
9.02k
    case kXpressCompression:
599
9.02k
      return XPRESS_Supported();
600
9.02k
    case kZSTDNotFinalCompression:
601
9.02k
      return ZSTDNotFinal_Supported();
602
9.02k
    case kZSTD:
603
9.02k
      return ZSTD_Supported();
604
0
    default:
605
0
      assert(false);
606
0
      return false;
607
90.2k
  }
608
90.2k
}
609
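The predicates above only report what was compiled in, so callers typically probe them at runtime. A minimal sketch (editorial, not part of compression.h) of picking the first available algorithm and falling back to kNoCompression:

CompressionType PickCompressionType() {
  // Prefer ZSTD, then LZ4, then Snappy; use no compression as a last resort.
  for (CompressionType t : {kZSTD, kLZ4Compression, kSnappyCompression}) {
    if (CompressionTypeSupported(t)) {
      return t;
    }
  }
  return kNoCompression;
}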
610
0
inline bool DictCompressionTypeSupported(CompressionType compression_type) {
611
0
  switch (compression_type) {
612
0
    case kNoCompression:
613
0
      return false;
614
0
    case kSnappyCompression:
615
0
      return false;
616
0
    case kZlibCompression:
617
0
      return Zlib_Supported();
618
0
    case kBZip2Compression:
619
0
      return false;
620
0
    case kLZ4Compression:
621
0
    case kLZ4HCCompression:
622
#if LZ4_VERSION_NUMBER >= 10400  // r124+
623
      return LZ4_Supported();
624
#else
625
0
      return false;
626
0
#endif
627
0
    case kXpressCompression:
628
0
      return false;
629
0
    case kZSTDNotFinalCompression:
630
#if ZSTD_VERSION_NUMBER >= 500  // v0.5.0+
631
      return ZSTDNotFinal_Supported();
632
#else
633
0
      return false;
634
0
#endif
635
0
    case kZSTD:
636
#if ZSTD_VERSION_NUMBER >= 500  // v0.5.0+
637
      return ZSTD_Supported();
638
#else
639
0
      return false;
640
0
#endif
641
0
    default:
642
0
      assert(false);
643
0
      return false;
644
0
  }
645
0
}
646
647
35.9k
inline std::string CompressionTypeToString(CompressionType compression_type) {
648
35.9k
  switch (compression_type) {
649
27.0k
    case kNoCompression:
650
27.0k
      return "NoCompression";
651
0
    case kSnappyCompression:
652
0
      return "Snappy";
653
0
    case kZlibCompression:
654
0
      return "Zlib";
655
0
    case kBZip2Compression:
656
0
      return "BZip2";
657
0
    case kLZ4Compression:
658
0
      return "LZ4";
659
0
    case kLZ4HCCompression:
660
0
      return "LZ4HC";
661
0
    case kXpressCompression:
662
0
      return "Xpress";
663
4.48k
    case kZSTD:
664
4.48k
      return "ZSTD";
665
4.48k
    case kZSTDNotFinalCompression:
666
4.48k
      return "ZSTDNotFinal";
667
0
    case kDisableCompressionOption:
668
0
      return "DisableOption";
669
0
    default:
670
0
      assert(false);
671
0
      return "";
672
35.9k
  }
673
35.9k
}
674
675
inline std::string CompressionOptionsToString(
676
4.48k
    CompressionOptions& compression_options) {
677
4.48k
  std::string result;
678
4.48k
  result.reserve(512);
679
4.48k
  result.append("window_bits=")
680
4.48k
      .append(std::to_string(compression_options.window_bits))
681
4.48k
      .append("; ");
682
4.48k
  result.append("level=")
683
4.48k
      .append(std::to_string(compression_options.level))
684
4.48k
      .append("; ");
685
4.48k
  result.append("strategy=")
686
4.48k
      .append(std::to_string(compression_options.strategy))
687
4.48k
      .append("; ");
688
4.48k
  result.append("max_dict_bytes=")
689
4.48k
      .append(std::to_string(compression_options.max_dict_bytes))
690
4.48k
      .append("; ");
691
4.48k
  result.append("zstd_max_train_bytes=")
692
4.48k
      .append(std::to_string(compression_options.zstd_max_train_bytes))
693
4.48k
      .append("; ");
694
4.48k
  result.append("enabled=")
695
4.48k
      .append(std::to_string(compression_options.enabled))
696
4.48k
      .append("; ");
697
4.48k
  result.append("max_dict_buffer_bytes=")
698
4.48k
      .append(std::to_string(compression_options.max_dict_buffer_bytes))
699
4.48k
      .append("; ");
700
4.48k
  result.append("use_zstd_dict_trainer=")
701
4.48k
      .append(std::to_string(compression_options.use_zstd_dict_trainer))
702
4.48k
      .append("; ");
703
4.48k
  return result;
704
4.48k
}
705
706
// compress_format_version can have two values:
707
// 1 -- decompressed sizes for BZip2 and Zlib are not included in the compressed
708
// block. Also, decompressed sizes for LZ4 are encoded in a platform-dependent
709
// way.
710
// 2 -- Zlib, BZip2 and LZ4 encode decompressed size as Varint32 just before the
711
// start of compressed block. Snappy format is the same as version 1.
712
713
inline bool Snappy_Compress(const CompressionInfo& /*info*/, const char* input,
714
0
                            size_t length, ::std::string* output) {
715
#ifdef SNAPPY
716
  output->resize(snappy::MaxCompressedLength(length));
717
  size_t outlen;
718
  snappy::RawCompress(input, length, &(*output)[0], &outlen);
719
  output->resize(outlen);
720
  return true;
721
#else
722
0
  (void)input;
723
0
  (void)length;
724
0
  (void)output;
725
0
  return false;
726
0
#endif
727
0
}
728
729
inline CacheAllocationPtr Snappy_Uncompress(
730
    const char* input, size_t length, size_t* uncompressed_size,
731
0
    MemoryAllocator* allocator = nullptr) {
732
#ifdef SNAPPY
733
  size_t uncompressed_length = 0;
734
  if (!snappy::GetUncompressedLength(input, length, &uncompressed_length)) {
735
    return nullptr;
736
  }
737
738
  CacheAllocationPtr output = AllocateBlock(uncompressed_length, allocator);
739
740
  if (!snappy::RawUncompress(input, length, output.get())) {
741
    return nullptr;
742
  }
743
744
  *uncompressed_size = uncompressed_length;
745
746
  return output;
747
#else
748
0
  (void)input;
749
0
  (void)length;
750
0
  (void)uncompressed_size;
751
0
  (void)allocator;
752
0
  return nullptr;
753
0
#endif
754
0
}
755
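A round-trip sketch for the Snappy pair above (editorial, not part of compression.h; if SNAPPY was not compiled in, the compress call simply returns false). It shows the plumbing the compress side expects -- a CompressionContext and CompressionInfo -- even though Snappy itself ignores them:

bool SnappyRoundTripSketch(const Slice& in, std::string* compressed,
                           CacheAllocationPtr* uncompressed, size_t* out_size) {
  CompressionOptions opts;
  CompressionContext ctx(kSnappyCompression, opts);
  CompressionInfo info(opts, ctx, CompressionDict::GetEmptyDict(),
                       kSnappyCompression, /*_sample_for_compression=*/0);
  if (!Snappy_Compress(info, in.data(), in.size(), compressed)) {
    return false;  // SNAPPY not compiled in, or compression failed
  }
  *uncompressed =
      Snappy_Uncompress(compressed->data(), compressed->size(), out_size);
  return *uncompressed != nullptr;
}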
756
namespace compression {
757
// returns size
758
0
inline size_t PutDecompressedSizeInfo(std::string* output, uint32_t length) {
759
0
  PutVarint32(output, length);
760
0
  return output->size();
761
0
}
762
763
inline bool GetDecompressedSizeInfo(const char** input_data,
764
                                    size_t* input_length,
765
0
                                    uint32_t* output_len) {
766
0
  auto new_input_data =
767
0
      GetVarint32Ptr(*input_data, *input_data + *input_length, output_len);
768
0
  if (new_input_data == nullptr) {
769
0
    return false;
770
0
  }
771
0
  *input_length -= (new_input_data - *input_data);
772
0
  *input_data = new_input_data;
773
0
  return true;
774
0
}
775
}  // namespace compression
776
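A small sketch (editorial, not part of compression.h) of the format-version-2 framing these helpers implement: the decompressed size is written as a varint32 immediately before the compressed payload, and the reader strips it off before invoking the decompressor:

void FramingSketch() {
  std::string block;
  // Writer side: prepend the decompressed size (here, a made-up 4096).
  size_t header_len =
      compression::PutDecompressedSizeInfo(&block, /*length=*/4096);
  block.append("<compressed payload would follow here>");

  // Reader side: consume the varint32 header before decompressing.
  const char* payload = block.data();
  size_t remaining = block.size();
  uint32_t decompressed_size = 0;
  bool ok = compression::GetDecompressedSizeInfo(&payload, &remaining,
                                                 &decompressed_size);
  assert(ok && decompressed_size == 4096);
  assert(remaining == block.size() - header_len);
  // `payload` now points at the first compressed byte.
  (void)ok;
  (void)header_len;
  (void)payload;
}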
777
// compress_format_version == 1 -- decompressed size is not included in the
778
// block header
779
// compress_format_version == 2 -- decompressed size is included in the block
780
// header in varint32 format
781
// @param compression_dict Data for presetting the compression library's
782
//    dictionary.
783
inline bool Zlib_Compress(const CompressionInfo& info,
784
                          uint32_t compress_format_version, const char* input,
785
0
                          size_t length, ::std::string* output) {
786
0
#ifdef ZLIB
787
0
  if (length > std::numeric_limits<uint32_t>::max()) {
788
    // Can't compress more than 4GB
789
0
    return false;
790
0
  }
791
792
0
  size_t output_header_len = 0;
793
0
  if (compress_format_version == 2) {
794
0
    output_header_len = compression::PutDecompressedSizeInfo(
795
0
        output, static_cast<uint32_t>(length));
796
0
  }
797
798
  // The memLevel parameter specifies how much memory should be allocated for
799
  // the internal compression state.
800
  // memLevel=1 uses minimum memory but is slow and reduces compression ratio.
801
  // memLevel=9 uses maximum memory for optimal speed.
802
  // The default value is 8. See zconf.h for more details.
803
0
  static const int memLevel = 8;
804
0
  int level;
805
0
  if (info.options().level == CompressionOptions::kDefaultCompressionLevel) {
806
0
    level = Z_DEFAULT_COMPRESSION;
807
0
  } else {
808
0
    level = info.options().level;
809
0
  }
810
0
  z_stream _stream;
811
0
  memset(&_stream, 0, sizeof(z_stream));
812
0
  int st = deflateInit2(&_stream, level, Z_DEFLATED, info.options().window_bits,
813
0
                        memLevel, info.options().strategy);
814
0
  if (st != Z_OK) {
815
0
    return false;
816
0
  }
817
818
0
  Slice compression_dict = info.dict().GetRawDict();
819
0
  if (compression_dict.size()) {
820
    // Initialize the compression library's dictionary
821
0
    st = deflateSetDictionary(
822
0
        &_stream, reinterpret_cast<const Bytef*>(compression_dict.data()),
823
0
        static_cast<unsigned int>(compression_dict.size()));
824
0
    if (st != Z_OK) {
825
0
      deflateEnd(&_stream);
826
0
      return false;
827
0
    }
828
0
  }
829
830
  // Get an upper bound on the compressed size.
831
0
  size_t upper_bound =
832
0
      deflateBound(&_stream, static_cast<unsigned long>(length));
833
0
  output->resize(output_header_len + upper_bound);
834
835
  // Compress the input, and put compressed data in output.
836
0
  _stream.next_in = (Bytef*)input;
837
0
  _stream.avail_in = static_cast<unsigned int>(length);
838
839
  // Initialize the output size.
840
0
  _stream.avail_out = static_cast<unsigned int>(upper_bound);
841
0
  _stream.next_out = reinterpret_cast<Bytef*>(&(*output)[output_header_len]);
842
843
0
  bool compressed = false;
844
0
  st = deflate(&_stream, Z_FINISH);
845
0
  if (st == Z_STREAM_END) {
846
0
    compressed = true;
847
0
    output->resize(output->size() - _stream.avail_out);
848
0
  }
849
  // The only return value we really care about is Z_STREAM_END.
850
  // Z_OK means insufficient output space. This means the compressed output is
851
  // bigger than the input. Just fail the compression in that case.
852
853
0
  deflateEnd(&_stream);
854
0
  return compressed;
855
#else
856
  (void)info;
857
  (void)compress_format_version;
858
  (void)input;
859
  (void)length;
860
  (void)output;
861
  return false;
862
#endif
863
0
}
864
865
// compress_format_version == 1 -- decompressed size is not included in the
866
// block header
867
// compress_format_version == 2 -- decompressed size is included in the block
868
// header in varint32 format
869
// @param compression_dict Data for presetting the compression library's
870
//    dictionary.
871
inline CacheAllocationPtr Zlib_Uncompress(
872
    const UncompressionInfo& info, const char* input_data, size_t input_length,
873
    size_t* uncompressed_size, uint32_t compress_format_version,
874
0
    MemoryAllocator* allocator = nullptr, int windowBits = -14) {
875
0
#ifdef ZLIB
876
0
  uint32_t output_len = 0;
877
0
  if (compress_format_version == 2) {
878
0
    if (!compression::GetDecompressedSizeInfo(&input_data, &input_length,
879
0
                                              &output_len)) {
880
0
      return nullptr;
881
0
    }
882
0
  } else {
883
    // Assume the decompressed data size will be 5x the compressed size, but
884
    // round to the page size
885
0
    size_t proposed_output_len = ((input_length * 5) & (~(4096 - 1))) + 4096;
886
0
    output_len = static_cast<uint32_t>(
887
0
        std::min(proposed_output_len,
888
0
                 static_cast<size_t>(std::numeric_limits<uint32_t>::max())));
889
0
  }
890
891
0
  z_stream _stream;
892
0
  memset(&_stream, 0, sizeof(z_stream));
893
894
  // For raw inflate, the windowBits should be -8..-15.
895
  // If windowBits is bigger than zero, it will use either zlib
896
  // header or gzip header. Adding 32 to it will do automatic detection.
897
0
  int st =
898
0
      inflateInit2(&_stream, windowBits > 0 ? windowBits + 32 : windowBits);
899
0
  if (st != Z_OK) {
900
0
    return nullptr;
901
0
  }
902
903
0
  const Slice& compression_dict = info.dict().GetRawDict();
904
0
  if (compression_dict.size()) {
905
    // Initialize the compression library's dictionary
906
0
    st = inflateSetDictionary(
907
0
        &_stream, reinterpret_cast<const Bytef*>(compression_dict.data()),
908
0
        static_cast<unsigned int>(compression_dict.size()));
909
0
    if (st != Z_OK) {
910
0
      return nullptr;
911
0
    }
912
0
  }
913
914
0
  _stream.next_in = (Bytef*)input_data;
915
0
  _stream.avail_in = static_cast<unsigned int>(input_length);
916
917
0
  auto output = AllocateBlock(output_len, allocator);
918
919
0
  _stream.next_out = (Bytef*)output.get();
920
0
  _stream.avail_out = static_cast<unsigned int>(output_len);
921
922
0
  bool done = false;
923
0
  while (!done) {
924
0
    st = inflate(&_stream, Z_SYNC_FLUSH);
925
0
    switch (st) {
926
0
      case Z_STREAM_END:
927
0
        done = true;
928
0
        break;
929
0
      case Z_OK: {
930
        // No output space. Increase the output space by 20%.
931
        // We should never run out of output space if
932
        // compress_format_version == 2
933
0
        assert(compress_format_version != 2);
934
0
        size_t old_sz = output_len;
935
0
        uint32_t output_len_delta = output_len / 5;
936
0
        output_len += output_len_delta < 10 ? 10 : output_len_delta;
937
0
        auto tmp = AllocateBlock(output_len, allocator);
938
0
        memcpy(tmp.get(), output.get(), old_sz);
939
0
        output = std::move(tmp);
940
941
        // Set more output.
942
0
        _stream.next_out = (Bytef*)(output.get() + old_sz);
943
0
        _stream.avail_out = static_cast<unsigned int>(output_len - old_sz);
944
0
        break;
945
0
      }
946
0
      case Z_BUF_ERROR:
947
0
      default:
948
0
        inflateEnd(&_stream);
949
0
        return nullptr;
950
0
    }
951
0
  }
952
953
  // If we encoded decompressed block size, we should have no bytes left
954
0
  assert(compress_format_version != 2 || _stream.avail_out == 0);
955
0
  assert(output_len >= _stream.avail_out);
956
0
  *uncompressed_size = output_len - _stream.avail_out;
957
0
  inflateEnd(&_stream);
958
0
  return output;
959
#else
960
  (void)info;
961
  (void)input_data;
962
  (void)input_length;
963
  (void)uncompressed_size;
964
  (void)compress_format_version;
965
  (void)allocator;
966
  (void)windowBits;
967
  return nullptr;
968
#endif
969
0
}
970
971
// compress_format_version == 1 -- decompressed size is not included in the
972
// block header
973
// compress_format_version == 2 -- decompressed size is included in the block
974
// header in varint32 format
975
inline bool BZip2_Compress(const CompressionInfo& /*info*/,
976
                           uint32_t compress_format_version, const char* input,
977
0
                           size_t length, ::std::string* output) {
978
0
#ifdef BZIP2
979
0
  if (length > std::numeric_limits<uint32_t>::max()) {
980
    // Can't compress more than 4GB
981
0
    return false;
982
0
  }
983
0
  size_t output_header_len = 0;
984
0
  if (compress_format_version == 2) {
985
0
    output_header_len = compression::PutDecompressedSizeInfo(
986
0
        output, static_cast<uint32_t>(length));
987
0
  }
988
  // Resize output to be the plain data length.
989
  // This may not be big enough if the compression actually expands data.
990
0
  output->resize(output_header_len + length);
991
992
0
  bz_stream _stream;
993
0
  memset(&_stream, 0, sizeof(bz_stream));
994
995
  // Block size 1 is 100K.
996
  // 0 is for silent.
997
  // 30 is the default workFactor
998
0
  int st = BZ2_bzCompressInit(&_stream, 1, 0, 30);
999
0
  if (st != BZ_OK) {
1000
0
    return false;
1001
0
  }
1002
1003
  // Compress the input, and put compressed data in output.
1004
0
  _stream.next_in = (char*)input;
1005
0
  _stream.avail_in = static_cast<unsigned int>(length);
1006
1007
  // Initialize the output size.
1008
0
  _stream.avail_out = static_cast<unsigned int>(length);
1009
0
  _stream.next_out = reinterpret_cast<char*>(&(*output)[output_header_len]);
1010
1011
0
  bool compressed = false;
1012
0
  st = BZ2_bzCompress(&_stream, BZ_FINISH);
1013
0
  if (st == BZ_STREAM_END) {
1014
0
    compressed = true;
1015
0
    output->resize(output->size() - _stream.avail_out);
1016
0
  }
1017
  // The only return value we really care about is BZ_STREAM_END.
1018
  // BZ_FINISH_OK means insufficient output space. This means the compressed
1019
  // output is bigger than the input. Just fail the compression in that case.
1020
1021
0
  BZ2_bzCompressEnd(&_stream);
1022
0
  return compressed;
1023
#else
1024
  (void)compress_format_version;
1025
  (void)input;
1026
  (void)length;
1027
  (void)output;
1028
  return false;
1029
#endif
1030
0
}
1031
1032
// compress_format_version == 1 -- decompressed size is not included in the
1033
// block header
1034
// compress_format_version == 2 -- decompressed size is included in the block
1035
// header in varint32 format
1036
inline CacheAllocationPtr BZip2_Uncompress(
1037
    const char* input_data, size_t input_length, size_t* uncompressed_size,
1038
0
    uint32_t compress_format_version, MemoryAllocator* allocator = nullptr) {
1039
0
#ifdef BZIP2
1040
0
  uint32_t output_len = 0;
1041
0
  if (compress_format_version == 2) {
1042
0
    if (!compression::GetDecompressedSizeInfo(&input_data, &input_length,
1043
0
                                              &output_len)) {
1044
0
      return nullptr;
1045
0
    }
1046
0
  } else {
1047
    // Assume the decompressed data size will be 5x the compressed size, but
1048
    // round to the next page size
1049
0
    size_t proposed_output_len = ((input_length * 5) & (~(4096 - 1))) + 4096;
1050
0
    output_len = static_cast<uint32_t>(
1051
0
        std::min(proposed_output_len,
1052
0
                 static_cast<size_t>(std::numeric_limits<uint32_t>::max())));
1053
0
  }
1054
1055
0
  bz_stream _stream;
1056
0
  memset(&_stream, 0, sizeof(bz_stream));
1057
1058
0
  int st = BZ2_bzDecompressInit(&_stream, 0, 0);
1059
0
  if (st != BZ_OK) {
1060
0
    return nullptr;
1061
0
  }
1062
1063
0
  _stream.next_in = (char*)input_data;
1064
0
  _stream.avail_in = static_cast<unsigned int>(input_length);
1065
1066
0
  auto output = AllocateBlock(output_len, allocator);
1067
1068
0
  _stream.next_out = (char*)output.get();
1069
0
  _stream.avail_out = static_cast<unsigned int>(output_len);
1070
1071
0
  bool done = false;
1072
0
  while (!done) {
1073
0
    st = BZ2_bzDecompress(&_stream);
1074
0
    switch (st) {
1075
0
      case BZ_STREAM_END:
1076
0
        done = true;
1077
0
        break;
1078
0
      case BZ_OK: {
1079
        // No output space. Increase the output space by 20%.
1080
        // We should never run out of output space if
1081
        // compress_format_version == 2
1082
0
        assert(compress_format_version != 2);
1083
0
        uint32_t old_sz = output_len;
1084
0
        output_len = output_len * 1.2;
1085
0
        auto tmp = AllocateBlock(output_len, allocator);
1086
0
        memcpy(tmp.get(), output.get(), old_sz);
1087
0
        output = std::move(tmp);
1088
1089
        // Set more output.
1090
0
        _stream.next_out = (char*)(output.get() + old_sz);
1091
0
        _stream.avail_out = static_cast<unsigned int>(output_len - old_sz);
1092
0
        break;
1093
0
      }
1094
0
      default:
1095
0
        BZ2_bzDecompressEnd(&_stream);
1096
0
        return nullptr;
1097
0
    }
1098
0
  }
1099
1100
  // If we encoded decompressed block size, we should have no bytes left
1101
0
  assert(compress_format_version != 2 || _stream.avail_out == 0);
1102
0
  assert(output_len >= _stream.avail_out);
1103
0
  *uncompressed_size = output_len - _stream.avail_out;
1104
0
  BZ2_bzDecompressEnd(&_stream);
1105
0
  return output;
1106
#else
1107
  (void)input_data;
1108
  (void)input_length;
1109
  (void)uncompressed_size;
1110
  (void)compress_format_version;
1111
  (void)allocator;
1112
  return nullptr;
1113
#endif
1114
0
}
1115
1116
// compress_format_version == 1 -- decompressed size is included in the
1117
// block header using memcpy, which makes the database non-portable
1118
// compress_format_version == 2 -- decompressed size is included in the block
1119
// header in varint32 format
1120
// @param compression_dict Data for presetting the compression library's
1121
//    dictionary.
1122
inline bool LZ4_Compress(const CompressionInfo& info,
1123
                         uint32_t compress_format_version, const char* input,
1124
0
                         size_t length, ::std::string* output) {
1125
#ifdef LZ4
1126
  if (length > std::numeric_limits<uint32_t>::max()) {
1127
    // Can't compress more than 4GB
1128
    return false;
1129
  }
1130
1131
  size_t output_header_len = 0;
1132
  if (compress_format_version == 2) {
1133
    // new encoding, using varint32 to store size information
1134
    output_header_len = compression::PutDecompressedSizeInfo(
1135
        output, static_cast<uint32_t>(length));
1136
  } else {
1137
    // legacy encoding, which is not really portable (depends on big/little
1138
    // endianness)
1139
    output_header_len = 8;
1140
    output->resize(output_header_len);
1141
    char* p = const_cast<char*>(output->c_str());
1142
    memcpy(p, &length, sizeof(length));
1143
  }
1144
  int compress_bound = LZ4_compressBound(static_cast<int>(length));
1145
  output->resize(static_cast<size_t>(output_header_len + compress_bound));
1146
1147
  int outlen;
1148
#if LZ4_VERSION_NUMBER >= 10400  // r124+
1149
  LZ4_stream_t* stream = LZ4_createStream();
1150
  Slice compression_dict = info.dict().GetRawDict();
1151
  if (compression_dict.size()) {
1152
    LZ4_loadDict(stream, compression_dict.data(),
1153
                 static_cast<int>(compression_dict.size()));
1154
  }
1155
#if LZ4_VERSION_NUMBER >= 10700  // r129+
1156
  int acceleration;
1157
  if (info.options().level < 0) {
1158
    acceleration = -info.options().level;
1159
  } else {
1160
    acceleration = 1;
1161
  }
1162
  outlen = LZ4_compress_fast_continue(
1163
      stream, input, &(*output)[output_header_len], static_cast<int>(length),
1164
      compress_bound, acceleration);
1165
#else  // up to r128
1166
  outlen = LZ4_compress_limitedOutput_continue(
1167
      stream, input, &(*output)[output_header_len], static_cast<int>(length),
1168
      compress_bound);
1169
#endif
1170
  LZ4_freeStream(stream);
1171
#else   // up to r123
1172
  outlen = LZ4_compress_limitedOutput(input, &(*output)[output_header_len],
1173
                                      static_cast<int>(length), compress_bound);
1174
#endif  // LZ4_VERSION_NUMBER >= 10400
1175
1176
  if (outlen == 0) {
1177
    return false;
1178
  }
1179
  output->resize(static_cast<size_t>(output_header_len + outlen));
1180
  return true;
1181
#else  // LZ4
1182
0
  (void)info;
1183
0
  (void)compress_format_version;
1184
0
  (void)input;
1185
0
  (void)length;
1186
0
  (void)output;
1187
0
  return false;
1188
0
#endif
1189
0
}
1190
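As the r129+ branch above shows, a negative CompressionOptions::level is negated and passed to LZ4 as its acceleration factor. A short sketch (editorial, not part of compression.h) of requesting faster, lighter LZ4 compression that way:

bool FastLz4CompressSketch(const Slice& in, std::string* compressed) {
  CompressionOptions opts;
  opts.level = -8;  // interpreted by LZ4_Compress above as acceleration = 8
  CompressionContext ctx(kLZ4Compression, opts);
  CompressionInfo info(opts, ctx, CompressionDict::GetEmptyDict(),
                       kLZ4Compression, /*_sample_for_compression=*/0);
  return LZ4_Compress(info, /*compress_format_version=*/2, in.data(),
                      in.size(), compressed);
}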
1191
// compress_format_version == 1 -- decompressed size is included in the
1192
// block header using memcpy, which makes the database non-portable
1193
// compress_format_version == 2 -- decompressed size is included in the block
1194
// header in varint32 format
1195
// @param compression_dict Data for presetting the compression library's
1196
//    dictionary.
1197
inline CacheAllocationPtr LZ4_Uncompress(const UncompressionInfo& info,
1198
                                         const char* input_data,
1199
                                         size_t input_length,
1200
                                         size_t* uncompressed_size,
1201
                                         uint32_t compress_format_version,
1202
0
                                         MemoryAllocator* allocator = nullptr) {
1203
#ifdef LZ4
1204
  uint32_t output_len = 0;
1205
  if (compress_format_version == 2) {
1206
    // new encoding, using varint32 to store size information
1207
    if (!compression::GetDecompressedSizeInfo(&input_data, &input_length,
1208
                                              &output_len)) {
1209
      return nullptr;
1210
    }
1211
  } else {
1212
    // legacy encoding, which is not really portable (depends on big/little
1213
    // endianness)
1214
    if (input_length < 8) {
1215
      return nullptr;
1216
    }
1217
    if (port::kLittleEndian) {
1218
      memcpy(&output_len, input_data, sizeof(output_len));
1219
    } else {
1220
      memcpy(&output_len, input_data + 4, sizeof(output_len));
1221
    }
1222
    input_length -= 8;
1223
    input_data += 8;
1224
  }
1225
1226
  auto output = AllocateBlock(output_len, allocator);
1227
1228
  int decompress_bytes = 0;
1229
1230
#if LZ4_VERSION_NUMBER >= 10400  // r124+
1231
  LZ4_streamDecode_t* stream = LZ4_createStreamDecode();
1232
  const Slice& compression_dict = info.dict().GetRawDict();
1233
  if (compression_dict.size()) {
1234
    LZ4_setStreamDecode(stream, compression_dict.data(),
1235
                        static_cast<int>(compression_dict.size()));
1236
  }
1237
  decompress_bytes = LZ4_decompress_safe_continue(
1238
      stream, input_data, output.get(), static_cast<int>(input_length),
1239
      static_cast<int>(output_len));
1240
  LZ4_freeStreamDecode(stream);
1241
#else   // up to r123
1242
  decompress_bytes = LZ4_decompress_safe(input_data, output.get(),
1243
                                         static_cast<int>(input_length),
1244
                                         static_cast<int>(output_len));
1245
#endif  // LZ4_VERSION_NUMBER >= 10400
1246
1247
  if (decompress_bytes < 0) {
1248
    return nullptr;
1249
  }
1250
  assert(decompress_bytes == static_cast<int>(output_len));
1251
  *uncompressed_size = decompress_bytes;
1252
  return output;
1253
#else  // LZ4
1254
0
  (void)info;
1255
0
  (void)input_data;
1256
0
  (void)input_length;
1257
0
  (void)uncompressed_size;
1258
0
  (void)compress_format_version;
1259
0
  (void)allocator;
1260
0
  return nullptr;
1261
0
#endif
1262
0
}
1263
1264
// compress_format_version == 1 -- decompressed size is included in the
1265
// block header using memcpy, which makes the database non-portable
1266
// compress_format_version == 2 -- decompressed size is included in the block
1267
// header in varint32 format
1268
// @param compression_dict Data for presetting the compression library's
1269
//    dictionary.
1270
inline bool LZ4HC_Compress(const CompressionInfo& info,
1271
                           uint32_t compress_format_version, const char* input,
1272
0
                           size_t length, ::std::string* output) {
1273
#ifdef LZ4
1274
  if (length > std::numeric_limits<uint32_t>::max()) {
1275
    // Can't compress more than 4GB
1276
    return false;
1277
  }
1278
1279
  size_t output_header_len = 0;
1280
  if (compress_format_version == 2) {
1281
    // new encoding, using varint32 to store size information
1282
    output_header_len = compression::PutDecompressedSizeInfo(
1283
        output, static_cast<uint32_t>(length));
1284
  } else {
1285
    // legacy encoding, which is not really portable (depends on big/little
1286
    // endianness)
1287
    output_header_len = 8;
1288
    output->resize(output_header_len);
1289
    char* p = const_cast<char*>(output->c_str());
1290
    memcpy(p, &length, sizeof(length));
1291
  }
1292
  int compress_bound = LZ4_compressBound(static_cast<int>(length));
1293
  output->resize(static_cast<size_t>(output_header_len + compress_bound));
1294
1295
  int outlen;
1296
  int level;
1297
  if (info.options().level == CompressionOptions::kDefaultCompressionLevel) {
1298
    level = 0;  // lz4hc.h says any value < 1 will be sanitized to default
1299
  } else {
1300
    level = info.options().level;
1301
  }
1302
#if LZ4_VERSION_NUMBER >= 10400  // r124+
1303
  LZ4_streamHC_t* stream = LZ4_createStreamHC();
1304
  LZ4_resetStreamHC(stream, level);
1305
  Slice compression_dict = info.dict().GetRawDict();
1306
  const char* compression_dict_data =
1307
      compression_dict.size() > 0 ? compression_dict.data() : nullptr;
1308
  size_t compression_dict_size = compression_dict.size();
1309
  if (compression_dict_data != nullptr) {
1310
    LZ4_loadDictHC(stream, compression_dict_data,
1311
                   static_cast<int>(compression_dict_size));
1312
  }
1313
1314
#if LZ4_VERSION_NUMBER >= 10700  // r129+
1315
  outlen =
1316
      LZ4_compress_HC_continue(stream, input, &(*output)[output_header_len],
1317
                               static_cast<int>(length), compress_bound);
1318
#else   // r124-r128
1319
  outlen = LZ4_compressHC_limitedOutput_continue(
1320
      stream, input, &(*output)[output_header_len], static_cast<int>(length),
1321
      compress_bound);
1322
#endif  // LZ4_VERSION_NUMBER >= 10700
1323
  LZ4_freeStreamHC(stream);
1324
1325
#elif LZ4_VERSION_MAJOR  // r113-r123
1326
  outlen = LZ4_compressHC2_limitedOutput(input, &(*output)[output_header_len],
1327
                                         static_cast<int>(length),
1328
                                         compress_bound, level);
1329
#else                    // up to r112
1330
  outlen =
1331
      LZ4_compressHC_limitedOutput(input, &(*output)[output_header_len],
1332
                                   static_cast<int>(length), compress_bound);
1333
#endif                   // LZ4_VERSION_NUMBER >= 10400
1334
1335
  if (outlen == 0) {
1336
    return false;
1337
  }
1338
  output->resize(static_cast<size_t>(output_header_len + outlen));
1339
  return true;
1340
#else  // LZ4
1341
0
  (void)info;
1342
0
  (void)compress_format_version;
1343
0
  (void)input;
1344
0
  (void)length;
1345
0
  (void)output;
1346
0
  return false;
1347
0
#endif
1348
0
}
1349
1350
#ifdef XPRESS
1351
inline bool XPRESS_Compress(const char* input, size_t length,
1352
                            std::string* output) {
1353
  return port::xpress::Compress(input, length, output);
1354
}
1355
#else
1356
inline bool XPRESS_Compress(const char* /*input*/, size_t /*length*/,
1357
0
                            std::string* /*output*/) {
1358
0
  return false;
1359
0
}
1360
#endif
1361
1362
#ifdef XPRESS
1363
inline char* XPRESS_Uncompress(const char* input_data, size_t input_length,
1364
                               size_t* uncompressed_size) {
1365
  return port::xpress::Decompress(input_data, input_length, uncompressed_size);
1366
}
1367
#else
1368
inline char* XPRESS_Uncompress(const char* /*input_data*/,
1369
                               size_t /*input_length*/,
1370
0
                               size_t* /*uncompressed_size*/) {
1371
0
  return nullptr;
1372
0
}
1373
#endif
1374
1375
inline bool ZSTD_Compress(const CompressionInfo& info, const char* input,
1376
0
                          size_t length, ::std::string* output) {
1377
#ifdef ZSTD
1378
  if (length > std::numeric_limits<uint32_t>::max()) {
1379
    // Can't compress more than 4GB
1380
    return false;
1381
  }
1382
1383
  size_t output_header_len = compression::PutDecompressedSizeInfo(
1384
      output, static_cast<uint32_t>(length));
1385
1386
  size_t compressBound = ZSTD_compressBound(length);
1387
  output->resize(static_cast<size_t>(output_header_len + compressBound));
1388
  size_t outlen = 0;
1389
#if ZSTD_VERSION_NUMBER >= 500  // v0.5.0+
1390
  ZSTD_CCtx* context = info.context().ZSTDPreallocCtx();
1391
  assert(context != nullptr);
1392
#ifdef ZSTD_ADVANCED
1393
  if (info.dict().GetDigestedZstdCDict() != nullptr) {
1394
    ZSTD_CCtx_refCDict(context, info.dict().GetDigestedZstdCDict());
1395
  } else {
1396
    ZSTD_CCtx_loadDictionary(context, info.dict().GetRawDict().data(),
1397
                             info.dict().GetRawDict().size());
1398
  }
1399
1400
  // Compression level is set in `context` during CreateNativeContext()
1401
  outlen = ZSTD_compress2(context, &(*output)[output_header_len], compressBound,
1402
                          input, length);
1403
#else                           // ZSTD_ADVANCED
1404
#if ZSTD_VERSION_NUMBER >= 700  // v0.7.0+
1405
  if (info.dict().GetDigestedZstdCDict() != nullptr) {
1406
    outlen = ZSTD_compress_usingCDict(context, &(*output)[output_header_len],
1407
                                      compressBound, input, length,
1408
                                      info.dict().GetDigestedZstdCDict());
1409
  }
1410
#endif                          // ZSTD_VERSION_NUMBER >= 700
1411
  // TODO (cbi): error handling for compression.
1412
  if (outlen == 0) {
1413
    int level;
1414
    if (info.options().level == CompressionOptions::kDefaultCompressionLevel) {
1415
      // 3 is the value of ZSTD_CLEVEL_DEFAULT (not exposed publicly), see
1416
      // https://github.com/facebook/zstd/issues/1148
1417
      level = 3;
1418
    } else {
1419
      level = info.options().level;
1420
    }
1421
    outlen = ZSTD_compress_usingDict(context, &(*output)[output_header_len],
1422
                                     compressBound, input, length,
1423
                                     info.dict().GetRawDict().data(),
1424
                                     info.dict().GetRawDict().size(), level);
1425
  }
1426
#endif                          // ZSTD_ADVANCED
1427
#else   // up to v0.4.x
1428
  outlen = ZSTD_compress(&(*output)[output_header_len], compressBound, input,
1429
                         length, level);
1430
#endif  // ZSTD_VERSION_NUMBER >= 500
1431
  if (outlen == 0) {
1432
    return false;
1433
  }
1434
  output->resize(output_header_len + outlen);
1435
  return true;
1436
#else  // ZSTD
1437
0
  (void)info;
1438
0
  (void)input;
1439
0
  (void)length;
1440
0
  (void)output;
1441
0
  return false;
1442
0
#endif
1443
0
}
1444
1445
// @param compression_dict Data for presetting the compression library's
1446
//    dictionary.
1447
// @param error_message If not null, will be set if decompression fails.
1448
//
1449
// Returns nullptr if decompression fails.
1450
inline CacheAllocationPtr ZSTD_Uncompress(
1451
    const UncompressionInfo& info, const char* input_data, size_t input_length,
1452
    size_t* uncompressed_size, MemoryAllocator* allocator = nullptr,
1453
0
    const char** error_message = nullptr) {
1454
#ifdef ZSTD
1455
  static const char* const kErrorDecodeOutputSize =
1456
      "Cannot decode output size.";
1457
  static const char* const kErrorOutputLenMismatch =
1458
      "Decompressed size does not match header.";
1459
  uint32_t output_len = 0;
1460
  if (!compression::GetDecompressedSizeInfo(&input_data, &input_length,
1461
                                            &output_len)) {
1462
    if (error_message) {
1463
      *error_message = kErrorDecodeOutputSize;
1464
    }
1465
    return nullptr;
1466
  }
1467
1468
  CacheAllocationPtr output = AllocateBlock(output_len, allocator);
1469
  size_t actual_output_length = 0;
1470
#if ZSTD_VERSION_NUMBER >= 500  // v0.5.0+
1471
  ZSTD_DCtx* context = info.context().GetZSTDContext();
1472
  assert(context != nullptr);
1473
#ifdef ROCKSDB_ZSTD_DDICT
1474
  if (info.dict().GetDigestedZstdDDict() != nullptr) {
1475
    actual_output_length = ZSTD_decompress_usingDDict(
1476
        context, output.get(), output_len, input_data, input_length,
1477
        info.dict().GetDigestedZstdDDict());
1478
  } else {
1479
#endif  // ROCKSDB_ZSTD_DDICT
1480
    actual_output_length = ZSTD_decompress_usingDict(
1481
        context, output.get(), output_len, input_data, input_length,
1482
        info.dict().GetRawDict().data(), info.dict().GetRawDict().size());
1483
#ifdef ROCKSDB_ZSTD_DDICT
1484
  }
1485
#endif  // ROCKSDB_ZSTD_DDICT
1486
#else   // up to v0.4.x
1487
  (void)info;
1488
  actual_output_length =
1489
      ZSTD_decompress(output.get(), output_len, input_data, input_length);
1490
#endif  // ZSTD_VERSION_NUMBER >= 500
1491
  if (ZSTD_isError(actual_output_length)) {
1492
    if (error_message) {
1493
      *error_message = ZSTD_getErrorName(actual_output_length);
1494
    }
1495
    return nullptr;
1496
  } else if (actual_output_length != output_len) {
1497
    if (error_message) {
1498
      *error_message = kErrorOutputLenMismatch;
1499
    }
1500
    return nullptr;
1501
  }
1502
1503
  *uncompressed_size = actual_output_length;
1504
  return output;
1505
#else  // ZSTD
1506
0
  (void)info;
1507
0
  (void)input_data;
1508
0
  (void)input_length;
1509
0
  (void)uncompressed_size;
1510
0
  (void)allocator;
1511
0
  (void)error_message;
1512
0
  return nullptr;
1513
0
#endif
1514
0
}
1515
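A round-trip sketch for the ZSTD pair (editorial, not part of compression.h; with ZSTD absent the compress call returns false). It also exercises the error_message out-parameter documented above, which points at a static, human-readable string when decompression fails:

bool ZstdRoundTripSketch(const Slice& in, std::string* compressed,
                         CacheAllocationPtr* uncompressed, size_t* out_size,
                         const char** error) {
  CompressionOptions opts;
  CompressionContext cctx(kZSTD, opts);
  CompressionInfo cinfo(opts, cctx, CompressionDict::GetEmptyDict(), kZSTD,
                        /*_sample_for_compression=*/0);
  if (!ZSTD_Compress(cinfo, in.data(), in.size(), compressed)) {
    return false;  // ZSTD not compiled in, or compression failed
  }
  // UncompressionContext checks a cached ZSTD_DCtx out of the global
  // CompressionContextCache and returns it on destruction.
  UncompressionContext uctx(kZSTD);
  UncompressionInfo uinfo(uctx, UncompressionDict::GetEmptyDict(), kZSTD);
  *error = nullptr;
  *uncompressed =
      ZSTD_Uncompress(uinfo, compressed->data(), compressed->size(), out_size,
                      /*allocator=*/nullptr, error);
  return *uncompressed != nullptr;
}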
1516
0
inline bool ZSTD_TrainDictionarySupported() {
1517
#ifdef ZSTD
1518
  // Dictionary trainer is available since v0.6.1 for static linking, but not
1519
  // available for dynamic linking until v1.1.3. For now we enable the feature
1520
  // in v1.1.3+ only.
1521
  return (ZSTD_versionNumber() >= 10103);
1522
#else
1523
0
  return false;
1524
0
#endif
1525
0
}
1526
1527
inline std::string ZSTD_TrainDictionary(const std::string& samples,
1528
                                        const std::vector<size_t>& sample_lens,
1529
0
                                        size_t max_dict_bytes) {
1530
  // Dictionary trainer is available since v0.6.1 for static linking, but not
1531
  // available for dynamic linking until v1.1.3. For now we enable the feature
1532
  // in v1.1.3+ only.
1533
#if ZSTD_VERSION_NUMBER >= 10103  // v1.1.3+
1534
  assert(samples.empty() == sample_lens.empty());
1535
  if (samples.empty()) {
1536
    return "";
1537
  }
1538
  std::string dict_data(max_dict_bytes, '\0');
1539
  size_t dict_len = ZDICT_trainFromBuffer(
1540
      &dict_data[0], max_dict_bytes, &samples[0], &sample_lens[0],
1541
      static_cast<unsigned>(sample_lens.size()));
1542
  if (ZDICT_isError(dict_len)) {
1543
    return "";
1544
  }
1545
  assert(dict_len <= max_dict_bytes);
1546
  dict_data.resize(dict_len);
1547
  return dict_data;
1548
#else   // up to v1.1.2
1549
0
  assert(false);
1550
0
  (void)samples;
1551
0
  (void)sample_lens;
1552
0
  (void)max_dict_bytes;
1553
0
  return "";
1554
0
#endif  // ZSTD_VERSION_NUMBER >= 10103
1555
0
}
1556
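A minimal usage sketch of the trainer wrapper above: concatenate the sample blocks into one buffer and record each block's length. The block contents and the 16 KiB dictionary budget are placeholders.

  // Hypothetical helper showing the expected call pattern.
  std::string TrainDictFromBlocks(const std::vector<std::string>& blocks) {
    std::string samples;
    std::vector<size_t> sample_lens;
    for (const auto& b : blocks) {
      samples.append(b);
      sample_lens.push_back(b.size());
    }
    // An empty result means training failed; zstd generally wants many samples
    // whose total size is roughly 100x the target dictionary size.
    return ZSTD_TrainDictionary(samples, sample_lens,
                                /*max_dict_bytes=*/16 * 1024);
  }
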
1557
inline std::string ZSTD_TrainDictionary(const std::string& samples,
1558
                                        size_t sample_len_shift,
1559
0
                                        size_t max_dict_bytes) {
1560
0
  // Dictionary trainer is available since v0.6.1, but was not available for
1561
0
  // dynamic linking until v1.1.3. For now we enable the feature in v1.1.3+.
1562
0
#if ZSTD_VERSION_NUMBER >= 10103  // v1.1.3+
1563
0
  // skips potential partial sample at the end of "samples"
1564
0
  size_t num_samples = samples.size() >> sample_len_shift;
1565
0
  std::vector<size_t> sample_lens(num_samples, size_t(1) << sample_len_shift);
1566
0
  return ZSTD_TrainDictionary(samples, sample_lens, max_dict_bytes);
1567
0
#else   // up to v1.1.2
1568
0
  assert(false);
1569
0
  (void)samples;
1570
0
  (void)sample_len_shift;
1571
0
  (void)max_dict_bytes;
1572
0
  return "";
1573
0
#endif  // ZSTD_VERSION_NUMBER >= 10103
1574
0
}
1575
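A quick worked example of the shift-based splitting above, with made-up sizes: if samples.size() is 10000 and sample_len_shift is 11, each sample is 2048 bytes, num_samples is 10000 >> 11 == 4, and the trailing 10000 - 4 * 2048 == 1808 bytes are dropped as a partial sample before delegating to the sample_lens overload.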
1576
0
inline bool ZSTD_FinalizeDictionarySupported() {
1577
#ifdef ZSTD
1578
  // ZDICT_finalizeDictionary API is stable since v1.4.5
1579
  return (ZSTD_versionNumber() >= 10405);
1580
#else
1581
0
  return false;
1582
0
#endif
1583
0
}
1584
1585
inline std::string ZSTD_FinalizeDictionary(
1586
    const std::string& samples, const std::vector<size_t>& sample_lens,
1587
0
    size_t max_dict_bytes, int level) {
1588
  // ZDICT_finalizeDictionary is stable since version v1.4.5
1589
#if ZSTD_VERSION_NUMBER >= 10405  // v1.4.5+
1590
  assert(samples.empty() == sample_lens.empty());
1591
  if (samples.empty()) {
1592
    return "";
1593
  }
1594
  if (level == CompressionOptions::kDefaultCompressionLevel) {
1595
    // 3 is the value of ZSTD_CLEVEL_DEFAULT (not exposed publicly), see
1596
    // https://github.com/facebook/zstd/issues/1148
1597
    level = 3;
1598
  }
1599
  std::string dict_data(max_dict_bytes, '\0');
1600
  size_t dict_len = ZDICT_finalizeDictionary(
1601
      dict_data.data(), max_dict_bytes, samples.data(),
1602
      std::min(static_cast<size_t>(samples.size()), max_dict_bytes),
1603
      samples.data(), sample_lens.data(),
1604
      static_cast<unsigned>(sample_lens.size()),
1605
      {level, 0 /* notificationLevel */, 0 /* dictID */});
1606
  if (ZDICT_isError(dict_len)) {
1607
    return "";
1608
  } else {
1609
    assert(dict_len <= max_dict_bytes);
1610
    dict_data.resize(dict_len);
1611
    return dict_data;
1612
  }
1613
#else   // up to v1.4.4
1614
0
  assert(false);
1615
0
  (void)samples;
1616
0
  (void)sample_lens;
1617
0
  (void)max_dict_bytes;
1618
0
  (void)level;
1619
0
  return "";
1620
0
#endif  // ZSTD_VERSION_NUMBER >= 10405
1621
0
}
1622
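A usage sketch of the finalizer wrapper above, reusing the concatenated samples and lengths from the training sketch. Note that the wrapper passes `samples` twice: a prefix of up to max_dict_bytes serves as the raw dictionary content, and the whole buffer serves as the sample set.

  // Hypothetical helper; inputs are the same placeholders as in the training
  // sketch.
  std::string FinalizeDictFromSamples(const std::string& samples,
                                      const std::vector<size_t>& sample_lens) {
    // Empty on failure; otherwise at most max_dict_bytes bytes long.
    return ZSTD_FinalizeDictionary(samples, sample_lens,
                                   /*max_dict_bytes=*/16 * 1024,
                                   CompressionOptions::kDefaultCompressionLevel);
  }
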
1623
inline bool CompressData(const Slice& raw,
1624
                         const CompressionInfo& compression_info,
1625
                         uint32_t compress_format_version,
1626
0
                         std::string* compressed_output) {
1627
0
  bool ret = false;
1628
1629
  // Will return compressed block contents if (1) the compression method is
1630
  // supported in this platform and (2) the compression rate is "good enough".
1631
0
  switch (compression_info.type()) {
1632
0
    case kSnappyCompression:
1633
0
      ret = Snappy_Compress(compression_info, raw.data(), raw.size(),
1634
0
                            compressed_output);
1635
0
      break;
1636
0
    case kZlibCompression:
1637
0
      ret = Zlib_Compress(compression_info, compress_format_version, raw.data(),
1638
0
                          raw.size(), compressed_output);
1639
0
      break;
1640
0
    case kBZip2Compression:
1641
0
      ret = BZip2_Compress(compression_info, compress_format_version,
1642
0
                           raw.data(), raw.size(), compressed_output);
1643
0
      break;
1644
0
    case kLZ4Compression:
1645
0
      ret = LZ4_Compress(compression_info, compress_format_version, raw.data(),
1646
0
                         raw.size(), compressed_output);
1647
0
      break;
1648
0
    case kLZ4HCCompression:
1649
0
      ret = LZ4HC_Compress(compression_info, compress_format_version,
1650
0
                           raw.data(), raw.size(), compressed_output);
1651
0
      break;
1652
0
    case kXpressCompression:
1653
0
      ret = XPRESS_Compress(raw.data(), raw.size(), compressed_output);
1654
0
      break;
1655
0
    case kZSTD:
1656
0
    case kZSTDNotFinalCompression:
1657
0
      ret = ZSTD_Compress(compression_info, raw.data(), raw.size(),
1658
0
                          compressed_output);
1659
0
      break;
1660
0
    default:
1661
      // Do not recognize this compression type
1662
0
      break;
1663
0
  }
1664
1665
0
  TEST_SYNC_POINT_CALLBACK("CompressData:TamperWithReturnValue",
1666
0
                           static_cast<void*>(&ret));
1667
1668
0
  return ret;
1669
0
}
1670
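A hedged sketch of a typical CompressData call site. CompressionInfo construction is not shown in this excerpt, so `compression_info` is assumed to exist, and compress_format_version 2 is only illustrative.

  // Hypothetical write-path fragment.
  Slice MaybeCompressBlock(const Slice& raw_block,
                           const CompressionInfo& compression_info,
                           std::string* compressed) {
    bool ok = CompressData(raw_block, compression_info,
                           /*compress_format_version=*/2, compressed);
    // On failure (unsupported type or unhelpful ratio) callers typically store
    // the block uncompressed.
    return ok ? Slice(*compressed) : raw_block;
  }
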
1671
inline CacheAllocationPtr UncompressData(
1672
    const UncompressionInfo& uncompression_info, const char* data, size_t n,
1673
    size_t* uncompressed_size, uint32_t compress_format_version,
1674
    MemoryAllocator* allocator = nullptr,
1675
0
    const char** error_message = nullptr) {
1676
0
  switch (uncompression_info.type()) {
1677
0
    case kSnappyCompression:
1678
0
      return Snappy_Uncompress(data, n, uncompressed_size, allocator);
1679
0
    case kZlibCompression:
1680
0
      return Zlib_Uncompress(uncompression_info, data, n, uncompressed_size,
1681
0
                             compress_format_version, allocator);
1682
0
    case kBZip2Compression:
1683
0
      return BZip2_Uncompress(data, n, uncompressed_size,
1684
0
                              compress_format_version, allocator);
1685
0
    case kLZ4Compression:
1686
0
    case kLZ4HCCompression:
1687
0
      return LZ4_Uncompress(uncompression_info, data, n, uncompressed_size,
1688
0
                            compress_format_version, allocator);
1689
0
    case kXpressCompression:
1690
      // XPRESS allocates memory internally, thus no support for custom
1691
      // allocator.
1692
0
      return CacheAllocationPtr(XPRESS_Uncompress(data, n, uncompressed_size));
1693
0
    case kZSTD:
1694
0
    case kZSTDNotFinalCompression:
1695
      // TODO(cbi): error message handling for other compression algorithms.
1696
0
      return ZSTD_Uncompress(uncompression_info, data, n, uncompressed_size,
1697
0
                             allocator, error_message);
1698
0
    default:
1699
0
      return CacheAllocationPtr();
1700
0
  }
1701
0
}
1702
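And a matching sketch for the read path; `uncompression_info` is assumed, and the format version value is again illustrative. As the TODO above notes, only the ZSTD branch currently populates error_message.

  // Hypothetical read-path fragment.
  CacheAllocationPtr MaybeUncompressBlock(
      const UncompressionInfo& uncompression_info, const Slice& block_contents,
      size_t* uncompressed_size) {
    const char* err = nullptr;
    CacheAllocationPtr data = UncompressData(
        uncompression_info, block_contents.data(), block_contents.size(),
        uncompressed_size, /*compress_format_version=*/2,
        /*allocator=*/nullptr, &err);
    // err may still be nullptr when data is nullptr, since only the ZSTD
    // branch fills it in today.
    return data;
  }
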
1703
// Records the compression type for subsequent WAL records.
1704
class CompressionTypeRecord {
1705
 public:
1706
  explicit CompressionTypeRecord(CompressionType compression_type)
1707
0
      : compression_type_(compression_type) {}
1708
1709
0
  CompressionType GetCompressionType() const { return compression_type_; }
1710
1711
0
  inline void EncodeTo(std::string* dst) const {
1712
0
    assert(dst != nullptr);
1713
0
    PutFixed32(dst, compression_type_);
1714
0
  }
1715
1716
0
  inline Status DecodeFrom(Slice* src) {
1717
0
    constexpr char class_name[] = "CompressionTypeRecord";
1718
1719
0
    uint32_t val;
1720
0
    if (!GetFixed32(src, &val)) {
1721
0
      return Status::Corruption(class_name,
1722
0
                                "Error decoding WAL compression type");
1723
0
    }
1724
0
    CompressionType compression_type = static_cast<CompressionType>(val);
1725
0
    if (!StreamingCompressionTypeSupported(compression_type)) {
1726
0
      return Status::Corruption(class_name,
1727
0
                                "WAL compression type not supported");
1728
0
    }
1729
0
    compression_type_ = compression_type;
1730
0
    return Status::OK();
1731
0
  }
1732
1733
0
  inline std::string DebugString() const {
1734
0
    return "compression_type: " + CompressionTypeToString(compression_type_);
1735
0
  }
1736
1737
 private:
1738
  CompressionType compression_type_;
1739
};
1740
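A small round-trip sketch for the WAL record class above, assuming a build in which kZSTD passes StreamingCompressionTypeSupported():

  // Hypothetical check of EncodeTo()/DecodeFrom() symmetry.
  void CompressionTypeRecordRoundTrip() {
    CompressionTypeRecord record(kZSTD);
    std::string encoded;
    record.EncodeTo(&encoded);

    Slice input(encoded);
    CompressionTypeRecord decoded(kNoCompression);
    Status s = decoded.DecodeFrom(&input);
    assert(s.ok());
    assert(decoded.GetCompressionType() == kZSTD);
  }
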
1741
// Base class to implement compression for a stream of buffers.
1742
// Instantiate an implementation of the class using Create() with the
1743
// compression type and use Compress() repeatedly.
1744
// The output buffer needs to be at least max_output_len.
1745
// Call Reset() in between frame boundaries or in case of an error.
1746
// NOTE: This class is not thread safe.
1747
class StreamingCompress {
1748
 public:
1749
  StreamingCompress(CompressionType compression_type,
1750
                    const CompressionOptions& opts,
1751
                    uint32_t compress_format_version, size_t max_output_len)
1752
      : compression_type_(compression_type),
1753
        opts_(opts),
1754
        compress_format_version_(compress_format_version),
1755
0
        max_output_len_(max_output_len) {}
1756
0
  virtual ~StreamingCompress() = default;
1757
  // Compress() should be called repeatedly with the same input until the
1758
  // method returns 0.
1759
  // Parameters:
1760
  // input - buffer to compress
1761
  // input_size - size of input buffer
1762
  // output - compressed buffer allocated by caller, should be at least
1763
  // max_output_len
1764
  // output_pos - returns the number of bytes written to the output buffer
1765
  // Returns -1 on error; otherwise the remaining size of the input buffer
1766
  // that still needs to be compressed.
1767
  virtual int Compress(const char* input, size_t input_size, char* output,
1768
                       size_t* output_pos) = 0;
1769
  // Static method to create an object of a class derived from
1770
  // StreamingCompress based on the actual compression type.
1771
  static StreamingCompress* Create(CompressionType compression_type,
1772
                                   const CompressionOptions& opts,
1773
                                   uint32_t compress_format_version,
1774
                                   size_t max_output_len);
1775
  virtual void Reset() = 0;
1776
1777
 protected:
1778
  const CompressionType compression_type_;
1779
  const CompressionOptions opts_;
1780
  const uint32_t compress_format_version_;
1781
  const size_t max_output_len_;
1782
};
1783
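A hedged sketch of the Compress() protocol described above. kMaxOutput, `payload`, and the option values are placeholders, not values taken from RocksDB.

  // Hypothetical WAL-style compression loop.
  void StreamingCompressSketch(const std::string& payload) {
    constexpr size_t kMaxOutput = 32 * 1024;
    std::unique_ptr<StreamingCompress> compressor(StreamingCompress::Create(
        kZSTD, CompressionOptions(), /*compress_format_version=*/2,
        kMaxOutput));
    std::string out(kMaxOutput, '\0');
    size_t out_pos = 0;
    int remaining;
    do {
      // Same input on every call; the compressor tracks its own position.
      remaining = compressor->Compress(payload.data(), payload.size(), &out[0],
                                       &out_pos);
      if (remaining < 0) {
        break;  // error
      }
      // Consume out[0, out_pos) here, e.g. emit it as one WAL fragment.
    } while (remaining > 0);
  }
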
1784
// Base class to uncompress a stream of compressed buffers.
1785
// Instantiate an implementation of the class using Create() with the
1786
// compression type and use Uncompress() repeatedly.
1787
// The output buffer needs to be at least max_output_len.
1788
// Call Reset() in between frame boundaries or in case of an error.
1789
// NOTE: This class is not thread safe.
1790
class StreamingUncompress {
1791
 public:
1792
  StreamingUncompress(CompressionType compression_type,
1793
                      uint32_t compress_format_version, size_t max_output_len)
1794
      : compression_type_(compression_type),
1795
        compress_format_version_(compress_format_version),
1796
0
        max_output_len_(max_output_len) {}
1797
0
  virtual ~StreamingUncompress() = default;
1798
  // Uncompress() can be called repeatedly to progressively process the same
1799
  // input buffer, or it can be called with a new input buffer. When the input
1800
  // buffer is not fully consumed, the return value is > 0 or *output_pos
1801
  // == max_output_len. When calling Uncompress() to continue processing the
1802
  // same input buffer, the input argument should be nullptr.
1803
  // Parameters:
1804
  // input - buffer to uncompress
1805
  // input_size - size of input buffer
1806
  // output - uncompressed buffer allocated by caller, should be at least
1807
  // max_output_len
1808
  // output_pos - returns the number of bytes written to the output buffer
1809
  // Returns -1 on error; otherwise the remaining input left to be processed.
1810
  virtual int Uncompress(const char* input, size_t input_size, char* output,
1811
                         size_t* output_pos) = 0;
1812
  static StreamingUncompress* Create(CompressionType compression_type,
1813
                                     uint32_t compress_format_version,
1814
                                     size_t max_output_len);
1815
  virtual void Reset() = 0;
1816
1817
 protected:
1818
  CompressionType compression_type_;
1819
  uint32_t compress_format_version_;
1820
  size_t max_output_len_;
1821
};
1822
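And the corresponding Uncompress() loop, following the comment above: pass the buffer once, then pass nullptr to keep draining the same input. `fragment` and kMaxOutput are placeholders.

  // Hypothetical decompression loop.
  void StreamingUncompressSketch(const std::string& fragment) {
    constexpr size_t kMaxOutput = 32 * 1024;
    std::unique_ptr<StreamingUncompress> uncompressor(
        StreamingUncompress::Create(kZSTD, /*compress_format_version=*/2,
                                    kMaxOutput));
    std::string scratch(kMaxOutput, '\0');
    size_t out_pos = 0;
    int remaining = uncompressor->Uncompress(fragment.data(), fragment.size(),
                                             &scratch[0], &out_pos);
    while (remaining >= 0) {
      // Hand off scratch[0, out_pos) to the caller here.
      if (remaining == 0 && out_pos < kMaxOutput) {
        break;  // input fully consumed and drained
      }
      // Pass nullptr to keep draining the same input buffer.
      remaining = uncompressor->Uncompress(nullptr, 0, &scratch[0], &out_pos);
    }
  }
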
1823
class ZSTDStreamingCompress final : public StreamingCompress {
1824
 public:
1825
  explicit ZSTDStreamingCompress(const CompressionOptions& opts,
1826
                                 uint32_t compress_format_version,
1827
                                 size_t max_output_len)
1828
      : StreamingCompress(kZSTD, opts, compress_format_version,
1829
0
                          max_output_len) {
1830
#ifdef ZSTD_ADVANCED
1831
    cctx_ = ZSTD_createCCtx();
1832
    assert(cctx_ != nullptr);
1833
    // Each compressed frame will have a checksum
1834
    ZSTD_CCtx_setParameter(cctx_, ZSTD_c_checksumFlag, 1);
1835
    input_buffer_ = {/*src=*/nullptr, /*size=*/0, /*pos=*/0};
1836
#endif
1837
0
  }
1838
0
  ~ZSTDStreamingCompress() override {
1839
0
#ifdef ZSTD_ADVANCED
1840
0
    ZSTD_freeCCtx(cctx_);
1841
0
#endif
1842
0
  }
1843
  int Compress(const char* input, size_t input_size, char* output,
1844
               size_t* output_pos) override;
1845
  void Reset() override;
1846
#ifdef ZSTD_ADVANCED
1847
  ZSTD_CCtx* cctx_;
1848
  ZSTD_inBuffer input_buffer_;
1849
#endif
1850
};
1851
1852
class ZSTDStreamingUncompress final : public StreamingUncompress {
1853
 public:
1854
  explicit ZSTDStreamingUncompress(uint32_t compress_format_version,
1855
                                   size_t max_output_len)
1856
0
      : StreamingUncompress(kZSTD, compress_format_version, max_output_len) {
1857
#ifdef ZSTD_ADVANCED
1858
    dctx_ = ZSTD_createDCtx();
1859
    assert(dctx_ != nullptr);
1860
    input_buffer_ = {/*src=*/nullptr, /*size=*/0, /*pos=*/0};
1861
#endif
1862
0
  }
1863
0
  ~ZSTDStreamingUncompress() override {
1864
0
#ifdef ZSTD_ADVANCED
1865
0
    ZSTD_freeDCtx(dctx_);
1866
0
#endif
1867
0
  }
1868
  int Uncompress(const char* input, size_t input_size, char* output,
1869
                 size_t* output_size) override;
1870
  void Reset() override;
1871
1872
 private:
1873
#ifdef ZSTD_ADVANCED
1874
  ZSTD_DCtx* dctx_;
1875
  ZSTD_inBuffer input_buffer_;
1876
#endif
1877
};
1878
1879
}  // namespace ROCKSDB_NAMESPACE