AsyncJsonServerPipeline.java

/*
 * ====================================================================
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 * ====================================================================
 *
 * This software consists of voluntary contributions made by many
 * individuals on behalf of the Apache Software Foundation.  For more
 * information on the Apache Software Foundation, please see
 * <http://www.apache.org/>.
 *
 */
package org.apache.hc.core5.jackson2.http;

import java.io.IOException;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.hc.core5.function.Resolver;
import org.apache.hc.core5.function.Supplier;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.ExchangeHandler;
import org.apache.hc.core5.http.HandlerResolver;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.Message;
import org.apache.hc.core5.http.Method;
import org.apache.hc.core5.http.MethodNotAllowedException;
import org.apache.hc.core5.http.Validator;
import org.apache.hc.core5.http.impl.ServerSupport;
import org.apache.hc.core5.http.message.BasicHttpResponse;
import org.apache.hc.core5.http.nio.AsyncEntityProducer;
import org.apache.hc.core5.http.nio.AsyncRequestConsumer;
import org.apache.hc.core5.http.nio.AsyncResponseProducer;
import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
import org.apache.hc.core5.http.nio.AsyncServerRequestHandler;
import org.apache.hc.core5.http.nio.entity.DiscardingEntityConsumer;
import org.apache.hc.core5.http.nio.support.AbstractServerExchangeHandler;
import org.apache.hc.core5.http.nio.support.BasicRequestConsumer;
import org.apache.hc.core5.http.nio.support.BasicResponseProducer;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.jackson2.JsonConsumer;
import org.apache.hc.core5.jackson2.JsonResultSink;
import org.apache.hc.core5.jackson2.JsonTokenEventHandler;
import org.apache.hc.core5.util.Args;

/**
 * Server side execution pipeline assembler that supplies {@link AsyncServerExchangeHandler} instances
 * with the defined message exchange pipeline optimized for JSON message exchanges.
 * <p>
 * Please note that {@link AsyncServerExchangeHandler} are stateful and may not be used concurrently
 * by multiple message exchanges or re-used for subsequent message exchanges.
 *
 * @since 5.5
 */
public final class AsyncJsonServerPipeline {

    private final ObjectMapper objectMapper;

    private AsyncJsonServerPipeline(final ObjectMapper objectMapper) {
        this.objectMapper = Args.notNull(objectMapper, "Object mapper");
    }

    public static AsyncJsonServerPipeline assemble(final ObjectMapper objectMapper) {
        return new AsyncJsonServerPipeline(objectMapper);
    }

    /**
     * Configures the pipeline to process the incoming request message stream provided the request
     * message passes the validation.
     */
    public RequestContentStage request(final Validator<HttpRequest> requestValidator) {
        return new RequestContentStage(requestValidator);
    }

    /**
     * Configures the pipeline to process the incoming request message stream provided the request
     * method of the incoming message is allowed.
     */
    public RequestContentStage request(final Method... allowedMethods) {
        return new RequestContentStage(request -> {
            final String method = request.getMethod();
            if (!ServerSupport.isMethodAllowed(method, allowedMethods)) {
                throw new MethodNotAllowedException(method + " not allowed");
            }
        });
    }

    /**
     * Configures the pipeline to process the incoming request message stream.
     */
    public RequestContentStage request() {
        return new RequestContentStage(null);
    }

    /**
     * Request content processing stage.
     */
    public class RequestContentStage {

        private final Validator<HttpRequest> requestValidator;

        private RequestContentStage(final Validator<HttpRequest> requestValidator) {
            this.requestValidator = requestValidator;
        }

        /**
         * Resolves {@link AsyncRequestConsumer} to be used by the pipeline to process the incoming
         * request message stream.
         *
         * @param <T> request content representation.
         */
        public <T> ResponseStage<T> consume(
                final HandlerResolver<HttpRequest, AsyncRequestConsumer<T>> requestConsumerResolver) {
            return new ResponseStage<>(requestValidator, requestConsumerResolver);
        }

        /**
         * Configures the pipeline to process the incoming request content as an object
         * with the given {@link JavaType}.
         */
        public <T> ResponseStage<Message<HttpRequest, T>> asObject(final JavaType javaType) {
            return consume((request, entityDetails, context) ->
                    JsonRequestConsumers.create(objectMapper, javaType));
        }

