// ClusterAggregateIteratorIT.java

package redis.clients.jedis.modules.search.cluster;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.redis.test.annotations.SinceRedisVersion;
import redis.clients.jedis.EndpointConfig;
import redis.clients.jedis.Endpoints;
import redis.clients.jedis.RedisClusterClient;
import redis.clients.jedis.util.RedisVersionCondition;

import redis.clients.jedis.search.Document;
import redis.clients.jedis.search.IndexOptions;
import redis.clients.jedis.search.Schema;
import redis.clients.jedis.search.aggr.*;

/**
 * Integration tests verifying that {@code ftAggregateIterator} works correctly in cluster mode.
 * <p>
 * Each test builds its own {@link RedisClusterClient} for the {@code cluster-stable} endpoint.
 * {@link #setUp()} drops the test index (best effort) and calls {@code flushAll()} before the
 * test body runs; {@link #tearDown()} drops the index again and closes the client.
 * </p>
 */
@SinceRedisVersion(value = "8.0.0", message = "Cluster aggregate iterator tests require Redis OSS 8.0 or higher")
public class ClusterAggregateIteratorIT {

  /** Name of the endpoint configuration used by both the version check and the client. */
  private static final String ENDPOINT_NAME = "cluster-stable";

  /** Name of the search index that the tests create and that set-up/tear-down drop. */
  private static final String INDEX_NAME = "cluster_aggiteratorindex";

  /** Second argument (max idle) passed to {@code AggregationBuilder.cursor(...)} in every test. */
  private static final int CURSOR_MAX_IDLE = 10000;

  protected static EndpointConfig endpoint;

  private RedisClusterClient cluster;

  @RegisterExtension
  public RedisVersionCondition versionCondition = new RedisVersionCondition(
      () -> Endpoints.getRedisEndpoint(ENDPOINT_NAME));

  /** Resolves the cluster endpoint configuration once for the whole test class. */
  @BeforeAll
  public static void prepareEndpoint() {
    endpoint = Endpoints.getRedisEndpoint(ENDPOINT_NAME);
  }

  /**
   * Builds a fresh cluster client, then removes any leftover index and flushes all data so the
   * test body does not see state from a previous run.
   */
  @BeforeEach
  public void setUp() {
    cluster = RedisClusterClient.builder().nodes(new HashSet<>(endpoint.getHostsAndPorts()))
        .clientConfig(endpoint.getClientConfigBuilder().build()).build();

    dropIndexQuietly();
    cluster.flushAll();
  }

  /**
   * Drops the test index (best effort) and closes the client.
   *
   * @throws Exception declared for compatibility with the original signature
   */
  @AfterEach
  public void tearDown() throws Exception {
    // Guard first: without a client there is nothing to drop or close. Previously the drop was
    // attempted before this check and the resulting NullPointerException was silently swallowed.
    if (cluster == null) {
      return;
    }

    try {
      dropIndexQuietly();
    } finally {
      cluster.close();
    }
  }

  /** Drops the test index, ignoring any failure because cleanup is best effort. */
  private void dropIndexQuietly() {
    try {
      cluster.ftDropIndex(INDEX_NAME);
    } catch (Exception ignored) {
      // Index might not exist, ignore
    }
  }

  /** Creates the test index with a sortable text field "name" and a sortable numeric "count". */
  private void createIndex() {
    Schema schema = new Schema();
    schema.addSortableTextField("name", 1.0);
    schema.addSortableNumericField("count");
    cluster.ftCreate(INDEX_NAME, IndexOptions.defaultOptions(), schema);
  }

  /**
   * Stores the document as a hash under its id, converting every property value to a string.
   *
   * @param doc document whose id becomes the key and whose properties become the hash fields
   */
  private void addDocument(Document doc) {
    Map<String, String> fields = new LinkedHashMap<>();
    doc.getProperties()
        .forEach(entry -> fields.put(entry.getKey(), String.valueOf(entry.getValue())));
    cluster.hset(doc.getId(), fields);
  }

  /**
   * Builds the aggregation shared by most tests: group by {@code @name}, sum {@code @count} as
   * {@code sum}, sort by {@code @sum} descending, and read through a cursor.
   *
   * @param batchSize cursor count, i.e. the first argument given to {@code cursor(...)}
   * @return the configured aggregation builder
   */
  private static AggregationBuilder sumByNameDescending(int batchSize) {
    return new AggregationBuilder()
        .groupBy("@name", Reducers.sum("@count").as("sum"))
        .sortBy(10, SortedField.desc("@sum"))
        .cursor(batchSize, CURSOR_MAX_IDLE);
  }

