// AsyncProcessor.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tika.pipes.core.async;
import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.tika.config.loader.TikaJsonConfig;
import org.apache.tika.config.loader.TikaLoader;
import org.apache.tika.exception.TikaException;
import org.apache.tika.metadata.filter.MetadataFilter;
import org.apache.tika.pipes.api.FetchEmitTuple;
import org.apache.tika.pipes.api.PipesResult;
import org.apache.tika.pipes.api.pipesiterator.PipesIterator;
import org.apache.tika.pipes.api.pipesiterator.TotalCountResult;
import org.apache.tika.pipes.api.pipesiterator.TotalCounter;
import org.apache.tika.pipes.api.reporter.PipesReporter;
import org.apache.tika.pipes.core.PerClientServerManager;
import org.apache.tika.pipes.core.PipesClient;
import org.apache.tika.pipes.core.PipesConfig;
import org.apache.tika.pipes.core.PipesException;
import org.apache.tika.pipes.core.PipesResults;
import org.apache.tika.pipes.core.ServerManager;
import org.apache.tika.pipes.core.SharedServerManager;
import org.apache.tika.pipes.core.emitter.EmitterManager;
import org.apache.tika.pipes.core.reporter.ReporterManager;
import org.apache.tika.plugins.TikaPluginManager;
/**
 * Main entry point for asynchronous fetch/parse/emit processing.
 * <p>
 * Callers offer {@link FetchEmitTuple}s onto a bounded input queue. A pool of
 * {@code FetchEmitWorker} tasks (one per configured client) takes tuples from that queue and
 * processes each one through a {@link PipesClient}. Results that should be emitted go onto a
 * second queue, which {@link AsyncEmitter} tasks drain. A watcher task calls
 * {@link #checkActive()} every 500 ms. That call accounts for finished tasks and, once every
 * worker has finished, enqueues one stop semaphore per emitter.
 * <p>
 * The usage implied by the code below is:
 * <ol>
 *   <li>offer tuples;</li>
 *   <li>call {@link #finished()} to signal that no more input is coming;</li>
 *   <li>poll {@link #checkActive()} until it returns {@code false};</li>
 *   <li>call {@link #close()}.</li>
 * </ol>
 */
public class AsyncProcessor implements Closeable {

    /** Value returned by a {@code FetchEmitWorker} task when it finishes without error. */
    static final int PARSER_FUTURE_CODE = 1;
    /** Value returned by the watcher task when it is interrupted. */
    static final int WATCHER_FUTURE_CODE = 3;

    private static final Logger LOG = LoggerFactory.getLogger(AsyncProcessor.class);

    // Input queue: tuples waiting to be processed by the FetchEmitWorkers.
    private final ArrayBlockingQueue<FetchEmitTuple> fetchEmitTuples;
    // Output queue: processed results waiting to be emitted by the AsyncEmitters.
    private final ArrayBlockingQueue<EmitDataPair> emitDatumTuples;
    private final ExecutorCompletionService<Integer> executorCompletionService;
    private final ExecutorService executorService;
    private final PipesConfig asyncConfig;
    private final Path tikaConfigPath;
    private final PipesReporter pipesReporter;
    private final List<ServerManager> serverManagers = new ArrayList<>();
    private final AtomicLong totalProcessed = new AtomicLong(0);
    // Set by a worker when a result requires all processing to stop.
    private final AtomicBoolean applicationErrorOccurred = new AtomicBoolean(false);

    // Upper bound on how long to block when enqueueing semaphores or emit data.
    private static long MAX_OFFER_WAIT_MS = 120000;

    // Both counters are incremented only inside the synchronized checkActive().
    private volatile int numParserThreadsFinished = 0;
    private volatile int numEmitterThreadsFinished = 0;
    // Ensures the emitter stop semaphores are enqueued only once.
    private boolean addedEmitterSemaphores = false;
    // Set by close(). Volatile so it can be written without taking this object's monitor.
    volatile boolean isShuttingDown = false;

    /**
     * Loads an AsyncProcessor from a configuration file path.
     * <p>
     * This method pre-extracts plugins before loading, ensuring child processes
     * don't race to extract the same plugins.
     *
     * @param tikaConfigPath path to the tika-config.json file
     * @return a new AsyncProcessor instance
     * @throws IOException if reading config or plugin extraction fails
     * @throws TikaException if configuration is invalid
     */
    public static AsyncProcessor load(Path tikaConfigPath) throws TikaException, IOException {
        return load(tikaConfigPath, null);
    }

