AsyncProcessor.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.tika.pipes.core.async;

import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.tika.config.loader.TikaJsonConfig;
import org.apache.tika.config.loader.TikaLoader;
import org.apache.tika.exception.TikaException;
import org.apache.tika.metadata.filter.MetadataFilter;
import org.apache.tika.pipes.api.FetchEmitTuple;
import org.apache.tika.pipes.api.PipesResult;
import org.apache.tika.pipes.api.pipesiterator.PipesIterator;
import org.apache.tika.pipes.api.pipesiterator.TotalCountResult;
import org.apache.tika.pipes.api.pipesiterator.TotalCounter;
import org.apache.tika.pipes.api.reporter.PipesReporter;
import org.apache.tika.pipes.core.PerClientServerManager;
import org.apache.tika.pipes.core.PipesClient;
import org.apache.tika.pipes.core.PipesConfig;
import org.apache.tika.pipes.core.PipesException;
import org.apache.tika.pipes.core.PipesResults;
import org.apache.tika.pipes.core.ServerManager;
import org.apache.tika.pipes.core.SharedServerManager;
import org.apache.tika.pipes.core.emitter.EmitterManager;
import org.apache.tika.pipes.core.reporter.ReporterManager;
import org.apache.tika.plugins.TikaPluginManager;

/**
 * Entry point for asynchronous fetch/parse/emit processing.
 * <p>
 * An {@code AsyncProcessor} owns:
 * <ul>
 *   <li>a bounded queue of {@link FetchEmitTuple}s, fed by {@link #offer(FetchEmitTuple, long)}
 *       and {@link #offer(List, long)};</li>
 *   <li>a pool of {@code FetchEmitWorker}s, each driving a {@link PipesClient} against a
 *       {@link ServerManager} (one shared manager, or one manager per worker);</li>
 *   <li>a bounded queue of {@link EmitDataPair}s drained by a pool of {@link AsyncEmitter}s;</li>
 *   <li>a watcher task that calls {@link #checkActive()} every 500 ms.</li>
 * </ul>
 * The lifecycle implied by the API is: {@link #load(Path)}, one or more {@code offer(...)}
 * calls, {@link #finished()}, poll {@link #checkActive()} until it returns {@code false},
 * then {@link #close()}.
 */
public class AsyncProcessor implements Closeable {

    /** Value returned by a {@code FetchEmitWorker} task when it stops without throwing. */
    static final int PARSER_FUTURE_CODE = 1;
    /** Value returned by the watcher task when it is interrupted. */
    static final int WATCHER_FUTURE_CODE = 3;

    private static final Logger LOG = LoggerFactory.getLogger(AsyncProcessor.class);

    private final ArrayBlockingQueue<FetchEmitTuple> fetchEmitTuples;
    private final ArrayBlockingQueue<EmitDataPair> emitDatumTuples;
    private final ExecutorCompletionService<Integer> executorCompletionService;
    private final ExecutorService executorService;
    private final PipesConfig asyncConfig;
    private final Path tikaConfigPath;
    private final PipesReporter pipesReporter;
    private final List<ServerManager> serverManagers = new ArrayList<>();
    private final AtomicLong totalProcessed = new AtomicLong(0);
    private final AtomicBoolean applicationErrorOccurred = new AtomicBoolean(false);
    // upper bound on how long to block when handing semaphores / emit data to a queue
    private static long MAX_OFFER_WAIT_MS = 120000;
    // only mutated inside the synchronized checkActive()
    private volatile int numParserThreadsFinished = 0;
    private volatile int numEmitterThreadsFinished = 0;
    private boolean addedEmitterSemaphores = false;
    // set by close(); volatile because offer(...) callers may read it without this monitor
    volatile boolean isShuttingDown = false;

    /**
     * Loads an AsyncProcessor from a configuration file path.
     * <p>
     * This method pre-extracts plugins before loading, ensuring child processes
     * don't race to extract the same plugins.
     *
     * @param tikaConfigPath path to the tika-config.json file
     * @return a new AsyncProcessor instance
     * @throws IOException if reading config or plugin extraction fails
     * @throws TikaException if configuration is invalid
     */
    public static AsyncProcessor load(Path tikaConfigPath) throws TikaException, IOException {
        return load(tikaConfigPath, null);
    }

