// CachingInputStream.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tika.io;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
/**
* Package-private InputStream wrapper that caches all bytes read,
* allowing seeking back to any previously-read position.
* <p>
* Bytes are cached in a {@link StreamCache} which stores them in memory
* up to a threshold, then spills to a temporary file.
*/
class CachingInputStream extends InputStream {

    /** Upper bound for the scratch buffer {@link #skip(long)} uses when it must pull from the source. */
    private static final int SKIP_BUFFER_SIZE = 4096;

    private final InputStream source;
    private final StreamCache cache;

    /** Current logical position in the stream; never greater than {@code cache.size()}. */
    private long position;

    /** Set once the source has reported end-of-stream ({@code -1}). */
    private boolean sourceExhausted;

    /** Position recorded by {@link #mark(int)}, or {@code -1} when no mark has been set. */
    private long markPosition = -1;

    /**
     * Wraps {@code source}, recording every byte read from it in {@code cache}.
     *
     * @param source stream that new bytes are pulled from
     * @param cache  store for the bytes read so far
     */
    CachingInputStream(InputStream source, StreamCache cache) {
        this.source = source;
        this.cache = cache;
        this.position = 0;
        this.sourceExhausted = false;
    }

    /**
     * Reads one byte, replaying it from the cache when the current position
     * lies inside the cached region and pulling it from the source otherwise.
     *
     * @return the byte read, or {@code -1} at end of stream
     * @throws IOException if the cache or the source fails
     */
    @Override
    public int read() throws IOException {
        if (position < cache.size()) {
            // Replay mode: the byte has already been read once.
            int cached = cache.readAt(position);
            if (cached != -1) {
                position++;
            }
            return cached;
        }
        if (sourceExhausted) {
            return -1;
        }
        int fresh = source.read();
        if (fresh == -1) {
            sourceExhausted = true;
            return -1;
        }
        cache.append(fresh);
        position++;
        return fresh;
    }

