TestAlluxioCachingFileSystem.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.cache.alluxio;

import alluxio.client.file.cache.CacheManager;
import alluxio.metrics.MetricKey;
import alluxio.metrics.MetricsSystem;
import alluxio.util.io.FileUtils;
import com.facebook.presto.cache.CacheConfig;
import com.facebook.presto.hive.CacheQuota;
import com.facebook.presto.hive.HiveFileContext;
import com.facebook.presto.hive.filesystem.ExtendedFileSystem;
import io.airlift.units.DataSize;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;

import static com.facebook.presto.cache.CacheType.ALLUXIO;
import static com.facebook.presto.cache.TestingCacheUtils.stressTest;
import static com.facebook.presto.cache.TestingCacheUtils.validateBuffer;
import static com.facebook.presto.hive.CacheQuota.NO_CACHE_CONSTRAINTS;
import static com.facebook.presto.hive.CacheQuotaScope.TABLE;
import static com.google.common.base.Preconditions.checkState;
import static io.airlift.units.DataSize.Unit.KILOBYTE;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static java.nio.file.Files.createTempDirectory;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

@Test(singleThreaded = true)
public class TestAlluxioCachingFileSystem
{
    private static final int DATA_LENGTH = (int) new DataSize(20, KILOBYTE).toBytes();
    private static final int PAGE_SIZE = (int) new DataSize(1, KILOBYTE).toBytes();
    private final byte[] data = new byte[DATA_LENGTH];
    private URI cacheDirectory;
    private String testFilePath;
    private long lastModifiedTime;
    private Map<String, Long> baseline = new HashMap<>();

    @BeforeClass
    public void setup()
            throws IOException
    {
        new Random().nextBytes(data);
        this.cacheDirectory = createTempDirectory("alluxio_cache").toUri();
    }

    @AfterClass
    public void close()
            throws IOException
    {
        checkState(cacheDirectory != null);
        FileUtils.deletePathRecursively(cacheDirectory.getPath());
    }

    @BeforeMethod
    public void setupMethod()
    {
        // This path is only used for in memory stream without file materialized.
        testFilePath = String.format("/test/file_%d", new Random().nextLong());
        lastModifiedTime = 0;
        resetBaseline();
    }

    @AfterMethod(alwaysRun = true)
    public void tearDown()
            throws Exception
    {
        // Cleanup CacheManager singleton to prevent state leftover across tests
        resetCacheManager();
    }

    @Test(timeOut = 30_000)
    public void testBasicWithValidationDisabled()
            throws Exception
    {
        testBasic(false);
    }

