ServerProtocolIO.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.tika.pipes.core.server;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.tika.exception.TikaConfigException;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.parser.ParseContext;
import org.apache.tika.pipes.api.FetchEmitTuple;
import org.apache.tika.pipes.api.ParseMode;
import org.apache.tika.pipes.api.PipesResult;
import org.apache.tika.pipes.core.extractor.UnpackConfig;
import org.apache.tika.pipes.core.protocol.PipesMessage;
import org.apache.tika.pipes.core.protocol.PipesMessageType;
import org.apache.tika.pipes.core.protocol.ShutDownReceivedException;
import org.apache.tika.pipes.core.serialization.JsonPipesIpc;
import org.apache.tika.utils.ExceptionUtils;
import org.apache.tika.utils.StringUtils;

/**
 * Centralizes protocol I/O operations shared by {@link PipesServer} and
 * {@link ConnectionHandler}.
 * <p>
 * This class handles the pure protocol mechanics ��� serialization, framing,
 * and ACK exchange. It does <b>not</b> make lifecycle decisions (exit vs.
 * return, close connection vs. shut down JVM). Callers are responsible for
 * catching exceptions and responding according to their own lifecycle policy.
 */
public class ServerProtocolIO {

    private final DataInputStream input;
    private final DataOutputStream output;

    public ServerProtocolIO(DataInputStream input, DataOutputStream output) {
        this.input = input;
        this.output = output;
    }

    /**
     * Writes a FINISHED message with the serialized result and waits for ACK.
     *
     * @throws ShutDownReceivedException if SHUT_DOWN is received instead of ACK
     * @throws IOException on serialization or I/O errors
     */
    public void writeFinished(PipesResult pipesResult) throws IOException {
        byte[] bytes = JsonPipesIpc.toBytes(pipesResult);
        PipesMessage.finished(bytes).write(output);
        awaitAck();
    }

    /**
     * Writes an INTERMEDIATE_RESULT message with the serialized metadata and waits for ACK.
     *
     * @throws ShutDownReceivedException if SHUT_DOWN is received instead of ACK
     * @throws IOException on serialization or I/O errors
     */
    public void writeIntermediate(Metadata metadata) throws IOException {
        byte[] bytes = JsonPipesIpc.toBytes(metadata);
        PipesMessage.intermediateResult(bytes).write(output);
        awaitAck();
    }

    /**
     * Writes a crash message (OOM, TIMEOUT, or UNSPECIFIED_CRASH) with the
     * serialized stack trace and waits for ACK.
     *
     * @throws IOException on serialization, I/O, or unexpected ACK response
     */
    public void writeCrash(PipesMessageType crashType, Throwable t) throws IOException {
        String msg = (t != null) ? ExceptionUtils.getStackTrace(t) : "";
        byte[] bytes = JsonPipesIpc.toBytes(msg);
        PipesMessage.crash(crashType, bytes).write(output);
        awaitAck();
    }

    /**
     * Reads a framed message and verifies it is an ACK.
     *
     * @throws ShutDownReceivedException if the message is SHUT_DOWN
     * @throws IOException if the message is any other non-ACK type, or on I/O error
     */
    public void awaitAck() throws IOException {
        PipesMessage msg = PipesMessage.read(input);
        if (msg.type() == PipesMessageType.ACK) {
            return;
        }
        if (msg.type() == PipesMessageType.SHUT_DOWN) {
            throw new ShutDownReceivedException();
        }
        throw new IOException("Expected ACK but got " + msg.type());
    }

    /**
     * Validates that a FetchEmitTuple's configuration is consistent.
     * <p>
     * If the tuple has an UnpackConfig with an emitter but ParseMode is not UNPACK,
     * that's a configuration error.
     */
    public static void validateFetchEmitTuple(FetchEmitTuple fetchEmitTuple)
            throws TikaConfigException {
        ParseContext requestContext = fetchEmitTuple.getParseContext();
        if (requestContext == null) {
            return;
        }
        UnpackConfig unpackConfig = requestContext.get(UnpackConfig.class);
        ParseMode parseMode = requestContext.get(ParseMode.class);

        if (unpackConfig != null && !StringUtils.isBlank(unpackConfig.getEmitter())
                && parseMode != ParseMode.UNPACK) {
            throw new TikaConfigException(
                    "FetchEmitTuple has UnpackConfig with emitter '" + unpackConfig.getEmitter() +
                            "' but ParseMode is " + parseMode + ". " +
                            "To extract embedded bytes, set ParseMode.UNPACK in the ParseContext.");
        }
    }
}