ChangeStreamTask.java

/*
 * Copyright 2018-present the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.springframework.data.mongodb.core.messaging;

import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.bson.BsonDocument;
import org.bson.BsonTimestamp;
import org.bson.BsonValue;
import org.bson.Document;
import org.jspecify.annotations.Nullable;
import org.springframework.data.mongodb.core.ChangeStreamEvent;
import org.springframework.data.mongodb.core.ChangeStreamOptions;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperationContext;
import org.springframework.data.mongodb.core.aggregation.PrefixingDelegatingAggregationOperationContext;
import org.springframework.data.mongodb.core.aggregation.TypeBasedAggregationOperationContext;
import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
import org.springframework.data.mongodb.core.convert.MongoConverter;
import org.springframework.data.mongodb.core.convert.QueryMapper;
import org.springframework.data.mongodb.core.messaging.Message.MessageProperties;
import org.springframework.data.mongodb.core.messaging.SubscriptionRequest.RequestOptions;
import org.springframework.util.ClassUtils;
import org.springframework.util.ErrorHandler;
import org.springframework.util.StringUtils;

import com.mongodb.MongoNamespace;
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Collation;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.client.model.changestream.FullDocumentBeforeChange;

/**
 * {@link Task} implementation for obtaining {@link ChangeStreamDocument ChangeStreamDocuments} from MongoDB.
 * <p>
 * The task opens a change stream on a single collection when the {@link RequestOptions} name one, otherwise on the
 * whole database, and wraps every emitted {@link ChangeStreamDocument} in a {@link ChangeStreamEventMessage}.
 *
 * @author Christoph Strobl
 * @author Mark Paluch
 * @author Myroslav Kosinskyi
 * @author Kyuhong Han
 * @since 2.1
 */
class ChangeStreamTask extends CursorReadingTask<ChangeStreamDocument<Document>, Object> {

	/**
	 * Field names passed to {@link PrefixingDelegatingAggregationOperationContext} along with the {@code fullDocument}
	 * prefix when an {@link Aggregation} filter is rendered (presumably the names that must not receive the prefix).
	 */
	private final Set<String> denylist = new HashSet<>(
			Arrays.asList("operationType", "fullDocument", "documentKey", "updateDescription", "ns"));

	private final QueryMapper queryMapper;
	private final MongoConverter mongoConverter;

	/**
	 * Create a new {@link ChangeStreamTask}.
	 *
	 * @param template the template providing database access and the {@link MongoConverter}.
	 * @param request the change stream request this task serves.
	 * @param targetType the type event bodies are converted to.
	 * @param errorHandler handler handed to the parent {@link CursorReadingTask}.
	 */
	@SuppressWarnings({ "unchecked", "rawtypes" })
	ChangeStreamTask(MongoTemplate template, ChangeStreamRequest<?> request, Class<?> targetType,
			ErrorHandler errorHandler) {
		super(template, (ChangeStreamRequest) request, (Class) targetType, errorHandler);

		queryMapper = new QueryMapper(template.getConverter());
		mongoConverter = template.getConverter();
	}

