/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive;
import com.facebook.airlift.stats.CounterStat;
import com.facebook.presto.common.predicate.Domain;
import com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType;
import com.facebook.presto.hive.HiveBucketing.HiveBucketFilter;
import com.facebook.presto.hive.authentication.NoHdfsAuthentication;
import com.facebook.presto.hive.filesystem.ExtendedFileSystem;
import com.facebook.presto.hive.metastore.Column;
import com.facebook.presto.hive.metastore.Partition;
import com.facebook.presto.hive.metastore.Storage;
import com.facebook.presto.hive.metastore.StorageFormat;
import com.facebook.presto.hive.metastore.Table;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.testing.TestingConnectorSession;
import com.google.common.base.Throwables;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;
import org.testng.annotations.Test;
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed;
import static com.facebook.presto.common.type.IntegerType.INTEGER;
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static com.facebook.presto.hive.BucketFunctionType.HIVE_COMPATIBLE;
import static com.facebook.presto.hive.CacheQuotaScope.GLOBAL;
import static com.facebook.presto.hive.HiveColumnHandle.PATH_COLUMN_INDEX;
import static com.facebook.presto.hive.HiveSessionProperties.getMaxInitialSplitSize;
import static com.facebook.presto.hive.HiveStorageFormat.ORC;
import static com.facebook.presto.hive.HiveTestUtils.SESSION;
import static com.facebook.presto.hive.HiveTestUtils.getAllSessionProperties;
import static com.facebook.presto.hive.HiveType.HIVE_INT;
import static com.facebook.presto.hive.HiveType.HIVE_STRING;
import static com.facebook.presto.hive.HiveUtil.getRegularColumnHandles;
import static com.facebook.presto.hive.StoragePartitionLoader.BucketSplitInfo.createBucketSplitInfo;
import static com.facebook.presto.hive.metastore.PrestoTableType.MANAGED_TABLE;
import static com.facebook.presto.hive.metastore.StorageFormat.fromHiveStorageFormat;
import static com.facebook.presto.spi.connector.NotPartitionedPartitionHandle.NOT_PARTITIONED;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.airlift.slice.Slices.utf8Slice;
import static io.airlift.units.DataSize.Unit.GIGABYTE;
import static io.airlift.units.DataSize.Unit.KILOBYTE;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
public class TestBackgroundHiveSplitLoader
{
private static final int BUCKET_COUNT = 2;
private static final String SAMPLE_PATH = "hdfs://VOL1:9000/db_name/table_name/000000_0";
private static final String SAMPLE_PATH_FILTERED = "hdfs://VOL1:9000/db_name/table_name/000000_1";
private static final Path RETURNED_PATH = new Path(SAMPLE_PATH);
private static final Path FILTERED_PATH = new Path(SAMPLE_PATH_FILTERED);
private static final ExecutorService EXECUTOR = newCachedThreadPool(daemonThreadsNamed("test-%s"));
private static final Domain RETURNED_PATH_DOMAIN = Domain.singleValue(VARCHAR, utf8Slice(RETURNED_PATH.toString()));
private static final Map<Integer, Domain> RETURNED_PATH_CONSTRAINT = ImmutableMap.of(PATH_COLUMN_INDEX, RETURNED_PATH_DOMAIN);
private static final List<LocatedFileStatus> TEST_FILES = ImmutableList.of(
locatedFileStatus(RETURNED_PATH, 0L),
locatedFileStatus(FILTERED_PATH, 0L));
private static final String PARTITION_COLUMN_NAME = "partitionColumn";
private static final List<Column> PARTITION_COLUMNS = ImmutableList.of(
new Column(PARTITION_COLUMN_NAME, HIVE_INT, Optional.empty(), Optional.empty()));
private static final List<HiveColumnHandle> BUCKET_COLUMN_HANDLES = ImmutableList.of(
new HiveColumnHandle("col1", HIVE_INT, INTEGER.getTypeSignature(), 0, ColumnType.REGULAR, Optional.empty(), Optional.empty()));
private static final Optional<HiveBucketProperty> BUCKET_PROPERTY = Optional.of(
new HiveBucketProperty(ImmutableList.of("col1"), BUCKET_COUNT, ImmutableList.of(), HIVE_COMPATIBLE, Optional.empty()));
public static final Table SIMPLE_TABLE = table(ImmutableList.of(), Optional.empty());
private static final Table PARTITIONED_TABLE = table(PARTITION_COLUMNS, BUCKET_PROPERTY);
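// With no path constraint, both test files should produce splits.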
@Test
public void testNoPathFilter()
throws Exception
{
BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader(
TEST_FILES,
ImmutableMap.of());
HiveSplitSource hiveSplitSource = hiveSplitSource(backgroundHiveSplitLoader);
backgroundHiveSplitLoader.start(hiveSplitSource);
assertEquals(drain(hiveSplitSource).size(), 2);
}
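// A single-value domain on the synthetic $path column should prune the non-matching file.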
@Test
public void testPathFilter()
throws Exception
{
BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader(
TEST_FILES,
RETURNED_PATH_CONSTRAINT);
HiveSplitSource hiveSplitSource = hiveSplitSource(backgroundHiveSplitLoader);
backgroundHiveSplitLoader.start(hiveSplitSource);
List<String> paths = drain(hiveSplitSource);
assertEquals(paths.size(), 1);
assertEquals(paths.get(0), RETURNED_PATH.toString());
}
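// The $path constraint should still apply when a bucket filter is present on a bucketed, partitioned table.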
@Test
public void testPathFilterOneBucketMatchPartitionedTable()
throws Exception
{
BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader(
TEST_FILES,
RETURNED_PATH_CONSTRAINT,
Optional.of(new HiveBucketFilter(ImmutableSet.of(0, 1))),
PARTITIONED_TABLE,
Optional.of(new HiveBucketHandle(BUCKET_COLUMN_HANDLES, BUCKET_COUNT, BUCKET_COUNT)));
HiveSplitSource hiveSplitSource = hiveSplitSource(backgroundHiveSplitLoader);
backgroundHiveSplitLoader.start(hiveSplitSource);
List<String> paths = drain(hiveSplitSource);
assertEquals(paths.size(), 1);
assertEquals(paths.get(0), RETURNED_PATH.toString());
}
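// Same pruning without an explicit bucket filter, using the table's regular columns as the bucket handle.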
@Test
public void testPathFilterBucketedPartitionedTable()
throws Exception
{
BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader(
TEST_FILES,
RETURNED_PATH_CONSTRAINT,
Optional.empty(),
PARTITIONED_TABLE,
Optional.of(
new HiveBucketHandle(
getRegularColumnHandles(PARTITIONED_TABLE),
BUCKET_COUNT,
BUCKET_COUNT)));
HiveSplitSource hiveSplitSource = hiveSplitSource(backgroundHiveSplitLoader);
backgroundHiveSplitLoader.start(hiveSplitSource);
List<String> paths = drain(hiveSplitSource);
assertEquals(paths.size(), 1);
assertEquals(paths.get(0), RETURNED_PATH.toString());
}
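// An empty file with no block locations should still yield exactly one zero-length split.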
@Test
public void testEmptyFileWithNoBlocks()
throws Exception
{
BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader(
ImmutableList.of(locatedFileStatusWithNoBlocks(RETURNED_PATH)),
ImmutableMap.of());
HiveSplitSource hiveSplitSource = hiveSplitSource(backgroundHiveSplitLoader);
backgroundHiveSplitLoader.start(hiveSplitSource);
List<HiveSplit> splits = drainSplits(hiveSplitSource);
assertEquals(splits.size(), 1);
assertEquals(splits.get(0).getFileSplit().getPath(), RETURNED_PATH.toString());
assertEquals(splits.get(0).getFileSplit().getLength(), 0);
}
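// Enumerating an offline partition should fail fast instead of leaving the split source hanging.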
@Test
public void testNoHangIfPartitionIsOffline()
{
BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoaderOfflinePartitions();
HiveSplitSource hiveSplitSource = hiveSplitSource(backgroundHiveSplitLoader);
backgroundHiveSplitLoader.start(hiveSplitSource);
assertThrows(RuntimeException.class, () -> drain(hiveSplitSource));
assertThrows(RuntimeException.class, hiveSplitSource::isFinished);
}
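// A table-level storage format that cannot be resolved is tolerated as long as the
// partition itself uses a supported format (ORC here).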
@Test
public void testUnsupportedTableFormat()
throws Exception
{
StorageFormat storageFormat = StorageFormat.create("NonExistentSerde", "NonExistentInputFormat", "NonExistentOutputFormat");
Table unsupportedTable = table(PARTITION_COLUMNS, Optional.empty(), storageFormat);
String partitionId = PARTITION_COLUMN_NAME + "=Partition_Value";
List<HivePartitionMetadata> hivePartitionMetadatas =
ImmutableList.of(
new HivePartitionMetadata(
new HivePartition(unsupportedTable.getSchemaTableName(), new PartitionNameWithVersion(partitionId, Optional.empty()), ImmutableMap.of()),
Optional.of(orcPartition()),
TableToPartitionMapping.empty(),
Optional.empty(),
ImmutableSet.of(),
Optional.empty()));
BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader(
SESSION,
ImmutableList.of(locatedFileStatus(new Path(SAMPLE_PATH), getMaxInitialSplitSize(SESSION).toBytes())),
ImmutableMap.of(),
Optional.empty(),
unsupportedTable,
Optional.empty(),
hivePartitionMetadatas);
HiveSplitSource hiveSplitSource = hiveSplitSource(backgroundHiveSplitLoader);
backgroundHiveSplitLoader.start(hiveSplitSource);
assertEquals(drainSplits(hiveSplitSource).size(), 1);
}
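// An ORC-backed partition, used to override the unsupported table-level format above.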
private static Partition orcPartition()
{
return new Partition(
"test_db",
"test_table",
PARTITION_COLUMNS.stream().map(x -> "Partition_Value").collect(Collectors.toList()),
new Storage(
fromHiveStorageFormat(ORC),
"location",
Optional.empty(),
true,
ImmutableMap.of(),
ImmutableMap.of()),
PARTITION_COLUMNS,
ImmutableMap.of(),
Optional.empty(),
false,
true,
0,
0,
Optional.empty());
}
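// Listings should be cached only for tables named in file-status-cache-tables ("*" matches all);
// mixing "*" with explicit table names is rejected.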
@Test
public void testCachingDirectoryLister()
throws Exception
{
testCachingDirectoryLister(
new CachingDirectoryLister(
new HadoopDirectoryLister(),
new Duration(5, TimeUnit.MINUTES),
new DataSize(100, KILOBYTE),
ImmutableList.of("test_dbname.test_table")),
"test_dbname.test_table");
testCachingDirectoryLister(
new CachingDirectoryLister(
new HadoopDirectoryLister(),
new Duration(5, TimeUnit.MINUTES),
new DataSize(100, KILOBYTE),
ImmutableList.of("*")),
"*");
testCachingDirectoryLister(
new CachingDirectoryLister(
new HadoopDirectoryLister(),
new Duration(5, TimeUnit.MINUTES),
new DataSize(100, KILOBYTE),
ImmutableList.of("*")),
"");
assertThrows(
IllegalArgumentException.class,
() -> testCachingDirectoryLister(
new CachingDirectoryLister(
new HadoopDirectoryLister(),
new Duration(5, TimeUnit.MINUTES),
new DataSize(100, KILOBYTE),
ImmutableList.of("*", "test_dbname.test_table")),
"*,test_dbname.test_table"));
}
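// isSplittable should be consulted only for files larger than the max initial split size.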
@Test
public void testSplittableNotCheckedOnSmallFiles()
throws Exception
{
DataSize initialSplitSize = getMaxInitialSplitSize(SESSION);
StorageFormat splittableStorageFormat = StorageFormat.create(
LazySimpleSerDe.class.getName(),
TestSplittableFailureInputFormat.class.getName(),
TestSplittableFailureInputFormat.class.getName());
Table table = table(ImmutableList.of(), Optional.empty(), splittableStorageFormat);
// File size exactly equal to the max initial split size: isSplittable is not checked
BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader(
SESSION,
ImmutableList.of(locatedFileStatus(new Path(SAMPLE_PATH), initialSplitSize.toBytes())),
ImmutableMap.of(),
Optional.empty(),
table,
Optional.empty(),
samplePartitionMetadatas());
HiveSplitSource hiveSplitSource = hiveSplitSource(backgroundHiveSplitLoader);
backgroundHiveSplitLoader.start(hiveSplitSource);
assertEquals(drainSplits(hiveSplitSource).size(), 1);
// One byte over the max initial split size, so isSplittable is called (and throws)
backgroundHiveSplitLoader = backgroundHiveSplitLoader(
SESSION,
ImmutableList.of(locatedFileStatus(new Path(SAMPLE_PATH), initialSplitSize.toBytes() + 1)),
ImmutableMap.of(),
Optional.empty(),
table,
Optional.empty(),
samplePartitionMetadatas());
hiveSplitSource = hiveSplitSource(backgroundHiveSplitLoader);
backgroundHiveSplitLoader.start(hiveSplitSource);
try {
drainSplits(hiveSplitSource);
fail("Expected split generation to call isSplittable and fail");
}
catch (PrestoException e) {
Throwable cause = Throwables.getRootCause(e);
assertTrue(cause instanceof IllegalStateException);
assertEquals(cause.getMessage(), "isSplittable called");
}
}
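// Input format that fails loudly if split-ability is ever checked. Note that Hadoop
// spells the hook isSplitable.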
public static final class TestSplittableFailureInputFormat
extends FileInputFormat<Void, Void>
{
@Override
protected boolean isSplitable(FileSystem fs, Path filename)
{
throw new IllegalStateException("isSplittable called");
}
@Override
public RecordReader<Void, Void> getRecordReader(InputSplit inputSplit, JobConf jobConf, Reporter reporter)
{
throw new UnsupportedOperationException();
}
}
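// Text files are splittable by default (33 splits for 2GB here), and a single header line
// does not prevent splitting; more than one header line, or any footer lines, force a
// single split per file.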
@Test
public void testTextInputHeaderFooterSplitCounts()
throws Exception
{
DataSize fileSize = new DataSize(2, GIGABYTE);
assertTextFileSplitCount(fileSize, ImmutableMap.of(), 33);
assertTextFileSplitCount(fileSize, ImmutableMap.of("skip.header.line.count", "1"), 33);
assertTextFileSplitCount(fileSize, ImmutableMap.of("skip.header.line.count", "2"), 1);
assertTextFileSplitCount(fileSize, ImmutableMap.of("skip.footer.line.count", "1"), 1);
assertTextFileSplitCount(fileSize, ImmutableMap.of("skip.header.line.count", "1", "skip.footer.line.count", "1"), 1);
}
private void assertTextFileSplitCount(DataSize fileSize, Map<String, String> tableProperties, int expectedSplitCount)
throws Exception
{
Table.Builder tableBuilder = Table.builder(SIMPLE_TABLE)
.setParameters(ImmutableMap.copyOf(tableProperties));
tableBuilder.getStorageBuilder()
.setStorageFormat(fromHiveStorageFormat(HiveStorageFormat.TEXTFILE));
Table table = tableBuilder.build();
BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader(
ImmutableList.of(locatedFileStatus(new Path(SAMPLE_PATH), fileSize.toBytes())),
ImmutableMap.of(),
Optional.empty(),
table,
Optional.empty());
HiveSplitSource hiveSplitSource = hiveSplitSource(backgroundHiveSplitLoader);
backgroundHiveSplitLoader.start(hiveSplitSource);
assertEquals(drainSplits(hiveSplitSource).size(), expectedSplitCount);
}
private void testCachingDirectoryLister(CachingDirectoryLister cachingDirectoryLister, String fileStatusCacheTables)
throws Exception
{
assertEquals(cachingDirectoryLister.getRequestCount(), 0);
int totalCount = 50;
CountDownLatch firstVisit = new CountDownLatch(1);
List<Future<List<HiveSplit>>> futures = new ArrayList<>();
futures.add(EXECUTOR.submit(() -> {
BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader(
TEST_FILES,
cachingDirectoryLister,
fileStatusCacheTables);
HiveSplitSource hiveSplitSource = hiveSplitSource(backgroundHiveSplitLoader);
backgroundHiveSplitLoader.start(hiveSplitSource);
try {
return drainSplits(hiveSplitSource);
}
finally {
firstVisit.countDown();
}
}));
for (int i = 0; i < totalCount - 1; i++) {
futures.add(EXECUTOR.submit(() -> {
firstVisit.await();
BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader(
TEST_FILES,
cachingDirectoryLister,
fileStatusCacheTables);
HiveSplitSource hiveSplitSource = hiveSplitSource(backgroundHiveSplitLoader);
backgroundHiveSplitLoader.start(hiveSplitSource);
return drainSplits(hiveSplitSource);
}));
}
for (Future<List<HiveSplit>> future : futures) {
assertEquals(future.get().size(), TEST_FILES.size());
}
if (fileStatusCacheTables.isEmpty()) {
assertEquals(cachingDirectoryLister.getRequestCount(), 0);
assertEquals(cachingDirectoryLister.getHitCount(), 0);
assertEquals(cachingDirectoryLister.getMissCount(), 0);
}
else {
assertEquals(cachingDirectoryLister.getRequestCount(), totalCount);
assertEquals(cachingDirectoryLister.getHitCount(), totalCount - 1);
assertEquals(cachingDirectoryLister.getMissCount(), 1);
}
}
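// Drains the source and returns just the file paths of the generated splits.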
private static List<String> drain(HiveSplitSource source)
throws Exception
{
return drainSplits(source).stream()
.map(HiveSplit::getFileSplit)
.map(HiveFileSplit::getPath)
.collect(toImmutableList());
}
private static List<HiveSplit> drainSplits(HiveSplitSource source)
throws Exception
{
ImmutableList.Builder<HiveSplit> splits = ImmutableList.builder();
while (!source.isFinished()) {
source.getNextBatch(NOT_PARTITIONED, 100).get()
.getSplits().stream()
.map(HiveSplit.class::cast)
.forEach(splits::add);
}
return splits.build();
}
private static BackgroundHiveSplitLoader backgroundHiveSplitLoader(
List<LocatedFileStatus> files,
Map<Integer, Domain> pathDomain)
{
return backgroundHiveSplitLoader(
files,
pathDomain,
Optional.empty(),
SIMPLE_TABLE,
Optional.empty());
}
private static BackgroundHiveSplitLoader backgroundHiveSplitLoader(
List<LocatedFileStatus> files,
Map<Integer, Domain> constraints,
Optional<HiveBucketFilter> hiveBucketFilter,
Table table,
Optional<HiveBucketHandle> bucketHandle)
{
ConnectorSession connectorSession = new TestingConnectorSession(getAllSessionProperties(
new HiveClientConfig().setMaxSplitSize(new DataSize(1.0, GIGABYTE)),
new HiveCommonClientConfig()));
return backgroundHiveSplitLoader(connectorSession, files, constraints, hiveBucketFilter, table, bucketHandle, samplePartitionMetadatas());
}
private static BackgroundHiveSplitLoader backgroundHiveSplitLoader(
ConnectorSession connectorSession,
List<LocatedFileStatus> files,
Map<Integer, Domain> constraints,
Optional<HiveBucketFilter> hiveBucketFilter,
Table table,
Optional<HiveBucketHandle> bucketHandle,
List<HivePartitionMetadata> hivePartitionMetadatas)
{
return new BackgroundHiveSplitLoader(
table,
hivePartitionMetadatas,
constraints,
createBucketSplitInfo(bucketHandle, hiveBucketFilter),
connectorSession,
new TestingHdfsEnvironment(files),
new NamenodeStats(),
new CachingDirectoryLister(new HadoopDirectoryLister(), new HiveClientConfig()),
EXECUTOR,
2,
false,
false,
false);
}
public static List<HivePartitionMetadata> samplePartitionMetadatas()
{
return ImmutableList.of(
new HivePartitionMetadata(
new HivePartition(new SchemaTableName("testSchema", "table_name")),
Optional.empty(),
TableToPartitionMapping.empty(),
Optional.empty(),
ImmutableSet.of(),
Optional.empty()));
}
private static BackgroundHiveSplitLoader backgroundHiveSplitLoader(List<LocatedFileStatus> files, DirectoryLister directoryLister, String fileStatusCacheTables)
{
ConnectorSession connectorSession = new TestingConnectorSession(getAllSessionProperties(
new HiveClientConfig().setMaxSplitSize(new DataSize(1.0, GIGABYTE))
.setFileStatusCacheTables(fileStatusCacheTables),
new HiveCommonClientConfig()));
return new BackgroundHiveSplitLoader(
SIMPLE_TABLE,
samplePartitionMetadatas(),
ImmutableMap.of(),
createBucketSplitInfo(Optional.empty(), Optional.empty()),
connectorSession,
new TestingHdfsEnvironment(files),
new NamenodeStats(),
directoryLister,
EXECUTOR,
2,
false,
false,
false);
}
private static BackgroundHiveSplitLoader backgroundHiveSplitLoaderOfflinePartitions()
{
ConnectorSession connectorSession = new TestingConnectorSession(getAllSessionProperties(
new HiveClientConfig().setMaxSplitSize(new DataSize(1.0, GIGABYTE)),
new HiveCommonClientConfig()));
return new BackgroundHiveSplitLoader(
SIMPLE_TABLE,
createPartitionMetadataWithOfflinePartitions(),
ImmutableMap.of(),
createBucketSplitInfo(Optional.empty(), Optional.empty()),
connectorSession,
new TestingHdfsEnvironment(TEST_FILES),
new NamenodeStats(),
new CachingDirectoryLister(new HadoopDirectoryLister(), new HiveClientConfig()),
directExecutor(),
2,
false,
false,
false);
}
private static Iterable<HivePartitionMetadata> createPartitionMetadataWithOfflinePartitions()
{
return () -> new AbstractIterator<HivePartitionMetadata>()
{
// This iterator is crafted to return a valid partition on the first call to
// hasNext()/next(), and then to throw from the second call to hasNext()
private int position = -1;
@Override
protected HivePartitionMetadata computeNext()
{
position++;
switch (position) {
case 0:
return new HivePartitionMetadata(
new HivePartition(new SchemaTableName("testSchema", "table_name")),
Optional.empty(),
TableToPartitionMapping.empty(),
Optional.empty(),
ImmutableSet.of(),
Optional.empty());
case 1:
throw new RuntimeException("OFFLINE");
default:
return endOfData();
}
}
};
}
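// Wraps the loader in an all-at-once split source over SIMPLE_TABLE.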
private static HiveSplitSource hiveSplitSource(BackgroundHiveSplitLoader backgroundHiveSplitLoader)
{
return HiveSplitSource.allAtOnce(
SESSION,
SIMPLE_TABLE.getDatabaseName(),
SIMPLE_TABLE.getTableName(),
new CacheQuotaRequirement(GLOBAL, Optional.empty()),
1,
1,
new DataSize(32, MEGABYTE),
backgroundHiveSplitLoader,
EXECUTOR,
new CounterStat(),
1);
}
private static Table table(
List<Column> partitionColumns,
Optional<HiveBucketProperty> bucketProperty)
{
return table(
partitionColumns,
bucketProperty,
StorageFormat.create(
"com.facebook.hive.orc.OrcSerde",
"org.apache.hadoop.hive.ql.io.RCFileInputFormat",
"org.apache.hadoop.hive.ql.io.RCFileInputFormat"));
}
private static Table table(
List<Column> partitionColumns,
Optional<HiveBucketProperty> bucketProperty,
StorageFormat storageFormat)
{
Table.Builder tableBuilder = Table.builder();
tableBuilder.getStorageBuilder()
.setStorageFormat(storageFormat)
.setLocation("hdfs://VOL1:9000/db_name/table_name")
.setSkewed(false)
.setBucketProperty(bucketProperty);
return tableBuilder
.setDatabaseName("test_dbname")
.setOwner("testOwner")
.setTableName("test_table")
.setTableType(MANAGED_TABLE)
.setDataColumns(ImmutableList.of(new Column("col1", HIVE_STRING, Optional.empty(), Optional.empty())))
.setParameters(ImmutableMap.of())
.setPartitionColumns(partitionColumns)
.build();
}
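// A file status with a single local block spanning the whole file.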
private static LocatedFileStatus locatedFileStatus(Path path, long fileSize)
{
return new LocatedFileStatus(
fileSize,
false,
0,
0L,
0L,
0L,
null,
null,
null,
null,
path,
new BlockLocation[]{new BlockLocation(new String[1], new String[]{"localhost"}, 0, fileSize)});
}
private static LocatedFileStatus locatedFileStatusWithNoBlocks(Path path)
{
return new LocatedFileStatus(
0L,
false,
0,
0L,
0L,
0L,
null,
null,
null,
null,
path,
new BlockLocation[]{});
}
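// HdfsEnvironment stub whose file system always returns the canned file list.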
public static class TestingHdfsEnvironment
extends HdfsEnvironment
{
private final List<LocatedFileStatus> files;
public TestingHdfsEnvironment(List<LocatedFileStatus> files)
{
super(
new HiveHdfsConfiguration(new HdfsConfigurationInitializer(new HiveClientConfig(), new MetastoreClientConfig()), ImmutableSet.of(), new HiveClientConfig()),
new MetastoreClientConfig(),
new NoHdfsAuthentication());
this.files = ImmutableList.copyOf(files);
}
@Override
public ExtendedFileSystem getFileSystem(String user, Path path, Configuration configuration)
{
return new TestingHdfsFileSystem(files);
}
}
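// Minimal file system for listing: only listLocatedStatus is implemented, and it
// ignores the path, returning the canned files.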
private static class TestingHdfsFileSystem
extends ExtendedFileSystem
{
private final List<LocatedFileStatus> files;
public TestingHdfsFileSystem(List<LocatedFileStatus> files)
{
this.files = ImmutableList.copyOf(files);
}
@Override
public boolean delete(Path f, boolean recursive)
{
throw new UnsupportedOperationException();
}
@Override
public boolean rename(Path src, Path dst)
{
throw new UnsupportedOperationException();
}
@Override
public void setWorkingDirectory(Path dir)
{
throw new UnsupportedOperationException();
}
@Override
public FileStatus[] listStatus(Path f)
{
throw new UnsupportedOperationException();
}
@Override
public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
{
return new RemoteIterator<LocatedFileStatus>()
{
private final Iterator<LocatedFileStatus> iterator = files.iterator();
@Override
public boolean hasNext()
{
return iterator.hasNext();
}
@Override
public LocatedFileStatus next()
{
return iterator.next();
}
};
}
@Override
public FSDataOutputStream create(
Path f,
FsPermission permission,
boolean overwrite,
int bufferSize,
short replication,
long blockSize,
Progressable progress)
{
throw new UnsupportedOperationException();
}
@Override
public boolean mkdirs(Path f, FsPermission permission)
{
throw new UnsupportedOperationException();
}
@Override
public FSDataOutputStream append(Path f, int bufferSize, Progressable progress)
{
throw new UnsupportedOperationException();
}
@Override
public FSDataInputStream open(Path f, int bufferSize)
{
throw new UnsupportedOperationException();
}
@Override
public FileStatus getFileStatus(Path f)
{
throw new UnsupportedOperationException();
}
@Override
public Path getWorkingDirectory()
{
throw new UnsupportedOperationException();
}
@Override
public URI getUri()
{
throw new UnsupportedOperationException();
}
}
}