CachingSource.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.tika.io;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;

import org.apache.commons.io.IOUtils;

import org.apache.tika.metadata.Metadata;
import org.apache.tika.utils.StringUtils;

/**
 * Input source that wraps a raw InputStream with optional caching.
 * <p>
 * The source operates in one of three modes:
 * <ul>
 *   <li><b>Passthrough</b> (initial): reads go through a {@link BufferedInputStream},
 *       which gives basic mark/reset support only.</li>
 *   <li><b>Caching</b>: entered via {@link #enableRewind()} (only allowed at position 0);
 *       reads go through a {@link CachingInputStream}, which enables full
 *       rewind/seek capability.</li>
 *   <li><b>File-backed</b>: entered via {@link #getPath(String)}; the cache is spilled to
 *       a file and all subsequent reads are served from that file.</li>
 * </ul>
 * <p>
 * If caching is not enabled, {@link #seekTo(long)} will fail for any position
 * other than the current position.
 */
class CachingSource extends InputStream implements TikaInputSource {

    private final TemporaryResources tmp;
    private final Metadata metadata;
    // Length given at construction; replaced by the spilled file's size in getPath()
    private long length;

    // Passthrough mode: just a BufferedInputStream (null once caching is enabled)
    private BufferedInputStream passthroughStream;
    private long passthroughPosition;

    // Caching mode: CachingInputStream for full rewind support (null again after spilling)
    private CachingInputStream cachingStream;

    // After spilling to file, we switch to file-backed mode
    private Path spilledPath;
    private InputStream fileStream;
    private long filePosition;  // Track position in file mode

    // Position recorded by the last mark(), shared across all modes; -1 means no mark set
    private long markPosition = -1;

    /**
     * Creates a source in passthrough mode positioned at offset 0.
     *
     * @param source   stream to wrap; used directly if it already is a
     *                 {@link BufferedInputStream}, otherwise wrapped in one
     * @param tmp      temporary resources handed to the {@link StreamCache} when
     *                 caching is enabled
     * @param length   length reported by {@link #getLength()}; {@code -1} is always
     *                 replaced by the file size once the content is spilled
     * @param metadata metadata whose content length is filled in on spill if blank;
     *                 may be {@code null}
     */
    CachingSource(InputStream source, TemporaryResources tmp, long length, Metadata metadata) {
        this.tmp = tmp;
        this.length = length;
        this.metadata = metadata;
        // Start in passthrough mode
        this.passthroughStream = source instanceof BufferedInputStream
                ? (BufferedInputStream) source
                : new BufferedInputStream(source);
        this.passthroughPosition = 0;
    }

    /**
     * Reads a single byte from whichever stream backs the current mode.
     *
     * @return the byte read, or {@code -1} at end of stream
     * @throws IOException if the underlying stream fails
     */
    @Override
    public int read() throws IOException {
        if (fileStream != null) {
            int b = fileStream.read();
            if (b != -1) {
                filePosition++;
            }
            return b;
        }
        if (cachingStream != null) {
            // The caching stream tracks its own position
            return cachingStream.read();
        }
        int b = passthroughStream.read();
        if (b != -1) {
            passthroughPosition++;
        }
        return b;
    }

