GoogleHadoopFSInputStream.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.gs;

import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SeekableByteChannel;
import javax.annotation.Nonnull;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem;

/**
 * Hadoop {@link FSInputStream} that reads the file at {@code gcsPath} through a
 * {@link SeekableByteChannel}.
 *
 * <p>{@code read}, {@code seek}, {@code getPos} and {@code close} are {@code synchronized} on this
 * instance. Every successful read is reported to the {@link FileSystem.Statistics} supplied at
 * creation time.
 */
final class GoogleHadoopFSInputStream extends FSInputStream {
  public static final Logger LOG = LoggerFactory.getLogger(GoogleHadoopFSInputStream.class);

  // Used for single-byte reads.
  private final byte[] singleReadBuf = new byte[1];

  // Path of the file to read.
  private final URI gcsPath;

  // All store IO access goes through this.
  private final SeekableByteChannel channel;
  // Number of bytes read through this channel.
  private long totalBytesRead = 0;

  /**
   * Closed bit. Volatile so reads are non-blocking. Updates must be in a synchronized block to
   * guarantee an atomic check and set
   */
  private volatile boolean closed;

  // Statistics tracker provided by the parent GoogleHadoopFileSystem for recording stats
  private final FileSystem.Statistics statistics;

  /**
   * Opens an input stream for {@code gcsPath}.
   *
   * <p>Looks up the {@code FileInfo} for the path on the file system owned by {@code ghfs} and
   * opens a read channel for it using the configuration of {@code ghfs}.
   *
   * @param ghfs the owning file system; provides the GCS file system and its configuration.
   * @param gcsPath path of the file to read.
   * @param statistics tracker that receives bytes-read and read-op counts.
   * @return a new stream reading from the opened channel.
   * @throws IOException if the file info lookup or the channel open fails.
   */
  static GoogleHadoopFSInputStream create(
      GoogleHadoopFileSystem ghfs, URI gcsPath, FileSystem.Statistics statistics)
      throws IOException {
    LOG.trace("create(gcsPath: {})", gcsPath);
    GoogleCloudStorageFileSystem gcsFs = ghfs.getGcsFs();
    FileInfo fileInfo = gcsFs.getFileInfoObject(gcsPath);
    SeekableByteChannel channel = gcsFs.open(fileInfo, ghfs.getFileSystemConfiguration());
    return new GoogleHadoopFSInputStream(gcsPath, channel, statistics);
  }

  private GoogleHadoopFSInputStream(
      URI gcsPath,
      SeekableByteChannel channel,
      FileSystem.Statistics statistics) {
    LOG.trace("GoogleHadoopFSInputStream(gcsPath: {})", gcsPath);
    this.gcsPath = gcsPath;
    this.channel = channel;
    this.statistics = statistics;
  }

  /**
   * Reads a single byte by delegating to {@link #read(byte[], int, int)} with a one-byte buffer.
   *
   * @return the byte read as an unsigned value in the range 0-255, or -1 at end of stream.
   * @throws IOException if the stream is closed or the underlying read fails.
   * @throws IllegalStateException if the delegated read returns anything other than 1 or -1.
   */
  @Override
  public synchronized int read() throws IOException {
    checkNotClosed();
    int numRead = read(singleReadBuf, /* offset= */ 0, /* length= */ 1);
    checkState(
        numRead == -1 || numRead == 1,
        "Read %s bytes using single-byte buffer for path %s ending in position %s",
        numRead,
        gcsPath,
        channel.position());
    // Mask to return the byte as an unsigned int; a -1 result is passed through unchanged.
    return numRead > 0 ? singleReadBuf[0] & 0xff : numRead;
  }

