PipesClient.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.tika.pipes.core;


import static org.apache.tika.pipes.api.PipesResult.RESULT_STATUS.TIMEOUT;
import static org.apache.tika.pipes.api.PipesResult.RESULT_STATUS.UNSPECIFIED_CRASH;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.tika.config.TimeoutLimits;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.metadata.TikaCoreProperties;
import org.apache.tika.pipes.api.FetchEmitTuple;
import org.apache.tika.pipes.api.PipesResult;
import org.apache.tika.pipes.api.emitter.EmitKey;
import org.apache.tika.pipes.core.emitter.EmitDataImpl;
import org.apache.tika.pipes.core.protocol.PipesMessage;
import org.apache.tika.pipes.core.protocol.PipesMessageType;
import org.apache.tika.pipes.core.serialization.JsonPipesIpc;
import org.apache.tika.pipes.core.server.IntermediateResult;
import org.apache.tika.utils.ExceptionUtils;
import org.apache.tika.utils.StringUtils;

/**
 * The PipesClient is designed to be single-threaded. It only allots
 * a single thread for {@link #process(FetchEmitTuple)} processing.
 * See {@link org.apache.tika.pipes.core.async.AsyncProcessor} for handling
 * multiple PipesClients.
 * <p>
 * PipesClient delegates server lifecycle management to a {@link ServerManager}.
 * In per-client mode, each client has its own {@link PerClientServerManager}.
 * In shared mode, all clients share a single {@link SharedServerManager}.
 */
public class PipesClient implements Closeable {

    private static final Logger LOG = LoggerFactory.getLogger(PipesClient.class);
    private static final AtomicInteger CLIENT_COUNTER = new AtomicInteger(0);
    public static final int SOCKET_CONNECT_TIMEOUT_MS = 60000;
    public static final int SOCKET_TIMEOUT_MS = 60000;

    private final PipesConfig pipesConfig;
    private final ServerManager serverManager;
    private final boolean ownsServerManager;
    private final int pipesClientId;

    private ConnectionTuple connectionTuple;
    private int filesProcessed = 0;

    /**
     * Creates a PipesClient with the given server manager.
     * <p>
     * The caller retains ownership of the server manager and is responsible
     * for closing it. This is used in shared mode where multiple clients
     * share a single server manager.
     *
     * @param pipesConfig the pipes configuration
     * @param serverManager the server manager (per-client or shared)
     */
    public PipesClient(PipesConfig pipesConfig, ServerManager serverManager) {
        this.pipesConfig = pipesConfig;
        this.serverManager = serverManager;
        this.ownsServerManager = false;
        this.pipesClientId = CLIENT_COUNTER.getAndIncrement();
    }

    /**
     * Creates a PipesClient with its own dedicated server process.
     * <p>
     * This is a convenience constructor for per-client mode that creates
     * a {@link PerClientServerManager} internally. The server will be started
     * lazily on first use and shut down when this client is closed.
     *
     * @param pipesConfig the pipes configuration
     * @param tikaConfigPath path to the tika config file
     */
    public PipesClient(PipesConfig pipesConfig, java.nio.file.Path tikaConfigPath) {
        this.pipesConfig = pipesConfig;
        this.pipesClientId = CLIENT_COUNTER.getAndIncrement();
        this.serverManager = new PerClientServerManager(pipesConfig, tikaConfigPath, pipesClientId);
        this.ownsServerManager = true;
    }

    public int getFilesProcessed() {
        return filesProcessed;
    }

    private boolean ping() {
        if (connectionTuple == null) {
            return false;
        }
        // Check if server process is still running
        if (!serverManager.isRunning()) {
            return false;
        }
        try {
            PipesMessage.ping().write(connectionTuple.output);
            PipesMessage response = PipesMessage.read(connectionTuple.input);
            if (response.type() == PipesMessageType.PING) {
                return true;
            }
        } catch (IOException e) {
            return false;
        }
        return false;
    }

