PersistentUserSessionProvider.java

/*
 * Copyright 2016 Red Hat, Inc. and/or its affiliates
 * and other contributors as indicated by the @author tags.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.keycloak.models.sessions.infinispan;

import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import io.reactivex.rxjava3.core.Flowable;
import org.infinispan.Cache;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.exceptions.HotRodClientException;
import org.infinispan.commons.api.BasicCache;
import org.infinispan.commons.util.ByRef;
import org.infinispan.commons.util.concurrent.CompletionStages;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.persistence.manager.PersistenceManager;
import org.jboss.logging.Logger;
import org.keycloak.cluster.ClusterProvider;
import org.keycloak.common.Profile;
import org.keycloak.common.Profile.Feature;
import org.keycloak.common.util.MultiSiteUtils;
import org.keycloak.common.util.Retry;
import org.keycloak.common.util.Time;
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
import org.keycloak.connections.infinispan.InfinispanUtil;
import org.keycloak.models.AuthenticatedClientSessionModel;
import org.keycloak.models.ClientModel;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.ModelException;
import org.keycloak.models.OfflineUserSessionModel;
import org.keycloak.models.RealmModel;
import org.keycloak.models.UserModel;
import org.keycloak.models.UserProvider;
import org.keycloak.models.UserSessionModel;
import org.keycloak.models.UserSessionProvider;
import org.keycloak.models.light.LightweightUserAdapter;
import org.keycloak.models.session.UserSessionPersisterProvider;
import org.keycloak.models.sessions.infinispan.changes.ClientSessionPersistentChangelogBasedTransaction;
import org.keycloak.models.sessions.infinispan.changes.JpaChangesPerformer;
import org.keycloak.models.sessions.infinispan.changes.MergedUpdate;
import org.keycloak.models.sessions.infinispan.changes.PersistentUpdate;
import org.keycloak.models.sessions.infinispan.changes.SerializeExecutionsByKey;
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
import org.keycloak.models.sessions.infinispan.changes.SessionUpdateTask;
import org.keycloak.models.sessions.infinispan.changes.SessionUpdatesList;
import org.keycloak.models.sessions.infinispan.changes.Tasks;
import org.keycloak.models.sessions.infinispan.changes.UserSessionPersistentChangelogBasedTransaction;
import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshStore;
import org.keycloak.models.sessions.infinispan.changes.sessions.PersisterLastSessionRefreshStore;
import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionEntity;
import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionStore;
import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
import org.keycloak.models.sessions.infinispan.events.RealmRemovedSessionEvent;
import org.keycloak.models.sessions.infinispan.events.RemoveUserSessionsEvent;
import org.keycloak.models.sessions.infinispan.events.SessionEventsSenderTransaction;
import org.keycloak.models.sessions.infinispan.remotestore.RemoteCacheInvoker;
import org.keycloak.models.sessions.infinispan.stream.Mappers;
import org.keycloak.models.sessions.infinispan.stream.SessionWrapperPredicate;
import org.keycloak.models.sessions.infinispan.stream.UserSessionPredicate;
import org.keycloak.models.sessions.infinispan.util.FuturesHelper;
import org.keycloak.models.sessions.infinispan.util.InfinispanKeyGenerator;
import org.keycloak.models.sessions.infinispan.util.SessionTimeouts;
import org.keycloak.models.utils.KeycloakModelUtils;
import org.keycloak.models.utils.UserModelDelegate;

import static org.keycloak.models.Constants.SESSION_NOTE_LIGHTWEIGHT_USER;
import static org.keycloak.utils.StreamsUtil.paginatedStream;

/**
 * @author <a href="mailto:sthorger@redhat.com">Stian Thorgersen</a>
 */
public class PersistentUserSessionProvider implements UserSessionProvider, SessionRefreshStore {

    private static final Logger log = Logger.getLogger(PersistentUserSessionProvider.class);

    protected final KeycloakSession session;

    protected final Cache<String, SessionEntityWrapper<UserSessionEntity>> sessionCache;
    protected final Cache<String, SessionEntityWrapper<UserSessionEntity>> offlineSessionCache;
    protected final Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> clientSessionCache;
    protected final Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> offlineClientSessionCache;

    protected final UserSessionPersistentChangelogBasedTransaction sessionTx;
    protected final ClientSessionPersistentChangelogBasedTransaction clientSessionTx;

    protected final SessionEventsSenderTransaction clusterEventsSenderTx;

    protected final CrossDCLastSessionRefreshStore lastSessionRefreshStore;
    protected final CrossDCLastSessionRefreshStore offlineLastSessionRefreshStore;

    protected final InfinispanKeyGenerator keyGenerator;

    public PersistentUserSessionProvider(KeycloakSession session,
                                         RemoteCacheInvoker remoteCacheInvoker,
                                         CrossDCLastSessionRefreshStore lastSessionRefreshStore,
                                         CrossDCLastSessionRefreshStore offlineLastSessionRefreshStore,
                                         InfinispanKeyGenerator keyGenerator,
                                         Cache<String, SessionEntityWrapper<UserSessionEntity>> sessionCache,
                                         Cache<String, SessionEntityWrapper<UserSessionEntity>> offlineSessionCache,
                                         Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> clientSessionCache,
                                         Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> offlineClientSessionCache,
                                         ArrayBlockingQueue<PersistentUpdate> asyncQueuePersistentUpdate,
                                         SerializeExecutionsByKey<String> serializerSession,
                                         SerializeExecutionsByKey<String> serializerOfflineSession,
                                         SerializeExecutionsByKey<UUID> serializerClientSession,
                                         SerializeExecutionsByKey<UUID> serializerOfflineClientSession) {
        if (!MultiSiteUtils.isPersistentSessionsEnabled()) {
            throw new IllegalStateException("Persistent user sessions are not enabled");
        }

        this.session = session;

        this.sessionCache = sessionCache;
        this.clientSessionCache = clientSessionCache;
        this.offlineSessionCache = offlineSessionCache;
        this.offlineClientSessionCache = offlineClientSessionCache;

        this.sessionTx = new UserSessionPersistentChangelogBasedTransaction(session,
                sessionCache, offlineSessionCache,
                remoteCacheInvoker,
                SessionTimeouts::getUserSessionLifespanMs, SessionTimeouts::getUserSessionMaxIdleMs,
                SessionTimeouts::getOfflineSessionLifespanMs, SessionTimeouts::getOfflineSessionMaxIdleMs,
                asyncQueuePersistentUpdate,
                serializerSession,
                serializerOfflineSession);

        this.clientSessionTx = new ClientSessionPersistentChangelogBasedTransaction(session,
                clientSessionCache, offlineClientSessionCache,
                remoteCacheInvoker,
                SessionTimeouts::getClientSessionLifespanMs, SessionTimeouts::getClientSessionMaxIdleMs,
                SessionTimeouts::getOfflineClientSessionLifespanMs, SessionTimeouts::getOfflineClientSessionMaxIdleMs,
                sessionTx,
                asyncQueuePersistentUpdate,
                serializerClientSession,
                serializerOfflineClientSession);

        this.clusterEventsSenderTx = new SessionEventsSenderTransaction(session);

        this.lastSessionRefreshStore = lastSessionRefreshStore;
        this.offlineLastSessionRefreshStore = offlineLastSessionRefreshStore;
        this.keyGenerator = keyGenerator;

        session.getTransactionManager().enlistAfterCompletion(clusterEventsSenderTx);
        session.getTransactionManager().enlistAfterCompletion(sessionTx);
        session.getTransactionManager().enlistAfterCompletion(clientSessionTx);
    }

