PipesConfig.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.tika.pipes.core;

import java.io.IOException;
import java.util.ArrayList;

import org.apache.tika.config.loader.TikaJsonConfig;
import org.apache.tika.exception.TikaConfigException;
import org.apache.tika.pipes.api.FetchEmitTuple;
import org.apache.tika.pipes.api.ParseMode;

public class PipesConfig {


    public static final long DEFAULT_STARTUP_TIMEOUT_MILLIS = 240000;

    public static final long DEFAULT_SHUTDOWN_CLIENT_AFTER_MILLS = 300000;

    public static final int DEFAULT_NUM_CLIENTS = 4;

    public static final int DEFAULT_MAX_FILES_PROCESSED_PER_PROCESS = 10000;

    public static final long DEFAULT_MAX_WAIT_FOR_CLIENT_MS = 60000;

    public static final long DEFAULT_SOCKET_TIMEOUT_MS = 60000;

    public static final long DEFAULT_HEARTBEAT_INTERVAL_MS = 1000;

    public static final boolean DEFAULT_USE_SHARED_SERVER = false;

    /**
     * The emit strategy configuration determines how the forked PipesServer handles emitting data.
     * See {@link EmitStrategyConfig} for details.
     */
    private EmitStrategyConfig emitStrategy = new EmitStrategyConfig(EmitStrategyConfig.DEFAULT_EMIT_STRATEGY);

    /**
     * When true, multiple PipesClients connect to a single shared PipesServer process
     * instead of each client spawning its own server. This reduces memory overhead
     * and startup time at the cost of reduced isolation - one crash affects all in-flight requests.
     */
    private boolean useSharedServer = DEFAULT_USE_SHARED_SERVER;

    private long socketTimeoutMs = DEFAULT_SOCKET_TIMEOUT_MS;
    private long heartbeatIntervalMs = DEFAULT_HEARTBEAT_INTERVAL_MS;
    private long startupTimeoutMillis = DEFAULT_STARTUP_TIMEOUT_MILLIS;
    private long sleepOnStartupTimeoutMillis = DEFAULT_STARTUP_TIMEOUT_MILLIS;

    private long shutdownClientAfterMillis = DEFAULT_SHUTDOWN_CLIENT_AFTER_MILLS;
    private int numClients = DEFAULT_NUM_CLIENTS;

    private long maxWaitForClientMillis = DEFAULT_MAX_WAIT_FOR_CLIENT_MS;
    private int maxFilesProcessedPerProcess = DEFAULT_MAX_FILES_PROCESSED_PER_PROCESS;
    public static final int DEFAULT_STALE_FETCHER_TIMEOUT_SECONDS = 600;
    private int staleFetcherTimeoutSeconds = DEFAULT_STALE_FETCHER_TIMEOUT_SECONDS;
    public static final int DEFAULT_STALE_FETCHER_DELAY_SECONDS = 60;
    private int staleFetcherDelaySeconds = DEFAULT_STALE_FETCHER_DELAY_SECONDS;

    // Async-specific fields (used by AsyncProcessor, ignored by PipesServer)
    public static final long DEFAULT_EMIT_WITHIN_MILLIS = 10000;
    public static final long DEFAULT_EMIT_MAX_ESTIMATED_BYTES = 100000;
    public static final int DEFAULT_QUEUE_SIZE = 10000;
    public static final int DEFAULT_NUM_EMITTERS = 1;

    private long emitWithinMillis = DEFAULT_EMIT_WITHIN_MILLIS;
    private long emitMaxEstimatedBytes = DEFAULT_EMIT_MAX_ESTIMATED_BYTES;
    private int queueSize = DEFAULT_QUEUE_SIZE;
    private int numEmitters = DEFAULT_NUM_EMITTERS;
    private boolean emitIntermediateResults = false;
    /**
     * When true, only stop processing on fatal errors (FAILED_TO_INITIALIZE).
     * When false (default), also stop on initialization failures and not-found errors.
     * <p>
     * Use true for server mode (tika-server /pipes, /async) where different requests
     * may use different fetchers/emitters.
     * Use false (default) for CLI batch mode where all tasks typically use the same
     * fetcher/emitter configuration.
     */
    private boolean stopOnlyOnFatal = false;

