TestDeltaIntegration.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.delta;
import com.facebook.presto.Session;
import com.facebook.presto.common.type.TimeZoneKey;
import com.facebook.presto.testing.MaterializedResult;
import com.google.common.base.Joiner;
import org.testng.annotations.Test;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.attribute.FileTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static java.lang.String.format;
import static org.testng.Assert.assertEquals;
/**
* Integration tests for reading Delta tables.
*/
@Test
public class TestDeltaIntegration
extends AbstractDeltaDistributedQueryTestBase
{
@Test(dataProvider = "deltaReaderVersions")
public void readPrimitiveTypeData(String version)
{
// Test reading following primitive types from a Delta table
// (all integers, float, double, decimal, boolean, varchar, varbinary)
String testQuery =
format("SELECT * FROM \"%s\".\"%s\"", PATH_SCHEMA, goldenTablePathWithPrefix(version,
"data-reader-primitives"));
String expResultsQuery = getPrimitiveTypeTableData();
assertQuery(testQuery, expResultsQuery);
}
@Test(dataProvider = "deltaReaderVersions")
public void readArrayTypeData(String version)
{
// Test reading following array elements with type
// (all integers, float, double, decimal, boolean, varchar, varbinary)
String testQuery =
format("SELECT * FROM \"%s\".\"%s\"", PATH_SCHEMA, goldenTablePathWithPrefix(version,
"data-reader-array-primitives"));
// Create query for the expected results.
List<String> expRows = new ArrayList<>();
for (byte i = 0; i < 10; i++) {
expRows.add(format("SELECT " +
" array[cast(%s as integer)]," +
" array[cast(%s as bigint)]," +
" array[cast(%s as tinyint)]," +
" array[cast(%s as smallint)]," +
" array[%s]," +
" array[cast(%s as real)]," +
" array[cast(%s as double)], " +
" array['%s'], " +
" array[cast(X'0%s0%s' as varbinary)], " +
" array[cast(%s as decimal)]", i, i, i, i, (i % 2 == 0 ? "true" : "false"), i, i, i, i, i, i));
}
String expResultsQuery = Joiner.on(" UNION ").join(expRows);
assertQuery(testQuery, expResultsQuery);
}
@Test(dataProvider = "deltaReaderVersions")
public void readMapTypeData(String version)
{
// Test reading MAP data type columns from Delta table
String testQuery =
format("SELECT map_keys(a), map_values(e) FROM \"%s\".\"%s\"", PATH_SCHEMA,
goldenTablePathWithPrefix(version, "data-reader-map"));
List<String> expRows = new ArrayList<>();
for (int i = 0; i < 10; i++) {
expRows.add("SELECT " +
" ARRAY[cast(" + i + " as integer)]," +
" ARRAY[cast(" + i + " as decimal)]");
}
String expResultsQuery = Joiner.on(" UNION ").join(expRows);
assertQuery(testQuery, expResultsQuery);
}
@Test(dataProvider = "deltaReaderVersions")
public void readTableRegisteredInHMS(String version)
{
String expResultsQuery = getPrimitiveTypeTableData();
assertQuery("SELECT * FROM \"" + getVersionPrefix(version) +
"data-reader-primitives\"", expResultsQuery);
}
@Test(dataProvider = "deltaReaderVersions")
public void readSpecificSnapshotVersion(String version)
{
String testQueryTemplate = "SELECT * FROM \"" + getVersionPrefix(version) +
"snapshot-data3@%s\" WHERE col1 = 0";
// read snapshot version 2
String testQueryV2 = format(testQueryTemplate, "v2");
String expResultsQueryV2 = "SELECT * FROM VALUES(0, 'data-2-0')";
assertQuery(testQueryV2, expResultsQueryV2);
// read snapshot version 3
String testQueryV3 = format(testQueryTemplate, "v3");
String expResultsQueryV3 = "SELECT * FROM VALUES(0, 'data-2-0'), (0, 'data-3-0')";
assertQuery(testQueryV3, expResultsQueryV3);
}
@Test(dataProvider = "deltaReaderVersions")
public void readSpecificSnapshotAtGivenTimestamp(String version)
throws Exception
{
String deltaTable = "snapshot-data3";
String testQueryTemplate = "SELECT * FROM \"" + getVersionPrefix(version) +
"snapshot-data3@%s\" WHERE col1 = 0";
// Delta library looks at the last modification time of the checkpoint and commit files
// to figure out when the snapshot is created. In the tests, as the test files are copied
// to target location, the modification time will be the time when they are copied and not when
// the snapshot is created. In order to test reading the snapshot version for a given timestamp,
// manually update the modification time of the commit and checkpoint files.
// 1637274601000L millis = 2021-11-18 10:30:01
String deltaTableLocation = goldenTablePathWithPrefix(version, deltaTable);
setCommitFileModificationTime(deltaTableLocation, 0, 1637231401000L);
setCommitFileModificationTime(deltaTableLocation, 1, 1637231402000L);
setCommitFileModificationTime(deltaTableLocation, 2, 1637231405000L);
setCommitFileModificationTime(deltaTableLocation, 3, 1637231407000L);
// read snapshot as of 2020-10-26 02:50:00 - this should fail as there are no snapshots before this timestamp
String testQueryTs1 = format(testQueryTemplate, "t2020-10-27 02:50:00");
assertQueryFails(
testQueryTs1,
".*The provided timestamp 1603767000000 ms \\(2020-10-27T02:50:00Z\\) is before the earliest available version 0\\. Please use a timestamp greater than or equal to 1637231401000 ms \\(2021-11-18T10:30:01Z\\)\\.");
// read snapshot as of 2021-11-18 10:30:02 - this should read the data from commit id 1.
String testQueryTs2 = format(testQueryTemplate, "t2021-11-18 10:30:02");
String expResultsQueryTs2 = "SELECT * FROM VALUES(0, 'data-0-0'), (0, 'data-1-0')";
assertQuery(testQueryTs2, expResultsQueryTs2);
// read snapshot as of 2021-11-18 10:30:07 - this should read the data from the latest commit
String testQueryTs3 = format(testQueryTemplate, "t2021-11-18 10:30:07");
String expResultsQueryTs3 = "SELECT * FROM VALUES(0, 'data-2-0'), (0, 'data-3-0')";
assertQuery(testQueryTs3, expResultsQueryTs3);
}
@Test(enabled = false, dataProvider = "deltaReaderVersions") // Enable once the bug in Delta library is fixed
public void readCheckpointedDeltaTable(String version)
{
// Delta table commits are periodically checkpointed into a parquet file.
// Test Delta connector is able to read the checkpointed commits in a parquet file.
// Test table has commit files (0-10) deleted. So it has to rely on reading the Parquet file
// to fetch the files latest commit (i.e > 10).
String testQueryTemplate = "SELECT * FROM \"" + getVersionPrefix(version) +
"checkpointed-delta-table%s\" WHERE col1 in (0, 10, 15)";
// read snapshot version 3 - expect can't time travel error
String testQueryV3 = format(testQueryTemplate, "@v3");
assertQueryFails(
testQueryV3,
"Can not find snapshot \\(3\\) in Delta table 'deltatables." +
getVersionPrefix(version) +
"checkpointed-delta-table\\@v3': No reproducible commits found at .*");
// read latest data
String testQueryLatest = format(testQueryTemplate, "");
String expResultsQueryLatest = "SELECT * FROM VALUES(0, 'data-0-0'), (10, 'data-0-10'), (15, 'data-0-15')";
assertQuery(testQueryLatest, expResultsQueryLatest);
// read snapshot version 13
String testQueryV13 = format(testQueryTemplate, "@v13");
String expResultsQueryV13 = "SELECT * FROM VALUES(0, 'data-0-0'), (10, 'data-0-10')";
assertQuery(testQueryV13, expResultsQueryV13);
}
@Test(dataProvider = "deltaReaderVersions")
public void readPartitionedTable(String version)
{
String testQuery1 = "SELECT * FROM \"" + getVersionPrefix(version) +
"time-travel-partition-changes-b\" WHERE id in (10, 15, 12, 13)";
String expResultsQuery1 = "SELECT * FROM VALUES(10, 0),(15, 1),(12, 0),(13, 1)";
assertQuery(testQuery1, expResultsQuery1);
// reorder the columns in output and query the partitioned table
String testQuery2 = "SELECT part2, id FROM \"" + getVersionPrefix(version) +
"time-travel-partition-changes-b\" WHERE id in (16, 14, 19)";
String expResultsQuery2 = "SELECT * FROM VALUES(0, 16),(0, 14),(1, 19)";
assertQuery(testQuery2, expResultsQuery2);
}
@Test(dataProvider = "deltaReaderVersions")
public void readPartitionedTableAllDataTypes(String version)
{
String testQuery = "SELECT * FROM \"" + getVersionPrefix(version) +
"data-reader-partition-values\"";
String expResultsQuery = "SELECT * FROM VALUES" +
"( 0," +
" cast(0 as bigint)," +
" cast(0 as smallint), " +
" cast(0 as tinyint), " +
" true, " +
" 0.0, " +
" cast(0.0 as double), " +
" '0', " +
" DATE '2021-09-08', " +
" TIMESTAMP WITH TIME ZONE '2021-09-08 11:11:11 UTC', " +
" cast(0 as decimal)," +
" '0'" + // regular column
"), " +
"( 1," +
" cast(1 as bigint)," +
" cast(1 as smallint), " +
" cast(1 as tinyint), " +
" false, " +
" 1.0, " +
" cast(1.0 as double), " +
" '1', " +
" DATE '2021-09-08', " +
" TIMESTAMP WITH TIME ZONE '2021-09-08 11:11:11 UTC', " +
" cast(1 as decimal), " +
" '1'" + // regular column
"), " +
"( null," +
" null," +
" null, " +
" null, " +
" null, " +
" null, " +
" null, " +
" null, " +
" null, " +
" null, " +
" null, " +
" '2'" + // regular column
")";
assertQuery(testQuery, expResultsQuery);
}
@Test(dataProvider = "deltaReaderVersions")
public void testDeltaTimezoneTypeSupportINT96(String version)
{
/*
https://docs.delta.io/3.2.1/api/java/kernel/index.html?io/delta/kernel/types/TimestampNTZType.html
According to delta's type specifications, the expected behaviour for TimestampNTZ
The timestamp without time zone type represents a local time in microsecond precision, which is independent of time zone.
So TimestampNTZ is independent of local timezones and should return the same value regardless of the timezone.
If legacy_timestamp is true, Presto TimestampNTZ (Timestamp) is adjusted to the timezone.
If legacy_timestamp is false, TimestampNTZ is not adjusted.
This test sets the timezone to UTC+3, and the original data file the timestamp is 12 AM.
The proper delta implementation would return 12 AM regardless of the timezone, but with
legacy_timestamp true we get 3 AM. legacy_timestamp set to false matches the specifications.
*/
Session session = Session.builder(getSession())
.setTimeZoneKey(TimeZoneKey.getTimeZoneKey("UTC+3"))
.setSystemProperty("legacy_timestamp", "false")
.build();
String testQuery = format("SELECT tpep_dropoff_datetime, tpep_pickup_datetime FROM \"%s\".\"%s\"",
PATH_SCHEMA, goldenTablePathWithPrefix(version, "test-typing"));
MaterializedResult actual = computeActual(session, testQuery);
String timestamptzField = actual.getMaterializedRows().get(0).getField(0).toString();
assertEquals(timestamptzField, "2021-12-31T16:53:29Z[UTC]", "Delta Timestamp type not being read correctly.");
if (version.equals("delta_v3")) {
String timestamptzntz = actual.getMaterializedRows().get(0).getField(1).toString();
assertEquals(timestamptzntz, "2022-01-01T00:35:40", "Delta TimestampNTZ type not being read correctly.");
}
}
@Test(dataProvider = "deltaReaderVersions")
public void testDeltaTimezoneTypeSupportINT64(String version)
{
Session session = Session.builder(getSession())
.setTimeZoneKey(TimeZoneKey.getTimeZoneKey("UTC+3"))
.setSystemProperty("legacy_timestamp", "false")
.build();
String testQuery = format("SELECT created_at_tz FROM \"%s\".\"%s\"",
PATH_SCHEMA, goldenTablePathWithPrefix(version, "timestamp_64"));
MaterializedResult actual = computeActual(session, testQuery);
String timestamptzField = actual.getMaterializedRows().get(0).getField(0).toString();
assertEquals(timestamptzField, "2025-05-22T09:24:11.321Z[UTC]", "Delta Timestamp type not being read correctly.");
if (version.equals("delta_v3")) {
String ntzTestQuery = format("SELECT created_at_ntz, created_at_ntz FROM \"%s\".\"%s\"",
PATH_SCHEMA, goldenTablePathWithPrefix(version, "timestamp_64"));
actual = computeActual(session, ntzTestQuery);
String timestamptzntz = actual.getMaterializedRows().get(0).getField(0).toString();
assertEquals(timestamptzntz, "2025-05-22T12:25:16.544", "Delta TimestampNTZ type not being read correctly.");
}
}
/**
* Expected results for table "data-reader-primitives"
*/
private static String getPrimitiveTypeTableData()
{
// Create query for the expected results.
List<String> expRows = new ArrayList<>();
for (byte i = 0; i < 10; i++) {
expRows.add(format("SELECT " +
" cast(%s as integer)," +
" cast(%s as bigint)," +
" cast(%s as tinyint)," +
" cast(%s as smallint)," +
" %s," +
" cast(%s as real)," +
" cast(%s as double), " +
" '%s', " +
" cast(X'0%s0%s' as varbinary), " +
" cast(%s as decimal)", i, i, i, i, (i % 2 == 0 ? "true" : "false"), i, i, i, i, i, i));
}
expRows.add("SELECT null, null, null, null, null, null, null, null, null, null");
return Joiner.on(" UNION ").join(expRows);
}
private static void setCommitFileModificationTime(String tableLocation, long commitId, long commitTimeMillis)
throws Exception
{
Files.setLastModifiedTime(
Paths.get(URI.create(tableLocation)).resolve("_delta_log/").resolve(format("%020d.json", commitId)),
FileTime.from(commitTimeMillis, TimeUnit.MILLISECONDS));
}
}