PipesForkParser.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.tika.pipes.fork;

import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;

import org.apache.tika.config.EmbeddedLimits;
import org.apache.tika.exception.TikaConfigException;
import org.apache.tika.exception.TikaException;
import org.apache.tika.io.TikaInputStream;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.parser.ParseContext;
import org.apache.tika.pipes.api.FetchEmitTuple;
import org.apache.tika.pipes.api.ParseMode;
import org.apache.tika.pipes.api.PipesResult;
import org.apache.tika.pipes.api.emitter.EmitKey;
import org.apache.tika.pipes.api.fetcher.FetchKey;
import org.apache.tika.pipes.core.EmitStrategy;
import org.apache.tika.pipes.core.PipesConfig;
import org.apache.tika.pipes.core.PipesException;
import org.apache.tika.pipes.core.PipesParser;
import org.apache.tika.pipes.core.config.ConfigMerger;
import org.apache.tika.pipes.core.config.ConfigOverrides;
import org.apache.tika.sax.ContentHandlerFactory;

/**
 * A ForkParser implementation backed by {@link PipesParser}.
 * <p>
 * <strong>This class is intended to replace the legacy
 * {@code org.apache.tika.fork.ForkParser}.</strong> The legacy ForkParser streamed
 * SAX events between processes, which was complex and error-prone. This implementation
 * uses the modern pipes infrastructure and returns parsed content in the metadata
 * (via {@link org.apache.tika.metadata.TikaCoreProperties#TIKA_CONTENT}).
 * <p>
 * This parser runs parsing in forked JVM processes, providing isolation from
 * crashes, memory leaks, and other issues that can occur during parsing.
 * Multiple forked processes can be used for concurrent parsing.
 * <p>
 * <strong>Getting Started:</strong> This class is designed as a simple entry point
 * to help users get started with forked parsing using files on the local filesystem.
 * Under the hood, it uses a {@code FileSystemFetcher} to read files. For more advanced
 * use cases, the Tika Pipes infrastructure supports many other sources and destinations
 * through plugins:
 * <ul>
 *   <li><strong>Fetchers</strong> (read from): S3, Azure Blob, Google Cloud Storage,
 *       HTTP, Microsoft Graph, and more</li>
 *   <li><strong>Emitters</strong> (write to): OpenSearch, Solr, S3, filesystem, and more</li>
 *   <li><strong>Pipes Iterators</strong> (batch processing): JDBC, CSV, filesystem crawling,
 *       and more</li>
 * </ul>
 * See the {@code tika-pipes} module and its submodules for available plugins. For
 * production batch processing, consider using {@code AsyncProcessor} or the
 * {@code tika-pipes-cli} directly with a JSON configuration file.
 * <p>
 * <strong>Thread Safety:</strong> This class is thread-safe. Multiple threads can
 * call {@link #parse} concurrently, and requests will be distributed across the
 * pool of forked processes.
 * <p>
 * <strong>Error Handling:</strong>
 * <ul>
 *   <li>Application errors (initialization failures, config errors) throw
 *       {@link PipesForkParserException}</li>
 *   <li>Process crashes (OOM, timeout) are returned in the result - the next
 *       parse will automatically restart the forked process</li>
 *   <li>Per-document errors (fetch/parse exceptions) are returned in the result</li>
 * </ul>
 * <p>
 * Example usage:
 * <pre>
 * PipesForkParserConfig config = new PipesForkParserConfig();
 * config.setHandlerType(HANDLER_TYPE.TEXT);
 * config.setParseMode(ParseMode.RMETA);
 *
 * try (PipesForkParser parser = new PipesForkParser(config)) {
 *     // Parse a file by Path
 *     Path file = Paths.get("/path/to/file.pdf");
 *     PipesForkResult result = parser.parse(file);
 *     for (Metadata m : result.getMetadataList()) {
 *         String content = m.get(TikaCoreProperties.TIKA_CONTENT);
 *         // process content and metadata
 *     }
 *
 *     // Or parse from an InputStream (will be spooled to temp file)
 *     try (TikaInputStream tis = TikaInputStream.get(inputStream)) {
 *         result = parser.parse(tis);
 *         // ...
 *     }
 * }
 * </pre>
 *
 * @see org.apache.tika.pipes.core.async.AsyncProcessor for batch processing
 */
public class PipesForkParser implements Closeable {

    public static final String DEFAULT_FETCHER_NAME = "fs";

