AggregationOptions.java

/*
 * Copyright 2014-present the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.springframework.data.mongodb.core.aggregation;

import java.time.Duration;
import java.util.Optional;

import org.bson.Document;
import org.jspecify.annotations.Nullable;
import org.springframework.data.mongodb.core.ReadConcernAware;
import org.springframework.data.mongodb.core.ReadPreferenceAware;
import org.springframework.data.mongodb.core.query.Collation;
import org.springframework.data.mongodb.core.query.DiskUse;
import org.springframework.data.mongodb.util.BsonUtils;
import org.springframework.lang.Contract;
import org.springframework.util.Assert;

import com.mongodb.ReadConcern;
import com.mongodb.ReadPreference;

/**
 * Holds a set of configurable aggregation options that can be used within an aggregation pipeline. A list of support
 * aggregation options can be found in the
 * <a href="https://docs.mongodb.org/manual/reference/command/aggregate/#aggregate">MongoDB reference documentation</a>.
 *
 * @author Thomas Darimont
 * @author Oliver Gierke
 * @author Christoph Strobl
 * @author Mark Paluch
 * @author Yadhukrishna S Pai
 * @author Soumya Prakash Behera
 * @see Aggregation#withOptions(AggregationOptions)
 * @see TypedAggregation#withOptions(AggregationOptions)
 * @since 1.6
 */
public class AggregationOptions implements ReadConcernAware, ReadPreferenceAware {

	private static final String BATCH_SIZE = "batchSize";
	private static final String CURSOR = "cursor";
	private static final String EXPLAIN = "explain";
	private static final String ALLOW_DISK_USE = "allowDiskUse";
	private static final String COLLATION = "collation";
	private static final String COMMENT = "comment";
	private static final String MAX_TIME = "maxTimeMS";
	private static final String HINT = "hint";

	private final DiskUse diskUse;
	private final boolean explain;
	private final Optional<Document> cursor;
	private final Optional<Collation> collation;
	private final Optional<String> comment;
	private final Optional<Object> hint;

	private Optional<ReadConcern> readConcern;

	private Optional<ReadPreference> readPreference;
	private Duration maxTime = Duration.ZERO;
	private ResultOptions resultOptions = ResultOptions.READ;
	private DomainTypeMapping domainTypeMapping = DomainTypeMapping.RELAXED;

	/**
	 * Creates a new {@link AggregationOptions}.
	 *
	 * @param allowDiskUse whether to off-load intensive sort-operations to disk.
	 * @param explain whether to get the execution plan for the aggregation instead of the actual results.
	 * @param cursor can be {@literal null}, used to pass additional options to the aggregation.
	 */
	public AggregationOptions(boolean allowDiskUse, boolean explain, @Nullable Document cursor) {
		this(allowDiskUse, explain, cursor, null);
	}

	public AggregationOptions(DiskUse diskUse, boolean explain, @Nullable Document cursor) {
		this(diskUse, explain, cursor, null);
	}

	public AggregationOptions(DiskUse allowDiskUse, boolean explain, @Nullable Document cursor,
		@Nullable Collation collation) {
		this(allowDiskUse, explain, cursor, collation, null);
	}

	/**
	 * Creates a new {@link AggregationOptions}.
	 *
	 * @param allowDiskUse whether to off-load intensive sort-operations to disk.
	 * @param explain whether to get the execution plan for the aggregation instead of the actual results.
	 * @param cursor can be {@literal null}, used to pass additional options (such as {@code batchSize}) to the
	 *          aggregation.
	 * @param collation collation for string comparison. Can be {@literal null}.
	 * @since 2.0
	 */
	public AggregationOptions(boolean allowDiskUse, boolean explain, @Nullable Document cursor,
			@Nullable Collation collation) {
		this(DiskUse.of(allowDiskUse), explain, cursor, collation, null, null);
	}

	/**
	 * Creates a new {@link AggregationOptions}.
	 *
	 * @param allowDiskUse whether to off-load intensive sort-operations to disk.
	 * @param explain whether to get the execution plan for the aggregation instead of the actual results.
	 * @param cursor can be {@literal null}, used to pass additional options (such as {@code batchSize}) to the
	 *          aggregation.
	 * @param collation collation for string comparison. Can be {@literal null}.
	 * @param comment execution comment. Can be {@literal null}.
	 * @since 2.2
	 */
	public AggregationOptions(boolean allowDiskUse, boolean explain, @Nullable Document cursor,
			@Nullable Collation collation, @Nullable String comment) {
		this(allowDiskUse ? DiskUse.ALLOW : DiskUse.DENY, explain, cursor, collation, comment, null);
	}