    @Override
    public void close() throws IOException {
        try {
            closeConnection();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        if (ownsServerManager) {
            serverManager.close();
        }
    }

    public int getPipesClientId() {
        return pipesClientId;
    }

    /**
     * Closes just this client's connection, not the server.
     * Server lifecycle is managed by PipesParser.
     */
    private void closeConnection() throws InterruptedException {
        if (connectionTuple == null) {
            return;
        }
        LOG.debug("pipesClientId={}: closing connection", pipesClientId);
        try {
            PipesMessage.shutDown().write(connectionTuple.output);
        } catch (IOException e) {
            // swallow
        }
        List<IOException> exceptions = new ArrayList<>();
        tryToClose(connectionTuple.input, exceptions);
        tryToClose(connectionTuple.output, exceptions);
        tryToClose(connectionTuple.socket, exceptions);
        connectionTuple = null;
    }

    private void tryToClose(Closeable closeable, List<IOException> exceptions) {
        if (closeable == null) {
            return;
        }
        try {
            closeable.close();
        } catch (IOException e) {
            exceptions.add(e);
        }
    }

    public PipesResult process(FetchEmitTuple t) throws IOException, InterruptedException {
        // Container object to hold latest intermediate result if the parser is doing that
        IntermediateResult intermediateResult = new IntermediateResult();
        PipesResult result = null;
        try {
            maybeInit();
        } catch (ServerInitializationException e) {
            LOG.error("server initialization failed: {} ", t.getId(), e);
            closeConnection();
            return buildFatalResult(t.getId(), t.getEmitKey(), PipesResult.RESULT_STATUS.FAILED_TO_INITIALIZE,
                    intermediateResult.get(), e.getMessage());
        } catch (SecurityException e) {
            LOG.error("security exception during initialization: {} ", t.getId());
            closeConnection();
            return buildFatalResult(t.getId(), t.getEmitKey(), PipesResult.RESULT_STATUS.FAILED_TO_INITIALIZE,
                    intermediateResult.get());
        }

        try {
            writeTask(t);
            result = waitForServer(t, intermediateResult);
            filesProcessed++;
            // Update server manager's file counter for maxFilesProcessedPerProcess tracking
            serverManager.incrementFilesProcessed(pipesConfig.getMaxFilesProcessedPerProcess());
        } catch (InterruptedException | SecurityException e) {
            throw e;
        } catch (Exception e) {
            LOG.error("exception waiting for server to complete task: {} ", t.getId(), e);
            closeConnection();
            return buildFatalResult(t.getId(), t.getEmitKey(), UNSPECIFIED_CRASH, intermediateResult.get());
        }
        return result;
    }

    private void maybeInit() throws InterruptedException, ServerInitializationException {
        boolean reconnect = false;

        // Check if server needs restart (marked for restart after crash or reaching file limit)
        if (serverManager.needsRestart()) {
            LOG.debug("pipesClientId={}: server marked for restart - reconnecting", pipesClientId);
            closeConnection();
            reconnect = true;
        }

        // Check if connection is alive
        if (!reconnect && !ping()) {
            reconnect = true;
        }

        if (reconnect) {
            int maxRestartAttempts = 5;
            int restartAttempts = 0;
            long baseDelayMs = 100;

            while (true) {
                try {
                    reconnect();
                    filesProcessed = 0;
                    return;
                } catch (ServerInitializationException e) {
                    // Server initialization failed - don't retry, rethrow immediately
                    throw e;
                } catch (TimeoutException e) {
                    LOG.warn("pipesClientId={}: couldn't reconnect within timeout (attempt {}/{})",
                            pipesClientId, restartAttempts + 1, maxRestartAttempts);
                    if (++restartAttempts >= maxRestartAttempts) {
                        throw new ServerInitializationException("couldn't connect to server after " +
                                restartAttempts + " attempts", e);
                    }
                    // Wait with exponential backoff before retry
                    long delay = baseDelayMs * (1L << Math.min(restartAttempts, 6));
                    Thread.sleep(delay);
                } catch (IOException e) {
                    LOG.warn("pipesClientId={}: couldn't reconnect (attempt {}/{})",
                            pipesClientId, restartAttempts + 1, maxRestartAttempts, e);
                    if (++restartAttempts >= maxRestartAttempts) {
                        throw new ServerInitializationException("couldn't connect to server after " +
                                restartAttempts + " attempts", e);
                    }
                    // Wait with exponential backoff before retry
                    long delay = baseDelayMs * (1L << Math.min(restartAttempts, 6));
                    Thread.sleep(delay);
                }
            }
        }
    }

    /**
     * Establishes or re-establishes connection to the server.
     */
    private void reconnect() throws InterruptedException, IOException, TimeoutException, ServerInitializationException {
        // Close existing connection if any
        if (connectionTuple != null) {
            LOG.debug("pipesClientId={}: closing existing connection before reconnect", pipesClientId);
            closeConnection();
        }

        // Ensure server is running (blocks if restart in progress)
        serverManager.ensureRunning();

        // Get port after ensureRunning - this is the port we'll connect to
        int port = serverManager.getPort();
        LOG.debug("pipesClientId={}: connecting to server", pipesClientId);

        // Connect to server
        Socket socket = serverManager.connect((int) pipesConfig.getSocketTimeoutMs());

        connectionTuple = new ConnectionTuple(socket,
                new DataInputStream(new BufferedInputStream(socket.getInputStream())),
                new DataOutputStream(new BufferedOutputStream(socket.getOutputStream())));

        waitForStartup();
    }

    private void writeTask(FetchEmitTuple t) throws IOException {
        LOG.debug("pipesClientId={}: sending NEW_REQUEST for id={}", pipesClientId, t.getId());
        byte[] bytes = JsonPipesIpc.toBytes(t);
        PipesMessage.newRequest(bytes).write(connectionTuple.output);
    }

    private PipesResult waitForServer(FetchEmitTuple t, IntermediateResult intermediateResult) throws InterruptedException {
        TimeoutLimits limits = TimeoutLimits.get(t.getParseContext());
        long progressTimeoutMillis = limits.getProgressTimeoutMillis();
        long totalTaskTimeoutMillis = limits.getTotalTaskTimeoutMillis();
        Instant start = Instant.now();
        Instant lastUpdate = start;

        while (true) {
            if (Thread.currentThread().isInterrupted()) {
                throw new InterruptedException("thread interrupt");
            }
            Instant now = Instant.now();
            long totalElapsed = Duration.between(start, now).toMillis();
            if (totalElapsed > totalTaskTimeoutMillis) {
                LOG.warn("clientId={}: total task timeout: id={} elapsed={}ms limit={}ms",
                        pipesClientId, t.getId(), totalElapsed, totalTaskTimeoutMillis);
                // Mark for restart - server is stuck on current request and needs to be restarted
                serverManager.markServerForRestart();
                closeConnection();
                return buildFatalResult(t.getId(), t.getEmitKey(), PipesResult.RESULT_STATUS.TIMEOUT,
                        intermediateResult.get());
            }
            long timeSinceUpdate = Duration.between(lastUpdate, now).toMillis();
            if (timeSinceUpdate > progressTimeoutMillis) {
                LOG.warn("clientId={}: progress timeout: id={} timeSinceUpdate={}ms limit={}ms",
                        pipesClientId, t.getId(), timeSinceUpdate, progressTimeoutMillis);
                serverManager.markServerForRestart();
                closeConnection();
                return buildFatalResult(t.getId(), t.getEmitKey(), PipesResult.RESULT_STATUS.TIMEOUT,
                        intermediateResult.get());
            }
            try {
                PipesMessage msg = PipesMessage.read(connectionTuple.input);
                LOG.trace("clientId={}: received message type={} id={}", pipesClientId, msg.type(), t.getId());

                // Send ACK only for messages that require it
                if (msg.type().requiresAck()) {
                    PipesMessage.ack().write(connectionTuple.output);
                }

                switch (msg.type()) {
                    case OOM:
                        String oomMsg = JsonPipesIpc.fromBytes(msg.payload(), String.class);
                        serverManager.markServerForRestart();
                        closeConnection();
                        return buildFatalResult(t.getId(), t.getEmitKey(), PipesResult.RESULT_STATUS.OOM,
                                intermediateResult.get(), oomMsg);
                    case TIMEOUT:
                        String timeoutMsg = JsonPipesIpc.fromBytes(msg.payload(), String.class);
                        serverManager.markServerForRestart();
                        closeConnection();
                        return buildFatalResult(t.getId(), t.getEmitKey(), TIMEOUT,
                                intermediateResult.get(), timeoutMsg);
                    case UNSPECIFIED_CRASH:
                        String crashMsg = JsonPipesIpc.fromBytes(msg.payload(), String.class);
                        serverManager.markServerForRestart();
                        closeConnection();
                        return buildFatalResult(t.getId(), t.getEmitKey(), UNSPECIFIED_CRASH,
                                intermediateResult.get(), crashMsg);
                    case INTERMEDIATE_RESULT:
                        intermediateResult.set(JsonPipesIpc.fromBytes(msg.payload(), Metadata.class));
                        lastUpdate = Instant.now();
                        break;
                    case WORKING:
                        lastUpdate = Instant.ofEpochMilli(msg.lastProgressMillis());
                        break;
                    case FINISHED:
                        PipesResult result = JsonPipesIpc.fromBytes(msg.payload(), PipesResult.class);
                        // Restore ParseContext from original FetchEmitTuple (not serialized back from server)
                        if (result.emitData() instanceof EmitDataImpl emitDataImpl) {
                            emitDataImpl.setParseContext(t.getParseContext());
                        }
                        return result;
                    default:
                        throw new IOException("Unexpected message type from server: " + msg.type());
                }
            } catch (SocketTimeoutException e) {
                LOG.warn("clientId={}: Socket timeout exception while waiting for server", pipesClientId, e);
                // Mark for restart - server is stuck on current request and needs to be restarted
                serverManager.markServerForRestart();
                closeConnection();
                return buildFatalResult(t.getId(), t.getEmitKey(), TIMEOUT, intermediateResult.get(),
                        ExceptionUtils.getStackTrace(e));
            } catch (SecurityException e) {
                throw e;
            } catch (Exception e) {
                LOG.warn("clientId={} - crash while waiting for server", pipesClientId, e);
                // Handle crash and determine status based on exit code
                int exitCode = serverManager.handleCrashAndGetExitCode();
                PipesResult.RESULT_STATUS status = UNSPECIFIED_CRASH;
                if (exitCode == PipesMessageType.OOM.getExitCode().orElse(-1)) {
                    status = PipesResult.RESULT_STATUS.OOM;
                } else if (exitCode == PipesMessageType.TIMEOUT.getExitCode().orElse(-1)) {
                    status = PipesResult.RESULT_STATUS.TIMEOUT;
                }
                closeConnection();
                return buildFatalResult(t.getId(), t.getEmitKey(), status, intermediateResult.get(),
                        ExceptionUtils.getStackTrace(e));
            }
        }
    }

    private PipesResult buildFatalResult(String id, EmitKey emitKey, PipesResult.RESULT_STATUS status,
                                         Optional<Metadata> intermediateResultOpt) {
        return buildFatalResult(id, emitKey, status, intermediateResultOpt, null);
    }

    private PipesResult buildFatalResult(String id, EmitKey emitKey, PipesResult.RESULT_STATUS status,
                                         Optional<Metadata> intermediateResultOpt, String msg) {
        LOG.warn("clientId={}: crash id={} status={}", pipesClientId, id, status);
        Metadata intermediateResult = intermediateResultOpt.orElse(new Metadata());

        if (LOG.isTraceEnabled()) {
            LOG.trace("clientId={}: intermediate result: id={}", pipesClientId, intermediateResult);
        }
        intermediateResult.set(TikaCoreProperties.PIPES_RESULT, status.toString());
        if (StringUtils.isBlank(msg)) {
            return new PipesResult(status, new EmitDataImpl(emitKey.getEmitKey(), List.of(intermediateResult)));
        } else {
            return new PipesResult(status, new EmitDataImpl(emitKey.getEmitKey(), List.of(intermediateResult)), msg);
        }
    }

    private void waitForStartup() throws IOException {
        PipesMessage msg = PipesMessage.read(connectionTuple.input);
        if (msg.type() == PipesMessageType.READY) {
            LOG.debug("clientId={}: server ready", pipesClientId);
        } else if (msg.type() == PipesMessageType.STARTUP_FAILED) {
            // Send ACK for startup failure
            PipesMessage.ack().write(connectionTuple.output);
            String errorMsg = new String(msg.payload(), StandardCharsets.UTF_8);
            LOG.error("clientId={}: Server failed to start: {}", pipesClientId, errorMsg);
            throw new ServerInitializationException(errorMsg);
        } else {
            LOG.error("clientId={}: Unexpected first message type: {}", pipesClientId, msg.type());
            throw new IOException("Unexpected first message type from server: " + msg.type());
        }
    }

    /**
     * Connection state: socket and streams for communicating with the server.
     * Unlike the old ServerTuple, this doesn't include the process or server socket
     * since those are now managed by ServerManager.
     */
    private record ConnectionTuple(Socket socket, DataInputStream input, DataOutputStream output) {
    }

}