ChainTOSInputStream.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.tosfs.object.tos;

import org.apache.hadoop.fs.tosfs.common.Chain;
import org.apache.hadoop.util.Preconditions;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.concurrent.atomic.AtomicBoolean;

public class ChainTOSInputStream extends InputStream {
  private final Chain<TOSInputStream> chain;
  private final TOS.GetObjectFactory factory;
  private final String key;
  private long curOff;
  private final long endOff; // range end offset (inclusive)
  private final long maxDrainByteSize;
  private final int maxInputStreamRetries;

  private int readBytes;
  private long skipped;
  private byte[] objChecksum = null;
  private final AtomicBoolean closed = new AtomicBoolean(false);

  public ChainTOSInputStream(
      TOS.GetObjectFactory factory,
      String key,
      long startOff,
      long endOff,
      long maxDrainByteSize,
      int maxInputStreamRetries) {
    this.factory = factory;
    this.key = key;
    this.curOff = startOff;
    this.endOff = endOff;
    this.maxDrainByteSize = maxDrainByteSize;
    this.maxInputStreamRetries = maxInputStreamRetries;
    this.chain = createChain();
    Preconditions.checkNotNull(objChecksum, "Checksum should not be null.");
  }

  private Chain<TOSInputStream> createChain() {
    Chain.Builder<TOSInputStream> builder = Chain.<TOSInputStream>builder()
        .shouldContinue(e -> !(e instanceof EOFException));

    for (int i = 0; i <= maxInputStreamRetries; i++) {
      builder.addLast(() -> {
        GetObjectOutput output = factory.create(key, curOff, endOff);

        // Note: If there are some IO errors occur, the ChainTOSInputStream will create a new
        // stream in the chain to continue reading object data, we need to record the checksum
        // during first open object stream, and ensure the checksum of object stream won't be
        // changed if opening object many times within the lifecycle of the chained stream in case
        // the underlying object is changed.
        if (objChecksum == null) {
          // Init the stream checksum.
          objChecksum = output.checksum();
        }
        return new TOSInputStream(output, curOff, endOff, maxDrainByteSize, objChecksum);
      });
    }

    try {
      return builder.build();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  @Override
  public long skip(long n) throws IOException {
    skipped = 0;
    return chain.run(stream -> {
      long skip = stream.skip(n - skipped);

      curOff += skip;
      skipped += skip;
      return skipped;
    });
  }

  @Override
  public int read() throws IOException {
    return chain.run(stream -> {
      int ret = stream.read();
      curOff++;
      return ret;
    });
  }

  @Override
  public int available() throws IOException {
    return chain.run(InputStream::available);
  }

  @Override
  public int read(byte[] b, int off, int len) throws IOException {
    readBytes = 0;
    return chain.run(in -> {
      int read = in.read(b, off + readBytes, len - readBytes);

      readBytes += read;
      curOff += read;
      return readBytes;
    });
  }

  @Override
  public void close() throws IOException {
    if (closed.compareAndSet(false, true)) {
      chain.close();
    }
  }

  public byte[] checksum() {
    return objChecksum;
  }
}