// TestFileFragmentResultCacheManager.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.operator;
import com.facebook.presto.common.Page;
import com.facebook.presto.common.block.TestingBlockEncodingSerde;
import com.facebook.presto.metadata.Split;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.NodeProvider;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.facebook.presto.spi.schedule.NodeSelectionStrategy;
import com.google.common.collect.ImmutableList;
import io.airlift.units.DataSize;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed;
import static com.facebook.presto.block.BlockAssertions.createStringsBlock;
import static com.facebook.presto.spi.schedule.NodeSelectionStrategy.NO_PREFERENCE;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkState;
import static java.nio.file.Files.createTempDirectory;
import static java.util.concurrent.Executors.newScheduledThreadPool;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
public class TestFileFragmentResultCacheManager
{
// Two distinct plan-fragment keys; the tests pass them as the plan argument of cacheManager.get/put.
private static final String SERIALIZED_PLAN_FRAGMENT_1 = "test plan fragment 1";
private static final String SERIALIZED_PLAN_FRAGMENT_2 = "test plan fragment 2";
// Two splits on the same "test" connector that differ only in their TestingSplit id (1 vs. 2).
private static final Split SPLIT_1 = new Split(new ConnectorId("test"), new ConnectorTransactionHandle() {}, new TestingSplit(1));
private static final Split SPLIT_2 = new Split(new ConnectorId("test"), new ConnectorTransactionHandle() {}, new TestingSplit(2));
// Input data sizes handed to put(...) and expected back from FragmentCacheResult.getInputDataSize().
private static final long INPUT_DATA_SIZE_1 = 1000;
private static final long INPUT_DATA_SIZE_2 = 2000;
// Executors passed to every FileFragmentResultCacheManager built by this class; shut down in close().
private final ExecutorService writeExecutor = newScheduledThreadPool(5, daemonThreadsNamed("test-cache-flusher-%s"));
private final ExecutorService removalExecutor = newScheduledThreadPool(5, daemonThreadsNamed("test-cache-remover-%s"));
// Runs the concurrent put/get tasks submitted by testThreadWrite.
private final ExecutorService multithreadingWriteExecutor = newScheduledThreadPool(10, daemonThreadsNamed("test-cache-multithreading-flusher-%s"));
/**
 * Shuts down the executors owned by this test class after all tests have run.
 *
 * @throws InterruptedException if the thread is interrupted while waiting for the removal executor
 */
@AfterClass
public void close()
        throws IOException, InterruptedException
{
    // Request shutdown of every pool before blocking, so that an interrupt raised by
    // awaitTermination below can no longer leave one of them running.
    writeExecutor.shutdown();
    removalExecutor.shutdown();
    multithreadingWriteExecutor.shutdown();

    // Only the removal executor is waited on, as before.
    // NOTE(review): presumably because cache file removal is handed off to it asynchronously - confirm.
    removalExecutor.awaitTermination(30, TimeUnit.SECONDS);
}
/**
 * Creates a fresh temporary directory and returns its location.
 *
 * @param prefix prefix passed to {@code Files.createTempDirectory} for the directory name
 * @return URI of the newly created directory
 */
private URI getNewCacheDirectory(String prefix)
        throws Exception
{
    URI location = createTempDirectory(prefix).toUri();
    return location;
}
/**
 * Deletes the entries directly inside the given cache directory, then the directory itself.
 * Entries that have already disappeared are skipped, since {@code Files.deleteIfExists} is used.
 *
 * @param cacheDirectory location of the directory to remove; must not be null
 * @throws IOException if an entry or the directory cannot be deleted
 */
private void cleanupCacheDirectory(URI cacheDirectory)
        throws IOException
{
    checkState(cacheDirectory != null);
    File directory = new File(cacheDirectory);
    File[] entries = directory.listFiles();
    if (entries == null) {
        // listFiles() returns null when the path is not a directory or cannot be read.
        entries = new File[0];
    }
    for (File entry : entries) {
        Files.deleteIfExists(entry.toPath());
    }
    Files.deleteIfExists(directory.toPath());
}
/**
 * Walks a single cache manager through miss, empty-page hit, non-empty-page hit,
 * plan/split mismatch misses and full invalidation, checking the stats after each step.
 */
@Test(timeOut = 30_000)
public void testBasic()
        throws Exception
{
    URI cacheDirectory = getNewCacheDirectory("testBasic");
    // The temporary directory is removed in the finally block so that a failing assertion does not leak it.
    try {
        FragmentCacheStats stats = new FragmentCacheStats();
        FileFragmentResultCacheManager cacheManager = fileFragmentResultCacheManager(stats, cacheDirectory);

        // Test fetching new fragment. Current cache status: empty
        FragmentCacheResult fragmentCacheResult = cacheManager.get(SERIALIZED_PLAN_FRAGMENT_1, SPLIT_1);
        assertFalse(fragmentCacheResult.getPages().isPresent());
        assertEquals(fragmentCacheResult.getInputDataSize(), 0);
        assertEquals(stats.getCacheMiss(), 1);
        assertEquals(stats.getCacheHit(), 0);
        assertEquals(stats.getCacheEntries(), 0);
        assertEquals(stats.getCacheSizeInBytes(), 0);

        // Test empty page. Current cache status: empty
        cacheManager.put(SERIALIZED_PLAN_FRAGMENT_1, SPLIT_1, ImmutableList.of(), INPUT_DATA_SIZE_1).get();
        fragmentCacheResult = cacheManager.get(SERIALIZED_PLAN_FRAGMENT_1, SPLIT_1);
        Optional<Iterator<Page>> result = fragmentCacheResult.getPages();
        assertTrue(result.isPresent());
        assertFalse(result.get().hasNext());
        assertEquals(fragmentCacheResult.getInputDataSize(), INPUT_DATA_SIZE_1);
        assertEquals(stats.getCacheMiss(), 1);
        assertEquals(stats.getCacheHit(), 1);
        assertEquals(stats.getCacheEntries(), 1);
        assertEquals(stats.getCacheSizeInBytes(), 0);

        // Test non-empty page. Current cache status: { (plan1, split1) -> [] }
        List<Page> pages = ImmutableList.of(new Page(createStringsBlock("plan-1-split-2")));
        cacheManager.put(SERIALIZED_PLAN_FRAGMENT_2, SPLIT_2, pages, INPUT_DATA_SIZE_2).get();
        fragmentCacheResult = cacheManager.get(SERIALIZED_PLAN_FRAGMENT_2, SPLIT_2);
        result = fragmentCacheResult.getPages();
        assertTrue(result.isPresent());
        assertPagesEqual(result.get(), pages.iterator());
        assertEquals(fragmentCacheResult.getInputDataSize(), INPUT_DATA_SIZE_2);
        assertEquals(stats.getCacheMiss(), 1);
        assertEquals(stats.getCacheHit(), 2);
        assertEquals(stats.getCacheEntries(), 2);
        assertEquals(stats.getCacheSizeInBytes(), getCachePhysicalSize(cacheDirectory));

        // Test cache miss for plan mismatch and split mismatch.
        // Current cache status: { (plan1, split1) -> [], (plan2, split2) -> ["plan-1-split-2"] }
        cacheManager.get(SERIALIZED_PLAN_FRAGMENT_1, SPLIT_2);
        assertEquals(stats.getCacheMiss(), 2);
        assertEquals(stats.getCacheHit(), 2);
        assertEquals(stats.getCacheEntries(), 2);
        cacheManager.get(SERIALIZED_PLAN_FRAGMENT_2, SPLIT_1);
        assertEquals(stats.getCacheMiss(), 3);
        assertEquals(stats.getCacheHit(), 2);
        assertEquals(stats.getCacheEntries(), 2);
        assertEquals(stats.getCacheSizeInBytes(), getCachePhysicalSize(cacheDirectory));

        // Test cache invalidation
        cacheManager.invalidateAllCache();
        assertEquals(stats.getCacheMiss(), 3);
        assertEquals(stats.getCacheHit(), 2);
        assertEquals(stats.getCacheEntries(), 0);
        assertEquals(stats.getCacheRemoval(), 2);
        assertEquals(stats.getCacheSizeInBytes(), 0);
    }
    finally {
        cleanupCacheDirectory(cacheDirectory);
    }
}
/**
 * Verifies the configured maximum cache size: a first entry is stored, a second entry with
 * pages is not, an empty entry still is, and invalidation resets entries and size to zero.
 */
@Test(timeOut = 30_000)
public void testMaxCacheSize()
        throws Exception
{
    List<Page> pages = ImmutableList.of(new Page(createStringsBlock("plan-1-split-2")));
    URI cacheDirectory = getNewCacheDirectory("testMaxCacheSize");
    // The temporary directory is removed in the finally block so that a failing assertion does not leak it.
    try {
        FragmentCacheStats stats = new FragmentCacheStats();
        FileFragmentResultCacheConfig config = new FileFragmentResultCacheConfig();
        // NOTE(review): 71 bytes presumably leaves room for exactly one serialized copy of `pages` - confirm.
        config.setMaxCacheSize(new DataSize(71, DataSize.Unit.BYTE));
        FileFragmentResultCacheManager cacheManager = fileFragmentResultCacheManager(stats, config, cacheDirectory);

        // Put one cache entry.
        cacheManager.put(SERIALIZED_PLAN_FRAGMENT_1, SPLIT_1, pages, INPUT_DATA_SIZE_1).get();
        FragmentCacheResult fragmentCacheResult = cacheManager.get(SERIALIZED_PLAN_FRAGMENT_1, SPLIT_1);
        Optional<Iterator<Page>> result = fragmentCacheResult.getPages();
        assertTrue(result.isPresent());
        assertPagesEqual(result.get(), pages.iterator());
        assertEquals(fragmentCacheResult.getInputDataSize(), INPUT_DATA_SIZE_1);
        assertEquals(stats.getCacheMiss(), 0);
        assertEquals(stats.getCacheHit(), 1);
        assertEquals(stats.getCacheEntries(), 1);
        assertEquals(stats.getCacheSizeInBytes(), getCachePhysicalSize(cacheDirectory));

        // Trying to add another cache entry which will fail due to total size limit.
        assertNull(cacheManager.put(SERIALIZED_PLAN_FRAGMENT_1, SPLIT_2, pages, INPUT_DATA_SIZE_2).get());
        fragmentCacheResult = cacheManager.get(SERIALIZED_PLAN_FRAGMENT_1, SPLIT_2);
        result = fragmentCacheResult.getPages();
        assertFalse(result.isPresent());
        assertEquals(fragmentCacheResult.getInputDataSize(), 0);
        assertEquals(stats.getCacheMiss(), 1);
        assertEquals(stats.getCacheHit(), 1);
        assertEquals(stats.getCacheEntries(), 1);
        assertEquals(stats.getCacheSizeInBytes(), getCachePhysicalSize(cacheDirectory));

        // Adding an empty page is fine.
        cacheManager.put(SERIALIZED_PLAN_FRAGMENT_2, SPLIT_1, ImmutableList.of(), 0).get();
        fragmentCacheResult = cacheManager.get(SERIALIZED_PLAN_FRAGMENT_2, SPLIT_1);
        result = fragmentCacheResult.getPages();
        assertTrue(result.isPresent());
        assertEquals(fragmentCacheResult.getInputDataSize(), 0);
        assertFalse(result.get().hasNext());
        assertEquals(stats.getCacheMiss(), 1);
        assertEquals(stats.getCacheHit(), 2);
        assertEquals(stats.getCacheEntries(), 2);
        assertEquals(stats.getCacheSizeInBytes(), getCachePhysicalSize(cacheDirectory));

        // Test cache invalidation
        cacheManager.invalidateAllCache();
        assertEquals(stats.getCacheMiss(), 1);
        assertEquals(stats.getCacheHit(), 2);
        assertEquals(stats.getCacheEntries(), 0);
        assertEquals(stats.getCacheRemoval(), 2);
        assertEquals(stats.getCacheSizeInBytes(), 0);
    }
    finally {
        cleanupCacheDirectory(cacheDirectory);
    }
}
/**
 * Asserts that two page iterators yield the same number of pages and that corresponding pages
 * hold the same values in every channel and at every position.
 *
 * Values are compared through {@code getSliceLength} and the byte-range {@code equals} of the
 * block, so this helper is meant for blocks that expose slices (the tests use string blocks).
 *
 * @param pages1 first sequence of pages; fully consumed when the assertion passes
 * @param pages2 second sequence of pages; fully consumed when the assertion passes
 */
private static void assertPagesEqual(Iterator<Page> pages1, Iterator<Page> pages2)
{
    while (pages1.hasNext() && pages2.hasNext()) {
        Page page1 = pages1.next();
        Page page2 = pages2.next();
        assertEquals(page1.getChannelCount(), page2.getChannelCount());
        assertEquals(page1.getPositionCount(), page2.getPositionCount());
        for (int channel = 0; channel < page1.getChannelCount(); channel++) {
            for (int position = 0; position < page1.getPositionCount(); position++) {
                boolean isNull = page1.getBlock(channel).isNull(position);
                assertEquals(page2.getBlock(channel).isNull(position), isNull);
                if (isNull) {
                    continue;
                }
                // Take the length from the channel being compared (not channel 0) and require both
                // sides to agree on it, so a longer value sharing a prefix is not accepted.
                int length = page1.getBlock(channel).getSliceLength(position);
                assertEquals(page2.getBlock(channel).getSliceLength(position), length);
                assertTrue(page1.getBlock(channel).equals(position, 0, page2.getBlock(channel), position, 0, length));
            }
        }
    }
    // Both iterators must be exhausted together; otherwise one side had extra pages.
    assertFalse(pages1.hasNext());
    assertFalse(pages2.hasNext());
}
/**
 * Submits ten tasks to the multithreading executor; each puts a page keyed by its thread's
 * name and id into a shared cache manager and reads it back. Afterwards the cache must be
 * non-empty, and empty again once invalidated.
 */
@Test(timeOut = 30_000)
public void testThreadWrite()
        throws Exception
{
    URI cacheDirectory = getNewCacheDirectory("testThreadWrite");
    // The temporary directory is removed in the finally block so that a failing assertion does not leak it.
    try {
        String writeThreadNameFormat = "test write content,thread %s,%s";
        FragmentCacheStats stats = new FragmentCacheStats();
        FileFragmentResultCacheManager threadWriteCacheManager = fileFragmentResultCacheManager(stats, cacheDirectory);
        ImmutableList.Builder<Future<Boolean>> futures = ImmutableList.builder();
        for (int i = 0; i < 10; i++) {
            // The task no longer catches exceptions to return false: anything thrown inside it now
            // surfaces from future.get() as an ExecutionException that carries the real cause.
            Future<Boolean> future = multithreadingWriteExecutor.submit(() -> {
                String threadInfo = String.format(writeThreadNameFormat, Thread.currentThread().getName(), Thread.currentThread().getId());
                List<Page> pages = ImmutableList.of(new Page(createStringsBlock(threadInfo)));
                threadWriteCacheManager.put(threadInfo, SPLIT_2, pages, INPUT_DATA_SIZE_2).get();
                FragmentCacheResult fragmentCacheResult = threadWriteCacheManager.get(threadInfo, SPLIT_2);
                Optional<Iterator<Page>> result = fragmentCacheResult.getPages();
                assertTrue(result.isPresent());
                assertPagesEqual(result.get(), pages.iterator());
                assertEquals(fragmentCacheResult.getInputDataSize(), INPUT_DATA_SIZE_2);
                return true;
            });
            futures.add(future);
        }
        for (Future<Boolean> future : futures.build()) {
            assertTrue(future.get(30, TimeUnit.SECONDS));
        }
        assertTrue(stats.getCacheSizeInBytes() > 0);
        threadWriteCacheManager.invalidateAllCache();
        assertEquals(stats.getCacheSizeInBytes(), 0);
    }
    finally {
        cleanupCacheDirectory(cacheDirectory);
    }
}
/**
 * Adds up the on-disk length, in bytes, of every entry directly inside the cache directory.
 *
 * @param cacheDirectory location of the cache directory; must not be null
 * @return the summed length, or 0 when the directory cannot be listed
 */
private long getCachePhysicalSize(URI cacheDirectory)
{
    checkState(cacheDirectory != null);
    File[] entries = new File(cacheDirectory).listFiles();
    if (entries == null) {
        return 0;
    }
    long totalBytes = 0;
    for (int index = 0; index < entries.length; index++) {
        totalBytes += entries[index].length();
    }
    return totalBytes;
}
/**
 * Builds a cache manager over {@code cacheDirectory} starting from a freshly constructed
 * {@code FileFragmentResultCacheConfig}.
 */
private FileFragmentResultCacheManager fileFragmentResultCacheManager(FragmentCacheStats fragmentCacheStats, URI cacheDirectory)
{
    FileFragmentResultCacheConfig defaultConfig = new FileFragmentResultCacheConfig();
    return fileFragmentResultCacheManager(fragmentCacheStats, defaultConfig, cacheDirectory);
}
/**
 * Builds a cache manager from the given config after pointing it at {@code cacheDirectory}
 * and enabling input data stats. The manager shares this class's write and removal executors.
 */
private FileFragmentResultCacheManager fileFragmentResultCacheManager(FragmentCacheStats fragmentCacheStats, FileFragmentResultCacheConfig cacheConfig, URI cacheDirectory)
{
    FileFragmentResultCacheConfig effectiveConfig = cacheConfig
            .setBaseDirectory(cacheDirectory)
            .setInputDataStatsEnabled(true);
    TestingBlockEncodingSerde blockEncodingSerde = new TestingBlockEncodingSerde();
    return new FileFragmentResultCacheManager(effectiveConfig, blockEncodingSerde, fragmentCacheStats, writeExecutor, removalExecutor);
}
/**
 * Minimal {@code ConnectorSplit} for these tests. A split is identified solely by its integer
 * id, which drives {@code equals}, {@code hashCode} and {@code getSplitIdentifier}.
 */
private static class TestingSplit
        implements ConnectorSplit
{
    private final int id;

    public TestingSplit(int id)
    {
        this.id = id;
    }

    @Override
    public Object getSplitIdentifier()
    {
        // The id is returned boxed as the split identifier.
        return id;
    }

    @Override
    public Object getInfo()
    {
        return this;
    }

    @Override
    public NodeSelectionStrategy getNodeSelectionStrategy()
    {
        return NO_PREFERENCE;
    }

    @Override
    public List<HostAddress> getPreferredNodes(NodeProvider nodeProvider)
    {
        // No node preference: always an empty list.
        return ImmutableList.of();
    }

    @Override
    public boolean equals(Object o)
    {
        if (o == this) {
            return true;
        }
        if (o == null) {
            return false;
        }
        if (o.getClass() != getClass()) {
            return false;
        }
        return ((TestingSplit) o).id == id;
    }

    @Override
    public int hashCode()
    {
        return Objects.hash(id);
    }

    @Override
    public String toString()
    {
        return toStringHelper(this).add("id", id).toString();
    }
}
}