AsyncServerPipeline.java

/*
 * ====================================================================
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 * ====================================================================
 *
 * This software consists of voluntary contributions made by many
 * individuals on behalf of the Apache Software Foundation.  For more
 * information on the Apache Software Foundation, please see
 * <http://www.apache.org/>.
 *
 */
package org.apache.hc.core5.http.nio.support;

import java.io.IOException;

import org.apache.hc.core5.function.Resolver;
import org.apache.hc.core5.function.Supplier;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.ExchangeHandler;
import org.apache.hc.core5.http.HandlerResolver;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.Message;
import org.apache.hc.core5.http.Method;
import org.apache.hc.core5.http.MethodNotAllowedException;
import org.apache.hc.core5.http.UnsupportedMediaTypeException;
import org.apache.hc.core5.http.Validator;
import org.apache.hc.core5.http.impl.ServerSupport;
import org.apache.hc.core5.http.message.BasicHttpResponse;
import org.apache.hc.core5.http.nio.AsyncEntityConsumer;
import org.apache.hc.core5.http.nio.AsyncEntityProducer;
import org.apache.hc.core5.http.nio.AsyncRequestConsumer;
import org.apache.hc.core5.http.nio.AsyncResponseProducer;
import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
import org.apache.hc.core5.http.nio.AsyncServerRequestHandler;
import org.apache.hc.core5.http.nio.entity.BasicAsyncEntityConsumer;
import org.apache.hc.core5.http.nio.entity.BasicAsyncEntityProducer;
import org.apache.hc.core5.http.nio.entity.DiscardingEntityConsumer;
import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
import org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer;
import org.apache.hc.core5.http.protocol.HttpContext;

/**
 * Server side execution pipeline assembler that supplies {@link AsyncServerExchangeHandler} instances
 * with the defined message exchange pipeline.
 * <p>
 * Please note that {@link AsyncServerExchangeHandler} are stateful and may not be used concurrently
 * by multiple message exchanges or re-used for subsequent message exchanges.
 *
 * @since 5.5
 */
public final class AsyncServerPipeline {

    public static AsyncServerPipeline assemble() {
        return new AsyncServerPipeline();
    }

    /**
     * Configures the pipeline to process the incoming request message stream provided the request
     * message passes the validation.
     */
    public RequestContentStage request(final Validator<HttpRequest> requestValidator) {
        return new RequestContentStage(requestValidator);
    }

    /**
     * Configures the pipeline to process the incoming request message stream provided the request
     * method of the incoming message is allowed.
     */
    public RequestContentStage request(final Method... allowedMethods) {
        return new RequestContentStage(request -> {
            final String method = request.getMethod();
            if (!ServerSupport.isMethodAllowed(method, allowedMethods)) {
                throw new MethodNotAllowedException(method + " not allowed");
            }
        });
    }

    /**
     * Configures the pipeline to process the incoming request message stream.
     */
    public RequestContentStage request() {
        return new RequestContentStage(null);
    }

    /**
     * Request content processing stage.
     */
    public static class RequestContentStage {

        private final Validator<HttpRequest> requestValidator;

        private RequestContentStage(final Validator<HttpRequest> requestValidator) {
            this.requestValidator = requestValidator;
        }

        /**
         * Resolves {@link AsyncRequestConsumer} to be used by the pipeline to process the incoming
         * request message stream.
         *
         * @param <T> request content representation.
         */
        public <T> ResponseStage<T> consume(
                final HandlerResolver<HttpRequest, AsyncRequestConsumer<T>> requestConsumerResolver) {
            return new ResponseStage<>(requestValidator, requestConsumerResolver);
        }

        /**
         * Resolves {@link AsyncEntityConsumer} to be used by the pipeline to process the incoming
         * request content stream. Resolver may return {@code null} if the media type is not supported.
         *
         * @param <T> request content representation.
         */
        public <T> ResponseStage<Message<HttpRequest, T>> consumeContent(
                final Resolver<ContentType, Supplier<AsyncEntityConsumer<T>>> dataConsumerResolver) {
            return new ResponseStage<>(
                    requestValidator,
                    (request, entityDetails, context) -> {
                        if (entityDetails == null) {
                            return new BasicRequestConsumer<>(DiscardingEntityConsumer::new);
                        }
                        final ContentType contentType = ContentType.parseLenient(entityDetails.getContentType());
                        final Supplier<AsyncEntityConsumer<T>> supplier = dataConsumerResolver.resolve(contentType);
                        if (supplier == null) {
                            throw new UnsupportedMediaTypeException(contentType);
                        }
                        return new BasicRequestConsumer<>(supplier);
                    });
        }

        /**
         * Configures the pipeline to process the incoming request content as a String.
         */
        public ResponseStage<Message<HttpRequest, String>> asString() {
            return consumeContent(contentType -> StringAsyncEntityConsumer::new);
        }

