StreamCache.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tika.io;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.nio.file.Path;
/**
 * Package-private cache that stores bytes in memory up to a threshold,
 * then spills to a temporary file. Supports reading from any offset.
 * <p>
 * Bytes are appended with {@link #append(int)} / {@link #append(byte[], int, int)}
 * and read back at arbitrary positions with {@link #readAt(long)},
 * {@link #readAt(long, byte[], int, int)} or {@link #getInputStreamFrom(long)}.
 * While the cached content fits within the memory threshold it is kept in a
 * growable array. The first append that would exceed the threshold copies the
 * array into a temporary file obtained from {@link TemporaryResources}, and all
 * later bytes are written to that file.
 * <p>
 * Instances perform no synchronization.
 */
class StreamCache implements Closeable {

    private static final int DEFAULT_MEMORY_THRESHOLD = 1024 * 1024; // 1MB

    /** Initial in-memory buffer size (capped at the threshold) and copy-buffer size. */
    private static final int BUFFER_SIZE = 8192;

    private final int memoryThreshold;
    private final TemporaryResources tmp;

    // Memory storage (null after spill or close)
    private byte[] memoryBuffer;
    private int memorySize;

    // File storage (null until spill)
    private Path spillFile;
    private OutputStream spillOutputStream;
    // Read handle on spillFile. It is opened lazily on the first file-backed read
    // and reused, so byte-at-a-time reads do not reopen the file on every call.
    private RandomAccessFile spillReader;
    // Suffix handed to TemporaryResources.createTempFile when spilling. Only set by
    // toFile(InputStream, String), so it is null for spills triggered by append().
    private String suffix;

    private long totalSize;
    private boolean closed;

    /**
     * Create a cache with the default 1MB in-memory threshold.
     *
     * @param tmp provider of the temporary spill file
     */
    StreamCache(TemporaryResources tmp) {
        this(tmp, DEFAULT_MEMORY_THRESHOLD);
    }

    /**
     * Create a cache that keeps up to {@code memoryThreshold} bytes in memory.
     *
     * @param tmp             provider of the temporary spill file
     * @param memoryThreshold maximum number of bytes held in memory before spilling
     * @throws IllegalArgumentException if {@code memoryThreshold} is negative
     */
    StreamCache(TemporaryResources tmp, int memoryThreshold) {
        if (memoryThreshold < 0) {
            throw new IllegalArgumentException(
                    "memoryThreshold must be non-negative: " + memoryThreshold);
        }
        this.tmp = tmp;
        this.memoryThreshold = memoryThreshold;
        this.memoryBuffer = new byte[Math.min(memoryThreshold, BUFFER_SIZE)];
        this.memorySize = 0;
        this.totalSize = 0;
    }

    /**
     * Append a single byte to the cache.
     *
     * @param b the byte to append (only the low 8 bits are stored)
     * @throws IOException if the cache is closed or writing the spill file fails
     */
    void append(int b) throws IOException {
        ensureOpen();
        if (memoryBuffer != null && memorySize >= memoryThreshold) {
            spillToFile();
        }
        if (memoryBuffer != null) {
            ensureMemoryCapacity(memorySize + 1);
            memoryBuffer[memorySize++] = (byte) b;
        } else {
            spillOutputStream.write(b);
        }
        totalSize++;
    }

    /**
     * Append multiple bytes to the cache.
     *
     * @param b   source array
     * @param off offset of the first byte to append
     * @param len number of bytes to append
     * @throws IOException if the cache is closed or writing the spill file fails
     */
    void append(byte[] b, int off, int len) throws IOException {
        ensureOpen();
        // Written as a subtraction (memorySize <= memoryThreshold always holds) so a
        // very large len cannot overflow int and slip into the in-memory branch.
        if (memoryBuffer != null && len > memoryThreshold - memorySize) {
            spillToFile();
        }
        if (memoryBuffer != null) {
            ensureMemoryCapacity(memorySize + len);
            System.arraycopy(b, off, memoryBuffer, memorySize, len);
            memorySize += len;
        } else {
            spillOutputStream.write(b, off, len);
        }
        totalSize += len;
    }

