AsyncJsonClientPipeline.java
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.core5.jackson2.http;
import java.net.URI;
import java.util.concurrent.CompletableFuture;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.function.Callback;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.HandlerResolver;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpHeaders;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.Message;
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
import org.apache.hc.core5.http.nio.AsyncEntityProducer;
import org.apache.hc.core5.http.nio.AsyncRequestProducer;
import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
import org.apache.hc.core5.http.nio.support.AbstractClientExchangeHandler;
import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder;
import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.http.support.BasicRequestBuilder;
import org.apache.hc.core5.jackson2.JsonConsumer;
import org.apache.hc.core5.jackson2.JsonResultSink;
import org.apache.hc.core5.jackson2.JsonTokenEventHandler;
import org.apache.hc.core5.util.Args;
/**
* Client side execution pipeline assembler that creates {@link AsyncClientExchangeHandler} instances
* with the defined message exchange pipeline optimized for JSON message exchanges that triggers
* the given {@link FutureCallback} or {@link CompletableFuture} upon completion.
* <p>
* Please note that {@link AsyncClientExchangeHandler} instances are stateful and may not be used
* concurrently by multiple message exchanges or re-used for subsequent message exchanges.
*
* @since 5.5
*/
public final class AsyncJsonClientPipeline {
/** Textual form of {@link ContentType#APPLICATION_JSON}; added as the {@code Accept} header value by the request stage factory methods. */
private static final String JSON_CONTENT_TYPE_TEXT = ContentType.APPLICATION_JSON.toString();
/** Object mapper handed to the JSON request producers and response consumers created by this pipeline. */
private final ObjectMapper objectMapper;
/**
 * Creates a pipeline assembler bound to the given object mapper.
 * Instances are obtained through {@link #assemble(ObjectMapper)}.
 *
 * @param objectMapper the object mapper; checked with {@link Args#notNull(Object, String)}.
 */
private AsyncJsonClientPipeline(final ObjectMapper objectMapper) {
    Args.notNull(objectMapper, "Object mapper");
    this.objectMapper = objectMapper;
}
/**
 * Creates a new pipeline assembler that uses the given {@link ObjectMapper}.
 *
 * @param objectMapper the object mapper to be used by the pipeline.
 * @return new pipeline assembler.
 */
public static AsyncJsonClientPipeline assemble(final ObjectMapper objectMapper) {
    final AsyncJsonClientPipeline pipeline = new AsyncJsonClientPipeline(objectMapper);
    return pipeline;
}
/**
 * Starts the pipeline definition at the request message stage.
 *
 * @return new request message stage.
 */
public RequestStage request() {
    final RequestStage requestStage = this.new RequestStage();
    return requestStage;
}
/**
 * Starts the pipeline definition with the given request head, continuing at
 * the request content stage where the outgoing message content is chosen.
 *
 * @param request the head of the outgoing request message.
 * @return new request content stage for the given request head.
 */
public RequestContentStage request(final HttpRequest request) {
    final RequestContentStage contentStage = this.new RequestContentStage(request);
    return contentStage;
}
/**
 * Request message stage: selects how the outgoing request message is generated.
 * <p>
 * All factory methods of this stage, except {@link #produce(AsyncRequestProducer)},
 * add an {@code Accept} header with the JSON content type to the request they build.
 */
public class RequestStage {

    private RequestStage() {
    }

    /**
     * Uses the given {@link AsyncRequestProducer} to generate the outgoing request
     * message stream.
     *
     * @param requestProducer the request producer.
     * @return the response message stage.
     */
    public ResponseStage produce(final AsyncRequestProducer requestProducer) {
        final ResponseStage responseStage = new ResponseStage(requestProducer);
        return responseStage;
    }

    /**
     * Builds an outgoing GET request for the given URI.
     *
     * @param requestUri the request URI.
     * @return the response message stage.
     */
    public ResponseStage get(final URI requestUri) {
        final AsyncRequestProducer getProducer = AsyncRequestBuilder.get(requestUri)
                .addHeader(HttpHeaders.ACCEPT, JSON_CONTENT_TYPE_TEXT)
                .build();
        return produce(getProducer);
    }

