ReactiveChangeStreamOperationSupport.java

/*
 * Copyright 2019-present the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.springframework.data.mongodb.core;

import reactor.core.publisher.Flux;

import java.time.Instant;
import java.util.List;
import java.util.function.Consumer;

import org.bson.BsonTimestamp;
import org.bson.BsonValue;
import org.bson.Document;
import org.jspecify.annotations.Nullable;
import org.springframework.data.mongodb.core.ChangeStreamOptions.ChangeStreamOptionsBuilder;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.MatchOperation;
import org.springframework.data.mongodb.core.query.CriteriaDefinition;
import org.springframework.util.Assert;

/**
 * @author Christoph Strobl
 * @since 2.2
 */
class ReactiveChangeStreamOperationSupport implements ReactiveChangeStreamOperation {

	private final ReactiveMongoTemplate template;

	/**
	 * @param template must not be {@literal null}.
	 */
	ReactiveChangeStreamOperationSupport(ReactiveMongoTemplate template) {
		this.template = template;
	}

	@Override
	public <T> ReactiveChangeStream<T> changeStream(Class<T> domainType) {

		Assert.notNull(domainType, "DomainType must not be null");
		return new ReactiveChangeStreamSupport<>(template, domainType, domainType, null, null);
	}

	static class ReactiveChangeStreamSupport<T>
			implements ReactiveChangeStream<T>, ChangeStreamWithFilterAndProjection<T> {

		private final ReactiveMongoTemplate template;
		private final Class<?> domainType;
		private final Class<T> returnType;
		private final @Nullable String collection;
		private final @Nullable ChangeStreamOptions options;

		private ReactiveChangeStreamSupport(ReactiveMongoTemplate template, Class<?> domainType, Class<T> returnType,
				@Nullable String collection, @Nullable ChangeStreamOptions options) {

			this.template = template;
			this.domainType = domainType;
			this.returnType = returnType;
			this.collection = collection;
			this.options = options;
		}

		@Override
		public ChangeStreamWithFilterAndProjection<T> watchCollection(String collection) {

			Assert.hasText(collection, "Collection name must not be null nor empty");

			return new ReactiveChangeStreamSupport<>(template, domainType, returnType, collection, options);
		}

		@Override
		public ChangeStreamWithFilterAndProjection<T> watchCollection(Class<?> entityClass) {

			Assert.notNull(entityClass, "Collection type not be null");

			return watchCollection(template.getCollectionName(entityClass));
		}

		@Override
		public TerminatingChangeStream<T> resumeAt(Object token) {

			return withOptions(builder -> {

				if (token instanceof Instant instant) {
					builder.resumeAt(instant);
				} else if (token instanceof BsonTimestamp bsonTimestamp) {
					builder.resumeAt(bsonTimestamp);
				}
			});
		}

		@Override
		public TerminatingChangeStream<T> resumeAfter(Object token) {

			Assert.isInstanceOf(BsonValue.class, token, "Token must be a BsonValue");

			return withOptions(builder -> builder.resumeAfter((BsonValue) token));
		}

		@Override
		public TerminatingChangeStream<T> startAfter(Object token) {

			Assert.isInstanceOf(BsonValue.class, token, "Token must be a BsonValue");

			return withOptions(builder -> builder.startAfter((BsonValue) token));
		}

		@Override
		public ReactiveChangeStreamSupport<T> withOptions(Consumer<ChangeStreamOptionsBuilder> optionsConsumer) {

			ChangeStreamOptionsBuilder builder = initOptionsBuilder();
			optionsConsumer.accept(builder);

			return new ReactiveChangeStreamSupport<>(template, domainType, returnType, collection, builder.build());
		}

		@Override
		public <R> ChangeStreamWithFilterAndProjection<R> as(Class<R> resultType) {

			Assert.notNull(resultType, "ResultType must not be null");

			return new ReactiveChangeStreamSupport<>(template, domainType, resultType, collection, options);
		}

		@Override
		public ChangeStreamWithFilterAndProjection<T> filter(Aggregation filter) {
			return withOptions(builder -> builder.filter(filter));
		}

		@Override
		public ChangeStreamWithFilterAndProjection<T> filter(CriteriaDefinition by) {

			MatchOperation $match = Aggregation.match(by);
			Aggregation aggregation = !Document.class.equals(domainType) ? Aggregation.newAggregation(domainType, $match)
					: Aggregation.newAggregation($match);
			return filter(aggregation);
		}

		@Override
		public Flux<ChangeStreamEvent<T>> listen() {
			return template.changeStream(collection, options != null ? options : ChangeStreamOptions.empty(), returnType);
		}

		private ChangeStreamOptionsBuilder initOptionsBuilder() {

			ChangeStreamOptionsBuilder builder = ChangeStreamOptions.builder();
			if (options == null) {
				return builder;
			}

			options.getFilter().ifPresent(it -> {
				if (it instanceof Aggregation aggregation) {
					builder.filter(aggregation);
				} else {
					builder.filter(((List<Document>) it).toArray(new Document[0]));
				}
			});
			options.getFullDocumentLookup().ifPresent(builder::fullDocumentLookup);
			options.getFullDocumentBeforeChangeLookup().ifPresent(builder::fullDocumentBeforeChangeLookup);
			options.getCollation().ifPresent(builder::collation);

			if (options.isResumeAfter()) {
				options.getResumeToken().ifPresent(builder::resumeAfter);
				options.getResumeBsonTimestamp().ifPresent(builder::resumeAfter);
			} else if (options.isStartAfter()) {
				options.getResumeToken().ifPresent(builder::startAfter);
			} else {
				options.getResumeTimestamp().ifPresent(builder::resumeAt);
				options.getResumeBsonTimestamp().ifPresent(builder::resumeAt);
			}

			return builder;
		}
	}
}