    protected Cache<String, SessionEntityWrapper<UserSessionEntity>> getCache(boolean offline) {
        return offline ? offlineSessionCache : sessionCache;
    }

    protected Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> getClientSessionCache(boolean offline) {
        return offline ? offlineClientSessionCache : clientSessionCache;
    }

    @Override
    public CrossDCLastSessionRefreshStore getLastSessionRefreshStore() {
        return lastSessionRefreshStore;
    }

    @Override
    public CrossDCLastSessionRefreshStore getOfflineLastSessionRefreshStore() {
        return offlineLastSessionRefreshStore;
    }

    @Override
    public PersisterLastSessionRefreshStore getPersisterLastSessionRefreshStore() {
        throw new IllegalStateException("PersisterLastSessionRefreshStore is not supported in PersistentUserSessionProvider");
    }

    @Override
    public KeycloakSession getKeycloakSession() {
        return session;
    }

    @Override
    public AuthenticatedClientSessionModel createClientSession(RealmModel realm, ClientModel client, UserSessionModel userSession) {
        final UUID clientSessionId = PersistentUserSessionProvider.createClientSessionUUID(userSession.getId(), client.getId());
        AuthenticatedClientSessionEntity entity = new AuthenticatedClientSessionEntity(clientSessionId);
        entity.setRealmId(realm.getId());
        entity.setClientId(client.getId());
        entity.setUserSessionId(userSession.getId());
        entity.setTimestamp(Time.currentTime());
        entity.getNotes().put(AuthenticatedClientSessionModel.STARTED_AT_NOTE, String.valueOf(entity.getTimestamp()));
        entity.getNotes().put(AuthenticatedClientSessionModel.USER_SESSION_STARTED_AT_NOTE, String.valueOf(userSession.getStarted()));
        if (userSession.isRememberMe()) {
            entity.getNotes().put(AuthenticatedClientSessionModel.USER_SESSION_REMEMBER_ME_NOTE, "true");
        }

        AuthenticatedClientSessionAdapter adapter = new AuthenticatedClientSessionAdapter(session, this, entity, client, userSession, clientSessionTx, false);

        if (userSession.isOffline()) {
            // If this is an offline session, and the referred online session doesn't exist anymore, don't register the client session in the transaction.
            // Instead keep it transient and it will be added to the offline session only afterward. This is expected by SessionTimeoutsTest.testOfflineUserClientIdleTimeoutSmallerThanSessionOneRefresh.
            if (sessionTx.get(realm, userSession.getId(), userSession, false) == null) {
                return adapter;
            }
        }

        // For now, the clientSession is considered transient in case that userSession was transient
        UserSessionModel.SessionPersistenceState persistenceState = userSession.getPersistenceState() != null ?
                userSession.getPersistenceState() : UserSessionModel.SessionPersistenceState.PERSISTENT;

        SessionUpdateTask<AuthenticatedClientSessionEntity> createClientSessionTask = Tasks.addIfAbsentSync();
        clientSessionTx.addTask(clientSessionId, createClientSessionTask, entity, persistenceState);

        SessionUpdateTask<UserSessionEntity> registerClientSessionTask = new ClientSessionPersistentChangelogBasedTransaction.RegisterClientSessionTask(client.getId(), clientSessionId, userSession.isOffline());
        sessionTx.addTask(userSession.getId(), registerClientSessionTask);

        return adapter;
    }

    @Override
    public UserSessionModel createUserSession(String id, RealmModel realm, UserModel user, String loginUsername, String ipAddress,
                                              String authMethod, boolean rememberMe, String brokerSessionId, String brokerUserId, UserSessionModel.SessionPersistenceState persistenceState) {
        if (id == null) {
            id = keyGenerator.generateKeyString(session, sessionCache);
        }

        UserSessionEntity entity = new UserSessionEntity(id);
        updateSessionEntity(entity, realm, user, loginUsername, ipAddress, authMethod, rememberMe, brokerSessionId, brokerUserId);

        SessionUpdateTask<UserSessionEntity> createSessionTask = Tasks.addIfAbsentSync();
        sessionTx.addTask(id, createSessionTask, entity, persistenceState);

        UserSessionAdapter adapter = user instanceof LightweightUserAdapter
          ? wrap(realm, entity, false, user)
          : wrap(realm, entity, false);
        adapter.setPersistenceState(persistenceState);
        return adapter;
    }

    void updateSessionEntity(UserSessionEntity entity, RealmModel realm, UserModel user, String loginUsername, String ipAddress, String authMethod, boolean rememberMe, String brokerSessionId, String brokerUserId) {
        entity.setRealmId(realm.getId());
        entity.setUser(user.getId());
        entity.setLoginUsername(loginUsername);
        entity.setIpAddress(ipAddress);
        entity.setAuthMethod(authMethod);
        entity.setRememberMe(rememberMe);
        entity.setBrokerSessionId(brokerSessionId);
        entity.setBrokerUserId(brokerUserId);

        int currentTime = Time.currentTime();

        entity.setStarted(currentTime);
        entity.setLastSessionRefresh(currentTime);
    }

    @Override
    public UserSessionModel getUserSession(RealmModel realm, String id) {
        return getUserSession(realm, id, null, false);
    }

    private UserSessionAdapter getUserSession(RealmModel realm, String id, UserSessionModel userSession, boolean offline) {
        SessionEntityWrapper<UserSessionEntity> entityWrapper = sessionTx.get(realm, id, userSession, offline);
        return entityWrapper != null ? wrap(realm, entityWrapper.getEntity(), offline) : null;
    }

    private UserSessionEntity getUserSessionEntity(RealmModel realm, String id, boolean offline) {
        SessionEntityWrapper<UserSessionEntity> entityWrapper = sessionTx.get(realm, id, null, offline);
        return entityWrapper != null ? entityWrapper.getEntity() : null;
    }

