InternalHiveSplit.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.hive;

import com.facebook.presto.hive.HiveSplit.BucketConversion;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.schedule.NodeSelectionStrategy;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.openjdk.jol.info.ClassLayout;

import javax.annotation.concurrent.NotThreadSafe;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static io.airlift.slice.SizeOf.sizeOf;
import static io.airlift.slice.SizeOf.sizeOfCharArray;
import static io.airlift.slice.SizeOf.sizeOfObjectArray;
import static java.util.Objects.requireNonNull;

/**
 * Compact, mutable description of a byte range of a file, together with the
 * end offsets and host addresses of the blocks supplied at construction.
 * <p>
 * Only {@code start} and the current block index change after construction:
 * {@link #increaseStart(long)} advances them and {@link #reset()} rewinds them.
 * Instances are not thread safe.
 */
@NotThreadSafe
public class InternalHiveSplit
{
    // Overhead of ImmutableList and ImmutableMap is not accounted for because of its complexity.
    private static final int INSTANCE_SIZE = ClassLayout.parseClass(InternalHiveSplit.class).instanceSize();

    // Shallow size of a HostAddress plus the String object holding its host text;
    // the characters of the host text are added per address in getEstimatedSizeInBytes().
    private static final int HOST_ADDRESS_INSTANCE_SIZE = ClassLayout.parseClass(HostAddress.class).instanceSize() +
            ClassLayout.parseClass(String.class).instanceSize();

    // When isRelative is true, path is appended to partitionInfo.getPath() by getPath();
    // otherwise it is returned as is.
    private final String path;
    private final boolean isRelative;
    private final long end;
    private final long fileSize;
    private final long fileModifiedTime;

    // Encode the hive blocks as an array of longs and a list of lists of addresses to save memory.
    // If all block address lists are empty, only a single empty list is stored.
    private final long[] blockEndOffsets;
    private final List<List<HostAddress>> blockAddresses;

    // stored as ints rather than Optionals to save memory
    // -1 indicates an absent value
    private final int readBucketNumber;
    private final int tableBucketNumber;

    private final boolean splittable;
    private final NodeSelectionStrategy nodeSelectionStrategy;
    private final boolean s3SelectPushdownEnabled;
    private final HiveSplitPartitionInfo partitionInfo;
    private final Optional<byte[]> extraFileInfo;
    private final Optional<EncryptionInformation> encryptionInformation;
    private final Map<String, String> customSplitInfo;

    // Mutable cursor: current start offset and index of the block currently being consumed.
    private long start;
    private int currentBlockIndex;

