// TikaInputStream.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tika.io;
import java.io.BufferedInputStream;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLConnection;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.Blob;
import java.sql.SQLException;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.input.TaggedInputStream;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.metadata.TikaCoreProperties;
import org.apache.tika.utils.StringUtils;
/**
 * Input stream with extended capabilities for detection and parsing.
 * <p>
 * This implementation uses backing strategies to handle different input types:
 * <ul>
 * <li>{@link ByteArraySource} for byte[] inputs - no caching needed</li>
 * <li>{@link FileSource} for Path/File inputs - direct file access</li>
 * <li>{@link CachingSource} for InputStream inputs - caches bytes as read</li>
 * </ul>
 * <p>
 * Instances are normally obtained through the static {@code get(...)} factory
 * methods. The stream tracks its own read position, supports
 * {@link #mark(int)}/{@link #reset()}, and releases every resource registered
 * with its {@link TemporaryResources} when {@link #close()} is called.
 *
 * @since Apache Tika 0.8
 */
public class TikaInputStream extends TaggedInputStream {

    /** Number of consecutive end-of-stream results tolerated before {@link #afterRead(int)} fails. */
    private static final int MAX_CONSECUTIVE_EOFS = 1000;

    /** Blobs of at most this many bytes are loaded fully into memory by {@link #get(Blob, Metadata)}. */
    private static final int BLOB_SIZE_THRESHOLD = 1024 * 1024;

    /**
     * Resources released by {@link #close()}. May be {@code null} for instances
     * created through the protected constructor until a resource is registered.
     */
    protected TemporaryResources tmp;

    /** Number of bytes consumed so far, as maintained by reads, skips, resets and rewinds. */
    private long position = 0;

    /** Position recorded by the last {@link #mark(int)} call, or -1 when no mark is set. */
    private long mark = -1;

    /** Container object attached through {@link #setOpenContainer(Object)}, if any. */
    private Object openContainer;

    /** Count of end-of-stream results seen since the last successful read or reset. */
    private int consecutiveEOFs = 0;

    /** While greater than zero, {@link #close()} is a no-op. */
    private int closeShieldDepth = 0;

    /** File-name suffix handed to the backing source when a path is requested. */
    private String suffix = null;

    /** Explicit length that takes precedence over the backing source when non-negative. */
    private long overrideLength = -1;

    // ========== Constructors ==========

    /**
     * Protected constructor for subclasses.
     * <p>
     * Streams created this way have no backing {@link TikaInputSource}, so
     * {@link #getPath()} and {@link #rewind()} throw {@link IOException}.
     *
     * @param stream the stream to wrap
     * @param length the known length of the stream in bytes, or a negative
     *               value if the length is unknown
     */
    protected TikaInputStream(InputStream stream, long length) {
        super(stream);
        this.tmp = null;
        // Honour the caller-supplied length; negative values leave the length unknown.
        this.overrideLength = length;
    }

    /**
     * Strategy-based constructor.
     * TikaInputSource extends InputStream, so we pass it directly to super().
     */
    private TikaInputStream(TikaInputSource inputSource, TemporaryResources tmp, String suffix) {
        super((InputStream) inputSource);
        this.tmp = tmp;
        this.suffix = suffix;
    }

    /**
     * Returns the backing TikaInputSource, or null if using protected constructor.
     */
    private TikaInputSource inputSource() {
        return in instanceof TikaInputSource ? (TikaInputSource) in : null;
    }

    /**
     * Returns the resource tracker, creating it on first use so that instances
     * built through the protected constructor can still register resources.
     */
    private TemporaryResources resources() {
        if (tmp == null) {
            tmp = new TemporaryResources();
        }
        return tmp;
    }

    // ========== Static Factory Methods ==========

    /**
     * Wraps the given stream, unless it already is a {@code TikaInputStream},
     * in which case it is returned unchanged and {@code tmp} and
     * {@code metadata} are ignored.
     *
     * @param stream   the stream to wrap; must not be {@code null}
     * @param tmp      tracker for resources to release when the returned stream is closed
     * @param metadata metadata used to derive the file-name suffix; may be {@code null}
     * @return a {@code TikaInputStream} reading from {@code stream}
     * @throws NullPointerException if {@code stream} is {@code null}
     */
    public static TikaInputStream get(InputStream stream, TemporaryResources tmp, Metadata metadata) {
        if (stream == null) {
            throw new NullPointerException("The Stream must not be null");
        }
        if (stream instanceof TikaInputStream) {
            return (TikaInputStream) stream;
        }
        String ext = getExtension(metadata);
        TikaInputSource inputSource = new CachingSource(stream, tmp, -1, metadata);
        return new TikaInputStream(inputSource, tmp, ext);
    }