        /**
         * Configures the pipeline to process the incoming request content as a byte array.
         */
        public ResponseStage<Message<HttpRequest, byte[]>> asByteArray() {
            return consumeContent(contentType -> BasicAsyncEntityConsumer::new);
        }

        /**
         * Configures the pipeline to ignore and discard the incoming request content.
         */
        public ResponseStage<Message<HttpRequest, Void>> ignoreContent() {
            return consumeContent(contentType -> DiscardingEntityConsumer::new);
        }

    }

    /**
     * Response message stage.
     */
    public static class ResponseStage<I> {

        private final Validator<HttpRequest> requestValidator;
        private final HandlerResolver<HttpRequest, AsyncRequestConsumer<I>> requestConsumerResolver;

        private ResponseStage(
                final Validator<HttpRequest> requestValidator,
                final HandlerResolver<HttpRequest, AsyncRequestConsumer<I>> requestConsumerResolver) {
            this.requestValidator = requestValidator;
            this.requestConsumerResolver = requestConsumerResolver;
        }

        /**
         * Configures the pipeline to produces an outgoing response message stream based on
         * the incoming request message and content.
         */
        public ResponseContentStage<I> response() {
            return new ResponseContentStage<>(requestValidator, requestConsumerResolver);
        }

    }

    /**
     * Response content generation stage.
     */
    public static class ResponseContentStage<I> {

        private final Validator<HttpRequest> requestValidator;
        private final HandlerResolver<HttpRequest, AsyncRequestConsumer<I>> requestConsumerResolver;

        private ResponseContentStage(
                final Validator<HttpRequest> requestValidator,
                final HandlerResolver<HttpRequest, AsyncRequestConsumer<I>> requestConsumerResolver) {
            this.requestValidator = requestValidator;
            this.requestConsumerResolver = requestConsumerResolver;
        }

        /**
         * Resolves {@link AsyncResponseProducer} to be used by the pipeline to generate the outgoing response
         * message stream based on the given response message object.
         *
         * @param <O> response content representation.
         */
        public <O> RequestHandlingStage<I, O> produce(
                final Resolver<O, AsyncResponseProducer> responseProducerResolver) {
            return new RequestHandlingStage<>(requestValidator, requestConsumerResolver, responseProducerResolver);
        }

        /**
         * Resolves {@link AsyncEntityProducer} to be used by the pipeline to generate the outgoing response
         * content stream based on the given response content object.
         *
         * @param <O> response content representation.
         */
        public <O> RequestHandlingStage<I, Message<HttpResponse, O>> produceContent(
                final Resolver<O, AsyncEntityProducer> dataProducerResolver) {
            return new RequestHandlingStage<>(
                    requestValidator,
                    requestConsumerResolver,
                    m ->
                            new BasicResponseProducer(m.head(), dataProducerResolver.resolve(m.body())));
        }

        /**
         * Configures the pipeline to represent the response message content as a String.
         */
        public RequestHandlingStage<I, Message<HttpResponse, String>> asString(final ContentType contentType) {
            return produceContent(s -> new StringAsyncEntityProducer(s, contentType));
        }

        /**
         * Configures the pipeline to represent the response message content as byre array.
         */
        public RequestHandlingStage<I, Message<HttpResponse, byte[]>> asByteArray(final ContentType contentType) {
            return produceContent(bytes -> new BasicAsyncEntityProducer(bytes, contentType));
        }

    }

    /**
     * Request handling stage.
     */
    public static class RequestHandlingStage<I, O> {

        private final Validator<HttpRequest> requestValidator;
        private final HandlerResolver<HttpRequest, AsyncRequestConsumer<I>> requestConsumerResolver;
        private final Resolver<O, AsyncResponseProducer> responseProducerResolver;

        private RequestHandlingStage(
                final Validator<HttpRequest> requestValidator,
                final HandlerResolver<HttpRequest, AsyncRequestConsumer<I>> requestConsumerResolver,
                final Resolver<O, AsyncResponseProducer> responseProducerResolver) {
            this.requestValidator = requestValidator;
            this.requestConsumerResolver = requestConsumerResolver;
            this.responseProducerResolver = responseProducerResolver;
        }

        /**
         * Configures the pipeline to resolve the exception to a response message stream.
         */
        public ExceptionStage<I, O> exception(
                final Resolver<Exception, AsyncResponseProducer> exceptionMapper) {
            return new ExceptionStage<>(requestValidator, requestConsumerResolver, responseProducerResolver, exceptionMapper);
        }