    /**
     * Loads an AsyncProcessor from a configuration file path with a custom PipesIterator.
     * <p>
     * This method pre-extracts plugins before loading, ensuring child processes
     * don't race to extract the same plugins.
     *
     * @param tikaConfigPath path to the tika-config.json file
     * @param pipesIterator optional custom pipes iterator (may be null); if it is a
     *                      {@link TotalCounter}, totals are reported from a daemon thread
     * @return a new AsyncProcessor instance
     * @throws IOException if reading config or plugin extraction fails
     * @throws TikaException if configuration is invalid
     */
    public static AsyncProcessor load(Path tikaConfigPath, PipesIterator pipesIterator)
            throws TikaException, IOException {
        TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(tikaConfigPath);
        TikaPluginManager.preExtractPlugins(tikaJsonConfig);
        return new AsyncProcessor(tikaConfigPath, pipesIterator, tikaJsonConfig);
    }

    /**
     * Builds the queues and thread pool, then submits the watcher, the workers and the
     * emitters. If any step after the pool is created fails, the pool is shut down, the
     * server managers are closed, and the error is reported before being rethrown.
     */
    private AsyncProcessor(Path tikaConfigPath, PipesIterator pipesIterator,
                           TikaJsonConfig tikaJsonConfig) throws TikaException, IOException {
        TikaPluginManager tikaPluginManager = TikaPluginManager.load(tikaJsonConfig);
        // NOTE(review): metadataFilter is loaded but never used in this class. Presumably it is
        // kept so that an invalid metadata-filter configuration fails fast here -- confirm.
        MetadataFilter metadataFilter = TikaLoader.load(tikaConfigPath).loadMetadataFilters();
        this.asyncConfig = PipesConfig.load(tikaJsonConfig);
        this.tikaConfigPath = tikaConfigPath;
        this.pipesReporter = ReporterManager.load(tikaPluginManager, tikaJsonConfig);
        LOG.debug("loaded reporter {}", pipesReporter.getClass());
        this.fetchEmitTuples = new ArrayBlockingQueue<>(asyncConfig.getQueueSize());
        this.emitDatumTuples = new ArrayBlockingQueue<>(100);
        //+1 is the watcher thread
        this.executorService = Executors.newFixedThreadPool(
                asyncConfig.getNumClients() + asyncConfig.getNumEmitters() + 1);
        this.executorCompletionService =
                new ExecutorCompletionService<>(executorService);
        try {
            // Watcher: calls checkActive() every 500 ms until interrupted
            // (close() interrupts it via executorService.shutdownNow()).
            this.executorCompletionService.submit(() -> {
                while (true) {
                    try {
                        Thread.sleep(500);
                        checkActive();
                    } catch (InterruptedException e) {
                        return WATCHER_FUTURE_CODE;
                    }
                }
            });
            //this is run in a daemon thread
            if (pipesIterator instanceof TotalCounter) {
                LOG.debug("going to total counts");
                startCounter((TotalCounter) pipesIterator);
            }
            // Create ServerManagers based on shared mode config
            boolean isSharedMode = asyncConfig.isUseSharedServer();
            if (isSharedMode) {
                LOG.info("Using shared server mode with {} workers", asyncConfig.getNumClients());
                SharedServerManager sharedManager = new SharedServerManager(
                        asyncConfig, tikaConfigPath, asyncConfig.getNumClients());
                serverManagers.add(sharedManager);
                for (int i = 0; i < asyncConfig.getNumClients(); i++) {
                    executorCompletionService.submit(
                            new FetchEmitWorker(asyncConfig, sharedManager,
                                    fetchEmitTuples, emitDatumTuples, applicationErrorOccurred));
                }
            } else {
                LOG.info("Using per-client server mode with {} workers", asyncConfig.getNumClients());
                for (int i = 0; i < asyncConfig.getNumClients(); i++) {
                    PerClientServerManager serverManager = new PerClientServerManager(
                            asyncConfig, tikaConfigPath, i);
                    serverManagers.add(serverManager);
                    executorCompletionService.submit(
                            new FetchEmitWorker(asyncConfig, serverManager,
                                    fetchEmitTuples, emitDatumTuples, applicationErrorOccurred));
                }
            }
            EmitterManager emitterManager = EmitterManager.load(tikaPluginManager, tikaJsonConfig);
            for (int i = 0; i < asyncConfig.getNumEmitters(); i++) {
                executorCompletionService.submit(
                        new AsyncEmitter(asyncConfig, emitDatumTuples, emitterManager));
            }
        } catch (Exception e) {
            LOG.error("problem initializing AsyncProcessor", e);
            executorService.shutdownNow();
            closeServerManagers();
            this.pipesReporter.error(e);
            throw e;
        }
    }

