Coverage Report

Created: 2026-03-31 07:51

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/rocksdb/util/compression.cc
Line
Count
Source
1
// Copyright (c) 2022-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
6
#include "util/compression.h"
7
8
#ifdef BZIP2
9
#include <bzlib.h>
10
#endif  // BZIP2
11
12
#include <limits>
13
14
#ifdef LZ4
15
#include <lz4.h>
16
#include <lz4hc.h>
17
#if LZ4_VERSION_NUMBER < 10700  // < r129
18
#error "LZ4 support requires version >= 1.7.0 (lz4-devel)"
19
#endif  // LZ4_VERSION_NUMBER < 10700
20
#endif  // LZ4
21
22
#ifdef SNAPPY
23
#include <snappy-sinksource.h>
24
#include <snappy.h>
25
#endif  // SNAPPY
26
27
#ifdef ZLIB
28
#include <zlib.h>
29
#endif  // ZLIB
30
31
#include "options/options_helper.h"
32
#include "port/likely.h"
33
#include "rocksdb/convenience.h"
34
#include "rocksdb/utilities/object_registry.h"
35
#include "test_util/sync_point.h"
36
#include "util/cast_util.h"
37
#include "util/string_util.h"
38
39
namespace ROCKSDB_NAMESPACE {
40
41
// WART: does not match OptionsHelper::compression_type_string_map
42
168k
std::string CompressionTypeToString(CompressionType compression_type) {
43
168k
  switch (compression_type) {
44
168k
    case kNoCompression:
45
168k
      return "NoCompression";
46
0
    case kSnappyCompression:
47
0
      return "Snappy";
48
0
    case kZlibCompression:
49
0
      return "Zlib";
50
0
    case kBZip2Compression:
51
0
      return "BZip2";
52
0
    case kLZ4Compression:
53
0
      return "LZ4";
54
0
    case kLZ4HCCompression:
55
0
      return "LZ4HC";
56
0
    case kXpressCompression:
57
0
      return "Xpress";
58
0
    case kZSTD:
59
0
      return "ZSTD";
60
0
    case kDisableCompressionOption:
61
0
      return "DisableOption";
62
0
    default: {
63
0
      bool is_custom = compression_type >= kFirstCustomCompression &&
64
0
                       compression_type <= kLastCustomCompression;
65
0
      unsigned char c = lossless_cast<unsigned char>(compression_type);
66
0
      return (is_custom ? "Custom" : "Reserved") +
67
0
             ToBaseCharsString<16>(2, c, /*uppercase=*/true);
68
0
    }
69
168k
  }
70
168k
}
71
72
// WART: does not match OptionsHelper::compression_type_string_map
73
0
CompressionType CompressionTypeFromString(std::string compression_type_str) {
74
0
  if (!compression_type_str.empty()) {
75
0
    switch (compression_type_str[0]) {
76
0
      case 'N':
77
0
        if (compression_type_str == "NoCompression") {
78
0
          return kNoCompression;
79
0
        }
80
0
        break;
81
0
      case 'S':
82
0
        if (compression_type_str == "Snappy") {
83
0
          return kSnappyCompression;
84
0
        }
85
0
        break;
86
0
      case 'Z':
87
0
        if (compression_type_str == "ZSTD") {
88
0
          return kZSTD;
89
0
        }
90
0
        if (compression_type_str == "Zlib") {
91
0
          return kZlibCompression;
92
0
        }
93
0
        break;
94
0
      case 'B':
95
0
        if (compression_type_str == "BZip2") {
96
0
          return kBZip2Compression;
97
0
        }
98
0
        break;
99
0
      case 'L':
100
0
        if (compression_type_str == "LZ4") {
101
0
          return kLZ4Compression;
102
0
        }
103
0
        if (compression_type_str == "LZ4HC") {
104
0
          return kLZ4HCCompression;
105
0
        }
106
0
        break;
107
0
      case 'X':
108
0
        if (compression_type_str == "Xpress") {
109
0
          return kXpressCompression;
110
0
        }
111
0
        break;
112
0
      default:;
113
0
    }
114
0
  }
115
  // unrecognized
116
0
  return kDisableCompressionOption;
117
0
}
118
119
std::string CompressionOptionsToString(
120
17.4k
    const CompressionOptions& compression_options) {
121
17.4k
  std::string result;
122
17.4k
  result.reserve(512);
123
17.4k
  result.append("window_bits=")
124
17.4k
      .append(std::to_string(compression_options.window_bits))
125
17.4k
      .append("; ");
126
17.4k
  result.append("level=")
127
17.4k
      .append(std::to_string(compression_options.level))
128
17.4k
      .append("; ");
129
17.4k
  result.append("strategy=")
130
17.4k
      .append(std::to_string(compression_options.strategy))
131
17.4k
      .append("; ");
132
17.4k
  result.append("max_dict_bytes=")
133
17.4k
      .append(std::to_string(compression_options.max_dict_bytes))
134
17.4k
      .append("; ");
135
17.4k
  result.append("zstd_max_train_bytes=")
136
17.4k
      .append(std::to_string(compression_options.zstd_max_train_bytes))
137
17.4k
      .append("; ");
138
  // NOTE: parallel_threads is skipped because it doesn't really affect the file
139
  // contents written, arguably doesn't belong in CompressionOptions
140
17.4k
  result.append("enabled=")
141
17.4k
      .append(std::to_string(compression_options.enabled))
142
17.4k
      .append("; ");
143
17.4k
  result.append("max_dict_buffer_bytes=")
144
17.4k
      .append(std::to_string(compression_options.max_dict_buffer_bytes))
145
17.4k
      .append("; ");
146
17.4k
  result.append("use_zstd_dict_trainer=")
147
17.4k
      .append(std::to_string(compression_options.use_zstd_dict_trainer))
148
17.4k
      .append("; ");
149
17.4k
  result.append("max_compressed_bytes_per_kb=")
150
17.4k
      .append(std::to_string(compression_options.max_compressed_bytes_per_kb))
151
17.4k
      .append("; ");
152
17.4k
  result.append("checksum=")
153
17.4k
      .append(std::to_string(compression_options.checksum))
154
17.4k
      .append("; ");
155
17.4k
  return result;
156
17.4k
}
157
158
StreamingCompress* StreamingCompress::Create(CompressionType compression_type,
159
                                             const CompressionOptions& opts,
160
                                             uint32_t compress_format_version,
161
0
                                             size_t max_output_len) {
162
0
  switch (compression_type) {
163
0
    case kZSTD: {
164
0
      if (!ZSTD_Streaming_Supported()) {
165
0
        return nullptr;
166
0
      }
167
0
      return new ZSTDStreamingCompress(opts, compress_format_version,
168
0
                                       max_output_len);
169
0
    }
170
0
    default:
171
0
      return nullptr;
172
0
  }
173
0
}
174
175
StreamingUncompress* StreamingUncompress::Create(
176
    CompressionType compression_type, uint32_t compress_format_version,
177
0
    size_t max_output_len) {
178
0
  switch (compression_type) {
179
0
    case kZSTD: {
180
0
      if (!ZSTD_Streaming_Supported()) {
181
0
        return nullptr;
182
0
      }
183
0
      return new ZSTDStreamingUncompress(compress_format_version,
184
0
                                         max_output_len);
185
0
    }
186
0
    default:
187
0
      return nullptr;
188
0
  }
189
0
}
190
191
int ZSTDStreamingCompress::Compress(const char* input, size_t input_size,
192
0
                                    char* output, size_t* output_pos) {
193
0
  assert(input != nullptr && output != nullptr && output_pos != nullptr);
194
0
  *output_pos = 0;
195
  // Don't need to compress an empty input
196
0
  if (input_size == 0) {
197
0
    return 0;
198
0
  }
199
0
#ifndef ZSTD
200
0
  (void)input;
201
0
  (void)input_size;
202
0
  (void)output;
203
0
  return -1;
204
#else
205
  if (input_buffer_.src == nullptr || input_buffer_.src != input) {
206
    // New input
207
    // Catch errors where the previous input was not fully decompressed.
208
    assert(input_buffer_.pos == input_buffer_.size);
209
    input_buffer_ = {input, input_size, /*pos=*/0};
210
  } else if (input_buffer_.src == input) {
211
    // Same input, not fully compressed.
212
  }
213
  ZSTD_outBuffer output_buffer = {output, max_output_len_, /*pos=*/0};
214
  const size_t remaining =
215
      ZSTD_compressStream2(cctx_, &output_buffer, &input_buffer_, ZSTD_e_end);
216
  if (ZSTD_isError(remaining)) {
217
    // Failure
218
    Reset();
219
    return -1;
220
  }
221
  // Success
222
  *output_pos = output_buffer.pos;
223
  return (int)remaining;
224
#endif
225
0
}
226
227
0
void ZSTDStreamingCompress::Reset() {
228
#ifdef ZSTD
229
  ZSTD_CCtx_reset(cctx_, ZSTD_ResetDirective::ZSTD_reset_session_only);
230
  input_buffer_ = {/*src=*/nullptr, /*size=*/0, /*pos=*/0};
231
#endif
232
0
}
233
234
int ZSTDStreamingUncompress::Uncompress(const char* input, size_t input_size,
235
0
                                        char* output, size_t* output_pos) {
236
0
  assert(output != nullptr && output_pos != nullptr);
237
0
  *output_pos = 0;
238
  // Don't need to uncompress an empty input
239
0
  if (input_size == 0) {
240
0
    return 0;
241
0
  }
242
#ifdef ZSTD
243
  if (input) {
244
    // New input
245
    input_buffer_ = {input, input_size, /*pos=*/0};
246
  }
247
  ZSTD_outBuffer output_buffer = {output, max_output_len_, /*pos=*/0};
248
  size_t ret = ZSTD_decompressStream(dctx_, &output_buffer, &input_buffer_);
249
  if (ZSTD_isError(ret)) {
250
    Reset();
251
    return -1;
252
  }
253
  *output_pos = output_buffer.pos;
254
  return (int)(input_buffer_.size - input_buffer_.pos);
255
#else
256
0
  (void)input;
257
0
  (void)input_size;
258
0
  (void)output;
259
0
  return -1;
260
0
#endif
261
0
}
262
263
0
void ZSTDStreamingUncompress::Reset() {
264
#ifdef ZSTD
265
  ZSTD_DCtx_reset(dctx_, ZSTD_ResetDirective::ZSTD_reset_session_only);
266
  input_buffer_ = {/*src=*/nullptr, /*size=*/0, /*pos=*/0};
267
#endif
268
0
}
269
270
0
void DecompressorDict::Populate(Decompressor& from_decompressor, Slice dict) {
271
0
  if (UNLIKELY(dict.empty())) {
272
0
    dict_str_ = {};
273
0
    dict_allocation_ = {};
274
    // Appropriately reject bad files with empty dictionary block.
275
    // It is longstanding not to write an empty dictionary block:
276
    // https://github.com/facebook/rocksdb/blame/10.2.fb/table/block_based/block_based_table_builder.cc#L1841
277
0
    decompressor_ = std::make_unique<FailureDecompressor>(
278
0
        Status::Corruption("Decompression dictionary is empty"));
279
0
  } else {
280
0
    Status s = from_decompressor.MaybeCloneForDict(dict, &decompressor_);
281
0
    if (decompressor_ == nullptr) {
282
0
      dict_str_ = {};
283
0
      dict_allocation_ = {};
284
0
      assert(!s.ok());
285
0
      decompressor_ = std::make_unique<FailureDecompressor>(std::move(s));
286
0
    } else {
287
0
      assert(s.ok());
288
0
      assert(decompressor_->GetSerializedDict() == dict);
289
0
    }
290
0
  }
291
292
0
  memory_usage_ = sizeof(struct DecompressorDict);
293
0
  memory_usage_ += dict_str_.size();
294
0
  if (dict_allocation_) {
295
0
    auto allocator = dict_allocation_.get_deleter().allocator;
296
0
    if (allocator) {
297
0
      memory_usage_ +=
298
0
          allocator->UsableSize(dict_allocation_.get(), GetRawDict().size());
299
0
    } else {
300
0
      memory_usage_ += GetRawDict().size();
301
0
    }
302
0
  }
303
0
  memory_usage_ += decompressor_->ApproximateOwnedMemoryUsage();
304
0
}
305
306
// ZSTD dictionary training implementations
307
std::string ZSTD_TrainDictionary(const std::string& samples,
308
                                 const std::vector<size_t>& sample_lens,
309
0
                                 size_t max_dict_bytes) {
310
#ifdef ZSTD
311
  assert(samples.empty() == sample_lens.empty());
312
  if (samples.empty()) {
313
    return "";
314
  }
315
  std::string dict_data(max_dict_bytes, '\0');
316
  size_t dict_len = ZDICT_trainFromBuffer(
317
      &dict_data[0], max_dict_bytes, &samples[0], &sample_lens[0],
318
      static_cast<unsigned>(sample_lens.size()));
319
  if (ZDICT_isError(dict_len)) {
320
    return "";
321
  }
322
  assert(dict_len <= max_dict_bytes);
323
  dict_data.resize(dict_len);
324
  return dict_data;
325
#else
326
0
  assert(false);
327
0
  (void)samples;
328
0
  (void)sample_lens;
329
0
  (void)max_dict_bytes;
330
0
  return "";
331
0
#endif  // ZSTD
332
0
}
333
334
std::string ZSTD_TrainDictionary(const std::string& samples,
335
                                 size_t sample_len_shift,
336
0
                                 size_t max_dict_bytes) {
337
#ifdef ZSTD
338
  // skips potential partial sample at the end of "samples"
339
  size_t num_samples = samples.size() >> sample_len_shift;
340
  std::vector<size_t> sample_lens(num_samples, size_t(1) << sample_len_shift);
341
  return ZSTD_TrainDictionary(samples, sample_lens, max_dict_bytes);
342
#else
343
0
  assert(false);
344
0
  (void)samples;
345
0
  (void)sample_len_shift;
346
0
  (void)max_dict_bytes;
347
0
  return "";
348
0
#endif  // ZSTD
349
0
}
350
351
std::string ZSTD_FinalizeDictionary(const std::string& samples,
352
                                    const std::vector<size_t>& sample_lens,
353
0
                                    size_t max_dict_bytes, int level) {
354
#ifdef ROCKSDB_ZDICT_FINALIZE
355
  assert(samples.empty() == sample_lens.empty());
356
  if (samples.empty()) {
357
    return "";
358
  }
359
  if (level == CompressionOptions::kDefaultCompressionLevel) {
360
    // NB: ZSTD_CLEVEL_DEFAULT is historically == 3
361
    level = ZSTD_CLEVEL_DEFAULT;
362
  }
363
  std::string dict_data(max_dict_bytes, '\0');
364
  size_t dict_len = ZDICT_finalizeDictionary(
365
      dict_data.data(), max_dict_bytes, samples.data(),
366
      std::min(static_cast<size_t>(samples.size()), max_dict_bytes),
367
      samples.data(), sample_lens.data(),
368
      static_cast<unsigned>(sample_lens.size()),
369
      {level, 0 /* notificationLevel */, 0 /* dictID */});
370
  if (ZDICT_isError(dict_len)) {
371
    return "";
372
  } else {
373
    assert(dict_len <= max_dict_bytes);
374
    dict_data.resize(dict_len);
375
    return dict_data;
376
  }
377
#else
378
0
  assert(false);
379
0
  (void)samples;
380
0
  (void)sample_lens;
381
0
  (void)max_dict_bytes;
382
0
  (void)level;
383
0
  return "";
384
0
#endif  // ROCKSDB_ZDICT_FINALIZE
385
0
}
386
387
// ***********************************************************************
388
// BEGIN built-in implementation of customization interface
389
// ***********************************************************************
390
0
Status Decompressor::ExtractUncompressedSize(Args& args) {
391
  // Default implementation:
392
  //
393
  // Standard format for prepending uncompressed size to the compressed
394
  // payload. (RocksDB compress_format_version=2 except Snappy)
395
  //
396
  // This is historically a varint32, but it is preliminarily generalized
397
  // to varint64, in case that is supported on the write side for some
398
  // algorithms.
399
0
  if (LIKELY(GetVarint64(&args.compressed_data, &args.uncompressed_size))) {
400
0
    if (LIKELY(args.uncompressed_size <= SIZE_MAX)) {
401
0
      return Status::OK();
402
0
    } else {
403
0
      return Status::MemoryLimit("Uncompressed size too large for platform");
404
0
    }
405
0
  } else {
406
0
    return Status::Corruption("Unable to extract uncompressed size");
407
0
  }
408
0
}
409
410
0
const Slice& Decompressor::GetSerializedDict() const {
411
  // Default: empty slice => no dictionary
412
0
  static Slice kEmptySlice;
413
0
  return kEmptySlice;
414
0
}
415
416
namespace {
417
418
class CompressorBase : public Compressor {
419
 public:
420
0
  explicit CompressorBase(const CompressionOptions& opts) : opts_(opts) {}
421
422
 protected:
423
  CompressionOptions opts_;
424
};
425
426
class CompressorWithSimpleDictBase : public CompressorBase {
427
 public:
428
  explicit CompressorWithSimpleDictBase(const CompressionOptions& opts,
429
                                        std::string&& dict_data = {})
430
0
      : CompressorBase(opts), dict_data_(std::move(dict_data)) {}
431
432
0
  DictConfig GetDictGuidance(CacheEntryRole /*block_type*/) const override {
433
0
    if (opts_.max_dict_bytes == 0) {
434
0
      return DictDisabled{};
435
0
    }
436
0
    return DictSampling{opts_.max_dict_bytes};
437
0
  }
438
439
  // NOTE: empty dict is equivalent to no dict
440
0
  Slice GetSerializedDict() const override { return dict_data_; }
441
442
0
  std::unique_ptr<Compressor> Clone() const override {
443
0
    return CloneForDict(std::string{dict_data_});
444
0
  }
445
446
  std::unique_ptr<Compressor> MaybeCloneSpecialized(
447
      CacheEntryRole /*block_type*/,
448
0
      DictConfigArgs&& dict_config) const final override {
449
0
    if (auto* samples = std::get_if<DictSamples>(&dict_config)) {
450
0
      assert(samples->Verify());
451
0
      if (samples->empty()) {
452
0
        return nullptr;
453
0
      }
454
0
      return CloneForDict(std::move(samples->sample_data));
455
0
    } else if (auto* predef = std::get_if<DictPreDefined>(&dict_config)) {
456
0
      if (predef->dict_data.empty()) {
457
0
        return nullptr;
458
0
      }
459
0
      return CloneForDict(std::move(predef->dict_data));
460
0
    } else {
461
0
      assert(std::holds_alternative<DictDisabled>(dict_config));
462
0
      return nullptr;
463
0
    }
464
0
  }
465
466
  virtual std::unique_ptr<Compressor> CloneForDict(
467
      std::string&& dict_data) const = 0;
468
469
 protected:
470
  const std::string dict_data_;
471
};
472
473
// NOTE: the legacy behavior is to pretend to use dictionary compression when
474
// enabled, including storing a dictionary block, but to ignore it. That is
475
// matched here.
476
class BuiltinSnappyCompressorV2 final : public CompressorWithSimpleDictBase {
477
 public:
478
  using CompressorWithSimpleDictBase::CompressorWithSimpleDictBase;
479
480
0
  const char* Name() const override { return "BuiltinSnappyCompressorV2"; }
481
482
0
  CompressionType GetPreferredCompressionType() const override {
483
0
    return kSnappyCompression;
484
0
  }
485
486
  std::unique_ptr<Compressor> CloneForDict(
487
0
      std::string&& dict_data) const override {
488
0
    return std::make_unique<BuiltinSnappyCompressorV2>(opts_,
489
0
                                                       std::move(dict_data));
490
0
  }
491
492
  Status CompressBlock(Slice uncompressed_data, char* compressed_output,
493
                       size_t* compressed_output_size,
494
                       CompressionType* out_compression_type,
495
0
                       ManagedWorkingArea*) override {
496
#ifdef SNAPPY
497
    struct MySink : public snappy::Sink {
498
      MySink(char* output, size_t output_size)
499
          : output_(output), output_size_(output_size) {}
500
501
      char* output_;
502
      size_t output_size_;
503
      size_t pos_ = 0;
504
505
      void Append(const char* data, size_t n) override {
506
        if (pos_ + n <= output_size_) {
507
          std::memcpy(output_ + pos_, data, n);
508
          pos_ += n;
509
        } else {
510
          // Virtual abort
511
          pos_ = output_size_ + 1;
512
        }
513
      }
514
515
      char* GetAppendBuffer(size_t length, char* scratch) override {
516
        if (pos_ + length <= output_size_) {
517
          return output_ + pos_;
518
        }
519
        return scratch;
520
      }
521
    };
522
    MySink sink{compressed_output, *compressed_output_size};
523
    snappy::ByteArraySource source{uncompressed_data.data(),
524
                                   uncompressed_data.size()};
525
526
    size_t outlen = snappy::Compress(&source, &sink);
527
    if (outlen > 0 && sink.pos_ <= sink.output_size_) {
528
      // Compression kept/successful
529
      assert(outlen == sink.pos_);
530
      *compressed_output_size = outlen;
531
      *out_compression_type = kSnappyCompression;
532
      return Status::OK();
533
    }
534
    // Compression rejected
535
    *compressed_output_size = 1;
536
#else
537
0
    (void)uncompressed_data;
538
0
    (void)compressed_output;
539
    // Compression bypassed (not supported)
540
0
    *compressed_output_size = 0;
541
0
#endif
542
0
    *out_compression_type = kNoCompression;
543
0
    return Status::OK();
544
0
  }
545
546
  std::shared_ptr<Decompressor> GetOptimizedDecompressor() const override;
547
};
548
549
[[maybe_unused]]
550
std::pair<char*, size_t> StartCompressBlockV2(Slice uncompressed_data,
551
                                              char* compressed_output,
552
0
                                              size_t compressed_output_size) {
553
0
  if (  // Can't compress more than 4GB
554
0
      uncompressed_data.size() > std::numeric_limits<uint32_t>::max() ||
555
      // Need enough output space for encoding uncompressed size
556
0
      compressed_output_size <= 5) {
557
    // Compression bypassed
558
0
    return {nullptr, 0};
559
0
  }
560
  // Standard format for prepending uncompressed size to the compressed
561
  // data in compress_format_version=2
562
0
  char* alg_output = EncodeVarint32(
563
0
      compressed_output, static_cast<uint32_t>(uncompressed_data.size()));
564
0
  size_t alg_max_output_size =
565
0
      compressed_output_size - (alg_output - compressed_output);
566
0
  return {alg_output, alg_max_output_size};
567
0
}
568
569
class BuiltinZlibCompressorV2 final : public CompressorWithSimpleDictBase {
570
 public:
571
  using CompressorWithSimpleDictBase::CompressorWithSimpleDictBase;
572
573
0
  const char* Name() const override { return "BuiltinZlibCompressorV2"; }
574
575
0
  CompressionType GetPreferredCompressionType() const override {
576
0
    return kZlibCompression;
577
0
  }
578
579
  std::unique_ptr<Compressor> CloneForDict(
580
0
      std::string&& dict_data) const override {
581
0
    return std::make_unique<BuiltinZlibCompressorV2>(opts_,
582
0
                                                     std::move(dict_data));
583
0
  }
584
585
  Status CompressBlock(Slice uncompressed_data, char* compressed_output,
586
                       size_t* compressed_output_size,
587
                       CompressionType* out_compression_type,
588
0
                       ManagedWorkingArea*) override {
589
0
#ifdef ZLIB
590
0
    auto [alg_output, alg_max_output_size] = StartCompressBlockV2(
591
0
        uncompressed_data, compressed_output, *compressed_output_size);
592
0
    if (alg_max_output_size == 0) {
593
      // Compression bypassed
594
0
      *compressed_output_size = 0;
595
0
      *out_compression_type = kNoCompression;
596
0
      return Status::OK();
597
0
    }
598
599
    // The memLevel parameter specifies how much memory should be allocated for
600
    // the internal compression state.
601
    // memLevel=1 uses minimum memory but is slow and reduces compression ratio.
602
    // memLevel=9 uses maximum memory for optimal speed.
603
    // The default value is 8. See zconf.h for more details.
604
0
    static const int memLevel = 8;
605
0
    int level = opts_.level;
606
0
    if (level == CompressionOptions::kDefaultCompressionLevel) {
607
0
      level = Z_DEFAULT_COMPRESSION;
608
0
    }
609
610
0
    z_stream stream;
611
0
    memset(&stream, 0, sizeof(z_stream));
612
613
    // Initialize the zlib stream
614
0
    int st = deflateInit2(&stream, level, Z_DEFLATED, opts_.window_bits,
615
0
                          memLevel, opts_.strategy);
616
0
    if (st != Z_OK) {
617
0
      *compressed_output_size = 0;
618
0
      *out_compression_type = kNoCompression;
619
0
      return Status::OK();
620
0
    }
621
622
    // Set dictionary if available
623
0
    if (!dict_data_.empty()) {
624
0
      st = deflateSetDictionary(
625
0
          &stream, reinterpret_cast<const Bytef*>(dict_data_.data()),
626
0
          static_cast<unsigned int>(dict_data_.size()));
627
0
      if (st != Z_OK) {
628
0
        deflateEnd(&stream);
629
0
        *compressed_output_size = 0;
630
0
        *out_compression_type = kNoCompression;
631
0
        return Status::OK();
632
0
      }
633
0
    }
634
635
    // Set up input
636
0
    stream.next_in = (Bytef*)uncompressed_data.data();
637
0
    stream.avail_in = static_cast<unsigned int>(uncompressed_data.size());
638
639
    // Set up output
640
0
    stream.next_out = reinterpret_cast<Bytef*>(alg_output);
641
0
    stream.avail_out = static_cast<unsigned int>(alg_max_output_size);
642
643
    // Compress
644
0
    st = deflate(&stream, Z_FINISH);
645
0
    size_t outlen = alg_max_output_size - stream.avail_out;
646
0
    deflateEnd(&stream);
647
648
0
    if (st == Z_STREAM_END) {
649
      // Compression kept/successful
650
0
      *compressed_output_size =
651
0
          outlen + /*header size*/ (alg_output - compressed_output);
652
0
      *out_compression_type = kZlibCompression;
653
0
      return Status::OK();
654
0
    }
655
    // Compression failed or rejected
656
0
    *compressed_output_size = 1;
657
#else
658
    (void)uncompressed_data;
659
    (void)compressed_output;
660
    // Compression bypassed (not supported)
661
    *compressed_output_size = 0;
662
#endif
663
0
    *out_compression_type = kNoCompression;
664
0
    return Status::OK();
665
0
  }
666
};
667
668
class BuiltinBZip2CompressorV2 final : public CompressorWithSimpleDictBase {
669
 public:
670
  using CompressorWithSimpleDictBase::CompressorWithSimpleDictBase;
671
672
0
  const char* Name() const override { return "BuiltinBZip2CompressorV2"; }
673
674
0
  CompressionType GetPreferredCompressionType() const override {
675
0
    return kBZip2Compression;
676
0
  }
677
678
  std::unique_ptr<Compressor> CloneForDict(
679
0
      std::string&& dict_data) const override {
680
0
    return std::make_unique<BuiltinBZip2CompressorV2>(opts_,
681
0
                                                      std::move(dict_data));
682
0
  }
683
684
  Status CompressBlock(Slice uncompressed_data, char* compressed_output,
685
                       size_t* compressed_output_size,
686
                       CompressionType* out_compression_type,
687
0
                       ManagedWorkingArea*) override {
688
0
#ifdef BZIP2
689
0
    auto [alg_output, alg_max_output_size] = StartCompressBlockV2(
690
0
        uncompressed_data, compressed_output, *compressed_output_size);
691
0
    if (alg_max_output_size == 0) {
692
      // Compression bypassed
693
0
      *compressed_output_size = 0;
694
0
      *out_compression_type = kNoCompression;
695
0
      return Status::OK();
696
0
    }
697
698
    // BZip2 doesn't actually use the dictionary, but we store it for
699
    // compatibility similar to BuiltinSnappyCompressorV2
700
701
    // Initialize the bzip2 stream
702
0
    bz_stream stream;
703
0
    memset(&stream, 0, sizeof(bz_stream));
704
705
    // Block size 1 is 100K.
706
    // 0 is for silent.
707
    // 30 is the default workFactor
708
0
    int st = BZ2_bzCompressInit(&stream, 1, 0, 30);
709
0
    if (st != BZ_OK) {
710
0
      *compressed_output_size = 0;
711
0
      *out_compression_type = kNoCompression;
712
0
      return Status::OK();
713
0
    }
714
715
    // Set up input
716
0
    stream.next_in = const_cast<char*>(uncompressed_data.data());
717
0
    stream.avail_in = static_cast<unsigned int>(uncompressed_data.size());
718
719
    // Set up output
720
0
    stream.next_out = alg_output;
721
0
    stream.avail_out = static_cast<unsigned int>(alg_max_output_size);
722
723
    // Compress
724
0
    st = BZ2_bzCompress(&stream, BZ_FINISH);
725
0
    size_t outlen = alg_max_output_size - stream.avail_out;
726
0
    BZ2_bzCompressEnd(&stream);
727
728
    // Check for success
729
0
    if (st == BZ_STREAM_END) {
730
      // Compression kept/successful
731
0
      *compressed_output_size = outlen + (alg_output - compressed_output);
732
0
      *out_compression_type = kBZip2Compression;
733
0
      return Status::OK();
734
0
    }
735
    // Compression failed or rejected
736
0
    *compressed_output_size = 1;
737
#else
738
    (void)uncompressed_data;
739
    (void)compressed_output;
740
    // Compression bypassed (not supported)
741
    *compressed_output_size = 0;
742
#endif
743
0
    *out_compression_type = kNoCompression;
744
0
    return Status::OK();
745
0
  }
746
};
747
748
class BuiltinLZ4CompressorV2WithDict : public CompressorWithSimpleDictBase {
749
 public:
750
  using CompressorWithSimpleDictBase::CompressorWithSimpleDictBase;
751
752
0
  const char* Name() const override { return "BuiltinLZ4CompressorV2"; }
753
754
0
  CompressionType GetPreferredCompressionType() const override {
755
0
    return kLZ4Compression;
756
0
  }
757
758
  std::unique_ptr<Compressor> CloneForDict(
759
0
      std::string&& dict_data) const override {
760
0
    return std::make_unique<BuiltinLZ4CompressorV2WithDict>(
761
0
        opts_, std::move(dict_data));
762
0
  }
763
764
0
  ManagedWorkingArea ObtainWorkingArea() override {
765
#ifdef LZ4
766
    return {reinterpret_cast<WorkingArea*>(LZ4_createStream()), this};
767
#else
768
0
    return {};
769
0
#endif
770
0
  }
771
0
  void ReleaseWorkingArea(WorkingArea* wa) override {
772
0
    if (wa) {
773
#ifdef LZ4
774
      LZ4_freeStream(reinterpret_cast<LZ4_stream_t*>(wa));
775
#endif
776
0
    }
777
0
  }
778
779
  Status CompressBlock(Slice uncompressed_data, char* compressed_output,
780
                       size_t* compressed_output_size,
781
                       CompressionType* out_compression_type,
782
0
                       ManagedWorkingArea* wa) override {
783
#ifdef LZ4
784
    auto [alg_output, alg_max_output_size] = StartCompressBlockV2(
785
        uncompressed_data, compressed_output, *compressed_output_size);
786
    if (alg_max_output_size == 0) {
787
      // Compression bypassed
788
      *compressed_output_size = 0;
789
      *out_compression_type = kNoCompression;
790
      return Status::OK();
791
    }
792
793
    ManagedWorkingArea tmp_wa;
794
    LZ4_stream_t* stream;
795
    if (wa != nullptr && wa->owner() == this) {
796
      stream = reinterpret_cast<LZ4_stream_t*>(wa->get());
797
#if LZ4_VERSION_NUMBER >= 10900  // >= version 1.9.0
798
      LZ4_resetStream_fast(stream);
799
#else
800
      LZ4_resetStream(stream);
801
#endif
802
    } else {
803
      tmp_wa = ObtainWorkingArea();
804
      stream = reinterpret_cast<LZ4_stream_t*>(tmp_wa.get());
805
    }
806
    if (!dict_data_.empty()) {
807
      // TODO: more optimization possible here?
808
      LZ4_loadDict(stream, dict_data_.data(),
809
                   static_cast<int>(dict_data_.size()));
810
    }
811
    int acceleration;
812
    if (opts_.level < 0) {
813
      acceleration = -opts_.level;
814
    } else {
815
      acceleration = 1;
816
    }
817
    auto outlen = LZ4_compress_fast_continue(
818
        stream, uncompressed_data.data(), alg_output,
819
        static_cast<int>(uncompressed_data.size()),
820
        static_cast<int>(alg_max_output_size), acceleration);
821
    if (outlen > 0) {
822
      // Compression kept/successful
823
      size_t output_size = static_cast<size_t>(
824
          outlen + /*header size*/ (alg_output - compressed_output));
825
      assert(output_size <= *compressed_output_size);
826
      *compressed_output_size = output_size;
827
      *out_compression_type = kLZ4Compression;
828
      return Status::OK();
829
    }
830
    // Compression rejected
831
    *compressed_output_size = 1;
832
#else
833
0
    (void)uncompressed_data;
834
0
    (void)compressed_output;
835
0
    (void)wa;
836
    // Compression bypassed (not supported)
837
0
    *compressed_output_size = 0;
838
0
#endif
839
0
    *out_compression_type = kNoCompression;
840
0
    return Status::OK();
841
0
  }
842
};
843
844
class BuiltinLZ4CompressorV2NoDict final
845
    : public BuiltinLZ4CompressorV2WithDict {
846
 public:
847
  BuiltinLZ4CompressorV2NoDict(const CompressionOptions& opts)
848
0
      : BuiltinLZ4CompressorV2WithDict(opts, /*dict_data=*/{}) {}
849
850
0
  std::unique_ptr<Compressor> Clone() const override {
851
0
    return std::make_unique<BuiltinLZ4CompressorV2NoDict>(opts_);
852
0
  }
853
854
0
  ManagedWorkingArea ObtainWorkingArea() override {
855
    // Using an LZ4_stream_t between compressions and resetting with
856
    // LZ4_resetStream_fast is actually slower than using a fresh LZ4_stream_t
857
    // each time, or not involving a stream at all. Similarly, using an extState
858
    // does not seem to offer a performance boost, perhaps a small regression.
859
0
    return {};
860
0
  }
861
862
0
  void ReleaseWorkingArea(WorkingArea* wa) override {
863
    // Should not be called
864
0
    (void)wa;
865
0
    assert(wa == nullptr);
866
0
  }
867
868
  Status CompressBlock(Slice uncompressed_data, char* compressed_output,
869
                       size_t* compressed_output_size,
870
                       CompressionType* out_compression_type,
871
0
                       ManagedWorkingArea* wa) override {
872
#ifdef LZ4
873
    (void)wa;
874
    auto [alg_output, alg_max_output_size] = StartCompressBlockV2(
875
        uncompressed_data, compressed_output, *compressed_output_size);
876
    if (alg_max_output_size == 0) {
877
      // Compression bypassed
878
      *compressed_output_size = 0;
879
      *out_compression_type = kNoCompression;
880
      return Status::OK();
881
    }
882
    int acceleration;
883
    if (opts_.level < 0) {
884
      acceleration = -opts_.level;
885
    } else {
886
      acceleration = 1;
887
    }
888
    auto outlen =
889
        LZ4_compress_fast(uncompressed_data.data(), alg_output,
890
                          static_cast<int>(uncompressed_data.size()),
891
                          static_cast<int>(alg_max_output_size), acceleration);
892
    if (outlen > 0) {
893
      // Compression kept/successful
894
      size_t output_size = static_cast<size_t>(
895
          outlen + /*header size*/ (alg_output - compressed_output));
896
      assert(output_size <= *compressed_output_size);
897
      *compressed_output_size = output_size;
898
      *out_compression_type = kLZ4Compression;
899
      return Status::OK();
900
    }
901
    // Compression rejected
902
    *compressed_output_size = 1;
903
#else
904
0
    (void)uncompressed_data;
905
0
    (void)compressed_output;
906
0
    (void)wa;
907
    // Compression bypassed (not supported)
908
0
    *compressed_output_size = 0;
909
0
#endif
910
0
    *out_compression_type = kNoCompression;
911
0
    return Status::OK();
912
0
  }
913
};
914
915
class BuiltinLZ4HCCompressorV2 final : public CompressorWithSimpleDictBase {
916
 public:
917
  using CompressorWithSimpleDictBase::CompressorWithSimpleDictBase;
918
919
0
  const char* Name() const override { return "BuiltinLZ4HCCompressorV2"; }
920
921
0
  CompressionType GetPreferredCompressionType() const override {
922
0
    return kLZ4HCCompression;
923
0
  }
924
925
  std::unique_ptr<Compressor> CloneForDict(
926
0
      std::string&& dict_data) const override {
927
0
    return std::make_unique<BuiltinLZ4HCCompressorV2>(opts_,
928
0
                                                      std::move(dict_data));
929
0
  }
930
931
0
  ManagedWorkingArea ObtainWorkingArea() override {
932
#ifdef LZ4
933
    return {reinterpret_cast<WorkingArea*>(LZ4_createStreamHC()), this};
934
#else
935
0
    return {};
936
0
#endif
937
0
  }
938
0
  void ReleaseWorkingArea(WorkingArea* wa) override {
939
0
    if (wa) {
940
#ifdef LZ4
941
      LZ4_freeStreamHC(reinterpret_cast<LZ4_streamHC_t*>(wa));
942
#endif
943
0
    }
944
0
  }
945
946
  Status CompressBlock(Slice uncompressed_data, char* compressed_output,
947
                       size_t* compressed_output_size,
948
                       CompressionType* out_compression_type,
949
0
                       ManagedWorkingArea* wa) override {
950
#ifdef LZ4
951
    auto [alg_output, alg_max_output_size] = StartCompressBlockV2(
952
        uncompressed_data, compressed_output, *compressed_output_size);
953
    if (alg_max_output_size == 0) {
954
      // Compression bypassed
955
      *compressed_output_size = 0;
956
      *out_compression_type = kNoCompression;
957
      return Status::OK();
958
    }
959
960
    int level = opts_.level;
961
    if (level == CompressionOptions::kDefaultCompressionLevel) {
962
      level = 0;  // lz4hc.h says any value < 1 will be sanitized to default
963
    }
964
965
    ManagedWorkingArea tmp_wa;
966
    LZ4_streamHC_t* stream;
967
    if (wa != nullptr && wa->owner() == this) {
968
      stream = reinterpret_cast<LZ4_streamHC_t*>(wa->get());
969
    } else {
970
      tmp_wa = ObtainWorkingArea();
971
      stream = reinterpret_cast<LZ4_streamHC_t*>(tmp_wa.get());
972
    }
973
#if LZ4_VERSION_NUMBER >= 10900  // >= version 1.9.0
974
    LZ4_resetStreamHC_fast(stream, level);
975
#else
976
    LZ4_resetStreamHC(stream, level);
977
#endif
978
    if (dict_data_.size() > 0) {
979
      // TODO: more optimization possible here?
980
      LZ4_loadDictHC(stream, dict_data_.data(),
981
                     static_cast<int>(dict_data_.size()));
982
    }
983
984
    auto outlen =
985
        LZ4_compress_HC_continue(stream, uncompressed_data.data(), alg_output,
986
                                 static_cast<int>(uncompressed_data.size()),
987
                                 static_cast<int>(alg_max_output_size));
988
    if (outlen > 0) {
989
      // Compression kept/successful
990
      size_t output_size = static_cast<size_t>(
991
          outlen + /*header size*/ (alg_output - compressed_output));
992
      assert(output_size <= *compressed_output_size);
993
      *compressed_output_size = output_size;
994
      *out_compression_type = kLZ4HCCompression;
995
      return Status::OK();
996
    }
997
    // Compression rejected
998
    *compressed_output_size = 1;
999
#else
1000
0
    (void)uncompressed_data;
1001
0
    (void)compressed_output;
1002
0
    (void)wa;
1003
    // Compression bypassed (not supported)
1004
0
    *compressed_output_size = 0;
1005
0
#endif
1006
0
    *out_compression_type = kNoCompression;
1007
0
    return Status::OK();
1008
0
  }
1009
};
1010
1011
class BuiltinXpressCompressorV2 final : public CompressorWithSimpleDictBase {
1012
 public:
1013
  using CompressorWithSimpleDictBase::CompressorWithSimpleDictBase;
1014
1015
0
  const char* Name() const override { return "BuiltinXpressCompressorV2"; }
1016
1017
0
  CompressionType GetPreferredCompressionType() const override {
1018
0
    return kXpressCompression;
1019
0
  }
1020
1021
  std::unique_ptr<Compressor> CloneForDict(
1022
0
      std::string&& dict_data) const override {
1023
0
    return std::make_unique<BuiltinXpressCompressorV2>(opts_,
1024
0
                                                       std::move(dict_data));
1025
0
  }
1026
1027
  Status CompressBlock(Slice uncompressed_data, char* compressed_output,
1028
                       size_t* compressed_output_size,
1029
                       CompressionType* out_compression_type,
1030
0
                       ManagedWorkingArea*) override {
1031
#ifdef XPRESS
1032
    // XPRESS doesn't actually use the dictionary, but we store it for
1033
    // compatibility similar to BuiltinSnappyCompressorV2
1034
1035
    // Use the new CompressWithMaxSize function that writes directly to the
1036
    // output buffer
1037
    size_t compressed_size = port::xpress::CompressWithMaxSize(
1038
        uncompressed_data.data(), uncompressed_data.size(), compressed_output,
1039
        *compressed_output_size);
1040
1041
    if (compressed_size > 0) {
1042
      // Compression kept/successful
1043
      *compressed_output_size = compressed_size;
1044
      *out_compression_type = kXpressCompression;
1045
      return Status::OK();
1046
    }
1047
1048
    // Compression rejected or failed
1049
    *compressed_output_size = 1;
1050
#else
1051
0
    (void)uncompressed_data;
1052
0
    (void)compressed_output;
1053
    // Compression bypassed (not supported)
1054
0
    *compressed_output_size = 0;
1055
0
#endif
1056
0
    *out_compression_type = kNoCompression;
1057
0
    return Status::OK();
1058
0
  }
1059
};
1060
1061
// ZSTD compressor using the "advanced" (ZSTD_CCtx-based) API, supporting both
// raw and digested (ZSTD_CDict) dictionaries via CompressionDict.
class BuiltinZSTDCompressorV2 final : public CompressorBase {
 public:
  explicit BuiltinZSTDCompressorV2(const CompressionOptions& opts,
                                   CompressionDict&& dict = {})
      : CompressorBase(opts), dict_(std::move(dict)) {}