    /**
     * Loads an AsyncProcessor from a configuration file path with a custom PipesIterator.
     * <p>
     * This method pre-extracts plugins before loading, ensuring child processes
     * don't race to extract the same plugins.
     *
     * @param tikaConfigPath path to the tika-config.json file
     * @param pipesIterator optional custom pipes iterator (may be null); if it implements
     *                      {@link TotalCounter}, a background thread reports its total count
     * @return a new AsyncProcessor instance
     * @throws IOException if reading config or plugin extraction fails
     * @throws TikaException if configuration is invalid
     */
    public static AsyncProcessor load(Path tikaConfigPath, PipesIterator pipesIterator)
            throws TikaException, IOException {
        TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(tikaConfigPath);
        TikaPluginManager.preExtractPlugins(tikaJsonConfig);
        return new AsyncProcessor(tikaConfigPath, pipesIterator, tikaJsonConfig);
    }

    /**
     * Builds the queues and the thread pool, then starts the watcher, the optional total
     * counter, the fetch/emit workers and the emitters. On failure everything started so
     * far is shut down, the error is reported, and the reporter is closed.
     */
    private AsyncProcessor(Path tikaConfigPath, PipesIterator pipesIterator,
            TikaJsonConfig tikaJsonConfig) throws TikaException, IOException {
        TikaPluginManager tikaPluginManager = TikaPluginManager.load(tikaJsonConfig);
        MetadataFilter metadataFilter = TikaLoader.load(tikaConfigPath).loadMetadataFilters();
        this.asyncConfig = PipesConfig.load(tikaJsonConfig);
        this.tikaConfigPath = tikaConfigPath;
        this.pipesReporter = ReporterManager.load(tikaPluginManager, tikaJsonConfig);
        LOG.debug("loaded reporter {}", pipesReporter.getClass());
        this.fetchEmitTuples = new ArrayBlockingQueue<>(asyncConfig.getQueueSize());
        this.emitDatumTuples = new ArrayBlockingQueue<>(100);
        //+1 is the watcher thread
        this.executorService = Executors.newFixedThreadPool(
                asyncConfig.getNumClients() + asyncConfig.getNumEmitters() + 1);
        this.executorCompletionService =
                new ExecutorCompletionService<>(executorService);
        try {
            // watcher: keeps the finished-task bookkeeping in checkActive() moving
            this.executorCompletionService.submit(() -> {
                while (true) {
                    try {
                        Thread.sleep(500);
                        checkActive();
                    } catch (InterruptedException e) {
                        return WATCHER_FUTURE_CODE;
                    }
                }
            });
            //this is run in a daemon thread
            if (pipesIterator != null && (pipesIterator instanceof TotalCounter)) {
                LOG.debug("going to total counts");
                startCounter((TotalCounter) pipesIterator);
            }

            // Create ServerManagers based on shared mode config
            boolean isSharedMode = asyncConfig.isUseSharedServer();
            if (isSharedMode) {
                LOG.info("Using shared server mode with {} workers", asyncConfig.getNumClients());
                SharedServerManager sharedManager = new SharedServerManager(
                        asyncConfig, tikaConfigPath, asyncConfig.getNumClients());
                serverManagers.add(sharedManager);

                for (int i = 0; i < asyncConfig.getNumClients(); i++) {
                    executorCompletionService.submit(
                            new FetchEmitWorker(asyncConfig, sharedManager,
                                    fetchEmitTuples, emitDatumTuples, applicationErrorOccurred));
                }
            } else {
                LOG.info("Using per-client server mode with {} workers", asyncConfig.getNumClients());
                for (int i = 0; i < asyncConfig.getNumClients(); i++) {
                    PerClientServerManager serverManager = new PerClientServerManager(
                            asyncConfig, tikaConfigPath, i);
                    serverManagers.add(serverManager);

                    executorCompletionService.submit(
                            new FetchEmitWorker(asyncConfig, serverManager,
                                    fetchEmitTuples, emitDatumTuples, applicationErrorOccurred));
                }
            }

            EmitterManager emitterManager = EmitterManager.load(tikaPluginManager, tikaJsonConfig);
            for (int i = 0; i < asyncConfig.getNumEmitters(); i++) {
                executorCompletionService.submit(
                        new AsyncEmitter(asyncConfig, emitDatumTuples, emitterManager));
            }
        } catch (Exception e) {
            LOG.error("problem initializing AsyncProcessor", e);
            executorService.shutdownNow();
            closeServerManagers();
            this.pipesReporter.error(e);
            // the caller never receives this instance, so it cannot close the reporter itself
            try {
                this.pipesReporter.close();
            } catch (Exception closeException) {
                e.addSuppressed(closeException);
            }
            throw e;
        }
    }

