InboundSseEventProcessor.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.cxf.jaxrs.sse.client;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.sse.InboundSseEvent;
import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.common.util.StringUtils;
import org.apache.cxf.endpoint.Endpoint;
import org.apache.cxf.jaxrs.client.ClientProviderFactory;
import org.apache.cxf.jaxrs.impl.ResponseImpl;
import org.apache.cxf.jaxrs.sse.client.InboundSseEventImpl.Builder;
import org.apache.cxf.message.Message;

public class InboundSseEventProcessor {
    public static final String SERVER_SENT_EVENTS = "text/event-stream";
    public static final MediaType SERVER_SENT_EVENTS_TYPE = MediaType.valueOf(SERVER_SENT_EVENTS);

    private static final Logger LOG = LogUtils.getL7dLogger(InboundSseEventProcessor.class);
    private static final String COMMENT = ":";
    private static final String EVENT = "event:";
    private static final String ID = "id:";
    private static final String RETRY = "retry:";
    private static final String DATA = "data:";

    private final Endpoint endpoint;
    private final InboundSseEventListener listener;
    private final ExecutorService executor;
    private final boolean discardIncomplete;
    
    private volatile boolean closed;
    
    protected InboundSseEventProcessor(Endpoint endpoint, InboundSseEventListener listener) {
        this(endpoint, listener, true);
    }
    
    protected InboundSseEventProcessor(Endpoint endpoint, InboundSseEventListener listener, boolean discardIncomplete) {
        this.endpoint = endpoint;
        this.listener = listener;
        this.discardIncomplete = discardIncomplete;
        this.executor = Executors.newSingleThreadScheduledExecutor();
    }
    
    void run(final Response response) {
        if (closed) {
            throw new IllegalStateException("The SSE Event Processor is already closed");
        }
        
        final InputStream is = response.readEntity(InputStream.class);
        final ClientProviderFactory factory = ClientProviderFactory.getInstance(endpoint);
        
        Message message = null;
        if (response instanceof ResponseImpl) {
            message = ((ResponseImpl)response).getOutMessage();
        }
        
        executor.submit(process(response, is, factory, message));
    }
    
    private Callable<?> process(Response response, InputStream is, ClientProviderFactory factory, Message message) {
        return () -> {
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) {
                String line = reader.readLine();
                InboundSseEventImpl.Builder builder = null;

                while (line != null && !Thread.interrupted() && !closed) {
                    if (StringUtils.isEmpty(line) && builder != null) { /* empty new line */
                        final InboundSseEvent event = builder.build(factory, message);
                        builder = null; /* reset the builder for next event */
                        listener.onNext(event);
                    } else {
                        // Parsing and interpreting event stream: 
                        // https://www.w3.org/TR/eventsource/#parsing-an-event-stream
                        if (line.startsWith(EVENT)) {
                            int beginIndex = findFirstNonSpacePosition(line, EVENT);
                            builder = getOrCreate(builder).name(line.substring(beginIndex));
                        } else if (line.startsWith(ID)) {
                            int beginIndex = findFirstNonSpacePosition(line, ID);
                            builder = getOrCreate(builder).id(line.substring(beginIndex));
                        } else if (line.startsWith(COMMENT)) {
                            int beginIndex = findFirstNonSpacePosition(line, COMMENT);
                            builder = getOrCreate(builder).comment(line.substring(beginIndex));
                        } else if (line.startsWith(RETRY)) {
                            int beginIndex = findFirstNonSpacePosition(line, RETRY);
                            builder = getOrCreate(builder).reconnectDelay(line.substring(beginIndex));
                        } else if (line.startsWith(DATA)) {
                            int beginIndex = findFirstNonSpacePosition(line, DATA);
                            builder = getOrCreate(builder).appendData(line.substring(beginIndex));
                        }
                    }
                    line = reader.readLine();
                }
                
                if (builder != null) {
                    // As per https://www.w3.org/TR/2021/SPSD-eventsource-20210128/#event-stream-interpretation:
                    //
                    //   ... Once the end of the file is reached, any pending data must be discarded. 
                    //   (If the file ends in the middle of an event, before the final empty line, 
                    //   the incomplete event is not dispatched.) ...
                    //
                    if (discardIncomplete /* default */) {
                        LOG.fine("Discarding incomplete SSE event");
                    } else {
                        listener.onNext(builder.build(factory, message));
                    }
                }

                // complete the stream
                listener.onComplete();
            } catch (final Exception ex) {
                listener.onError(ex);
            }

            if (response != null) {
                LOG.fine("Closing the response");
                response.close();
            }

            return null;
        };
    }
    
    boolean isClosed() {
        return closed;
    }
    
    boolean close(long timeout, TimeUnit unit) {
        try {
            closed = true;
            
            if (executor.isShutdown()) {
                return true;
            }
            
            AccessController.doPrivileged((PrivilegedAction<Void>)
                () -> { 
                    executor.shutdown();
                    return null;
                });
            return executor.awaitTermination(timeout, unit);
        } catch (final InterruptedException ex) {
            return false;
        }
    }
    
    /**
     * Create builder on-demand, without explicit event demarcation
     */
    private static Builder getOrCreate(final Builder builder) {
        return (builder == null) ? new InboundSseEventImpl.Builder() : builder;
    }
    
    /**
     * Remove only leading spaces from the line as per specification, space after 
     * the colon is optional.
     * 
     * The following stream fires two identical events:
     * 
     *   data:test
     *   data: test
     *   
     *   This is because the space after the colon is ignored if present.
     */
    private static int findFirstNonSpacePosition(final String str, final String prefix) {
        int beginIndex = prefix.length();
        
        for (; beginIndex < str.length(); ++beginIndex) {
            if (str.charAt(beginIndex) != ' ') {
                break;
            }
        }
        
        return beginIndex;
    }
}