	/**
	 * Open the change stream cursor described by the given {@link RequestOptions}.
	 * <p>
	 * Change stream specific settings (filter, collation, resume token, start time, full document lookup modes,
	 * expanded events) are only read when {@code options} is a
	 * {@link ChangeStreamRequest.ChangeStreamRequestOptions}; otherwise the stream is opened unfiltered using the
	 * default full document lookup for {@code targetType}.
	 *
	 * @param template the template used to obtain the {@link MongoDatabase}.
	 * @param options the subscription options naming database/collection and the max await time.
	 * @param targetType the type event bodies are converted to; drives the default {@link FullDocument} mode.
	 * @return the cursor obtained from the configured {@link ChangeStreamIterable}.
	 */
	@Override
	protected MongoCursor<ChangeStreamDocument<Document>> initCursor(MongoTemplate template, RequestOptions options,
			Class<?> targetType) {

		FullDocument defaultFullDocument = defaultFullDocumentFor(targetType);

		List<Document> filter = Collections.emptyList();
		BsonDocument resumeToken = new BsonDocument();
		Collation collation = null;
		FullDocument fullDocument = defaultFullDocument;
		FullDocumentBeforeChange fullDocumentBeforeChange = null;
		BsonTimestamp startAt = null;
		boolean resumeAfter = true;
		boolean showExpandedEvents = false;

		if (options instanceof ChangeStreamRequest.ChangeStreamRequestOptions requestOptions) {

			ChangeStreamOptions changeStreamOptions = requestOptions.getChangeStreamOptions();
			filter = prepareFilter(template, changeStreamOptions);

			// Only an Aggregation filter can carry a collation; a plain pipeline list cannot.
			if (changeStreamOptions.getFilter().orElse(null) instanceof Aggregation aggregation) {
				collation = aggregation.getOptions().getCollation()
						.map(org.springframework.data.mongodb.core.query.Collation::toMongoCollation).orElse(null);
			}

			showExpandedEvents = changeStreamOptions.getShowExpandedEvents().orElse(false);

			// The resume mode is only consulted when a token is actually present.
			if (changeStreamOptions.getResumeToken().isPresent()) {

				resumeToken = changeStreamOptions.getResumeToken().get().asDocument();
				resumeAfter = changeStreamOptions.isResumeAfter();
			}

			fullDocument = changeStreamOptions.getFullDocumentLookup().orElse(defaultFullDocument);
			fullDocumentBeforeChange = changeStreamOptions.getFullDocumentBeforeChangeLookup().orElse(null);
			startAt = changeStreamOptions.getResumeBsonTimestamp().orElse(null);
		}

		MongoDatabase db = StringUtils.hasText(options.getDatabaseName())
				? template.getMongoDatabaseFactory().getMongoDatabase(options.getDatabaseName())
				: template.getDb();

		ChangeStreamIterable<Document> iterable = openChangeStream(db, options, filter);

		if (!options.maxAwaitTime().isZero()) {
			iterable = iterable.maxAwaitTime(options.maxAwaitTime().toMillis(), TimeUnit.MILLISECONDS);
		}

		if (!resumeToken.isEmpty()) {
			iterable = resumeAfter ? iterable.resumeAfter(resumeToken) : iterable.startAfter(resumeToken);
		}

		if (startAt != null) {
			iterable = iterable.startAtOperationTime(startAt);
		}

		if (collation != null) {
			iterable = iterable.collation(collation);
		}

		if (showExpandedEvents) {
			iterable = iterable.showExpandedEvents(showExpandedEvents);
		}

		iterable = iterable.fullDocument(fullDocument);

		if (fullDocumentBeforeChange != null) {
			iterable = iterable.fullDocumentBeforeChange(fullDocumentBeforeChange);
		}

		return iterable.iterator();
	}

	/**
	 * Determine the {@link FullDocument} mode used when none is configured explicitly.
	 *
	 * @param targetType the type event bodies are converted to.
	 * @return {@link FullDocument#DEFAULT} if {@code targetType} is assignable to {@link Document}, otherwise
	 *         {@link FullDocument#UPDATE_LOOKUP}.
	 */
	private static FullDocument defaultFullDocumentFor(Class<?> targetType) {
		return ClassUtils.isAssignable(Document.class, targetType) ? FullDocument.DEFAULT : FullDocument.UPDATE_LOOKUP;
	}

	/**
	 * Start watching the collection named in {@code options}, or the whole database if no collection name is given.
	 *
	 * @param db the database to watch (or to obtain the collection from).
	 * @param options the subscription options providing the optional collection name.
	 * @param filter the aggregation pipeline to apply; an empty list opens an unfiltered stream.
	 * @return the not yet further configured {@link ChangeStreamIterable}.
	 */
	private static ChangeStreamIterable<Document> openChangeStream(MongoDatabase db, RequestOptions options,
			List<Document> filter) {

		String collectionName = options.getCollectionName();

		if (StringUtils.hasText(collectionName)) {
			return filter.isEmpty() ? db.getCollection(collectionName).watch(Document.class)
					: db.getCollection(collectionName).watch(filter, Document.class);
		}

		return filter.isEmpty() ? db.watch(Document.class) : db.watch(filter, Document.class);
	}

	/**
	 * Turn the filter configured in {@link ChangeStreamOptions} into an aggregation pipeline.
	 * <p>
	 * An {@link Aggregation} is rendered through a {@link PrefixingDelegatingAggregationOperationContext} using the
	 * {@code fullDocument} prefix; a {@link TypedAggregation} additionally uses a type based context for its input
	 * type. A {@link List} is returned as is.
	 *
	 * @param template the template providing the mapping context for typed aggregations.
	 * @param options the change stream options holding the optional filter.
	 * @return the pipeline stages, or an empty list if no filter is set.
	 * @throws IllegalArgumentException if the filter is neither an {@link Aggregation} nor a {@link List}.
	 */
	@SuppressWarnings("unchecked")
	List<Document> prepareFilter(MongoTemplate template, ChangeStreamOptions options) {

		Object filter = options.getFilter().orElse(null);

		if (filter == null) {
			return Collections.emptyList();
		}

		if (filter instanceof Aggregation aggregation) {

			AggregationOperationContext context = aggregation instanceof TypedAggregation<?> typedAggregation
					? new TypeBasedAggregationOperationContext(typedAggregation.getInputType(),
							template.getConverter().getMappingContext(), queryMapper)
					: Aggregation.DEFAULT_CONTEXT;

			return aggregation
					.toPipeline(new PrefixingDelegatingAggregationOperationContext(context, "fullDocument", denylist));
		}

		if (filter instanceof List) {
			// Element type cannot be verified at runtime; callers are expected to pass a list of Documents.
			return (List<Document>) filter;
		}

		throw new IllegalArgumentException(
				"ChangeStreamRequestOptions.filter must be either an Aggregation or a plain list of Documents");
	}

