// TestStoragePartitionLoader.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive;
import com.facebook.presto.hive.TestBackgroundHiveSplitLoader.TestingHdfsEnvironment;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.testing.TestingConnectorSession;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.testng.annotations.Test;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
import static com.facebook.presto.hive.HiveSessionProperties.isSkipEmptyFilesEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isUseListDirectoryCache;
import static com.facebook.presto.hive.HiveStorageFormat.PARQUET;
import static com.facebook.presto.hive.HiveTestUtils.getAllSessionProperties;
import static com.facebook.presto.hive.HiveUtil.buildDirectoryContextProperties;
import static com.facebook.presto.hive.NestedDirectoryPolicy.IGNORED;
import static com.facebook.presto.hive.StoragePartitionLoader.BucketSplitInfo.createBucketSplitInfo;
import static com.facebook.presto.hive.TestBackgroundHiveSplitLoader.SIMPLE_TABLE;
import static com.facebook.presto.hive.TestBackgroundHiveSplitLoader.samplePartitionMetadatas;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Iterables.getOnlyElement;
import static io.airlift.units.DataSize.Unit.GIGABYTE;
import static io.airlift.units.DataSize.Unit.KILOBYTE;
import static org.testng.Assert.assertEquals;
/**
 * Tests {@code StoragePartitionLoader.getSymlinkIterator} using a
 * {@link TestingHdfsEnvironment} that is seeded with a fixed list of file statuses.
 */
public class TestStoragePartitionLoader
{
    /**
     * Requests a symlink iterator for two target files and verifies that one split is
     * produced per target, in the same order in which the targets were supplied.
     */
    @Test
    public void testGetSymlinkIterator()
            throws Exception
    {
        // The two data files the symlink manifest is taken to point at.
        Path firstFilePath = new Path("hdfs://hadoop:9000/db_name/table_name/file1");
        Path secondFilePath = new Path("hdfs://hadoop:9000/db_name/table_name/file2");
        List<Path> targetPaths = ImmutableList.of(firstFilePath, secondFilePath);

        // Every target is registered with the testing file system as a zero-length file.
        List<LocatedFileStatus> targetStatuses = targetPaths.stream()
                .map(TestStoragePartitionLoader::emptyFileStatus)
                .collect(toImmutableList());

        CachingDirectoryLister cachingLister = new CachingDirectoryLister(
                new HadoopDirectoryLister(),
                new Duration(5, TimeUnit.MINUTES),
                new DataSize(100, KILOBYTE),
                ImmutableList.of());

        InputFormat<?, ?> symlinkInputFormat = HiveUtil.getInputFormat(
                new Configuration(false),
                SymlinkTextInputFormat.class.getName(),
                PARQUET.getSerDe(),
                true);

        ConnectorSession session = newSession();
        StoragePartitionLoader loader = storagePartitionLoader(targetStatuses, cachingLister, session);
        HiveDirectoryContext directoryContext = newDirectoryContext(session);

        Path manifestPath = new Path("hdfs://hadoop:9000/db_name/table_name/symlink_manifest");
        Path tableDirectory = new Path("hdfs://hadoop:9000/db_name/table_name/");

        Iterator<InternalHiveSplit> symlinkIterator = loader.getSymlinkIterator(
                manifestPath,
                false,
                SIMPLE_TABLE.getStorage(),
                ImmutableList.of(),
                "UNPARTITIONED",
                SIMPLE_TABLE.getDataColumns().size(),
                getOnlyElement(samplePartitionMetadatas()),
                true,
                tableDirectory,
                targetPaths,
                symlinkInputFormat,
                directoryContext);

        List<InternalHiveSplit> producedSplits = ImmutableList.copyOf(symlinkIterator);

        // Two targets in, two splits out; the i-th split must carry the i-th target's path.
        assertEquals(producedSplits.size(), 2);
        for (int index = 0; index < targetPaths.size(); index++) {
            assertEquals(producedSplits.get(index).getPath(), targetPaths.get(index).toString());
        }
    }

    /**
     * Builds the session used by the test from a {@link HiveClientConfig} whose max split size
     * is 1 GB and whose file-status-cache table list is the empty string, together with a
     * default {@link HiveCommonClientConfig}.
     */
    private static ConnectorSession newSession()
    {
        HiveClientConfig hiveClientConfig = new HiveClientConfig()
                .setMaxSplitSize(new DataSize(1.0, GIGABYTE))
                .setFileStatusCacheTables("");
        return new TestingConnectorSession(getAllSessionProperties(hiveClientConfig, new HiveCommonClientConfig()));
    }

    /**
     * Creates the directory context handed to the loader. The identity comes from an
     * {@link HdfsContext} built for {@code SIMPLE_TABLE}; the cache and skip-empty-files
     * flags, the context properties and the runtime stats are all read from {@code session}.
     */
    private static HiveDirectoryContext newDirectoryContext(ConnectorSession session)
    {
        HdfsContext hdfsContext = new HdfsContext(
                session,
                SIMPLE_TABLE.getDatabaseName(),
                SIMPLE_TABLE.getTableName(),
                SIMPLE_TABLE.getStorage().getLocation(),
                false);

        return new HiveDirectoryContext(
                IGNORED,
                isUseListDirectoryCache(session),
                isSkipEmptyFilesEnabled(session),
                hdfsContext.getIdentity(),
                buildDirectoryContextProperties(session),
                session.getRuntimeStats());
    }

    /**
     * Shorthand for a zero-length file status at {@code path}.
     */
    private static LocatedFileStatus emptyFileStatus(Path path)
    {
        return locatedFileStatus(path, 0L);
    }

    /**
     * Creates a non-directory {@link LocatedFileStatus} for {@code path} with the given length.
     * All other numeric metadata is zero and all other object metadata is null; the status
     * carries a single {@link BlockLocation} on "localhost" with offset 0 and length
     * {@code fileSize}.
     *
     * @param path location reported by the status
     * @param fileSize length reported by the status and by its only block
     * @return the synthetic file status
     */
    private static LocatedFileStatus locatedFileStatus(Path path, long fileSize)
    {
        BlockLocation onlyBlock = new BlockLocation(new String[1], new String[] {"localhost"}, 0, fileSize);
        return new LocatedFileStatus(
                fileSize,
                false,
                0,
                0L,
                0L,
                0L,
                null,
                null,
                null,
                null,
                path,
                new BlockLocation[] {onlyBlock});
    }

    /**
     * Creates a {@link StoragePartitionLoader} for {@code SIMPLE_TABLE} backed by a
     * {@link TestingHdfsEnvironment} seeded with {@code files}. No bucket information is
     * supplied, the map argument and the deque start out empty, and the three trailing
     * boolean flags are all {@code false}.
     *
     * @param files file statuses the testing HDFS environment is created with
     * @param directoryLister lister the loader uses for directory listings
     * @param connectorSession session passed through to the loader
     * @return the configured loader
     */
    private static StoragePartitionLoader storagePartitionLoader(
            List<LocatedFileStatus> files,
            DirectoryLister directoryLister,
            ConnectorSession connectorSession)
    {
        TestingHdfsEnvironment hdfsEnvironment = new TestingHdfsEnvironment(files);
        return new StoragePartitionLoader(
                SIMPLE_TABLE,
                ImmutableMap.of(),
                createBucketSplitInfo(Optional.empty(), Optional.empty()),
                connectorSession,
                hdfsEnvironment,
                new NamenodeStats(),
                directoryLister,
                new ConcurrentLinkedDeque<>(),
                false,
                false,
                false);
    }
}