ReactorInvokerImpl.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cxf.jaxrs.reactor.client;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.StreamSupport;
import jakarta.ws.rs.HttpMethod;
import jakarta.ws.rs.client.Entity;
import jakarta.ws.rs.core.GenericType;
import jakarta.ws.rs.core.Response;
import org.apache.cxf.jaxrs.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import static org.apache.cxf.jaxrs.reactor.client.ReactorUtils.TRACE;
import static org.apache.cxf.jaxrs.reactor.client.ReactorUtils.toCompletableFuture;
public class ReactorInvokerImpl implements ReactorInvoker {
private final WebClient webClient;
private final ExecutorService executorService;
ReactorInvokerImpl(WebClient webClient, ExecutorService executorService) {
this.webClient = webClient;
this.executorService = executorService;
}
@Override
public Mono<Response> get() {
return method(HttpMethod.GET);
}
@Override
public <R> Mono<R> get(Class<R> responseType) {
return method(HttpMethod.GET, responseType);
}
@Override
public <T> Flux<T> getFlux(Class<T> responseType) {
return flux(HttpMethod.GET, responseType);
}
@Override
public <R> Mono<R> get(GenericType<R> genericType) {
return method(HttpMethod.GET, genericType);
}
@Override
public Mono<Response> put(Entity<?> entity) {
return method(HttpMethod.PUT, entity);
}
@Override
public <R> Mono<R> put(Entity<?> entity, Class<R> responseType) {
return method(HttpMethod.PUT, responseType);
}
@Override
public <T> Flux<T> putFlux(Entity<?> entity, Class<T> responseType) {
return flux(HttpMethod.PUT, entity, responseType);
}
@Override
public <R> Mono<R> put(Entity<?> entity, GenericType<R> genericType) {
return method(HttpMethod.PUT, entity, genericType);
}
@Override
public Mono<Response> post(Entity<?> entity) {
return method(HttpMethod.POST, entity);
}
@Override
public <R> Mono<R> post(Entity<?> entity, Class<R> responseType) {
return method(HttpMethod.POST, entity, responseType);
}
@Override
public <T> Flux<T> postFlux(Entity<?> entity, Class<T> responseType) {
return flux(HttpMethod.POST, entity, responseType);
}
@Override
public <R> Mono<R> post(Entity<?> entity, GenericType<R> genericType) {
return method(HttpMethod.POST, entity, genericType);
}
@Override
public Mono<Response> delete() {
return method(HttpMethod.DELETE);
}
@Override
public <R> Mono<R> delete(Class<R> responseType) {
return method(HttpMethod.DELETE, responseType);
}
@Override
public <T> Flux<T> deleteFlux(Class<T> responseType) {
return flux(HttpMethod.DELETE, responseType);
}
@Override
public <R> Mono<R> delete(GenericType<R> genericType) {
return method(HttpMethod.DELETE, genericType);
}
@Override
public Mono<Response> head() {
return method(HttpMethod.HEAD);
}
@Override
public Mono<Response> options() {
return method(HttpMethod.OPTIONS);
}
@Override
public <R> Mono<R> options(Class<R> responseType) {
return method(HttpMethod.OPTIONS, responseType);
}
@Override
public <T> Flux<T> optionsFlux(Class<T> responseType) {
return flux(HttpMethod.OPTIONS, responseType);
}
@Override
public <R> Mono<R> options(GenericType<R> genericType) {
return method(HttpMethod.OPTIONS, genericType);
}
@Override
public Mono<Response> trace() {
return method(TRACE);
}
@Override
public <R> Mono<R> trace(Class<R> responseType) {
return method(TRACE, responseType);
}
@Override
public <T> Flux<T> traceFlux(Class<T> responseType) {
return flux(TRACE, responseType);
}
@Override
public <R> Mono<R> trace(GenericType<R> genericType) {
return method(TRACE, genericType);
}
@Override
public Mono<Response> method(String name) {
return method(name, Response.class);
}
@Override
public <R> Mono<R> method(String name, Class<R> responseType) {
return mono(webClient.async().method(name, responseType));
}
@Override
public <R> Mono<R> method(String name, GenericType<R> genericType) {
return mono(webClient.async().method(name, genericType));
}
@Override
public Mono<Response> method(String name, Entity<?> entity) {
return method(name, entity, Response.class);
}
@Override
public <R> Mono<R> method(String name, Entity<?> entity, Class<R> responseType) {
return mono(webClient.async().method(name, entity, responseType));
}
@Override
public <T> Flux<T> flux(String name, Entity<?> entity, Class<T> responseType) {
Future<Response> futureResponse = webClient.async().method(name, entity);
return Flux.fromStream(() ->
StreamSupport.stream(toIterable(futureResponse, responseType).spliterator(), false));
}
@Override
public <T> Flux<T> flux(String name, Class<T> responseType) {
Future<Response> futureResponse = webClient.async().method(name);
return Flux.fromStream(() ->
StreamSupport.stream(toIterable(futureResponse, responseType).spliterator(), false));
}
@Override
public <R> Mono<R> method(String name, Entity<?> entity, GenericType<R> genericType) {
return mono(webClient.async().method(name, entity, genericType));
}
private <R> Mono<R> mono(Future<R> future) {
return Mono.fromFuture(toCompletableFuture(future, executorService));
}
private <R> Iterable<R> toIterable(Future<Response> futureResponse, Class<R> type) {
try {
Response response = futureResponse.get();
GenericType<List<R>> rGenericType = new GenericType<>(new WrappedType<R>(type));
return response.readEntity(rGenericType);
} catch (InterruptedException | ExecutionException e) {
throw new CompletionException(e);
}
}
private class WrappedType<R> implements ParameterizedType {
private final Class<R> rClass;
WrappedType(Class<R> rClass) {
this.rClass = rClass;
}
@Override
public Type[] getActualTypeArguments() {
return new Type[]{rClass };
}
@Override
public Type getRawType() {
return List.class;
}
@Override
public Type getOwnerType() {
return List.class;
}
}
}