    /**
     * Wraps the given stream using a fresh {@link TemporaryResources} and no metadata.
     *
     * @param stream the stream to wrap; must not be {@code null}
     * @return a {@code TikaInputStream} reading from {@code stream}
     */
    public static TikaInputStream get(InputStream stream) {
        return get(stream, new TemporaryResources(), null);
    }

    /**
     * Wraps the given stream using a fresh {@link TemporaryResources}.
     *
     * @param stream   the stream to wrap; must not be {@code null}
     * @param metadata metadata used to derive the file-name suffix; may be {@code null}
     * @return a {@code TikaInputStream} reading from {@code stream}
     */
    public static TikaInputStream get(InputStream stream, Metadata metadata) {
        return get(stream, new TemporaryResources(), metadata);
    }

    /**
     * Creates a stream over the given bytes with fresh metadata.
     *
     * @param data the content
     * @return a {@code TikaInputStream} over {@code data}
     * @throws IOException declared for API symmetry with the other factories
     */
    public static TikaInputStream get(byte[] data) throws IOException {
        return get(data, new Metadata());
    }

    /**
     * Creates a stream over the given bytes and records the array length as
     * {@link Metadata#CONTENT_LENGTH}.
     *
     * @param data     the content
     * @param metadata metadata to update; also used to derive the file-name suffix
     * @return a {@code TikaInputStream} over {@code data}
     * @throws IOException declared for API symmetry with the other factories
     */
    public static TikaInputStream get(byte[] data, Metadata metadata) throws IOException {
        metadata.set(Metadata.CONTENT_LENGTH, Integer.toString(data.length));
        String ext = getExtension(metadata);
        TemporaryResources tmp = new TemporaryResources();
        TikaInputSource inputSource = new ByteArraySource(data, tmp);
        return new TikaInputStream(inputSource, tmp, ext);
    }

    /**
     * Creates a stream over the given file with fresh metadata.
     *
     * @param path the file to read
     * @return a {@code TikaInputStream} over the file
     * @throws IOException if the file size cannot be determined or the file cannot be opened
     */
    public static TikaInputStream get(Path path) throws IOException {
        return get(path, new Metadata());
    }

    /**
     * Creates a stream over the given file using a fresh {@link TemporaryResources}.
     * See {@link #get(Path, Metadata, TemporaryResources)} for the metadata that is set.
     *
     * @param path     the file to read
     * @param metadata metadata to update
     * @return a {@code TikaInputStream} over the file
     * @throws IOException if the file size cannot be determined or the file cannot be opened
     */
    public static TikaInputStream get(Path path, Metadata metadata) throws IOException {
        // Delegate so both Path overloads share one implementation and one statement order.
        return get(path, metadata, new TemporaryResources());
    }

    /**
     * Creates a stream over the given file.
     * <p>
     * The resource name is set from the file name when the metadata does not
     * already carry a non-blank one, and {@link Metadata#CONTENT_LENGTH} is set
     * to the file size. The suffix is taken from the file name of {@code path}.
     *
     * @param path     the file to read
     * @param metadata metadata to update
     * @param tmp      tracker for resources to release when the returned stream is closed
     * @return a {@code TikaInputStream} over the file
     * @throws IOException if the file size cannot be determined or the file cannot be opened
     */
    public static TikaInputStream get(Path path, Metadata metadata, TemporaryResources tmp)
            throws IOException {
        long length = Files.size(path);
        if (StringUtils.isBlank(metadata.get(TikaCoreProperties.RESOURCE_NAME_KEY))) {
            metadata.set(TikaCoreProperties.RESOURCE_NAME_KEY, path.getFileName().toString());
        }
        metadata.set(Metadata.CONTENT_LENGTH, Long.toString(length));
        String ext = FilenameUtils.getSuffixFromPath(path.getFileName().toString());
        TikaInputSource inputSource = new FileSource(path);
        return new TikaInputStream(inputSource, tmp, ext);
    }

