ChangeStreamOptions.java

/*
 * Copyright 2018-present the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.springframework.data.mongodb.core;

import java.time.Instant;
import java.util.Arrays;
import java.util.Optional;

import org.bson.BsonDocument;
import org.bson.BsonTimestamp;
import org.bson.BsonValue;
import org.bson.Document;
import org.jspecify.annotations.Nullable;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.query.Collation;
import org.springframework.lang.Contract;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.ObjectUtils;

import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.client.model.changestream.FullDocumentBeforeChange;

/**
 * Options applicable to MongoDB <a href="https://docs.mongodb.com/manual/changeStreams/">Change Streams</a>. Intended
 * to be used along with {@link org.springframework.data.mongodb.core.messaging.ChangeStreamRequest} in a sync world as
 * well {@link ReactiveMongoOperations} if you prefer it that way.
 *
 * @author Christoph Strobl
 * @author Mark Paluch
 * @author Myroslav Kosinskyi
 * @since 2.1
 */
public class ChangeStreamOptions {

	private @Nullable Object filter;
	private @Nullable BsonValue resumeToken;
	private @Nullable FullDocument fullDocumentLookup;
	private @Nullable FullDocumentBeforeChange fullDocumentBeforeChangeLookup;
	private @Nullable Collation collation;
	private @Nullable Object resumeTimestamp;
	private @Nullable Boolean showExpandedEvents;
	private Resume resume = Resume.UNDEFINED;

	protected ChangeStreamOptions() {}

	/**
	 * @return {@link Optional#empty()} if not set.
	 */
	public Optional<Object> getFilter() {
		return Optional.ofNullable(filter);
	}

	/**
	 * @return {@link Optional#empty()} if not set.
	 */
	public Optional<BsonValue> getResumeToken() {
		return Optional.ofNullable(resumeToken);
	}

	/**
	 * @return {@link Optional#empty()} if not set.
	 */
	public Optional<FullDocument> getFullDocumentLookup() {
		return Optional.ofNullable(fullDocumentLookup);
	}

	/**
	 * @return {@link Optional#empty()} if not set.
	 * @since 4.0
	 */
	public Optional<FullDocumentBeforeChange> getFullDocumentBeforeChangeLookup() {
		return Optional.ofNullable(fullDocumentBeforeChangeLookup);
	}

	/**
	 * @return {@link Optional#empty()} if not set.
	 */
	public Optional<Collation> getCollation() {
		return Optional.ofNullable(collation);
	}

	/**
	 * @return {@link Optional#empty()} if not set.
	 */
	public Optional<Instant> getResumeTimestamp() {
		return Optional.ofNullable(resumeTimestamp).map(timestamp -> asTimestampOfType(timestamp, Instant.class));
	}

	/**
	 * @return {@link Optional#empty()} if not set.
	 * @since 2.2
	 */
	public Optional<BsonTimestamp> getResumeBsonTimestamp() {
		return Optional.ofNullable(resumeTimestamp).map(timestamp -> asTimestampOfType(timestamp, BsonTimestamp.class));
	}

	/**
	 * @return {@link Optional#empty()} if not set.
	 * @since 5.1
	 */
	public Optional<Boolean> getShowExpandedEvents() {
		return Optional.ofNullable(showExpandedEvents);
	}

	/**
	 * @return {@literal true} if the change stream should be started after the {@link #getResumeToken() token}.
	 * @since 2.2
	 */
	public boolean isStartAfter() {
		return Resume.START_AFTER.equals(resume);
	}

	/**
	 * @return {@literal true} if the change stream should be resumed after the {@link #getResumeToken() token}.
	 * @since 2.2
	 */
	public boolean isResumeAfter() {
		return Resume.RESUME_AFTER.equals(resume);
	}

	/**
	 * @return empty {@link ChangeStreamOptions}.
	 */
	public static ChangeStreamOptions empty() {
		return ChangeStreamOptions.builder().build();
	}

	/**
	 * Obtain a shiny new {@link ChangeStreamOptionsBuilder} and start defining options in this fancy fluent way. Just
	 * don't forget to call {@link ChangeStreamOptionsBuilder#build() build()} when done.
	 *
	 * @return new instance of {@link ChangeStreamOptionsBuilder}.
	 */
	public static ChangeStreamOptionsBuilder builder() {
		return new ChangeStreamOptionsBuilder();
	}

	private static <T> T asTimestampOfType(Object timestamp, Class<T> targetType) {
		return targetType.cast(doGetTimestamp(timestamp, targetType));
	}

	private static <T> Object doGetTimestamp(Object timestamp, Class<T> targetType) {

		if (ClassUtils.isAssignableValue(targetType, timestamp)) {
			return timestamp;
		}

		if (timestamp instanceof Instant instant) {
			return new BsonTimestamp((int) instant.getEpochSecond(), 0);
		}

		if (timestamp instanceof BsonTimestamp bsonTimestamp) {
			return Instant.ofEpochSecond(bsonTimestamp.getTime());
		}

		throw new IllegalArgumentException(
				"o_O that should actually not happen; The timestamp should be an Instant or a BsonTimestamp but was "
						+ ObjectUtils.nullSafeClassName(timestamp));
	}