    private Stream<UserSessionModel> getUserSessionsFromPersistenceProviderStream(RealmModel realm, UserModel user) {
        UserSessionPersisterProvider persister = session.getProvider(UserSessionPersisterProvider.class);
        return persister.loadUserSessionsStream(realm, user, true, 0, null)
                .map(persistentUserSession -> (UserSessionModel) getUserSession(realm, persistentUserSession.getId(), persistentUserSession, true))
                .filter(Objects::nonNull);
    }


    protected Stream<UserSessionModel> getUserSessionsStream(RealmModel realm, UserSessionPredicate predicate, boolean offline) {
        // fetch the offline user-sessions from the persistence provider
        UserSessionPersisterProvider persister = session.getProvider(UserSessionPersisterProvider.class);
        if (predicate.getUserId() != null) {
            UserModel user;
            if (LightweightUserAdapter.isLightweightUser(predicate.getUserId())) {
              user = new UserModelDelegate(null) {
                  @Override
                  public String getId() {
                      return predicate.getUserId();
                  }
              };
            } else {
              user = session.users().getUserById(realm, predicate.getUserId());
            }
            if (user != null) {
                return persister.loadUserSessionsStream(realm, user, offline, 0, null)
                        .filter(predicate.toModelPredicate())
                        .map(s -> (UserSessionModel) getUserSession(realm, s.getId(), s, offline))
                        .filter(Objects::nonNull);
            } else {
                return Stream.empty();
            }
        }

        if (predicate.getBrokerUserId() != null) {
            int split = predicate.getBrokerUserId().indexOf('.');

            Map<String, String> attributes = new HashMap<>();
            attributes.put(UserModel.IDP_ALIAS, predicate.getBrokerUserId().substring(0, split));
            attributes.put(UserModel.IDP_USER_ID, predicate.getBrokerUserId().substring(split + 1));

            UserProvider userProvider = session.getProvider(UserProvider.class);
            UserModel userModel = userProvider.searchForUserStream(realm, attributes, 0, null).findFirst().orElse(null);
            return userModel != null ?
                    persister.loadUserSessionsStream(realm, userModel, offline, 0, null)
                            .filter(predicate.toModelPredicate())
                            .map(s -> (UserSessionModel) getUserSession(realm, s.getId(), s, offline))
                            .filter(Objects::nonNull) :
                    Stream.empty();
        }

        if (predicate.getClient() != null) {
            ClientModel client = session.clients().getClientById(realm, predicate.getClient());
            return persister.loadUserSessionsStream(realm, client, offline, 0, null)
                    .filter(predicate.toModelPredicate())
                    .map(s -> (UserSessionModel) getUserSession(realm, s.getId(), s, offline))
                    .filter(Objects::nonNull);
        }

        if (predicate.getBrokerSessionId() != null && !offline) {
            // we haven't yet migrated the old offline entries, so they don't have a brokerSessionId yet
            return Stream.of(persister.loadUserSessionsStreamByBrokerSessionId(realm, predicate.getBrokerSessionId(), false))
                    .filter(predicate.toModelPredicate())
                    .map(s -> (UserSessionModel) getUserSession(realm, s.getId(), s, false))
                    .filter(Objects::nonNull);
        }

        throw new ModelException("For offline sessions, only lookup by userId, brokerUserId and client is supported");
    }

    @Override
    public AuthenticatedClientSessionAdapter getClientSession(UserSessionModel userSession, ClientModel client, String clientSessionId, boolean offline) {
        if (clientSessionId == null) {
            return null;
        }

        UUID clientSessionUUID = UUID.fromString(clientSessionId);

        SessionEntityWrapper<AuthenticatedClientSessionEntity> clientSessionEntity = clientSessionTx.get(client.getRealm(), client, userSession, clientSessionUUID, offline);
        if (clientSessionEntity != null) {
            return new AuthenticatedClientSessionAdapter(session, this, clientSessionEntity.getEntity(), client, userSession, clientSessionTx, offline);
        }

        return null;
    }

    @Override
    public Stream<UserSessionModel> getUserSessionsStream(final RealmModel realm, UserModel user) {
        return getUserSessionsStream(realm, UserSessionPredicate.create(realm.getId()).user(user.getId()), false);
    }

    @Override
    public Stream<UserSessionModel> getUserSessionByBrokerUserIdStream(RealmModel realm, String brokerUserId) {
        return getUserSessionsStream(realm, UserSessionPredicate.create(realm.getId()).brokerUserId(brokerUserId), false);
    }

    @Override
    public UserSessionModel getUserSessionByBrokerSessionId(RealmModel realm, String brokerSessionId) {
        // TODO: consider returning a list as it is not guaranteed to be unique, and might be active for different users
        return this.getUserSessionsStream(realm, UserSessionPredicate.create(realm.getId()).brokerSessionId(brokerSessionId), false)
                .findFirst().orElse(null);
    }

    @Override
    public Stream<UserSessionModel> getUserSessionsStream(RealmModel realm, ClientModel client) {
        return getUserSessionsStream(realm, client, -1, -1);
    }

    @Override
    public Stream<UserSessionModel> getUserSessionsStream(RealmModel realm, ClientModel client, Integer firstResult, Integer maxResults) {
        return getUserSessionsStream(realm, client, firstResult, maxResults, false);
    }

    protected Stream<UserSessionModel> getUserSessionsStream(final RealmModel realm, ClientModel client, Integer firstResult, Integer maxResults, final boolean offline) {
        UserSessionPredicate predicate = UserSessionPredicate.create(realm.getId()).client(client.getId());

        return paginatedStream(getUserSessionsStream(realm, predicate, offline), firstResult, maxResults);
    }

    @Override
    public UserSessionModel getUserSessionWithPredicate(RealmModel realm, String id, boolean offline, Predicate<UserSessionModel> predicate) {
        UserSessionModel userSession = getUserSession(realm, id, null, offline);
        if (userSession == null) {
            return null;
        }

        // We have userSession, which passes predicate. No need for remote lookup.
        if (predicate.test(userSession)) {
            log.debugf("getUserSessionWithPredicate(%s): found in local cache", id);
            return userSession;
        }

        // The logic to remove the local entry if there is no entry in the remote cache that is present in the InfinispanUserSessionProvider is removed here,
        // as with persistent sessions we will have only a subset of all sessions in memory (both locally and in the remote store).
        return null;
    }


    @Override
    public long getActiveUserSessions(RealmModel realm, ClientModel client) {
        return getUserSessionsCount(realm, client, false);
    }

    @Override
    public Map<String, Long> getActiveClientSessionStats(RealmModel realm, boolean offline) {
        UserSessionPersisterProvider persister = session.getProvider(UserSessionPersisterProvider.class);
        return persister.getUserSessionsCountsByClients(realm, offline);
    }