        /**
         * Configures the pipeline to resolve the exception to a response error message. The response status
         * code will be determined by default based on the exception type.
         */
        public ExceptionStage<I, O> errorMessage(
                final Resolver<Exception, String> messageMapper) {
            return exception(ex ->
                    new BasicResponseProducer(
                            new BasicHttpResponse(ServerSupport.toStatusCode(ex)),
                            messageMapper.resolve(ex),
                            ContentType.TEXT_PLAIN));
        }

        /**
         * Configures the pipeline to handle the message exchange by generating a response object
         * based on the properties of the request object.
         */
        public CompletionStage<I, O> handle(final ExchangeHandler<I, O> exchangeHandler) {
            return errorMessage(ServerSupport::toErrorMessage).handle(exchangeHandler);
        }

    }

    /**
     * Exception handling stage.
     */
    public static class ExceptionStage<I, O> {

        private final Validator<HttpRequest> requestValidator;
        private final HandlerResolver<HttpRequest, AsyncRequestConsumer<I>> requestConsumerResolver;
        private final Resolver<O, AsyncResponseProducer> responseProducerResolver;
        private final Resolver<Exception, AsyncResponseProducer> exceptionMapper;

        private ExceptionStage(
                final Validator<HttpRequest> requestValidator,
                final HandlerResolver<HttpRequest, AsyncRequestConsumer<I>> requestConsumerResolver,
                final Resolver<O, AsyncResponseProducer> responseProducerResolver,
                final Resolver<Exception, AsyncResponseProducer> exceptionMapper) {
            this.requestValidator = requestValidator;
            this.requestConsumerResolver = requestConsumerResolver;
            this.responseProducerResolver = responseProducerResolver;
            this.exceptionMapper = exceptionMapper;
        }

        /**
         * Configures the pipeline to handle the message exchange by generating a response object
         * based on the properties of the request object.
         */
        public CompletionStage<I, O> handle(final ExchangeHandler<I, O> exchangeHandler) {
            return new CompletionStage<>(requestValidator, requestConsumerResolver, responseProducerResolver, exchangeHandler, exceptionMapper);
        }

    }

    /**
     * Exchange completion stage.
     */
    public static class CompletionStage<I, O> {

        private final Validator<HttpRequest> requestValidator;
        private final HandlerResolver<HttpRequest, AsyncRequestConsumer<I>> requestConsumerResolver;
        private final Resolver<O, AsyncResponseProducer> responseProducerResolver;
        private final ExchangeHandler<I, O> exchangeHandler;
        private final Resolver<Exception, AsyncResponseProducer> exceptionMapper;

        private CompletionStage(
                final Validator<HttpRequest> requestValidator,
                final HandlerResolver<HttpRequest, AsyncRequestConsumer<I>> requestConsumerResolver,
                final Resolver<O, AsyncResponseProducer> responseProducerResolver,
                final ExchangeHandler<I, O> exchangeHandler,
                final Resolver<Exception, AsyncResponseProducer> exceptionMapper) {
            this.requestValidator = requestValidator;
            this.requestConsumerResolver = requestConsumerResolver;
            this.responseProducerResolver = responseProducerResolver;
            this.exchangeHandler = exchangeHandler;
            this.exceptionMapper = exceptionMapper;
        }

        /**
         * Creates {@link Supplier} of {@link AsyncServerExchangeHandler} implementing the defined message
         * exchange pipeline.
         */
        public Supplier<AsyncServerExchangeHandler> supplier() {
            return () -> new AbstractServerExchangeHandler<I>() {

                @Override
                protected AsyncRequestConsumer<I> supplyConsumer(
                        final HttpRequest request,
                        final EntityDetails entityDetails,
                        final HttpContext context) throws HttpException {
                    if (requestValidator != null) {
                        requestValidator.validate(request);
                    }
                    final AsyncRequestConsumer<I> requestConsumer = requestConsumerResolver.resolve(request, entityDetails, context);
                    if (requestConsumer == null) {
                        throw new HttpException("Unable to process request");
                    }
                    return requestConsumer;
                }

                @Override
                protected void handle(
                        final I requestMessage,
                        final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
                        final HttpContext context) throws HttpException, IOException {
                    final O responseMessage = exchangeHandler.handle(requestMessage, context);
                    if (responseMessage == null) {
                        throw new HttpException("Unable to handle request");
                    }
                    final AsyncResponseProducer responseProducer = responseProducerResolver.resolve(responseMessage);
                    if (responseProducer == null) {
                        throw new HttpException("Unable to produce response");
                    }
                    responseTrigger.submitResponse(responseProducer, context);
                }

                @Override
                protected AsyncResponseProducer handleError(final Exception ex) {
                    final AsyncResponseProducer responseProducer = exceptionMapper != null ? exceptionMapper.resolve(ex) : null;
                    return responseProducer != null ? responseProducer : super.handleError(ex);
                }

            };
        }

    }

}