    /**
     * Reads up to {@code len} bytes into {@code b} from whichever stream backs the
     * current mode, advancing the tracked position by the number of bytes read.
     *
     * @return the number of bytes read as reported by the underlying stream
     * @throws IOException if the underlying stream fails
     */
    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        if (fileStream != null) {
            int n = fileStream.read(b, off, len);
            if (n > 0) {
                filePosition += n;
            }
            return n;
        }
        if (cachingStream != null) {
            return cachingStream.read(b, off, len);
        }
        int n = passthroughStream.read(b, off, len);
        if (n > 0) {
            passthroughPosition += n;
        }
        return n;
    }

    /**
     * Skips up to {@code n} bytes in the current mode.
     * <p>
     * A non-positive {@code n} skips nothing and returns 0, as described by
     * {@link InputStream#skip(long)}.
     *
     * @param n number of bytes to skip
     * @return the number of bytes actually skipped
     * @throws IOException if the underlying stream fails
     */
    @Override
    public long skip(long n) throws IOException {
        // Guard here: IOUtils.skip rejects negative counts with an IllegalArgumentException
        if (n <= 0) {
            return 0;
        }
        if (fileStream != null) {
            long skipped = IOUtils.skip(fileStream, n);
            filePosition += skipped;
            return skipped;
        }
        if (cachingStream != null) {
            return cachingStream.skip(n);
        }
        long skipped = IOUtils.skip(passthroughStream, n);
        passthroughPosition += skipped;
        return skipped;
    }

    /**
     * Delegates to {@code available()} of the stream backing the current mode.
     *
     * @throws IOException if the underlying stream fails
     */
    @Override
    public int available() throws IOException {
        if (fileStream != null) {
            return fileStream.available();
        }
        if (cachingStream != null) {
            return cachingStream.available();
        }
        return passthroughStream.available();
    }

    /**
     * Records the current position so that {@link #reset()} can return to it.
     * <p>
     * In file and caching modes only the position is remembered and
     * {@code readlimit} is ignored; in passthrough mode the mark is also set on
     * the {@link BufferedInputStream}, so its read limit applies.
     *
     * @param readlimit read limit passed to the buffered stream in passthrough mode
     */
    @Override
    public synchronized void mark(int readlimit) {
        if (fileStream != null) {
            // File mode - track position for seekTo-based reset
            markPosition = filePosition;
            return;
        }
        if (cachingStream != null) {
            // Caching mode - track position for seekTo-based reset
            markPosition = cachingStream.getPosition();
            return;
        }
        // Passthrough mode - delegate to BufferedInputStream
        passthroughStream.mark(readlimit);
        markPosition = passthroughPosition;
    }

    /**
     * Returns to the position recorded by the last {@link #mark(int)}.
     *
     * @throws IOException if no mark has been set, or if repositioning the
     *                     underlying stream fails
     */
    @Override
    public synchronized void reset() throws IOException {
        if (markPosition < 0) {
            throw new IOException("Mark not set");
        }
        if (fileStream != null) {
            // File mode - use seekTo
            seekTo(markPosition);
            return;
        }
        if (cachingStream != null) {
            // Caching mode - use seekTo
            cachingStream.seekTo(markPosition);
            return;
        }
        // Passthrough mode - delegate to BufferedInputStream; only update our
        // position once the underlying reset has succeeded
        passthroughStream.reset();
        passthroughPosition = markPosition;
    }

    /**
     * @return always {@code true}; every mode implements mark/reset
     */
    @Override
    public boolean markSupported() {
        return true;
    }

    /**
     * Switches from passthrough to caching mode. Does nothing if the source is
     * already in caching or file-backed mode.
     *
     * @throws IllegalStateException if bytes have already been consumed in
     *                               passthrough mode (position is not 0)
     */
    @Override
    public void enableRewind() {
        // Already in caching or file mode - no-op
        if (cachingStream != null || fileStream != null) {
            return;
        }

        if (passthroughPosition != 0) {
            throw new IllegalStateException(
                    "Cannot enable rewind: position is " + passthroughPosition +
                            ", must be 0. Call enableRewind() before reading.");
        }

        // Switch to caching mode; the buffered stream becomes the cache's source
        StreamCache cache = new StreamCache(tmp);
        cachingStream = new CachingInputStream(passthroughStream, cache);
        passthroughStream = null;
    }

    /**
     * Moves the read position to {@code position}.
     * <p>
     * In file-backed mode the spilled file is reopened and skipped forward; the
     * current stream is only replaced once the new one has been positioned, so a
     * failed seek leaves the existing stream and position untouched. In caching
     * mode the call is delegated to the {@link CachingInputStream}. In
     * passthrough mode only a seek to the current position is accepted.
     *
     * @param position absolute offset to seek to
     * @throws IOException if the position is negative (file mode), cannot be
     *                     reached, or differs from the current position in
     *                     passthrough mode
     */
    @Override
    public void seekTo(long position) throws IOException {
        if (fileStream != null) {
            if (position < 0) {
                throw new IOException("Cannot seek to negative position: " + position);
            }
            // Open and position the replacement first so a failure cannot leave
            // us holding a closed stream with a stale position
            InputStream reopened = openSpilledFileAt(position);
            InputStream previous = fileStream;
            fileStream = reopened;
            filePosition = position;
            previous.close();
            return;
        }

        if (cachingStream != null) {
            cachingStream.seekTo(position);
            return;
        }

        // Passthrough mode - can only "seek" to current position
        if (position != passthroughPosition) {
            throw new IOException(
                    "Cannot seek in passthrough mode. Call enableRewind() first. " +
                            "Current position: " + passthroughPosition + ", requested: " + position);
        }
    }

    /**
     * @return {@code true} once the content has been spilled to a file
     */
    @Override
    public boolean hasPath() {
        return spilledPath != null;
    }

    /**
     * Returns the path of a file holding the content, spilling the cache to a
     * file on first use and switching this source to file-backed mode at the
     * same read position. Later calls return the same path.
     * <p>
     * On spill, the length is updated from the file size (when it was -1 or the
     * file is non-empty) and the metadata content length is set if it was blank.
     *
     * @param suffix suffix passed through to the cache when creating the file
     * @return path of the spilled file
     * @throws IOException if still in passthrough mode with bytes already
     *                     consumed, or if spilling or reopening the file fails
     */
    @Override
    public Path getPath(String suffix) throws IOException {
        if (spilledPath != null) {
            return spilledPath;
        }

        // If still in passthrough mode, enable caching first
        if (cachingStream == null) {
            if (passthroughPosition != 0) {
                throw new IOException(
                        "Cannot spill to file: position is " + passthroughPosition +
                                ", must be 0. Call enableRewind() before reading if you need file access.");
            }
            enableRewind();
        }

        // Spill to file and switch to file-backed mode
        spilledPath = cachingStream.spillToFile(suffix);

        // Get current position before closing cache
        long currentPosition = cachingStream.getPosition();

        // Close only the cache, not the source stream (for archive support)
        cachingStream.closeCacheOnly();

        // Open file stream at current position
        fileStream = openSpilledFileAt(currentPosition);
        filePosition = currentPosition;

        // Update length from file size
        long fileSize = Files.size(spilledPath);
        if (length == -1 || fileSize > 0) {
            length = fileSize;
        }

        // Update metadata if not already set
        if (metadata != null &&
                StringUtils.isBlank(metadata.get(Metadata.CONTENT_LENGTH))) {
            metadata.set(Metadata.CONTENT_LENGTH, Long.toString(length));
        }

        cachingStream = null;
        return spilledPath;
    }

    /**
     * @return the length supplied at construction, or the spilled file's size
     *         once {@link #getPath(String)} has updated it
     */
    @Override
    public long getLength() {
        return length;
    }

    /**
     * Closes every stream this source currently holds. Each close is attempted
     * even if an earlier one throws.
     * <p>
     * NOTE(review): in file-backed mode this class no longer references the
     * original source stream (the cache was released with closeCacheOnly), so it
     * is not closed here - confirm that its owner closes it.
     *
     * @throws IOException if closing an underlying stream fails
     */
    @Override
    public void close() throws IOException {
        try {
            if (fileStream != null) {
                fileStream.close();
            }
        } finally {
            try {
                if (cachingStream != null) {
                    cachingStream.close();
                }
            } finally {
                if (passthroughStream != null) {
                    passthroughStream.close();
                }
            }
        }
    }

    /**
     * Opens a buffered stream on the spilled file positioned at {@code position},
     * closing the new stream again if it cannot be positioned.
     */
    private InputStream openSpilledFileAt(long position) throws IOException {
        InputStream in = new BufferedInputStream(Files.newInputStream(spilledPath));
        try {
            if (position > 0) {
                IOUtils.skipFully(in, position);
            }
            return in;
        } catch (IOException | RuntimeException e) {
            try {
                in.close();
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }
}