    /** Throw if {@link #close()} has already been called. */
    private void ensureOpen() throws IOException {
        if (closed) {
            throw new IOException("StreamCache is closed");
        }
    }

    /** Grow the in-memory buffer (doubling, capped at the threshold) to hold {@code needed} bytes. */
    private void ensureMemoryCapacity(int needed) {
        if (needed <= memoryBuffer.length) {
            return;
        }
        int newSize = Math.min(memoryThreshold, Math.max(memoryBuffer.length * 2, needed));
        byte[] newBuffer = new byte[newSize];
        System.arraycopy(memoryBuffer, 0, newBuffer, 0, memorySize);
        memoryBuffer = newBuffer;
    }

    /**
     * Move the in-memory content into a new temporary file and switch to file mode.
     * This is a no-op if the cache has already spilled. The switch is committed
     * only after the existing bytes have been handed to the file stream. If that
     * step fails, the cache stays in memory mode and the new stream is closed.
     */
    private void spillToFile() throws IOException {
        if (spillFile != null) {
            return; // Already spilled
        }
        Path file = tmp.createTempFile(suffix);
        OutputStream out = new BufferedOutputStream(Files.newOutputStream(file));
        try {
            // Write existing memory content to file
            if (memorySize > 0) {
                out.write(memoryBuffer, 0, memorySize);
            }
        } catch (IOException e) {
            try {
                out.close();
            } catch (IOException suppressed) {
                e.addSuppressed(suppressed);
            }
            throw e;
        }
        spillFile = file;
        spillOutputStream = out;
        // Release memory buffer
        memoryBuffer = null;
        memorySize = 0;
    }

    /**
     * Flush pending spill writes and return the shared read handle on the spill file,
     * opening it on first use. Only called in file-backed mode.
     */
    private RandomAccessFile openSpillReader() throws IOException {
        flushSpillStream();
        if (spillReader == null) {
            spillReader = new RandomAccessFile(spillFile.toFile(), "r");
        }
        return spillReader;
    }

    /**
     * Read a single byte at the given position.
     *
     * @param position zero-based offset into the cached content
     * @return the byte as an unsigned value 0-255, or -1 if {@code position}
     *         is negative or not less than {@link #size()}
     * @throws IOException if the cache is closed or reading the spill file fails
     */
    int readAt(long position) throws IOException {
        ensureOpen();
        if (position < 0 || position >= totalSize) {
            return -1;
        }
        if (memoryBuffer != null) {
            // In memory mode totalSize == memorySize <= memoryThreshold, so the cast is safe.
            return memoryBuffer[(int) position] & 0xFF;
        }
        RandomAccessFile reader = openSpillReader();
        reader.seek(position);
        return reader.read();
    }

    /**
     * Read multiple bytes starting at the given position.
     *
     * @param position zero-based offset into the cached content
     * @param b        destination array
     * @param off      offset in {@code b} at which to store the first byte
     * @param len      maximum number of bytes to read
     * @return the number of bytes read (at most {@code len}, possibly fewer in file
     *         mode), or -1 if {@code position} is negative or not less than {@link #size()}
     * @throws IOException if the cache is closed or reading the spill file fails
     */
    int readAt(long position, byte[] b, int off, int len) throws IOException {
        ensureOpen();
        if (position < 0 || position >= totalSize) {
            return -1;
        }
        int available = (int) Math.min(len, totalSize - position);
        if (memoryBuffer != null) {
            System.arraycopy(memoryBuffer, (int) position, b, off, available);
            return available;
        }
        RandomAccessFile reader = openSpillReader();
        reader.seek(position);
        return reader.read(b, off, available);
    }