	/**
	 * Wrap the raw {@link ChangeStreamDocument} in a {@link ChangeStreamEventMessage}.
	 * <p>
	 * Database and collection name of the message properties are taken from the event namespace, falling back to
	 * {@link #createNamespaceFromOptions(RequestOptions)} if the event does not carry one.
	 *
	 * @param source the raw change stream document.
	 * @param targetType the type the event body is converted to.
	 * @param options the subscription options used for the namespace fallback.
	 * @return the message wrapping a {@link ChangeStreamEvent} for {@code source}.
	 */
	@Override
	protected Message<ChangeStreamDocument<Document>, Object> createMessage(ChangeStreamDocument<Document> source,
			Class<Object> targetType, RequestOptions options) {

		MongoNamespace namespace = source.getNamespace();

		if (namespace == null) {
			namespace = createNamespaceFromOptions(options);
		}

		MessageProperties properties = MessageProperties.builder().databaseName(namespace.getDatabaseName())
				.collectionName(namespace.getCollectionName()).build();

		return new ChangeStreamEventMessage<>(new ChangeStreamEvent<>(source, targetType, mongoConverter), properties);
	}

	/**
	 * Build a {@link MongoNamespace} from the database and collection name given in {@code options}.
	 *
	 * @param options the subscription options.
	 * @return a namespace using {@code "unknown"} for each name that is missing or blank.
	 */
	MongoNamespace createNamespaceFromOptions(RequestOptions options) {

		String collectionName = StringUtils.hasText(options.getCollectionName()) ? options.getCollectionName() : "unknown";
		String databaseName = StringUtils.hasText(options.getDatabaseName()) ? options.getDatabaseName() : "unknown";

		return new MongoNamespace(databaseName, collectionName);
	}

	/**
	 * {@link Message} implementation for ChangeStreams that delegates raw document and body access to a
	 * {@link ChangeStreamEvent}.
	 *
	 * @since 2.1
	 */
	static class ChangeStreamEventMessage<T> implements Message<ChangeStreamDocument<Document>, T> {

		private final ChangeStreamEvent<T> delegate;
		private final MessageProperties messageProperties;

		/**
		 * @param delegate the event all payload accessors delegate to.
		 * @param messageProperties the properties returned by {@link #getProperties()}.
		 */
		ChangeStreamEventMessage(ChangeStreamEvent<T> delegate, MessageProperties messageProperties) {

			this.delegate = delegate;
			this.messageProperties = messageProperties;
		}

		/**
		 * @return the raw document as returned by {@link ChangeStreamEvent#getRaw()}.
		 */
		@Override
		public @Nullable ChangeStreamDocument<Document> getRaw() {
			return delegate.getRaw();
		}

		/**
		 * @return the body as returned by {@link ChangeStreamEvent#getBody()}.
		 */
		@Override
		public @Nullable T getBody() {
			return delegate.getBody();
		}

		/**
		 * @return the body before the change as returned by {@link ChangeStreamEvent#getBodyBeforeChange()}.
		 */
		@Override
		public @Nullable T getBodyBeforeChange() {
			return delegate.getBodyBeforeChange();
		}

		/**
		 * @return the properties this message was created with.
		 */
		@Override
		public MessageProperties getProperties() {
			return this.messageProperties;
		}

		/**
		 * @return the resume token or {@literal null} if not set.
		 * @see ChangeStreamEvent#getResumeToken()
		 */
		@Nullable
		BsonValue getResumeToken() {
			return delegate.getResumeToken();
		}

		/**
		 * @return the cluster time of the event or {@literal null}.
		 * @see ChangeStreamEvent#getTimestamp()
		 */
		@Nullable
		Instant getTimestamp() {
			return delegate.getTimestamp();
		}

		/**
		 * Get the {@link ChangeStreamEvent} from the message.
		 *
		 * @return never {@literal null}.
		 */
		ChangeStreamEvent<T> getChangeStreamEvent() {
			return delegate;
		}
	}
}