    /**
     * Builds an outgoing GET request for the given target host and path.
     *
     * @param target the target host.
     * @param path the request path.
     * @return the response message stage.
     */
    public ResponseStage get(final HttpHost target, final String path) {
        final AsyncRequestProducer getProducer = AsyncRequestBuilder.get()
                .setHttpHost(target)
                .setPath(path)
                .addHeader(HttpHeaders.ACCEPT, JSON_CONTENT_TYPE_TEXT)
                .build();
        return produce(getProducer);
    }

    /**
     * Builds the head of an outgoing POST request for the given URI.
     *
     * @param requestUri the request URI.
     * @return the request content stage.
     */
    public RequestContentStage post(final URI requestUri) {
        final HttpRequest postRequest = BasicRequestBuilder.post(requestUri)
                .addHeader(HttpHeaders.ACCEPT, JSON_CONTENT_TYPE_TEXT)
                .build();
        return new RequestContentStage(postRequest);
    }

    /**
     * Builds the head of an outgoing POST request for the given target host and path.
     *
     * @param target the target host.
     * @param path the request path.
     * @return the request content stage.
     */
    public RequestContentStage post(final HttpHost target, final String path) {
        final HttpRequest postRequest = BasicRequestBuilder.post()
                .setHttpHost(target)
                .setPath(path)
                .addHeader(HttpHeaders.ACCEPT, JSON_CONTENT_TYPE_TEXT)
                .build();
        return new RequestContentStage(postRequest);
    }

    /**
     * Builds the head of an outgoing PUT request for the given URI.
     *
     * @param requestUri the request URI.
     * @return the request content stage.
     */
    public RequestContentStage put(final URI requestUri) {
        final HttpRequest putRequest = BasicRequestBuilder.put(requestUri)
                .addHeader(HttpHeaders.ACCEPT, JSON_CONTENT_TYPE_TEXT)
                .build();
        return new RequestContentStage(putRequest);
    }

    /**
     * Builds the head of an outgoing PUT request for the given target host and path.
     *
     * @param target the target host.
     * @param path the request path.
     * @return the request content stage.
     */
    public RequestContentStage put(final HttpHost target, final String path) {
        final HttpRequest putRequest = BasicRequestBuilder.put()
                .setHttpHost(target)
                .setPath(path)
                .addHeader(HttpHeaders.ACCEPT, JSON_CONTENT_TYPE_TEXT)
                .build();
        return new RequestContentStage(putRequest);
    }

    /**
     * Builds the head of an outgoing PATCH request for the given URI.
     *
     * @param requestUri the request URI.
     * @return the request content stage.
     */
    public RequestContentStage patch(final URI requestUri) {
        final HttpRequest patchRequest = BasicRequestBuilder.patch(requestUri)
                .addHeader(HttpHeaders.ACCEPT, JSON_CONTENT_TYPE_TEXT)
                .build();
        return new RequestContentStage(patchRequest);
    }

    /**
     * Builds the head of an outgoing PATCH request for the given target host and path.
     *
     * @param target the target host.
     * @param path the request path.
     * @return the request content stage.
     */
    public RequestContentStage patch(final HttpHost target, final String path) {
        final HttpRequest patchRequest = BasicRequestBuilder.patch()
                .setHttpHost(target)
                .setPath(path)
                .addHeader(HttpHeaders.ACCEPT, JSON_CONTENT_TYPE_TEXT)
                .build();
        return new RequestContentStage(patchRequest);
    }

}
/**
 * Request content stage: selects how the content of the outgoing request message
 * is generated for the request head this stage was created with.
 */
public class RequestContentStage {

    private final HttpRequest request;

    /**
     * @param request the head of the outgoing request message; must not be {@code null}.
     */
    private RequestContentStage(final HttpRequest request) {
        // Fail fast at assembly time rather than passing a null request head
        // on to the request producers created by this stage.
        this.request = Args.notNull(request, "Request");
    }