    /**
     * Starts a daemon thread that begins the total count. While the count's status is
     * {@code NOT_COMPLETED}, the thread reports the current total to the reporter every 500 ms.
     */
    private void startCounter(TotalCounter totalCounter) {
        Thread counterThread = new Thread(() -> {
            totalCounter.startTotalCount();
            TotalCountResult.STATUS status = totalCounter.getTotalCount().getStatus();
            while (status == TotalCountResult.STATUS.NOT_COMPLETED) {
                try {
                    Thread.sleep(500);
                    TotalCountResult result = totalCounter.getTotalCount();
                    LOG.trace("counter total {} {} ", result.getStatus(), result.getTotalCount());
                    pipesReporter.report(result);
                    status = result.getStatus();
                } catch (InterruptedException e) {
                    return;
                }
            }
        });
        counterThread.setDaemon(true);
        counterThread.start();
    }

    /**
     * Throws if new tuples may no longer be offered.
     *
     * @throws IllegalStateException if {@link #close()} has been called
     * @throws PipesException if a worker reported an application error
     */
    private void ensureAcceptingOffers() throws PipesException {
        if (isShuttingDown) {
            throw new IllegalStateException(
                    "Can't call offer after calling close() or shutdownNow()");
        }
        if (applicationErrorOccurred.get()) {
            throw new PipesException("Can't call offer after an application error occurred");
        }
    }

    /**
     * Offers every tuple in {@code newFetchEmitTuples} to the input queue, waiting up to
     * {@code offerMs} for enough room.
     * <p>
     * The method waits, re-checking every 100 ms, until the queue has room for all tuples not
     * yet added, and then enqueues them. If a concurrent producer takes some of that room
     * first, the tuples already enqueued are kept and are not re-added. The remaining tuples
     * are retried on the next iteration. While waiting, the object's monitor is released, so
     * the watcher's {@link #checkActive()} and other offers are not blocked.
     *
     * @param newFetchEmitTuples tuples to enqueue; at most the configured queue size
     * @param offerMs maximum time to wait, in milliseconds
     * @return {@code true} once every tuple has been enqueued; {@code false} if
     *         {@code offerMs} elapsed first. In the timeout case a prefix of the list may
     *         already have been enqueued.
     * @throws OfferLargerThanQueueSize if the list is larger than the queue
     * @throws IllegalStateException if {@link #close()} has been called
     * @throws PipesException if a worker reported an application error
     * @throws InterruptedException if interrupted while waiting
     */
    public synchronized boolean offer(List<FetchEmitTuple> newFetchEmitTuples, long offerMs)
            throws PipesException, InterruptedException {
        ensureAcceptingOffers();
        if (newFetchEmitTuples.size() > asyncConfig.getQueueSize()) {
            throw new OfferLargerThanQueueSize(newFetchEmitTuples.size(),
                    asyncConfig.getQueueSize());
        }
        // Snapshot for O(1) index access and so that later changes by the caller are not seen.
        List<FetchEmitTuple> toAdd = new ArrayList<>(newFetchEmitTuples);
        int added = 0;
        long start = System.currentTimeMillis();
        long elapsed = 0;
        while (elapsed < offerMs) {
            int pending = toAdd.size() - added;
            // '>=': exactly enough room is enough.
            if (fetchEmitTuples.remainingCapacity() >= pending) {
                // Offer one at a time. Unlike addAll(), a failure part-way through leaves
                // 'added' pointing at the first tuple not yet enqueued, so nothing is duplicated.
                while (added < toAdd.size() && fetchEmitTuples.offer(toAdd.get(added))) {
                    added++;
                }
                if (added == toAdd.size()) {
                    return true;
                }
                LOG.debug("couldn't add full list; {} of {} tuples added so far",
                        added, toAdd.size());
            }
            // wait() rather than sleep(): it releases this object's monitor while waiting.
            wait(100);
            elapsed = System.currentTimeMillis() - start;
            ensureAcceptingOffers();
        }
        return false;
    }