    protected long getUserSessionsCount(RealmModel realm, ClientModel client, boolean offline) {
        // fetch the actual offline user session count from the database
        UserSessionPersisterProvider persister = session.getProvider(UserSessionPersisterProvider.class);
        return persister.getUserSessionsCount(realm, client, offline);
    }

    @Override
    public void removeUserSession(RealmModel realm, UserSessionModel session) {
        UserSessionEntity entity = getUserSessionEntity(realm, session, false);
        if (entity != null) {
            removeUserSession(entity, false);
        }
    }

    @Override
    public void removeUserSessions(RealmModel realm, UserModel user) {
        removeUserSessions(realm, user, false);
    }

    protected void removeUserSessions(RealmModel realm, UserModel user, boolean offline) {
        UserSessionPredicate.create(realm.getId()).user(user.getId());
        getUserSessionsStream(realm, UserSessionPredicate.create(realm.getId()).user(user.getId()), offline)
                .forEach(s -> removeUserSession(realm, s));
    }

    public void removeAllExpired() {
        // Rely on expiration of cache entries provided by infinispan. Just expire entries from persister is needed
        // TODO: Avoid iteration over all realms here (Details in the KEYCLOAK-16802)
        session.realms().getRealmsStream().forEach(this::removeExpired);

    }

    @Override
    public void removeExpired(RealmModel realm) {
        // Rely on expiration of cache entries provided by infinispan. Nothing needed here besides calling persister
        session.getProvider(UserSessionPersisterProvider.class).removeExpired(realm);
    }

    @Override
    public void removeUserSessions(RealmModel realm) {
        // Send message to all DCs as each site might have different entries in the cache
        clusterEventsSenderTx.addEvent(
                RemoveUserSessionsEvent.createEvent(RemoveUserSessionsEvent.class, InfinispanUserSessionProviderFactory.REMOVE_USER_SESSIONS_EVENT, session, realm.getId(), true),
                ClusterProvider.DCNotify.ALL_DCS);

        session.getProvider(UserSessionPersisterProvider.class).removeUserSessions(realm, false);
    }

    protected void onRemoveUserSessionsEvent(String realmId) {
        removeLocalUserSessions(realmId, false);
        removeLocalUserSessions(realmId, true);
    }

    // public for usage in the testsuite
    public void removeLocalUserSessions(String realmId, boolean offline) {
        Cache<String, SessionEntityWrapper<UserSessionEntity>> cache = getCache(offline);
        Cache<String, SessionEntityWrapper<UserSessionEntity>> localCache = CacheDecorators.localCache(cache);
        Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> clientSessionCache = getClientSessionCache(offline);
        Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> localClientSessionCache = CacheDecorators.localCache(clientSessionCache);

        Cache<String, SessionEntityWrapper<UserSessionEntity>> localCacheStoreIgnore = CacheDecorators.skipCacheLoadersIfRemoteStoreIsEnabled(localCache);

        final AtomicInteger userSessionsSize = new AtomicInteger();

        removeEntriesByRealm(realmId, localCacheStoreIgnore, userSessionsSize, localCache, localClientSessionCache);

        // TODO: This now runs on each node on each site. Ideally it should run only once on each site.
        removeEntriesByRealmRemote(realmId, InfinispanUtil.getRemoteCache(getCache(offline)), userSessionsSize, InfinispanUtil.getRemoteCache(getClientSessionCache(offline)));

        log.debugf("Removed %d sessions in realm %s. Offline: %b", (Object) userSessionsSize.get(), realmId, offline);
    }

    private static void removeEntriesByRealm(String realmId, Cache<String, SessionEntityWrapper<UserSessionEntity>> sessions, AtomicInteger userSessionsSize, Cache<String, SessionEntityWrapper<UserSessionEntity>> localCache, Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> clientSessions) {
        FuturesHelper futures = new FuturesHelper();

        sessions
                .entrySet()
                .stream()
                .filter(SessionWrapperPredicate.create(realmId))
                .map(Mappers.userSessionEntity())
                .forEach((Consumer<UserSessionEntity>) userSessionEntity -> {
                    userSessionsSize.incrementAndGet();

                    // Remove session from remoteCache too. Use removeAsync for better perf
                    Future<SessionEntityWrapper<UserSessionEntity>> future = localCache.removeAsync(userSessionEntity.getId());
                    futures.addTask(future);
                    userSessionEntity.getAuthenticatedClientSessions().forEach((clientUUID, clientSessionId) -> {
                        Future<SessionEntityWrapper<AuthenticatedClientSessionEntity>> f = clientSessions.removeAsync(clientSessionId);
                        futures.addTask(f);
                    });
                });

        futures.waitForAllToFinish();
    }

    private static void removeEntriesByRealmRemote(String realmId, RemoteCache<String, SessionEntityWrapper<UserSessionEntity>> sessions, AtomicInteger userSessionsSize, RemoteCache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> clientSessions) {
        if (sessions == null) {
            return;
        }

        FuturesHelper futures = new FuturesHelper();

        sessions
                .entrySet()
                .stream()
                .filter(UserSessionPredicate.create(realmId))
                .map(Mappers.userSessionEntity())
                .forEach((Consumer<UserSessionEntity>) userSessionEntity -> {
                    userSessionsSize.incrementAndGet();

                    Future<SessionEntityWrapper<UserSessionEntity>> future = sessions.withFlags(org.infinispan.client.hotrod.Flag.SKIP_LISTENER_NOTIFICATION).removeAsync(userSessionEntity.getId());
                    futures.addTask(future);
                    if (clientSessions != null) {
                        userSessionEntity.getAuthenticatedClientSessions().forEach((clientUUID, clientSessionId) -> {
                            Future<SessionEntityWrapper<AuthenticatedClientSessionEntity>> f = clientSessions.withFlags(org.infinispan.client.hotrod.Flag.SKIP_LISTENER_NOTIFICATION).removeAsync(clientSessionId);
                            futures.addTask(f);
                        });
                    }
                });

        futures.waitForAllToFinish();
    }

    @Override
    public void onRealmRemoved(RealmModel realm) {
        // Send message to all DCs, as each DC might have different entries in their site cache
        clusterEventsSenderTx.addEvent(
                RealmRemovedSessionEvent.createEvent(RealmRemovedSessionEvent.class, InfinispanUserSessionProviderFactory.REALM_REMOVED_SESSION_EVENT, session, realm.getId(), true),
                ClusterProvider.DCNotify.ALL_DCS);

        UserSessionPersisterProvider sessionsPersister = session.getProvider(UserSessionPersisterProvider.class);
        if (sessionsPersister != null) {
            sessionsPersister.onRealmRemoved(realm);
        }
    }

