TestFastForwardBranchProcedure.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.iceberg.procedure;

import com.facebook.presto.iceberg.IcebergConfig;
import com.facebook.presto.iceberg.IcebergQueryRunner;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.AbstractTestQueryFramework;
import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.testng.annotations.Test;

import java.io.File;
import java.nio.file.Path;
import java.util.Map;

import static com.facebook.presto.iceberg.CatalogType.HADOOP;
import static com.facebook.presto.iceberg.IcebergQueryRunner.ICEBERG_CATALOG;
import static com.facebook.presto.iceberg.IcebergQueryRunner.getIcebergDataDirectoryPath;
import static java.lang.String.format;

public class TestFastForwardBranchProcedure
        extends AbstractTestQueryFramework
{
    public static final String TEST_SCHEMA = "tpch";

    @Override
    protected QueryRunner createQueryRunner()
            throws Exception
    {
        return IcebergQueryRunner.builder()
                .setCatalogType(HADOOP)
                .build()
                .getQueryRunner();
    }

    public void createTable(String tableName)
    {
        assertUpdate("CREATE TABLE IF NOT EXISTS " + tableName + " (id integer, value VARCHAR)");
    }

    public void dropTable(String tableName)
    {
        assertQuerySucceeds("DROP TABLE IF EXISTS " + TEST_SCHEMA + "." + tableName);
    }

    @Test
    public void testFastForwardBranchUsingPositionalArgs()
    {
        String tableName = "fast_forward_table_test";
        createTable(tableName);
        try {
            assertUpdate("INSERT INTO " + tableName + " VALUES (1, 'a')", 1);
            assertUpdate("INSERT INTO " + tableName + " VALUES (2, 'b')", 1);

            Table table = loadTable(tableName);
            table.refresh();

            table.manageSnapshots().createBranch("testBranch").commit();
            assertUpdate("INSERT INTO " + tableName + " VALUES (3, 'c')", 1);
            table.refresh();

            assertQuery("SELECT * FROM " + tableName + " ORDER BY id", " VALUES (1, 'a'), (2, 'b'), (3, 'c')");
            assertQuery("SELECT * FROM " + tableName + " FOR SYSTEM_VERSION AS OF 'testBranch' ORDER BY id", " VALUES (1, 'a'), (2, 'b')");

            String fromBranch = "testBranch";
            String toBranch = "main";
            assertUpdate(format("CALL system.fast_forward('%s', '%s', '%s', '%s')", TEST_SCHEMA, tableName, fromBranch, toBranch));

            // now testBranch branch should have 3 entries same as main
            assertQuery("SELECT * FROM " + tableName + " FOR SYSTEM_VERSION AS OF 'testBranch' ORDER BY id", " VALUES (1, 'a'), (2, 'b'), (3, 'c')");
        }
        finally {
            dropTable(tableName);
        }
    }

    @Test
    public void testFastForwardBranchUsingNamedArgs()
    {
        String tableName = "fast_forward_table_arg_test";
        createTable(tableName);
        try {
            assertUpdate("INSERT INTO " + tableName + " VALUES (1, 'a')", 1);
            assertUpdate("INSERT INTO " + tableName + " VALUES (2, 'b')", 1);

            Table table = loadTable(tableName);
            table.refresh();

            table.manageSnapshots().createBranch("testBranch").commit();
            assertUpdate("INSERT INTO " + tableName + " VALUES (3, 'c')", 1);
            table.refresh();

            assertQuery("SELECT * FROM " + tableName + " ORDER BY id", " VALUES (1, 'a'), (2, 'b'), (3, 'c')");
            assertQuery("SELECT * FROM " + tableName + " FOR SYSTEM_VERSION AS OF 'testBranch' ORDER BY id", " VALUES (1, 'a'), (2, 'b')");

            String fromBranch = "testBranch";
            String toBranch = "main";
            assertUpdate(format("CALL system.fast_forward(schema => '%s', branch => '%s', to => '%s', table_name => '%s')",
                    TEST_SCHEMA, fromBranch, toBranch, tableName));

            // now testBranch branch should have 3 entries same as main
            assertQuery("SELECT * FROM " + tableName + " FOR SYSTEM_VERSION AS OF 'testBranch' ORDER BY id", " VALUES (1, 'a'), (2, 'b'), (3, 'c')");
        }
        finally {
            dropTable(tableName);
        }
    }

    @Test
    public void testFastForwardWhenTargetIsNotAncestorFails()
    {
        String tableName = "fast_forward_table_fail_test";
        createTable(tableName);
        try {
            assertUpdate("INSERT INTO " + tableName + " VALUES (1, 'a')", 1);
            assertUpdate("INSERT INTO " + tableName + " VALUES (2, 'b')", 1);

            Table table = loadTable(tableName);
            table.refresh();

            table.manageSnapshots().createBranch("testBranch1").commit();
            assertUpdate("INSERT INTO " + tableName + " VALUES (3, 'c')", 1);
            table.refresh();

            table.manageSnapshots().createBranch("testBranch2").commit();
            table.refresh();

            assertQuery("SELECT * FROM " + tableName + " ORDER BY id", " VALUES (1, 'a'), (2, 'b'), (3, 'c')");
            assertQuery("SELECT * FROM " + tableName + " FOR SYSTEM_VERSION AS OF 'testBranch1' ORDER BY id", " VALUES (1, 'a'), (2, 'b')");
            assertQuery("SELECT * FROM " + tableName + " FOR SYSTEM_VERSION AS OF 'testBranch2' ORDER BY id", " VALUES (1, 'a'), (2, 'b'), (3, 'c')");

            String fromBranch = "testBranch2";
            String toBranch = "testBranch1";
            // this should fail since fromBranch is not ancestor of toBranch
            assertQueryFails(format("CALL system.fast_forward('%s', '%s', '%s', '%s')", TEST_SCHEMA, tableName, fromBranch, toBranch),
                    "Cannot fast-forward: testBranch2 is not an ancestor of testBranch1");
        }
        finally {
            dropTable(tableName);
        }
    }

    @Test
    public void testInvalidFastForwardBranchCases()
    {
        assertQueryFails("CALL system.fast_forward('test_table', branch => 'main', to => 'newBranch')",
                "line 1:1: Named and positional arguments cannot be mixed");
        assertQueryFails("CALL custom.fast_forward('test_table', 'main', 'newBranch')",
                "Procedure not registered: custom.fast_forward");
        assertQueryFails("CALL system.fast_forward('test_table', 'main')",
                "line 1:1: Required procedure argument 'branch' is missing");
        assertQueryFails("CALL system.fast_forward('', 'main', 'newBranch')",
                "line 1:1: Required procedure argument 'to' is missing");
    }

    @Test
    public void testFastForwardNonExistingToRefFails()
    {
        String tableName = "sample_table";
        createTable(tableName);
        try {
            String fromBranch = "main";
            String toBranch = "non_existing_branch";
            assertQueryFails(format("CALL system.fast_forward(branch => '%s', to => '%s', table_name => '%s', schema => '%s')",
                            fromBranch, toBranch, tableName, TEST_SCHEMA),
                    "Ref does not exist: non_existing_branch");
        }
        finally {
            dropTable(tableName);
        }
    }

    @Test
    public void testFastForwardNonMain()
    {
        String tableName = "fast_forward_table_nonmain_test";
        createTable(tableName);
        try {
            assertUpdate("INSERT INTO " + tableName + " VALUES (1, 'a')", 1);
            assertUpdate("INSERT INTO " + tableName + " VALUES (2, 'b')", 1);

            Table table = loadTable(tableName);
            table.refresh();

            table.manageSnapshots().createBranch("testBranch1").commit();
            assertUpdate("INSERT INTO " + tableName + " VALUES (3, 'c')", 1);
            table.refresh();

            table.manageSnapshots().createBranch("testBranch2").commit();
            table.refresh();

            assertQuery("SELECT * FROM " + tableName + " ORDER BY id", " VALUES (1, 'a'), (2, 'b'), (3, 'c')");
            assertQuery("SELECT * FROM " + tableName + " FOR SYSTEM_VERSION AS OF 'testBranch1' ORDER BY id", " VALUES (1, 'a'), (2, 'b')");
            assertQuery("SELECT * FROM " + tableName + " FOR SYSTEM_VERSION AS OF 'testBranch2' ORDER BY id", " VALUES (1, 'a'), (2, 'b'), (3, 'c')");

            String fromBranch = "testBranch1";
            String toBranch = "testBranch2";
            assertUpdate(format("CALL system.fast_forward('%s', '%s', '%s', '%s')", TEST_SCHEMA, tableName, fromBranch, toBranch));

            // now testBranch1 branch should have 3 entries same as testBranch2
            assertQuery("SELECT * FROM " + tableName + " FOR SYSTEM_VERSION AS OF 'testBranch1' ORDER BY id", " VALUES (1, 'a'), (2, 'b'), (3, 'c')");
        }
        finally {
            dropTable(tableName);
        }
    }

    @Test
    public void testFastForwardNonExistingBranch()
    {
        String tableName = "fast_forward_table_non_existing_test";
        createTable(tableName);
        try {
            assertUpdate("INSERT INTO " + tableName + " VALUES (1, 'a')", 1);
            assertUpdate("INSERT INTO " + tableName + " VALUES (2, 'b')", 1);

            Table table = loadTable(tableName);
            table.refresh();

            table.manageSnapshots().createBranch("testBranch1").commit();
            assertUpdate("INSERT INTO " + tableName + " VALUES (3, 'c')", 1); // main branch here
            table.refresh();

            assertQuery("SELECT * FROM " + tableName + " ORDER BY id", " VALUES (1, 'a'), (2, 'b'), (3, 'c')");
            assertQuery("SELECT * FROM " + tableName + " FOR SYSTEM_VERSION AS OF 'testBranch1' ORDER BY id", " VALUES (1, 'a'), (2, 'b')");

            String fromBranch = "non_existing_branch"; // non existing branch
            String toBranch = "main";
            assertUpdate(format("CALL system.fast_forward('%s', '%s', '%s', '%s')", TEST_SCHEMA, tableName, fromBranch, toBranch));

            // New branch non_existing_branch would be created and it should have 3 entries same as main branch
            assertQuery("SELECT * FROM " + tableName + " FOR SYSTEM_VERSION AS OF 'non_existing_branch' ORDER BY id", " VALUES (1, 'a'), (2, 'b'), (3, 'c')");

            String fromBranch1 = "non_existing_branch1"; // non existing branch
            String toBranch1 = "testBranch1";
            assertUpdate(format("CALL system.fast_forward('%s', '%s', '%s', '%s')", TEST_SCHEMA, tableName, fromBranch1, toBranch1));

            // New branch non_existing_branch1 would be created and it should have 2 entries same as testBranch1 branch
            assertQuery("SELECT * FROM " + tableName + " FOR SYSTEM_VERSION AS OF 'non_existing_branch1' ORDER BY id", " VALUES (1, 'a'), (2, 'b')");
        }
        finally {
            dropTable(tableName);
        }
    }

    private Table loadTable(String tableName)
    {
        tableName = normalizeIdentifier(tableName, ICEBERG_CATALOG);
        Catalog catalog = CatalogUtil.loadCatalog(HadoopCatalog.class.getName(), ICEBERG_CATALOG, getProperties(), new Configuration());
        return catalog.loadTable(TableIdentifier.of(TEST_SCHEMA, tableName));
    }

    private Map<String, String> getProperties()
    {
        File metastoreDir = getCatalogDirectory();
        return ImmutableMap.of("warehouse", metastoreDir.toString());
    }

    private File getCatalogDirectory()
    {
        Path dataDirectory = getDistributedQueryRunner().getCoordinator().getDataDirectory();
        Path catalogDirectory = getIcebergDataDirectoryPath(dataDirectory, HADOOP.name(), new IcebergConfig().getFileFormat(), false);
        return catalogDirectory.toFile();
    }
}