  const char* Name() const override { return "BuiltinZSTDCompressorV2"; }

  CompressionType GetPreferredCompressionType() const override { return kZSTD; }

  std::unique_ptr<Compressor> Clone() const override {
    // dict_ is const, so copy the raw dict bytes and re-digest in the clone.
    CompressionDict dict_copy{dict_.GetRawDict().ToString(), kZSTD,
                              opts_.level};
    return std::make_unique<BuiltinZSTDCompressorV2>(opts_,
                                                     std::move(dict_copy));
  }

  // Advises the table builder whether/how to gather samples for dictionary
  // training, based on the configured CompressionOptions.
  DictConfig GetDictGuidance(CacheEntryRole /*block_type*/) const override {
    if (opts_.max_dict_bytes == 0) {
      // Dictionary compression disabled
      return DictDisabled{};
    } else {
      // When training is enabled, sample up to zstd_max_train_bytes;
      // otherwise the raw-content dict is capped at max_dict_bytes.
      size_t max_sample_bytes = opts_.zstd_max_train_bytes > 0
                                    ? opts_.zstd_max_train_bytes
                                    : opts_.max_dict_bytes;
      return DictSampling{max_sample_bytes};
    }
  }

  // NOTE: empty dict is equivalent to no dict
  Slice GetSerializedDict() const override { return dict_.GetRawDict(); }

