BodyDeferringAsyncHandlerTest.java

/*
 * Copyright (c) 2010-2012 Sonatype, Inc. All rights reserved.
 *
 * This program is licensed to you under the Apache License Version 2.0,
 * and you may not use this file except in compliance with the Apache License Version 2.0.
 * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the Apache License Version 2.0 is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
 */
package org.asynchttpclient.handler;

import io.github.artsok.RepeatedIfExceptionsTest;
import jakarta.servlet.ServletException;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.apache.commons.io.IOUtils;
import org.asynchttpclient.AbstractBasicTest;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.AsyncHttpClientConfig;
import org.asynchttpclient.BoundRequestBuilder;
import org.asynchttpclient.ListenableFuture;
import org.asynchttpclient.Response;
import org.asynchttpclient.exception.RemotelyClosedException;
import org.asynchttpclient.handler.BodyDeferringAsyncHandler.BodyDeferringInputStream;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;

import java.io.IOException;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
import static io.netty.handler.codec.http.HttpHeaderValues.APPLICATION_OCTET_STREAM;
import static org.apache.commons.io.IOUtils.copy;
import static org.asynchttpclient.Dsl.asyncHttpClient;
import static org.asynchttpclient.Dsl.config;
import static org.asynchttpclient.test.TestUtils.findFreePort;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class BodyDeferringAsyncHandlerTest extends AbstractBasicTest {

    static final int CONTENT_LENGTH_VALUE = 100000;

    @Override
    public AbstractHandler configureHandler() throws Exception {
        return new SlowAndBigHandler();
    }

    private static AsyncHttpClientConfig getAsyncHttpClientConfig() {
        // for this test brevity's sake, we are limiting to 1 retries
        return config().setMaxRequestRetry(0).setRequestTimeout(Duration.ofSeconds(10)).build();
    }

    @RepeatedIfExceptionsTest(repeats = 5)
    public void deferredSimple() throws Exception {
        try (AsyncHttpClient client = asyncHttpClient(getAsyncHttpClientConfig())) {
            BoundRequestBuilder r = client.prepareGet(getTargetUrl());

            CountingOutputStream cos = new CountingOutputStream();
            BodyDeferringAsyncHandler bdah = new BodyDeferringAsyncHandler(cos);
            Future<Response> f = r.execute(bdah);
            Response resp = bdah.getResponse();
            assertNotNull(resp);
            assertEquals(HttpServletResponse.SC_OK, resp.getStatusCode());
            assertEquals(String.valueOf(CONTENT_LENGTH_VALUE), resp.getHeader(CONTENT_LENGTH));

            // we got headers only, it's probably not all yet here (we have BIG file
            // downloading)
            assertTrue(cos.getByteCount() <= CONTENT_LENGTH_VALUE);

            // now be polite and wait for body arrival too (otherwise we would be
            // dropping the "line" on server)
            assertDoesNotThrow(() -> f.get());

            // it all should be here now
            assertEquals(cos.getByteCount(), CONTENT_LENGTH_VALUE);
        }
    }

    @RepeatedIfExceptionsTest(repeats = 5)
    public void deferredSimpleWithFailure() throws Throwable {
        try (AsyncHttpClient client = asyncHttpClient(getAsyncHttpClientConfig())) {
            BoundRequestBuilder requestBuilder = client.prepareGet(getTargetUrl()).addHeader("X-FAIL-TRANSFER", Boolean.TRUE.toString());

            CountingOutputStream cos = new CountingOutputStream();
            BodyDeferringAsyncHandler bdah = new BodyDeferringAsyncHandler(cos);
            Future<Response> f = requestBuilder.execute(bdah);
            Response resp = bdah.getResponse();
            assertNotNull(resp);
            assertEquals(HttpServletResponse.SC_OK, resp.getStatusCode());
            assertEquals(String.valueOf(CONTENT_LENGTH_VALUE), resp.getHeader(CONTENT_LENGTH));
            // we got headers only, it's probably not all yet here (we have BIG file
            // downloading)
            assertTrue(cos.getByteCount() <= CONTENT_LENGTH_VALUE);

            // now be polite and wait for body arrival too (otherwise we would be
            // dropping the "line" on server)
            try {
                assertThrows(ExecutionException.class, () -> f.get());
            } catch (Exception ex) {
                assertInstanceOf(RemotelyClosedException.class, ex.getCause());
            }
            assertNotEquals(CONTENT_LENGTH_VALUE, cos.getByteCount());
        }
    }

    @RepeatedIfExceptionsTest(repeats = 5)
    public void deferredInputStreamTrick() throws Exception {
        try (AsyncHttpClient client = asyncHttpClient(getAsyncHttpClientConfig())) {
            BoundRequestBuilder r = client.prepareGet(getTargetUrl());

            PipedOutputStream pos = new PipedOutputStream();
            PipedInputStream pis = new PipedInputStream(pos);
            BodyDeferringAsyncHandler bdah = new BodyDeferringAsyncHandler(pos);

            Future<Response> f = r.execute(bdah);

            BodyDeferringInputStream is = new BodyDeferringInputStream(f, bdah, pis);

            Response resp = is.getAsapResponse();
            assertNotNull(resp);
            assertEquals(HttpServletResponse.SC_OK, resp.getStatusCode());
            assertEquals(String.valueOf(CONTENT_LENGTH_VALUE), resp.getHeader(CONTENT_LENGTH));
            // "consume" the body, but our code needs input stream
            CountingOutputStream cos = new CountingOutputStream();
            try {
                copy(is, cos);
            } finally {
                is.close();
                cos.close();
            }

            // now we don't need to be polite, since consuming and closing
            // BodyDeferringInputStream does all.
            // it all should be here now
            assertEquals(CONTENT_LENGTH_VALUE, cos.getByteCount());
        }
    }

    @RepeatedIfExceptionsTest(repeats = 5)
    public void deferredInputStreamTrickWithFailure() throws Throwable {
        try (AsyncHttpClient client = asyncHttpClient(getAsyncHttpClientConfig())) {
            BoundRequestBuilder r = client.prepareGet(getTargetUrl()).addHeader("X-FAIL-TRANSFER", Boolean.TRUE.toString());
            PipedOutputStream pos = new PipedOutputStream();
            PipedInputStream pis = new PipedInputStream(pos);
            BodyDeferringAsyncHandler bdah = new BodyDeferringAsyncHandler(pos);

            Future<Response> f = r.execute(bdah);

            BodyDeferringInputStream is = new BodyDeferringInputStream(f, bdah, pis);

            Response resp = is.getAsapResponse();
            assertNotNull(resp);
            assertEquals(resp.getStatusCode(), HttpServletResponse.SC_OK);
            assertEquals(resp.getHeader(CONTENT_LENGTH), String.valueOf(CONTENT_LENGTH_VALUE));
            // "consume" the body, but our code needs input stream
            CountingOutputStream cos = new CountingOutputStream();

            try (is; cos) {
                copy(is, cos);
            } catch (Exception ex) {
                assertInstanceOf(RemotelyClosedException.class, ex.getCause());
            }
        }
    }

    @RepeatedIfExceptionsTest(repeats = 5)
    public void deferredInputStreamTrickWithCloseConnectionAndRetry() throws Throwable {
        try (AsyncHttpClient client = asyncHttpClient(config().setMaxRequestRetry(1).setRequestTimeout(Duration.ofSeconds(10)).build())) {
            BoundRequestBuilder r = client.prepareGet(getTargetUrl()).addHeader("X-CLOSE-CONNECTION", Boolean.TRUE.toString());
            PipedOutputStream pos = new PipedOutputStream();
            PipedInputStream pis = new PipedInputStream(pos);
            BodyDeferringAsyncHandler bdah = new BodyDeferringAsyncHandler(pos);

            Future<Response> f = r.execute(bdah);

            BodyDeferringInputStream is = new BodyDeferringInputStream(f, bdah, pis);

            Response resp = is.getAsapResponse();
            assertNotNull(resp);
            assertEquals(resp.getStatusCode(), HttpServletResponse.SC_OK);
            assertEquals(resp.getHeader(CONTENT_LENGTH), String.valueOf(CONTENT_LENGTH_VALUE));
            // "consume" the body, but our code needs input stream
            CountingOutputStream cos = new CountingOutputStream();

            try (is; cos) {
                copy(is, cos);
            } catch (Exception ex) {
                assertInstanceOf(UnsupportedOperationException.class, ex.getCause());
            }
        }
    }

    @RepeatedIfExceptionsTest(repeats = 5)
    public void testConnectionRefused() throws Exception {
        int newPortWithoutAnyoneListening = findFreePort();
        try (AsyncHttpClient client = asyncHttpClient(getAsyncHttpClientConfig())) {
            BoundRequestBuilder r = client.prepareGet("http://localhost:" + newPortWithoutAnyoneListening + "/testConnectionRefused");

            CountingOutputStream cos = new CountingOutputStream();
            BodyDeferringAsyncHandler bdah = new BodyDeferringAsyncHandler(cos);
            r.execute(bdah);
            assertThrows(IOException.class, () -> bdah.getResponse());
        }
    }

    @RepeatedIfExceptionsTest(repeats = 5)
    public void testPipedStreams() throws Exception {
        try (AsyncHttpClient client = asyncHttpClient(getAsyncHttpClientConfig())) {
            PipedOutputStream pout = new PipedOutputStream();
            try (PipedInputStream pin = new PipedInputStream(pout)) {
                BodyDeferringAsyncHandler handler = new BodyDeferringAsyncHandler(pout);
                ListenableFuture<Response> respFut = client.prepareGet(getTargetUrl()).execute(handler);

                Response resp = handler.getResponse();
                assertEquals(200, resp.getStatusCode());

                try (BodyDeferringInputStream is = new BodyDeferringInputStream(respFut, handler, pin)) {
                    String body = IOUtils.toString(is, StandardCharsets.UTF_8);
                    System.out.println("Body: " + body);
                    assertTrue(body.contains("ABCDEF"));
                }
            }
        }
    }

    public static class SlowAndBigHandler extends AbstractHandler {

        @Override
        public void handle(String pathInContext, Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse) throws IOException, ServletException {
            httpResponse.setStatus(200);
            httpResponse.setContentLength(CONTENT_LENGTH_VALUE);
            httpResponse.setContentType(APPLICATION_OCTET_STREAM.toString());

            httpResponse.flushBuffer();

            final boolean wantConnectionClose = httpRequest.getHeader("X-CLOSE-CONNECTION") != null;
            final boolean wantFailure = httpRequest.getHeader("X-FAIL-TRANSFER") != null;
            final boolean wantSlow = httpRequest.getHeader("X-SLOW") != null;

            OutputStream os = httpResponse.getOutputStream();
            for (int i = 0; i < CONTENT_LENGTH_VALUE; i++) {
                os.write(i % 255);

                if (wantSlow) {
                    try {
                        Thread.sleep(300);
                    } catch (InterruptedException ex) {
                        // nuku
                    }
                }

                if (i > CONTENT_LENGTH_VALUE / 2) {
                    if (wantFailure) {
                        // kaboom
                        // yes, response is committed, but Jetty does aborts and
                        // drops connection
                        httpResponse.sendError(500);
                        break;
                    } else if (wantConnectionClose) {
                        // kaboom^2
                        httpResponse.getOutputStream().close();
                    }
                }
            }

            httpResponse.getOutputStream().flush();
            httpResponse.getOutputStream().close();
        }
    }

    // a /dev/null but counting how many bytes it ditched
    public static class CountingOutputStream extends OutputStream {
        private int byteCount;

        @Override
        public void write(int b) {
            // /dev/null
            byteCount++;
        }

        int getByteCount() {
            return byteCount;
        }
    }
}