    /**
     * Reads up to {@code len} bytes: first whatever the cache holds from the
     * current position, then - only once the cached region has been fully
     * consumed - new bytes from the source, which are appended to the cache.
     *
     * @param b   destination buffer
     * @param off offset in {@code b} at which to start storing bytes
     * @param len maximum number of bytes to read
     * @return number of bytes read, {@code 0} if {@code len} is zero, or
     *         {@code -1} if nothing could be read
     * @throws NullPointerException      if {@code b} is {@code null}
     * @throws IndexOutOfBoundsException if {@code off} or {@code len} is negative,
     *                                   or {@code len > b.length - off}
     * @throws IOException               if the cache or the source fails
     */
    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        if (off < 0 || len < 0 || len > b.length - off) {
            throw new IndexOutOfBoundsException(
                    "off=" + off + ", len=" + len + ", b.length=" + b.length);
        }
        if (len == 0) {
            return 0;
        }
        int totalRead = 0;
        long cachedBytes = cache.size();
        if (position < cachedBytes) {
            int fromCache = (int) Math.min(len, cachedBytes - position);
            int n = cache.readAt(position, b, off, fromCache);
            if (n > 0) {
                position += n;
                totalRead += n;
                off += n;
                len -= n;
            }
        }
        // Only touch the source when the logical position sits at the end of the
        // cache. If the cache read came back short, pulling from the source now
        // would hand back (and append) bytes out of order.
        boolean atCacheEnd = position >= cachedBytes;
        if (len > 0 && atCacheEnd && !sourceExhausted) {
            int n = source.read(b, off, len);
            if (n == -1) {
                sourceExhausted = true;
            } else if (n > 0) {
                cache.append(b, off, n);
                position += n;
                totalRead += n;
            }
        }
        return totalRead > 0 ? totalRead : -1;
    }

    /**
     * Skips up to {@code n} bytes. Bytes that are already cached are skipped by
     * moving the position; bytes beyond the cache are read through
     * {@link #read(byte[], int, int)} so that they end up in the cache too.
     *
     * @param n number of bytes to skip
     * @return number of bytes actually skipped; {@code 0} if {@code n <= 0}
     * @throws IOException if reading fails
     */
    @Override
    public long skip(long n) throws IOException {
        if (n <= 0) {
            return 0;
        }
        long skipped = 0;
        long cachedAhead = cache.size() - position;
        if (cachedAhead > 0) {
            // Already-cached bytes need no copying: just advance.
            skipped = Math.min(n, cachedAhead);
            position += skipped;
        }
        if (skipped == n) {
            return skipped;
        }
        // The remainder must really be read so that it gets cached.
        byte[] buffer = new byte[(int) Math.min(SKIP_BUFFER_SIZE, n - skipped)];
        while (skipped < n) {
            int toRead = (int) Math.min(buffer.length, n - skipped);
            int read = read(buffer, 0, toRead);
            if (read == -1) {
                break;
            }
            skipped += read;
        }
        return skipped;
    }

    /**
     * Seek to a specific position in the stream.
     * Can only seek to positions that have already been read (cached).
     *
     * @param newPosition target position, between {@code 0} and the cached size inclusive
     * @throws IOException if {@code newPosition} is negative or beyond the cached content
     */
    void seekTo(long newPosition) throws IOException {
        if (newPosition < 0) {
            throw new IOException("Cannot seek to negative position: " + newPosition);
        }
        if (newPosition > cache.size()) {
            throw new IOException("Cannot seek past cached content. Position: " + newPosition
                    + ", cached: " + cache.size());
        }
        this.position = newPosition;
    }

    /**
     * Get the current position in the stream.
     */
    long getPosition() {
        return position;
    }

    /**
     * Get the number of bytes currently cached.
     */
    long getCachedSize() {
        return cache.size();
    }

    /**
     * Force all remaining content to be read and cached, then return the file path.
     * Delegates to the cache, handing it the source stream and the file suffix.
     *
     * @param suffix suffix passed through to the cache for the file it produces
     * @return the path returned by the cache
     * @throws IOException if the cache fails to produce the file
     */
    Path spillToFile(String suffix) throws IOException {
        return cache.toFile(source, suffix);
    }

    /**
     * Check if the cache has spilled to a file.
     */
    boolean isFileBacked() {
        return cache.isFileBacked();
    }

    /**
     * Closes the source stream and then the cache. The cache is closed even
     * when closing the source throws, so its resources are not leaked; if both
     * fail, the exception from the cache is the one that propagates.
     *
     * @throws IOException if closing the source or the cache fails
     */
    @Override
    public void close() throws IOException {
        try {
            source.close();
        } finally {
            cache.close();
        }
    }

    /**
     * Close only the cache, not the underlying source stream.
     * Used when TikaInputStream spills to file - the source stream
     * (e.g., an archive stream) may need to remain open.
     */
    void closeCacheOnly() throws IOException {
        cache.close();
    }

    /**
     * Reports the number of cached bytes ahead of the current position
     * (capped at {@link Integer#MAX_VALUE}); when there are none, reports
     * whatever the source says is available.
     */
    @Override
    public int available() throws IOException {
        long cachedAvailable = cache.size() - position;
        if (cachedAvailable > 0) {
            return (int) Math.min(cachedAvailable, Integer.MAX_VALUE);
        }
        return source.available();
    }

    /**
     * Remembers the current position for a later {@link #reset()}.
     * The {@code readlimit} argument is ignored.
     */
    @Override
    public synchronized void mark(int readlimit) {
        markPosition = position;
    }

    /**
     * Seeks back to the position recorded by {@link #mark(int)}.
     *
     * @throws IOException if no mark has been set, or the seek fails
     */
    @Override
    public synchronized void reset() throws IOException {
        if (markPosition < 0) {
            throw new IOException("Mark not set");
        }
        seekTo(markPosition);
    }

    /** Mark/reset is always supported; it is implemented on top of {@link #seekTo(long)}. */
    @Override
    public boolean markSupported() {
        return true;
    }
}