  // Working area is a ZSTD_CCtx pre-configured with the compression level
  // (and checksum flag, if enabled).
  ManagedWorkingArea ObtainWorkingArea() override {
#ifdef ZSTD
    ZSTD_CCtx* ctx =
#ifdef ROCKSDB_ZSTD_CUSTOM_MEM
        ZSTD_createCCtx_advanced(port::GetJeZstdAllocationOverrides());
#else   // ROCKSDB_ZSTD_CUSTOM_MEM
        ZSTD_createCCtx();
#endif  // ROCKSDB_ZSTD_CUSTOM_MEM
    auto level = opts_.level;
    if (level == CompressionOptions::kDefaultCompressionLevel) {
      // NB: ZSTD_CLEVEL_DEFAULT is historically == 3
      level = ZSTD_CLEVEL_DEFAULT;
    }
    size_t err = ZSTD_CCtx_setParameter(ctx, ZSTD_c_compressionLevel, level);
    if (ZSTD_isError(err)) {
      // Not expected to happen; fall back to a fresh default context.
      // NOTE(review): the replacement ctx does not get the level re-applied,
      // so it compresses at ZSTD's default level — confirm this is intended.
      assert(false);
      ZSTD_freeCCtx(ctx);
      ctx = ZSTD_createCCtx();
    }
    if (opts_.checksum) {
      err = ZSTD_CCtx_setParameter(ctx, ZSTD_c_checksumFlag, 1);
      if (ZSTD_isError(err)) {
        // Not expected to happen; fall back to a fresh default context.
        assert(false);
        ZSTD_freeCCtx(ctx);
        ctx = ZSTD_createCCtx();
      }
    }
    return ManagedWorkingArea(reinterpret_cast<WorkingArea*>(ctx), this);
#else
    return {};
#endif  // ZSTD
  }