	public AggregationOptions(DiskUse allowDiskUse, boolean explain, @Nullable Document cursor,
		@Nullable Collation collation, @Nullable String comment) {
		this(allowDiskUse, explain, cursor, collation, comment, null);
	}

	/**
	 * Creates a new {@link AggregationOptions}.
	 *
	 * @param diskUse whether to off-load intensive sort-operations to disk.
	 * @param explain whether to get the execution plan for the aggregation instead of the actual results.
	 * @param cursor can be {@literal null}, used to pass additional options (such as {@code batchSize}) to the
	 *          aggregation.
	 * @param collation collation for string comparison. Can be {@literal null}.
	 * @param comment execution comment. Can be {@literal null}.
	 * @param hint can be {@literal null}, used to provide an index that would be forcibly used by query optimizer.
	 * @since 3.1
	 */
	private AggregationOptions(DiskUse diskUse, boolean explain, @Nullable Document cursor,
			@Nullable Collation collation, @Nullable String comment, @Nullable Object hint) {

		Assert.notNull(diskUse, "DiskUse must not be null");

		this.diskUse = diskUse;
		this.explain = explain;
		this.cursor = Optional.ofNullable(cursor);
		this.collation = Optional.ofNullable(collation);
		this.comment = Optional.ofNullable(comment);
		this.hint = Optional.ofNullable(hint);
		this.readConcern = Optional.empty();
		this.readPreference = Optional.empty();
	}

	/**
	 * Creates a new {@link AggregationOptions}.
	 *
	 * @param allowDiskUse whether to off-load intensive sort-operations to disk.
	 * @param explain whether to get the execution plan for the aggregation instead of the actual results.
	 * @param cursorBatchSize initial cursor batch size.
	 * @since 2.0
	 */
	public AggregationOptions(boolean allowDiskUse, boolean explain, int cursorBatchSize) {
		this(allowDiskUse, explain, createCursor(cursorBatchSize), null);
	}

	/**
	 * Creates new {@link AggregationOptions} given {@link Document} containing aggregation options.
	 *
	 * @param document must not be {@literal null}.
	 * @return the {@link AggregationOptions}.
	 * @since 2.0
	 */
	public static AggregationOptions fromDocument(Document document) {

		Assert.notNull(document, "Document must not be null");

		Boolean allowDiskUse = document.get(ALLOW_DISK_USE, Boolean.class);
		boolean explain = document.getBoolean(EXPLAIN, false);
		Document cursor = document.get(CURSOR, Document.class);
		Collation collation = document.containsKey(COLLATION) ? Collation.from(document.get(COLLATION, Document.class))
				: null;
		String comment = document.getString(COMMENT);
		Document hint = document.get(HINT, Document.class);

		AggregationOptions options = new AggregationOptions(DiskUse.of(allowDiskUse) , explain, cursor, collation, comment, hint);
		if (document.containsKey(MAX_TIME)) {
			options.maxTime = Duration.ofMillis(document.getLong(MAX_TIME));
		}
		return options;
	}

	/**
	 * Obtain a new {@link Builder} for constructing {@link AggregationOptions}.
	 *
	 * @return never {@literal null}.
	 * @since 2.0
	 */
	public static Builder builder() {
		return new Builder();
	}

	/**
	 * Enables writing to temporary files. When set to {@literal true}, aggregation stages can write data to the
	 * {@code _tmp} subdirectory in the {@code dbPath} directory.
	 *
	 * @return {@literal true} if enabled; {@literal false} otherwise (or if not set).
	 */
	public boolean isAllowDiskUse() {
		return diskUse.equals(DiskUse.ALLOW);
	}

	/**
	 * Return whether {@link #isAllowDiskUse} is configured.
	 *
	 * @return {@literal true} if is {@code allowDiskUse} is configured, {@literal false} otherwise.
	 * @since 4.2.5
	 */
	public boolean isAllowDiskUseSet() {
		return !diskUse.equals(DiskUse.DEFAULT);
	}

	/**
	 * Specifies to return the information on the processing of the pipeline.
	 *
	 * @return {@literal true} if enabled.
	 */
	public boolean isExplain() {
		return explain;
	}

