TestingPagesSerdeFactory.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.execution.buffer;

import com.facebook.presto.CompressionCodec;
import com.facebook.presto.common.Page;
import com.facebook.presto.common.block.BlockEncodingManager;
import com.facebook.presto.common.block.BlockEncodingSerde;
import com.facebook.presto.spi.page.PageCompressor;
import com.facebook.presto.spi.page.PageDecompressor;
import com.facebook.presto.spi.page.PagesSerde;
import com.facebook.presto.spi.page.SerializedPage;
import com.facebook.presto.spi.spiller.SpillCipher;
import io.airlift.compress.Compressor;
import io.airlift.compress.Decompressor;
import io.airlift.compress.lz4.Lz4Compressor;
import io.airlift.compress.lz4.Lz4Decompressor;

import java.nio.ByteBuffer;
import java.util.Optional;

public class TestingPagesSerdeFactory
        extends PagesSerdeFactory
{
    public TestingPagesSerdeFactory(CompressionCodec compressionCodec)
    {
        // compression should be enabled in as many tests as possible
        super(new BlockEncodingManager(), compressionCodec);
    }

    public static PagesSerde testingPagesSerde()
    {
        return testingPagesSerde(false);
    }

    public static PagesSerde testingPagesSerde(boolean checksumEnabled)
    {
        return new SynchronizedPagesSerde(
                new BlockEncodingManager(),
                Optional.of(new PageCompressor()
                {
                    Compressor compressor = new Lz4Compressor();
                    @Override
                    public int maxCompressedLength(int uncompressedSize)
                    {
                        return compressor.maxCompressedLength(uncompressedSize);
                    }

                    @Override
                    public int compress(
                            byte[] input,
                            int inputOffset,
                            int inputLength,
                            byte[] output,
                            int outputOffset,
                            int maxOutputLength)
                    {
                        return compressor.compress(input, inputOffset, inputLength, output, outputOffset, maxOutputLength);
                    }

                    @Override
                    public void compress(ByteBuffer input, ByteBuffer output)
                    {
                        compressor.compress(input, output);
                    }
                }),
                Optional.of(new PageDecompressor()
                {
                    Decompressor decompressor = new Lz4Decompressor();
                    @Override
                    public int decompress(
                            byte[] input,
                            int inputOffset,
                            int inputLength,
                            byte[] output,
                            int outputOffset,
                            int maxOutputLength)
                    {
                        return decompressor.decompress(input, inputOffset, inputLength, output, outputOffset, maxOutputLength);
                    }

                    @Override
                    public void decompress(ByteBuffer input, ByteBuffer output)
                    {
                        decompressor.decompress(input, output);
                    }
                }),
                Optional.empty(), checksumEnabled);
    }

    private static class SynchronizedPagesSerde
            extends PagesSerde
    {
        public SynchronizedPagesSerde(BlockEncodingSerde blockEncodingSerde, Optional<PageCompressor> compressor, Optional<PageDecompressor> decompressor, Optional<SpillCipher> spillCipher, boolean checksumEnabled)
        {
            super(blockEncodingSerde, compressor, decompressor, spillCipher, checksumEnabled);
        }

        @Override
        public synchronized SerializedPage serialize(Page page)
        {
            return super.serialize(page);
        }

        @Override
        public synchronized Page deserialize(SerializedPage serializedPage)
        {
            return super.deserialize(serializedPage);
        }
    }
}