    private final PipesForkParserConfig config;
    private final PipesParser pipesParser;
    /** Config file written for the forked processes; owned by this instance and deleted on close. */
    private final Path tikaConfigPath;
    /** Fetcher ID returned by the config merge; used to build every {@link FetchKey}. */
    private final String internalFetcherId;

    /**
     * Creates a new PipesForkParser with default configuration.
     *
     * @throws IOException if the temporary config file cannot be created
     * @throws TikaConfigException if configuration is invalid
     */
    public PipesForkParser() throws IOException, TikaConfigException {
        this(new PipesForkParserConfig());
    }

    /**
     * Creates a new PipesForkParser with the specified configuration.
     * <p>
     * The config file is written first and the {@link PipesParser} is loaded from it.
     * If loading fails, the config file is deleted before the failure is propagated,
     * because the caller never receives an instance on which to call {@link #close()}.
     *
     * @param config the configuration for this parser
     * @throws IOException if the temporary config file cannot be created
     * @throws TikaConfigException if configuration is invalid
     */
    public PipesForkParser(PipesForkParserConfig config) throws IOException, TikaConfigException {
        this.config = config;
        ConfigMerger.MergeResult mergeResult = createTikaConfigFile();
        this.tikaConfigPath = mergeResult.configPath();
        this.internalFetcherId = mergeResult.fetcherId();
        this.pipesParser = loadParserOrCleanUp(tikaConfigPath);
    }

    /**
     * Loads the {@link PipesParser} from the given config file, deleting the file if
     * loading fails so that a failed construction does not leave it behind.
     *
     * @param configPath the config file created for this instance
     * @return the loaded parser
     * @throws IOException if loading the parser fails with an I/O error
     * @throws TikaConfigException if the configuration is invalid
     */
    private static PipesParser loadParserOrCleanUp(Path configPath)
            throws IOException, TikaConfigException {
        try {
            return PipesParser.load(configPath);
        } catch (Throwable t) {
            // 't' is never reassigned, so the compiler's precise-rethrow analysis limits
            // the checked exceptions thrown here to those PipesParser.load declares.
            try {
                Files.deleteIfExists(configPath);
            } catch (IOException | RuntimeException cleanupFailure) {
                // Keep the load failure as the primary exception; record the cleanup problem.
                t.addSuppressed(cleanupFailure);
            }
            throw t;
        }
    }

    /**
     * Parse a file in a forked JVM process.
     *
     * @param path the path to the file to parse
     * @return the parse result containing metadata and content
     * @throws IOException if an I/O error occurs
     * @throws InterruptedException if the parsing is interrupted
     * @throws PipesException if a pipes infrastructure error occurs
     * @throws PipesForkParserException if an application error occurs (initialization
     *         failure or configuration error)
     */
    public PipesForkResult parse(Path path)
            throws IOException, InterruptedException, PipesException, TikaException {
        return parse(path, new Metadata(), new ParseContext());
    }

    /**
     * Parse a file in a forked JVM process with the specified metadata.
     *
     * @param path the path to the file to parse
     * @param metadata initial metadata (e.g., content type hint)
     * @return the parse result containing metadata and content
     * @throws IOException if an I/O error occurs
     * @throws InterruptedException if the parsing is interrupted
     * @throws PipesException if a pipes infrastructure error occurs
     * @throws PipesForkParserException if an application error occurs (initialization
     *         failure or configuration error)
     */
    public PipesForkResult parse(Path path, Metadata metadata)
            throws IOException, InterruptedException, PipesException, TikaException {
        return parse(path, metadata, new ParseContext());
    }

    /**
     * Parse a file in a forked JVM process with the specified metadata and parse context.
     * <p>
     * Note that the supplied parse context is modified: the configured content handler
     * factory, parse mode and (if configured) embedded limits are set on it.
     *
     * @param path the path to the file to parse
     * @param metadata initial metadata (e.g., content type hint)
     * @param parseContext the parse context
     * @return the parse result containing metadata and content
     * @throws IOException if an I/O error occurs
     * @throws InterruptedException if the parsing is interrupted
     * @throws PipesException if a pipes infrastructure error occurs
     * @throws PipesForkParserException if an application error occurs (initialization
     *         failure or configuration error)
     */
    public PipesForkResult parse(Path path, Metadata metadata, ParseContext parseContext)
            throws IOException, InterruptedException, PipesException, TikaException {
        return parseInternal(path, metadata, parseContext);
    }