  void ReleaseWorkingArea(WorkingArea* wa) override {
    if (wa) {
#ifdef ZSTD
      ZSTD_freeCCtx(reinterpret_cast<ZSTD_CCtx*>(wa));
#endif  // ZSTD
    }
  }

  // Compresses one block. On return, *compressed_output_size is the total
  // output size (0 if bypassed, 1 if output didn't fit) and
  // *out_compression_type is kZSTD on success, else kNoCompression.
  // Returns non-OK only on unexpected ZSTD errors.
  Status CompressBlock(Slice uncompressed_data, char* compressed_output,
                       size_t* compressed_output_size,
                       CompressionType* out_compression_type,
                       ManagedWorkingArea* wa) override {
#ifdef ZSTD
    auto [alg_output, alg_max_output_size] = StartCompressBlockV2(
        uncompressed_data, compressed_output, *compressed_output_size);
    if (alg_max_output_size == 0) {
      // Compression bypassed
      *compressed_output_size = 0;
      *out_compression_type = kNoCompression;
      return Status::OK();
    }

    // Use caller's working area if it belongs to this compressor; otherwise
    // create a temporary one for this call.
    ManagedWorkingArea tmp_wa;
    if (wa == nullptr || wa->owner() != this) {
      tmp_wa = ObtainWorkingArea();
      wa = &tmp_wa;
    }
    assert(wa->get() != nullptr);
    ZSTD_CCtx* ctx = reinterpret_cast<ZSTD_CCtx*>(wa->get());

    // Prefer the pre-digested dictionary when available (avoids re-digesting
    // the raw bytes on every block).
    if (dict_.GetDigestedZstdCDict() != nullptr) {
      ZSTD_CCtx_refCDict(ctx, dict_.GetDigestedZstdCDict());
    } else {
      ZSTD_CCtx_loadDictionary(ctx, dict_.GetRawDict().data(),
                               dict_.GetRawDict().size());
    }

    // Compression level is set in `context` during ObtainWorkingArea()
    size_t outlen =
        ZSTD_compress2(ctx, alg_output, alg_max_output_size,
                       uncompressed_data.data(), uncompressed_data.size());
    if (!ZSTD_isError(outlen)) {
      // Compression kept/successful
      size_t output_size = static_cast<size_t>(
          outlen + /*header size*/ (alg_output - compressed_output));
      assert(output_size <= *compressed_output_size);
      *compressed_output_size = output_size;
      *out_compression_type = kZSTD;
      return Status::OK();
    }
    // dstSize_tooSmall just means "not enough compression to keep"; anything
    // else is a real failure.
    if (ZSTD_getErrorCode(outlen) != ZSTD_error_dstSize_tooSmall) {
      return Status::Corruption(std::string("ZSTD_compress2 failed: ") +
                                ZSTD_getErrorName(outlen));
    }
    // Compression rejected
    *compressed_output_size = 1;
#else
    (void)uncompressed_data;
    (void)compressed_output;
    (void)wa;
    // Compression bypassed (not supported)
    *compressed_output_size = 0;
#endif
    *out_compression_type = kNoCompression;
    return Status::OK();
  }