        /**
         * Configures the pipeline to process the incoming request content as an object
         * with the given {@link Class}.
         */
        public <T> ResponseStage<Message<HttpRequest, T>> asObject(final Class<T> clazz) {
            return consume((request, entityDetails, context) ->
                    JsonRequestConsumers.create(objectMapper, clazz));
        }

        /**
         * Configures the pipeline to process the incoming request content as an object
         * with the given {@link TypeReference}.
         */
        public <T> ResponseStage<Message<HttpRequest, T>> asObject(final TypeReference<T> typeReference) {
            return consume((request, entityDetails, context) ->
                    JsonRequestConsumers.create(objectMapper, typeReference));
        }

        /**
         * Configures the pipeline to process the incoming request content as a {@link JsonNode} instance.
         */
        public ResponseStage<Message<HttpRequest, JsonNode>> asJsonNode() {
            return consume((request, entityDetails, context) ->
                    JsonRequestConsumers.create(objectMapper.getFactory()));
        }

        /**
         * Configures the pipeline to process the incoming request content as a sequence of objects
         * with the given {@link JavaType}.
         */
        public <T> ResponseStage<Long> asSequence(
                final JavaType javaType,
                final JsonConsumer<HttpRequest> validator,
                final JsonResultSink<T> resultSink) {
            return consume((request, entityDetails, context) ->
                    JsonRequestConsumers.create(objectMapper, javaType, validator, resultSink));
        }

        /**
         * Configures the pipeline to process the incoming request content as a sequence of objects
         * with the given {@link Class}.
         */
        public <T> ResponseStage<Long> asSequence(
                final Class<T> clazz,
                final JsonConsumer<HttpRequest> validator,
                final JsonResultSink<T> resultSink) {
            return consume((request, entityDetails, context) ->
                    JsonRequestConsumers.create(objectMapper, clazz, validator, resultSink));
        }

        /**
         * Configures the pipeline to process the incoming request content as a sequence of objects
         * with the given {@link TypeReference}.
         */
        public <T> ResponseStage<Long> asSequence(
                final TypeReference<T> typeReference,
                final JsonConsumer<HttpRequest> validator,
                final JsonResultSink<T> resultSink) {
            return consume((request, entityDetails, context) ->
                    JsonRequestConsumers.create(objectMapper, typeReference, validator, resultSink));
        }

        /**
         * Configures the pipeline to process the incoming request content as a sequence of objects
         * with the given {@link TypeReference}.
         */
        public ResponseStage<Void> asEvents(
                final JsonConsumer<HttpRequest> validator,
                final JsonTokenEventHandler eventHandler) {
            return consume((request, entityDetails, context) ->
                    JsonRequestConsumers.create(objectMapper.getFactory(), validator, eventHandler));
        }

        /**
         * Configures the pipeline to ignore and discard the incoming request content.
         */
        public ResponseStage<Message<HttpRequest, Void>> ignoreContent() {
            return consume((request, entityDetails, context) ->
                    new BasicRequestConsumer<>(DiscardingEntityConsumer::new));
        }

    }

    /**
     * Response message stage.
     */
    public class ResponseStage<I> {

        private final Validator<HttpRequest> requestValidator;
        private final HandlerResolver<HttpRequest, AsyncRequestConsumer<I>> requestConsumerResolver;

        private ResponseStage(
                final Validator<HttpRequest> requestValidator,
                final HandlerResolver<HttpRequest, AsyncRequestConsumer<I>> requestConsumerResolver) {
            this.requestValidator = requestValidator;
            this.requestConsumerResolver = requestConsumerResolver;
        }

        /**
         * Configures the pipeline to produces an outgoing response message stream based on
         * the incoming request message and content.
         */
        public ResponseContentStage<I> response() {
            return new ResponseContentStage<>(requestValidator, requestConsumerResolver);
        }

    }

    /**
     * Response content generation stage.
     */
    public class ResponseContentStage<I> {

        private final Validator<HttpRequest> requestValidator;
        private final HandlerResolver<HttpRequest, AsyncRequestConsumer<I>> requestConsumerResolver;

