S3AInputStream.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import javax.annotation.Nullable;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.IntFunction;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.impl.LeakReporter;
import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType;
import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStream;
import org.apache.hadoop.fs.s3a.impl.streams.ObjectReadParameters;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.CanSetReadahead;
import org.apache.hadoop.fs.CanUnbuffer;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.impl.CombinedFileRange;
import org.apache.hadoop.fs.VectoredReadUtils;
import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
import org.apache.hadoop.fs.s3a.impl.InternalConstants;
import org.apache.hadoop.fs.s3a.impl.SDKStreamDrainer;
import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.io.IOUtils;
import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.fs.VectoredReadUtils.LOG_BYTE_BUFFER_RELEASED;
import static org.apache.hadoop.fs.VectoredReadUtils.isOrderedDisjoint;
import static org.apache.hadoop.fs.VectoredReadUtils.mergeSortedRanges;
import static org.apache.hadoop.fs.VectoredReadUtils.validateAndSortRanges;
import static org.apache.hadoop.fs.s3a.Invoker.onceTrackingDuration;
import static org.apache.hadoop.util.StringUtils.toLowerCase;
import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
/**
* The input stream for an S3A object.
*
* As this stream seeks within an object, it may close then re-open the stream.
* When this happens, any updated stream data may be retrieved, and, given
* the consistency model of Amazon S3, outdated data may in fact be picked up.
*
* As a result, the outcome of reading from a stream of an object which is
* actively manipulated during the read process is "undefined".
*
* The class is marked as private as code should not be creating instances
* themselves. Any extra feature (e.g. instrumentation) should be considered
* unstable.
*
* Because it prints some of the state of the instrumentation,
* the output of {@link #toString()} must also be considered unstable.
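*
* A minimal usage sketch (hypothetical caller code; instances are normally
* obtained via {@code FileSystem.open()}, never constructed directly):
* <pre>{@code
*   try (FSDataInputStream in = fs.open(new Path("s3a://bucket/key"))) {
*     in.seek(1024);      // lazy seek: no GET request is issued yet
*     int b = in.read();  // the first read triggers the GET
*   }
* }</pre>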
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class S3AInputStream extends ObjectInputStream implements CanSetReadahead,
CanUnbuffer, StreamCapabilities, IOStatisticsSource {
public static final String E_NEGATIVE_READAHEAD_VALUE
= "Negative readahead value";
public static final String OPERATION_OPEN = "open";
public static final String OPERATION_REOPEN = "re-open";
/**
* Switch for behavior when {@code wrappedStream.read()}
* returns -1 or raises an EOF; the original semantics
* were that the stream was kept open.
* Value {@value}.
*/
private static final boolean CLOSE_WRAPPED_STREAM_ON_NEGATIVE_READ = true;
/**
* This is the maximum temporary buffer size we use while
* populating the data in direct byte buffers during a vectored IO
* operation. This ensures that requesting a big range of data into a
* direct byte buffer does not lead to OOM errors.
*/
private static final int TMP_BUFFER_MAX_SIZE = 64 * 1024;
private static final Logger LOG =
LoggerFactory.getLogger(S3AInputStream.class);
/**
* Atomic boolean variable to stop all ongoing vectored read operations
* for this input stream. This will be set to true when the stream is
* closed or unbuffer is called.
*/
private final AtomicBoolean stopVectoredIOOperations = new AtomicBoolean(false);
/**
* This is the public position; the one set in {@link #seek(long)}
* and returned in {@link #getPos()}.
*/
private long pos;
/**
* Closed bit. Volatile so reads are non-blocking.
* Updates must be in a synchronized block to guarantee an atomic check
* and set.
*/
private volatile boolean closed;
/**
* Input stream returned by a getObject call.
*/
private ResponseInputStream<GetObjectResponse> wrappedStream;
/**
* Content length, wrapped in the Optional format required by vectored IO.
*/
private final Optional<Long> fileLength;
private long readahead = Constants.DEFAULT_READAHEAD_RANGE;
/**
* This is the actual position within the object, used by
* lazy seek to decide whether to seek on the next read or not.
*/
private long nextReadPos;
/**
* The end of the content range of the last request.
* This is an absolute value of the range, not a length field.
*/
private long contentRangeFinish;
/**
* The start of the content range of the last request.
*/
private long contentRangeStart;
/** change tracker. */
private final ChangeTracker changeTracker;
/**
* Threshold for stream reads to switch to
* asynchronous draining.
*/
private final long asyncDrainThreshold;
/**
* Create the stream.
* This does not attempt to open it; that is only done on the first
* actual read() operation.
*
* @param parameters creation parameters.
*/
public S3AInputStream(ObjectReadParameters parameters) {
super(InputStreamType.Classic, parameters);
this.fileLength = Optional.of(getContentLength());
S3AReadOpContext context = getContext();
this.changeTracker = new ChangeTracker(getUri(),
context.getChangeDetectionPolicy(),
getS3AStreamStatistics().getChangeTrackerStatistics(),
getObjectAttributes());
setReadahead(context.getReadahead());
this.asyncDrainThreshold = context.getAsyncDrainThreshold();
}
/**
* Probe for stream being open.
* Not synchronized; the flag is volatile.
* @return true if the stream is still open.
*/
@Override
protected boolean isStreamOpen() {
return !closed;
}
/**
* Brute force stream close; invoked by {@link LeakReporter}.
* All exceptions raised are ignored.
*/
@Override
protected void abortInFinalizer() {
try {
// stream was leaked: update statistic
getS3AStreamStatistics().streamLeaked();
// abort the stream. This merges statistics into the filesystem.
closeStream("finalize()", true, true).get();
} catch (InterruptedException | ExecutionException ignored) {
/* ignore failures during finalizer shutdown */
}
}
/**
* If the stream is in Adaptive mode, switch to random IO at this
* point. Unsynchronized.
*/
private void maybeSwitchToRandomIO() {
if (getInputPolicy().isAdaptive()) {
setInputPolicy(S3AInputPolicy.Random);
}
}
/**
* Opens up the stream at specified target position and for given length.
*
* @param reason reason for reopen
* @param targetPos target position
* @param length length requested
* @param forceAbort force an abort of any existing stream rather than draining it
* @throws IOException on any failure to open the object
*/
@Retries.OnceTranslated
private synchronized void reopen(String reason, long targetPos, long length,
boolean forceAbort) throws IOException {
if (isObjectStreamOpen()) {
closeStream("reopen(" + reason + ")", forceAbort, false);
}
contentRangeFinish = calculateRequestLimit(getInputPolicy(), targetPos,
length, getContentLength(), readahead);
LOG.debug("reopen({}) for {} range[{}-{}], length={}," +
" streamPosition={}, nextReadPosition={}, policy={}",
getUri(), reason, targetPos, contentRangeFinish, length, pos, nextReadPos,
getInputPolicy());
GetObjectRequest request = getCallbacks().newGetRequestBuilder(getKey())
.range(S3AUtils.formatRange(targetPos, contentRangeFinish - 1))
.applyMutation(changeTracker::maybeApplyConstraint)
.build();
long opencount = getS3AStreamStatistics().streamOpened();
String operation = opencount == 0 ? OPERATION_OPEN : OPERATION_REOPEN;
String text = String.format("%s %s at %d",
operation, getUri(), targetPos);
wrappedStream = onceTrackingDuration(text, getUri(),
getS3AStreamStatistics().initiateGetRequest(), () ->
getCallbacks().getObject(request));
changeTracker.processResponse(wrappedStream.response(), operation,
targetPos);
contentRangeStart = targetPos;
this.pos = targetPos;
}
@Override
public synchronized long getPos() throws IOException {
return (nextReadPos < 0) ? 0 : nextReadPos;
}
@Override
public synchronized void seek(long targetPos) throws IOException {
checkNotClosed();
// Do not allow negative seek
if (targetPos < 0) {
throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK
+ " " + targetPos);
}
if (this.getContentLength() <= 0) {
return;
}
// Lazy seek
nextReadPos = targetPos;
}
/**
* Seek without raising any exception. This is for use in
* {@code finally} clauses.
* @param positiveTargetPos a target position which must be positive.
*/
private void seekQuietly(long positiveTargetPos) {
try {
seek(positiveTargetPos);
} catch (IOException ioe) {
LOG.debug("Ignoring IOE on seek of {} to {}",
getUri(), positiveTargetPos, ioe);
}
}
/**
* Adjust the stream to a specific position.
*
* @param targetPos target seek position
* @param length length of content that needs to be read from targetPos
* @throws IOException on any failure to seek or re-open the stream
*/
@Retries.OnceTranslated
private void seekInStream(long targetPos, long length) throws IOException {
checkNotClosed();
if (!isObjectStreamOpen()) {
return;
}
// compute how much more to skip
long diff = targetPos - pos;
if (diff > 0) {
// forward seek - this is where data can be skipped
int available = wrappedStream.available();
// always seek at least as far as what is available
long forwardSeekRange = Math.max(readahead, available);
// work out how much is actually left in the stream
// then choose whichever comes first: the range or the EOF
long remainingInCurrentRequest = remainingInCurrentRequest();
long forwardSeekLimit = Math.min(remainingInCurrentRequest,
forwardSeekRange);
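// worked example (illustrative numbers, not from the source):
// readahead=65536, available=10000 -> forwardSeekRange=65536;
// remainingInCurrentRequest=20000 -> forwardSeekLimit=20000,
// so forward skips of up to 20000 bytes stay on this connection.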
boolean skipForward = remainingInCurrentRequest > 0
&& diff < forwardSeekLimit;
if (skipForward) {
// the forward seek range is within the limits
LOG.debug("Forward seek on {}, of {} bytes", getUri(), diff);
long skipped = wrappedStream.skip(diff);
if (skipped > 0) {
pos += skipped;
}
getS3AStreamStatistics().seekForwards(diff, skipped);
if (pos == targetPos) {
// all is well
LOG.debug("Now at {}: bytes remaining in current request: {}",
pos, remainingInCurrentRequest());
return;
} else {
// log a warning; continue to attempt to re-open
LOG.warn("Failed to seek on {} to {}. Current position {}",
getUri(), targetPos, pos);
}
} else {
// not attempting to read any bytes from the stream
getS3AStreamStatistics().seekForwards(diff, 0);
}
} else if (diff < 0) {
// backwards seek
getS3AStreamStatistics().seekBackwards(diff);
// if the stream is in "Normal" mode, switch to random IO at this
// point, as it is indicative of columnar format IO
maybeSwitchToRandomIO();
} else {
// targetPos == pos
if (remainingInCurrentRequest() > 0) {
// if there is data left in the stream, keep going
return;
}
}
// if the code reaches here, the stream needs to be reopened.
// close the stream; on the next read, the object will be opened at the new position
closeStream("seekInStream()", false, false);
pos = targetPos;
}
@Override
public boolean seekToNewSource(long targetPos) throws IOException {
return false;
}
/**
* Perform lazy seek and adjust stream to correct position for reading.
* If an EOF Exception is raised there are two possibilities
* <ol>
* <li>the stream is at the end of the file</li>
* <li>something went wrong with the network connection</li>
* </ol>
* This method does not attempt to distinguish; it assumes that an EOF
* exception is always "end of file".
* @param targetPos position from where data should be read
* @param len length of the content that needs to be read
* @throws RangeNotSatisfiableEOFException GET is out of range
* @throws IOException anything else.
*/
@Retries.RetryTranslated
private void lazySeek(long targetPos, long len) throws IOException {
Invoker invoker = getContext().getReadInvoker();
invoker.retry("lazySeek to " + targetPos, getPathStr(), true,
() -> {
//For lazy seek
seekInStream(targetPos, len);
//re-open at specific location if needed
if (!isObjectStreamOpen()) {
reopen("read from new offset", targetPos, len, false);
}
});
}
/**
* Increment the bytes read counter if there is a stats instance
* and the number of bytes read is more than zero.
* @param bytesRead number of bytes read
*/
private void incrementBytesRead(long bytesRead) {
getS3AStreamStatistics().bytesRead(bytesRead);
if (getContext().stats != null && bytesRead > 0) {
getContext().stats.incrementBytesRead(bytesRead);
}
}
@Override
@Retries.RetryTranslated
public synchronized int read() throws IOException {
checkNotClosed();
if (this.getContentLength() == 0 || (nextReadPos >= getContentLength())) {
return -1;
}
try {
lazySeek(nextReadPos, 1);
} catch (RangeNotSatisfiableEOFException e) {
// attempt to GET beyond the end of the object
LOG.debug("Downgrading 416 response attempt to read at {} to -1 response", nextReadPos);
return -1;
}
Invoker invoker = getContext().getReadInvoker();
int byteRead = invoker.retry("read", getPathStr(), true,
() -> {
int b;
// If an exception was raised and the stream closed during failure
// recovery, wrappedStream will be null. The retry will re-execute
// this block, so re-open the stream first to avoid an NPE.
if (!isObjectStreamOpen()) {
reopen("failure recovery", getPos(), 1, false);
}
try {
b = wrappedStream.read();
} catch (HttpChannelEOFException | SocketTimeoutException e) {
onReadFailure(e, true);
throw e;
} catch (IOException e) {
onReadFailure(e, false);
throw e;
}
return b;
});
if (byteRead >= 0) {
pos++;
nextReadPos++;
incrementBytesRead(1);
} else {
streamReadResultNegative();
}
return byteRead;
}
/**
* Close the stream on read failure.
* The stream's readException statistic will be incremented.
* @param ioe exception caught.
* @param forceAbort abort the HTTP connection rather than draining it.
*/
@Retries.OnceTranslated
private void onReadFailure(IOException ioe, boolean forceAbort) {
GetObjectResponse objectResponse = wrappedStream == null ? null : wrappedStream.response();
if (LOG.isDebugEnabled()) {
LOG.debug("Got exception while trying to read from stream {}, " +
"client: {} object: {}, trying to recover: ",
getUri(), getCallbacks(), objectResponse, ioe);
} else {
LOG.info("Got exception while trying to read from stream {}, " +
"client: {} object: {}, trying to recover: " + ioe,
getUri(), getCallbacks(), objectResponse);
}
getS3AStreamStatistics().readException();
closeStream("failure recovery", forceAbort, false);
}
/**
* The read() call returned -1.
* This means "the connection has gone past the end of the object" or
* the stream has broken for some reason.
* So close the stream (without an abort).
*/
private void streamReadResultNegative() {
if (CLOSE_WRAPPED_STREAM_ON_NEGATIVE_READ) {
closeStream("wrappedStream.read() returned -1", false, false);
}
}
/**
* {@inheritDoc}
*
* This updates the statistics on read operations started and whether
* or not the read operation "completed", that is: returned the exact
* number of bytes requested.
* @throws IOException if there are other problems
*/
@Override
@Retries.RetryTranslated
public synchronized int read(byte[] buf, int off, int len)
throws IOException {
checkNotClosed();
validatePositionedReadArgs(nextReadPos, buf, off, len);
if (len == 0) {
return 0;
}
if (this.getContentLength() == 0 || (nextReadPos >= getContentLength())) {
return -1;
}
try {
lazySeek(nextReadPos, len);
} catch (RangeNotSatisfiableEOFException e) {
// attempt to GET beyond the end of the object
return -1;
}
Invoker invoker = getContext().getReadInvoker();
getS3AStreamStatistics().readOperationStarted(nextReadPos, len);
int bytesRead = invoker.retry("read", getPathStr(), true,
() -> {
int bytes;
// If an exception was raised and the stream closed during failure
// recovery, wrappedStream will be null. The retry will re-execute
// this block, so re-open the stream first to avoid an NPE.
if (!isObjectStreamOpen()) {
reopen("failure recovery", getPos(), 1, false);
}
try {
// read data; will block until there is data or the end of the stream is reached.
// returns 0 for "stream is open but no data yet" and -1 for "end of stream".
bytes = wrappedStream.read(buf, off, len);
} catch (HttpChannelEOFException | SocketTimeoutException e) {
onReadFailure(e, true);
throw e;
} catch (EOFException e) {
LOG.debug("EOFException raised by http stream read(); downgrading to a -1 response", e);
return -1;
} catch (IOException e) {
onReadFailure(e, false);
throw e;
}
return bytes;
});
if (bytesRead > 0) {
pos += bytesRead;
nextReadPos += bytesRead;
incrementBytesRead(bytesRead);
} else {
streamReadResultNegative();
}
getS3AStreamStatistics().readOperationCompleted(len, bytesRead);
return bytesRead;
}
/**
* Verify that the input stream is open. Non blocking; this gives
* the last state of the volatile {@link #closed} field.
* @throws IOException if the connection is closed.
*/
private void checkNotClosed() throws IOException {
if (closed) {
throw new IOException(getUri() + ": " + FSExceptionMessages.STREAM_IS_CLOSED);
}
}
/**
* Close the stream.
* This triggers publishing of the stream statistics back to the filesystem
* statistics.
* This operation is synchronized, so that only one thread can attempt to
* close the connection; all later/blocked calls are no-ops.
* @throws IOException on any problem
*/
@Override
public synchronized void close() throws IOException {
if (!closed) {
closed = true;
try {
stopVectoredIOOperations.set(true);
// close or abort the stream; blocking
closeStream("close() operation", false, true);
// end the client+audit span.
getCallbacks().close();
} finally {
super.close();
}
}
}
/**
* Close a stream: decide whether to abort or close, based on
* the length of the stream and the current position.
* If a close() is attempted and fails, the operation escalates to
* an abort.
*
* The close is potentially asynchronous; a future is returned.
* It is the draining of a stream which is time consuming, so it is
* worth scheduling on a separate thread.
* When invoked from stream close, when an abort is issued, or when
* there is no data to drain, the call blocks.
* This does not set the {@link #closed} flag.
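*
* A sketch of the decision logic implemented below:
* <pre>
*   shouldAbort = forceAbort || remaining &gt; readahead
*   drain inline (blocking) if blocking || shouldAbort
*       || remaining &lt;= asyncDrainThreshold
*   otherwise submit the drain asynchronously and return its future
* </pre>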
* @param reason reason for stream being closed; used in messages
* @param forceAbort force an abort; used if explicitly requested.
* @param blocking should the call block for completion, or is async IO allowed
* @return a future for the async operation
*/
@Retries.OnceRaw
private CompletableFuture<Boolean> closeStream(
final String reason,
final boolean forceAbort,
final boolean blocking) {
if (!isObjectStreamOpen()) {
// stream is already closed
return CompletableFuture.completedFuture(false);
}
// if the amount of data remaining in the current request is greater
// than the readahead value: abort.
long remaining = remainingInCurrentRequest();
LOG.debug("Closing stream {}: {}", reason,
forceAbort ? "abort" : "soft");
boolean shouldAbort = forceAbort || remaining > readahead;
CompletableFuture<Boolean> operation;
SDKStreamDrainer<ResponseInputStream<GetObjectResponse>> drainer = new SDKStreamDrainer<>(
getUri(),
wrappedStream,
shouldAbort,
(int) remaining,
getS3AStreamStatistics(),
reason);
if (blocking || shouldAbort || remaining <= asyncDrainThreshold) {
// don't bother with async IO if the caller plans to wait for
// the result, there's an abort (which is fast), or
// there is not much data to read.
operation = CompletableFuture.completedFuture(drainer.apply());
} else {
LOG.debug("initiating asynchronous drain of {} bytes", remaining);
// schedule an async drain/abort
operation = getCallbacks().submit(drainer);
}
// either the stream is closed in the blocking call or the async call is
// submitted with its own copy of the references
wrappedStream = null;
return operation;
}
/**
* Forcibly reset the stream, by aborting the connection. The next
* {@code read()} operation will trigger the opening of a new HTTPS
* connection.
*
* This is potentially very inefficient, and should only be invoked
* in extreme circumstances. It logs at info for this reason.
*
* Blocks until the abort is completed.
*
* @return true if the connection was actually reset.
* @throws IOException if invoked on a closed stream.
*/
@InterfaceStability.Unstable
public synchronized boolean resetConnection() throws IOException {
checkNotClosed();
LOG.info("Forcing reset of connection to {}", getUri());
return awaitFuture(closeStream("reset()", true, true));
}
@Override
public synchronized int available() throws IOException {
checkNotClosed();
long remaining = remainingInFile();
if (remaining > Integer.MAX_VALUE) {
return Integer.MAX_VALUE;
}
return (int)remaining;
}
/**
* Bytes left in stream.
* @return how many bytes are left to read
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public synchronized long remainingInFile() {
return this.getContentLength() - this.pos;
}
/**
* Bytes left in the current request.
* Only valid if there is an active request.
* @return how many bytes are left to read in the current GET.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public synchronized long remainingInCurrentRequest() {
return this.contentRangeFinish - this.pos;
}
@InterfaceAudience.Private
@InterfaceStability.Unstable
public synchronized long getContentRangeFinish() {
return contentRangeFinish;
}
@InterfaceAudience.Private
@InterfaceStability.Unstable
public synchronized long getContentRangeStart() {
return contentRangeStart;
}
@Override
public boolean markSupported() {
return false;
}
/**
* String value includes statistics as well as stream state.
* <b>Important: there are no guarantees as to the stability
* of this value.</b>
* @return a string value for printing in logs/diagnostics
*/
@Override
@InterfaceStability.Unstable
public String toString() {
String s = getS3AStreamStatistics().toString();
synchronized (this) {
final StringBuilder sb = new StringBuilder(
"S3AInputStream{");
sb.append(super.toString()).append(" ");
sb.append(getUri());
sb.append(" wrappedStream=")
.append(isObjectStreamOpen() ? "open" : "closed");
sb.append(" read policy=").append(getInputPolicy());
sb.append(" pos=").append(pos);
sb.append(" nextReadPos=").append(nextReadPos);
sb.append(" contentLength=").append(getContentLength());
sb.append(" contentRangeStart=").append(contentRangeStart);
sb.append(" contentRangeFinish=").append(contentRangeFinish);
sb.append(" remainingInCurrentRequest=")
.append(remainingInCurrentRequest());
sb.append(" ").append(changeTracker);
sb.append(" ").append(getVectoredIOContext());
sb.append('\n').append(s);
sb.append('}');
return sb.toString();
}
}
/**
* Subclass {@code readFully()} operation which only seeks at the start
* of the series of operations; seeking back at the end.
*
* This delivers significantly higher performance if multiple read attempts
* are needed to fetch the data, as it does not break the HTTP connection.
*
* To maintain thread safety requirements, this operation is synchronized
* for the duration of the sequence.
* {@inheritDoc}
*
*/
@Override
@Retries.RetryTranslated
public void readFully(long position, byte[] buffer, int offset, int length)
throws IOException {
checkNotClosed();
validatePositionedReadArgs(position, buffer, offset, length);
getS3AStreamStatistics().readFullyOperationStarted(position, length);
if (length == 0) {
return;
}
int nread = 0;
synchronized (this) {
long oldPos = getPos();
try {
seek(position);
while (nread < length) {
int nbytes = read(buffer, offset + nread, length - nread);
if (nbytes < 0) {
// no attempt is currently made to recover from stream read problems;
// a lazy seek to the offset is probably the solution.
// but it will need more qualification against failure handling
throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY);
}
nread += nbytes;
}
} finally {
seekQuietly(oldPos);
}
}
}
/**
* {@inheritDoc}
* Pass to {@link #readVectored(List, IntFunction, Consumer)}
* with the {@link VectoredReadUtils#LOG_BYTE_BUFFER_RELEASED} releaser.
* @param ranges the byte ranges to read.
* @param allocate the function to allocate ByteBuffer.
* @throws IOException IOE if any.
*/
@Override
public synchronized void readVectored(List<? extends FileRange> ranges,
IntFunction<ByteBuffer> allocate) throws IOException {
readVectored(ranges, allocate, LOG_BYTE_BUFFER_RELEASED);
}
/**
* {@inheritDoc}
* Vectored read implementation for S3AInputStream.
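*
* A usage sketch (hypothetical caller code, heap buffers assumed):
* <pre>{@code
*   List<FileRange> ranges = Arrays.asList(
*       FileRange.createFileRange(0, 1024),
*       FileRange.createFileRange(4096, 1024));
*   in.readVectored(ranges, ByteBuffer::allocate);
*   for (FileRange r : ranges) {
*     ByteBuffer data = r.getData().get();  // await each range's future
*   }
* }</pre>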
* @param ranges the byte ranges to read.
* @param allocate the function to allocate ByteBuffer.
* @param release the function to release a ByteBuffer.
* @throws IOException IOE if any.
*/
@Override
public void readVectored(final List<? extends FileRange> ranges,
final IntFunction<ByteBuffer> allocate,
final Consumer<ByteBuffer> release) throws IOException {
LOG.debug("Starting vectored read on path {} for ranges {} ", getPathStr(), ranges);
checkNotClosed();
if (stopVectoredIOOperations.getAndSet(false)) {
LOG.debug("Reinstating vectored read operation for path {} ", getPathStr());
}
requireNonNull(allocate, "ranges");
requireNonNull(allocate, "allocate");
requireNonNull(release, "release");
// prepare to read
List<? extends FileRange> sortedRanges = validateAndSortRanges(ranges,
fileLength);
for (FileRange range : ranges) {
CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
range.setData(result);
}
// switch to random IO and close any open stream.
// what happens if a read is in progress? bad things.
// ...which is why this method is synchronized
closeStream("readVectored()", false, false);
maybeSwitchToRandomIO();
if (isOrderedDisjoint(sortedRanges, 1, minSeekForVectorReads())) {
LOG.debug("Not merging the ranges as they are disjoint");
getS3AStreamStatistics().readVectoredOperationStarted(sortedRanges.size(),
sortedRanges.size());
for (FileRange range: sortedRanges) {
ByteBuffer buffer = allocate.apply(range.getLength());
getBoundedThreadPool().submit(() -> readSingleRange(range, buffer));
}
} else {
LOG.debug("Trying to merge the ranges as they are not disjoint");
List<CombinedFileRange> combinedFileRanges = mergeSortedRanges(sortedRanges,
1, minSeekForVectorReads(),
maxReadSizeForVectorReads());
getS3AStreamStatistics().readVectoredOperationStarted(sortedRanges.size(),
combinedFileRanges.size());
LOG.debug("Number of original ranges size {} , Number of combined ranges {} ",
ranges.size(), combinedFileRanges.size());
for (CombinedFileRange combinedFileRange: combinedFileRanges) {
getBoundedThreadPool().submit(
() -> readCombinedRangeAndUpdateChildren(combinedFileRange, allocate));
}
}
LOG.debug("Finished submitting vectored read to threadpool" +
" on path {} for ranges {} ", getPathStr(), ranges);
}
/**
* Read the data from S3 for the bigger combined file range and update all the
* underlying ranges.
* @param combinedFileRange big combined file range.
* @param allocate method to create byte buffers to hold result data.
*/
private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRange,
IntFunction<ByteBuffer> allocate) {
LOG.debug("Start reading {} from path {} ", combinedFileRange, getPathStr());
ResponseInputStream<GetObjectResponse> rangeContent = null;
try {
rangeContent = getS3ObjectInputStream("readCombinedFileRange",
combinedFileRange.getOffset(),
combinedFileRange.getLength());
populateChildBuffers(combinedFileRange, rangeContent, allocate);
} catch (Exception ex) {
LOG.debug("Exception while reading {} from path {} ", combinedFileRange, getPathStr(), ex);
// complete exceptionally all the underlying ranges which have not
// already finished.
for(FileRange child : combinedFileRange.getUnderlying()) {
if (!child.getData().isDone()) {
child.getData().completeExceptionally(ex);
}
}
} finally {
IOUtils.cleanupWithLogger(LOG, rangeContent);
}
LOG.debug("Finished reading {} from path {} ", combinedFileRange, getPathStr());
}
/**
* Populate underlying buffers of the child ranges.
* There is no attempt to recover from any read failures.
* @param combinedFileRange big combined file range.
* @param objectContent data from s3.
* @param allocate method to allocate child byte buffers.
* @throws IOException any IOE.
* @throws EOFException if a read() call returns -1 before the range is fully read.
* @throws InterruptedIOException if vectored IO operation is stopped.
*/
private void populateChildBuffers(CombinedFileRange combinedFileRange,
InputStream objectContent,
IntFunction<ByteBuffer> allocate) throws IOException {
// If the combined file range contains just a single child
// range, we only have to fill that one child buffer. Otherwise
// we drain the intermediate data between consecutive ranges
// and fill the buffers one by one.
if (combinedFileRange.getUnderlying().size() == 1) {
FileRange child = combinedFileRange.getUnderlying().get(0);
ByteBuffer buffer = allocate.apply(child.getLength());
populateBuffer(child, buffer, objectContent);
child.getData().complete(buffer);
} else {
FileRange prev = null;
for (FileRange child : combinedFileRange.getUnderlying()) {
checkIfVectoredIOStopped();
if (prev != null) {
final long position = prev.getOffset() + prev.getLength();
if (position < child.getOffset()) {
// there's data to drain between the requests.
// work out how much
long drainQuantity = child.getOffset() - position;
// and drain it.
drainUnnecessaryData(objectContent, position, drainQuantity);
}
}
ByteBuffer buffer = allocate.apply(child.getLength());
populateBuffer(child, buffer, objectContent);
child.getData().complete(buffer);
prev = child;
}
}
}
/**
* Drain unnecessary data in between ranges.
* There's no attempt at recovery here; it should be done at a higher level.
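* For example, if the previous child range ends at offset 1000 and the
* next starts at offset 4096, the 3096 bytes in between are read and
* discarded in chunks of at most {@code DRAIN_BUFFER_SIZE} bytes.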
* @param objectContent s3 data stream.
* @param position position in file, for logging
* @param drainQuantity how many bytes to drain.
* @throws IOException any IOE.
* @throws EOFException if the end of stream was reached during the draining
*/
@Retries.OnceTranslated
private void drainUnnecessaryData(
final InputStream objectContent,
final long position,
long drainQuantity) throws IOException {
int drainBytes = 0;
int readCount;
byte[] drainBuffer;
int size = (int)Math.min(InternalConstants.DRAIN_BUFFER_SIZE, drainQuantity);
drainBuffer = new byte[size];
LOG.debug("Draining {} bytes from stream from offset {}; buffer size={}",
drainQuantity, position, size);
try {
long remaining = drainQuantity;
while (remaining > 0) {
checkIfVectoredIOStopped();
readCount = objectContent.read(drainBuffer, 0, (int)Math.min(size, remaining));
LOG.debug("Drained {} bytes from stream", readCount);
if (readCount < 0) {
// read request failed; often network issues.
// no attempt is made to recover at this point.
final String s = String.format(
"End of stream reached draining data between ranges; expected %,d bytes;"
+ " only drained %,d bytes before -1 returned (position=%,d)",
drainQuantity, drainBytes, position + drainBytes);
throw new EOFException(s);
}
drainBytes += readCount;
remaining -= readCount;
}
} finally {
getS3AStreamStatistics().readVectoredBytesDiscarded(drainBytes);
LOG.debug("{} bytes drained from stream ", drainBytes);
}
}
/**
* Read data from S3 for this range and populate the buffer.
* @param range range of data to read.
* @param buffer buffer to fill.
*/
private void readSingleRange(FileRange range, ByteBuffer buffer) {
LOG.debug("Start reading {} from {} ", range, getPathStr());
if (range.getLength() == 0) {
// a zero byte read.
buffer.flip();
range.getData().complete(buffer);
return;
}
ResponseInputStream<GetObjectResponse> objectRange = null;
try {
long position = range.getOffset();
int length = range.getLength();
objectRange = getS3ObjectInputStream("readSingleRange", position, length);
populateBuffer(range, buffer, objectRange);
range.getData().complete(buffer);
} catch (Exception ex) {
LOG.warn("Exception while reading a range {} from path {} ", range, getPathStr(), ex);
range.getData().completeExceptionally(ex);
} finally {
IOUtils.cleanupWithLogger(LOG, objectRange);
}
LOG.debug("Finished reading range {} from path {} ", range, getPathStr());
}
/**
* Get the S3 object input stream for a specified range.
* Also checks if the vectored IO operation has been stopped before and after
* the HTTP GET request, so that no time is wasted populating the buffers.
* @param operationName name of the operation for which get object on S3 is called.
* @param position position of the object to be read from S3.
* @param length length from position of the object to be read from S3.
* @return result s3 object.
* @throws IOException exception if any.
* @throws InterruptedIOException if vectored io operation is stopped.
*/
@Retries.RetryTranslated
private ResponseInputStream<GetObjectResponse> getS3ObjectInputStream(
final String operationName, final long position, final int length) throws IOException {
checkIfVectoredIOStopped();
ResponseInputStream<GetObjectResponse> objectRange =
getS3Object(operationName, position, length);
checkIfVectoredIOStopped();
return objectRange;
}
/**
* Populates the buffer with data from objectContent
* until {@code length} bytes have been read. Handles both direct and heap
* byte buffers; {@code buffer.flip()} is called on direct buffers afterwards,
* while heap buffers are filled in place via {@code buffer.array()}.
* @param range vector range to populate.
* @param buffer buffer to fill.
* @param objectContent result retrieved from S3 store.
* @throws IOException any IOE.
* @throws EOFException if a read() call returns -1 before the buffer is full.
* @throws InterruptedIOException if vectored IO operation is stopped.
*/
private void populateBuffer(FileRange range,
ByteBuffer buffer,
InputStream objectContent) throws IOException {
int length = range.getLength();
if (buffer.isDirect()) {
VectoredReadUtils.readInDirectBuffer(range, buffer,
(position, tmp, offset, currentLength) -> {
readByteArray(objectContent, range, tmp, offset, currentLength);
return null;
});
buffer.flip();
} else {
// there is no use of a temp byte buffer, or buffer.put() calls,
// so flip() is not needed.
readByteArray(objectContent, range, buffer.array(), 0, length);
}
}
/**
* Read data into destination buffer from s3 object content.
* Calls {@link #incrementBytesRead(long)} to update statistics
* incrementally.
* @param objectContent result from S3.
* @param range range being read into
* @param dest destination buffer.
* @param offset start offset of dest buffer.
* @param length number of bytes to fill in dest.
* @throws IOException any IOE.
* @throws EOFException if a read() call returns -1 before all bytes are read.
* @throws InterruptedIOException if vectored IO operation is stopped.
*/
private void readByteArray(InputStream objectContent,
final FileRange range,
byte[] dest,
int offset,
int length) throws IOException {
LOG.debug("Reading {} bytes", length);
int readBytes = 0;
long position = range.getOffset();
while (readBytes < length) {
checkIfVectoredIOStopped();
int readBytesCurr = objectContent.read(dest,
offset + readBytes,
length - readBytes);
LOG.debug("read {} bytes from stream", readBytesCurr);
if (readBytesCurr < 0) {
throw new EOFException(
String.format("HTTP stream closed before all bytes were read."
+ " Expected %,d bytes but only read %,d bytes. Current position %,d"
+ " (%s)",
length, readBytes, position, range));
}
readBytes += readBytesCurr;
position += readBytesCurr;
// update io stats incrementally
incrementBytesRead(readBytesCurr);
}
}
/**
* Read data from S3 with retries for the GET request.
* This also handles the case where the file has been changed while the
* HTTP call is being executed: if the file has been changed,
* a RemoteFileChangedException is thrown.
* @param operationName name of the operation for which get object on S3 is called.
* @param position position of the object to be read from S3.
* @param length length from position of the object to be read from S3.
* @return the S3 object input stream.
* @throws IOException exception if any.
* @throws InterruptedIOException if vectored io operation is stopped.
* @throws RemoteFileChangedException if file has changed on the store.
*/
@Retries.RetryTranslated
private ResponseInputStream<GetObjectResponse> getS3Object(String operationName,
long position,
int length)
throws IOException {
final GetObjectRequest request = getCallbacks().newGetRequestBuilder(getKey())
.range(S3AUtils.formatRange(position, position + length - 1))
.applyMutation(changeTracker::maybeApplyConstraint)
.build();
DurationTracker tracker = getS3AStreamStatistics().initiateGetRequest();
ResponseInputStream<GetObjectResponse> objectRange;
Invoker invoker = getContext().getReadInvoker();
try {
objectRange = invoker.retry(operationName, getPathStr(), true,
() -> {
checkIfVectoredIOStopped();
return getCallbacks().getObject(request);
});
} catch (IOException ex) {
tracker.failed();
throw ex;
} finally {
tracker.close();
}
changeTracker.processResponse(objectRange.response(), operationName,
position);
return objectRange;
}
/**
* Check if the vectored IO operation has been stopped. This happens
* when the stream is closed or unbuffer is called.
* @throws InterruptedIOException raised so that all running vectored
* IO operations are terminated, thus releasing resources.
*/
private void checkIfVectoredIOStopped() throws InterruptedIOException {
if (stopVectoredIOOperations.get()) {
throw new InterruptedIOException("Stream closed or unbuffer is called");
}
}
@Override
public synchronized void setReadahead(Long readahead) {
this.readahead = validateReadahead(readahead);
}
/**
* Get the current readahead value.
* @return a non-negative readahead value
*/
public synchronized long getReadahead() {
return readahead;
}
/**
* Calculate the limit for a get request, based on input policy
* and state of object.
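*
* Worked example (illustrative numbers): with the {@code Random} policy,
* targetPos=1000000, length=1024 and readahead=65536, the limit is
* targetPos + max(readahead, length) = 1065536, then capped at contentLength.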
* @param inputPolicy input policy
* @param targetPos position of the read
* @param length number of bytes requested; if less than zero, "unknown"
* @param contentLength total length of file
* @param readahead current readahead value
* @return the absolute value of the limit of the request.
*/
static long calculateRequestLimit(
S3AInputPolicy inputPolicy,
long targetPos,
long length,
long contentLength,
long readahead) {
long rangeLimit;
switch (inputPolicy) {
case Random:
// positioned.
// read to the end of this block, or to targetPos + readahead, whichever is further.
rangeLimit = (length < 0) ? contentLength
: targetPos + Math.max(readahead, length);
break;
case Sequential:
// sequential: plan for reading the entire object.
rangeLimit = contentLength;
break;
case Normal:
// normal is considered sequential until a backwards seek switches
// it to 'Random'
default:
rangeLimit = contentLength;
}
// cannot read past the end of the object
rangeLimit = Math.min(contentLength, rangeLimit);
return rangeLimit;
}
/**
* From a possibly null Long value, return a valid
* readahead.
* @param readahead new readahead
* @return a natural number.
* @throws IllegalArgumentException if the range is invalid.
*/
public static long validateReadahead(@Nullable Long readahead) {
if (readahead == null) {
return Constants.DEFAULT_READAHEAD_RANGE;
} else {
Preconditions.checkArgument(readahead >= 0, E_NEGATIVE_READAHEAD_VALUE);
return readahead;
}
}
/**
* Closes the underlying S3 stream, and merges the stream statistics
* into those of the filesystem instance.
* Also sets the {@code stopVectoredIOOperations} flag to true such that
* active vectored read operations are terminated. However termination of
* old vectored reads are not guaranteed if a new vectored read operation
* is initiated after unbuffer is called.
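*
* A sketch of typical use (hypothetical caller code):
* <pre>{@code
*   in.unbuffer();  // drain/abort the HTTP connection; the stream stays open
*   in.read();      // triggers a lazy re-open at the current position
* }</pre>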
*/
@Override
public synchronized void unbuffer() {
try {
stopVectoredIOOperations.set(true);
closeStream("unbuffer()", false, false);
} finally {
getS3AStreamStatistics().unbuffered();
maybeSwitchToRandomIO();
}
}
@Override
public boolean hasCapability(String capability) {
switch (toLowerCase(capability)) {
case StreamCapabilities.IOSTATISTICS_CONTEXT:
case StreamCapabilities.READAHEAD:
case StreamCapabilities.UNBUFFER:
return true;
default:
return super.hasCapability(capability);
}
}
/**
* Is the inner object stream open?
* @return true if there is an active HTTP request to S3.
*/
@VisibleForTesting
public boolean isObjectStreamOpen() {
return wrappedStream != null;
}
/**
* Get the wrapped stream.
* This is for testing only.
*
* @return the wrapped stream, or null if there is none.
*/
@VisibleForTesting
public ResponseInputStream<GetObjectResponse> getWrappedStream() {
return wrappedStream;
}
}