PipesServer.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tika.pipes.core.server;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.MessageDigest;
import java.time.Duration;
import java.time.Instant;
import java.util.HexFormat;
import java.util.Locale;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xml.sax.SAXException;
import org.apache.tika.config.TikaProgressTracker;
import org.apache.tika.config.TimeoutLimits;
import org.apache.tika.config.loader.TikaJsonConfig;
import org.apache.tika.config.loader.TikaLoader;
import org.apache.tika.detect.Detector;
import org.apache.tika.exception.TikaConfigException;
import org.apache.tika.exception.TikaException;
import org.apache.tika.extractor.EmbeddedDocumentExtractorFactory;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.metadata.filter.MetadataFilter;
import org.apache.tika.metadata.writefilter.MetadataWriteLimiterFactory;
import org.apache.tika.parser.AutoDetectParser;
import org.apache.tika.parser.ParseContext;
import org.apache.tika.parser.RecursiveParserWrapper;
import org.apache.tika.pipes.api.FetchEmitTuple;
import org.apache.tika.pipes.api.PipesResult;
import org.apache.tika.pipes.core.EmitStrategy;
import org.apache.tika.pipes.core.EmitStrategyConfig;
import org.apache.tika.pipes.core.PipesClient;
import org.apache.tika.pipes.core.PipesConfig;
import org.apache.tika.pipes.core.config.ConfigStore;
import org.apache.tika.pipes.core.config.ConfigStoreFactory;
import org.apache.tika.pipes.core.emitter.EmitterManager;
import org.apache.tika.pipes.core.extractor.UnpackExtractorFactory;
import org.apache.tika.pipes.core.fetcher.FetcherManager;
import org.apache.tika.pipes.core.protocol.PipesMessage;
import org.apache.tika.pipes.core.protocol.PipesMessageType;
import org.apache.tika.pipes.core.protocol.ShutDownReceivedException;
import org.apache.tika.pipes.core.serialization.JsonPipesIpc;
import org.apache.tika.plugins.ExtensionConfig;
import org.apache.tika.plugins.TikaPluginManager;
import org.apache.tika.sax.ContentHandlerFactory;
import org.apache.tika.serialization.ParseContextUtils;
import org.apache.tika.utils.ExceptionUtils;
/**
* This server is forked from the PipesClient. This class isolates
* parsing from the client to protect the primary JVM.
* <p>
* When configuring logging for this class, make absolutely certain
* not to write to STDOUT. This class uses STDOUT to communicate with
* the PipesClient.
*/
public class PipesServer implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(PipesServer.class);
public static final int AUTH_TOKEN_LENGTH_BYTES = 32;
private final long heartbeatIntervalMs;
private final String pipesClientId;
private Detector detector;
private final DataInputStream input;
private final DataOutputStream output;
private final TikaLoader tikaLoader;
private final PipesConfig pipesConfig;
private final Socket socket;
private final MetadataFilter defaultMetadataFilter;
private final ContentHandlerFactory defaultContentHandlerFactory;
private final MetadataWriteLimiterFactory defaultMetadataWriteLimiterFactory;
private AutoDetectParser autoDetectParser;
private RecursiveParserWrapper rMetaParser;
private FetcherManager fetcherManager;
private EmitterManager emitterManager;
private ConfigStore configStore;
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private final ExecutorCompletionService<PipesResult> executorCompletionService = new ExecutorCompletionService<>(executorService);
private final EmitStrategy emitStrategy;
private final ServerProtocolIO protocolIO;
public static PipesServer load(int port, Path tikaConfigPath) throws Exception {
String pipesClientId = System.getProperty("pipesClientId", "unknown");
LOG.debug("pipesClientId={}: connecting to client on port={}", pipesClientId, port);
Socket socket = new Socket();
socket.connect(new InetSocketAddress(InetAddress.getLoopbackAddress(), port), PipesClient.SOCKET_CONNECT_TIMEOUT_MS);
socket.setTcpNoDelay(true); // Disable Nagle's algorithm to avoid ~40ms delays on small writes
DataInputStream dis = new DataInputStream(new BufferedInputStream(socket.getInputStream()));
DataOutputStream dos = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream()));
try {
TikaLoader tikaLoader = TikaLoader.load(tikaConfigPath);
TikaJsonConfig tikaJsonConfig = tikaLoader.getConfig();
PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig);
// Set socket timeout from config after loading PipesConfig
socket.setSoTimeout((int) pipesConfig.getSocketTimeoutMs());
socket.setTcpNoDelay(true);
MetadataFilter metadataFilter = tikaLoader.loadMetadataFilters();
ContentHandlerFactory contentHandlerFactory = tikaLoader.loadContentHandlerFactory();
MetadataWriteLimiterFactory metadataWriteLimiterFactory = tikaLoader.loadParseContext().get(MetadataWriteLimiterFactory.class);
PipesServer pipesServer = new PipesServer(pipesClientId, tikaLoader, pipesConfig, socket, dis, dos, metadataFilter, contentHandlerFactory, metadataWriteLimiterFactory);
pipesServer.initializeResources();
LOG.debug("pipesClientId={}: PipesServer loaded and ready", pipesClientId);
return pipesServer;
} catch (Exception e) {
LOG.error("Failed to start up", e);
try {
String msg = ExceptionUtils.getStackTrace(e);
byte[] bytes = msg.getBytes(StandardCharsets.UTF_8);
PipesMessage.startupFailed(bytes).write(dos);
PipesMessage ackMsg = PipesMessage.read(dis);
if (ackMsg.type() != PipesMessageType.ACK) {
LOG.warn("Expected ACK but got: {}", ackMsg.type());
}
} catch (IOException ioException) {
LOG.error("Failed to send startup failure message to client", ioException);
}
throw e;
}
}
public PipesServer(String pipesClientId, TikaLoader tikaLoader, PipesConfig pipesConfig, Socket socket, DataInputStream in,
DataOutputStream out, MetadataFilter metadataFilter, ContentHandlerFactory contentHandlerFactory,
MetadataWriteLimiterFactory metadataWriteLimiterFactory) throws TikaConfigException,
IOException {
this.pipesClientId = pipesClientId;
this.tikaLoader = tikaLoader;
this.pipesConfig = pipesConfig;
this.socket = socket;
this.defaultMetadataFilter = metadataFilter;
this.defaultContentHandlerFactory = contentHandlerFactory;
this.defaultMetadataWriteLimiterFactory = metadataWriteLimiterFactory;
this.input = new DataInputStream(in);
this.output = new DataOutputStream(out);
this.heartbeatIntervalMs = pipesConfig.getHeartbeatIntervalMs();
// Validate heartbeat interval is less than socket timeout
if (heartbeatIntervalMs >= pipesConfig.getSocketTimeoutMs()) {
String msg = String.format(Locale.ROOT, "Heartbeat interval (%dms) must be less than socket timeout (%dms). " +
"This configuration will cause socket timeouts during normal processing.",
heartbeatIntervalMs, pipesConfig.getSocketTimeoutMs());
// Allow override for testing only
if (!"true".equals(System.getProperty("tika.pipes.allowInvalidHeartbeat"))) {
throw new TikaConfigException(msg);
}
LOG.error(msg + " Proceeding because tika.pipes.allowInvalidHeartbeat=true");
}
emitStrategy = pipesConfig.getEmitStrategy().getType();
this.protocolIO = new ServerProtocolIO(input, output);
}
public static void main(String[] args) throws Exception {
// Check for shared mode: --shared <numConnections> <tikaConfigPath>
if (args.length > 0 && "--shared".equals(args[0])) {
String portEnv = System.getenv("TIKA_PIPES_PORT");
if (portEnv == null || portEnv.isEmpty()) {
throw new IllegalStateException("TIKA_PIPES_PORT environment variable is not set");
}
int port = Integer.parseInt(portEnv);
String tokenHex = System.getenv("TIKA_PIPES_AUTH_TOKEN");
if (tokenHex == null || tokenHex.isEmpty()) {
throw new IllegalStateException("TIKA_PIPES_AUTH_TOKEN environment variable is not set");
}
byte[] expectedToken = HexFormat.of().parseHex(tokenHex);
int numConnections = Integer.parseInt(args[1]);
Path tikaConfig = Paths.get(args[2]);
LOG.info("Starting shared PipesServer with {} connections", numConnections);
runSharedMode(port, numConnections, tikaConfig, expectedToken);
} else {
// Per-client mode: <port> <tikaConfigPath>
int port = Integer.parseInt(args[0]);
Path tikaConfig = Paths.get(args[1]);
String pipesClientId = System.getProperty("pipesClientId", "unknown");
LOG.debug("pipesClientId={}: starting pipes server on port={}", pipesClientId, port);
try (PipesServer server = PipesServer.load(port, tikaConfig)) {
server.mainLoop();
} catch (Throwable t) {
LOG.error("pipesClientId={}: crashed", pipesClientId, t);
throw t;
} finally {
LOG.debug("pipesClientId={}: server shutting down", pipesClientId);
}
}
}
/**
* Runs the server in shared mode, accepting multiple client connections.
* <p>
* Each incoming connection must present a valid auth token (32 bytes) before
* being accepted. This prevents unauthorized local processes from connecting.
* Note: if a malicious actor has access to your localhost and can read
* /proc/<pid>/environ, that is beyond Tika's security model. This auth
* token exists to prevent CVE-style abuse from untrusted local processes that
* cannot read the server process's environment.
*/
private static void runSharedMode(int port, int numConnections, Path tikaConfigPath,
byte[] expectedToken) throws Exception {
TikaLoader tikaLoader = TikaLoader.load(tikaConfigPath);
PipesConfig pipesConfig = PipesConfig.load(tikaLoader.getConfig());
// Load shared resources
SharedServerResources resources = SharedServerResources.load(tikaLoader, pipesConfig);
// Create thread pool for connection handlers
ExecutorService connectionPool = Executors.newFixedThreadPool(numConnections);
// Create server socket and accept connections
try (java.net.ServerSocket serverSocket = new java.net.ServerSocket()) {
serverSocket.setReuseAddress(true);
serverSocket.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), port), numConnections);
// Signal readiness to the parent process via stdout
System.out.println("READY:" + port);
System.out.flush();
LOG.info("Shared server ready, accepting connections");
// Accept connections until shutdown
while (!Thread.currentThread().isInterrupted()) {
try {
java.net.Socket clientSocket = serverSocket.accept();
clientSocket.setSoTimeout((int) pipesConfig.getSocketTimeoutMs());
clientSocket.setTcpNoDelay(true);
// Validate auth token before creating handler
byte[] clientToken = new byte[AUTH_TOKEN_LENGTH_BYTES];
int bytesRead = 0;
while (bytesRead < AUTH_TOKEN_LENGTH_BYTES) {
int r = clientSocket.getInputStream().read(
clientToken, bytesRead, AUTH_TOKEN_LENGTH_BYTES - bytesRead);
if (r == -1) {
break;
}
bytesRead += r;
}
if (bytesRead < AUTH_TOKEN_LENGTH_BYTES ||
!MessageDigest.isEqual(expectedToken, clientToken)) {
LOG.warn("Rejected connection with invalid auth token");
try {
clientSocket.close();
} catch (IOException ignored) {
}
continue;
}
LOG.debug("Accepted authenticated connection from client");
ConnectionHandler handler = new ConnectionHandler(clientSocket, resources, pipesConfig);
connectionPool.submit(handler);
} catch (java.net.SocketException e) {
// Server socket closed, shutdown
LOG.debug("Server socket closed, shutting down");
break;
}
}
} finally {
connectionPool.shutdownNow();
connectionPool.awaitTermination(10, TimeUnit.SECONDS);
LOG.debug("Shared server shutdown complete");
}
}
public void mainLoop() {
try {
PipesMessage.ready().write(output);
} catch (IOException e) {
LOG.error("pipesClientId={}: failed to send READY", pipesClientId, e);
exit(PipesMessageType.UNSPECIFIED_CRASH.getExitCode().orElse(19));
return;
}
LOG.debug("pipesClientId={}: sent READY, entering main loop", pipesClientId);
ArrayBlockingQueue<Metadata> intermediateResult = new ArrayBlockingQueue<>(1);
//main loop
try {
while (true) {
PipesMessage msg = PipesMessage.read(input);
LOG.trace("pipesClientId={}: received message type={}", pipesClientId, msg.type());
switch (msg.type()) {
case PING:
PipesMessage.ping().write(output);
break;
case NEW_REQUEST:
intermediateResult.clear();
CountDownLatch countDownLatch = new CountDownLatch(1);
FetchEmitTuple fetchEmitTuple;
try {
fetchEmitTuple = JsonPipesIpc.fromBytes(msg.payload(), FetchEmitTuple.class);
} catch (IOException e) {
LOG.error("problem deserializing FetchEmitTuple", e);
handleCrash(PipesMessageType.UNSPECIFIED_CRASH, "unknown", e);
break; // unreachable after handleCrash/exit, but needed for compilation
}
// Validate before merging with global config
ServerProtocolIO.validateFetchEmitTuple(fetchEmitTuple);
// Create merged ParseContext: defaults from tika-config + request overrides
ParseContext mergedContext = createMergedParseContext(fetchEmitTuple.getParseContext());
// Resolve friendly-named configs in ParseContext to actual objects
ParseContextUtils.resolveAll(mergedContext, getClass().getClassLoader());
TikaProgressTracker tracker = new TikaProgressTracker();
mergedContext.set(TikaProgressTracker.class, tracker);
PipesWorker pipesWorker = getPipesWorker(intermediateResult, fetchEmitTuple, mergedContext, countDownLatch);
executorCompletionService.submit(pipesWorker);
try {
loopUntilDone(fetchEmitTuple, mergedContext, executorCompletionService, intermediateResult, countDownLatch, tracker);
} catch (Throwable t) {
LOG.error("Serious problem processing request", t);
}
break;
case SHUT_DOWN:
LOG.debug("shutting down");
try {
close();
} catch (Exception e) {
//swallow
}
System.exit(0);
break;
default:
String errorMsg = String.format(Locale.ROOT,
"pipesClientId=%s: Unexpected message type %s in command position",
pipesClientId, msg.type());
LOG.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
}
} catch (Throwable t) {
LOG.error("main loop error (did the forking process shut down?)", t);
exit(PipesMessageType.UNSPECIFIED_CRASH.getExitCode().orElse(19));
}
}
private PipesWorker getPipesWorker(ArrayBlockingQueue<Metadata> intermediateResult, FetchEmitTuple fetchEmitTuple,
ParseContext mergedContext, CountDownLatch countDownLatch) {
FetchHandler fetchHandler = new FetchHandler(fetcherManager);
ParseHandler parseHandler = new ParseHandler(detector, intermediateResult, countDownLatch, autoDetectParser,
rMetaParser, defaultContentHandlerFactory, pipesConfig.getParseMode());
Long thresholdBytes = pipesConfig.getEmitStrategy().getThresholdBytes();
long threshold = (thresholdBytes != null) ? thresholdBytes : EmitStrategyConfig.DEFAULT_DIRECT_EMIT_THRESHOLD_BYTES;
EmitHandler emitHandler = new EmitHandler(defaultMetadataFilter, emitStrategy, emitterManager, threshold);
return new PipesWorker(fetchEmitTuple, mergedContext, autoDetectParser, emitterManager,
fetchHandler, parseHandler, emitHandler, defaultMetadataWriteLimiterFactory);
}
private void loopUntilDone(FetchEmitTuple fetchEmitTuple, ParseContext mergedContext,
ExecutorCompletionService<PipesResult> executorCompletionService,
ArrayBlockingQueue<Metadata> intermediateResult, CountDownLatch countDownLatch,
TikaProgressTracker tracker) throws InterruptedException, IOException {
Instant start = Instant.now();
TimeoutLimits limits = TimeoutLimits.get(mergedContext);
long progressTimeoutMillis = limits.getProgressTimeoutMillis();
long totalTaskTimeoutMillis = limits.getTotalTaskTimeoutMillis();
long heartbeatCounter = 1;
boolean wroteIntermediateResult = false;
while (true) {
// Check for intermediate result (pre-parse metadata)
if (!wroteIntermediateResult) {
Metadata intermediate = intermediateResult.poll(100, TimeUnit.MILLISECONDS);
if (intermediate != null) {
writeIntermediate(intermediate);
countDownLatch.countDown();
wroteIntermediateResult = true;
}
}
// Check for task completion (can happen even without intermediate result if crash occurs early)
Future<PipesResult> future = executorCompletionService.poll(100, TimeUnit.MILLISECONDS);
if (future != null) {
PipesResult pipesResult = null;
try {
pipesResult = future.get();
} catch (OutOfMemoryError e) {
handleCrash(PipesMessageType.OOM, fetchEmitTuple.getId(), e);
return; // handleCrash calls exit(), but guard against unexpected return
} catch (ExecutionException e) {
Throwable t = e.getCause();
LOG.error("crash: {}", fetchEmitTuple.getId(), t);
if (t instanceof OutOfMemoryError) {
handleCrash(PipesMessageType.OOM, fetchEmitTuple.getId(), t);
return;
}
handleCrash(PipesMessageType.UNSPECIFIED_CRASH, fetchEmitTuple.getId(), t);
return;
}
LOG.debug("executor completionService finished task: id={} status={}", fetchEmitTuple.getId(), pipesResult.status());
writeFinished(pipesResult);
return;
}
// Send fire-and-forget heartbeat if we've waited long enough
long elapsed = System.currentTimeMillis() - start.toEpochMilli();
if (elapsed > heartbeatCounter * heartbeatIntervalMs) {
PipesMessage.working(tracker.getLastProgressMillis()).write(output);
heartbeatCounter++;
}
if (checkTotalTimeout(start, totalTaskTimeoutMillis, fetchEmitTuple.getId())) {
return; // handleCrash calls exit(), but guard against unexpected return
}
if (checkProgressTimeout(tracker, progressTimeoutMillis, fetchEmitTuple.getId())) {
return;
}
}
}
private boolean checkTotalTimeout(Instant start, long totalTaskTimeoutMillis, String id) {
long elapsed = Duration.between(start, Instant.now()).toMillis();
if (elapsed > totalTaskTimeoutMillis) {
handleCrash(PipesMessageType.TIMEOUT, id,
new RuntimeException("Server-side total task timeout after " + elapsed + "ms (limit: " + totalTaskTimeoutMillis + "ms)"));
return true;
}
return false;
}
private boolean checkProgressTimeout(TikaProgressTracker tracker, long progressTimeoutMillis, String id) {
long timeSinceProgress = System.currentTimeMillis() - tracker.getLastProgressMillis();
if (timeSinceProgress > progressTimeoutMillis) {
handleCrash(PipesMessageType.TIMEOUT, id,
new RuntimeException("Server-side progress timeout: no progress for " + timeSinceProgress + "ms (limit: " + progressTimeoutMillis + "ms)"));
return true;
}
return false;
}
private void handleCrash(PipesMessageType crashType, String id, Throwable t) {
LOG.error("{}: {}", crashType, id, t);
try {
protocolIO.writeCrash(crashType, t);
} catch (IOException e) {
LOG.warn("problem writing crash info to client", e);
}
exit(crashType.getExitCode().orElse(19));
}
@Override
public void close() throws Exception {
executorService.shutdownNow();
socket.close();
defaultMetadataFilter.close();
}
private void exit(int exitCode) {
if (exitCode != 0) {
LOG.error("exiting: {}", exitCode);
} else {
LOG.debug("exiting: {}", exitCode);
}
System.exit(exitCode);
}
protected void initializeResources() throws TikaException, IOException, SAXException {
TikaJsonConfig tikaJsonConfig = tikaLoader.getConfig();
TikaPluginManager tikaPluginManager = TikaPluginManager.load(tikaJsonConfig);
// Create ConfigStore if specified in pipesConfig
this.configStore = createConfigStore(pipesConfig, tikaPluginManager);
// Load managers with ConfigStore to enable runtime modifications
this.fetcherManager = FetcherManager.load(tikaPluginManager, tikaJsonConfig, true, configStore);
this.emitterManager = EmitterManager.load(tikaPluginManager, tikaJsonConfig, true, configStore);
this.autoDetectParser = (AutoDetectParser) tikaLoader.loadAutoDetectParser();
this.detector = this.autoDetectParser.getDetector();
this.rMetaParser = new RecursiveParserWrapper(autoDetectParser);
}
/**
* Creates a merged ParseContext with defaults from tika-config overlaid with request values.
* Request values take precedence over defaults.
* <p>
* Creates a fresh context each time to avoid shared state between requests.
*
* @param requestContext the ParseContext from FetchEmitTuple
* @return a new ParseContext with defaults + request overrides
*/
private ParseContext createMergedParseContext(ParseContext requestContext) throws TikaConfigException {
// Create fresh context with defaults from tika-config (e.g., DigesterFactory)
ParseContext mergedContext = tikaLoader.loadParseContext();
// If no embedded document extractor factory is configured, use UnpackExtractorFactory
// as the default for pipes scenarios (supports embedded byte extraction)
if (mergedContext.get(EmbeddedDocumentExtractorFactory.class) == null) {
mergedContext.set(EmbeddedDocumentExtractorFactory.class, new UnpackExtractorFactory());
}
// Overlay request's values (request takes precedence)
mergedContext.copyFrom(requestContext);
return mergedContext;
}
private ConfigStore createConfigStore(PipesConfig pipesConfig, TikaPluginManager tikaPluginManager) throws TikaException {
String configStoreType = pipesConfig.getConfigStoreType();
String configStoreParams = pipesConfig.getConfigStoreParams();
if (configStoreType == null || "memory".equals(configStoreType)) {
// Use default in-memory store (no persistence)
return null;
}
ExtensionConfig storeConfig = new ExtensionConfig(
configStoreType, configStoreType, configStoreParams);
return ConfigStoreFactory.createConfigStore(
tikaPluginManager,
configStoreType,
storeConfig);
}
private void writeFinished(PipesResult pipesResult) {
try {
protocolIO.writeFinished(pipesResult);
} catch (ShutDownReceivedException e) {
handleShutDown();
} catch (IOException e) {
LOG.error("problem writing emit data (forking process shutdown?)", e);
exit(PipesMessageType.UNSPECIFIED_CRASH.getExitCode().orElse(19));
}
}
private void writeIntermediate(Metadata metadata) {
try {
protocolIO.writeIntermediate(metadata);
} catch (ShutDownReceivedException e) {
handleShutDown();
} catch (IOException e) {
LOG.error("problem writing intermediate data (forking process shutdown?)", e);
exit(PipesMessageType.UNSPECIFIED_CRASH.getExitCode().orElse(19));
}
}
private void handleShutDown() {
LOG.info("pipesClientId={}: received SHUT_DOWN, shutting down gracefully", pipesClientId);
try {
close();
} catch (Exception e) {
//swallow
}
exit(0);
}
}