/src/rocksdb/util/compression.h
Line | Count | Source |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under both the GPLv2 (found in the |
3 | | // COPYING file in the root directory) and Apache 2.0 License |
4 | | // (found in the LICENSE.Apache file in the root directory). |
5 | | // |
6 | | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
7 | | // Use of this source code is governed by a BSD-style license that can be |
8 | | // found in the LICENSE file. See the AUTHORS file for names of contributors. |
9 | | // |
10 | | #pragma once |
11 | | |
12 | | #include <algorithm> |
13 | | |
14 | | #include "memory/memory_allocator_impl.h" |
15 | | #include "rocksdb/advanced_compression.h" |
16 | | #include "rocksdb/options.h" |
17 | | #include "table/block_based/block_type.h" |
18 | | #include "util/aligned_buffer.h" |
19 | | #include "util/coding.h" |
20 | | #include "util/compression_context_cache.h" |
21 | | |
22 | | #ifdef ZSTD |
23 | | #include <zstd.h> |
24 | | #include <zstd_errors.h> |
25 | | // ZSTD_compress2(), ZSTD_compressStream2() and frame parameters all belong to |
26 | | // advanced APIs and require v1.4.0+, which is from April 2019. |
27 | | // https://github.com/facebook/zstd/blob/eb9f881eb810f2242f1ef36b3f3e7014eecb8fa6/lib/zstd.h#L297C40-L297C45 |
28 | | // To avoid a rat's nest of #ifdefs, we now require v1.4.0+ for ZSTD support. |
29 | | #if ZSTD_VERSION_NUMBER < 10400 |
30 | | #error "ZSTD support requires version >= 1.4.0 (libzstd-devel)" |
31 | | #endif // ZSTD_VERSION_NUMBER |
32 | | // The above release also includes digested dictionary support, but some |
33 | | // required functions (ZSTD_createDDict_byReference) are still only exported |
34 | | // with ZSTD_STATIC_LINKING_ONLY defined. |
35 | | #if defined(ZSTD_STATIC_LINKING_ONLY) |
36 | | #define ROCKSDB_ZSTD_DDICT |
37 | | #endif // defined(ZSTD_STATIC_LINKING_ONLY) |
38 | | // For ZDICT_* functions |
39 | | #include <zdict.h> |
40 | | // ZDICT_finalizeDictionary API is exported and stable since v1.4.5 |
41 | | #if ZSTD_VERSION_NUMBER >= 10405 |
42 | | #define ROCKSDB_ZDICT_FINALIZE |
43 | | #endif // ZSTD_VERSION_NUMBER >= 10405 |
44 | | #endif // ZSTD |
45 | | |
46 | | namespace ROCKSDB_NAMESPACE { |
47 | | // Need this for the context allocation override |
48 | | // On windows we need to do this explicitly |
49 | | #if defined(ZSTD) && defined(ROCKSDB_JEMALLOC) && defined(OS_WIN) && \ |
50 | | defined(ZSTD_STATIC_LINKING_ONLY) |
51 | | #define ROCKSDB_ZSTD_CUSTOM_MEM |
52 | | namespace port { |
53 | | ZSTD_customMem GetJeZstdAllocationOverrides(); |
54 | | } // namespace port |
55 | | #endif // defined(ZSTD) && defined(ROCKSDB_JEMALLOC) && defined(OS_WIN) && |
56 | | // defined(ZSTD_STATIC_LINKING_ONLY) |
57 | | |
58 | | // Cached data represents a portion that can be re-used |
59 | | // If, in the future we have more than one native context to |
60 | | // cache we can arrange this as a tuple |
class ZSTDUncompressCachedData {
 public:
  // Native decompression context handle. Without ZSTD compiled in this is an
  // opaque pointer that is never populated.
#if defined(ZSTD)
  using ZSTDNativeContext = ZSTD_DCtx*;
#else
  using ZSTDNativeContext = void*;
#endif  // ZSTD

  // Starts with no context and cache_idx_ == -1.
  ZSTDUncompressCachedData() = default;

  // Copying is disabled.
  ZSTDUncompressCachedData(const ZSTDUncompressCachedData& o) = delete;
  ZSTDUncompressCachedData& operator=(const ZSTDUncompressCachedData&) = delete;

  // Move construction: begin empty, then exchange state with `o` through the
  // move assignment operator.
  ZSTDUncompressCachedData(ZSTDUncompressCachedData&& o) noexcept
      : ZSTDUncompressCachedData() {
    *this = std::move(o);
  }

  // Move assignment swaps both fields with `o`. The destination is asserted
  // to hold no context beforehand.
  ZSTDUncompressCachedData& operator=(ZSTDUncompressCachedData&& o) noexcept {
    assert(zstd_ctx_ == nullptr);
    std::swap(cache_idx_, o.cache_idx_);
    std::swap(zstd_ctx_, o.zstd_ctx_);
    return *this;
  }

  // Returns the native context (may be nullptr).
  ZSTDNativeContext Get() const { return zstd_ctx_; }

  // Returns the recorded cache index; -1 means this instance owns the context.
  int64_t GetCacheIndex() const { return cache_idx_; }

  // Creates a context owned by this instance if none is held yet. Without
  // ZSTD compiled in, the context stays nullptr.
  void CreateIfNeeded() {
    if (zstd_ctx_ != nullptr) {
      return;
    }
#if !defined(ZSTD)
    zstd_ctx_ = nullptr;
#elif defined(ROCKSDB_ZSTD_CUSTOM_MEM)
    zstd_ctx_ = ZSTD_createDCtx_advanced(port::GetJeZstdAllocationOverrides());
#else   // ZSTD && !ROCKSDB_ZSTD_CUSTOM_MEM
    zstd_ctx_ = ZSTD_createDCtx();
#endif
    cache_idx_ = -1;
  }

  // Borrows the context pointer held by `o` and records `idx` as the cache
  // index for it.
  void InitFromCache(const ZSTDUncompressCachedData& o, int64_t idx) {
    cache_idx_ = idx;
    zstd_ctx_ = o.zstd_ctx_;
  }

  // Frees the context only when it is held with cache_idx_ == -1.
  ~ZSTDUncompressCachedData() {
#if defined(ZSTD)
    const bool owns_ctx = (cache_idx_ == -1);
    if (owns_ctx && zstd_ctx_ != nullptr) {
      ZSTD_freeDCtx(zstd_ctx_);
    }
#endif  // ZSTD
  }

 private:
  ZSTDNativeContext zstd_ctx_ = nullptr;
  int64_t cache_idx_ = -1;  // -1 means this instance owns the context
};
113 | | } // namespace ROCKSDB_NAMESPACE |
114 | | |
115 | | #if defined(XPRESS) |
116 | | #include "port/xpress.h" |
117 | | #endif |
118 | | |
119 | | namespace ROCKSDB_NAMESPACE { |
120 | | |
121 | | class FailureDecompressor : public Decompressor { |
122 | | public: |
123 | 0 | explicit FailureDecompressor(Status&& status) : status_(std::move(status)) { |
124 | 0 | assert(!status_.ok()); |
125 | 0 | } |
126 | 0 | ~FailureDecompressor() override { status_.PermitUncheckedError(); } |
127 | | |
128 | 0 | const char* Name() const override { return "FailureDecompressor"; } |
129 | | |
130 | 0 | Status ExtractUncompressedSize(Args& /*args*/) override { return status_; } |
131 | | |
132 | | Status DecompressBlock(const Args& /*args*/, |
133 | 0 | char* /*uncompressed_output*/) override { |
134 | 0 | return status_; |
135 | 0 | } |
136 | | |
137 | | protected: |
138 | | Status status_; |
139 | | }; |
140 | | |
141 | | // Owns a decompression dictionary, and associated Decompressor, for storing |
142 | | // in the block cache. |
143 | | // |
144 | | // Justification: for a "processed" dictionary to be saved in block cache, we |
145 | | // also need a reference to the decompressor that processed it, to ensure it |
146 | | // is recognized properly. At that point, we might as well have the dictionary |
147 | | // part of the decompressor identity and track an associated decompressor along |
148 | | // with a decompression dictionary in the block cache, and the decompressor |
149 | | // hides potential details of processing the dictionary. |
150 | | struct DecompressorDict { |
151 | | // Block containing the data for the compression dictionary in case the |
152 | | // constructor that takes a string parameter is used. |
153 | | std::string dict_str_; |
154 | | |
155 | | // Block containing the data for the compression dictionary in case the |
156 | | // constructor that takes a Slice parameter is used and the passed in |
157 | | // CacheAllocationPtr is not nullptr. |
158 | | CacheAllocationPtr dict_allocation_; |
159 | | |
160 | | // A Decompressor referencing and using the dictionary owned by this. |
161 | | std::unique_ptr<Decompressor> decompressor_; |
162 | | |
163 | | // Approximate owned memory usage |
164 | | size_t memory_usage_; |
165 | | |
166 | | DecompressorDict(std::string&& dict, Decompressor& from_decompressor) |
167 | 0 | : dict_str_(std::move(dict)) { |
168 | 0 | Populate(from_decompressor, dict_str_); |
169 | 0 | } |
170 | | |
171 | | DecompressorDict(Slice slice, CacheAllocationPtr&& allocation, |
172 | | Decompressor& from_decompressor) |
173 | 0 | : dict_allocation_(std::move(allocation)) { |
174 | 0 | Populate(from_decompressor, slice); |
175 | 0 | } |
176 | | |
177 | | DecompressorDict(DecompressorDict&& rhs) noexcept |
178 | | : dict_str_(std::move(rhs.dict_str_)), |
179 | | dict_allocation_(std::move(rhs.dict_allocation_)), |
180 | | decompressor_(std::move(rhs.decompressor_)), |
181 | 0 | memory_usage_(std::move(rhs.memory_usage_)) {} |
182 | | |
183 | 0 | DecompressorDict& operator=(DecompressorDict&& rhs) noexcept { |
184 | 0 | if (this == &rhs) { |
185 | 0 | return *this; |
186 | 0 | } |
187 | 0 | dict_str_ = std::move(rhs.dict_str_); |
188 | 0 | dict_allocation_ = std::move(rhs.dict_allocation_); |
189 | 0 | decompressor_ = std::move(rhs.decompressor_); |
190 | 0 | return *this; |
191 | 0 | } |
192 | | // Disable copy |
193 | | DecompressorDict(const DecompressorDict&) = delete; |
194 | | DecompressorDict& operator=(const DecompressorDict&) = delete; |
195 | | |
196 | | // The object is self-contained if the string constructor is used, or the |
197 | | // Slice constructor is invoked with a non-null allocation. Otherwise, it |
198 | | // is the caller's responsibility to ensure that the underlying storage |
199 | | // outlives this object. |
200 | 0 | bool own_bytes() const { return !dict_str_.empty() || dict_allocation_; } |
201 | | |
202 | 0 | const Slice& GetRawDict() const { return decompressor_->GetSerializedDict(); } |
203 | | |
204 | | // For TypedCacheInterface |
205 | 0 | const Slice& ContentSlice() const { return GetRawDict(); } |
206 | | static constexpr CacheEntryRole kCacheEntryRole = CacheEntryRole::kOtherBlock; |
207 | | static constexpr BlockType kBlockType = BlockType::kCompressionDictionary; |
208 | | |
209 | 0 | size_t ApproximateMemoryUsage() const { return memory_usage_; } |
210 | | |
211 | | private: |
212 | | void Populate(Decompressor& from_decompressor, Slice dict); |
213 | | }; |
214 | | |
215 | | // Holds dictionary and related data, like ZSTD's digested compression |
216 | | // dictionary. |
217 | | struct CompressionDict { |
218 | | #ifdef ZSTD |
219 | | ZSTD_CDict* zstd_cdict_ = nullptr; |
220 | | #endif // ZSTD |
221 | | std::string dict_; |
222 | | |
223 | | public: |
224 | 0 | CompressionDict() = default; |
225 | 0 | CompressionDict(std::string&& dict, CompressionType type, int level) { |
226 | 0 | dict_ = std::move(dict); |
227 | | #ifdef ZSTD |
228 | | zstd_cdict_ = nullptr; |
229 | | if (!dict_.empty() && type == kZSTD) { |
230 | | if (level == CompressionOptions::kDefaultCompressionLevel) { |
231 | | // NB: ZSTD_CLEVEL_DEFAULT is historically == 3 |
232 | | level = ZSTD_CLEVEL_DEFAULT; |
233 | | } |
234 | | // Should be safe (but slower) if below call fails as we'll use the |
235 | | // raw dictionary to compress. |
236 | | zstd_cdict_ = ZSTD_createCDict(dict_.data(), dict_.size(), level); |
237 | | assert(zstd_cdict_ != nullptr); |
238 | | } |
239 | | #else |
240 | 0 | (void)type; |
241 | 0 | (void)level; |
242 | 0 | #endif // ZSTD |
243 | 0 | } |
244 | | |
245 | 0 | CompressionDict(CompressionDict&& other) { |
246 | | #ifdef ZSTD |
247 | | zstd_cdict_ = other.zstd_cdict_; |
248 | | other.zstd_cdict_ = nullptr; |
249 | | #endif // ZSTD |
250 | 0 | dict_ = std::move(other.dict_); |
251 | 0 | } |
252 | 0 | CompressionDict& operator=(CompressionDict&& other) { |
253 | 0 | if (this == &other) { |
254 | 0 | return *this; |
255 | 0 | } |
256 | 0 | #ifdef ZSTD |
257 | 0 | zstd_cdict_ = other.zstd_cdict_; |
258 | 0 | other.zstd_cdict_ = nullptr; |
259 | 0 | #endif // ZSTD |
260 | 0 | dict_ = std::move(other.dict_); |
261 | 0 | return *this; |
262 | 0 | } |
263 | | |
264 | 0 | ~CompressionDict() { |
265 | | #ifdef ZSTD |
266 | | size_t res = 0; |
267 | | if (zstd_cdict_ != nullptr) { |
268 | | res = ZSTD_freeCDict(zstd_cdict_); |
269 | | } |
270 | | assert(res == 0); // Last I checked they can't fail |
271 | | (void)res; // prevent unused var warning |
272 | | #endif // ZSTD |
273 | 0 | } |
274 | | |
275 | | #ifdef ZSTD |
276 | | const ZSTD_CDict* GetDigestedZstdCDict() const { return zstd_cdict_; } |
277 | | #endif // ZSTD |
278 | | |
279 | 0 | Slice GetRawDict() const { return dict_; } |
280 | 0 | bool empty() const { return dict_.empty(); } |
281 | | |
282 | 0 | static const CompressionDict& GetEmptyDict() { |
283 | 0 | static CompressionDict empty_dict{}; |
284 | 0 | return empty_dict; |
285 | 0 | } |
286 | | |
287 | | // Disable copy |
288 | | CompressionDict(const CompressionDict&) = delete; |
289 | | CompressionDict& operator=(const CompressionDict&) = delete; |
290 | | }; |
291 | | |
292 | | class CompressionContext : public Compressor::WorkingArea { |
293 | | private: |
294 | | #ifdef ZSTD |
295 | | ZSTD_CCtx* zstd_ctx_ = nullptr; |
296 | | |
297 | | ZSTD_CCtx* CreateZSTDContext() { |
298 | | #ifdef ROCKSDB_ZSTD_CUSTOM_MEM |
299 | | return ZSTD_createCCtx_advanced(port::GetJeZstdAllocationOverrides()); |
300 | | #else // ROCKSDB_ZSTD_CUSTOM_MEM |
301 | | return ZSTD_createCCtx(); |
302 | | #endif // ROCKSDB_ZSTD_CUSTOM_MEM |
303 | | } |
304 | | |
305 | | public: |
306 | | // callable inside ZSTD_Compress |
307 | | ZSTD_CCtx* ZSTDPreallocCtx() const { |
308 | | assert(zstd_ctx_ != nullptr); |
309 | | return zstd_ctx_; |
310 | | } |
311 | | |
312 | | private: |
313 | | #endif // ZSTD |
314 | | |
315 | 0 | void CreateNativeContext(CompressionType type, int level, bool checksum) { |
316 | 0 | #ifdef ZSTD |
317 | 0 | if (type == kZSTD) { |
318 | 0 | zstd_ctx_ = CreateZSTDContext(); |
319 | 0 | if (level == CompressionOptions::kDefaultCompressionLevel) { |
320 | 0 | // NB: ZSTD_CLEVEL_DEFAULT is historically == 3 |
321 | 0 | level = ZSTD_CLEVEL_DEFAULT; |
322 | 0 | } |
323 | 0 | size_t err = |
324 | 0 | ZSTD_CCtx_setParameter(zstd_ctx_, ZSTD_c_compressionLevel, level); |
325 | 0 | if (ZSTD_isError(err)) { |
326 | 0 | assert(false); |
327 | 0 | ZSTD_freeCCtx(zstd_ctx_); |
328 | 0 | zstd_ctx_ = CreateZSTDContext(); |
329 | 0 | } |
330 | 0 | if (checksum) { |
331 | 0 | err = ZSTD_CCtx_setParameter(zstd_ctx_, ZSTD_c_checksumFlag, 1); |
332 | 0 | if (ZSTD_isError(err)) { |
333 | 0 | assert(false); |
334 | 0 | ZSTD_freeCCtx(zstd_ctx_); |
335 | 0 | zstd_ctx_ = CreateZSTDContext(); |
336 | 0 | } |
337 | 0 | } |
338 | 0 | } |
339 | 0 | #else |
340 | 0 | (void)type; |
341 | 0 | (void)level; |
342 | 0 | (void)checksum; |
343 | 0 | #endif // ZSTD |
344 | 0 | } |
345 | 0 | void DestroyNativeContext() { |
346 | 0 | #ifdef ZSTD |
347 | 0 | if (zstd_ctx_ != nullptr) { |
348 | 0 | ZSTD_freeCCtx(zstd_ctx_); |
349 | 0 | } |
350 | 0 | #endif // ZSTD |
351 | 0 | } |
352 | | |
353 | | public: |
354 | | explicit CompressionContext(CompressionType type, |
355 | 0 | const CompressionOptions& options) { |
356 | 0 | CreateNativeContext(type, options.level, options.checksum); |
357 | 0 | } |
358 | 0 | ~CompressionContext() { DestroyNativeContext(); } |
359 | | CompressionContext(const CompressionContext&) = delete; |
360 | | CompressionContext& operator=(const CompressionContext&) = delete; |
361 | | }; |
362 | | |
363 | | // This is like a working area, reusable for different dicts, etc. |
364 | | // TODO: refactor / consolidate |
365 | | class UncompressionContext : public Decompressor::WorkingArea { |
366 | | private: |
367 | | CompressionContextCache* ctx_cache_ = nullptr; |
368 | | ZSTDUncompressCachedData uncomp_cached_data_; |
369 | | |
370 | | public: |
371 | 0 | explicit UncompressionContext(CompressionType type) { |
372 | 0 | if (type == kZSTD) { |
373 | 0 | ctx_cache_ = CompressionContextCache::Instance(); |
374 | 0 | uncomp_cached_data_ = ctx_cache_->GetCachedZSTDUncompressData(); |
375 | 0 | } |
376 | 0 | } |
377 | 0 | ~UncompressionContext() { |
378 | 0 | if (uncomp_cached_data_.GetCacheIndex() != -1) { |
379 | 0 | assert(ctx_cache_ != nullptr); |
380 | 0 | ctx_cache_->ReturnCachedZSTDUncompressData( |
381 | 0 | uncomp_cached_data_.GetCacheIndex()); |
382 | 0 | } |
383 | 0 | } |
384 | | UncompressionContext(const UncompressionContext&) = delete; |
385 | | UncompressionContext& operator=(const UncompressionContext&) = delete; |
386 | | |
387 | 0 | ZSTDUncompressCachedData::ZSTDNativeContext GetZSTDContext() const { |
388 | 0 | return uncomp_cached_data_.Get(); |
389 | 0 | } |
390 | | }; |
391 | | |
// True iff this build was compiled with SNAPPY defined.
inline bool Snappy_Supported() {
#if defined(SNAPPY)
  constexpr bool kCompiledIn = true;
#else
  constexpr bool kCompiledIn = false;
#endif
  return kCompiledIn;
}
399 | | |
// True iff this build was compiled with ZLIB defined.
inline bool Zlib_Supported() {
#if defined(ZLIB)
  constexpr bool kCompiledIn = true;
#else
  constexpr bool kCompiledIn = false;
#endif
  return kCompiledIn;
}
407 | | |
// True iff this build was compiled with BZIP2 defined.
inline bool BZip2_Supported() {
#if defined(BZIP2)
  constexpr bool kCompiledIn = true;
#else
  constexpr bool kCompiledIn = false;
#endif
  return kCompiledIn;
}
415 | | |
// True iff this build was compiled with LZ4 defined.
inline bool LZ4_Supported() {
#if defined(LZ4)
  constexpr bool kCompiledIn = true;
#else
  constexpr bool kCompiledIn = false;
#endif
  return kCompiledIn;
}
423 | | |
// True iff this build was compiled with XPRESS defined.
inline bool XPRESS_Supported() {
#if defined(XPRESS)
  constexpr bool kCompiledIn = true;
#else
  constexpr bool kCompiledIn = false;
#endif
  return kCompiledIn;
}
431 | | |
// True iff this build was compiled with ZSTD defined.
inline bool ZSTD_Supported() {
#if defined(ZSTD)
  // NB: ZSTD format is finalized since version 0.8.0. See ZSTD_VERSION_NUMBER
  // check above.
  constexpr bool kCompiledIn = true;
#else
  constexpr bool kCompiledIn = false;
#endif
  return kCompiledIn;
}
441 | | |
// True iff streaming ZSTD is available, i.e. the build defines ZSTD.
inline bool ZSTD_Streaming_Supported() {
#ifdef ZSTD
  constexpr bool kCompiledIn = true;
#else
  constexpr bool kCompiledIn = false;
#endif
  return kCompiledIn;
}
449 | | |
450 | | inline bool StreamingCompressionTypeSupported( |
451 | 57.3k | CompressionType compression_type) { |
452 | 57.3k | switch (compression_type) { |
453 | 57.3k | case kNoCompression: |
454 | 57.3k | return true; |
455 | 0 | case kZSTD: |
456 | 0 | return ZSTD_Streaming_Supported(); |
457 | 0 | default: |
458 | 0 | return false; |
459 | 57.3k | } |
460 | 57.3k | } |
461 | | |
462 | 6.86M | inline bool CompressionTypeSupported(CompressionType compression_type) { |
463 | 6.86M | switch (compression_type) { |
464 | 195k | case kNoCompression: |
465 | 195k | return true; |
466 | 49.7k | case kSnappyCompression: |
467 | 49.7k | return Snappy_Supported(); |
468 | 49.7k | case kZlibCompression: |
469 | 49.7k | return Zlib_Supported(); |
470 | 49.7k | case kBZip2Compression: |
471 | 49.7k | return BZip2_Supported(); |
472 | 49.7k | case kLZ4Compression: |
473 | 49.7k | return LZ4_Supported(); |
474 | 49.7k | case kLZ4HCCompression: |
475 | 49.7k | return LZ4_Supported(); |
476 | 49.7k | case kXpressCompression: |
477 | 49.7k | return XPRESS_Supported(); |
478 | 49.7k | case kZSTD: |
479 | 49.7k | return ZSTD_Supported(); |
480 | 6.31M | default: // Including custom compression types |
481 | 6.31M | return false; |
482 | 6.86M | } |
483 | 6.86M | } |
484 | | |
485 | 0 | inline bool DictCompressionTypeSupported(CompressionType compression_type) { |
486 | 0 | switch (compression_type) { |
487 | 0 | case kNoCompression: |
488 | 0 | return false; |
489 | 0 | case kSnappyCompression: |
490 | 0 | return false; |
491 | 0 | case kZlibCompression: |
492 | 0 | return Zlib_Supported(); |
493 | 0 | case kBZip2Compression: |
494 | 0 | return false; |
495 | 0 | case kLZ4Compression: |
496 | 0 | case kLZ4HCCompression: |
497 | | #if LZ4_VERSION_NUMBER >= 10400 // r124+ |
498 | | return LZ4_Supported(); |
499 | | #else |
500 | 0 | return false; |
501 | 0 | #endif |
502 | 0 | case kXpressCompression: |
503 | 0 | return false; |
504 | 0 | case kZSTD: |
505 | | // NB: dictionary supported since 0.5.0. See ZSTD_VERSION_NUMBER check |
506 | | // above. |
507 | 0 | return ZSTD_Supported(); |
508 | 0 | default: // Including custom compression types |
509 | 0 | return false; |
510 | 0 | } |
511 | 0 | } |
512 | | |
513 | | // WART: does not match OptionsHelper::compression_type_string_map |
514 | | std::string CompressionTypeToString(CompressionType compression_type); |
515 | | |
516 | | // WART: does not match OptionsHelper::compression_type_string_map |
517 | | CompressionType CompressionTypeFromString(std::string compression_type_str); |
518 | | |
519 | | std::string CompressionOptionsToString( |
520 | | const CompressionOptions& compression_options); |
521 | | |
// True iff the ZSTD dictionary trainer is usable, i.e. the build defines ZSTD.
inline bool ZSTD_TrainDictionarySupported() {
#if defined(ZSTD)
  // NB: Dictionary trainer is available since v0.6.1 for static linking, but
  // not available for dynamic linking until v1.1.3. See ZSTD_VERSION_NUMBER
  // check above.
  constexpr bool kAvailable = true;
#else
  constexpr bool kAvailable = false;
#endif
  return kAvailable;
}
532 | | |
// True iff ROCKSDB_ZDICT_FINALIZE is defined for this build.
inline bool ZSTD_FinalizeDictionarySupported() {
#if defined(ROCKSDB_ZDICT_FINALIZE)
  constexpr bool kAvailable = true;
#else
  constexpr bool kAvailable = false;
#endif
  return kAvailable;
}
540 | | |
541 | | // The new compression APIs intentionally make it difficult to generate |
542 | | // compressed data larger than the original. (It is better to store the |
543 | | // uncompressed version in that case.) For legacy cases that must store |
544 | | // compressed data even when larger than the uncompressed, this is a convenient |
545 | | // wrapper to support that, with a compressor from BuiltinCompressionManager and |
546 | | // a GrowableBuffer. |
547 | | Status LegacyForceBuiltinCompression( |
548 | | Compressor& builtin_compressor, |
549 | | Compressor::ManagedWorkingArea* working_area, Slice from, |
550 | | GrowableBuffer* to); |
551 | | |
552 | | // Records the compression type for subsequent WAL records. |
553 | | class CompressionTypeRecord { |
554 | | public: |
555 | | explicit CompressionTypeRecord(CompressionType compression_type) |
556 | 0 | : compression_type_(compression_type) {} |
557 | | |
558 | 0 | CompressionType GetCompressionType() const { return compression_type_; } |
559 | | |
560 | 0 | inline void EncodeTo(std::string* dst) const { |
561 | 0 | assert(dst != nullptr); |
562 | 0 | PutFixed32(dst, compression_type_); |
563 | 0 | } |
564 | | |
565 | 0 | inline Status DecodeFrom(Slice* src) { |
566 | 0 | constexpr char class_name[] = "CompressionTypeRecord"; |
567 | |
|
568 | 0 | uint32_t val; |
569 | 0 | if (!GetFixed32(src, &val)) { |
570 | 0 | return Status::Corruption(class_name, |
571 | 0 | "Error decoding WAL compression type"); |
572 | 0 | } |
573 | 0 | CompressionType compression_type = static_cast<CompressionType>(val); |
574 | 0 | if (!StreamingCompressionTypeSupported(compression_type)) { |
575 | 0 | return Status::Corruption(class_name, |
576 | 0 | "WAL compression type not supported"); |
577 | 0 | } |
578 | 0 | compression_type_ = compression_type; |
579 | 0 | return Status::OK(); |
580 | 0 | } |
581 | | |
582 | 0 | inline std::string DebugString() const { |
583 | 0 | return "compression_type: " + CompressionTypeToString(compression_type_); |
584 | 0 | } |
585 | | |
586 | | private: |
587 | | CompressionType compression_type_; |
588 | | }; |
589 | | |
590 | | // Base class to implement compression for a stream of buffers. |
591 | | // Instantiate an implementation of the class using Create() with the |
592 | | // compression type and use Compress() repeatedly. |
593 | | // The output buffer needs to be at least max_output_len. |
594 | | // Call Reset() in between frame boundaries or in case of an error. |
595 | | // NOTE: This class is not thread safe. |
596 | | class StreamingCompress { |
597 | | public: |
598 | | StreamingCompress(CompressionType compression_type, |
599 | | const CompressionOptions& opts, |
600 | | uint32_t compress_format_version, size_t max_output_len) |
601 | 0 | : compression_type_(compression_type), |
602 | 0 | opts_(opts), |
603 | 0 | compress_format_version_(compress_format_version), |
604 | 0 | max_output_len_(max_output_len) {} |
605 | 0 | virtual ~StreamingCompress() = default; |
606 | | // compress should be called repeatedly with the same input till the method |
607 | | // returns 0 |
608 | | // Parameters: |
609 | | // input - buffer to compress |
610 | | // input_size - size of input buffer |
611 | | // output - compressed buffer allocated by caller, should be at least |
612 | | // max_output_len |
613 | | // output_size - size of the output buffer |
614 | | // Returns -1 for errors, the remaining size of the input buffer that needs |
615 | | // to be compressed |
616 | | virtual int Compress(const char* input, size_t input_size, char* output, |
617 | | size_t* output_pos) = 0; |
618 | | // static method to create object of a class inherited from |
619 | | // StreamingCompress based on the actual compression type. |
620 | | static std::unique_ptr<StreamingCompress> Create( |
621 | | CompressionType compression_type, const CompressionOptions& opts, |
622 | | uint32_t compress_format_version, size_t max_output_len); |
623 | | virtual void Reset() = 0; |
624 | | |
625 | | protected: |
626 | | const CompressionType compression_type_; |
627 | | const CompressionOptions opts_; |
628 | | const uint32_t compress_format_version_; |
629 | | const size_t max_output_len_; |
630 | | }; |
631 | | |
632 | | // Base class to uncompress a stream of compressed buffers. |
633 | | // Instantiate an implementation of the class using Create() with the |
634 | | // compression type and use Uncompress() repeatedly. |
635 | | // The output buffer needs to be at least max_output_len. |
636 | | // Call Reset() in between frame boundaries or in case of an error. |
637 | | // NOTE: This class is not thread safe. |
638 | | class StreamingUncompress { |
639 | | public: |
640 | | StreamingUncompress(CompressionType compression_type, |
641 | | uint32_t compress_format_version, size_t max_output_len) |
642 | 0 | : compression_type_(compression_type), |
643 | 0 | compress_format_version_(compress_format_version), |
644 | 0 | max_output_len_(max_output_len) {} |
645 | 0 | virtual ~StreamingUncompress() = default; |
646 | | // Uncompress can be called repeatedly to progressively process the same |
647 | | // input buffer, or can be called with a new input buffer. When the input |
648 | | // buffer is not fully consumed, the return value is > 0 or output_size |
649 | | // == max_output_len. When calling uncompress to continue processing the |
650 | | // same input buffer, the input argument should be nullptr. |
651 | | // Parameters: |
652 | | // input - buffer to uncompress |
653 | | // input_size - size of input buffer |
654 | | // output - uncompressed buffer allocated by caller, should be at least |
655 | | // max_output_len |
656 | | // output_size - size of the output buffer |
657 | | // Returns -1 for errors, remaining input to be processed otherwise. |
658 | | virtual int Uncompress(const char* input, size_t input_size, char* output, |
659 | | size_t* output_pos) = 0; |
660 | | static std::unique_ptr<StreamingUncompress> Create( |
661 | | CompressionType compression_type, uint32_t compress_format_version, |
662 | | size_t max_output_len); |
663 | | virtual void Reset() = 0; |
664 | | |
665 | | protected: |
666 | | CompressionType compression_type_; |
667 | | uint32_t compress_format_version_; |
668 | | size_t max_output_len_; |
669 | | }; |
670 | | |
671 | | class ZSTDStreamingCompress final : public StreamingCompress { |
672 | | public: |
673 | | explicit ZSTDStreamingCompress(const CompressionOptions& opts, |
674 | | uint32_t compress_format_version, |
675 | | size_t max_output_len) |
676 | 0 | : StreamingCompress(kZSTD, opts, compress_format_version, |
677 | 0 | max_output_len) { |
678 | | #ifdef ZSTD |
679 | | cctx_ = ZSTD_createCCtx(); |
680 | | // Each compressed frame will have a checksum |
681 | | ZSTD_CCtx_setParameter(cctx_, ZSTD_c_checksumFlag, 1); |
682 | | assert(cctx_ != nullptr); |
683 | | input_buffer_ = {/*src=*/nullptr, /*size=*/0, /*pos=*/0}; |
684 | | #endif |
685 | 0 | } |
686 | 0 | ~ZSTDStreamingCompress() override { |
687 | 0 | #ifdef ZSTD |
688 | 0 | ZSTD_freeCCtx(cctx_); |
689 | 0 | #endif |
690 | 0 | } |
691 | | int Compress(const char* input, size_t input_size, char* output, |
692 | | size_t* output_pos) override; |
693 | | void Reset() override; |
694 | | #ifdef ZSTD |
695 | | ZSTD_CCtx* cctx_; |
696 | | ZSTD_inBuffer input_buffer_; |
697 | | #endif |
698 | | }; |
699 | | |
700 | | class ZSTDStreamingUncompress final : public StreamingUncompress { |
701 | | public: |
702 | | explicit ZSTDStreamingUncompress(uint32_t compress_format_version, |
703 | | size_t max_output_len) |
704 | 0 | : StreamingUncompress(kZSTD, compress_format_version, max_output_len) { |
705 | | #ifdef ZSTD |
706 | | dctx_ = ZSTD_createDCtx(); |
707 | | assert(dctx_ != nullptr); |
708 | | input_buffer_ = {/*src=*/nullptr, /*size=*/0, /*pos=*/0}; |
709 | | #endif |
710 | 0 | } |
711 | 0 | ~ZSTDStreamingUncompress() override { |
712 | 0 | #ifdef ZSTD |
713 | 0 | ZSTD_freeDCtx(dctx_); |
714 | 0 | #endif |
715 | 0 | } |
716 | | int Uncompress(const char* input, size_t input_size, char* output, |
717 | | size_t* output_size) override; |
718 | | void Reset() override; |
719 | | |
720 | | private: |
721 | | #ifdef ZSTD |
722 | | ZSTD_DCtx* dctx_; |
723 | | ZSTD_inBuffer input_buffer_; |
724 | | #endif |
725 | | }; |
726 | | |
727 | | } // namespace ROCKSDB_NAMESPACE |