HadoopExtendedFileSystem.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import com.facebook.presto.hive.HiveFileContext;
import com.facebook.presto.hive.filesystem.ExtendedFileSystem;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import static com.google.common.util.concurrent.Futures.immediateFuture;
public class HadoopExtendedFileSystem
extends ExtendedFileSystem
{
private FileSystem fs;
private String swapScheme;
public HadoopExtendedFileSystem(FileSystem fs)
{
this.fs = fs;
this.statistics = fs.statistics;
}
@Override
public void initialize(URI name, Configuration conf)
throws IOException
{
super.initialize(name, conf);
// this is less than ideal, but existing filesystems sometimes neglect
// to initialize the embedded filesystem
if (fs.getConf() == null) {
fs.initialize(name, conf);
}
String scheme = name.getScheme();
if (!scheme.equals(fs.getUri().getScheme())) {
swapScheme = scheme;
}
}
public FileSystem getRawFileSystem()
{
return fs;
}
@Override
public String getScheme()
{
return fs.getScheme();
}
@Override
public URI getUri()
{
return fs.getUri();
}
@Override
protected URI getCanonicalUri()
{
return fs.getCanonicalUri();
}
@Override
protected URI canonicalizeUri(URI uri)
{
return fs.canonicalizeUri(uri);
}
@Override
public Path makeQualified(Path path)
{
Path fqPath = fs.makeQualified(path);
// swap in our scheme if the filtered fs is using a different scheme
if (swapScheme != null) {
try {
// NOTE: should deal with authority, but too much other stuff is broken
fqPath = new Path(new URI(swapScheme, fqPath.toUri().getSchemeSpecificPart(), null));
}
catch (URISyntaxException e) {
throw new IllegalArgumentException(e);
}
}
return fqPath;
}
@Override
protected void checkPath(Path path)
{
fs.checkPath(path);
}
@Override
public BlockLocation[] getFileBlockLocations(FileStatus file, long start,
long len)
throws IOException
{
return fs.getFileBlockLocations(file, start, len);
}
@Override
public Path resolvePath(final Path p)
throws IOException
{
return fs.resolvePath(p);
}
@Override
public FSDataInputStream open(Path f, int bufferSize)
throws IOException
{
return fs.open(f, bufferSize);
}
@Override
public FSDataOutputStream append(Path f, int bufferSize,
Progressable progress)
throws IOException
{
return fs.append(f, bufferSize, progress);
}
@Override
public void concat(Path f, Path[] psrcs)
throws IOException
{
fs.concat(f, psrcs);
}
@Override
public FSDataOutputStream create(Path f, FsPermission permission,
boolean overwrite, int bufferSize, short replication, long blockSize,
Progressable progress)
throws IOException
{
return fs.create(f, permission, overwrite, bufferSize, replication, blockSize, progress);
}
@Override
public FSDataOutputStream create(Path f,
FsPermission permission,
EnumSet<CreateFlag> flags,
int bufferSize,
short replication,
long blockSize,
Progressable progress,
Options.ChecksumOpt checksumOpt)
throws IOException
{
return fs.create(f, permission, flags, bufferSize, replication, blockSize, progress, checksumOpt);
}
@Override
@Deprecated
public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize,
Progressable progress)
throws IOException
{
return fs.createNonRecursive(f, permission, flags, bufferSize, replication, blockSize, progress);
}
@Override
public boolean setReplication(Path src, short replication)
throws IOException
{
return fs.setReplication(src, replication);
}
@Override
public boolean rename(Path src, Path dst)
throws IOException
{
return fs.rename(src, dst);
}
@Override
public ListenableFuture<Void> renameFileAsync(Path src, Path dst)
throws IOException
{
fs.rename(src, dst);
return immediateFuture(null);
}
@Override
protected void rename(Path src, Path dst, Options.Rename... options)
throws IOException
{
fs.rename(src, dst, options);
}
@Override
public boolean truncate(Path f, final long newLength)
throws IOException
{
return fs.truncate(f, newLength);
}
@Override
public boolean delete(Path f, boolean recursive)
throws IOException
{
return fs.delete(f, recursive);
}
@Override
public FileStatus[] listStatus(Path f)
throws IOException
{
return fs.listStatus(f);
}
@Override
public RemoteIterator<Path> listCorruptFileBlocks(Path path)
throws IOException
{
return fs.listCorruptFileBlocks(path);
}
@Override
public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
throws IOException
{
return fs.listLocatedStatus(f);
}
@Override
public RemoteIterator<FileStatus> listStatusIterator(Path f)
throws IOException
{
return fs.listStatusIterator(f);
}
@Override
public Path getHomeDirectory()
{
return fs.getHomeDirectory();
}
@Override
public void setWorkingDirectory(Path newDir)
{
fs.setWorkingDirectory(newDir);
}
@Override
public Path getWorkingDirectory()
{
return fs.getWorkingDirectory();
}
@Override
protected Path getInitialWorkingDirectory()
{
return fs.getInitialWorkingDirectory();
}
@Override
public FsStatus getStatus(Path p)
throws IOException
{
return fs.getStatus(p);
}
@Override
public boolean mkdirs(Path f, FsPermission permission)
throws IOException
{
return fs.mkdirs(f, permission);
}
@Override
public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
throws IOException
{
fs.copyFromLocalFile(delSrc, src, dst);
}
@Override
public void copyFromLocalFile(boolean delSrc, boolean overwrite,
Path[] srcs, Path dst)
throws IOException
{
fs.copyFromLocalFile(delSrc, overwrite, srcs, dst);
}
@Override
public void copyFromLocalFile(boolean delSrc, boolean overwrite,
Path src, Path dst)
throws IOException
{
fs.copyFromLocalFile(delSrc, overwrite, src, dst);
}
@Override
public void copyToLocalFile(boolean delSrc, Path src, Path dst)
throws IOException
{
fs.copyToLocalFile(delSrc, src, dst);
}
@Override
public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
throws IOException
{
return fs.startLocalOutput(fsOutputFile, tmpLocalFile);
}
@Override
public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
throws IOException
{
fs.completeLocalOutput(fsOutputFile, tmpLocalFile);
}
@Override
public long getUsed()
throws IOException
{
return fs.getUsed();
}
@Override
public long getDefaultBlockSize()
{
return fs.getDefaultBlockSize();
}
@Override
public short getDefaultReplication()
{
return fs.getDefaultReplication();
}
@Override
public FsServerDefaults getServerDefaults()
throws IOException
{
return fs.getServerDefaults();
}
// path variants delegate to underlying filesystem
@Override
public long getDefaultBlockSize(Path f)
{
return fs.getDefaultBlockSize(f);
}
@Override
public short getDefaultReplication(Path f)
{
return fs.getDefaultReplication(f);
}
@Override
public FsServerDefaults getServerDefaults(Path f)
throws IOException
{
return fs.getServerDefaults(f);
}
@Override
public FileStatus getFileStatus(Path f)
throws IOException
{
return fs.getFileStatus(f);
}
@Override
public void access(Path path, FsAction mode)
throws IOException
{
fs.access(path, mode);
}
public void createSymlink(final Path target, final Path link,
final boolean createParent)
throws IOException
{
fs.createSymlink(target, link, createParent);
}
public FileStatus getFileLinkStatus(final Path f)
throws IOException
{
return fs.getFileLinkStatus(f);
}
public boolean supportsSymlinks()
{
return fs.supportsSymlinks();
}
public Path getLinkTarget(Path f)
throws IOException
{
return fs.getLinkTarget(f);
}
protected Path resolveLink(Path f)
throws IOException
{
return fs.resolveLink(f);
}
@Override
public FileChecksum getFileChecksum(Path f)
throws IOException
{
return fs.getFileChecksum(f);
}
@Override
public FileChecksum getFileChecksum(Path f, long length)
throws IOException
{
return fs.getFileChecksum(f, length);
}
@Override
public void setVerifyChecksum(boolean verifyChecksum)
{
fs.setVerifyChecksum(verifyChecksum);
}
@Override
public void setWriteChecksum(boolean writeChecksum)
{
fs.setWriteChecksum(writeChecksum);
}
@Override
public Configuration getConf()
{
return fs.getConf();
}
@Override
public void close()
throws IOException
{
super.close();
fs.close();
}
@Override
public void setOwner(Path path, String user, String group)
throws IOException
{
fs.setOwner(path, user, group);
}
@Override
public void setTimes(Path p, long mtime, long atime)
throws IOException
{
fs.setTimes(p, mtime, atime);
}
@Override
public void setPermission(Path p, FsPermission permission)
throws IOException
{
fs.setPermission(p, permission);
}
@Override
protected FSDataOutputStream primitiveCreate(Path f,
FsPermission absolutePermission, EnumSet<CreateFlag> flag,
int bufferSize, short replication, long blockSize,
Progressable progress, Options.ChecksumOpt checksumOpt)
throws IOException
{
return fs.primitiveCreate(f, absolutePermission, flag,
bufferSize, replication, blockSize, progress, checksumOpt);
}
@Override
@SuppressWarnings("deprecation")
protected boolean primitiveMkdir(Path f, FsPermission absolutePermission)
throws IOException
{
return fs.primitiveMkdir(f, absolutePermission);
}
@Override // FileSystem
public FileSystem[] getChildFileSystems()
{
return new FileSystem[] {fs};
}
@Override // FileSystem
public Path createSnapshot(Path path, String snapshotName)
throws IOException
{
return fs.createSnapshot(path, snapshotName);
}
@Override // FileSystem
public void renameSnapshot(Path path, String snapshotOldName,
String snapshotNewName)
throws IOException
{
fs.renameSnapshot(path, snapshotOldName, snapshotNewName);
}
@Override // FileSystem
public void deleteSnapshot(Path path, String snapshotName)
throws IOException
{
fs.deleteSnapshot(path, snapshotName);
}
@Override
public void modifyAclEntries(Path path, List<AclEntry> aclSpec)
throws IOException
{
fs.modifyAclEntries(path, aclSpec);
}
@Override
public void removeAclEntries(Path path, List<AclEntry> aclSpec)
throws IOException
{
fs.removeAclEntries(path, aclSpec);
}
@Override
public void removeDefaultAcl(Path path)
throws IOException
{
fs.removeDefaultAcl(path);
}
@Override
public void removeAcl(Path path)
throws IOException
{
fs.removeAcl(path);
}
@Override
public void setAcl(Path path, List<AclEntry> aclSpec)
throws IOException
{
fs.setAcl(path, aclSpec);
}
@Override
public AclStatus getAclStatus(Path path)
throws IOException
{
return fs.getAclStatus(path);
}
@Override
public void setXAttr(Path path, String name, byte[] value)
throws IOException
{
fs.setXAttr(path, name, value);
}
@Override
public void setXAttr(Path path, String name, byte[] value,
EnumSet<XAttrSetFlag> flag)
throws IOException
{
fs.setXAttr(path, name, value, flag);
}
@Override
public byte[] getXAttr(Path path, String name)
throws IOException
{
return fs.getXAttr(path, name);
}
@Override
public Map<String, byte[]> getXAttrs(Path path)
throws IOException
{
return fs.getXAttrs(path);
}
@Override
public Map<String, byte[]> getXAttrs(Path path, List<String> names)
throws IOException
{
return fs.getXAttrs(path, names);
}
@Override
public List<String> listXAttrs(Path path)
throws IOException
{
return fs.listXAttrs(path);
}
@Override
public void removeXAttr(Path path, String name)
throws IOException
{
fs.removeXAttr(path, name);
}
@Override
public RemoteIterator<LocatedFileStatus> listDirectory(Path path)
{
throw new UnsupportedOperationException();
}
@Override
public FSDataInputStream openFile(Path path, HiveFileContext hiveFileContext)
throws Exception
{
return fs.open(path);
}
}