AsyncCachingExec.java

/*
 * ====================================================================
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 * ====================================================================
 *
 * This software consists of voluntary contributions made by many
 * individuals on behalf of the Apache Software Foundation.  For more
 * information on the Apache Software Foundation, please see
 * <http://www.apache.org/>.
 *
 */
package org.apache.hc.client5.http.impl.cache;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import org.apache.hc.client5.http.HttpRoute;
import org.apache.hc.client5.http.async.AsyncExecCallback;
import org.apache.hc.client5.http.async.AsyncExecChain;
import org.apache.hc.client5.http.async.AsyncExecChainHandler;
import org.apache.hc.client5.http.async.methods.SimpleBody;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.hc.client5.http.cache.CacheResponseStatus;
import org.apache.hc.client5.http.cache.HttpCacheContext;
import org.apache.hc.client5.http.cache.HttpCacheEntry;
import org.apache.hc.client5.http.cache.RequestCacheControl;
import org.apache.hc.client5.http.cache.ResourceIOException;
import org.apache.hc.client5.http.cache.ResponseCacheControl;
import org.apache.hc.client5.http.impl.ExecSupport;
import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.client5.http.schedule.SchedulingStrategy;
import org.apache.hc.client5.http.validator.ETag;
import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.ThreadingBehavior;
import org.apache.hc.core5.concurrent.CancellableDependency;
import org.apache.hc.core5.concurrent.ComplexFuture;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpHeaders;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.impl.BasicEntityDetails;
import org.apache.hc.core5.http.message.RequestLine;
import org.apache.hc.core5.http.nio.AsyncDataConsumer;
import org.apache.hc.core5.http.nio.AsyncEntityProducer;
import org.apache.hc.core5.http.nio.CapacityChannel;
import org.apache.hc.core5.http.support.BasicRequestBuilder;
import org.apache.hc.core5.net.URIAuthority;
import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.ByteArrayBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Request executor in the request execution chain that is responsible for
 * transparent client-side caching.
 * <p>
 * The current implementation is conditionally compliant with HTTP/1.1:
 * all of the MUST and MUST NOT requirements are obeyed, as are many,
 * though not all, of the SHOULD and SHOULD NOT recommendations.
 *
 * @since 5.0
 */
@Contract(threading = ThreadingBehavior.SAFE) // So long as the responseCache implementation is threadsafe
class AsyncCachingExec extends CachingExecBase implements AsyncExecChainHandler {

    /** Logger used for the debug traces emitted by this executor. */
    private static final Logger LOG = LoggerFactory.getLogger(AsyncCachingExec.class);
    /** Asynchronous cache used to match, store, update and evict entries; checked for {@code null} on construction. */
    private final HttpAsyncCache responseCache;
    /** Revalidator supplied at construction time; may be {@code null}. */
    private final DefaultAsyncCacheRevalidator cacheRevalidator;
    /** Builder created with a function that copies the request it is given via {@link BasicRequestBuilder}. */
    private final ConditionalRequestBuilder<HttpRequest> conditionalRequestBuilder;

    /**
     * Creates a caching executor on top of the given asynchronous cache.
     *
     * @param cache the response cache; must not be {@code null}.
     * @param cacheRevalidator the revalidator to keep; stored as given and may be {@code null}.
     * @param config the cache configuration, passed on to the base class.
     */
    AsyncCachingExec(final HttpAsyncCache cache, final DefaultAsyncCacheRevalidator cacheRevalidator, final CacheConfig config) {
        super(config);
        Args.notNull(cache, "Response cache");
        this.responseCache = cache;
        this.cacheRevalidator = cacheRevalidator;
        // The builder is handed a function producing a copy of whatever request it receives.
        this.conditionalRequestBuilder = new ConditionalRequestBuilder<>(
                original -> BasicRequestBuilder.copy(original).build());
    }

    /**
     * Creates a caching executor, building a {@link DefaultAsyncCacheRevalidator} from the
     * given executor service and scheduling strategy. No revalidator is created when
     * {@code executorService} is {@code null}.
     *
     * @param cache the response cache.
     * @param executorService the executor handed to the revalidator, or {@code null}.
     * @param schedulingStrategy the scheduling strategy handed to the revalidator.
     * @param config the cache configuration.
     */
    AsyncCachingExec(
            final HttpAsyncCache cache,
            final ScheduledExecutorService executorService,
            final SchedulingStrategy schedulingStrategy,
            final CacheConfig config) {
        this(cache,
                executorService == null ? null : new DefaultAsyncCacheRevalidator(executorService, schedulingStrategy),
                config);
    }

    /**
     * Delivers a locally produced response to the caller's callback.
     * <p>
     * The endpoint held by the execution runtime is released first. The response head is
     * then passed to the callback together with entity details derived from the body bytes
     * (if any); when the callback returns a data consumer, the body is pushed into it in a
     * single chunk and the stream is ended. Any {@link HttpException} or {@link IOException}
     * raised along the way is reported through {@link AsyncExecCallback#failed(Exception)}.
     *
     * @param cacheResponse the response to deliver.
     * @param scope the execution scope whose runtime endpoint gets released.
     * @param asyncExecCallback the callback receiving the response.
     */
    private void triggerResponse(
            final SimpleHttpResponse cacheResponse,
            final AsyncExecChain.Scope scope,
            final AsyncExecCallback asyncExecCallback) {
        scope.execRuntime.releaseEndpoint();

        final SimpleBody responseBody = cacheResponse.getBody();
        byte[] payload = null;
        ContentType payloadType = null;
        if (responseBody != null) {
            payload = responseBody.getBodyBytes();
            payloadType = responseBody.getContentType();
        }
        try {
            final EntityDetails details;
            if (payload == null) {
                details = null;
            } else {
                details = new BasicEntityDetails(payload.length, payloadType);
            }
            final AsyncDataConsumer sink = asyncExecCallback.handleResponse(cacheResponse, details);
            if (sink != null) {
                if (payload != null) {
                    sink.consume(ByteBuffer.wrap(payload));
                }
                // No trailers are ever produced for a locally generated response.
                sink.streamEnd(null);
            }
            asyncExecCallback.completed();
        } catch (final HttpException | IOException failure) {
            asyncExecCallback.failed(failure);
        }
    }

    /**
     * {@link AsyncExecCallback} adapter that ignores response events and only reacts to
     * the terminal ones: completion runs the given command, failure is forwarded to the
     * optional exception consumer.
     */
    static class AsyncExecCallbackWrapper implements AsyncExecCallback {

        private final Runnable command;
        private final Consumer<Exception> exceptionConsumer;

        /**
         * @param onCompleted the action run when the exchange completes.
         * @param onFailed the receiver of a failure cause; may be {@code null}, in which
         *                 case failures are ignored.
         */
        AsyncExecCallbackWrapper(final Runnable onCompleted, final Consumer<Exception> onFailed) {
            this.command = onCompleted;
            this.exceptionConsumer = onFailed;
        }

        /** Declines the response body: no data consumer is ever returned. */
        @Override
        public AsyncDataConsumer handleResponse(
                final HttpResponse response,
                final EntityDetails entityDetails) throws HttpException, IOException {
            return null;
        }

        /** Informational responses are intentionally ignored. */
        @Override
        public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException {
        }

        @Override
        public void completed() {
            command.run();
        }

        @Override
        public void failed(final Exception cause) {
            if (exceptionConsumer == null) {
                return;
            }
            exceptionConsumer.accept(cause);
        }

    }

