DistributedProxySelector.java

/*
 * ====================================================================
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 * ====================================================================
 *
 * This software consists of voluntary contributions made by many
 * individuals on behalf of the Apache Software Foundation.  For more
 * information on the Apache Software Foundation, please see
 * <http://www.apache.org/>.
 *
 */
package org.apache.hc.client5.http.impl.routing;

import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.ThreadingBehavior;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.Proxy;
import java.net.ProxySelector;
import java.net.SocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * A DistributedProxySelector is a custom {@link ProxySelector} implementation that
 * delegates proxy selection to a list of underlying ProxySelectors in a
 * distributed manner. It ensures that proxy selection is load-balanced
 * across the available ProxySelectors, and provides thread safety by
 * maintaining separate states for each thread.
 *
 * <p>The DistributedProxySelector class maintains a list of ProxySelectors,
 * a {@link ThreadLocal} variable for the current {@link ProxySelector}, and an {@link AtomicInteger}
 * to keep track of the shared index across all threads. When the select()
 * method is called, it delegates the proxy selection to the current
 * ProxySelector or the next available one in the list if the current one
 * returns an empty proxy list. Any exceptions that occur during proxy
 * selection are caught and ignored, and the next ProxySelector is tried.
 *
 * <p>The connectFailed() method notifies the active {@link ProxySelector} of a
 * connection failure, allowing the underlying ProxySelector to handle
 * connection failures according to its own logic.
 *
 * @since 5.3
 */
@Contract(threading = ThreadingBehavior.SAFE)
public class DistributedProxySelector extends ProxySelector {

    private static final Logger LOG = LoggerFactory.getLogger(DistributedProxySelector.class);

    /**
     * A list of {@link ProxySelector} instances to be used by the DistributedProxySelector
     * for selecting proxies.
     */
    private final List<ProxySelector> selectors;

    /**
     * A {@link ThreadLocal} variable holding the current {@link ProxySelector} for each thread,
     * ensuring thread safety when accessing the current {@link ProxySelector}.
     */
    private final ThreadLocal<ProxySelector> currentSelector;

    /**
     * An {@link AtomicInteger} representing the shared index across all threads for
     * maintaining the current position in the list of ProxySelectors, ensuring
     * proper distribution of {@link ProxySelector} usage.
     */
    private final AtomicInteger sharedIndex;


    /**
     * Constructs a DistributedProxySelector with the given list of {@link ProxySelector}.
     * The constructor initializes the currentSelector as a {@link ThreadLocal}, and
     * the sharedIndex as an {@link AtomicInteger}.
     *
     * @param selectors the list of ProxySelectors to use.
     * @throws IllegalArgumentException if the list is null or empty.
     */
    public DistributedProxySelector(final List<ProxySelector> selectors) {
        if (selectors == null || selectors.isEmpty()) {
            throw new IllegalArgumentException("At least one ProxySelector is required");
        }
        this.selectors = new ArrayList<>(selectors);
        this.currentSelector = new ThreadLocal<>();
        this.sharedIndex = new AtomicInteger();
    }

    /**
     * Selects a list of proxies for the given {@link URI} by delegating to the current
     * {@link ProxySelector} or the next available {@link ProxySelector} in the list if the current
     * one returns an empty proxy list. If an {@link Exception} occurs, it will be caught
     * and ignored, and the next {@link ProxySelector} will be tried.
     *
     * @param uri the {@link URI} to select a proxy for.
     * @return a list of proxies for the given {@link URI}.
     */
    @Override
    public List<Proxy> select(final URI uri) {
        List<Proxy> result = Collections.emptyList();
        ProxySelector selector;

        for (int i = 0; i < selectors.size(); i++) {
            selector = nextSelector();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Selecting next proxy selector for URI {}: {}", uri, selector);
            }

            try {
                currentSelector.set(selector);
                result = currentSelector.get().select(uri);
                if (!result.isEmpty()) {
                    break;
                }
            } catch (final Exception e) {
                // ignore and try the next selector
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Exception caught while selecting proxy for URI {}: {}", uri, e.getMessage());
                }
            } finally {
                currentSelector.remove();
            }
        }
        return result;
    }

    /**
     * Notifies the active {@link ProxySelector} of a connection failure. This method
     * retrieves the current {@link ProxySelector} from the {@link ThreadLocal} variable and
     * delegates the handling of the connection failure to the underlying
     * ProxySelector's connectFailed() method. After handling the connection
     * failure, the current ProxySelector is removed from the {@link ThreadLocal} variable.
     *
     * @param uri the {@link URI} that failed to connect.
     * @param sa  the {@link SocketAddress} of the proxy that failed to connect.
     * @param ioe the {@link IOException} that resulted from the failed connection.
     */
    @Override
    public void connectFailed(final URI uri, final SocketAddress sa, final IOException ioe) {
        final ProxySelector selector = currentSelector.get();
        if (selector != null) {
            selector.connectFailed(uri, sa, ioe);
            currentSelector.remove();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Removed the current ProxySelector for URI {}: {}", uri, selector);
            }
        }
    }

    /**
     * Retrieves the next available {@link ProxySelector} in the list of selectors,
     * incrementing the shared index atomically to ensure proper distribution
     * across different threads.
     *
     * @return the next {@link ProxySelector} in the list.
     */
    private ProxySelector nextSelector() {
        final int nextIndex = sharedIndex.getAndUpdate(i -> (i + 1) % selectors.size());
        return selectors.get(nextIndex);
    }
}