	/**
	 * The initial cursor batch size, if available, otherwise {@literal null}.
	 *
	 * @return the batch size or {@literal null}.
	 * @since 2.0
	 */
	@Nullable
	public Integer getCursorBatchSize() {

		if (cursor.filter(val -> val.containsKey(BATCH_SIZE)).isPresent()) {
			return cursor.get().get(BATCH_SIZE, Integer.class);
		}

		return null;
	}

	/**
	 * Specify a document that contains options that control the creation of the cursor object.
	 *
	 * @return never {@literal null}.
	 */
	public Optional<Document> getCursor() {
		return cursor;
	}

	/**
	 * Get collation settings for string comparison.
	 *
	 * @return never {@literal null}.
	 * @since 2.0
	 */
	public Optional<Collation> getCollation() {
		return collation;
	}

	/**
	 * Get the comment for the aggregation.
	 *
	 * @return never {@literal null}.
	 * @since 2.2
	 */
	public Optional<String> getComment() {
		return comment;
	}

	/**
	 * Get the hint used to fulfill the aggregation.
	 *
	 * @return never {@literal null}.
	 * @since 3.1
	 * @deprecated since 4.1, use {@link #getHintObject()} instead.
	 */
	public Optional<Document> getHint() {
		return hint.map(it -> {
			if (it instanceof Document doc) {
				return doc;
			}
			if (it instanceof String hintString) {
				if (BsonUtils.isJsonDocument(hintString)) {
					return BsonUtils.parse(hintString, null);
				}
			}
			throw new IllegalStateException("Unable to read hint of type %s".formatted(it.getClass()));
		});
	}

	/**
	 * Get the hint used to fulfill the aggregation.
	 *
	 * @return never {@literal null}.
	 * @since 4.1
	 */
	public Optional<Object> getHintObject() {
		return hint;
	}

	@Override
	public boolean hasReadConcern() {
		return readConcern.isPresent();
	}

	@Override
	public @Nullable ReadConcern getReadConcern() {
		return readConcern.orElse(null);
	}

	@Override
	public boolean hasReadPreference() {
		return readPreference.isPresent();
	}

	@Override
	public @Nullable ReadPreference getReadPreference() {
		return readPreference.orElse(null);
	}

	/**
	 * @return the time limit for processing. {@link Duration#ZERO} is used for the default unbounded behavior.
	 * @since 3.0
	 */
	public Duration getMaxTime() {
		return maxTime;
	}

	/**
	 * @return {@literal true} to skip results when running an aggregation. Useful in combination with {@code $merge} or
	 *         {@code $out}.
	 * @since 3.0.2
	 */
	public boolean isSkipResults() {
		return ResultOptions.SKIP.equals(resultOptions);
	}

	/**
	 * @return the domain type mapping strategy do apply. Never {@literal null}.
	 * @since 3.2
	 */
	public DomainTypeMapping getDomainTypeMapping() {
		return domainTypeMapping;
	}

	/**
	 * Returns a new potentially adjusted copy for the given {@code aggregationCommandObject} with the configuration
	 * applied.
	 *
	 * @param command the aggregation command.
	 * @return
	 */
	Document applyAndReturnPotentiallyChangedCommand(Document command) {

		Document result = new Document(command);

		if (isAllowDiskUseSet() && !result.containsKey(ALLOW_DISK_USE)) {
			result.put(ALLOW_DISK_USE, isAllowDiskUse());
		}

		if (explain && !result.containsKey(EXPLAIN)) {
			result.put(EXPLAIN, explain);
		}

		if (result.containsKey(HINT)) {
			hint.ifPresent(val -> result.append(HINT, val));
		}

		if (!result.containsKey(CURSOR)) {
			cursor.ifPresent(val -> result.put(CURSOR, val));
		}

		if (!result.containsKey(COLLATION)) {
			collation.map(Collation::toDocument).ifPresent(val -> result.append(COLLATION, val));
		}

		if (hasExecutionTimeLimit() && !result.containsKey(MAX_TIME)) {
			result.append(MAX_TIME, maxTime.toMillis());
		}

		return result;
	}

