/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.iceberg.hive;

import com.facebook.airlift.json.JsonCodec;
import com.facebook.presto.cache.CacheConfig;
import com.facebook.presto.cost.ConnectorFilterStatsCalculatorService;
import com.facebook.presto.cost.FilterStatsCalculator;
import com.facebook.presto.cost.ScalarStatsCalculator;
import com.facebook.presto.cost.StatsNormalizer;
import com.facebook.presto.hive.HdfsConfigurationInitializer;
import com.facebook.presto.hive.HdfsEnvironment;
import com.facebook.presto.hive.HiveClientConfig;
import com.facebook.presto.hive.HiveHdfsConfiguration;
import com.facebook.presto.hive.HiveStorageFormat;
import com.facebook.presto.hive.MetastoreClientConfig;
import com.facebook.presto.hive.NodeVersion;
import com.facebook.presto.hive.OrcFileWriterConfig;
import com.facebook.presto.hive.ParquetFileWriterConfig;
import com.facebook.presto.hive.authentication.NoHdfsAuthentication;
import com.facebook.presto.hive.filesystem.ExtendedFileSystem;
import com.facebook.presto.hive.metastore.Column;
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
import com.facebook.presto.hive.metastore.file.DatabaseMetadata;
import com.facebook.presto.hive.metastore.file.FileHiveMetastoreConfig;
import com.facebook.presto.hive.metastore.file.TableMetadata;
import com.facebook.presto.iceberg.CommitTaskData;
import com.facebook.presto.iceberg.IcebergCatalogName;
import com.facebook.presto.iceberg.IcebergConfig;
import com.facebook.presto.iceberg.IcebergHiveMetadata;
import com.facebook.presto.iceberg.IcebergHiveMetadataFactory;
import com.facebook.presto.iceberg.IcebergHiveTableOperationsConfig;
import com.facebook.presto.iceberg.IcebergSessionProperties;
import com.facebook.presto.iceberg.IcebergTableHandle;
import com.facebook.presto.iceberg.IcebergTableName;
import com.facebook.presto.iceberg.IcebergTableProperties;
import com.facebook.presto.iceberg.IcebergTableType;
import com.facebook.presto.iceberg.ManifestFileCache;
import com.facebook.presto.iceberg.statistics.StatisticsFileCache;
import com.facebook.presto.metadata.FunctionAndTypeManager;
import com.facebook.presto.metadata.MetadataManager;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.StandardErrorCode;
import com.facebook.presto.spi.connector.ConnectorMetadata;
import com.facebook.presto.spi.function.StandardFunctionResolution;
import com.facebook.presto.spi.plan.FilterStatsCalculatorService;
import com.facebook.presto.spi.relation.DeterminismEvaluator;
import com.facebook.presto.spi.relation.DomainTranslator;
import com.facebook.presto.spi.relation.ExpressionOptimizer;
import com.facebook.presto.spi.relation.ExpressionOptimizerProvider;
import com.facebook.presto.spi.relation.PredicateCompiler;
import com.facebook.presto.spi.relation.RowExpression;
import com.facebook.presto.spi.relation.RowExpressionService;
import com.facebook.presto.sql.InMemoryExpressionOptimizerProvider;
import com.facebook.presto.sql.gen.RowExpressionPredicateCompiler;
import com.facebook.presto.sql.planner.planPrinter.RowExpressionFormatter;
import com.facebook.presto.sql.relational.FunctionResolution;
import com.facebook.presto.sql.relational.RowExpressionDeterminismEvaluator;
import com.facebook.presto.sql.relational.RowExpressionDomainTranslator;
import com.facebook.presto.sql.relational.RowExpressionOptimizer;
import com.facebook.presto.testing.TestingConnectorSession;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.HadoopExtendedFileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;
import org.testng.annotations.Test;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

import static com.facebook.airlift.json.JsonCodec.jsonCodec;
import static com.facebook.presto.hive.HiveType.HIVE_STRING;
import static com.facebook.presto.hive.metastore.MetastoreUtil.ICEBERG_TABLE_TYPE_NAME;
import static com.facebook.presto.hive.metastore.MetastoreUtil.ICEBERG_TABLE_TYPE_VALUE;
import static com.facebook.presto.hive.metastore.PrestoTableType.MANAGED_TABLE;
import static com.facebook.presto.hive.metastore.StorageFormat.fromHiveStorageFormat;
import static com.facebook.presto.spi.security.PrincipalType.USER;
import static com.google.common.io.Files.createTempDir;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
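
/**
 * Exercises {@code IcebergHiveMetadata#renameTable} against a file-based Hive metastore
 * whose underlying filesystem can be instructed to fail individual operations
 * ({@code mkdirs}, {@code rename}, {@code delete}), verifying the success path as well
 * as the behavior when each operation fails mid-rename.
 */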
@Test(singleThreaded = true)
public class TestRenameTableOnFragileFileSystem
{
private static final MetadataManager METADATA = MetadataManager.createTestMetadataManager();
private static final FunctionAndTypeManager FUNCTION_AND_TYPE_MANAGER = METADATA.getFunctionAndTypeManager();
private static final StandardFunctionResolution FUNCTION_RESOLUTION = new FunctionResolution(METADATA.getFunctionAndTypeManager().getFunctionAndTypeResolver());
private static final RowExpressionService ROW_EXPRESSION_SERVICE = new RowExpressionService()
{
@Override
public DomainTranslator getDomainTranslator()
{
return new RowExpressionDomainTranslator(METADATA);
}
@Override
public ExpressionOptimizer getExpressionOptimizer(ConnectorSession session)
{
return new RowExpressionOptimizer(METADATA);
}
@Override
public PredicateCompiler getPredicateCompiler()
{
return new RowExpressionPredicateCompiler(METADATA);
}
@Override
public DeterminismEvaluator getDeterminismEvaluator()
{
return new RowExpressionDeterminismEvaluator(METADATA);
}
@Override
public String formatRowExpression(ConnectorSession session, RowExpression expression)
{
return new RowExpressionFormatter(METADATA.getFunctionAndTypeManager()).formatRowExpression(session, expression);
}
};
private static final ExpressionOptimizerProvider EXPRESSION_OPTIMIZER_PROVIDER = new InMemoryExpressionOptimizerProvider(METADATA);
private static final FilterStatsCalculatorService FILTER_STATS_CALCULATOR_SERVICE = new ConnectorFilterStatsCalculatorService(
new FilterStatsCalculator(METADATA, new ScalarStatsCalculator(METADATA, EXPRESSION_OPTIMIZER_PROVIDER), new StatsNormalizer()));
private static final JsonCodec<DatabaseMetadata> DATABASE_CODEC = jsonCodec(DatabaseMetadata.class);
private static final JsonCodec<TableMetadata> TABLE_CODEC = jsonCodec(TableMetadata.class);
private static final DatabaseMetadata databaseMetadata = new DatabaseMetadata(
"owner0",
USER,
Optional.empty(),
ImmutableMap.of());
private static final TableMetadata tableMetadata = new TableMetadata(
"owner0",
MANAGED_TABLE,
ImmutableList.of(column("col1"), column("col2")),
ImmutableList.of(column("part1")),
ImmutableMap.of(ICEBERG_TABLE_TYPE_NAME, ICEBERG_TABLE_TYPE_VALUE),
fromHiveStorageFormat(HiveStorageFormat.PARQUET),
Optional.empty(),
ImmutableMap.of(),
ImmutableMap.of(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
ImmutableMap.of());
private static final ConnectorSession connectorSession = new TestingConnectorSession(
new IcebergSessionProperties(
new IcebergConfig(),
new ParquetFileWriterConfig(),
new OrcFileWriterConfig(),
new CacheConfig(),
Optional.empty()).getSessionProperties());
private static final String catalogDirectory = createTempDir().toURI().toString();
private static final String originSchemaName = "origin_schema_name";
private static final String newSchemaName = "new_schema_name";
private static final String originTableName = "origin_table_name";
private static final String newTableName = "new_table_name";
private static final String originSchemaDirPath = String.format("%s/%s", catalogDirectory, originSchemaName);
private static final String originSchemaMetadataPath = String.format("%s/%s/%s", catalogDirectory, originSchemaName, ".prestoSchema");
private static final String newSchemaDirPath = String.format("%s/%s", catalogDirectory, newSchemaName);
private static final String newSchemaMetadataPath = String.format("%s/%s/%s", catalogDirectory, newSchemaName, ".prestoSchema");
private static final String originTableMetadataPath = String.format("%s/%s/%s/%s", catalogDirectory, originSchemaName, originTableName, ".prestoSchema");
private static final String newTableMetadataPath = String.format("%s/%s/%s/%s", catalogDirectory, newSchemaName, newTableName, ".prestoSchema");
private static final String originTablePermissionDirPath = String.format("%s/%s/%s/%s", catalogDirectory, originSchemaName, originTableName, ".prestoPermissions");
private static final String originTablePermissionFilePath = String.format("%s/%s/%s/%s/%s", catalogDirectory, originSchemaName, originTableName, ".prestoPermissions", "testFile");
private static final String newTablePermissionDirPath = String.format("%s/%s/%s/%s", catalogDirectory, newSchemaName, newTableName, ".prestoPermissions");
private static final String newTablePermissionFilePath = String.format("%s/%s/%s/%s/%s", catalogDirectory, newSchemaName, newTableName, ".prestoPermissions", "testFile");
    private final IcebergTableHandle icebergTableHandle = new IcebergTableHandle(originSchemaName,
new IcebergTableName(originTableName, IcebergTableType.DATA, Optional.empty(), Optional.empty()),
false,
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
ImmutableList.of(),
ImmutableList.of());
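
    /**
     * With no failure injected, rename should move the table's metadata and permission
     * files into the new schema and remove them from the origin schema.
     */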
@Test
public void testRenameTableSucceed()
throws Exception
{
testRenameTableWithFailSignalAndValidation(FailSignal.NONE,
icebergHiveMetadata -> {
try {
icebergHiveMetadata.renameTable(connectorSession, icebergTableHandle, new SchemaTableName(newSchemaName, newTableName));
}
catch (Exception e) {
fail("Rename table should not fail.", e);
}
},
(icebergHiveMetadata, fileSystem) -> {
List<SchemaTableName> schemaTableNames = icebergHiveMetadata.listTables(connectorSession, Optional.of(newSchemaName));
assertTrue(schemaTableNames.contains(new SchemaTableName(newSchemaName, newTableName)));
                    schemaTableNames = icebergHiveMetadata.listTables(connectorSession, Optional.of(originSchemaName));
assertFalse(schemaTableNames.contains(new SchemaTableName(originSchemaName, originTableName)));
assertTrue(fileSystem.exists(new Path(newTableMetadataPath)));
assertTrue(fileSystem.exists(new Path(newTablePermissionDirPath)));
assertTrue(fileSystem.exists(new Path(newTablePermissionFilePath)));
assertFalse(fileSystem.exists(new Path(originTableMetadataPath)));
assertFalse(fileSystem.exists(new Path(originTablePermissionDirPath)));
assertFalse(fileSystem.exists(new Path(originTablePermissionFilePath)));
});
}
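
    /**
     * If deleting the now-redundant origin permission directory fails, the rename should
     * still succeed, leaving only the stale permission files behind in the origin schema.
     */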
@Test
public void testRenameTableSucceedWithDeleteRedundantPermissionFileFails()
throws Exception
{
testRenameTableWithFailSignalAndValidation(FailSignal.DELETE,
icebergHiveMetadata -> {
try {
icebergHiveMetadata.renameTable(connectorSession, icebergTableHandle, new SchemaTableName(newSchemaName, newTableName));
}
catch (Exception e) {
fail("Rename table should not fail", e);
}
},
(icebergHiveMetadata, fileSystem) -> {
List<SchemaTableName> schemaTableNames = icebergHiveMetadata.listTables(connectorSession, Optional.of(originSchemaName));
assertFalse(schemaTableNames.contains(new SchemaTableName(originSchemaName, originTableName)));
schemaTableNames = icebergHiveMetadata.listTables(connectorSession, Optional.of(newSchemaName));
assertTrue(schemaTableNames.contains(new SchemaTableName(newSchemaName, newTableName)));
assertTrue(fileSystem.exists(new Path(newTableMetadataPath)));
assertTrue(fileSystem.exists(new Path(newTablePermissionDirPath)));
assertTrue(fileSystem.exists(new Path(newTablePermissionFilePath)));
assertFalse(fileSystem.exists(new Path(originTableMetadataPath)));
assertTrue(fileSystem.exists(new Path(originTablePermissionDirPath)));
assertTrue(fileSystem.exists(new Path(originTablePermissionFilePath)));
});
}
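
    /**
     * If mkdirs fails while the permission directory is copied to the new location,
     * the rename should abort and leave the origin table fully intact.
     */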
@Test
public void testRenameTableFailCausedByCopyPermissionFile()
throws Exception
{
testRenameTableWithFailSignalAndValidation(FailSignal.MKDIRS,
icebergHiveMetadata -> {
try {
icebergHiveMetadata.renameTable(connectorSession, icebergTableHandle, new SchemaTableName(newSchemaName, newTableName));
fail("Rename table should fail here");
}
catch (Exception e) {
assertTrue(e.getMessage().startsWith("Could not rename table. Failed to copy directory: "));
}
},
(icebergHiveMetadata, fileSystem) -> {
                    // Initial state must be preserved: the failed rename should not have moved anything
List<SchemaTableName> schemaTableNames = icebergHiveMetadata.listTables(connectorSession, Optional.of(originSchemaName));
assertTrue(schemaTableNames.contains(new SchemaTableName(originSchemaName, originTableName)));
schemaTableNames = icebergHiveMetadata.listTables(connectorSession, Optional.of(newSchemaName));
assertFalse(schemaTableNames.contains(new SchemaTableName(newSchemaName, newTableName)));
assertFalse(fileSystem.exists(new Path(newTableMetadataPath)));
assertFalse(fileSystem.exists(new Path(newTablePermissionFilePath)));
assertTrue(fileSystem.exists(new Path(originTableMetadataPath)));
assertTrue(fileSystem.exists(new Path(originTablePermissionDirPath)));
assertTrue(fileSystem.exists(new Path(originTablePermissionFilePath)));
});
}
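
    /**
     * If renaming the table's .prestoSchema file fails, the rename should abort
     * and leave the origin table fully intact.
     */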
@Test
public void testRenameTableFailCausedByRenameTableSchemaFile()
throws Exception
{
testRenameTableWithFailSignalAndValidation(FailSignal.RENAME,
icebergHiveMetadata -> {
try {
icebergHiveMetadata.renameTable(connectorSession, icebergTableHandle, new SchemaTableName(newSchemaName, newTableName));
fail("Rename table should fail here.");
}
catch (Exception e) {
assertTrue(e.getMessage().startsWith("Could not rename table. Failed to rename file "));
}
},
(icebergHiveMetadata, fileSystem) -> {
                    // Initial state must be preserved: the failed rename should not have moved anything
List<SchemaTableName> schemaTableNames = icebergHiveMetadata.listTables(connectorSession, Optional.of(originSchemaName));
assertTrue(schemaTableNames.contains(new SchemaTableName(originSchemaName, originTableName)));
schemaTableNames = icebergHiveMetadata.listTables(connectorSession, Optional.of(newSchemaName));
assertFalse(schemaTableNames.contains(new SchemaTableName(newSchemaName, newTableName)));
assertFalse(fileSystem.exists(new Path(newTableMetadataPath)));
assertFalse(fileSystem.exists(new Path(newTablePermissionFilePath)));
assertTrue(fileSystem.exists(new Path(originTableMetadataPath)));
assertTrue(fileSystem.exists(new Path(originTablePermissionDirPath)));
assertTrue(fileSystem.exists(new Path(originTablePermissionFilePath)));
});
}
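
    /**
     * Seeds the metastore layout: .prestoSchema files for both schemas, the origin table's
     * .prestoSchema file, and a dummy file in its .prestoPermissions directory.
     */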
private void createInitFiles(ExtendedFileSystem fileSystem)
throws IOException
{
createFile(fileSystem, new Path(originSchemaMetadataPath), DATABASE_CODEC.toBytes(databaseMetadata));
createFile(fileSystem, new Path(newSchemaMetadataPath), DATABASE_CODEC.toBytes(databaseMetadata));
createFile(fileSystem, new Path(originTableMetadataPath), TABLE_CODEC.toBytes(tableMetadata));
createFile(fileSystem, new Path(originTablePermissionFilePath), new byte[128]);
}
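
    /**
     * Asserts the on-disk layout still matches the freshly seeded state: origin table
     * files present, new table files absent.
     */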
    private void checkInitFileState(ExtendedFileSystem fileSystem)
throws IOException
{
assertFalse(fileSystem.exists(new Path(newTableMetadataPath)));
assertFalse(fileSystem.exists(new Path(newTablePermissionDirPath)));
assertFalse(fileSystem.exists(new Path(newTablePermissionFilePath)));
assertTrue(fileSystem.exists(new Path(originTableMetadataPath)));
assertTrue(fileSystem.exists(new Path(originTablePermissionDirPath)));
assertTrue(fileSystem.exists(new Path(originTablePermissionFilePath)));
}
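
    /**
     * Shared driver: seeds the metastore layout, optionally arms the failure signal,
     * runs the supplied rename logic, validates the outcome, and cleans up both schema
     * directories afterwards.
     */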
private void testRenameTableWithFailSignalAndValidation(FailSignal failSignal, RenameLogic renameLogic, ValidationLogic validationLogic)
throws IOException
{
FileHiveMetastoreConfig config = createFileHiveMetastoreConfig();
TestingHdfsEnvironment hdfsEnvironment = getTestingHdfsEnvironment();
IcebergFileHiveMetastore metastore = new IcebergFileHiveMetastore(hdfsEnvironment, config);
IcebergHiveMetadata icebergHiveMetadata = (IcebergHiveMetadata) getIcebergHiveMetadata(metastore);
ExtendedFileSystem fileSystem = hdfsEnvironment.getFileSystem(connectorSession.getUser(), new Path(originSchemaMetadataPath), new Configuration());
try {
createInitFiles(fileSystem);
List<SchemaTableName> schemaTableNames = icebergHiveMetadata.listTables(connectorSession, Optional.of(originSchemaName));
assertTrue(schemaTableNames.contains(new SchemaTableName(originSchemaName, originTableName)));
schemaTableNames = icebergHiveMetadata.listTables(connectorSession, Optional.of(newSchemaName));
assertFalse(schemaTableNames.contains(new SchemaTableName(newSchemaName, newTableName)));
            checkInitFileState(fileSystem);
            if (failSignal != FailSignal.NONE) {
                hdfsEnvironment.setFailSignal(failSignal);
            }
renameLogic.rename(icebergHiveMetadata);
validationLogic.validate(icebergHiveMetadata, fileSystem);
}
finally {
            if (failSignal == FailSignal.DELETE) {
                hdfsEnvironment.setFailSignal(FailSignal.NONE);
            }
fileSystem.delete(new Path(originSchemaDirPath), true);
fileSystem.delete(new Path(newSchemaDirPath), true);
}
}
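
    /**
     * Writes the given bytes to the given path, overwriting any existing file.
     */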
    private void createFile(FileSystem fileSystem, Path path, byte[] content)
            throws IOException
    {
        try (FSDataOutputStream outputStream = fileSystem.create(path, true, 1024)) {
            outputStream.write(content);
            outputStream.flush();
        }
    }
private TestingHdfsEnvironment getTestingHdfsEnvironment()
{
return new TestingHdfsEnvironment();
}
private FileHiveMetastoreConfig createFileHiveMetastoreConfig()
{
FileHiveMetastoreConfig config = new FileHiveMetastoreConfig();
config.setCatalogDirectory(catalogDirectory);
return config;
}
private static Column column(String name)
{
return new Column(name, HIVE_STRING, Optional.of(name), Optional.empty());
}
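
    /**
     * Builds an IcebergHiveMetadata through the factory, wiring the file-based metastore
     * under test together with testing/stub dependencies.
     */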
private ConnectorMetadata getIcebergHiveMetadata(ExtendedHiveMetastore metastore)
{
HdfsEnvironment hdfsEnvironment = new TestingHdfsEnvironment();
IcebergHiveMetadataFactory icebergHiveMetadataFactory = new IcebergHiveMetadataFactory(
new IcebergCatalogName("unimportant"),
metastore,
hdfsEnvironment,
FUNCTION_AND_TYPE_MANAGER,
FUNCTION_RESOLUTION,
ROW_EXPRESSION_SERVICE,
jsonCodec(CommitTaskData.class),
new NodeVersion("test_node_v1"),
FILTER_STATS_CALCULATOR_SERVICE,
new IcebergHiveTableOperationsConfig(),
new StatisticsFileCache(CacheBuilder.newBuilder().build()),
new ManifestFileCache(CacheBuilder.newBuilder().build(), false, 0, 1024),
new IcebergTableProperties(new IcebergConfig()),
() -> false);
return icebergHiveMetadataFactory.create();
}
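
    /**
     * The rename action under test; implementations either expect success or assert failure.
     */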
private interface RenameLogic
{
void rename(IcebergHiveMetadata icebergHiveMetadata);
}
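
    /**
     * Validates metastore listings and the on-disk file layout after the rename attempt.
     */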
private interface ValidationLogic
{
void validate(IcebergHiveMetadata icebergHiveMetadata, ExtendedFileSystem fileSystem)
throws IOException;
}
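
    /**
     * HdfsEnvironment that always hands out the fault-injecting local filesystem,
     * regardless of the requesting user or path.
     */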
private static class TestingHdfsEnvironment
extends HdfsEnvironment
{
private final AtomicReference<FailSignal> failSignal = new AtomicReference<>(FailSignal.NONE);
public TestingHdfsEnvironment()
{
super(
new HiveHdfsConfiguration(
new HdfsConfigurationInitializer(new HiveClientConfig(), new MetastoreClientConfig()),
ImmutableSet.of(),
new HiveClientConfig()),
new MetastoreClientConfig(),
new NoHdfsAuthentication());
}
@Override
public ExtendedFileSystem getFileSystem(String user, Path path, Configuration configuration)
{
return new TestingDelegateFileSystem(configuration, failSignal);
}
public void setFailSignal(FailSignal signal)
{
this.failSignal.set(signal);
}
}
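
    /**
     * The filesystem operation to force to fail; NONE disables fault injection.
     */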
private enum FailSignal
{
MKDIRS,
RENAME,
DELETE,
NONE,
}
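
    /**
     * ExtendedFileSystem that delegates to a LocalFileSystem but reports failure
     * (returns false) from mkdirs, rename, or delete when the matching signal is armed.
     */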
private static class TestingDelegateFileSystem
extends ExtendedFileSystem
{
        private final Configuration configuration;
        private final HadoopExtendedFileSystem delegate;
private final AtomicReference<FailSignal> failSignal;
public TestingDelegateFileSystem(Configuration configuration, AtomicReference<FailSignal> failSignal)
{
this.configuration = configuration;
this.failSignal = failSignal;
LocalFileSystem localFileSystem = new LocalFileSystem();
try {
localFileSystem.initialize(URI.create("file:///"), configuration);
}
            catch (IOException e) {
                throw new PrestoException(StandardErrorCode.GENERIC_INTERNAL_ERROR, "Failed to initialize LocalFileSystem", e);
            }
delegate = new HadoopExtendedFileSystem(localFileSystem);
}
@Override
public URI getUri()
{
return delegate.getUri();
}
@Override
public FSDataInputStream open(Path f, int bufferSize)
throws IOException
{
return delegate.open(f, bufferSize);
}
@Override
public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress)
throws IOException
{
return delegate.create(f, permission, overwrite, bufferSize, replication, blockSize, progress);
}
@Override
public FSDataOutputStream append(Path f, int bufferSize, Progressable progress)
throws IOException
{
return delegate.append(f, bufferSize, progress);
}
@Override
public boolean rename(Path src, Path dst)
throws IOException
{
if (failSignal.get() == FailSignal.RENAME) {
return false;
}
return delegate.rename(src, dst);
}
@Override
public boolean delete(Path f, boolean recursive)
throws IOException
{
if (failSignal.get() == FailSignal.DELETE) {
return false;
}
return delegate.delete(f, recursive);
}
@Override
public FileStatus[] listStatus(Path f)
throws FileNotFoundException, IOException
{
return delegate.listStatus(f);
}
@Override
public void setWorkingDirectory(Path newDir)
{
delegate.setWorkingDirectory(newDir);
}
@Override
public Path getWorkingDirectory()
{
return delegate.getWorkingDirectory();
}
@Override
public boolean mkdirs(Path f, FsPermission permission)
throws IOException
{
            if (failSignal.get() == FailSignal.MKDIRS) {
return false;
}
return delegate.mkdirs(f, permission);
}
@Override
public FileStatus getFileStatus(Path f)
throws IOException
{
return delegate.getFileStatus(f);
}
@Override
public Configuration getConf()
{
return this.configuration;
}
}
}