    /**
     * Returns the number of additional tuples the input queue can accept right now.
     *
     * @return the input queue's remaining capacity
     */
    public int getCapacity() {
        return fetchEmitTuples.remainingCapacity();
    }

    /**
     * Offers a single tuple to the input queue, waiting up to {@code offerMs}.
     * <p>
     * Before each bounded attempt (at most 1000 ms), this method checks the shutdown and
     * application-error state and calls {@link #checkActive()}.
     *
     * @param t the tuple to enqueue
     * @param offerMs maximum time to wait, in milliseconds
     * @return {@code true} if the tuple was enqueued, {@code false} on timeout
     * @throws IllegalStateException if {@link #close()} has been called
     * @throws PipesException if a worker reported an application error
     * @throws InterruptedException if interrupted while waiting
     */
    public boolean offer(FetchEmitTuple t, long offerMs)
            throws PipesException, InterruptedException {
        if (fetchEmitTuples == null) {
            throw new IllegalStateException("queue hasn't been initialized yet.");
        }
        long deadline = System.currentTimeMillis() + offerMs;
        while (System.currentTimeMillis() < deadline) {
            synchronized (this) {
                ensureAcceptingOffers();
                checkActive();
            }
            // Try a short offer outside the synchronized block so checkActive()
            // can still be called by other threads (e.g. the watcher).
            long remaining = deadline - System.currentTimeMillis();
            long pollMs = Math.min(remaining, 1000);
            if (pollMs <= 0) {
                return false;
            }
            if (fetchEmitTuples.offer(t, pollMs, TimeUnit.MILLISECONDS)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Returns true if an application error has occurred during processing.
     * When this returns true, all workers have stopped or are stopping,
     * and no new tuples can be offered.
     *
     * @return true if an application error occurred
     */
    public boolean hasApplicationError() {
        return applicationErrorOccurred.get();
    }

    /**
     * Signals that no more tuples will be offered. It enqueues one
     * {@link PipesIterator#COMPLETED_SEMAPHORE} per client, and each worker returns when it
     * takes one.
     *
     * @throws InterruptedException if interrupted while enqueueing
     * @throws RuntimeException if a semaphore cannot be enqueued within the maximum wait
     */
    public void finished() throws InterruptedException {
        for (int i = 0; i < asyncConfig.getNumClients(); i++) {
            boolean offered = fetchEmitTuples.offer(PipesIterator.COMPLETED_SEMAPHORE,
                    MAX_OFFER_WAIT_MS, TimeUnit.MILLISECONDS);
            if (! offered) {
                throw new RuntimeException("Couldn't offer completed semaphore within " +
                        MAX_OFFER_WAIT_MS + " ms");
            }
        }
    }

    /**
     * Accounts for at most one completed task and reports whether processing is still active.
     * <p>
     * Once every worker has returned {@link #PARSER_FUTURE_CODE}, this method enqueues one
     * stop semaphore per emitter, and does so only once.
     *
     * @return {@code false} once every worker and every emitter has finished normally,
     *         {@code true} otherwise
     * @throws InterruptedException declared for callers; an interrupt while enqueueing the
     *         emitter semaphores is rethrown as a RuntimeException with the flag restored
     * @throws RuntimeException wrapping the {@link ExecutionException} of a failed task, after
     *         reporting it to the reporter
     * @throws IllegalArgumentException if a task returned an unknown code
     */
    public synchronized boolean checkActive() throws InterruptedException {
        Future<Integer> future = executorCompletionService.poll();
        if (future != null) {
            try {
                Integer i = future.get();
                switch (i) {
                    case PARSER_FUTURE_CODE :
                        numParserThreadsFinished++;
                        LOG.debug("fetchEmitWorker finished, total {}", numParserThreadsFinished);
                        break;
                    case AsyncEmitter.EMITTER_FUTURE_CODE :
                        numEmitterThreadsFinished++;
                        LOG.debug("emitter thread finished, total {}", numEmitterThreadsFinished);
                        break;
                    case WATCHER_FUTURE_CODE :
                        LOG.debug("watcher thread finished");
                        break;
                    default :
                        throw new IllegalArgumentException("Don't recognize this future code: " + i);
                }
            } catch (ExecutionException e) {
                LOG.error("execution exception", e);
                this.pipesReporter.error(e);
                throw new RuntimeException(e);
            }
        }
        if (numParserThreadsFinished == asyncConfig.getNumClients() && ! addedEmitterSemaphores) {
            for (int i = 0; i < asyncConfig.getNumEmitters(); i++) {
                try {
                    boolean offered = emitDatumTuples.offer(AsyncEmitter.EMIT_DATA_STOP_SEMAPHORE,
                            MAX_OFFER_WAIT_MS,
                            TimeUnit.MILLISECONDS);
                    if (! offered) {
                        throw new RuntimeException("Couldn't offer emit data stop semaphore " +
                                "within " + MAX_OFFER_WAIT_MS + " ms");
                    }
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so code further up the stack can still see it.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
            addedEmitterSemaphores = true;
        }
        return !(numParserThreadsFinished == asyncConfig.getNumClients() &&
                numEmitterThreadsFinished == asyncConfig.getNumEmitters());
    }

    /**
     * Stops accepting offers, interrupts all running tasks, closes the server managers, and
     * then closes the reporter. The reporter is closed even if a server manager throws.
     *
     * @throws IOException if closing the reporter fails
     */
    @Override
    public void close() throws IOException {
        // Makes the "can't call offer after close()" guard in the offer methods effective.
        isShuttingDown = true;
        executorService.shutdownNow();
        try {
            closeServerManagers();
        } finally {
            this.pipesReporter.close();
        }
    }

    /** Closes every server manager. An IOException from one is logged and does not stop the rest. */
    private void closeServerManagers() {
        for (ServerManager manager : serverManagers) {
            try {
                manager.close();
            } catch (IOException e) {
                LOG.warn("Error closing server manager", e);
            }
        }
    }

    /**
     * Returns the number of tuples processed (and reported) by the workers so far.
     *
     * @return the running count of processed tuples
     */
    public long getTotalProcessed() {
        return totalProcessed.get();
    }

    /**
     * Worker task: takes tuples from the input queue, processes them through its own
     * {@link PipesClient}, enqueues emit data when appropriate, and reports each result.
     * It returns {@link #PARSER_FUTURE_CODE} when it takes the completed semaphore or sees that
     * another worker set the application-error flag. It throws {@link PipesException} when its
     * own result requires all processing to stop.
     */
    private class FetchEmitWorker implements Callable<Integer> {
        private final PipesConfig asyncConfig;
        private final ServerManager serverManager;
        private final ArrayBlockingQueue<FetchEmitTuple> fetchEmitTuples;
        private final ArrayBlockingQueue<EmitDataPair> emitDataTupleQueue;
        private final AtomicBoolean applicationErrorOccurred;

        private FetchEmitWorker(PipesConfig asyncConfig,
                                ServerManager serverManager,
                                ArrayBlockingQueue<FetchEmitTuple> fetchEmitTuples,
                                ArrayBlockingQueue<EmitDataPair> emitDataTupleQueue,
                                AtomicBoolean applicationErrorOccurred) {
            this.asyncConfig = asyncConfig;
            this.serverManager = serverManager;
            this.fetchEmitTuples = fetchEmitTuples;
            this.emitDataTupleQueue = emitDataTupleQueue;
            this.applicationErrorOccurred = applicationErrorOccurred;
        }

        @Override
        public Integer call() throws Exception {
            try (PipesClient pipesClient = new PipesClient(asyncConfig, serverManager)) {
                while (true) {
                    // Check if another worker encountered an application error
                    if (applicationErrorOccurred.get()) {
                        LOG.info("pipesClientId={}: stopping due to application error in another worker",
                                pipesClient.getPipesClientId());
                        return PARSER_FUTURE_CODE;
                    }
                    // Bounded poll so the application-error flag is re-checked at least once a second.
                    FetchEmitTuple t = fetchEmitTuples.poll(1, TimeUnit.SECONDS);
                    if (t == null) {
                        //skip
                        if (LOG.isTraceEnabled()) {
                            LOG.trace("null fetch emit tuple");
                        }
                    } else if (t == PipesIterator.COMPLETED_SEMAPHORE) {
                        if (LOG.isTraceEnabled()) {
                            LOG.trace("hit completed semaphore");
                        }
                        return PARSER_FUTURE_CODE;
                    } else {
                        PipesResult result = null;
                        long start = System.currentTimeMillis();
                        try {
                            result = pipesClient.process(t);
                            //TODO -- drop this back to debug or even trace once we have stability in ci
                            LOG.info("pipesClientId={}, status={}", pipesClient.getPipesClientId(), result.status());
                        } catch (IOException e) {
                            LOG.warn("pipesClientId={} crash", pipesClient.getPipesClientId(), e);
                            result = PipesResults.UNSPECIFIED_CRASH;
                        }
                        // Check if we should stop processing based on the result
                        if (shouldStopProcessing(result)) {
                            LOG.error("pipesClientId={}: {} ({}), stopping all processing",
                                    pipesClient.getPipesClientId(),
                                    describeStopReason(result),
                                    result.status());
                            applicationErrorOccurred.set(true);
                            pipesReporter.report(t, result, System.currentTimeMillis() - start);
                            throw new PipesException(describeStopReason(result) + ": " +
                                    result.status() +
                                    (result.message() != null ? " - " + result.message() : ""));
                        }
                        if (LOG.isTraceEnabled()) {
                            LOG.trace("timer -- pipes client process: {} ms",
                                    System.currentTimeMillis() - start);
                        }
                        long offerStart = System.currentTimeMillis();
                        if (shouldEmit(result)) {
                            // Parameterized so the message isn't built when trace is disabled.
                            LOG.trace("adding result to emitter queue: {}", result.emitData());
                            boolean offered = emitDataTupleQueue.offer(
                                    new EmitDataPair(t.getEmitKey().getEmitterId(), result.emitData()), MAX_OFFER_WAIT_MS,
                                    TimeUnit.MILLISECONDS);
                            if (! offered) {
                                throw new RuntimeException("Couldn't offer emit data to queue " +
                                        "within " + MAX_OFFER_WAIT_MS + " ms");
                            }
                        }
                        if (LOG.isTraceEnabled()) {
                            LOG.trace("timer -- offered: {} ms",
                                    System.currentTimeMillis() - offerStart);
                        }
                        long elapsed = System.currentTimeMillis() - start;
                        pipesReporter.report(t, result, elapsed);
                        totalProcessed.incrementAndGet();
                    }
                }
            }
        }

        /**
         * Returns true for parse successes (with or without an exception). For any other
         * non-success result, it returns true only when intermediate-result emission is enabled.
         */
        private boolean shouldEmit(PipesResult result) {
            if (result.status() == PipesResult.RESULT_STATUS.PARSE_SUCCESS ||
                    result.status() == PipesResult.RESULT_STATUS.PARSE_SUCCESS_WITH_EXCEPTION) {
                return true;
            }
            // Emit intermediate results on any non-success if configured
            return asyncConfig.isEmitIntermediateResults() && !result.isSuccess();
        }

        /**
         * Determines if processing should stop based on the result and configuration.
         * <p>
         * When stopOnlyOnFatal is true (server mode): only stop on fatal errors.
         * When stopOnlyOnFatal is false (CLI mode, default): also stop on initialization
         * failures and fetcher/emitter not found errors.
         */
        private boolean shouldStopProcessing(PipesResult result) {
            // Always stop on fatal errors
            if (result.isFatal()) {
                return true;
            }
            // In server mode, only fatal errors stop processing
            if (asyncConfig.isStopOnlyOnFatal()) {
                return false;
            }
            // In CLI mode, also stop on initialization failures and not-found errors
            if (result.isInitializationFailure()) {
                return true;
            }
            // Stop on fetcher/emitter not found in CLI mode
            PipesResult.RESULT_STATUS status = result.status();
            return status == PipesResult.RESULT_STATUS.FETCHER_NOT_FOUND ||
                    status == PipesResult.RESULT_STATUS.EMITTER_NOT_FOUND;
        }

        /** Short human-readable label for why {@link #shouldStopProcessing} returned true. */
        private String describeStopReason(PipesResult result) {
            if (result.isFatal()) {
                return "Fatal error";
            } else if (result.isInitializationFailure()) {
                return "Initialization failure";
            } else {
                return "Configuration error";
            }
        }
    }
}