1
#include "source/common/compression/zstd/compressor/zstd_compressor_impl_base.h"
2

            
3
#include "source/common/buffer/buffer_impl.h"
4

            
5
namespace Envoy {
6
namespace Compression {
7
namespace Zstd {
8
namespace Compressor {
9

            
10
// Builds the shared zstd compressor state.
//
// A fresh compression context is obtained from ZSTD_createCCtx() and handed to cctx_ together
// with ZSTD_freeCCtx as its release function. The checksum flag and the strategy are applied to
// that context right away; a zstd error from either call aborts via RELEASE_ASSERT.
//
// @param compression_level stored in compression_level_; it is not applied to the context here.
// @param enable_checksum value passed for ZSTD_c_checksumFlag.
// @param strategy value passed for ZSTD_c_strategy.
// @param chunk_size forwarded to Common::Base.
ZstdCompressorImplBase::ZstdCompressorImplBase(uint32_t compression_level, bool enable_checksum,
                                               uint32_t strategy, uint32_t chunk_size)
    : Common::Base(chunk_size), cctx_(ZSTD_createCCtx(), &ZSTD_freeCCtx),
      compression_level_(compression_level) {
  // Configure frame checksumming first.
  const size_t checksum_status =
      ZSTD_CCtx_setParameter(cctx_.get(), ZSTD_c_checksumFlag, enable_checksum);
  RELEASE_ASSERT(!ZSTD_isError(checksum_status), "");

  // Then select the compression strategy.
  const size_t strategy_status = ZSTD_CCtx_setParameter(cctx_.get(), ZSTD_c_strategy, strategy);
  RELEASE_ASSERT(!ZSTD_isError(strategy_status), "");
}
21

            
22
void ZstdCompressorImplBase::compress(Buffer::Instance& buffer,
23
3733
                                      Envoy::Compression::Compressor::State state) {
24
3733
  compressPreprocess(buffer, state);
25

            
26
3733
  Buffer::OwnedImpl accumulation_buffer;
27
3751
  for (const Buffer::RawSlice& input_slice : buffer.getRawSlices()) {
28
3077
    if (input_slice.len_ > 0) {
29
3077
      compressProcess(buffer, input_slice, accumulation_buffer);
30
3077
      buffer.drain(input_slice.len_);
31
3077
    }
32
3077
  }
33

            
34
3733
  compressPostprocess(accumulation_buffer);
35

            
36
3733
  ASSERT(buffer.length() == 0);
37
3733
  buffer.move(accumulation_buffer);
38

            
39
3733
  if (state == Envoy::Compression::Compressor::State::Finish) {
40
341
    process(buffer, ZSTD_e_end);
41
341
  }
42
3733
}
43

            
44
3418
// Drives ZSTD_compressStream2() over the input_/output_ stream buffers until the stopping
// condition for `mode` is reached, calling getOutput() after every step.
//
// A zstd error aborts via RELEASE_ASSERT, matching how the constructor treats zstd errors.
//
// @param output_buffer buffer passed to getOutput() on each iteration.
// @param mode zstd end directive for this pass; ZSTD_e_end loops until zstd reports that
//        nothing remains to be flushed.
void ZstdCompressorImplBase::process(Buffer::Instance& output_buffer, ZSTD_EndDirective mode) {
  bool finished;
  do {
    const size_t remaining = ZSTD_compressStream2(cctx_.get(), &output_, &input_, mode);
    // The return value is either a byte count or an error code. An error code is non-zero and
    // is not guaranteed to advance input_, so neither exit condition below could be relied on
    // to fire; fail loudly instead of looping forever.
    RELEASE_ASSERT(!ZSTD_isError(remaining), "");
    getOutput(output_buffer);
    // If we're on the last chunk we're finished when zstd returns 0,
    // which means it's consumed all the input AND finished the frame.
    // Otherwise, we're finished when we've consumed all the input.
    finished = (ZSTD_e_end == mode) ? (remaining == 0) : (input_.pos == input_.size);
  } while (!finished);
}
55

            
56
} // namespace Compressor
57
} // namespace Zstd
58
} // namespace Compression
59
} // namespace Envoy