_Private_IonReaderBuilder.java
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package com.amazon.ion.impl;
import com.amazon.ion.IonCatalog;
import com.amazon.ion.IonException;
import com.amazon.ion.IonReader;
import com.amazon.ion.IonTextReader;
import com.amazon.ion.IonValue;
import com.amazon.ion.util.InputStreamInterceptor;
import com.amazon.ion.system.IonReaderBuilder;
import com.amazon.ion.util.IonStreamUtils;
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
import java.util.Collections;
import java.util.List;
import static com.amazon.ion.impl.LocalSymbolTable.DEFAULT_LST_FACTORY;
import static com.amazon.ion.impl._Private_IonReaderFactory.makeReader;
import static com.amazon.ion.impl._Private_IonReaderFactory.makeReaderText;
/**
* {@link IonReaderBuilder} extension for internal use only.
*/
public class _Private_IonReaderBuilder extends IonReaderBuilder {
private _Private_LocalSymbolTableFactory lstFactory;
private _Private_IonReaderBuilder() {
super();
lstFactory = DEFAULT_LST_FACTORY;
}
private _Private_IonReaderBuilder(_Private_IonReaderBuilder that) {
super(that);
this.lstFactory = that.lstFactory;
}
/**
* Declares the {@link _Private_LocalSymbolTableFactory} to use when constructing applicable readers.
*
* @param factory the factory to use, or {@link LocalSymbolTable#DEFAULT_LST_FACTORY} if null.
*
* @return this builder instance, if mutable;
* otherwise a mutable copy of this builder.
*
* @see #setLstFactory(_Private_LocalSymbolTableFactory)
*/
public IonReaderBuilder withLstFactory(_Private_LocalSymbolTableFactory factory) {
_Private_IonReaderBuilder b = (_Private_IonReaderBuilder) mutable();
b.setLstFactory(factory);
return b;
}
/**
* @see #withLstFactory(_Private_LocalSymbolTableFactory)
*/
public void setLstFactory(_Private_LocalSymbolTableFactory factory) {
mutationCheck();
if (factory == null) {
lstFactory = DEFAULT_LST_FACTORY;
} else {
lstFactory = factory;
}
}
public static class Mutable extends _Private_IonReaderBuilder {
public Mutable() {
}
public Mutable(IonReaderBuilder that) {
super((_Private_IonReaderBuilder) that);
}
@Override
public IonReaderBuilder immutable() {
return new _Private_IonReaderBuilder(this);
}
@Override
public IonReaderBuilder mutable() {
return this;
}
@Override
protected void mutationCheck() {
}
}
/**
* InputStream that reads from exactly two delegate InputStreams in sequence. The second delegate remains
* in place even if it indicates EOF during a read call. This behavior differentiates this implementation from
* SequenceInputStream, which will return EOF forever after its final delegate returns EOF for the first time.
* The TwoElementSequenceInputStream allows the second delegate InputStream to return valid data if it subsequently
* receives more data, which is common when performing continuable reads.
*/
private static final class TwoElementInputStream extends InputStream {
/**
* The first InputStream in the sequence.
*/
private final InputStream first;
/**
* The last InputStream in the sequence.
*/
private final InputStream last;
/**
* The current InputStream.
*/
private InputStream in;
/**
* Constructor.
* @param first first InputStream in the sequence.
* @param last last InputStream in the sequence.
*/
private TwoElementInputStream(final InputStream first, final InputStream last) {
this.first = first;
this.last = last;
this.in = first;
}
@Override
public int available() throws IOException {
return first.available() + last.available();
}
@Override
public int read() throws IOException {
int b = in.read();
if (b < 0 && in == first) {
in = last;
b = in.read();
}
return b;
}
@Override
public int read(final byte[] b, final int off, final int len) throws IOException {
int bytesToRead = len;
int bytesRead = 0;
int destinationOffset = off;
while (true) {
int bytesReadThisIteration = in.read(b, destinationOffset, bytesToRead);
if (bytesReadThisIteration < 0) {
if (in == first) {
in = last;
continue;
}
break;
}
bytesRead += bytesReadThisIteration;
if (bytesRead == len) {
break;
} else if (in == last) {
// There's no other source of bytes, so return fewer bytes than requested.
break;
}
bytesToRead -= bytesReadThisIteration;
destinationOffset += bytesReadThisIteration;
}
if (bytesRead > 0) {
return bytesRead;
}
return -1;
}
@Override
public void close() throws IOException {
try {
first.close();
} finally {
last.close();
}
}
}
@FunctionalInterface
interface IonReaderFromBytesFactoryText {
IonReader makeReader(IonCatalog catalog, byte[] ionData, int offset, int length, _Private_LocalSymbolTableFactory lstFactory);
}
@FunctionalInterface
interface IonReaderFromBytesFactoryBinary {
IonReader makeReader(_Private_IonReaderBuilder builder, byte[] ionData, int offset, int length);
}
private static void validateHeaderLength(int maxHeaderLength) {
if (maxHeaderLength > _Private_IonConstants.ARRAY_MAXIMUM_SIZE) {
// Note: we could choose an arbitrary limit lower than this. The purpose at this point is to avoid OOM
// in the case where Java cannot allocate an array of the requested size.
throw new IonException(String.format(
"The maximum header length %d exceeds the maximum array size %d.",
maxHeaderLength,
_Private_IonConstants.ARRAY_MAXIMUM_SIZE
));
}
}
static IonReader buildReader(
_Private_IonReaderBuilder builder,
byte[] ionData,
int offset,
int length,
IonReaderFromBytesFactoryBinary binary,
IonReaderFromBytesFactoryText text
) {
List<InputStreamInterceptor> streamInterceptors = builder.getInputStreamInterceptors();
for (InputStreamInterceptor streamInterceptor : streamInterceptors) {
int headerLength = streamInterceptor.numberOfBytesNeededToDetermineMatch();
validateHeaderLength(headerLength);
if (length < headerLength) {
continue;
}
if (streamInterceptor.isMatch(ionData, offset, length)) {
try {
return buildReader(
builder,
streamInterceptor.newInputStream(new ByteArrayInputStream(ionData, offset, length)),
_Private_IonReaderFactory::makeReaderBinary,
_Private_IonReaderFactory::makeReaderText,
// The builder provides only one level of detection, e.g. GZIP-compressed binary Ion *or*
// zstd-compressed binary Ion; *not* GZIP-compressed zstd-compressed binary Ion. Users that
// need to intercept multiple format layers can provide a custom InputStreamInterceptor to
// achieve this.
/*inputStreamInterceptors=*/ Collections.emptyList()
);
} catch (IOException e) {
throw new IonException(e);
}
}
}
if (IonStreamUtils.isIonBinary(ionData, offset, length)) {
return binary.makeReader(builder, ionData, offset, length);
}
return text.makeReader(builder.validateCatalog(), ionData, offset, length, builder.lstFactory);
}
@Override
public IonReader build(byte[] ionData, int offset, int length)
{
return buildReader(
this,
ionData,
offset,
length,
_Private_IonReaderFactory::makeReaderBinary,
_Private_IonReaderFactory::makeReaderText
);
}
/**
* Determines whether a stream that begins with the bytes in the provided buffer could be binary Ion.
* @param buffer up to the first four bytes in a stream.
* @param length the actual number of bytes in the buffer.
* @return true if the first 'length' bytes in 'buffer' match the first 'length' bytes in the binary IVM.
*/
private static boolean startsWithIvm(byte[] buffer, int length) {
if (length >= _Private_IonConstants.BINARY_VERSION_MARKER_SIZE) {
return buffer[0] == (byte) 0xE0
&& buffer[3] == (byte) 0xEA;
} else if (length >= 1) {
return buffer[0] == (byte) 0xE0;
}
return true;
}
@FunctionalInterface
interface IonReaderFromInputStreamFactoryText {
IonReader makeReader(IonCatalog catalog, InputStream source, _Private_LocalSymbolTableFactory lstFactory);
}
@FunctionalInterface
interface IonReaderFromInputStreamFactoryBinary {
IonReader makeReader(_Private_IonReaderBuilder builder, InputStream source, byte[] alreadyRead, int alreadyReadOff, int alreadyReadLen);
}
/**
* Reads from the given source into the given byte array, stopping once either
* <ol>
* <li>`length` bytes have been read, or</li>
* <li>the end of the source stream has been reached, or</li>
* <li>the source stream throws an exception.</li>
* </ol>
* @param source the source of the bytes to read.
* @param destination the destination for the bytes read.
* @param length the number of bytes to attempt to read.
* @return the number of bytes read into `destination`.
*/
private static int fillToLengthOrStreamEnd(InputStream source, byte[] destination, int length) {
int bytesRead = 0;
while (bytesRead < length) {
int bytesToRead = length - bytesRead;
int bytesReadThisIteration;
try {
bytesReadThisIteration = source.read(destination, bytesRead, bytesToRead);
} catch (EOFException e) {
// Some InputStream implementations throw EOFException in certain cases to convey
// that the end of the stream has been reached.
break;
} catch (IOException e) {
throw new IonException(e);
}
if (bytesReadThisIteration < 0) { // This indicates the end of the stream.
break;
}
bytesRead += bytesReadThisIteration;
}
return bytesRead;
}
static IonReader buildReader(
_Private_IonReaderBuilder builder,
InputStream source,
IonReaderFromInputStreamFactoryBinary binary,
IonReaderFromInputStreamFactoryText text,
List<InputStreamInterceptor> inputStreamInterceptors
) {
if (source == null) {
throw new NullPointerException("Cannot build a reader from a null InputStream.");
}
int maxHeaderLength = Math.max(
_Private_IonConstants.BINARY_VERSION_MARKER_SIZE,
inputStreamInterceptors.stream().mapToInt(InputStreamInterceptor::numberOfBytesNeededToDetermineMatch).max().orElse(0)
);
validateHeaderLength(maxHeaderLength);
// Note: this can create a lot of layers of InputStream wrappers. For example, if this method is called
// from build(byte[]) and the bytes contain GZIP, the chain will be SequenceInputStream(ByteArrayInputStream,
// GZIPInputStream -> PushbackInputStream -> ByteArrayInputStream). If this creates a drag on efficiency,
// alternatives should be evaluated.
byte[] possibleIVM = new byte[maxHeaderLength];
InputStream ionData = source;
int bytesRead = fillToLengthOrStreamEnd(ionData, possibleIVM, maxHeaderLength);
// If the input stream is growing, it is possible that fewer than BINARY_VERSION_MARKER_SIZE bytes are
// available yet. Simply check whether the stream *could* contain binary Ion based on the available bytes.
// If it can't, fall back to text.
// NOTE: if incremental text reading is added, there will need to be logic that handles the case where
// the reader is created with 0 bytes available, as it is impossible to determine text vs. binary without
// reading at least one byte. Currently, in that case, just create a binary incremental reader. Either the
// stream will always be empty (in which case it doesn't matter whether a text or binary reader is used)
// or it's a binary stream (in which case the correct reader was created) or it's a growing text stream
// (which has always been unsupported).
for (InputStreamInterceptor streamInterceptor : inputStreamInterceptors) {
if (bytesRead < streamInterceptor.numberOfBytesNeededToDetermineMatch()) {
continue;
}
if (streamInterceptor.isMatch(possibleIVM, 0, bytesRead)) {
try {
ionData = streamInterceptor.newInputStream(
new TwoElementInputStream(new ByteArrayInputStream(possibleIVM, 0, bytesRead), ionData)
);
} catch (IOException e) {
throw new IonException(e);
}
bytesRead = fillToLengthOrStreamEnd(ionData, possibleIVM, _Private_IonConstants.BINARY_VERSION_MARKER_SIZE);
break;
}
}
if (startsWithIvm(possibleIVM, bytesRead)) {
return binary.makeReader(builder, ionData, possibleIVM, 0, bytesRead);
}
InputStream wrapper;
if (bytesRead > 0) {
wrapper = new TwoElementInputStream(
new ByteArrayInputStream(possibleIVM, 0, bytesRead),
ionData
);
} else {
wrapper = ionData;
}
return text.makeReader(builder.validateCatalog(), wrapper, builder.lstFactory);
}
@Override
public IonReader build(InputStream source)
{
return buildReader(
this,
source,
_Private_IonReaderFactory::makeReaderBinary,
_Private_IonReaderFactory::makeReaderText,
getInputStreamInterceptors()
);
}
@Override
public IonReader build(Reader ionText) {
return makeReaderText(validateCatalog(), ionText, lstFactory);
}
@Override
public IonReader build(IonValue value) {
return makeReader(validateCatalog(), value, lstFactory);
}
@Override
public IonTextReader build(String ionText) {
return makeReaderText(validateCatalog(), ionText, lstFactory);
}
}