/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.tosfs;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.tosfs.commit.MagicOutputStream;
import org.apache.hadoop.fs.tosfs.common.Bytes;
import org.apache.hadoop.fs.tosfs.common.ThreadPools;
import org.apache.hadoop.fs.tosfs.conf.ConfKeys;
import org.apache.hadoop.fs.tosfs.object.ChecksumInfo;
import org.apache.hadoop.fs.tosfs.object.Constants;
import org.apache.hadoop.fs.tosfs.object.DirectoryStorage;
import org.apache.hadoop.fs.tosfs.object.ObjectInfo;
import org.apache.hadoop.fs.tosfs.object.ObjectMultiRangeInputStream;
import org.apache.hadoop.fs.tosfs.object.ObjectOutputStream;
import org.apache.hadoop.fs.tosfs.object.ObjectRangeInputStream;
import org.apache.hadoop.fs.tosfs.object.ObjectStorage;
import org.apache.hadoop.fs.tosfs.object.ObjectStorageFactory;
import org.apache.hadoop.fs.tosfs.object.ObjectUtils;
import org.apache.hadoop.fs.tosfs.object.exceptions.InvalidObjectKeyException;
import org.apache.hadoop.fs.tosfs.ops.DefaultFsOps;
import org.apache.hadoop.fs.tosfs.ops.DirectoryFsOps;
import org.apache.hadoop.fs.tosfs.ops.FsOps;
import org.apache.hadoop.fs.tosfs.util.FSUtils;
import org.apache.hadoop.fs.tosfs.util.FuseUtils;
import org.apache.hadoop.fs.tosfs.util.Range;
import org.apache.hadoop.fs.tosfs.util.RemoteIterators;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.thirdparty.com.google.common.collect.Iterators;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Lists;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.Date;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.apache.hadoop.fs.XAttrSetFlag.CREATE;
import static org.apache.hadoop.fs.XAttrSetFlag.REPLACE;

public class RawFileSystem extends FileSystem {
  private static final Logger LOG = LoggerFactory.getLogger(RawFileSystem.class);
  private static final String MULTIPART_THREAD_POOL_PREFIX = "rawfs-multipart-thread-pool";
  private static final String TASK_THREAD_POOL_PREFIX = "rawfs-task-thread-pool";
  // This is the same as HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY; we don't reference that
  // constant directly because we don't want to introduce the hdfs client library dependency.
  private static final String DFS_BLOCK_SIZE_KEY = "dfs.blocksize";
  private static final long DFS_BLOCK_SIZE_DEFAULT = 128 << 20;

  private String scheme;
  private String username;
  private Path workingDir;
  private URI uri;
  private String bucket;
  private ObjectStorage storage;
  // Used for parallel task execution, such as copying multiple files concurrently.
  private ExecutorService taskThreadPool;
  // Used for multipart file uploads only.
  private ExecutorService uploadThreadPool;
  private FsOps fsOps;

  @Override
  public URI getUri() {
    return uri;
  }

  @Override
  public String getScheme() {
    return scheme;
  }

  @VisibleForTesting
  String bucket() {
    return bucket;
  }

  @Override
  public void setConf(Configuration conf) {
    super.setConf(conf);
  }

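  /**
   * Opens the file for reading through a multi-range input stream whose fetch size is taken
   * from {@code ConfKeys.FS_OBJECT_STREAM_RANGE_SIZE}. A minimal configuration sketch (the
   * 4 MiB value below is purely illustrative):
   * <pre>{@code
   * Configuration conf = new Configuration();
   * conf.setLong(ConfKeys.FS_OBJECT_STREAM_RANGE_SIZE, 4L << 20); // fetch in 4 MiB ranges
   * try (FSDataInputStream in = fs.open(path)) {
   *   byte[] buf = new byte[8192];
   *   int n = in.read(buf); // reads are served range by range
   * }
   * }</pre>
   */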
  @Override
  public FSDataInputStream open(Path path, int bufferSize) throws IOException {
    LOG.debug("Opening '{}' for reading.", path);
    RawFileStatus status = innerFileStatus(path);
    if (status.isDirectory()) {
      throw new FileNotFoundException(
          String.format("Can't open %s because it is a directory", path));
    }

    // Parse the range size from the hadoop conf.
    long rangeSize = getConf().getLong(
        ConfKeys.FS_OBJECT_STREAM_RANGE_SIZE,
        ConfKeys.FS_OBJECT_STREAM_RANGE_SIZE_DEFAULT);
    Preconditions.checkArgument(rangeSize > 0, "Object storage range size must be positive.");

    FSInputStream fsIn = new ObjectMultiRangeInputStream(taskThreadPool, storage, path,
        status.getLen(), rangeSize, status.checksum());
    return new FSDataInputStream(fsIn);
  }

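  /**
   * Opens only the given byte range of the object, verifying the data against the expected
   * checksum. A hedged sketch; the {@code Range.of(offset, length)} factory is assumed here
   * and may differ from the actual {@link Range} API:
   * <pre>{@code
   * byte[] checksum = rawFs.innerFileStatus(path).checksum();
   * try (FSDataInputStream in = rawFs.open(path, checksum, Range.of(0, 1024))) {
   *   // reads are limited to the first 1 KiB of the object
   * }
   * }</pre>
   */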
  public FSDataInputStream open(Path path, byte[] expectedChecksum, Range range) {
    return new FSDataInputStream(
        new ObjectRangeInputStream(storage, path, range, expectedChecksum));
  }

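  /**
   * Creates a file backed by a multipart upload via {@link ObjectOutputStream}; paths
   * recognized by {@link MagicOutputStream#isMagic(Path)} are routed to the commit protocol's
   * magic output stream instead. A minimal write sketch:
   * <pre>{@code
   * try (FSDataOutputStream out = fs.create(path, true)) { // overwrite if present
   *   out.write(data);
   * } // close() completes the upload and makes the object visible
   * }</pre>
   */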
  @Override
  public FSDataOutputStream create(Path path, FsPermission permission, boolean overwrite,
      int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
    FileStatus fileStatus = getFileStatusOrNull(path);
    if (fileStatus != null) {
      if (fileStatus.isDirectory()) {
        throw new FileAlreadyExistsException(path + " is a directory");
      }

      if (!overwrite) {
        throw new FileAlreadyExistsException(path + " already exists");
      }
      LOG.debug("Overwriting file {}", path);
    }

    if (MagicOutputStream.isMagic(path)) {
      return new FSDataOutputStream(
          new MagicOutputStream(this, storage, uploadThreadPool, getConf(), makeQualified(path)),
          null);
    } else {
      ObjectOutputStream out =
          new ObjectOutputStream(storage, uploadThreadPool, getConf(), makeQualified(path), true);

      if (fileStatus == null && FuseUtils.fuseEnabled()) {
        // FUSE requires the file to be visible to getFileStatus as soon as it has been created,
        // so close and commit the (still empty) file explicitly to make it visible to FUSE, and
        // then reopen the file output stream for the subsequent data writes.
        out.close();
        out =
            new ObjectOutputStream(storage, uploadThreadPool, getConf(), makeQualified(path), true);
      }

      return new FSDataOutputStream(out, null);
    }
  }

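  /**
   * Lists the files (excluding directories) under the given path, recursively if requested.
   * A minimal consumption sketch:
   * <pre>{@code
   * RemoteIterator<LocatedFileStatus> it = fs.listFiles(dir, true);
   * while (it.hasNext()) {
   *   LocatedFileStatus st = it.next();
   *   // process st.getPath(), st.getLen(), ...
   * }
   * }</pre>
   */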
  @Override
  public RemoteIterator<LocatedFileStatus> listFiles(Path f, boolean recursive) throws IOException {
    Path path = makeQualified(f);
    LOG.debug("listFiles({}, {})", path, recursive);

    // Assume the path is a directory at first, and list its sub files.
    RemoteIterator<LocatedFileStatus> subFiles = RemoteIterators.fromIterable(
        fsOps.listDir(path, recursive, key -> !ObjectInfo.isDir(key)), this::toLocatedFileStatus);
    if (!subFiles.hasNext()) {
      final RawFileStatus fileStatus = innerFileStatus(path);
      if (fileStatus.isFile()) {
        return RemoteIterators.fromSingleton(toLocatedFileStatus(fileStatus));
      }
    }
    return subFiles;
  }

  private RawLocatedFileStatus toLocatedFileStatus(RawFileStatus status) throws IOException {
    return new RawLocatedFileStatus(status,
        status.isFile() ? getFileBlockLocations(status, 0, status.getLen()) : null);
  }

  @Override
  public FSDataOutputStream createNonRecursive(
      Path path,
      FsPermission permission,
      EnumSet<CreateFlag> flag,
      int bufferSize,
      short replication,
      long blockSize,
      Progressable progress) throws IOException {
    Path qualified = makeQualified(path);
    return create(qualified, permission, flag.contains(CreateFlag.OVERWRITE),
        bufferSize, replication, blockSize, progress);
  }

  @Override
  public FSDataOutputStream append(Path f, int bufferSize, Progressable progress)
      throws IOException {
    throw new IOException("Not supported");
  }

  /**
   * Rename the src path to the dest path. If the dest path is an existing directory,
   * the FS will rename the src path under the dst dir.
   * E.g. for rename('/a/b', '/a/c') where dest 'c' is an existing directory,
   * the source path '/a/b' will be renamed to '/a/c/b' internally.
   *
   * <ul>
   *   <li>Return false if src doesn't exist</li>
   *   <li>Return false if src is root</li>
   *   <li>Return false if dst path is under src path, e.g. rename('/a/b', '/a/b/c')</li>
   *   <li>Return false if dst path already exists</li>
   *   <li>Return true if rename('/a/b', '/a/b') and 'b' is an existing file</li>
   *   <li>Return true if rename('/a/b', '/a') and 'a' is an existing directory;
   *   fs will rename '/a/b' to '/a/b' internally</li>
   *   <li>Return false if rename('/a/b', '/a/b') and 'b' is an existing directory,
   *   because fs would try to rename '/a/b' to '/a/b/b', which is under '/a/b', and this
   *   behavior is forbidden.</li>
   * </ul>
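   *
   * <p>A minimal sketch of the existing-directory destination case described above:
   * <pre>{@code
   * fs.mkdirs(new Path("/a/c"));                   // dest is an existing directory
   * fs.rename(new Path("/a/b"), new Path("/a/c")); // '/a/b' ends up at '/a/c/b'
   * }</pre>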
   *
   * @param src path to be renamed
   * @param dst path after rename
   * @return true if rename is successful
   * @throws IOException on failure
   */
  @Override
  public boolean rename(Path src, Path dst) throws IOException {
    LOG.debug("Rename source path {} to dest path {}", src, dst);

    // 1. Check source and destination path
    Future<FileStatus> srcStatusFuture = taskThreadPool.submit(() -> checkAndGetSrcStatus(src));
    Future<Path> destPathFuture = taskThreadPool.submit(() -> checkAndGetDstPath(src, dst));

    FileStatus srcStatus;
    Path dstPath;
    try {
      srcStatus = srcStatusFuture.get();
      dstPath = destPathFuture.get();

      if (src.equals(dstPath)) {
        return true;
      }
    } catch (InterruptedException | ExecutionException e) {
      LOG.error("Failed to rename path, src: {}, dst: {}", src, dst, e);
      return false;
    }

    // 2. Start copy source to destination
    if (srcStatus.isDirectory()) {
      fsOps.renameDir(srcStatus.getPath(), dstPath);
    } else {
      fsOps.renameFile(srcStatus.getPath(), dstPath, srcStatus.getLen());
    }

    return true;
  }

  private Path checkAndGetDstPath(Path src, Path dest) throws IOException {
    FileStatus destStatus = getFileStatusOrNull(dest);
    // 1. Rebuilding the destination path
    Path finalDstPath = dest;
    if (destStatus != null && destStatus.isDirectory()) {
      finalDstPath = new Path(dest, src.getName());
    }

    // 2. No further check is needed if renaming the path to itself, since that is allowed.
    if (src.equals(finalDstPath)) {
      return finalDstPath;
    }

    // 3. Ensure the source path cannot be the ancestor of destination path.
    if (RawFSUtils.inSubtree(src, finalDstPath)) {
      throw new IOException(String.format("Failed to rename since it is prohibited to " +
          "rename dest path %s under src path %s", finalDstPath, src));
    }

    // 4. Ensure the destination path doesn't exist.
    FileStatus finalDstStatus = destStatus;
    if (destStatus != null && destStatus.isDirectory()) {
      finalDstStatus = getFileStatusOrNull(finalDstPath);
    }
    if (finalDstStatus != null) {
      throw new FileAlreadyExistsException(
          String.format("Failed to rename since the dest path %s already exists.", finalDstPath));
    } else {
      return finalDstPath;
    }
  }

  private FileStatus checkAndGetSrcStatus(Path src) throws IOException {
    // throw FileNotFoundException if src not found.
    FileStatus srcStatus = innerFileStatus(src);

    if (src.isRoot()) {
      throw new IOException(String.format("Cannot rename the root directory %s to another name",
          src));
    }
    return srcStatus;
  }

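  /**
   * Deletes a file or directory, returning false if the path doesn't exist. A minimal sketch:
   * <pre>{@code
   * fs.delete(new Path("/tmp/out"), true);   // recursive directory delete
   * fs.delete(new Path("/tmp/file"), false); // non-recursive is enough for a file
   * }</pre>
   */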
  @Override
  public boolean delete(Path f, boolean recursive) throws IOException {
    LOG.debug("Delete path {} - recursive {}", f, recursive);
    try {
      FileStatus fileStatus = getFileStatus(f);
      Path path = fileStatus.getPath();

      if (path.isRoot()) {
        return deleteRoot(path, recursive);
      } else {
        if (fileStatus.isDirectory()) {
          fsOps.deleteDir(path, recursive);
        } else {
          fsOps.deleteFile(path);
        }
        return true;
      }
    } catch (FileNotFoundException e) {
      LOG.debug("Couldn't delete {} - does not exist", f);
      return false;
    }
  }

  /**
   * Reject deleting the root directory, implementing the specific logic to stay compatible
   * with the AbstractContractRootDirectoryTest rm test cases.
   *
   * @param root      the root path.
   * @param recursive whether to delete the directory recursively.
   * @return true if the root directory is empty, false if trying to delete a non-empty root
   * directory recursively.
   * @throws IOException if trying to delete the non-empty root dir non-recursively.
   */
  private boolean deleteRoot(Path root, boolean recursive) throws IOException {
    LOG.info("Deleting the root directory of bucket {}, recursive: {}", bucket, recursive);
    boolean isEmptyDir = fsOps.isEmptyDirectory(root);
    if (isEmptyDir) {
      return true;
    }
    if (recursive) {
      // AbstractContractRootDirectoryTest#testRmRootRecursive doesn't expect any exception when
      // deleting a non-empty root directory recursively, so we have to return false here
      // instead of throwing an IOException.
      return false;
    } else {
      // AbstractContractRootDirectoryTest#testRmNonEmptyRootDirNonRecursive expects an exception
      // when deleting a non-empty root directory non-recursively, so we have to throw an
      // IOException instead of returning false.
      throw new PathIOException(bucket, "Cannot delete root path");
    }
  }

  @Override
  public RawFileStatus[] listStatus(Path f) throws IOException {
    LOG.debug("List status for path: {}", f);
    return Iterators.toArray(listStatus(f, false), RawFileStatus.class);
  }

  public Iterator<RawFileStatus> listStatus(Path f, boolean recursive) throws IOException {
    Path path = makeQualified(f);
    // Assume the path is a directory at first.
    Iterator<RawFileStatus> iterator = fsOps.listDir(path, recursive, key -> true).iterator();
    if (iterator.hasNext()) {
      return iterator;
    } else {
      RawFileStatus fileStatus = innerFileStatus(path);
      if (fileStatus.isFile()) {
        return Collections.singletonList(fileStatus).iterator();
      } else {
        // The path is an empty dir.
        return Collections.emptyIterator();
      }
    }
  }

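  /**
   * Returns an iterator that lists the directory lazily, batch by batch, through
   * {@link FileSystem#listStatusBatch}. A minimal consumption sketch:
   * <pre>{@code
   * RemoteIterator<FileStatus> it = fs.listStatusIterator(dir);
   * while (it.hasNext()) {
   *   FileStatus st = it.next();
   *   // process st
   * }
   * }</pre>
   */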
  @Override
  public RemoteIterator<FileStatus> listStatusIterator(Path p) throws IOException {
    // We expect a FileNotFoundException to be thrown while creating the RemoteIterator if the
    // path doesn't exist, rather than from the first hasNext() call.

    // The following RemoteIterator behaves the same as {@link FileSystem#DirListingIterator} in
    // hadoop 3.2.2 and above; below 3.2.2, the DirListingIterator fetches the directory entries
    // when hasNext() is called instead of when the DirListingIterator instance is created.
    return new RemoteIterator<FileStatus>() {
      private DirectoryEntries entries = listStatusBatch(p, null);
      private int index = 0;

      @Override
      public boolean hasNext() {
        return index < entries.getEntries().length || entries.hasMore();
      }

      private void fetchMore() throws IOException {
        byte[] token = entries.getToken();
        entries = listStatusBatch(p, token);
        index = 0;
      }

      @Override
      public FileStatus next() throws IOException {
        if (!hasNext()) {
          throw new NoSuchElementException("No more items in iterator");
        } else {
          if (index == entries.getEntries().length) {
            fetchMore();
            if (!hasNext()) {
              throw new NoSuchElementException("No more items in iterator");
            }
          }

          return entries.getEntries()[index++];
        }
      }
    };
  }

  public static long dateToLong(final Date date) {
    return date == null ? 0L : date.getTime();
  }

  @Override
  public Path getWorkingDirectory() {
    return workingDir;
  }

  @Override
  public void setWorkingDirectory(Path newDir) {
    this.workingDir = newDir;
  }

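  /**
   * Creates the directory and all missing parents, after verifying that no ancestor is an
   * existing file. A minimal sketch:
   * <pre>{@code
   * fs.mkdirs(new Path("/warehouse/db/table")); // intermediate dirs are created as needed
   * }</pre>
   */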
  @Override
  public boolean mkdirs(Path path, FsPermission permission) throws IOException {
    try {
      FileStatus fileStatus = innerFileStatus(path);
      if (fileStatus.isDirectory()) {
        return true;
      } else {
        throw new FileAlreadyExistsException("Path is a file: " + path);
      }
    } catch (FileNotFoundException e) {
      Path dir = makeQualified(path);
      validatePath(dir);
      fsOps.mkdirs(dir);
    }
    return true;
  }

  private void validatePath(Path path) throws IOException {
    Path parent = path.getParent();
    do {
      try {
        FileStatus fileStatus = innerFileStatus(parent);
        if (fileStatus.isDirectory()) {
          // If path exists and a directory, exit
          break;
        } else {
          throw new FileAlreadyExistsException(String.format("Can't make directory for path '%s',"
                  + " it is a file.", parent));
        }
      } catch (FileNotFoundException ignored) {
      }
      parent = parent.getParent();
    } while (parent != null);
  }

  @Override
  public FileStatus getFileStatus(Path path) throws IOException {
    try {
      return innerFileStatus(path);
    } catch (ParentNotDirectoryException e) {
      // Treat ParentNotDirectoryException as FileNotFoundException for callers that only check
      // whether the path exists.
      throw new FileNotFoundException(e.getMessage());
    }
  }


  /**
   * Get the file status of the given path.
   *
   * @param f the path
   * @return {@link RawFileStatus} describing the file status info.
   * @throws FileNotFoundException       if the path doesn't exist.
   * @throws ParentNotDirectoryException if the path is located under an existing file, which is
   *                                     not allowed in the directory bucket case.
   */
  RawFileStatus innerFileStatus(Path f) throws ParentNotDirectoryException, FileNotFoundException {
    Path qualifiedPath = f.makeQualified(uri, workingDir);
    RawFileStatus fileStatus = getFileStatusOrNull(qualifiedPath);
    if (fileStatus == null) {
      throw new FileNotFoundException(
          String.format("No such file or directory: %s", qualifiedPath));
    }
    return fileStatus;
  }

  /**
   * The differences from {@link RawFileSystem#getFileStatus(Path)} are:
   * 1. this method throws {@link ParentNotDirectoryException} if the path is located under an
   * existing file in the directory bucket case, whereas {@link RawFileSystem#getFileStatus(Path)}
   * maps that invalid path to a {@link FileNotFoundException};
   * 2. this method returns null if the path doesn't exist instead of throwing
   * {@link FileNotFoundException}.
   *
   * @param path the object path.
   * @return null if the path doesn't exist.
   * @throws ParentNotDirectoryException if the path is located under an existing file, which is
   *                                     not allowed in the directory bucket case.
   */
  public RawFileStatus getFileStatusOrNull(final Path path) throws ParentNotDirectoryException {
    Path qualifiedPath = path.makeQualified(uri, workingDir);
    String key = ObjectUtils.pathToKey(qualifiedPath);

    // Root directory always exists
    if (key.isEmpty()) {
      return new RawFileStatus(0, true, 0, 0, qualifiedPath, username, Constants.MAGIC_CHECKSUM);
    }

    try {
      ObjectInfo obj = storage.objectStatus(key);
      if (obj == null) {
        return null;
      } else {
        return objectToFileStatus(obj);
      }
    } catch (InvalidObjectKeyException e) {
      String msg =
          String.format("The object key %s is an invalid key, detail: %s", key, e.getMessage());
      throw new ParentNotDirectoryException(msg);
    }
  }

  private RawFileStatus objectToFileStatus(ObjectInfo obj) {
    Path keyPath = makeQualified(ObjectUtils.keyToPath(obj.key()));
    long blockSize = obj.isDir() ? 0 : getDefaultBlockSize(keyPath);
    long modificationTime = dateToLong(obj.mtime());
    return new RawFileStatus(obj.size(), obj.isDir(), blockSize, modificationTime, keyPath,
        username, obj.checksum());
  }

  @Override
  @Deprecated
  public long getDefaultBlockSize() {
    return getConf().getLongBytes(DFS_BLOCK_SIZE_KEY, DFS_BLOCK_SIZE_DEFAULT);
  }

  @Override
  public FsServerDefaults getServerDefaults(Path p) {
    Configuration config = getConf();
    // CRC32 is chosen as default as it is available in all
    // releases that support checksum.
    // The client trash configuration is ignored.
    return new FsServerDefaults(getDefaultBlockSize(),
        config.getInt("dfs.bytes-per-checksum", 512),
        64 * 1024,
        getDefaultReplication(),
        config.getInt(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
            CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT),
        false,
        CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT,
        DataChecksum.Type.CRC32,
        "");
  }

  private void stopAllServices() {
    ThreadPools.shutdown(uploadThreadPool, 30, TimeUnit.SECONDS);
    ThreadPools.shutdown(taskThreadPool, 30, TimeUnit.SECONDS);
  }

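  /**
   * Initializes the file system: resolves the bucket from the URI authority, creates the
   * object storage client, and sizes the two worker pools from per-scheme keys in
   * {@link ConfKeys}. A hedged configuration sketch (the "tos" scheme and the pool sizes are
   * illustrative only):
   * <pre>{@code
   * Configuration conf = new Configuration();
   * conf.setInt(ConfKeys.FS_TASK_THREAD_POOL_SIZE.key("tos"), 16);     // parallel copy/rename
   * conf.setInt(ConfKeys.FS_MULTIPART_THREAD_POOL_SIZE.key("tos"), 8); // multipart uploads
   * FileSystem fs = FileSystem.get(URI.create("tos://my-bucket/"), conf);
   * }</pre>
   */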
  @Override
  public void initialize(URI name, Configuration conf) throws IOException {
    super.initialize(name, conf);
    setConf(conf);
    this.scheme = FSUtils.scheme(conf, name);

    // Username is the current user at the time the FS was instantiated.
    this.username = UserGroupInformation.getCurrentUser().getShortUserName();
    this.workingDir = new Path("/user", username).makeQualified(name, null);
    this.uri = URI.create(scheme + "://" + name.getAuthority());
    this.bucket = this.uri.getAuthority();
    this.storage = ObjectStorageFactory.create(scheme, bucket, getConf());
    if (storage.bucket() == null) {
      throw new FileNotFoundException(String.format("Bucket: %s not found.", name.getAuthority()));
    }

    int taskThreadPoolSize =
        getConf().getInt(ConfKeys.FS_TASK_THREAD_POOL_SIZE.key(storage.scheme()),
            ConfKeys.FS_TASK_THREAD_POOL_SIZE_DEFAULT);
    this.taskThreadPool = ThreadPools.newWorkerPool(TASK_THREAD_POOL_PREFIX, taskThreadPoolSize);

    int uploadThreadPoolSize =
        getConf().getInt(ConfKeys.FS_MULTIPART_THREAD_POOL_SIZE.key(storage.scheme()),
            ConfKeys.FS_MULTIPART_THREAD_POOL_SIZE_DEFAULT);
    this.uploadThreadPool =
        ThreadPools.newWorkerPool(MULTIPART_THREAD_POOL_PREFIX, uploadThreadPoolSize);

    if (storage.bucket().isDirectory()) {
      fsOps = new DirectoryFsOps((DirectoryStorage) storage, this::objectToFileStatus);
    } else {
      fsOps = new DefaultFsOps(storage, getConf(), taskThreadPool, this::objectToFileStatus);
    }
  }

  @Override
  public void close() throws IOException {
    try {
      super.close();
      storage.close();
    } finally {
      stopAllServices();
    }
  }

  public ObjectStorage storage() {
    return storage;
  }

  public ExecutorService uploadThreadPool() {
    return uploadThreadPool;
  }

  /**
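   * Returns the checksum that the object storage maintains for the file. A minimal sketch for
   * comparing two files when checksumming is enabled:
   * <pre>{@code
   * FileChecksum c1 = fs.getFileChecksum(src, fs.getFileStatus(src).getLen());
   * FileChecksum c2 = fs.getFileChecksum(dst, fs.getFileStatus(dst).getLen());
   * boolean same = c1 != null && c1.equals(c2);
   * }</pre>
   *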
   * @return the checksum of the file, or null if checksum is disabled or not supported.
   */
  @Override
  public FileChecksum getFileChecksum(Path f, long length) throws IOException {
    Preconditions.checkArgument(length >= 0);

    RawFileStatus fileStatus = innerFileStatus(f);
    if (fileStatus.isDirectory()) {
      // Compatible with HDFS
      throw new FileNotFoundException(String.format("Path is not a file, %s", f));
    }
    if (!getConf().getBoolean(ConfKeys.FS_CHECKSUM_ENABLED.key(storage.scheme()),
        ConfKeys.FS_CHECKSUM_ENABLED_DEFAULT)) {
      return null;
    }

    ChecksumInfo csInfo = storage.checksumInfo();
    return new TosChecksum(csInfo.algorithm(), fileStatus.checksum());
  }

  @Override
  public String getCanonicalServiceName() {
    return null;
  }

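  /**
   * Stores the xattr as an object tag on the underlying storage; directories silently ignore
   * xattr updates here. A minimal sketch:
   * <pre>{@code
   * fs.setXAttr(file, "user.team", "data-infra".getBytes(StandardCharsets.UTF_8),
   *     EnumSet.of(XAttrSetFlag.CREATE));
   * byte[] value = fs.getXAttr(file, "user.team");
   * }</pre>
   */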
  @Override
  public void setXAttr(Path path, String name, byte[] value, EnumSet<XAttrSetFlag> flag)
      throws IOException {
    Preconditions.checkNotNull(name, "xAttr name must not be null.");
    Preconditions.checkArgument(!name.isEmpty(), "xAttr name must not be empty.");
    Preconditions.checkNotNull(value, "xAttr value must not be null.");

    if (getFileStatus(path).isFile()) {
      Path qualifiedPath = path.makeQualified(uri, workingDir);
      String key = ObjectUtils.pathToKey(qualifiedPath);

      Map<String, String> existedTags = storage.getTags(key);
      validateXAttrFlag(name, existedTags.containsKey(name), flag);

      String newValue = Bytes.toString(value);
      String previousValue = existedTags.put(name, newValue);
      if (!newValue.equals(previousValue)) {
        storage.putTags(key, existedTags);
      }
    }
  }

  @Override
  public Map<String, byte[]> getXAttrs(Path path) throws IOException {
    if (getFileStatus(path).isDirectory()) {
      return new HashMap<>();
    } else {
      Path qualifiedPath = path.makeQualified(uri, workingDir);
      String key = ObjectUtils.pathToKey(qualifiedPath);

      Map<String, String> tags = storage.getTags(key);
      return tags.entrySet().stream()
          .collect(Collectors.toMap(Map.Entry::getKey, t -> Bytes.toBytes(t.getValue())));
    }
  }

  @Override
  public byte[] getXAttr(Path path, String name) throws IOException {
    Map<String, byte[]> xAttrs = getXAttrs(path);
    if (xAttrs.containsKey(name)) {
      return xAttrs.get(name);
    } else {
      throw new IOException("Attribute with name " + name + " is not found.");
    }
  }

  @Override
  public Map<String, byte[]> getXAttrs(Path path, List<String> names) throws IOException {
    Map<String, byte[]> xAttrs = getXAttrs(path);
    xAttrs.keySet().retainAll(names);
    if (xAttrs.size() == names.size()) {
      return xAttrs;
    } else {
      List<String> badNames = names.stream().filter(n -> !xAttrs.containsKey(n)).collect(
          Collectors.toList());
      throw new IOException("Attributes with name " + badNames + " are not found.");
    }
  }

  @Override
  public List<String> listXAttrs(Path path) throws IOException {
    return Lists.newArrayList(getXAttrs(path).keySet());
  }

  @Override
  public void removeXAttr(Path path, String name) throws IOException {
    if (getFileStatus(path).isFile()) {
      Path qualifiedPath = path.makeQualified(uri, workingDir);
      String key = ObjectUtils.pathToKey(qualifiedPath);

      Map<String, String> existedTags = storage.getTags(key);
      if (existedTags.remove(name) != null) {
        storage.putTags(key, existedTags);
      }
    }
  }

  private void validateXAttrFlag(String xAttrName, boolean xAttrExists, EnumSet<XAttrSetFlag> flag)
      throws IOException {
    if (xAttrExists) {
      if (!flag.contains(REPLACE)) {
        throw new IOException("XAttr: " + xAttrName + " already exists. The REPLACE flag must be"
            + " specified.");
      }
    } else {
      if (!flag.contains(CREATE)) {
        throw new IOException("XAttr: " + xAttrName + " does not exist. The CREATE flag must be"
            + " specified.");
      }
    }
  }
}