  /**
   * The first {@code next()} call must hand back the rows of the initial aggregate call, in the
   * requested order; the following {@code next()} is expected to be empty.
   */
  @Test
  public void testAggregateIteratorFirstBatchReturnsInitialResults() {
    createIndex();

    addDocument(new Document("data1").set("name", "abc").set("count", 10));
    addDocument(new Document("data2").set("name", "def").set("count", 5));
    addDocument(new Document("data3").set("name", "def").set("count", 25));

    // Two groups (abc, def) and a cursor count of 2: everything fits in the first batch
    AggregationBuilder aggr = sumByNameDescending(2);

    try (AggregateIterator iterator = cluster.ftAggregateIterator(INDEX_NAME, aggr)) {
      assertTrue(iterator.hasNext());

      // First call should return initial results from FT.AGGREGATE
      AggregationResult firstBatch = iterator.next();
      assertNotNull(firstBatch);
      assertNotNull(firstBatch.getRows());
      assertEquals(2, firstBatch.getRows().size()); // Should have 2 groups (abc, def)

      // Sorted by sum descending: def (5 + 25) comes before abc (10)
      List<Row> rows = firstBatch.getRows();
      Row top = rows.get(0);
      Row bottom = rows.get(1);
      assertEquals("def", top.getString("name"));
      assertEquals(30, top.getLong("sum"));
      assertEquals("abc", bottom.getString("name"));
      assertEquals(10, bottom.getLong("sum"));

      // All groups were delivered already, so the next read must come back empty
      AggregationResult secondBatch = iterator.next();
      assertTrue(secondBatch.isEmpty());
    }
  }

  /**
   * Iterates over all batches with a cursor count of 2 and checks batch sizes and the total
   * number of groups returned.
   */
  @Test
  public void testAggregateIteratorBasicUsage() {
    createIndex();

    addDocument(new Document("data1").set("name", "abc").set("count", 10));
    addDocument(new Document("data2").set("name", "def").set("count", 5));
    addDocument(new Document("data3").set("name", "def").set("count", 25));
    addDocument(new Document("data4").set("name", "ghi").set("count", 15));
    addDocument(new Document("data5").set("name", "jkl").set("count", 20));

    // Cursor-based aggregation to test FT.CURSOR routing in cluster mode, 2 results per batch
    AggregationBuilder aggr = sumByNameDescending(2);

    try (AggregateIterator iterator = cluster.ftAggregateIterator(INDEX_NAME, aggr)) {
      assertTrue(iterator.hasNext());
      assertNotNull(iterator.getCursorId());

      int batchCount = 0;
      int rowCount = 0;

      while (iterator.hasNext()) {
        AggregationResult batch = iterator.next();
        assertNotNull(batch);
        assertNotNull(batch.getRows());

        int batchRows = batch.getRows().size();
        assertTrue(batchRows <= 2); // Batch size should not exceed cursor count
        batchCount++;
        rowCount += batchRows;
      }

      assertTrue(batchCount > 0);
      assertEquals(4, rowCount); // Should have 4 groups total (abc, def, ghi, jkl)
      assertFalse(iterator.hasNext());
    }
  }

  /**
   * With a cursor count larger than the number of groups, a single batch must carry all rows and
   * the iterator must report no further batches.
   */
  @Test
  public void testAggregateIteratorSingleBatch() {
    createIndex();

    addDocument(new Document("data1").set("name", "abc").set("count", 10));
    addDocument(new Document("data2").set("name", "def").set("count", 5));

    // Large batch size: all results are expected in one batch
    AggregationBuilder aggr = sumByNameDescending(100);

    try (AggregateIterator iterator = cluster.ftAggregateIterator(INDEX_NAME, aggr)) {
      assertTrue(iterator.hasNext());

      AggregationResult batch = iterator.next();
      assertNotNull(batch);
      assertNotNull(batch.getRows());
      assertEquals(2, batch.getRows().size()); // Should have 2 groups (abc, def)

      // Should be no more batches
      assertFalse(iterator.hasNext());
    }
  }

  /** Aggregating over an index without documents must yield an empty first batch. */
  @Test
  public void testAggregateIteratorEmptyResult() {
    // Index exists, but no documents are added
    createIndex();

    // No sortBy here, unlike the other tests
    AggregationBuilder aggr = new AggregationBuilder()
        .groupBy("@name", Reducers.sum("@count").as("sum"))
        .cursor(10, CURSOR_MAX_IDLE);

    try (AggregateIterator iterator = cluster.ftAggregateIterator(INDEX_NAME, aggr)) {
      // Should have no results
      assertTrue(iterator.next().isEmpty());
    }
  }

  /**
   * After {@code remove()} the iterator must report no more batches, expose a cursor id of
   * {@code -1}, and throw {@link NoSuchElementException} from {@code next()}.
   */
  @Test
  public void testAggregateIteratorRemove() {
    createIndex();

    addDocument(new Document("data1").set("name", "abc").set("count", 10));
    addDocument(new Document("data2").set("name", "def").set("count", 5));
    addDocument(new Document("data3").set("name", "def").set("count", 25));
    addDocument(new Document("data4").set("name", "ghi").set("count", 15));
    addDocument(new Document("data5").set("name", "jkl").set("count", 20));

    // Four groups with 2 results per batch, so results remain after the first batch
    AggregationBuilder aggr = sumByNameDescending(2);

    try (AggregateIterator iterator = cluster.ftAggregateIterator(INDEX_NAME, aggr)) {
      assertTrue(iterator.hasNext());
      assertNotNull(iterator.getCursorId());
      assertTrue(iterator.getCursorId() > 0);

      // Get first batch
      AggregationResult firstBatch = iterator.next();
      assertNotNull(firstBatch);
      assertEquals(2, firstBatch.getRows().size());

      // Should still have more results
      assertTrue(iterator.hasNext());

      // Remove the cursor - this should terminate the iteration
      iterator.remove();

      // After remove, should have no more results
      assertFalse(iterator.hasNext());
      assertEquals(Long.valueOf(-1), iterator.getCursorId());

      // Calling next() should throw NoSuchElementException
      assertThrows(NoSuchElementException.class, iterator::next);
    }
  }
}