	@Override
	public boolean equals(@Nullable Object o) {
		if (this == o)
			return true;
		if (o == null || getClass() != o.getClass())
			return false;

		ChangeStreamOptions that = (ChangeStreamOptions) o;

		if (!ObjectUtils.nullSafeEquals(this.filter, that.filter)) {
			return false;
		}
		if (!ObjectUtils.nullSafeEquals(this.resumeToken, that.resumeToken)) {
			return false;
		}
		if (!ObjectUtils.nullSafeEquals(this.fullDocumentLookup, that.fullDocumentLookup)) {
			return false;
		}
		if (!ObjectUtils.nullSafeEquals(this.fullDocumentBeforeChangeLookup, that.fullDocumentBeforeChangeLookup)) {
			return false;
		}
		if (!ObjectUtils.nullSafeEquals(this.collation, that.collation)) {
			return false;
		}
		if (!ObjectUtils.nullSafeEquals(this.resumeTimestamp, that.resumeTimestamp)) {
			return false;
		}
		if (!ObjectUtils.nullSafeEquals(this.showExpandedEvents, that.showExpandedEvents)) {
			return false;
		}
		return resume == that.resume;
	}

	@Override
	public int hashCode() {
		int result = ObjectUtils.nullSafeHashCode(filter);
		result = 31 * result + ObjectUtils.nullSafeHashCode(resumeToken);
		result = 31 * result + ObjectUtils.nullSafeHashCode(fullDocumentLookup);
		result = 31 * result + ObjectUtils.nullSafeHashCode(fullDocumentBeforeChangeLookup);
		result = 31 * result + ObjectUtils.nullSafeHashCode(collation);
		result = 31 * result + ObjectUtils.nullSafeHashCode(resumeTimestamp);
		result = 31 * result + ObjectUtils.nullSafeHashCode(showExpandedEvents);
		result = 31 * result + ObjectUtils.nullSafeHashCode(resume);
		return result;
	}

	/**
	 * @author Christoph Strobl
	 * @since 2.2
	 */
	enum Resume {

		UNDEFINED,

		/**
		 * @see com.mongodb.client.ChangeStreamIterable#startAfter(BsonDocument)
		 */
		START_AFTER,

		/**
		 * @see com.mongodb.client.ChangeStreamIterable#resumeAfter(BsonDocument)
		 */
		RESUME_AFTER
	}

	/**
	 * Builder for creating {@link ChangeStreamOptions}.
	 *
	 * @author Christoph Strobl
	 * @since 2.1
	 */
	public static class ChangeStreamOptionsBuilder {

		private @Nullable Object filter;
		private @Nullable BsonValue resumeToken;
		private @Nullable FullDocument fullDocumentLookup;
		private @Nullable FullDocumentBeforeChange fullDocumentBeforeChangeLookup;
		private @Nullable Collation collation;
		private @Nullable Object resumeTimestamp;
		private @Nullable Boolean showExpandedEvents;
		private Resume resume = Resume.UNDEFINED;

		private ChangeStreamOptionsBuilder() {}

		/**
		 * Set the collation to use.
		 *
		 * @param collation must not be {@literal null} nor {@literal empty}.
		 * @return this.
		 */
		@Contract("_ -> this")
		public ChangeStreamOptionsBuilder collation(Collation collation) {

			Assert.notNull(collation, "Collation must not be null nor empty");

			this.collation = collation;
			return this;
		}

		/**
		 * Set the filter to apply. <br />
		 * Fields on aggregation expression root level are prefixed to map to fields contained in
		 * {@link ChangeStreamDocument#getFullDocument() fullDocument}. However {@literal operationType}, {@literal ns},
		 * {@literal documentKey} and {@literal fullDocument} are reserved words that will be omitted, and therefore taken
		 * as given, during the mapping procedure. You may want to have a look at the
		 * <a href="https://docs.mongodb.com/manual/reference/change-events/">structure of Change Events</a>. <br />
		 * Use {@link org.springframework.data.mongodb.core.aggregation.TypedAggregation} to ensure filter expressions are
		 * mapped to domain type fields.
		 *
		 * @param filter the {@link Aggregation Aggregation pipeline} to apply for filtering events. Must not be
		 *          {@literal null}.
		 * @return this.
		 */
		@Contract("_ -> this")
		public ChangeStreamOptionsBuilder filter(Aggregation filter) {

			Assert.notNull(filter, "Filter must not be null");

			this.filter = filter;
			return this;
		}

		/**
		 * Set the plain filter chain to apply.
		 *
		 * @param filter must not be {@literal null} nor contain {@literal null} values.
		 * @return this.
		 */
		@Contract("_ -> this")
		public ChangeStreamOptionsBuilder filter(Document... filter) {

			Assert.noNullElements(filter, "Filter must not contain null values");

			this.filter = Arrays.asList(filter);
			return this;
		}