  // Builds a dictionary-specialized clone from the given dict config:
  // nullptr when dictionary compression doesn't apply, otherwise a new
  // compressor holding the (possibly trained) dictionary.
  std::unique_ptr<Compressor> MaybeCloneSpecialized(
      CacheEntryRole /*block_type*/,
      DictConfigArgs&& dict_config) const override {
    // Handle DictDisabled
    // TODO: use holds_alternative
    if (auto* disabled = std::get_if<DictDisabled>(&dict_config)) {
      (void)disabled;
      return nullptr;
    }

    std::string dict_data;

    // Handle DictPreDefined - use the pre-defined dictionary directly
    if (auto* predef = std::get_if<DictPreDefined>(&dict_config)) {
      if (predef->dict_data.empty()) {
        return nullptr;
      }
      dict_data = std::move(predef->dict_data);
    }

    // Handle DictSamples - train dictionary from samples
    if (auto* samples = std::get_if<DictSamples>(&dict_config)) {
      assert(samples->Verify());
      if (samples->empty()) {
        return nullptr;
      }
      // Migrated from BlockBasedTableBuilder::EnterUnbuffered()
      if (opts_.zstd_max_train_bytes > 0) {
        assert(samples->sample_data.size() <= opts_.zstd_max_train_bytes);
        if (opts_.use_zstd_dict_trainer) {
          dict_data = ZSTD_TrainDictionary(
              samples->sample_data, samples->sample_lens, opts_.max_dict_bytes);
        } else {
          dict_data = ZSTD_FinalizeDictionary(
              samples->sample_data, samples->sample_lens, opts_.max_dict_bytes,
              opts_.level);
        }
      } else {
        assert(samples->sample_data.size() <= opts_.max_dict_bytes);
        // ZSTD "raw content dictionary" - "Any buffer is a valid raw content
        // dictionary." Or similar for other compressions.
        dict_data = std::move(samples->sample_data);
      }
    }

    CompressionDict dict{std::move(dict_data), kZSTD, opts_.level};
    return std::make_unique<BuiltinZSTDCompressorV2>(opts_, std::move(dict));
  }

  std::shared_ptr<Decompressor> GetOptimizedDecompressor() const override;