    protected void onRealmRemovedEvent(String realmId) {
        removeLocalUserSessions(realmId, true);
        removeLocalUserSessions(realmId, false);
    }

    @Override
    public void onClientRemoved(RealmModel realm, ClientModel client) {
//        clusterEventsSenderTx.addEvent(
//                ClientRemovedSessionEvent.createEvent(ClientRemovedSessionEvent.class, InfinispanUserSessionProviderFactory.CLIENT_REMOVED_SESSION_EVENT, session, realm.getId(), true),
//                ClusterProvider.DCNotify.LOCAL_DC_ONLY);
        UserSessionPersisterProvider sessionsPersister = session.getProvider(UserSessionPersisterProvider.class);
        if (sessionsPersister != null) {
            sessionsPersister.onClientRemoved(realm, client);
        }
    }

    protected void onUserRemoved(RealmModel realm, UserModel user) {
        removeUserSessions(realm, user, true);
        removeUserSessions(realm, user, false);

        UserSessionPersisterProvider persisterProvider = session.getProvider(UserSessionPersisterProvider.class);
        if (persisterProvider != null) {
            persisterProvider.onUserRemoved(realm, user);
        }
    }

    @Override
    public void close() {
    }

    @Override
    public int getStartupTime(RealmModel realm) {
        // TODO: take realm.getNotBefore() into account?
        return session.getProvider(ClusterProvider.class).getClusterStartupTime();
    }

    protected void removeUserSession(UserSessionEntity sessionEntity, boolean offline) {
        sessionEntity.getAuthenticatedClientSessions().forEach((clientUUID, clientSessionId) -> clientSessionTx.addTask(clientSessionId, Tasks.removeSync(offline)));
        SessionUpdateTask<UserSessionEntity> removeTask = Tasks.removeSync(offline);
        sessionTx.addTask(sessionEntity.getId(), removeTask);
    }

    UserSessionAdapter wrap(RealmModel realm, UserSessionEntity entity, boolean offline, UserModel user) {
        if (entity == null) {
            return null;
        }

        return new UserSessionAdapter(session, user, this, sessionTx, clientSessionTx, realm, entity, offline);
    }

    UserSessionAdapter wrap(RealmModel realm, UserSessionEntity entity, boolean offline) {
        UserModel user;
        if (Profile.isFeatureEnabled(Feature.TRANSIENT_USERS) && entity.getNotes().containsKey(SESSION_NOTE_LIGHTWEIGHT_USER)) {
            LightweightUserAdapter lua = LightweightUserAdapter.fromString(session, realm, entity.getNotes().get(SESSION_NOTE_LIGHTWEIGHT_USER));
            final UserSessionAdapter us = wrap(realm, entity, offline, lua);
            lua.setUpdateHandler(lua1 -> {
                if (lua == lua1) {  // Ensure there is no conflicting user model, only the latest lightweight user can be used
                    us.setNote(SESSION_NOTE_LIGHTWEIGHT_USER, lua1.serialize());
                }
            });
            return us;
        }

        user = session.users().getUserById(realm, entity.getUser());

        if (user == null) {
            return null;
        }

        return wrap(realm, entity, offline, user);
    }

    UserSessionEntity getUserSessionEntity(RealmModel realm, UserSessionModel userSession, boolean offline) {
        if (userSession instanceof UserSessionAdapter) {
            if (!userSession.getRealm().equals(realm)) {
                return null;
            }
            return ((UserSessionAdapter) userSession).getEntity();
        } else {
            return getUserSessionEntity(realm, userSession.getId(), offline);
        }
    }


    @Override
    public UserSessionModel createOfflineUserSession(UserSessionModel userSession) {
        UserSessionEntity entity = createUserSessionEntityInstance(userSession);
        entity.setOffline(true);

        SessionUpdateTask<UserSessionEntity> importTask = Tasks.addIfAbsentSync();
        sessionTx.addTask(userSession.getId(), importTask, entity, UserSessionModel.SessionPersistenceState.PERSISTENT);

        UserSessionAdapter offlineUserSession = wrap(userSession.getRealm(), entity, true);

        // started and lastSessionRefresh set to current time
        int currentTime = Time.currentTime();
        offlineUserSession.getEntity().setStarted(currentTime);
        offlineUserSession.getEntity().setLastSessionRefresh(currentTime);

        return offlineUserSession;
    }

    @Override
    public UserSessionAdapter getOfflineUserSession(RealmModel realm, String userSessionId) {
        return getUserSession(realm, userSessionId, null, true);
    }

    @Override
    public Stream<UserSessionModel> getOfflineUserSessionByBrokerUserIdStream(RealmModel realm, String brokerUserId) {
        return getUserSessionsStream(realm, UserSessionPredicate.create(realm.getId()).brokerUserId(brokerUserId), true);
    }

    @Override
    public void removeOfflineUserSession(RealmModel realm, UserSessionModel userSession) {
        UserSessionEntity userSessionEntity = getUserSessionEntity(realm, userSession, true);
        if (userSessionEntity != null) {
            removeUserSession(userSessionEntity, true);
        }
    }

    @Override
    public AuthenticatedClientSessionModel createOfflineClientSession(AuthenticatedClientSessionModel clientSession, UserSessionModel offlineUserSession) {
        UserSessionAdapter userSessionAdapter = (offlineUserSession instanceof UserSessionAdapter) ? (UserSessionAdapter) offlineUserSession :
                getOfflineUserSession(offlineUserSession.getRealm(), offlineUserSession.getId());

        AuthenticatedClientSessionAdapter offlineClientSession = importOfflineClientSession(userSessionAdapter, clientSession);

        // update timestamp to current time
        offlineClientSession.setTimestamp(Time.currentTime());
        offlineClientSession.getNotes().put(AuthenticatedClientSessionModel.STARTED_AT_NOTE, String.valueOf(offlineClientSession.getTimestamp()));
        offlineClientSession.getNotes().put(AuthenticatedClientSessionModel.USER_SESSION_STARTED_AT_NOTE, String.valueOf(offlineUserSession.getStarted()));

        return offlineClientSession;
    }

    @Override
    public Stream<UserSessionModel> getOfflineUserSessionsStream(RealmModel realm, UserModel user) {
        return getUserSessionsFromPersistenceProviderStream(realm, user);
    }

    @Override
    public long getOfflineSessionsCount(RealmModel realm, ClientModel client) {
        return getUserSessionsCount(realm, client, true);
    }

    @Override
    public Stream<UserSessionModel> getOfflineUserSessionsStream(RealmModel realm, ClientModel client, Integer first, Integer max) {
        return getUserSessionsStream(realm, client, first, max, true);
    }

