AlluxioCachingFileSystem.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.cache.alluxio;

import alluxio.client.file.CacheContext;
import alluxio.client.file.URIStatus;
import alluxio.hadoop.LocalCacheFileSystem;
import alluxio.wire.FileInfo;
import com.facebook.presto.cache.CachingFileSystem;
import com.facebook.presto.hive.HiveFileContext;
import com.facebook.presto.hive.filesystem.ExtendedFileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.net.URI;

import static alluxio.conf.PropertyKey.USER_CLIENT_CACHE_QUOTA_ENABLED;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.hash.Hashing.sha256;
import static java.nio.charset.StandardCharsets.UTF_8;

public class AlluxioCachingFileSystem
        extends CachingFileSystem
{
    private static final int BUFFER_SIZE = 65536;
    private final boolean cacheValidationEnabled;
    private final boolean lastModifiedTimeCheckEnabled;
    private boolean cacheQuotaEnabled;
    private LocalCacheFileSystem localCacheFileSystem;

    public AlluxioCachingFileSystem(ExtendedFileSystem dataTier, URI uri)
    {
        this(dataTier, uri, false, false);
    }

    public AlluxioCachingFileSystem(ExtendedFileSystem dataTier, URI uri, boolean cacheValidationEnabled, boolean lastModifiedTimeCheckEnabled)
    {
        super(dataTier, uri);
        this.cacheValidationEnabled = cacheValidationEnabled;
        this.lastModifiedTimeCheckEnabled = lastModifiedTimeCheckEnabled;
    }

    @Override
    public synchronized void initialize(URI uri, Configuration configuration)
            throws IOException
    {
        this.localCacheFileSystem = new LocalCacheFileSystem(dataTier, uriStatus -> {
            // CacheContext is the mechanism to pass the hiveFileContext to the source filesystem
            // hiveFileContext is critical to use to open file.
            CacheContext cacheContext = uriStatus.getCacheContext();
            checkState(cacheContext instanceof PrestoCacheContext);
            HiveFileContext hiveFileContext = ((PrestoCacheContext) cacheContext).getHiveFileContext();
            try {
                return dataTier.openFile(new Path(uriStatus.getPath()), hiveFileContext);
            }
            catch (Exception e) {
                throw new IOException("Failed to open file", e);
            }
        });
        this.cacheQuotaEnabled = configuration.getBoolean(USER_CLIENT_CACHE_QUOTA_ENABLED.getName(), false);
        localCacheFileSystem.initialize(uri, configuration);
    }

    @Override
    public FSDataInputStream openFile(Path path, HiveFileContext hiveFileContext)
            throws Exception
    {
        // Using Alluxio caching requires knowing file size for now
        if (hiveFileContext.isCacheable() && hiveFileContext.getFileSize().isPresent()) {
            // FilePath is a unique identifier for a file, however it can be a long string
            // hence using sha256 hash of the file path as the identifier in the cache.
            // We don't set fileId because fileId is Alluxio specific
            FileInfo info = new FileInfo()
                    .setLastModificationTimeMs(hiveFileContext.getModificationTime())
                    .setPath(path.toString())
                    .setFolder(false)
                    .setLength(hiveFileContext.getFileSize().getAsLong());
            String cacheIdentifier = sha256().hashString(path.toString(), UTF_8).toString();
            if (lastModifiedTimeCheckEnabled) {
                cacheIdentifier = sha256().hashString(cacheIdentifier + hiveFileContext.getModificationTime(), UTF_8).toString();
            }
            // CacheContext is the mechanism to pass the cache related context to the source filesystem
            CacheContext cacheContext = PrestoCacheContext.build(cacheIdentifier, hiveFileContext, cacheQuotaEnabled);
            URIStatus uriStatus = new URIStatus(info, cacheContext);
            FSDataInputStream cachingInputStream = localCacheFileSystem.open(uriStatus, BUFFER_SIZE);
            if (cacheValidationEnabled) {
                return new CacheValidatingInputStream(
                        cachingInputStream, dataTier.openFile(path, hiveFileContext));
            }
            return cachingInputStream;
        }
        return dataTier.openFile(path, hiveFileContext);
    }
}