 protected:
  // Raw and/or digested dictionary; empty means no dictionary.
  const CompressionDict dict_;
};
1247
1248
// Subroutines for BuiltinDecompressorV2
1249
1250
// Decompresses one Snappy block into uncompressed_output (which must have
// room for the full uncompressed size). Snappy blocks carry no dictionary.
Status Snappy_DecompressBlock(const Decompressor::Args& args,
                              char* uncompressed_output) {
#ifdef SNAPPY
  const bool ok = snappy::RawUncompress(args.compressed_data.data(),
                                        args.compressed_data.size(),
                                        uncompressed_output);
  return ok ? Status::OK()
            : Status::Corruption("Error decompressing snappy data");
#else
  (void)args;
  (void)uncompressed_output;
  return Status::NotSupported("Snappy not supported in this build");
#endif
}
1265
1266
// Decompresses one raw-deflate (zlib) block into uncompressed_output, which
// must have room for exactly args.uncompressed_size bytes. An optional
// dictionary primes the inflate state. Returns Corruption on any zlib error
// or on a size mismatch.
Status Zlib_DecompressBlock(const Decompressor::Args& args, Slice dict,
                            char* uncompressed_output) {
#ifdef ZLIB
  // NOTE: uses "raw" format
  constexpr int kWindowBits = -14;

  z_stream _stream;
  memset(&_stream, 0, sizeof(z_stream));

  // For raw inflate, the windowBits should be -8..-15.
  // If windowBits is bigger than zero, it will use either zlib
  // header or gzip header. Adding 32 to it will do automatic detection.
  int st = inflateInit2(&_stream, kWindowBits);
  if (UNLIKELY(st != Z_OK)) {
    return Status::Corruption("Failed to initialize zlib inflate: " +
                              std::to_string(st));
  }

  if (!dict.empty()) {
    // Initialize the compression library's dictionary
    st = inflateSetDictionary(&_stream,
                              reinterpret_cast<const Bytef*>(dict.data()),
                              static_cast<unsigned int>(dict.size()));
    if (UNLIKELY(st != Z_OK)) {
      // FIX: must release the stream state allocated by inflateInit2; this
      // error path previously returned without inflateEnd, leaking it.
      inflateEnd(&_stream);
      return Status::Corruption("Failed to initialize zlib dictionary: " +
                                std::to_string(st));
    }
  }

  _stream.next_in = const_cast<Bytef*>(
      reinterpret_cast<const Bytef*>(args.compressed_data.data()));
  _stream.avail_in = static_cast<unsigned int>(args.compressed_data.size());

  _stream.next_out = reinterpret_cast<Bytef*>(uncompressed_output);
  _stream.avail_out = static_cast<unsigned int>(args.uncompressed_size);

  // Expect the whole block in a single call, ending exactly at stream end.
  st = inflate(&_stream, Z_SYNC_FLUSH);
  if (UNLIKELY(st != Z_STREAM_END)) {
    inflateEnd(&_stream);
    // NOTE: Z_OK is still corruption because it means we got the size wrong
    return Status::Corruption("Failed zlib inflate: " + std::to_string(st));
  }

  // We should have no bytes left
  if (_stream.avail_out != 0) {
    inflateEnd(&_stream);
    return Status::Corruption("Size mismatch decompressing zlib data");
  }

  inflateEnd(&_stream);
  return Status::OK();
#else
  (void)args;
  (void)dict;
  (void)uncompressed_output;
  return Status::NotSupported("Zlib not supported in this build");
#endif
}
1324
1325
// Decompresses one bzip2 block into uncompressed_output, which must have room
// for exactly args.uncompressed_size bytes. bzip2 has no dictionary support.
Status BZip2_DecompressBlock(const Decompressor::Args& args,
                             char* uncompressed_output) {
#ifdef BZIP2
  // In/out parameter: capacity on input, actual size on output.
  auto produced = static_cast<unsigned int>(args.uncompressed_size);
  const int rc = BZ2_bzBuffToBuffDecompress(
      uncompressed_output, &produced,
      const_cast<char*>(args.compressed_data.data()),
      static_cast<unsigned int>(args.compressed_data.size()), 0 /*small mem*/,
      0 /*verbosity*/);
  if (rc != BZ_OK) {
    return Status::Corruption("Error decompressing bzip2 data");
  }
  if (produced != args.uncompressed_size) {
    return Status::Corruption("Size mismatch decompressing bzip2 data");
  }
  return Status::OK();
#else
  (void)args;
  (void)uncompressed_output;
  return Status::NotSupported("BZip2 not supported in this build");
#endif
}
1346
1347
// Decompresses one LZ4/LZ4HC block into uncompressed_output, which must have
// room for exactly args.uncompressed_size bytes. An optional dictionary
// primes the decode stream. Returns Corruption on decode error or size
// mismatch.
Status LZ4_DecompressBlock(const Decompressor::Args& args, Slice dict,
                           char* uncompressed_output) {
#ifdef LZ4
  int expected_uncompressed_size = static_cast<int>(args.uncompressed_size);
  LZ4_streamDecode_t* stream = LZ4_createStreamDecode();
  // FIX: LZ4_createStreamDecode() returns nullptr on allocation failure;
  // previously it was used unchecked.
  if (stream == nullptr) {
    return Status::MemoryLimit("Failed to allocate LZ4 decode stream");
  }
  if (!dict.empty()) {
    LZ4_setStreamDecode(stream, dict.data(), static_cast<int>(dict.size()));
  }
  int uncompressed_size = LZ4_decompress_safe_continue(
      stream, args.compressed_data.data(), uncompressed_output,
      static_cast<int>(args.compressed_data.size()),
      expected_uncompressed_size);
  LZ4_freeStreamDecode(stream);

  if (uncompressed_size != expected_uncompressed_size) {
    if (uncompressed_size < 0) {
      // Negative return indicates malformed input
      return Status::Corruption("Error decompressing LZ4 data");
    } else {
      return Status::Corruption("Size mismatch decompressing LZ4 data");
    }
  }
  return Status::OK();
#else
  (void)args;
  (void)dict;
  (void)uncompressed_output;
  return Status::NotSupported("LZ4 not supported in this build");
#endif
}
1376
1377
// Decompresses one XPRESS block into uncompressed_output, which must have
// room for exactly args.uncompressed_size bytes. XPRESS has no dictionary.
Status XPRESS_DecompressBlock(const Decompressor::Args& args,
                              char* uncompressed_output) {
#ifdef XPRESS
  const int64_t expected = static_cast<int64_t>(args.uncompressed_size);
  const int64_t actual = port::xpress::DecompressToBuffer(
      args.compressed_data.data(), args.compressed_data.size(),
      uncompressed_output, args.uncompressed_size);
  if (actual == expected) {
    return Status::OK();
  }
  // Negative return indicates a decode failure; otherwise the size was wrong.
  return actual < 0
             ? Status::Corruption("Error decompressing XPRESS data")
             : Status::Corruption("Size mismatch decompressing XPRESS data");
#else
  (void)args;
  (void)uncompressed_output;
  return Status::NotSupported("XPRESS not supported in this build");
#endif
}
1398
1399
// Decompresses one ZSTD block using the given (non-null) ZSTD context.
// When kIsDigestedDict is true, `dict` is a ZSTD_DDict* (pre-digested
// dictionary); otherwise it is the raw dictionary bytes (possibly empty).
// uncompressed_output must have room for exactly args.uncompressed_size
// bytes. Returns Corruption on decode error or size mismatch.
template <bool kIsDigestedDict = false>
Status ZSTD_DecompressBlockWithContext(
    const Decompressor::Args& args,
    std::conditional_t<kIsDigestedDict, void*, Slice> dict,
    ZSTDUncompressCachedData::ZSTDNativeContext zstd_context,
    char* uncompressed_output) {
#ifdef ZSTD
  size_t uncompressed_size;
  assert(zstd_context != nullptr);
  if constexpr (kIsDigestedDict) {
#ifdef ROCKSDB_ZSTD_DDICT
    uncompressed_size = ZSTD_decompress_usingDDict(
        zstd_context, uncompressed_output, args.uncompressed_size,
        args.compressed_data.data(), args.compressed_data.size(),
        static_cast<ZSTD_DDict*>(dict));
#else
    // Instantiating with kIsDigestedDict in a build without DDict support is
    // a programming error; fail at compile time.
    static_assert(!kIsDigestedDict,
                  "Inconsistent expectation of ZSTD digested dict support");
#endif  // ROCKSDB_ZSTD_DDICT
  } else if (dict.empty()) {
    // No dictionary: plain one-shot decompression
    uncompressed_size = ZSTD_decompressDCtx(
        zstd_context, uncompressed_output, args.uncompressed_size,
        args.compressed_data.data(), args.compressed_data.size());
  } else {
    // Raw (non-digested) dictionary bytes
    uncompressed_size = ZSTD_decompress_usingDict(
        zstd_context, uncompressed_output, args.uncompressed_size,
        args.compressed_data.data(), args.compressed_data.size(), dict.data(),
        dict.size());
  }
  if (ZSTD_isError(uncompressed_size)) {
    return Status::Corruption(std::string("ZSTD ") +
                              ZSTD_getErrorName(uncompressed_size));
  } else if (uncompressed_size != args.uncompressed_size) {
    return Status::Corruption("ZSTD decompression size mismatch");
  } else {
    return Status::OK();
  }
#else
  (void)args;
  (void)dict;
  (void)zstd_context;
  (void)uncompressed_output;
  return Status::NotSupported("ZSTD not supported in this build");
#endif
}
1444
1445
template <bool kIsDigestedDict = false>
1446
Status ZSTD_DecompressBlock(
1447
    const Decompressor::Args& args,
1448
    std::conditional_t<kIsDigestedDict, void*, Slice> dict,
1449
0
    const Decompressor* decompressor, char* uncompressed_output) {
1450
0
  if (args.working_area && args.working_area->owner() == decompressor) {
1451
0
    auto ctx = static_cast<UncompressionContext*>(args.working_area->get());
1452
0
    assert(ctx != nullptr);
1453
0
    if (ctx->GetZSTDContext() != nullptr) {
1454
0
      return ZSTD_DecompressBlockWithContext<kIsDigestedDict>(
1455
0
          args, dict, ctx->GetZSTDContext(), uncompressed_output);
1456
0
    }
1457
0
  }
1458
0
  UncompressionContext tmp_ctx{kZSTD};
1459
0
  return ZSTD_DecompressBlockWithContext<kIsDigestedDict>(
1460
0
      args, dict, tmp_ctx.GetZSTDContext(), uncompressed_output);
1461
0
}
1462
1463
// Decompressor for all built-in compression types (V2 block schema), without
// a dictionary. Dispatches to the per-algorithm subroutines above.
class BuiltinDecompressorV2 : public Decompressor {
 public:
  const char* Name() const override { return "BuiltinDecompressorV2"; }

  // Determines args.uncompressed_size. Snappy and XPRESS embed the size in
  // their own formats; all other types use the generic encoding handled by
  // the base class.
  Status ExtractUncompressedSize(Args& args) override {
    assert(args.compression_type != kNoCompression);
    if (args.compression_type == kSnappyCompression) {
      // 1st exception to encoding of uncompressed size
#ifdef SNAPPY
      size_t uncompressed_length = 0;
      if (!snappy::GetUncompressedLength(args.compressed_data.data(),
                                         args.compressed_data.size(),
                                         &uncompressed_length)) {
        return Status::Corruption("Error reading snappy compressed length");
      }
      args.uncompressed_size = uncompressed_length;
      return Status::OK();
#else
      return Status::NotSupported("Snappy not supported in this build");
#endif
    } else if (args.compression_type == kXpressCompression) {
      // 2nd exception to encoding of uncompressed size
#ifdef XPRESS
      int64_t result = port::xpress::GetDecompressedSize(
          args.compressed_data.data(), args.compressed_data.size());
      if (result < 0) {
        return Status::Corruption("Error reading XPRESS compressed length");
      }
      args.uncompressed_size = static_cast<size_t>(result);
      return Status::OK();
#else
      return Status::NotSupported("XPRESS not supported in this build");
#endif

    } else {
      // Extract encoded uncompressed size
      return Decompressor::ExtractUncompressedSize(args);
    }
  }

