CosNOutputStream.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.fs.cosn;

import java.io.IOException;
import java.io.OutputStream;
import java.security.DigestOutputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
import com.qcloud.cos.model.PartETag;

import org.apache.hadoop.conf.Configuration;

/**
 * The output stream for the COS blob store.
 * Implements streaming upload to COS on top of the multipart upload
 * function. Each part may be at most 5 GB, so multipart upload supports
 * single files of up to 40 TB. Upload performance for large files is
 * improved by reusing byte buffers from a buffer pool and uploading parts
 * on a fixed thread pool.
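 * <p>
 * A minimal usage sketch. The bucket and path below are illustrative, and
 * in practice this stream is obtained through the Hadoop FileSystem API
 * rather than constructed directly:
 * <pre>{@code
 *   Configuration conf = new Configuration();
 *   FileSystem fs =
 *       FileSystem.get(URI.create("cosn://example-bucket-125000/"), conf);
 *   byte[] payload = new byte[1 << 20];
 *   try (FSDataOutputStream out = fs.create(new Path("/data/large.bin"))) {
 *     out.write(payload); // full blocks are uploaded as multipart parts
 *   }
 * }</pre>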
 */
public class CosNOutputStream extends OutputStream {
  private static final Logger LOG =
      LoggerFactory.getLogger(CosNOutputStream.class);

  private final Configuration conf;
  private final NativeFileSystemStore store;
  private MessageDigest digest;
  private long blockSize;
  private String key;
  private int currentBlockId = 0;
  private Set<ByteBufferWrapper> blockCacheBuffers = new HashSet<>();
  private ByteBufferWrapper currentBlockBuffer;
  private OutputStream currentBlockOutputStream;
  private String uploadId = null;
  private ListeningExecutorService executorService;
  private List<ListenableFuture<PartETag>> etagList = new LinkedList<>();
  private int blockWritten = 0;
  private boolean closed = false;

  public CosNOutputStream(Configuration conf, NativeFileSystemStore store,
      String key, long blockSize, ExecutorService executorService)
      throws IOException {
    this.conf = conf;
    this.store = store;
    this.key = key;
    this.blockSize = blockSize;
    if (this.blockSize < Constants.MIN_PART_SIZE) {
      LOG.warn("The block size [{}] is smaller than the minimum part size; "
          + "clamping it to [{}].", this.blockSize, Constants.MIN_PART_SIZE);
      this.blockSize = Constants.MIN_PART_SIZE;
    }
    if (this.blockSize > Constants.MAX_PART_SIZE) {
      LOG.warn("The block size [{}] is larger than the maximum part size; "
          + "clamping it to [{}].", this.blockSize, Constants.MAX_PART_SIZE);
      this.blockSize = Constants.MAX_PART_SIZE;
    }

    // Wrap the caller-supplied pool (expected to be a blocking thread pool
    // with fair scheduling) so that part uploads return ListenableFutures.
    this.executorService = MoreExecutors.listeningDecorator(executorService);

    try {
      this.currentBlockBuffer =
          BufferPool.getInstance().getBuffer((int) this.blockSize);
    } catch (IOException e) {
      String errMsg = String.format("Getting a buffer [size: %d] from "
          + "the buffer pool failed.", this.blockSize);
      throw new IOException(errMsg, e);
    }

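    // Digest the block data as it is buffered; the MD5 is handed to the
    // store for integrity checking on single-block uploads. If the MD5
    // algorithm is unavailable, fall back to buffering without a digest.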
    try {
      this.digest = MessageDigest.getInstance("MD5");
      this.currentBlockOutputStream = new DigestOutputStream(
          new ByteBufferOutputStream(this.currentBlockBuffer.getByteBuffer()),
          this.digest);
    } catch (NoSuchAlgorithmException e) {
      this.digest = null;
      this.currentBlockOutputStream =
          new ByteBufferOutputStream(this.currentBlockBuffer.getByteBuffer());
    }
  }

  @Override
  public void flush() throws IOException {
    this.currentBlockOutputStream.flush();
  }

  @Override
  public synchronized void close() throws IOException {
    if (this.closed) {
      return;
    }
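    // Flush and seal the current (last) block before finishing the upload.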
    this.currentBlockOutputStream.flush();
    this.currentBlockOutputStream.close();
    LOG.info("The output stream has been close, and "
        + "begin to upload the last block: [{}].", this.currentBlockId);
    this.blockCacheBuffers.add(this.currentBlockBuffer);
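    // A single cached block means the whole file fits into one block, so it
    // is stored with a simple PUT; no multipart upload was ever initiated.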
    if (this.blockCacheBuffers.size() == 1) {
      byte[] md5Hash = this.digest == null ? null : this.digest.digest();
      store.storeFile(this.key,
          new ByteBufferInputStream(this.currentBlockBuffer.getByteBuffer()),
          md5Hash, this.currentBlockBuffer.getByteBuffer().remaining());
    } else {
      PartETag partETag = null;
      if (this.blockWritten > 0) {
        LOG.info("Upload the last part..., blockId: [{}], written bytes: [{}]",
            this.currentBlockId, this.blockWritten);
        partETag = store.uploadPart(
            new ByteBufferInputStream(currentBlockBuffer.getByteBuffer()),
            key, uploadId, currentBlockId + 1,
            currentBlockBuffer.getByteBuffer().remaining());
      }
      final List<PartETag> futurePartETagList = this.waitForFinishPartUploads();
      if (null == futurePartETagList) {
        throw new IOException("Failed to multipart upload to cos, abort it.");
      }
      List<PartETag> tmpPartEtagList = new LinkedList<>(futurePartETagList);
      if (null != partETag) {
        tmpPartEtagList.add(partETag);
      }
      store.completeMultipartUpload(this.key, this.uploadId, tmpPartEtagList);
    }
    try {
      BufferPool.getInstance().returnBuffer(this.currentBlockBuffer);
    } catch (InterruptedException e) {
      LOG.error("An exception occurred "
          + "while returning the buffer to the buffer pool.", e);
      // Restore the interrupt status so callers can observe it.
      Thread.currentThread().interrupt();
    }
    LOG.info("The outputStream for key: [{}] has been uploaded.", key);
    this.blockWritten = 0;
    this.closed = true;
  }

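  /**
   * Blocks until all asynchronously submitted part uploads have finished.
   *
   * @return the ETags of the uploaded parts, or null if the wait was
   * interrupted.
   * @throws IOException if any part upload failed; the multipart upload is
   * aborted before the exception is thrown.
   */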
  private List<PartETag> waitForFinishPartUploads() throws IOException {
    try {
      LOG.info("Wait for all parts to finish their uploading.");
      return Futures.allAsList(this.etagList).get();
    } catch (InterruptedException e) {
      LOG.error("Interrupted while waiting for the part uploads to finish.",
          e);
      // Restore the interrupt status so callers can observe it.
      Thread.currentThread().interrupt();
      return null;
    }
    } catch (ExecutionException e) {
      LOG.error("A part upload failed; cancelling the remaining part "
          + "uploads.");
      for (ListenableFuture<PartETag> future : this.etagList) {
        future.cancel(true);
      }
      store.abortMultipartUpload(this.key, this.uploadId);
      LOG.error("The multipart upload with id: [{}] to the COS key: [{}] "
          + "failed.", this.uploadId, this.key, e);
      throw new IOException("The multipart upload with id: " + this.uploadId
          + " to " + this.key + " failed.", e);
    }
  }

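  /**
   * Seals the current block, submits it to the thread pool as an
   * asynchronous part upload, and swaps in a fresh buffer for subsequent
   * writes. Sealing the first block also initiates the multipart upload.
   */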
  private void uploadPart() throws IOException {
    this.currentBlockOutputStream.flush();
    this.currentBlockOutputStream.close();
    this.blockCacheBuffers.add(this.currentBlockBuffer);

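    // Initiate the multipart upload lazily, when the first full block is
    // sealed.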
    if (this.currentBlockId == 0) {
      uploadId = store.getUploadId(key);
    }

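    // Submit the sealed block to the thread pool as an asynchronous part
    // upload; the worker returns the buffer to the pool once the part has
    // been uploaded.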
    ListenableFuture<PartETag> partETagListenableFuture =
        this.executorService.submit(
            new Callable<PartETag>() {
              private final ByteBufferWrapper buf = currentBlockBuffer;
              private final String localKey = key;
              private final String localUploadId = uploadId;
              private final int blockId = currentBlockId;

              @Override
              public PartETag call() throws Exception {
                if (LOG.isDebugEnabled()) {
                  LOG.debug("{} is uploading a part.",
                      Thread.currentThread().getName());
                }
                PartETag partETag = store.uploadPart(
                    new ByteBufferInputStream(this.buf.getByteBuffer()),
                    this.localKey, this.localUploadId,
                    this.blockId + 1, this.buf.getByteBuffer().remaining());
                BufferPool.getInstance().returnBuffer(this.buf);
                return partETag;
              }
            });
    this.etagList.add(partETagListenableFuture);
    try {
      this.currentBlockBuffer =
          BufferPool.getInstance().getBuffer((int) this.blockSize);
    } catch (IOException e) {
      String errMsg = String.format("Getting a buffer [size:%d] from "
          + "the buffer pool failed.", this.blockSize);
      throw new IOException(errMsg, e);
    }
    this.currentBlockId++;
    if (null != this.digest) {
      this.digest.reset();
      this.currentBlockOutputStream = new DigestOutputStream(
          new ByteBufferOutputStream(this.currentBlockBuffer.getByteBuffer()),
          this.digest);
    } else {
      this.currentBlockOutputStream =
          new ByteBufferOutputStream(this.currentBlockBuffer.getByteBuffer());
    }
  }

  @Override
  public void write(byte[] b, int off, int len) throws IOException {
    if (this.closed) {
      throw new IOException("block stream has been closed.");
    }

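    // Fill the current block buffer, sealing and uploading a part each time
    // it reaches the block size.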
    while (len > 0) {
      long writeBytes;
      if (this.blockWritten + len > this.blockSize) {
        writeBytes = this.blockSize - this.blockWritten;
      } else {
        writeBytes = len;
      }

      this.currentBlockOutputStream.write(b, off, (int) writeBytes);
      this.blockWritten += writeBytes;
      if (this.blockWritten >= this.blockSize) {
        this.uploadPart();
        this.blockWritten = 0;
      }
      len -= writeBytes;
      off += writeBytes;
    }
  }

  @Override
  public void write(byte[] b) throws IOException {
    this.write(b, 0, b.length);
  }

  @Override
  public void write(int b) throws IOException {
    if (this.closed) {
      throw new IOException("block stream has been closed.");
    }

    byte[] singleBytes = new byte[1];
    singleBytes[0] = (byte) b;
    this.currentBlockOutputStream.write(singleBytes, 0, 1);
    this.blockWritten += 1;
    if (this.blockWritten >= this.blockSize) {
      this.uploadPart();
      this.blockWritten = 0;
    }
  }
}