/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.hive;

import com.facebook.airlift.stats.CounterStat;
import com.facebook.presto.common.predicate.Domain;
import com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType;
import com.facebook.presto.hive.HiveBucketing.HiveBucketFilter;
import com.facebook.presto.hive.authentication.NoHdfsAuthentication;
import com.facebook.presto.hive.filesystem.ExtendedFileSystem;
import com.facebook.presto.hive.metastore.Column;
import com.facebook.presto.hive.metastore.Partition;
import com.facebook.presto.hive.metastore.Storage;
import com.facebook.presto.hive.metastore.StorageFormat;
import com.facebook.presto.hive.metastore.Table;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.testing.TestingConnectorSession;
import com.google.common.base.Throwables;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;
import org.testng.annotations.Test;

import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed;
import static com.facebook.presto.common.type.IntegerType.INTEGER;
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static com.facebook.presto.hive.BucketFunctionType.HIVE_COMPATIBLE;
import static com.facebook.presto.hive.CacheQuotaScope.GLOBAL;
import static com.facebook.presto.hive.HiveColumnHandle.PATH_COLUMN_INDEX;
import static com.facebook.presto.hive.HiveSessionProperties.getMaxInitialSplitSize;
import static com.facebook.presto.hive.HiveStorageFormat.ORC;
import static com.facebook.presto.hive.HiveTestUtils.SESSION;
import static com.facebook.presto.hive.HiveTestUtils.getAllSessionProperties;
import static com.facebook.presto.hive.HiveType.HIVE_INT;
import static com.facebook.presto.hive.HiveType.HIVE_STRING;
import static com.facebook.presto.hive.HiveUtil.getRegularColumnHandles;
import static com.facebook.presto.hive.StoragePartitionLoader.BucketSplitInfo.createBucketSplitInfo;
import static com.facebook.presto.hive.metastore.PrestoTableType.MANAGED_TABLE;
import static com.facebook.presto.hive.metastore.StorageFormat.fromHiveStorageFormat;
import static com.facebook.presto.spi.connector.NotPartitionedPartitionHandle.NOT_PARTITIONED;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.airlift.slice.Slices.utf8Slice;
import static io.airlift.units.DataSize.Unit.GIGABYTE;
import static io.airlift.units.DataSize.Unit.KILOBYTE;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

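/**
 * Unit tests for {@link BackgroundHiveSplitLoader}, covering path-based pruning,
 * bucket filtering, text-file header/footer handling, offline partitions, and
 * directory-listing caching. All tests run against an in-memory
 * {@link TestingHdfsEnvironment} instead of a real HDFS cluster.
 */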
public class TestBackgroundHiveSplitLoader
{
    private static final int BUCKET_COUNT = 2;

    private static final String SAMPLE_PATH = "hdfs://VOL1:9000/db_name/table_name/000000_0";
    private static final String SAMPLE_PATH_FILTERED = "hdfs://VOL1:9000/db_name/table_name/000000_1";

    private static final Path RETURNED_PATH = new Path(SAMPLE_PATH);
    private static final Path FILTERED_PATH = new Path(SAMPLE_PATH_FILTERED);

    private static final ExecutorService EXECUTOR = newCachedThreadPool(daemonThreadsNamed("test-%s"));

    private static final Domain RETURNED_PATH_DOMAIN = Domain.singleValue(VARCHAR, utf8Slice(RETURNED_PATH.toString()));
    private static final Map<Integer, Domain> RETURNED_PATH_CONSTRAINT = ImmutableMap.of(PATH_COLUMN_INDEX, RETURNED_PATH_DOMAIN);

    private static final List<LocatedFileStatus> TEST_FILES = ImmutableList.of(
            locatedFileStatus(RETURNED_PATH, 0L),
            locatedFileStatus(FILTERED_PATH, 0L));

    private static final String PARTITION_COLUMN_NAME = "partitionColumn";
    private static final List<Column> PARTITION_COLUMNS = ImmutableList.of(
            new Column(PARTITION_COLUMN_NAME, HIVE_INT, Optional.empty(), Optional.empty()));
    private static final List<HiveColumnHandle> BUCKET_COLUMN_HANDLES = ImmutableList.of(
            new HiveColumnHandle("col1", HIVE_INT, INTEGER.getTypeSignature(), 0, ColumnType.REGULAR, Optional.empty(), Optional.empty()));

    private static final Optional<HiveBucketProperty> BUCKET_PROPERTY = Optional.of(
            new HiveBucketProperty(ImmutableList.of("col1"), BUCKET_COUNT, ImmutableList.of(), HIVE_COMPATIBLE, Optional.empty()));

    public static final Table SIMPLE_TABLE = table(ImmutableList.of(), Optional.empty());
    private static final Table PARTITIONED_TABLE = table(PARTITION_COLUMNS, BUCKET_PROPERTY);

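    // Without a path constraint, both TEST_FILES paths should come back as splits.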
    @Test
    public void testNoPathFilter()
            throws Exception
    {
        BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader(
                TEST_FILES,
                ImmutableMap.of());

        HiveSplitSource hiveSplitSource = hiveSplitSource(backgroundHiveSplitLoader);
        backgroundHiveSplitLoader.start(hiveSplitSource);

        assertEquals(drain(hiveSplitSource).size(), 2);
    }

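    // A constraint on the hidden path column should keep RETURNED_PATH and prune FILTERED_PATH.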
    @Test
    public void testPathFilter()
            throws Exception
    {
        BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader(
                TEST_FILES,
                RETURNED_PATH_CONSTRAINT);

        HiveSplitSource hiveSplitSource = hiveSplitSource(backgroundHiveSplitLoader);
        backgroundHiveSplitLoader.start(hiveSplitSource);
        List<String> paths = drain(hiveSplitSource);
        assertEquals(paths.size(), 1);
        assertEquals(paths.get(0), RETURNED_PATH.toString());
    }

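    // With a bucket filter that admits both buckets, pruning should still come
    // from the path constraint alone.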
    @Test
    public void testPathFilterOneBucketMatchPartitionedTable()
            throws Exception
    {
        BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader(
                TEST_FILES,
                RETURNED_PATH_CONSTRAINT,
                Optional.of(new HiveBucketFilter(ImmutableSet.of(0, 1))),
                PARTITIONED_TABLE,
                Optional.of(new HiveBucketHandle(BUCKET_COLUMN_HANDLES, BUCKET_COUNT, BUCKET_COUNT)));

        HiveSplitSource hiveSplitSource = hiveSplitSource(backgroundHiveSplitLoader);
        backgroundHiveSplitLoader.start(hiveSplitSource);
        List<String> paths = drain(hiveSplitSource);
        assertEquals(paths.size(), 1);
        assertEquals(paths.get(0), RETURNED_PATH.toString());
    }

    @Test
    public void testPathFilterBucketedPartitionedTable()
            throws Exception
    {
        BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader(
                TEST_FILES,
                RETURNED_PATH_CONSTRAINT,
                Optional.empty(),
                PARTITIONED_TABLE,
                Optional.of(
                        new HiveBucketHandle(
                                getRegularColumnHandles(PARTITIONED_TABLE),
                                BUCKET_COUNT,
                                BUCKET_COUNT)));

        HiveSplitSource hiveSplitSource = hiveSplitSource(backgroundHiveSplitLoader);
        backgroundHiveSplitLoader.start(hiveSplitSource);
        List<String> paths = drain(hiveSplitSource);
        assertEquals(paths.size(), 1);
        assertEquals(paths.get(0), RETURNED_PATH.toString());
    }

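    // A zero-length file with no block locations should still yield exactly one
    // (empty) split rather than being dropped.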
    @Test
    public void testEmptyFileWithNoBlocks()
            throws Exception
    {
        BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader(
                ImmutableList.of(locatedFileStatusWithNoBlocks(RETURNED_PATH)),
                ImmutableMap.of());

        HiveSplitSource hiveSplitSource = hiveSplitSource(backgroundHiveSplitLoader);
        backgroundHiveSplitLoader.start(hiveSplitSource);

        List<HiveSplit> splits = drainSplits(hiveSplitSource);
        assertEquals(splits.size(), 1);
        assertEquals(splits.get(0).getFileSplit().getPath(), RETURNED_PATH.toString());
        assertEquals(splits.get(0).getFileSplit().getLength(), 0);
    }

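    // An offline partition (see createPartitionMetadataWithOfflinePartitions())
    // must surface as an exception from the split source instead of hanging the consumer.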
    @Test
    public void testNoHangIfPartitionIsOffline()
    {
        BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoaderOfflinePartitions();
        HiveSplitSource hiveSplitSource = hiveSplitSource(backgroundHiveSplitLoader);
        backgroundHiveSplitLoader.start(hiveSplitSource);

        assertThrows(RuntimeException.class, () -> drain(hiveSplitSource));
        assertThrows(RuntimeException.class, hiveSplitSource::isFinished);
    }

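    // A table whose own storage format is unknown should still produce splits when
    // the partition itself carries a supported (ORC) storage format.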
    @Test
    public void testUnsupportedTableFormat()
            throws Exception
    {
        StorageFormat storageFormat = StorageFormat.create("NonExistentSerde", "NonExistentInputFormat", "NonExistentOutputFormat");
        Table unsupportedTable = table(PARTITION_COLUMNS, Optional.empty(), storageFormat);

        String partitionId = PARTITION_COLUMN_NAME + "=Partition_Value";
        List<HivePartitionMetadata> hivePartitionMetadatas =
                ImmutableList.of(
                        new HivePartitionMetadata(
                                new HivePartition(unsupportedTable.getSchemaTableName(), new PartitionNameWithVersion(partitionId, Optional.empty()), ImmutableMap.of()),
                                Optional.of(orcPartition()),
                                TableToPartitionMapping.empty(),
                                Optional.empty(),
                                ImmutableSet.of(),
                                Optional.empty()));

        BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader(
                SESSION,
                ImmutableList.of(locatedFileStatus(new Path(SAMPLE_PATH), getMaxInitialSplitSize(SESSION).toBytes())),
                ImmutableMap.of(),
                Optional.empty(),
                unsupportedTable,
                Optional.empty(),
                hivePartitionMetadatas);

        HiveSplitSource hiveSplitSource = hiveSplitSource(backgroundHiveSplitLoader);
        backgroundHiveSplitLoader.start(hiveSplitSource);

        assertEquals(drainSplits(hiveSplitSource).size(), 1);
    }

    private static Partition orcPartition()
    {
        return new Partition(
                "test_db",
                "test_table",
                PARTITION_COLUMNS.stream().map(x -> "Partition_Value").collect(Collectors.toList()),
                new Storage(
                        fromHiveStorageFormat(ORC),
                        "location",
                        Optional.empty(),
                        true,
                        ImmutableMap.of(),
                        ImmutableMap.of()),
                PARTITION_COLUMNS,
                ImmutableMap.of(),
                Optional.empty(),
                false,
                true,
                0,
                0,
                Optional.empty());
    }

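    // Exercises cache hit/miss accounting for an explicit table entry, the "*"
    // wildcard, and an empty cache-table list; combining "*" with an explicit
    // table name is expected to be rejected.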
    @Test
    public void testCachingDirectoryLister()
            throws Exception
    {
        testCachingDirectoryLister(
                new CachingDirectoryLister(
                        new HadoopDirectoryLister(),
                        new Duration(5, TimeUnit.MINUTES),
                        new DataSize(100, KILOBYTE),
                        ImmutableList.of("test_dbname.test_table")),
                "test_dbname.test_table");
        testCachingDirectoryLister(
                new CachingDirectoryLister(
                        new HadoopDirectoryLister(),
                        new Duration(5, TimeUnit.MINUTES),
                        new DataSize(100, KILOBYTE),
                        ImmutableList.of("*")),
                "*");
        testCachingDirectoryLister(
                new CachingDirectoryLister(
                        new HadoopDirectoryLister(),
                        new Duration(5, TimeUnit.MINUTES),
                        new DataSize(100, KILOBYTE),
                        ImmutableList.of("*")),
                "");
        assertThrows(
                IllegalArgumentException.class,
                () -> testCachingDirectoryLister(
                        new CachingDirectoryLister(
                                new HadoopDirectoryLister(),
                                new Duration(5, TimeUnit.MINUTES),
                                new DataSize(100, KILOBYTE),
                                ImmutableList.of("*", "test_dbname.test_table")),
                        "*,test_dbname.test_table"));
    }

    @Test
    public void testSplittableNotCheckedOnSmallFiles()
            throws Exception
    {
        DataSize initialSplitSize = getMaxInitialSplitSize(SESSION);

        StorageFormat splittableStorageFormat = StorageFormat.create(
                LazySimpleSerDe.class.getName(),
                TestSplittableFailureInputFormat.class.getName(),
                TestSplittableFailureInputFormat.class.getName());
        Table table = table(ImmutableList.of(), Optional.empty(), splittableStorageFormat);

        // Exactly the max initial split size: one split, and isSplittable is never consulted
        BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader(
                SESSION,
                ImmutableList.of(locatedFileStatus(new Path(SAMPLE_PATH), initialSplitSize.toBytes())),
                ImmutableMap.of(),
                Optional.empty(),
                table,
                Optional.empty(),
                samplePartitionMetadatas());

        HiveSplitSource hiveSplitSource = hiveSplitSource(backgroundHiveSplitLoader);
        backgroundHiveSplitLoader.start(hiveSplitSource);

        assertEquals(drainSplits(hiveSplitSource).size(), 1);

        // One byte over the max initial split size, so isSplittable must be consulted
        backgroundHiveSplitLoader = backgroundHiveSplitLoader(
                SESSION,
                ImmutableList.of(locatedFileStatus(new Path(SAMPLE_PATH), initialSplitSize.toBytes() + 1)),
                ImmutableMap.of(),
                Optional.empty(),
                table,
                Optional.empty(),
                samplePartitionMetadatas());

        hiveSplitSource = hiveSplitSource(backgroundHiveSplitLoader);
        backgroundHiveSplitLoader.start(hiveSplitSource);

        try {
            drainSplits(hiveSplitSource);
            fail("Expected split generation to call isSplittable and fail");
        }
        catch (PrestoException e) {
            Throwable cause = Throwables.getRootCause(e);
            assertTrue(cause instanceof IllegalStateException);
            assertEquals(cause.getMessage(), "isSplittable called");
        }
    }

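    /**
     * An input format whose {@code isSplitable} check always throws, used to
     * detect whether split generation consulted it at all.
     */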
    public static final class TestSplittableFailureInputFormat
            extends FileInputFormat<Void, Void>
    {
        @Override
        protected boolean isSplitable(FileSystem fs, Path filename)
        {
            throw new IllegalStateException("isSplittable called");
        }

        @Override
        public RecordReader<Void, Void> getRecordReader(InputSplit inputSplit, JobConf jobConf, Reporter reporter)
        {
            throw new UnsupportedOperationException();
        }
    }

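    // Only a single-line header can be skipped per split; any footer, or a
    // multi-line header, forces the whole file into a single split.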
    @Test
    public void testTextInputHeaderFooterSplitCounts()
            throws Exception
    {
        DataSize fileSize = new DataSize(2, GIGABYTE);
        assertTextFileSplitCount(fileSize, ImmutableMap.of(), 33);
        assertTextFileSplitCount(fileSize, ImmutableMap.of("skip.header.line.count", "1"), 33);
        assertTextFileSplitCount(fileSize, ImmutableMap.of("skip.header.line.count", "2"), 1);
        assertTextFileSplitCount(fileSize, ImmutableMap.of("skip.footer.line.count", "1"), 1);
        assertTextFileSplitCount(fileSize, ImmutableMap.of("skip.header.line.count", "1", "skip.footer.line.count", "1"), 1);
    }

    private void assertTextFileSplitCount(DataSize fileSize, Map<String, String> tableProperties, int expectedSplitCount)
            throws Exception
    {
        Table.Builder tableBuilder = Table.builder(SIMPLE_TABLE)
                .setParameters(ImmutableMap.copyOf(tableProperties));
        tableBuilder.getStorageBuilder()
                .setStorageFormat(fromHiveStorageFormat(HiveStorageFormat.TEXTFILE));
        Table table = tableBuilder.build();

        BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader(
                ImmutableList.of(locatedFileStatus(new Path(SAMPLE_PATH), fileSize.toBytes())),
                ImmutableMap.of(),
                Optional.empty(),
                table,
                Optional.empty());

        HiveSplitSource hiveSplitSource = hiveSplitSource(backgroundHiveSplitLoader);
        backgroundHiveSplitLoader.start(hiveSplitSource);

        assertEquals(drainSplits(hiveSplitSource).size(), expectedSplitCount);
    }

    private void testCachingDirectoryLister(CachingDirectoryLister cachingDirectoryLister, String fileStatusCacheTables)
            throws Exception
    {
        assertEquals(cachingDirectoryLister.getRequestCount(), 0);

        int totalCount = 50;
        CountDownLatch firstVisit = new CountDownLatch(1);
        List<Future<List<HiveSplit>>> futures = new ArrayList<>();

        futures.add(EXECUTOR.submit(() -> {
            BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader(
                    TEST_FILES,
                    cachingDirectoryLister,
                    fileStatusCacheTables);
            HiveSplitSource hiveSplitSource = hiveSplitSource(backgroundHiveSplitLoader);
            backgroundHiveSplitLoader.start(hiveSplitSource);
            try {
                return drainSplits(hiveSplitSource);
            }
            finally {
                firstVisit.countDown();
            }
        }));

        for (int i = 0; i < totalCount - 1; i++) {
            futures.add(EXECUTOR.submit(() -> {
                firstVisit.await();
                BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader(
                        TEST_FILES,
                        cachingDirectoryLister,
                        fileStatusCacheTables);
                HiveSplitSource hiveSplitSource = hiveSplitSource(backgroundHiveSplitLoader);
                backgroundHiveSplitLoader.start(hiveSplitSource);
                return drainSplits(hiveSplitSource);
            }));
        }

        for (Future<List<HiveSplit>> future : futures) {
            assertEquals(future.get().size(), TEST_FILES.size());
        }

        if (fileStatusCacheTables.isEmpty()) {
            assertEquals(cachingDirectoryLister.getRequestCount(), 0);
            assertEquals(cachingDirectoryLister.getHitCount(), 0);
            assertEquals(cachingDirectoryLister.getMissCount(), 0);
        }
        else {
            assertEquals(cachingDirectoryLister.getRequestCount(), totalCount);
            assertEquals(cachingDirectoryLister.getHitCount(), totalCount - 1);
            assertEquals(cachingDirectoryLister.getMissCount(), 1);
        }
    }

    private static List<String> drain(HiveSplitSource source)
            throws Exception
    {
        return drainSplits(source).stream()
                .map(HiveSplit::getFileSplit)
                .map(HiveFileSplit::getPath)
                .collect(toImmutableList());
    }

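    // Polls the split source in batches of 100 until it reports completion.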
    private static List<HiveSplit> drainSplits(HiveSplitSource source)
            throws Exception
    {
        ImmutableList.Builder<HiveSplit> splits = ImmutableList.builder();
        while (!source.isFinished()) {
            source.getNextBatch(NOT_PARTITIONED, 100).get()
                    .getSplits().stream()
                    .map(HiveSplit.class::cast)
                    .forEach(splits::add);
        }
        return splits.build();
    }

    private static BackgroundHiveSplitLoader backgroundHiveSplitLoader(
            List<LocatedFileStatus> files,
            Map<Integer, Domain> pathDomain)
    {
        return backgroundHiveSplitLoader(
                files,
                pathDomain,
                Optional.empty(),
                SIMPLE_TABLE,
                Optional.empty());
    }

    private static BackgroundHiveSplitLoader backgroundHiveSplitLoader(
            List<LocatedFileStatus> files,
            Map<Integer, Domain> constraints,
            Optional<HiveBucketFilter> hiveBucketFilter,
            Table table,
            Optional<HiveBucketHandle> bucketHandle)
    {
        ConnectorSession connectorSession = new TestingConnectorSession(getAllSessionProperties(
                        new HiveClientConfig().setMaxSplitSize(new DataSize(1.0, GIGABYTE)),
                        new HiveCommonClientConfig()));
        return backgroundHiveSplitLoader(connectorSession, files, constraints, hiveBucketFilter, table, bucketHandle, samplePartitionMetadatas());
    }

    private static BackgroundHiveSplitLoader backgroundHiveSplitLoader(
            ConnectorSession connectorSession,
            List<LocatedFileStatus> files,
            Map<Integer, Domain> constraints,
            Optional<HiveBucketFilter> hiveBucketFilter,
            Table table,
            Optional<HiveBucketHandle> bucketHandle,
            List<HivePartitionMetadata> hivePartitionMetadatas)
    {
        return new BackgroundHiveSplitLoader(
                table,
                hivePartitionMetadatas,
                constraints,
                createBucketSplitInfo(bucketHandle, hiveBucketFilter),
                connectorSession,
                new TestingHdfsEnvironment(files),
                new NamenodeStats(),
                new CachingDirectoryLister(new HadoopDirectoryLister(), new HiveClientConfig()),
                EXECUTOR,
                2,
                false,
                false,
                false);
    }

    public static List<HivePartitionMetadata> samplePartitionMetadatas()
    {
        return ImmutableList.of(
                new HivePartitionMetadata(
                        new HivePartition(new SchemaTableName("testSchema", "table_name")),
                        Optional.empty(),
                        TableToPartitionMapping.empty(),
                        Optional.empty(),
                        ImmutableSet.of(),
                        Optional.empty()));
    }

    private static BackgroundHiveSplitLoader backgroundHiveSplitLoader(List<LocatedFileStatus> files, DirectoryLister directoryLister, String fileStatusCacheTables)
    {
        ConnectorSession connectorSession = new TestingConnectorSession(getAllSessionProperties(
                        new HiveClientConfig().setMaxSplitSize(new DataSize(1.0, GIGABYTE))
                                .setFileStatusCacheTables(fileStatusCacheTables),
                        new HiveCommonClientConfig()));

        return new BackgroundHiveSplitLoader(
                SIMPLE_TABLE,
                samplePartitionMetadatas(),
                ImmutableMap.of(),
                createBucketSplitInfo(Optional.empty(), Optional.empty()),
                connectorSession,
                new TestingHdfsEnvironment(files),
                new NamenodeStats(),
                directoryLister,
                EXECUTOR,
                2,
                false,
                false,
                false);
    }

    private static BackgroundHiveSplitLoader backgroundHiveSplitLoaderOfflinePartitions()
    {
        ConnectorSession connectorSession = new TestingConnectorSession(getAllSessionProperties(
                        new HiveClientConfig().setMaxSplitSize(new DataSize(1.0, GIGABYTE)),
                        new HiveCommonClientConfig()));

        return new BackgroundHiveSplitLoader(
                SIMPLE_TABLE,
                createPartitionMetadataWithOfflinePartitions(),
                ImmutableMap.of(),
                createBucketSplitInfo(Optional.empty(), Optional.empty()),
                connectorSession,
                new TestingHdfsEnvironment(TEST_FILES),
                new NamenodeStats(),
                new CachingDirectoryLister(new HadoopDirectoryLister(), new HiveClientConfig()),
                directExecutor(),
                2,
                false,
                false,
                false);
    }

    private static Iterable<HivePartitionMetadata> createPartitionMetadataWithOfflinePartitions()
    {
        return () -> new AbstractIterator<HivePartitionMetadata>()
        {
            // This iterator is crafted to return a valid partition on the first
            // hasNext()/next() pair and then throw on the second call to hasNext()
            private int position = -1;

            @Override
            protected HivePartitionMetadata computeNext()
            {
                position++;
                switch (position) {
                    case 0:
                        return new HivePartitionMetadata(
                                new HivePartition(new SchemaTableName("testSchema", "table_name")),
                                Optional.empty(),
                                TableToPartitionMapping.empty(),
                                Optional.empty(),
                                ImmutableSet.of(),
                                Optional.empty());
                    case 1:
                        throw new RuntimeException("OFFLINE");
                    default:
                        return endOfData();
                }
            }
        };
    }

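    // Wraps the loader in an "all at once" HiveSplitSource over SIMPLE_TABLE
    // with small, bounded buffering so tests drain it deterministically.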
    private static HiveSplitSource hiveSplitSource(BackgroundHiveSplitLoader backgroundHiveSplitLoader)
    {
        return HiveSplitSource.allAtOnce(
                SESSION,
                SIMPLE_TABLE.getDatabaseName(),
                SIMPLE_TABLE.getTableName(),
                new CacheQuotaRequirement(GLOBAL, Optional.empty()),
                1,
                1,
                new DataSize(32, MEGABYTE),
                backgroundHiveSplitLoader,
                EXECUTOR,
                new CounterStat(),
                1);
    }

    private static Table table(
            List<Column> partitionColumns,
            Optional<HiveBucketProperty> bucketProperty)
    {
        return table(
                partitionColumns,
                bucketProperty,
                StorageFormat.create(
                        "com.facebook.hive.orc.OrcSerde",
                        "org.apache.hadoop.hive.ql.io.RCFileInputFormat",
                        "org.apache.hadoop.hive.ql.io.RCFileInputFormat"));
    }

    private static Table table(
            List<Column> partitionColumns,
            Optional<HiveBucketProperty> bucketProperty,
            StorageFormat storageFormat)
    {
        Table.Builder tableBuilder = Table.builder();
        tableBuilder.getStorageBuilder()
                .setStorageFormat(storageFormat)
                .setLocation("hdfs://VOL1:9000/db_name/table_name")
                .setSkewed(false)
                .setBucketProperty(bucketProperty);

        return tableBuilder
                .setDatabaseName("test_dbname")
                .setOwner("testOwner")
                .setTableName("test_table")
                .setTableType(MANAGED_TABLE)
                .setDataColumns(ImmutableList.of(new Column("col1", HIVE_STRING, Optional.empty(), Optional.empty())))
                .setParameters(ImmutableMap.of())
                .setPartitionColumns(partitionColumns)
                .build();
    }

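    // Fabricates a file status whose single block spans the whole file on "localhost".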
    private static LocatedFileStatus locatedFileStatus(Path path, long fileSize)
    {
        return new LocatedFileStatus(
                fileSize,
                false,
                0,
                0L,
                0L,
                0L,
                null,
                null,
                null,
                null,
                path,
                new BlockLocation[]{new BlockLocation(new String[1], new String[]{"localhost"}, 0, fileSize)});
    }

    private static LocatedFileStatus locatedFileStatusWithNoBlocks(Path path)
    {
        return new LocatedFileStatus(
                0L,
                false,
                0,
                0L,
                0L,
                0L,
                null,
                null,
                null,
                null,
                path,
                new BlockLocation[]{});
    }

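    /**
     * An HdfsEnvironment whose getFileSystem() always returns a stub file system
     * backed by the given file list, regardless of user, path, or configuration.
     */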
    public static class TestingHdfsEnvironment
            extends HdfsEnvironment
    {
        private final List<LocatedFileStatus> files;

        public TestingHdfsEnvironment(List<LocatedFileStatus> files)
        {
            super(
                    new HiveHdfsConfiguration(new HdfsConfigurationInitializer(new HiveClientConfig(), new MetastoreClientConfig()), ImmutableSet.of(), new HiveClientConfig()),
                    new MetastoreClientConfig(),
                    new NoHdfsAuthentication());
            this.files = ImmutableList.copyOf(files);
        }

        @Override
        public ExtendedFileSystem getFileSystem(String user, Path path, Configuration configuration)
        {
            return new TestingHdfsFileSystem(files);
        }
    }

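    // A stub ExtendedFileSystem: listLocatedStatus() streams the canned file list;
    // every other operation throws UnsupportedOperationException.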
    private static class TestingHdfsFileSystem
            extends ExtendedFileSystem
    {
        private final List<LocatedFileStatus> files;

        public TestingHdfsFileSystem(List<LocatedFileStatus> files)
        {
            this.files = ImmutableList.copyOf(files);
        }

        @Override
        public boolean delete(Path f, boolean recursive)
        {
            throw new UnsupportedOperationException();
        }

        @Override
        public boolean rename(Path src, Path dst)
        {
            throw new UnsupportedOperationException();
        }

        @Override
        public void setWorkingDirectory(Path dir)
        {
            throw new UnsupportedOperationException();
        }

        @Override
        public FileStatus[] listStatus(Path f)
        {
            throw new UnsupportedOperationException();
        }

        @Override
        public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
        {
            return new RemoteIterator<LocatedFileStatus>()
            {
                private final Iterator<LocatedFileStatus> iterator = files.iterator();

                @Override
                public boolean hasNext()
                {
                    return iterator.hasNext();
                }

                @Override
                public LocatedFileStatus next()
                {
                    return iterator.next();
                }
            };
        }

        @Override
        public FSDataOutputStream create(
                Path f,
                FsPermission permission,
                boolean overwrite,
                int bufferSize,
                short replication,
                long blockSize,
                Progressable progress)
        {
            throw new UnsupportedOperationException();
        }

        @Override
        public boolean mkdirs(Path f, FsPermission permission)
        {
            throw new UnsupportedOperationException();
        }

        @Override
        public FSDataOutputStream append(Path f, int bufferSize, Progressable progress)
        {
            throw new UnsupportedOperationException();
        }

        @Override
        public FSDataInputStream open(Path f, int bufferSize)
        {
            throw new UnsupportedOperationException();
        }

        @Override
        public FileStatus getFileStatus(Path f)
        {
            throw new UnsupportedOperationException();
        }

        @Override
        public Path getWorkingDirectory()
        {
            throw new UnsupportedOperationException();
        }

        @Override
        public URI getUri()
        {
            throw new UnsupportedOperationException();
        }
    }
}