ReactiveMongoQueryExecution.java

/*
 * Copyright 2016-present the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.springframework.data.mongodb.repository.query;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import org.bson.Document;
import org.jspecify.annotations.Nullable;
import org.reactivestreams.Publisher;

import org.springframework.core.convert.converter.Converter;
import org.springframework.data.convert.DtoInstantiatingConverter;
import org.springframework.data.core.ReactiveWrappers;
import org.springframework.data.core.TypeInformation;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Range;
import org.springframework.data.domain.SearchResult;
import org.springframework.data.domain.Similarity;
import org.springframework.data.geo.Distance;
import org.springframework.data.geo.GeoResult;
import org.springframework.data.geo.Point;
import org.springframework.data.mapping.model.EntityInstantiators;
import org.springframework.data.mongodb.core.ReactiveMongoOperations;
import org.springframework.data.mongodb.core.ReactiveUpdateOperation.ReactiveUpdate;
import org.springframework.data.mongodb.core.aggregation.AggregationPipeline;
import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
import org.springframework.data.mongodb.core.query.NearQuery;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.UpdateDefinition;
import org.springframework.data.mongodb.repository.query.VectorSearchDelegate.QueryContainer;
import org.springframework.data.repository.query.ResultProcessor;
import org.springframework.data.repository.query.ReturnedType;
import org.springframework.data.util.ReflectionUtils;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;

import com.mongodb.client.result.UpdateResult;

/**
 * Set of classes to contain query execution strategies. Depending (mostly) on the return type of a
 * {@link org.springframework.data.repository.query.QueryMethod}, an {@link AbstractReactiveMongoQuery} can be
 * executed in various flavors.
 *
 * @author Mark Paluch
 * @author Christoph Strobl
 * @since 2.0
 */
interface ReactiveMongoQueryExecution {

	/**
	 * Run this execution strategy.
	 *
	 * @param query the query to run. How it is used is up to the strategy; {@link VectorSearchExecution}, for
	 *          instance, does not read it and runs its own aggregation pipeline instead.
	 * @param type the type handed on to the underlying {@link ReactiveMongoOperations} call by the strategies that
	 *          need one; not every strategy consults it.
	 * @param collection name of the collection to operate on.
	 * @return a {@link Publisher} emitting the outcome of the execution. What is emitted (mapped objects, counts,
	 *         wrapped results) depends on the strategy.
	 */
	Publisher<? extends Object> execute(Query query, Class<?> type, String collection);

	/**
	 * {@link ReactiveMongoQueryExecution} running geo-near queries. The near location, the distance range and the
	 * paging information are read from the {@link MongoParameterAccessor}; the declared return type decides whether
	 * {@link GeoResult}s or only their content are emitted.
	 *
	 * @author Mark Paluch
	 */
	final class GeoNearExecution implements ReactiveMongoQueryExecution {

		private final ReactiveMongoOperations operations;
		private final MongoParameterAccessor accessor;
		private final TypeInformation<?> returnType;

		/**
		 * @param operations template used to issue the geo-near command.
		 * @param accessor source of the near location, distance range and {@link Pageable}.
		 * @param returnType declared return type of the query method, used to detect {@link GeoResult} streams.
		 */
		public GeoNearExecution(ReactiveMongoOperations operations, MongoParameterAccessor accessor,
				TypeInformation<?> returnType) {

			this.operations = operations;
			this.accessor = accessor;
			this.returnType = returnType;
		}

		@Override
		public Publisher<? extends Object> execute(Query query, Class<?> type, String collection) {

			Flux<GeoResult<Object>> geoResults = doExecuteQuery(query, type, collection);

			if (isStreamOfGeoResult()) {
				return geoResults;
			}

			// The method does not ask for GeoResult wrappers, so only the content is handed out.
			return geoResults.map(GeoResult::getContent);
		}

		/**
		 * Assemble the {@link NearQuery} from the accessor values and run it.
		 *
		 * @param query optional additional filter; skipped when {@literal null}.
		 * @param type type passed on to {@link ReactiveMongoOperations#geoNear}.
		 * @param collection name of the collection to query.
		 * @return the geo results emitted by the template.
		 */
		@SuppressWarnings({ "unchecked", "rawtypes" })
		private Flux<GeoResult<Object>> doExecuteQuery(@Nullable Query query, Class<?> type, String collection) {

			Point location = accessor.getGeoNearLocation();
			Assert.notNull(location, "[query.location] ist not present");

			NearQuery near = NearQuery.near(location);

			if (query != null) {
				near.query(query);
			}

			Range<Distance> range = accessor.getDistanceRange();
			Assert.notNull(range, "[query.range] ist not present");

			// Each bound that is present contributes its distance together with that distance's own metric.
			// The upper bound is applied first, then the lower bound.
			range.getUpperBound().getValue().ifPresent(max -> near.maxDistance(max).in(max.getMetric()));
			range.getLowerBound().getValue().ifPresent(min -> near.minDistance(min).in(min.getMetric()));

			near.with(accessor.getPageable());

			return (Flux) operations.geoNear(near, type, collection);
		}