    /**
     * Get an InputStream that reads from the given offset.
     * The stream reads the cache's current size on each call, so bytes appended
     * after the stream was created are also visible. A negative offset yields a
     * stream that reports end-of-stream.
     *
     * @param offset zero-based starting position
     * @return a stream over the cached bytes starting at {@code offset}
     */
    InputStream getInputStreamFrom(long offset) throws IOException {
        return new CacheInputStream(offset);
    }

    /** Push buffered spill bytes to the file so readers see everything appended so far. */
    private void flushSpillStream() throws IOException {
        if (spillOutputStream != null) {
            spillOutputStream.flush();
        }
    }

    /**
     * Force all content to a file and return the path.
     * After this call, the cache is in file-backed mode. If the cache has already
     * spilled, the existing file is returned (this also works after {@link #close()}).
     *
     * @return path of the spill file holding all cached bytes
     * @throws IOException if the cache was closed before spilling, or writing fails
     */
    Path toFile() throws IOException {
        if (spillFile == null) {
            // The memory buffer is released on close, so spilling afterwards is impossible.
            ensureOpen();
            spillToFile();
        }
        flushSpillStream();
        return spillFile;
    }

    /**
     * Finish writing (drain remaining source bytes) and return the file path.
     *
     * @param remainingSource stream whose remaining bytes are appended to the cache;
     *                        it is read to end-of-stream but not closed here
     * @param suffix          temp-file suffix, which only takes effect if the cache
     *                        has not spilled yet
     * @return path of the spill file holding all cached bytes
     * @throws IOException if reading the source or writing the cache fails
     */
    Path toFile(InputStream remainingSource, String suffix) throws IOException {
        this.suffix = suffix;
        // Copy remaining bytes from source
        byte[] buffer = new byte[BUFFER_SIZE];
        int n;
        while ((n = remainingSource.read(buffer)) != -1) {
            append(buffer, 0, n);
        }
        return toFile();
    }

    /**
     * Number of bytes currently cached.
     */
    long size() {
        return totalSize;
    }

    /**
     * Whether the cache has spilled to a file.
     */
    boolean isFileBacked() {
        return spillFile != null;
    }

    /**
     * Release the memory buffer and close the spill file's write and read handles.
     * Later appends and reads throw {@link IOException}. Repeated calls are no-ops.
     * The spill file itself is not deleted here.
     */
    @Override
    public void close() throws IOException {
        if (closed) {
            return;
        }
        closed = true;
        memoryBuffer = null;
        memorySize = 0;
        IOException failure = null;
        if (spillOutputStream != null) {
            try {
                spillOutputStream.close();
            } catch (IOException e) {
                failure = e;
            }
            spillOutputStream = null;
        }
        if (spillReader != null) {
            try {
                spillReader.close();
            } catch (IOException e) {
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
            spillReader = null;
        }
        // spillFile cleanup is handled by TemporaryResources
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Inner class for reading from the cache at a specific offset.
     * Each instance keeps its own position; all instances share the cache's storage.
     */
    private class CacheInputStream extends InputStream {
        private long position;

        CacheInputStream(long startOffset) {
            this.position = startOffset;
        }

        @Override
        public int read() throws IOException {
            if (position >= totalSize) {
                return -1;
            }
            int value = readAt(position);
            if (value != -1) {
                position++;
            }
            return value;
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            // InputStream contract: validate the range, and a zero-length read returns 0.
            if (off < 0 || len < 0 || len > b.length - off) {
                throw new IndexOutOfBoundsException(
                        "off=" + off + ", len=" + len + ", length=" + b.length);
            }
            if (len == 0) {
                return 0;
            }
            if (position >= totalSize) {
                return -1;
            }
            int n = readAt(position, b, off, len);
            if (n > 0) {
                position += n;
            }
            return n;
        }

        @Override
        public int available() {
            if (closed || position < 0 || position >= totalSize) {
                return 0;
            }
            return (int) Math.min(Integer.MAX_VALUE, totalSize - position);
        }
    }
}