    /**
     * Creates a stream over the given file with fresh metadata.
     *
     * @param file the file to read
     * @return a {@code TikaInputStream} over the file
     * @throws IOException if the file size cannot be determined or the file cannot be opened
     */
    public static TikaInputStream get(File file) throws IOException {
        return get(file.toPath(), new Metadata());
    }

    /**
     * Creates a stream over the given file.
     *
     * @param file     the file to read
     * @param metadata metadata to update
     * @return a {@code TikaInputStream} over the file
     * @throws IOException if the file size cannot be determined or the file cannot be opened
     */
    public static TikaInputStream get(File file, Metadata metadata) throws IOException {
        return get(file.toPath(), metadata);
    }

    /**
     * Creates a stream over the given database blob with fresh metadata.
     *
     * @param blob the blob to read
     * @return a {@code TikaInputStream} over the blob content
     * @throws SQLException if the blob content cannot be accessed
     * @throws IOException  declared for API symmetry with the other factories
     */
    public static TikaInputStream get(Blob blob) throws SQLException, IOException {
        return get(blob, new Metadata());
    }

    /**
     * Creates a stream over the given database blob.
     * <p>
     * When the blob reports a length of at most {@code BLOB_SIZE_THRESHOLD}
     * bytes its content is fetched into a byte array; otherwise (including when
     * the length cannot be determined) the blob's binary stream is wrapped.
     *
     * @param blob     the blob to read
     * @param metadata metadata to update with the content length, when known
     * @return a {@code TikaInputStream} over the blob content
     * @throws SQLException if the blob content cannot be accessed
     * @throws IOException  declared for API symmetry with the other factories
     */
    public static TikaInputStream get(Blob blob, Metadata metadata) throws SQLException, IOException {
        long length = -1;
        try {
            length = blob.length();
            metadata.set(Metadata.CONTENT_LENGTH, Long.toString(length));
        } catch (SQLException ignored) {
            // Best effort: an unknown length simply selects the streaming branch below.
        }
        if (0 <= length && length <= BLOB_SIZE_THRESHOLD) {
            // The cast is safe: length is bounded by the int-sized threshold.
            return get(blob.getBytes(1, (int) length), metadata);
        } else {
            String ext = getExtension(metadata);
            TemporaryResources tmp = new TemporaryResources();
            TikaInputSource inputSource = new CachingSource(
                    new BufferedInputStream(blob.getBinaryStream()), tmp, length, metadata);
            return new TikaInputStream(inputSource, tmp, ext);
        }
    }

    /**
     * Creates a stream over the resource at the given URI with fresh metadata.
     *
     * @param uri the resource location
     * @return a {@code TikaInputStream} over the resource
     * @throws IOException if the resource cannot be opened
     */
    public static TikaInputStream get(URI uri) throws IOException {
        return get(uri, new Metadata());
    }

    /**
     * Creates a stream over the resource at the given URI.
     * <p>
     * A {@code file:} URI that resolves to a regular file is read directly
     * from disk; anything else is converted to a URL and handled by
     * {@link #get(URL, Metadata)}.
     *
     * @param uri      the resource location
     * @param metadata metadata to update
     * @return a {@code TikaInputStream} over the resource
     * @throws IOException if the resource cannot be opened
     */
    public static TikaInputStream get(URI uri, Metadata metadata) throws IOException {
        if ("file".equalsIgnoreCase(uri.getScheme())) {
            Path path = localFilePath(uri);
            if (path != null) {
                return get(path, metadata);
            }
        }
        return get(uri.toURL(), metadata);
    }

    /**
     * Creates a stream over the resource at the given URL with fresh metadata.
     *
     * @param url the resource location
     * @return a {@code TikaInputStream} over the resource
     * @throws IOException if the resource cannot be opened
     */
    public static TikaInputStream get(URL url) throws IOException {
        return get(url, new Metadata());
    }