		/**
		 * @return {@literal true} if the return type is a supported reactive wrapper whose component type is exactly
		 *         {@link GeoResult}.
		 */
		private boolean isStreamOfGeoResult() {

			if (ReactiveWrappers.supports(returnType.getType())) {

				TypeInformation<?> elementType = returnType.getComponentType();
				return elementType != null && GeoResult.class.equals(elementType.getType());
			}

			return false;
		}
	}

	/**
	 * {@link ReactiveMongoQueryExecution} running a vector search. The search is issued as an aggregation built from
	 * the pipeline of the given {@link QueryContainer}; each resulting {@link Document} is mapped to the container's
	 * output type and, if the method returns {@link SearchResult}s, wrapped together with its score.
	 *
	 * @author Mark Paluch
	 * @since 5.0
	 */
	class VectorSearchExecution implements ReactiveMongoQueryExecution {

		private final ReactiveMongoOperations operations;
		private final QueryContainer queryMetadata;
		private final AggregationPipeline pipeline;
		private final boolean returnSearchResult;

		/**
		 * @param operations template used to run the aggregation and to obtain the converter.
		 * @param method query method whose return type decides whether results get wrapped in {@link SearchResult}.
		 * @param queryMetadata container providing the pipeline, output type, score field and scoring function.
		 */
		VectorSearchExecution(ReactiveMongoOperations operations, MongoQueryMethod method, QueryContainer queryMetadata) {

			this.operations = operations;
			this.queryMetadata = queryMetadata;
			this.pipeline = queryMetadata.pipeline();
			this.returnSearchResult = isSearchResult(method.getReturnType());
		}

		/**
		 * Run the vector search aggregation. The {@code query} and {@code type} arguments are not read here; the
		 * pipeline and output type come from the {@link QueryContainer}.
		 */
		@Override
		public Publisher<? extends Object> execute(Query query, Class<?> type, String collection) {

			TypedAggregation<?> aggregation = TypedAggregation.newAggregation(queryMetadata.outputType(),
					pipeline.getOperations());

			// Raw documents are requested so the score field is still available while mapping.
			Flux<Document> documents = operations.aggregate(aggregation, collection, Document.class);

			return documents.map(this::mapResult);
		}

		/**
		 * Convert a single aggregation result.
		 *
		 * @param document raw document emitted by the aggregation.
		 * @return the mapped object, wrapped in a {@link SearchResult} carrying the raw score when the method asked
		 *         for search results.
		 */
		private Object mapResult(Document document) {

			Object mapped = operations.getConverter().read(queryMetadata.outputType(), document);

			if (!returnSearchResult) {
				return mapped;
			}

			Similarity similarity = Similarity.raw(document.getDouble(queryMetadata.scoreField()),
					queryMetadata.scoringFunction());

			return new SearchResult<>(mapped, similarity);
		}

		/**
		 * @param returnType declared return type of the query method.
		 * @return {@literal true} if the type is a {@link Publisher} whose component type is exactly
		 *         {@link SearchResult}.
		 */
		private static boolean isSearchResult(TypeInformation<?> returnType) {

			if (Publisher.class.isAssignableFrom(returnType.getType())) {

				TypeInformation<?> elementType = returnType.getComponentType();
				return elementType != null && SearchResult.class.equals(elementType.getType());
			}

			return false;
		}

	}

	/**
	 * {@link ReactiveMongoQueryExecution} removing documents matching the query. Depending on the query method, the
	 * removed documents, a single removed document or the number of deleted documents is emitted.
	 *
	 * @author Mark Paluch
	 * @author Artyom Gabeev
	 */
	final class DeleteExecution implements ReactiveMongoQueryExecution {

		private final ReactiveMongoOperations operations;
		private final MongoQueryMethod method;

		/**
		 * @param operations template used to run the removal.
		 * @param method query method whose return characteristics select the removal variant.
		 */
		public DeleteExecution(ReactiveMongoOperations operations, MongoQueryMethod method) {
			this.operations = operations;
			this.method = method;
		}

		@Override
		public Publisher<? extends Object> execute(Query query, Class<?> type, String collection) {

			// Collection-returning methods emit every removed document.
			if (method.isCollectionQuery()) {
				return operations.findAllAndRemove(query, type, collection);
			}

			boolean returnsEntity = method.isQueryForEntity()
					&& !ClassUtils.isPrimitiveOrWrapper(method.getReturnedObjectType());

			if (!returnsEntity) {

				// Everything else reports a count; a result that was not acknowledged counts as zero.
				return operations.remove(query, type, collection).map(result -> {

					if (!result.wasAcknowledged()) {
						return 0L;
					}

					return result.getDeletedCount();
				});
			}

			return operations.findAndRemove(query, type, collection);
		}
	}

