SetWindowFieldsOperation.java
/*
* Copyright 2021-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core.aggregation;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
import org.bson.Document;
import org.jspecify.annotations.Nullable;
import org.springframework.data.domain.Sort;
import org.springframework.lang.Contract;
import org.springframework.util.Assert;
/**
* Encapsulates the {@code setWindowFields}-operation.
*
* @author Christoph Strobl
* @since 3.3
* @see <a href=
* "https://docs.mongodb.com/manual/reference/operator/aggregation/setWindowFields/">https://docs.mongodb.com/manual/reference/operator/aggregation/setWindowFields/</a>
*/
public class SetWindowFieldsOperation
implements AggregationOperation, FieldsExposingAggregationOperation.InheritsFieldsAggregationOperation {
private static final String CURRENT = "current";
private static final String UNBOUNDED = "unbounded";
private final @Nullable Object partitionBy;
private final @Nullable AggregationOperation sortBy;
private final WindowOutput output;
/**
 * Create a new {@link SetWindowFieldsOperation} from its three building blocks.
 *
 * @param partitionBy the field or {@link AggregationExpression} used to group documents. Can be {@literal null}, in
 *          which case no {@code partitionBy} is rendered.
 * @param sortBy the {@link SortOperation operation} defining the order of documents within a partition. Can be
 *          {@literal null}, in which case no {@code sortBy} is rendered.
 * @param output the {@link WindowOutput} holding the fields to add along with the rules to compute their values.
 */
protected SetWindowFieldsOperation(@Nullable Object partitionBy, @Nullable AggregationOperation sortBy,
        WindowOutput output) {

    this.output = output;
    this.sortBy = sortBy;
    this.partitionBy = partitionBy;
}
/**
 * Entry point of the fluent API: hands out a fresh {@link SetWindowFieldsOperationBuilder builder} that is used to
 * assemble a {@link SetWindowFieldsOperation}.
 *
 * @return a new, empty {@link SetWindowFieldsOperationBuilder}.
 */
public static SetWindowFieldsOperationBuilder builder() {
    return new SetWindowFieldsOperationBuilder();
}
/**
 * Exposes every {@link ComputedField} of the configured output as a synthetic field.
 *
 * @return the {@link ExposedFields} built from the output fields.
 */
@Override
public ExposedFields getFields() {

    Field[] computedFields = output.fields.toArray(new Field[0]);
    return ExposedFields.synthetic(Fields.from(computedFields));
}
/**
 * Render the {@code $setWindowFields} stage.
 * <p>
 * {@code partitionBy} and {@code sortBy} are only rendered when set. Every output field is rendered under its name
 * with its window operator and, if present, its {@code window} boundaries.
 *
 * @param context the context used to render expressions and resolve field references.
 * @return the stage document keyed by {@link #getOperator()}.
 */
@Override
public Document toDocument(AggregationOperationContext context) {

    Document stage = new Document();

    // instanceof never matches null, so an absent partitionBy simply falls through every branch.
    if (partitionBy instanceof AggregationExpression expression) {
        stage.append("partitionBy", expression.toDocument(context));
    } else if (partitionBy instanceof Field partitionField) {
        stage.append("partitionBy", context.getReference(partitionField).toString());
    } else if (partitionBy != null) {
        stage.append("partitionBy", partitionBy);
    }

    if (sortBy != null) {
        // Only the value below the sort operator key is needed, not the full stage document.
        Document renderedSort = sortBy.toDocument(context);
        stage.append("sortBy", renderedSort.get(sortBy.getOperator()));
    }

    Document computed = new Document();
    this.output.fields.forEach(computedField -> {

        Document rendered = computedField.getWindowOperator().toDocument(context);
        if (computedField.window != null) {
            rendered.put("window", computedField.window.toDocument(context));
        }
        computed.append(computedField.getName(), rendered);
    });
    stage.append("output", computed);

    return new Document(getOperator(), stage);
}
/**
 * The name of the aggregation stage rendered by this operation.
 *
 * @return the literal {@code $setWindowFields}.
 */
@Override
public String getOperator() {
    return "$setWindowFields";
}
/**
 * {@link WindowOutput} defines output of {@literal $setWindowFields} stage by defining the {@link ComputedField
 * field(s)} to append to the documents in the output.
 */
public static class WindowOutput {

    private final List<ComputedField> fields;

    /**
     * Create a new output containing the single given {@link ComputedField field}.
     *
     * @param outputField must not be {@literal null}.
     * @throws IllegalArgumentException if {@code outputField} is {@literal null}.
     */
    public WindowOutput(ComputedField outputField) {

        Assert.notNull(outputField, "OutputField must not be null");

        this.fields = new ArrayList<>();
        this.fields.add(outputField);
    }

    /**
     * Append the given {@link ComputedField field} to the output.
     *
     * @param field must not be {@literal null}.
     * @return this.
     * @throws IllegalArgumentException if {@code field} is {@literal null}.
     */
    @Contract("_ -> this")
    public WindowOutput append(ComputedField field) {

        Assert.notNull(field, "Field must not be null");

        fields.add(field);
        return this;
    }

    /**
     * Append the given {@link AggregationExpression} as a {@link ComputedField field} in a fluent way. The field is
     * only added once {@link ComputedFieldAppender#as(String)} is called.
     *
     * @param expression must not be {@literal null}.
     * @return new instance of {@link ComputedFieldAppender}.
     * @throws IllegalArgumentException if {@code expression} is {@literal null}.
     * @see #append(ComputedField)
     */
    @Contract("_ -> new")
    public ComputedFieldAppender append(AggregationExpression expression) {

        // Fail fast here; a null expression would otherwise only surface as a NullPointerException when the stage
        // is rendered.
        Assert.notNull(expression, "Expression must not be null");

        return new ComputedFieldAppender() {

            // Stays null unless within(...) is called; the field is then created without window boundaries.
            @Nullable private Window window;

            @Override
            public WindowOutput as(String fieldname) {
                return WindowOutput.this.append(new ComputedField(fieldname, expression, window));
            }

            @Override
            public ComputedFieldAppender within(Window window) {
                this.window = window;
                return this;
            }
        };
    }

    /**
     * Tiny little helper to allow fluent API usage for {@link #append(ComputedField)}.
     */
    public interface ComputedFieldAppender {

        /**
         * Specify the target field name.
         *
         * @param fieldname the name of field to add to the target document.
         * @return the {@link WindowOutput} that started the append operation.
         */
        WindowOutput as(String fieldname);

        /**
         * Specify the window boundaries.
         *
         * @param window must not be {@literal null}.
         * @return this.
         */
        ComputedFieldAppender within(Window window);
    }
}
/**
 * A {@link Field} whose value is the result of a computation done via an {@link AggregationExpression}, optionally
 * restricted to a {@link Window}.
 *
 * @author Christoph Strobl
 */
public static class ComputedField implements Field {

    private final String name;
    private final AggregationExpression windowOperator;
    private final @Nullable Window window;

    /**
     * Create a new {@link ComputedField} without window boundaries.
     *
     * @param name the target field name.
     * @param windowOperator the expression to calculate the field value.
     */
    public ComputedField(String name, AggregationExpression windowOperator) {
        this(name, windowOperator, null);
    }

    /**
     * Create a new {@link ComputedField}.
     *
     * @param name the target field name.
     * @param windowOperator the expression to calculate the field value.
     * @param window the boundaries to operate within. Can be {@literal null}.
     */
    public ComputedField(String name, AggregationExpression windowOperator, @Nullable Window window) {

        this.window = window;
        this.windowOperator = windowOperator;
        this.name = name;
    }

    /**
     * @return the target field name.
     */
    @Override
    public String getName() {
        return name;
    }

    /**
     * @return the same value as {@link #getName()}.
     */
    @Override
    public String getTarget() {
        return getName();
    }

    /**
     * @return always {@literal false}.
     */
    @Override
    public boolean isAliased() {
        return false;
    }

    /**
     * @return the expression used to calculate the field value.
     */
    public AggregationExpression getWindowOperator() {
        return windowOperator;
    }

    /**
     * @return the boundaries to operate within, or {@literal null} if none were given.
     */
    public @Nullable Window getWindow() {
        return window;
    }
}
/**
 * Quick access to {@link DocumentWindow documents} and {@link RangeWindow range} {@link Window windows}, either
 * directly or through their builders.
 *
 * @author Christoph Strobl
 */
public interface Windows {

    /**
     * Create a document window relative to the position of the current document via a fluent API.
     *
     * @return new instance of {@link DocumentWindowBuilder}.
     */
    static DocumentWindowBuilder documents() {
        return new DocumentWindowBuilder();
    }

    /**
     * Create a document window relative to the position of the current document.
     *
     * @param lower an integer for a position relative to the current document, {@literal current} or
     *          {@literal unbounded}.
     * @param upper an integer for a position relative to the current document, {@literal current} or
     *          {@literal unbounded}.
     * @return new instance of {@link DocumentWindow}.
     */
    static DocumentWindow documents(Object lower, Object upper) {
        return new DocumentWindow(lower, upper);
    }

    /**
     * Create a range window based on the {@link Sort sort value} of the current document via a fluent API.
     *
     * @return new instance of {@link RangeWindowBuilder}.
     */
    static RangeWindowBuilder range() {
        return new RangeWindowBuilder();
    }

    /**
     * Create a range window defined based on sort expression.
     *
     * @param lower a numeric value to add to the sort by field value of the current document, {@literal current} or
     *          {@literal unbounded}.
     * @param upper a numeric value to add to the sort by field value of the current document, {@literal current} or
     *          {@literal unbounded}.
     * @param unit the unit of the boundaries. Can be {@literal null}, in which case {@link WindowUnits#DEFAULT} is
     *          used.
     * @return new instance of {@link RangeWindow}.
     */
    static RangeWindow range(Object lower, Object upper, @Nullable WindowUnit unit) {

        if (unit == null) {
            return new RangeWindow(lower, upper, WindowUnits.DEFAULT);
        }
        return new RangeWindow(lower, upper, unit);
    }
}
/**
 * The boundaries a {@link ComputedField#getWindow() ComputedField} is calculated within.
 */
public interface Window {

    /**
     * The lower (inclusive) boundary.
     *
     * @return the lower boundary.
     */
    Object getLower();

    /**
     * The upper (inclusive) boundary.
     *
     * @return the upper boundary.
     */
    Object getUpper();

    /**
     * Obtain the document representation of the window using {@link Aggregation#DEFAULT_CONTEXT}.
     *
     * @return never {@literal null}.
     * @see #toDocument(AggregationOperationContext)
     */
    default Document toDocument() {
        return toDocument(Aggregation.DEFAULT_CONTEXT);
    }

    /**
     * Obtain the document representation of the window in the given {@link AggregationOperationContext context}.
     *
     * @param ctx the context to render the window in.
     * @return never {@literal null}.
     */
    Document toDocument(AggregationOperationContext ctx);
}
/**
 * Builder API for a {@link RangeWindow}.
 *
 * @author Christoph Strobl
 */
public static class RangeWindowBuilder {

    private @Nullable Object lower;
    private @Nullable Object upper;
    private @Nullable WindowUnit unit;

    /**
     * The lower (inclusive) range limit based on the sortBy field.
     *
     * @param lower eg. {@literal current} or {@literal unbounded}.
     * @return this.
     */
    @Contract("_ -> this")
    public RangeWindowBuilder from(String lower) {
        this.lower = lower;
        return this;
    }

    /**
     * The upper (inclusive) range limit based on the sortBy field.
     *
     * @param upper eg. {@literal current} or {@literal unbounded}.
     * @return this.
     */
    @Contract("_ -> this")
    public RangeWindowBuilder to(String upper) {
        this.upper = upper;
        return this;
    }

    /**
     * The lower (inclusive) range limit given as a numeric value to add to the sortBy field value of the current
     * document.
     *
     * @param lower the numeric offset forming the lower boundary.
     * @return this.
     */
    @Contract("_ -> this")
    public RangeWindowBuilder from(Number lower) {
        this.lower = lower;
        return this;
    }

    /**
     * The upper (inclusive) range limit given as a numeric value to add to the sortBy field value of the current
     * document.
     *
     * @param upper the numeric offset forming the upper boundary.
     * @return this.
     */
    @Contract("_ -> this")
    public RangeWindowBuilder to(Number upper) {
        this.upper = upper;
        return this;
    }

    /**
     * Use {@literal current} as {@link #from(String) lower} limit.
     *
     * @return this.
     */
    @Contract("-> this")
    public RangeWindowBuilder fromCurrent() {
        return from(CURRENT);
    }

    /**
     * Use {@literal unbounded} as {@link #from(String) lower} limit.
     *
     * @return this.
     */
    @Contract("-> this")
    public RangeWindowBuilder fromUnbounded() {
        return from(UNBOUNDED);
    }

    /**
     * Use {@literal current} as {@link #to(String) upper} limit.
     *
     * @return this.
     */
    @Contract("-> this")
    public RangeWindowBuilder toCurrent() {
        return to(CURRENT);
    }

    /**
     * Use {@literal unbounded} as {@link #to(String) upper} limit.
     *
     * @return this.
     */
    @Contract("-> this")
    public RangeWindowBuilder toUnbounded() {
        return to(UNBOUNDED);
    }

    /**
     * Set the {@link WindowUnit unit} of measure for the given {@link Window}. Optional; when not set,
     * {@link WindowUnits#DEFAULT} is used.
     *
     * @param windowUnit must not be {@literal null}. Can be one of {@link WindowUnits}.
     * @return this.
     * @throws IllegalArgumentException if {@code windowUnit} is {@literal null}.
     */
    @Contract("_ -> this")
    public RangeWindowBuilder unit(WindowUnit windowUnit) {

        Assert.notNull(windowUnit, "WindowUnit must not be null");

        this.unit = windowUnit;
        return this;
    }

    /**
     * Build the {@link RangeWindow}.
     *
     * @return new instance of {@link RangeWindow}.
     * @throws IllegalArgumentException if the lower or the upper bound has not been set.
     */
    @Contract("-> new")
    public RangeWindow build() {

        Assert.notNull(lower, "Lower bound must not be null");
        Assert.notNull(upper, "Upper bound must not be null");

        // The unit is optional: fall back to DEFAULT, exactly like Windows.range(lower, upper, null) does, so that
        // plain numeric ranges can be built without an artificial unit.
        return new RangeWindow(lower, upper, unit != null ? unit : WindowUnits.DEFAULT);
    }
}
/**
 * Builder API for a {@link DocumentWindow}.
 *
 * @author Christoph Strobl
 */
public static class DocumentWindowBuilder {

    private @Nullable Object lower;
    private @Nullable Object upper;

    /**
     * The lower (inclusive) range limit based on current document. Use a negative integer for a position before the
     * current document. Use a positive integer for a position after the current document. {@code 0} is the current
     * document position.
     *
     * @param lower the position relative to the current document.
     * @return this.
     */
    @Contract("_ -> this")
    public DocumentWindowBuilder from(Number lower) {
        this.lower = lower;
        return this;
    }

    /**
     * The lower (inclusive) range limit based on current document.
     *
     * @param lower eg. {@literal current} or {@literal unbounded}.
     * @return this.
     */
    @Contract("_ -> this")
    public DocumentWindowBuilder from(String lower) {
        this.lower = lower;
        return this;
    }

    /**
     * Use {@literal current} as {@link #from(String) lower} limit.
     *
     * @return this.
     */
    @Contract("-> this")
    public DocumentWindowBuilder fromCurrent() {
        return from(CURRENT);
    }

    /**
     * Use {@literal unbounded} as {@link #from(String) lower} limit.
     *
     * @return this.
     */
    @Contract("-> this")
    public DocumentWindowBuilder fromUnbounded() {
        return from(UNBOUNDED);
    }

    /**
     * The upper (inclusive) range limit based on current document. Use a negative integer for a position before the
     * current document. Use a positive integer for a position after the current document. {@code 0} is the current
     * document position.
     *
     * @param upper the position relative to the current document.
     * @return this.
     */
    @Contract("_ -> this")
    public DocumentWindowBuilder to(Number upper) {
        this.upper = upper;
        return this;
    }

    /**
     * The upper (inclusive) range limit based on current document.
     *
     * @param upper eg. {@literal current} or {@literal unbounded}.
     * @return this.
     */
    @Contract("_ -> this")
    public DocumentWindowBuilder to(String upper) {
        this.upper = upper;
        return this;
    }

    /**
     * Use {@literal current} as {@link #to(String) upper} limit.
     *
     * @return this.
     */
    @Contract("-> this")
    public DocumentWindowBuilder toCurrent() {
        return to(CURRENT);
    }

    /**
     * Use {@literal unbounded} as {@link #to(String) upper} limit.
     *
     * @return this.
     */
    @Contract("-> this")
    public DocumentWindowBuilder toUnbounded() {
        return to(UNBOUNDED);
    }

    /**
     * Build the {@link DocumentWindow}. Both boundaries need to be set beforehand.
     *
     * @return new instance of {@link DocumentWindow}.
     */
    @Contract("-> new")
    public DocumentWindow build() {

        Assert.notNull(lower, "Lower bound must not be null");
        Assert.notNull(upper, "Upper bound must not be null");

        return new DocumentWindow(lower, upper);
    }
}
/**
 * Shared base for {@link Window} implementations that simply holds the two boundaries.
 *
 * @author Christoph Strobl
 */
abstract static class WindowImpl implements Window {

    private final Object lower;
    private final Object upper;

    /**
     * @param lower the lower boundary.
     * @param upper the upper boundary.
     */
    protected WindowImpl(Object lower, Object upper) {

        this.upper = upper;
        this.lower = lower;
    }

    @Override
    public Object getLower() {
        return this.lower;
    }

    @Override
    public Object getUpper() {
        return this.upper;
    }
}
/**
 * {@link Window} implementation whose boundaries are positions relative to the current document.
 *
 * @author Christoph Strobl
 */
public static class DocumentWindow extends WindowImpl {

    DocumentWindow(Object lower, Object upper) {
        super(lower, upper);
    }

    /**
     * Render the window as {@code documents: [lower, upper]}.
     *
     * @param ctx not used by this implementation.
     * @return a new {@link Document} holding both boundaries under the {@code documents} key.
     */
    @Override
    public Document toDocument(AggregationOperationContext ctx) {

        List<Object> boundaries = Arrays.asList(getLower(), getUpper());
        return new Document("documents", boundaries);
    }
}
/**
 * {@link Window} implementation based on the sort fields.
 *
 * @author Christoph Strobl
 */
public static class RangeWindow extends WindowImpl {

    private final WindowUnit unit;

    /**
     * @param lower the lower range boundary.
     * @param upper the upper range boundary.
     * @param unit the unit the boundaries are expressed in. {@link WindowUnits#DEFAULT} results in no unit being
     *          rendered.
     */
    protected RangeWindow(Object lower, Object upper, WindowUnit unit) {

        super(lower, upper);
        this.unit = unit;
    }

    /**
     * Render the window as {@code range: [lower, upper]}, followed by the lower-cased {@code unit} name unless the
     * unit is {@link WindowUnits#DEFAULT}.
     *
     * @param ctx not used by this implementation.
     * @return a new {@link Document} representing the range window.
     */
    @Override
    public Document toDocument(AggregationOperationContext ctx) {

        Document range = new Document("range", new Object[] { getLower(), getUpper() });

        if (unit != null && !WindowUnits.DEFAULT.equals(unit)) {
            // Locale.ROOT keeps the keyword stable: with a Turkish default locale a plain toLowerCase() would turn
            // the 'I' of MILLISECOND / MINUTE into a dotless 'ı'.
            range.append("unit", unit.name().toLowerCase(Locale.ROOT));
        }

        return range;
    }
}
/**
 * The actual time unit to apply to a {@link Window}.
 */
public interface WindowUnit {

    /**
     * @return the name of the unit.
     */
    String name();

    /**
     * Converts the given time unit into a {@link WindowUnit}. Supported units are: days, hours, minutes, seconds, and
     * milliseconds.
     *
     * @param timeUnit the time unit to convert, must not be {@literal null}.
     * @return the matching {@link WindowUnits} constant.
     * @throws IllegalArgumentException if the {@link TimeUnit} is {@literal null} or not supported for conversion.
     */
    static WindowUnit from(TimeUnit timeUnit) {

        Assert.notNull(timeUnit, "TimeUnit must not be null");

        return switch (timeUnit) {
            case DAYS -> WindowUnits.DAY;
            case HOURS -> WindowUnits.HOUR;
            case MINUTES -> WindowUnits.MINUTE;
            case SECONDS -> WindowUnits.SECOND;
            case MILLISECONDS -> WindowUnits.MILLISECOND;
            default -> throw new IllegalArgumentException(String.format("Cannot create WindowUnit from %s", timeUnit));
        };
    }

    /**
     * Converts the given chrono unit into a {@link WindowUnit}. Supported units are: years, months, weeks, days,
     * hours, minutes, seconds, and millis.
     *
     * @param chronoUnit the chrono unit to convert, must not be {@literal null}.
     * @return the matching {@link WindowUnits} constant.
     * @throws IllegalArgumentException if the {@link ChronoUnit} is {@literal null} or not supported for conversion.
     */
    static WindowUnit from(ChronoUnit chronoUnit) {

        // Without this check a null argument makes the switch throw a NullPointerException instead of the
        // documented IllegalArgumentException.
        Assert.notNull(chronoUnit, "ChronoUnit must not be null");

        return switch (chronoUnit) {
            case YEARS -> WindowUnits.YEAR;
            case WEEKS -> WindowUnits.WEEK;
            case MONTHS -> WindowUnits.MONTH;
            case DAYS -> WindowUnits.DAY;
            case HOURS -> WindowUnits.HOUR;
            case MINUTES -> WindowUnits.MINUTE;
            case SECONDS -> WindowUnits.SECOND;
            case MILLIS -> WindowUnits.MILLISECOND;
            default -> throw new IllegalArgumentException(String.format("Cannot create WindowUnit from %s", chronoUnit));
        };
    }
}
/**
 * The predefined {@link WindowUnit units}. {@link #DEFAULT} stands for "no explicit unit".
 */
public enum WindowUnits implements WindowUnit {

    DEFAULT,
    YEAR,
    QUARTER,
    MONTH,
    WEEK,
    DAY,
    HOUR,
    MINUTE,
    SECOND,
    MILLISECOND
}
/**
* A fluent builder to create a {@link SetWindowFieldsOperation}.
*
* @author Christoph Strobl
*/
public static class SetWindowFieldsOperationBuilder {
private @Nullable Object partitionBy;
private @Nullable SortOperation sortOperation;
private @Nullable WindowOutput output;
/**
* Specify the field to group by.
*
* @param fieldName must not be {@literal null} or null.
* @return this.
*/
@Contract("_ -> this")
public SetWindowFieldsOperationBuilder partitionByField(String fieldName) {
Assert.hasText(fieldName, "Field name must not be empty or null");
return partitionBy(Fields.field("$" + fieldName, fieldName));
}
/**
* Specify the {@link AggregationExpression expression} to group by.
*
* @param expression must not be {@literal null}.
* @return this.
*/
@Contract("_ -> this")
public SetWindowFieldsOperationBuilder partitionByExpression(AggregationExpression expression) {
return partitionBy(expression);
}
/**
* Sort {@link Sort.Direction#ASC ascending} by the given fields.
*
* @param fields must not be {@literal null}.
* @return this.
*/
@Contract("_ -> this")
public SetWindowFieldsOperationBuilder sortBy(String... fields) {
return sortBy(Sort.by(fields));
}
/**
* Set the sort order.
*
* @param sort must not be {@literal null}.
* @return this.
*/
@Contract("_ -> this")
public SetWindowFieldsOperationBuilder sortBy(Sort sort) {
return sortBy(new SortOperation(sort));
}
/**
* Set the {@link SortOperation} to use.
*
* @param sort must not be {@literal null}.
* @return this.
*/
@Contract("_ -> this")
public SetWindowFieldsOperationBuilder sortBy(SortOperation sort) {
Assert.notNull(sort, "SortOperation must not be null");
this.sortOperation = sort;
return this;
}
/**
* Define the actual output computation.
*
* @param output must not be {@literal null}.
* @return this.
*/
@Contract("_ -> this")
public SetWindowFieldsOperationBuilder output(WindowOutput output) {
Assert.notNull(output, "WindowOutput must not be null");
this.output = output;
return this;
}
/**
* Add a field capturing the result of the given {@link AggregationExpression expression} to the output.
*
* @param expression must not be {@literal null}.
* @return new instance of {@link WindowChoice}.
*/
@Contract("_ -> new")
public WindowChoice output(AggregationExpression expression) {
return new WindowChoice() {
@Nullable private Window window;
@Override
public As within(Window window) {
Assert.notNull(window, "Window must not be null");
this.window = window;
return this;
}
@Override
public SetWindowFieldsOperationBuilder as(String targetFieldName) {
Assert.hasText(targetFieldName, "Target field name must not be empty or null");
ComputedField computedField = new ComputedField(targetFieldName, expression, window);
if (SetWindowFieldsOperationBuilder.this.output == null) {
SetWindowFieldsOperationBuilder.this.output = new WindowOutput(computedField);
} else {
SetWindowFieldsOperationBuilder.this.output.append(computedField);
}
return SetWindowFieldsOperationBuilder.this;
}
};
}
/**
* Interface to capture field name used to capture the computation result.
*/
public interface As {
/**
* Define the target name field name to hold the computation result.
*
* @param targetFieldName must not be {@literal null} or empty.
* @return the starting point {@link SetWindowFieldsOperationBuilder builder} instance.
*/
SetWindowFieldsOperationBuilder as(String targetFieldName);
}
/**
* Interface to capture an optional {@link Window} applicable to the field computation.
*/
public interface WindowChoice extends As {
/**
* Specify calculation boundaries.
*
* @param window must not be {@literal null}.
* @return never {@literal null}.
*/
As within(Window window);
}
/**
* Partition by a value that translates to a valid mongodb expression.
*
* @param value must not be {@literal null}.
* @return this.
*/
@Contract("_ -> this")
public SetWindowFieldsOperationBuilder partitionBy(Object value) {
Assert.notNull(value, "Partition By must not be null");
partitionBy = value;
return this;
}
/**
* Obtain a new instance of {@link SetWindowFieldsOperation} with previously set arguments.
*
* @return new instance of {@link SetWindowFieldsOperation}.
*/
@Contract("-> new")
public SetWindowFieldsOperation build() {
Assert.notNull(output, "Output must be set first");
return new SetWindowFieldsOperation(partitionBy, sortOperation, output);
}
}
}