		/**
		 * Set the resume token (typically a {@link org.bson.BsonDocument} containing a {@link org.bson.BsonBinary binary
		 * token}) after which to start with listening.
		 *
		 * @param resumeToken must not be {@literal null}.
		 * @return this.
		 */
		@Contract("_ -> this")
		public ChangeStreamOptionsBuilder resumeToken(BsonValue resumeToken) {

			Assert.notNull(resumeToken, "ResumeToken must not be null");

			this.resumeToken = resumeToken;

			if (this.resume == Resume.UNDEFINED) {
				this.resume = Resume.RESUME_AFTER;
			}

			return this;
		}

		/**
		 * Set the {@link FullDocument} lookup to {@link FullDocument#UPDATE_LOOKUP}.
		 *
		 * @return this.
		 * @see #fullDocumentLookup(FullDocument)
		 */
		public ChangeStreamOptionsBuilder returnFullDocumentOnUpdate() {
			return fullDocumentLookup(FullDocument.UPDATE_LOOKUP);
		}

		/**
		 * Set the {@link FullDocument} lookup to use.
		 *
		 * @param lookup must not be {@literal null}.
		 * @return this.
		 */
		@Contract("_ -> this")
		public ChangeStreamOptionsBuilder fullDocumentLookup(FullDocument lookup) {

			Assert.notNull(lookup, "Lookup must not be null");

			this.fullDocumentLookup = lookup;
			return this;
		}

		/**
		 * Set the {@link FullDocumentBeforeChange} lookup to use.
		 *
		 * @param lookup must not be {@literal null}.
		 * @return this.
		 * @since 4.0
		 */
		@Contract("_ -> this")
		public ChangeStreamOptionsBuilder fullDocumentBeforeChangeLookup(FullDocumentBeforeChange lookup) {

			Assert.notNull(lookup, "Lookup must not be null");

			this.fullDocumentBeforeChangeLookup = lookup;
			return this;
		}

		/**
		 * Return the full document before being changed if it is available.
		 *
		 * @return this.
		 * @since 4.0
		 * @see #fullDocumentBeforeChangeLookup(FullDocumentBeforeChange)
		 */
		public ChangeStreamOptionsBuilder returnFullDocumentBeforeChange() {
			return fullDocumentBeforeChangeLookup(FullDocumentBeforeChange.WHEN_AVAILABLE);
		}

		/**
		 * Set the cluster time to resume from.
		 *
		 * @param resumeTimestamp must not be {@literal null}.
		 * @return this.
		 */
		@Contract("_ -> this")
		public ChangeStreamOptionsBuilder resumeAt(Instant resumeTimestamp) {

			Assert.notNull(resumeTimestamp, "ResumeTimestamp must not be null");

			this.resumeTimestamp = resumeTimestamp;
			return this;
		}

		/**
		 * Set the cluster time to resume from.
		 *
		 * @param resumeTimestamp must not be {@literal null}.
		 * @return this.
		 * @since 2.2
		 */
		@Contract("_ -> this")
		public ChangeStreamOptionsBuilder resumeAt(BsonTimestamp resumeTimestamp) {

			Assert.notNull(resumeTimestamp, "ResumeTimestamp must not be null");

			this.resumeTimestamp = resumeTimestamp;
			return this;
		}

		/**
		 * Set the resume token after which to continue emitting notifications.
		 *
		 * @param resumeToken must not be {@literal null}.
		 * @return this.
		 * @since 2.2
		 */
		@Contract("_ -> this")
		public ChangeStreamOptionsBuilder resumeAfter(BsonValue resumeToken) {

			resumeToken(resumeToken);
			this.resume = Resume.RESUME_AFTER;

			return this;
		}

		/**
		 * Set the resume token after which to start emitting notifications.
		 *
		 * @param resumeToken must not be {@literal null}.
		 * @return this.
		 * @since 2.2
		 */
		@Contract("_ -> this")
		public ChangeStreamOptionsBuilder startAfter(BsonValue resumeToken) {

			resumeToken(resumeToken);
			this.resume = Resume.START_AFTER;

			return this;
		}

		/**
		 * Set whether expanded change events (e.g. createIndexes, shardCollection) should be emitted.
		 *
		 * @param showExpandedEvents {@code true} to include expanded events.
		 * @return this.
		 * @since 5.1
		 */
		@Contract("_ -> this")
		public ChangeStreamOptionsBuilder showExpandedEvents(boolean showExpandedEvents) {

			this.showExpandedEvents = showExpandedEvents;
			return this;
		}

		/**
		 * @return the built {@link ChangeStreamOptions}
		 */
		@Contract("-> new")
		public ChangeStreamOptions build() {

			ChangeStreamOptions options = new ChangeStreamOptions();

			options.filter = this.filter;
			options.resumeToken = this.resumeToken;
			options.fullDocumentLookup = this.fullDocumentLookup;
			options.fullDocumentBeforeChangeLookup = this.fullDocumentBeforeChangeLookup;
			options.collation = this.collation;
			options.resumeTimestamp = this.resumeTimestamp;
			options.showExpandedEvents = this.showExpandedEvents;
			options.resume = this.resume;

			return options;
		}
	}
}