ObjectInputStream.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.impl.streams;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.impl.LeakReporter;
import org.apache.hadoop.fs.s3a.S3AInputPolicy;
import org.apache.hadoop.fs.s3a.S3AReadOpContext;
import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
import org.apache.hadoop.fs.s3a.VectoredIOContext;
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.StreamStatisticNames;
import static java.util.Objects.requireNonNull;
import static org.apache.commons.lang3.StringUtils.isNotEmpty;
import static org.apache.hadoop.util.Preconditions.checkArgument;
import static org.apache.hadoop.util.StringUtils.toLowerCase;
/**
* A stream of data from an S3 object.
* <p>
* The base class includes common methods, stores
* common data and incorporates leak tracking.
*/
public abstract class ObjectInputStream extends FSInputStream
    implements StreamCapabilities, IOStatisticsSource {

  private static final Logger LOG =
      LoggerFactory.getLogger(ObjectInputStream.class);

  /**
   * IOStatistics report; taken from the stream statistics at construction.
   */
  private final IOStatistics ioStatistics;

  /**
   * Read-specific operation context structure.
   */
  private final S3AReadOpContext context;

  /**
   * Callbacks for reading input stream data from the S3 Store.
   */
  private final ObjectInputStreamCallbacks callbacks;

  /**
   * Thread pool used for bounded IO operations.
   */
  private final ExecutorService boundedThreadPool;

  /**
   * URI of path, built as {@code s3a://<bucket>/<key>}.
   */
  private final String uri;

  /**
   * Store bucket.
   */
  private final String bucket;

  /**
   * Store key.
   */
  private final String key;

  /**
   * Path URI as a string.
   */
  private final String pathStr;

  /**
   * Content length from HEAD or openFile option.
   */
  private final long contentLength;

  /**
   * Attributes of the remote object.
   */
  private final S3ObjectAttributes objectAttributes;

  /**
   * Stream statistics.
   */
  private final S3AInputStreamStatistics streamStatistics;

  /** Aggregator used to aggregate per thread IOStatistics. */
  private final IOStatisticsAggregator threadIOStatistics;

  /**
   * Leak reporter: closed in {@link #finalize()}, where it uses
   * {@link #isStreamOpen()} to detect a stream which was never closed
   * and {@link #abortInFinalizer()} to abort it.
   */
  private final LeakReporter leakReporter;

  /**
   * Stream type.
   */
  private final InputStreamType streamType;

  /**
   * Requested input policy.
   * Changed through {@link #setInputPolicy(S3AInputPolicy)}.
   */
  private S3AInputPolicy inputPolicy;

  /** Vectored IO context. */
  private final VectoredIOContext vectoredIOContext;

  /**
   * Constructor.
   * @param streamType stream type enum.
   * @param parameters extensible parameter list.
   * @throws NullPointerException if the stream type, parameters,
   * object attributes, context, stream statistics or the context's
   * IOStatistics aggregator is null.
   * @throws IllegalArgumentException if the bucket or key is empty,
   * or the content length is negative.
   */
  protected ObjectInputStream(
      final InputStreamType streamType,
      final ObjectReadParameters parameters) {
    this.streamType = requireNonNull(streamType, "streamType");
    requireNonNull(parameters, "parameters");
    this.objectAttributes = requireNonNull(parameters.getObjectAttributes(),
        "objectAttributes");
    checkArgument(isNotEmpty(objectAttributes.getBucket()),
        "No Bucket");
    checkArgument(isNotEmpty(objectAttributes.getKey()), "No Key");
    final long len = objectAttributes.getLen();
    checkArgument(len >= 0, "Negative content length");
    this.context = requireNonNull(parameters.getContext(), "context");
    this.contentLength = len;
    this.bucket = objectAttributes.getBucket();
    this.key = objectAttributes.getKey();
    this.pathStr = objectAttributes.getPath().toString();
    this.callbacks = parameters.getCallbacks();
    this.uri = "s3a://" + bucket + "/" + key;
    this.streamStatistics = requireNonNull(parameters.getStreamStatistics(),
        "streamStatistics");
    this.ioStatistics = streamStatistics.getIOStatistics();
    this.inputPolicy = context.getInputPolicy();
    streamStatistics.inputPolicySet(inputPolicy.ordinal());
    this.boundedThreadPool = parameters.getBoundedThreadPool();
    this.threadIOStatistics = requireNonNull(context.getIOStatisticsAggregator(),
        "IOStatisticsAggregator");
    // build the leak reporter; the method references are only
    // invoked later, from finalize(), so subclass state is ready by then.
    this.leakReporter = new LeakReporter(
        "Stream not closed while reading " + uri,
        this::isStreamOpen,
        this::abortInFinalizer);
    this.vectoredIOContext = context.getVectoredIOContext();
  }

  /**
   * Probe for stream being open.
   * Not synchronized; the flag is volatile.
   * @return true if the stream is still open.
   */
  protected abstract boolean isStreamOpen();

  /**
   * Brute force stream close; invoked by {@link LeakReporter}.
   * All exceptions raised are ignored.
   */
  protected abstract void abortInFinalizer();

  /**
   * Close the stream.
   * This ends the callbacks (client + audit span), then publishes the
   * stream statistics back to the filesystem statistics and merges them
   * into the thread-level IOStatistics aggregator.
   * The statistics are closed and merged even if closing the callbacks fails.
   * <p>
   * This operation is synchronized, so that only one thread can attempt to
   * close the stream at a time.
   * This method does not itself track whether it has already been invoked;
   * repeated invocations repeat the statistics close and merge.
   * @throws IOException on any problem
   */
  @Override
  public synchronized void close() throws IOException {
    try {
      // end the client+audit span.
      callbacks.close();
    } finally {
      // merge the statistics back into the FS statistics,
      // even if closing the callbacks raised an exception.
      streamStatistics.close();
      // Collect ThreadLevel IOStats
      mergeThreadIOStatistics(streamStatistics.getIOStatistics());
    }
  }

  /**
   * Merging the current thread's IOStatistics with the current IOStatistics
   * context.
   * @param streamIOStats Stream statistics to be merged into thread
   * statistics aggregator.
   */
  protected void mergeThreadIOStatistics(IOStatistics streamIOStats) {
    threadIOStatistics.aggregate(streamIOStats);
  }

  /**
   * Finalizer.
   * <p>
   * Verify that the inner stream is closed.
   * <p>
   * If it is not, it means streams are being leaked in application code.
   * Log a warning, including the stack trace of the caller,
   * then abort the stream.
   * <p>
   * This does not attempt to invoke {@link #close()} as that is
   * a more complex operation, and this method is being executed
   * during a GC finalization phase.
   * <p>
   * Applications MUST close their streams; this is a defensive
   * operation to return http connections and warn the end users
   * that their applications are at risk of running out of connections.
   *
   * {@inheritDoc}
   */
  @Override
  protected void finalize() throws Throwable {
    leakReporter.close();
    super.finalize();
  }

  /**
   * Get the current input policy.
   * @return input policy.
   */
  @VisibleForTesting
  public S3AInputPolicy getInputPolicy() {
    return inputPolicy;
  }

  /**
   * Set/update the input policy of the stream.
   * This updates the stream statistics.
   * The argument is validated before any state is changed, so a null
   * policy leaves the current policy in place.
   * @param inputPolicy new input policy.
   * @throws NullPointerException if {@code inputPolicy} is null.
   */
  protected void setInputPolicy(S3AInputPolicy inputPolicy) {
    requireNonNull(inputPolicy, "inputPolicy");
    LOG.debug("Switching to input policy {}", inputPolicy);
    this.inputPolicy = inputPolicy;
    streamStatistics.inputPolicySet(inputPolicy.ordinal());
  }

  /**
   * Access the input stream statistics.
   * This is for internal testing and may be removed without warning.
   * @return the statistics for this input stream
   */
  @InterfaceAudience.Private
  @InterfaceStability.Unstable
  @VisibleForTesting
  public S3AInputStreamStatistics getS3AStreamStatistics() {
    return streamStatistics;
  }

  /**
   * Get the IOStatistics of this stream.
   * @return the IOStatistics taken from the stream statistics at construction.
   */
  @Override
  public IOStatistics getIOStatistics() {
    return ioStatistics;
  }

  /**
   * Declare the base capabilities implemented by this class and so by
   * all subclasses.
   * <p>
   * Matching is case-insensitive for all capabilities, including the
   * dynamic stream-type capability.
   * <p>
   * Subclasses MUST override this if they add more capabilities,
   * or actually remove any of these.
   * @param capability string to query the stream support for;
   * a null value is reported as unsupported.
   * @return true if all implementations are known to have the specific
   * capability.
   */
  @Override
  public boolean hasCapability(String capability) {
    if (capability == null) {
      return false;
    }
    switch (toLowerCase(capability)) {
    case StreamCapabilities.IOSTATISTICS:
    case StreamStatisticNames.STREAM_LEAKS:
      return true;
    default:
      // dynamic probe for the name of this stream; case-insensitive
      // to match the handling of the fixed capabilities above.
      return streamType.capability().equalsIgnoreCase(capability);
    }
  }

  /**
   * Read-specific operation context structure.
   * @return Read-specific operation context structure.
   */
  protected final S3AReadOpContext getContext() {
    return context;
  }

  /**
   * Callbacks for reading input stream data from the S3 Store.
   * @return Callbacks for reading input stream data from the S3 Store.
   */
  protected final ObjectInputStreamCallbacks getCallbacks() {
    return callbacks;
  }

  /**
   * Thread pool used for bounded IO operations.
   * @return Thread pool used for bounded IO operations.
   */
  protected final ExecutorService getBoundedThreadPool() {
    return boundedThreadPool;
  }

  /**
   * URI of path.
   * @return URI of path.
   */
  protected final String getUri() {
    return uri;
  }

  /**
   * Store bucket.
   * @return Store bucket.
   */
  protected final String getBucket() {
    return bucket;
  }

  /**
   * Store key.
   * @return Store key.
   */
  protected final String getKey() {
    return key;
  }

  /**
   * Path URI as a string.
   * @return Path URI as a string.
   */
  protected final String getPathStr() {
    return pathStr;
  }

  /**
   * Content length from HEAD or openFile option.
   * @return Content length from HEAD or openFile option.
   */
  protected final long getContentLength() {
    return contentLength;
  }

  /**
   * Aggregator used to aggregate per thread IOStatistics.
   * @return Aggregator used to aggregate per thread IOStatistics.
   */
  protected final IOStatisticsAggregator getThreadIOStatistics() {
    return threadIOStatistics;
  }

  /**
   * Attributes of the remote object.
   * @return Attributes of the remote object.
   */
  protected final S3ObjectAttributes getObjectAttributes() {
    return objectAttributes;
  }

  /**
   * Get Vectored IO context.
   * @return Vectored IO context.
   */
  protected VectoredIOContext getVectoredIOContext() {
    return vectoredIOContext;
  }

  /**
   * {@inheritDoc}.
   */
  @Override
  public int minSeekForVectorReads() {
    return vectoredIOContext.getMinSeekForVectorReads();
  }

  /**
   * {@inheritDoc}.
   */
  @Override
  public int maxReadSizeForVectorReads() {
    return vectoredIOContext.getMaxReadSizeForVectorReads();
  }

  /**
   * Get the type of this stream.
   * @return the stream type supplied to the constructor.
   */
  public InputStreamType streamType() {
    return streamType;
  }

  @Override
  public String toString() {
    return "ObjectInputStream{" +
        "streamType=" + streamType +
        ", uri='" + uri + '\'' +
        ", contentLength=" + contentLength +
        ", inputPolicy=" + inputPolicy +
        ", vectoredIOContext=" + vectoredIOContext +
        "} " + super.toString();
  }
}