    /**
     * Creates a stream over the resource at the given URL.
     * <p>
     * A {@code file:} URL that resolves to a regular file is read directly
     * from disk. Otherwise a connection is opened and the resource name (last
     * path segment, when non-empty), content type, content encoding and content
     * length reported by the connection are copied into {@code metadata}.
     *
     * @param url      the resource location
     * @param metadata metadata to update
     * @return a {@code TikaInputStream} over the resource
     * @throws IOException if the connection or its stream cannot be opened
     */
    public static TikaInputStream get(URL url, Metadata metadata) throws IOException {
        if ("file".equalsIgnoreCase(url.getProtocol())) {
            try {
                Path path = localFilePath(url.toURI());
                if (path != null) {
                    return get(path, metadata);
                }
            } catch (URISyntaxException e) {
                // fall through
            }
        }
        URLConnection connection = url.openConnection();
        String urlPath = url.getPath();
        int slash = urlPath.lastIndexOf('/');
        if (slash + 1 < urlPath.length()) {
            metadata.set(TikaCoreProperties.RESOURCE_NAME_KEY, urlPath.substring(slash + 1));
        }
        String type = connection.getContentType();
        if (type != null) {
            metadata.set(Metadata.CONTENT_TYPE, type);
        }
        String encoding = connection.getContentEncoding();
        if (encoding != null) {
            metadata.set(Metadata.CONTENT_ENCODING, encoding);
        }
        // Use the long variant: the int variant reports -1 for bodies larger than Integer.MAX_VALUE.
        long length = connection.getContentLengthLong();
        if (length >= 0) {
            metadata.set(Metadata.CONTENT_LENGTH, Long.toString(length));
        }
        String ext = getExtension(metadata);
        TemporaryResources tmp = new TemporaryResources();
        TikaInputSource inputSource = new CachingSource(
                new BufferedInputStream(connection.getInputStream()), tmp, length, metadata);
        return new TikaInputStream(inputSource, tmp, ext);
    }

    /**
     * Creates a content-less stream that carries an already opened container
     * object and reports the given length.
     *
     * @param openContainer the container to attach; closed together with the
     *                      stream when it implements {@link Closeable}
     * @param length        the length to report from {@link #getLength()}
     * @param metadata      metadata to update; {@link Metadata#CONTENT_LENGTH}
     *                      is set to {@code length}
     * @return a {@code TikaInputStream} holding {@code openContainer}
     * @throws IOException declared for API symmetry with the other factories
     */
    public static TikaInputStream getFromContainer(Object openContainer, long length, Metadata metadata)
            throws IOException {
        TikaInputStream tis = TikaInputStream.get(new byte[0], metadata);
        tis.setOpenContainer(openContainer);
        tis.setLength(length);
        // get(byte[], ...) recorded a content length of 0; replace it with the real one.
        metadata.set(Metadata.CONTENT_LENGTH, Long.toString(length));
        return tis;
    }

    /**
     * Derives the file-name suffix from the resource name stored in the
     * metadata, or returns the empty string when no metadata is given.
     */
    private static String getExtension(Metadata metadata) {
        if (metadata == null) {
            return StringUtils.EMPTY;
        }
        String name = metadata.get(TikaCoreProperties.RESOURCE_NAME_KEY);
        return FilenameUtils.getSuffixFromPath(name);
    }

    /**
     * Resolves a {@code file:} URI to a local regular file.
     *
     * @return the path, or {@code null} when the URI cannot be expressed as a
     *         local path (for example because it carries an authority
     *         component) or does not denote a regular file
     */
    private static Path localFilePath(URI uri) {
        try {
            Path path = Paths.get(uri);
            return Files.isRegularFile(path) ? path : null;
        } catch (IllegalArgumentException e) {
            // Not a valid local path; let the caller fall back to URL handling.
            return null;
        }
    }

    // ========== InputStream Methods ==========

    /**
     * Skips up to {@code n} bytes. Returns the actual number of bytes skipped,
     * which may be less than requested if the end of stream is reached.
     * <p>
     * This method does NOT throw {@link java.io.EOFException} if fewer bytes
     * are available. Callers must check the return value to determine how many
     * bytes were actually skipped. A zero or negative {@code n} skips nothing.
     *
     * @param n the number of bytes to skip
     * @return the actual number of bytes skipped (may be less than {@code n})
     */
    @Override
    public long skip(long n) throws IOException {
        if (n <= 0) {
            // IOUtils.skip rejects negative counts; InputStream.skip is expected to return 0.
            return 0;
        }
        long skipped = IOUtils.skip(in, n);
        position += skipped;
        return skipped;
    }

    /**
     * Marks the current position so that {@link #reset()} can return to it.
     *
     * @param readlimit passed through to the underlying stream
     */
    @Override
    public void mark(int readlimit) {
        super.mark(readlimit);
        mark = position;
    }

    /**
     * Always reports mark support.
     *
     * @return {@code true}
     */
    @Override
    public boolean markSupported() {
        return true;
    }

