TestHiveFileRenamer.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive;
import com.facebook.presto.spi.ConnectorMetadataUpdateHandle;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.SchemaTableName;
import com.google.common.collect.ImmutableList;
import org.testng.annotations.Test;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import static com.facebook.presto.hive.TestHiveMetadataUpdateHandle.TEST_HIVE_METADATA_UPDATE_REQUEST;
import static com.facebook.presto.hive.TestHiveMetadataUpdateHandle.TEST_PARTITION_NAME;
import static com.facebook.presto.hive.TestHiveMetadataUpdateHandle.TEST_REQUEST_ID;
import static com.facebook.presto.hive.TestHiveMetadataUpdateHandle.TEST_SCHEMA_NAME;
import static com.facebook.presto.hive.TestHiveMetadataUpdateHandle.TEST_SCHEMA_TABLE_NAME;
import static com.facebook.presto.hive.TestHiveMetadataUpdateHandle.TEST_TABLE_NAME;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
@Test
public class TestHiveFileRenamer
{
private static final QueryId TEST_QUERY_ID = new QueryId("test");
private static final int REQUEST_COUNT = 10;
private static final int PARTITION_COUNT = 10;
private static final int TABLE_COUNT = 10;
private static final int THREAD_COUNT = 100;
private static final int THREAD_POOL_SIZE = 10;
@Test
public void testHiveFileRenamer()
{
HiveFileRenamer hiveFileRenamer = new HiveFileRenamer();
List<ConnectorMetadataUpdateHandle> requests = ImmutableList.of(TEST_HIVE_METADATA_UPDATE_REQUEST);
List<ConnectorMetadataUpdateHandle> results = hiveFileRenamer.getMetadataUpdateResults(requests, TEST_QUERY_ID);
// Assert # of requests is equal to # of results
assertEquals(requests.size(), results.size());
HiveMetadataUpdateHandle result = (HiveMetadataUpdateHandle) results.get(0);
assertEquals(result.getRequestId(), TEST_REQUEST_ID);
assertEquals(result.getSchemaTableName(), TEST_SCHEMA_TABLE_NAME);
assertEquals(result.getPartitionName(), Optional.of(TEST_PARTITION_NAME));
// Assert file name returned is "1"
assertEquals(result.getMetadataUpdate(), Optional.of("0"));
}
@Test
public void testFileNamesForSinglePartition()
{
HiveFileRenamer hiveFileRenamer = new HiveFileRenamer();
List<ConnectorMetadataUpdateHandle> requests = createHiveMetadataUpdateRequests(TEST_SCHEMA_NAME, TEST_TABLE_NAME, TEST_PARTITION_NAME);
List<String> fileNames = getFileNames(hiveFileRenamer, requests);
List<String> aggregatedFileNames = new ArrayList<>(fileNames);
assertTrue(areFileNamesIncreasingSequentially(fileNames));
// Send more requests
requests = createHiveMetadataUpdateRequests(TEST_SCHEMA_NAME, TEST_TABLE_NAME, TEST_PARTITION_NAME);
fileNames = getFileNames(hiveFileRenamer, requests);
aggregatedFileNames.addAll(fileNames);
// Assert that the # of filenames is equal to # of requests
assertEquals(fileNames.size(), REQUEST_COUNT);
// Assert that the file names are continuous increasing numbers
assertTrue(areFileNamesIncreasingSequentially(aggregatedFileNames));
}
@Test
public void testFileNamesForMultiplePartitions()
{
HiveFileRenamer hiveFileRenamer = new HiveFileRenamer();
for (int partitionNumber = 1; partitionNumber <= PARTITION_COUNT; partitionNumber++) {
List<ConnectorMetadataUpdateHandle> requests = createHiveMetadataUpdateRequests(TEST_SCHEMA_NAME, TEST_TABLE_NAME, "partition_" + partitionNumber);
List<String> fileNames = getFileNames(hiveFileRenamer, requests);
// Assert that the # of filenames is equal to # of requests
assertEquals(fileNames.size(), REQUEST_COUNT);
assertTrue(areFileNamesIncreasingSequentially(fileNames));
}
}
@Test
public void testFileNamesForMultipleTables()
{
HiveFileRenamer hiveFileRenamer = new HiveFileRenamer();
for (int tableNumber = 1; tableNumber <= TABLE_COUNT; tableNumber++) {
List<ConnectorMetadataUpdateHandle> requests = createHiveMetadataUpdateRequests(TEST_SCHEMA_NAME, "table_" + tableNumber, TEST_PARTITION_NAME);
List<String> fileNames = getFileNames(hiveFileRenamer, requests);
// Assert that the # of filenames is equal to # of requests
assertEquals(fileNames.size(), REQUEST_COUNT);
assertTrue(areFileNamesIncreasingSequentially(fileNames));
}
}
@Test
public void testFileNameResultCache()
{
HiveFileRenamer hiveFileRenamer = new HiveFileRenamer();
List<ConnectorMetadataUpdateHandle> requests = createHiveMetadataUpdateRequests(TEST_SCHEMA_NAME, TEST_TABLE_NAME, TEST_PARTITION_NAME);
// Get the file names 1st time
List<String> fileNames = getFileNames(hiveFileRenamer, requests);
// Get the file names 2nd time, for the same set of requests. This should be served from cache.
List<String> fileNamesList = getFileNames(hiveFileRenamer, requests);
// Assert that the file name is same for a given request. This is to imitate retries from workers.
assertEquals(fileNames, fileNamesList);
assertEquals(fileNames, getFileNames(hiveFileRenamer, requests));
}
@Test
public void testMultiThreadedRequests()
throws InterruptedException
{
ExecutorService service = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
CountDownLatch latch = new CountDownLatch(THREAD_COUNT);
List<String> fileNames = new CopyOnWriteArrayList<>();
HiveFileRenamer hiveFileRenamer = new HiveFileRenamer();
// Spawn THREAD_COUNT threads. And each thread will send REQUEST_COUNT requests to HiveFileRenamer
for (int i = 0; i < THREAD_COUNT; i++) {
service.execute(() -> {
List<ConnectorMetadataUpdateHandle> requests = createHiveMetadataUpdateRequests(TEST_SCHEMA_NAME, TEST_TABLE_NAME, TEST_PARTITION_NAME);
fileNames.addAll(getFileNames(hiveFileRenamer, requests));
latch.countDown();
});
}
// wait for all threads to finish
latch.await();
// Assert the # of filenames
assertEquals(fileNames.size(), THREAD_COUNT * REQUEST_COUNT);
// Assert that the filenames are an increasing sequence
assertTrue(areFileNamesIncreasingSequentially(fileNames));
}
@Test
public void testCleanup()
{
HiveFileRenamer hiveFileRenamer = new HiveFileRenamer();
List<ConnectorMetadataUpdateHandle> requests = ImmutableList.of(TEST_HIVE_METADATA_UPDATE_REQUEST);
List<ConnectorMetadataUpdateHandle> results = hiveFileRenamer.getMetadataUpdateResults(requests, TEST_QUERY_ID);
assertEquals(results.size(), 1);
HiveMetadataUpdateHandle result = (HiveMetadataUpdateHandle) results.get(0);
assertEquals(result.getMetadataUpdate(), Optional.of("0"));
hiveFileRenamer.cleanup(TEST_QUERY_ID);
requests = ImmutableList.of(TEST_HIVE_METADATA_UPDATE_REQUEST);
results = hiveFileRenamer.getMetadataUpdateResults(requests, TEST_QUERY_ID);
assertEquals(results.size(), 1);
result = (HiveMetadataUpdateHandle) results.get(0);
assertEquals(result.getMetadataUpdate(), Optional.of("0"));
}
private List<String> getFileNames(HiveFileRenamer hiveFileRenamer, List<ConnectorMetadataUpdateHandle> requests)
{
List<ConnectorMetadataUpdateHandle> results = hiveFileRenamer.getMetadataUpdateResults(requests, TEST_QUERY_ID);
return results.stream()
.map(result -> {
Optional<String> fileName = ((HiveMetadataUpdateHandle) result).getMetadataUpdate();
assertTrue(fileName.isPresent());
return fileName.get();
})
.collect(Collectors.toList());
}
private List<ConnectorMetadataUpdateHandle> createHiveMetadataUpdateRequests(String schemaName, String tableName, String partitionName)
{
List<ConnectorMetadataUpdateHandle> requests = new ArrayList<>();
for (int i = 1; i <= REQUEST_COUNT; i++) {
requests.add(new HiveMetadataUpdateHandle(UUID.randomUUID(), new SchemaTableName(schemaName, tableName), Optional.of(partitionName), Optional.empty()));
}
return requests;
}
private boolean areFileNamesIncreasingSequentially(List<String> fileNames)
{
// Sort the filenames
fileNames.sort(Comparator.comparingInt(Integer::valueOf));
long start = 0;
for (String fileName : fileNames) {
if (!fileName.equals(String.valueOf(start))) {
return false;
}
start++;
}
return true;
}
}