FileStore.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.tosfs.object;

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.tosfs.conf.ConfKeys;
import org.apache.hadoop.fs.tosfs.conf.FileStoreKeys;
import org.apache.hadoop.fs.tosfs.object.exceptions.NotAppendableException;
import org.apache.hadoop.fs.tosfs.object.request.ListObjectsRequest;
import org.apache.hadoop.fs.tosfs.object.response.ListObjectsResponse;
import org.apache.hadoop.fs.tosfs.util.CommonUtils;
import org.apache.hadoop.fs.tosfs.util.Range;
import org.apache.hadoop.fs.tosfs.util.UUIDUtils;
import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
import org.apache.hadoop.util.Lists;
import org.apache.hadoop.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FileStore implements ObjectStorage {

  private static final Logger LOG = LoggerFactory.getLogger(FileStore.class);

  private static final String NAME = "filestore";
  private static final String STAGING_DIR = "__STAGING__";
  private static final String SLASH = "/";

  public static final String DEFAULT_BUCKET = "dummy-bucket";
  public static final String ENV_FILE_STORAGE_ROOT = "FILE_STORAGE_ROOT";

  private static final int MAX_DELETE_OBJECTS_COUNT = 1000;
  private static final int MIN_PART_SIZE = 5 * 1024 * 1024;
  private static final int MAX_PART_COUNT = 10000;

  private String bucket;
  private String root;
  private Configuration conf;
  private ChecksumInfo checksumInfo;

  @Override
  public String scheme() {
    return NAME;
  }

  @Override
  public BucketInfo bucket() {
    return new BucketInfo(bucket, false);
  }

  @Override
  public void initialize(Configuration config, String bucketName) {
    this.bucket = bucketName;
    this.conf = config;
    String endpoint = config.get(ConfKeys.FS_OBJECT_STORAGE_ENDPOINT.key(NAME));
    if (endpoint == null || endpoint.isEmpty()) {
      endpoint = System.getenv(ENV_FILE_STORAGE_ROOT);
    }
    Preconditions.checkNotNull(endpoint, "%s cannot be null",
        ConfKeys.FS_OBJECT_STORAGE_ENDPOINT.key(NAME));

    if (endpoint.endsWith(SLASH)) {
      this.root = endpoint;
    } else {
      this.root = endpoint + SLASH;
    }
    LOG.debug("the root path is: {}", this.root);

    String algorithm = config.get(FileStoreKeys.FS_FILESTORE_CHECKSUM_ALGORITHM,
        FileStoreKeys.FS_FILESTORE_CHECKSUM_ALGORITHM_DEFAULT);
    ChecksumType checksumType = ChecksumType.valueOf(
        config.get(FileStoreKeys.FS_FILESTORE_CHECKSUM_TYPE,
            FileStoreKeys.FS_FILESTORE_CHECKSUM_TYPE_DEFAULT).toUpperCase());
    Preconditions.checkArgument(checksumType == ChecksumType.MD5,
        "Checksum type %s is not supported by FileStore.", checksumType.name());
    checksumInfo = new ChecksumInfo(algorithm, checksumType);

    File rootDir = new File(root);
    if (!rootDir.mkdirs() && !rootDir.exists()) {
      throw new IllegalArgumentException("Failed to create root dir. " + root);
    } else {
      LOG.info("Create root dir successfully. {}", root);
    }
  }

  @Override
  public Configuration conf() {
    return conf;
  }

  private static String encode(String key) {
    try {
      return URLEncoder.encode(key, "UTF-8");
    } catch (UnsupportedEncodingException e) {
      LOG.warn("failed to encode key: {}", key);
      return key;
    }
  }

  private static String decode(String key) {
    try {
      return URLDecoder.decode(key, "UTF-8");
    } catch (UnsupportedEncodingException e) {
      LOG.warn("failed to decode key: {}", key);
      return key;
    }
  }

  @Override
  public ObjectContent get(String key, long offset, long limit) {
    Preconditions.checkArgument(!Strings.isNullOrEmpty(key), "Key should not be empty.");
    File file = path(encode(key)).toFile();
    if (!file.exists()) {
      throw new RuntimeException(String.format("File not found %s", file.getAbsolutePath()));
    }

    Range range = ObjectUtils.calculateRange(offset, limit, file.length());
    try (FileInputStream in = new FileInputStream(file)) {
      in.skip(range.off());
      byte[] bs = new byte[(int) range.len()];
      in.read(bs);

      byte[] fileChecksum = getFileChecksum(file.toPath());
      return new ObjectContent(fileChecksum, new ByteArrayInputStream(bs));
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  @Override
  public byte[] put(String key, InputStreamProvider streamProvider, long contentLength) {
    Preconditions.checkArgument(!Strings.isNullOrEmpty(key), "Key should not be empty.");
    File destFile = path(encode(key)).toFile();
    copyInputStreamToFile(streamProvider.newStream(), destFile, contentLength);

    return ObjectInfo.isDir(key) ? Constants.MAGIC_CHECKSUM : getFileChecksum(destFile.toPath());
  }

  @Override
  public byte[] append(String key, InputStreamProvider streamProvider, long contentLength) {
    Preconditions.checkArgument(!Strings.isNullOrEmpty(key), "Key should not be empty.");
    File destFile = path(encode(key)).toFile();
    if (!destFile.exists()) {
      if (contentLength == 0) {
        throw new NotAppendableException(String.format(
            "%s is not appendable because append non-existed object with "
                + "zero byte is not supported.", key));
      }
      return put(key, streamProvider, contentLength);
    } else {
      appendInputStreamToFile(streamProvider.newStream(), destFile, contentLength);
      return ObjectInfo.isDir(key) ? Constants.MAGIC_CHECKSUM : getFileChecksum(destFile.toPath());
    }
  }

  private static File createTmpFile(File destFile) {
    String tmpFilename = ".tmp." + UUIDUtils.random();
    File file = new File(destFile.getParentFile(), tmpFilename);

    try {
      if (!file.exists() && !file.createNewFile()) {
        throw new RuntimeException("failed to create tmp file");
      }
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
    return file;
  }

  @Override
  public void delete(String key) {
    Preconditions.checkArgument(!Strings.isNullOrEmpty(key), "Key should not be empty.");
    File file = path(encode(key)).toFile();
    if (file.exists()) {
      try {
        if (file.isDirectory()) {
          FileUtils.deleteDirectory(file);
        } else {
          Files.delete(file.toPath());
        }
      } catch (IOException ex) {
        throw new RuntimeException(ex);
      }
    }
  }

  @Override
  public List<String> batchDelete(List<String> keys) {
    Preconditions.checkArgument(keys.size() <= MAX_DELETE_OBJECTS_COUNT,
        "The batch delete object count should <= %s", MAX_DELETE_OBJECTS_COUNT);
    List<String> failedKeys = Lists.newArrayList();
    for (String key : keys) {
      try {
        delete(key);
      } catch (Exception e) {
        LOG.error("Failed to delete key {}", key, e);
        failedKeys.add(key);
      }
    }
    return failedKeys;
  }

  @Override
  public void deleteAll(String prefix) {
    Iterable<ObjectInfo> objects = listAll(prefix, "");
    ObjectUtils.deleteAllObjects(this, objects, conf.getInt(
        ConfKeys.FS_BATCH_DELETE_SIZE.key(NAME),
        ConfKeys.FS_BATCH_DELETE_SIZE_DEFAULT));
  }

  @Override
  public ObjectInfo head(String key) {
    Preconditions.checkArgument(!Strings.isNullOrEmpty(key), "Key should not be empty.");
    File file = path(encode(key)).toFile();
    if (file.exists()) {
      return toObjectInfo(file.toPath());
    } else {
      return null;
    }
  }

  @Override
  public Iterable<ListObjectsResponse> list(ListObjectsRequest request) {
    try (Stream<Path> stream = Files.walk(Paths.get(root))) {
      List<ObjectInfo> allObjects = list(stream, request.prefix(), request.startAfter())
          .collect(Collectors.toList());
      int maxKeys = request.maxKeys() < 0 ? allObjects.size() : request.maxKeys();
      return Collections.singletonList(
          splitObjects(request.prefix(), request.delimiter(), maxKeys, request.startAfter(),
              allObjects));
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  private ListObjectsResponse splitObjects(
      String prefix,
      String delimiter,
      int limit,
      String startAfter,
      List<ObjectInfo> objects) {
    int retSize = Math.min(limit, objects.size());

    if (Strings.isNullOrEmpty(delimiter)) {
      // the response only contains objects
      List<ObjectInfo> retObjs = objects.subList(0, retSize);
      return new ListObjectsResponse(retObjs, Collections.emptyList());
    } else {
      // the response only contains objects and common prefixes
      Set<String> commonPrefixes = new TreeSet<>();
      List<ObjectInfo> objectInfos = new ArrayList<>();

      for (ObjectInfo obj : objects) {
        String suffixKey = obj.key().substring(prefix.length());
        String[] tokens = suffixKey.split(delimiter, 2);
        if (tokens.length == 2) {
          String key = prefix + tokens[0] + delimiter;
          // the origin key is bigger than startAfter,
          // but after the new key after split might equal to startAfter, need to exclude.
          if (!key.equals(startAfter)) {
            commonPrefixes.add(key);

            // why don't break the loop before add new key to common prefixes list
            // is that the new key might be an existed common prefix, but we still want to continue
            // visited new object since the new object might also be an existed common prefix until
            // the total size is out of limit.
            if (commonPrefixes.size() + objectInfos.size() > retSize) {
              commonPrefixes.remove(key);
              break;
            }
          }
        } else {
          if (commonPrefixes.size() + objectInfos.size() >= retSize) {
            break;
          }

          objectInfos.add(obj);
        }
      }
      return new ListObjectsResponse(objectInfos, new ArrayList<>(commonPrefixes));
    }
  }

  @Override
  public MultipartUpload createMultipartUpload(String key) {
    Preconditions.checkArgument(!Strings.isNullOrEmpty(key), "Key should not be empty.");

    String uploadId = UUIDUtils.random();
    Path uploadDir = uploadPath(key, uploadId);
    if (uploadDir.toFile().mkdirs()) {
      return new MultipartUpload(key, uploadId, MIN_PART_SIZE, MAX_PART_COUNT);
    } else {
      throw new RuntimeException("Failed to create MultipartUpload with key: " + key);
    }
  }

  private Path uploadPath(String key, String uploadId) {
    return Paths.get(root, STAGING_DIR, encode(key), uploadId);
  }

  @Override
  public Part uploadPart(
      String key, String uploadId, int partNum,
      InputStreamProvider streamProvider, long contentLength) {
    Preconditions.checkArgument(!Strings.isNullOrEmpty(key), "Key should not be empty.");

    File uploadDir = uploadPath(key, uploadId).toFile();
    if (!uploadDir.exists()) {
      throw new RuntimeException("cannot locate the upload id: " + uploadId);
    }

    File partFile = new File(uploadDir, String.valueOf(partNum));
    copyInputStreamToFile(streamProvider.newStream(), partFile, contentLength);

    try {
      byte[] data = Files.readAllBytes(partFile.toPath());
      return new Part(partNum, data.length, DigestUtils.md5Hex(data));
    } catch (IOException e) {
      LOG.error("failed to locate the part file: {}", partFile.getAbsolutePath());
      throw new RuntimeException(e);
    }
  }

  private static void appendInputStreamToFile(InputStream in, File partFile, long contentLength) {
    try (FileOutputStream out = new FileOutputStream(partFile, true)) {
      long copiedBytes = IOUtils.copyLarge(in, out, 0, contentLength);

      if (copiedBytes < contentLength) {
        throw new IOException(String.format("Unexpect end of stream, expected to write length:%s,"
                + " actual written:%s", contentLength, copiedBytes));
      }
    } catch (IOException e) {
      throw new RuntimeException(e);
    } finally {
      CommonUtils.runQuietly(in::close);
    }
  }

  private static void copyInputStreamToFile(InputStream in, File partFile, long contentLength) {
    File tmpFile = createTmpFile(partFile);
    try (FileOutputStream out = new FileOutputStream(tmpFile)) {
      long copiedBytes = IOUtils.copyLarge(in, out, 0, contentLength);

      if (copiedBytes < contentLength) {
        throw new IOException(
            String.format("Unexpect end of stream, expected length:%s, actual:%s", contentLength,
                tmpFile.length()));
      }
    } catch (IOException e) {
      CommonUtils.runQuietly(() -> FileUtils.delete(tmpFile));
      throw new RuntimeException(e);
    } finally {
      CommonUtils.runQuietly(in::close);
    }

    if (!tmpFile.renameTo(partFile)) {
      throw new RuntimeException("failed to put file since rename fail.");
    }
  }

  @Override
  public byte[] completeUpload(String key, String uploadId, List<Part> uploadParts) {
    Preconditions.checkArgument(uploadParts != null && uploadParts.size() > 0,
        "upload parts cannot be null or empty.");
    File uploadDir = uploadPath(key, uploadId).toFile();
    if (!uploadDir.exists()) {
      throw new RuntimeException("cannot locate the upload id: " + uploadId);
    }

    List<Integer> partNums = listPartNums(uploadDir);
    if (partNums.size() != uploadParts.size()) {
      throw new RuntimeException(String.format("parts length mismatched: %d != %d",
          partNums.size(), uploadParts.size()));
    }

    Collections.sort(partNums);
    uploadParts.sort(Comparator.comparingInt(Part::num));

    Path keyPath = path(encode(key));
    File tmpFile = createTmpFile(keyPath.toFile());
    try (FileOutputStream outputStream = new FileOutputStream(tmpFile);
        FileChannel outputChannel = outputStream.getChannel()) {
      int offset = 0;
      for (int i = 0; i < partNums.size(); i++) {
        Part part = uploadParts.get(i);
        if (part.num() != partNums.get(i)) {
          throw new RuntimeException(
              String.format("part num mismatched: %d != %d", part.num(), partNums.get(i)));
        }

        File partFile = new File(uploadDir, String.valueOf(part.num()));
        checkPartFile(part, partFile);

        try (FileInputStream inputStream = new FileInputStream(partFile);
            FileChannel inputChannel = inputStream.getChannel()) {
          outputChannel.transferFrom(inputChannel, offset, partFile.length());
          offset += partFile.length();
        }
      }
    } catch (IOException e) {
      throw new RuntimeException(e);
    }

    if (!tmpFile.renameTo(keyPath.toFile())) {
      throw new RuntimeException("rename file failed");
    } else {
      try {
        FileUtils.deleteDirectory(uploadDir);
      } catch (IOException e) {
        LOG.warn("failed to clean upload directory.");
      }
    }

    return getFileChecksum(keyPath);
  }

  private byte[] getFileChecksum(Path keyPath) {
    return getFileMD5(keyPath);
  }

  private static byte[] getFileMD5(Path keyPath) {
    try {
      return DigestUtils.md5(Files.readAllBytes(keyPath));
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  private static void checkPartFile(Part part, File partFile) throws IOException {
    if (part.size() != partFile.length()) {
      throw new RuntimeException(String.format("part size mismatched: %d != %d",
          part.size(), partFile.length()));
    }

    try (FileInputStream inputStream = new FileInputStream(partFile)) {
      String md5Hex = DigestUtils.md5Hex(inputStream);
      if (!Objects.equals(part.eTag(), md5Hex)) {
        throw new RuntimeException(String.format("part etag mismatched: %s != %s",
            part.eTag(), md5Hex));
      }
    }
  }

  private List<Integer> listPartNums(File uploadDir) {
    try (Stream<Path> stream = Files.list(uploadDir.toPath())) {
      return stream
          .map(f -> Integer.valueOf(f.toFile().getName()))
          .collect(Collectors.toList());
    } catch (IOException e) {
      LOG.error("failed to list part files.");
      throw new RuntimeException(e);
    }
  }

  @Override
  public void abortMultipartUpload(String key, String uploadId) {
    Preconditions.checkArgument(!Strings.isNullOrEmpty(key), "Key should not be empty.");
    Path uploadDir = uploadPath(key, uploadId);
    if (uploadDir.toFile().exists()) {
      try {
        FileUtils.deleteDirectory(uploadDir.toFile());
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
  }

  @Override
  public Iterable<MultipartUpload> listUploads(String prefix) {
    Path stagingDir = Paths.get(root, STAGING_DIR);
    if (!Files.exists(stagingDir)) {
      return Collections.emptyList();
    }
    try (Stream<Path> encodedKeyStream = Files.list(stagingDir)) {
      return encodedKeyStream
          .filter(key -> Objects.equals(prefix, "") ||
              key.toFile().getName().startsWith(encode(prefix)))
          .flatMap(key -> {
            try {
              return Files.list(key)
                  .map(id -> new MultipartUpload(decode(key.toFile().getName()),
                      id.toFile().getName(), MIN_PART_SIZE, MAX_PART_COUNT));
            } catch (IOException e) {
              throw new RuntimeException(e);
            }
          })
          .sorted()
          .collect(Collectors.toList());
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  @Override
  public Part uploadPartCopy(
      String srcKey, String dstKey, String uploadId, int partNum, long copySourceRangeStart,
      long copySourceRangeEnd) {
    File uploadDir = uploadPath(dstKey, uploadId).toFile();
    if (!uploadDir.exists()) {
      throw new RuntimeException(String.format("Upload directory %s already exits", uploadDir));
    }
    File partFile = new File(uploadDir, String.valueOf(partNum));
    int fileSize = (int) (copySourceRangeEnd - copySourceRangeStart + 1);
    try (InputStream is = get(srcKey, copySourceRangeStart, fileSize).stream();
        FileOutputStream fos = new FileOutputStream(partFile)) {
      byte[] data = new byte[fileSize];
      IOUtils.readFully(is, data);
      fos.write(data);
      return new Part(partNum, fileSize, DigestUtils.md5Hex(data));
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  @Override
  public void copy(String srcKey, String dstKey) {
    Preconditions.checkArgument(!Strings.isNullOrEmpty(srcKey), "Src key should not be empty.");
    File file = path(encode(srcKey)).toFile();
    if (!file.exists()) {
      throw new RuntimeException(String.format("File not found %s", file.getAbsolutePath()));
    }

    put(dstKey, () -> get(srcKey).stream(), file.length());
  }

  @Override
  public void rename(String srcKey, String dstKey) {
    Preconditions.checkArgument(!Objects.equals(srcKey, dstKey),
        "Cannot rename to the same object");
    Preconditions.checkNotNull(head(srcKey), "Source key %s doesn't exist", srcKey);

    File srcFile = path(encode(srcKey)).toFile();
    File dstFile = path(encode(dstKey)).toFile();
    boolean ret = srcFile.renameTo(dstFile);
    if (!ret) {
      throw new RuntimeException(String.format("Failed to rename %s to %s", srcKey, dstKey));
    }
  }

  @Override
  public ObjectInfo objectStatus(String key) {
    ObjectInfo obj = head(key);
    if (obj == null && !ObjectInfo.isDir(key)) {
      key = key + '/';
      obj = head(key);
    }

    if (obj == null) {
      Iterable<ObjectInfo> objs = list(key, null, 1);
      if (objs.iterator().hasNext()) {
        obj = new ObjectInfo(key, 0, new Date(0), Constants.MAGIC_CHECKSUM);
      }
    }

    return obj;
  }

  @Override
  public ChecksumInfo checksumInfo() {
    return checksumInfo;
  }

  private Stream<ObjectInfo> list(Stream<Path> stream, String prefix, String startAfter) {
    return stream
        .filter(p -> {
          String absolutePath = p.toFile().getAbsolutePath();
          return !Objects.equals(key(absolutePath), "") &&
              decode(key(absolutePath)).startsWith(prefix)
              && !absolutePath.contains(STAGING_DIR)
              && filter(decode(key(absolutePath)), startAfter);
        })
        .map(this::toObjectInfo)
        .sorted(Comparator.comparing(ObjectInfo::key));
  }

  private boolean filter(String key, String startAfter) {
    if (Strings.isNullOrEmpty(startAfter)) {
      return true;
    } else {
      return key.compareTo(startAfter) > 0;
    }
  }

  private ObjectInfo toObjectInfo(Path path) {
    File file = path.toFile();
    String key = decode(key(file.getAbsolutePath()));
    return new ObjectInfo(key, file.length(), new Date(file.lastModified()),
        getFileChecksum(path));
  }

  private Path path(String key) {
    return Paths.get(root, key);
  }

  private String key(String path) {
    if (path.length() < root.length()) {
      // root = path + "/"
      return "";
    }
    return path.substring(root.length());
  }

  @Override
  public void close() throws IOException {
  }
}