SseResponseConsumer.java

/*
 * ====================================================================
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 * ====================================================================
 *
 * This software consists of voluntary contributions made by many
 * individuals on behalf of the Apache Software Foundation.  For more
 * information on the Apache Software Foundation, please see
 * <http://www.apache.org/>.
 *
 */
package org.apache.hc.client5.http.sse.impl;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.function.LongConsumer;

import org.apache.hc.core5.annotation.Internal;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpHeaders;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.nio.AsyncEntityConsumer;
import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
import org.apache.hc.core5.http.nio.CapacityChannel;
import org.apache.hc.core5.http.protocol.HttpContext;

/**
 * Internal response consumer that bridges an HTTP response to an SSE entity consumer.
 *
 * <p>Responsibilities:</p>
 * <ul>
 *   <li>Validate that the status is {@code 200 OK}; otherwise propagate a failure.</li>
 *   <li>Extract and pass a {@code Retry-After} hint (seconds or RFC-1123 date) to the caller
 *       via the provided {@link LongConsumer}.</li>
 *   <li>Treat {@code 204 No Content} as a terminal close (no reconnect), signaled with
 *       {@link StopReconnectException}.</li>
 * </ul>
 *
 * <p>This class is used internally by {@code DefaultEventSource}.</p>
 *
 * @since 5.7
 */
@Internal
public final class SseResponseConsumer implements AsyncResponseConsumer<Void> {

    private final AsyncEntityConsumer<Void> entity;
    private final LongConsumer retryHintSink; // may be null

    /**
     * Signals that the server requested a terminal close (no reconnect).
     */
    static final class StopReconnectException extends HttpException {
        StopReconnectException(final String msg) {
            super(msg);
        }
    }

    public SseResponseConsumer(final AsyncEntityConsumer<Void> entity, final LongConsumer retryHintSink) {
        this.entity = entity;
        this.retryHintSink = retryHintSink;
    }

    @Override
    public void consumeResponse(final HttpResponse rsp, final EntityDetails ed, final HttpContext ctx,
                                final FutureCallback<Void> cb)
            throws HttpException, IOException {
        final int code = rsp.getCode();
        if (code != HttpStatus.SC_OK) {
            final Header h = rsp.getFirstHeader(HttpHeaders.RETRY_AFTER);
            if (h != null && retryHintSink != null) {
                final long ms = parseRetryAfterMillis(h.getValue());
                if (ms >= 0) {
                    retryHintSink.accept(ms);
                }
            }
            if (code == HttpStatus.SC_NO_CONTENT) { // 204 => do not reconnect
                throw new StopReconnectException("Server closed stream (204)");
            }
            throw new HttpException("Unexpected status: " + code);
        }
        entity.streamStart(ed, cb);
    }

    @Override
    public void informationResponse(final HttpResponse response, final HttpContext context) {
        // no-op
    }

    @Override
    public void updateCapacity(final CapacityChannel channel) throws IOException {
        entity.updateCapacity(channel);
    }

    @Override
    public void consume(final ByteBuffer src) throws IOException {
        entity.consume(src);
    }

    @Override
    public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
        entity.streamEnd(trailers);
    }

    @Override
    public void failed(final Exception cause) {
        entity.failed(cause);
    }

    @Override
    public void releaseResources() {
        entity.releaseResources();
    }

    /**
     * Parses an HTTP {@code Retry-After} header value into milliseconds.
     * Accepts either a positive integer (seconds) or an RFC-1123 date.
     *
     * @return milliseconds to wait, or {@code -1} if unparseable.
     */
    private static long parseRetryAfterMillis(final String v) {
        final String s = v != null ? v.trim() : "";
        try {
            final long sec = Long.parseLong(s);
            return sec >= 0 ? sec * 1000L : -1L;
        } catch (final NumberFormatException ignore) {
            try {
                final ZonedDateTime t = ZonedDateTime.parse(s, DateTimeFormatter.RFC_1123_DATE_TIME);
                final long ms = Duration.between(ZonedDateTime.now(ZoneOffset.UTC), t).toMillis();
                return Math.max(0L, ms);
            } catch (final Exception ignore2) {
                return -1L;
            }
        }
    }
}