    /**
     * Configures {@link AsyncEntityProducer} to be used by the pipeline to generate the outgoing
     * request content stream.
     *
     * @param dataProducer the content producer; {@link #noContent()} passes {@code null} here
     *                     to request a message without a content body.
     * @return the response message stage.
     */
    public ResponseStage produceContent(final AsyncEntityProducer dataProducer) {
        return new ResponseStage(new BasicRequestProducer(request, dataProducer));
    }

    /**
     * Configures the pipeline to produce the outgoing message content from the given object
     * using the {@link ObjectMapper} of this pipeline.
     *
     * @param <T> type of the content object.
     * @param content the object representing the message content.
     * @return the response message stage.
     */
    public <T> ResponseStage asObject(final T content) {
        return new ResponseStage(JsonRequestProducers.create(request, content, objectMapper));
    }

    /**
     * Configures the pipeline to produce the outgoing message content from the given
     * {@link JsonNode} using the {@link ObjectMapper} of this pipeline.
     *
     * @param content the JSON node representing the message content.
     * @return the response message stage.
     */
    public ResponseStage asJsonNode(final JsonNode content) {
        return new ResponseStage(JsonRequestProducers.create(request, content, objectMapper));
    }

    /**
     * Configures the pipeline to represent the outgoing message content as a sequence
     * of objects supplied by the given {@link ObjectProducer}.
     *
     * @param <T> type of the objects in the sequence.
     * @param objectProducer the producer of the objects making up the message content.
     * @return the response message stage.
     */
    public <T> ResponseStage asSequence(final ObjectProducer<T> objectProducer) {
        return new ResponseStage(JsonRequestProducers.create(request, objectMapper, objectProducer));
    }

    /**
     * Configures the pipeline to represent the outgoing message without a content body.
     *
     * @return the response message stage.
     */
    public ResponseStage noContent() {
        return produceContent(null);
    }

}
/**
 * Response message stage: carries the configured request producer over to
 * the definition of the response processing.
 */
public class ResponseStage {

    private final AsyncRequestProducer requestProducer;

    private ResponseStage(final AsyncRequestProducer producer) {
        this.requestProducer = producer;
    }

    /**
     * Configures the pipeline to process the incoming response message stream.
     *
     * @return the response content stage.
     */
    public ResponseContentStage response() {
        final ResponseContentStage contentStage = new ResponseContentStage(this.requestProducer);
        return contentStage;
    }

}
/**
 * Response content stage: selects how the incoming response message is consumed.
 * <p>
 * Every method of this stage ends up in
 * {@link #consume(HandlerResolver)}; the convenience methods supply a resolver
 * that creates a consumer through {@link JsonResponseConsumers} each time it is called.
 */
public class ResponseContentStage {

    private final AsyncRequestProducer requestProducer;

    private ResponseContentStage(final AsyncRequestProducer requestProducer) {
        this.requestProducer = requestProducer;
    }

    /**
     * Configures the resolver of {@link AsyncResponseConsumer} to be used by the pipeline
     * to process the incoming response message stream.
     *
     * @param <T> response content representation.
     * @param responseConsumerResolver the resolver supplying the response consumer;
     *                                 must not be {@code null}.
     * @return the result stage.
     */
    public <T> ResultStage<T> consume(final HandlerResolver<HttpResponse, AsyncResponseConsumer<T>> responseConsumerResolver) {
        // Fail fast at assembly time: without this check a null resolver would
        // only be noticed when the handler asks it for a response consumer.
        Args.notNull(responseConsumerResolver, "Response consumer resolver");
        return new ResultStage<>(requestProducer, responseConsumerResolver);
    }

    /**
     * Configures the pipeline to process the incoming response content as an object with
     * the given {@link JavaType}.
     *
     * @param <T> type of the response object.
     * @param javaType the type of the response object.
     * @return the result stage.
     */
    public <T> ResultStage<Message<HttpResponse, T>> asObject(final JavaType javaType) {
        return consume((rsp, details, ctx) ->
                JsonResponseConsumers.create(objectMapper, javaType));
    }