    /**
     * Default parse mode for how embedded documents are handled.
     * Can be overridden per-file via ParseContext.
     */
    private ParseMode parseMode = ParseMode.RMETA;

    /**
     * Default behavior when a parse exception occurs.
     */
    private FetchEmitTuple.ON_PARSE_EXCEPTION onParseException = FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT;

    private ArrayList<String> forkedJvmArgs = new ArrayList<>();
    private String javaPath = "java";

    /**
     * Optional directory for temporary files during pipes-based parsing.
     * If not set, the system default temp directory will be used.
     * Consider using a RAM-backed filesystem (e.g., /dev/shm) for better performance.
     */
    private String tempDirectory = null;

    /**
     * Type of ConfigStore to use for distributed state management.
     * Options: "memory" (default), "ignite"
     */
    private String configStoreType = "memory";
    
    /**
     * JSON configuration parameters for the ConfigStore.
     * The structure depends on the configStoreType selected.
     */
    private String configStoreParams = "{}";

    /**
     * Loads PipesConfig from the "pipes" section of the JSON configuration.
     * <p>
     * This configuration is used by both PipesServer (forking process) and
     * AsyncProcessor (async processing). Some fields are specific to each:
     * <ul>
     *   <li>PipesServer uses: numClients, timeoutMillis, directEmitThresholdBytes, etc.</li>
     *   <li>AsyncProcessor uses: emitWithinMillis, queueSize, numEmitters, etc.</li>
     * </ul>
     * Unused fields in each context are simply ignored.
     *
     * @param tikaJsonConfig the JSON configuration to load from
     * @return the loaded PipesConfig, or a new default instance if not found in config
     * @throws IOException if deserialization fails
     * @throws TikaConfigException if configuration is invalid
     */
    public static PipesConfig load(TikaJsonConfig tikaJsonConfig) throws IOException, TikaConfigException {
        PipesConfig config = tikaJsonConfig.deserialize("pipes", PipesConfig.class);
        if (config == null) {
            config = new PipesConfig();
        }
        return config;
    }

    public long getSocketTimeoutMs() {
        return socketTimeoutMs;
    }

    /**
     * Socket timeout in milliseconds for reading from the forked process.
     * If no data is received within this time, the connection is considered timed out.
     * This is different from timeoutMillis which is the parse/processing timeout.
     * @param socketTimeoutMs
     */
    public void setSocketTimeoutMs(long socketTimeoutMs) {
        this.socketTimeoutMs = socketTimeoutMs;
    }

    public long getHeartbeatIntervalMs() {
        return heartbeatIntervalMs;
    }

    /**
     * Interval in milliseconds between heartbeat messages sent from server to client.
     * Should be significantly less than socketTimeoutMs to ensure the client doesn't timeout.
     * WARNING: Setting this >= socketTimeoutMs will cause socket timeouts during normal processing.
     * This only exists for testing. We encourage you never to use it.
     * @param heartbeatIntervalMs
     */
    public void setHeartbeatIntervalMs(long heartbeatIntervalMs) {
        this.heartbeatIntervalMs = heartbeatIntervalMs;
    }

    public long getShutdownClientAfterMillis() {
        return shutdownClientAfterMillis;
    }

    /**
     * If the client has been inactive after this many milliseconds,
     * shut it down.
     *
     * @param shutdownClientAfterMillis
     */
    public void setShutdownClientAfterMillis(long shutdownClientAfterMillis) {
        this.shutdownClientAfterMillis = shutdownClientAfterMillis;
    }

    public int getNumClients() {
        return numClients;
    }

    public void setNumClients(int numClients) {
        this.numClients = numClients;
    }

    public void setForkedJvmArgs(ArrayList<String> jvmArgs) {
        this.forkedJvmArgs = jvmArgs;
    }
    //ArrayList to make jackson happy
    public ArrayList<String> getForkedJvmArgs() {
        return forkedJvmArgs;
    }

