PipesMessage.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tika.pipes.core.protocol;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.Locale;
import java.util.Objects;
/**
* Uniform framed message for the PipesClient/PipesServer IPC protocol.
* <p>
* Wire format: {@code [MAGIC 0x54 0x4B][TYPE 1B][LEN 4B][PAYLOAD]}
* <ul>
* <li>MAGIC — two bytes {@code 0x54 0x4B} ("TK") for desync detection</li>
* <li>TYPE — one byte identifying the {@link PipesMessageType}</li>
* <li>LEN — four-byte big-endian payload length (0 for empty payloads)</li>
* <li>PAYLOAD — {@code LEN} bytes of payload data</li>
* </ul>
*/
public record PipesMessage(PipesMessageType type, byte[] payload) {

    static final byte MAGIC_0 = 0x54; // 'T'
    static final byte MAGIC_1 = 0x4B; // 'K'

    /** Maximum payload size: 100 MB (same as old MAX_FETCH_EMIT_TUPLE_BYTES). */
    public static final int MAX_PAYLOAD_BYTES = 100 * 1024 * 1024;

    /** Shared zero-length payload; safe to share because an empty array has no mutable content. */
    private static final byte[] EMPTY = new byte[0];

    /**
     * Canonical constructor.
     * <p>
     * A {@code null} payload is normalized to an empty array so that {@link #payload()}
     * never returns {@code null} and {@link #write(DataOutputStream)} cannot fail with a
     * {@link NullPointerException} on it. The payload array is stored as-is (no defensive
     * copy), so callers must not modify it after handing it over.
     *
     * @param type    the message type
     * @param payload the payload bytes, or {@code null} for an empty payload
     */
    public PipesMessage {
        if (payload == null) {
            payload = EMPTY;
        }
    }

    /**
     * Reads one framed message from the stream.
     * <p>
     * The frame is consumed in wire order: two magic bytes, one type byte, a four-byte
     * big-endian length, then that many payload bytes. The length is validated before the
     * payload buffer is allocated.
     *
     * @param in the stream to read from
     * @return the decoded message; a zero-length frame yields an empty payload array
     * @throws ProtocolDesyncException if magic bytes don't match
     * @throws EOFException if the stream ends before a complete message
     * @throws IOException on payload size violations or I/O errors
     */
    public static PipesMessage read(DataInputStream in) throws IOException {
        int m0 = readByteOrEof(in, "Stream closed before magic byte");
        int m1 = readByteOrEof(in, "Stream closed after first magic byte");
        if ((byte) m0 != MAGIC_0 || (byte) m1 != MAGIC_1) {
            throw new ProtocolDesyncException(
                    String.format(Locale.ROOT, "Expected magic 0x%02x%02x but got 0x%02x%02x",
                            MAGIC_0 & 0xFF, MAGIC_1 & 0xFF, m0 & 0xFF, m1 & 0xFF));
        }

        int typeByte = readByteOrEof(in, "Stream closed before type byte");
        PipesMessageType type = PipesMessageType.lookup(typeByte);

        // readInt() is big-endian and throws EOFException itself on a truncated length field.
        int len = in.readInt();
        if (len < 0) {
            throw new IOException("Negative payload length: " + len);
        }
        if (len > MAX_PAYLOAD_BYTES) {
            throw new IOException("Payload length " + len +
                    " exceeds maximum of " + MAX_PAYLOAD_BYTES + " bytes");
        }

        byte[] payload;
        if (len == 0) {
            payload = EMPTY;
        } else {
            payload = new byte[len];
            in.readFully(payload);
        }
        return new PipesMessage(type, payload);
    }

    /**
     * Reads a single byte, converting end-of-stream into an {@link EOFException}.
     *
     * @param in         the stream to read from
     * @param eofMessage the exception message to use if the stream has ended
     * @return the byte read, as an int in the range 0-255
     */
    private static int readByteOrEof(DataInputStream in, String eofMessage) throws IOException {
        int b = in.read();
        if (b == -1) {
            throw new EOFException(eofMessage);
        }
        return b;
    }

