PipesParser.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.tika.pipes.core;

import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.tika.config.loader.TikaJsonConfig;
import org.apache.tika.exception.TikaConfigException;
import org.apache.tika.pipes.api.FetchEmitTuple;
import org.apache.tika.pipes.api.PipesResult;
import org.apache.tika.plugins.TikaPluginManager;

/**
 * Hands {@link FetchEmitTuple}s to a fixed pool of {@link PipesClient}s.
 * <p>
 * The pool is built once at construction time. Depending on
 * {@link PipesConfig#isUseSharedServer()}, either every client is attached to a
 * single {@link SharedServerManager}, or each client gets its own
 * {@link PerClientServerManager}. Clients are borrowed from, and returned to, a
 * bounded blocking queue for the duration of each {@link #parse(FetchEmitTuple)} call.
 */
public class PipesParser implements Closeable {

    private static final Logger LOG = LoggerFactory.getLogger(PipesParser.class);

    /**
     * Loads a PipesParser from a configuration file path.
     * <p>
     * This method:
     * <ol>
     *   <li>Loads the JSON configuration</li>
     *   <li>Derives the {@link PipesConfig} from it</li>
     *   <li>Delegates to {@link #load(TikaJsonConfig, PipesConfig, Path)}, which
     *       pre-extracts plugins and creates the parser</li>
     * </ol>
     *
     * @param tikaConfigPath path to the tika-config.json file
     * @return a new PipesParser instance
     * @throws IOException if reading config or extraction fails
     * @throws TikaConfigException if configuration is invalid
     */
    public static PipesParser load(Path tikaConfigPath) throws IOException, TikaConfigException {
        TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(tikaConfigPath);
        PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig);
        return load(tikaJsonConfig, pipesConfig, tikaConfigPath);
    }

    /**
     * Loads a PipesParser from pre-loaded configuration objects.
     * <p>
     * Use this method when you need to modify the PipesConfig before creating
     * the parser (e.g., to override emit strategy). Plugins are pre-extracted
     * before the parser (and therefore any client or server manager) is created.
     *
     * @param tikaJsonConfig the pre-loaded JSON configuration
     * @param pipesConfig the pipes configuration (may be modified by caller)
     * @param tikaConfigPath path to the config file (handed to the server managers)
     * @return a new PipesParser instance
     * @throws IOException if plugin extraction fails
     */
    public static PipesParser load(TikaJsonConfig tikaJsonConfig, PipesConfig pipesConfig,
            Path tikaConfigPath) throws IOException {
        TikaPluginManager.preExtractPlugins(tikaJsonConfig);
        return new PipesParser(pipesConfig, tikaConfigPath);
    }

    private final PipesConfig pipesConfig;
    private final Path tikaConfigPath;
    private final List<PipesClient> clients = new ArrayList<>();
    private final List<ServerManager> serverManagers = new ArrayList<>();
    private final ArrayBlockingQueue<PipesClient> clientQueue;
    private final boolean isSharedMode;

    /**
     * Builds the client pool and the server manager(s) backing it.
     *
     * @param pipesConfig source of the client count, server mode and wait timeout
     * @param tikaConfigPath config file path handed to every server manager
     */
    private PipesParser(PipesConfig pipesConfig, Path tikaConfigPath) {
        this.pipesConfig = pipesConfig;
        this.tikaConfigPath = tikaConfigPath;
        this.isSharedMode = pipesConfig.isUseSharedServer();

        // Read the client count once so the queue capacity and the number of
        // clients actually created can never disagree.
        final int numClients = pipesConfig.getNumClients();
        this.clientQueue = new ArrayBlockingQueue<>(numClients);

        if (isSharedMode) {
            // Shared mode: one ServerManager for all clients
            LOG.info("Using shared server mode with {} clients", numClients);
            SharedServerManager sharedManager = new SharedServerManager(
                    pipesConfig, tikaConfigPath, numClients);
            serverManagers.add(sharedManager);

            for (int i = 0; i < numClients; i++) {
                PipesClient client = new PipesClient(pipesConfig, sharedManager);
                clientQueue.offer(client);
                clients.add(client);
            }
        } else {
            // Per-client mode: each client has its own ServerManager
            LOG.info("Using per-client server mode with {} clients", numClients);
            for (int i = 0; i < numClients; i++) {
                PerClientServerManager serverManager = new PerClientServerManager(
                        pipesConfig, tikaConfigPath, i);
                serverManagers.add(serverManager);

                PipesClient client = new PipesClient(pipesConfig, serverManager);
                clientQueue.offer(client);
                clients.add(client);
            }
        }
    }

    /**
     * Processes a single tuple with the next available client.
     * <p>
     * Waits up to {@link PipesConfig#getMaxWaitForClientMillis()} milliseconds for a
     * client to become free. The borrowed client is always put back on the queue,
     * whether processing returns normally or throws.
     *
     * @param t the tuple to process
     * @return the client's result, or {@code PipesResults.CLIENT_UNAVAILABLE_WITHIN_MS}
     *         if no client became available within the configured wait
     * @throws InterruptedException if interrupted while waiting for a client, or
     *         propagated from the client
     * @throws PipesException propagated from the client
     * @throws IOException propagated from the client
     */
    public PipesResult parse(FetchEmitTuple t) throws InterruptedException,
            PipesException, IOException {
        PipesClient client = clientQueue.poll(pipesConfig.getMaxWaitForClientMillis(),
                TimeUnit.MILLISECONDS);
        if (client == null) {
            return PipesResults.CLIENT_UNAVAILABLE_WITHIN_MS;
        }
        try {
            return client.process(t);
        } finally {
            // Hand the client back regardless of how processing ended.
            clientQueue.offer(client);
        }
    }

    /**
     * Closes every client, then every server manager.
     * <p>
     * An {@link IOException} from one resource does not stop the remaining ones
     * from being closed. If any close failed, the first failure is thrown and all
     * later failures are attached to it as suppressed exceptions, so none is lost.
     *
     * @throws IOException the first failure encountered, carrying the rest as suppressed
     */
    @Override
    public void close() throws IOException {
        IOException failure = null;

        // First close all clients (closes their connections)
        for (PipesClient pipesClient : clients) {
            try {
                pipesClient.close();
            } catch (IOException e) {
                failure = accumulate(failure, e);
            }
        }

        // Then close all server managers (shuts down server processes)
        for (ServerManager serverManager : serverManagers) {
            try {
                serverManager.close();
            } catch (IOException e) {
                failure = accumulate(failure, e);
            }
        }

        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Folds {@code next} into the running close failure: the first exception seen
     * becomes the primary one, every later exception is added to it as suppressed.
     */
    private static IOException accumulate(IOException primary, IOException next) {
        if (primary == null) {
            return next;
        }
        // addSuppressed rejects self-suppression, so skip a repeated instance.
        if (primary != next) {
            primary.addSuppressed(next);
        }
        return primary;
    }

    /**
     * Returns whether this parser is using shared server mode.
     *
     * @return true if using shared server mode
     */
    public boolean isSharedMode() {
        return isSharedMode;
    }

    /**
     * Returns the current server port. For testing purposes only.
     * In shared mode, returns the port of the shared server.
     * In per-client mode, returns the port of the first client's server.
     *
     * @return the port reported by the first server manager, or -1 if this parser
     *         has no server managers
     */
    public int getCurrentServerPort() {
        if (serverManagers.isEmpty()) {
            return -1;
        }
        return serverManagers.get(0).getPort();
    }
}