    public void setStartupTimeoutMillis(long startupTimeoutMillis) {
        this.startupTimeoutMillis = startupTimeoutMillis;
    }


    /**
     * Restart the forked PipesServer after it has processed this many files to avoid
     * slow-building memory leaks.
     * @return
     */
    public int getMaxFilesProcessedPerProcess() {
        return maxFilesProcessedPerProcess;
    }

    public void setMaxFilesProcessedPerProcess(int maxFilesProcessedPerProcess) {
        this.maxFilesProcessedPerProcess = maxFilesProcessedPerProcess;
    }

    public String getJavaPath() {
        return javaPath;
    }

    public void setJavaPath(String javaPath) {
        this.javaPath = javaPath;
    }

    public long getStartupTimeoutMillis() {
        return startupTimeoutMillis;
    }

    /**
     * Get the emit strategy configuration.
     *
     * @return the emit strategy configuration
     */
    public EmitStrategyConfig getEmitStrategy() {
        return emitStrategy;
    }

    /**
     * Set the emit strategy configuration.
     *
     * @param emitStrategy the emit strategy configuration
     */
    public void setEmitStrategy(EmitStrategyConfig emitStrategy) {
        this.emitStrategy = emitStrategy;
    }

    public long getSleepOnStartupTimeoutMillis() {
        return sleepOnStartupTimeoutMillis;
    }

    public void setSleepOnStartupTimeoutMillis(long sleepOnStartupTimeoutMillis) {
        this.sleepOnStartupTimeoutMillis = sleepOnStartupTimeoutMillis;
    }

    public int getStaleFetcherTimeoutSeconds() {
        return staleFetcherTimeoutSeconds;
    }

    public void setStaleFetcherTimeoutSeconds(int staleFetcherTimeoutSeconds) {
        this.staleFetcherTimeoutSeconds = staleFetcherTimeoutSeconds;
    }

    public int getStaleFetcherDelaySeconds() {
        return staleFetcherDelaySeconds;
    }

    public void setStaleFetcherDelaySeconds(int staleFetcherDelaySeconds) {
        this.staleFetcherDelaySeconds = staleFetcherDelaySeconds;
    }

    public long getMaxWaitForClientMillis() {
        return maxWaitForClientMillis;
    }

    public void setMaxWaitForClientMillis(long maxWaitForClientMillis) {
        this.maxWaitForClientMillis = maxWaitForClientMillis;
    }

    // Async-specific getters/setters (used by AsyncProcessor, ignored by PipesServer)

    public long getEmitWithinMillis() {
        return emitWithinMillis;
    }

    /**
     * If nothing has been emitted in this amount of time
     * and the {@link #getEmitMaxEstimatedBytes()} has not been reached yet,
     * emit what's in the emit queue.
     *
     * @param emitWithinMillis time in milliseconds
     */
    public void setEmitWithinMillis(long emitWithinMillis) {
        this.emitWithinMillis = emitWithinMillis;
    }

    /**
     * When the emit queue hits this estimated size (sum of
     * estimated extract sizes), emit the batch.
     *
     * @return the maximum estimated bytes before emitting
     */
    public long getEmitMaxEstimatedBytes() {
        return emitMaxEstimatedBytes;
    }

    public void setEmitMaxEstimatedBytes(long emitMaxEstimatedBytes) {
        this.emitMaxEstimatedBytes = emitMaxEstimatedBytes;
    }

    /**
     * FetchEmitTuple queue size
     *
     * @return the queue size
     */
    public int getQueueSize() {
        return queueSize;
    }

    public void setQueueSize(int queueSize) {
        this.queueSize = queueSize;
    }

    /**
     * Number of emitters
     *
     * @return the number of emitters
     */
    public int getNumEmitters() {
        return numEmitters;
    }

    public void setNumEmitters(int numEmitters) {
        this.numEmitters = numEmitters;
    }

    public boolean isEmitIntermediateResults() {
        return emitIntermediateResults;
    }

    public void setEmitIntermediateResults(boolean emitIntermediateResults) {
        this.emitIntermediateResults = emitIntermediateResults;
    }