    /**
     * Returns to the most recently marked position. The mark stays valid, so
     * this method may be called repeatedly for the same mark.
     *
     * @throws IOException if no mark is set or the underlying stream cannot reset
     */
    @Override
    public void reset() throws IOException {
        if (mark < 0) {
            throw new IOException("Resetting to invalid mark");
        }
        // Delegate to underlying stream's reset (handles passthrough and caching modes)
        super.reset();
        position = mark;
        // Don't invalidate mark - allow multiple reset() calls to same mark
        consecutiveEOFs = 0;
    }

    /**
     * Closes this stream and every resource registered with it, unless a close
     * shield is active, in which case the call is ignored.
     *
     * @throws IOException if closing the stream or a registered resource fails
     */
    @Override
    public void close() throws IOException {
        if (closeShieldDepth > 0) {
            return;
        }
        mark = -1;
        if (tmp != null) {
            // Register the wrapped stream so it is closed along with all other resources.
            tmp.addResource(in);
            tmp.close();
        } else {
            // No resource tracker (protected constructor): still close the wrapped stream.
            super.close();
        }
    }

    /**
     * Updates the position after a read and guards against callers that keep
     * reading after the end of the stream.
     *
     * @param n the number of bytes read, or -1 at end of stream
     * @throws IOException if more than {@code MAX_CONSECUTIVE_EOFS} consecutive
     *                     reads returned end of stream
     */
    @Override
    protected void afterRead(int n) throws IOException {
        if (n != -1) {
            position += n;
            consecutiveEOFs = 0;
        } else {
            consecutiveEOFs++;
            if (consecutiveEOFs > MAX_CONSECUTIVE_EOFS) {
                throw new IOException("Read too many -1 (EOFs); there could be an infinite loop. " +
                        "If you think your file is not corrupt, please open an issue on Tika's JIRA");
            }
        }
    }

    // ========== TikaInputStream-specific Methods ==========

    /**
     * Fills the given buffer with as many bytes as are available from the
     * current position, then resets the stream to that position.
     * <p>
     * Any mark set earlier is replaced by the one taken here.
     *
     * @param buffer the buffer to fill
     * @return the number of bytes placed in {@code buffer}
     * @throws IOException if reading or resetting fails
     */
    public int peek(byte[] buffer) throws IOException {
        int n = 0;
        mark(buffer.length);
        int m = read(buffer);
        // Keep reading until the buffer is full or the stream is exhausted.
        while (m != -1) {
            n += m;
            if (n < buffer.length) {
                m = read(buffer, n, buffer.length - n);
            } else {
                m = -1;
            }
        }
        reset();
        return n;
    }

    /**
     * Returns the container object attached to this stream.
     *
     * @return the container, or {@code null} if none has been set
     */
    public Object getOpenContainer() {
        return openContainer;
    }

    /**
     * Attaches an opened container object to this stream. A container that
     * implements {@link Closeable} is closed when this stream is closed.
     *
     * @param container the container to attach
     */
    public void setOpenContainer(Object container) {
        openContainer = container;
        if (container instanceof Closeable) {
            resources().addResource((Closeable) container);
        }
    }

    /**
     * Registers a resource to be closed when this stream is closed.
     *
     * @param closeable the resource to register
     */
    public void addCloseableResource(Closeable closeable) {
        resources().addResource(closeable);
    }

    /**
     * Reports whether the backing source already has a path.
     *
     * @return {@code true} if a backing source exists and reports a path
     */
    public boolean hasFile() {
        TikaInputSource source = inputSource();
        return source != null && source.hasPath();
    }

    /**
     * Returns a path for this stream's content, as supplied by the backing
     * source for this stream's file-name suffix.
     *
     * @return the path to the content
     * @throws IOException if there is no backing source or it cannot supply a path
     */
    public Path getPath() throws IOException {
        TikaInputSource source = inputSource();
        if (source == null) {
            throw new IOException("No TikaInputSource available");
        }
        return source.getPath(suffix);
    }

    /**
     * Returns the result of {@link #getPath()} as a {@link File}.
     *
     * @return the file holding the content
     * @throws IOException if no path can be obtained
     */
    public File getFile() throws IOException {
        return getPath().toFile();
    }

    /**
     * Opens a channel on the result of {@link #getPath()}. The channel is
     * closed when this stream is closed.
     *
     * @return a newly opened channel
     * @throws IOException if no path can be obtained or the channel cannot be opened
     */
    public FileChannel getFileChannel() throws IOException {
        FileChannel channel = FileChannel.open(getPath());
        resources().addResource(channel);
        return channel;
    }

