ChangeStreamRequest.java

/*
 * Copyright 2018-present the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.springframework.data.mongodb.core.messaging;

import java.time.Duration;
import java.time.Instant;

import org.bson.BsonValue;
import org.bson.Document;
import org.jspecify.annotations.Nullable;
import org.springframework.data.mongodb.core.ChangeStreamOptions;
import org.springframework.data.mongodb.core.ChangeStreamOptions.ChangeStreamOptionsBuilder;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest.ChangeStreamRequestOptions;
import org.springframework.data.mongodb.core.query.Collation;
import org.springframework.lang.Contract;
import org.springframework.util.Assert;

import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.client.model.changestream.FullDocumentBeforeChange;

/**
 * {@link SubscriptionRequest} implementation to be used for listening to
 * <a href="https://docs.mongodb.com/manual/changeStreams/">Change Streams</a> via a {@link MessageListenerContainer}
 * using the synchronous MongoDB Java driver. <br />
 * The most trivial use case is subscribing to all events of a specific {@link com.mongodb.client.MongoCollection
 * collection}
 *
 * <pre>
 * <code>
 *     ChangeStreamRequest&lt;Document&gt; request = new ChangeStreamRequest&lt;&gt;(System.out::println, () -> "collection-name");
 * </code>
 * </pre>
 *
 * or {@link com.mongodb.client.MongoDatabase} which receives events from all {@link com.mongodb.client.MongoCollection
 * collections} in that database.
 *
 * <pre>
 * <code>
 *     ChangeStreamRequest&lt;Document&gt; request = new ChangeStreamRequest&lt;&gt;(System.out::println, RequestOptions.justDatabase("test"));
 * </code>
 * </pre>
 *
 * For more advanced scenarios {@link ChangeStreamOptions} offers abstractions for options like filtering, resuming,...
 *
 * <pre>
 * <code>
 *     ChangeStreamOptions options = ChangeStreamOptions.builder()
 *         .filter(newAggregation(match(where("age").is(7))))
 *         .returnFullDocumentOnUpdate()
 *         .build();
 *
 *     ChangeStreamRequest&lt;Document&gt; request = new ChangeStreamRequest&lt;&gt;(System.out::println, new ChangeStreamRequestOptions("collection-name", options));
 * </code>
 * </pre>
 *
 * {@link ChangeStreamRequestBuilder} offers a fluent API for creating {@link ChangeStreamRequest} with
 * {@link ChangeStreamOptions} in one go.
 *
 * <pre>
 * <code>
 *     ChangeStreamRequest&lt;Document&gt; request = ChangeStreamRequest.builder()
 *         .collection("collection-name")
 *         .publishTo(System.out::println)
 *         .filter(newAggregation(match(where("age").is(7))))
 *         .fullDocumentLookup(UPDATE_LOOKUP)
 *         .build();
 * </code>
 * </pre>
 *
 * {@link Message Messges} passed to the {@link MessageListener} contain the {@link ChangeStreamDocument} within their
 * {@link Message#getRaw() raw value} while the {@code fullDocument} is extracted into the {@link Message#getBody()
 * messages body}. Unless otherwise specified (via {@link ChangeStreamOptions#getFullDocumentLookup()} the
 * {@link Message#getBody() message body} for {@code update events} will be empty for a {@link Document} target type.
 * {@link Message#getBody()} Message bodies} that map to a different target type automatically enforce an
 * {@link FullDocument#UPDATE_LOOKUP}.
 *
 * @author Christoph Strobl
 * @author Mark Paluch
 * @author Myroslav Kosinskyi
 * @author Kyuhong Han
 * @since 2.1
 */