    /**
     * Entry point of the caching executor in the asynchronous execution chain.
     * <p>
     * Resolves the target host (from the request authority when present, otherwise from
     * the route) and delegates to {@link #doExecute}, wrapping the caller's callback so
     * that the request and the response are recorded in the client context at the moment
     * the response head is delivered.
     *
     * @throws HttpException as declared by the chain handler contract.
     * @throws IOException as declared by the chain handler contract.
     */
    @Override
    public void execute(
            final HttpRequest request,
            final AsyncEntityProducer entityProducer,
            final AsyncExecChain.Scope scope,
            final AsyncExecChain chain,
            final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
        Args.notNull(request, "HTTP request");
        Args.notNull(scope, "Scope");

        final HttpClientContext clientContext = scope.clientContext;

        final URIAuthority requestAuthority = request.getAuthority();
        final String requestScheme = request.getScheme();
        final HttpHost target;
        if (requestAuthority != null) {
            target = new HttpHost(requestScheme, requestAuthority);
        } else {
            target = scope.route.getTargetHost();
        }

        final AsyncExecCallback contextRecordingCallback = new AsyncExecCallback() {

            @Override
            public AsyncDataConsumer handleResponse(
                    final HttpResponse response,
                    final EntityDetails entityDetails) throws HttpException, IOException {
                // Record the exchange in the context before the caller sees the response.
                clientContext.setRequest(request);
                clientContext.setResponse(response);
                return asyncExecCallback.handleResponse(response, entityDetails);
            }

            @Override
            public void handleInformationResponse(
                    final HttpResponse response) throws HttpException, IOException {
                asyncExecCallback.handleInformationResponse(response);
            }

            @Override
            public void completed() {
                asyncExecCallback.completed();
            }

            @Override
            public void failed(final Exception cause) {
                asyncExecCallback.failed(cause);
            }

        };
        doExecute(target, request, entityProducer, scope, chain, contextRecordingCallback);
    }