    /**
     * Reports whether the length is known without further work.
     *
     * @return {@code true} if an explicit length was set or the backing source
     *         reports a length other than -1
     */
    public boolean hasLength() {
        if (overrideLength >= 0) {
            return true;
        }
        TikaInputSource source = inputSource();
        return source != null && source.getLength() != -1;
    }

    /**
     * Returns the length of the content in bytes. An explicit length takes
     * precedence; otherwise the backing source is consulted, requesting a path
     * from it first when it does not yet know the length.
     *
     * @return the length, or -1 if there is no backing source and no explicit length
     * @throws IOException if obtaining a path from the backing source fails
     */
    public long getLength() throws IOException {
        if (overrideLength >= 0) {
            return overrideLength;
        }
        TikaInputSource source = inputSource();
        if (source == null) {
            return -1;
        }
        long len = source.getLength();
        if (len == -1) {
            // Force spill to get length
            getPath();
            len = source.getLength();
        }
        return len;
    }

    /**
     * Returns the current read position.
     *
     * @return the number of bytes consumed so far
     */
    public long getPosition() {
        return position;
    }

    /**
     * Overwrites the tracked read position without moving the underlying stream.
     *
     * @param position the new position value
     */
    protected void setPosition(long position) {
        this.position = position;
    }

    /** Sets the explicit length reported by {@link #getLength()}. */
    private void setLength(long length) {
        this.overrideLength = length;
    }

    /**
     * Adds one level of close shielding; while any level is active,
     * {@link #close()} does nothing.
     */
    public void setCloseShield() {
        this.closeShieldDepth++;
    }

    /**
     * Removes one level of close shielding. Calls without a matching
     * {@link #setCloseShield()} are ignored so the depth never goes negative.
     */
    public void removeCloseShield() {
        if (this.closeShieldDepth > 0) {
            this.closeShieldDepth--;
        }
    }

    /**
     * Reports whether a close shield is active.
     *
     * @return {@code true} if {@link #close()} is currently a no-op
     */
    public boolean isCloseShield() {
        return closeShieldDepth > 0;
    }

    /**
     * Rewind the stream to the beginning.
     * <p>
     * For streams created from byte arrays or files, this always works.
     * For streams created from raw InputStreams, this requires
     * {@link #enableRewind()} to have been called first.
     * <p>
     * Any mark set earlier is discarded.
     *
     * @throws IOException if there is no backing source or it cannot seek to the start
     */
    public void rewind() throws IOException {
        TikaInputSource source = inputSource();
        if (source != null) {
            source.seekTo(0);
        } else {
            throw new IOException("Cannot rewind: no TikaInputSource available");
        }
        position = 0;
        mark = -1;
        consecutiveEOFs = 0;
    }

    /**
     * Enables full rewind capability for this stream.
     * <p>
     * For streams backed by byte arrays or files, this is a no-op since they
     * are inherently rewindable. For streams backed by raw InputStreams, this
     * switches from passthrough mode to caching mode, enabling subsequent
     * {@link #rewind()}, {@link #mark(int)}/{@link #reset()}, and random access.
     * <p>
     * Must be called when position is 0 (before any reading), otherwise
     * throws IllegalStateException.
     * <p>
     * Use this method when you know you'll need to rewind the stream later
     * (e.g., for detection followed by parsing, or digest calculation).
     * For streaming-only operations (e.g., HTML parsing), skip this call
     * to avoid unnecessary caching overhead.
     * <p>
     * Does nothing for streams without a backing source.
     *
     * @throws IllegalStateException if position is not 0
     */
    public void enableRewind() {
        TikaInputSource source = inputSource();
        if (source != null) {
            source.enableRewind();
        }
    }

    /**
     * Describes this stream by its path when one is available, otherwise by
     * the wrapped stream, followed by the open container if one is attached.
     */
    @Override
    public String toString() {
        StringBuilder str = new StringBuilder("TikaInputStream of ");
        if (hasFile()) {
            try {
                str.append(getPath().toString());
            } catch (IOException e) {
                str.append("unknown path");
            }
        } else {
            str.append(in.toString());
        }
        if (openContainer != null) {
            str.append(" (in ").append(openContainer).append(")");
        }
        return str.toString();
    }
}