    /**
     * Configures the pipeline to process the incoming response content as an object with
     * the given {@link Class}.
     *
     * @param <T> type of the response object.
     * @param clazz the class of the response object.
     * @return the result stage.
     */
    public <T> ResultStage<Message<HttpResponse, T>> asObject(final Class<T> clazz) {
        return consume((rsp, details, ctx) ->
                JsonResponseConsumers.create(objectMapper, clazz));
    }

    /**
     * Configures the pipeline to process the incoming response content as an object with
     * the given {@link TypeReference}.
     *
     * @param <T> type of the response object.
     * @param typeReference the type reference of the response object.
     * @return the result stage.
     */
    public <T> ResultStage<Message<HttpResponse, T>> asObject(final TypeReference<T> typeReference) {
        return consume((rsp, details, ctx) ->
                JsonResponseConsumers.create(objectMapper, typeReference));
    }

    /**
     * Configures the pipeline to process the incoming response content as a {@link JsonNode} instance.
     *
     * @return the result stage.
     */
    public ResultStage<Message<HttpResponse, JsonNode>> asJsonNode() {
        return consume((rsp, details, ctx) ->
                JsonResponseConsumers.create(objectMapper.getFactory()));
    }

    /**
     * Configures the pipeline to process the incoming response content as a sequence of objects
     * with the given {@link JavaType}.
     *
     * @param <T> type of the objects in the sequence.
     * @param javaType the type of the objects in the sequence.
     * @param responseValidator the response validator passed to the response consumer.
     * @param errorCallback the error callback passed to the response consumer.
     * @param resultSink the sink passed to the response consumer to receive the objects.
     * @return the result stage.
     */
    public <T> ResultStage<Long> asSequence(
            final JavaType javaType,
            final JsonConsumer<HttpResponse> responseValidator,
            final Callback<String> errorCallback,
            final JsonResultSink<T> resultSink) {
        return consume((rsp, details, ctx) ->
                JsonResponseConsumers.create(objectMapper, javaType, responseValidator, errorCallback, resultSink));
    }

    /**
     * Configures the pipeline to process the incoming response content as a sequence of objects
     * with the given {@link Class}.
     *
     * @param <T> type of the objects in the sequence.
     * @param clazz the class of the objects in the sequence.
     * @param responseValidator the response validator passed to the response consumer.
     * @param errorCallback the error callback passed to the response consumer.
     * @param resultSink the sink passed to the response consumer to receive the objects.
     * @return the result stage.
     */
    public <T> ResultStage<Long> asSequence(
            final Class<T> clazz,
            final JsonConsumer<HttpResponse> responseValidator,
            final Callback<String> errorCallback,
            final JsonResultSink<T> resultSink) {
        return consume((rsp, details, ctx) ->
                JsonResponseConsumers.create(objectMapper, clazz, responseValidator, errorCallback, resultSink));
    }

    /**
     * Configures the pipeline to process the incoming response content as a sequence of objects
     * with the given {@link TypeReference}.
     *
     * @param <T> type of the objects in the sequence.
     * @param typeReference the type reference of the objects in the sequence.
     * @param responseValidator the response validator passed to the response consumer.
     * @param errorCallback the error callback passed to the response consumer.
     * @param resultSink the sink passed to the response consumer to receive the objects.
     * @return the result stage.
     */
    public <T> ResultStage<Long> asSequence(
            final TypeReference<T> typeReference,
            final JsonConsumer<HttpResponse> responseValidator,
            final Callback<String> errorCallback,
            final JsonResultSink<T> resultSink) {
        return consume((rsp, details, ctx) ->
                JsonResponseConsumers.create(objectMapper, typeReference, responseValidator, errorCallback, resultSink));
    }