    /**
     * Parse a file in a forked JVM process.
     *
     * @param tis the TikaInputStream to parse. If the stream doesn't have an underlying
     *            file, it will be spooled to a temporary file. The caller must keep
     *            the TikaInputStream open until this method returns.
     * @return the parse result containing metadata and content
     * @throws IOException if an I/O error occurs
     * @throws InterruptedException if the parsing is interrupted
     * @throws PipesException if a pipes infrastructure error occurs
     * @throws PipesForkParserException if an application error occurs (initialization
     *         failure or configuration error)
     */
    public PipesForkResult parse(TikaInputStream tis)
            throws IOException, InterruptedException, PipesException, TikaException {
        return parse(tis, new Metadata(), new ParseContext());
    }

    /**
     * Parse a file in a forked JVM process with the specified metadata.
     *
     * @param tis the TikaInputStream to parse. If the stream doesn't have an underlying
     *            file, it will be spooled to a temporary file. The caller must keep
     *            the TikaInputStream open until this method returns.
     * @param metadata initial metadata (e.g., content type hint)
     * @return the parse result containing metadata and content
     * @throws IOException if an I/O error occurs
     * @throws InterruptedException if the parsing is interrupted
     * @throws PipesException if a pipes infrastructure error occurs
     * @throws PipesForkParserException if an application error occurs (initialization
     *         failure or configuration error)
     */
    public PipesForkResult parse(TikaInputStream tis, Metadata metadata)
            throws IOException, InterruptedException, PipesException, TikaException {
        return parse(tis, metadata, new ParseContext());
    }

    /**
     * Parse a file in a forked JVM process with the specified metadata and parse context.
     * <p>
     * Note that the supplied parse context is modified: the configured content handler
     * factory, parse mode and (if configured) embedded limits are set on it.
     *
     * @param tis the TikaInputStream to parse. If the stream doesn't have an underlying
     *            file, it will be spooled to a temporary file. The caller must keep
     *            the TikaInputStream open until this method returns.
     * @param metadata initial metadata (e.g., content type hint)
     * @param parseContext the parse context
     * @return the parse result containing metadata and content
     * @throws IOException if an I/O error occurs
     * @throws InterruptedException if the parsing is interrupted
     * @throws PipesException if a pipes infrastructure error occurs
     * @throws PipesForkParserException if an application error occurs (initialization
     *         failure or configuration error)
     */
    public PipesForkResult parse(TikaInputStream tis, Metadata metadata, ParseContext parseContext)
            throws IOException, InterruptedException, PipesException, TikaException {
        // Get the path - this will spool to a temp file if the stream doesn't have
        // an underlying file. The temp file is managed by TikaInputStream and will
        // be cleaned up when the TikaInputStream is closed.
        return parseInternal(tis.getPath(), metadata, parseContext);
    }

    /**
     * Internal parse implementation that takes a Path directly.
     * <p>
     * The absolute path of the file is used as the tuple ID, the fetch key and the
     * emit key. The configured handler factory, parse mode and embedded limits are
     * set on {@code parseContext} before the request is sent.
     *
     * @param path the file to parse
     * @param metadata initial metadata passed along with the request
     * @param parseContext the parse context; modified by this method
     * @return the wrapped pipes result
     * @throws PipesForkParserException if the result status is an application error
     */
    private PipesForkResult parseInternal(Path path, Metadata metadata, ParseContext parseContext)
            throws IOException, InterruptedException, PipesException, TikaException {
        String absolutePath = path.toAbsolutePath().toString();
        String id = absolutePath;

        // Use the internal fetcher ID generated by ConfigMerger (UUID-based)
        FetchKey fetchKey = new FetchKey(internalFetcherId, absolutePath);
        EmitKey emitKey = new EmitKey("", id); // Empty emitter name since we're using PASSBACK_ALL

        // Add content handler factory and parse mode to parse context
        parseContext.set(ContentHandlerFactory.class, config.getContentHandlerFactory());
        parseContext.set(ParseMode.class, config.getParseMode());

        // Add embedded limits if configured
        if (config.getEmbeddedLimits() != null) {
            parseContext.set(EmbeddedLimits.class, config.getEmbeddedLimits());
        }

        FetchEmitTuple tuple = new FetchEmitTuple(id, fetchKey, emitKey, metadata, parseContext);

        PipesResult result = pipesParser.parse(tuple);

        // Check for application errors and throw if necessary
        // Process crashes are NOT thrown - the next parse will restart the process
        checkForApplicationError(result);

        return new PipesForkResult(result);
    }

