StreamingTest.java

/*
 * Copyright (c) 2022 Oracle and/or its affiliates. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v. 2.0, which is available at
 * http://www.eclipse.org/legal/epl-2.0.
 *
 * This Source Code may also be made available under the following Secondary
 * Licenses when the conditions for such availability set forth in the
 * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
 * version 2 with the GNU Classpath Exception, which is available at
 * https://www.gnu.org/software/classpath/license.html.
 *
 * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
 */

package org.glassfish.jersey.apache5.connector;

import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicInteger;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.client.Invocation;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Application;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.inject.Singleton;

import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
import org.glassfish.jersey.client.ClientConfig;
import org.glassfish.jersey.client.ClientProperties;
import org.glassfish.jersey.server.ChunkedOutput;
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.test.JerseyTest;

import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;

/**
 * Tests of streamed (chunked) responses and connection handling with the Apache 5 connector.
 * <p>
 * The client is configured with a {@link PoolingHttpClientConnectionManager} that the tests keep a
 * reference to, so that pool statistics can be asserted after a response has been closed.
 *
 * @author Petr Janouch
 */
public class StreamingTest extends JerseyTest {
    /** Connection manager handed to the client in {@link #configureClient(ClientConfig)}. */
    private PoolingHttpClientConnectionManager connectionManager;

    /**
     * Test that a data stream can be terminated from the client side when no read timeout is set.
     */
    @Test
    public void clientCloseNoTimeoutTest() throws IOException {
        clientCloseTest(-1);
    }

    /**
     * Test that a data stream can be terminated from the client side when a read timeout
     * (and a custom connection closing strategy) is configured on the request.
     */
    @Test
    public void clientCloseWithTimeOutTest() throws IOException {
        clientCloseTest(1_000);
    }

    /**
     * Tests that closing a response after completely reading the entity reuses the connection
     */
    @Test
    public void reuseConnectionTest() throws IOException {
        Response response = target().path("/streamingEndpoint/get").request().get();
        try {
            try (InputStream is = response.readEntity(InputStream.class)) {
                byte[] buf = new byte[8192];
                // A single read(byte[]) may return fewer bytes than the entity contains,
                // so keep reading until end-of-stream to really consume the whole entity.
                while (is.read(buf) != -1) {
                    // the content itself is irrelevant for this test
                }
            }
        } finally {
            // the stream is closed first, then the response
            response.close();
        }

        // the connection must have been handed back to the pool, not torn down or kept leased
        assertEquals(1, connectionManager.getTotalStats().getAvailable());
        assertEquals(0, connectionManager.getTotalStats().getLeased());
    }

    /**
     * Tests that closing a response without reading the entity does not throw an exception.
     */
    @Test
    public void clientCloseThrowsNoExceptionTest() throws IOException {
        Response response = target().path("/streamingEndpoint/get").request().get();
        response.close();
    }

    /**
     * Registers the Apache 5 connector and a pooling connection manager whose statistics
     * the tests inspect.
     */
    @Override
    protected void configureClient(ClientConfig config) {
        connectionManager = new PoolingHttpClientConnectionManager();
        config.property(Apache5ClientProperties.CONNECTION_MANAGER, connectionManager);
        config.connectorProvider(new Apache5ConnectorProvider());
    }

    /**
     * Deploys the {@link StreamingEndpoint} resource.
     */
    @Override
    protected Application configure() {
        return new ResourceConfig(StreamingEndpoint.class);
    }

    /**
     * Opens the streaming endpoint, reads one chunk, closes the stream on the client and then
     * verifies that the server can no longer write to it.
     *
     * @param readTimeout read timeout in milliseconds to set on the streaming request; any value
     *                    lower than {@code 0} leaves the request with its default configuration.
     *                    When a timeout is set, a custom {@link Apache5ConnectionClosingStrategy}
     *                    is registered as well and the test verifies that it has been invoked.
     * @throws IOException if reading from or closing the entity stream fails.
     */
    private void clientCloseTest(int readTimeout) throws IOException {
        // start streaming
        // The counter stays 0 without a timeout; with a timeout it is set to 1 here and only
        // the custom closing strategy resets it, which the final assertion relies on.
        AtomicInteger counter = new AtomicInteger(0);
        Invocation.Builder builder = target().path("/streamingEndpoint").request();
        if (readTimeout > -1) {
            counter.set(1);
            builder.property(ClientProperties.READ_TIMEOUT, readTimeout);
            builder.property(Apache5ClientProperties.CONNECTION_CLOSING_STRATEGY,
                    (Apache5ConnectionClosingStrategy) (config, request, response, stream) -> {
                try {
                    stream.close();
                } catch (Exception ignored) {
                    // timeout, no chunk ending
                } finally {
                    counter.set(0);
                    response.close();
                }
            });
        }
        InputStream inputStream = builder.get(InputStream.class);

        WebTarget sendTarget = target().path("/streamingEndpoint/send");
        // trigger sending 'A' to the stream; OK is sent if everything on the server was OK
        assertEquals("OK", sendTarget.request().get().readEntity(String.class));
        // check 'A' has been sent
        assertEquals('A', inputStream.read());
        // closing the stream should tear down the connection
        inputStream.close();
        // trigger sending another 'A' to the stream; it should fail
        // (indicating that the streaming has been terminated on the server)
        assertEquals("NOK", sendTarget.request().get().readEntity(String.class));
        // with a timeout configured, this proves the custom closing strategy has run
        assertEquals(0, counter.get());
    }

    /**
     * Test resource. It is a singleton, so the {@link ChunkedOutput} returned by {@link #get()}
     * is the same instance that {@link #sendEvent()} writes to.
     */
    @Singleton
    @Path("streamingEndpoint")
    public static class StreamingEndpoint {

        /** The chunked stream shared by {@link #get()} and {@link #sendEvent()}. */
        private final ChunkedOutput<String> output = new ChunkedOutput<>(String.class);

        /**
         * Writes a single {@code "A"} chunk to the shared output.
         *
         * @return {@code "OK"} if the chunk was written, {@code "NOK"} if writing threw an
         *         {@link IOException}.
         */
        @GET
        @Path("send")
        public String sendEvent() {
            try {
                output.write("A");
            } catch (IOException e) {
                return "NOK";
            }

            return "OK";
        }

        /**
         * @return the shared chunked output as the response entity.
         */
        @GET
        @Produces(MediaType.TEXT_PLAIN)
        public ChunkedOutput<String> get() {
            return output;
        }

        /**
         * @return the fixed, non-streamed entity {@code "OK"}.
         */
        @GET
        @Path("get")
        @Produces(MediaType.TEXT_PLAIN)
        public String getString() {
            return "OK";
        }
    }
}