TestQuickStatsProvider.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive.statistics;
import com.facebook.presto.hive.DirectoryLister;
import com.facebook.presto.hive.HadoopDirectoryLister;
import com.facebook.presto.hive.HdfsContext;
import com.facebook.presto.hive.HdfsEnvironment;
import com.facebook.presto.hive.HiveClientConfig;
import com.facebook.presto.hive.HiveDirectoryContext;
import com.facebook.presto.hive.HiveFileInfo;
import com.facebook.presto.hive.MetastoreClientConfig;
import com.facebook.presto.hive.NamenodeStats;
import com.facebook.presto.hive.TestingExtendedHiveMetastore;
import com.facebook.presto.hive.filesystem.ExtendedFileSystem;
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
import com.facebook.presto.hive.metastore.MetastoreContext;
import com.facebook.presto.hive.metastore.Partition;
import com.facebook.presto.hive.metastore.PartitionStatistics;
import com.facebook.presto.hive.metastore.PartitionWithStatistics;
import com.facebook.presto.hive.metastore.PrincipalPrivileges;
import com.facebook.presto.hive.metastore.Storage;
import com.facebook.presto.hive.metastore.StorageFormat;
import com.facebook.presto.hive.metastore.Table;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.session.PropertyMetadata;
import com.facebook.presto.testing.TestingConnectorSession;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
import io.airlift.units.Duration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
import org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat;
import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
import org.apache.hadoop.mapred.InputFormat;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.time.Instant;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static com.facebook.presto.hive.HiveColumnConverterProvider.DEFAULT_COLUMN_CONVERTER_PROVIDER;
import static com.facebook.presto.hive.HivePartition.UNPARTITIONED_ID;
import static com.facebook.presto.hive.HiveSessionProperties.QUICK_STATS_BACKGROUND_BUILD_TIMEOUT;
import static com.facebook.presto.hive.HiveSessionProperties.QUICK_STATS_ENABLED;
import static com.facebook.presto.hive.HiveSessionProperties.QUICK_STATS_INLINE_BUILD_TIMEOUT;
import static com.facebook.presto.hive.HiveSessionProperties.SKIP_EMPTY_FILES;
import static com.facebook.presto.hive.HiveSessionProperties.USE_LIST_DIRECTORY_CACHE;
import static com.facebook.presto.hive.HiveSessionProperties.isSkipEmptyFilesEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isUseListDirectoryCache;
import static com.facebook.presto.hive.HiveStorageFormat.PARQUET;
import static com.facebook.presto.hive.HiveTestUtils.createTestHdfsEnvironment;
import static com.facebook.presto.hive.HiveUtil.buildDirectoryContextProperties;
import static com.facebook.presto.hive.HiveUtil.getInputFormat;
import static com.facebook.presto.hive.NestedDirectoryPolicy.RECURSE;
import static com.facebook.presto.hive.RetryDriver.retry;
import static com.facebook.presto.hive.metastore.PartitionStatistics.empty;
import static com.facebook.presto.hive.metastore.PrestoTableType.EXTERNAL_TABLE;
import static com.facebook.presto.hive.metastore.PrestoTableType.MANAGED_TABLE;
import static com.facebook.presto.hive.metastore.StorageFormat.fromHiveStorageFormat;
import static com.facebook.presto.hive.statistics.PartitionQuickStats.convertToPartitionStatistics;
import static com.facebook.presto.spi.session.PropertyMetadata.booleanProperty;
import static com.google.common.io.MoreFiles.deleteRecursively;
import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
import static com.google.common.io.Resources.getResource;
import static java.nio.file.Files.copy;
import static java.nio.file.Files.createDirectory;
import static java.nio.file.Files.createFile;
import static java.nio.file.Files.createTempDirectory;
import static java.nio.file.Files.newBufferedWriter;
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
import static java.nio.file.StandardOpenOption.CREATE;
import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
import static java.util.Collections.emptyIterator;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.supplyAsync;
import static java.util.concurrent.ForkJoinPool.commonPool;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.testcontainers.shaded.com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
public class TestQuickStatsProvider
{
// Name and schema of the table that setUp() registers in the testing metastore
public static final String TEST_TABLE = "test_table";
public static final String TEST_SCHEMA = "test_schema";
// Session property definitions passed to every TestingConnectorSession created by this class.
// NOTE(review): the trailing arguments of booleanProperty(...) and PropertyMetadata(...) read like
// (default value, hidden) -- confirm against PropertyMetadata before relying on that.
private static final List<PropertyMetadata<?>> quickStatsProperties = ImmutableList.of(booleanProperty(
QUICK_STATS_ENABLED,
"Use quick stats to resolve stats",
true,
false),
// Inline build timeout, declared with a 5 minute Duration; string values are parsed with Duration.valueOf
new PropertyMetadata<>(
QUICK_STATS_INLINE_BUILD_TIMEOUT,
"Inline build timeout for a quick stats call",
VARCHAR,
Duration.class,
new Duration(5, TimeUnit.MINUTES),
false,
value -> Duration.valueOf((String) value),
Duration::toString),
// Background build timeout, declared with a zero Duration; string values are parsed with Duration.valueOf
new PropertyMetadata<>(
QUICK_STATS_BACKGROUND_BUILD_TIMEOUT,
"Duration to wait for a background build on another thread",
VARCHAR,
Duration.class,
new Duration(0, TimeUnit.MINUTES),
false,
value -> Duration.valueOf((String) value),
Duration::toString),
booleanProperty(
USE_LIST_DIRECTORY_CACHE,
"Directory list caching",
false,
false),
booleanProperty(
SKIP_EMPTY_FILES,
"If it is required empty files will be skipped",
false,
false));
// Session built from quickStatsProperties alone, i.e. without the explicit timeout values that getSession() passes
public static final ConnectorSession SESSION = new TestingConnectorSession(quickStatsProperties);
// Hive client config with the recursive directory walker switched on; handed to every QuickStatsProvider under test
private final HiveClientConfig hiveClientConfig = new HiveClientConfig().setRecursiveDirWalkerEnabled(true);
// The remaining fields are assigned in setUp()
private HdfsEnvironment hdfsEnvironment;
// Directory lister stub; setUp() assigns a lambda that always returns an empty iterator
private DirectoryLister directoryListerMock;
// Backed by a TestingExtendedHiveMetastore (see setUp())
private ExtendedHiveMetastore metastoreMock;
private MetastoreContext metastoreContext;
// Canned quick stats returned by the QuickStatsBuilder lambdas in the tests
private PartitionQuickStats mockPartitionQuickStats;
// Result of convertToPartitionStatistics(mockPartitionQuickStats)
private PartitionStatistics expectedPartitionStats;
// The single column stats entry wrapped by mockPartitionQuickStats
private ColumnQuickStats<Integer> mockIntegerColumnStats;
/**
 * Builds a {@link TestingConnectorSession} from {@code quickStatsProperties}, passing explicit values
 * for the two quick stats build timeout properties.
 *
 * @param inlineBuildTimeout value passed for {@code QUICK_STATS_INLINE_BUILD_TIMEOUT}; callers in this class use strings such as "300ms"
 * @param backgroundBuildTimeout value passed for {@code QUICK_STATS_BACKGROUND_BUILD_TIMEOUT}; callers in this class use strings such as "0ms"
 * @return the newly created session
 */
private static ConnectorSession getSession(String inlineBuildTimeout, String backgroundBuildTimeout)
{
return new TestingConnectorSession(quickStatsProperties, ImmutableMap.of(
QUICK_STATS_INLINE_BUILD_TIMEOUT, inlineBuildTimeout,
QUICK_STATS_BACKGROUND_BUILD_TIMEOUT, backgroundBuildTimeout));
}
/**
 * Prepares the fixtures shared by the tests: the canned quick stats and their converted form,
 * a directory lister that lists nothing, the HDFS environment, the metastore context, and a testing
 * metastore holding {@code test_schema.test_table} with a single partition.
 */
@BeforeTest
public void setUp()
{
    // Canned quick stats handed out by the builder lambdas, plus the partition statistics they convert to
    mockIntegerColumnStats = new ColumnQuickStats<>("column", Integer.class);
    mockIntegerColumnStats.setMinValue(Integer.MIN_VALUE);
    mockIntegerColumnStats.setMaxValue(Integer.MAX_VALUE);
    mockIntegerColumnStats.addToRowCount(4242L);
    mockPartitionQuickStats = new PartitionQuickStats("partitionId", ImmutableList.of(mockIntegerColumnStats), 42);
    expectedPartitionStats = convertToPartitionStatistics(mockPartitionQuickStats);

    // File-system side: a lister that never returns any file, and a test HDFS environment
    directoryListerMock = (fileSystem, table, path, partition, namenodeStats, hiveDirectoryContext) -> emptyIterator();
    hdfsEnvironment = createTestHdfsEnvironment(hiveClientConfig, new MetastoreClientConfig());

    metastoreContext = new MetastoreContext(
            SESSION.getUser(),
            SESSION.getQueryId(),
            Optional.empty(),
            Collections.emptySet(),
            Optional.empty(),
            Optional.empty(),
            false,
            DEFAULT_COLUMN_CONVERTER_PROVIDER,
            SESSION.getWarningCollector(),
            SESSION.getRuntimeStats());

    // Table definition registered in the metastore
    Storage tableStorage = Storage.builder()
            .setStorageFormat(fromHiveStorageFormat(PARQUET))
            .setLocation("location")
            .build();
    Table testTable = new Table(
            Optional.of("catalogName"),
            TEST_SCHEMA,
            TEST_TABLE,
            "owner",
            MANAGED_TABLE,
            tableStorage,
            ImmutableList.of(),
            ImmutableList.of(),
            ImmutableMap.of(),
            Optional.empty(),
            Optional.empty());

    // Single partition of that table, registered with empty statistics
    Storage partitionStorage = new Storage(
            fromHiveStorageFormat(PARQUET),
            "some/path",
            Optional.empty(),
            true,
            ImmutableMap.of(),
            ImmutableMap.of());
    Partition testPartition = new Partition(
            Optional.of("catalogName"),
            TEST_SCHEMA,
            TEST_TABLE,
            ImmutableList.of(),
            partitionStorage,
            ImmutableList.of(),
            ImmutableMap.of(),
            Optional.empty(),
            false,
            true,
            0,
            0,
            Optional.empty());
    PartitionWithStatistics partitionWithEmptyStats = new PartitionWithStatistics(testPartition, "TEST_PARTITION", empty());

    PrincipalPrivileges noPrivileges = new PrincipalPrivileges(ImmutableMultimap.of(), ImmutableMultimap.of());
    metastoreMock = new TestingExtendedHiveMetastore();
    metastoreMock.createTable(metastoreContext, testTable, noPrivileges, ImmutableList.of());
    metastoreMock.addPartitions(metastoreContext, TEST_SCHEMA, TEST_TABLE, ImmutableList.of(partitionWithEmptyStats));
}
/**
 * Verifies that stats built for a set of partitions are returned again, unchanged, when the same
 * partitions are requested a second time, and that a request mixing already-requested and new
 * partitions yields the expected stats for every partition.
 * <p>
 * The builder used here is a plain lambda, so the number of builder invocations is not checked.
 */
@Test
public void testReadThruCaching()
{
    QuickStatsBuilder quickStatsBuilderMock = (session, metastore, table, metastoreContext, partitionId, files) -> mockPartitionQuickStats;
    QuickStatsProvider quickStatsProvider = new QuickStatsProvider(metastoreMock, hdfsEnvironment, directoryListerMock, hiveClientConfig, new NamenodeStats(),
            ImmutableList.of(quickStatsBuilderMock));
    SchemaTableName tableName = new SchemaTableName(TEST_SCHEMA, TEST_TABLE);

    // First request: every requested partition comes back with the expected stats
    List<String> testPartitions1 = ImmutableList.of("partition1", "partition2", "partition3");
    Map<String, PartitionStatistics> quickStats = quickStatsProvider.getQuickStats(SESSION, tableName, metastoreContext, testPartitions1);
    assertEquals(quickStats.entrySet().size(), testPartitions1.size());
    assertTrue(quickStats.keySet().containsAll(testPartitions1));
    quickStats.values().forEach(ps -> assertEquals(ps, expectedPartitionStats));

    // Repeated request for the same partitions: the result must match the first one exactly.
    // (Previously the result of this call was discarded, so this path was never asserted.)
    Map<String, PartitionStatistics> repeatedQuickStats = quickStatsProvider.getQuickStats(SESSION, tableName, metastoreContext, testPartitions1);
    assertEquals(repeatedQuickStats, quickStats);

    // Request mixing already-requested and new partitions: all of them report the expected stats
    List<String> testPartitions2 = ImmutableList.of("partition4", "partition5", "partition6");
    List<String> testPartitionsMix = ImmutableList.<String>builder().addAll(testPartitions1).addAll(testPartitions2).build();
    quickStats = quickStatsProvider.getQuickStats(SESSION, tableName, metastoreContext, testPartitionsMix);
    assertEquals(quickStats.entrySet().size(), testPartitionsMix.size());
    assertTrue(quickStats.keySet().containsAll(testPartitionsMix));
    quickStats.values().forEach(ps -> assertEquals(ps, expectedPartitionStats));
}
/**
 * Exercises two concurrent quick stats requests for the same partitions under two timeout configurations:
 * one where the request that did not start the build does not wait for it, and one where it does.
 * <p>
 * Each scenario runs against its own {@link QuickStatsProvider}. With a shared provider the second scenario
 * would only see partitions whose stats were already built by the first one, so no build would be in flight.
 * <p>
 * Note: This test simulates long-running operations with a sleep, which makes running it in CI flaky.
 * To remove noise due to this flakiness, the test is disabled.
 * It is a good candidate for a manual test run for any changes related to Quick Stats.
 */
@Test(enabled = false, invocationCount = 3)
public void testConcurrentFetchForSamePartition()
        throws ExecutionException, InterruptedException
{
    QuickStatsBuilder longRunningQuickStatsBuilderMock = (session, metastore, table, metastoreContext, partitionId, files) -> {
        // Sleep for 50ms to simulate a long-running quick stats call
        sleepUninterruptibly(50, MILLISECONDS);
        return mockPartitionQuickStats;
    };
    SchemaTableName tableName = new SchemaTableName(TEST_SCHEMA, TEST_TABLE);
    List<String> testPartitions = ImmutableList.of("partition1", "partition2", "partition3");

    {
        QuickStatsProvider quickStatsProvider = new QuickStatsProvider(metastoreMock, hdfsEnvironment, directoryListerMock, hiveClientConfig, new NamenodeStats(),
                ImmutableList.of(longRunningQuickStatsBuilderMock));

        // Build a session where an inline quick stats build will occur for the 1st query that initiates the build,
        // but subsequent queries will NOT wait
        ConnectorSession session = getSession("600ms", "0ms");

        // Execute two concurrent calls for the same partitions; wait for them to complete
        CompletableFuture<Map<String, PartitionStatistics>> future1 = supplyAsync(
                () -> quickStatsProvider.getQuickStats(session, tableName, metastoreContext, testPartitions), commonPool());
        CompletableFuture<Map<String, PartitionStatistics>> future2 = supplyAsync(
                () -> quickStatsProvider.getQuickStats(session, tableName, metastoreContext, testPartitions), commonPool());
        allOf(future1, future2).join();
        Map<String, PartitionStatistics> quickStats1 = future1.get();
        Map<String, PartitionStatistics> quickStats2 = future2.get();

        // Both results have an entry for every partition
        assertEquals(quickStats1.entrySet().size(), testPartitions.size());
        assertTrue(quickStats1.keySet().containsAll(testPartitions));
        assertEquals(quickStats2.entrySet().size(), testPartitions.size());
        assertTrue(quickStats2.keySet().containsAll(testPartitions));

        // For the same partition, one result carries the expected partition stats and the other one is empty
        for (String testPartition : testPartitions) {
            PartitionStatistics partitionStatistics1 = quickStats1.get(testPartition);
            PartitionStatistics partitionStatistics2 = quickStats2.get(testPartition);
            if (partitionStatistics1.equals(empty())) {
                assertEquals(partitionStatistics2, expectedPartitionStats);
            }
            else if (partitionStatistics2.equals(empty())) {
                assertEquals(partitionStatistics1, expectedPartitionStats);
            }
            else {
                fail(String.format("For [%s] one of the partitions stats was expected to be empty. Actual partitionStatistics1 [%s], partitionStatistics2 [%s]",
                        testPartition, partitionStatistics1, partitionStatistics2));
            }
        }

        // A later call for the same partitions returns the expected stats for every partition
        Map<String, PartitionStatistics> quickStats = quickStatsProvider.getQuickStats(session, tableName, metastoreContext, testPartitions);
        assertEquals(quickStats.entrySet().size(), testPartitions.size());
        assertTrue(quickStats.keySet().containsAll(testPartitions));
        quickStats.values().forEach(ps -> assertEquals(ps, expectedPartitionStats));
    }

    {
        // Fresh provider: nothing has been built for these partitions yet, so the builds really are in flight
        // while the two requests below race
        QuickStatsProvider quickStatsProvider = new QuickStatsProvider(metastoreMock, hdfsEnvironment, directoryListerMock, hiveClientConfig, new NamenodeStats(),
                ImmutableList.of(longRunningQuickStatsBuilderMock));

        // Build a session where an inline quick stats build will occur for the 1st query that initiates the build,
        // and subsequent queries will wait for this inline build to finish too
        ConnectorSession session = getSession("300ms", "300ms");

        // Execute two concurrent calls for the same partitions; wait for them to complete
        CompletableFuture<Map<String, PartitionStatistics>> future1 = supplyAsync(
                () -> quickStatsProvider.getQuickStats(session, tableName, metastoreContext, testPartitions), commonPool());
        CompletableFuture<Map<String, PartitionStatistics>> future2 = supplyAsync(
                () -> quickStatsProvider.getQuickStats(session, tableName, metastoreContext, testPartitions), commonPool());
        allOf(future1, future2).join();
        Map<String, PartitionStatistics> quickStats1 = future1.get();
        Map<String, PartitionStatistics> quickStats2 = future2.get();

        assertEquals(quickStats1, quickStats2); // Both results have exactly the same stats
        assertTrue(quickStats1.keySet().containsAll(testPartitions)); // Stats for all partitions are present
        // None of the partition stats are empty, they are all the mocked value
        quickStats1.values().forEach(ps -> assertEquals(ps, expectedPartitionStats));
    }
}
/**
 * Demonstrates that building quick stats, whether inline or in the background, is time-bounded.
 * <p>
 * Note: This test simulates long-running operations with a sleep, which makes running it in CI flaky.
 * To remove noise due to this flakiness, the test is disabled.
 * It is a good candidate for a manual test run for any changes related to Quick Stats.
 */
@Test(enabled = false)
public void quickStatsBuildTimeIsBounded()
        throws Exception
{
    SchemaTableName tableName = new SchemaTableName(TEST_SCHEMA, TEST_TABLE);
    PartitionStatistics builtStats = convertToPartitionStatistics(mockPartitionQuickStats);

    // Simulated build time, in milliseconds, for each partition of the first scenario
    Map<String, Long> buildMillisByPartition = ImmutableMap.of("p1", 10L, "p2", 20L, "p3", 1500L, "p4", 1800L);
    QuickStatsBuilder slowBuilder = (session, metastore, table, metastoreContext, partitionId, files) -> {
        long buildMillis = buildMillisByPartition.get(partitionId);
        sleepUninterruptibly(buildMillis, MILLISECONDS);
        return mockPartitionQuickStats;
    };

    {
        QuickStatsProvider provider = new QuickStatsProvider(metastoreMock, hdfsEnvironment, directoryListerMock, hiveClientConfig, new NamenodeStats(),
                ImmutableList.of(slowBuilder));
        // Session in which an inline build occurs for any newly requested partition
        ConnectorSession inlineBuildSession = getSession("300ms", "0ms");
        List<String> requestedPartitions = ImmutableList.copyOf(buildMillisByPartition.keySet());

        Map<String, PartitionStatistics> stats = provider.getQuickStats(inlineBuildSession, tableName, metastoreContext, requestedPartitions);
        // Taken right after the fetch returns, while the slow builds are expected to still be running
        Map<String, Instant> inProgressBuilds = provider.getInProgressBuildsSnapshot();

        assertEquals(stats.size(), 4);
        assertEquals(stats.get("p1"), builtStats);
        assertEquals(stats.get("p2"), builtStats);
        // Fetching quick stats for p3 and p4 takes > 300ms, so EMPTY partition stats are returned for them
        assertEquals(stats.get("p3"), empty());
        assertEquals(stats.get("p4"), empty());

        // The snapshot confirms that the quick stats builds for exactly these two partitions are in progress
        assertEquals(
                inProgressBuilds.keySet(),
                ImmutableSet.of(
                        String.format("%s.%s/%s", TEST_SCHEMA, TEST_TABLE, "p3"),
                        String.format("%s.%s/%s", TEST_SCHEMA, TEST_TABLE, "p4")));
    }

    {
        // Session in which no inline build occurs for any requested partition; empty() quick stats are returned
        ConnectorSession noInlineBuildSession = getSession("0ms", "0ms");
        QuickStatsBuilder instantBuilder = (session1, metastore, table, metastoreContext, partitionId, files) -> mockPartitionQuickStats;
        QuickStatsProvider provider = new QuickStatsProvider(metastoreMock, hdfsEnvironment, directoryListerMock, hiveClientConfig, new NamenodeStats(),
                ImmutableList.of(instantBuilder));
        List<String> requestedPartitions = ImmutableList.of("p5", "p6");

        Map<String, PartitionStatistics> stats = provider.getQuickStats(noInlineBuildSession, tableName, metastoreContext, requestedPartitions);
        assertEquals(stats.size(), 2);
        assertEquals(stats.get("p5"), empty());
        assertEquals(stats.get("p6"), empty());

        // Later queries for the same partitions pick up the built stats.
        // They may not appear immediately, so retry with exponential delay.
        retry()
                .maxAttempts(10)
                .exponentialBackoff(new Duration(20D, MILLISECONDS), new Duration(500D, MILLISECONDS), new Duration(2, SECONDS), 2.0)
                .run("waitForQuickStatsBuild", () -> {
                    Map<String, PartitionStatistics> statsAfter = provider.getQuickStats(noInlineBuildSession, tableName, metastoreContext, requestedPartitions);
                    try {
                        assertEquals(statsAfter.size(), 2);
                        assertEquals(statsAfter.get("p5"), builtStats);
                        assertEquals(statsAfter.get("p6"), builtStats);
                    }
                    catch (AssertionError e) {
                        // Rethrown as an exception so that the surrounding retry() makes another attempt
                        throw new RuntimeException(e);
                    }
                    return true;
                });
    }
}
/**
 * Verifies quick stats for a table stored as a {@link SymlinkTextInputFormat} manifest: the directory lister
 * sees only the manifest file in the table location, the resolved input format is the symlink format, and the
 * quick stats builder receives the two parquet files named in the manifest.
 * <p>
 * All files live under a temporary directory that is deleted when the test ends, whether it passes or fails.
 *
 * @throws IOException if creating, writing or deleting the temporary files fails
 */
@Test
public void testFollowSymlinkFile()
        throws IOException
{
    java.nio.file.Path testTempDir = createTempDirectory("test");
    try {
        java.nio.file.Path symlinkFileDir = testTempDir.resolve("symlink");
        java.nio.file.Path tableDir1 = testTempDir.resolve("table_1");
        java.nio.file.Path tableDir2 = testTempDir.resolve("table_2");
        createDirectory(symlinkFileDir);
        createDirectory(tableDir1);
        createDirectory(tableDir2);

        // Copy two parquet files from resources into the two table temp dirs
        String fileName1 = "data_1.parquet";
        String fileName2 = "data_2.parquet";
        URL resourceUrl1 = getResource("quick_stats/tpcds_store_sales_sf_point_01/20230706_110621_00007_4uxkh_10e94cd0-1f67-4440-afd0-75cd328ea570");
        URL resourceUrl2 = getResource("quick_stats/tpcds_store_sales_sf_point_01/20230706_110621_00007_4uxkh_12b3ec73-4952-4df7-9987-2beb20cd5953");
        assertNotNull(resourceUrl1);
        assertNotNull(resourceUrl2);
        java.nio.file.Path targetFilePath1 = tableDir1.resolve(fileName1);
        java.nio.file.Path targetFilePath2 = tableDir2.resolve(fileName2);
        try (InputStream in = resourceUrl1.openStream()) {
            copy(in, targetFilePath1, REPLACE_EXISTING);
        }
        try (InputStream in = resourceUrl2.openStream()) {
            copy(in, targetFilePath2, REPLACE_EXISTING);
        }

        // Create the symlink manifest with one line per copied parquet file
        java.nio.file.Path manifestFilePath = createFile(symlinkFileDir.resolve("manifest"));
        try (BufferedWriter writer = newBufferedWriter(manifestFilePath, CREATE, TRUNCATE_EXISTING)) {
            writer.write("file:" + tableDir1 + "/" + fileName1);
            writer.newLine();
            writer.write("file:" + tableDir2 + "/" + fileName2);
        }

        // Register an external table whose location is the manifest directory
        String symlinkTableName = "symlink_table";
        Table symlinkTable = new Table(
                Optional.of("catalogName"),
                TEST_SCHEMA,
                symlinkTableName,
                "owner",
                EXTERNAL_TABLE,
                Storage.builder()
                        .setStorageFormat(StorageFormat.create(ParquetHiveSerDe.class.getName(), SymlinkTextInputFormat.class.getName(), HiveIgnoreKeyTextOutputFormat.class.getName()))
                        .setLocation(symlinkFileDir.toString())
                        .build(),
                ImmutableList.of(),
                ImmutableList.of(),
                ImmutableMap.of(),
                Optional.empty(),
                Optional.empty());
        metastoreMock.createTable(metastoreContext, symlinkTable, new PrincipalPrivileges(ImmutableMultimap.of(), ImmutableMultimap.of()), ImmutableList.of());

        DirectoryLister directoryLister = new HadoopDirectoryLister();
        // Builder that checks it was handed exactly the two files named in the manifest
        QuickStatsBuilder quickStatsBuilder = (session1, metastore, table, metastoreContext, partitionId, files) -> {
            List<HiveFileInfo> builderFiles = ImmutableList.copyOf(files);
            assertEquals(builderFiles.size(), 2);
            for (HiveFileInfo hiveFileInfo : builderFiles) {
                assertTrue(hiveFileInfo.getPath().equals("file:" + targetFilePath1) || hiveFileInfo.getPath().equals("file:" + targetFilePath2));
            }
            return new PartitionQuickStats(UNPARTITIONED_ID.getPartitionName(), ImmutableList.of(mockIntegerColumnStats), builderFiles.size());
        };
        QuickStatsProvider quickStatsProvider = new QuickStatsProvider(metastoreMock, hdfsEnvironment, directoryLister, hiveClientConfig, new NamenodeStats(),
                ImmutableList.of(quickStatsBuilder));

        SchemaTableName schemaTableName = new SchemaTableName(TEST_SCHEMA, symlinkTableName);
        Table resolvedTable = metastoreMock.getTable(metastoreContext, schemaTableName.getSchemaName(), schemaTableName.getTableName()).get();
        Path symlinkTablePath = new Path(resolvedTable.getStorage().getLocation());
        HdfsContext hdfsContext = new HdfsContext(SESSION, schemaTableName.getSchemaName(), schemaTableName.getTableName(), UNPARTITIONED_ID.getPartitionName(), false);
        ExtendedFileSystem fs = hdfsEnvironment.getFileSystem(hdfsContext, symlinkTablePath);
        HiveDirectoryContext hiveDirectoryContext = new HiveDirectoryContext(RECURSE, isUseListDirectoryCache(SESSION),
                isSkipEmptyFilesEnabled(SESSION), hdfsContext.getIdentity(), buildDirectoryContextProperties(SESSION), SESSION.getRuntimeStats());

        // Test directoryLister finds the manifest file in the table dir
        Iterator<HiveFileInfo> fileInfoIterator = directoryLister.list(fs, resolvedTable, symlinkTablePath, Optional.empty(), new NamenodeStats(), hiveDirectoryContext);
        List<HiveFileInfo> fileInfoList = ImmutableList.copyOf(fileInfoIterator);
        assertEquals(fileInfoList.size(), 1);
        assertEquals(fileInfoList.get(0).getPath(), "file:" + manifestFilePath);
        assertEquals(fileInfoList.get(0).getParent(), "file:" + symlinkFileDir);

        // Test that the input format is correct
        InputFormat<?, ?> inputFormat = getInputFormat(
                hdfsEnvironment.getConfiguration(hdfsContext, symlinkTablePath),
                resolvedTable.getStorage().getStorageFormat().getInputFormat(),
                resolvedTable.getStorage().getStorageFormat().getSerDe(),
                false);
        assertTrue(inputFormat instanceof SymlinkTextInputFormat);

        // Test entire getQuickStats and ensure file count matches the number of files seen by the builder
        PartitionStatistics quickStats = quickStatsProvider.getQuickStats(SESSION, schemaTableName, metastoreContext, UNPARTITIONED_ID.getPartitionName());
        assertTrue(quickStats.getBasicStatistics().getFileCount().isPresent());
        assertEquals(quickStats.getBasicStatistics().getFileCount().getAsLong(), 2L);
    }
    finally {
        // Clean up even when an assertion or I/O call above fails, so the temp directory is not leaked
        deleteRecursively(testTempDir, ALLOW_INSECURE);
    }
}
}