RemoteInfinispanClusterProviderFactory.java

/*
 * Copyright 2024 Red Hat, Inc. and/or its affiliates
 * and other contributors as indicated by the @author tags.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.keycloak.cluster.infinispan.remote;

import java.lang.invoke.MethodHandles;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.exceptions.HotRodClientException;
import org.infinispan.commons.util.ByRef;
import org.jboss.logging.Logger;
import org.keycloak.Config;
import org.keycloak.cluster.ClusterProvider;
import org.keycloak.cluster.ClusterProviderFactory;
import org.keycloak.cluster.infinispan.InfinispanClusterProvider;
import org.keycloak.cluster.infinispan.LockEntry;
import org.keycloak.common.util.Retry;
import org.keycloak.common.util.Time;
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
import org.keycloak.infinispan.util.InfinispanUtils;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionFactory;
import org.keycloak.provider.EnvironmentDependentProviderFactory;

import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.WORK_CACHE_NAME;

public class RemoteInfinispanClusterProviderFactory implements ClusterProviderFactory, RemoteInfinispanClusterProvider.SharedData, EnvironmentDependentProviderFactory {

    private static final Logger logger = Logger.getLogger(MethodHandles.lookup().lookupClass());

    private volatile RemoteCache<String, LockEntry> workCache;
    private volatile int clusterStartupTime;
    private volatile RemoteInfinispanNotificationManager notificationManager;
    private volatile Executor executor;

    @Override
    public ClusterProvider create(KeycloakSession session) {
        if (workCache == null) {
            // Keycloak does not ensure postInit() is invoked before create()
            lazyInit(session);
        }
        assert workCache != null;
        assert notificationManager != null;
        assert executor != null;
        return new RemoteInfinispanClusterProvider(this);
    }

    @Override
    public void init(Config.Scope config) {

    }

    @Override
    public void postInit(KeycloakSessionFactory factory) {
        try (var session = factory.create()) {
            lazyInit(session);
        }
    }

    @Override
    public synchronized void close() {
        logger.debug("Closing provider");
        if (notificationManager != null) {
            notificationManager.removeClientListener();
            notificationManager = null;
        }
        // executor is managed by Infinispan, do not shutdown.
        executor = null;
        workCache = null;
    }

    @Override
    public String getId() {
        return InfinispanUtils.REMOTE_PROVIDER_ID;
    }

    @Override
    public boolean isSupported(Config.Scope config) {
        return InfinispanUtils.isRemoteInfinispan();
    }

    private synchronized void lazyInit(KeycloakSession session) {
        if (workCache != null) {
            return;
        }
        var provider = session.getProvider(InfinispanConnectionProvider.class);
        executor = provider.getExecutor("cluster-provider");
        clusterStartupTime = initClusterStartupTime(provider.getRemoteCache(WORK_CACHE_NAME), (int) (session.getKeycloakSessionFactory().getServerStartupTimestamp() / 1000));
        notificationManager = new RemoteInfinispanNotificationManager(executor, provider.getRemoteCache(WORK_CACHE_NAME), provider.getTopologyInfo());
        notificationManager.addClientListener();
        workCache = provider.getRemoteCache(WORK_CACHE_NAME);

        logger.debugf("Provider initialized. Cluster startup time: %s", Time.toDate(clusterStartupTime));
    }

    private static int initClusterStartupTime(RemoteCache<String, Integer> cache, int serverStartupTime) {
        Integer clusterStartupTime = putIfAbsentWithRetries(cache, InfinispanClusterProvider.CLUSTER_STARTUP_TIME_KEY, serverStartupTime, -1);
        return clusterStartupTime == null ? serverStartupTime : clusterStartupTime;
    }

    static <V> V putIfAbsentWithRetries(RemoteCache<String, V> workCache, String key, V value, int taskTimeoutInSeconds) {
        ByRef<V> ref = new ByRef<>(null);

        Retry.executeWithBackoff((int iteration) -> {
            try {
                if (taskTimeoutInSeconds > 0) {
                    ref.set(workCache.putIfAbsent(key, value, taskTimeoutInSeconds, TimeUnit.SECONDS));
                } else {
                    ref.set(workCache.putIfAbsent(key, value));
                }
            } catch (HotRodClientException re) {
                logger.warnf(re, "Failed to write key '%s' and value '%s' in iteration '%d' . Retrying", key, value, iteration);

                // Rethrow the exception. Retry will take care of handle the exception and eventually retry the operation.
                throw re;
            }

        }, 10, 10);

        return ref.get();
    }

    @Override
    public int clusterStartupTime() {
        return clusterStartupTime;
    }

    @Override
    public RemoteCache<String, LockEntry> cache() {
        return workCache;
    }

    @Override
    public RemoteInfinispanNotificationManager notificationManager() {
        return notificationManager;
    }

    @Override
    public Executor executor() {
        return executor;
    }
}