	/**
	 * Returns a {@link Document} representation of this {@link AggregationOptions}.
	 *
	 * @return never {@literal null}.
	 */
	public Document toDocument() {

		Document document = new Document();
		if (isAllowDiskUseSet()) {
			document.put(ALLOW_DISK_USE, isAllowDiskUse());
		}
		document.put(EXPLAIN, explain);

		cursor.ifPresent(val -> document.put(CURSOR, val));
		collation.ifPresent(val -> document.append(COLLATION, val.toDocument()));
		comment.ifPresent(val -> document.append(COMMENT, val));
		hint.ifPresent(val -> document.append(HINT, val));

		if (hasExecutionTimeLimit()) {
			document.append(MAX_TIME, maxTime.toMillis());
		}

		return document;
	}

	/**
	 * @return {@literal true} if {@link #maxTime} is set to a positive value.
	 * @since 3.0
	 */
	public boolean hasExecutionTimeLimit() {
		return !maxTime.isZero() && !maxTime.isNegative();
	}

	@Override
	public String toString() {
		return toDocument().toJson();
	}

	static Document createCursor(int cursorBatchSize) {
		return new Document("batchSize", cursorBatchSize);
	}

	/**
	 * A Builder for {@link AggregationOptions}.
	 *
	 * @author Thomas Darimont
	 * @author Mark Paluch
	 */
	public static class Builder {

		private DiskUse diskUse = DiskUse.DEFAULT;
		private boolean explain;
		private @Nullable Document cursor;
		private @Nullable Collation collation;
		private @Nullable String comment;
		private @Nullable Object hint;
		private @Nullable ReadConcern readConcern;
		private @Nullable ReadPreference readPreference;
		private @Nullable Duration maxTime;
		private @Nullable ResultOptions resultOptions;
		private @Nullable DomainTypeMapping domainTypeMapping;

		/**
		 * Defines whether to off-load intensive sort-operations to disk.
		 *
		 * @param allowDiskUse use {@literal true} to allow disk use during the aggregation.
		 * @return this.
		 */
		@Contract("_ -> this")
		public Builder allowDiskUse(boolean allowDiskUse) {
			return diskUse(DiskUse.of(allowDiskUse));
		}

		/**
		 * Defines whether to off-load intensive sort-operations to disk.
		 *
		 * @param diskUse use {@literal true} to allow disk use during the aggregation.
		 * @return this.
		 * @since 5.0
		 */
		@Contract("_ -> this")
		public Builder diskUse(DiskUse diskUse) {

			Assert.notNull(diskUse, "DiskUse must not be null");

			this.diskUse = diskUse;
			return this;
		}

		/**
		 * Defines whether to get the execution plan for the aggregation instead of the actual results.
		 *
		 * @param explain use {@literal true} to enable explain feature.
		 * @return this.
		 */
		@Contract("_ -> this")
		public Builder explain(boolean explain) {

			this.explain = explain;
			return this;
		}

		/**
		 * Additional options to the aggregation.
		 *
		 * @param cursor must not be {@literal null}.
		 * @return this.
		 */
		@Contract("_ -> this")
		public Builder cursor(Document cursor) {

			this.cursor = cursor;
			return this;
		}

		/**
		 * Define the initial cursor batch size.
		 *
		 * @param batchSize use a positive int.
		 * @return this.
		 * @since 2.0
		 */
		@Contract("_ -> this")
		public Builder cursorBatchSize(int batchSize) {

			this.cursor = createCursor(batchSize);
			return this;
		}

		/**
		 * Define collation settings for string comparison.
		 *
		 * @param collation can be {@literal null}.
		 * @return this.
		 * @since 2.0
		 */
		@Contract("_ -> this")
		public Builder collation(@Nullable Collation collation) {

			this.collation = collation;
			return this;
		}

		/**
		 * Define a comment to describe the execution.
		 *
		 * @param comment can be {@literal null}.
		 * @return this.
		 * @since 2.2
		 */
		@Contract("_ -> this")
		public Builder comment(@Nullable String comment) {

			this.comment = comment;
			return this;
		}

		/**
		 * Define a hint that is used by query optimizer to to fulfill the aggregation.
		 *
		 * @param hint can be {@literal null}.
		 * @return this.
		 * @since 3.1
		 */
		@Contract("_ -> this")
		public Builder hint(@Nullable Document hint) {

			this.hint = hint;
			return this;
		}