    /**
     * Runs the request through the cache.
     * <p>
     * The cache context is first reset to a cache miss with no entry. A request
     * recognised by {@code clientRequestsOurOptions} is answered directly with
     * {@code 501 Not Implemented}. Otherwise the request cache control is taken from the
     * request's {@code Cache-Control} header, or from the context default (which is then
     * written to the request). If the request policy allows serving from cache, a cache
     * lookup is started and its outcome is dispatched to the hit or miss handling;
     * otherwise the backend is called straight away.
     *
     * @param target the target host of the request.
     * @param request the request to execute.
     * @param entityProducer the request entity producer, passed on to the chain.
     * @param scope the execution scope.
     * @param chain the remainder of the execution chain.
     * @param asyncExecCallback the callback receiving the outcome.
     * @throws HttpException as declared; not thrown directly by the visible code paths.
     * @throws IOException as declared; not thrown directly by the visible code paths.
     */
    public void doExecute(
            final HttpHost target,
            final HttpRequest request,
            final AsyncEntityProducer entityProducer,
            final AsyncExecChain.Scope scope,
            final AsyncExecChain chain,
            final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {

        final String exchangeId = scope.exchangeId;
        final HttpCacheContext cacheContext = HttpCacheContext.cast(scope.clientContext);
        final CancellableDependency cancellable = scope.cancellableDependency;

        if (LOG.isDebugEnabled()) {
            LOG.debug("{} request via cache: {}", exchangeId, new RequestLine(request));
        }

        cacheContext.setCacheResponseStatus(CacheResponseStatus.CACHE_MISS);
        cacheContext.setCacheEntry(null);

        if (clientRequestsOurOptions(request)) {
            cacheContext.setCacheResponseStatus(CacheResponseStatus.CACHE_MODULE_RESPONSE);
            triggerResponse(SimpleHttpResponse.create(HttpStatus.SC_NOT_IMPLEMENTED), scope, asyncExecCallback);
            return;
        }

        final RequestCacheControl requestCacheControl;
        if (!request.containsHeader(HttpHeaders.CACHE_CONTROL)) {
            // No explicit header: use the context's directives and write them to the request.
            requestCacheControl = cacheContext.getRequestCacheControlOrDefault();
            CacheControlHeaderGenerator.INSTANCE.generate(requestCacheControl, request);
        } else {
            requestCacheControl = CacheControlHeaderParser.INSTANCE.parse(request);
            cacheContext.setRequestCacheControl(requestCacheControl);
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("{} request cache control: {}", exchangeId, requestCacheControl);
        }

        if (!cacheableRequestPolicy.canBeServedFromCache(requestCacheControl, request)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("{} request cannot be served from cache", exchangeId);
            }
            callBackend(target, request, entityProducer, scope, chain, asyncExecCallback);
            return;
        }

        final FutureCallback<CacheMatch> lookupCallback = new FutureCallback<CacheMatch>() {

            @Override
            public void completed(final CacheMatch result) {
                final CacheHit hit = result == null ? null : result.hit;
                final CacheHit root = result == null ? null : result.root;
                if (hit != null) {
                    final ResponseCacheControl responseCacheControl = CacheControlHeaderParser.INSTANCE.parse(hit.entry);
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("{} response cache control: {}", exchangeId, responseCacheControl);
                    }
                    cacheContext.setResponseCacheControl(responseCacheControl);
                    handleCacheHit(requestCacheControl, responseCacheControl, hit, target, request, entityProducer, scope, chain, asyncExecCallback);
                    return;
                }
                handleCacheMiss(requestCacheControl, root, target, request, entityProducer, scope, chain, asyncExecCallback);
            }

            @Override
            public void failed(final Exception cause) {
                asyncExecCallback.failed(cause);
            }

            @Override
            public void cancelled() {
                asyncExecCallback.failed(new InterruptedIOException());
            }

        };
        // Tie the lookup to the exchange through its cancellable dependency.
        cancellable.setDependency(responseCache.match(target, request, lookupCallback));
    }

    /**
     * Hands the request to the next element of the execution chain, turning a
     * synchronously thrown {@link HttpException} or {@link IOException} into a
     * {@link AsyncExecCallback#failed(Exception)} notification.
     *
     * @param request the request to send.
     * @param entityProducer the request entity producer; passed through unchanged.
     * @param scope the execution scope.
     * @param chain the remainder of the execution chain.
     * @param asyncExecCallback the callback that receives the outcome or the failure.
     */
    void chainProceed(
            final HttpRequest request,
            final AsyncEntityProducer entityProducer,
            final AsyncExecChain.Scope scope,
            final AsyncExecChain chain,
            final AsyncExecCallback asyncExecCallback) {
        try {
            chain.proceed(request, entityProducer, scope, asyncExecCallback);
        } catch (final HttpException | IOException failure) {
            // Report through the callback instead of propagating to the caller.
            asyncExecCallback.failed(failure);
        }
    }

    /**
     * Sends the request to the backend through the execution chain.
     * <p>
     * When the backend response head arrives, a {@link BackendResponseHandler} is created
     * for it (capturing the request and response timestamps) and remembered, so that the
     * subsequent terminal event ({@code completed} or {@code failed}) is routed to that
     * handler. Terminal events consume the remembered handler; if none has been installed
     * they go straight to the caller's callback.
     *
     * @param target the target host of the request.
     * @param request the request to send.
     * @param entityProducer the request entity producer; passed through unchanged.
     * @param scope the execution scope.
     * @param chain the remainder of the execution chain.
     * @param asyncExecCallback the caller's callback.
     */
    void callBackend(
            final HttpHost target,
            final HttpRequest request,
            final AsyncEntityProducer entityProducer,
            final AsyncExecChain.Scope scope,
            final AsyncExecChain chain,
            final AsyncExecCallback asyncExecCallback) {
        final String exchangeId = scope.exchangeId;

        if (LOG.isDebugEnabled()) {
            LOG.debug("{} calling the backend", exchangeId);
        }
        final Instant requestDate = getCurrentDate();
        final AtomicReference<AsyncExecCallback> callbackRef = new AtomicReference<>();
        chainProceed(request, entityProducer, scope, chain, new AsyncExecCallback() {

            @Override
            public AsyncDataConsumer handleResponse(
                    final HttpResponse backendResponse,
                    final EntityDetails entityDetails) throws HttpException, IOException {
                final Instant responseDate = getCurrentDate();
                final AsyncExecCallback callback = new BackendResponseHandler(target, request, requestDate, responseDate, scope, asyncExecCallback);
                callbackRef.set(callback);
                return callback.handleResponse(backendResponse, entityDetails);
            }

            @Override
            public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException {
                // An informational response is not a terminal event, so the handler
                // reference must only be read here, never cleared. Clearing it would make
                // a later completed()/failed() bypass the BackendResponseHandler.
                final AsyncExecCallback callback = callbackRef.get();
                if (callback != null) {
                    callback.handleInformationResponse(response);
                } else {
                    asyncExecCallback.handleInformationResponse(response);
                }
            }

            @Override
            public void completed() {
                // Terminal event: consume the handler so it is notified at most once.
                final AsyncExecCallback callback = callbackRef.getAndSet(null);
                if (callback != null) {
                    callback.completed();
                } else {
                    asyncExecCallback.completed();
                }
            }

            @Override
            public void failed(final Exception cause) {
                // Terminal event: consume the handler so it is notified at most once.
                final AsyncExecCallback callback = callbackRef.getAndSet(null);
                if (callback != null) {
                    callback.failed(cause);
                } else {
                    asyncExecCallback.failed(cause);
                }
            }

        });
    }

    /**
     * Data consumer that buffers the backend response body in memory so it can be cached.
     * <p>
     * Content is accumulated in a {@link ByteArrayBuffer} for as long as its length does
     * not exceed {@code cacheConfig.getMaxObjectSize()}. Once the limit is exceeded,
     * buffering stops: the response head is handed to the fallback callback, everything
     * buffered so far is replayed into the data consumer it returns (if any), and all
     * further content is forwarded to that consumer directly. The {@code writtenThrough}
     * flag records that the response has been handed to the fallback in this way.
     */
    class CachingAsyncDataConsumer implements AsyncDataConsumer {

        private final String exchangeId;
        private final AsyncExecCallback fallback;
        private final HttpResponse backendResponse;
        private final EntityDetails entityDetails;
        /** Set once the response has been handed to {@code fallback} instead of being cached. */
        private final AtomicBoolean writtenThrough;
        /** Holds the buffer while buffering is active; {@code null} once buffering has stopped. */
        private final AtomicReference<ByteArrayBuffer> bufferRef;
        /** Holds the downstream consumer obtained from {@code fallback}, if any. */
        private final AtomicReference<AsyncDataConsumer> dataConsumerRef;

        /**
         * @param exchangeId the exchange id used in log messages.
         * @param fallback the callback that receives the response if caching is abandoned.
         * @param backendResponse the backend response head.
         * @param entityDetails the entity details; when {@code null} no buffer is allocated.
         */
        CachingAsyncDataConsumer(
                final String exchangeId,
                final AsyncExecCallback fallback,
                final HttpResponse backendResponse,
                final EntityDetails entityDetails) {
            this.exchangeId = exchangeId;
            this.fallback = fallback;
            this.backendResponse = backendResponse;
            this.entityDetails = entityDetails;
            this.writtenThrough = new AtomicBoolean(false);
            this.bufferRef = new AtomicReference<>(entityDetails != null ? new ByteArrayBuffer(1024) : null);
            this.dataConsumerRef = new AtomicReference<>();
        }

        /**
         * Delegates to the downstream consumer when there is one; otherwise advertises
         * {@link Integer#MAX_VALUE} capacity.
         */
        @Override
        public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
            final AsyncDataConsumer dataConsumer = dataConsumerRef.get();
            if (dataConsumer != null) {
                dataConsumer.updateCapacity(capacityChannel);
            } else {
                capacityChannel.update(Integer.MAX_VALUE);
            }
        }

        /**
         * Buffers the incoming content, or forwards it to the downstream consumer once
         * buffering has been abandoned.
         */
        @Override
        public final void consume(final ByteBuffer src) throws IOException {
            final ByteArrayBuffer buffer = bufferRef.get();
            if (buffer != null) {
                if (src.hasArray()) {
                    buffer.append(src.array(), src.arrayOffset() + src.position(), src.remaining());
                } else {
                    while (src.hasRemaining()) {
                        buffer.append(src.get());
                    }
                }
                if (buffer.length() > cacheConfig.getMaxObjectSize()) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("{} backend response content length exceeds maximum", exchangeId);
                    }
                    // Over the max limit. Stop buffering and forward the response
                    // along with all the data buffered so far to the caller.
                    bufferRef.set(null);
                    try {
                        final AsyncDataConsumer dataConsumer = fallback.handleResponse(backendResponse, entityDetails);
                        // The response head has been delivered to the caller at this point,
                        // whether or not a body consumer was returned. Mark it as written
                        // through unconditionally so completion does not deliver the
                        // response a second time or cache it without its body.
                        writtenThrough.set(true);
                        if (dataConsumer != null) {
                            dataConsumerRef.set(dataConsumer);
                            dataConsumer.consume(ByteBuffer.wrap(buffer.array(), 0, buffer.length()));
                        }
                    } catch (final HttpException ex) {
                        fallback.failed(ex);
                    }
                }
            } else {
                final AsyncDataConsumer dataConsumer = dataConsumerRef.get();
                if (dataConsumer != null) {
                    dataConsumer.consume(src);
                }
            }
        }

        /** Ends the downstream consumer's stream, if one is attached, and detaches it. */
        @Override
        public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
            final AsyncDataConsumer dataConsumer = dataConsumerRef.getAndSet(null);
            if (dataConsumer != null) {
                dataConsumer.streamEnd(trailers);
            }
        }

        /** Releases the downstream consumer's resources, if one is attached, and detaches it. */
        @Override
        public void releaseResources() {
            final AsyncDataConsumer dataConsumer = dataConsumerRef.getAndSet(null);
            if (dataConsumer != null) {
                dataConsumer.releaseResources();
            }
        }

    }

    /**
     * Callback that processes a backend response on behalf of the cache.
     * <p>
     * On the response head it asks the cache to evict invalidated entries and decides
     * whether the response is to be cached. Cacheable responses are buffered by a
     * {@link CachingAsyncDataConsumer}; everything else is passed straight to the caller's
     * callback. On completion a buffered response is either used to update an existing
     * entry (304), superseded by a newer entry already in the cache (when the freshness
     * check is enabled), or stored as a new entry; the caller then receives a response
     * generated from the resulting cache entry.
     */
    class BackendResponseHandler implements AsyncExecCallback {

        private final HttpHost target;
        private final HttpRequest request;
        private final Instant requestDate;
        private final Instant responseDate;
        private final AsyncExecChain.Scope scope;
        private final AsyncExecCallback asyncExecCallback;
        /** Holds the buffering consumer when the response is being cached. */
        private final AtomicReference<CachingAsyncDataConsumer> cachingConsumerRef;

        /**
         * @param target the target host of the request.
         * @param request the request that produced the backend response.
         * @param requestDate the instant captured before the request was sent.
         * @param responseDate the instant captured when the response head arrived.
         * @param scope the execution scope.
         * @param asyncExecCallback the caller's callback.
         */
        BackendResponseHandler(
                final HttpHost target,
                final HttpRequest request,
                final Instant requestDate,
                final Instant responseDate,
                final AsyncExecChain.Scope scope,
                final AsyncExecCallback asyncExecCallback) {
            this.target = target;
            this.request = request;
            this.requestDate = requestDate;
            this.responseDate = responseDate;
            this.scope = scope;
            this.asyncExecCallback = asyncExecCallback;
            this.cachingConsumerRef = new AtomicReference<>();
        }

        /**
         * Handles the backend response head.
         *
         * @return a buffering consumer when the response is going to be cached; otherwise
         *         whatever the caller's callback returns for the response.
         */
        @Override
        public AsyncDataConsumer handleResponse(
                final HttpResponse backendResponse,
                final EntityDetails entityDetails) throws HttpException, IOException {
            final String exchangeId = scope.exchangeId;
            // Eviction outcome is not awaited; a failure is only logged.
            responseCache.evictInvalidatedEntries(target, request, backendResponse, new FutureCallback<Boolean>() {

                @Override
                public void completed(final Boolean result) {
                }

                @Override
                public void failed(final Exception ex) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("{} unable to flush invalidated entries from cache", exchangeId, ex);
                    }
                }

                @Override
                public void cancelled() {
                }

            });
            if (isResponseTooBig(entityDetails)) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("{} backend response is known to be too big", exchangeId);
                }
                return asyncExecCallback.handleResponse(backendResponse, entityDetails);
            }

            final HttpCacheContext context = HttpCacheContext.cast(scope.clientContext);
            final ResponseCacheControl responseCacheControl = CacheControlHeaderParser.INSTANCE.parse(backendResponse);
            context.setResponseCacheControl(responseCacheControl);
            final boolean cacheable = responseCachingPolicy.isResponseCacheable(responseCacheControl, request, backendResponse);
            if (cacheable) {
                storeRequestIfModifiedSinceFor304Response(request, backendResponse);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("{} caching backend response", exchangeId);
                }
                final CachingAsyncDataConsumer cachingDataConsumer = new CachingAsyncDataConsumer(
                        exchangeId, asyncExecCallback, backendResponse, entityDetails);
                cachingConsumerRef.set(cachingDataConsumer);
                return cachingDataConsumer;
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("{} backend response is not cacheable", exchangeId);
            }
            return asyncExecCallback.handleResponse(backendResponse, entityDetails);
        }

        @Override
        public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException {
            asyncExecCallback.handleInformationResponse(response);
        }

        /**
         * Stores the backend response as a cache entry and, once stored, delivers a
         * response generated from that entry to the caller. The store operation is
         * registered as the exchange's cancellable dependency.
         *
         * @param backendResponse the backend response head.
         * @param responseDate the response instant passed to the cache.
         * @param buffer the buffered response content.
         */
        void triggerNewCacheEntryResponse(final HttpResponse backendResponse, final Instant responseDate, final ByteArrayBuffer buffer) {
            final String exchangeId = scope.exchangeId;
            final HttpCacheContext context = HttpCacheContext.cast(scope.clientContext);
            final CancellableDependency operation = scope.cancellableDependency;
            operation.setDependency(responseCache.store(
                    target,
                    request,
                    backendResponse,
                    buffer,
                    requestDate,
                    responseDate,
                    new FutureCallback<CacheHit>() {

                        @Override
                        public void completed(final CacheHit hit) {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("{} backend response successfully cached", exchangeId);
                            }
                            try {
                                final SimpleHttpResponse cacheResponse = responseGenerator.generateResponse(request, hit.entry);
                                context.setCacheEntry(hit.entry);
                                triggerResponse(cacheResponse, scope, asyncExecCallback);
                            } catch (final ResourceIOException ex) {
                                asyncExecCallback.failed(ex);
                            }
                        }

                        @Override
                        public void failed(final Exception ex) {
                            asyncExecCallback.failed(ex);
                        }

                        @Override
                        public void cancelled() {
                            asyncExecCallback.failed(new InterruptedIOException());
                        }

                    }));

        }

        /**
         * Delivers a response generated from the given cache entry to the caller and
         * records the entry in the cache context.
         *
         * @param entry the cache entry to generate the response from.
         */
        void triggerCachedResponse(final HttpCacheEntry entry) {
            final HttpCacheContext context = HttpCacheContext.cast(scope.clientContext);
            try {
                final SimpleHttpResponse cacheResponse = responseGenerator.generateResponse(request, entry);
                context.setCacheEntry(entry);
                triggerResponse(cacheResponse, scope, asyncExecCallback);
            } catch (final ResourceIOException ex) {
                asyncExecCallback.failed(ex);
            }
        }

        /**
         * Finishes the exchange. When the response was not buffered for caching, or was
         * already written through to the caller, completion is simply forwarded.
         * Otherwise the buffered response is committed to the cache as described on the
         * class and the caller is answered from the resulting entry.
         */
        @Override
        public void completed() {
            final String exchangeId = scope.exchangeId;
            final CachingAsyncDataConsumer cachingDataConsumer = cachingConsumerRef.getAndSet(null);
            if (cachingDataConsumer == null || cachingDataConsumer.writtenThrough.get()) {
                asyncExecCallback.completed();
                return;
            }
            final HttpResponse backendResponse = cachingDataConsumer.backendResponse;
            final ByteArrayBuffer buffer = cachingDataConsumer.bufferRef.getAndSet(null);
            // Every cache operation started below is registered here so that cancelling
            // the exchange also cancels the pending cache operation.
            final CancellableDependency operation = scope.cancellableDependency;

            // Handle 304 Not Modified responses
            if (backendResponse.getCode() == HttpStatus.SC_NOT_MODIFIED) {
                operation.setDependency(responseCache.match(target, request, new FutureCallback<CacheMatch>() {

                    @Override
                    public void completed(final CacheMatch result) {
                        final CacheHit hit = result != null ? result.hit : null;
                        if (hit != null) {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("{} existing cache entry found, updating cache entry", exchangeId);
                            }
                            operation.setDependency(responseCache.update(
                                    hit,
                                    target,
                                    request,
                                    backendResponse,
                                    requestDate,
                                    responseDate,
                                    new FutureCallback<CacheHit>() {

                                        @Override
                                        public void completed(final CacheHit updated) {
                                            if (LOG.isDebugEnabled()) {
                                                LOG.debug("{} cache entry updated, generating response from updated entry", exchangeId);
                                            }
                                            triggerCachedResponse(updated.entry);
                                        }
                                        @Override
                                        public void failed(final Exception cause) {
                                            if (LOG.isDebugEnabled()) {
                                                LOG.debug("{} request failed: {}", exchangeId, cause.getMessage());
                                            }
                                            asyncExecCallback.failed(cause);
                                        }

                                        @Override
                                        public void cancelled() {
                                            if (LOG.isDebugEnabled()) {
                                                LOG.debug("{} cache entry updated aborted", exchangeId);
                                            }
                                            asyncExecCallback.failed(new InterruptedIOException());
                                        }

                                    }));
                        } else {
                            // No entry to update: store the response as a new entry.
                            triggerNewCacheEntryResponse(backendResponse, responseDate, buffer);
                        }
                    }

                    @Override
                    public void failed(final Exception cause) {
                        asyncExecCallback.failed(cause);
                    }

                    @Override
                    public void cancelled() {
                        asyncExecCallback.failed(new InterruptedIOException());
                    }

                }));
            } else {
                if (cacheConfig.isFreshnessCheckEnabled()) {
                    operation.setDependency(responseCache.match(target, request, new FutureCallback<CacheMatch>() {

                        @Override
                        public void completed(final CacheMatch result) {
                            final CacheHit hit = result != null ? result.hit : null;
                            if (HttpCacheEntry.isNewer(hit != null ? hit.entry : null, backendResponse)) {
                                if (LOG.isDebugEnabled()) {
                                    LOG.debug("{} backend already contains fresher cache entry", exchangeId);
                                }
                                triggerCachedResponse(hit.entry);
                            } else {
                                triggerNewCacheEntryResponse(backendResponse, responseDate, buffer);
                            }
                        }

                        @Override
                        public void failed(final Exception cause) {
                            asyncExecCallback.failed(cause);
                        }

                        @Override
                        public void cancelled() {
                            asyncExecCallback.failed(new InterruptedIOException());
                        }

                    }));
                } else {
                    triggerNewCacheEntryResponse(backendResponse, responseDate, buffer);
                }
            }
        }

        @Override
        public void failed(final Exception cause) {
            asyncExecCallback.failed(cause);
        }

    }

    /**
     * Serves a request for which a matching cache entry has been found.
     * <p>
     * The exchange is flagged as a cache hit and the hit counter is incremented. The entry is
     * then assessed by the suitability checker at the current time and, depending on the
     * outcome, the request is answered from the cache, answered with the response produced by
     * {@code generateGatewayTimeout()}, revalidated against the backend, or passed on to the
     * backend.
     *
     * @param requestCacheControl cache control of the request.
     * @param responseCacheControl cache control associated with the cached entry.
     * @param hit the cache hit whose entry is being evaluated.
     * @param target the target host.
     * @param request the request being executed.
     * @param entityProducer the request entity producer, may be {@code null}.
     * @param scope the scope of the current exchange.
     * @param chain the remainder of the execution chain.
     * @param asyncExecCallback the callback that receives the outcome of the exchange.
     */
    private void handleCacheHit(
            final RequestCacheControl requestCacheControl,
            final ResponseCacheControl responseCacheControl,
            final CacheHit hit,
            final HttpHost target,
            final HttpRequest request,
            final AsyncEntityProducer entityProducer,
            final AsyncExecChain.Scope scope,
            final AsyncExecChain chain,
            final AsyncExecCallback asyncExecCallback) {
        final String exchangeId = scope.exchangeId;
        final HttpCacheContext context = HttpCacheContext.cast(scope.clientContext);

        if (LOG.isDebugEnabled()) {
            LOG.debug("{} cache hit: {}", exchangeId, new RequestLine(request));
        }

        context.setCacheResponseStatus(CacheResponseStatus.CACHE_HIT);
        cacheHits.getAndIncrement();

        final Instant assessedAt = getCurrentDate();
        final CacheSuitability suitability = suitabilityChecker.assessSuitability(
                requestCacheControl, responseCacheControl, request, hit.entry, assessedAt);
        if (LOG.isDebugEnabled()) {
            LOG.debug("{} cache suitability: {}", exchangeId, suitability);
        }

        // Entry can be served directly from the cache.
        if (suitability == CacheSuitability.FRESH || suitability == CacheSuitability.FRESH_ENOUGH) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("{} cache hit is fresh enough", exchangeId);
            }
            try {
                final SimpleHttpResponse freshResponse = generateCachedResponse(request, hit.entry, assessedAt);
                context.setCacheEntry(hit.entry);
                triggerResponse(freshResponse, scope, asyncExecCallback);
            } catch (final ResourceIOException ex) {
                // The cached resource could not be read.
                if (!requestCacheControl.isOnlyIfCached()) {
                    // Fall back to executing the original request through the chain.
                    context.setCacheResponseStatus(CacheResponseStatus.FAILURE);
                    try {
                        chain.proceed(request, entityProducer, scope, asyncExecCallback);
                    } catch (final HttpException | IOException proceedEx) {
                        asyncExecCallback.failed(proceedEx);
                    }
                    return;
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("{} request marked only-if-cached", exchangeId);
                }
                context.setCacheResponseStatus(CacheResponseStatus.CACHE_MODULE_RESPONSE);
                final SimpleHttpResponse timeoutResponse = generateGatewayTimeout();
                triggerResponse(timeoutResponse, scope, asyncExecCallback);
            }
            return;
        }

        // From here on the entry cannot be served as is.
        if (requestCacheControl.isOnlyIfCached()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("{} cache entry not is not fresh and only-if-cached requested", exchangeId);
            }
            context.setCacheResponseStatus(CacheResponseStatus.CACHE_MODULE_RESPONSE);
            final SimpleHttpResponse timeoutResponse = generateGatewayTimeout();
            triggerResponse(timeoutResponse, scope, asyncExecCallback);
            return;
        }

        if (suitability == CacheSuitability.MISMATCH) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("{} cache entry does not match the request; calling backend", exchangeId);
            }
            callBackend(target, request, entityProducer, scope, chain, asyncExecCallback);
            return;
        }

        if (entityProducer != null && !entityProducer.isRepeatable()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("{} request is not repeatable; calling backend", exchangeId);
            }
            callBackend(target, request, entityProducer, scope, chain, asyncExecCallback);
            return;
        }

        if (hit.entry.getStatus() == HttpStatus.SC_NOT_MODIFIED && !suitabilityChecker.isConditional(request)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("{} non-modified cache entry does not match the non-conditional request; calling backend", exchangeId);
            }
            callBackend(target, request, entityProducer, scope, chain, asyncExecCallback);
            return;
        }

        if (suitability == CacheSuitability.REVALIDATION_REQUIRED) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("{} revalidation required; revalidating cache entry", exchangeId);
            }
            revalidateCacheEntryWithoutFallback(responseCacheControl, hit, target, request, entityProducer, scope, chain, asyncExecCallback);
            return;
        }

        if (suitability == CacheSuitability.STALE_WHILE_REVALIDATED) {
            if (cacheRevalidator == null) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("{} revalidating stale cache entry (asynchronous revalidation disabled)", exchangeId);
                }
                revalidateCacheEntryWithFallback(requestCacheControl, responseCacheControl, hit, target, request, entityProducer, scope, chain, asyncExecCallback);
                return;
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("{} serving stale with asynchronous revalidation", exchangeId);
            }
            try {
                final String revalidationExchangeId = ExecSupport.getNextExchangeId();
                context.setExchangeId(revalidationExchangeId);
                // The revalidation runs in its own scope with a fresh cache context
                // and a forked execution runtime.
                final AsyncExecChain.Scope revalidationScope = new AsyncExecChain.Scope(
                        revalidationExchangeId,
                        scope.route,
                        scope.originalRequest,
                        new ComplexFuture<>(null),
                        HttpCacheContext.create(),
                        scope.execRuntime.fork(),
                        scope.scheduler,
                        scope.execCount);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("{} starting asynchronous revalidation exchange {}", exchangeId, revalidationExchangeId);
                }
                cacheRevalidator.revalidateCacheEntry(
                        hit.getEntryKey(),
                        asyncExecCallback,
                        callback -> revalidateCacheEntry(
                                responseCacheControl, hit, target, request, entityProducer, revalidationScope, chain, callback));
                // The stale entry is served to the caller right away.
                context.setCacheResponseStatus(CacheResponseStatus.CACHE_MODULE_RESPONSE);
                final SimpleHttpResponse staleResponse = responseGenerator.generateResponse(request, hit.entry);
                context.setCacheEntry(hit.entry);
                triggerResponse(staleResponse, scope, asyncExecCallback);
            } catch (final IOException ex) {
                asyncExecCallback.failed(ex);
            }
            return;
        }

        if (suitability == CacheSuitability.STALE) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("{} revalidating stale cache entry", exchangeId);
            }
            revalidateCacheEntryWithFallback(requestCacheControl, responseCacheControl, hit, target, request, entityProducer, scope, chain, asyncExecCallback);
            return;
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("{} cache entry not usable; calling backend", exchangeId);
        }
        callBackend(target, request, entityProducer, scope, chain, asyncExecCallback);
    }

    /**
     * Revalidates a cache entry against the backend using a conditional request.
     * <p>
     * A conditional request is built from a copy of {@code request} and the cached entry and is
     * sent down the execution chain. Once the response head arrives, the callback registered
     * for that exchange selects a delegate callback, stores it in {@code callbackRef} and
     * forwards the response head to it; the remaining exchange events are routed to the stored
     * delegate, or to {@code asyncExecCallback} when no delegate is set.
     *
     * @param responseCacheControl cache control passed to the conditional request builder.
     * @param hit the cache hit whose entry is being revalidated.
     * @param target the target host.
     * @param request the request being executed.
     * @param entityProducer the request entity producer, may be {@code null}.
     * @param scope the scope of the exchange used for revalidation.
     * @param chain the remainder of the execution chain.
     * @param asyncExecCallback the callback that receives the outcome of the revalidation.
     */
    void revalidateCacheEntry(
            final ResponseCacheControl responseCacheControl,
            final CacheHit hit,
            final HttpHost target,
            final HttpRequest request,
            final AsyncEntityProducer entityProducer,
            final AsyncExecChain.Scope scope,
            final AsyncExecChain chain,
            final AsyncExecCallback asyncExecCallback) {
        final Instant requestDate = getCurrentDate();
        final HttpRequest conditionalRequest = conditionalRequestBuilder.buildConditionalRequest(
                responseCacheControl,
                BasicRequestBuilder.copy(request).build(),
                hit.entry);
        final HttpCacheContext context = HttpCacheContext.cast(scope.clientContext);
        chainProceed(conditionalRequest, entityProducer, scope, chain, new AsyncExecCallback() {

            // Delegate chosen in handleResponse; shared by the outer callback and the
            // callback of the nested unconditional exchange.
            final AtomicReference<AsyncExecCallback> callbackRef = new AtomicReference<>();

            // Updates the cache entry with the backend response and, once the update
            // completes, answers the exchange with a response generated from the updated entry.
            void triggerUpdatedCacheEntryResponse(final HttpResponse backendResponse, final Instant responseDate) {
                final CancellableDependency operation = scope.cancellableDependency;
                // The cache update is registered as a dependency of the exchange.
                operation.setDependency(responseCache.update(
                        hit,
                        target,
                        request,
                        backendResponse,
                        requestDate,
                        responseDate,
                        new FutureCallback<CacheHit>() {

                            @Override
                            public void completed(final CacheHit updated) {
                                try {
                                    final SimpleHttpResponse cacheResponse = generateCachedResponse(request, updated.entry, responseDate);
                                    context.setCacheEntry(updated.entry);
                                    triggerResponse(cacheResponse, scope, asyncExecCallback);
                                } catch (final ResourceIOException ex) {
                                    asyncExecCallback.failed(ex);
                                }
                            }

                            @Override
                            public void failed(final Exception ex) {
                                asyncExecCallback.failed(ex);
                            }

                            @Override
                            public void cancelled() {
                                // Cancellation is reported to the caller as a failure.
                                asyncExecCallback.failed(new InterruptedIOException());
                            }

                        }));
            }

            // Chooses the callback that handles the given backend response.
            AsyncExecCallback evaluateResponse(final HttpResponse backendResponse, final Instant responseDate) {
                final int statusCode = backendResponse.getCode();
                // Both 304 and 200 mark the exchange as validated and count as a cache update.
                if (statusCode == HttpStatus.SC_NOT_MODIFIED || statusCode == HttpStatus.SC_OK) {
                    context.setCacheResponseStatus(CacheResponseStatus.VALIDATED);
                    cacheUpdates.getAndIncrement();
                }
                if (statusCode == HttpStatus.SC_NOT_MODIFIED) {
                    // NOTE(review): presumably AsyncExecCallbackWrapper runs the given action once
                    // the backend exchange completes - confirm against its implementation.
                    return new AsyncExecCallbackWrapper(() -> triggerUpdatedCacheEntryResponse(backendResponse, responseDate), asyncExecCallback::failed);
                }
                // Any other status is handled by a regular backend response handler.
                return new BackendResponseHandler(target, conditionalRequest, requestDate, responseDate, scope, asyncExecCallback);
            }

            @Override
            public AsyncDataConsumer handleResponse(
                    final HttpResponse backendResponse1,
                    final EntityDetails entityDetails) throws HttpException, IOException {

                final Instant responseDate = getCurrentDate();

                final AsyncExecCallback callback1;
                // NOTE(review): presumably true when the cached entry is newer than the backend
                // response - confirm against HttpCacheEntry.isNewer.
                if (HttpCacheEntry.isNewer(hit.entry, backendResponse1)) {

                    // The response is not evaluated; an unconditional request built from the
                    // original request of the scope is sent through the chain instead.
                    final HttpRequest unconditional = conditionalRequestBuilder.buildUnconditionalRequest(
                            BasicRequestBuilder.copy(scope.originalRequest).build());

                    callback1 = new AsyncExecCallbackWrapper(() -> chainProceed(unconditional, entityProducer, scope, chain, new AsyncExecCallback() {

                        @Override
                        public AsyncDataConsumer handleResponse(
                                final HttpResponse backendResponse2,
                                final EntityDetails entityDetails1) throws HttpException, IOException {
                            final Instant responseDate2 = getCurrentDate();
                            final AsyncExecCallback callback2 = evaluateResponse(backendResponse2, responseDate2);
                            callbackRef.set(callback2);
                            return callback2.handleResponse(backendResponse2, entityDetails1);
                        }

                        @Override
                        public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException {
                            // Note that the stored delegate is cleared by this call.
                            final AsyncExecCallback callback2 = callbackRef.getAndSet(null);
                            if (callback2 != null) {
                                callback2.handleInformationResponse(response);
                            } else {
                                asyncExecCallback.handleInformationResponse(response);
                            }
                        }

                        @Override
                        public void completed() {
                            final AsyncExecCallback callback2 = callbackRef.getAndSet(null);
                            if (callback2 != null) {
                                callback2.completed();
                            } else {
                                asyncExecCallback.completed();
                            }
                        }

                        @Override
                        public void failed(final Exception cause) {
                            final AsyncExecCallback callback2 = callbackRef.getAndSet(null);
                            if (callback2 != null) {
                                callback2.failed(cause);
                            } else {
                                asyncExecCallback.failed(cause);
                            }
                        }

                    }), asyncExecCallback::failed);
                } else {
                    callback1 = evaluateResponse(backendResponse1, responseDate);
                }
                // Remember the delegate so that later events of this exchange reach it.
                callbackRef.set(callback1);
                return callback1.handleResponse(backendResponse1, entityDetails);
            }

            @Override
            public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException {
                // Note that the stored delegate is cleared by this call.
                final AsyncExecCallback callback1 = callbackRef.getAndSet(null);
                if (callback1 != null) {
                    callback1.handleInformationResponse(response);
                } else {
                    asyncExecCallback.handleInformationResponse(response);
                }
            }

            @Override
            public void completed() {
                // The delegate is taken out of the reference before it is invoked.
                final AsyncExecCallback callback1 = callbackRef.getAndSet(null);
                if (callback1 != null) {
                    callback1.completed();
                } else {
                    asyncExecCallback.completed();
                }
            }

            @Override
            public void failed(final Exception cause) {
                final AsyncExecCallback callback1 = callbackRef.getAndSet(null);
                if (callback1 != null) {
                    callback1.failed(cause);
                } else {
                    asyncExecCallback.failed(cause);
                }
            }

        });

    }

    /**
     * Revalidates a cache entry without falling back to the cached entry on failure.
     * <p>
     * Exchange events are passed through to {@code asyncExecCallback}. If the revalidation
     * fails with an {@link IOException} before a response head has been passed on, the caller
     * receives the response produced by {@code generateGatewayTimeout()} instead of the
     * failure; any other failure is propagated unchanged.
     *
     * @param responseCacheControl cache control associated with the cached entry.
     * @param hit the cache hit whose entry is being revalidated.
     * @param target the target host.
     * @param request the request being executed.
     * @param entityProducer the request entity producer, may be {@code null}.
     * @param scope the scope of the current exchange.
     * @param chain the remainder of the execution chain.
     * @param asyncExecCallback the callback that receives the outcome of the exchange.
     */
    void revalidateCacheEntryWithoutFallback(
            final ResponseCacheControl responseCacheControl,
            final CacheHit hit,
            final HttpHost target,
            final HttpRequest request,
            final AsyncEntityProducer entityProducer,
            final AsyncExecChain.Scope scope,
            final AsyncExecChain chain,
            final AsyncExecCallback asyncExecCallback) {
        final String exchangeId = scope.exchangeId;
        final HttpCacheContext context = HttpCacheContext.cast(scope.clientContext);
        final AsyncExecCallback revalidationCallback = new AsyncExecCallback() {

            // Set once a response head has been handed over to the caller.
            private final AtomicBoolean responseStarted = new AtomicBoolean();

            @Override
            public AsyncDataConsumer handleResponse(
                    final HttpResponse response,
                    final EntityDetails entityDetails) throws HttpException, IOException {
                responseStarted.set(true);
                return asyncExecCallback.handleResponse(response, entityDetails);
            }

            @Override
            public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException {
                asyncExecCallback.handleInformationResponse(response);
            }

            @Override
            public void completed() {
                asyncExecCallback.completed();
            }

            @Override
            public void failed(final Exception cause) {
                final boolean replaceWithTimeout = !responseStarted.get() && cause instanceof IOException;
                if (!replaceWithTimeout) {
                    asyncExecCallback.failed(cause);
                    return;
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("{} I/O error while revalidating cache entry", exchangeId, cause);
                }
                final SimpleHttpResponse timeoutResponse = generateGatewayTimeout();
                context.setCacheResponseStatus(CacheResponseStatus.CACHE_MODULE_RESPONSE);
                triggerResponse(timeoutResponse, scope, asyncExecCallback);
            }

        };
        revalidateCacheEntry(responseCacheControl, hit, target, request, entityProducer, scope, chain, revalidationCallback);
    }

    /**
     * Revalidates a cache entry and falls back to the cached entry where the suitability
     * checker allows the entry to be used in case of an error.
     * <p>
     * A backend response is withheld from the caller when {@code staleIfErrorAppliesTo}
     * accepts its status and {@code isSuitableIfError} accepts the entry; in that case a
     * response generated from the cached entry is served once the exchange completes. If the
     * exchange fails before a response has been passed on, the caller receives either a
     * response generated from the cached entry or the response produced by
     * {@code generateGatewayTimeout()}.
     *
     * @param requestCacheControl cache control of the request.
     * @param responseCacheControl cache control associated with the cached entry.
     * @param hit the cache hit whose entry is being revalidated.
     * @param target the target host.
     * @param request the request being executed.
     * @param entityProducer the request entity producer, may be {@code null}.
     * @param scope the scope of the current exchange.
     * @param chain the remainder of the execution chain.
     * @param asyncExecCallback the callback that receives the outcome of the exchange.
     */
    void revalidateCacheEntryWithFallback(
            final RequestCacheControl requestCacheControl,
            final ResponseCacheControl responseCacheControl,
            final CacheHit hit,
            final HttpHost target,
            final HttpRequest request,
            final AsyncEntityProducer entityProducer,
            final AsyncExecChain.Scope scope,
            final AsyncExecChain chain,
            final AsyncExecCallback asyncExecCallback) {
        final String exchangeId = scope.exchangeId;
        final HttpCacheContext context = HttpCacheContext.cast(scope.clientContext);
        final AsyncExecCallback revalidationCallback = new AsyncExecCallback() {

            // Holds the backend response once it has been passed on to the caller;
            // stays empty while the cached entry may still be served instead.
            private final AtomicReference<HttpResponse> passedOn = new AtomicReference<>();

            @Override
            public AsyncDataConsumer handleResponse(
                    final HttpResponse response,
                    final EntityDetails entityDetails) throws HttpException, IOException {
                final int status = response.getCode();
                final boolean serveStale = staleIfErrorAppliesTo(status)
                        && suitabilityChecker.isSuitableIfError(requestCacheControl, responseCacheControl, hit.entry, getCurrentDate());
                if (!serveStale) {
                    passedOn.set(response);
                    return asyncExecCallback.handleResponse(response, entityDetails);
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("{} serving stale response due to {} status and stale-if-error enabled", exchangeId, status);
                }
                // No consumer is returned for the backend response body.
                return null;
            }

            @Override
            public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException {
                asyncExecCallback.handleInformationResponse(response);
            }

            @Override
            public void completed() {
                if (passedOn.get() != null) {
                    asyncExecCallback.completed();
                    return;
                }
                // The backend response was withheld: answer from the cached entry.
                try {
                    context.setCacheResponseStatus(CacheResponseStatus.CACHE_MODULE_RESPONSE);
                    final SimpleHttpResponse staleResponse = responseGenerator.generateResponse(request, hit.entry);
                    context.setCacheEntry(hit.entry);
                    triggerResponse(staleResponse, scope, asyncExecCallback);
                } catch (final IOException ex) {
                    asyncExecCallback.failed(ex);
                }
            }

            @Override
            public void failed(final Exception cause) {
                if (passedOn.get() != null) {
                    asyncExecCallback.failed(cause);
                    return;
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("{} I/O error while revalidating cache entry", exchangeId, cause);
                }
                context.setCacheResponseStatus(CacheResponseStatus.CACHE_MODULE_RESPONSE);
                final boolean serveStale = cause instanceof IOException
                        && suitabilityChecker.isSuitableIfError(requestCacheControl, responseCacheControl, hit.entry, getCurrentDate());
                if (!serveStale) {
                    final SimpleHttpResponse timeoutResponse = generateGatewayTimeout();
                    triggerResponse(timeoutResponse, scope, asyncExecCallback);
                    return;
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("{} serving stale response due to IOException and stale-if-error enabled", exchangeId);
                }
                try {
                    final SimpleHttpResponse staleResponse = responseGenerator.generateResponse(request, hit.entry);
                    context.setCacheEntry(hit.entry);
                    triggerResponse(staleResponse, scope, asyncExecCallback);
                } catch (final IOException ex) {
                    // The original revalidation failure is the one reported to the caller.
                    asyncExecCallback.failed(cause);
                }
            }

        };
        revalidateCacheEntry(responseCacheControl, hit, target, request, entityProducer, scope, chain, revalidationCallback);
    }
    /**
     * Handles a request for which no directly usable cache entry was found.
     * <p>
     * The miss counter is incremented. A request marked {@code only-if-cached} is answered
     * with the response produced by {@code generateGatewayTimeout()} and is not forwarded.
     * Otherwise, when a partial match with variants exists and the request carries no entity,
     * the stored variants are looked up and negotiated with the backend; in all remaining cases
     * the request is sent to the backend.
     *
     * @param requestCacheControl cache control of the request.
     * @param partialMatch a cache hit that partially matches the request, may be {@code null}.
     * @param target the target host.
     * @param request the request being executed.
     * @param entityProducer the request entity producer, may be {@code null}.
     * @param scope the scope of the current exchange.
     * @param chain the remainder of the execution chain.
     * @param asyncExecCallback the callback that receives the outcome of the exchange.
     */
    private void handleCacheMiss(
            final RequestCacheControl requestCacheControl,
            final CacheHit partialMatch,
            final HttpHost target,
            final HttpRequest request,
            final AsyncEntityProducer entityProducer,
            final AsyncExecChain.Scope scope,
            final AsyncExecChain chain,
            final AsyncExecCallback asyncExecCallback) {
        final String exchangeId = scope.exchangeId;

        if (LOG.isDebugEnabled()) {
            LOG.debug("{} cache miss: {}", exchangeId, new RequestLine(request));
        }
        cacheMisses.getAndIncrement();

        if (requestCacheControl.isOnlyIfCached()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("{} request marked only-if-cached", exchangeId);
            }
            final HttpCacheContext context = HttpCacheContext.cast(scope.clientContext);
            context.setCacheResponseStatus(CacheResponseStatus.CACHE_MODULE_RESPONSE);
            final SimpleHttpResponse cacheResponse = generateGatewayTimeout();
            triggerResponse(cacheResponse, scope, asyncExecCallback);
            // The exchange has been answered by the cache module. Falling through would
            // forward an only-if-cached request to the backend and drive the callback twice.
            return;
        }

        if (partialMatch != null && partialMatch.entry.hasVariants() && entityProducer == null) {
            // The variant lookup is registered as a dependency of the exchange.
            final CancellableDependency operation = scope.cancellableDependency;
            operation.setDependency(responseCache.getVariants(
                    partialMatch,
                    new FutureCallback<Collection<CacheHit>>() {

                        @Override
                        public void completed(final Collection<CacheHit> variants) {
                            if (variants != null && !variants.isEmpty()) {
                                negotiateResponseFromVariants(target, request, entityProducer, scope, chain, asyncExecCallback, variants);
                            } else {
                                callBackend(target, request, entityProducer, scope, chain, asyncExecCallback);
                            }
                        }

                        @Override
                        public void failed(final Exception ex) {
                            asyncExecCallback.failed(ex);
                        }

                        @Override
                        public void cancelled() {
                            // Cancellation is reported to the caller as a failure.
                            asyncExecCallback.failed(new InterruptedIOException());
                        }

                    }));
        } else {
            callBackend(target, request, entityProducer, scope, chain, asyncExecCallback);
        }
    }

    /**
     * Tries to satisfy the request from one of the cached variants.
     * <p>
     * The variants are indexed by ETag and a conditional request built from those ETags is
     * sent down the execution chain. Once the response head arrives, a delegate callback is
     * selected based on the response status and its ETag; the response head is forwarded to
     * that delegate and the remaining exchange events are routed to it, or to
     * {@code asyncExecCallback} when no delegate is set.
     *
     * @param target the target host.
     * @param request the request being executed.
     * @param entityProducer the request entity producer, may be {@code null}.
     * @param scope the scope of the current exchange.
     * @param chain the remainder of the execution chain.
     * @param asyncExecCallback the callback that receives the outcome of the exchange.
     * @param variants the cached variants to negotiate with.
     */
    void negotiateResponseFromVariants(
            final HttpHost target,
            final HttpRequest request,
            final AsyncEntityProducer entityProducer,
            final AsyncExecChain.Scope scope,
            final AsyncExecChain chain,
            final AsyncExecCallback asyncExecCallback,
            final Collection<CacheHit> variants) {
        final String exchangeId = scope.exchangeId;
        final CancellableDependency operation = scope.cancellableDependency;

        // Index the variants by ETag; variants without an ETag are left out.
        final Map<ETag, CacheHit> variantMap = new HashMap<>();
        for (final CacheHit candidate : variants) {
            final ETag candidateTag = candidate.entry.getETag();
            if (candidateTag == null) {
                continue;
            }
            variantMap.put(candidateTag, candidate);
        }

        final HttpRequest conditionalRequest = conditionalRequestBuilder.buildConditionalRequestFromVariants(
                request,
                variantMap.keySet());

        final Instant requestDate = getCurrentDate();
        final AsyncExecCallback negotiationCallback = new AsyncExecCallback() {

            // Delegate chosen in handleResponse for the rest of the exchange.
            final AtomicReference<AsyncExecCallback> delegateRef = new AtomicReference<>();

            // Stores the negotiated response for the matched variant and answers the
            // exchange with a response generated from the stored entry.
            void updateVariantCacheEntry(final HttpResponse backendResponse, final Instant responseDate, final CacheHit match) {
                final HttpCacheContext context = HttpCacheContext.cast(scope.clientContext);
                context.setCacheResponseStatus(CacheResponseStatus.VALIDATED);
                cacheUpdates.getAndIncrement();

                final FutureCallback<CacheHit> storeCallback = new FutureCallback<CacheHit>() {

                    @Override
                    public void completed(final CacheHit stored) {
                        try {
                            final SimpleHttpResponse negotiated = generateCachedResponse(request, stored.entry, responseDate);
                            context.setCacheEntry(stored.entry);
                            triggerResponse(negotiated, scope, asyncExecCallback);
                        } catch (final ResourceIOException ex) {
                            asyncExecCallback.failed(ex);
                        }
                    }

                    @Override
                    public void failed(final Exception ex) {
                        asyncExecCallback.failed(ex);
                    }

                    @Override
                    public void cancelled() {
                        // Cancellation is reported to the caller as a failure.
                        asyncExecCallback.failed(new InterruptedIOException());
                    }

                };
                operation.setDependency(responseCache.storeFromNegotiated(
                        match,
                        target,
                        request,
                        backendResponse,
                        requestDate,
                        responseDate,
                        storeCallback));
            }

            @Override
            public AsyncDataConsumer handleResponse(
                    final HttpResponse backendResponse,
                    final EntityDetails entityDetails) throws HttpException, IOException {
                final Instant responseDate = getCurrentDate();
                final AsyncExecCallback delegate;
                if (backendResponse.getCode() == HttpStatus.SC_NOT_MODIFIED) {
                    final ETag resultEtag = ETag.get(backendResponse);
                    final CacheHit match = resultEtag != null ? variantMap.get(resultEtag) : null;
                    if (resultEtag == null) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("{} 304 response did not contain ETag", exchangeId);
                        }
                        delegate = new AsyncExecCallbackWrapper(
                                () -> callBackend(target, request, entityProducer, scope, chain, asyncExecCallback),
                                asyncExecCallback::failed);
                    } else if (match == null) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("{} 304 response did not contain ETag matching one sent in If-None-Match", exchangeId);
                        }
                        delegate = new AsyncExecCallbackWrapper(
                                () -> callBackend(target, request, entityProducer, scope, chain, asyncExecCallback),
                                asyncExecCallback::failed);
                    } else if (HttpCacheEntry.isNewer(match.entry, backendResponse)) {
                        // The matched variant is not updated from this response; the backend is
                        // called again with an unconditional copy of the request.
                        final HttpRequest unconditional = conditionalRequestBuilder.buildUnconditionalRequest(
                                BasicRequestBuilder.copy(request).build());
                        delegate = new AsyncExecCallbackWrapper(
                                () -> callBackend(target, unconditional, entityProducer, scope, chain, asyncExecCallback),
                                asyncExecCallback::failed);
                    } else {
                        delegate = new AsyncExecCallbackWrapper(
                                () -> updateVariantCacheEntry(backendResponse, responseDate, match),
                                asyncExecCallback::failed);
                    }
                } else {
                    // Anything other than 304 is handled as a regular backend response.
                    delegate = new BackendResponseHandler(target, request, requestDate, responseDate, scope, asyncExecCallback);
                }
                delegateRef.set(delegate);
                return delegate.handleResponse(backendResponse, entityDetails);
            }

            @Override
            public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException {
                final AsyncExecCallback delegate = delegateRef.getAndSet(null);
                if (delegate == null) {
                    asyncExecCallback.handleInformationResponse(response);
                } else {
                    delegate.handleInformationResponse(response);
                }
            }

            @Override
            public void completed() {
                final AsyncExecCallback delegate = delegateRef.getAndSet(null);
                if (delegate == null) {
                    asyncExecCallback.completed();
                } else {
                    delegate.completed();
                }
            }

            @Override
            public void failed(final Exception cause) {
                final AsyncExecCallback delegate = delegateRef.getAndSet(null);
                if (delegate == null) {
                    asyncExecCallback.failed(cause);
                } else {
                    delegate.failed(cause);
                }
            }

        };
        chainProceed(conditionalRequest, entityProducer, scope, chain, negotiationCallback);

    }

}