    @Override
    public void importUserSessions(Collection<UserSessionModel> persistentUserSessions, boolean offline) {
        if (persistentUserSessions == null || persistentUserSessions.isEmpty()) {
            return;
        }
        persistentUserSessions.forEach(userSessionModel -> importUserSession(userSessionModel, offline));
    }

    public SessionEntityWrapper<UserSessionEntity> importUserSession(UserSessionModel persistentUserSession, boolean offline) {
        Map<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> clientSessionsById = new HashMap<>();

        UserSessionEntity userSessionEntityToImport = createUserSessionEntityInstance(persistentUserSession);

        for (Map.Entry<String, AuthenticatedClientSessionModel> entry : persistentUserSession.getAuthenticatedClientSessions().entrySet()) {
            String clientUUID = entry.getKey();
            AuthenticatedClientSessionModel clientSession = entry.getValue();
            AuthenticatedClientSessionEntity clientSessionToImport = createAuthenticatedClientSessionInstance(userSessionEntityToImport.getId(), clientSession,
                    userSessionEntityToImport.getRealmId(), clientUUID, offline);
            clientSessionToImport.setUserSessionId(userSessionEntityToImport.getId());

            // Update timestamp to same value as userSession. LastSessionRefresh of userSession from DB will have correct value
            clientSessionToImport.setTimestamp(userSessionEntityToImport.getLastSessionRefresh());

            clientSessionsById.put(clientSessionToImport.getId(), new SessionEntityWrapper<>(clientSessionToImport));

            // Update userSession entity with the clientSession
            AuthenticatedClientSessionStore clientSessions = userSessionEntityToImport.getAuthenticatedClientSessions();
            clientSessions.put(clientUUID, clientSessionToImport.getId());
        }

        SessionEntityWrapper<UserSessionEntity>  wrappedUserSessionEntity = new SessionEntityWrapper<>(userSessionEntityToImport);

        Map<String, SessionEntityWrapper<UserSessionEntity>> sessionsById =
                    Stream.of(wrappedUserSessionEntity).collect(Collectors.toMap(sessionEntityWrapper -> sessionEntityWrapper.getEntity().getId(), Function.identity()));

        // Directly put all entities to the infinispan cache
        Cache<String, SessionEntityWrapper<UserSessionEntity>> cache = CacheDecorators.skipCacheLoadersIfRemoteStoreIsEnabled(getCache(offline));

        sessionsById = importSessionsWithExpiration(sessionsById, cache,
                offline ? SessionTimeouts::getOfflineSessionLifespanMs : SessionTimeouts::getUserSessionLifespanMs,
                offline ? SessionTimeouts::getOfflineSessionMaxIdleMs : SessionTimeouts::getUserSessionMaxIdleMs);

        if (sessionsById.isEmpty()) {
            return null;
        }

        // put all entities to the remoteCache (if exists)
        RemoteCache remoteCache = InfinispanUtil.getRemoteCache(cache);
        if (remoteCache != null) {
            Map<String, SessionEntityWrapper<UserSessionEntity>> sessionsByIdForTransport = Stream.of(wrappedUserSessionEntity)
                    .map(SessionEntityWrapper::forTransport)
                    .collect(Collectors.toMap(sessionEntityWrapper -> sessionEntityWrapper.getEntity().getId(), Function.identity()));

            importSessionsWithExpiration(sessionsByIdForTransport, remoteCache,
                    offline ? SessionTimeouts::getOfflineSessionLifespanMs : SessionTimeouts::getUserSessionLifespanMs,
                    offline ? SessionTimeouts::getOfflineSessionMaxIdleMs : SessionTimeouts::getUserSessionMaxIdleMs);
        }

        // Import client sessions
        Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> clientSessCache =
                CacheDecorators.skipCacheLoadersIfRemoteStoreIsEnabled(offline ? offlineClientSessionCache : clientSessionCache);

        importSessionsWithExpiration(clientSessionsById, clientSessCache,
                offline ? SessionTimeouts::getOfflineClientSessionLifespanMs : SessionTimeouts::getClientSessionLifespanMs,
                offline ? SessionTimeouts::getOfflineClientSessionMaxIdleMs : SessionTimeouts::getClientSessionMaxIdleMs);

        // put all entities to the remoteCache (if exists)
        RemoteCache remoteCacheClientSessions = InfinispanUtil.getRemoteCache(clientSessCache);
        if (remoteCacheClientSessions != null) {
            Map<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> sessionsByIdForTransport = clientSessionsById.values().stream()
                    .map(SessionEntityWrapper::forTransport)
                    .collect(Collectors.toMap(sessionEntityWrapper -> sessionEntityWrapper.getEntity().getId(), Function.identity()));

            importSessionsWithExpiration(sessionsByIdForTransport, remoteCacheClientSessions,
                    offline ? SessionTimeouts::getOfflineClientSessionLifespanMs : SessionTimeouts::getClientSessionLifespanMs,
                    offline ? SessionTimeouts::getOfflineClientSessionMaxIdleMs : SessionTimeouts::getClientSessionMaxIdleMs);
        }

        return sessionsById.entrySet().stream().findFirst().map(Map.Entry::getValue).orElse(null);
    }