		/**
		 * Define a hint that is used by query optimizer to to fulfill the aggregation.
		 *
		 * @param indexName can be {@literal null}.
		 * @return this.
		 * @since 4.1
		 */
		@Contract("_ -> this")
		public Builder hint(@Nullable String indexName) {

			this.hint = indexName;
			return this;
		}

		/**
		 * Define a {@link ReadConcern} to apply to the aggregation.
		 *
		 * @param readConcern can be {@literal null}.
		 * @return this.
		 * @since 4.1
		 */
		@Contract("_ -> this")
		public Builder readConcern(@Nullable ReadConcern readConcern) {

			this.readConcern = readConcern;
			return this;
		}

		/**
		 * Define a {@link ReadPreference} to apply to the aggregation.
		 *
		 * @param readPreference can be {@literal null}.
		 * @return this.
		 * @since 4.1
		 */
		@Contract("_ -> this")
		public Builder readPreference(@Nullable ReadPreference readPreference) {

			this.readPreference = readPreference;
			return this;
		}

		/**
		 * Set the time limit for processing.
		 *
		 * @param maxTime {@link Duration#ZERO} is used for the default unbounded behavior. {@link Duration#isNegative()
		 *          Negative} values will be ignored.
		 * @return this.
		 * @since 3.0
		 */
		@Contract("_ -> this")
		public Builder maxTime(@Nullable Duration maxTime) {

			this.maxTime = maxTime;
			return this;
		}

		/**
		 * Run the aggregation, but do NOT read the aggregation result from the store. <br />
		 * If the expected result of the aggregation is rather large, eg. when using an {@literal $out} operation, this
		 * option allows to execute the aggregation without having the cursor return the operation result.
		 *
		 * @return this.
		 * @since 3.0.2
		 */
		@Contract("-> this")
		public Builder skipOutput() {

			this.resultOptions = ResultOptions.SKIP;
			return this;
		}

		/**
		 * Apply a strict domain type mapping considering {@link org.springframework.data.mongodb.core.mapping.Field}
		 * annotations throwing errors for non-existent, but referenced fields.
		 *
		 * @return this.
		 * @since 3.2
		 */
		@Contract("-> this")
		public Builder strictMapping() {

			this.domainTypeMapping = DomainTypeMapping.STRICT;
			return this;
		}

		/**
		 * Apply a relaxed domain type mapping considering {@link org.springframework.data.mongodb.core.mapping.Field}
		 * annotations using the user provided name if a referenced field does not exist.
		 *
		 * @return this.
		 * @since 3.2
		 */
		@Contract("-> this")
		public Builder relaxedMapping() {

			this.domainTypeMapping = DomainTypeMapping.RELAXED;
			return this;
		}

		/**
		 * Apply no domain type mapping at all taking the pipeline as-is.
		 *
		 * @return this.
		 * @since 3.2
		 */
		@Contract("-> this")
		public Builder noMapping() {

			this.domainTypeMapping = DomainTypeMapping.NONE;
			return this;
		}

		/**
		 * Returns a new {@link AggregationOptions} instance with the given configuration.
		 *
		 * @return new instance of {@link AggregationOptions}.
		 */
		@Contract("-> new")
		public AggregationOptions build() {

			AggregationOptions options = new AggregationOptions(diskUse, explain, cursor, collation, comment, hint);
			if (maxTime != null) {
				options.maxTime = maxTime;
			}
			if (resultOptions != null) {
				options.resultOptions = resultOptions;
			}
			if (domainTypeMapping != null) {
				options.domainTypeMapping = domainTypeMapping;
			}
			if (readConcern != null) {
				options.readConcern = Optional.of(readConcern);
			}
			if (readPreference != null) {
				options.readPreference = Optional.of(readPreference);
			}

			return options;
		}
	}

	/**
	 * @since 3.0
	 */
	private enum ResultOptions {

		/**
		 * Just do it!, and do not read the operation result.
		 */
		SKIP,
		/**
		 * Read the aggregation result from the cursor.
		 */
		READ
	}

	/**
	 * Aggregation pipeline Domain type mappings supported by the mapping layer.
	 *
	 * @since 3.2
	 */
	public enum DomainTypeMapping {

		/**
		 * Mapping throws errors for non-existent, but referenced fields.
		 */
		STRICT,

		/**
		 * Fields that do not exist in the model are treated as-is.
		 */
		RELAXED,

		/**
		 * Do not attempt to map fields against the model and treat the entire pipeline as-is.
		 */
		NONE
	}
}