    private void testBasic(boolean validationEnabled)
            throws Exception
    {
        CacheConfig cacheConfig = new CacheConfig()
                .setCacheType(ALLUXIO)
                .setCachingEnabled(true)
                .setBaseDirectory(cacheDirectory)
                .setValidationEnabled(validationEnabled);
        AlluxioCacheConfig alluxioCacheConfig = new AlluxioCacheConfig();
        Configuration configuration = getHdfsConfiguration(cacheConfig, alluxioCacheConfig);
        AlluxioCachingFileSystem fileSystem = cachingFileSystem(configuration, cacheConfig);
        Path p = new Path("/tmp");
        assertEquals(fileSystem.getDefaultBlockSize(p), 1024L);
        assertEquals(fileSystem.getDefaultReplication(p), 10);

        byte[] buffer = new byte[PAGE_SIZE * 2];
        int pageOffset = PAGE_SIZE;

        // new read
        resetBaseline();
        assertEquals(readFully(fileSystem, pageOffset + 10, buffer, 0, 100), 100);
        checkMetrics(MetricKey.CLIENT_CACHE_BYTES_READ_CACHE, 0);
        checkMetrics(MetricKey.CLIENT_CACHE_BYTES_REQUESTED_EXTERNAL, 100);
        checkMetrics(MetricKey.CLIENT_CACHE_BYTES_READ_EXTERNAL, PAGE_SIZE);
        checkMetrics(MetricKey.CLIENT_CACHE_BYTES_EVICTED, 0);
        validateBuffer(data, pageOffset + 10, buffer, 0, 100);

        // read within the cached page
        resetBaseline();
        assertEquals(readFully(fileSystem, pageOffset + 20, buffer, 0, 90), 90);
        checkMetrics(MetricKey.CLIENT_CACHE_BYTES_READ_CACHE, 90);
        checkMetrics(MetricKey.CLIENT_CACHE_BYTES_REQUESTED_EXTERNAL, 0);
        checkMetrics(MetricKey.CLIENT_CACHE_BYTES_READ_EXTERNAL, 0);
        checkMetrics(MetricKey.CLIENT_CACHE_BYTES_EVICTED, 0);
        validateBuffer(data, pageOffset + 20, buffer, 0, 90);

        // read partially after the range of the cache
        resetBaseline();
        assertEquals(readFully(fileSystem, pageOffset + PAGE_SIZE - 10, buffer, 0, 100), 100);
        checkMetrics(MetricKey.CLIENT_CACHE_BYTES_READ_CACHE, 10);
        checkMetrics(MetricKey.CLIENT_CACHE_BYTES_REQUESTED_EXTERNAL, 90);
        checkMetrics(MetricKey.CLIENT_CACHE_BYTES_READ_EXTERNAL, PAGE_SIZE);
        checkMetrics(MetricKey.CLIENT_CACHE_BYTES_EVICTED, 0);
        validateBuffer(data, pageOffset + PAGE_SIZE - 10, buffer, 0, 100);

        // read partially before the range of the cache
        resetBaseline();
        assertEquals(readFully(fileSystem, pageOffset - 10, buffer, 10, 50), 50);
        checkMetrics(MetricKey.CLIENT_CACHE_BYTES_READ_CACHE, 40);
        checkMetrics(MetricKey.CLIENT_CACHE_BYTES_REQUESTED_EXTERNAL, 10);
        checkMetrics(MetricKey.CLIENT_CACHE_BYTES_READ_EXTERNAL, PAGE_SIZE);
        checkMetrics(MetricKey.CLIENT_CACHE_BYTES_EVICTED, 0);
        validateBuffer(data, pageOffset - 10, buffer, 10, 50);

        // skip one page
        resetBaseline();
        assertEquals(readFully(fileSystem, pageOffset + PAGE_SIZE * 3, buffer, 40, 50), 50);
        checkMetrics(MetricKey.CLIENT_CACHE_BYTES_READ_CACHE, 0);
        checkMetrics(MetricKey.CLIENT_CACHE_BYTES_REQUESTED_EXTERNAL, 50);
        checkMetrics(MetricKey.CLIENT_CACHE_BYTES_READ_EXTERNAL, PAGE_SIZE);
        checkMetrics(MetricKey.CLIENT_CACHE_BYTES_EVICTED, 0);
        validateBuffer(data, pageOffset + PAGE_SIZE * 3, buffer, 40, 50);

        // read between cached pages
        resetBaseline();
        assertEquals(readFully(fileSystem, pageOffset + PAGE_SIZE * 2 - 10, buffer, 400, PAGE_SIZE + 20), PAGE_SIZE + 20);
        checkMetrics(MetricKey.CLIENT_CACHE_BYTES_READ_CACHE, 20);
        checkMetrics(MetricKey.CLIENT_CACHE_BYTES_REQUESTED_EXTERNAL, PAGE_SIZE);
        checkMetrics(MetricKey.CLIENT_CACHE_BYTES_READ_EXTERNAL, PAGE_SIZE);
        checkMetrics(MetricKey.CLIENT_CACHE_BYTES_EVICTED, 0);
        validateBuffer(data, pageOffset + PAGE_SIZE * 2 - 10, buffer, 400, PAGE_SIZE + 20);
    }