    /**
     * Configures the pipeline to process the incoming response content as a sequence of events.
     *
     * @param responseValidator the response validator passed to the response consumer.
     * @param errorCallback the error callback passed to the response consumer.
     * @param eventHandler the handler passed to the response consumer to receive the events.
     * @return the result stage.
     */
    public ResultStage<Void> asEvents(
            final JsonConsumer<HttpResponse> responseValidator,
            final Callback<String> errorCallback,
            final JsonTokenEventHandler eventHandler) {
        return consume((rsp, details, ctx) ->
                JsonResponseConsumers.create(objectMapper.getFactory(), responseValidator, errorCallback, eventHandler));
    }

}
/**
 * Exchange result signal stage: selects how the outcome of the message exchange
 * is reported.
 *
 * @param <T> result type of the message exchange.
 */
public static class ResultStage<T> {

    private final AsyncRequestProducer requestProducer;
    private final HandlerResolver<HttpResponse, AsyncResponseConsumer<T>> responseConsumerResolver;

    private ResultStage(
            final AsyncRequestProducer requestProducer,
            final HandlerResolver<HttpResponse, AsyncResponseConsumer<T>> responseConsumerResolver) {
        this.requestProducer = requestProducer;
        this.responseConsumerResolver = responseConsumerResolver;
    }

    /**
     * Configures the pipeline to report the outcome of the message exchange to the given
     * {@link FutureCallback}.
     *
     * @param resultCallback the callback to be handed to the exchange handler.
     * @return the completion stage.
     */
    public CompletionStage<T> result(final FutureCallback<T> resultCallback) {
        final CompletionStage<T> completionStage =
                new CompletionStage<>(requestProducer, responseConsumerResolver, resultCallback);
        return completionStage;
    }

    /**
     * Configures the pipeline to report the outcome of the message exchange through the given
     * {@link CompletableFuture}: a result completes it, a failure completes it exceptionally
     * and a cancellation cancels it.
     *
     * @param future the future to be triggered; checked with {@link Args#notNull(Object, String)}.
     * @return the completion stage.
     */
    public CompletionStage<T> result(final CompletableFuture<T> future) {
        Args.notNull(future, "Future");
        // Adapter translating callback events into the matching future transitions.
        final FutureCallback<T> bridge = new FutureCallback<T>() {

            @Override
            public void completed(final T value) {
                future.complete(value);
            }

            @Override
            public void failed(final Exception cause) {
                future.completeExceptionally(cause);
            }

            @Override
            public void cancelled() {
                future.cancel(true);
            }

        };
        return result(bridge);
    }

}
/**
 * Exchange completion stage: holds the fully defined pipeline and creates
 * exchange handlers from it.
 *
 * @param <T> result type of the message exchange.
 */
public static class CompletionStage<T> {

    private final AsyncRequestProducer requestProducer;
    private final HandlerResolver<HttpResponse, AsyncResponseConsumer<T>> responseConsumerResolver;
    private final FutureCallback<T> resultCallback;

    private CompletionStage(
            final AsyncRequestProducer requestProducer,
            final HandlerResolver<HttpResponse, AsyncResponseConsumer<T>> responseConsumerResolver,
            final FutureCallback<T> resultCallback) {
        this.requestProducer = requestProducer;
        this.responseConsumerResolver = responseConsumerResolver;
        this.resultCallback = resultCallback;
    }

    /**
     * Creates a new {@link AsyncClientExchangeHandler} implementing the defined message
     * exchange pipeline. The handler obtains its response consumer from the configured
     * resolver and raises {@link HttpException} if the resolver returns {@code null}.
     *
     * @return new exchange handler.
     */
    public AsyncClientExchangeHandler create() {
        final AsyncClientExchangeHandler exchangeHandler =
                new AbstractClientExchangeHandler<T>(requestProducer, resultCallback) {

            @Override
            protected AsyncResponseConsumer<T> supplyConsumer(
                    final HttpResponse response,
                    final EntityDetails entityDetails,
                    final HttpContext context) throws HttpException {
                final AsyncResponseConsumer<T> responseConsumer =
                        responseConsumerResolver.resolve(response, entityDetails, context);
                if (responseConsumer != null) {
                    return responseConsumer;
                }
                // The resolver declined to supply a consumer for this response.
                throw new HttpException("Unable to process response");
            }

        };
        return exchangeHandler;
    }

}
}