public class ChangeStreamRequest<T>
		implements SubscriptionRequest<ChangeStreamDocument<Document>, T, ChangeStreamRequestOptions> {

	private final MessageListener<ChangeStreamDocument<Document>, ? super T> messageListener;
	private final ChangeStreamRequestOptions options;

	/**
	 * Create a new {@link ChangeStreamRequest} with options, passing {@link Message messages} to the given
	 * {@link MessageListener}.
	 *
	 * @param messageListener must not be {@literal null}.
	 * @param options must not be {@literal null}.
	 */
	public ChangeStreamRequest(MessageListener<ChangeStreamDocument<Document>, ? super T> messageListener,
			RequestOptions options) {

		Assert.notNull(messageListener, "MessageListener must not be null");
		Assert.notNull(options, "Options must not be null");

		this.options = options instanceof ChangeStreamRequestOptions changeStreamRequestOptions ?
				changeStreamRequestOptions : ChangeStreamRequestOptions.of(options);

		this.messageListener = messageListener;
	}

	@Override
	public MessageListener<ChangeStreamDocument<Document>, ? super T> getMessageListener() {
		return messageListener;
	}

	@Override
	public ChangeStreamRequestOptions getRequestOptions() {
		return options;
	}

	/**
	 * Obtain a shiny new {@link ChangeStreamRequestBuilder} and start defining your {@link ChangeStreamRequest} in this
	 * fancy fluent way. Just don't forget to call {@link ChangeStreamRequestBuilder#build() build()} when done.
	 *
	 * @return new instance of {@link ChangeStreamRequest}.
	 */
	public static ChangeStreamRequestBuilder builder() {
		return new ChangeStreamRequestBuilder();
	}

	/**
	 * Obtain a shiny new {@link ChangeStreamRequestBuilder} and start defining your {@link ChangeStreamRequest} in this
	 * fancy fluent way. Just don't forget to call {@link ChangeStreamRequestBuilder#build() build()} when done.
	 *
	 * @return new instance of {@link ChangeStreamRequest}.
	 */
	public static <T> ChangeStreamRequestBuilder<T> builder(
			MessageListener<ChangeStreamDocument<Document>, ? super T> listener) {

		ChangeStreamRequestBuilder<T> builder = new ChangeStreamRequestBuilder<>();
		return builder.publishTo(listener);
	}

	/**
	 * {@link SubscriptionRequest.RequestOptions} implementation specific to a {@link ChangeStreamRequest}.
	 *
	 * @author Christoph Strobl
	 * @since 2.1
	 */
	public static class ChangeStreamRequestOptions implements SubscriptionRequest.RequestOptions {

		private final @Nullable String databaseName;
		private final @Nullable String collectionName;
		private final @Nullable Duration maxAwaitTime;
		private final ChangeStreamOptions options;

		/**
		 * Create new {@link ChangeStreamRequestOptions}.
		 *
		 * @param databaseName can be {@literal null}.
		 * @param collectionName can be {@literal null}.
		 * @param options must not be {@literal null}.
		 */
		public ChangeStreamRequestOptions(@Nullable String databaseName, @Nullable String collectionName,
				ChangeStreamOptions options) {
			this(databaseName, collectionName, null, options);
		}

		/**
		 * Create new {@link ChangeStreamRequestOptions}.
		 *
		 * @param databaseName can be {@literal null}.
		 * @param collectionName can be {@literal null}.
		 * @param maxAwaitTime can be {@literal null}.
		 * @param options must not be {@literal null}.
		 * @since 3.0
		 */
		public ChangeStreamRequestOptions(@Nullable String databaseName, @Nullable String collectionName,
				@Nullable Duration maxAwaitTime, ChangeStreamOptions options) {

			Assert.notNull(options, "Options must not be null");

			this.collectionName = collectionName;
			this.databaseName = databaseName;
			this.maxAwaitTime = maxAwaitTime;
			this.options = options;
		}

		public static ChangeStreamRequestOptions of(RequestOptions options) {

			Assert.notNull(options, "Options must not be null");

			return new ChangeStreamRequestOptions(options.getDatabaseName(), options.getCollectionName(),
					ChangeStreamOptions.builder().build());
		}

		/**
		 * Get the {@link ChangeStreamOptions} defined.
		 *
		 * @return never {@literal null}.
		 */
		public ChangeStreamOptions getChangeStreamOptions() {
			return options;
		}

		@Override
		public @Nullable String getCollectionName() {
			return collectionName;
		}

		@Override
		public @Nullable String getDatabaseName() {
			return databaseName;
		}

		@Override
		public Duration maxAwaitTime() {
			return maxAwaitTime != null ? maxAwaitTime : RequestOptions.super.maxAwaitTime();
		}
	}

	/**
	 * Builder for creating {@link ChangeStreamRequest}.
	 *
	 * @author Christoph Strobl
	 * @since 2.1
	 * @see ChangeStreamOptions
	 */
	public static class ChangeStreamRequestBuilder<T> {

		private @Nullable String databaseName;
		private @Nullable String collectionName;
		private @Nullable Duration maxAwaitTime;
		private @Nullable MessageListener<ChangeStreamDocument<Document>, ? super T> listener;
		private final ChangeStreamOptionsBuilder delegate = ChangeStreamOptions.builder();

		private ChangeStreamRequestBuilder() {}

		/**
		 * Set the name of the {@link com.mongodb.client.MongoDatabase} to listen to.
		 *
		 * @param databaseName must not be {@literal null} nor empty.
		 * @return this.
		 */
		@Contract("_ -> this")
		public ChangeStreamRequestBuilder<T> database(String databaseName) {

			Assert.hasText(databaseName, "DatabaseName must not be null");

			this.databaseName = databaseName;
			return this;
		}

		/**
		 * Set the name of the {@link com.mongodb.client.MongoCollection} to listen to.
		 *
		 * @param collectionName must not be {@literal null} nor empty.
		 * @return this.
		 */
		@Contract("_ -> this")
		public ChangeStreamRequestBuilder<T> collection(String collectionName) {

			Assert.hasText(collectionName, "CollectionName must not be null");

			this.collectionName = collectionName;
			return this;
		}

		/**
		 * Set the {@link MessageListener} event {@link Message messages} will be published to.
		 *
		 * @param messageListener must not be {@literal null}.
		 * @return this.
		 */
		@Contract("_ -> this")
		public ChangeStreamRequestBuilder<T> publishTo(
				MessageListener<ChangeStreamDocument<Document>, ? super T> messageListener) {

			Assert.notNull(messageListener, "MessageListener must not be null");

			this.listener = messageListener;
			return this;
		}

		/**
		 * Set the filter to apply.
		 * <br />
		 * Fields on aggregation expression root level are prefixed to map to fields contained in
		 * {@link ChangeStreamDocument#getFullDocument() fullDocument}. However {@literal operationType}, {@literal ns},
		 * {@literal documentKey} and {@literal fullDocument} are reserved words that will be omitted, and therefore taken
		 * as given, during the mapping procedure. You may want to have a look at the
		 * <a href="https://docs.mongodb.com/manual/reference/change-events/">structure of Change Events</a>.
		 * <br />
		 * Use {@link org.springframework.data.mongodb.core.aggregation.TypedAggregation} to ensure filter expressions are
		 * mapped to domain type fields.
		 *
		 * @param aggregation the {@link Aggregation Aggregation pipeline} to apply for filtering events. Must not be
		 *          {@literal null}.
		 * @return this.
		 * @see ChangeStreamOptions#getFilter()
		 * @see ChangeStreamOptionsBuilder#filter(Aggregation)
		 */
		@Contract("_ -> this")
		public ChangeStreamRequestBuilder<T> filter(Aggregation aggregation) {

			Assert.notNull(aggregation, "Aggregation must not be null");

			this.delegate.filter(aggregation);
			return this;
		}

		/**
		 * Set the plain filter chain to apply.
		 *
		 * @param pipeline must not be {@literal null} nor contain {@literal null} values.
		 * @return this.
		 * @see ChangeStreamOptions#getFilter()
		 */
		@Contract("_ -> this")
		public ChangeStreamRequestBuilder<T> filter(Document... pipeline) {

			Assert.notNull(pipeline, "Aggregation pipeline must not be null");
			Assert.noNullElements(pipeline, "Aggregation pipeline must not contain null elements");

			this.delegate.filter(pipeline);
			return this;
		}

		/**
		 * Set the collation to use.
		 *
		 * @param collation must not be {@literal null} nor {@literal empty}.
		 * @return this.
		 * @see ChangeStreamOptions#getCollation()
		 * @see ChangeStreamOptionsBuilder#collation(Collation)
		 */
		@Contract("_ -> this")
		public ChangeStreamRequestBuilder<T> collation(Collation collation) {

			Assert.notNull(collation, "Collation must not be null");

			this.delegate.collation(collation);
			return this;
		}

		/**
		 * Set the resume token (typically a {@link org.bson.BsonDocument} containing a {@link org.bson.BsonBinary binary
		 * token}) after which to start with listening.
		 *
		 * @param resumeToken must not be {@literal null}.
		 * @return this.
		 * @see ChangeStreamOptions#getResumeToken()
		 * @see ChangeStreamOptionsBuilder#resumeToken(org.bson.BsonValue)
		 */
		@Contract("_ -> this")
		public ChangeStreamRequestBuilder<T> resumeToken(BsonValue resumeToken) {

			Assert.notNull(resumeToken, "Resume token not be null");

			this.delegate.resumeToken(resumeToken);
			return this;
		}

		/**
		 * Set the cluster time at which to resume listening.
		 *
		 * @param clusterTime must not be {@literal null}.
		 * @return this.
		 * @see ChangeStreamOptions#getResumeTimestamp()
		 * @see ChangeStreamOptionsBuilder#resumeAt(java.time.Instant)
		 */
		@Contract("_ -> this")
		public ChangeStreamRequestBuilder<T> resumeAt(Instant clusterTime) {

			Assert.notNull(clusterTime, "ClusterTime must not be null");

			this.delegate.resumeAt(clusterTime);
			return this;
		}

		/**
		 * Set the resume token after which to continue emitting notifications.
		 *
		 * @param resumeToken must not be {@literal null}.
		 * @return this.
		 * @since 2.2
		 */
		@Contract("_ -> this")
		public ChangeStreamRequestBuilder<T> resumeAfter(BsonValue resumeToken) {

			Assert.notNull(resumeToken, "ResumeToken must not be null");
			this.delegate.resumeAfter(resumeToken);

			return this;
		}

		/**
		 * Set the resume token after which to start emitting notifications.
		 *
		 * @param resumeToken must not be {@literal null}.
		 * @return this.
		 * @since 2.2
		 */
		@Contract("_ -> this")
		public ChangeStreamRequestBuilder<T> startAfter(BsonValue resumeToken) {

			Assert.notNull(resumeToken, "ResumeToken must not be null");
			this.delegate.startAfter(resumeToken);

			return this;
		}

		/**
		 * Set the {@link FullDocument} lookup to {@link FullDocument#UPDATE_LOOKUP}.
		 *
		 * @return this.
		 * @see ChangeStreamOptions#getFullDocumentLookup()
		 * @see ChangeStreamOptionsBuilder#fullDocumentLookup(FullDocument)
		 */
		@Contract("_ -> this")
		public ChangeStreamRequestBuilder<T> fullDocumentLookup(FullDocument lookup) {

			Assert.notNull(lookup, "FullDocument not be null");

			this.delegate.fullDocumentLookup(lookup);
			return this;
		}

		/**
		 * Set the {@link FullDocumentBeforeChange} lookup to the given value.
		 *
		 * @return this.
		 * @since 4.0
		 * @see ChangeStreamOptions#getFullDocumentBeforeChangeLookup()
		 * @see ChangeStreamOptionsBuilder#fullDocumentBeforeChangeLookup(FullDocumentBeforeChange)
		 */
		@Contract("_ -> this")
		public ChangeStreamRequestBuilder<T> fullDocumentBeforeChangeLookup(FullDocumentBeforeChange lookup) {

			Assert.notNull(lookup, "FullDocumentBeforeChange not be null");

			this.delegate.fullDocumentBeforeChangeLookup(lookup);
			return this;
		}

		/**
		 * Set the cursors maximum wait time on the server (for a new Document to be emitted).
		 *
		 * @param timeout must not be {@literal null}.
		 * @since 3.0
		 */
		@Contract("_ -> this")
		public ChangeStreamRequestBuilder<T> maxAwaitTime(Duration timeout) {

			Assert.notNull(timeout, "timeout not be null");

			this.maxAwaitTime = timeout;
			return this;
		}

		/**
		 * Set whether expanded change events (e.g. createIndexes, shardCollection) should be emitted.
		 *
		 * @param showExpandedEvents {@code true} to include expanded events.
		 * @return this.
		 * @since 5.1
		 */
		@Contract("_ -> this")
		public ChangeStreamRequestBuilder<T> showExpandedEvents(boolean showExpandedEvents) {

			this.delegate.showExpandedEvents(showExpandedEvents);
			return this;
		}

		/**
		 * @return the build {@link ChangeStreamRequest}.
		 */
		@Contract("-> new")
		public ChangeStreamRequest<T> build() {

			Assert.notNull(listener, "MessageListener must not be null");

			return new ChangeStreamRequest<>(listener,
					new ChangeStreamRequestOptions(databaseName, collectionName, maxAwaitTime, delegate.build()));
		}
	}
}