    private <T extends SessionEntity, K> Map<K, SessionEntityWrapper<T>> importSessionsWithExpiration(Map<K, SessionEntityWrapper<T>> sessionsById,
                                                                              BasicCache<K, SessionEntityWrapper<T>> cache, SessionFunction<T> lifespanMsCalculator,
                                                                              SessionFunction<T> maxIdleTimeMsCalculator) {
        return sessionsById.entrySet().stream().map(entry -> {

            T sessionEntity = entry.getValue().getEntity();
            RealmModel currentRealm = session.realms().getRealm(sessionEntity.getRealmId());
            ClientModel client = entry.getValue().getClientIfNeeded(currentRealm);
            long lifespan = lifespanMsCalculator.apply(currentRealm, client, sessionEntity);
            long maxIdle = maxIdleTimeMsCalculator.apply(currentRealm, client, sessionEntity);

            if (lifespan != SessionTimeouts.ENTRY_EXPIRED_FLAG
                    && maxIdle != SessionTimeouts.ENTRY_EXPIRED_FLAG) {
                if (cache instanceof RemoteCache) {
                    Retry.executeWithBackoff((int iteration) -> {

                        try {
                            cache.putIfAbsent(entry.getKey(), entry.getValue(), lifespan, TimeUnit.MILLISECONDS, maxIdle, TimeUnit.MILLISECONDS);
                        } catch (HotRodClientException re) {
                            if (log.isDebugEnabled()) {
                                log.debugf(re, "Failed to put import %d sessions to remoteCache. Iteration '%s'. Will try to retry the task",
                                        sessionsById.size(), iteration);
                            }

                            // Rethrow the exception. Retry will take care of handle the exception and eventually retry the operation.
                            throw re;
                        }

                    }, 10, 10);
                } else {
                    cache.putIfAbsent(entry.getKey(), entry.getValue(), lifespan, TimeUnit.MILLISECONDS, maxIdle, TimeUnit.MILLISECONDS);
                }
                return entry;
            } else {
                return null;
            }
        }).filter(Objects::nonNull).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    private UserSessionEntity createUserSessionEntityInstance(UserSessionModel userSession) {
        UserSessionEntity entity = new UserSessionEntity(userSession.getId());
        entity.setRealmId(userSession.getRealm().getId());

        entity.setAuthMethod(userSession.getAuthMethod());
        entity.setBrokerSessionId(userSession.getBrokerSessionId());
        entity.setBrokerUserId(userSession.getBrokerUserId());
        entity.setIpAddress(userSession.getIpAddress());
        entity.setNotes(userSession.getNotes() == null ? new ConcurrentHashMap<>() : userSession.getNotes());
        entity.setAuthenticatedClientSessions(new AuthenticatedClientSessionStore());
        entity.setRememberMe(userSession.isRememberMe());
        entity.setState(userSession.getState());
        if (userSession instanceof OfflineUserSessionModel offlineUserSession) {
            // this is a hack so that UserModel doesn't have to be available when offline token is imported.
            // see related JIRA - KEYCLOAK-5350 and corresponding test
            entity.setUser(offlineUserSession.getUserId());
            // NOTE: Hack
            // We skip calling entity.setLoginUsername(userSession.getLoginUsername())

        } else {
            entity.setLoginUsername(userSession.getLoginUsername());
            entity.setUser(userSession.getUser().getId());
        }

        entity.setStarted(userSession.getStarted());
        entity.setLastSessionRefresh(userSession.getLastSessionRefresh());
        entity.setOffline(userSession.isOffline());

        return entity;
    }


    private AuthenticatedClientSessionAdapter importOfflineClientSession(UserSessionAdapter sessionToImportInto,
                                                                         AuthenticatedClientSessionModel clientSession) {
        AuthenticatedClientSessionEntity entity = createAuthenticatedClientSessionInstance(sessionToImportInto.getId(), clientSession,
                sessionToImportInto.getRealm().getId(), clientSession.getClient().getId(), true);
        entity.setUserSessionId(sessionToImportInto.getId());

        // Update timestamp to same value as userSession. LastSessionRefresh of userSession from DB will have correct value
        entity.setTimestamp(sessionToImportInto.getLastSessionRefresh());

        final UUID clientSessionId = entity.getId();

        SessionUpdateTask<AuthenticatedClientSessionEntity> createClientSessionTask = Tasks.addIfAbsentSync();
        clientSessionTx.addTask(entity.getId(), createClientSessionTask, entity, UserSessionModel.SessionPersistenceState.PERSISTENT);

        AuthenticatedClientSessionStore clientSessions = sessionToImportInto.getEntity().getAuthenticatedClientSessions();
        clientSessions.put(clientSession.getClient().getId(), clientSessionId);

        SessionUpdateTask<UserSessionEntity> registerClientSessionTask = new ClientSessionPersistentChangelogBasedTransaction.RegisterClientSessionTask(clientSession.getClient().getId(), clientSessionId, true);
        sessionTx.addTask(sessionToImportInto.getId(), registerClientSessionTask);

        return new AuthenticatedClientSessionAdapter(session, this, entity, clientSession.getClient(), sessionToImportInto, clientSessionTx, true);
    }


    private AuthenticatedClientSessionEntity createAuthenticatedClientSessionInstance(String userSessionId, AuthenticatedClientSessionModel clientSession,
                                                                                      String realmId, String clientId, boolean offline) {
        final UUID clientSessionId = PersistentUserSessionProvider.createClientSessionUUID(userSessionId, clientId);
        AuthenticatedClientSessionEntity entity = new AuthenticatedClientSessionEntity(clientSessionId);
        entity.setRealmId(realmId);

        entity.setAction(clientSession.getAction());
        entity.setAuthMethod(clientSession.getProtocol());

        entity.setNotes(clientSession.getNotes() == null ? new ConcurrentHashMap<>() : clientSession.getNotes());
        entity.setClientId(clientId);
        entity.setRedirectUri(clientSession.getRedirectUri());
        entity.setTimestamp(clientSession.getTimestamp());
        entity.setOffline(offline);

        return entity;
    }

    public SessionEntityWrapper<UserSessionEntity> wrapPersistentEntity(RealmModel realm, boolean offline, UserSessionModel persistentUserSession) {
        UserSessionEntity userSessionEntity = createUserSessionEntityInstance(persistentUserSession);

        if (isUserSessionExpired(realm, userSessionEntity, offline)) {
            return null;
        }

        sessionTx.addTask(userSessionEntity.getId(), null, userSessionEntity, UserSessionModel.SessionPersistenceState.PERSISTENT);

        for (Map.Entry<String, AuthenticatedClientSessionModel> entry : persistentUserSession.getAuthenticatedClientSessions().entrySet()) {
            String clientUUID = entry.getKey();
            AuthenticatedClientSessionEntity clientSession = createAuthenticatedClientSessionInstance(persistentUserSession.getId(), entry.getValue(),
                    userSessionEntity.getRealmId(), clientUUID, offline);
            clientSession.setUserSessionId(userSessionEntity.getId());

            // Update timestamp to same value as userSession. LastSessionRefresh of userSession from DB will have correct value
            // clientSession.setTimestamp(userSessionEntity.getLastSessionRefresh());

            ClientModel client = session.clients().getClientById(realm, clientSession.getClientId());
            if (isClientSessionExpired(realm, client, clientSession, offline)) {
                continue;
            }

            // Update userSession entity with the clientSession
            AuthenticatedClientSessionStore clientSessions = userSessionEntity.getAuthenticatedClientSessions();
            clientSessions.put(clientUUID, clientSession.getId());
            clientSessionTx.addTask(clientSession.getId(), null, clientSession, UserSessionModel.SessionPersistenceState.PERSISTENT);
        }

        return sessionTx.get(userSessionEntity.getId(), offline);

    }

    private boolean isClientSessionExpired(RealmModel realm, ClientModel client, AuthenticatedClientSessionEntity entity, boolean offline) {
        SessionFunction<AuthenticatedClientSessionEntity> idleChecker = offline ? SessionTimeouts::getOfflineClientSessionMaxIdleMs : SessionTimeouts::getClientSessionMaxIdleMs;
        SessionFunction<AuthenticatedClientSessionEntity> lifetimeChecker = offline ? SessionTimeouts::getOfflineClientSessionLifespanMs : SessionTimeouts::getClientSessionLifespanMs;
        return idleChecker.apply(realm, client, entity) == SessionTimeouts.ENTRY_EXPIRED_FLAG || lifetimeChecker.apply(realm, client, entity) == SessionTimeouts.ENTRY_EXPIRED_FLAG;
    }

    private boolean isUserSessionExpired(RealmModel realm, UserSessionEntity entity, boolean offline) {
        SessionFunction<UserSessionEntity> idleChecker = offline ? SessionTimeouts::getOfflineSessionMaxIdleMs : SessionTimeouts::getUserSessionMaxIdleMs;
        SessionFunction<UserSessionEntity> lifetimeChecker = offline ? SessionTimeouts::getOfflineSessionLifespanMs : SessionTimeouts::getUserSessionLifespanMs;
        return idleChecker.apply(realm, null, entity) == SessionTimeouts.ENTRY_EXPIRED_FLAG || lifetimeChecker.apply(realm, null, entity) == SessionTimeouts.ENTRY_EXPIRED_FLAG;
    }

    public static UUID createClientSessionUUID(String userSessionId, String clientId) {
        // This allows creating a UUID that is constant even if the entry is reloaded from the database
        return UUID.nameUUIDFromBytes((userSessionId + clientId).getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public void migrate(String modelVersion) {
        // Changed encoding from JBoss Marshalling to ProtoStream.
        // Unable to read the cached data.
        if ("26.0.0".equals(modelVersion)) {
            log.debug("Clear caches to migrate to Infinispan Protostream");
            CompletionStages.join(session.getProvider(InfinispanConnectionProvider.class).migrateToProtoStream());
        }
    }

    /**
     * Copy over all sessions in Infinispan to the persistent user sessions in the database.
     * This method is public so people can use it to build their custom migrations or re-import sessions when necessary
     * in a future version of Keycloak.
     */
    public void migrateNonPersistentSessionsToPersistentSessions() {
        JpaChangesPerformer<String, UserSessionEntity> userSessionPerformer = new JpaChangesPerformer<>(sessionCache.getName(), new ArrayBlockingQueue<>(1));
        JpaChangesPerformer<UUID, AuthenticatedClientSessionEntity> clientSessionPerformer = new JpaChangesPerformer<>(clientSessionCache.getName(), new ArrayBlockingQueue<>(1));
        AtomicInteger currentBatch = new AtomicInteger(0);
        var persistence = ComponentRegistry.componentOf(sessionCache, PersistenceManager.class);
        if (persistence != null && !persistence.getStoresAsString().isEmpty()) {
            ByRef<Throwable> ref = ByRef.create(null);
            Flowable.fromPublisher(persistence.<String, SessionEntityWrapper<UserSessionEntity>>publishEntries(true, false))
                    .blockingSubscribe(e -> processEntryFromCache(e.getValue(), userSessionPerformer, clientSessionPerformer, currentBatch), ref::set);
            if (ref.get() != null) {
                throw new RuntimeException("Unable to migrate sessions", ref.get());
            }
        } else {
            // Usually we assume sessions are stored in a persistence. To be extra safe, iterate over local sessions if no persistent is available.
            sessionCache.forEach((key, value) -> processEntryFromCache(value, userSessionPerformer, clientSessionPerformer, currentBatch));
        }
        flush(userSessionPerformer, clientSessionPerformer);
        // Clear existing sessions as the IDs of the client sessions have changed.
        sessionCache.clear();
        clientSessionCache.clear();
        // Even though offline sessions haven't been migrated, they are cleared as the IDs of the client sessions have changed. It is safe to clear them as they are already stored in the database.
        offlineSessionCache.clear();
        offlineClientSessionCache.clear();
        log.infof("Migrated %d user sessions total.", currentBatch.intValue());
    }

    /**
     * When calling this, ensure that the cache doesn't contain entries for user or client sessions that are already contained in the database.
     * Such entries should first be cleared from the cache before this is being called.
     * As this is assumed to run once during the upgrade to Keycloak 25, this should be safe to assume.
     */
    private void processEntryFromCache(SessionEntityWrapper<UserSessionEntity> sessionEntityWrapper, JpaChangesPerformer<String, UserSessionEntity> userSessionPerformer, JpaChangesPerformer<UUID, AuthenticatedClientSessionEntity> clientSessionPerformer, AtomicInteger count) {
        RealmModel realm = session.realms().getRealm(sessionEntityWrapper.getEntity().getRealmId());
        if (realm == null) {
            // ignoring old and unknown realm found in the session
            return;
        }
        sessionEntityWrapper.getEntity().getAuthenticatedClientSessions().forEach((clientId, uuid) -> {
            SessionEntityWrapper<AuthenticatedClientSessionEntity> clientSession = clientSessionCache.get(uuid);
            if (clientSession != null) {
                // This is necessary because client sessions created by a KC version < 22 do not have clientId set within the entity.
                if (clientSession.getEntity().getClientId() == null) {
                    clientSession.getEntity().setClientId(clientId);
                }
                clientSession.getEntity().setUserSessionId(sessionEntityWrapper.getEntity().getId());
                MergedUpdate<AuthenticatedClientSessionEntity> merged = MergedUpdate.computeUpdate(Collections.singletonList(Tasks.addIfAbsentSync()), clientSession, 1, 1);
                clientSessionPerformer.registerChange(Map.entry(uuid, new SessionUpdatesList<>(realm, clientSession)), merged);
            }
        });
        MergedUpdate<UserSessionEntity> merged = MergedUpdate.computeUpdate(Collections.singletonList(Tasks.addIfAbsentSync()), sessionEntityWrapper, 1, 1);
        userSessionPerformer.registerChange(Map.entry(sessionEntityWrapper.getEntity().getId(), new SessionUpdatesList<>(realm, sessionEntityWrapper)), merged);
        if (count.incrementAndGet() % 100 == 0) {
            flush(userSessionPerformer, clientSessionPerformer);
        }
        if (count.intValue() % 1000 == 0) {
            log.infof("Migrated %d user sessions total, continuing...", count.intValue());
        }
    }

    private <E extends SessionEntity, K> void flush(JpaChangesPerformer<K, E> userSessionsPerformer, JpaChangesPerformer<UUID, AuthenticatedClientSessionEntity> clientSessionPerformer) {
        KeycloakModelUtils.runJobInTransaction(session.getKeycloakSessionFactory(),
                s -> {
                    userSessionsPerformer.applyChangesSynchronously(s);
                    clientSessionPerformer.applyChangesSynchronously(s);
                });
    }

}