    /**
     * Creates a split positioned at {@code start} on the first of {@code blocks}.
     *
     * @param path file path; interpreted relative to {@code partitionInfo.getPath()} when {@code isRelative} is true
     * @param isRelative whether {@code path} must be prefixed with the partition path
     * @param start initial start offset, must be non-negative
     * @param end end offset, must be non-negative
     * @param fileSize size of the file, must be non-negative
     * @param fileModifiedTime modification time of the file, must be non-negative
     * @param blocks blocks of this split; each contributes its end offset and host addresses.
     *         The order is kept as given and is not validated here.
     * @param readBucketNumber optional read bucket number; absence is stored as -1
     * @param tableBucketNumber optional table bucket number; absence is stored as -1
     * @param splittable value returned by {@link #isSplittable()}
     * @param nodeSelectionStrategy value returned by {@link #getNodeSelectionStrategy()}
     * @param s3SelectPushdownEnabled value returned by {@link #isS3SelectPushdownEnabled()}
     * @param partitionInfo partition information this split delegates to
     * @param extraFileInfo optional extra file bytes; the array is stored without copying
     * @param encryptionInformation optional encryption information
     * @param customSplitInfo custom split properties; copied into an immutable map
     * @throws IllegalArgumentException if any numeric argument is negative
     * @throws NullPointerException if any reference argument is null
     */
    public InternalHiveSplit(
            String path,
            boolean isRelative,
            long start,
            long end,
            long fileSize,
            long fileModifiedTime,
            List<InternalHiveBlock> blocks,
            OptionalInt readBucketNumber,
            OptionalInt tableBucketNumber,
            boolean splittable,
            NodeSelectionStrategy nodeSelectionStrategy,
            boolean s3SelectPushdownEnabled,
            HiveSplitPartitionInfo partitionInfo,
            Optional<byte[]> extraFileInfo,
            Optional<EncryptionInformation> encryptionInformation,
            Map<String, String> customSplitInfo)
    {
        checkArgument(start >= 0, "start must be non-negative");
        checkArgument(end >= 0, "end must be non-negative");
        checkArgument(fileSize >= 0, "fileSize must be non-negative");
        checkArgument(fileModifiedTime >= 0, "fileModifiedTime must be non-negative");
        requireNonNull(path, "path is null");
        requireNonNull(blocks, "blocks is null");
        requireNonNull(readBucketNumber, "readBucketNumber is null");
        requireNonNull(tableBucketNumber, "tableBucketNumber is null");
        requireNonNull(nodeSelectionStrategy, "nodeSelectionStrategy is null");
        requireNonNull(partitionInfo, "partitionInfo is null");
        requireNonNull(extraFileInfo, "extraFileInfo is null");
        requireNonNull(encryptionInformation, "encryptionInformation is null");

        this.path = path;
        this.isRelative = isRelative;
        this.start = start;
        this.end = end;
        this.fileSize = fileSize;
        this.fileModifiedTime = fileModifiedTime;
        this.readBucketNumber = readBucketNumber.orElse(-1);
        this.tableBucketNumber = tableBucketNumber.orElse(-1);
        this.splittable = splittable;
        this.nodeSelectionStrategy = nodeSelectionStrategy;
        this.s3SelectPushdownEnabled = s3SelectPushdownEnabled;
        this.partitionInfo = partitionInfo;
        this.extraFileInfo = extraFileInfo;
        this.encryptionInformation = encryptionInformation;
        this.customSplitInfo = ImmutableMap
                .copyOf(requireNonNull(customSplitInfo, "customSplitInfo is null"));

        // Flatten the blocks into a primitive array of end offsets plus a parallel list of address lists.
        int blockCount = blocks.size();
        ImmutableList.Builder<List<HostAddress>> addressesBuilder = ImmutableList.builder();
        blockEndOffsets = new long[blockCount];
        boolean allAddressesEmpty = true;
        for (int i = 0; i < blockCount; i++) {
            InternalHiveBlock block = blocks.get(i);
            List<HostAddress> addresses = block.getAddresses();
            allAddressesEmpty = allAddressesEmpty && addresses.isEmpty();
            addressesBuilder.add(addresses);
            blockEndOffsets[i] = block.getEnd();
        }
        // When no block has any address, keep a single empty list instead of one empty list per block.
        blockAddresses = allAddressesEmpty ? ImmutableList.of() : addressesBuilder.build();
    }

    /**
     * Returns the path of the file: the stored path prefixed with the partition
     * path when the split was created as relative, otherwise the stored path.
     */
    public String getPath()
    {
        return isRelative ? partitionInfo.getPath() + path : path;
    }

    /**
     * Returns the current start offset, which moves as {@link #increaseStart(long)} is called.
     */
    public long getStart()
    {
        return start;
    }

    public long getEnd()
    {
        return end;
    }

    public long getFileSize()
    {
        return fileSize;
    }

    public long getFileModifiedTime()
    {
        return fileModifiedTime;
    }

    public boolean isS3SelectPushdownEnabled()
    {
        return s3SelectPushdownEnabled;
    }

    public List<HivePartitionKey> getPartitionKeys()
    {
        return partitionInfo.getPartitionKeys();
    }

    public String getPartitionName()
    {
        return partitionInfo.getPartitionName();
    }

    /**
     * Returns the read bucket number, or empty when the stored value is negative
     * (the constructor stores -1 for an absent value).
     */
    public OptionalInt getReadBucketNumber()
    {
        return readBucketNumber >= 0 ? OptionalInt.of(readBucketNumber) : OptionalInt.empty();
    }

    /**
     * Returns the table bucket number, or empty when the stored value is negative
     * (the constructor stores -1 for an absent value).
     */
    public OptionalInt getTableBucketNumber()
    {
        return tableBucketNumber >= 0 ? OptionalInt.of(tableBucketNumber) : OptionalInt.empty();
    }

    public boolean isSplittable()
    {
        return splittable;
    }

    public NodeSelectionStrategy getNodeSelectionStrategy()
    {
        return nodeSelectionStrategy;
    }

    public TableToPartitionMapping getTableToPartitionMapping()
    {
        return partitionInfo.getTableToPartitionMapping();
    }

