ByteSseEntityConsumer.java
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.sse.impl;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.apache.hc.core5.annotation.Internal;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.nio.entity.AbstractBinAsyncEntityConsumer;
/**
* Low-allocation binary consumer for Server-Sent Events (SSE).
*
* <p>This consumer parses {@code text/event-stream} responses directly from a
* {@link ByteBuffer} without intermediate {@code char[]} conversion. It performs
* ASCII field matching in-place, accumulates lines until a blank line is reached,
* then emits one logical SSE event via the supplied {@link SseCallbacks}.</p>
*
* <h3>Behavior</h3>
* <ul>
* <li>Validates {@code Content-Type} equals {@code text/event-stream}
* in {@link #streamStart(ContentType)}; otherwise throws {@link HttpException}.</li>
* <li>Strips a UTF-8 BOM if present in the first chunk.</li>
* <li>Accepts LF and CRLF line endings; tolerates CRLF split across buffers.</li>
* <li>Implements WHATWG SSE fields: {@code data}, {@code id}, {@code event}, {@code retry}.
* Unknown fields and malformed {@code retry} values are ignored.</li>
* <li>At end of stream, flushes any partially accumulated line and forces a final
* dispatch of the current event if it has data.</li>
* </ul>
*
* <h3>Thread-safety</h3>
* <p>Instances are not thread-safe and are intended to be used by a single I/O thread
* per HTTP message, as per {@link AbstractBinAsyncEntityConsumer} contract.</p>
*
* <p><strong>Internal:</strong> this type is not part of the public API and may change
* without notice.</p>
*
*/
@Internal
public final class ByteSseEntityConsumer extends AbstractBinAsyncEntityConsumer<Void> {
private static final byte LF = (byte) '\n';
private static final byte CR = (byte) '\r';
private static final byte COLON = (byte) ':';
private static final byte SPACE = (byte) ' ';
private final SseCallbacks cb;
// line accumulator
private byte[] lineBuf = new byte[256];
private int lineLen = 0;
// event accumulator
private final StringBuilder data = new StringBuilder(256);
private String id;
private String type; // defaults to "message"
// Robust BOM skipper (works across multiple chunks)
// Matches 0xEF 0xBB 0xBF at the very beginning of the stream
private int bomMatched = 0; // 0..3 bytes matched so far
private boolean bomDone = false; // once true, no further BOM detection
public ByteSseEntityConsumer(final SseCallbacks callbacks) {
this.cb = callbacks;
}
@Override
public void streamStart(final ContentType contentType) throws HttpException, IOException {
final String mt = contentType != null ? contentType.getMimeType() : null;
if (!"text/event-stream".equalsIgnoreCase(mt)) {
throw new HttpException("Unexpected Content-Type: " + mt);
}
cb.onOpen();
}
@Override
protected void data(final ByteBuffer src, final boolean endOfStream) {
if (!bomDone) {
while (src.hasRemaining() && bomMatched < 3) {
final int expected = (bomMatched == 0) ? 0xEF : (bomMatched == 1 ? 0xBB : 0xBF);
final int b = src.get() & 0xFF;
if (b == expected) {
bomMatched++;
if (bomMatched == 3) {
// Full BOM consumed, mark as done and proceed
bomDone = true;
}
continue;
}
if (bomMatched > 0) {
appendByte((byte) 0xEF);
if (bomMatched >= 2) {
appendByte((byte) 0xBB);
}
}
appendByte((byte) b);
bomMatched = 0;
bomDone = true;
break; // drop into normal loop below for the rest of 'src'
}
if (!bomDone && !src.hasRemaining()) {
if (endOfStream) {
flushEndOfStream();
}
return;
}
}
while (src.hasRemaining()) {
final byte b = src.get();
if (b == LF) {
int len = lineLen;
if (len > 0 && lineBuf[len - 1] == CR) {
len--;
}
handleLine(lineBuf, len);
lineLen = 0;
} else {
appendByte(b);
}
}
if (endOfStream) {
flushEndOfStream();
}
}
private void flushEndOfStream() {
if (lineLen > 0) {
int len = lineLen;
if (lineBuf[len - 1] == CR) {
len--;
}
handleLine(lineBuf, len);
lineLen = 0;
}
handleLine(lineBuf, 0);
}
private void appendByte(final byte b) {
ensureCapacity(lineLen + 1);
lineBuf[lineLen++] = b;
}
@Override
protected int capacityIncrement() {
return 8192;
}
@Override
protected Void generateContent() {
return null;
}
@Override
public void releaseResources() {
lineBuf = new byte[0];
data.setLength(0);
id = null;
type = null;
bomMatched = 0;
bomDone = false;
}
private void handleLine(final byte[] buf, final int len) {
if (len == 0) {
dispatch();
return;
}
if (buf[0] == (byte) ':') {
// comment -> ignore
return;
}
int colon = -1;
for (int i = 0; i < len; i++) {
if (buf[i] == COLON) {
colon = i;
break;
}
}
final int fEnd = colon >= 0 ? colon : len;
int vStart = colon >= 0 ? colon + 1 : len;
if (vStart < len && buf[vStart] == SPACE) {
vStart++;
}
final int fLen = fEnd; // since field starts at 0
// Compare ASCII field name without allocations
if (fLen == 4 && buf[0] == 'd' && buf[1] == 'a' && buf[2] == 't' && buf[3] == 'a') {
final String v = new String(buf, vStart, len - vStart, StandardCharsets.UTF_8);
data.append(v).append('\n');
} else if (fLen == 5 && buf[0] == 'e' && buf[1] == 'v' && buf[2] == 'e' && buf[3] == 'n' && buf[4] == 't') {
type = new String(buf, vStart, len - vStart, StandardCharsets.UTF_8);
} else if (fLen == 2 && buf[0] == 'i' && buf[1] == 'd') {
// ignore if contains NUL per spec
boolean hasNul = false;
for (int i = vStart; i < len; i++) {
if (buf[i] == 0) {
hasNul = true;
break;
}
}
if (!hasNul) {
id = new String(buf, vStart, len - vStart, StandardCharsets.UTF_8);
}
} else if (fLen == 5 && buf[0] == 'r' && buf[1] == 'e' && buf[2] == 't' && buf[3] == 'r' && buf[4] == 'y') {
final long retry = parseLongAscii(buf, vStart, len - vStart);
if (retry >= 0) {
cb.onRetry(retry);
}
}
}
private void dispatch() {
if (data.length() == 0) {
type = null;
return;
}
final int n = data.length();
if (n > 0 && data.charAt(n - 1) == '\n') {
data.setLength(n - 1);
}
cb.onEvent(id, type != null ? type : "message", data.toString());
data.setLength(0);
type = null; // id persists
}
private void ensureCapacity(final int cap) {
if (cap <= lineBuf.length) {
return;
}
int n = lineBuf.length << 1;
if (n < cap) {
n = cap;
}
final byte[] nb = new byte[n];
System.arraycopy(lineBuf, 0, nb, 0, lineLen);
lineBuf = nb;
}
private static long parseLongAscii(final byte[] arr, final int off, final int len) {
if (len <= 0) {
return -1L;
}
long v = 0L;
for (int i = 0; i < len; i++) {
final int d = arr[off + i] - '0';
if (d < 0 || d > 9) {
return -1L;
}
v = v * 10L + d;
if (v < 0) {
return -1L;
}
}
return v;
}
}