QueryHelper.java

/*
 * Copyright 2024 Red Hat, Inc. and/or its affiliates
 * and other contributors as indicated by the @author tags.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.keycloak.models.sessions.infinispan.query;

import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Spliterators;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import org.infinispan.client.hotrod.impl.query.RemoteQuery;
import org.infinispan.commons.api.query.Query;
import org.infinispan.query.dsl.QueryResult;

public final class QueryHelper {

    /**
     * Converts a single projection results into a long value.
     */
    public static final Function<Object[], Long> SINGLE_PROJECTION_TO_LONG = projection -> {
        assert projection.length == 1;
        return (long) projection[0];
    };

    /**
     * Converts a single projection value into a {@link String}.
     */
    public static final Function<Object[], String> SINGLE_PROJECTION_TO_STRING = projection -> {
        assert projection.length == 1;
        return String.valueOf(projection[0]);
    };

    /**
     * Converts a projection with two values into a {@link Map.Entry} of {@link String} and {@link Long}, where the key
     * is the first projection, and the second is the second project.
     */
    public static final Function<Object[], Map.Entry<String, Long>> PROJECTION_TO_STRING_LONG_ENTRY = projection -> {
        assert projection.length == 2;
        return Map.entry((String) projection[0], (long) projection[1]);
    };

    private QueryHelper() {
    }

    /**
     * Fetches a single value from the query.
     * <p>
     * This method changes the {@link Query} state to return just a single value.
     *
     * @param query   The {@link Query} instance.
     * @param mapping The {@link Function} that maps the query results (projection) into the result.
     * @param <T>     The {@link Query} response type.
     * @param <R>     The {@link Optional} type.
     * @return An {@link Optional} with the {@link Query} results mapped.
     */
    public static <T, R> Optional<R> fetchSingle(Query<T> query, Function<T, R> mapping) {
        query.hitCountAccuracy(1).maxResults(1);
        try (var iterator = query.iterator()) {
            return iterator.hasNext() ? Optional.ofNullable(mapping.apply(iterator.next())) : Optional.empty();
        }
    }

    /**
     * Streams using batching over all results from the {@link Query}.
     * <p>
     * If a large result set is expected, this method is recommended to avoid loading downloading a lot of data in a
     * single request.
     * <p>
     * The results are fetched on demand.
     * <p>
     * Warning: This method changes ignores the start offset and the max results. It will return everything.
     *
     * @param query     The {@link Query} instance.
     * @param batchSize The number of results to fetch for each remote request.
     * @param mapping   The {@link Function} that maps the query results (projection) into the result.
     * @param <T>       The {@link Query} response type.
     * @param <R>       The {@link Stream} type.
     * @return A {@link Stream} with the results.
     */
    public static <T, R> Stream<R> streamAll(Query<T> query, int batchSize, Function<T, R> mapping) {
        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(new BatchingIterator<>(query, batchSize, mapping), 0), false);
    }

    /**
     * Performs the {@link Query} and returns the results.
     * <p>
     * This method is preferred to {@link Query#list()} since it does not have to compute an accurate hit count (affects
     * Indexed query performance).
     * <p>
     * If a large dataset is expected, use {@link #streamAll(Query, int, Function)}.
     *
     * @param query   The {@link Query} instance.
     * @param mapping The {@link Function} that maps the query results (projection) into the result.
     * @param <T>     The {@link Query} response type.
     * @param <R>     The {@link Collection} type.
     * @return A {@link Collection} with the results.
     */
    public static <T, R> Collection<R> toCollection(Query<T> query, Function<T, R> mapping) {
        try (var iterator = query.iterator()) {
            return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, 0), false)
                    .map(mapping)
                    .collect(Collectors.toList());
        }
    }

    // TODO to be removed. A publisher was added to the Infinispan API since version 15.1.
    private static class BatchingIterator<T, R> implements Iterator<R> {

        private final RemoteQuery<T> query;
        private final int batchSize;
        private final Function<T, R> mapping;
        private int currentOffset;
        private Iterator<T> currentResults;
        private CompletableFuture<QueryResult<T>> nextResults;
        private R next;
        private boolean completed;

        private BatchingIterator(Query<T> query, int batchSize, Function<T, R> mapping) {
            assert query instanceof RemoteQuery<T>;
            this.query = (RemoteQuery<T>) query.startOffset(0).hitCountAccuracy(batchSize).maxResults(batchSize);
            this.batchSize = batchSize;
            this.mapping = mapping;
            currentResults = Collections.emptyIterator();
            executeQueryAsync();
            fetchNext();
        }

        @Override
        public boolean hasNext() {
            return next != null;
        }

        @Override
        public R next() {
            if (next == null) {
                throw new NoSuchElementException();
            }
            var result = next;
            fetchNext();
            return result;
        }

        private void executeQueryAsync() {
            nextResults = query.executeAsync().toCompletableFuture();
        }

        private void fetchNext() {
            while (true) {
                while (currentResults.hasNext()) {
                    next = mapping.apply(currentResults.next());
                    if (next != null) {
                        return;
                    }
                }
                if (completed) {
                    next = null;
                    return;
                }
                useNextResultsAndRequestMore();
            }
        }

        private void useNextResultsAndRequestMore() {
            var rsp = nextResults.join();
            var resultList = rsp.list();
            if (resultList.isEmpty()) {
                completed = true;
                return;
            }
            currentResults = resultList.iterator();
            if (resultList.size() < batchSize) {
                completed = true;
                return;
            }
            currentOffset += resultList.size();
            if (rsp.count().isExact() && currentOffset >= rsp.count().value()) {
                completed = true;
                return;
            }
            query.startOffset(currentOffset);
            executeQueryAsync();
        }
    }

}