    public Optional<BucketConversion> getBucketConversion()
    {
        return partitionInfo.getBucketConversion();
    }

    /**
     * Returns a newly created view of the block at the current block index.
     *
     * @throws IllegalStateException if all blocks have been consumed
     */
    public InternalHiveBlock currentBlock()
    {
        checkState(!isDone(), "All blocks have been consumed");
        // An empty blockAddresses list means every block had an empty address list.
        List<HostAddress> addresses = blockAddresses.isEmpty() ? ImmutableList.of() : blockAddresses.get(currentBlockIndex);
        return new InternalHiveBlock(blockEndOffsets[currentBlockIndex], addresses);
    }

    /**
     * Returns true once the current block index has moved past the last block.
     */
    public boolean isDone()
    {
        return currentBlockIndex == blockEndOffsets.length;
    }

    /**
     * Advances the start offset by {@code value} and moves to the next block when
     * the new start offset equals the end offset of the current block.
     * <p>
     * The block index advances only on exact equality.
     * NOTE(review): presumably callers never advance past the end of the current block - confirm.
     *
     * @throws IllegalStateException if all blocks have been consumed; the split is left unchanged in that case
     */
    public void increaseStart(long value)
    {
        // Validate before mutating so that a rejected call does not corrupt the start offset.
        checkState(!isDone(), "All blocks have been consumed");
        start += value;
        // Read the end offset directly instead of materializing a block object just for the comparison.
        if (start == blockEndOffsets[currentBlockIndex]) {
            currentBlockIndex++;
        }
    }

    public HiveSplitPartitionInfo getPartitionInfo()
    {
        return partitionInfo;
    }

    public Optional<byte[]> getExtraFileInfo()
    {
        return extraFileInfo;
    }

    public Optional<EncryptionInformation> getEncryptionInformation()
    {
        return encryptionInformation;
    }

    public Map<String, String> getCustomSplitInfo()
    {
        return customSplitInfo;
    }

    /**
     * Rewinds the split to its first block and sets the start offset to 0.
     * <p>
     * NOTE(review): the start offset is set to 0, not to the {@code start} passed to the
     * constructor, so this restores the initial state only for splits created with
     * {@code start == 0} - confirm that callers only reset such splits.
     */
    public void reset()
    {
        currentBlockIndex = 0;
        start = 0;
    }

    /**
     * Estimate the size of this InternalHiveSplit. Note that
     * PartitionInfo is a shared object, so its memory usage is
     * tracked separately in HiveSplitSource.
     * <p>
     * The estimate covers the instance itself, the path characters, the block end
     * offsets, the block addresses and the extra file info. The contents of
     * {@code customSplitInfo} and {@code encryptionInformation} are not included.
     */
    public int getEstimatedSizeInBytes()
    {
        int result = INSTANCE_SIZE;
        result += sizeOfCharArray(path.length());
        result += sizeOf(blockEndOffsets);
        if (!blockAddresses.isEmpty()) {
            result += sizeOfObjectArray(blockAddresses.size());
            for (List<HostAddress> addresses : blockAddresses) {
                result += sizeOfObjectArray(addresses.size());
                for (HostAddress address : addresses) {
                    result += HOST_ADDRESS_INSTANCE_SIZE + address.getHostText().length() * Character.BYTES;
                }
            }
        }
        if (extraFileInfo.isPresent()) {
            result += sizeOf(extraFileInfo.get());
        }
        return result;
    }

    @Override
    public String toString()
    {
        return toStringHelper(this)
                .add("path", path)
                .add("start", start)
                .add("end", end)
                .add("fileSize", fileSize)
                .toString();
    }

    /**
     * A block described by its end offset and the addresses of the hosts associated with it.
     */
    public static class InternalHiveBlock
    {
        private final long end;
        private final List<HostAddress> addresses;

        /**
         * @param end end offset of the block, must be non-negative
         * @param addresses host addresses of the block; copied into an immutable list
         * @throws IllegalArgumentException if {@code end} is negative
         * @throws NullPointerException if {@code addresses} is null
         */
        public InternalHiveBlock(long end, List<HostAddress> addresses)
        {
            checkArgument(end >= 0, "block end must be >= 0");
            this.end = end;
            this.addresses = ImmutableList.copyOf(requireNonNull(addresses, "addresses is null"));
        }

        public long getEnd()
        {
            return end;
        }

        public List<HostAddress> getAddresses()
        {
            return addresses;
        }
    }
}