TestFileMergeCacheManager.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.cache.filemerge;
import com.facebook.presto.cache.CacheConfig;
import com.facebook.presto.cache.CacheManager;
import com.facebook.presto.cache.CacheStats;
import com.facebook.presto.cache.FileReadRequest;
import com.facebook.presto.hive.CacheQuota;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import org.apache.hadoop.fs.Path;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.URI;
import java.nio.file.Files;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed;
import static com.facebook.presto.cache.TestingCacheUtils.stressTest;
import static com.facebook.presto.cache.TestingCacheUtils.validateBuffer;
import static com.facebook.presto.hive.CacheQuota.NO_CACHE_CONSTRAINTS;
import static com.google.common.base.Preconditions.checkState;
import static io.airlift.slice.Slices.wrappedBuffer;
import static io.airlift.units.DataSize.Unit.KILOBYTE;
import static java.nio.file.Files.createTempDirectory;
import static java.nio.file.StandardOpenOption.CREATE_NEW;
import static java.util.concurrent.Executors.newScheduledThreadPool;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
@Test(singleThreaded = true)
public class TestFileMergeCacheManager
{
private static final int DATA_LENGTH = (int) new DataSize(20, KILOBYTE).toBytes();
private final byte[] data = new byte[DATA_LENGTH];
private final ExecutorService flushExecutor = newScheduledThreadPool(5, daemonThreadsNamed("test-cache-flusher-%s"));
private final ExecutorService removeExecutor = newScheduledThreadPool(5, daemonThreadsNamed("test-cache-remover-%s"));
private final ScheduledExecutorService cacheSizeCalculator = newScheduledThreadPool(1, daemonThreadsNamed("hive-cache-size-calculator-%s"));
private URI cacheDirectory;
private URI fileDirectory;
private File dataFile;
@BeforeClass
public void setup()
throws IOException
{
new Random().nextBytes(data);
this.cacheDirectory = createTempDirectory("cache").toUri();
this.fileDirectory = createTempDirectory("file").toUri();
this.dataFile = new File(fileDirectory.getPath() + "/data");
Files.write((new File(dataFile.toString())).toPath(), data, CREATE_NEW);
}
@AfterClass
public void close()
throws IOException
{
flushExecutor.shutdown();
removeExecutor.shutdown();
checkState(cacheDirectory != null);
checkState(fileDirectory != null);
Files.deleteIfExists(dataFile.toPath());
File[] files = new File(cacheDirectory).listFiles();
if (files != null) {
for (File file : files) {
Files.delete(file.toPath());
}
}
Files.deleteIfExists(new File(cacheDirectory).toPath());
Files.deleteIfExists(new File(fileDirectory).toPath());
}
@Test(timeOut = 30_000)
public void testBasic()
throws InterruptedException, ExecutionException, IOException
{
TestingCacheStats stats = new TestingCacheStats();
CacheManager cacheManager = fileMergeCacheManager(stats);
byte[] buffer = new byte[1024];
// new read
assertFalse(readFully(cacheManager, NO_CACHE_CONSTRAINTS, 42, buffer, 0, 100));
assertEquals(stats.getCacheMiss(), 1);
assertEquals(stats.getCacheHit(), 0);
stats.trigger();
assertEquals(stats.getInMemoryRetainedBytes(), 0);
validateBuffer(data, 42, buffer, 0, 100);
// within the range of the cache
assertTrue(readFully(cacheManager, NO_CACHE_CONSTRAINTS, 47, buffer, 0, 90));
assertEquals(stats.getCacheMiss(), 1);
assertEquals(stats.getCacheHit(), 1);
assertEquals(stats.getInMemoryRetainedBytes(), 0);
validateBuffer(data, 47, buffer, 0, 90);
// partially within the range of the cache
assertFalse(readFully(cacheManager, NO_CACHE_CONSTRAINTS, 52, buffer, 0, 100));
assertEquals(stats.getCacheMiss(), 2);
assertEquals(stats.getCacheHit(), 1);
stats.trigger();
assertEquals(stats.getInMemoryRetainedBytes(), 0);
validateBuffer(data, 52, buffer, 0, 100);
// partially within the range of the cache
assertFalse(readFully(cacheManager, NO_CACHE_CONSTRAINTS, 32, buffer, 10, 50));
assertEquals(stats.getCacheMiss(), 3);
assertEquals(stats.getCacheHit(), 1);
stats.trigger();
assertEquals(stats.getInMemoryRetainedBytes(), 0);
validateBuffer(data, 32, buffer, 10, 50);
// create a hole within two caches
assertFalse(readFully(cacheManager, NO_CACHE_CONSTRAINTS, 200, buffer, 40, 50));
assertEquals(stats.getCacheMiss(), 4);
assertEquals(stats.getCacheHit(), 1);
stats.trigger();
assertEquals(stats.getInMemoryRetainedBytes(), 0);
validateBuffer(data, 200, buffer, 40, 50);
// use a range to cover the hole
assertFalse(readFully(cacheManager, NO_CACHE_CONSTRAINTS, 40, buffer, 400, 200));
assertEquals(stats.getCacheMiss(), 5);
assertEquals(stats.getCacheHit(), 1);
stats.trigger();
assertEquals(stats.getInMemoryRetainedBytes(), 0);
validateBuffer(data, 40, buffer, 400, 200);
}
@Test(invocationCount = 10)
public void testStress()
throws ExecutionException, InterruptedException
{
CacheConfig cacheConfig = new CacheConfig().setBaseDirectory(cacheDirectory);
FileMergeCacheConfig fileMergeCacheConfig = new FileMergeCacheConfig().setCacheTtl(new Duration(10, MILLISECONDS));
CacheManager cacheManager = fileMergeCacheManager(cacheConfig, fileMergeCacheConfig);
stressTest(data, (position, buffer, offset, length) -> readFully(cacheManager, NO_CACHE_CONSTRAINTS, position, buffer, offset, length));
}
@Test(timeOut = 30_000)
public void testQuota()
throws InterruptedException, ExecutionException, IOException
{
TestingCacheStats stats = new TestingCacheStats();
CacheManager cacheManager = fileMergeCacheManager(stats);
byte[] buffer = new byte[10240];
CacheQuota cacheQuota = new CacheQuota("test.table", Optional.of(DataSize.succinctDataSize(1, KILOBYTE)));
// read within the cache quota
assertFalse(readFully(cacheManager, cacheQuota, 42, buffer, 0, 100));
assertEquals(stats.getCacheMiss(), 1);
assertEquals(stats.getCacheHit(), 0);
assertEquals(stats.getQuotaExceed(), 0);
stats.trigger();
assertEquals(stats.getInMemoryRetainedBytes(), 0);
validateBuffer(data, 42, buffer, 0, 100);
// read beyond cache quota
assertFalse(readFully(cacheManager, cacheQuota, 47, buffer, 0, 9000));
assertEquals(stats.getCacheMiss(), 1);
assertEquals(stats.getCacheHit(), 0);
assertEquals(stats.getQuotaExceed(), 1);
assertEquals(stats.getInMemoryRetainedBytes(), 0);
validateBuffer(data, 47, buffer, 0, 90);
// previous data won't be evicted if last read exceed quota
assertTrue(readFully(cacheManager, cacheQuota, 47, buffer, 0, 90));
assertEquals(stats.getCacheMiss(), 1);
assertEquals(stats.getCacheHit(), 1);
assertEquals(stats.getQuotaExceed(), 1);
assertEquals(stats.getInMemoryRetainedBytes(), 0);
validateBuffer(data, 47, buffer, 0, 90);
}
private CacheManager fileMergeCacheManager(CacheConfig cacheConfig, FileMergeCacheConfig fileMergeCacheConfig)
{
return new FileMergeCacheManager(cacheConfig, fileMergeCacheConfig, new CacheStats(), flushExecutor, removeExecutor, cacheSizeCalculator);
}
private CacheManager fileMergeCacheManager(CacheStats cacheStats)
{
CacheConfig cacheConfig = new CacheConfig();
FileMergeCacheConfig fileMergeCacheConfig = new FileMergeCacheConfig();
return new FileMergeCacheManager(cacheConfig.setBaseDirectory(cacheDirectory), fileMergeCacheConfig, cacheStats, flushExecutor, removeExecutor, cacheSizeCalculator);
}
private boolean readFully(CacheManager cacheManager, CacheQuota cacheQuota, long position, byte[] buffer, int offset, int length)
throws IOException
{
FileReadRequest key = new FileReadRequest(new Path(dataFile.getAbsolutePath()), position, length);
switch (cacheManager.get(key, buffer, offset, cacheQuota)) {
case HIT:
return true;
case MISS:
RandomAccessFile file = new RandomAccessFile(dataFile.getAbsolutePath(), "r");
file.seek(position);
file.readFully(buffer, offset, length);
file.close();
cacheManager.put(key, wrappedBuffer(buffer, offset, length), NO_CACHE_CONSTRAINTS);
return false;
case CACHE_QUOTA_EXCEED:
default:
return false;
}
}
private static class TestingCacheStats
extends CacheStats
{
private SettableFuture<?> trigger;
public TestingCacheStats()
{
this.trigger = SettableFuture.create();
}
@Override
public void addInMemoryRetainedBytes(long bytes)
{
super.addInMemoryRetainedBytes(bytes);
if (bytes < 0) {
trigger.set(null);
}
}
public void trigger()
throws InterruptedException, ExecutionException
{
trigger.get();
trigger = SettableFuture.create();
}
}
}