Line data Source code
1 : #include "source/extensions/compression/zstd/compressor/zstd_compressor_impl.h" 2 : 3 : #include "source/common/buffer/buffer_impl.h" 4 : 5 : namespace Envoy { 6 : namespace Extensions { 7 : namespace Compression { 8 : namespace Zstd { 9 : namespace Compressor { 10 : 11 : ZstdCompressorImpl::ZstdCompressorImpl(uint32_t compression_level, bool enable_checksum, 12 : uint32_t strategy, const ZstdCDictManagerPtr& cdict_manager, 13 : uint32_t chunk_size) 14 : : Common::Base(chunk_size), cctx_(ZSTD_createCCtx(), &ZSTD_freeCCtx), 15 0 : cdict_manager_(cdict_manager), compression_level_(compression_level) { 16 0 : size_t result; 17 0 : result = ZSTD_CCtx_setParameter(cctx_.get(), ZSTD_c_checksumFlag, enable_checksum); 18 0 : RELEASE_ASSERT(!ZSTD_isError(result), ""); 19 : 20 0 : result = ZSTD_CCtx_setParameter(cctx_.get(), ZSTD_c_strategy, strategy); 21 0 : RELEASE_ASSERT(!ZSTD_isError(result), ""); 22 : 23 0 : if (cdict_manager_) { 24 0 : ZSTD_CDict* cdict = cdict_manager_->getFirstDictionary(); 25 0 : result = ZSTD_CCtx_refCDict(cctx_.get(), cdict); 26 0 : } else { 27 0 : result = ZSTD_CCtx_setParameter(cctx_.get(), ZSTD_c_compressionLevel, compression_level_); 28 0 : } 29 0 : RELEASE_ASSERT(!ZSTD_isError(result), ""); 30 0 : } 31 : 32 : void ZstdCompressorImpl::compress(Buffer::Instance& buffer, 33 0 : Envoy::Compression::Compressor::State state) { 34 0 : Buffer::OwnedImpl accumulation_buffer; 35 0 : for (const Buffer::RawSlice& input_slice : buffer.getRawSlices()) { 36 0 : if (input_slice.len_ > 0) { 37 0 : setInput(input_slice); 38 0 : process(accumulation_buffer, ZSTD_e_continue); 39 0 : buffer.drain(input_slice.len_); 40 0 : } 41 0 : } 42 : 43 0 : ASSERT(buffer.length() == 0); 44 0 : buffer.move(accumulation_buffer); 45 : 46 0 : if (state == Envoy::Compression::Compressor::State::Finish) { 47 0 : process(buffer, ZSTD_e_end); 48 0 : } 49 0 : } 50 : 51 0 : void ZstdCompressorImpl::process(Buffer::Instance& output_buffer, ZSTD_EndDirective mode) { 52 0 : bool finished; 53 0 : do { 54 0 : const size_t remaining = ZSTD_compressStream2(cctx_.get(), &output_, &input_, mode); 55 0 : getOutput(output_buffer); 56 : // If we're on the last chunk we're finished when zstd returns 0, 57 : // which means its consumed all the input AND finished the frame. 58 : // Otherwise, we're finished when we've consumed all the input. 59 0 : finished = (ZSTD_e_end == mode) ? (remaining == 0) : (input_.pos == input_.size); 60 0 : } while (!finished); 61 0 : } 62 : 63 : } // namespace Compressor 64 : } // namespace Zstd 65 : } // namespace Compression 66 : } // namespace Extensions 67 : } // namespace Envoy