    /**
     * Checks if the result represents an application error and throws an exception if so.
     * <p>
     * Application errors that cause exceptions:
     * <ul>
     *   <li>Initialization failures (parser, fetcher, or emitter)</li>
     *   <li>Configuration errors (fetcher or emitter not found)</li>
     *   <li>Client unavailable within timeout</li>
     * </ul>
     * <p>
     * Process crashes (OOM, timeout, unspecified crash) are NOT thrown as exceptions.
     * The forked process will be automatically restarted on the next parse call.
     * Check {@link PipesForkResult#isProcessCrash()} to detect these cases.
     * <p>
     * Per-document errors (fetch exception, parse exception) are also NOT thrown.
     * These are returned in the result so the caller can handle them appropriately
     * (e.g., log and continue with the next file).
     *
     * @param result the pipes result to check
     * @throws PipesForkParserException if the result represents an application error
     */
    private void checkForApplicationError(PipesResult result) throws PipesForkParserException {
        PipesResult.RESULT_STATUS status = result.status();
        String description = describeApplicationError(status);
        if (description == null) {
            // Process crash, per-document error, or success: returned to the caller as-is.
            return;
        }
        String detail = result.message();
        throw new PipesForkParserException(status,
                detail != null ? description + ": " + detail : description);
    }

    /**
     * Maps a result status to the message prefix used for application errors.
     *
     * @param status the status reported by the pipes result
     * @return the message prefix, or {@code null} if the status is not an application
     *         error (i.e. it should be returned to the caller instead of thrown)
     */
    private static String describeApplicationError(PipesResult.RESULT_STATUS status) {
        switch (status) {
            case FAILED_TO_INITIALIZE:
                return "Failed to initialize parser";
            case FETCHER_INITIALIZATION_EXCEPTION:
                return "Failed to initialize fetcher";
            case EMITTER_INITIALIZATION_EXCEPTION:
                return "Failed to initialize emitter";
            case FETCHER_NOT_FOUND:
                return "Fetcher not found";
            case EMITTER_NOT_FOUND:
                return "Emitter not found";
            case CLIENT_UNAVAILABLE_WITHIN_MS:
                return "No client available within timeout";
            default:
                // Process crashes (OOM, TIMEOUT, UNSPECIFIED_CRASH) - not thrown,
                // next parse will restart the process automatically
                //
                // Per-document errors (FETCH_EXCEPTION, PARSE_EXCEPTION_NO_EMIT, etc.) -
                // not thrown, caller can check result and decide how to handle
                //
                // Success states - obviously not thrown
                return null;
        }
    }

    /**
     * Closes the underlying {@link PipesParser} and deletes the temporary config file.
     * <p>
     * The config file is deleted even if closing the parser throws.
     *
     * @throws IOException if closing the parser or deleting the config file fails
     */
    @Override
    public void close() throws IOException {
        try {
            pipesParser.close();
        } finally {
            // Clean up temp config file regardless of how the parser shutdown went
            if (tikaConfigPath != null) {
                Files.deleteIfExists(tikaConfigPath);
            }
        }
    }

    /**
     * Creates a temporary tika-config.json file for the forked process.
     * <p>
     * Uses ConfigMerger to:
     * - Add a FileSystemFetcher with UUID-based name for absolute path access
     * - Set PASSBACK_ALL emit strategy (no emitter, return results to client)
     * - Merge with user config if provided
     *
     * @return MergeResult containing the config path and generated fetcher ID
     * @throws IOException if the config file cannot be written
     */
    private ConfigMerger.MergeResult createTikaConfigFile() throws IOException {
        PipesConfig pc = config.getPipesConfig();

        // Build configuration overrides
        ConfigOverrides.Builder builder = ConfigOverrides.builder()
                // Add internal fetcher with UUID-based name to avoid conflicts
                // Use null ID to trigger UUID generation
                .addFetcher(null, "file-system-fetcher",
                        Map.of("allowAbsolutePaths", true))
                // Set pipes configuration
                .setPipesConfig(
                        pc.getNumClients(),
                        pc.getStartupTimeoutMillis(),
                        pc.getMaxFilesProcessedPerProcess(),
                        pc.getForkedJvmArgs())
                // Use PASSBACK_ALL strategy - results returned through socket
                .setEmitStrategy(EmitStrategy.PASSBACK_ALL);

        // Set timeout limits if configured
        if (config.getTimeoutLimits() != null) {
            builder.setTimeoutLimits(config.getTimeoutLimits());
        }

        // Set plugin roots if specified
        if (config.getPluginsDir() != null) {
            builder.setPluginRoots(config.getPluginsDir().toAbsolutePath().toString());
        }

        ConfigOverrides overrides = builder.build();

        // Merge with user config if provided, otherwise create new
        return ConfigMerger.mergeOrCreate(config.getUserConfigPath(), overrides);
    }
}