ReactiveMongoTemplate.java
/*
* Copyright 2016-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core;
import static org.springframework.data.mongodb.core.query.SerializationUtils.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.bson.BsonValue;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.bson.types.ObjectId;
import org.jspecify.annotations.Nullable;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.context.ApplicationListener;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.dao.OptimisticLockingFailureException;
import org.springframework.dao.support.PersistenceExceptionTranslator;
import org.springframework.data.convert.EntityReader;
import org.springframework.data.domain.OffsetScrollPosition;
import org.springframework.data.domain.Window;
import org.springframework.data.geo.Distance;
import org.springframework.data.geo.GeoResult;
import org.springframework.data.geo.Metric;
import org.springframework.data.mapping.MappingException;
import org.springframework.data.mapping.PersistentEntity;
import org.springframework.data.mapping.callback.ReactiveEntityCallbacks;
import org.springframework.data.mapping.context.MappingContext;
import org.springframework.data.mapping.context.MappingContextEvent;
import org.springframework.data.mongodb.MongoDatabaseFactory;
import org.springframework.data.mongodb.ReactiveMongoClusterCapable;
import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory;
import org.springframework.data.mongodb.ReactiveMongoDatabaseUtils;
import org.springframework.data.mongodb.SessionSynchronization;
import org.springframework.data.mongodb.core.BulkOperations.BulkMode;
import org.springframework.data.mongodb.core.CollectionPreparerSupport.ReactiveCollectionPreparerDelegate;
import org.springframework.data.mongodb.core.DefaultReactiveBulkOperations.ReactiveBulkOperationContext;
import org.springframework.data.mongodb.core.EntityOperations.AdaptibleEntity;
import org.springframework.data.mongodb.core.EntityOperations.Entity;
import org.springframework.data.mongodb.core.QueryOperations.AggregationDefinition;
import org.springframework.data.mongodb.core.QueryOperations.CountContext;
import org.springframework.data.mongodb.core.QueryOperations.DeleteContext;
import org.springframework.data.mongodb.core.QueryOperations.DistinctQueryContext;
import org.springframework.data.mongodb.core.QueryOperations.QueryContext;
import org.springframework.data.mongodb.core.QueryOperations.UpdateContext;
import org.springframework.data.mongodb.core.ScrollUtils.KeysetScrollQuery;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperationContext;
import org.springframework.data.mongodb.core.aggregation.AggregationOptions;
import org.springframework.data.mongodb.core.aggregation.AggregationOptions.Builder;
import org.springframework.data.mongodb.core.aggregation.AggregationPipeline;
import org.springframework.data.mongodb.core.aggregation.FieldLookupPolicy;
import org.springframework.data.mongodb.core.aggregation.PrefixingDelegatingAggregationOperationContext;
import org.springframework.data.mongodb.core.aggregation.TypeBasedAggregationOperationContext;
import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
import org.springframework.data.mongodb.core.bulk.Bulk;
import org.springframework.data.mongodb.core.bulk.BulkWriteOptions;
import org.springframework.data.mongodb.core.bulk.BulkWriteResult;
import org.springframework.data.mongodb.core.convert.DbRefResolver;
import org.springframework.data.mongodb.core.convert.MappingMongoConverter;
import org.springframework.data.mongodb.core.convert.MongoConverter;
import org.springframework.data.mongodb.core.convert.MongoCustomConversions;
import org.springframework.data.mongodb.core.convert.MongoWriter;
import org.springframework.data.mongodb.core.convert.NoOpDbRefResolver;
import org.springframework.data.mongodb.core.convert.QueryMapper;
import org.springframework.data.mongodb.core.convert.UpdateMapper;
import org.springframework.data.mongodb.core.index.MongoMappingEventPublisher;
import org.springframework.data.mongodb.core.index.ReactiveIndexOperations;
import org.springframework.data.mongodb.core.index.ReactiveMongoPersistentEntityIndexCreator;
import org.springframework.data.mongodb.core.mapping.FieldName;
import org.springframework.data.mongodb.core.mapping.MongoMappingContext;
import org.springframework.data.mongodb.core.mapping.MongoPersistentEntity;
import org.springframework.data.mongodb.core.mapping.MongoPersistentProperty;
import org.springframework.data.mongodb.core.mapping.event.*;
import org.springframework.data.mongodb.core.mapreduce.MapReduceOptions;
import org.springframework.data.mongodb.core.query.BasicQuery;
import org.springframework.data.mongodb.core.query.Collation;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Meta;
import org.springframework.data.mongodb.core.query.NearQuery;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.UpdateDefinition;
import org.springframework.data.mongodb.core.query.UpdateDefinition.ArrayFilter;
import org.springframework.data.projection.EntityProjection;
import org.springframework.data.util.Optionals;
import org.springframework.lang.Contract;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.CollectionUtils;
import org.springframework.util.NumberUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.ResourceUtils;
import org.springframework.util.StringUtils;
import com.mongodb.ClientSessionOptions;
import com.mongodb.CursorType;
import com.mongodb.MongoException;
import com.mongodb.ReadPreference;
import com.mongodb.WriteConcern;
import com.mongodb.client.model.CountOptions;
import com.mongodb.client.model.CreateCollectionOptions;
import com.mongodb.client.model.CreateViewOptions;
import com.mongodb.client.model.DeleteOptions;
import com.mongodb.client.model.EstimatedDocumentCountOptions;
import com.mongodb.client.model.FindOneAndDeleteOptions;
import com.mongodb.client.model.FindOneAndReplaceOptions;
import com.mongodb.client.model.FindOneAndUpdateOptions;
import com.mongodb.client.model.ReturnDocument;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.client.result.DeleteResult;
import com.mongodb.client.result.InsertOneResult;
import com.mongodb.client.result.UpdateResult;
import com.mongodb.reactivestreams.client.AggregatePublisher;
import com.mongodb.reactivestreams.client.ChangeStreamPublisher;
import com.mongodb.reactivestreams.client.ClientSession;
import com.mongodb.reactivestreams.client.DistinctPublisher;
import com.mongodb.reactivestreams.client.FindPublisher;
import com.mongodb.reactivestreams.client.MapReducePublisher;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoCluster;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.MongoDatabase;
/**
* Primary implementation of {@link ReactiveMongoOperations}. It simplifies the use of Reactive MongoDB usage and helps
* to avoid common errors. It executes core MongoDB workflow, leaving application code to provide {@link Document} and
* extract results. This class executes BSON queries or updates, initiating iteration over {@link FindPublisher} and
* catching MongoDB exceptions and translating them to the generic, more informative exception hierarchy defined in the
* org.springframework.dao package. Can be used within a service implementation via direct instantiation with a
* {@link ReactiveMongoDatabaseFactory} reference, or get prepared in an application context and given to services as
* bean reference.
* <p>
* Note: The {@link ReactiveMongoDatabaseFactory} should always be configured as a bean in the application context, in
* the first case given to the service directly, in the second case to the prepared template.
* <h3>{@link ReadPreference} and {@link com.mongodb.ReadConcern}</h3>
* <p>
* {@code ReadPreference} and {@code ReadConcern} are generally considered from {@link Query} and
* {@link AggregationOptions} objects for the action to be executed on a particular {@link MongoCollection}.
* <p>
* You can also set the default {@link #setReadPreference(ReadPreference) ReadPreference} on the template level to
* generally apply a {@link ReadPreference}.
* <p>
* When using transactions make sure to create this template with the same {@link ReactiveMongoDatabaseFactory} that is
* also used for {@code ReactiveMongoTransactionManager} creation.
*
* @author Mark Paluch
* @author Christoph Strobl
* @author Roman Puchkovskiy
* @author Mathieu Ouellet
* @author Yadhukrishna S Pai
* @author Florian L��diger
* @author Kyuhong Han
* @since 2.0
*/
public class ReactiveMongoTemplate implements ReactiveMongoOperations, ApplicationContextAware {
public static final DbRefResolver NO_OP_REF_RESOLVER = NoOpDbRefResolver.INSTANCE;
private static final Log LOGGER = LogFactory.getLog(ReactiveMongoTemplate.class);
private static final WriteResultChecking DEFAULT_WRITE_RESULT_CHECKING = WriteResultChecking.NONE;
private final MongoConverter mongoConverter;
private final MappingContext<? extends MongoPersistentEntity<?>, MongoPersistentProperty> mappingContext;
private final ReactiveMongoDatabaseFactory mongoDatabaseFactory;
private final PersistenceExceptionTranslator exceptionTranslator;
private final QueryMapper queryMapper;
private final UpdateMapper updateMapper;
private final ApplicationListener<MappingContextEvent<?, ?>> indexCreatorListener;
private final EntityOperations operations;
private final PropertyOperations propertyOperations;
private final QueryOperations queryOperations;
private final EntityLifecycleEventDelegate eventDelegate;
private @Nullable WriteConcern writeConcern;
private WriteConcernResolver writeConcernResolver = DefaultWriteConcernResolver.INSTANCE;
private WriteResultChecking writeResultChecking = WriteResultChecking.NONE;
private @Nullable ReadPreference readPreference;
private @Nullable ApplicationEventPublisher eventPublisher;
private @Nullable ReactiveEntityCallbacks entityCallbacks;
private @Nullable ReactiveMongoPersistentEntityIndexCreator indexCreator;
private SessionSynchronization sessionSynchronization = SessionSynchronization.ON_ACTUAL_TRANSACTION;
private CountExecution countExecution = this::doExactCount;
/**
* Constructor used for a basic template configuration.
* <p>
* If you intend to use transactions, make sure to use {@link #ReactiveMongoTemplate(ReactiveMongoDatabaseFactory)} or
* {@link #ReactiveMongoTemplate(ReactiveMongoDatabaseFactory, MongoConverter)} constructors, otherwise, this template
* will not participate in transactions using the default {@code SessionSynchronization.ON_ACTUAL_TRANSACTION} setting
* as {@code ReactiveMongoTransactionManager} uses strictly its configured {@link ReactiveMongoDatabaseFactory} for
* transaction participation.
*
* @param mongoClient must not be {@literal null}.
* @param databaseName must not be {@literal null} or empty.
*/
public ReactiveMongoTemplate(MongoClient mongoClient, String databaseName) {
this(new SimpleReactiveMongoDatabaseFactory(mongoClient, databaseName), (MongoConverter) null);
}
/**
* Constructor used for a basic template configuration.
*
* @param mongoDatabaseFactory must not be {@literal null}.
*/
public ReactiveMongoTemplate(ReactiveMongoDatabaseFactory mongoDatabaseFactory) {
this(mongoDatabaseFactory, (MongoConverter) null);
}
/**
* Constructor used for a basic template configuration.
*
* @param mongoDatabaseFactory must not be {@literal null}.
* @param mongoConverter can be {@literal null}.
*/
public ReactiveMongoTemplate(ReactiveMongoDatabaseFactory mongoDatabaseFactory,
@Nullable MongoConverter mongoConverter) {
this(mongoDatabaseFactory, mongoConverter, ReactiveMongoTemplate::handleSubscriptionException);
}
/**
* Constructor used for a basic template configuration.
*
* @param mongoDatabaseFactory must not be {@literal null}.
* @param mongoConverter can be {@literal null}.
* @param subscriptionExceptionHandler exception handler called by {@link Flux#doOnError(Consumer)} on reactive type
* materialization via {@link Publisher#subscribe(Subscriber)}. This callback is used during non-blocking
* subscription of e.g. index creation {@link Publisher}s. Must not be {@literal null}.
* @since 2.1
*/
public ReactiveMongoTemplate(ReactiveMongoDatabaseFactory mongoDatabaseFactory,
@Nullable MongoConverter mongoConverter, Consumer<Throwable> subscriptionExceptionHandler) {
Assert.notNull(mongoDatabaseFactory, "ReactiveMongoDatabaseFactory must not be null");
this.mongoDatabaseFactory = mongoDatabaseFactory;
this.exceptionTranslator = mongoDatabaseFactory.getExceptionTranslator();
this.mongoConverter = mongoConverter == null ? getDefaultMongoConverter() : mongoConverter;
this.queryMapper = new QueryMapper(this.mongoConverter);
this.updateMapper = new UpdateMapper(this.mongoConverter);
this.indexCreatorListener = new IndexCreatorEventListener(subscriptionExceptionHandler);
// We always have a mapping context in the converter, whether it's a simple one or not
this.mappingContext = this.mongoConverter.getMappingContext();
this.operations = new EntityOperations(this.mongoConverter, this.queryMapper);
this.propertyOperations = new PropertyOperations(this.mongoConverter.getMappingContext());
this.queryOperations = new QueryOperations(queryMapper, updateMapper, operations, propertyOperations,
mongoDatabaseFactory);
this.eventDelegate = new EntityLifecycleEventDelegate();
// We create indexes based on mapping events
if (this.mappingContext instanceof MongoMappingContext mongoMappingContext) {
if (mongoMappingContext.isAutoIndexCreation()) {
this.indexCreator = new ReactiveMongoPersistentEntityIndexCreator(mongoMappingContext, this::indexOps);
this.eventPublisher = new MongoMappingEventPublisher(this.indexCreatorListener);
mongoMappingContext.setApplicationEventPublisher(this.eventPublisher);
this.mappingContext.getPersistentEntities()
.forEach(entity -> onCheckForIndexes(entity, subscriptionExceptionHandler));
}
}
}
private ReactiveMongoTemplate(ReactiveMongoDatabaseFactory dbFactory, ReactiveMongoTemplate that) {
this.mongoDatabaseFactory = dbFactory;
this.exceptionTranslator = that.exceptionTranslator;
this.mongoConverter = that.mongoConverter;
this.queryMapper = that.queryMapper;
this.updateMapper = that.updateMapper;
this.indexCreator = that.indexCreator;
this.indexCreatorListener = that.indexCreatorListener;
this.mappingContext = that.mappingContext;
this.operations = that.operations;
this.propertyOperations = that.propertyOperations;
this.sessionSynchronization = that.sessionSynchronization;
this.queryOperations = that.queryOperations;
this.eventDelegate = that.eventDelegate;
}
private void onCheckForIndexes(MongoPersistentEntity<?> entity, Consumer<Throwable> subscriptionExceptionHandler) {
if (indexCreator != null) {
indexCreator.checkForIndexes(entity).subscribe(v -> {}, subscriptionExceptionHandler);
}
}
private static void handleSubscriptionException(Throwable t) {
LOGGER.error("Unexpected exception during asynchronous execution", t);
}
/**
* Configures the {@link WriteResultChecking} to be used with the template. Setting {@literal null} will reset the
* default of {@link ReactiveMongoTemplate#DEFAULT_WRITE_RESULT_CHECKING}.
*
* @param resultChecking
*/
public void setWriteResultChecking(@Nullable WriteResultChecking resultChecking) {
this.writeResultChecking = resultChecking == null ? DEFAULT_WRITE_RESULT_CHECKING : resultChecking;
}
/**
* Configures the {@link WriteConcern} to be used with the template. If none is configured the {@link WriteConcern}
* configured on the {@link MongoDatabaseFactory} will apply.
*
* @param writeConcern can be {@literal null}.
*/
public void setWriteConcern(@Nullable WriteConcern writeConcern) {
this.writeConcern = writeConcern;
}
/**
* Configures the {@link WriteConcernResolver} to be used with the template.
*
* @param writeConcernResolver can be {@literal null}.
*/
public void setWriteConcernResolver(@Nullable WriteConcernResolver writeConcernResolver) {
this.writeConcernResolver = writeConcernResolver != null ? writeConcernResolver
: DefaultWriteConcernResolver.INSTANCE;
}
/**
* Used by {@link #prepareCollection(MongoCollection)} to set the {@link ReadPreference} before any operations
* are performed.
*
* @param readPreference
*/
public void setReadPreference(ReadPreference readPreference) {
this.readPreference = readPreference;
}
/**
* Configure whether lifecycle events such as {@link AfterLoadEvent}, {@link BeforeSaveEvent}, etc. should be
* published or whether emission should be suppressed. Enabled by default.
*
* @param enabled {@code true} to enable entity lifecycle events; {@code false} to disable entity lifecycle events.
* @since 4.0
* @see MongoMappingEvent
*/
public void setEntityLifecycleEventsEnabled(boolean enabled) {
this.eventDelegate.setEventsEnabled(enabled);
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
prepareIndexCreator(applicationContext);
setApplicationEventPublisher(applicationContext);
if (entityCallbacks == null) {
setEntityCallbacks(ReactiveEntityCallbacks.create(applicationContext));
}
if (eventPublisher != null && mappingContext instanceof ApplicationEventPublisherAware applicationEventPublisherAware) {
applicationEventPublisherAware.setApplicationEventPublisher(eventPublisher);
}
}
void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.eventPublisher = applicationEventPublisher;
eventDelegate.setPublisher(this.eventPublisher);
}
/**
* Set the {@link ReactiveEntityCallbacks} instance to use when invoking
* {@link org.springframework.data.mapping.callback.EntityCallback callbacks} like the
* {@link ReactiveBeforeSaveCallback}. <br />
* Overrides potentially existing {@link ReactiveEntityCallbacks}.
*
* @param entityCallbacks must not be {@literal null}.
* @throws IllegalArgumentException if the given instance is {@literal null}.
* @since 2.2
*/
public void setEntityCallbacks(ReactiveEntityCallbacks entityCallbacks) {
Assert.notNull(entityCallbacks, "EntityCallbacks must not be null");
this.entityCallbacks = entityCallbacks;
}
/**
* Configure whether to use estimated count. Defaults to exact counting.
*
* @param enabled use {@link com.mongodb.client.MongoCollection#estimatedDocumentCount()} for unpaged and empty
* {@link Query queries} if {@code true}.
* @since 3.4
*/
public void useEstimatedCount(boolean enabled) {
useEstimatedCount(enabled, this::countCanBeEstimated);
}
/**
* Configure whether to use estimated count based on the given {@link BiPredicate estimationFilter}.
*
* @param enabled use {@link com.mongodb.client.MongoCollection#estimatedDocumentCount()} for unpaged and empty
* {@link Query queries} if {@code true}.
* @param estimationFilter the {@link BiPredicate filter}.
* @since 3.4
*/
private void useEstimatedCount(boolean enabled, BiFunction<Document, CountOptions, Mono<Boolean>> estimationFilter) {
if (enabled) {
this.countExecution = (collectionName, filter, options) -> {
return estimationFilter.apply(filter, options).flatMap(canEstimate -> {
if (!canEstimate) {
return doExactCount(collectionName, filter, options);
}
EstimatedDocumentCountOptions estimatedDocumentCountOptions = new EstimatedDocumentCountOptions();
if (options.getMaxTime(TimeUnit.MILLISECONDS) > 0) {
estimatedDocumentCountOptions.maxTime(options.getMaxTime(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
}
return doEstimatedCount(collectionName, estimatedDocumentCountOptions);
});
};
} else {
this.countExecution = this::doExactCount;
}
}
/**
* Inspects the given {@link ApplicationContext} for {@link ReactiveMongoPersistentEntityIndexCreator} and those in
* turn if they were registered for the current {@link MappingContext}. If no creator for the current
* {@link MappingContext} can be found we manually add the internally created one as {@link ApplicationListener} to
* make sure indexes get created appropriately for entity types persisted through this {@link ReactiveMongoTemplate}
* instance.
*
* @param context must not be {@literal null}.
*/
private void prepareIndexCreator(ApplicationContext context) {
String[] indexCreators = context.getBeanNamesForType(ReactiveMongoPersistentEntityIndexCreator.class);
for (String creator : indexCreators) {
ReactiveMongoPersistentEntityIndexCreator creatorBean = context.getBean(creator,
ReactiveMongoPersistentEntityIndexCreator.class);
if (creatorBean.isIndexCreatorFor(mappingContext)) {
return;
}
}
if (context instanceof ConfigurableApplicationContext configurableApplicationContext) {
configurableApplicationContext.addApplicationListener(indexCreatorListener);
}
}
/**
* Returns the default {@link MongoConverter}.
*
* @return
*/
@Override
public MongoConverter getConverter() {
return this.mongoConverter;
}
EntityOperations getEntityOperations() {
return operations;
}
QueryOperations getQueryOperations() {
return queryOperations;
}
@Override
public ReactiveIndexOperations indexOps(String collectionName) {
return new DefaultReactiveIndexOperations(this, collectionName, this.queryMapper);
}
@Override
public ReactiveIndexOperations indexOps(Class<?> entityClass) {
return new DefaultReactiveIndexOperations(this, getCollectionName(entityClass), this.queryMapper, entityClass);
}
@Override
public String getCollectionName(Class<?> entityClass) {
return operations.determineCollectionName(entityClass);
}
@Override
public Mono<Document> executeCommand(String jsonCommand) {
Assert.notNull(jsonCommand, "Command must not be empty");
return executeCommand(Document.parse(jsonCommand));
}
@Override
public Mono<Document> executeCommand(Document command) {
return executeCommand(command, null);
}
@Override
public Mono<Document> executeCommand(Document command, @Nullable ReadPreference readPreference) {
Assert.notNull(command, "Command must not be null");
return createFlux(db -> readPreference != null ? db.runCommand(command, readPreference, Document.class)
: db.runCommand(command, Document.class)).next();
}
@Override
public <T> Flux<T> execute(Class<?> entityClass, ReactiveCollectionCallback<T> action) {
return createFlux(getCollectionName(entityClass), action);
}
@Override
public <T> Flux<T> execute(ReactiveDatabaseCallback<T> action) {
return createFlux(action);
}
@Override
public <T> Flux<T> execute(String collectionName, ReactiveCollectionCallback<T> callback) {
Assert.notNull(callback, "ReactiveCollectionCallback must not be null");
return createFlux(collectionName, callback);
}
<T> Mono<T> doWithCluster(Function<MongoCluster, Publisher<T>> callback) {
if (!(mongoDatabaseFactory instanceof ReactiveMongoClusterCapable clusterCapable)) {
return Mono.error(new IllegalStateException(
"Unable to obtain MongoCluster. Does your database factory implement ReactiveMongoClusterCapable?"));
}
return Mono.from(callback.apply(clusterCapable.getMongoCluster())).onErrorMap(translateException());
}
@Override
public ReactiveSessionScoped withSession(Publisher<ClientSession> sessionProvider) {
Mono<ClientSession> cachedSession = Mono.from(sessionProvider).cache();
return new ReactiveSessionScoped() {
@Override
public <T> Flux<T> execute(ReactiveSessionCallback<T> action, Consumer<ClientSession> doFinally) {
return cachedSession.flatMapMany(session -> {
return ReactiveMongoTemplate.this.withSession(action, session) //
.doFinally(signalType -> {
doFinally.accept(session);
});
});
}
};
}
/**
* Define if {@link ReactiveMongoTemplate} should participate in transactions. Default is set to
* {@link SessionSynchronization#ON_ACTUAL_TRANSACTION}.
* <p>
* <strong>NOTE:</strong> MongoDB transactions require at least MongoDB 4.0.
*
* @since 2.2
*/
public void setSessionSynchronization(SessionSynchronization sessionSynchronization) {
this.sessionSynchronization = sessionSynchronization;
}
private <T> Flux<T> withSession(ReactiveSessionCallback<T> action, ClientSession session) {
ReactiveSessionBoundMongoTemplate operations = new ReactiveSessionBoundMongoTemplate(session,
ReactiveMongoTemplate.this);
return Flux.from(action.doInSession(operations)) //
.contextWrite(ctx -> ReactiveMongoContext.setSession(ctx, Mono.just(session)));
}
@Override
public ReactiveMongoOperations withSession(ClientSession session) {
return new ReactiveSessionBoundMongoTemplate(session, ReactiveMongoTemplate.this);
}
@Override
public ReactiveSessionScoped withSession(ClientSessionOptions sessionOptions) {
return withSession(mongoDatabaseFactory.getSession(sessionOptions));
}
/**
* Create a reusable Flux for a {@link ReactiveDatabaseCallback}. It's up to the developer to choose to obtain a new
* {@link Flux} or to reuse the {@link Flux}.
*
* @param callback must not be {@literal null}
* @return a {@link Flux} wrapping the {@link ReactiveDatabaseCallback}.
*/
public <T> Flux<T> createFlux(ReactiveDatabaseCallback<T> callback) {
Assert.notNull(callback, "ReactiveDatabaseCallback must not be null");
return Mono.defer(this::doGetDatabase).flatMapMany(database -> callback.doInDB(prepareDatabase(database)))
.onErrorMap(translateException());
}
/**
* Create a reusable Mono for a {@link ReactiveDatabaseCallback}. It's up to the developer to choose to obtain a new
* {@link Flux} or to reuse the {@link Flux}.
*
* @param callback must not be {@literal null}
* @return a {@link Mono} wrapping the {@link ReactiveDatabaseCallback}.
*/
public <T> Mono<T> createMono(ReactiveDatabaseCallback<T> callback) {
Assert.notNull(callback, "ReactiveDatabaseCallback must not be null");
return Mono.defer(this::doGetDatabase).flatMap(database -> Mono.from(callback.doInDB(prepareDatabase(database))))
.onErrorMap(translateException());
}
/**
* Create a reusable {@link Flux} for the {@code collectionName} and {@link ReactiveCollectionCallback}.
*
* @param collectionName must not be empty or {@literal null}.
* @param callback must not be {@literal null}.
* @return a reusable {@link Flux} wrapping the {@link ReactiveCollectionCallback}.
*/
public <T> Flux<T> createFlux(String collectionName, ReactiveCollectionCallback<T> callback) {
Assert.hasText(collectionName, "Collection name must not be null or empty");
Assert.notNull(callback, "ReactiveDatabaseCallback must not be null");
Mono<MongoCollection<Document>> collectionPublisher = doGetDatabase()
.map(database -> getAndPrepareCollection(database, collectionName));
return collectionPublisher.flatMapMany(callback::doInCollection).onErrorMap(translateException());
}
/**
* Create a reusable {@link Mono} for the {@code collectionName} and {@link ReactiveCollectionCallback}.
*
* @param collectionName must not be empty or {@literal null}.
* @param callback must not be {@literal null}.
* @param <T>
* @return a reusable {@link Mono} wrapping the {@link ReactiveCollectionCallback}.
*/
public <T> Mono<T> createMono(String collectionName, ReactiveCollectionCallback<T> callback) {
Assert.hasText(collectionName, "Collection name must not be null or empty");
Assert.notNull(callback, "ReactiveCollectionCallback must not be null");
Mono<MongoCollection<Document>> collectionPublisher = doGetDatabase()
.map(database -> getAndPrepareCollection(database, collectionName));
return collectionPublisher.flatMap(collection -> Mono.from(callback.doInCollection(collection)))
.onErrorMap(translateException());
}
@Override
public Mono<BulkWriteResult> bulkWrite(Bulk bulk, BulkWriteOptions options) {
return doGetDatabase().flatMap(db -> new ReactiveBulkWriter(this).write(db.getName(), bulk, options));
}
@Override
public <T> Mono<MongoCollection<Document>> createCollection(Class<T> entityClass) {
return createCollection(entityClass, Function.identity());
}
@Override
public <T> Mono<MongoCollection<Document>> createCollection(Class<T> entityClass,
Function<? super CollectionOptions, ? extends CollectionOptions> collectionOptionsCustomizer) {
Assert.notNull(collectionOptionsCustomizer, "CollectionOptions customizer function must not be null");
return createCollection(entityClass,
collectionOptionsCustomizer.apply(operations.forType(entityClass).getCollectionOptions()));
}
@Override
public <T> Mono<MongoCollection<Document>> createCollection(Class<T> entityClass,
@Nullable CollectionOptions collectionOptions) {
Assert.notNull(entityClass, "EntityClass must not be null");
CollectionOptions options = collectionOptions != null ? collectionOptions : CollectionOptions.empty();
options = Optionals
.firstNonEmpty(() -> Optional.ofNullable(collectionOptions).flatMap(CollectionOptions::getCollation),
() -> operations.forType(entityClass).getCollation()) //
.map(options::collation).orElse(options);
return doCreateCollection(getCollectionName(entityClass), convertToCreateCollectionOptions(options, entityClass));
}
@Override
public Mono<MongoCollection<Document>> createCollection(String collectionName) {
return doCreateCollection(collectionName, new CreateCollectionOptions());
}
@Override
public Mono<MongoCollection<Document>> createCollection(String collectionName,
@Nullable CollectionOptions collectionOptions) {
return doCreateCollection(collectionName, convertToCreateCollectionOptions(collectionOptions));
}
@Override
public Mono<MongoCollection<Document>> createView(String name, Class<?> source, AggregationPipeline pipeline,
@Nullable ViewOptions options) {
return createView(name, getCollectionName(source),
queryOperations.createAggregation(Aggregation.newAggregation(source, pipeline.getOperations()), source),
options);
}
@Override
public Mono<MongoCollection<Document>> createView(String name, String source, AggregationPipeline pipeline,
@Nullable ViewOptions options) {
return createView(name, source,
queryOperations.createAggregation(Aggregation.newAggregation(pipeline.getOperations()), (Class<?>) null),
options);
}
private Mono<MongoCollection<Document>> createView(String name, String source, AggregationDefinition aggregation,
@Nullable ViewOptions options) {
return doCreateView(name, source, aggregation.getAggregationPipeline(), options);
}
protected Mono<MongoCollection<Document>> doCreateView(String name, String source, List<Document> pipeline,
@Nullable ViewOptions options) {
CreateViewOptions viewOptions = new CreateViewOptions();
if (options != null) {
options.getCollation().map(Collation::toMongoCollation).ifPresent(viewOptions::collation);
}
return execute(db -> {
return Flux.from(db.createView(name, source, pipeline, viewOptions))
.then(Mono.fromSupplier(() -> db.getCollection(name)));
}).next();
}
@Override
public Mono<MongoCollection<Document>> getCollection(String collectionName) {
Assert.notNull(collectionName, "Collection name must not be null");
return createMono(db -> Mono.just(db.getCollection(collectionName)));
}
@Override
public <T> Mono<Boolean> collectionExists(Class<T> entityClass) {
return collectionExists(getCollectionName(entityClass));
}
@Override
public Mono<Boolean> collectionExists(String collectionName) {
return createMono(db -> Flux.from(db.listCollectionNames()) //
.filter(s -> s.equals(collectionName)) //
.map(s -> true) //
.single(false));
}
@Override
public <T> Mono<Void> dropCollection(Class<T> entityClass) {
return dropCollection(getCollectionName(entityClass));
}
@Override
public Mono<Void> dropCollection(String collectionName) {
return createMono(collectionName, MongoCollection::drop).doOnSuccess(success -> {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Dropped collection [" + collectionName + "]");
}
}).then();
}
@Override
public ReactiveBulkOperations bulkOps(BulkMode mode, String collectionName) {
return bulkOps(mode, null, collectionName);
}
@Override
public ReactiveBulkOperations bulkOps(BulkMode mode, Class<?> entityClass) {
return bulkOps(mode, entityClass, getCollectionName(entityClass));
}
@Override
public ReactiveBulkOperations bulkOps(BulkMode mode, @Nullable Class<?> entityType, String collectionName) {
Assert.notNull(mode, "BulkMode must not be null");
Assert.hasText(collectionName, "Collection name must not be null or empty");
DefaultReactiveBulkOperations operations = new DefaultReactiveBulkOperations(this, collectionName,
new ReactiveBulkOperationContext(mode, Optional.ofNullable(getPersistentEntity(entityType)), queryMapper,
updateMapper, eventPublisher, entityCallbacks));
operations.setDefaultWriteConcern(writeConcern);
return operations;
}
@Override
public Flux<String> getCollectionNames() {
return createFlux(db -> db.listCollectionNames());
}
public Mono<MongoDatabase> getMongoDatabase() {
return mongoDatabaseFactory.getMongoDatabase();
}
protected Mono<MongoDatabase> doGetDatabase() {
return ReactiveMongoDatabaseUtils.getDatabase(mongoDatabaseFactory, sessionSynchronization);
}
<T> Mono<SourceAwareDocument<T>> prepareObjectForSaveReactive(String collectionName, T objectToSave) {
return prepareObjectForSaveReactive(collectionName, objectToSave, mongoConverter);
}
<T> Mono<SourceAwareDocument<T>> prepareObjectForSaveReactive(String collectionName, T objectToSave,
MongoWriter<? super Object> writer) {
T toConvert = maybeEmitEvent(new BeforeConvertEvent<>(objectToSave, collectionName)).getSource();
return maybeCallBeforeConvert(toConvert, collectionName).map(initialized -> {
AdaptibleEntity<T> entity = operations.forEntityUpsert(initialized, mongoConverter.getConversionService());
T withVersion = entity.initializeVersionProperty();
Document dbDoc = entity.toMappedDocument(writer).getDocument();
maybeEmitEvent(new BeforeSaveEvent<>(withVersion, dbDoc, collectionName));
return new Object[] { withVersion, dbDoc };
}).flatMap(pair -> maybeCallBeforeSave((T) pair[0], (Document) pair[1], collectionName)
.map(saved -> new SourceAwareDocument<>(saved, (Document) pair[1], collectionName)));
}
@Override
public <T> Mono<T> findOne(Query query, Class<T> entityClass) {
return findOne(query, entityClass, getCollectionName(entityClass));
}
@Override
public <T> Mono<T> findOne(Query query, Class<T> entityClass, String collectionName) {
if (ObjectUtils.isEmpty(query.getSortObject())) {
return doFindOne(collectionName, ReactiveCollectionPreparerDelegate.of(query), query.getQueryObject(),
query.getFieldsObject(), entityClass, new QueryFindPublisherPreparer(query, entityClass));
}
query.limit(1);
return find(query, entityClass, collectionName).next();
}
@Override
public Mono<Boolean> exists(Query query, Class<?> entityClass) {
return exists(query, entityClass, getCollectionName(entityClass));
}
@Override
public Mono<Boolean> exists(Query query, String collectionName) {
return exists(query, null, collectionName);
}
@Override
public Mono<Boolean> exists(Query query, @Nullable Class<?> entityClass, String collectionName) {
if (query == null) {
throw new InvalidDataAccessApiUsageException("Query passed in to exist can't be null");
}
return createFlux(collectionName, collection -> {
ReactiveCollectionPreparerDelegate collectionPreparer = ReactiveCollectionPreparerDelegate.of(query);
QueryContext queryContext = queryOperations.createQueryContext(query);
Document filter = queryContext.getMappedQuery(entityClass, this::getPersistentEntity);
FindPublisher<Document> findPublisher = collectionPreparer.prepare(collection).find(filter, Document.class)
.projection(new Document(FieldName.ID.name(), 1));
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(String.format("exists: %s in collection: %s", serializeToJsonSafely(filter), collectionName));
}
queryContext.applyCollation(entityClass, findPublisher::collation);
return findPublisher.limit(1);
}).hasElements();
}
@Override
public <T> Flux<T> find(Query query, Class<T> entityClass) {
return find(query, entityClass, getCollectionName(entityClass));
}
@Override
public <T> Flux<T> find(@Nullable Query query, Class<T> entityClass, String collectionName) {
if (query == null) {
return findAll(entityClass, collectionName);
}
return doFind(collectionName, ReactiveCollectionPreparerDelegate.of(query), query.getQueryObject(),
query.getFieldsObject(), entityClass, new QueryFindPublisherPreparer(query, entityClass));
}
@Override
public <T> Mono<Window<T>> scroll(Query query, Class<T> entityType) {
Assert.notNull(entityType, "Entity type must not be null");
return scroll(query, entityType, getCollectionName(entityType));
}
@Override
public <T> Mono<Window<T>> scroll(Query query, Class<T> entityType, String collectionName) {
return doScroll(query, entityType, entityType, QueryResultConverter.entity(), collectionName);
}
<T, R> Mono<Window<R>> doScroll(Query query, Class<?> sourceClass, Class<T> targetClass,
QueryResultConverter<? super T, ? extends R> resultConverter, String collectionName) {
Assert.notNull(query, "Query must not be null");
Assert.notNull(collectionName, "CollectionName must not be null");
Assert.notNull(sourceClass, "Entity type must not be null");
Assert.notNull(targetClass, "Target type must not be null");
EntityProjection<T, ?> projection = operations.introspectProjection(targetClass, sourceClass);
DocumentCallback<R> callback = getResultReader(projection, collectionName, resultConverter);
int limit = query.isLimited() ? query.getLimit() + 1 : Integer.MAX_VALUE;
if (query.hasKeyset()) {
KeysetScrollQuery keysetPaginationQuery = ScrollUtils.createKeysetPaginationQuery(query,
operations.getIdPropertyName(sourceClass));
Mono<List<R>> result = doFind(collectionName, ReactiveCollectionPreparerDelegate.of(query),
keysetPaginationQuery.query(), keysetPaginationQuery.fields(), sourceClass,
new QueryFindPublisherPreparer(query, keysetPaginationQuery.sort(), limit, 0, sourceClass), callback)
.collectList();
return result.map(it -> ScrollUtils.createWindow(query, it, sourceClass, operations));
}
Mono<List<R>> result = doFind(collectionName, ReactiveCollectionPreparerDelegate.of(query), query.getQueryObject(),
query.getFieldsObject(), sourceClass,
new QueryFindPublisherPreparer(query, query.getSortObject(), limit, query.getSkip(), sourceClass), callback)
.collectList();
return result.map(
it -> ScrollUtils.createWindow(it, query.getLimit(), OffsetScrollPosition.positionFunction(query.getSkip())));
}
@Override
public <T> Mono<T> findById(Object id, Class<T> entityClass) {
return findById(id, entityClass, getCollectionName(entityClass));
}
@Override
public <T> Mono<T> findById(Object id, Class<T> entityClass, String collectionName) {
String idKey = operations.getIdPropertyName(entityClass);
return doFindOne(collectionName, CollectionPreparer.identity(), new Document(idKey, id), null, entityClass,
(Collation) null);
}
@Override
public <T> Flux<T> findDistinct(Query query, String field, Class<?> entityClass, Class<T> resultClass) {
return findDistinct(query, field, getCollectionName(entityClass), entityClass, resultClass);
}
@Override
@SuppressWarnings("unchecked")
public <T> Flux<T> findDistinct(Query query, String field, String collectionName, Class<?> entityClass,
Class<T> resultClass) {
Assert.notNull(query, "Query must not be null");
Assert.notNull(field, "Field must not be null");
Assert.notNull(collectionName, "CollectionName must not be null");
Assert.notNull(entityClass, "EntityClass must not be null");
Assert.notNull(resultClass, "ResultClass must not be null");
MongoPersistentEntity<?> entity = getPersistentEntity(entityClass);
DistinctQueryContext distinctQueryContext = queryOperations.distinctQueryContext(query, field);
Document mappedQuery = distinctQueryContext.getMappedQuery(entity);
String mappedFieldName = distinctQueryContext.getMappedFieldName(entity);
Class<T> mongoDriverCompatibleType = distinctQueryContext.getDriverCompatibleClass(resultClass);
ReactiveCollectionPreparerDelegate collectionPreparer = ReactiveCollectionPreparerDelegate.of(query);
Flux<?> result = execute(collectionName, collection -> {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(String.format("Executing findDistinct using query %s for field: %s in collection: %s",
serializeToJsonSafely(mappedQuery), field, collectionName));
}
FindPublisherPreparer preparer = new QueryFindPublisherPreparer(query, entityClass);
DistinctPublisher<T> publisher = collectionPreparer.prepare(collection).distinct(mappedFieldName, mappedQuery,
mongoDriverCompatibleType);
distinctQueryContext.applyCollation(entityClass, publisher::collation);
return publisher;
});
if (resultClass == Object.class || mongoDriverCompatibleType != resultClass) {
Class<?> targetType = distinctQueryContext.getMostSpecificConversionTargetType(resultClass, entityClass);
MongoConverter converter = getConverter();
result = result.map(it -> converter.mapValueToTargetType(it, targetType, NO_OP_REF_RESOLVER));
}
return (Flux<T>) result;
}
@Override
public <O> Flux<O> aggregate(TypedAggregation<?> aggregation, String inputCollectionName, Class<O> outputType) {
Assert.notNull(aggregation, "Aggregation pipeline must not be null");
return doAggregate(aggregation, inputCollectionName, aggregation.getInputType(), outputType);
}
@Override
public <O> Flux<O> aggregate(TypedAggregation<?> aggregation, Class<O> outputType) {
Assert.notNull(aggregation, "Aggregation pipeline must not be null");
return aggregate(aggregation, getCollectionName(aggregation.getInputType()), outputType);
}
@Override
public <O> Flux<O> aggregate(Aggregation aggregation, Class<?> inputType, Class<O> outputType) {
return doAggregate(aggregation, getCollectionName(inputType), inputType, outputType);
}
@Override
public <O> Flux<O> aggregate(Aggregation aggregation, String collectionName, Class<O> outputType) {
return doAggregate(aggregation, collectionName, null, outputType);
}
protected <O> Flux<O> doAggregate(Aggregation aggregation, String collectionName, @Nullable Class<?> inputType,
Class<O> outputType) {
AggregationDefinition context = queryOperations.createAggregation(aggregation, inputType);
return doAggregate(aggregation, collectionName, outputType, QueryResultConverter.entity(), context);
}
<T, O> Flux<O> doAggregate(Aggregation aggregation, String collectionName, @Nullable Class<?> inputType,
Class<T> outputType, QueryResultConverter<? super T, ? extends O> resultConverter) {
AggregationDefinition context = queryOperations.createAggregation(aggregation, inputType);
return doAggregate(aggregation, collectionName, outputType, resultConverter, context);
}
<T, O> Flux<O> doAggregate(Aggregation aggregation, String collectionName, Class<T> outputType,
QueryResultConverter<? super T, ? extends O> resultConverter, AggregationDefinition definition) {
Assert.notNull(aggregation, "Aggregation pipeline must not be null");
Assert.hasText(collectionName, "Collection name must not be null or empty");
Assert.notNull(outputType, "Output type must not be null");
AggregationOptions options = aggregation.getOptions();
Assert.isTrue(!options.isExplain(), "Cannot use explain option with streaming");
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(String.format("Streaming aggregation: %s in collection %s",
serializeToJsonSafely(definition.getAggregationPipeline()), collectionName));
}
DocumentCallback<O> readCallback = new QueryResultConverterCallback<>(resultConverter,
new ReadDocumentCallback<>(mongoConverter, outputType, collectionName));
return execute(collectionName, collection -> aggregateAndMap(collection, definition.getAggregationPipeline(),
definition.isOutOrMerge(), options, readCallback, definition.getInputType()));
}
private <O> Flux<O> aggregateAndMap(MongoCollection<Document> collection, List<Document> pipeline,
boolean isOutOrMerge, AggregationOptions options, DocumentCallback<O> readCallback,
@Nullable Class<?> inputType) {
ReactiveCollectionPreparerDelegate collectionPreparer = ReactiveCollectionPreparerDelegate.of(options);
AggregatePublisher<Document> cursor = collectionPreparer.prepare(collection).aggregate(pipeline, Document.class);
if (options.isAllowDiskUseSet()) {
cursor = cursor.allowDiskUse(options.isAllowDiskUse());
}
if (options.getCursorBatchSize() != null) {
cursor = cursor.batchSize(options.getCursorBatchSize());
}
options.getComment().ifPresent(cursor::comment);
HintFunction hintFunction = options.getHintObject().map(HintFunction::from).orElseGet(HintFunction::empty);
if (hintFunction.isPresent()) {
cursor = hintFunction.apply(mongoDatabaseFactory, cursor::hintString, cursor::hint);
}
Optionals.firstNonEmpty(options::getCollation, () -> operations.forType(inputType).getCollation()) //
.map(Collation::toMongoCollation) //
.ifPresent(cursor::collation);
if (options.hasExecutionTimeLimit()) {
cursor = cursor.maxTime(options.getMaxTime().toMillis(), TimeUnit.MILLISECONDS);
}
if (options.isSkipResults()) {
return (isOutOrMerge ? Flux.from(cursor.toCollection()) : Flux.from(cursor.first())).thenMany(Mono.empty());
}
return Flux.from(cursor).flatMapSequential(readCallback::doWith);
}
@Override
public <T> Flux<GeoResult<T>> geoNear(NearQuery near, Class<T> entityClass) {
return geoNear(near, entityClass, getCollectionName(entityClass));
}
@Override
public <T> Flux<GeoResult<T>> geoNear(NearQuery near, Class<T> entityClass, String collectionName) {
return geoNear(near, entityClass, collectionName, entityClass);
}
protected <T> Flux<GeoResult<T>> geoNear(NearQuery near, Class<?> entityClass, String collectionName,
Class<T> returnType) {
return doGeoNear(near, entityClass, collectionName, returnType, QueryResultConverter.entity());
}
Mono<Long> doGeoNearCount(NearQuery near, Class<?> domainType, String collectionName) {
Builder optionsBuilder = AggregationOptions.builder().collation(near.getCollation());
if (near.hasReadPreference()) {
optionsBuilder.readPreference(near.getReadPreference());
}
if (near.hasReadConcern()) {
optionsBuilder.readConcern(near.getReadConcern());
}
String distanceField = operations.nearQueryDistanceFieldName(domainType);
Aggregation $geoNear = TypedAggregation.newAggregation(domainType,
Aggregation.geoNear(near, distanceField).skip(-1).limit(-1), Aggregation.count().as("_totalCount"))
.withOptions(optionsBuilder.build());
AggregationDefinition definition = queryOperations.createAggregation($geoNear, (AggregationOperationContext) null);
Flux<Document> results = doAggregate($geoNear, collectionName, Document.class, QueryResultConverter.entity(),
definition);
return results.last()
.map(doc -> NumberUtils.convertNumberToTargetClass(doc.get("_totalCount", Integer.class), Long.class))
.defaultIfEmpty(0L);
}
@SuppressWarnings("unchecked")
<T, R> Flux<GeoResult<R>> doGeoNear(NearQuery near, Class<?> entityClass, String collectionName, Class<T> returnType,
QueryResultConverter<? super T, ? extends R> resultConverter) {
if (near == null) {
throw new InvalidDataAccessApiUsageException("NearQuery must not be null");
}
if (entityClass == null) {
throw new InvalidDataAccessApiUsageException("Entity class must not be null");
}
String collection = StringUtils.hasText(collectionName) ? collectionName : getCollectionName(entityClass);
String distanceField = operations.nearQueryDistanceFieldName(entityClass);
EntityProjection<T, ?> projection = operations.introspectProjection(returnType, entityClass);
GeoNearResultDocumentCallback<R> callback = new GeoNearResultDocumentCallback<>(distanceField,
getResultReader(projection, collectionName, resultConverter), near.getMetric());
Builder optionsBuilder = AggregationOptions.builder();
if (near.hasReadPreference()) {
optionsBuilder.readPreference(near.getReadPreference());
}
if (near.hasReadConcern()) {
optionsBuilder.readConcern(near.getReadConcern());
}
optionsBuilder.collation(near.getCollation());
Aggregation $geoNear = TypedAggregation.newAggregation(entityClass, Aggregation.geoNear(near, distanceField))
.withOptions(optionsBuilder.build());
return aggregate($geoNear, collection, Document.class) //
.flatMapSequential(callback::doWith);
}
@Override
public <T> Mono<T> findAndModify(Query query, UpdateDefinition update, Class<T> entityClass) {
return findAndModify(query, update, new FindAndModifyOptions(), entityClass, getCollectionName(entityClass));
}
@Override
public <T> Mono<T> findAndModify(Query query, UpdateDefinition update, Class<T> entityClass, String collectionName) {
return findAndModify(query, update, new FindAndModifyOptions(), entityClass, collectionName);
}
@Override
public <T> Mono<T> findAndModify(Query query, UpdateDefinition update, FindAndModifyOptions options,
Class<T> entityClass) {
return findAndModify(query, update, options, entityClass, getCollectionName(entityClass));
}
public <S, T> Mono<T> findAndModify(Query query, UpdateDefinition update, FindAndModifyOptions options,
Class<S> entityClass, String collectionName, QueryResultConverter<? super S, ? extends T> resultConverter) {
Assert.notNull(options, "Options must not be null ");
Assert.notNull(entityClass, "Entity class must not be null");
FindAndModifyOptions optionsToUse = FindAndModifyOptions.of(options);
Optionals.ifAllPresent(query.getCollation(), optionsToUse.getCollation(), (l, r) -> {
throw new IllegalArgumentException(
"Both Query and FindAndModifyOptions define a collation; Please provide the collation only via one of the two");
});
if (!optionsToUse.getCollation().isPresent()) {
operations.forType(entityClass).getCollation(query).ifPresent(optionsToUse::collation);
}
return doFindAndModify(collectionName, ReactiveCollectionPreparerDelegate.of(query), query.getQueryObject(),
query.getFieldsObject(), getMappedSortObject(query, entityClass), entityClass, update, optionsToUse,
resultConverter);
}
@Override
public <T> Mono<T> findAndModify(Query query, UpdateDefinition update, FindAndModifyOptions options,
Class<T> entityClass, String collectionName) {
return findAndModify(query, update, options, entityClass, collectionName, QueryResultConverter.entity());
}
@Override
public <S, T> Mono<T> findAndReplace(Query query, S replacement, FindAndReplaceOptions options, Class<S> entityType,
String collectionName, Class<T> resultType) {
return findAndReplace(query, replacement, options, entityType, collectionName, resultType,
QueryResultConverter.entity());
}
@SuppressWarnings("NullAway")
public <S, T, R> Mono<R> findAndReplace(Query query, S replacement, FindAndReplaceOptions options,
Class<S> entityType, String collectionName, Class<T> resultType,
QueryResultConverter<? super T, ? extends R> resultConverter) {
Assert.notNull(query, "Query must not be null");
Assert.notNull(replacement, "Replacement must not be null");
Assert.notNull(options, "Options must not be null Use FindAndReplaceOptions#empty() instead");
Assert.notNull(entityType, "Entity class must not be null");
Assert.notNull(collectionName, "CollectionName must not be null");
Assert.notNull(resultType, "ResultType must not be null Use Object.class instead");
Assert.isTrue(query.getLimit() <= 1, "Query must not define a limit other than 1 ore none");
Assert.isTrue(query.getSkip() <= 0, "Query must not define skip");
MongoPersistentEntity<?> entity = mappingContext.getPersistentEntity(entityType);
QueryContext queryContext = queryOperations.createQueryContext(query);
EntityProjection<T, S> projection = operations.introspectProjection(resultType, entityType);
Document mappedQuery = queryContext.getMappedQuery(entity);
Document mappedFields = queryContext.getMappedFields(entity, projection);
Document mappedSort = queryContext.getMappedSort(entity);
ReactiveCollectionPreparerDelegate collectionPreparer = ReactiveCollectionPreparerDelegate.of(query);
return Mono.defer(() -> {
PersistableEntityModel<S> pem = PersistableEntityModel.of(replacement, collectionName);
maybeEmitEvent(new BeforeConvertEvent<>(pem.getSource(), pem.getCollection()));
return maybeCallBeforeConvert(pem.getSource(), pem.getCollection()).map(pem::mutate).flatMap(it -> {
PersistableEntityModel<S> mapped = it
.addTargetDocument(operations.forEntity(it.getSource()).toMappedDocument(mongoConverter).getDocument());
maybeEmitEvent(new BeforeSaveEvent(mapped.getSource(), mapped.getTarget(), mapped.getCollection()));
return maybeCallBeforeSave(it.getSource(), mapped.getTarget(), mapped.getCollection())
.map(potentiallyModified -> PersistableEntityModel.of(potentiallyModified, mapped.getTarget(),
mapped.getCollection()));
}).flatMap(it -> {
Mono<R> afterFindAndReplace = doFindAndReplace(it.getCollection(), collectionPreparer, mappedQuery,
mappedFields, mappedSort, queryContext.getCollation(entityType).orElse(null), entityType, it.getTarget(),
options, projection, resultConverter);
return afterFindAndReplace.flatMap(saved -> {
maybeEmitEvent(new AfterSaveEvent<>(saved, it.getTarget(), it.getCollection()));
return maybeCallAfterSave(saved, it.getTarget(), it.getCollection());
});
});
});
}
@Override
public <T> Mono<T> findAndRemove(Query query, Class<T> entityClass) {
return findAndRemove(query, entityClass, getCollectionName(entityClass));
}
@Override
public <T> Mono<T> findAndRemove(Query query, Class<T> entityClass, String collectionName) {
operations.forType(entityClass).getCollation(query);
return doFindAndRemove(collectionName, ReactiveCollectionPreparerDelegate.of(query), query.getQueryObject(),
query.getFieldsObject(), getMappedSortObject(query, entityClass),
operations.forType(entityClass).getCollation(query).orElse(null), entityClass);
}
/*
* (non-Javadoc)
* @see org.springframework.data.mongodb.core.ReactiveMongoOperations#count(org.springframework.data.mongodb.core.query.Query, java.lang.Class)
*/
@Override
public Mono<Long> count(Query query, Class<?> entityClass) {
Assert.notNull(entityClass, "Entity class must not be null");
return count(query, entityClass, getCollectionName(entityClass));
}
@Override
public Mono<Long> count(Query query, String collectionName) {
return count(query, null, collectionName);
}
@Override
public Mono<Long> count(Query query, @Nullable Class<?> entityClass, String collectionName) {
Assert.notNull(query, "Query must not be null");
Assert.hasText(collectionName, "Collection name must not be null or empty");
return createMono(collectionName, collection -> {
CountContext countContext = queryOperations.countQueryContext(query);
CountOptions options = countContext.getCountOptions(entityClass);
Document filter = countContext.getMappedQuery(entityClass, mappingContext::getPersistentEntity);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
String.format("Executing count: %s in collection: %s", serializeToJsonSafely(filter), collectionName));
}
return doCount(collectionName, filter, options);
});
}
/**
* Run the actual count operation against the collection with given name.
*
* @param collectionName the name of the collection to count matching documents in.
* @param filter the filter to apply. Must not be {@literal null}.
* @param options options to apply. Like collation and the such.
* @return
*/
protected Mono<Long> doCount(String collectionName, Document filter, CountOptions options) {
if (LOGGER.isDebugEnabled()) {
LOGGER
.debug(String.format("Executing count: %s in collection: %s", serializeToJsonSafely(filter), collectionName));
}
return countExecution.countDocuments(collectionName, filter, options);
}
/*
* (non-Javadoc)
* @see org.springframework.data.mongodb.core.ReactiveMongoOperations#estimatedCount(java.lang.String)
*/
@Override
public Mono<Long> estimatedCount(String collectionName) {
return doEstimatedCount(collectionName, new EstimatedDocumentCountOptions());
}
protected Mono<Long> doEstimatedCount(String collectionName, EstimatedDocumentCountOptions options) {
return createMono(collectionName, collection -> collection.estimatedDocumentCount(options));
}
@Override
public Mono<Long> exactCount(Query query, @Nullable Class<?> entityClass, String collectionName) {
CountContext countContext = queryOperations.countQueryContext(query);
CountOptions options = countContext.getCountOptions(entityClass);
Document mappedQuery = countContext.getMappedQuery(entityClass, mappingContext::getPersistentEntity);
return doExactCount(collectionName, mappedQuery, options);
}
protected Mono<Long> doExactCount(String collectionName, Document filter, CountOptions options) {
return createMono(collectionName,
collection -> collection.countDocuments(CountQuery.of(filter).toQueryDocument(), options));
}
protected Mono<Boolean> countCanBeEstimated(Document filter, CountOptions options) {
if (!filter.isEmpty() || !isEmptyOptions(options)) {
return Mono.just(false);
}
return ReactiveMongoDatabaseUtils.isTransactionActive(getMongoDatabaseFactory()).map(it -> !it);
}
private boolean isEmptyOptions(CountOptions options) {
return options.getLimit() <= 0 && options.getSkip() <= 0;
}
/*
* (non-Javadoc)
* @see org.springframework.data.mongodb.core.ReactiveMongoOperations#insert(reactor.core.publisher.Mono)
*/
@Override
public <T> Mono<T> insert(Mono<? extends T> objectToSave) {
Assert.notNull(objectToSave, "Mono to insert must not be null");
return objectToSave.flatMap(this::insert);
}
@Override
public <T> Flux<T> insertAll(Mono<? extends Collection<? extends T>> batchToSave, Class<?> entityClass) {
return insertAll(batchToSave, getCollectionName(entityClass));
}
@Override
public <T> Flux<T> insertAll(Mono<? extends Collection<? extends T>> batchToSave, String collectionName) {
Assert.notNull(batchToSave, "Batch to insert must not be null");
return Flux.from(batchToSave).flatMapSequential(collection -> insert(collection, collectionName));
}
@Override
public <T> Mono<T> insert(T objectToSave) {
Assert.notNull(objectToSave, "Object to insert must not be null");
ensureNotCollectionLike(objectToSave);
return insert(objectToSave, getCollectionName(ClassUtils.getUserClass(objectToSave)));
}
@Override
public <T> Mono<T> insert(T objectToSave, String collectionName) {
Assert.notNull(objectToSave, "Object to insert must not be null");
ensureNotCollectionLike(objectToSave);
return doInsert(collectionName, objectToSave, this.mongoConverter);
}
@SuppressWarnings("NullAway")
protected <T> Mono<T> doInsert(String collectionName, T objectToSave, MongoWriter<Object> writer) {
return Mono.just(PersistableEntityModel.of(objectToSave, collectionName)) //
.doOnNext(it -> maybeEmitEvent(new BeforeConvertEvent<>(it.getSource(), it.getCollection()))) //
.flatMap(it -> maybeCallBeforeConvert(it.getSource(), it.getCollection()).map(it::mutate)) //
.map(it -> {
AdaptibleEntity<T> entity = operations.forEntityUpsert(it.getSource(), mongoConverter.getConversionService());
PersistableEntityModel<T> model = PersistableEntityModel.of(entity.initializeVersionProperty(),
entity.toMappedDocument(writer).getDocument(), it.getCollection());
maybeEmitEvent(new BeforeSaveEvent<>(model.getSource(), model.getTarget(), model.getCollection()));
return model;
})//
.flatMap(it -> {
return maybeCallBeforeSave(it.getSource(), it.getTarget(), it.getCollection()).map(it::mutate);
}).flatMap(it -> {
return insertDocument(it.getCollection(), it.getTarget(), it.getSource().getClass()).flatMap(id -> {
T saved = operations.forEntity(it.getSource(), mongoConverter.getConversionService())
.populateIdIfNecessary(id);
maybeEmitEvent(new AfterSaveEvent<>(saved, it.getTarget(), collectionName));
return maybeCallAfterSave(saved, it.getTarget(), collectionName);
});
});
}
@Override
public <T> Flux<T> insert(Collection<? extends T> batchToSave, Class<?> entityClass) {
return doInsertBatch(getCollectionName(entityClass), batchToSave, this.mongoConverter);
}
@Override
public <T> Flux<T> insert(Collection<? extends T> batchToSave, String collectionName) {
return doInsertBatch(collectionName, batchToSave, this.mongoConverter);
}
@Override
public <T> Flux<T> insertAll(Collection<? extends T> objectsToSave) {
return doInsertAll(objectsToSave, this.mongoConverter);
}
@Override
public <T> Flux<T> insertAll(Mono<? extends Collection<? extends T>> objectsToSave) {
return Flux.from(objectsToSave).flatMapSequential(this::insertAll);
}
@SuppressWarnings("NullAway")
protected <T> Flux<T> doInsertAll(Collection<? extends T> listToSave, MongoWriter<Object> writer) {
Map<String, List<T>> elementsByCollection = new HashMap<>();
listToSave.forEach(element -> {
String collection = getCollectionName(element.getClass());
List<T> collectionElements = elementsByCollection.computeIfAbsent(collection, k -> new ArrayList<>());
collectionElements.add(element);
});
return Flux.fromIterable(elementsByCollection.keySet())
.concatMap(collectionName -> doInsertBatch(collectionName, elementsByCollection.get(collectionName), writer));
}
@SuppressWarnings("NullAway")
protected <T> Flux<T> doInsertBatch(String collectionName, Collection<? extends T> batchToSave,
MongoWriter<Object> writer) {
Assert.notNull(writer, "MongoWriter must not be null");
Mono<List<Tuple2<AdaptibleEntity<T>, Document>>> prepareDocuments = Flux.fromIterable(batchToSave)
.flatMap(uninitialized -> {
BeforeConvertEvent<T> event = new BeforeConvertEvent<>(uninitialized, collectionName);
T toConvert = maybeEmitEvent(event).getSource();
return maybeCallBeforeConvert(toConvert, collectionName).flatMap(it -> {
AdaptibleEntity<T> entity = operations.forEntityUpsert(it, mongoConverter.getConversionService());
T initialized = entity.initializeVersionProperty();
MappedDocument mapped = entity.toMappedDocument(writer);
maybeEmitEvent(new BeforeSaveEvent<>(initialized, mapped.getDocument(), collectionName));
return maybeCallBeforeSave(initialized, mapped.getDocument(), collectionName).map(toSave -> {
MappedDocument mappedDocument = queryOperations.createInsertContext(mapped)
.prepareId(uninitialized.getClass());
return Tuples.of(entity, mappedDocument.getDocument());
});
});
}).collectList();
Flux<Tuple2<AdaptibleEntity<T>, Document>> insertDocuments = prepareDocuments.flatMapMany(tuples -> {
List<Document> documents = tuples.stream().map(Tuple2::getT2).collect(Collectors.toList());
return insertDocumentList(collectionName, documents).thenMany(Flux.fromIterable(tuples));
});
return insertDocuments.flatMapSequential(tuple -> {
Document document = tuple.getT2();
Object id = MappedDocument.of(document).getId();
T saved = tuple.getT1().populateIdIfNecessary(id);
maybeEmitEvent(new AfterSaveEvent<>(saved, document, collectionName));
return maybeCallAfterSave(saved, document, collectionName);
});
}
@Override
public <T> Mono<T> save(Mono<? extends T> objectToSave) {
Assert.notNull(objectToSave, "Mono to save must not be null");
return objectToSave.flatMap(this::save);
}
@Override
public <T> Mono<T> save(Mono<? extends T> objectToSave, String collectionName) {
Assert.notNull(objectToSave, "Mono to save must not be null");
return objectToSave.flatMap(o -> save(o, collectionName));
}
@Override
public <T> Mono<T> save(T objectToSave) {
Assert.notNull(objectToSave, "Object to save must not be null");
return save(objectToSave, getCollectionName(ClassUtils.getUserClass(objectToSave)));
}
@Override
public <T> Mono<T> save(T objectToSave, String collectionName) {
Assert.notNull(objectToSave, "Object to save must not be null");
Assert.hasText(collectionName, "Collection name must not be null or empty");
AdaptibleEntity<T> source = operations.forEntity(objectToSave, mongoConverter.getConversionService());
return source.isVersionedEntity() ? doSaveVersioned(source, collectionName)
: doSave(collectionName, objectToSave, this.mongoConverter);
}
private <T> Mono<T> doSaveVersioned(AdaptibleEntity<T> source, String collectionName) {
if (source.isNew()) {
return doInsert(collectionName, source.getBean(), this.mongoConverter);
}
return createMono(collectionName, collection -> {
// Create query for entity with the id and old version
Query query = source.getQueryForVersion();
// Bump version number
T toSave = source.incrementVersion();
source.assertUpdateableIdIfNotSet();
BeforeConvertEvent<T> event = new BeforeConvertEvent<>(toSave, collectionName);
T afterEvent = maybeEmitEvent(event).getSource();
return maybeCallBeforeConvert(afterEvent, collectionName).flatMap(toConvert -> {
Entity<T> tEntity = operations.forEntity(toConvert);
tEntity.assertUpdateableIdIfNotSet();
MappedDocument mapped = tEntity.toMappedDocument(mongoConverter);
Document document = mapped.getDocument();
maybeEmitEvent(new BeforeSaveEvent<>(toConvert, document, collectionName));
return maybeCallBeforeSave(toConvert, document, collectionName).flatMap(it -> {
return doUpdate(collectionName, query, mapped.updateWithoutId(), it.getClass(), false, false)
.flatMap(result -> {
maybeEmitEvent(new AfterSaveEvent<T>(it, document, collectionName));
return maybeCallAfterSave(it, document, collectionName);
});
});
});
});
}
@SuppressWarnings("NullAway")
protected <T> Mono<T> doSave(String collectionName, T objectToSave, MongoWriter<Object> writer) {
return createMono(collectionName, collection -> {
T toSave = maybeEmitEvent(new BeforeConvertEvent<T>(objectToSave, collectionName)).getSource();
return maybeCallBeforeConvert(toSave, collectionName).flatMap(toConvert -> {
AdaptibleEntity<T> entity = operations.forEntityUpsert(toConvert, mongoConverter.getConversionService());
Document dbDoc = entity.toMappedDocument(writer).getDocument();
maybeEmitEvent(new BeforeSaveEvent<T>(toConvert, dbDoc, collectionName));
return maybeCallBeforeSave(toConvert, dbDoc, collectionName).flatMap(it -> {
return saveDocument(collectionName, dbDoc, it.getClass()).flatMap(id -> {
T saved = entity.populateIdIfNecessary(id);
maybeEmitEvent(new AfterSaveEvent<>(saved, dbDoc, collectionName));
return maybeCallAfterSave(saved, dbDoc, collectionName);
});
});
});
});
}
protected Mono<Object> insertDocument(String collectionName, Document dbDoc, Class<?> entityClass) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(String
.format("Inserting Document containing fields: " + dbDoc.keySet() + " in collection: " + collectionName));
}
MappedDocument document = MappedDocument.of(dbDoc);
queryOperations.createInsertContext(document).prepareId(entityClass);
Flux<InsertOneResult> execute = execute(collectionName, collection -> {
MongoAction mongoAction = new MongoAction(writeConcern, MongoActionOperation.INSERT, collectionName, entityClass,
dbDoc, null);
WriteConcern writeConcernToUse = prepareWriteConcern(mongoAction);
MongoCollection<Document> collectionToUse = prepareCollection(collection, writeConcernToUse);
return collectionToUse.insertOne(document.getDocument());
});
return Flux.from(execute).last().map(success -> document.getId());
}
protected Flux<ObjectId> insertDocumentList(String collectionName, List<Document> dbDocList) {
if (dbDocList.isEmpty()) {
return Flux.empty();
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(String.format("Inserting list of Documents containing %d items", dbDocList.size()));
}
List<Document> documents = new ArrayList<>(dbDocList.size());
return execute(collectionName, collection -> {
MongoAction mongoAction = new MongoAction(writeConcern, MongoActionOperation.INSERT_LIST, collectionName, null,
null, null);
WriteConcern writeConcernToUse = prepareWriteConcern(mongoAction);
MongoCollection<Document> collectionToUse = prepareCollection(collection, writeConcernToUse);
documents.addAll(toDocuments(dbDocList));
return collectionToUse.insertMany(documents);
}).flatMapSequential(s -> {
return Flux.fromStream(documents.stream() //
.map(MappedDocument::of) //
.filter(it -> it.isIdPresent(ObjectId.class)) //
.map(it -> it.getId(ObjectId.class)));
});
}
private MongoCollection<Document> prepareCollection(MongoCollection<Document> collection,
@Nullable WriteConcern writeConcernToUse) {
MongoCollection<Document> collectionToUse = collection;
if (writeConcernToUse != null) {
collectionToUse = collectionToUse.withWriteConcern(writeConcernToUse);
}
return collectionToUse;
}
@SuppressWarnings("NullAway")
protected Mono<Object> saveDocument(String collectionName, Document document, Class<?> entityClass) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(String.format("Saving Document containing fields: %s", document.keySet()));
}
return createMono(collectionName, collection -> {
MongoAction mongoAction = new MongoAction(writeConcern, MongoActionOperation.SAVE, collectionName, entityClass,
document, null);
WriteConcern writeConcernToUse = prepareWriteConcern(mongoAction);
MappedDocument mapped = MappedDocument.of(document);
MongoCollection<Document> collectionToUse = writeConcernToUse == null //
? collection //
: collection.withWriteConcern(writeConcernToUse);
Publisher<?> publisher;
if (!mapped.hasId()) {
publisher = collectionToUse
.insertOne(queryOperations.createInsertContext(mapped).prepareId(entityClass).getDocument());
} else {
MongoPersistentEntity<?> entity = mappingContext.getPersistentEntity(entityClass);
UpdateContext updateContext = queryOperations.replaceSingleContext(mapped, true);
Document filter = updateContext.getReplacementQuery();
Document replacement = updateContext.getMappedUpdate(entity);
Mono<Document> deferredFilter;
if (updateContext.requiresShardKey(filter, entity)) {
if (entity.getShardKey().isImmutable()) {
deferredFilter = Mono.just(updateContext.applyShardKey(entity, filter, null));
} else {
deferredFilter = Mono
.from(
collection.find(filter, Document.class).projection(updateContext.getMappedShardKey(entity)).first())
.defaultIfEmpty(replacement).map(it -> updateContext.applyShardKey(entity, filter, it));
}
} else {
deferredFilter = Mono.just(filter);
}
publisher = deferredFilter.flatMapMany(
it -> collectionToUse.replaceOne(it, replacement, updateContext.getReplaceOptions(entity)));
}
return Mono.from(publisher).map(o -> mapped.getId());
});
}
@Override
public Mono<UpdateResult> upsert(Query query, UpdateDefinition update, Class<?> entityClass) {
return doUpdate(getCollectionName(entityClass), query, update, entityClass, true, false);
}
@Override
public Mono<UpdateResult> upsert(Query query, UpdateDefinition update, String collectionName) {
return doUpdate(collectionName, query, update, null, true, false);
}
@Override
public Mono<UpdateResult> upsert(Query query, UpdateDefinition update, Class<?> entityClass, String collectionName) {
return doUpdate(collectionName, query, update, entityClass, true, false);
}
@Override
public Mono<UpdateResult> updateFirst(Query query, UpdateDefinition update, Class<?> entityClass) {
return doUpdate(getCollectionName(entityClass), query, update, entityClass, false, false);
}
@Override
public Mono<UpdateResult> updateFirst(Query query, UpdateDefinition update, String collectionName) {
return doUpdate(collectionName, query, update, null, false, false);
}
@Override
public Mono<UpdateResult> updateFirst(Query query, UpdateDefinition update, Class<?> entityClass,
String collectionName) {
return doUpdate(collectionName, query, update, entityClass, false, false);
}
@Override
public Mono<UpdateResult> updateMulti(Query query, UpdateDefinition update, Class<?> entityClass) {
return doUpdate(getCollectionName(entityClass), query, update, entityClass, false, true);
}
@Override
public Mono<UpdateResult> updateMulti(Query query, UpdateDefinition update, String collectionName) {
return doUpdate(collectionName, query, update, null, false, true);
}
@Override
public Mono<UpdateResult> updateMulti(Query query, UpdateDefinition update, Class<?> entityClass,
String collectionName) {
return doUpdate(collectionName, query, update, entityClass, false, true);
}
@SuppressWarnings("NullAway")
protected Mono<UpdateResult> doUpdate(String collectionName, Query query, UpdateDefinition update,
@Nullable Class<?> entityClass, boolean upsert, boolean multi) {
MongoPersistentEntity<?> entity = entityClass == null ? null : getPersistentEntity(entityClass);
UpdateContext updateContext = multi ? queryOperations.updateContext(update, query, upsert)
: queryOperations.updateSingleContext(update, query, upsert);
updateContext.increaseVersionForUpdateIfNecessary(entity);
Document queryObj = updateContext.getMappedQuery(entity);
UpdateOptions updateOptions = updateContext.getUpdateOptions(entity, query);
Flux<UpdateResult> result;
if (updateContext.isAggregationUpdate()) {
List<Document> pipeline = updateContext.getUpdatePipeline(entityClass);
MongoAction mongoAction = new MongoAction(writeConcern, MongoActionOperation.UPDATE, collectionName, entityClass,
update.getUpdateObject(), queryObj);
WriteConcern writeConcernToUse = prepareWriteConcern(mongoAction);
result = execute(collectionName, collection -> {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(String.format("Calling update using query: %s and update: %s in collection: %s",
serializeToJsonSafely(queryObj), serializeToJsonSafely(pipeline), collectionName));
}
collection = writeConcernToUse != null ? collection.withWriteConcern(writeConcernToUse) : collection;
return multi ? collection.updateMany(queryObj, pipeline, updateOptions)
: collection.updateOne(queryObj, pipeline, updateOptions);
});
} else {
Document updateObj = updateContext.getMappedUpdate(entity);
MongoAction mongoAction = new MongoAction(writeConcern, MongoActionOperation.UPDATE, collectionName, entityClass,
updateObj, queryObj);
WriteConcern writeConcernToUse = prepareWriteConcern(mongoAction);
result = execute(collectionName, collection -> {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(String.format("Calling update using query: %s and update: %s in collection: %s",
serializeToJsonSafely(queryObj), serializeToJsonSafely(updateObj), collectionName));
}
MongoCollection<Document> collectionToUse = prepareCollection(collection, writeConcernToUse);
if (!UpdateMapper.isUpdateObject(updateObj)) {
Document filter = new Document(queryObj);
Mono<Document> deferredFilter;
if (updateContext.requiresShardKey(filter, entity)) {
if (entity.getShardKey().isImmutable()) {
deferredFilter = Mono.just(updateContext.applyShardKey(entity, filter, null));
} else {
deferredFilter = Mono.from(
collection.find(filter, Document.class).projection(updateContext.getMappedShardKey(entity)).first())
.defaultIfEmpty(updateObj).map(it -> updateContext.applyShardKey(entity, filter, it));
}
} else {
deferredFilter = Mono.just(filter);
}
com.mongodb.client.model.ReplaceOptions replaceOptions = updateContext.getReplaceOptions(entity);
return deferredFilter.flatMap(it -> Mono.from(collectionToUse.replaceOne(it, updateObj, replaceOptions)));
}
return multi ? collectionToUse.updateMany(queryObj, updateObj, updateOptions)
: collectionToUse.updateOne(queryObj, updateObj, updateOptions);
});
}
result = result.doOnNext(updateResult -> {
if (entity != null && entity.hasVersionProperty() && !multi) {
if (updateResult.wasAcknowledged() && updateResult.getMatchedCount() == 0) {
Document updateObj = updateContext.getMappedUpdate(entity);
if (containsVersionProperty(queryObj, entity))
throw new OptimisticLockingFailureException("Optimistic lock exception on saving entity %s to collection %s"
.formatted(entity.getName(), collectionName));
}
}
});
return result.next();
}
private boolean containsVersionProperty(Document document, @Nullable MongoPersistentEntity<?> persistentEntity) {
if (persistentEntity == null || !persistentEntity.hasVersionProperty()) {
return false;
}
return document.containsKey(persistentEntity.getRequiredVersionProperty().getFieldName());
}
@Override
public Mono<DeleteResult> remove(Mono<? extends Object> objectToRemove) {
return objectToRemove.flatMap(this::remove);
}
@Override
public Mono<DeleteResult> remove(Mono<? extends Object> objectToRemove, String collectionName) {
return objectToRemove.flatMap(it -> remove(it, collectionName));
}
@Override
public Mono<DeleteResult> remove(Object object) {
Assert.notNull(object, "Object must not be null");
return remove(operations.forEntity(object).getRemoveByQuery(), object.getClass());
}
@Override
public Mono<DeleteResult> remove(Object object, String collectionName) {
Assert.notNull(object, "Object must not be null");
Assert.hasText(collectionName, "Collection name must not be null or empty");
return doRemove(collectionName, operations.forEntity(object).getRemoveByQuery(), object.getClass());
}
@Override
public Mono<DeleteResult> remove(Query query, String collectionName) {
return remove(query, null, collectionName);
}
@Override
public Mono<DeleteResult> remove(Query query, Class<?> entityClass) {
return remove(query, entityClass, getCollectionName(entityClass));
}
@Override
public Mono<DeleteResult> remove(Query query, @Nullable Class<?> entityClass, String collectionName) {
return doRemove(collectionName, query, entityClass);
}
protected <T> Mono<DeleteResult> doRemove(String collectionName, Query query, @Nullable Class<T> entityClass) {
if (query == null) {
throw new InvalidDataAccessApiUsageException("Query passed in to remove can't be null");
}
Assert.hasText(collectionName, "Collection name must not be null or empty");
MongoPersistentEntity<?> entity = getPersistentEntity(entityClass);
DeleteContext deleteContext = queryOperations.deleteQueryContext(query);
Document queryObject = deleteContext.getMappedQuery(entity);
DeleteOptions deleteOptions = deleteContext.getDeleteOptions(entity);
Document removeQuery = deleteContext.getMappedQuery(entity);
MongoAction mongoAction = new MongoAction(writeConcern, MongoActionOperation.REMOVE, collectionName, entityClass,
null, removeQuery);
WriteConcern writeConcernToUse = prepareWriteConcern(mongoAction);
ReactiveCollectionPreparerDelegate collectionPreparer = ReactiveCollectionPreparerDelegate.of(query);
return execute(collectionName, collection -> {
maybeEmitEvent(new BeforeDeleteEvent<>(removeQuery, entityClass, collectionName));
MongoCollection<Document> collectionToUse = collectionPreparer
.prepare(prepareCollection(collection, writeConcernToUse));
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(String.format("Remove using query: %s in collection: %s.", serializeToJsonSafely(removeQuery),
collectionName));
}
if (query.getLimit() > 0 || query.getSkip() > 0) {
FindPublisher<Document> cursor = new QueryFindPublisherPreparer(query, entityClass)
.prepare(collection.find(removeQuery)) //
.projection(MappedDocument.getIdOnlyProjection());
return Flux.from(cursor) //
.map(MappedDocument::of) //
.map(MappedDocument::getId) //
.collectList() //
.flatMapMany(val -> {
return collectionToUse.deleteMany(MappedDocument.getIdIn(val), deleteOptions);
});
} else {
return collectionToUse.deleteMany(removeQuery, deleteOptions);
}
}).doOnNext(it -> maybeEmitEvent(new AfterDeleteEvent<>(queryObject, entityClass, collectionName))) //
.next();
}
@Override
public <T> Flux<T> findAll(Class<T> entityClass) {
return findAll(entityClass, getCollectionName(entityClass));
}
@Override
public <T> Flux<T> findAll(Class<T> entityClass, String collectionName) {
return executeFindMultiInternal(new FindCallback(CollectionPreparer.identity(), null),
FindPublisherPreparer.NO_OP_PREPARER, new ReadDocumentCallback<>(mongoConverter, entityClass, collectionName),
collectionName);
}
@Override
@SuppressWarnings("unchecked")
public <T> Flux<T> findAllAndRemove(Query query, String collectionName) {
return (Flux<T>) findAllAndRemove(query, Object.class, collectionName);
}
@Override
public <T> Flux<T> findAllAndRemove(Query query, Class<T> entityClass) {
return findAllAndRemove(query, entityClass, getCollectionName(entityClass));
}
@Override
public <T> Flux<T> findAllAndRemove(Query query, Class<T> entityClass, String collectionName) {
return doFindAndDelete(collectionName, query, entityClass);
}
@Override
public <T> Mono<UpdateResult> replace(Query query, T replacement, ReplaceOptions options, String collectionName) {
Assert.notNull(replacement, "Replacement must not be null");
return replace(query, (Class<T>) ClassUtils.getUserClass(replacement), replacement, options, collectionName);
}
protected <S, T> Mono<UpdateResult> replace(Query query, Class<S> entityType, T replacement, ReplaceOptions options,
String collectionName) {
MongoPersistentEntity<?> entity = mappingContext.getPersistentEntity(entityType);
UpdateContext updateContext = queryOperations.replaceSingleContext(query,
operations.forEntity(replacement).toMappedDocument(this.mongoConverter), options.isUpsert());
return createMono(collectionName, collection -> {
Document mappedUpdate = updateContext.getMappedUpdate(entity);
MongoAction action = new MongoAction(writeConcern, MongoActionOperation.REPLACE, collectionName, entityType,
mappedUpdate, updateContext.getQueryObject());
MongoCollection<Document> collectionToUse = createCollectionPreparer(query, action).prepare(collection);
return collectionToUse.replaceOne(updateContext.getMappedQuery(entity), mappedUpdate,
updateContext.getReplaceOptions(entity, it -> {
it.upsert(options.isUpsert());
}));
});
}
@Override
public <T> Flux<T> tail(Query query, Class<T> entityClass) {
return tail(query, entityClass, getCollectionName(entityClass));
}
@Override
public <T> Flux<T> tail(@Nullable Query query, Class<T> entityClass, String collectionName) {
if (query == null) {
LOGGER.debug(String.format("Tail for class: %s in collection: %s", entityClass, collectionName));
return executeFindMultiInternal(
collection -> new FindCallback(CollectionPreparer.identity(), null).doInCollection(collection)
.cursorType(CursorType.TailableAwait),
FindPublisherPreparer.NO_OP_PREPARER, new ReadDocumentCallback<>(mongoConverter, entityClass, collectionName),
collectionName);
}
ReactiveCollectionPreparerDelegate collectionPreparer = ReactiveCollectionPreparerDelegate.of(query);
return doFind(collectionName, collectionPreparer, query.getQueryObject(), query.getFieldsObject(), entityClass,
new TailingQueryFindPublisherPreparer(query, entityClass));
}
@Override
public <T> Flux<ChangeStreamEvent<T>> changeStream(@Nullable String database, @Nullable String collectionName,
ChangeStreamOptions options, Class<T> targetType) {
List<Document> filter = prepareFilter(options);
FullDocument fullDocument = ClassUtils.isAssignable(Document.class, targetType) ? FullDocument.DEFAULT
: FullDocument.UPDATE_LOOKUP;
return ReactiveMongoDatabaseUtils.getDatabase(database, mongoDatabaseFactory) //
.map(db -> {
ChangeStreamPublisher<Document> publisher;
if (StringUtils.hasText(collectionName)) {
publisher = filter.isEmpty() ? db.getCollection(collectionName).watch(Document.class)
: db.getCollection(collectionName).watch(filter, Document.class);
} else {
publisher = filter.isEmpty() ? db.watch(Document.class) : db.watch(filter, Document.class);
}
if (options.isResumeAfter()) {
publisher = options.getResumeToken().map(BsonValue::asDocument).map(publisher::resumeAfter)
.orElse(publisher);
} else if (options.isStartAfter()) {
publisher = options.getResumeToken().map(BsonValue::asDocument).map(publisher::startAfter)
.orElse(publisher);
}
publisher = options.getCollation().map(Collation::toMongoCollation).map(publisher::collation)
.orElse(publisher);
publisher = options.getResumeBsonTimestamp().map(publisher::startAtOperationTime).orElse(publisher);
publisher = options.getShowExpandedEvents().map(publisher::showExpandedEvents).orElse(publisher);
if (options.getFullDocumentBeforeChangeLookup().isPresent()) {
publisher = publisher.fullDocumentBeforeChange(options.getFullDocumentBeforeChangeLookup().get());
}
return publisher.fullDocument(options.getFullDocumentLookup().orElse(fullDocument));
}) //
.flatMapMany(publisher -> Flux.from(publisher)
.map(document -> new ChangeStreamEvent<>(document, targetType, getConverter())));
}
List<Document> prepareFilter(ChangeStreamOptions options) {
Object filter = options.getFilter().orElse(Collections.emptyList());
if (filter instanceof Aggregation agg) {
AggregationOperationContext context = agg instanceof TypedAggregation typedAggregation
? new TypeBasedAggregationOperationContext(typedAggregation.getInputType(),
getConverter().getMappingContext(), queryMapper)
: new TypeBasedAggregationOperationContext(Object.class, mappingContext, queryMapper,
FieldLookupPolicy.relaxed());
return agg.toPipeline(new PrefixingDelegatingAggregationOperationContext(context, "fullDocument",
Arrays.asList("operationType", "fullDocument", "documentKey", "updateDescription", "ns")));
}
if (filter instanceof List) {
return (List<Document>) filter;
}
throw new IllegalArgumentException(
"ChangeStreamRequestOptions.filter mut be either an Aggregation or a plain list of Documents");
}
@Override
public <T> Flux<T> mapReduce(Query filterQuery, Class<?> domainType, Class<T> resultType, String mapFunction,
String reduceFunction, MapReduceOptions options) {
return mapReduce(filterQuery, domainType, getCollectionName(domainType), resultType, mapFunction, reduceFunction,
options);
}
@Override
public <T> Flux<T> mapReduce(Query filterQuery, Class<?> domainType, String inputCollectionName, Class<T> resultType,
String mapFunction, String reduceFunction, MapReduceOptions options) {
Assert.notNull(filterQuery, "Filter query must not be null");
Assert.notNull(domainType, "Domain type must not be null");
Assert.hasText(inputCollectionName, "Input collection name must not be null or empty");
Assert.notNull(resultType, "Result type must not be null");
Assert.notNull(mapFunction, "Map function must not be null");
Assert.notNull(reduceFunction, "Reduce function must not be null");
Assert.notNull(options, "MapReduceOptions must not be null");
assertLocalFunctionNames(mapFunction, reduceFunction);
ReactiveCollectionPreparerDelegate collectionPreparer = ReactiveCollectionPreparerDelegate.of(filterQuery);
return createFlux(inputCollectionName, collection -> {
Document mappedQuery = queryMapper.getMappedObject(filterQuery.getQueryObject(),
mappingContext.getPersistentEntity(domainType));
MapReducePublisher<Document> publisher = collectionPreparer.prepare(collection).mapReduce(mapFunction,
reduceFunction, Document.class);
publisher.filter(mappedQuery);
Document mappedSort = getMappedSortObject(filterQuery, domainType);
if (mappedSort != null && !mappedSort.isEmpty()) {
publisher.sort(mappedSort);
}
Meta meta = filterQuery.getMeta();
if (meta.hasMaxTime()) {
publisher.maxTime(meta.getRequiredMaxTimeMsec(), TimeUnit.MILLISECONDS);
}
if (filterQuery.getLimit() > 0 || (options.getLimit() != null)) {
if (filterQuery.getLimit() > 0 && (options.getLimit() != null)) {
throw new IllegalArgumentException(
"Both Query and MapReduceOptions define a limit; Please provide the limit only via one of the two.");
}
if (filterQuery.getLimit() > 0) {
publisher.limit(filterQuery.getLimit());
}
if (options.getLimit() != null) {
publisher.limit(options.getLimit());
}
}
Optional<Collation> collation = filterQuery.getCollation();
Optionals.ifAllPresent(filterQuery.getCollation(), options.getCollation(), (l, r) -> {
throw new IllegalArgumentException(
"Both Query and MapReduceOptions define a collation; Please provide the collation only via one of the two.");
});
if (options.getCollation().isPresent()) {
collation = options.getCollation();
}
if (!CollectionUtils.isEmpty(options.getScopeVariables())) {
publisher = publisher.scope(new Document(options.getScopeVariables()));
}
if (options.getLimit() != null && options.getLimit() > 0) {
publisher = publisher.limit(options.getLimit());
}
if (options.getFinalizeFunction().filter(StringUtils::hasText).isPresent()) {
publisher = publisher.finalizeFunction(options.getFinalizeFunction().get());
}
if (options.getJavaScriptMode() != null) {
publisher = publisher.jsMode(options.getJavaScriptMode());
}
if (StringUtils.hasText(options.getOutputCollection()) && !options.usesInlineOutput()) {
publisher = publisher.collectionName(options.getOutputCollection()).action(options.getMapReduceAction());
if (options.getOutputDatabase().isPresent()) {
publisher = publisher.databaseName(options.getOutputDatabase().get());
}
}
publisher = collation.map(Collation::toMongoCollation).map(publisher::collation).orElse(publisher);
return Flux.from(publisher)
.flatMapSequential(new ReadDocumentCallback<>(mongoConverter, resultType, inputCollectionName)::doWith);
});
}
private static void assertLocalFunctionNames(String... functions) {
for (String function : functions) {
if (ResourceUtils.isUrl(function)) {
throw new IllegalArgumentException(String.format(
"Blocking accessing to resource %s is not allowed using reactive infrastructure; You may load the resource at startup and cache its value.",
function));
}
}
}
@Override
public <T> ReactiveFind<T> query(Class<T> domainType) {
return new ReactiveFindOperationSupport(this).query(domainType);
}
@Override
public <T> ReactiveUpdate<T> update(Class<T> domainType) {
return new ReactiveUpdateOperationSupport(this).update(domainType);
}
@Override
public <T> ReactiveRemove<T> remove(Class<T> domainType) {
return new ReactiveRemoveOperationSupport(this).remove(domainType);
}
@Override
public <T> ReactiveInsert<T> insert(Class<T> domainType) {
return new ReactiveInsertOperationSupport(this).insert(domainType);
}
@Override
public <T> ReactiveAggregation<T> aggregateAndReturn(Class<T> domainType) {
return new ReactiveAggregationOperationSupport(this).aggregateAndReturn(domainType);
}
@Override
public <T> ReactiveMapReduce<T> mapReduce(Class<T> domainType) {
return new ReactiveMapReduceOperationSupport(this).mapReduce(domainType);
}
@Override
public <T> ReactiveChangeStream<T> changeStream(Class<T> domainType) {
return new ReactiveChangeStreamOperationSupport(this).changeStream(domainType);
}
/**
* Retrieve and remove all documents matching the given {@code query} by calling {@link #find(Query, Class, String)}
* and {@link #remove(Query, Class, String)}, whereas the {@link Query} for {@link #remove(Query, Class, String)} is
* constructed out of the find result.
*
* @param collectionName
* @param query
* @param entityClass
* @return
*/
protected <T> Flux<T> doFindAndDelete(String collectionName, Query query, Class<T> entityClass) {
Flux<T> flux = find(query, entityClass, collectionName);
return Flux.from(flux).collectList().filter(it -> !it.isEmpty())
.flatMapMany(list -> Flux.from(remove(operations.getByIdInQuery(list), entityClass, collectionName))
.flatMapSequential(deleteResult -> Flux.fromIterable(list)));
}
@SuppressWarnings({ "rawtypes", "unchecked", "NullAway" })
<S, T> Flux<T> doFindAndDelete(String collectionName, Query query, Class<S> entityClass,
QueryResultConverter<? super S, ? extends T> resultConverter) {
List<Object> ids = new ArrayList<>();
ProjectingReadCallback readCallback = new ProjectingReadCallback(getConverter(),
EntityProjection.nonProjecting(entityClass), collectionName);
QueryResultConverterCallback<S, T> callback = new QueryResultConverterCallback<>(resultConverter, readCallback) {
@Override
public Mono<T> doWith(Document object) {
ids.add(object.get("_id"));
return super.doWith(object);
}
};
Flux<T> flux = doFind(collectionName, ReactiveCollectionPreparerDelegate.of(query), query.getQueryObject(),
query.getFieldsObject(), entityClass,
new QueryFindPublisherPreparer(query, query.getSortObject(), query.getLimit(), query.getSkip(), entityClass),
callback);
return Flux.from(flux).collectList().filter(it -> !it.isEmpty()).flatMapMany(list -> {
Criteria[] criterias = ids.stream() //
.map(it -> Criteria.where("_id").is(it)) //
.toArray(Criteria[]::new);
Query removeQuery = new Query(criterias.length == 1 ? criterias[0] : new Criteria().orOperator(criterias));
if (query.hasReadPreference()) {
removeQuery.withReadPreference(query.getReadPreference());
}
return Flux.from(remove(removeQuery, entityClass, collectionName))
.flatMapSequential(deleteResult -> Flux.fromIterable(list));
});
}
/**
* Create the specified collection using the provided options
*
* @param collectionName
* @param collectionOptions
* @return the collection that was created
*/
protected Mono<MongoCollection<Document>> doCreateCollection(String collectionName,
CreateCollectionOptions collectionOptions) {
return createMono(db -> db.createCollection(collectionName, collectionOptions)).doOnSuccess(it -> {
// TODO: Emit a collection created event
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(String.format("Created collection [%s]", collectionName));
}
}).then(getCollection(collectionName));
}
/**
* Map the results of an ad-hoc query on the default MongoDB collection to an object using the template's converter.
* The query document is specified as a standard {@link Document} and so is the fields specification.
*
* @param collectionName name of the collection to retrieve the objects from.
* @param collectionPreparer the preparer to prepare the collection for the actual use.
* @param query the query document that specifies the criteria used to find a record.
* @param fields the document that specifies the fields to be returned.
* @param entityClass the parameterized type of the returned list.
* @param collation can be {@literal null}.
* @return the {@link List} of converted objects.
*/
protected <T> Mono<T> doFindOne(String collectionName,
CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query, @Nullable Document fields,
Class<T> entityClass, @Nullable Collation collation) {
return doFindOne(collectionName, collectionPreparer, query, fields, entityClass,
findPublisher -> collation != null ? findPublisher.collation(collation.toMongoCollation()) : findPublisher);
}
/**
* Map the results of an ad-hoc query on the default MongoDB collection to an object using the template's converter.
* The query document is specified as a standard {@link Document} and so is the fields specification.
*
* @param collectionName name of the collection to retrieve the objects from.
* @param collectionPreparer the preparer to prepare the collection for the actual use.
* @param query the query document that specifies the criteria used to find a record.
* @param fields the document that specifies the fields to be returned.
* @param entityClass the parameterized type of the returned list.
* @param preparer the preparer modifying collection and publisher to fit the needs.
* @return the {@link List} of converted objects.
* @since 2.2
*/
protected <T> Mono<T> doFindOne(String collectionName,
CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query, @Nullable Document fields,
Class<T> entityClass, FindPublisherPreparer preparer) {
MongoPersistentEntity<?> entity = mappingContext.getPersistentEntity(entityClass);
QueryContext queryContext = queryOperations
.createQueryContext(new BasicQuery(query, fields != null ? fields : new Document()));
Document mappedFields = queryContext.getMappedFields(entity, EntityProjection.nonProjecting(entityClass));
Document mappedQuery = queryContext.getMappedQuery(entity);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(String.format("findOne using query: %s fields: %s for class: %s in collection: %s",
serializeToJsonSafely(query), mappedFields, entityClass, collectionName));
}
return executeFindOneInternal(new FindOneCallback(collectionPreparer, mappedQuery, mappedFields, preparer),
new ReadDocumentCallback<>(this.mongoConverter, entityClass, collectionName), collectionName);
}
/**
* Map the results of an ad-hoc query on the default MongoDB collection to a List using the template's converter. The
* query document is specified as a standard Document and so is the fields specification.
*
* @param collectionName name of the collection to retrieve the objects from
* @param collectionPreparer the preparer to prepare the collection for the actual use.
* @param query the query document that specifies the criteria used to find a record
* @param fields the document that specifies the fields to be returned
* @param entityClass the parameterized type of the returned list.
* @return the List of converted objects.
*/
protected <T> Flux<T> doFind(String collectionName, CollectionPreparer<MongoCollection<Document>> collectionPreparer,
Document query, Document fields, Class<T> entityClass) {
return doFind(collectionName, collectionPreparer, query, fields, entityClass, null,
new ReadDocumentCallback<>(this.mongoConverter, entityClass, collectionName));
}
/**
* Map the results of an ad-hoc query on the default MongoDB collection to a List of the specified type. The object is
* converted from the MongoDB native representation using an instance of {@see MongoConverter}. The query document is
* specified as a standard Document and so is the fields specification.
*
* @param collectionName name of the collection to retrieve the objects from.
* @param collectionPreparer the preparer to prepare the collection for the actual use.
* @param query the query document that specifies the criteria used to find a record.
* @param fields the document that specifies the fields to be returned.
* @param entityClass the parameterized type of the returned list.
* @param preparer allows for customization of the {@link com.mongodb.client.FindIterable} used when iterating over
* the result set, (apply limits, skips and so on).
* @return the {@link List} of converted objects.
*/
protected <T> Flux<T> doFind(String collectionName, CollectionPreparer<MongoCollection<Document>> collectionPreparer,
Document query, Document fields, Class<T> entityClass, FindPublisherPreparer preparer) {
return doFind(collectionName, collectionPreparer, query, fields, entityClass, preparer,
new ReadDocumentCallback<>(mongoConverter, entityClass, collectionName));
}
protected <S, T> Flux<T> doFind(String collectionName,
CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query, Document fields,
Class<S> entityClass, @Nullable FindPublisherPreparer preparer, DocumentCallback<T> objectCallback) {
MongoPersistentEntity<?> entity = mappingContext.getPersistentEntity(entityClass);
QueryContext queryContext = queryOperations.createQueryContext(new BasicQuery(query, fields));
Document mappedFields = queryContext.getMappedFields(entity, EntityProjection.nonProjecting(entityClass));
Document mappedQuery = queryContext.getMappedQuery(entity);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(String.format("find using query: %s fields: %s for class: %s in collection: %s",
serializeToJsonSafely(mappedQuery), mappedFields, entityClass, collectionName));
}
return executeFindMultiInternal(new FindCallback(collectionPreparer, mappedQuery, mappedFields),
preparer != null ? preparer : FindPublisherPreparer.NO_OP_PREPARER, objectCallback, collectionName);
}
CollectionPreparer<MongoCollection<Document>> createCollectionPreparer(Query query) {
return ReactiveCollectionPreparerDelegate.of(query);
}
CollectionPreparer<MongoCollection<Document>> createCollectionPreparer(Query query, @Nullable MongoAction action) {
CollectionPreparer<MongoCollection<Document>> collectionPreparer = createCollectionPreparer(query);
if (action == null) {
return collectionPreparer;
}
return collectionPreparer.andThen(collection -> {
WriteConcern writeConcern = prepareWriteConcern(action);
return writeConcern != null ? collection.withWriteConcern(writeConcern) : collection;
});
}
/**
* Map the results of an ad-hoc query on the default MongoDB collection to a List of the specified targetClass while
* using sourceClass for mapping the query.
*
* @since 2.0
*/
<T, R> Flux<R> doFind(String collectionName, CollectionPreparer<MongoCollection<Document>> collectionPreparer,
Document query, Document fields, Class<?> sourceClass, Class<T> targetClass,
QueryResultConverter<? super T, ? extends R> resultConverter, FindPublisherPreparer preparer) {
MongoPersistentEntity<?> entity = mappingContext.getPersistentEntity(sourceClass);
EntityProjection<T, ?> projection = operations.introspectProjection(targetClass, sourceClass);
QueryContext queryContext = queryOperations.createQueryContext(new BasicQuery(query, fields));
Document mappedFields = queryContext.getMappedFields(entity, projection);
Document mappedQuery = queryContext.getMappedQuery(entity);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(String.format("find using query: %s fields: %s for class: %s in collection: %s",
serializeToJsonSafely(mappedQuery), mappedFields, sourceClass, collectionName));
}
return executeFindMultiInternal(new FindCallback(collectionPreparer, mappedQuery, mappedFields), preparer,
getResultReader(projection, collectionName, resultConverter), collectionName);
}
protected CreateCollectionOptions convertToCreateCollectionOptions(@Nullable CollectionOptions collectionOptions) {
return convertToCreateCollectionOptions(collectionOptions, Object.class);
}
protected CreateCollectionOptions convertToCreateCollectionOptions(@Nullable CollectionOptions collectionOptions,
Class<?> entityType) {
return operations.convertToCreateCollectionOptions(collectionOptions, entityType);
}
/**
* Map the results of an ad-hoc query on the default MongoDB collection to an object using the template's converter.
* The first document that matches the query is returned and also removed from the collection in the database. <br />
* The query document is specified as a standard Document and so is the fields specification.
*
* @param collectionName name of the collection to retrieve the objects from.
* @param collectionPreparer the preparer to prepare the collection for the actual use.
* @param query the query document that specifies the criteria used to find a record.
* @param collation collation.
* @param entityClass the parameterized type of the returned list.
* @return the List of converted objects.
*/
protected <T> Mono<T> doFindAndRemove(String collectionName,
CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query, Document fields,
@Nullable Document sort, @Nullable Collation collation, Class<T> entityClass) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(String.format("findAndRemove using query: %s fields: %s sort: %s for class: %s in collection: %s",
serializeToJsonSafely(query), fields, serializeToJsonSafely(sort), entityClass, collectionName));
}
MongoPersistentEntity<?> entity = mappingContext.getPersistentEntity(entityClass);
return executeFindOneInternal(new FindAndRemoveCallback(collectionPreparer,
queryMapper.getMappedObject(query, entity), fields, sort, collation),
new ReadDocumentCallback<>(this.mongoConverter, entityClass, collectionName), collectionName);
}
<S, T> Mono<T> doFindAndModify(String collectionName,
CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query, Document fields,
@Nullable Document sort, Class<S> entityClass, UpdateDefinition update, FindAndModifyOptions options,
QueryResultConverter<? super S, ? extends T> resultConverter) {
MongoPersistentEntity<?> entity = mappingContext.getPersistentEntity(entityClass);
UpdateContext updateContext = queryOperations.updateSingleContext(update, query, false);
updateContext.increaseVersionForUpdateIfNecessary(entity);
return Mono.defer(() -> {
Document mappedQuery = updateContext.getMappedQuery(entity);
Object mappedUpdate = updateContext.isAggregationUpdate() ? updateContext.getUpdatePipeline(entityClass)
: updateContext.getMappedUpdate(entity);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(String.format(
"findAndModify using query: %s fields: %s sort: %s for class: %s and update: %s " + "in collection: %s",
serializeToJsonSafely(mappedQuery), fields, serializeToJsonSafely(sort), entityClass,
serializeToJsonSafely(mappedUpdate), collectionName));
}
EntityProjection<S, ?> projection = EntityProjection.nonProjecting(entityClass);
DocumentCallback<T> callback = getResultReader(projection, collectionName, resultConverter);
return executeFindOneInternal(
new FindAndModifyCallback(collectionPreparer, mappedQuery, fields, sort, mappedUpdate,
update.getArrayFilters().stream().map(ArrayFilter::asDocument).collect(Collectors.toList()), options),
callback, collectionName);
});
}
/**
* Customize this part for findAndReplace.
*
* @param collectionName The name of the collection to perform the operation in.
* @param collectionPreparer the preparer to prepare the collection for the actual use.
* @param mappedQuery the query to look up documents.
* @param mappedFields the fields to project the result to.
* @param mappedSort the sort to be applied when executing the query.
* @param collation collation settings for the query. Can be {@literal null}.
* @param entityType the source domain type.
* @param replacement the replacement {@link Document}.
* @param options applicable options.
* @param resultType the target domain type.
* @return {@link Mono#empty()} if object does not exist, {@link FindAndReplaceOptions#isReturnNew() return new} is
* {@literal false} and {@link FindAndReplaceOptions#isUpsert() upsert} is {@literal false}.
* @since 2.1
*/
protected <T> Mono<T> doFindAndReplace(String collectionName,
CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document mappedQuery, Document mappedFields,
Document mappedSort, com.mongodb.client.model.Collation collation, Class<?> entityType, Document replacement,
FindAndReplaceOptions options, Class<T> resultType) {
EntityProjection<T, ?> projection = operations.introspectProjection(resultType, entityType);
return doFindAndReplace(collectionName, collectionPreparer, mappedQuery, mappedFields, mappedSort, collation,
entityType, replacement, options, projection, QueryResultConverter.entity());
}
/**
* Customize this part for findAndReplace.
*
* @param collectionName The name of the collection to perform the operation in.
* @param collectionPreparer the preparer to prepare the collection for the actual use.
* @param mappedQuery the query to look up documents.
* @param mappedFields the fields to project the result to.
* @param mappedSort the sort to be applied when executing the query.
* @param collation collation settings for the query. Can be {@literal null}.
* @param entityType the source domain type.
* @param replacement the replacement {@link Document}.
* @param options applicable options.
* @param projection the projection descriptor.
* @return {@link Mono#empty()} if object does not exist, {@link FindAndReplaceOptions#isReturnNew() return new} is
* {@literal false} and {@link FindAndReplaceOptions#isUpsert() upsert} is {@literal false}.
* @since 3.4
*/
private <S, T> Mono<T> doFindAndReplace(String collectionName,
CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document mappedQuery, Document mappedFields,
Document mappedSort, com.mongodb.client.model.Collation collation, Class<?> entityType, Document replacement,
FindAndReplaceOptions options, EntityProjection<S, ?> projection,
QueryResultConverter<? super S, ? extends T> resultConverter) {
return Mono.defer(() -> {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(String.format(
"findAndReplace using query: %s fields: %s sort: %s for class: %s and replacement: %s "
+ "in collection: %s",
serializeToJsonSafely(mappedQuery), mappedFields, serializeToJsonSafely(mappedSort), entityType,
serializeToJsonSafely(replacement), collectionName));
}
DocumentCallback<T> resultReader = getResultReader(projection, collectionName, resultConverter);
return executeFindOneInternal(new FindAndReplaceCallback(collectionPreparer, mappedQuery, mappedFields,
mappedSort, replacement, collation, options), resultReader, collectionName);
});
}
protected <E extends MongoMappingEvent<T>, T> E maybeEmitEvent(E event) {
eventDelegate.publishEvent(event);
return event;
}
protected <T> Mono<T> maybeCallBeforeConvert(T object, String collection) {
if (entityCallbacks != null) {
return entityCallbacks.callback(ReactiveBeforeConvertCallback.class, object, collection);
}
return Mono.just(object);
}
protected <T> Mono<T> maybeCallBeforeSave(T object, Document document, String collection) {
if (entityCallbacks != null) {
return entityCallbacks.callback(ReactiveBeforeSaveCallback.class, object, document, collection);
}
return Mono.just(object);
}
protected <T> Mono<T> maybeCallAfterSave(T object, Document document, String collection) {
if (entityCallbacks != null) {
return entityCallbacks.callback(ReactiveAfterSaveCallback.class, object, document, collection);
}
return Mono.just(object);
}
protected <T> Mono<T> maybeCallAfterConvert(T object, Document document, String collection) {
if (entityCallbacks != null) {
return entityCallbacks.callback(ReactiveAfterConvertCallback.class, object, document, collection);
}
return Mono.just(object);
}
private MongoCollection<Document> getAndPrepareCollection(MongoDatabase db, String collectionName) {
try {
MongoCollection<Document> collection = db.getCollection(collectionName, Document.class);
return prepareCollection(collection);
} catch (RuntimeException e) {
throw potentiallyConvertRuntimeException(e, exceptionTranslator);
}
}
/**
* Ensure the given {@literal source} is not an {@link java.lang.reflect.Array}, {@link Collection} or
* {@link Iterator}.
*
* @param source can be {@literal null}.
* @since 3.2.
*/
protected void ensureNotCollectionLike(@Nullable Object source) {
if (EntityOperations.isCollectionLike(source) || source instanceof Publisher) {
throw new IllegalArgumentException("Cannot use a collection here.");
}
}
/**
* Prepare the collection before any processing is done using it. This allows a convenient way to apply settings like
* withCodecRegistry() etc. Can be overridden in subclasses.
*
* @param collection
*/
protected MongoCollection<Document> prepareCollection(MongoCollection<Document> collection) {
if (this.readPreference != null && this.readPreference != collection.getReadPreference()) {
return collection.withReadPreference(readPreference);
}
return collection;
}
/**
* @param database
* @return
* @since 2.1
*/
protected MongoDatabase prepareDatabase(MongoDatabase database) {
return database;
}
/**
* Prepare the WriteConcern before any processing is done using it. This allows a convenient way to apply custom
* settings in subclasses. The returned {@link WriteConcern} will be defaulted to {@link WriteConcern#ACKNOWLEDGED}
* when {@link WriteResultChecking} is set to {@link WriteResultChecking#EXCEPTION}.
*
* @param mongoAction any WriteConcern already configured or {@literal null}.
* @return The prepared WriteConcern or {@literal null}.
* @see #setWriteConcern(WriteConcern)
* @see #setWriteConcernResolver(WriteConcernResolver)
*/
protected @Nullable WriteConcern prepareWriteConcern(MongoAction mongoAction) {
WriteConcern wc = writeConcernResolver.resolve(mongoAction);
return potentiallyForceAcknowledgedWrite(wc);
}
/**
* @return the {@link MongoDatabaseFactory} in use.
* @since 3.1.4
*/
public ReactiveMongoDatabaseFactory getMongoDatabaseFactory() {
return mongoDatabaseFactory;
}
@Nullable
private WriteConcern potentiallyForceAcknowledgedWrite(@Nullable WriteConcern wc) {
if (ObjectUtils.nullSafeEquals(WriteResultChecking.EXCEPTION, writeResultChecking)) {
if (wc == null || wc.getWObject() == null
|| (wc.getWObject() instanceof Number concern && concern.intValue() < 1)) {
return WriteConcern.ACKNOWLEDGED;
}
}
return wc;
}
/**
* Internal method using callbacks to do queries against the datastore that requires reading a single object from a
* collection of objects. It will take the following steps
* <ol>
* <li>Execute the given {@link ReactiveCollectionCallback} for a {@link Document}.</li>
* <li>Apply the given {@link DocumentCallback} to each of the {@link Document}s to obtain the result.</li>
* <ol>
*
* @param collectionCallback the callback to retrieve the {@link Document}
* @param objectCallback the {@link DocumentCallback} to transform {@link Document}s into the actual domain type
* @param collectionName the collection to be queried
* @return
*/
private <T> Mono<T> executeFindOneInternal(ReactiveCollectionCallback<Document> collectionCallback,
DocumentCallback<T> objectCallback, String collectionName) {
return createMono(collectionName,
collection -> Mono.from(collectionCallback.doInCollection(collection)).flatMap(objectCallback::doWith));
}
/**
* Internal method using callback to do queries against the datastore that requires reading a collection of objects.
* It will take the following steps
* <ol>
* <li>Execute the given {@link ReactiveCollectionCallback} for a {@link FindPublisher}.</li>
* <li>Prepare that {@link FindPublisher} with the given {@link FindPublisherPreparer} (will be skipped if
* {@link FindPublisherPreparer} is {@literal null}</li>
* <li>Apply the given {@link DocumentCallback} in {@link Flux#map(Function)} of {@link FindPublisher}</li>
* <ol>
*
* @param collectionCallback the callback to retrieve the {@link FindPublisher} with, must not be {@literal null}.
* @param preparer the {@link FindPublisherPreparer} to potentially modify the {@link FindPublisher} before iterating
* over it, may be {@literal null}.
* @param objectCallback the {@link DocumentCallback} to transform {@link Document}s into the actual domain type, must
* not be {@literal null}.
* @param collectionName the collection to be queried, must not be {@literal null}.
* @return
*/
private <T> Flux<T> executeFindMultiInternal(ReactiveCollectionQueryCallback<Document> collectionCallback,
FindPublisherPreparer preparer, DocumentCallback<T> objectCallback, String collectionName) {
return createFlux(collectionName, collection -> {
return Flux.from(preparer.initiateFind(collection, collectionCallback::doInCollection))
.flatMapSequential(objectCallback::doWith);
});
}
@SuppressWarnings("unchecked")
private <T, R> DocumentCallback<R> getResultReader(EntityProjection<T, ?> projection, String collectionName,
QueryResultConverter<? super T, ? extends R> resultConverter) {
DocumentCallback<T> readCallback = new ProjectingReadCallback<>(mongoConverter, projection, collectionName);
return resultConverter == QueryResultConverter.entity() ? (DocumentCallback<R>) readCallback
: new QueryResultConverterCallback<>(resultConverter, readCallback);
}
/**
* Exception translation {@link Function} intended for {@link Flux#onErrorMap(Function)} usage.
*
* @return the exception translation {@link Function}
*/
private Function<Throwable, Throwable> translateException() {
return throwable -> {
if (throwable instanceof RuntimeException runtimeException) {
return potentiallyConvertRuntimeException(runtimeException, exceptionTranslator);
}
return throwable;
};
}
/**
* Tries to convert the given {@link RuntimeException} into a {@link DataAccessException} but returns the original
* exception if the conversation failed. Thus allows safe re-throwing of the return value.
*
* @param ex the exception to translate
* @param exceptionTranslator the {@link PersistenceExceptionTranslator} to be used for translation
* @return
*/
private static RuntimeException potentiallyConvertRuntimeException(RuntimeException ex,
PersistenceExceptionTranslator exceptionTranslator) {
RuntimeException resolved = exceptionTranslator.translateExceptionIfPossible(ex);
return resolved == null ? ex : resolved;
}
@Nullable
MongoPersistentEntity<?> getPersistentEntity(@Nullable Class<?> type) {
return type == null ? null : mappingContext.getPersistentEntity(type);
}
private MappingMongoConverter getDefaultMongoConverter() {
MongoCustomConversions conversions = new MongoCustomConversions(Collections.emptyList());
MongoMappingContext context = new MongoMappingContext();
context.setSimpleTypeHolder(conversions.getSimpleTypeHolder());
context.afterPropertiesSet();
MappingMongoConverter converter = new MappingMongoConverter(NO_OP_REF_RESOLVER, context);
converter.setCustomConversions(conversions);
converter.setCodecRegistryProvider(this.mongoDatabaseFactory);
converter.afterPropertiesSet();
return converter;
}
@Contract("null, _ -> null")
private @Nullable Document getMappedSortObject(@Nullable Query query, Class<?> type) {
if (query == null) {
return null;
}
return getMappedSortObject(query.getSortObject(), type);
}
@Contract("null, _ -> null")
private @Nullable Document getMappedSortObject(@Nullable Document sortObject, Class<?> type) {
if (ObjectUtils.isEmpty(sortObject)) {
return null;
}
return queryMapper.getMappedSort(sortObject, mappingContext.getPersistentEntity(type));
}
// Callback implementations
/**
* Simple {@link ReactiveCollectionCallback} that takes a query {@link Document} plus an optional fields specification
* {@link Document} and executes that against the {@link MongoCollection}.
*
* @author Oliver Gierke
* @author Thomas Risberg
* @author Christoph Strobl
*/
private static class FindOneCallback implements ReactiveCollectionCallback<Document> {
private final CollectionPreparer<MongoCollection<Document>> collectionPreparer;
private final Document query;
private final Optional<Document> fields;
private final FindPublisherPreparer preparer;
FindOneCallback(CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query,
@Nullable Document fields, FindPublisherPreparer preparer) {
this.collectionPreparer = collectionPreparer;
this.query = query;
this.fields = Optional.ofNullable(fields);
this.preparer = preparer;
}
@Override
public Publisher<Document> doInCollection(MongoCollection<Document> collection)
throws MongoException, DataAccessException {
FindPublisher<Document> publisher = preparer.initiateFind(collectionPreparer.prepare(collection),
col -> col.find(query, Document.class));
if (fields.isPresent()) {
publisher = publisher.projection(fields.get());
}
return publisher.limit(1).first();
}
}
/**
* Simple {@link ReactiveCollectionQueryCallback} that takes a query {@link Document} plus an optional fields
* specification {@link Document} and executes that against the {@link MongoCollection}.
*
* @author Mark Paluch
*/
private static class FindCallback implements ReactiveCollectionQueryCallback<Document> {
private final CollectionPreparer<MongoCollection<Document>> collectionPreparer;
private final @Nullable Document query;
private final @Nullable Document fields;
FindCallback(CollectionPreparer<MongoCollection<Document>> collectionPreparer, @Nullable Document query) {
this(collectionPreparer, query, null);
}
FindCallback(CollectionPreparer<MongoCollection<Document>> collectionPreparer, @Nullable Document query,
@Nullable Document fields) {
this.collectionPreparer = collectionPreparer;
this.query = query;
this.fields = fields;
}
@Override
public FindPublisher<Document> doInCollection(MongoCollection<Document> collection) {
MongoCollection<Document> collectionToUse = collectionPreparer.prepare(collection);
FindPublisher<Document> findPublisher;
if (ObjectUtils.isEmpty(query)) {
findPublisher = collectionToUse.find(Document.class);
} else {
findPublisher = collectionToUse.find(query, Document.class);
}
if (ObjectUtils.isEmpty(fields)) {
return findPublisher;
} else {
return findPublisher.projection(fields);
}
}
}
/**
* Simple {@link ReactiveCollectionCallback} that takes a query {@link Document} plus an optional fields specification
* {@link Document} and executes that against the {@link MongoCollection}.
*
* @author Mark Paluch
*/
private static class FindAndRemoveCallback implements ReactiveCollectionCallback<Document> {
private final CollectionPreparer<MongoCollection<Document>> collectionPreparer;
private final Document query;
private final Document fields;
private final @Nullable Document sort;
private final Optional<Collation> collation;
FindAndRemoveCallback(CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query,
Document fields, @Nullable Document sort, @Nullable Collation collation) {
this.collectionPreparer = collectionPreparer;
this.query = query;
this.fields = fields;
this.sort = sort;
this.collation = Optional.ofNullable(collation);
}
@Override
public Publisher<Document> doInCollection(MongoCollection<Document> collection)
throws MongoException, DataAccessException {
FindOneAndDeleteOptions findOneAndDeleteOptions = convertToFindOneAndDeleteOptions(fields, sort);
collation.map(Collation::toMongoCollation).ifPresent(findOneAndDeleteOptions::collation);
return collectionPreparer.prepare(collection).findOneAndDelete(query, findOneAndDeleteOptions);
}
}
/**
* @author Mark Paluch
*/
private static class FindAndModifyCallback implements ReactiveCollectionCallback<Document> {
private final CollectionPreparer<MongoCollection<Document>> collectionPreparer;
private final Document query;
private final @Nullable Document fields;
private final @Nullable Document sort;
private final Object update;
private final List<Document> arrayFilters;
private final FindAndModifyOptions options;
FindAndModifyCallback(CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query,
@Nullable Document fields, @Nullable Document sort, Object update, List<Document> arrayFilters,
FindAndModifyOptions options) {
this.collectionPreparer = collectionPreparer;
this.query = query;
this.fields = fields;
this.sort = sort;
this.update = update;
this.arrayFilters = arrayFilters;
this.options = options;
}
@Override
public Publisher<Document> doInCollection(MongoCollection<Document> collection)
throws MongoException, DataAccessException {
MongoCollection<Document> collectionToUse = collectionPreparer.prepare(collection);
if (options.isRemove()) {
FindOneAndDeleteOptions findOneAndDeleteOptions = convertToFindOneAndDeleteOptions(fields, sort);
findOneAndDeleteOptions = options.getCollation().map(Collation::toMongoCollation)
.map(findOneAndDeleteOptions::collation).orElse(findOneAndDeleteOptions);
return collectionToUse.findOneAndDelete(query, findOneAndDeleteOptions);
}
FindOneAndUpdateOptions findOneAndUpdateOptions = convertToFindOneAndUpdateOptions(options, fields, sort,
arrayFilters);
if (update instanceof Document document) {
return collection.findOneAndUpdate(query, document, findOneAndUpdateOptions);
} else if (update instanceof List) {
return collectionToUse.findOneAndUpdate(query, (List<Document>) update, findOneAndUpdateOptions);
}
return Flux
.error(new IllegalArgumentException(String.format("Using %s is not supported in findOneAndUpdate", update)));
}
private static FindOneAndUpdateOptions convertToFindOneAndUpdateOptions(FindAndModifyOptions options,
@Nullable Document fields, @Nullable Document sort, List<Document> arrayFilters) {
FindOneAndUpdateOptions result = new FindOneAndUpdateOptions();
result = result.projection(fields).sort(sort).upsert(options.isUpsert());
if (options.isReturnNew()) {
result = result.returnDocument(ReturnDocument.AFTER);
} else {
result = result.returnDocument(ReturnDocument.BEFORE);
}
result = options.getCollation().map(Collation::toMongoCollation).map(result::collation).orElse(result);
if (!CollectionUtils.isEmpty(arrayFilters)) {
result.arrayFilters(arrayFilters);
}
return result;
}
}
/**
* {@link ReactiveCollectionCallback} specific for find and remove operation.
*
* @author Mark Paluch
* @author Christoph Strobl
* @since 2.1
*/
private static class FindAndReplaceCallback implements ReactiveCollectionCallback<Document> {
private final CollectionPreparer<MongoCollection<Document>> collectionPreparer;
private final Document query;
private final Document fields;
private final Document sort;
private final Document update;
private final com.mongodb.client.model.@Nullable Collation collation;
private final FindAndReplaceOptions options;
FindAndReplaceCallback(CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query,
Document fields, Document sort, Document update, com.mongodb.client.model.@Nullable Collation collation,
FindAndReplaceOptions options) {
this.collectionPreparer = collectionPreparer;
this.query = query;
this.fields = fields;
this.sort = sort;
this.update = update;
this.collation = collation;
this.options = options;
}
@Override
public Publisher<Document> doInCollection(MongoCollection<Document> collection)
throws MongoException, DataAccessException {
FindOneAndReplaceOptions findOneAndReplaceOptions = convertToFindOneAndReplaceOptions(options, fields, sort);
return collectionPreparer.prepare(collection).findOneAndReplace(query, update, findOneAndReplaceOptions);
}
private FindOneAndReplaceOptions convertToFindOneAndReplaceOptions(FindAndReplaceOptions options, Document fields,
Document sort) {
FindOneAndReplaceOptions result = new FindOneAndReplaceOptions().collation(collation);
result = result.projection(fields).sort(sort).upsert(options.isUpsert());
if (options.isReturnNew()) {
result = result.returnDocument(ReturnDocument.AFTER);
} else {
result = result.returnDocument(ReturnDocument.BEFORE);
}
return result;
}
}
private static FindOneAndDeleteOptions convertToFindOneAndDeleteOptions(@Nullable Document fields,
@Nullable Document sort) {
FindOneAndDeleteOptions result = new FindOneAndDeleteOptions();
result = result.projection(fields).sort(sort);
return result;
}
/**
* Simple internal callback to allow operations on a {@link Document}.
*
* @author Mark Paluch
*/
interface DocumentCallback<T> {
Mono<T> doWith(Document object);
}
/**
* Simple internal callback to allow operations on a {@link MongoDatabase}.
*
* @author Mark Paluch
*/
interface MongoDatabaseCallback<T> {
T doInDatabase(MongoDatabase db);
}
/**
* Simple internal callback to allow operations on a {@link MongoDatabase}.
*
* @author Mark Paluch
*/
interface ReactiveCollectionQueryCallback<T> extends ReactiveCollectionCallback<T> {
@Override
FindPublisher<T> doInCollection(MongoCollection<Document> collection) throws MongoException, DataAccessException;
}
static class QueryResultConverterCallback<T, R> implements DocumentCallback<R> {
private final QueryResultConverter<? super T, ? extends R> converter;
private final DocumentCallback<T> delegate;
QueryResultConverterCallback(QueryResultConverter<? super T, ? extends R> converter, DocumentCallback<T> delegate) {
this.converter = converter;
this.delegate = delegate;
}
@Override
public Mono<R> doWith(Document object) {
return delegate.doWith(object).map(it -> converter.mapDocument(object, () -> it));
}
}
/**
* Simple {@link DocumentCallback} that will transform {@link Document} into the given target type using the given
* {@link EntityReader}.
*
* @author Mark Paluch
* @author Roman Puchkovskiy
*/
class ReadDocumentCallback<T> implements DocumentCallback<T> {
private final EntityReader<? super T, Bson> reader;
private final Class<T> type;
private final String collectionName;
ReadDocumentCallback(EntityReader<? super T, Bson> reader, Class<T> type, String collectionName) {
Assert.notNull(reader, "EntityReader must not be null");
Assert.notNull(type, "Entity type must not be null");
this.reader = reader;
this.type = type;
this.collectionName = collectionName;
}
@Override
public Mono<T> doWith(Document document) {
maybeEmitEvent(new AfterLoadEvent<>(document, type, collectionName));
T entity = reader.read(type, document);
if (entity == null) {
throw new MappingException(String.format("EntityReader %s returned null", reader));
}
maybeEmitEvent(new AfterConvertEvent<>(document, entity, collectionName));
return maybeCallAfterConvert(entity, document, collectionName);
}
}
/**
* {@link DocumentCallback} transforming {@link Document} into the given {@code targetType} or decorating the
* {@code sourceType} with a {@literal projection} in case the {@code targetType} is an {@literal interface}.
*
* @param <S>
* @param <T>
* @author Christoph Strobl
* @author Roman Puchkovskiy
* @since 2.0
*/
private class ProjectingReadCallback<S, T> implements DocumentCallback<T> {
private final MongoConverter reader;
private final EntityProjection<T, S> projection;
private final String collectionName;
ProjectingReadCallback(MongoConverter reader, EntityProjection<T, S> projection, String collectionName) {
this.reader = reader;
this.projection = projection;
this.collectionName = collectionName;
}
@Override
@SuppressWarnings("unchecked")
public Mono<T> doWith(Document document) {
Class<T> returnType = projection.getMappedType().getType();
maybeEmitEvent(new AfterLoadEvent<>(document, returnType, collectionName));
Object entity = reader.project(projection, document);
if (entity == null) {
throw new MappingException(String.format("EntityReader %s returned null", reader));
}
T castEntity = (T) entity;
maybeEmitEvent(new AfterConvertEvent<>(document, castEntity, collectionName));
return maybeCallAfterConvert(castEntity, document, collectionName);
}
}
/**
* {@link DocumentCallback} that assumes a {@link GeoResult} to be created, delegates actual content unmarshalling to
* a delegate and creates a {@link GeoResult} from the result.
*
* @author Mark Paluch
* @author Chrstoph Strobl
* @author Roman Puchkovskiy
*/
static class GeoNearResultDocumentCallback<T> implements DocumentCallback<GeoResult<T>> {
private final String distanceField;
private final DocumentCallback<T> delegate;
private final Metric metric;
/**
* Creates a new {@link GeoNearResultDocumentCallback} using the given {@link DocumentCallback} delegate for
* {@link GeoResult} content unmarshalling.
*
* @param distanceField the field to read the distance from.
* @param delegate must not be {@literal null}.
* @param metric the {@link Metric} to apply to the result distance.
*/
GeoNearResultDocumentCallback(String distanceField, DocumentCallback<T> delegate, Metric metric) {
Assert.notNull(delegate, "DocumentCallback must not be null");
this.distanceField = distanceField;
this.delegate = delegate;
this.metric = metric;
}
@Override
public Mono<GeoResult<T>> doWith(Document object) {
double distance = getDistance(object);
return delegate.doWith(object).map(doWith -> new GeoResult<>(doWith, Distance.of(distance, metric)));
}
double getDistance(Document object) {
if (object.containsKey(distanceField)) {
return NumberUtils.convertNumberToTargetClass(object.get(distanceField, Number.class), Double.class);
}
return Double.NaN;
}
}
/**
* @author Mark Paluch
*/
class QueryFindPublisherPreparer implements FindPublisherPreparer {
private final Query query;
private final Document sortObject;
private final int limit;
private final long skip;
private final @Nullable Class<?> type;
QueryFindPublisherPreparer(Query query, @Nullable Class<?> type) {
this(query, query.getSortObject(), query.getLimit(), query.getSkip(), type);
}
QueryFindPublisherPreparer(Query query, Document sortObject, int limit, long skip, @Nullable Class<?> type) {
this.query = query;
this.sortObject = sortObject;
this.limit = limit;
this.skip = skip;
this.type = type;
}
@Override
public FindPublisher<Document> prepare(FindPublisher<Document> findPublisher) {
FindPublisher<Document> findPublisherToUse = operations.forType(type) //
.getCollation(query) //
.map(Collation::toMongoCollation) //
.map(findPublisher::collation) //
.orElse(findPublisher);
HintFunction hintFunction = HintFunction.from(query.getHint());
Meta meta = query.getMeta();
if (skip <= 0 && limit <= 0 && ObjectUtils.isEmpty(sortObject) && hintFunction.isEmpty() && meta.isEmpty()) {
return findPublisherToUse;
}
try {
if (skip > 0) {
findPublisherToUse = findPublisherToUse.skip((int) skip);
}
if (limit > 0) {
findPublisherToUse = findPublisherToUse.limit(limit);
}
if (!ObjectUtils.isEmpty(sortObject)) {
Document sort = type != null ? getMappedSortObject(sortObject, type) : sortObject;
findPublisherToUse = findPublisherToUse.sort(sort);
}
if (hintFunction.isPresent()) {
findPublisherToUse = hintFunction.apply(mongoDatabaseFactory, findPublisherToUse::hintString,
findPublisherToUse::hint);
}
if (meta.hasValues()) {
if (meta.hasComment()) {
findPublisherToUse = findPublisherToUse.comment(meta.getRequiredComment());
}
if (meta.hasMaxTime()) {
findPublisherToUse = findPublisherToUse.maxTime(meta.getRequiredMaxTimeMsec(), TimeUnit.MILLISECONDS);
}
if (meta.getCursorBatchSize() != null) {
findPublisherToUse = findPublisherToUse.batchSize(meta.getCursorBatchSize());
}
if (meta.getAllowDiskUse() != null) {
findPublisherToUse = findPublisherToUse.allowDiskUse(meta.getAllowDiskUse());
}
}
} catch (RuntimeException e) {
throw potentiallyConvertRuntimeException(e, exceptionTranslator);
}
return findPublisherToUse;
}
}
class TailingQueryFindPublisherPreparer extends QueryFindPublisherPreparer {
TailingQueryFindPublisherPreparer(Query query, Class<?> type) {
super(query, type);
}
@Override
public FindPublisher<Document> prepare(FindPublisher<Document> findPublisher) {
return super.prepare(findPublisher.cursorType(CursorType.TailableAwait));
}
}
private static List<? extends Document> toDocuments(Collection<? extends Document> documents) {
return new ArrayList<>(documents);
}
/**
* {@link MongoTemplate} extension bound to a specific {@link ClientSession} that is applied when interacting with the
* server through the driver API. <br />
* The prepare steps for {@link MongoDatabase} and {@link MongoCollection} proxy the target and invoke the desired
* target method matching the actual arguments plus a {@link ClientSession}.
*
* @author Christoph Strobl
* @since 2.1
*/
static class ReactiveSessionBoundMongoTemplate extends ReactiveMongoTemplate {
private final ReactiveMongoTemplate delegate;
private final ClientSession session;
/**
* @param session must not be {@literal null}.
* @param that must not be {@literal null}.
*/
ReactiveSessionBoundMongoTemplate(ClientSession session, ReactiveMongoTemplate that) {
super(that.mongoDatabaseFactory.withSession(session), that);
this.delegate = that;
this.session = session;
}
@Override
public Mono<MongoCollection<Document>> getCollection(String collectionName) {
// native MongoDB objects that offer methods with ClientSession must not be proxied.
return delegate.getCollection(collectionName);
}
@Override
public Mono<MongoDatabase> getMongoDatabase() {
// native MongoDB objects that offer methods with ClientSession must not be proxied.
return delegate.getMongoDatabase();
}
@Override
protected Mono<Boolean> countCanBeEstimated(Document filter, CountOptions options) {
return Mono.just(false);
}
}
class IndexCreatorEventListener implements ApplicationListener<MappingContextEvent<?, ?>> {
final Consumer<Throwable> subscriptionExceptionHandler;
public IndexCreatorEventListener(Consumer<Throwable> subscriptionExceptionHandler) {
this.subscriptionExceptionHandler = subscriptionExceptionHandler;
}
@Override
public void onApplicationEvent(MappingContextEvent<?, ?> event) {
if (!event.wasEmittedBy(mappingContext)) {
return;
}
PersistentEntity<?, ?> entity = event.getPersistentEntity();
// Double check type as Spring infrastructure does not consider nested generics
if (entity instanceof MongoPersistentEntity<?> mongoPersistentProperties) {
onCheckForIndexes(mongoPersistentProperties, subscriptionExceptionHandler);
}
}
}
/**
* Value object chaining together a given source document with its mapped representation and the collection to persist
* it to.
*
* @param <T>
* @author Christoph Strobl
* @since 2.2
*/
private static class PersistableEntityModel<T> {
private final T source;
private final @Nullable Document target;
private final String collection;
private PersistableEntityModel(T source, @Nullable Document target, String collection) {
this.source = source;
this.target = target;
this.collection = collection;
}
static <T> PersistableEntityModel<T> of(T source, String collection) {
return new PersistableEntityModel<>(source, null, collection);
}
static <T> PersistableEntityModel<T> of(T source, Document target, String collection) {
return new PersistableEntityModel<>(source, target, collection);
}
PersistableEntityModel<T> mutate(T source) {
return new PersistableEntityModel(source, target, collection);
}
PersistableEntityModel<T> addTargetDocument(Document target) {
return new PersistableEntityModel(source, target, collection);
}
T getSource() {
return source;
}
@Nullable
Document getTarget() {
return target;
}
String getCollection() {
return collection;
}
}
@FunctionalInterface
interface CountExecution {
Mono<Long> countDocuments(String collection, Document filter, CountOptions options);
}
}