/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.fs.s3a.impl.streams;

import java.io.EOFException;
import java.io.IOException;

import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata;
import software.amazon.s3.analyticsaccelerator.util.InputPolicy;
import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation;
import software.amazon.s3.analyticsaccelerator.util.S3URI;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.s3a.Retries;
import org.apache.hadoop.fs.s3a.S3AInputPolicy;
import org.apache.hadoop.fs.s3a.S3ObjectAttributes;

/**
* An AnalyticsStream is an input stream created through the
* aws-analytics-accelerator-s3 (AAL) library. The stream supports
* Parquet-specific optimisations such as Parquet-aware prefetching.
* For more details, see
* https://github.com/awslabs/analytics-accelerator-s3.
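*
* <p>A minimal usage sketch, assuming this stream type is selected through
* the {@code fs.s3a.input.stream.type} option (the exact option name and
* value may vary between releases, so treat this as illustrative only):</p>
* <pre>
*   Configuration conf = new Configuration();
*   conf.set("fs.s3a.input.stream.type", "analytics");
*   FileSystem fs = FileSystem.get(URI.create("s3a://bucket/"), conf);
*   try (FSDataInputStream in = fs.open(new Path("s3a://bucket/data.parquet"))) {
*     int firstByte = in.read();
*   }
* </pre>
*/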
public class AnalyticsStream extends ObjectInputStream implements StreamCapabilities {

public static final Logger LOG = LoggerFactory.getLogger(AnalyticsStream.class);

private S3SeekableInputStream inputStream;

private long lastReadCurrentPos = 0;

private volatile boolean closed;

public AnalyticsStream(final ObjectReadParameters parameters,
final S3SeekableInputStreamFactory s3SeekableInputStreamFactory) throws IOException {
super(InputStreamType.Analytics, parameters);
S3ObjectAttributes s3Attributes = parameters.getObjectAttributes();
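// ask the factory for an AAL seekable stream for this object; the open
// stream information carries the read policy and any already-known metadata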
this.inputStream = s3SeekableInputStreamFactory.createStream(S3URI.of(s3Attributes.getBucket(),
s3Attributes.getKey()), buildOpenStreamInformation(parameters));
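// record the opening of an analytics stream in the stream statistics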
getS3AStreamStatistics().streamOpened(InputStreamType.Analytics);
}

@Override
public int read() throws IOException {
throwIfClosed();
int bytesRead;
try {
bytesRead = inputStream.read();
} catch (IOException ioe) {
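// failure handling closes the stream; no attempt is made to recover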
onReadFailure(ioe);
throw ioe;
}
return bytesRead;
}

@Override
public void seek(long pos) throws IOException {
throwIfClosed();
if (pos < 0) {
throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK
+ " " + pos);
}
inputStream.seek(pos);
}

@Override
public synchronized long getPos() {
if (!closed) {
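// while open, cache the underlying stream's position so that a position
// can still be reported after the stream is closed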
lastReadCurrentPos = inputStream.getPos();
}
return lastReadCurrentPos;
}

/**
* Reads the last {@code len} bytes of the stream into a byte buffer. Blocks
* until the end of the stream is reached. Leaves the position of the stream
* unaltered.
*
* @param buf buffer to read data into
* @param off start position in the buffer at which data is written
* @param len the number of bytes to read; the last byte read is the last byte of the stream
* @return the total number of bytes read into the buffer
* @throws IOException if an I/O error occurs
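*
* <p>For example, a Parquet-style footer probe can fetch the final 8 bytes
* without disturbing the stream's current offset:</p>
* <pre>
*   byte[] tail = new byte[8];
*   int bytesRead = stream.readTail(tail, 0, 8);
* </pre>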
*/
public int readTail(byte[] buf, int off, int len) throws IOException {
throwIfClosed();
int bytesRead;
try {
bytesRead = inputStream.readTail(buf, off, len);
} catch (IOException ioe) {
onReadFailure(ioe);
throw ioe;
}
return bytesRead;
}

@Override
public int read(byte[] buf, int off, int len) throws IOException {
throwIfClosed();
int bytesRead;
try {
bytesRead = inputStream.read(buf, off, len);
} catch (IOException ioe) {
onReadFailure(ioe);
throw ioe;
}
return bytesRead;
}

@Override
public boolean seekToNewSource(long targetPos) throws IOException {
// an S3 object has no alternate sources: always decline
return false;
}

@Override
public int available() throws IOException {
throwIfClosed();
return super.available();
}

@Override
protected boolean isStreamOpen() {
return !isClosed();
}

protected boolean isClosed() {
// close() nulls out the inner stream; treat that as the closed marker
return inputStream == null;
}

@Override
protected void abortInFinalizer() {
try {
close();
} catch (IOException ignored) {
// best-effort cleanup during finalization: nothing useful can be
// done with the failure here
}
}

@Override
public synchronized void close() throws IOException {
if (!closed) {
closed = true;
try {
inputStream.close();
// null out the reference: isClosed() uses this as the closed marker
inputStream = null;
super.close();
} catch (IOException ioe) {
LOG.debug("Failure closing stream {}: ", getKey(), ioe);
throw ioe;
}
}
}

/**
* Close the stream on read failure.
* No attempt is made to recover from the failure.
*
* @param ioe the exception caught during the read.
*/
@Retries.OnceTranslated
private void onReadFailure(IOException ioe) throws IOException {
// log the stack trace at debug level only; at info, log just the message
if (LOG.isDebugEnabled()) {
LOG.debug("Got exception while trying to read from stream {}, " +
"not trying to recover:",
getKey(), ioe);
} else {
LOG.info("Got exception while trying to read from stream {}, " +
"not trying to recover:",
getKey());
}
this.close();
}

private OpenStreamInformation buildOpenStreamInformation(ObjectReadParameters parameters) {
OpenStreamInformation.OpenStreamInformationBuilder openStreamInformationBuilder =
OpenStreamInformation.builder()
.inputPolicy(mapS3AInputPolicyToAAL(parameters.getContext()
.getInputPolicy()));
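// if the etag is already known, pass the object's length and etag down
// so that AAL can make use of them rather than looking them up itself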
if (parameters.getObjectAttributes().getETag() != null) {
openStreamInformationBuilder.objectMetadata(ObjectMetadata.builder()
.contentLength(parameters.getObjectAttributes().getLen())
.etag(parameters.getObjectAttributes().getETag()).build());
}
return openStreamInformationBuilder.build();
}

/**
* Maps S3A's input policy to an AAL read policy.
* <p>
* If S3A's input policy is Sequential, that is, if the file format to be
* read is sequential (CSV, JSON), or the file policy passed down is
* WHOLE_FILE, then AAL's Parquet-specific optimisations are turned off,
* regardless of the file extension. This allows applications such as DistCp
* which read Parquet files, but read them whole, and so do not follow the
* typical Parquet read pattern of fetching the footer first, to avoid
* optimisations they will not benefit from.
* <p>
* Otherwise, AAL decides which optimisations to apply based on the file
* extension: if the file ends in .par or .parquet, the Parquet-specific
* optimisations are used.
*
* @param inputPolicy S3A's input file policy passed down when opening the file
* @return the AAL read policy
*/
private InputPolicy mapS3AInputPolicyToAAL(S3AInputPolicy inputPolicy) {
switch (inputPolicy) {
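// sequential or whole-file reads: switch off the Parquet-specific
// optimisations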
case Sequential:
return InputPolicy.Sequential;
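// anything else: let AAL choose based on the file extension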
default:
return InputPolicy.None;
}
}

protected void throwIfClosed() throws IOException {
if (closed) {
throw new IOException(getKey() + ": " + FSExceptionMessages.STREAM_IS_CLOSED);
}
}
}