    @Test(timeOut = 30_000)
    public void testCacheRefreshAfterFileChanged()
            throws Exception
    {
        CacheConfig cacheConfig = new CacheConfig()
                .setCacheType(ALLUXIO)
                .setCachingEnabled(true)
                .setBaseDirectory(cacheDirectory)
                .setValidationEnabled(false)
                .setLastModifiedTimeCheckEnabled(true);
        AlluxioCacheConfig alluxioCacheConfig = new AlluxioCacheConfig();
        Configuration configuration = getHdfsConfiguration(cacheConfig, alluxioCacheConfig);
        AlluxioCachingFileSystem fileSystem = cachingFileSystem(configuration, cacheConfig);
        Path p = new Path("/tmp");

        byte[] buffer = new byte[PAGE_SIZE * 2];
        int pageOffset = PAGE_SIZE;

        // new read
        resetBaseline();
        assertEquals(readFully(fileSystem, pageOffset + 10, buffer, 0, 100), 100);
        checkMetrics(MetricKey.CLIENT_CACHE_BYTES_READ_CACHE, 0);
        checkMetrics(MetricKey.CLIENT_CACHE_BYTES_REQUESTED_EXTERNAL, 100);
        checkMetrics(MetricKey.CLIENT_CACHE_BYTES_READ_EXTERNAL, PAGE_SIZE);
        checkMetrics(MetricKey.CLIENT_CACHE_BYTES_EVICTED, 0);
        validateBuffer(data, pageOffset + 10, buffer, 0, 100);

        // read from cache
        resetBaseline();
        assertEquals(readFully(fileSystem, pageOffset + 20, buffer, 0, 90), 90);
        checkMetrics(MetricKey.CLIENT_CACHE_BYTES_READ_CACHE, 90);
        checkMetrics(MetricKey.CLIENT_CACHE_BYTES_REQUESTED_EXTERNAL, 0);
        checkMetrics(MetricKey.CLIENT_CACHE_BYTES_READ_EXTERNAL, 0);
        checkMetrics(MetricKey.CLIENT_CACHE_BYTES_EVICTED, 0);
        validateBuffer(data, pageOffset + 20, buffer, 0, 90);

        //file updated
        lastModifiedTime = 100;

        //read from external as new read
        resetBaseline();
        assertEquals(readFully(fileSystem, pageOffset + 10, buffer, 0, 100), 100);
        checkMetrics(MetricKey.CLIENT_CACHE_BYTES_READ_CACHE, 0);
        checkMetrics(MetricKey.CLIENT_CACHE_BYTES_REQUESTED_EXTERNAL, 100);
        checkMetrics(MetricKey.CLIENT_CACHE_BYTES_READ_EXTERNAL, PAGE_SIZE);
        checkMetrics(MetricKey.CLIENT_CACHE_BYTES_EVICTED, 0);
        validateBuffer(data, pageOffset + 10, buffer, 0, 100);

        // read from cache
        resetBaseline();
        assertEquals(readFully(fileSystem, pageOffset + 20, buffer, 0, 90), 90);
        checkMetrics(MetricKey.CLIENT_CACHE_BYTES_READ_CACHE, 90);
        checkMetrics(MetricKey.CLIENT_CACHE_BYTES_REQUESTED_EXTERNAL, 0);
        checkMetrics(MetricKey.CLIENT_CACHE_BYTES_READ_EXTERNAL, 0);
        checkMetrics(MetricKey.CLIENT_CACHE_BYTES_EVICTED, 0);
        validateBuffer(data, pageOffset + 20, buffer, 0, 90);
    }

