JettyConnector.java
/*
* Copyright (c) 2013, 2023 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the
* Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
* version 2 with the GNU Classpath Exception, which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
*/
package org.glassfish.jersey.jetty.connector;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.CookieStore;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.ws.rs.ProcessingException;
import javax.ws.rs.client.Client;
import javax.ws.rs.core.Configuration;
import javax.ws.rs.core.MultivaluedMap;
import javax.net.ssl.SSLContext;
import org.eclipse.jetty.client.HttpClientTransport;
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
import org.eclipse.jetty.client.util.BasicAuthentication;
import org.eclipse.jetty.client.util.BytesContentProvider;
import org.eclipse.jetty.client.util.FutureResponseListener;
import org.eclipse.jetty.client.util.OutputStreamContentProvider;
import org.glassfish.jersey.client.ClientProperties;
import org.glassfish.jersey.client.ClientRequest;
import org.glassfish.jersey.client.ClientResponse;
import org.glassfish.jersey.client.innate.ClientProxy;
import org.glassfish.jersey.client.spi.AsyncConnectorCallback;
import org.glassfish.jersey.client.spi.Connector;
import org.glassfish.jersey.internal.util.collection.ByteBufferInputStream;
import org.glassfish.jersey.internal.util.collection.NonBlockingInputStream;
import org.glassfish.jersey.message.internal.HeaderUtils;
import org.glassfish.jersey.message.internal.OutboundMessageContext;
import org.glassfish.jersey.message.internal.Statuses;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpProxy;
import org.eclipse.jetty.client.ProxyConfiguration;
import org.eclipse.jetty.client.api.AuthenticationStore;
import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.util.HttpCookieStore;
import org.eclipse.jetty.util.Jetty;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
/**
* A {@link Connector} that utilizes the Jetty HTTP Client to send and receive
* HTTP request and responses.
* <p/>
* The following properties are only supported at construction of this class:
* <ul>
* <li>{@link ClientProperties#ASYNC_THREADPOOL_SIZE}</li>
* <li>{@link ClientProperties#CONNECT_TIMEOUT}</li>
* <li>{@link ClientProperties#FOLLOW_REDIRECTS}</li>
* <li>{@link ClientProperties#PROXY_URI}</li>
* <li>{@link ClientProperties#PROXY_USERNAME}</li>
* <li>{@link ClientProperties#PROXY_PASSWORD}</li>
* <li>{@link ClientProperties#PROXY_PASSWORD}</li>
* <li>{@link JettyClientProperties#DISABLE_COOKIES}</li>*
* <li>{@link JettyClientProperties#ENABLE_SSL_HOSTNAME_VERIFICATION}</li>
* <li>{@link JettyClientProperties#PREEMPTIVE_BASIC_AUTHENTICATION}</li>
* <li>{@link JettyClientProperties#SYNC_LISTENER_RESPONSE_MAX_SIZE}</li>
* </ul>
* <p/>
* This transport supports both synchronous and asynchronous processing of client requests.
* The following methods are supported: GET, POST, PUT, DELETE, HEAD, OPTIONS, TRACE, CONNECT and MOVE.
* <p/>
* Typical usage:
* <p/>
* <pre>
* {@code
* ClientConfig config = new ClientConfig();
* Connector connector = new JettyConnector(config);
* config.connector(connector);
* Client client = ClientBuilder.newClient(config);
*
* // async request
* WebTarget target = client.target("http://localhost:8080");
* Future<Response> future = target.path("resource").request().async().get();
*
* // wait for 3 seconds
* Response response = future.get(3, TimeUnit.SECONDS);
* String entity = response.readEntity(String.class);
* client.close();
* }
* </pre>
* <p>
* This connector supports only {@link org.glassfish.jersey.client.RequestEntityProcessing#BUFFERED entity buffering}.
* Defining the property {@link ClientProperties#REQUEST_ENTITY_PROCESSING} has no effect on this connector.
* </p>
*
* @author Arul Dhesiaseelan (aruld at acm.org)
* @author Marek Potociar
*/
public class JettyConnector implements Connector {
private static final Logger LOGGER = Logger.getLogger(JettyConnector.class.getName());
private final HttpClient client;
private final CookieStore cookieStore;
private final Configuration configuration;
private final Optional<Integer> syncListenerResponseMaxSize;
/**
* Create the new Jetty client connector.
*
* @param jaxrsClient JAX-RS client instance, for which the connector is created.
* @param config client configuration.
*/
protected JettyConnector(final Client jaxrsClient, final Configuration config) {
this.configuration = config;
HttpClient httpClient = getRegisteredHttpClient(config);
if (httpClient == null) {
final SSLContext sslContext = jaxrsClient.getSslContext();
final SslContextFactory.Client sslContextFactory = new SslContextFactory.Client(false);
sslContextFactory.setSslContext(sslContext);
httpClient = new HttpClient(initClientTransport(), sslContextFactory);
}
this.client = httpClient;
Boolean enableHostnameVerification = (Boolean) config.getProperties()
.get(JettyClientProperties.ENABLE_SSL_HOSTNAME_VERIFICATION);
if (enableHostnameVerification != null) {
final String verificationAlgorithm = enableHostnameVerification ? "HTTPS" : null;
client.getSslContextFactory().setEndpointIdentificationAlgorithm(verificationAlgorithm);
}
if (jaxrsClient.getHostnameVerifier() != null) {
client.getSslContextFactory().setHostnameVerifier(jaxrsClient.getHostnameVerifier());
}
final Object connectTimeout = config.getProperties().get(ClientProperties.CONNECT_TIMEOUT);
if (connectTimeout != null && connectTimeout instanceof Integer && (Integer) connectTimeout > 0) {
client.setConnectTimeout((Integer) connectTimeout);
}
final Object threadPoolSize = config.getProperties().get(ClientProperties.ASYNC_THREADPOOL_SIZE);
if (threadPoolSize != null && threadPoolSize instanceof Integer && (Integer) threadPoolSize > 0) {
final String name = HttpClient.class.getSimpleName() + "@" + hashCode();
final QueuedThreadPool threadPool = new QueuedThreadPool((Integer) threadPoolSize);
threadPool.setName(name);
client.setExecutor(threadPool);
}
Boolean disableCookies = (Boolean) config.getProperties().get(JettyClientProperties.DISABLE_COOKIES);
disableCookies = (disableCookies != null) ? disableCookies : false;
final AuthenticationStore auth = client.getAuthenticationStore();
final Object basicAuthProvider = config.getProperty(JettyClientProperties.PREEMPTIVE_BASIC_AUTHENTICATION);
if (basicAuthProvider != null && (basicAuthProvider instanceof BasicAuthentication)) {
auth.addAuthentication((BasicAuthentication) basicAuthProvider);
}
final Optional<ClientProxy> proxy = ClientProxy.proxyFromConfiguration(config);
proxy.ifPresent(clientProxy -> {
final ProxyConfiguration proxyConfig = client.getProxyConfiguration();
final URI u = clientProxy.uri();
proxyConfig.getProxies().add(new HttpProxy(u.getHost(), u.getPort()));
if (clientProxy.userName() != null) {
auth.addAuthentication(new BasicAuthentication(u, "<<ANY_REALM>>",
clientProxy.userName(), clientProxy.password()));
}
});
if (disableCookies) {
client.setCookieStore(new HttpCookieStore.Empty());
}
final Object slResponseMaxSize = configuration.getProperties()
.get(JettyClientProperties.SYNC_LISTENER_RESPONSE_MAX_SIZE);
if (slResponseMaxSize != null && slResponseMaxSize instanceof Integer
&& (Integer) slResponseMaxSize > 0) {
this.syncListenerResponseMaxSize = Optional.of((Integer) slResponseMaxSize);
}
else {
this.syncListenerResponseMaxSize = Optional.empty();
}
try {
client.start();
} catch (final Exception e) {
throw new ProcessingException("Failed to start the client.", e);
}
this.cookieStore = client.getCookieStore();
}
/**
* provides required HTTP client transport for client
*
* the default transport is {@link HttpClientTransportOverHTTP}
*
* @return instance of {@link HttpClientTransport}
* @since 2.41
*/
protected HttpClientTransport initClientTransport() {
return new HttpClientTransportOverHTTP();
}
/**
* provides custom registered {@link HttpClient} if any (or NULL)
*
* @param config configuration where {@link HttpClient} could be registered
* @return {@link HttpClient} instance if any was previously registered or NULL
*
* @since 2.41
*/
protected HttpClient getRegisteredHttpClient(Configuration config) {
if (config.isRegistered(JettyHttpClientSupplier.class)) {
Optional<Object> contract = config.getInstances().stream()
.filter(a-> JettyHttpClientSupplier.class.isInstance(a)).findFirst();
if (contract.isPresent()) {
return ((JettyHttpClientSupplier) contract.get()).getHttpClient();
}
}
return null;
}
/**
* Get the {@link HttpClient}.
*
* @return the {@link HttpClient}.
*/
@SuppressWarnings("UnusedDeclaration")
public HttpClient getHttpClient() {
return client;
}
/**
* Get the {@link CookieStore}.
*
* @return the {@link CookieStore} instance or null when
* JettyClientProperties.DISABLE_COOKIES set to true.
*/
public CookieStore getCookieStore() {
return cookieStore;
}
@Override
public ClientResponse apply(final ClientRequest jerseyRequest) throws ProcessingException {
final Request jettyRequest = translateRequest(jerseyRequest);
final Map<String, String> clientHeadersSnapshot = new HashMap<>();
final ContentProvider entity =
getBytesProvider(jerseyRequest, jerseyRequest.getHeaders(), clientHeadersSnapshot, jettyRequest);
if (entity != null) {
jettyRequest.content(entity);
} else {
clientHeadersSnapshot.putAll(writeOutBoundHeaders(jerseyRequest.getHeaders(), jettyRequest));
}
try {
final ContentResponse jettyResponse;
if (!syncListenerResponseMaxSize.isPresent()) {
jettyResponse = jettyRequest.send();
}
else {
final FutureResponseListener listener
= new FutureResponseListener(jettyRequest, syncListenerResponseMaxSize.get());
jettyRequest.send(listener);
jettyResponse = listener.get();
}
HeaderUtils.checkHeaderChanges(clientHeadersSnapshot, jerseyRequest.getHeaders(),
JettyConnector.this.getClass().getName(), jerseyRequest.getConfiguration());
final javax.ws.rs.core.Response.StatusType status = jettyResponse.getReason() == null
? Statuses.from(jettyResponse.getStatus())
: Statuses.from(jettyResponse.getStatus(), jettyResponse.getReason());
final ClientResponse jerseyResponse = new ClientResponse(status, jerseyRequest);
processResponseHeaders(jettyResponse.getHeaders(), jerseyResponse);
try {
jerseyResponse.setEntityStream(new HttpClientResponseInputStream(jettyResponse));
} catch (final IOException e) {
LOGGER.log(Level.SEVERE, null, e);
}
return jerseyResponse;
} catch (final Exception e) {
throw new ProcessingException(e);
}
}
private static void processResponseHeaders(final HttpFields respHeaders, final ClientResponse jerseyResponse) {
for (final HttpField header : respHeaders) {
final String headerName = header.getName();
final MultivaluedMap<String, String> headers = jerseyResponse.getHeaders();
List<String> list = headers.get(headerName);
if (list == null) {
list = new ArrayList<>();
}
list.add(header.getValue());
headers.put(headerName, list);
}
}
private static final class HttpClientResponseInputStream extends FilterInputStream {
HttpClientResponseInputStream(final ContentResponse jettyResponse) throws IOException {
super(getInputStream(jettyResponse));
}
private static InputStream getInputStream(final ContentResponse response) {
return new ByteArrayInputStream(response.getContent());
}
}
private Request translateRequest(final ClientRequest clientRequest) {
final URI uri = clientRequest.getUri();
final Request request = client.newRequest(uri);
request.method(clientRequest.getMethod());
request.followRedirects(clientRequest.resolveProperty(ClientProperties.FOLLOW_REDIRECTS, true));
final Object readTimeout = clientRequest.resolveProperty(ClientProperties.READ_TIMEOUT, -1);
if (readTimeout != null && readTimeout instanceof Integer && (Integer) readTimeout > 0) {
request.idleTimeout((Integer) readTimeout, TimeUnit.MILLISECONDS);
}
final Object totalTimeout = clientRequest.resolveProperty(JettyClientProperties.TOTAL_TIMEOUT, -1);
if (totalTimeout != null && totalTimeout instanceof Integer && (Integer) totalTimeout > 0) {
request.timeout((Integer) totalTimeout, TimeUnit.MILLISECONDS);
}
return request;
}
private Map<String, String> writeOutBoundHeaders(final MultivaluedMap<String, Object> headers, final Request request) {
final Map<String, String> stringHeaders = HeaderUtils.asStringHeadersSingleValue(headers, configuration);
// remove User-agent header set by Jetty; Jersey already sets this in its request (incl. Jetty version)
request.getHeaders().remove(HttpHeader.USER_AGENT);
for (final Map.Entry<String, String> e : stringHeaders.entrySet()) {
request.getHeaders().put(e.getKey(), e.getValue());
}
return stringHeaders;
}
private ContentProvider getBytesProvider(final ClientRequest clientRequest,
final MultivaluedMap<String, Object> headers,
final Map<String, String> snapshot,
final Request request) {
final Object entity = clientRequest.getEntity();
if (entity == null) {
return null;
}
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
clientRequest.setStreamProvider(new OutboundMessageContext.StreamProvider() {
@Override
public OutputStream getOutputStream(final int contentLength) throws IOException {
snapshot.putAll(writeOutBoundHeaders(headers, request));
return outputStream;
}
});
try {
clientRequest.writeEntity();
} catch (final IOException e) {
throw new ProcessingException("Failed to write request entity.", e);
}
return new BytesContentProvider(outputStream.toByteArray());
}
private ContentProvider getStreamProvider(final ClientRequest clientRequest) {
final Object entity = clientRequest.getEntity();
if (entity == null) {
return null;
}
final OutputStreamContentProvider streamContentProvider = new OutputStreamContentProvider();
clientRequest.setStreamProvider(new OutboundMessageContext.StreamProvider() {
@Override
public OutputStream getOutputStream(final int contentLength) throws IOException {
return streamContentProvider.getOutputStream();
}
});
return streamContentProvider;
}
private void processContent(final ClientRequest clientRequest, final ContentProvider entity) throws IOException {
if (entity == null) {
return;
}
final OutputStreamContentProvider streamContentProvider = (OutputStreamContentProvider) entity;
try (final OutputStream output = streamContentProvider.getOutputStream()) {
clientRequest.writeEntity();
}
}
@Override
public Future<?> apply(final ClientRequest jerseyRequest, final AsyncConnectorCallback callback) {
final Request jettyRequest = translateRequest(jerseyRequest);
final Map<String, String> clientHeadersSnapshot = writeOutBoundHeaders(jerseyRequest.getHeaders(), jettyRequest);
final ContentProvider entity = getStreamProvider(jerseyRequest);
if (entity != null) {
jettyRequest.content(entity);
}
final AtomicBoolean callbackInvoked = new AtomicBoolean(false);
final Throwable failure;
try {
final CompletableFuture<ClientResponse> responseFuture = new CompletableFuture<ClientResponse>();
responseFuture.whenComplete(
(clientResponse, throwable) -> {
if (throwable != null && throwable instanceof CancellationException) {
// take care of future cancellation
jettyRequest.abort(throwable);
}
});
final AtomicReference<ClientResponse> jerseyResponse = new AtomicReference<>();
final ByteBufferInputStream entityStream = new ByteBufferInputStream();
jettyRequest.send(new Response.Listener.Adapter() {
@Override
public void onHeaders(final Response jettyResponse) {
HeaderUtils.checkHeaderChanges(clientHeadersSnapshot, jerseyRequest.getHeaders(),
JettyConnector.this.getClass().getName(), jerseyRequest.getConfiguration());
if (responseFuture.isDone()) {
if (!callbackInvoked.compareAndSet(false, true)) {
return;
}
}
final ClientResponse response = translateResponse(jerseyRequest, jettyResponse, entityStream);
jerseyResponse.set(response);
}
@Override
public void onContent(final Response jettyResponse, final ByteBuffer content) {
try {
// content must be consumed before returning from this method.
if (content.hasArray()) {
byte[] array = content.array();
byte[] buff = new byte[content.remaining()];
System.arraycopy(array, content.arrayOffset(), buff, 0, content.remaining());
entityStream.put(ByteBuffer.wrap(buff));
} else {
byte[] buff = new byte[content.remaining()];
content.get(buff);
entityStream.put(ByteBuffer.wrap(buff));
}
} catch (final InterruptedException ex) {
final ProcessingException pe = new ProcessingException(ex);
entityStream.closeQueue(pe);
// try to complete the future with an exception
responseFuture.completeExceptionally(pe);
Thread.currentThread().interrupt();
}
}
@Override
public void onComplete(final Result result) {
entityStream.closeQueue();
if (!callbackInvoked.get()) {
callback.response(jerseyResponse.get());
}
responseFuture.complete(jerseyResponse.get());
}
@Override
public void onFailure(final Response response, final Throwable t) {
entityStream.closeQueue(t);
// try to complete the future with an exception
responseFuture.completeExceptionally(t);
if (callbackInvoked.compareAndSet(false, true)) {
callback.failure(t);
}
}
});
processContent(jerseyRequest, entity);
return responseFuture;
} catch (final Throwable t) {
failure = t;
}
if (callbackInvoked.compareAndSet(false, true)) {
callback.failure(failure);
}
CompletableFuture<Object> future = new CompletableFuture<>();
future.completeExceptionally(failure);
return future;
}
private static ClientResponse translateResponse(final ClientRequest jerseyRequest,
final org.eclipse.jetty.client.api.Response jettyResponse,
final NonBlockingInputStream entityStream) {
final ClientResponse jerseyResponse = new ClientResponse(Statuses.from(jettyResponse.getStatus()), jerseyRequest);
processResponseHeaders(jettyResponse.getHeaders(), jerseyResponse);
jerseyResponse.setEntityStream(entityStream);
return jerseyResponse;
}
@Override
public String getName() {
return "Jetty HttpClient " + Jetty.VERSION;
}
@Override
public void close() {
try {
client.stop();
} catch (final Exception e) {
throw new ProcessingException("Failed to stop the client.", e);
}
}
}