    /**
     * When true, only stop processing on fatal errors (FAILED_TO_INITIALIZE).
     * When false (default), also stop on initialization failures (FETCHER_INITIALIZATION_EXCEPTION,
     * EMITTER_INITIALIZATION_EXCEPTION, CLIENT_UNAVAILABLE_WITHIN_MS) and not-found errors
     * (FETCHER_NOT_FOUND, EMITTER_NOT_FOUND).
     * <p>
     * Use true for server mode (tika-server /pipes, /async) where different requests
     * may use different fetchers/emitters - a bad request shouldn't kill the server.
     * Use false (default) for CLI batch mode where all tasks typically use the same
     * fetcher/emitter configuration - no point continuing if configuration is wrong.
     *
     * @return true if only fatal errors should stop processing
     */
    public boolean isStopOnlyOnFatal() {
        return stopOnlyOnFatal;
    }

    public void setStopOnlyOnFatal(boolean stopOnlyOnFatal) {
        this.stopOnlyOnFatal = stopOnlyOnFatal;
    }

    /**
     * Gets the default parse mode for how embedded documents are handled.
     *
     * @return the default parse mode
     */
    public ParseMode getParseMode() {
        return parseMode;
    }

    /**
     * Sets the default parse mode for how embedded documents are handled.
     * This can be overridden per-file via ParseContext.
     *
     * @param parseMode the parse mode (RMETA or CONCATENATE)
     */
    public void setParseMode(ParseMode parseMode) {
        this.parseMode = parseMode;
    }

    /**
     * Sets the default parse mode from a string.
     *
     * @param parseMode the parse mode name (rmeta or concatenate)
     */
    public void setParseMode(String parseMode) {
        this.parseMode = ParseMode.parse(parseMode);
    }

    /**
     * Gets the default behavior when a parse exception occurs.
     *
     * @return the parse exception behavior
     */
    public FetchEmitTuple.ON_PARSE_EXCEPTION getOnParseException() {
        return onParseException;
    }

    /**
     * Sets the default behavior when a parse exception occurs.
     *
     * @param onParseException the parse exception behavior
     */
    public void setOnParseException(FetchEmitTuple.ON_PARSE_EXCEPTION onParseException) {
        this.onParseException = onParseException;
    }

    public String getConfigStoreType() {
        return configStoreType;
    }

    public void setConfigStoreType(String configStoreType) {
        this.configStoreType = configStoreType;
    }

    public String getConfigStoreParams() {
        return configStoreParams;
    }

    public void setConfigStoreParams(String configStoreParams) {
        this.configStoreParams = configStoreParams;
    }

    /**
     * Gets the directory for temporary files during pipes-based parsing.
     *
     * @return the temp directory path, or null to use system default
     */
    public String getTempDirectory() {
        return tempDirectory;
    }

    /**
     * Sets the directory for temporary files during pipes-based parsing.
     * If not set, the system default temp directory will be used.
     * Consider using a RAM-backed filesystem (e.g., /dev/shm or /tmpfs) for better performance.
     *
     * @param tempDirectory the temp directory path, or null to use system default
     */
    public void setTempDirectory(String tempDirectory) {
        this.tempDirectory = tempDirectory;
    }

    /**
     * Returns whether shared server mode is enabled.
     *
     * @return true if shared server mode is enabled
     * @see #setUseSharedServer(boolean)
     */
    public boolean isUseSharedServer() {
        return useSharedServer;
    }

    /**
     * Sets whether to use shared server mode.
     * <p>
     * When {@code true}, multiple PipesClients connect to a single shared PipesServer
     * process instead of each client having its own dedicated server. This reduces
     * memory overhead but sacrifices isolation: one crash affects all in-flight requests.
     * <p>
     * <b>Not recommended for production.</b> See the Tika Pipes documentation for
     * limitations and guidance.
     *
     * @param useSharedServer true to enable shared server mode, false for per-client mode (default)
     */
    public void setUseSharedServer(boolean useSharedServer) {
        this.useSharedServer = useSharedServer;
    }
}