  /**
   * Reads up to {@code length} bytes from the channel into {@code buf} starting at {@code offset}.
   *
   * @param buf destination buffer; must not be null.
   * @param offset index in {@code buf} at which the first byte is stored.
   * @param length maximum number of bytes to read.
   * @return the number of bytes read, 0 if {@code length} is 0, or the channel's result (-1 at end
   *     of stream) otherwise.
   * @throws IOException if the stream is closed or the channel read fails.
   * @throws IndexOutOfBoundsException if {@code offset} or {@code length} is negative, or the
   *     requested range does not fit in {@code buf}.
   */
  @Override
  public synchronized int read(@Nonnull byte[] buf, int offset, int length) throws IOException {
    checkNotClosed();
    checkNotNull(buf, "buf must not be null");
    if (offset < 0 || length < 0 || length > buf.length - offset) {
      throw new IndexOutOfBoundsException(
          String.format(
              "offset: %d, length: %d, buffer size: %d", offset, length, buf.length));
    }
    // java.io.InputStream contract: a zero-length request reads nothing and returns 0, regardless
    // of the stream position. Answer it here instead of relying on how the channel treats an
    // empty buffer.
    if (length == 0) {
      return 0;
    }

    // TODO(user): Wrap this in a while-loop if we ever introduce a non-blocking mode for
    // the underlying channel.
    int numRead = channel.read(ByteBuffer.wrap(buf, offset, length));
    // Only reads that delivered data are counted; a -1 (end of stream) result leaves the byte
    // and read-op counters untouched.
    if (numRead > 0) {
      totalBytesRead += numRead;
      statistics.incrementBytesRead(numRead);
      statistics.incrementReadOps(1);
    }
    return numRead;
  }

  /**
   * Moves the channel to {@code pos}.
   *
   * @param pos the new position within the file.
   * @throws IOException if the stream is closed, the channel rejects the position, or the channel
   *     throws {@link IllegalArgumentException} (rethrown wrapped in an {@code IOException}).
   */
  @Override
  public synchronized void seek(long pos) throws IOException {
    checkNotClosed();
    LOG.trace("seek({})", pos);
    try {
      channel.position(pos);
    } catch (IllegalArgumentException e) {
      throw new IOException(e);
    }
  }

  /**
   * Closes the stream and the underlying channel. Calls after the first one are no-ops.
   *
   * <p>Closing is best-effort: any exception raised while closing the channel is logged at WARN
   * level and not propagated to the caller.
   */
  @Override
  public synchronized void close() throws IOException {
    if (!closed) {
      // Flip the flag first so the stream is reported closed even if channel.close() fails.
      closed = true;

      LOG.trace("close(): {}", gcsPath);
      try {
        if (channel != null) {
          LOG.trace(
              "Closing '{}' file with {} total bytes read", gcsPath, totalBytesRead);
          channel.close();
        }
      } catch (Exception e) {
        LOG.warn("Error while closing underneath read channel resources for path: {}", gcsPath, e);
      }
    }
  }

  /**
   * Gets the current position within the file being read.
   *
   * @return The current position within the file being read.
   * @throws IOException if the stream is closed or an IO error occurs.
   */
  @Override
  public synchronized long getPos() throws IOException {
    checkNotClosed();
    long pos = channel.position();
    LOG.trace("getPos(): {}", pos);
    return pos;
  }

  /**
   * Seeks a different copy of the data. Not supported.
   *
   * @param targetPos the requested position; only logged.
   * @return always {@code false}, since no alternative source is ever looked for.
   */
  @Override
  public boolean seekToNewSource(long targetPos) {
    LOG.trace("seekToNewSource({}): false", targetPos);
    return false;
  }

  /**
   * Returns the superclass estimate of bytes available, after verifying the channel is open.
   *
   * @return whatever {@code super.available()} reports.
   * @throws ClosedChannelException if the underlying channel is no longer open.
   * @throws IOException if the superclass implementation fails.
   */
  @Override
  public int available() throws IOException {
    if (!channel.isOpen()) {
      throw new ClosedChannelException();
    }
    return super.available();
  }

  /**
   * Verify that the input stream is open. Non-blocking; this gives the last state of the volatile
   * {@link #closed} field.
   *
   * @throws IOException if the connection is closed.
   */
  private void checkNotClosed() throws IOException {
    if (closed) {
      throw new IOException(gcsPath + ": " + FSExceptionMessages.STREAM_IS_CLOSED);
    }
  }
}