/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive;

import com.facebook.presto.cache.CacheConfig;
import com.facebook.presto.hive.authentication.NoHdfsAuthentication;
import com.facebook.presto.hive.datasink.OutputStreamDataSinkFactory;
import com.facebook.presto.hive.filesystem.ExtendedFileSystem;
import com.facebook.presto.hive.metastore.MetastoreContext;
import com.facebook.presto.hive.metastore.Partition;
import com.facebook.presto.hive.metastore.PartitionStatistics;
import com.facebook.presto.hive.statistics.QuickStatsProvider;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorTableMetadata;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.connector.ConnectorCommitHandle;
import com.facebook.presto.testing.TestingConnectorSession;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;
import org.joda.time.DateTimeZone;
import org.testng.annotations.Test;

import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed;
import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.hive.AbstractTestHiveClient.TEST_SERVER_VERSION;
import static com.facebook.presto.hive.HiveStorageFormat.ORC;
import static com.facebook.presto.hive.HiveTableProperties.BUCKETED_BY_PROPERTY;
import static com.facebook.presto.hive.HiveTableProperties.BUCKET_COUNT_PROPERTY;
import static com.facebook.presto.hive.HiveTableProperties.EXTERNAL_LOCATION_PROPERTY;
import static com.facebook.presto.hive.HiveTableProperties.PARTITIONED_BY_PROPERTY;
import static com.facebook.presto.hive.HiveTableProperties.SORTED_BY_PROPERTY;
import static com.facebook.presto.hive.HiveTableProperties.STORAGE_FORMAT_PROPERTY;
import static com.facebook.presto.hive.HiveTestUtils.DO_NOTHING_DIRECTORY_LISTER;
import static com.facebook.presto.hive.HiveTestUtils.FILTER_STATS_CALCULATOR_SERVICE;
import static com.facebook.presto.hive.HiveTestUtils.FUNCTION_AND_TYPE_MANAGER;
import static com.facebook.presto.hive.HiveTestUtils.FUNCTION_RESOLUTION;
import static com.facebook.presto.hive.HiveTestUtils.HDFS_ENVIRONMENT;
import static com.facebook.presto.hive.HiveTestUtils.ROW_EXPRESSION_SERVICE;
import static com.facebook.presto.hive.metastore.MetastoreUtil.PRESTO_QUERY_ID_NAME;
import static com.facebook.presto.hive.metastore.MetastoreUtil.toPartitionValues;
import static com.facebook.presto.hive.metastore.StorageFormat.fromHiveStorageFormat;
import static java.nio.file.Files.createTempDirectory;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
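
/**
 * Tests that {@link HiveMetadata#commit()} exposes per-table commit outputs on the
 * returned {@link ConnectorCommitHandle}: creating a table or adding a partition
 * yields a non-empty write output, reading a partition surfaces its last data commit
 * time as the read output, and untouched tables report an empty string.
 */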
public class TestHiveCommitHandleOutput
{
private static final String TEST_SCHEMA = "test_schema";
private static final String TEST_TABLE = "test_table";
private static final Map<String, Object> testTableProperties;
    private static final ConnectorTableMetadata testTableMetadata;

static {
try {
URI tempUri = createTempDirectory("test").toUri();
testTableProperties = ImmutableMap.<String, Object>builder()
.put(BUCKET_COUNT_PROPERTY, 0)
.put(BUCKETED_BY_PROPERTY, ImmutableList.of())
.put(SORTED_BY_PROPERTY, ImmutableList.of())
.put(STORAGE_FORMAT_PROPERTY, ORC)
.put(EXTERNAL_LOCATION_PROPERTY, tempUri.toASCIIString())
.put(PARTITIONED_BY_PROPERTY, ImmutableList.of("a"))
.build();
}
catch (IOException e) {
throw new RuntimeException(e);
}
testTableMetadata = new ConnectorTableMetadata(
new SchemaTableName(TEST_SCHEMA, TEST_TABLE),
ImmutableList.of(
ColumnMetadata.builder().setName("b").setType(BIGINT).build(),
ColumnMetadata.builder().setName("a").setType(BIGINT).build()),
testTableProperties);
    }

@Test
public void testCommitOutputForTable()
{
TestingExtendedHiveMetastore metastore = new TestingExtendedHiveMetastore();
HiveClientConfig hiveClientConfig = new HiveClientConfig().setPartitionStatisticsBasedOptimizationEnabled(true);
ListeningExecutorService listeningExecutor = MoreExecutors.listeningDecorator(newFixedThreadPool(10, daemonThreadsNamed("test-hive-commit-handle-%s")));
ConnectorSession connectorSession = new TestingConnectorSession(
new HiveSessionProperties(
new HiveClientConfig().setPartitionStatisticsBasedOptimizationEnabled(true),
new OrcFileWriterConfig(),
new ParquetFileWriterConfig(),
new CacheConfig()).getSessionProperties());
HiveMetadata hiveMeta = getHiveMetadata(metastore, hiveClientConfig, listeningExecutor);
        // Create a table; the commit should record a write output but no read output.
hiveMeta.createTable(connectorSession, testTableMetadata, false);
ConnectorCommitHandle handle = hiveMeta.commit();
assertEquals(handle.getSerializedCommitOutputForRead(new SchemaTableName(TEST_SCHEMA, TEST_TABLE)), "");
assertFalse(handle.getSerializedCommitOutputForWrite(new SchemaTableName(TEST_SCHEMA, TEST_TABLE)).isEmpty());
        // Read back the table metadata; both the read and write commit outputs should be empty.
hiveMeta = getHiveMetadata(metastore, hiveClientConfig, listeningExecutor);
HiveTableHandle hiveTableHandle = new HiveTableHandle(TEST_SCHEMA, TEST_TABLE, Optional.empty());
hiveMeta.getTableMetadata(connectorSession, hiveTableHandle);
handle = hiveMeta.commit();
assertEquals(handle.getSerializedCommitOutputForRead(new SchemaTableName(TEST_SCHEMA, TEST_TABLE)), "");
assertEquals(handle.getSerializedCommitOutputForWrite(new SchemaTableName(TEST_SCHEMA, TEST_TABLE)), "");
    }

@Test
public void testCommitOutputForPartitions()
{
TestingExtendedHiveMetastore metastore = new TestingExtendedHiveMetastore();
HiveClientConfig hiveClientConfig = new HiveClientConfig().setPartitionStatisticsBasedOptimizationEnabled(true);
ListeningExecutorService listeningExecutor = MoreExecutors.listeningDecorator(newFixedThreadPool(10, daemonThreadsNamed("test-hive-commit-handle-%s")));
HiveMetadata hiveMeta = getHiveMetadata(metastore, hiveClientConfig, listeningExecutor);
ConnectorSession connectorSession = new TestingConnectorSession(
new HiveSessionProperties(
new HiveClientConfig().setPartitionStatisticsBasedOptimizationEnabled(true),
new OrcFileWriterConfig(),
new ParquetFileWriterConfig(),
new CacheConfig()).getSessionProperties());
// Create a table.
hiveMeta.createTable(connectorSession, testTableMetadata, false);
ConnectorCommitHandle handle = hiveMeta.commit();
assertEquals(handle.getSerializedCommitOutputForRead(new SchemaTableName(TEST_SCHEMA, TEST_TABLE)), "");
assertFalse(handle.getSerializedCommitOutputForWrite(new SchemaTableName(TEST_SCHEMA, TEST_TABLE)).isEmpty());
        // Add a partition: a=1.
hiveMeta = getHiveMetadata(metastore, hiveClientConfig, listeningExecutor);
String partitionName = "a=1";
hiveMeta.getMetastore().addPartition(
connectorSession,
TEST_SCHEMA,
TEST_TABLE,
"random_table_path",
false,
createPartition(partitionName, "location1"),
new Path("/" + TEST_TABLE),
PartitionStatistics.empty());
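        // Committing flushes the staged add-partition to the metastore, which records a commit output for the write.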
handle = hiveMeta.commit();
assertEquals(handle.getSerializedCommitOutputForRead(new SchemaTableName(TEST_SCHEMA, TEST_TABLE)), "");
assertFalse(handle.getSerializedCommitOutputForWrite(new SchemaTableName(TEST_SCHEMA, TEST_TABLE)).isEmpty());
String serializedCommitOutput = handle.getSerializedCommitOutputForWrite(new SchemaTableName(TEST_SCHEMA, TEST_TABLE));
        // Fetch the partition; the read commit output should equal the write output returned when the partition was added.
hiveMeta = getHiveMetadata(metastore, hiveClientConfig, listeningExecutor);
Map<String, Optional<Partition>> partitions = hiveMeta.getMetastore().getPartitionsByNames(
new MetastoreContext(
connectorSession.getUser(),
connectorSession.getQueryId(),
Optional.empty(),
Collections.emptySet(),
Optional.empty(),
Optional.empty(),
false,
HiveColumnConverterProvider.DEFAULT_COLUMN_CONVERTER_PROVIDER,
connectorSession.getWarningCollector(),
connectorSession.getRuntimeStats()),
TEST_SCHEMA,
TEST_TABLE,
ImmutableList.of(new PartitionNameWithVersion(partitionName, Optional.empty())));
handle = hiveMeta.commit();
Optional<Partition> partition = partitions.get(partitionName);
assertTrue(partition.isPresent());
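        // The read commit output is the partition's last data commit time recorded by the metastore.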
assertEquals(
handle.getSerializedCommitOutputForRead(new SchemaTableName(TEST_SCHEMA, TEST_TABLE)),
Long.toString(partition.get().getLastDataCommitTime()));
assertEquals(handle.getSerializedCommitOutputForRead(new SchemaTableName(TEST_SCHEMA, TEST_TABLE)), serializedCommitOutput);
assertTrue(handle.getSerializedCommitOutputForWrite(new SchemaTableName(TEST_SCHEMA, TEST_TABLE)).isEmpty());
        // Add the same partition with a different location; the metastore should generate a different commit output.
hiveMeta = getHiveMetadata(metastore, hiveClientConfig, listeningExecutor);
hiveMeta.getMetastore().addPartition(
connectorSession,
TEST_SCHEMA,
TEST_TABLE,
"random_table_path",
false,
createPartition(partitionName, "location2"),
new Path("/" + TEST_TABLE),
PartitionStatistics.empty());
handle = hiveMeta.commit();
assertEquals(handle.getSerializedCommitOutputForRead(new SchemaTableName(TEST_SCHEMA, TEST_TABLE)), "");
assertFalse(handle.getSerializedCommitOutputForWrite(new SchemaTableName(TEST_SCHEMA, TEST_TABLE)).isEmpty());
        assertNotEquals(handle.getSerializedCommitOutputForWrite(new SchemaTableName(TEST_SCHEMA, TEST_TABLE)), serializedCommitOutput);
}
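
    // Each HiveMetadata instance represents a single transaction and commit() ends it,
    // so the tests above obtain a fresh instance for every logical operation.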
private HiveMetadata getHiveMetadata(TestingExtendedHiveMetastore metastore, HiveClientConfig hiveClientConfig, ListeningExecutorService listeningExecutor)
{
HdfsEnvironment hdfsEnvironment = new TestingHdfsEnvironment(ImmutableList.of());
HiveMetadataFactory hiveMetadataFactory = new HiveMetadataFactory(
metastore,
hdfsEnvironment,
new HivePartitionManager(FUNCTION_AND_TYPE_MANAGER, hiveClientConfig),
DateTimeZone.forOffsetHours(1),
true,
false,
false,
true,
true,
hiveClientConfig.getMaxPartitionBatchSize(),
hiveClientConfig.getMaxPartitionsPerScan(),
false,
10_000,
FUNCTION_AND_TYPE_MANAGER,
new HiveLocationService(hdfsEnvironment),
FUNCTION_RESOLUTION,
ROW_EXPRESSION_SERVICE,
FILTER_STATS_CALCULATOR_SERVICE,
new TableParameterCodec(),
HiveTestUtils.PARTITION_UPDATE_CODEC,
HiveTestUtils.PARTITION_UPDATE_SMILE_CODEC,
listeningExecutor,
new HiveTypeTranslator(),
new HiveStagingFileCommitter(hdfsEnvironment, listeningExecutor),
new HiveZeroRowFileCreator(hdfsEnvironment, new OutputStreamDataSinkFactory(), listeningExecutor),
TEST_SERVER_VERSION,
new HivePartitionObjectBuilder(),
new HiveEncryptionInformationProvider(ImmutableList.of()),
new HivePartitionStats(),
new HiveFileRenamer(),
HiveColumnConverterProvider.DEFAULT_COLUMN_CONVERTER_PROVIDER,
new QuickStatsProvider(metastore, HDFS_ENVIRONMENT, DO_NOTHING_DIRECTORY_LISTER, new HiveClientConfig(), new NamenodeStats(), ImmutableList.of()),
new HiveTableWritabilityChecker(false));
return hiveMetadataFactory.get();
}
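
    // Builds a sealed ORC partition located at "/<table>/<location>/<partitionName>";
    // varying the location simulates rewriting the same partition with new data.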
private Partition createPartition(String partitionName, String partitionLocation)
{
Partition.Builder partitionBuilder = Partition.builder()
.setCatalogName(Optional.of("hive"))
.setDatabaseName(TEST_SCHEMA)
.setTableName(TEST_TABLE)
.setColumns(ImmutableList.of())
.setValues(toPartitionValues(partitionName))
.withStorage(storage -> storage
                        .setStorageFormat(fromHiveStorageFormat(ORC))
.setLocation(new Path("/" + TEST_TABLE + "/" + partitionLocation, partitionName).toString()))
.setEligibleToIgnore(true)
.setSealedPartition(true)
.setParameters(ImmutableMap.of(PRESTO_QUERY_ID_NAME, "random_query_id"));
return partitionBuilder.build();
}
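
    // HdfsEnvironment stub that always hands out the in-memory TestingHdfsFileSystem
    // below instead of touching real storage.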
private static class TestingHdfsEnvironment
extends HdfsEnvironment
{
private final List<LocatedFileStatus> files;
public TestingHdfsEnvironment(List<LocatedFileStatus> files)
{
super(
new HiveHdfsConfiguration(
new HdfsConfigurationInitializer(new HiveClientConfig(), new MetastoreClientConfig()),
ImmutableSet.of(),
new HiveClientConfig()),
new MetastoreClientConfig(),
new NoHdfsAuthentication());
this.files = ImmutableList.copyOf(files);
}
@Override
public ExtendedFileSystem getFileSystem(String user, Path path, Configuration configuration)
{
return new TestingHdfsFileSystem(files);
}
}
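
    // Minimal in-memory file system: it implements just enough (mkdirs, getFileStatus,
    // exists, and a canned file listing) for the create-table and add-partition code
    // paths; all other operations are unsupported.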
private static class TestingHdfsFileSystem
extends ExtendedFileSystem
{
private final List<LocatedFileStatus> files;
public TestingHdfsFileSystem(List<LocatedFileStatus> files)
{
this.files = ImmutableList.copyOf(files);
}
@Override
public boolean delete(Path f, boolean recursive)
{
throw new UnsupportedOperationException();
}
@Override
public boolean rename(Path src, Path dst)
{
throw new UnsupportedOperationException();
}
@Override
public void setWorkingDirectory(Path dir)
{
throw new UnsupportedOperationException();
}
@Override
public FileStatus[] listStatus(Path f)
{
throw new UnsupportedOperationException();
}
@Override
public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
{
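            // Serve the canned file list supplied at construction.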
return new RemoteIterator<LocatedFileStatus>()
{
private final Iterator<LocatedFileStatus> iterator = files.iterator();
@Override
public boolean hasNext()
throws IOException
{
return iterator.hasNext();
}
@Override
public LocatedFileStatus next()
throws IOException
{
return iterator.next();
}
};
}
@Override
public FSDataOutputStream create(
Path f,
FsPermission permission,
boolean overwrite,
int bufferSize,
short replication,
long blockSize,
Progressable progress)
{
throw new UnsupportedOperationException();
}
@Override
public boolean mkdirs(Path f, FsPermission permission)
{
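            // Pretend directory creation always succeeds.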
return true;
}
@Override
public FSDataOutputStream append(Path f, int bufferSize, Progressable progress)
{
throw new UnsupportedOperationException();
}
@Override
public FSDataInputStream open(Path f, int bufferSize)
{
throw new UnsupportedOperationException();
}
@Override
public FileStatus getFileStatus(Path f)
{
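            // Report every path as an existing empty directory.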
return new FileStatus(0, true, 0, 0, 0, 0, null, null, null, f);
}
@Override
public Path getWorkingDirectory()
{
throw new UnsupportedOperationException();
}
@Override
public URI getUri()
{
throw new UnsupportedOperationException();
}
@Override
public boolean exists(Path f)
{
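            // Report every path as absent so new table and partition locations appear free.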
return false;
}
}
}