// TestIcebergTableVersion.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.iceberg;
import com.facebook.presto.Session;
import com.facebook.presto.Session.SessionBuilder;
import com.facebook.presto.common.type.TimeZoneKey;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.AbstractTestQueryFramework;
import com.facebook.presto.tests.DistributedQueryRunner;
import com.google.common.collect.ImmutableMap;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.nio.file.Path;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import static com.facebook.presto.SystemSessionProperties.LEGACY_TIMESTAMP;
import static com.facebook.presto.iceberg.CatalogType.HIVE;
import static com.facebook.presto.iceberg.IcebergQueryRunner.ICEBERG_CATALOG;
import static com.facebook.presto.iceberg.IcebergQueryRunner.getIcebergDataDirectoryPath;
import static com.facebook.presto.testing.TestingSession.testSessionBuilder;
import static java.lang.String.format;
import static org.testng.Assert.assertTrue;
public class TestIcebergTableVersion
extends AbstractTestQueryFramework
{
// Schema that holds every table and view created by this test class
public static final String schemaName = "test_tt_schema";
// Unqualified names of the two base tables populated in setUp()
public static final String tab1 = "test_table_version_tab1";
public static final String tab2 = "test_table_version_tab2";
// Unqualified names of the views created in testTableVersionMisc()
public static final String view1 = "test_table_version_view1";
public static final String view2 = "test_table_version_view2";
public static final String view3 = "test_table_version_view3";
public static final String view4 = "test_table_version_view4";
// Schema-qualified names of the two base tables
private static final String tableName1 = schemaName + "." + tab1;
private static final String tableName2 = schemaName + "." + tab2;
// Schema-qualified names of the tables created via CREATE TABLE AS in testTableVersionMisc()
private static final String tableName3 = schemaName + "." + "tab1_version1";
private static final String tableName4 = schemaName + "." + "tab2_version1";
private static final String tableName5 = schemaName + "." + "tab1_version2";
private static final String tableName6 = schemaName + "." + "tab2_version2";
// Schema-qualified view names
private static final String viewName1 = schemaName + "." + view1;
private static final String viewName2 = schemaName + "." + view2;
private static final String viewName3 = schemaName + "." + view3;
private static final String viewName4 = schemaName + "." + view4;
// Snapshot ids recorded in setUp() after the 1st, 2nd and 3rd insert into each base table
private static long tab1VersionId1;
private static long tab1VersionId2;
private static long tab1VersionId3;
private static long tab2VersionId1;
private static long tab2VersionId2;
private static long tab2VersionId3;
// made_current_at values (cast to varchar) recorded in setUp() after the 1st, 2nd and 3rd insert into each base table
private static String tab1Timestamp1;
private static String tab1Timestamp2;
private static String tab1Timestamp3;
private static String tab2Timestamp1;
private static String tab2Timestamp2;
private static String tab2Timestamp3;
/**
 * Builds the distributed query runner used by every test in this class: installs the
 * Iceberg plugin and registers the Iceberg catalog backed by a file-based Hive metastore
 * located under the coordinator's data directory.
 *
 * @return the configured query runner
 * @throws Exception if the runner cannot be built or configured; in the latter case the
 *         partially configured runner is closed before the failure is rethrown
 */
@Override
protected QueryRunner createQueryRunner()
        throws Exception
{
    Session session = testSessionBuilder()
            .setCatalog(ICEBERG_CATALOG)
            .build();

    DistributedQueryRunner queryRunner = DistributedQueryRunner.builder(session).build();
    try {
        Path dataDirectory = queryRunner.getCoordinator().getDataDirectory();
        Path catalogDirectory = getIcebergDataDirectoryPath(dataDirectory, HIVE.name(), new IcebergConfig().getFileFormat(), false);

        queryRunner.installPlugin(new IcebergPlugin());
        Map<String, String> icebergProperties = ImmutableMap.<String, String>builder()
                .put("hive.metastore", "file")
                .put("hive.metastore.catalog.dir", catalogDirectory.toFile().toURI().toString())
                .build();

        queryRunner.createCatalog(ICEBERG_CATALOG, "iceberg", icebergProperties);
        return queryRunner;
    }
    catch (Throwable failure) {
        // Do not leak the already-started runner when configuration fails
        try {
            queryRunner.close();
        }
        catch (Throwable closeFailure) {
            // addSuppressed rejects self-suppression, so guard against the same instance
            if (closeFailure != failure) {
                failure.addSuppressed(closeFailure);
            }
        }
        throw failure;
    }
}
/**
 * Creates the test schema and the two base tables, inserting three single-row batches into
 * each. After every insert the newest snapshot id and the newest history timestamp are
 * stored in the static fields so the tests can address each table state individually.
 */
@BeforeClass
public void setUp()
{
assertQuerySucceeds("CREATE SCHEMA " + ICEBERG_CATALOG + "." + schemaName);
// Table 1: rows (1,'aaa'), (2,'bbb'), (3,'ccc') are added one insert at a time
assertUpdate("CREATE TABLE " + tableName1 + " (id integer, desc varchar) WITH(partitioning = ARRAY['id'])");
assertUpdate("INSERT INTO " + tableName1 + " VALUES (1,'aaa')", 1);
// Each capture must directly follow its insert: the helpers return the latest entry at call time
tab1VersionId1 = getLatestSnapshotId(tab1);
tab1Timestamp1 = getLatestTimestampId(tab1);
assertUpdate("INSERT INTO " + tableName1 + " VALUES (2,'bbb')", 1);
tab1VersionId2 = getLatestSnapshotId(tab1);
tab1Timestamp2 = getLatestTimestampId(tab1);
assertUpdate("INSERT INTO " + tableName1 + " VALUES (3,'ccc')", 1);
tab1VersionId3 = getLatestSnapshotId(tab1);
tab1Timestamp3 = getLatestTimestampId(tab1);
// Table 2: rows (1,'xxx'), (2,'yyy'), (3,'zzz') are added one insert at a time
assertUpdate("CREATE TABLE " + tableName2 + " (id integer, desc varchar) WITH(partitioning = ARRAY['id'])");
assertUpdate("INSERT INTO " + tableName2 + " VALUES (1,'xxx')", 1);
tab2VersionId1 = getLatestSnapshotId(tab2);
tab2Timestamp1 = getLatestTimestampId(tab2);
assertUpdate("INSERT INTO " + tableName2 + " VALUES (2,'yyy')", 1);
tab2VersionId2 = getLatestSnapshotId(tab2);
tab2Timestamp2 = getLatestTimestampId(tab2);
assertUpdate("INSERT INTO " + tableName2 + " VALUES (3,'zzz')", 1);
tab2VersionId3 = getLatestSnapshotId(tab2);
tab2Timestamp3 = getLatestTimestampId(tab2);
}
/**
 * Removes everything this class may have created: the four views first, then the six
 * tables, and finally the schema itself. Every statement uses IF EXISTS, so objects that
 * were never created are simply skipped.
 */
@AfterClass(alwaysRun = true)
public void tearDown()
{
    String catalogPrefix = ICEBERG_CATALOG + ".";

    String[] views = {viewName1, viewName2, viewName3, viewName4};
    for (String view : views) {
        assertQuerySucceeds("DROP VIEW IF EXISTS " + catalogPrefix + view);
    }

    String[] tables = {tableName1, tableName2, tableName3, tableName4, tableName5, tableName6};
    for (String table : tables) {
        assertQuerySucceeds("DROP TABLE IF EXISTS " + catalogPrefix + table);
    }

    // The schema goes last, after the objects inside it have been dropped
    assertQuerySucceeds("DROP SCHEMA IF EXISTS " + catalogPrefix + schemaName);
}
/**
 * Looks up the most recently committed snapshot of a table in the test schema.
 *
 * @param tableName unqualified table name
 * @return the snapshot_id of the row with the greatest committed_at in the table's "$snapshots" metadata table
 */
private long getLatestSnapshotId(String tableName)
{
    String snapshotsTable = schemaName + "." + "\"" + tableName + "$snapshots\"";
    String sql = "SELECT snapshot_id FROM " + snapshotsTable + " ORDER BY committed_at DESC LIMIT 1";
    Object latest = computeActual(sql).getOnlyValue();
    return (long) latest;
}
/**
 * Looks up the time at which the current state of a table in the test schema became current.
 *
 * @param tableName unqualified table name
 * @return the greatest made_current_at of the table's "$history" metadata table, cast to varchar
 */
private String getLatestTimestampId(String tableName)
{
    String historyTable = schemaName + "." + "\"" + tableName + "$history\"";
    String sql = "SELECT cast(made_current_at as varchar) FROM " + historyTable + " ORDER BY made_current_at DESC LIMIT 1";
    Object latest = computeActual(sql).getOnlyValue();
    return (String) latest;
}
/**
 * Exercises the basic time-travel forms against table 1: FOR VERSION / FOR TIMESTAMP,
 * each with AS OF and BEFORE, combined with constant expressions, filters, table aliases
 * and aggregations. Expected rows follow the three inserts performed in setUp().
 */
@Test
public void testTableVersionBasic()
{
    // Snapshot id, AS OF and BEFORE
    assertQuery("SELECT desc FROM " + tableName1 + " FOR VERSION AS OF " + tab1VersionId1 + " ORDER BY 1", "VALUES 'aaa'");
    assertQuery("SELECT desc FROM " + tableName1 + " FOR VERSION AS OF " + tab1VersionId2 + " ORDER BY 1", "VALUES ('aaa'),('bbb')");
    assertQuery("SELECT desc FROM " + tableName1 + " FOR VERSION AS OF " + tab1VersionId3 + " ORDER BY 1", "VALUES ('aaa'), ('bbb'), ('ccc')");
    assertQuery("SELECT desc FROM " + tableName1 + " FOR VERSION BEFORE " + tab1VersionId2 + " ORDER BY 1", "VALUES 'aaa'");
    assertQuery("SELECT desc FROM " + tableName1 + " FOR VERSION BEFORE " + tab1VersionId3 + " ORDER BY 1", "VALUES ('aaa'), ('bbb')");

    // Timestamp literal, AS OF and BEFORE
    assertQuery("SELECT desc FROM " + tableName1 + " FOR TIMESTAMP AS OF TIMESTAMP " + "'" + tab1Timestamp1 + "'" + " ORDER BY 1", "VALUES 'aaa'");
    assertQuery("SELECT desc FROM " + tableName1 + " FOR TIMESTAMP AS OF TIMESTAMP " + "'" + tab1Timestamp2 + "'" + " ORDER BY 1", "VALUES ('aaa'),('bbb')");
    assertQuery("SELECT desc FROM " + tableName1 + " FOR TIMESTAMP AS OF TIMESTAMP " + "'" + tab1Timestamp3 + "'" + " ORDER BY 1", "VALUES ('aaa'), ('bbb'), ('ccc')");
    assertQuery("SELECT desc FROM " + tableName1 + " FOR TIMESTAMP BEFORE TIMESTAMP " + "'" + tab1Timestamp2 + "'" + " ORDER BY 1", "VALUES 'aaa'");
    assertQuery("SELECT desc FROM " + tableName1 + " FOR TIMESTAMP BEFORE TIMESTAMP " + "'" + tab1Timestamp3 + "'" + " ORDER BY 1", "VALUES ('aaa'), ('bbb')");

    // Timestamp given by a function call or a cast rather than a literal
    assertQuery("SELECT desc FROM " + tableName1 + " FOR TIMESTAMP AS OF CURRENT_TIMESTAMP ORDER BY 1", "VALUES ('aaa'), ('bbb'), ('ccc')");
    assertQuery("SELECT desc FROM " + tableName1 + " FOR TIMESTAMP AS OF NOW() ORDER BY 1", "VALUES ('aaa'), ('bbb'), ('ccc')");
    assertQuery("SELECT desc FROM " + tableName1 + " FOR TIMESTAMP AS OF CAST ('" + tab1Timestamp3 + "' AS TIMESTAMP WITH TIME ZONE) ORDER BY 1", "VALUES ('aaa'), ('bbb'), ('ccc')");
    assertQuery("SELECT desc FROM " + tableName1 + " FOR TIMESTAMP BEFORE CURRENT_TIMESTAMP ORDER BY 1", "VALUES ('aaa'), ('bbb'), ('ccc')");
    assertQuery("SELECT desc FROM " + tableName1 + " FOR TIMESTAMP BEFORE NOW() ORDER BY 1", "VALUES ('aaa'), ('bbb'), ('ccc')");
    assertQuery("SELECT desc FROM " + tableName1 + " FOR TIMESTAMP BEFORE CAST ('" + tab1Timestamp3 + "' AS TIMESTAMP WITH TIME ZONE) ORDER BY 1", "VALUES ('aaa'), ('bbb')");

    // Time travel combined with a WHERE filter and/or a table alias
    assertQuery("SELECT desc FROM " + tableName1 + " FOR VERSION AS OF " + tab1VersionId2 + " WHERE id = 2 ORDER BY 1", "VALUES 'bbb'");
    assertQuery("SELECT desc FROM " + tableName1 + " FOR TIMESTAMP AS OF TIMESTAMP " + "'" + tab1Timestamp2 + "'" + " WHERE id = 2 ORDER BY 1", "VALUES 'bbb'");
    assertQuery("SELECT desc FROM " + tableName1 + " FOR VERSION AS OF " + tab1VersionId2 + " AS tab1_v2 WHERE id = 2", "VALUES 'bbb'");
    assertQuery("SELECT desc FROM " + tableName1 + " FOR TIMESTAMP AS OF TIMESTAMP " + "'" + tab1Timestamp2 + "'" + " AS tab1_v2 WHERE id = 2", "VALUES 'bbb'");
    assertQuery("SELECT desc FROM " + tableName1 + " FOR VERSION BEFORE " + tab1VersionId3 + " WHERE id = 2 ORDER BY 1", "VALUES 'bbb'");
    assertQuery("SELECT desc FROM " + tableName1 + " FOR TIMESTAMP BEFORE TIMESTAMP " + "'" + tab1Timestamp3 + "'" + " WHERE id = 2 ORDER BY 1", "VALUES 'bbb'");
    assertQuery("SELECT desc FROM " + tableName1 + " FOR VERSION BEFORE " + tab1VersionId3 + " AS tab1_v2 WHERE id = 2", "VALUES 'bbb'");
    assertQuery("SELECT desc FROM " + tableName1 + " FOR TIMESTAMP BEFORE TIMESTAMP " + "'" + tab1Timestamp3 + "'" + " AS tab1_v2 WHERE id = 2", "VALUES 'bbb'");

    // Aggregations over a historical state
    assertQuery("SELECT SUM(id) FROM " + tableName1 + " FOR VERSION AS OF " + tab1VersionId2, "VALUES 3");
    assertQuery("SELECT desc, SUM(id) FROM " + tableName1 + " FOR VERSION AS OF " + tab1VersionId2 + " GROUP BY desc ORDER BY 2", "VALUES ('aaa', 1), ('bbb', 2)");
    assertQuery("SELECT MAX(id) FROM " + tableName1 + " FOR TIMESTAMP AS OF TIMESTAMP " + "'" + tab1Timestamp2 + "'", "VALUES 2");
    assertQuery("SELECT desc, MAX(id) FROM " + tableName1 + " FOR TIMESTAMP AS OF TIMESTAMP " + "'" + tab1Timestamp2 + "' GROUP BY desc ORDER BY 2", "VALUES ('aaa', 1), ('bbb', 2)");
}
/**
 * Covers time travel in less common positions: the SYSTEM_VERSION / SYSTEM_TIME spellings,
 * joins, CTEs, CREATE TABLE AS, set operations, subqueries and views. The tables and views
 * created here are dropped in tearDown().
 */
@Test
public void testTableVersionMisc()
{
// Alias cases - SYSTEM_TIME and SYSTEM_VERSION
assertQuery("SELECT desc FROM " + tableName1 + " FOR SYSTEM_VERSION AS OF " + tab1VersionId1 + " ORDER BY 1", "VALUES 'aaa'");
// 'main' is passed as a varchar version; the expected rows are the full current contents of table 1
assertQuery("SELECT count(*) FROM " + tableName1 + " FOR SYSTEM_VERSION AS OF 'main'", "VALUES 3");
assertQuery("SELECT desc FROM " + tableName1 + " FOR SYSTEM_VERSION AS OF 'main'" + " ORDER BY 1", "VALUES ('aaa'), ('bbb'), ('ccc')");
assertQuery("SELECT desc FROM " + tableName1 + " FOR SYSTEM_TIME AS OF TIMESTAMP " + "'" + tab1Timestamp1 + "'" + " ORDER BY 1", "VALUES 'aaa'");
assertQuery("SELECT desc FROM " + tableName1 + " FOR SYSTEM_TIME AS OF CURRENT_TIMESTAMP ORDER BY 1", "VALUES ('aaa'), ('bbb'), ('ccc')");
assertQuery("SELECT SUM(id) FROM " + tableName1 + " FOR SYSTEM_VERSION AS OF " + tab1VersionId2, "VALUES 3");
assertQuery("SELECT desc, MAX(id) FROM " + tableName1 + " FOR SYSTEM_TIME AS OF TIMESTAMP " + "'" + tab1Timestamp2 + "' GROUP BY desc ORDER BY 2", "VALUES ('aaa', 1), ('bbb', 2)");
// Cross join of two 2-row states yields 4 rows
assertQuery("SELECT count(*) FROM " + tableName1 + " FOR SYSTEM_VERSION AS OF " + tab1VersionId2 + " , " +
tableName2 + " FOR SYSTEM_VERSION AS OF " + tab2VersionId2, "VALUES 4");
assertQuery("SELECT count(*) FROM " + tableName1 + " FOR SYSTEM_TIME AS OF TIMESTAMP " + "'" + tab1Timestamp2 + "' , " +
tableName2 + " FOR SYSTEM_TIME AS OF TIMESTAMP " + "'" + tab2Timestamp3 + "'" + " WHERE " + tableName1 + ".id = " + tableName2 + ".id", "VALUES 2");
assertQuery("SELECT desc FROM " + tableName1 + " FOR SYSTEM_VERSION BEFORE " + tab1VersionId2 + " ORDER BY 1", "VALUES 'aaa'");
assertQuery("SELECT desc FROM " + tableName1 + " FOR SYSTEM_TIME BEFORE TIMESTAMP " + "'" + tab1Timestamp2 + "'" + " ORDER BY 1", "VALUES 'aaa'");
assertQuery("SELECT desc FROM " + tableName1 + " FOR SYSTEM_TIME BEFORE CURRENT_TIMESTAMP ORDER BY 1", "VALUES ('aaa'), ('bbb'), ('ccc')");
assertQuery("SELECT SUM(id) FROM " + tableName1 + " FOR SYSTEM_VERSION BEFORE " + tab1VersionId3, "VALUES 3");
assertQuery("SELECT desc, MAX(id) FROM " + tableName1 + " FOR SYSTEM_TIME BEFORE TIMESTAMP " + "'" + tab1Timestamp3 + "' GROUP BY desc ORDER BY 2", "VALUES ('aaa', 1), ('bbb', 2)");
assertQuery("SELECT count(*) FROM " + tableName1 + " FOR SYSTEM_VERSION BEFORE " + tab1VersionId3 + " , " +
tableName2 + " FOR SYSTEM_VERSION BEFORE " + tab2VersionId3, "VALUES 4");
assertQuery("SELECT count(*) FROM " + tableName1 + " FOR SYSTEM_TIME BEFORE TIMESTAMP " + "'" + tab1Timestamp3 + "' , " +
tableName2 + " FOR SYSTEM_TIME BEFORE TIMESTAMP " + "'" + tab2Timestamp3 + "'" + " WHERE " + tableName1 + ".id = " + tableName2 + ".id", "VALUES 2");
// Joins, CTE, create table as, union/intersect/except, subquery, view
assertQuery("SELECT count(*) FROM " + tableName1 + " FOR VERSION AS OF " + tab1VersionId2 + " , " +
tableName2 + " FOR VERSION AS OF " + tab2VersionId2, "VALUES 4");
assertQuery("SELECT count(*) FROM " + tableName1 + " FOR VERSION AS OF " + tab1VersionId2 + " AS tab1_v2 INNER JOIN " + tableName2 +
" FOR VERSION AS OF " + tab2VersionId2 + " AS tab2_v2 ON tab1_v2.id = tab2_v2.id", "VALUES 2");
assertQuery("SELECT count(*) FROM " + tableName1 + " FOR TIMESTAMP AS OF TIMESTAMP " + "'" + tab1Timestamp2 + "' , " +
tableName2 + " FOR TIMESTAMP AS OF TIMESTAMP " + "'" + tab2Timestamp3 + "'" + " WHERE " + tableName1 + ".id = " + tableName2 + ".id", "VALUES 2");
assertQuery("SELECT count(*) FROM " + tableName1 + " FOR TIMESTAMP AS OF TIMESTAMP " + "'" + tab1Timestamp2 + "'" + " AS tab1_v2 INNER JOIN " +
tableName2 + " FOR TIMESTAMP AS OF TIMESTAMP " + "'" + tab2Timestamp2 + "'" + " AS tab2_v2 ON tab1_v2.id = tab2_v2.id", "VALUES 2");
assertQuery("SELECT count(*) FROM " + tableName1 + " FOR VERSION BEFORE " + tab1VersionId3 + " , " +
tableName2 + " FOR VERSION BEFORE " + tab2VersionId3, "VALUES 4");
assertQuery("SELECT count(*) FROM " + tableName1 + " FOR TIMESTAMP BEFORE TIMESTAMP " + "'" + tab1Timestamp3 + "'" + " AS tab1_v2 INNER JOIN " +
tableName2 + " FOR TIMESTAMP BEFORE TIMESTAMP " + "'" + tab2Timestamp3 + "'" + " AS tab2_v2 ON tab1_v2.id = tab2_v2.id", "VALUES 2");
// Time travel inside a common table expression
assertQuery("WITH CTE1 AS (SELECT id, desc FROM " + tableName2 + " FOR VERSION AS OF " + tab2VersionId2 + ") SELECT desc FROM CTE1", "VALUES ('xxx'), ('yyy')");
assertQuery("WITH CTE2 AS (SELECT id, desc FROM " + tableName2 + " FOR TIMESTAMP AS OF TIMESTAMP " + "'" + tab2Timestamp2 + "' ) SELECT desc FROM CTE2", "VALUES ('xxx'), ('yyy')");
assertQuery("WITH CTE3 AS (SELECT id, desc FROM " + tableName2 + " FOR VERSION BEFORE " + tab2VersionId3 + ") SELECT desc FROM CTE3", "VALUES ('xxx'), ('yyy')");
assertQuery("WITH CTE4 AS (SELECT id, desc FROM " + tableName2 + " FOR TIMESTAMP BEFORE TIMESTAMP " + "'" + tab2Timestamp3 + "' ) SELECT desc FROM CTE4", "VALUES ('xxx'), ('yyy')");
// CREATE TABLE AS from a historical state; each new table is counted right after it is created
assertUpdate("CREATE TABLE " + tableName3 + " AS SELECT * FROM " + tableName1 + " FOR VERSION AS OF " + tab1VersionId2, 2);
assertQuery("SELECT COUNT(*) FROM " + tableName3, "VALUES 2");
assertUpdate("CREATE TABLE " + tableName4 + " AS SELECT * FROM " + tableName2 + " FOR TIMESTAMP AS OF TIMESTAMP " + "'" + tab2Timestamp2 + "'", 2);
assertQuery("SELECT COUNT(*) FROM " + tableName4, "VALUES 2");
assertUpdate("CREATE TABLE " + tableName5 + " AS SELECT * FROM " + tableName1 + " FOR VERSION BEFORE " + tab1VersionId3, 2);
assertQuery("SELECT COUNT(*) FROM " + tableName5, "VALUES 2");
assertUpdate("CREATE TABLE " + tableName6 + " AS SELECT * FROM " + tableName2 + " FOR TIMESTAMP BEFORE TIMESTAMP " + "'" + tab2Timestamp3 + "'", 2);
assertQuery("SELECT COUNT(*) FROM " + tableName6, "VALUES 2");
// Set operations between two states of the same table
assertQuery("SELECT desc FROM " + tableName1 + " FOR VERSION AS OF " + tab1VersionId1 + " UNION " +
" SELECT desc FROM " + tableName1 + " FOR VERSION AS OF " + tab1VersionId2 + " ORDER BY desc", "VALUES ('aaa'), ('bbb')");
assertQuery("SELECT desc FROM " + tableName1 + " FOR TIMESTAMP AS OF TIMESTAMP " + "'" + tab1Timestamp1 + "'" + " UNION ALL " +
" SELECT desc FROM " + tableName1 + " FOR TIMESTAMP AS OF TIMESTAMP " + "'" + tab1Timestamp2 + "'" + " ORDER BY desc", "VALUES ('aaa'), ('aaa'), ('bbb')");
assertQuery("SELECT desc FROM " + tableName1 + " FOR VERSION BEFORE " + tab1VersionId2 + " UNION " +
" SELECT desc FROM " + tableName1 + " FOR VERSION BEFORE " + tab1VersionId3 + " ORDER BY desc", "VALUES ('aaa'), ('bbb')");
assertQuery("SELECT desc FROM " + tableName1 + " FOR TIMESTAMP BEFORE TIMESTAMP " + "'" + tab1Timestamp2 + "'" + " UNION ALL " +
" SELECT desc FROM " + tableName1 + " FOR TIMESTAMP BEFORE TIMESTAMP " + "'" + tab1Timestamp3 + "'" + " ORDER BY desc", "VALUES ('aaa'), ('aaa'), ('bbb')");
assertQuery("SELECT desc FROM " + tableName1 + " FOR VERSION AS OF " + tab1VersionId1 + " INTERSECT " +
" SELECT desc FROM " + tableName1 + " FOR VERSION AS OF " + tab1VersionId2 + " ORDER BY desc", "VALUES ('aaa')");
assertQuery("SELECT desc FROM " + tableName1 + " FOR TIMESTAMP AS OF TIMESTAMP " + "'" + tab1Timestamp1 + "'" + " INTERSECT " +
" SELECT desc FROM " + tableName1 + " FOR TIMESTAMP AS OF TIMESTAMP " + "'" + tab1Timestamp2 + "'" + " ORDER BY desc", "VALUES ('aaa')");
assertQuery("SELECT desc FROM " + tableName1 + " FOR VERSION AS OF " + tab1VersionId2 + " EXCEPT " +
" SELECT desc FROM " + tableName1 + " FOR VERSION AS OF " + tab1VersionId1 + " ORDER BY desc", "VALUES ('bbb')");
assertQuery("SELECT desc FROM " + tableName1 + " FOR TIMESTAMP AS OF TIMESTAMP " + "'" + tab1Timestamp2 + "'" + " EXCEPT " +
" SELECT desc FROM " + tableName1 + " FOR TIMESTAMP AS OF TIMESTAMP " + "'" + tab1Timestamp1 + "'" + " ORDER BY desc", "VALUES ('bbb')");
// Time travel in both the outer query and an IN subquery, including a version/timestamp mix
assertQuery("SELECT desc FROM " + tableName1 + " FOR VERSION AS OF " + tab1VersionId2 + " WHERE id IN " +
"(SELECT id FROM " + tableName2 + " FOR VERSION AS OF " + tab2VersionId2 + ") ORDER BY 1", "VALUES ('aaa'), ('bbb')");
assertQuery("SELECT desc FROM " + tableName1 + " FOR TIMESTAMP AS OF TIMESTAMP " + "'" + tab1Timestamp2 + "' WHERE id IN " +
"(SELECT id FROM " + tableName2 + " FOR TIMESTAMP AS OF TIMESTAMP " + "'" + tab2Timestamp2 + "' ) ORDER BY 1", "VALUES ('aaa'), ('bbb')");
assertQuery("SELECT desc FROM " + tableName1 + " FOR VERSION AS OF " + tab1VersionId2 + " WHERE id IN " +
"(SELECT id FROM " + tableName2 + " FOR TIMESTAMP AS OF TIMESTAMP " + "'" + tab2Timestamp2 + "' ) ORDER BY 1", "VALUES ('aaa'), ('bbb')");
// Views defined over a historical state; each join below needs both of its views to exist already
assertUpdate("CREATE VIEW " + viewName1 + " AS SELECT id, desc FROM " + tableName1 + " FOR VERSION AS OF " + tab1VersionId2);
assertQuery("SELECT desc FROM " + viewName1 + " ORDER BY 1", "VALUES ('aaa'),('bbb')");
assertUpdate("CREATE VIEW " + viewName2 + " AS SELECT id, desc FROM " + tableName1 + " FOR TIMESTAMP AS OF TIMESTAMP " + "'" + tab1Timestamp2 + "'");
assertQuery("SELECT desc FROM " + viewName2 + " ORDER BY 1", "VALUES ('aaa'),('bbb')");
assertQuery("SELECT count(*) FROM " + viewName1 + " INNER JOIN " + viewName2 + " ON " + viewName1 + ".id = " + viewName2 + ".id", "VALUES 2");
assertUpdate("CREATE VIEW " + viewName3 + " AS SELECT id, desc FROM " + tableName1 + " FOR VERSION BEFORE " + tab1VersionId3);
assertQuery("SELECT desc FROM " + viewName3 + " ORDER BY 1", "VALUES ('aaa'),('bbb')");
assertUpdate("CREATE VIEW " + viewName4 + " AS SELECT id, desc FROM " + tableName1 + " FOR TIMESTAMP BEFORE TIMESTAMP " + "'" + tab1Timestamp3 + "'");
assertQuery("SELECT desc FROM " + viewName4 + " ORDER BY 1", "VALUES ('aaa'),('bbb')");
assertQuery("SELECT count(*) FROM " + viewName3 + " INNER JOIN " + viewName4 + " ON " + viewName3 + ".id = " + viewName4 + ".id", "VALUES 2");
}
/**
 * Supplies the parameter rows for the "timezones" data provider.
 *
 * @return rows of {zone id, legacy-timestamp flag}
 */
@DataProvider(name = "timezones")
public Object[][] timezones()
{
    Object[] utcLegacy = {"UTC", true};
    Object[] losAngelesLegacy = {"America/Los_Angeles", true};
    Object[] shanghaiLegacy = {"Asia/Shanghai", true};
    Object[] utcNonLegacy = {"UTC", false};
    return new Object[][] {utcLegacy, losAngelesLegacy, shanghaiLegacy, utcNonLegacy};
}
/**
 * Verifies timestamp-based time travel with zone-less timestamp literals rendered in the
 * given zone. A wall-clock instant is sampled after each of three inserts, and the table
 * is then queried AS OF and BEFORE each sampled instant.
 *
 * @param zoneId zone used to render the sampled instants as timestamp literals
 * @param legacyTimestamp value for the LEGACY_TIMESTAMP session property of the session under test
 */
@Test(dataProvider = "timezones")
public void testTableVersionWithTimestamp(String zoneId, boolean legacyTimestamp)
{
Session session = sessionForTimezone(zoneId, legacyTimestamp);
String tableName = schemaName + "." + "table_version_with_timestamp";
try {
assertUpdate(session, "CREATE TABLE " + tableName + " (id integer, desc varchar) WITH(partitioning = ARRAY['id'])");
assertUpdate(session, "INSERT INTO " + tableName + " VALUES(1, 'aaa')", 1);
// Let the millisecond clock advance past the end of the insert before sampling,
// then advance past the sample before the next insert, so the sample is separated from both
waitUntilAfter(System.currentTimeMillis());
long timestampMillis1 = System.currentTimeMillis();
String timestampWithoutTZ1 = getTimestampString(timestampMillis1, zoneId);
waitUntilAfter(timestampMillis1);
assertUpdate(session, "INSERT INTO " + tableName + " VALUES(2, 'bbb')", 1);
waitUntilAfter(System.currentTimeMillis());
long timestampMillis2 = System.currentTimeMillis();
String timestampWithoutTZ2 = getTimestampString(timestampMillis2, zoneId);
waitUntilAfter(timestampMillis2);
assertUpdate(session, "INSERT INTO " + tableName + " VALUES(3, 'ccc')", 1);
waitUntilAfter(System.currentTimeMillis());
long timestampMillis3 = System.currentTimeMillis();
String timestampWithoutTZ3 = getTimestampString(timestampMillis3, zoneId);
// AS OF and BEFORE are expected to return the same rows for each sampled instant
assertQuery(session, "SELECT desc FROM " + tableName + " FOR TIMESTAMP AS OF TIMESTAMP " + "'" + timestampWithoutTZ1 + "'", "VALUES 'aaa'");
assertQuery(session, "SELECT desc FROM " + tableName + " FOR TIMESTAMP BEFORE TIMESTAMP " + "'" + timestampWithoutTZ1 + "'", "VALUES 'aaa'");
assertQuery(session, "SELECT desc FROM " + tableName + " FOR TIMESTAMP AS OF TIMESTAMP " + "'" + timestampWithoutTZ2 + "'", "VALUES 'aaa', 'bbb'");
assertQuery(session, "SELECT desc FROM " + tableName + " FOR TIMESTAMP BEFORE TIMESTAMP " + "'" + timestampWithoutTZ2 + "'", "VALUES 'aaa', 'bbb'");
assertQuery(session, "SELECT desc FROM " + tableName + " FOR TIMESTAMP AS OF TIMESTAMP " + "'" + timestampWithoutTZ3 + "'", "VALUES 'aaa', 'bbb', 'ccc'");
assertQuery(session, "SELECT desc FROM " + tableName + " FOR TIMESTAMP BEFORE TIMESTAMP " + "'" + timestampWithoutTZ3 + "'", "VALUES 'aaa', 'bbb', 'ccc'");
}
finally {
// Dropped on every exit path because the same table name is reused by each data-provider row
assertQuerySucceeds("DROP TABLE IF EXISTS " + tableName);
}
}
/**
 * Checks the failure messages produced by invalid time-travel expressions: unsupported
 * expression types, unresolvable columns, subqueries, NULL, unknown snapshot ids or
 * branch/tag names, and timestamps or versions that precede the table's history.
 */
@Test
public void testTableVersionErrors()
{
    String selectTab2 = "SELECT desc FROM " + tableName2;
    String invalidVersionType = " is invalid. Supported table version AS OF/BEFORE expression type is BIGINT or VARCHAR";
    String invalidTimestampType = " is invalid. Supported table version AS OF/BEFORE expression type is Timestamp or Timestamp with Time Zone.";
    String unresolved = ".* cannot be resolved";
    String subqueryNotAllowed = ".* Constant expression cannot contain a subquery";
    String nullNotAllowed = "Table version AS OF/BEFORE expression cannot be NULL for .*";
    String unknownSnapshot = "Iceberg snapshot ID does not exists: ";
    String noHistoryTab2 = "No history found based on timestamp for table iceberg.test_tt_schema.test_table_version_tab2";

    // FOR VERSION AS OF
    assertQueryFails(selectTab2 + " FOR VERSION AS OF 100", ".* Type integer" + invalidVersionType);
    assertQueryFails(selectTab2 + " FOR VERSION AS OF 'bad'", "Could not find Iceberg table branch or tag: bad");
    assertQueryFails(selectTab2 + " FOR VERSION AS OF CURRENT_DATE", ".* Type date" + invalidVersionType);
    assertQueryFails(selectTab2 + " FOR VERSION AS OF CURRENT_TIMESTAMP", ".* Type timestamp with time zone" + invalidVersionType);
    assertQueryFails(selectTab2 + " FOR VERSION AS OF id", unresolved);
    assertQueryFails(selectTab2 + " FOR VERSION AS OF (SELECT 10000000)", subqueryNotAllowed);
    assertQueryFails(selectTab2 + " FOR VERSION AS OF NULL", nullNotAllowed);
    assertQueryFails(selectTab2 + " FOR VERSION AS OF " + tab2VersionId1 + " - " + tab2VersionId1, unknownSnapshot + "0");
    assertQueryFails(selectTab2 + " FOR VERSION AS OF CAST (100 AS BIGINT)", unknownSnapshot + "100");

    // FOR TIMESTAMP AS OF
    assertQueryFails(selectTab2 + " FOR TIMESTAMP AS OF 100", ".* Type integer" + invalidTimestampType);
    assertQueryFails(selectTab2 + " FOR TIMESTAMP AS OF 'bad'", ".* Type varchar\\(3\\)" + invalidTimestampType);
    assertQueryFails(selectTab2 + " FOR TIMESTAMP AS OF id", unresolved);
    assertQueryFails(selectTab2 + " FOR TIMESTAMP AS OF (SELECT CURRENT_TIMESTAMP)", subqueryNotAllowed);
    assertQueryFails(selectTab2 + " FOR TIMESTAMP AS OF NULL", nullNotAllowed);
    assertQueryFails(selectTab2 + " FOR TIMESTAMP AS OF TIMESTAMP '" + tab2Timestamp1 + "' - INTERVAL '1' MONTH", noHistoryTab2);
    assertQueryFails(selectTab2 + " FOR TIMESTAMP AS OF CAST ('2023-01-01' AS TIMESTAMP WITH TIME ZONE)", noHistoryTab2);
    assertQueryFails(selectTab2 + " FOR TIMESTAMP AS OF CAST ('2023-01-01' AS TIMESTAMP)", noHistoryTab2);
    assertQueryFails(selectTab2 + " FOR TIMESTAMP AS OF CAST ('2023-01-01' AS DATE)", ".* Type date" + invalidTimestampType);
    assertQueryFails(selectTab2 + " FOR TIMESTAMP AS OF CURRENT_DATE", ".* Type date" + invalidTimestampType);
    assertQueryFails(selectTab2 + " FOR TIMESTAMP AS OF TIMESTAMP '2023-01-01 00:00:00.000'", noHistoryTab2);

    // BEFORE variants
    assertQueryFails(
            "SELECT desc FROM " + tableName1 + " FOR VERSION BEFORE " + tab1VersionId1 + " ORDER BY 1",
            "No history found based on timestamp for table iceberg.test_tt_schema.test_table_version_tab1");
    assertQueryFails(selectTab2 + " FOR TIMESTAMP BEFORE TIMESTAMP '" + tab2Timestamp1 + "' - INTERVAL '1' MONTH", noHistoryTab2);
    assertQueryFails(selectTab2 + " FOR VERSION BEFORE 100", ".* Type integer" + invalidVersionType);
    assertQueryFails(selectTab2 + " FOR VERSION BEFORE " + tab2VersionId1 + " - " + tab2VersionId1, unknownSnapshot + "0");
    assertQueryFails(selectTab2 + " FOR TIMESTAMP BEFORE 'bad'", ".* Type varchar\\(3\\)" + invalidTimestampType);
    assertQueryFails(selectTab2 + " FOR TIMESTAMP BEFORE NULL", nullNotAllowed);
}
/**
 * Derives a session from the default one with the LEGACY_TIMESTAMP system property set to
 * the given flag. The session time zone is overridden only when the flag is true.
 *
 * @param zoneId zone id applied as the session time zone when {@code legacyTimestamp} is true
 * @param legacyTimestamp value for the LEGACY_TIMESTAMP system property
 * @return the derived session
 */
private Session sessionForTimezone(String zoneId, boolean legacyTimestamp)
{
    SessionBuilder builder = Session.builder(getSession())
            .setSystemProperty(LEGACY_TIMESTAMP, Boolean.toString(legacyTimestamp));
    if (!legacyTimestamp) {
        return builder.build();
    }
    builder.setTimeZoneKey(TimeZoneKey.getTimeZoneKey(zoneId));
    return builder.build();
}
/**
 * Spins until {@link System#currentTimeMillis()} is strictly greater than the given time.
 * Fails up front when the given time is more than 10ms ahead of the current clock.
 *
 * @param snapshotTimeMillis epoch milliseconds that the clock must pass
 * @return the first observed clock value greater than {@code snapshotTimeMillis}
 */
private long waitUntilAfter(long snapshotTimeMillis)
{
    long now = System.currentTimeMillis();
    long aheadBy = snapshotTimeMillis - now;
    assertTrue(aheadBy <= 10,
            format("Snapshot time %s is greater than the current time %s by more than 10ms", snapshotTimeMillis, now));
    while (true) {
        if (now > snapshotTimeMillis) {
            return now;
        }
        now = System.currentTimeMillis();
    }
}
/**
 * Renders an epoch-millisecond instant as the local date-time observed in the given zone,
 * using the pattern {@code yyyy-MM-dd HH:mm:ss.SSS} (no zone or offset in the output).
 *
 * @param timeMillsUtc epoch milliseconds
 * @param zoneId id of the zone whose wall-clock time is rendered
 * @return the formatted local date-time
 */
private String getTimestampString(long timeMillsUtc, String zoneId)
{
    // Resolve the zone once; the conversion to LocalDateTime is the only place it is needed
    LocalDateTime wallClock = LocalDateTime.ofInstant(Instant.ofEpochMilli(timeMillsUtc), ZoneId.of(zoneId));
    // A LocalDateTime carries no instant or offset, so a zone override on the formatter
    // would not change the rendered fields and is omitted
    return DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").format(wallClock);
}
}