    @Test(invocationCount = 10)
    public void testStress()
            throws ExecutionException, InterruptedException, URISyntaxException, IOException
    {
        CacheConfig cacheConfig = new CacheConfig()
                .setCacheType(ALLUXIO)
                .setCachingEnabled(true)
                .setBaseDirectory(cacheDirectory);
        AlluxioCacheConfig alluxioCacheConfig = new AlluxioCacheConfig()
                .setMaxCacheSize(new DataSize(10, KILOBYTE));
        Configuration configuration = getHdfsConfiguration(cacheConfig, alluxioCacheConfig);
        AlluxioCachingFileSystem cachingFileSystem = cachingFileSystem(configuration, cacheConfig);
        stressTest(data, (position, buffer, offset, length) -> {
            try {
                readFully(cachingFileSystem, position, buffer, offset, length);
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        });
    }

    @Test(timeOut = 30_000, expectedExceptions = {IOException.class})
    public void testSyncRestoreFailure()
            throws Exception
    {
        URI badCacheDirectory = createTempDirectory("alluxio_cache_bad").toUri();
        File cacheDirectory = new File(badCacheDirectory.getPath());
        cacheDirectory.setWritable(false);
        CacheConfig cacheConfig = new CacheConfig()
                .setCacheType(ALLUXIO)
                .setCachingEnabled(true)
                .setBaseDirectory(badCacheDirectory);
        AlluxioCacheConfig alluxioCacheConfig = new AlluxioCacheConfig();
        Configuration configuration = getHdfsConfiguration(cacheConfig, alluxioCacheConfig);
        try {
            cachingFileSystem(configuration, cacheConfig);
        }
        finally {
            cacheDirectory.setWritable(true);
        }
    }

    @Test(timeOut = 30_000)
    public void testBasicReadWithAsyncRestoreFailure()
            throws Exception
    {
        File cacheDirectory = new File(this.cacheDirectory.getPath());
        cacheDirectory.setWritable(false);
        CacheConfig cacheConfig = new CacheConfig()
                .setCacheType(ALLUXIO)
                .setCachingEnabled(true)
                .setBaseDirectory(this.cacheDirectory);
        AlluxioCacheConfig alluxioCacheConfig = new AlluxioCacheConfig();
        Configuration configuration = getHdfsConfiguration(cacheConfig, alluxioCacheConfig);
        configuration.set("alluxio.user.client.cache.async.restore.enabled", String.valueOf(true));
        try {
            AlluxioCachingFileSystem fileSystem = cachingFileSystem(configuration, cacheConfig);
            long state = MetricsSystem.counter(MetricKey.CLIENT_CACHE_STATE.getName()).getCount();
            assertTrue(state == CacheManager.State.READ_ONLY.getValue() || state == CacheManager.State.NOT_IN_USE.getValue());
            // different cases of read can still proceed even cache is read-only or not-in-use
            byte[] buffer = new byte[PAGE_SIZE * 2];
            int pageOffset = PAGE_SIZE;
            // new read
            resetBaseline();
            assertEquals(readFully(fileSystem, pageOffset + 10, buffer, 0, 100), 100);
            checkMetrics(MetricKey.CLIENT_CACHE_BYTES_READ_CACHE, 0);
            checkMetrics(MetricKey.CLIENT_CACHE_BYTES_REQUESTED_EXTERNAL, 100);
            checkMetrics(MetricKey.CLIENT_CACHE_BYTES_READ_EXTERNAL, PAGE_SIZE);
            checkMetrics(MetricKey.CLIENT_CACHE_BYTES_EVICTED, 0);
            validateBuffer(data, pageOffset + 10, buffer, 0, 100);

            // read within the cached page
            resetBaseline();
            assertEquals(readFully(fileSystem, pageOffset + 20, buffer, 0, 90), 90);
            checkMetrics(MetricKey.CLIENT_CACHE_BYTES_READ_CACHE, 0);
            checkMetrics(MetricKey.CLIENT_CACHE_BYTES_REQUESTED_EXTERNAL, 90);
            checkMetrics(MetricKey.CLIENT_CACHE_BYTES_READ_EXTERNAL, PAGE_SIZE);
            checkMetrics(MetricKey.CLIENT_CACHE_BYTES_EVICTED, 0);
            validateBuffer(data, pageOffset + 20, buffer, 0, 90);

            // read partially after the range of the cache
            resetBaseline();
            assertEquals(readFully(fileSystem, pageOffset + PAGE_SIZE - 10, buffer, 0, 100), 100);
            checkMetrics(MetricKey.CLIENT_CACHE_BYTES_READ_CACHE, 0);
            checkMetrics(MetricKey.CLIENT_CACHE_BYTES_REQUESTED_EXTERNAL, 100);
            checkMetrics(MetricKey.CLIENT_CACHE_BYTES_READ_EXTERNAL, 2 * PAGE_SIZE);
            checkMetrics(MetricKey.CLIENT_CACHE_BYTES_EVICTED, 0);
            validateBuffer(data, pageOffset + PAGE_SIZE - 10, buffer, 0, 100);

            // read partially before the range of the cache
            resetBaseline();
            assertEquals(readFully(fileSystem, pageOffset - 10, buffer, 10, 50), 50);
            checkMetrics(MetricKey.CLIENT_CACHE_BYTES_READ_CACHE, 0);
            checkMetrics(MetricKey.CLIENT_CACHE_BYTES_REQUESTED_EXTERNAL, 50);
            checkMetrics(MetricKey.CLIENT_CACHE_BYTES_READ_EXTERNAL, 2 * PAGE_SIZE);
            checkMetrics(MetricKey.CLIENT_CACHE_BYTES_EVICTED, 0);
            validateBuffer(data, pageOffset - 10, buffer, 10, 50);

            // skip one page
            resetBaseline();
            assertEquals(readFully(fileSystem, pageOffset + PAGE_SIZE * 3, buffer, 40, 50), 50);
            checkMetrics(MetricKey.CLIENT_CACHE_BYTES_READ_CACHE, 0);
            checkMetrics(MetricKey.CLIENT_CACHE_BYTES_REQUESTED_EXTERNAL, 50);
            checkMetrics(MetricKey.CLIENT_CACHE_BYTES_READ_EXTERNAL, PAGE_SIZE);
            checkMetrics(MetricKey.CLIENT_CACHE_BYTES_EVICTED, 0);
            validateBuffer(data, pageOffset + PAGE_SIZE * 3, buffer, 40, 50);

            // read between cached pages
            resetBaseline();
            assertEquals(readFully(fileSystem, pageOffset + PAGE_SIZE * 2 - 10, buffer, 400, PAGE_SIZE + 20), PAGE_SIZE + 20);
            checkMetrics(MetricKey.CLIENT_CACHE_BYTES_READ_CACHE, 0);
            checkMetrics(MetricKey.CLIENT_CACHE_BYTES_REQUESTED_EXTERNAL, PAGE_SIZE + 20);
            checkMetrics(MetricKey.CLIENT_CACHE_BYTES_READ_EXTERNAL, 3 * PAGE_SIZE);
            checkMetrics(MetricKey.CLIENT_CACHE_BYTES_EVICTED, 0);
            validateBuffer(data, pageOffset + PAGE_SIZE * 2 - 10, buffer, 400, PAGE_SIZE + 20);

            state = MetricsSystem.counter(MetricKey.CLIENT_CACHE_STATE.getName()).getCount();
            assertTrue(state == CacheManager.State.READ_ONLY.getValue() || state == CacheManager.State.NOT_IN_USE.getValue());
        }
        finally {
            cacheDirectory.setWritable(true);
        }
    }

    @Test(timeOut = 30_000)
    public void testQuotaBasics()
            throws Exception
    {
        DataSize quotaSize = DataSize.succinctDataSize(1, KILOBYTE);
        CacheQuota cacheQuota = new CacheQuota("test.table", Optional.of(quotaSize));
        CacheConfig cacheConfig = new CacheConfig()
                .setCacheType(ALLUXIO)
                .setCachingEnabled(true)
                .setBaseDirectory(cacheDirectory)
                .setValidationEnabled(false)
                .setCacheQuotaScope(TABLE);
        AlluxioCacheConfig alluxioCacheConfig = new AlluxioCacheConfig().setCacheQuotaEnabled(true);
        Configuration configuration = getHdfsConfiguration(cacheConfig, alluxioCacheConfig);
        AlluxioCachingFileSystem fileSystem = cachingFileSystem(configuration, cacheConfig);

        byte[] buffer = new byte[10240];

        // read within the cache quota
        resetBaseline();
        assertEquals(readFully(fileSystem, cacheQuota, 42, buffer, 0, 100), 100);
        checkMetrics(MetricKey.CLIENT_CACHE_BYTES_READ_CACHE, 0);
        checkMetrics(MetricKey.CLIENT_CACHE_BYTES_REQUESTED_EXTERNAL, 100);
        checkMetrics(MetricKey.CLIENT_CACHE_BYTES_READ_EXTERNAL, PAGE_SIZE);
        checkMetrics(MetricKey.CLIENT_CACHE_BYTES_EVICTED, 0);
        validateBuffer(data, 42, buffer, 0, 100);

        // read beyond cache quota
        resetBaseline();
        assertEquals(readFully(fileSystem, cacheQuota, 47, buffer, 0, 9000), 9000);
        checkMetrics(MetricKey.CLIENT_CACHE_BYTES_READ_CACHE, PAGE_SIZE - 47);
        checkMetrics(MetricKey.CLIENT_CACHE_BYTES_REQUESTED_EXTERNAL, 9000 - PAGE_SIZE + 47);
        checkMetrics(MetricKey.CLIENT_CACHE_BYTES_READ_EXTERNAL, (9000 / PAGE_SIZE) * PAGE_SIZE);
        checkMetrics(MetricKey.CLIENT_CACHE_BYTES_EVICTED, (9000 / PAGE_SIZE) * PAGE_SIZE);
        validateBuffer(data, 47, buffer, 0, 9000);
    }

    @Test(timeOut = 30_000)
    public void testQuotaUpdated()
            throws Exception
    {
        CacheQuota smallCacheQuota = new CacheQuota("test.table", Optional.of(DataSize.succinctDataSize(1, KILOBYTE)));
        CacheConfig cacheConfig = new CacheConfig()
                .setCacheType(ALLUXIO)
                .setCachingEnabled(true)
                .setBaseDirectory(cacheDirectory)
                .setValidationEnabled(false)
                .setCacheQuotaScope(TABLE);
        AlluxioCacheConfig alluxioCacheConfig = new AlluxioCacheConfig().setCacheQuotaEnabled(true);
        Configuration configuration = getHdfsConfiguration(cacheConfig, alluxioCacheConfig);
        AlluxioCachingFileSystem fileSystem = cachingFileSystem(configuration, cacheConfig);

        byte[] buffer = new byte[10240];

        // read beyond the small cache quota
        resetBaseline();
        assertEquals(readFully(fileSystem, smallCacheQuota, 0, buffer, 0, 9000), 9000);
        checkMetrics(MetricKey.CLIENT_CACHE_BYTES_READ_CACHE, 0);
        checkMetrics(MetricKey.CLIENT_CACHE_BYTES_REQUESTED_EXTERNAL, 9000);
        checkMetrics(MetricKey.CLIENT_CACHE_BYTES_READ_EXTERNAL, (9000 / PAGE_SIZE + 1) * PAGE_SIZE);
        checkMetrics(MetricKey.CLIENT_CACHE_BYTES_EVICTED, (9000 / PAGE_SIZE) * PAGE_SIZE);
        validateBuffer(data, 0, buffer, 0, 9000);

        // read again within an updated larger cache quota
        CacheQuota largeCacheQuota = new CacheQuota("test.table", Optional.of(DataSize.succinctDataSize(10, KILOBYTE)));
        resetBaseline();
        assertEquals(readFully(fileSystem, largeCacheQuota, 0, buffer, 0, 9000), 9000);
        checkMetrics(MetricKey.CLIENT_CACHE_BYTES_READ_CACHE, 9000 - (9000 / PAGE_SIZE) * PAGE_SIZE);
        checkMetrics(MetricKey.CLIENT_CACHE_BYTES_REQUESTED_EXTERNAL, (9000 / PAGE_SIZE) * PAGE_SIZE);
        checkMetrics(MetricKey.CLIENT_CACHE_BYTES_READ_EXTERNAL, (9000 / PAGE_SIZE) * PAGE_SIZE);
        checkMetrics(MetricKey.CLIENT_CACHE_BYTES_EVICTED, 0);
        validateBuffer(data, 0, buffer, 0, 9000);
    }

    @Test(invocationCount = 10)
    public void testStressWithQuota()
            throws ExecutionException, InterruptedException, URISyntaxException, IOException
    {
        CacheQuota cacheQuota = new CacheQuota("test.table", Optional.of(DataSize.succinctDataSize(5, KILOBYTE)));
        CacheConfig cacheConfig = new CacheConfig()
                .setCacheType(ALLUXIO)
                .setCachingEnabled(true)
                .setValidationEnabled(false)
                .setBaseDirectory(cacheDirectory)
                .setCacheQuotaScope(TABLE);
        AlluxioCacheConfig alluxioCacheConfig = new AlluxioCacheConfig()
                .setMaxCacheSize(new DataSize(10, KILOBYTE))
                .setCacheQuotaEnabled(true);
        Configuration configuration = getHdfsConfiguration(cacheConfig, alluxioCacheConfig);
        AlluxioCachingFileSystem cachingFileSystem = cachingFileSystem(configuration, cacheConfig);
        stressTest(data, (position, buffer, offset, length) -> {
            try {
                readFully(cachingFileSystem, cacheQuota, position, buffer, offset, length);
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        });
    }

    @Test(timeOut = 30_000)
    public void testInitialization()
            throws Exception
    {
        int pageSize = (int) new DataSize(8, KILOBYTE).toBytes();
        int maxCacheSize = (int) new DataSize(512, MEGABYTE).toBytes();
        String jmxClass = "alluxio.metrics.sink.JmxSink";
        String metricsDomain = "com.facebook.alluxio";

        Configuration configuration = new Configuration();
        configuration.set("alluxio.user.local.cache.enabled", "true");
        configuration.set("alluxio.user.client.cache.dirs", cacheDirectory.getPath());
        configuration.set("alluxio.user.client.cache.page.size", Integer.toString(pageSize));
        configuration.set("alluxio.user.client.cache.size", Integer.toString(maxCacheSize));
        configuration.set("sink.jmx.class", jmxClass);
        configuration.set("sink.jmx.domain", metricsDomain);

        AlluxioCachingFileSystem fileSystem = cachingFileSystem(configuration, new CacheConfig());
        Configuration conf = fileSystem.getConf();
        assertTrue(conf.getBoolean("alluxio.user.local.cache.enabled", false));
        assertEquals(cacheDirectory.getPath(), conf.get("alluxio.user.client.cache.dirs", "bad result"));
        assertEquals(pageSize, conf.getInt("alluxio.user.client.cache.page.size", 0));
        assertEquals(maxCacheSize, conf.getInt("alluxio.user.client.cache.size", 0));
        assertEquals(jmxClass, conf.get("sink.jmx.class", "bad result"));
        assertEquals(metricsDomain, conf.get("sink.jmx.domain", "bad result"));
    }

    // TODO: update unit tests after CacheManager.reset() is available to avoid using reflection to modify singleton
    private void resetCacheManager()
            throws Exception
    {
        Field field = CacheManager.Factory.class.getDeclaredField("CACHE_MANAGER");
        field.setAccessible(true);
        AtomicReference<CacheManager> managerReference = (AtomicReference<CacheManager>) field.get(null);
        if (managerReference != null) {
            CacheManager manager = managerReference.getAndSet(null);
            if (manager != null) {
                manager.close();
            }
        }
    }

    private void resetBaseline()
    {
        updateBaseline(MetricKey.CLIENT_CACHE_BYTES_READ_CACHE);
        updateBaseline(MetricKey.CLIENT_CACHE_BYTES_READ_EXTERNAL);
        updateBaseline(MetricKey.CLIENT_CACHE_BYTES_REQUESTED_EXTERNAL);
        updateBaseline(MetricKey.CLIENT_CACHE_BYTES_EVICTED);
    }

    private void updateBaseline(MetricKey metricsKey)
    {
        baseline.put(metricsKey.getName(), MetricsSystem.meter(metricsKey.getName()).getCount());
    }

    private void checkMetrics(MetricKey metricsKey, long expected)
    {
        assertEquals(MetricsSystem.meter(metricsKey.getName()).getCount() - baseline.getOrDefault(metricsKey.getName(), 0L), expected);
    }

    private AlluxioCachingFileSystem cachingFileSystem(Configuration configuration, CacheConfig cacheConfig)
            throws URISyntaxException, IOException
    {
        Map<Path, byte[]> files = new HashMap<>();
        files.put(new Path(testFilePath), data);
        ExtendedFileSystem testingFileSystem = new TestingFileSystem(files, configuration);
        URI uri = new URI("alluxio://test:8020/");
        AlluxioCachingFileSystem cachingFileSystem = new AlluxioCachingFileSystem(testingFileSystem, uri,
                cacheConfig.isValidationEnabled(), cacheConfig.isLastModifiedTimeCheckEnabled());
        cachingFileSystem.initialize(uri, configuration);
        return cachingFileSystem;
    }

    private Configuration getHdfsConfiguration(CacheConfig cacheConfig, AlluxioCacheConfig alluxioCacheConfig)
    {
        AlluxioCachingConfigurationProvider provider = new AlluxioCachingConfigurationProvider(cacheConfig, alluxioCacheConfig);
        Configuration configuration = new Configuration();
        provider.updateConfiguration(configuration, null /* ignored */, null /* ignored */);
        if (cacheConfig.isCachingEnabled() && cacheConfig.getCacheType() == ALLUXIO) {
            // we don't have corresponding Presto properties for these two, set them manually
            configuration.set("alluxio.user.client.cache.page.size", Integer.toString(PAGE_SIZE));
            configuration.set("alluxio.user.client.cache.async.restore.enabled", String.valueOf(false));
        }
        return configuration;
    }

    private int readFully(AlluxioCachingFileSystem fileSystem, long position, byte[] buffer, int offset, int length)
            throws Exception
    {
        return readFully(fileSystem, NO_CACHE_CONSTRAINTS, position, buffer, offset, length);
    }

    private int readFully(AlluxioCachingFileSystem fileSystem, CacheQuota quota, long position, byte[] buffer, int offset, int length)
            throws Exception
    {
        try (FSDataInputStream stream = fileSystem.openFile(
                new Path(testFilePath),
                new HiveFileContext(
                        true,
                        quota,
                        Optional.empty(),
                        OptionalLong.of(DATA_LENGTH),
                        OptionalLong.of(offset),
                        OptionalLong.of(length),
                        lastModifiedTime,
                        false))) {
            return stream.read(position, buffer, offset, length);
        }
    }

    private static class TestingFileSystem
            extends ExtendedFileSystem
    {
        private final Map<Path, byte[]> files;

        TestingFileSystem(Map<Path, byte[]> files, Configuration configuration)
        {
            this.files = files;
            setConf(configuration);
        }

        @Override
        public FileStatus getFileStatus(Path path)
        {
            if (files.containsKey(path)) {
                return generateURIStatus(path, files.get(path).length);
            }
            return null;
        }

        private FileStatus generateURIStatus(Path path, int length)
        {
            return new FileStatus(length, false, 1, 512, 0, 0, null, null, null, path);
        }

        @Override
        public URI getUri()
        {
            return null;
        }

        @Override
        public FSDataInputStream open(Path path, int bufferSize)
        {
            return new ByteArrayDataInputStream(files.get(path));
        }

        @Override
        public FSDataOutputStream create(Path path, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress)
        {
            return null;
        }

        @Override
        public FSDataOutputStream append(Path path, int bufferSize, Progressable progress)
        {
            return null;
        }

        @Override
        public boolean rename(Path source, Path destination)
        {
            return false;
        }

        @Override
        public boolean delete(Path path, boolean recursive)
        {
            return false;
        }

        @Override
        public FileStatus[] listStatus(Path path)
        {
            return new FileStatus[0];
        }

        @Override
        public void setWorkingDirectory(Path directory)
        {
        }

        @Override
        public Path getWorkingDirectory()
        {
            return null;
        }

        @Override
        public boolean mkdirs(Path path, FsPermission permission)
        {
            return false;
        }

        @Override
        public short getDefaultReplication()
        {
            throw new UnsupportedOperationException("getDefaultReplication not implemented");
        }

        @Override
        public short getDefaultReplication(Path path)
        {
            return 10;
        }

        @Override
        public long getDefaultBlockSize()
        {
            throw new UnsupportedOperationException("getDefaultBlockSize not implemented");
        }

        @Override
        public long getDefaultBlockSize(Path path)
        {
            return 1024L;
        }

        private static class ByteArrayDataInputStream
                extends FSDataInputStream
        {
            public ByteArrayDataInputStream(byte[] bytes)
            {
                super(new ByteArraySeekableStream(bytes));
            }
        }
    }
}