    /**
     * Writes this message to the stream and flushes.
     * <p>
     * The payload size is checked before any byte is written: {@link #read(DataInputStream)}
     * rejects frames larger than {@link #MAX_PAYLOAD_BYTES} after consuming only the header,
     * so emitting such a frame would leave the payload bytes stranded in the stream.
     *
     * @param out the stream to write to
     * @throws IOException if the payload exceeds {@link #MAX_PAYLOAD_BYTES} or on I/O errors
     */
    public void write(DataOutputStream out) throws IOException {
        if (payload.length > MAX_PAYLOAD_BYTES) {
            throw new IOException("Payload length " + payload.length +
                    " exceeds maximum of " + MAX_PAYLOAD_BYTES + " bytes");
        }
        out.write(MAGIC_0);
        out.write(MAGIC_1);
        out.write(type.getByte());
        out.writeInt(payload.length);
        if (payload.length > 0) {
            out.write(payload);
        }
        out.flush();
    }

    // ---- convenience factories ----

    /** Creates a PING message with an empty payload. */
    public static PipesMessage ping() {
        return new PipesMessage(PipesMessageType.PING, EMPTY);
    }

    /** Creates an ACK message with an empty payload. */
    public static PipesMessage ack() {
        return new PipesMessage(PipesMessageType.ACK, EMPTY);
    }

    /** Creates a READY message with an empty payload. */
    public static PipesMessage ready() {
        return new PipesMessage(PipesMessageType.READY, EMPTY);
    }

    /** Creates a SHUT_DOWN message with an empty payload. */
    public static PipesMessage shutDown() {
        return new PipesMessage(PipesMessageType.SHUT_DOWN, EMPTY);
    }

    /**
     * Creates a WORKING heartbeat with the last-progress timestamp in the payload.
     * The timestamp is encoded as an eight-byte big-endian long.
     *
     * @param lastProgressMillis epoch millis of the last progress update
     */
    public static PipesMessage working(long lastProgressMillis) {
        byte[] payload = ByteBuffer.allocate(Long.BYTES)
                .order(ByteOrder.BIG_ENDIAN)
                .putLong(lastProgressMillis)
                .array();
        return new PipesMessage(PipesMessageType.WORKING, payload);
    }

    /** Creates a NEW_REQUEST message carrying the given payload. */
    public static PipesMessage newRequest(byte[] payload) {
        return new PipesMessage(PipesMessageType.NEW_REQUEST, payload);
    }

    /** Creates a FINISHED message carrying the given payload. */
    public static PipesMessage finished(byte[] payload) {
        return new PipesMessage(PipesMessageType.FINISHED, payload);
    }

    /** Creates an INTERMEDIATE_RESULT message carrying the given payload. */
    public static PipesMessage intermediateResult(byte[] payload) {
        return new PipesMessage(PipesMessageType.INTERMEDIATE_RESULT, payload);
    }

    /** Creates a STARTUP_FAILED message carrying the given payload. */
    public static PipesMessage startupFailed(byte[] payload) {
        return new PipesMessage(PipesMessageType.STARTUP_FAILED, payload);
    }

    /**
     * Creates a message of the caller-supplied type carrying the given payload.
     *
     * @param crashType the message type to use; not validated here
     * @param payload   the payload bytes
     */
    public static PipesMessage crash(PipesMessageType crashType, byte[] payload) {
        return new PipesMessage(crashType, payload);
    }

    /**
     * Extracts the last-progress timestamp from a WORKING message payload.
     *
     * @return epoch millis of the last progress update reported by the server
     * @throws IllegalStateException if this is not a WORKING message, or if the payload is
     *         too short to hold an eight-byte timestamp
     */
    public long lastProgressMillis() {
        if (type != PipesMessageType.WORKING) {
            throw new IllegalStateException("lastProgressMillis() only valid for WORKING messages");
        }
        // A WORKING frame read off the wire may carry any payload length; report a short one
        // explicitly instead of surfacing a bare BufferUnderflowException.
        if (payload.length < Long.BYTES) {
            throw new IllegalStateException("WORKING payload must be at least " + Long.BYTES +
                    " bytes but was " + payload.length);
        }
        return ByteBuffer.wrap(payload)
                .order(ByteOrder.BIG_ENDIAN)
                .getLong();
    }

    /**
     * Compares by message type and payload <em>content</em>. The record-generated
     * implementation would compare the payload array by identity.
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        return o instanceof PipesMessage other
                && Objects.equals(type, other.type)
                && Arrays.equals(payload, other.payload);
    }

    /** Hash consistent with {@link #equals(Object)}: based on type and payload content. */
    @Override
    public int hashCode() {
        return 31 * Objects.hashCode(type) + Arrays.hashCode(payload);
    }

    /** Reports the type and payload length only; payload bytes are not included. */
    @Override
    public String toString() {
        return "PipesMessage[type=" + type + ", payloadLength=" + payload.length + "]";
    }
}