TestIcebergTableChangelog.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.iceberg;

import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.AbstractTestQueryFramework;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.util.Arrays;
import java.util.stream.Collectors;

import static com.facebook.presto.iceberg.CatalogType.HADOOP;

/**
 * Tests for querying the {@code $changelog} table of Iceberg tables, using a query runner
 * configured with the {@code HADOOP} catalog type.
 * <p>
 * Most tests query the changelog of the shared {@code ctas_orders} table created in
 * {@link #init()}. Tests that need specific data create their own table and drop it when done.
 */
public class TestIcebergTableChangelog
        extends AbstractTestQueryFramework
{
    // Snapshot IDs of ctas_orders ordered newest-first, excluding the current (latest) snapshot.
    // Populated by init(); snapshots[0] is used as the changelog starting snapshot in most tests.
    private long[] snapshots = new long[0];

    @Override
    protected QueryRunner createQueryRunner()
            throws Exception
    {
        return IcebergQueryRunner.builder()
                .setCatalogType(HADOOP)
                .build()
                .getQueryRunner();
    }

    /**
     * Creates the shared {@code ctas_orders} table, applies a truncate and two inserts to it,
     * and records the resulting snapshot IDs (all but the latest) in {@link #snapshots}.
     *
     * @throws IllegalStateException if no usable snapshot ID was recorded
     */
    @Override
    @BeforeClass
    public void init()
            throws Exception
    {
        super.init();
        assertQuerySucceeds("CREATE TABLE ctas_orders as SELECT * FROM orders LIMIT 10");
        assertQuerySucceeds("TRUNCATE TABLE ctas_orders");
        assertQuerySucceeds("INSERT INTO ctas_orders SELECT * FROM orders LIMIT 20");
        assertQuerySucceeds("INSERT INTO ctas_orders SELECT * FROM orders LIMIT 30");
        snapshots = Lists.reverse(
                        getQueryRunner().execute("SELECT snapshot_id FROM \"ctas_orders$snapshots\" ORDER BY committed_at").getOnlyColumn()
                                .collect(Collectors.toList()))
                // reverse and skip the latest snapshot ID since it's invalid
                // to get the changelog for the current snapshot
                .stream().skip(1)
                .mapToLong(Long.class::cast)
                .toArray();
        // Nearly every test reads snapshots[0]; fail here with a clear message instead of
        // letting each test die with an ArrayIndexOutOfBoundsException.
        if (snapshots.length == 0) {
            throw new IllegalStateException("Expected ctas_orders to have at least two snapshots after setup");
        }
    }

    @Test
    public void testSchema()
    {
        assertQuery(String.format("SHOW COLUMNS FROM \"ctas_orders@%d$changelog\"", snapshots[0]),
                "VALUES" +
                        "('operation', 'varchar', '', '')," +
                        "('ordinal', 'bigint', '', '')," +
                        "('snapshotid', 'bigint', '', '')," +
                        "('rowdata', 'row(\"orderkey\" bigint, \"custkey\" bigint, \"orderstatus\" varchar, \"totalprice\" double, \"orderdate\" date, \"orderpriority\" varchar, \"clerk\" varchar, \"shippriority\" integer, \"comment\" varchar)', '', '')");
    }

    @Test
    public void testBasicSelect()
    {
        // every recorded (non-current) snapshot must be usable as a changelog starting point
        for (long id : snapshots) {
            assertQuerySucceeds(String.format("SELECT * FROM \"ctas_orders@%d$changelog\"", id));
        }
    }

    @Test
    public void testNoSnapSpecified()
    {
        assertQuerySucceeds("SELECT * FROM \"ctas_orders$changelog\"");
    }

    @Test
    public void testSelectSingleColumn()
    {
        assertQuerySucceeds(String.format("SELECT operation FROM \"ctas_orders@%d$changelog\"", snapshots[0]));
    }

    @Test
    public void testSelectMultiColumn()
    {
        assertQuerySucceeds(String.format("SELECT operation, ordinal FROM \"ctas_orders@%d$changelog\"", snapshots[0]));
    }

    @Test
    public void testSelectMultiColumnReorder()
    {
        assertQuerySucceeds(String.format("SELECT rowdata, rowdata.orderkey, operation FROM \"ctas_orders@%d$changelog\"", snapshots[0]));
    }

    @Test
    public void testSelectPredicatePrimaryKey()
    {
        assertQuerySucceeds(String.format("SELECT * FROM \"ctas_orders@%d$changelog\" WHERE rowdata.orderkey > 9000", snapshots[0]));
    }

    @Test
    public void testSelectPredicateStaticColumns()
    {
        assertQuerySucceeds(String.format("SELECT * FROM \"ctas_orders@%d$changelog\" WHERE ordinal != 0", snapshots[0]));
        assertQuerySucceeds(String.format("SELECT * FROM \"ctas_orders@%d$changelog\" WHERE ordinal = 0", snapshots[0]));
        assertQuerySucceeds(String.format("SELECT * FROM \"ctas_orders@%d$changelog\" WHERE snapshotid = 0", snapshots[0]));
        assertQuerySucceeds(String.format("SELECT * FROM \"ctas_orders@%d$changelog\" WHERE snapshotid != 0", snapshots[0]));
        assertQuerySucceeds(String.format("SELECT * FROM \"ctas_orders@%d$changelog\" WHERE operation != 'INSERT'", snapshots[0]));
        assertQuerySucceeds(String.format("SELECT * FROM \"ctas_orders@%d$changelog\" WHERE operation = 'INSERT'", snapshots[0]));
    }

    /**
     * Checks the exact rows returned by projections and filters on the changelog of a small
     * table that receives two inserts followed by a delete.
     */
    @Test
    public void testVerifyProjectAndFilterOutput()
    {
        try {
            assertQuerySucceeds("CREATE TABLE test_changelog (a int, b int)  WITH (partitioning = ARRAY['a'], delete_mode = 'copy-on-write')");
            assertQuerySucceeds("INSERT INTO test_changelog VALUES (1, 2)");
            assertQuerySucceeds("INSERT INTO test_changelog VALUES (2, 2)");
            assertQuerySucceeds("DELETE FROM test_changelog WHERE a = 2");
            long[] testSnapshots = Arrays.stream(getSnapshotIds("test_changelog"))
                    // skip the earliest snapshot since the changelog starts from there.
                    .skip(1)
                    .toArray();
            assertQuery("SELECT snapshotid FROM \"test_changelog$changelog\" order by ordinal asc", "VALUES " + Joiner.on(", ").join(Arrays.stream(testSnapshots).iterator()));
            // Verify correct projections for single columns
            assertQuery("SELECT ordinal FROM \"test_changelog$changelog\" order by ordinal asc", "VALUES 0, 1");
            assertQuery("SELECT operation FROM \"test_changelog$changelog\" order by ordinal asc", "VALUES 'INSERT', 'DELETE'");
            assertQuery("SELECT rowdata.a, rowdata.b FROM \"test_changelog$changelog\" order by ordinal asc", "VALUES (2, 2), (2, 2)"); // inserted then deleted
            // Verify correct results when filters are applied
            assertQuery("SELECT ordinal, operation FROM \"test_changelog$changelog\" WHERE ordinal = 0 order by ordinal asc", "VALUES (0, 'INSERT')");
            assertQuery("SELECT ordinal, operation FROM \"test_changelog$changelog\" WHERE ordinal = 1 order by ordinal asc", "VALUES (1, 'DELETE')");
            assertQuery("SELECT ordinal, operation FROM \"test_changelog$changelog\" WHERE operation = 'INSERT' order by ordinal asc", "VALUES (0, 'INSERT')");
            assertQuery("SELECT ordinal, operation FROM \"test_changelog$changelog\" WHERE operation = 'DELETE' order by ordinal asc", "VALUES (1, 'DELETE')");
            assertQueryReturnsEmptyResult("SELECT * FROM \"test_changelog$changelog\" WHERE operation = 'AAABBBCCC'");
            assertQuery(String.format("SELECT ordinal FROM \"test_changelog$changelog\" WHERE snapshotid = %d order by ordinal asc", testSnapshots[0]), "VALUES 0");
            assertQuery(String.format("SELECT ordinal FROM \"test_changelog$changelog\" WHERE snapshotid = %d order by ordinal asc", testSnapshots[1]), "VALUES 1");
            assertQuery("SELECT * FROM \"test_changelog$changelog\" WHERE rowdata.a = 2 order by ordinal asc", String.format("VALUES ('INSERT', 0, %d, (2, 2)), ('DELETE', 1, %d, (2, 2))", testSnapshots[0], testSnapshots[1]));
        }
        finally {
            dropTableIfExists("test_changelog");
        }
    }

    @Test
    public void testSelectCount()
    {
        assertQuerySucceeds(String.format("SELECT count(*) FROM \"ctas_orders@%d$changelog\"", snapshots[0]));
    }

    @Test
    public void testCountGroupByAggregation()
    {
        assertQuerySucceeds(String.format("SELECT count(*) FROM \"ctas_orders@%d$changelog\" GROUP BY ordinal", snapshots[0]));
    }

    @Test
    public void testPrimaryKeyProjection()
    {
        assertQuerySucceeds(String.format("SELECT rowdata.orderkey FROM \"ctas_orders@%d$changelog\"", snapshots[0]));
    }

    @Test
    public void testBasicAggregation()
    {
        assertQuerySucceeds(String.format("SELECT approx_distinct(rowdata.orderkey) FROM \"ctas_orders@%d$changelog\"", snapshots[0]));
    }

    @Test
    public void testStaticColumnProjections()
    {
        assertQuerySucceeds(String.format("SELECT operation, ordinal, snapshotid FROM \"ctas_orders@%d$changelog\"", snapshots[0]));
        assertQuerySucceeds(String.format("SELECT snapshotid, ordinal, operation FROM \"ctas_orders@%d$changelog\"", snapshots[0]));
        assertQuerySucceeds(String.format("SELECT ordinal, snapshotid FROM \"ctas_orders@%d$changelog\"", snapshots[0]));
        assertQuerySucceeds(String.format("SELECT operation, snapshotid FROM \"ctas_orders@%d$changelog\"", snapshots[0]));
        assertQuerySucceeds(String.format("SELECT snapshotid FROM \"ctas_orders@%d$changelog\"", snapshots[0]));
        assertQuerySucceeds(String.format("SELECT ordinal FROM \"ctas_orders@%d$changelog\"", snapshots[0]));
        assertQuerySucceeds(String.format("SELECT operation FROM \"ctas_orders@%d$changelog\"", snapshots[0]));
    }

    @Test
    public void testCombinedColumnProjections()
    {
        assertQuerySucceeds(String.format("SELECT rowdata.orderkey, operation FROM \"ctas_orders@%d$changelog\"", snapshots[0]));
        assertQuerySucceeds(String.format("SELECT rowdata.orderkey, ordinal FROM \"ctas_orders@%d$changelog\"", snapshots[0]));
        assertQuerySucceeds(String.format("SELECT rowdata.orderkey, snapshotid FROM \"ctas_orders@%d$changelog\"", snapshots[0]));
    }

    @Test
    public void testJoinOnSnapshotTimestamp()
    {
        assertQuerySucceeds(String.format("SELECT snap.committed_at, change.operation, rowdata.orderkey, ordinal" +
                " FROM \"ctas_orders$snapshots\" as snap" +
                " JOIN \"ctas_orders@%d$changelog\" as change" +
                " ON change.snapshotid = snap.snapshot_id" +
                " ORDER BY snap.committed_at asc", snapshots[0]));
    }

    @Test
    public void testRightOuterJoin()
    {
        assertQuerySucceeds(String.format("SELECT orderkey, operation, ordinal, snapshotid" +
                "   FROM orders as sample" +
                "   RIGHT OUTER JOIN \"ctas_orders@%d$changelog\" as cl" +
                "   ON cl.rowdata.orderkey = sample.orderkey", snapshots[0]));
    }

    @Test
    public void testDisallowedDropColumn()
    {
        assertQueryFails(String.format("ALTER TABLE \"ctas_orders@%d$changelog\" DROP COLUMN ordinal", snapshots[0]), "only the data table can have columns dropped");
    }

    @Test
    public void testDisallowedAddColumn()
    {
        assertQueryFails(String.format("ALTER TABLE \"ctas_orders@%d$changelog\" ADD COLUMN orderkey_added int", snapshots[0]), "only the data table can have columns added");
    }

    @Test
    public void testDisallowedRenameColumn()
    {
        assertQueryFails(String.format("ALTER TABLE \"ctas_orders@%d$changelog\" RENAME COLUMN ordinal TO ordinal_renamed", snapshots[0]), "only the data table can have columns renamed");
    }

    @Test
    public void testDisallowedDropTable()
    {
        assertQueryFails(String.format("DROP TABLE \"ctas_orders@%d$changelog\"", snapshots[0]), "only the data table can be dropped");
    }

    /**
     * Checks that the changelog can still be queried after columns were added to and dropped
     * from the underlying table between data changes.
     */
    @Test
    public void testChangelogWithScemaChange()
    {
        try {
            assertQuerySucceeds("CREATE TABLE changelog_alter (c int)");
            assertQuerySucceeds("INSERT INTO changelog_alter VALUES 0");
            assertQuerySucceeds("INSERT INTO changelog_alter VALUES 1, 2, 3, 4, 5");
            assertQuerySucceeds("ALTER TABLE changelog_alter ADD COLUMN d int");
            assertQuerySucceeds("TRUNCATE TABLE changelog_alter");
            assertQuerySucceeds("ALTER TABLE changelog_alter DROP COLUMN c");
            assertQuerySucceeds("INSERT INTO changelog_alter VALUES 1, 2, 3, 4, 5");
            assertQuerySucceeds("SELECT * FROM \"changelog_alter$changelog\"");
        }
        finally {
            dropTableIfExists("changelog_alter");
        }
    }

    /**
     * Checks the exact changelog contents between explicit start and end snapshots of a table
     * that receives an insert, a second insert, a truncate and a final insert.
     */
    @Test
    public void testChangelogQueryResults()
    {
        try {
            assertQuerySucceeds("CREATE TABLE changelog_results (c int)");
            assertQuerySucceeds("INSERT INTO changelog_results VALUES 0");
            assertQuerySucceeds("INSERT INTO changelog_results VALUES 1, 2, 3, 4, 5");
            assertQuerySucceeds("TRUNCATE TABLE changelog_results");
            assertQuerySucceeds("INSERT INTO changelog_results VALUES 1, 2, 3, 4, 5");

            long insert0Snapshot = getSnapshot(0, "changelog_results");
            long insert5ValuesSnapshot = getSnapshot(1, "changelog_results");
            long truncateSnapshot = getSnapshot(2, "changelog_results");
            long insert5AgainSnapshot = getSnapshot(3, "changelog_results");

            // test initial insert
            assertQuery(String.format("SELECT rowdata.c FROM \"changelog_results@%d$changelog@%d\" ORDER BY rowdata.c", insert0Snapshot, insert5ValuesSnapshot),
                    "VALUES 1, 2, 3, 4, 5");
            assertQuery(String.format("SELECT ordinal, count(*) FROM \"changelog_results@%d$changelog@%d\" GROUP BY ordinal", insert0Snapshot, insert5ValuesSnapshot),
                    "VALUES (0, 5)");
            assertQuery(String.format("SELECT rowdata.c FROM \"changelog_results@%d$changelog@%d\" WHERE operation = 'INSERT' ORDER BY rowdata.c", insert0Snapshot, insert5ValuesSnapshot),
                    "VALUES 1, 2, 3, 4, 5");

            // test after truncate
            assertQueryReturnsEmptyResult(String.format("SELECT rowdata.c FROM \"changelog_results@%d$changelog@%d\" WHERE operation = 'INSERT'", insert5ValuesSnapshot, truncateSnapshot));
            assertQuery(String.format("SELECT rowdata.c FROM \"changelog_results@%d$changelog@%d\" WHERE operation = 'DELETE' ORDER BY rowdata.c", insert5ValuesSnapshot, truncateSnapshot),
                    "VALUES 0, 1, 2, 3, 4, 5");
            assertQuery(String.format("SELECT ordinal, count(*) FROM \"changelog_results@%d$changelog@%d\" GROUP BY ordinal", insert5ValuesSnapshot, truncateSnapshot),
                    "VALUES (0, 6)");

            // test changelog across the insertion and truncate snapshots
            assertQuery(String.format("SELECT count(*) FROM \"changelog_results@%d$changelog@%d\" WHERE operation = 'INSERT'", insert0Snapshot, truncateSnapshot),
                    "VALUES 5");
            assertQuery(String.format("SELECT count(*) FROM \"changelog_results@%d$changelog@%d\" WHERE operation = 'DELETE'", insert0Snapshot, truncateSnapshot),
                    "VALUES 6");
            assertQuery(String.format("SELECT ordinal, count(*) FROM \"changelog_results@%d$changelog@%d\" GROUP BY ordinal", insert0Snapshot, truncateSnapshot),
                    "VALUES (0, 5), (1, 6)");
            assertQuery(String.format("SELECT rowdata.c FROM \"changelog_results@%d$changelog@%d\" ORDER BY rowdata.c", insert0Snapshot, truncateSnapshot),
                    "VALUES 0, 1, 1, 2, 2, 3, 3, 4, 4, 5, 5");

            // test changelog across delete and 2nd insert
            assertQuery(String.format("SELECT count(*) FROM \"changelog_results@%d$changelog@%d\" WHERE operation = 'INSERT'", truncateSnapshot, insert5AgainSnapshot),
                    "VALUES 5");
            assertQuery(String.format("SELECT count(*) FROM \"changelog_results@%d$changelog@%d\" WHERE operation = 'DELETE'", truncateSnapshot, insert5AgainSnapshot),
                    "VALUES 0");
            assertQuery(String.format("SELECT ordinal, count(*) FROM \"changelog_results@%d$changelog@%d\" GROUP BY ordinal ORDER BY ordinal", truncateSnapshot, insert5AgainSnapshot),
                    "VALUES (0, 5)");
            assertQuery(String.format("SELECT rowdata.c FROM \"changelog_results@%d$changelog@%d\" ORDER BY rowdata.c", truncateSnapshot, insert5AgainSnapshot),
                    "VALUES 1, 2, 3, 4, 5");
        }
        finally {
            dropTableIfExists("changelog_results");
        }
    }

    /**
     * Returns the snapshot ID at position {@code idx} of the given table's snapshots when
     * ordered by {@code committed_at} ascending (index 0 is the earliest snapshot).
     *
     * @param idx zero-based position in commit order
     * @param tableName name of the table whose {@code $snapshots} table is queried
     * @return the snapshot ID at that position
     * @throws IllegalStateException if the table has no snapshot at {@code idx}
     */
    private long getSnapshot(int idx, String tableName)
    {
        long[] ids = getSnapshotIds(tableName);
        if (idx < 0 || idx >= ids.length) {
            throw new IllegalStateException(String.format("Table %s has %d snapshots; no snapshot at index %d", tableName, ids.length, idx));
        }
        return ids[idx];
    }

    /**
     * Returns all snapshot IDs of the given table ordered by {@code committed_at} ascending.
     */
    private long[] getSnapshotIds(String tableName)
    {
        return getQueryRunner().execute(String.format("SELECT snapshot_id FROM \"%s$snapshots\" ORDER BY committed_at", tableName)).getOnlyColumn()
                .mapToLong(Long.class::cast)
                .toArray();
    }

    /**
     * Drops a table created by a test. Uses IF EXISTS so that it is safe to call from a
     * {@code finally} block even when the table was never created.
     */
    private void dropTableIfExists(String tableName)
    {
        assertQuerySucceeds("DROP TABLE IF EXISTS " + tableName);
    }
}