ReactiveMongoQueryExecution.java
/*
* Copyright 2016-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.repository.query;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.bson.Document;
import org.jspecify.annotations.Nullable;
import org.reactivestreams.Publisher;
import org.springframework.core.convert.converter.Converter;
import org.springframework.data.convert.DtoInstantiatingConverter;
import org.springframework.data.core.ReactiveWrappers;
import org.springframework.data.core.TypeInformation;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Range;
import org.springframework.data.domain.SearchResult;
import org.springframework.data.domain.Similarity;
import org.springframework.data.geo.Distance;
import org.springframework.data.geo.GeoResult;
import org.springframework.data.geo.Point;
import org.springframework.data.mapping.model.EntityInstantiators;
import org.springframework.data.mongodb.core.ReactiveMongoOperations;
import org.springframework.data.mongodb.core.ReactiveUpdateOperation.ReactiveUpdate;
import org.springframework.data.mongodb.core.aggregation.AggregationPipeline;
import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
import org.springframework.data.mongodb.core.query.NearQuery;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.UpdateDefinition;
import org.springframework.data.mongodb.repository.query.VectorSearchDelegate.QueryContainer;
import org.springframework.data.repository.query.ResultProcessor;
import org.springframework.data.repository.query.ReturnedType;
import org.springframework.data.util.ReflectionUtils;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import com.mongodb.client.result.UpdateResult;
/**
* Set of classes that contain query execution strategies. Depending (mostly) on the return type of a
* {@link org.springframework.data.repository.query.QueryMethod}, an {@link AbstractReactiveMongoQuery} can be executed
* in various flavors.
*
* @author Mark Paluch
* @author Christoph Strobl
* @since 2.0
*/
interface ReactiveMongoQueryExecution {
/**
 * Executes the given {@link Query} and exposes the outcome as a {@link Publisher}.
 *
 * @param query the query to execute; individual strategies may adapt it (e.g. add a sort) or ignore it.
 * @param type the type handed to the underlying operations (presumably the domain type - confirm with callers).
 * @param collection name of the collection to operate on.
 * @return a {@link Publisher} emitting the result of the execution.
 */
Publisher<? extends Object> execute(Query query, Class<?> type, String collection);
/**
 * {@link ReactiveMongoQueryExecution} to execute geo-near queries.
 * <p>
 * The near location, the distance range and the {@link Pageable} are read from the {@link MongoParameterAccessor}.
 * {@link GeoResult}s are emitted as-is when the method return type is a supported reactive wrapper whose component
 * type is {@link GeoResult}; otherwise only the content of each {@link GeoResult} is emitted.
 *
 * @author Mark Paluch
 */
final class GeoNearExecution implements ReactiveMongoQueryExecution {

	private final ReactiveMongoOperations operations;
	private final MongoParameterAccessor accessor;
	private final TypeInformation<?> returnType;

	/**
	 * Creates a new {@link GeoNearExecution}.
	 *
	 * @param operations used to run the geo-near query.
	 * @param accessor source of the near location, the distance range and the {@link Pageable}.
	 * @param returnType return type of the query method; decides whether {@link GeoResult}s get unwrapped.
	 */
	public GeoNearExecution(ReactiveMongoOperations operations, MongoParameterAccessor accessor,
			TypeInformation<?> returnType) {

		this.operations = operations;
		this.accessor = accessor;
		this.returnType = returnType;
	}

	@Override
	public Publisher<? extends Object> execute(Query query, Class<?> type, String collection) {

		Flux<GeoResult<Object>> results = doExecuteQuery(query, type, collection);
		return isStreamOfGeoResult() ? results : results.map(GeoResult::getContent);
	}

	/**
	 * Builds a {@link NearQuery} from the accessor values and runs it.
	 *
	 * @param query optional additional filter; only applied when not {@literal null}.
	 * @param type type passed on to {@link ReactiveMongoOperations#geoNear}.
	 * @param collection collection to query.
	 * @return the {@link GeoResult}s emitted by the geo-near operation.
	 * @throws IllegalArgumentException if the accessor provides no near location or no distance range.
	 */
	@SuppressWarnings({ "unchecked", "rawtypes" })
	private Flux<GeoResult<Object>> doExecuteQuery(@Nullable Query query, Class<?> type, String collection) {

		Point nearLocation = accessor.getGeoNearLocation();
		Assert.notNull(nearLocation, "[query.location] is not present");

		NearQuery nearQuery = NearQuery.near(nearLocation);

		if (query != null) {
			nearQuery.query(query);
		}

		Range<Distance> distances = accessor.getDistanceRange();
		Assert.notNull(distances, "[query.range] is not present");

		// Each bound is optional; when present it is applied using the metric of the given distance.
		distances.getUpperBound().getValue().ifPresent(it -> nearQuery.maxDistance(it).in(it.getMetric()));
		distances.getLowerBound().getValue().ifPresent(it -> nearQuery.minDistance(it).in(it.getMetric()));

		Pageable pageable = accessor.getPageable();
		nearQuery.with(pageable);

		// Raw cast: geoNear is typed by the wildcard class, the caller works with GeoResult<Object>.
		return (Flux) operations.geoNear(nearQuery, type, collection);
	}