        private ResponseContentStage(
                final Validator<HttpRequest> requestValidator,
                final HandlerResolver<HttpRequest, AsyncRequestConsumer<I>> requestConsumerResolver) {
            this.requestValidator = requestValidator;
            this.requestConsumerResolver = requestConsumerResolver;
        }

        /**
         * Resolves {@link AsyncResponseProducer} to be used by the pipeline to generate the outgoing response
         * message stream based on the given response message object.
         *
         * @param <O> response content representation.
         */
        public <O> RequestHandlingStage<I, O> produce(
                final Resolver<O, AsyncResponseProducer> responseProducerResolver) {
            return new RequestHandlingStage<>(requestValidator, requestConsumerResolver, responseProducerResolver);
        }

        /**
         * Resolves {@link AsyncEntityProducer} to be used by the pipeline to generate the outgoing response
         * content stream based on the given response content object.
         *
         * @param <O> response content representation.
         */
        public <O> RequestHandlingStage<I, Message<HttpResponse, O>> produceContent(
                final Resolver<O, AsyncEntityProducer> dataProducerResolver) {
            return new RequestHandlingStage<>(
                    requestValidator,
                    requestConsumerResolver,
                    m ->
                            new BasicResponseProducer(m.head(), dataProducerResolver.resolve(m.body())));
        }

        /**
         * Configures the pipeline to represent the response message content as an object
         * with the given {@link Class}.
         */
        public <O> RequestHandlingStage<I, Message<HttpResponse, O>> asObject(final Class<O> clazz) {
            return produce(m -> JsonResponseProducers.create(m.head(), m.body(), objectMapper));
        }

        /**
         * Configures the pipeline to represent the response message content as an object.
         */
        public RequestHandlingStage<I, Message<HttpResponse, JsonNode>> asJsonNode() {
            return produce(m -> JsonResponseProducers.create(m.head(), m.body(), objectMapper));
        }

        /**
         * Configures the pipeline to represent the response message content as a sequence
         * of objects.
         */
        public <O> RequestHandlingStage<I, HttpResponse> asSequence(final ObjectProducer<O> objectProducer) {
            return produce(r -> JsonResponseProducers.create(r, objectMapper, objectProducer));
        }

    }

    /**
     * Request handling stage.
     */
    public class RequestHandlingStage<I, O> {

        private final Validator<HttpRequest> requestValidator;
        private final HandlerResolver<HttpRequest, AsyncRequestConsumer<I>> requestConsumerResolver;
        private final Resolver<O, AsyncResponseProducer> responseProducerResolver;

        private RequestHandlingStage(
                final Validator<HttpRequest> requestValidator,
                final HandlerResolver<HttpRequest, AsyncRequestConsumer<I>> requestConsumerResolver,
                final Resolver<O, AsyncResponseProducer> responseProducerResolver) {
            this.requestValidator = requestValidator;
            this.requestConsumerResolver = requestConsumerResolver;
            this.responseProducerResolver = responseProducerResolver;
        }

        /**
         * Configures the pipeline to resolve the exception to a response message stream.
         */
        public ExceptionStage<I, O> exception(
                final Resolver<Exception, AsyncResponseProducer> exceptionMapper) {
            return new ExceptionStage<>(requestValidator, requestConsumerResolver, responseProducerResolver, exceptionMapper);
        }

        /**
         * Configures the pipeline to resolve the exception to a response error message. The response status
         * code will be determined by default based on the exception type.
         */
        public ExceptionStage<I, O> errorMessage(
                final Resolver<Exception, String> messageMapper) {
            return exception(ex ->
                    new BasicResponseProducer(
                            new BasicHttpResponse(ServerSupport.toStatusCode(ex)),
                            messageMapper.resolve(ex),
                            ContentType.TEXT_PLAIN));
        }

        /**
         * Configures the pipeline to handle the message exchange by generating a response object
         * based on the properties of the request object.
         */
        public CompletionStage<I, O> handle(final ExchangeHandler<I, O> exchangeHandler) {
            return errorMessage(ServerSupport::toErrorMessage).handle(exchangeHandler);
        }

    }