    /**
     * Starts a daemon thread that asks {@code totalCounter} to begin counting and then,
     * every 500 ms, reports the current {@link TotalCountResult} to the reporter until
     * the status is no longer {@code NOT_COMPLETED} or the thread is interrupted.
     */
    private void startCounter(TotalCounter totalCounter) {
        Thread counterThread = new Thread(() -> {
            totalCounter.startTotalCount();
            TotalCountResult.STATUS status = totalCounter.getTotalCount().getStatus();
            while (status == TotalCountResult.STATUS.NOT_COMPLETED) {
                try {
                    Thread.sleep(500);
                    TotalCountResult result = totalCounter.getTotalCount();
                    LOG.trace("counter total  {} {} ", result.getStatus(), result.getTotalCount());
                    pipesReporter.report(result);
                    status = result.getStatus();
                } catch (InterruptedException e) {
                    return;
                }
            }

        });
        counterThread.setDaemon(true);
        counterThread.start();
    }

    /**
     * Offers a batch of tuples to the fetch queue, polling every 100 ms for up to
     * {@code offerMs} until there is room for the whole batch.
     *
     * @param newFetchEmitTuples tuples to enqueue; must not exceed the configured queue size
     * @param offerMs maximum time to wait, in milliseconds
     * @return {@code true} if the batch was enqueued, {@code false} on timeout
     * @throws IllegalStateException if {@link #close()} has been called
     * @throws PipesException if an application error has already occurred
     * @throws OfferLargerThanQueueSize if the batch is larger than the configured queue size
     * @throws InterruptedException if interrupted while waiting
     */
    public synchronized boolean offer(List<FetchEmitTuple> newFetchEmitTuples, long offerMs)
            throws PipesException, InterruptedException {
        if (isShuttingDown) {
            throw new IllegalStateException(
                    "Can't call offer after calling close() or " + "shutdownNow()");
        }
        if (applicationErrorOccurred.get()) {
            throw new PipesException("Can't call offer after an application error occurred");
        }
        if (newFetchEmitTuples.size() > asyncConfig.getQueueSize()) {
            throw new OfferLargerThanQueueSize(newFetchEmitTuples.size(),
                    asyncConfig.getQueueSize());
        }
        long start = System.currentTimeMillis();
        long elapsed = System.currentTimeMillis() - start;
        while (elapsed < offerMs) {
            // >= rather than >: a batch exactly as large as the free space fits. With >,
            // a batch equal to the queue size (allowed above) could never be accepted.
            if (fetchEmitTuples.remainingCapacity() >= newFetchEmitTuples.size()) {
                try {
                    fetchEmitTuples.addAll(newFetchEmitTuples);
                    return true;
                } catch (IllegalStateException e) {
                    //this means that the add all failed because the queue couldn't
                    //take the full list
                    // NOTE(review): addAll is not atomic; a concurrent single-tuple offer
                    // can fill the queue mid-batch, leaving part of the batch enqueued
                    // before this retry re-adds the whole batch.
                    LOG.debug("couldn't add full list", e);
                }
            }
            Thread.sleep(100);
            elapsed = System.currentTimeMillis() - start;
        }
        return false;
    }

    /**
     * Returns the number of additional tuples the fetch queue can accept right now.
     */
    public int getCapacity() {
        return fetchEmitTuples.remainingCapacity();
    }

