AggregateIteratorIT.java
package redis.clients.jedis.modules.search;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedClass;
import org.junit.jupiter.params.provider.MethodSource;
import redis.clients.jedis.RedisProtocol;
import redis.clients.jedis.modules.RedisModuleCommandsTestBase;
import redis.clients.jedis.search.Document;
import redis.clients.jedis.search.IndexOptions;
import redis.clients.jedis.search.Schema;
import redis.clients.jedis.search.aggr.*;
@ParameterizedClass
@MethodSource("redis.clients.jedis.commands.CommandsTestsParameters#respVersions")
public class AggregateIteratorIT extends RedisModuleCommandsTestBase {
private static final String index = "aggiteratorindex";
@BeforeAll
public static void prepare() {
RedisModuleCommandsTestBase.prepare();
}
public AggregateIteratorIT(RedisProtocol redisProtocol) {
super(redisProtocol);
}
private void addDocument(Document doc) {
String key = doc.getId();
Map<String, String> map = new LinkedHashMap<>();
doc.getProperties().forEach(entry -> map.put(entry.getKey(), String.valueOf(entry.getValue())));
client.hset(key, map);
}
@Test
public void testAggregateIteratorBasicUsage() {
// Create index and add test data
Schema sc = new Schema();
sc.addSortableTextField("name", 1.0);
sc.addSortableNumericField("count");
client.ftCreate(index, IndexOptions.defaultOptions(), sc);
addDocument(new Document("data1").set("name", "abc").set("count", 10));
addDocument(new Document("data2").set("name", "def").set("count", 5));
addDocument(new Document("data3").set("name", "def").set("count", 25));
addDocument(new Document("data4").set("name", "ghi").set("count", 15));
addDocument(new Document("data5").set("name", "jkl").set("count", 20));
// Create aggregation with cursor
AggregationBuilder aggr = new AggregationBuilder()
.groupBy("@name", Reducers.sum("@count").as("sum")).sortBy(10, SortedField.desc("@sum"))
.cursor(2, 10000); // 2 results per batch
// Test the iterator using the integrated method
try (AggregateIterator iterator = client.ftAggregateIterator(index, aggr)) {
assertTrue(iterator.hasNext());
assertNotNull(iterator.getCursorId());
int totalBatches = 0;
int totalRows = 0;
while (iterator.hasNext()) {
AggregationResult batch = iterator.next();
assertNotNull(batch);
assertNotNull(batch.getRows());
assertTrue(batch.getRows().size() <= 2); // Batch size should not exceed cursor count
totalBatches++;
totalRows += batch.getRows().size();
}
assertTrue(totalBatches > 0);
assertEquals(4, totalRows); // Should have 4 groups total (abc, def, ghi, jkl)
assertFalse(iterator.hasNext());
}
}
@Test
public void testAggregateIteratorWithoutCursor() {
// Create aggregation without cursor - should throw exception
AggregationBuilder aggr = new AggregationBuilder().groupBy("@name",
Reducers.sum("@count").as("sum"));
assertThrows(IllegalArgumentException.class, () -> client.ftAggregateIterator(index, aggr));
}
@Test
public void testAggregateIteratorSingleBatch() {
// Create index and add test data
Schema sc = new Schema();
sc.addSortableTextField("name", 1.0);
sc.addSortableNumericField("count");
client.ftCreate(index, IndexOptions.defaultOptions(), sc);
addDocument(new Document("data1").set("name", "abc").set("count", 10));
addDocument(new Document("data2").set("name", "def").set("count", 5));
// Create aggregation with large cursor count (all results in one batch)
AggregationBuilder aggr = new AggregationBuilder()
.groupBy("@name", Reducers.sum("@count").as("sum")).sortBy(10, SortedField.desc("@sum"))
.cursor(100, 10000); // Large batch size
// Test the iterator using the integrated method
try (AggregateIterator iterator = client.ftAggregateIterator(index, aggr)) {
assertTrue(iterator.hasNext());
AggregationResult batch = iterator.next();
assertNotNull(batch);
assertNotNull(batch.getRows());
assertEquals(2, batch.getRows().size()); // Should have 2 groups (abc, def)
// Should be no more batches
assertFalse(iterator.hasNext());
}
}
@Test
public void testAggregateIteratorFirstBatchReturnsInitialResults() {
// Create index and add test data
Schema sc = new Schema();
sc.addSortableTextField("name", 1.0);
sc.addSortableNumericField("count");
client.ftCreate(index, IndexOptions.defaultOptions(), sc);
addDocument(new Document("data1").set("name", "abc").set("count", 10));
addDocument(new Document("data2").set("name", "def").set("count", 5));
addDocument(new Document("data3").set("name", "def").set("count", 25));
// Create aggregation with cursor that should return 2 results in first batch
AggregationBuilder aggr = new AggregationBuilder()
.groupBy("@name", Reducers.sum("@count").as("sum")).sortBy(10, SortedField.desc("@sum"))
.cursor(2, 10000); // 2 results per batch
// Test that first next() call returns the initial FT.AGGREGATE results
try (AggregateIterator iterator = client.ftAggregateIterator(index, aggr)) {
assertTrue(iterator.hasNext());
// First call should return initial results from FT.AGGREGATE
AggregationResult firstBatch = iterator.next();
assertNotNull(firstBatch);
assertNotNull(firstBatch.getRows());
assertEquals(2, firstBatch.getRows().size()); // Should have 2 groups (abc, def)
// Verify the results are correct
List<Row> rows = firstBatch.getRows();
assertEquals("def", rows.get(0).getString("name"));
assertEquals(30, rows.get(0).getLong("sum"));
assertEquals("abc", rows.get(1).getString("name"));
assertEquals(10, rows.get(1).getLong("sum"));
// Should be no more batches since we got all results in first batch
AggregationResult secondBatch = iterator.next();
assertEquals(0, secondBatch.getRows().size());
}
}
@Test
public void testAggregateIteratorEmptyResult() {
// Create index but add no data
Schema sc = new Schema();
sc.addSortableTextField("name", 1.0);
sc.addSortableNumericField("count");
client.ftCreate(index, IndexOptions.defaultOptions(), sc);
// Create aggregation with cursor
AggregationBuilder aggr = new AggregationBuilder()
.groupBy("@name", Reducers.sum("@count").as("sum")).cursor(10, 10000);
// Test the iterator with empty results using the integrated method
try (AggregateIterator iterator = client.ftAggregateIterator(index, aggr)) {
// Should have no results
assertTrue(iterator.next().isEmpty());
}
}
@Test
public void testAggregateIteratorRemove() {
// Create index and add test data
Schema sc = new Schema();
sc.addSortableTextField("name", 1.0);
sc.addSortableNumericField("count");
client.ftCreate(index, IndexOptions.defaultOptions(), sc);
addDocument(new Document("data1").set("name", "abc").set("count", 10));
addDocument(new Document("data2").set("name", "def").set("count", 5));
addDocument(new Document("data3").set("name", "def").set("count", 25));
addDocument(new Document("data4").set("name", "ghi").set("count", 15));
addDocument(new Document("data5").set("name", "jkl").set("count", 20));
// Create aggregation with cursor
AggregationBuilder aggr = new AggregationBuilder()
.groupBy("@name", Reducers.sum("@count").as("sum")).sortBy(10, SortedField.desc("@sum"))
.cursor(2, 10000); // 2 results per batch
// Test remove() method
try (AggregateIterator iterator = client.ftAggregateIterator(index, aggr)) {
assertTrue(iterator.hasNext());
assertNotNull(iterator.getCursorId());
assertTrue(iterator.getCursorId() > 0);
// Get first batch
AggregationResult firstBatch = iterator.next();
assertNotNull(firstBatch);
assertEquals(2, firstBatch.getRows().size());
// Should still have more results
assertTrue(iterator.hasNext());
// Remove the cursor - this should terminate the iteration
iterator.remove();
// After remove, should have no more results
assertFalse(iterator.hasNext());
assertEquals(Long.valueOf(-1), iterator.getCursorId());
// Calling next() should throw NoSuchElementException
assertThrows(NoSuchElementException.class, iterator::next);
}
}
@Test
public void testAggregateIteratorRemoveBeforeNext() {
// Create index and add test data
Schema sc = new Schema();
sc.addSortableTextField("name", 1.0);
sc.addSortableNumericField("count");
client.ftCreate(index, IndexOptions.defaultOptions(), sc);
addDocument(new Document("data1").set("name", "abc").set("count", 10));
addDocument(new Document("data2").set("name", "cde").set("count", 8));
// Create aggregation with cursor
AggregationBuilder aggr = new AggregationBuilder()
.groupBy("@name", Reducers.sum("@count").as("sum")).cursor(1, 10000);
// Test calling remove() before next() - should work since cursor is initialized
try (AggregateIterator iterator = client.ftAggregateIterator(index, aggr)) {
assertTrue(iterator.hasNext());
assertTrue(iterator.getCursorId() > 0);
// Remove without calling next() first
iterator.remove();
// After remove, should have no more results
assertFalse(iterator.hasNext());
assertEquals(Long.valueOf(-1), iterator.getCursorId());
}
}
@Test
public void testAggregateIteratorRemoveAfterClose() {
// Create index and add test data
Schema sc = new Schema();
sc.addSortableTextField("name", 1.0);
sc.addSortableNumericField("count");
client.ftCreate(index, IndexOptions.defaultOptions(), sc);
addDocument(new Document("data1").set("name", "abc").set("count", 10));
// Create aggregation with cursor
AggregationBuilder aggr = new AggregationBuilder()
.groupBy("@name", Reducers.sum("@count").as("sum")).cursor(10, 10000);
AggregateIterator iterator = client.ftAggregateIterator(index, aggr);
assertTrue(iterator.hasNext());
// Close the iterator
iterator.close();
// Calling remove() after close should not throw exception, just return silently
iterator.remove(); // Should not throw
assertEquals(Long.valueOf(-1), iterator.getCursorId());
}
@Test
public void testAggregateIteratorRemoveMultipleTimes() {
// Create index and add test data
Schema sc = new Schema();
sc.addSortableTextField("name", 1.0);
sc.addSortableNumericField("count");
client.ftCreate(index, IndexOptions.defaultOptions(), sc);
addDocument(new Document("data1").set("name", "abc").set("count", 10));
addDocument(new Document("data2").set("name", "cde").set("count", 3));
// Create aggregation with cursor
AggregationBuilder aggr = new AggregationBuilder()
.groupBy("@name", Reducers.sum("@count").as("sum")).cursor(1, 10000);
// Test calling remove() multiple times
try (AggregateIterator iterator = client.ftAggregateIterator(index, aggr)) {
assertTrue(iterator.hasNext());
// First remove should work
iterator.remove();
assertFalse(iterator.hasNext());
assertEquals(-1L, iterator.getCursorId());
// Second remove should not throw exception, just return silently
iterator.remove(); // Should not throw
assertEquals(-1L, iterator.getCursorId());
}
}
}