  // Dispatches to the algorithm indicated by args.compression_type.
  // uncompressed_output must have room for args.uncompressed_size bytes.
  Status DecompressBlock(const Args& args, char* uncompressed_output) override {
    switch (args.compression_type) {
      case kSnappyCompression:
        return Snappy_DecompressBlock(args, uncompressed_output);
      case kZlibCompression:
        return Zlib_DecompressBlock(args, /*dict=*/Slice{},
                                    uncompressed_output);
      case kBZip2Compression:
        return BZip2_DecompressBlock(args, uncompressed_output);
      case kLZ4Compression:
      case kLZ4HCCompression:
        // LZ4 and LZ4HC share the same decode path
        return LZ4_DecompressBlock(args, /*dict=*/Slice{}, uncompressed_output);
      case kXpressCompression:
        return XPRESS_DecompressBlock(args, uncompressed_output);
      case kZSTD:
        return ZSTD_DecompressBlock(args, /*dict=*/Slice{}, this,
                                    uncompressed_output);
      default:
        return Status::NotSupported(
            "Compression type not supported or not built-in: " +
            CompressionTypeToString(args.compression_type));
    }
  }

  // Defined below, after BuiltinDecompressorV2WithDict is declared.
  Status MaybeCloneForDict(const Slice&,
                           std::unique_ptr<Decompressor>*) override;

