/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.iceberg.procedure;

import com.facebook.presto.Session;
import com.facebook.presto.Session.SessionBuilder;
import com.facebook.presto.common.type.TimeZoneKey;
import com.facebook.presto.hive.HdfsConfiguration;
import com.facebook.presto.hive.HdfsConfigurationInitializer;
import com.facebook.presto.hive.HdfsEnvironment;
import com.facebook.presto.hive.HiveClientConfig;
import com.facebook.presto.hive.HiveHdfsConfiguration;
import com.facebook.presto.hive.MetastoreClientConfig;
import com.facebook.presto.hive.authentication.NoHdfsAuthentication;
import com.facebook.presto.iceberg.CatalogType;
import com.facebook.presto.iceberg.IcebergConfig;
import com.facebook.presto.iceberg.IcebergQueryRunner;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.AbstractTestQueryFramework;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.iceberg.Table;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Map;

import static com.facebook.presto.SystemSessionProperties.LEGACY_TIMESTAMP;
import static com.facebook.presto.iceberg.IcebergQueryRunner.getIcebergDataDirectoryPath;
import static com.facebook.presto.iceberg.IcebergUtil.dataLocation;
import static com.facebook.presto.iceberg.IcebergUtil.metadataLocation;
import static com.facebook.presto.iceberg.procedure.RegisterTableProcedure.getFileSystem;
import static com.google.common.io.Files.createTempDir;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static org.apache.iceberg.TableProperties.DELETE_MODE;
import static org.apache.iceberg.TableProperties.WRITE_DATA_LOCATION;
import static org.apache.iceberg.TableProperties.WRITE_METADATA_LOCATION;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;

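/**
 * Base tests for the {@code remove_orphan_files} procedure. Catalog-specific subclasses
 * supply the catalog type, extra connector properties, and native Iceberg table operations.
 */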
@Test(singleThreaded = true)
public abstract class TestRemoveOrphanFilesProcedureBase
        extends AbstractTestQueryFramework
{
    public static final String ICEBERG_CATALOG = "iceberg";
    public static final String TEST_SCHEMA = "tpch";

    private final CatalogType catalogType;
    private final Map<String, String> extraConnectorProperties;

    protected TestRemoveOrphanFilesProcedureBase(CatalogType catalogType, Map<String, String> extraConnectorProperties)
    {
        this.catalogType = requireNonNull(catalogType, "catalogType is null");
        this.extraConnectorProperties = requireNonNull(extraConnectorProperties, "extraConnectorProperties is null");
    }

    @Override
    protected QueryRunner createQueryRunner()
            throws Exception
    {
        return IcebergQueryRunner.builder()
                .setCatalogType(catalogType)
                .setExtraConnectorProperties(extraConnectorProperties)
                .build().getQueryRunner();
    }

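    // Each entry is {zoneId, legacyTimestamp}; the session time zone is only applied
    // when legacy timestamp semantics are enabled (see sessionForTimezone)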
    @DataProvider(name = "timezones")
    public Object[][] timezones()
    {
        return new Object[][] {
                {"UTC", true},
                {"America/Los_Angeles", true},
                {"Asia/Shanghai", true},
                {"UTC", false}};
    }

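    // Calling remove_orphan_files on a freshly created, empty table should delete nothing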
    @Test
    public void testRemoveOrphanFilesInEmptyTable()
    {
        String tableName = "test_empty_remove_orphan_files_table";
        Session session = getQueryRunner().getDefaultSession();
        try {
            assertUpdate(format("create table %s (a int, b varchar)", tableName));

            Table table = loadTable(tableName);
            int metadataFilesCountBefore = allMetadataFilesCount(session, table);
            int dataFilesCountBefore = allDataFilesCount(session, table);
            assertUpdate(format("call iceberg.system.remove_orphan_files('%s', '%s')", TEST_SCHEMA, tableName));

            int metadataFilesCountAfter = allMetadataFilesCount(session, table);
            int dataFilesCountAfter = allDataFilesCount(session, table);
            assertEquals(metadataFilesCountBefore, metadataFilesCountAfter);
            assertEquals(dataFilesCountBefore, dataFilesCountAfter);
            assertQuery("select count(*) from " + tableName, "values(0)");
        }
        finally {
            dropTable(tableName);
        }
    }

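    // Plants orphan files in both the data and metadata locations of a populated table,
    // then verifies that remove_orphan_files deletes exactly those files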
    @Test(dataProvider = "timezones")
    public void testRemoveOrphanFilesInMetadataAndDataFolder(String zoneId, boolean legacyTimestamp)
    {
        String tableName = "test_remove_orphan_files_table";
        Session session = sessionForTimezone(zoneId, legacyTimestamp);
        try {
            assertUpdate(session, format("create table %s (a int, b varchar)", tableName));
            assertUpdate(session, format("insert into %s values(1, '1001'), (2, '1002')", tableName), 2);
            assertUpdate(session, format("insert into %s values(3, '1003'), (4, '1004')", tableName), 2);
            assertUpdate(session, format("insert into %s values(5, '1005'), (6, '1006')", tableName), 2);
            assertUpdate(session, format("delete from %s where a between 5 and 6", tableName), 2);
            assertQuery(session, "select * from " + tableName, "values(1, '1001'), (2, '1002'), (3, '1003'), (4, '1004')");

            Table table = loadTable(tableName);
            int metadataFilesCountBefore = allMetadataFilesCount(session, table);
            int dataFilesCountBefore = allDataFilesCount(session, table);

            // Generate 3 orphan files in the Iceberg table's data location
            assertTrue(generateFile(session, TEST_SCHEMA, tableName, dataLocation(table), "file_1.test"));
            assertTrue(generateFile(session, TEST_SCHEMA, tableName, dataLocation(table), "file_2.test"));
            assertTrue(generateFile(session, TEST_SCHEMA, tableName, dataLocation(table), "file_3.test"));

            // Generate 2 orphan files in the Iceberg table's metadata location
            assertTrue(generateFile(session, TEST_SCHEMA, tableName, metadataLocation(table), "file_4.test"));
            assertTrue(generateFile(session, TEST_SCHEMA, tableName, metadataLocation(table), "file_5.test"));

            int metadataFilesCountMiddle = allMetadataFilesCount(session, table);
            int dataFilesCountMiddle = allDataFilesCount(session, table);

            // Remove all orphan files older than now
            waitUntilAfter(System.currentTimeMillis());
            String formattedDateTime = getTimestampString(System.currentTimeMillis(), zoneId);
            assertUpdate(session, format("call iceberg.system.remove_orphan_files('%s', '%s', timestamp '%s')", TEST_SCHEMA, tableName, formattedDateTime));

            int metadataFilesCountAfter = allMetadataFilesCount(session, table);
            int dataFilesCountAfter = allDataFilesCount(session, table);

            assertEquals(metadataFilesCountBefore, metadataFilesCountAfter);
            assertEquals(dataFilesCountBefore, dataFilesCountAfter);

            assertEquals(metadataFilesCountBefore + 2, metadataFilesCountMiddle);
            assertEquals(dataFilesCountBefore + 3, dataFilesCountMiddle);

            assertEquals(metadataFilesCountMiddle, metadataFilesCountAfter + 2);
            assertEquals(dataFilesCountMiddle, dataFilesCountAfter + 3);
            assertQuery(session, "select * from " + tableName, "values(1, '1001'), (2, '1002'), (3, '1003'), (4, '1004')");
        }
        finally {
            dropTable(tableName);
        }
    }

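    // Exercises remove_orphan_files on a table whose metadata location is overridden via
    // WRITE_METADATA_LOCATION and that is exposed to the engine through register_table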
    @Test(dataProvider = "timezones")
    public void testRemoveOrphanFilesWithNonDefaultMetadataPath(String zoneId, boolean legacyTimestamp)
    {
        Session session = sessionForTimezone(zoneId, legacyTimestamp);
        String tempTableName = "temp_test_table_with_specified_metadata_path";
        String tableName = "test_table_with_specified_metadata_path";
        String tableTargetPath = createTempDir().toURI().toString();
        File metadataDir = new File(createTempDir().getAbsolutePath(), "metadata");
        assertTrue(metadataDir.mkdir());
        String specifiedMetadataPath = metadataDir.getAbsolutePath();

        // Create an Iceberg table with an explicit metadata location via table properties
        Table table = createTable(tempTableName, tableTargetPath,
                ImmutableMap.of(WRITE_METADATA_LOCATION, specifiedMetadataPath,
                                DELETE_MODE, "merge-on-read"));
        assertNotNull(table.properties().get(WRITE_METADATA_LOCATION));
        assertEquals(table.properties().get(WRITE_METADATA_LOCATION), specifiedMetadataPath);

        assertUpdate(session, format("CALL system.register_table('%s', '%s', '%s')", TEST_SCHEMA, tableName, metadataLocation(table)));
        assertUpdate(session, "insert into " + tableName + " values(1, '1001'), (2, '1002')", 2);
        assertUpdate(session, "insert into " + tableName + " values(3, '1003'), (4, '1004')", 2);
        assertUpdate(session, "delete from " + tableName + " where a between 3 and 4", 2);
        assertQuery(session, "select * from " + tableName, "values(1, '1001'), (2, '1002')");

        int metadataFilesCountBefore = allMetadataFilesCount(session, table);
        int dataFilesCountBefore = allDataFilesCount(session, table);
        assertTrue(generateFile(session, TEST_SCHEMA, tableName, specifiedMetadataPath, "metadata_file_1.test"));
        assertTrue(generateFile(session, TEST_SCHEMA, tableName, specifiedMetadataPath, "metadata_file_2.test"));

        int metadataFilesCountMiddle = allMetadataFilesCount(session, table);
        int dataFilesCountMiddle = allDataFilesCount(session, table);

        // Remove all orphan files older than now
        waitUntilAfter(System.currentTimeMillis());
        String formattedDateTime = getTimestampString(System.currentTimeMillis(), zoneId);
        assertUpdate(session, format("call system.remove_orphan_files('%s', '%s', timestamp '%s')", TEST_SCHEMA, tableName, formattedDateTime));

        int metadataFilesCountAfter = allMetadataFilesCount(session, table);
        int dataFilesCountAfter = allDataFilesCount(session, table);

        assertEquals(metadataFilesCountBefore, metadataFilesCountAfter);
        assertEquals(dataFilesCountBefore, dataFilesCountAfter);

        assertEquals(metadataFilesCountBefore + 2, metadataFilesCountMiddle);
        assertEquals(dataFilesCountBefore, dataFilesCountMiddle);

        assertEquals(metadataFilesCountMiddle, metadataFilesCountAfter + 2);
        assertEquals(dataFilesCountMiddle, dataFilesCountAfter);
        assertQuery(session, "select * from " + tableName, "values(1, '1001'), (2, '1002')");

        assertUpdate(format("CALL system.unregister_table('%s', '%s')", TEST_SCHEMA, tableName));
        dropTableFromCatalog(tempTableName);
    }

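    // Exercises remove_orphan_files on a table whose data location is overridden via
    // WRITE_DATA_LOCATION and that is exposed to the engine through register_table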
    @Test(dataProvider = "timezones")
    public void testRemoveOrphanFilesWithNonDefaultDataPath(String zoneId, boolean legacyTimestamp)
    {
        Session session = sessionForTimezone(zoneId, legacyTimestamp);
        String tempTableName = "temp_test_table_with_specified_data_path";
        String tableName = "test_table_with_specified_data_path";
        String tableTargetPath = createTempDir().toURI().toString();
        File dataDir = new File(createTempDir().getAbsolutePath(), "metadata");
        dataDir.mkdir();
        String specifiedDataPath = dataDir.getAbsolutePath();

        // Create an Iceberg table with an explicit data location via table properties
        Table table = createTable(tempTableName, tableTargetPath,
                ImmutableMap.of(WRITE_DATA_LOCATION, specifiedDataPath,
                                DELETE_MODE, "merge-on-read"));
        assertNotNull(table.properties().get(WRITE_DATA_LOCATION));
        assertEquals(table.properties().get(WRITE_DATA_LOCATION), specifiedDataPath);

        assertUpdate(session, format("CALL system.register_table('%s', '%s', '%s')", TEST_SCHEMA, tableName, metadataLocation(table)));
        assertUpdate(session, "insert into " + tableName + " values(1, '1001'), (2, '1002')", 2);
        assertUpdate(session, "insert into " + tableName + " values(3, '1003'), (4, '1004')", 2);
        assertUpdate(session, "delete from " + tableName + " where a between 3 and 4", 2);
        assertQuery(session, "select * from " + tableName, "values(1, '1001'), (2, '1002')");

        int metadataFilesCountBefore = allMetadataFilesCount(session, table);
        int dataFilesCountBefore = allDataFilesCount(session, table);
        assertTrue(generateFile(session, TEST_SCHEMA, tableName, specifiedDataPath, "metadata_file_1.test"));
        assertTrue(generateFile(session, TEST_SCHEMA, tableName, specifiedDataPath, "metadata_file_2.test"));

        int metadataFilesCountMiddle = allMetadataFilesCount(session, table);
        int dataFilesCountMiddle = allDataFilesCount(session, table);

        // Remove all orphan files older than now
        waitUntilAfter(System.currentTimeMillis());
        String formattedDateTime = getTimestampString(System.currentTimeMillis(), zoneId);
        assertUpdate(session, format("call system.remove_orphan_files('%s', '%s', timestamp '%s')", TEST_SCHEMA, tableName, formattedDateTime));

        int metadataFilesCountAfter = allMetadataFilesCount(session, table);
        int dataFilesCountAfter = allDataFilesCount(session, table);

        assertEquals(metadataFilesCountBefore, metadataFilesCountAfter);
        assertEquals(dataFilesCountBefore, dataFilesCountAfter);

        assertEquals(metadataFilesCountBefore, metadataFilesCountMiddle);
        assertEquals(dataFilesCountBefore + 2, dataFilesCountMiddle);

        assertEquals(metadataFilesCountMiddle, metadataFilesCountAfter);
        assertEquals(dataFilesCountMiddle, dataFilesCountAfter + 2);
        assertQuery(session, "select * from " + tableName, "values(1, '1001'), (2, '1002')");

        assertUpdate(format("CALL system.unregister_table('%s', '%s')", TEST_SCHEMA, tableName));
        dropTableFromCatalog(tempTableName);
    }

    abstract Table createTable(String tableName, String targetPath, Map<String, String> tableProperties);

    abstract Table loadTable(String tableName);

    abstract void dropTableFromCatalog(String tableName);

    private void dropTable(String tableName)
    {
        assertQuerySucceeds("DROP TABLE IF EXISTS " + TEST_SCHEMA + "." + tableName);
    }

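    // Builds a session with the given legacy timestamp setting; the session time zone
    // is only overridden when legacy semantics are enabled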
    private Session sessionForTimezone(String zoneId, boolean legacyTimestamp)
    {
        SessionBuilder sessionBuilder = Session.builder(getSession())
                .setSystemProperty(LEGACY_TIMESTAMP, String.valueOf(legacyTimestamp));
        if (legacyTimestamp) {
            sessionBuilder.setTimeZoneKey(TimeZoneKey.getTimeZoneKey(zoneId));
        }
        return sessionBuilder.build();
    }

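    // Resolves the on-disk catalog directory for the given catalog type under the
    // query runner's data directory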
    protected File getCatalogDirectory(CatalogType type)
    {
        Path dataDirectory = getDistributedQueryRunner().getCoordinator().getDataDirectory();
        Path catalogDirectory = getIcebergDataDirectoryPath(dataDirectory, type.name(), new IcebergConfig().getFileFormat(), false);
        return catalogDirectory.toFile();
    }

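    // Builds a plain, unauthenticated HdfsEnvironment for direct file system access in test helpers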
    protected static HdfsEnvironment getHdfsEnvironment()
    {
        HiveClientConfig hiveClientConfig = new HiveClientConfig();
        MetastoreClientConfig metastoreClientConfig = new MetastoreClientConfig();
        HdfsConfiguration hdfsConfiguration = new HiveHdfsConfiguration(
                new HdfsConfigurationInitializer(hiveClientConfig, metastoreClientConfig),
                ImmutableSet.of(),
                hiveClientConfig);
        return new HdfsEnvironment(hdfsConfiguration, metastoreClientConfig, new NoHdfsAuthentication());
    }

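    // Busy-waits until the wall clock has advanced strictly past the given timestamp, so a
    // cutoff captured afterwards is guaranteed to be newer than files written before the call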
    private static long waitUntilAfter(long snapshotTimeMillis)
    {
        long currentTimeMillis = System.currentTimeMillis();
        assertTrue(snapshotTimeMillis - currentTimeMillis <= 10,
                format("Snapshot time %s is greater than the current time %s by more than 10ms", snapshotTimeMillis, currentTimeMillis));

        while (currentTimeMillis <= snapshotTimeMillis) {
            currentTimeMillis = System.currentTimeMillis();
        }
        return currentTimeMillis;
    }

    private static int allMetadataFilesCount(Session session, Table table)
    {
        return allFilesCount(session.toConnectorSession(), TEST_SCHEMA, table.name(), metadataLocation(table));
    }

    private static int allDataFilesCount(Session session, Table table)
    {
        return allFilesCount(session.toConnectorSession(), TEST_SCHEMA, table.name(), dataLocation(table));
    }

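    // Recursively counts all files under the given location, bypassing the table metadata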
    private static int allFilesCount(ConnectorSession session, String schema, String table, String folderFullPath)
    {
        try {
            org.apache.hadoop.fs.Path fullPath = new org.apache.hadoop.fs.Path(folderFullPath);
            FileSystem fileSystem = getFileSystem(session, getHdfsEnvironment(), new SchemaTableName(schema, table), fullPath);
            RemoteIterator<LocatedFileStatus> allFiles = fileSystem.listFiles(fullPath, true);
            int count = 0;
            while (allFiles.hasNext()) {
                allFiles.next();
                count++;
            }
            return count;
        }
        catch (Exception e) {
            // Treat a missing or unreadable location as containing no files
            return 0;
        }
    }

    // Creates an empty marker file at the given location to act as an orphan;
    // returns false if the file already exists or cannot be created
    private static boolean generateFile(Session session, String schema, String table, String path, String fileName)
    {
        try {
            ConnectorSession connectorSession = session.toConnectorSession();
            FileSystem fileSystem = getFileSystem(connectorSession, getHdfsEnvironment(), new SchemaTableName(schema, table), new org.apache.hadoop.fs.Path(path));
            return fileSystem.createNewFile(new org.apache.hadoop.fs.Path(path, fileName));
        }
        catch (IOException e) {
            return false;
        }
    }

    // Formats the given UTC epoch millis as a wall-clock timestamp literal in the given zone,
    // suitable for the procedure's timestamp argument
    private static String getTimestampString(long timeMillisUtc, String zoneId)
    {
        LocalDateTime localDateTime = Instant.ofEpochMilli(timeMillisUtc)
                .atZone(ZoneId.of(zoneId))
                .toLocalDateTime();
        // The zone is already applied above, so no zone needs to be attached to the formatter
        return localDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"));
    }
}