TestFutureRequestExecutionService.java

/*
 * ====================================================================
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 * ====================================================================
 *
 * This software consists of voluntary contributions made by many
 * individuals on behalf of the Apache Software Foundation.  For more
 * information on the Apache Software Foundation, please see
 * <http://www.apache.org/>.
 *
 */
package org.apache.hc.client5.testing.sync;

import static org.hamcrest.MatcherAssert.assertThat;

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hc.client5.http.classic.methods.HttpGet;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.FutureRequestExecutionService;
import org.apache.hc.client5.http.impl.classic.HttpClientBuilder;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder;
import org.apache.hc.client5.http.io.HttpClientConnectionManager;
import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.ClassicHttpResponse;
import org.apache.hc.core5.http.impl.bootstrap.HttpServer;
import org.apache.hc.core5.http.impl.bootstrap.ServerBootstrap;
import org.apache.hc.core5.http.io.HttpClientResponseHandler;
import org.hamcrest.CoreMatchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

@SuppressWarnings("boxing") // test code
class TestFutureRequestExecutionService {

    private HttpServer localServer;
    private String uri;
    private FutureRequestExecutionService httpAsyncClientWithFuture;

    private final AtomicBoolean blocked = new AtomicBoolean(false);

    @BeforeEach
    void before() throws Exception {
        this.localServer = ServerBootstrap.bootstrap()
                .setCanonicalHostName("localhost")
                .register("/wait", (request, response, context) -> {
                    try {
                        while (blocked.get()) {
                            Thread.sleep(10);
                        }
                    } catch (final InterruptedException e) {
                        throw new IllegalStateException(e);
                    }
                    response.setCode(200);
                }).create();

        this.localServer.start();
        uri = "http://localhost:" + this.localServer.getLocalPort() + "/wait";
        final HttpClientConnectionManager cm = PoolingHttpClientConnectionManagerBuilder.create()
                .setMaxConnPerRoute(5)
                .build();
        final CloseableHttpClient httpClient = HttpClientBuilder.create()
                .setConnectionManager(cm)
                .build();
        final ExecutorService executorService = Executors.newFixedThreadPool(5);
        httpAsyncClientWithFuture = new FutureRequestExecutionService(httpClient, executorService);
    }

    @AfterEach
    void after() throws Exception {
        blocked.set(false); // any remaining requests should unblock
        this.localServer.stop();
        httpAsyncClientWithFuture.close();
    }

    @Test
    void shouldExecuteSingleCall() throws InterruptedException, ExecutionException {
        final FutureTask<Boolean> task = httpAsyncClientWithFuture.execute(
            new HttpGet(uri), HttpClientContext.create(), new OkidokiHandler());
        Assertions.assertTrue(task.get(), "request should have returned OK");
    }

    @Test @Disabled("Fails intermittently with GitHub Actions. Needs to be revised")
    void shouldCancel() {
        final FutureTask<Boolean> task = httpAsyncClientWithFuture.execute(
            new HttpGet(uri), HttpClientContext.create(), new OkidokiHandler());
        task.cancel(true);
        final Exception exception = Assertions.assertThrows(Exception.class, task::get);
        assertThat(exception, CoreMatchers.anyOf(
                CoreMatchers.instanceOf(CancellationException.class),
                CoreMatchers.instanceOf(ExecutionException.class)));
    }

    @Test
    void shouldTimeout() {
        blocked.set(true);
        final FutureTask<Boolean> task = httpAsyncClientWithFuture.execute(
            new HttpGet(uri), HttpClientContext.create(), new OkidokiHandler());
        Assertions.assertThrows(TimeoutException.class, () ->
                task.get(10, TimeUnit.MILLISECONDS));
    }

    @Test
    void shouldExecuteMultipleCalls() throws Exception {
        final int reqNo = 100;
        final Queue<Future<Boolean>> tasks = new LinkedList<>();
        for (int i = 0; i < reqNo; i++) {
            final Future<Boolean> task = httpAsyncClientWithFuture.execute(
                    new HttpGet(uri), HttpClientContext.create(), new OkidokiHandler());
            tasks.add(task);
        }
        for (final Future<Boolean> task : tasks) {
            final Boolean b = task.get();
            Assertions.assertNotNull(b);
            Assertions.assertTrue(b, "request should have returned OK");
        }
    }

    @Test
    void shouldExecuteMultipleCallsAndCallback() throws Exception {
        final int reqNo = 100;
        final Queue<Future<Boolean>> tasks = new LinkedList<>();
        final CountDownLatch latch = new CountDownLatch(reqNo);
        for (int i = 0; i < reqNo; i++) {
            final Future<Boolean> task = httpAsyncClientWithFuture.execute(
                    new HttpGet(uri), HttpClientContext.create(),
                    new OkidokiHandler(), new CountingCallback(latch));
            tasks.add(task);
        }
        Assertions.assertTrue(latch.await(5, TimeUnit.SECONDS));
        for (final Future<Boolean> task : tasks) {
            final Boolean b = task.get();
            Assertions.assertNotNull(b);
            Assertions.assertTrue(b, "request should have returned OK");
        }
    }

    private static final class CountingCallback implements FutureCallback<Boolean> {

        private final CountDownLatch latch;

        CountingCallback(final CountDownLatch latch) {
            super();
            this.latch = latch;
        }

        @Override
        public void failed(final Exception ex) {
            latch.countDown();
        }

        @Override
        public void completed(final Boolean result) {
            latch.countDown();
        }

        @Override
        public void cancelled() {
            latch.countDown();
        }
    }


    private static final class OkidokiHandler implements HttpClientResponseHandler<Boolean> {
        @Override
        public Boolean handleResponse(
                final ClassicHttpResponse response) {
            return response.getCode() == 200;
        }
    }

}