  size_t ApproximateOwnedMemoryUsage() const override {
    return sizeof(BuiltinDecompressorV2);
  }
};
1534
1535
class BuiltinDecompressorV2SnappyOnly final : public BuiltinDecompressorV2 {
1536
 public:
1537
0
  const char* Name() const override {
1538
0
    return "BuiltinDecompressorV2SnappyOnly";
1539
0
  }
1540
1541
0
  Status ExtractUncompressedSize(Args& args) override {
1542
0
    assert(args.compression_type == kSnappyCompression);
1543
#ifdef SNAPPY
1544
    size_t uncompressed_length = 0;
1545
    if (!snappy::GetUncompressedLength(args.compressed_data.data(),
1546
                                       args.compressed_data.size(),
1547
                                       &uncompressed_length)) {
1548
      return Status::Corruption("Error reading snappy compressed length");
1549
    }
1550
    args.uncompressed_size = uncompressed_length;
1551
    return Status::OK();
1552
#else
1553
0
    return Status::NotSupported("Snappy not supported in this build");
1554
0
#endif
1555
0
  }
1556
1557
0
  Status DecompressBlock(const Args& args, char* uncompressed_output) override {
1558
0
    assert(args.compression_type == kSnappyCompression);
1559
0
    return Snappy_DecompressBlock(args, uncompressed_output);
1560
0
  }
1561
};
1562
1563
class BuiltinDecompressorV2WithDict final : public BuiltinDecompressorV2 {
1564
 public:
1565
0
  explicit BuiltinDecompressorV2WithDict(const Slice& dict) : dict_(dict) {}
1566
1567
0
  const char* Name() const override { return "BuiltinDecompressorV2WithDict"; }
1568
1569
0
  Status DecompressBlock(const Args& args, char* uncompressed_output) override {
1570
0
    switch (args.compression_type) {
1571
0
      case kSnappyCompression:
1572
        // NOTE: quietly ignores the dictionary (for compatibility)
1573
0
        return Snappy_DecompressBlock(args, uncompressed_output);
1574
0
      case kZlibCompression:
1575
0
        return Zlib_DecompressBlock(args, dict_, uncompressed_output);
1576
0
      case kBZip2Compression:
1577
        // NOTE: quietly ignores the dictionary (for compatibility)
1578
0
        return BZip2_DecompressBlock(args, uncompressed_output);
1579
0
      case kLZ4Compression:
1580
0
      case kLZ4HCCompression:
1581
0
        return LZ4_DecompressBlock(args, dict_, uncompressed_output);
1582
0
      case kXpressCompression:
1583
        // NOTE: quietly ignores the dictionary (for compatibility)
1584
0
        return XPRESS_DecompressBlock(args, uncompressed_output);
1585
0
      case kZSTD:
1586
0
        return ZSTD_DecompressBlock(args, dict_, this, uncompressed_output);
1587
0
      default:
1588
0
        return Status::NotSupported(
1589
0
            "Compression type not supported or not built-in: " +
1590
0
            CompressionTypeToString(args.compression_type));
1591
0
    }
1592
0
  }
1593
1594
0
  const Slice& GetSerializedDict() const override { return dict_; }
1595
1596
0
  size_t ApproximateOwnedMemoryUsage() const override {
1597
0
    return sizeof(BuiltinDecompressorV2WithDict);
1598
0
  }
1599
1600
 protected:
1601
  const Slice dict_;
1602
};
1603
1604
// Returns (via *out) a clone of this decompressor configured with the given
// serialized dictionary. Always succeeds for built-in types; see below for
// why even non-dictionary algorithms accept one.
Status BuiltinDecompressorV2::MaybeCloneForDict(
    const Slice& dict, std::unique_ptr<Decompressor>* out) {
  // Check RocksDB-promised precondition
  assert(dict.size() > 0);
  // Because of unfortunate decisions in handling built-in compression types,
  // all the compression types before ZSTD that do not actually support
  // dictionary compression pretend to support it. Specifically, we have to be
  // able to read files with a compression dictionary block using those
  // compression types even though the compression dictionary is ignored by
  // the compression algorithm. And the Decompressor has to return the
  // configured dictionary from GetSerializedDict() even if it is ignored. This
  // unfortunately means that a new schema version (BuiltinV3?) would be needed
  // to actually support dictionary compression in the future for these
  // algorithms (if the libraries add support).
  // TODO: can we make this a better/cleaner experience?
  *out = std::make_unique<BuiltinDecompressorV2WithDict>(dict);
  return Status::OK();
}
1622
1623
class BuiltinDecompressorV2OptimizeZstd : public BuiltinDecompressorV2 {
1624
 public:
1625
0
  const char* Name() const override {
1626
0
    return "BuiltinDecompressorV2OptimizeZstd";
1627
0
  }
1628
1629
0
  ManagedWorkingArea ObtainWorkingArea(CompressionType preferred) override {
1630
0
    if (preferred == kZSTD) {
1631
      // TODO: evaluate whether it makes sense to use core local cache here.
1632
      // (Perhaps not, because explicit WorkingArea could be long-running.)
1633
0
      return ManagedWorkingArea(new UncompressionContext(kZSTD), this);
1634
0
    } else {
1635
0
      return {};
1636
0
    }
1637
0
  }
1638
1639
0
  void ReleaseWorkingArea(WorkingArea* wa) override {
1640
0
    delete static_cast<UncompressionContext*>(wa);
1641
0
  }
1642
1643
0
  Status DecompressBlock(const Args& args, char* uncompressed_output) override {
1644
0
    if (LIKELY(args.compression_type == kZSTD)) {
1645
0
      return ZSTD_DecompressBlock(args, /*dict=*/Slice{}, this,
1646
0
                                  uncompressed_output);
1647
0
    } else {
1648
0
      return BuiltinDecompressorV2::DecompressBlock(args, uncompressed_output);
1649
0
    }
1650
0
  }
1651
1652
  Status MaybeCloneForDict(const Slice& /*serialized_dict*/,
1653
                           std::unique_ptr<Decompressor>* /*out*/) override;
1654
};
1655
1656
class BuiltinDecompressorV2OptimizeZstdWithDict final
1657
    : public BuiltinDecompressorV2OptimizeZstd {
1658
 public:
1659
  explicit BuiltinDecompressorV2OptimizeZstdWithDict(const Slice& dict)
1660
      :
1661
#ifdef ROCKSDB_ZSTD_DDICT
1662
        dict_(dict),
1663
        ddict_(ZSTD_createDDict_byReference(dict.data(), dict.size())) {
1664
    assert(ddict_ != nullptr);
1665
  }
1666
#else
1667
0
        dict_(dict) {
1668
0
  }
1669
#endif  // ROCKSDB_ZSTD_DDICT
1670
1671
0
  const char* Name() const override {
1672
0
    return "BuiltinDecompressorV2OptimizeZstdWithDict";
1673
0
  }
1674
1675
0
  ~BuiltinDecompressorV2OptimizeZstdWithDict() override {
1676
0
#ifdef ROCKSDB_ZSTD_DDICT
1677
0
    size_t res = ZSTD_freeDDict(ddict_);
1678
0
    assert(res == 0);  // Last I checked they can't fail
1679
0
    (void)res;         // prevent unused var warning
1680
0
#endif                 // ROCKSDB_ZSTD_DDICT
1681
0
  }
1682
1683
0
  const Slice& GetSerializedDict() const override { return dict_; }
1684
1685
0
  size_t ApproximateOwnedMemoryUsage() const override {
1686
0
    size_t sz = sizeof(BuiltinDecompressorV2WithDict);
1687
#ifdef ROCKSDB_ZSTD_DDICT
1688
    sz += ZSTD_sizeof_DDict(ddict_);
1689
#endif  // ROCKSDB_ZSTD_DDICT
1690
0
    return sz;
1691
0
  }
1692
1693
0
  Status DecompressBlock(const Args& args, char* uncompressed_output) override {
1694
0
    if (LIKELY(args.compression_type == kZSTD)) {
1695
#ifdef ROCKSDB_ZSTD_DDICT
1696
      return ZSTD_DecompressBlock</*kIsDigestedDict=*/true>(
1697
          args, ddict_, this, uncompressed_output);
1698
#else
1699
0
      return ZSTD_DecompressBlock(args, dict_, this, uncompressed_output);
1700
0
#endif  // ROCKSDB_ZSTD_DDICT
1701
0
    } else {
1702
0
      return BuiltinDecompressorV2WithDict(dict_).DecompressBlock(
1703
0
          args, uncompressed_output);
1704
0
    }
1705
0
  }
1706
1707
 protected:
1708
  const Slice dict_;
1709
#ifdef ROCKSDB_ZSTD_DDICT
1710
  ZSTD_DDict* const ddict_;
1711
#endif  // ROCKSDB_ZSTD_DDICT
1712
};
1713
1714
// Produce a dictionary-bound clone that keeps the ZSTD fast path (and, when
// available, pre-digests the dictionary for cheaper per-block use).
Status BuiltinDecompressorV2OptimizeZstd::MaybeCloneForDict(
    const Slice& serialized_dict, std::unique_ptr<Decompressor>* out) {
  *out = std::make_unique<BuiltinDecompressorV2OptimizeZstdWithDict>(
      serialized_dict);
  return Status::OK();
}
1720
class BuiltinCompressionManagerV2 final : public CompressionManager {
1721
 public:
1722
4
  BuiltinCompressionManagerV2() = default;
1723
0
  ~BuiltinCompressionManagerV2() override = default;
1724
1725
0
  const char* Name() const override { return "BuiltinCompressionManagerV2"; }
1726
1727
0
  const char* CompatibilityName() const override { return "BuiltinV2"; }
1728
1729
  std::unique_ptr<Compressor> GetCompressor(const CompressionOptions& opts,
1730
17.4k
                                            CompressionType type) override {
1731
17.4k
    if (opts.max_compressed_bytes_per_kb <= 0) {
1732
      // No acceptable compression ratio => no compression
1733
0
      return nullptr;
1734
0
    }
1735
17.4k
    if (!SupportsCompressionType(type)) {
1736
      // Unrecognized or support not compiled in. Fall back on default
1737
0
      type = ColumnFamilyOptions{}.compression;
1738
0
    }
1739
17.4k
    switch (type) {
1740
17.4k
      case kNoCompression:
1741
17.4k
      default:
1742
17.4k
        assert(type == kNoCompression);  // Others should be excluded above
1743
17.4k
        return nullptr;
1744
0
      case kSnappyCompression:
1745
0
        return std::make_unique<BuiltinSnappyCompressorV2>(opts);
1746
0
      case kZlibCompression:
1747
0
        return std::make_unique<BuiltinZlibCompressorV2>(opts);
1748
0
      case kBZip2Compression:
1749
0
        return std::make_unique<BuiltinBZip2CompressorV2>(opts);
1750
0
      case kLZ4Compression:
1751
0
        return std::make_unique<BuiltinLZ4CompressorV2NoDict>(opts);
1752
0
      case kLZ4HCCompression:
1753
0
        return std::make_unique<BuiltinLZ4HCCompressorV2>(opts);
1754
0
      case kXpressCompression:
1755
0
        return std::make_unique<BuiltinXpressCompressorV2>(opts);
1756
0
      case kZSTD:
1757
0
        return std::make_unique<BuiltinZSTDCompressorV2>(opts);
1758
17.4k
    }
1759
17.4k
  }
1760
1761
0
  std::shared_ptr<Decompressor> GetDecompressor() override {
1762
0
    return GetGeneralDecompressor();
1763
0
  }
1764
1765
  std::shared_ptr<Decompressor> GetDecompressorOptimizeFor(
1766
0
      CompressionType optimize_for_type) override {
1767
0
    if (optimize_for_type == kZSTD) {
1768
0
      return GetZstdDecompressor();
1769
0
    } else {
1770
0
      return GetGeneralDecompressor();
1771
0
    }
1772
0
  }
1773
1774
  std::shared_ptr<Decompressor> GetDecompressorForTypes(
1775
      const CompressionType* types_begin,
1776
0
      const CompressionType* types_end) override {
1777
0
    if (types_begin == types_end) {
1778
0
      return nullptr;
1779
0
    } else if (types_begin + 1 == types_end &&
1780
0
               *types_begin == kSnappyCompression) {
1781
      // Exclusively Snappy
1782
0
      return GetSnappyDecompressor();
1783
0
    } else if (std::find(types_begin, types_end, kZSTD) != types_end) {
1784
      // Includes ZSTD
1785
0
      return GetZstdDecompressor();
1786
0
    } else {
1787
      // Everything else
1788
0
      return GetGeneralDecompressor();
1789
0
    }
1790
0
  }
1791
1792
17.4k
  bool SupportsCompressionType(CompressionType type) const override {
1793
17.4k
    return CompressionTypeSupported(type);
1794
17.4k
  }
1795
1796
 protected:
1797
  BuiltinDecompressorV2 decompressor_;
1798
  BuiltinDecompressorV2OptimizeZstd zstd_decompressor_;
1799
  BuiltinDecompressorV2SnappyOnly snappy_decompressor_;
1800
1801
 public:
1802
0
  inline std::shared_ptr<Decompressor> GetGeneralDecompressor() {
1803
0
    return std::shared_ptr<Decompressor>(shared_from_this(), &decompressor_);
1804
0
  }
1805
1806
0
  inline std::shared_ptr<Decompressor> GetZstdDecompressor() {
1807
0
    return std::shared_ptr<Decompressor>(shared_from_this(),
1808
0
                                         &zstd_decompressor_);
1809
0
  }
1810
1811
0
  inline std::shared_ptr<Decompressor> GetSnappyDecompressor() {
1812
0
    return std::shared_ptr<Decompressor>(shared_from_this(),
1813
0
                                         &snappy_decompressor_);
1814
0
  }
1815
};
1816
1817
// Shared singleton instance of the built-in V2 manager. Held by shared_ptr
// so that the aliasing shared_ptrs to its embedded decompressors can share
// ownership of it.
const std::shared_ptr<BuiltinCompressionManagerV2>
    kBuiltinCompressionManagerV2 =
        std::make_shared<BuiltinCompressionManagerV2>();
1820
1821
std::shared_ptr<Decompressor>
1822
0
BuiltinZSTDCompressorV2::GetOptimizedDecompressor() const {
1823
0
  return kBuiltinCompressionManagerV2->GetZstdDecompressor();
1824
0
}
1825
1826
std::shared_ptr<Decompressor>
1827
0
BuiltinSnappyCompressorV2::GetOptimizedDecompressor() const {
1828
0
  return kBuiltinCompressionManagerV2->GetSnappyDecompressor();
1829
0
}
1830
1831
}  // namespace
1832
1833
// Creates (or clears) a CompressionManager from a textual descriptor.
// An empty or "nullptr" value resets *result. Otherwise the descriptor is
// split into an id plus option map, the manager is instantiated via the
// object registry, and any options are applied to the new instance.
Status CompressionManager::CreateFromString(
    const ConfigOptions& config_options, const std::string& value,
    std::shared_ptr<CompressionManager>* result) {
  if (value == kNullptrString || value.empty()) {
    result->reset();
    return Status::OK();
  }

  // Lazily register the built-in manager's factory exactly once, under its
  // compatibility name, so the registry lookup below can find it.
  static std::once_flag loaded;
  std::call_once(loaded, [&]() {
    auto& library = *ObjectLibrary::Default();
    // TODO: try to enhance ObjectLibrary to support singletons
    library.AddFactory<CompressionManager>(
        kBuiltinCompressionManagerV2->CompatibilityName(),
        [](const std::string& /*uri*/,
           std::unique_ptr<CompressionManager>* guard,
           std::string* /*errmsg*/) {
          *guard = std::make_unique<BuiltinCompressionManagerV2>();
          return guard->get();
        });
  });

  std::string id;
  std::unordered_map<std::string, std::string> opt_map;
  Status status = Customizable::GetOptionsMap(config_options, result->get(),
                                              value, &id, &opt_map);
  if (!status.ok()) {  // GetOptionsMap failed
    return status;
  } else if (id.empty()) {  // We have no Id but have options.  Not good
    return Status::NotSupported("Cannot reset object ", id);
  } else {
    status = config_options.registry->NewSharedObject(id, result);
  }
  // Optionally tolerate unknown manager ids; otherwise configure the newly
  // created object with the parsed options.
  if (config_options.ignore_unsupported_options && status.IsNotSupported()) {
    return Status::OK();
  } else if (status.ok()) {
    status = Customizable::ConfigureNewObject(config_options, result->get(),
                                              opt_map);
  }
  return status;
}
1874
1875
std::shared_ptr<CompressionManager>
1876
0
CompressionManager::FindCompatibleCompressionManager(Slice compatibility_name) {
1877
0
  if (compatibility_name.compare(CompatibilityName()) == 0) {
1878
0
    return shared_from_this();
1879
0
  } else {
1880
0
    std::shared_ptr<CompressionManager> out;
1881
0
    Status s =
1882
0
        CreateFromString(ConfigOptions(), compatibility_name.ToString(), &out);
1883
0
    if (s.ok()) {
1884
0
      return out;
1885
0
    } else {
1886
0
      return nullptr;
1887
0
    }
1888
0
  }
1889
0
}
1890
1891
17.4k
// Returns the built-in V2 manager as a shared_ptr to the CompressionManager
// base class. A function-local static copy is needed because the global
// kBuiltinCompressionManagerV2 is typed as the derived class, and returning
// a const& to a base-typed shared_ptr requires a persistent object of that
// exact type.
const std::shared_ptr<CompressionManager>& GetBuiltinV2CompressionManager() {
  static const std::shared_ptr<CompressionManager> v2_as_base =
      kBuiltinCompressionManagerV2;
  return v2_as_base;
}
1896
1897
// ***********************************************************************
1898
// END built-in implementation of customization interface
1899
// ***********************************************************************
1900
1901
// Compress `from` into `to` with a built-in compressor, requiring that
// compression actually occur: unlike normal block compression, a declined
// or aborted compression is treated as an error (Corruption) rather than
// falling back to storing the data uncompressed.
Status LegacyForceBuiltinCompression(
    Compressor& builtin_compressor,
    Compressor::ManagedWorkingArea* working_area, Slice from,
    GrowableBuffer* to) {
  // For legacy cases that store compressed data even when it's larger than the
  // uncompressed data (!!!), we need a reliable upper bound on the compressed
  // size. This is based on consulting various algorithms documentation etc.
  // and adding ~4 bytes for encoded uncompressed size. (Snappy is the worst
  // case for multiplicative overhead at n + n/6, bounded by 19*n/16 to avoid
  // costly division. Bzip2 is the worst case for additive overhead at 600
  // bytes.)
  size_t n = from.size();
  size_t upper_bound = ((19 * n) >> 4) + 604;
  // The upper bound has only been established considering built-in compression
  // types through kZSTD. (Might need updating if this fails.)
  assert(builtin_compressor.GetPreferredCompressionType() <= kZSTD);

  to->ResetForSize(upper_bound);
  CompressionType actual_type = kNoCompression;
  Status s = builtin_compressor.CompressBlock(
      from, to->data(), &to->MutableSize(), &actual_type, working_area);
  // Test hook: allows tests to inject a failure status here.
  TEST_SYNC_POINT_CALLBACK("LegacyForceBuiltinCompression:TamperWithStatus",
                           &s);

  if (!s.ok()) {
    return s;
  }
  if (actual_type == kNoCompression) {
    // abort in debug builds
    assert(actual_type != kNoCompression);
    return Status::Corruption("Compression unexpectedly declined or aborted");
  }
  assert(actual_type == builtin_compressor.GetPreferredCompressionType());
  return Status::OK();
}
1936
1937
}  // namespace ROCKSDB_NAMESPACE