	/**
	 * @return {@literal true} if the return type is a supported reactive wrapper with {@link GeoResult} as its
	 *         component type.
	 */
	private boolean isStreamOfGeoResult() {

		if (!ReactiveWrappers.supports(returnType.getType())) {
			return false;
		}

		TypeInformation<?> componentType = returnType.getComponentType();
		return (componentType != null) && GeoResult.class.equals(componentType.getType());
	}
}
/**
 * {@link ReactiveMongoQueryExecution} to execute vector search.
 * <p>
 * Runs the aggregation pipeline held by the {@link QueryContainer} and maps every resulting {@link Document} to the
 * container's output type. Mapped objects are wrapped into {@link SearchResult}s when the query method returns a
 * {@link Publisher} of {@link SearchResult}.
 *
 * @author Mark Paluch
 * @since 5.0
 */
class VectorSearchExecution implements ReactiveMongoQueryExecution {

	private final ReactiveMongoOperations operations;
	private final QueryContainer queryMetadata;
	private final AggregationPipeline pipeline;
	private final boolean returnSearchResult;

	VectorSearchExecution(ReactiveMongoOperations operations, MongoQueryMethod method, QueryContainer queryMetadata) {

		this.operations = operations;
		this.queryMetadata = queryMetadata;
		this.pipeline = queryMetadata.pipeline();
		this.returnSearchResult = isSearchResult(method.getReturnType());
	}

	@Override
	public Publisher<? extends Object> execute(Query query, Class<?> type, String collection) {

		TypedAggregation<?> aggregation = TypedAggregation.newAggregation(queryMetadata.outputType(),
				pipeline.getOperations());

		return operations.aggregate(aggregation, collection, Document.class).map(this::toResult);
	}

	/**
	 * Maps a raw aggregation result to the output type and, if requested by the method signature, wraps it into a
	 * {@link SearchResult} carrying the score read from the configured score field.
	 *
	 * @param document raw document emitted by the aggregation.
	 * @return the mapped object or a {@link SearchResult} wrapping it.
	 */
	private Object toResult(Document document) {

		Object mapped = operations.getConverter().read(queryMetadata.outputType(), document);

		if (!returnSearchResult) {
			return mapped;
		}

		Similarity similarity = Similarity.raw(document.getDouble(queryMetadata.scoreField()),
				queryMetadata.scoringFunction());

		return new SearchResult<>(mapped, similarity);
	}

	/**
	 * @param returnType return type of the query method.
	 * @return {@literal true} if the type is a {@link Publisher} whose component type is {@link SearchResult}.
	 */
	private static boolean isSearchResult(TypeInformation<?> returnType) {

		if (Publisher.class.isAssignableFrom(returnType.getType())) {

			TypeInformation<?> elementType = returnType.getComponentType();
			return elementType != null && SearchResult.class.equals(elementType.getType());
		}

		return false;
	}
}
/**
 * {@link ReactiveMongoQueryExecution} removing documents matching the query.
 * <p>
 * Depending on the query method, the removed documents, a single removed document or the number of deleted documents
 * is emitted.
 *
 * @author Mark Paluch
 * @author Artyom Gabeev
 */
final class DeleteExecution implements ReactiveMongoQueryExecution {

	private final ReactiveMongoOperations operations;
	private final MongoQueryMethod method;

	public DeleteExecution(ReactiveMongoOperations operations, MongoQueryMethod method) {

		this.operations = operations;
		this.method = method;
	}

	@Override
	public Publisher<? extends Object> execute(Query query, Class<?> type, String collection) {

		// Collection-returning methods emit every removed document.
		if (method.isCollectionQuery()) {
			return operations.findAllAndRemove(query, type, collection);
		}

		// Entity-returning methods (but not primitive/wrapper results) emit the removed document.
		boolean returnsEntity = method.isQueryForEntity()
				&& !ClassUtils.isPrimitiveOrWrapper(method.getReturnedObjectType());

		if (returnsEntity) {
			return operations.findAndRemove(query, type, collection);
		}

		// Everything else emits the delete count, falling back to zero for unacknowledged writes.
		return operations.remove(query, type, collection).map(result -> {

			if (result.wasAcknowledged()) {
				return result.getDeletedCount();
			}

			return 0L;
		});
	}
}
/**
 * {@link ReactiveMongoQueryExecution} updating documents matching the query.
 * <p>
 * Emits the modified count reported by the {@link UpdateResult}.
 *
 * @author Christoph Strobl
 * @since 3.4
 */
final class UpdateExecution implements ReactiveMongoQueryExecution {