	/**
	 * {@link ReactiveMongoQueryExecution} updating documents matching the query. Emits the number of modified
	 * documents as reported by the {@link UpdateResult}.
	 *
	 * @author Christoph Strobl
	 * @since 3.4
	 */
	final class UpdateExecution implements ReactiveMongoQueryExecution {

		private final ReactiveUpdate<?> updateOps;
		private final MongoParameterAccessor accessor;
		private final Mono<UpdateDefinition> update;

		/**
		 * @param updateOps fluent update entry point the update is issued through.
		 * @param method the query method; not read by this execution.
		 * @param accessor source of the {@link org.springframework.data.domain.Sort} added to the query.
		 * @param update deferred update definition, resolved on subscription.
		 */
		UpdateExecution(ReactiveUpdate<?> updateOps, ReactiveMongoQueryMethod method, MongoParameterAccessor accessor,
				Mono<UpdateDefinition> update) {

			this.updateOps = updateOps;
			this.accessor = accessor;
			this.update = update;
		}

		/**
		 * Apply the update to all documents in {@code collection} matching {@code query}. The {@code type} argument is
		 * not read here.
		 *
		 * @return a publisher emitting the modified count once the update definition has been resolved and applied.
		 */
		@Override
		public Publisher<? extends Object> execute(Query query, Class<?> type, String collection) {

			return update.flatMap(it -> updateOps.inCollection(collection) //
					.matching(query.with(accessor.getSort())) // actually we could do it unsorted
					.apply(it) //
					.all() //
					.map(UpdateResult::getModifiedCount));
		}
	}

	/**
	 * A {@link ReactiveMongoQueryExecution} that wraps the results of the given delegate with the given result
	 * processing.
	 */
	final class ResultProcessingExecution implements ReactiveMongoQueryExecution {

		private final ReactiveMongoQueryExecution delegate;
		private final Converter<Object, Object> converter;

		/**
		 * @param delegate the execution producing the raw result, must not be {@literal null}.
		 * @param converter the converter applied to the delegate's {@link Publisher}, must not be {@literal null}.
		 */
		public ResultProcessingExecution(ReactiveMongoQueryExecution delegate, Converter<Object, Object> converter) {

			Assert.notNull(delegate, "Delegate must not be null");
			Assert.notNull(converter, "Converter must not be null");

			this.delegate = delegate;
			this.converter = converter;
		}

		/**
		 * Run the delegate and pass its {@link Publisher} as a whole through the converter.
		 *
		 * @return whatever the converter produced, cast to a {@link Publisher}.
		 */
		@Override
		@SuppressWarnings("NullAway")
		public Publisher<? extends Object> execute(Query query, Class<?> type, String collection) {
			// Wildcard cast instead of a raw type: same runtime check, no raw-type/unchecked conversion.
			return (Publisher<?>) converter.convert(delegate.execute(query, type, collection));
		}
	}

	/**
	 * A {@link Converter} that post-processes source objects through the given {@link ResultProcessor}, handling
	 * void, primitive and non-entity return types up front.
	 *
	 * @author Mark Paluch
	 */
	final class ResultProcessingConverter implements Converter<Object, Object> {

		private final ResultProcessor processor;
		private final ReactiveMongoOperations operations;
		private final EntityInstantiators instantiators;

		/**
		 * @param processor supplies the returned type and performs the result processing, must not be
		 *          {@literal null}.
		 * @param operations gives access to the mapping context, must not be {@literal null}.
		 * @param instantiators handed to the {@link DtoInstantiatingConverter}, must not be {@literal null}.
		 */
		public ResultProcessingConverter(ResultProcessor processor, ReactiveMongoOperations operations,
				EntityInstantiators instantiators) {

			Assert.notNull(processor, "Processor must not be null");
			Assert.notNull(operations, "Operations must not be null");
			Assert.notNull(instantiators, "Instantiators must not be null");

			this.processor = processor;
			this.operations = operations;
			this.instantiators = instantiators;
		}

		@Override
		public Object convert(Object source) {

			Class<?> targetType = processor.getReturnedType().getReturnedType();

			// Void methods only signal completion; any emitted elements are dropped.
			if (ReflectionUtils.isVoid(targetType) && source instanceof Publisher<?> publisher) {
				return publisher instanceof Mono<?> mono ? mono.then() : Flux.from(publisher).then();
			}

			// Primitives/wrappers and types unknown to the mapping context are passed through untouched.
			boolean passThrough = ClassUtils.isPrimitiveOrWrapper(targetType)
					|| !operations.getConverter().getMappingContext().hasPersistentEntityFor(targetType);

			if (passThrough) {
				return source;
			}

			Converter<Object, Object> dtoConverter = new DtoInstantiatingConverter(targetType,
					operations.getConverter().getMappingContext(), instantiators);

			return processor.processResult(source, dtoConverter);
		}
	}
}