    /**
     * Offers a single tuple to the fetch queue, waiting up to {@code offerMs}.
     * The shutdown/error state and {@link #checkActive()} are re-checked at least once
     * per second while waiting.
     *
     * @param t tuple to enqueue
     * @param offerMs maximum time to wait, in milliseconds
     * @return {@code true} if the tuple was enqueued, {@code false} on timeout
     * @throws IllegalStateException if {@link #close()} has been called
     * @throws PipesException if an application error has already occurred
     * @throws InterruptedException if interrupted while waiting
     */
    public boolean offer(FetchEmitTuple t, long offerMs)
            throws PipesException, InterruptedException {
        if (fetchEmitTuples == null) {
            throw new IllegalStateException("queue hasn't been initialized yet.");
        }
        long deadline = System.currentTimeMillis() + offerMs;
        while (System.currentTimeMillis() < deadline) {
            synchronized (this) {
                if (isShuttingDown) {
                    throw new IllegalStateException(
                            "Can't call offer after calling close() or shutdownNow()");
                }
                if (applicationErrorOccurred.get()) {
                    throw new PipesException(
                            "Can't call offer after an application error occurred");
                }
                checkActive();
            }
            // Try a short offer outside the synchronized block so checkActive()
            // can still be called by other threads (e.g. the watcher).
            long remaining = deadline - System.currentTimeMillis();
            long pollMs = Math.min(remaining, 1000);
            if (pollMs <= 0) {
                return false;
            }
            if (fetchEmitTuples.offer(t, pollMs, TimeUnit.MILLISECONDS)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Returns true if an application error has occurred during processing.
     * When this returns true, all workers have stopped or are stopping,
     * and no new tuples can be offered.
     *
     * @return true if an application error occurred
     */
    public boolean hasApplicationError() {
        return applicationErrorOccurred.get();
    }

    /**
     * Signals that no more tuples will be offered by enqueueing one
     * {@link PipesIterator#COMPLETED_SEMAPHORE} per configured client. A worker that
     * takes the semaphore stops.
     *
     * @throws RuntimeException if a semaphore cannot be enqueued within the max offer wait
     * @throws InterruptedException if interrupted while waiting
     */
    public void finished() throws InterruptedException {
        for (int i = 0; i < asyncConfig.getNumClients(); i++) {
            boolean offered = fetchEmitTuples.offer(PipesIterator.COMPLETED_SEMAPHORE,
                    MAX_OFFER_WAIT_MS, TimeUnit.MILLISECONDS);
            if (! offered) {
                throw new RuntimeException("Couldn't offer completed semaphore within " +
                        MAX_OFFER_WAIT_MS + " ms");
            }
        }
    }

    /**
     * Collects at most one completed task and updates the finished counters. Once every
     * fetch/emit worker has finished, it enqueues one stop semaphore per emitter (only once).
     *
     * @return {@code false} once all workers and all emitters have finished, else {@code true}
     * @throws RuntimeException wrapping the {@link ExecutionException} if a task failed,
     *                          or if an emitter stop semaphore cannot be enqueued in time
     * @throws InterruptedException if interrupted while enqueueing emitter stop semaphores
     */
    public synchronized boolean checkActive() throws InterruptedException {

        Future<Integer> future = executorCompletionService.poll();
        if (future != null) {
            try {
                Integer i = future.get();
                switch (i) {
                    case PARSER_FUTURE_CODE :
                        numParserThreadsFinished++;
                        LOG.debug("fetchEmitWorker finished, total {}", numParserThreadsFinished);
                        break;
                    case AsyncEmitter.EMITTER_FUTURE_CODE :
                        numEmitterThreadsFinished++;
                        LOG.debug("emitter thread finished, total {}", numEmitterThreadsFinished);
                        break;
                    case WATCHER_FUTURE_CODE :
                        LOG.debug("watcher thread finished");
                        break;
                    default :
                        throw new IllegalArgumentException("Don't recognize this future code: " + i);
                }
            } catch (ExecutionException e) {
                LOG.error("execution exception", e);
                this.pipesReporter.error(e);
                throw new RuntimeException(e);
            }
        }
        if (numParserThreadsFinished == asyncConfig.getNumClients() && ! addedEmitterSemaphores) {
            for (int i = 0; i < asyncConfig.getNumEmitters(); i++) {
                // InterruptedException propagates as declared (e.g. the watcher exits
                // cleanly on shutdownNow()) instead of being wrapped and losing the interrupt
                boolean offered = emitDatumTuples.offer(AsyncEmitter.EMIT_DATA_STOP_SEMAPHORE,
                        MAX_OFFER_WAIT_MS,
                        TimeUnit.MILLISECONDS);
                if (! offered) {
                    throw new RuntimeException("Couldn't offer emit data stop semaphore " +
                            "within " + MAX_OFFER_WAIT_MS + " ms");
                }
            }
            addedEmitterSemaphores = true;
        }
        return !(numParserThreadsFinished == asyncConfig.getNumClients() &&
                numEmitterThreadsFinished == asyncConfig.getNumEmitters());
    }

    /**
     * Marks this processor as shutting down, interrupts all tasks, closes the server
     * managers, and closes the reporter. Subsequent {@code offer(...)} calls throw
     * {@link IllegalStateException}.
     */
    @Override
    public void close() throws IOException {
        // set first so offer(...) callers are rejected as soon as shutdown begins
        isShuttingDown = true;
        executorService.shutdownNow();
        closeServerManagers();
        this.pipesReporter.close();
    }

    /** Closes every server manager, logging (not propagating) any IOException. */
    private void closeServerManagers() {
        for (ServerManager manager : serverManagers) {
            try {
                manager.close();
            } catch (IOException e) {
                LOG.warn("Error closing server manager", e);
            }
        }
    }

    /**
     * Returns the number of tuples that the workers processed and reported without
     * triggering a stop.
     */
    public long getTotalProcessed() {
        return totalProcessed.get();
    }

    /**
     * Worker task: pulls tuples from the fetch queue, processes them through its own
     * {@link PipesClient}, and forwards emit data to the emitter queue.
     */
    private class FetchEmitWorker implements Callable<Integer> {

        private final PipesConfig asyncConfig;
        private final ServerManager serverManager;
        private final ArrayBlockingQueue<FetchEmitTuple> fetchEmitTuples;
        private final ArrayBlockingQueue<EmitDataPair> emitDataTupleQueue;
        private final AtomicBoolean applicationErrorOccurred;

        private FetchEmitWorker(PipesConfig asyncConfig,
                                ServerManager serverManager,
                                ArrayBlockingQueue<FetchEmitTuple> fetchEmitTuples,
                                ArrayBlockingQueue<EmitDataPair> emitDataTupleQueue,
                                AtomicBoolean applicationErrorOccurred) {
            this.asyncConfig = asyncConfig;
            this.serverManager = serverManager;
            this.fetchEmitTuples = fetchEmitTuples;
            this.emitDataTupleQueue = emitDataTupleQueue;
            this.applicationErrorOccurred = applicationErrorOccurred;
        }

        /**
         * Processes tuples until the completed semaphore is taken or another worker flags
         * an application error (both return {@link #PARSER_FUTURE_CODE}).
         *
         * @throws PipesException if a result requires all processing to stop
         * @throws RuntimeException if emit data cannot be enqueued within the max offer wait
         */
        @Override
        public Integer call() throws Exception {

            try (PipesClient pipesClient = new PipesClient(asyncConfig, serverManager)) {
                while (true) {
                    // Check if another worker encountered an application error
                    if (applicationErrorOccurred.get()) {
                        LOG.info("pipesClientId={}: stopping due to application error in another worker",
                                pipesClient.getPipesClientId());
                        return PARSER_FUTURE_CODE;
                    }
                    FetchEmitTuple t = fetchEmitTuples.poll(1, TimeUnit.SECONDS);
                    if (t == null) {
                        //skip
                        if (LOG.isTraceEnabled()) {
                            LOG.trace("null fetch emit tuple");
                        }
                    } else if (t == PipesIterator.COMPLETED_SEMAPHORE) {
                        if (LOG.isTraceEnabled()) {
                            LOG.trace("hit completed semaphore");
                        }
                        return PARSER_FUTURE_CODE;
                    } else {
                        PipesResult result = null;
                        long start = System.currentTimeMillis();
                        try {
                            result = pipesClient.process(t);
                            //TODO -- drop this back to debug or even trace once we have stability in ci
                            LOG.info("pipesClientId={}, status={}", pipesClient.getPipesClientId(), result.status());
                        } catch (IOException e) {
                            LOG.warn("pipesClientId={} crash", pipesClient.getPipesClientId(), e);
                            result = PipesResults.UNSPECIFIED_CRASH;
                        }
                        // Check if we should stop processing based on the result
                        if (shouldStopProcessing(result)) {
                            LOG.error("pipesClientId={}: {} ({}), stopping all processing",
                                    pipesClient.getPipesClientId(),
                                    describeStopReason(result),
                                    result.status());
                            applicationErrorOccurred.set(true);
                            pipesReporter.report(t, result, System.currentTimeMillis() - start);
                            throw new PipesException(describeStopReason(result) + ": " +
                                    result.status() +
                                    (result.message() != null ? " - " + result.message() : ""));
                        }
                        if (LOG.isTraceEnabled()) {
                            LOG.trace("timer -- pipes client process: {} ms",
                                    System.currentTimeMillis() - start);
                        }
                        long offerStart = System.currentTimeMillis();

                        if (shouldEmit(result)) {
                            // parameterized so emitData() is only stringified when trace is on
                            LOG.trace("adding result to emitter queue: {}", result.emitData());
                            boolean offered = emitDataTupleQueue.offer(
                                    new EmitDataPair(t.getEmitKey().getEmitterId(), result.emitData()), MAX_OFFER_WAIT_MS,
                                    TimeUnit.MILLISECONDS);
                            if (! offered) {
                                throw new RuntimeException("Couldn't offer emit data to queue " +
                                        "within " + MAX_OFFER_WAIT_MS + " ms");
                            }
                        }
                        if (LOG.isTraceEnabled()) {
                            LOG.trace("timer -- offered: {} ms",
                                    System.currentTimeMillis() - offerStart);
                        }
                        long elapsed = System.currentTimeMillis() - start;
                        pipesReporter.report(t, result, elapsed);
                        totalProcessed.incrementAndGet();
                    }
                }
            }
        }

        /**
         * Returns true for parse successes (with or without exception), or for any
         * non-success when intermediate-result emission is enabled.
         */
        private boolean shouldEmit(PipesResult result) {

            if (result.status() == PipesResult.RESULT_STATUS.PARSE_SUCCESS ||
                    result.status() == PipesResult.RESULT_STATUS.PARSE_SUCCESS_WITH_EXCEPTION) {
                return true;
            }
            // Emit intermediate results on any non-success if configured
            return asyncConfig.isEmitIntermediateResults() && !result.isSuccess();
        }

        /**
         * Determines if processing should stop based on the result and configuration.
         * <p>
         * When stopOnlyOnFatal is true (server mode): only stop on fatal errors.
         * When stopOnlyOnFatal is false (CLI mode, default): also stop on initialization
         * failures and fetcher/emitter not found errors.
         */
        private boolean shouldStopProcessing(PipesResult result) {
            // Always stop on fatal errors
            if (result.isFatal()) {
                return true;
            }

            // In server mode, only fatal errors stop processing
            if (asyncConfig.isStopOnlyOnFatal()) {
                return false;
            }

            // In CLI mode, also stop on initialization failures and not-found errors
            if (result.isInitializationFailure()) {
                return true;
            }

            // Stop on fetcher/emitter not found in CLI mode
            PipesResult.RESULT_STATUS status = result.status();
            return status == PipesResult.RESULT_STATUS.FETCHER_NOT_FOUND ||
                   status == PipesResult.RESULT_STATUS.EMITTER_NOT_FOUND;
        }

        /** Human-readable category for a result that stops processing. */
        private String describeStopReason(PipesResult result) {
            if (result.isFatal()) {
                return "Fatal error";
            } else if (result.isInitializationFailure()) {
                return "Initialization failure";
            } else {
                return "Configuration error";
            }
        }
    }
}