    /**
     * Exception handling stage.
     */
    public class ExceptionStage<I, O> {

        private final Validator<HttpRequest> requestValidator;
        private final HandlerResolver<HttpRequest, AsyncRequestConsumer<I>> requestConsumerResolver;
        private final Resolver<O, AsyncResponseProducer> responseProducerResolver;
        private final Resolver<Exception, AsyncResponseProducer> exceptionMapper;

        private ExceptionStage(
                final Validator<HttpRequest> requestValidator,
                final HandlerResolver<HttpRequest, AsyncRequestConsumer<I>> requestConsumerResolver,
                final Resolver<O, AsyncResponseProducer> responseProducerResolver,
                final Resolver<Exception, AsyncResponseProducer> exceptionMapper) {
            this.requestValidator = requestValidator;
            this.requestConsumerResolver = requestConsumerResolver;
            this.responseProducerResolver = responseProducerResolver;
            this.exceptionMapper = exceptionMapper;
        }

        /**
         * Configures the pipeline to handle the message exchange by generating a response object
         * based on the properties of the request object.
         */
        public CompletionStage<I, O> handle(final ExchangeHandler<I, O> exchangeHandler) {
            return new CompletionStage<>(requestValidator, requestConsumerResolver, responseProducerResolver, exchangeHandler, exceptionMapper);
        }

    }

    /**
     * Exchange completion stage.
     */
    public class CompletionStage<I, O> {

        private final Validator<HttpRequest> requestValidator;
        private final HandlerResolver<HttpRequest, AsyncRequestConsumer<I>> requestConsumerResolver;
        private final Resolver<O, AsyncResponseProducer> responseProducerResolver;
        private final ExchangeHandler<I, O> exchangeHandler;
        private final Resolver<Exception, AsyncResponseProducer> exceptionMapper;

        private CompletionStage(
                final Validator<HttpRequest> requestValidator,
                final HandlerResolver<HttpRequest, AsyncRequestConsumer<I>> requestConsumerResolver,
                final Resolver<O, AsyncResponseProducer> responseProducerResolver,
                final ExchangeHandler<I, O> exchangeHandler,
                final Resolver<Exception, AsyncResponseProducer> exceptionMapper) {
            this.requestValidator = requestValidator;
            this.requestConsumerResolver = requestConsumerResolver;
            this.responseProducerResolver = responseProducerResolver;
            this.exchangeHandler = exchangeHandler;
            this.exceptionMapper = exceptionMapper;
        }

        /**
         * Creates {@link Supplier} of {@link AsyncServerExchangeHandler} implementing the defined message
         * exchange pipeline.
         */
        public Supplier<AsyncServerExchangeHandler> supplier() {
            return () -> new AbstractServerExchangeHandler<I>() {

                @Override
                protected AsyncRequestConsumer<I> supplyConsumer(
                        final HttpRequest request,
                        final EntityDetails entityDetails,
                        final HttpContext context) throws HttpException {
                    if (requestValidator != null) {
                        requestValidator.validate(request);
                    }
                    final AsyncRequestConsumer<I> requestConsumer = requestConsumerResolver.resolve(request, entityDetails, context);
                    if (requestConsumer == null) {
                        throw new HttpException("Unable to process request");
                    }
                    return requestConsumer;
                }

                @Override
                protected void handle(
                        final I requestMessage,
                        final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
                        final HttpContext context) throws HttpException, IOException {
                    final O responseMessage = exchangeHandler.handle(requestMessage, context);
                    if (responseMessage == null) {
                        throw new HttpException("Unable to handle request");
                    }
                    final AsyncResponseProducer responseProducer = responseProducerResolver.resolve(responseMessage);
                    if (responseProducer == null) {
                        throw new HttpException("Unable to produce response");
                    }
                    responseTrigger.submitResponse(responseProducer, context);
                }

                @Override
                protected AsyncResponseProducer handleError(final Exception ex) {
                    final AsyncResponseProducer responseProducer = exceptionMapper != null ? exceptionMapper.resolve(ex) : null;
                    return responseProducer != null ? responseProducer : super.handleError(ex);
                }

            };
        }

    }

}