	private final ReactiveUpdate<?> updateOps;
	private final MongoParameterAccessor accessor;
	private final Mono<UpdateDefinition> update;

	/**
	 * Creates a new {@link UpdateExecution}.
	 *
	 * @param updateOps entry point of the fluent update API used to run the update.
	 * @param method the query method; currently not used by this execution.
	 * @param accessor source of the sort applied to the matching query.
	 * @param update deferred {@link UpdateDefinition} to apply.
	 */
	UpdateExecution(ReactiveUpdate<?> updateOps, ReactiveMongoQueryMethod method, MongoParameterAccessor accessor,
			Mono<UpdateDefinition> update) {

		this.updateOps = updateOps;
		this.accessor = accessor;
		this.update = update;
	}

	@Override
	public Publisher<? extends Object> execute(Query query, Class<?> type, String collection) {

		// The update is resolved first; the sort from the accessor is applied to the query before matching.
		return update.flatMap(it -> updateOps.inCollection(collection) //
				.matching(query.with(accessor.getSort())) // actually we could do it unsorted
				.apply(it) //
				.all() //
				.map(UpdateResult::getModifiedCount));
	}
}
/**
 * A {@link ReactiveMongoQueryExecution} that wraps the results of the given delegate with the given result
 * processing.
 */
final class ResultProcessingExecution implements ReactiveMongoQueryExecution {

	private final ReactiveMongoQueryExecution delegate;
	private final Converter<Object, Object> converter;

	/**
	 * Creates a new {@link ResultProcessingExecution}.
	 *
	 * @param delegate the execution producing the raw result, must not be {@literal null}.
	 * @param converter the converter applied to the delegate's result, must not be {@literal null}.
	 */
	public ResultProcessingExecution(ReactiveMongoQueryExecution delegate, Converter<Object, Object> converter) {

		Assert.notNull(delegate, "Delegate must not be null");
		Assert.notNull(converter, "Converter must not be null");

		this.delegate = delegate;
		this.converter = converter;
	}

	@Override
	@SuppressWarnings("NullAway")
	public Publisher<? extends Object> execute(Query query, Class<?> type, String collection) {

		// Wildcard cast instead of a raw type: same runtime check, no raw-type usage.
		return (Publisher<?>) converter.convert(delegate.execute(query, type, collection));
	}
}
/**
 * A {@link Converter} to post-process all source objects using the given {@link ResultProcessor}.
 * <p>
 * Publishers of {@code void} methods are reduced to their completion signal. Results for primitive/wrapper types and
 * for types unknown to the mapping context are passed through untouched. Everything else is handed to the
 * {@link ResultProcessor} together with a {@link DtoInstantiatingConverter}.
 *
 * @author Mark Paluch
 */
final class ResultProcessingConverter implements Converter<Object, Object> {

	private final ResultProcessor processor;
	private final ReactiveMongoOperations operations;
	private final EntityInstantiators instantiators;

	public ResultProcessingConverter(ResultProcessor processor, ReactiveMongoOperations operations,
			EntityInstantiators instantiators) {

		Assert.notNull(processor, "Processor must not be null");
		Assert.notNull(operations, "Operations must not be null");
		Assert.notNull(instantiators, "Instantiators must not be null");

		this.processor = processor;
		this.operations = operations;
		this.instantiators = instantiators;
	}

	@Override
	public Object convert(Object source) {

		Class<?> targetType = processor.getReturnedType().getReturnedType();

		// void methods only care about completion; a Mono keeps its own then(), any other Publisher goes via Flux.
		if (ReflectionUtils.isVoid(targetType) && source instanceof Publisher<?> publisher) {
			return publisher instanceof Mono<?> mono ? mono.then() : Flux.from(publisher).then();
		}

		// Nothing to convert for primitives/wrappers or for types without a persistent entity.
		boolean passThrough = ClassUtils.isPrimitiveOrWrapper(targetType)
				|| !operations.getConverter().getMappingContext().hasPersistentEntityFor(targetType);

		if (passThrough) {
			return source;
		}

		Converter<Object, Object> dtoConverter = new DtoInstantiatingConverter(targetType,
				operations.getConverter().getMappingContext(), instantiators);

		return processor.processResult(source, dtoConverter);
	}
}
}