ReactiveMongoTemplate.java

/*
 * Copyright 2016-present the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.springframework.data.mongodb.core;

import static org.springframework.data.mongodb.core.query.SerializationUtils.*;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.bson.BsonValue;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.bson.types.ObjectId;
import org.jspecify.annotations.Nullable;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.context.ApplicationListener;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.dao.OptimisticLockingFailureException;
import org.springframework.dao.support.PersistenceExceptionTranslator;
import org.springframework.data.convert.EntityReader;
import org.springframework.data.domain.OffsetScrollPosition;
import org.springframework.data.domain.Window;
import org.springframework.data.geo.Distance;
import org.springframework.data.geo.GeoResult;
import org.springframework.data.geo.Metric;
import org.springframework.data.mapping.MappingException;
import org.springframework.data.mapping.PersistentEntity;
import org.springframework.data.mapping.callback.ReactiveEntityCallbacks;
import org.springframework.data.mapping.context.MappingContext;
import org.springframework.data.mapping.context.MappingContextEvent;
import org.springframework.data.mongodb.MongoDatabaseFactory;
import org.springframework.data.mongodb.ReactiveMongoClusterCapable;
import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory;
import org.springframework.data.mongodb.ReactiveMongoDatabaseUtils;
import org.springframework.data.mongodb.SessionSynchronization;
import org.springframework.data.mongodb.core.BulkOperations.BulkMode;
import org.springframework.data.mongodb.core.CollectionPreparerSupport.ReactiveCollectionPreparerDelegate;
import org.springframework.data.mongodb.core.DefaultReactiveBulkOperations.ReactiveBulkOperationContext;
import org.springframework.data.mongodb.core.EntityOperations.AdaptibleEntity;
import org.springframework.data.mongodb.core.EntityOperations.Entity;
import org.springframework.data.mongodb.core.QueryOperations.AggregationDefinition;
import org.springframework.data.mongodb.core.QueryOperations.CountContext;
import org.springframework.data.mongodb.core.QueryOperations.DeleteContext;
import org.springframework.data.mongodb.core.QueryOperations.DistinctQueryContext;
import org.springframework.data.mongodb.core.QueryOperations.QueryContext;
import org.springframework.data.mongodb.core.QueryOperations.UpdateContext;
import org.springframework.data.mongodb.core.ScrollUtils.KeysetScrollQuery;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperationContext;
import org.springframework.data.mongodb.core.aggregation.AggregationOptions;
import org.springframework.data.mongodb.core.aggregation.AggregationOptions.Builder;
import org.springframework.data.mongodb.core.aggregation.AggregationPipeline;
import org.springframework.data.mongodb.core.aggregation.FieldLookupPolicy;
import org.springframework.data.mongodb.core.aggregation.PrefixingDelegatingAggregationOperationContext;
import org.springframework.data.mongodb.core.aggregation.TypeBasedAggregationOperationContext;
import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
import org.springframework.data.mongodb.core.bulk.Bulk;
import org.springframework.data.mongodb.core.bulk.BulkWriteOptions;
import org.springframework.data.mongodb.core.bulk.BulkWriteResult;
import org.springframework.data.mongodb.core.convert.DbRefResolver;
import org.springframework.data.mongodb.core.convert.MappingMongoConverter;
import org.springframework.data.mongodb.core.convert.MongoConverter;
import org.springframework.data.mongodb.core.convert.MongoCustomConversions;
import org.springframework.data.mongodb.core.convert.MongoWriter;
import org.springframework.data.mongodb.core.convert.NoOpDbRefResolver;
import org.springframework.data.mongodb.core.convert.QueryMapper;
import org.springframework.data.mongodb.core.convert.UpdateMapper;
import org.springframework.data.mongodb.core.index.MongoMappingEventPublisher;
import org.springframework.data.mongodb.core.index.ReactiveIndexOperations;
import org.springframework.data.mongodb.core.index.ReactiveMongoPersistentEntityIndexCreator;
import org.springframework.data.mongodb.core.mapping.FieldName;
import org.springframework.data.mongodb.core.mapping.MongoMappingContext;
import org.springframework.data.mongodb.core.mapping.MongoPersistentEntity;
import org.springframework.data.mongodb.core.mapping.MongoPersistentProperty;
import org.springframework.data.mongodb.core.mapping.event.*;
import org.springframework.data.mongodb.core.mapreduce.MapReduceOptions;
import org.springframework.data.mongodb.core.query.BasicQuery;
import org.springframework.data.mongodb.core.query.Collation;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Meta;
import org.springframework.data.mongodb.core.query.NearQuery;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.UpdateDefinition;
import org.springframework.data.mongodb.core.query.UpdateDefinition.ArrayFilter;
import org.springframework.data.projection.EntityProjection;
import org.springframework.data.util.Optionals;
import org.springframework.lang.Contract;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.CollectionUtils;
import org.springframework.util.NumberUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.ResourceUtils;
import org.springframework.util.StringUtils;

import com.mongodb.ClientSessionOptions;
import com.mongodb.CursorType;
import com.mongodb.MongoException;
import com.mongodb.ReadPreference;
import com.mongodb.WriteConcern;
import com.mongodb.client.model.CountOptions;
import com.mongodb.client.model.CreateCollectionOptions;
import com.mongodb.client.model.CreateViewOptions;
import com.mongodb.client.model.DeleteOptions;
import com.mongodb.client.model.EstimatedDocumentCountOptions;
import com.mongodb.client.model.FindOneAndDeleteOptions;
import com.mongodb.client.model.FindOneAndReplaceOptions;
import com.mongodb.client.model.FindOneAndUpdateOptions;
import com.mongodb.client.model.ReturnDocument;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.client.result.DeleteResult;
import com.mongodb.client.result.InsertOneResult;
import com.mongodb.client.result.UpdateResult;
import com.mongodb.reactivestreams.client.AggregatePublisher;
import com.mongodb.reactivestreams.client.ChangeStreamPublisher;
import com.mongodb.reactivestreams.client.ClientSession;
import com.mongodb.reactivestreams.client.DistinctPublisher;
import com.mongodb.reactivestreams.client.FindPublisher;
import com.mongodb.reactivestreams.client.MapReducePublisher;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoCluster;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.MongoDatabase;

/**
 * Primary implementation of {@link ReactiveMongoOperations}. It simplifies the use of Reactive MongoDB and helps
 * to avoid common errors. It executes core MongoDB workflow, leaving application code to provide {@link Document} and
 * extract results. This class executes BSON queries or updates, initiating iteration over {@link FindPublisher} and
 * catching MongoDB exceptions and translating them to the generic, more informative exception hierarchy defined in the
 * org.springframework.dao package. Can be used within a service implementation via direct instantiation with a
 * {@link ReactiveMongoDatabaseFactory} reference, or get prepared in an application context and given to services as
 * bean reference.
 * <p>
 * Note: The {@link ReactiveMongoDatabaseFactory} should always be configured as a bean in the application context, in
 * the first case given to the service directly, in the second case to the prepared template.
 * <h3>{@link ReadPreference} and {@link com.mongodb.ReadConcern}</h3>
 * <p>
 * {@code ReadPreference} and {@code ReadConcern} are generally considered from {@link Query} and
 * {@link AggregationOptions} objects for the action to be executed on a particular {@link MongoCollection}.
 * <p>
 * You can also set the default {@link #setReadPreference(ReadPreference) ReadPreference} on the template level to
 * generally apply a {@link ReadPreference}.
 * <p>
 * When using transactions make sure to create this template with the same {@link ReactiveMongoDatabaseFactory} that is
 * also used for {@code ReactiveMongoTransactionManager} creation.
 *
 * @author Mark Paluch
 * @author Christoph Strobl
 * @author Roman Puchkovskiy
 * @author Mathieu Ouellet
 * @author Yadhukrishna S Pai
 * @author Florian Lüdiger
 * @author Kyuhong Han
 * @since 2.0
 */
public class ReactiveMongoTemplate implements ReactiveMongoOperations, ApplicationContextAware {

	/** Shared no-op {@link DbRefResolver}, taken from {@link NoOpDbRefResolver#INSTANCE}. */
	public static final DbRefResolver NO_OP_REF_RESOLVER = NoOpDbRefResolver.INSTANCE;

	private static final Log LOGGER = LogFactory.getLog(ReactiveMongoTemplate.class);
	// Value restored by setWriteResultChecking(...) when it is called with null.
	private static final WriteResultChecking DEFAULT_WRITE_RESULT_CHECKING = WriteResultChecking.NONE;

	// Collaborators assigned exactly once by the constructors.
	private final MongoConverter mongoConverter;
	private final MappingContext<? extends MongoPersistentEntity<?>, MongoPersistentProperty> mappingContext;
	private final ReactiveMongoDatabaseFactory mongoDatabaseFactory;
	private final PersistenceExceptionTranslator exceptionTranslator;
	private final QueryMapper queryMapper;
	private final UpdateMapper updateMapper;
	private final ApplicationListener<MappingContextEvent<?, ?>> indexCreatorListener;
	private final EntityOperations operations;
	private final PropertyOperations propertyOperations;
	private final QueryOperations queryOperations;
	private final EntityLifecycleEventDelegate eventDelegate;

	// Mutable state; assigned by the setters of this class or during construction.
	private @Nullable WriteConcern writeConcern;
	private WriteConcernResolver writeConcernResolver = DefaultWriteConcernResolver.INSTANCE;
	private WriteResultChecking writeResultChecking = WriteResultChecking.NONE;
	private @Nullable ReadPreference readPreference;
	private @Nullable ApplicationEventPublisher eventPublisher;
	private @Nullable ReactiveEntityCallbacks entityCallbacks;
	private @Nullable ReactiveMongoPersistentEntityIndexCreator indexCreator;

	private SessionSynchronization sessionSynchronization = SessionSynchronization.ON_ACTUAL_TRANSACTION;

	// Count strategy: exact counting unless useEstimatedCount(true) swaps in an estimating variant.
	private CountExecution countExecution = this::doExactCount;

	/**
	 * Constructor used for a basic template configuration. Wraps the given client and database name in a
	 * {@link SimpleReactiveMongoDatabaseFactory} and uses the default {@link MongoConverter}.
	 * <p>
	 * If you intend to use transactions, make sure to use {@link #ReactiveMongoTemplate(ReactiveMongoDatabaseFactory)} or
	 * {@link #ReactiveMongoTemplate(ReactiveMongoDatabaseFactory, MongoConverter)} constructors, otherwise, this template
	 * will not participate in transactions using the default {@code SessionSynchronization.ON_ACTUAL_TRANSACTION} setting
	 * as {@code ReactiveMongoTransactionManager} uses strictly its configured {@link ReactiveMongoDatabaseFactory} for
	 * transaction participation.
	 *
	 * @param mongoClient must not be {@literal null}.
	 * @param databaseName must not be {@literal null} or empty.
	 */
	public ReactiveMongoTemplate(MongoClient mongoClient, String databaseName) {
		// null selects the default converter; the cast disambiguates between the two-argument constructors
		this(new SimpleReactiveMongoDatabaseFactory(mongoClient, databaseName), (MongoConverter) null);
	}

	/**
	 * Constructor used for a basic template configuration that relies on the default {@link MongoConverter}.
	 *
	 * @param mongoDatabaseFactory must not be {@literal null}.
	 */
	public ReactiveMongoTemplate(ReactiveMongoDatabaseFactory mongoDatabaseFactory) {
		// null selects the default converter; the cast disambiguates between the two-argument constructors
		this(mongoDatabaseFactory, (MongoConverter) null);
	}

	/**
	 * Constructor used for a basic template configuration. Errors raised by internally subscribed publishers are
	 * logged through the default subscription exception handler of this class.
	 *
	 * @param mongoDatabaseFactory must not be {@literal null}.
	 * @param mongoConverter can be {@literal null}, in which case the default converter is used.
	 */
	public ReactiveMongoTemplate(ReactiveMongoDatabaseFactory mongoDatabaseFactory,
			@Nullable MongoConverter mongoConverter) {
		this(mongoDatabaseFactory, mongoConverter, ReactiveMongoTemplate::handleSubscriptionException);
	}

	/**
	 * Constructor used for a template configuration with a custom handler for errors of internally subscribed
	 * {@link Publisher}s.
	 * <p>
	 * When the converter's mapping context is a {@link MongoMappingContext} with auto-index creation enabled, an index
	 * creator is set up and an index check is triggered for every entity already known to the mapping context.
	 *
	 * @param mongoDatabaseFactory must not be {@literal null}.
	 * @param mongoConverter can be {@literal null}, in which case the default converter is used.
	 * @param subscriptionExceptionHandler exception handler called by {@link Flux#doOnError(Consumer)} on reactive type
	 *          materialization via {@link Publisher#subscribe(Subscriber)}. This callback is used during non-blocking
	 *          subscription of e.g. index creation {@link Publisher}s. Must not be {@literal null}.
	 * @since 2.1
	 */
	public ReactiveMongoTemplate(ReactiveMongoDatabaseFactory mongoDatabaseFactory,
			@Nullable MongoConverter mongoConverter, Consumer<Throwable> subscriptionExceptionHandler) {

		Assert.notNull(mongoDatabaseFactory, "ReactiveMongoDatabaseFactory must not be null");

		this.mongoDatabaseFactory = mongoDatabaseFactory;
		this.exceptionTranslator = mongoDatabaseFactory.getExceptionTranslator();

		if (mongoConverter != null) {
			this.mongoConverter = mongoConverter;
		} else {
			this.mongoConverter = getDefaultMongoConverter();
		}

		this.queryMapper = new QueryMapper(this.mongoConverter);
		this.updateMapper = new UpdateMapper(this.mongoConverter);
		this.indexCreatorListener = new IndexCreatorEventListener(subscriptionExceptionHandler);

		// The converter always carries a mapping context, whether it is a simple one or not
		this.mappingContext = this.mongoConverter.getMappingContext();
		this.operations = new EntityOperations(this.mongoConverter, this.queryMapper);
		this.propertyOperations = new PropertyOperations(this.mongoConverter.getMappingContext());
		this.queryOperations = new QueryOperations(queryMapper, updateMapper, operations, propertyOperations,
				mongoDatabaseFactory);
		this.eventDelegate = new EntityLifecycleEventDelegate();

		// Indexes are created in reaction to mapping events, but only if auto-index creation is switched on
		if (this.mappingContext instanceof MongoMappingContext mongoMappingContext
				&& mongoMappingContext.isAutoIndexCreation()) {

			this.indexCreator = new ReactiveMongoPersistentEntityIndexCreator(mongoMappingContext, this::indexOps);
			this.eventPublisher = new MongoMappingEventPublisher(this.indexCreatorListener);

			mongoMappingContext.setApplicationEventPublisher(this.eventPublisher);
			this.mappingContext.getPersistentEntities()
					.forEach(entity -> onCheckForIndexes(entity, subscriptionExceptionHandler));
		}
	}

	/**
	 * Creates a template that uses the given {@link ReactiveMongoDatabaseFactory} while sharing the collaborators of
	 * an existing template. Only the fields assigned here are carried over; all remaining fields keep their
	 * declaration defaults.
	 *
	 * @param dbFactory the database factory the new template operates on.
	 * @param that the template to take the shared collaborators from.
	 */
	private ReactiveMongoTemplate(ReactiveMongoDatabaseFactory dbFactory, ReactiveMongoTemplate that) {

		this.mongoDatabaseFactory = dbFactory;
		this.sessionSynchronization = that.sessionSynchronization;
		this.exceptionTranslator = that.exceptionTranslator;

		// conversion and mapping infrastructure
		this.mongoConverter = that.mongoConverter;
		this.mappingContext = that.mappingContext;
		this.queryMapper = that.queryMapper;
		this.updateMapper = that.updateMapper;

		// operation helpers
		this.operations = that.operations;
		this.propertyOperations = that.propertyOperations;
		this.queryOperations = that.queryOperations;

		// index creation and lifecycle events
		this.indexCreator = that.indexCreator;
		this.indexCreatorListener = that.indexCreatorListener;
		this.eventDelegate = that.eventDelegate;
	}

	/**
	 * Triggers an index check for the given entity if an index creator is available; otherwise does nothing.
	 *
	 * @param entity the entity to check indexes for.
	 * @param subscriptionExceptionHandler receives errors emitted by the index check.
	 */
	private void onCheckForIndexes(MongoPersistentEntity<?> entity, Consumer<Throwable> subscriptionExceptionHandler) {

		ReactiveMongoPersistentEntityIndexCreator creator = this.indexCreator;
		if (creator == null) {
			return;
		}

		creator.checkForIndexes(entity).subscribe(v -> {}, subscriptionExceptionHandler);
	}

	/**
	 * Default subscription exception handler: logs the given {@link Throwable} at error level.
	 *
	 * @param t the exception raised during asynchronous execution.
	 */
	private static void handleSubscriptionException(Throwable t) {
		LOGGER.error("Unexpected exception during asynchronous execution", t);
	}

	/**
	 * Configures the {@link WriteResultChecking} to be used with the template. Passing {@literal null} restores
	 * {@link ReactiveMongoTemplate#DEFAULT_WRITE_RESULT_CHECKING}.
	 *
	 * @param resultChecking the checking mode to use, can be {@literal null}.
	 */
	public void setWriteResultChecking(@Nullable WriteResultChecking resultChecking) {

		if (resultChecking == null) {
			this.writeResultChecking = DEFAULT_WRITE_RESULT_CHECKING;
			return;
		}

		this.writeResultChecking = resultChecking;
	}

	/**
	 * Configures the {@link WriteConcern} to be used with the template. If none is configured the {@link WriteConcern}
	 * configured on the {@link ReactiveMongoDatabaseFactory} will apply.
	 *
	 * @param writeConcern can be {@literal null}.
	 */
	public void setWriteConcern(@Nullable WriteConcern writeConcern) {
		this.writeConcern = writeConcern;
	}

	/**
	 * Configures the {@link WriteConcernResolver} to be used with the template. Passing {@literal null} falls back to
	 * {@link DefaultWriteConcernResolver#INSTANCE}.
	 *
	 * @param writeConcernResolver can be {@literal null}.
	 */
	public void setWriteConcernResolver(@Nullable WriteConcernResolver writeConcernResolver) {

		if (writeConcernResolver == null) {
			this.writeConcernResolver = DefaultWriteConcernResolver.INSTANCE;
			return;
		}

		this.writeConcernResolver = writeConcernResolver;
	}

	/**
	 * Sets the template-level default {@link ReadPreference}. Used by {@link #prepareCollection(MongoCollection)} to
	 * set the {@link ReadPreference} before any operations are performed.
	 *
	 * @param readPreference the {@link ReadPreference} to apply.
	 */
	public void setReadPreference(ReadPreference readPreference) {
		this.readPreference = readPreference;
	}

	/**
	 * Configure whether lifecycle events such as {@link AfterLoadEvent}, {@link BeforeSaveEvent}, etc. should be
	 * published or whether emission should be suppressed. Enabled by default.
	 *
	 * @param enabled {@code true} to enable entity lifecycle events; {@code false} to disable entity lifecycle events.
	 * @since 4.0
	 * @see MongoMappingEvent
	 */
	public void setEntityLifecycleEventsEnabled(boolean enabled) {
		// the flag lives on the event delegate, which the copy constructor hands on to derived templates
		this.eventDelegate.setEventsEnabled(enabled);
	}

	/**
	 * Wires the template into the given {@link ApplicationContext}: prepares index creation, uses the context as
	 * event publisher, discovers {@link ReactiveEntityCallbacks} unless some were set explicitly, and hands the event
	 * publisher to the mapping context if it is {@link ApplicationEventPublisherAware}.
	 */
	@Override
	public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {

		prepareIndexCreator(applicationContext);
		setApplicationEventPublisher(applicationContext);

		// explicitly configured callbacks take precedence over the ones found in the context
		if (entityCallbacks == null) {
			setEntityCallbacks(ReactiveEntityCallbacks.create(applicationContext));
		}

		if (eventPublisher == null) {
			return;
		}

		if (mappingContext instanceof ApplicationEventPublisherAware publisherAware) {
			publisherAware.setApplicationEventPublisher(eventPublisher);
		}
	}

	/**
	 * Stores the given {@link ApplicationEventPublisher} and passes it on to the entity lifecycle event delegate.
	 *
	 * @param applicationEventPublisher the publisher to use for events.
	 */
	void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {

		this.eventPublisher = applicationEventPublisher;
		this.eventDelegate.setPublisher(applicationEventPublisher);
	}

	/**
	 * Set the {@link ReactiveEntityCallbacks} instance to use when invoking
	 * {@link org.springframework.data.mapping.callback.EntityCallback callbacks} like the
	 * {@link ReactiveBeforeSaveCallback}. <br />
	 * Overrides potentially existing {@link ReactiveEntityCallbacks}. Callbacks set here are not replaced by the ones
	 * discovered in {@link #setApplicationContext(ApplicationContext)}.
	 *
	 * @param entityCallbacks must not be {@literal null}.
	 * @throws IllegalArgumentException if the given instance is {@literal null}.
	 * @since 2.2
	 */
	public void setEntityCallbacks(ReactiveEntityCallbacks entityCallbacks) {

		Assert.notNull(entityCallbacks, "EntityCallbacks must not be null");
		this.entityCallbacks = entityCallbacks;
	}

	/**
	 * Configure whether to use estimated count. Defaults to exact counting. Whether a particular count can be
	 * estimated is decided by {@code countCanBeEstimated}.
	 *
	 * @param enabled use {@link MongoCollection#estimatedDocumentCount()} for unpaged and empty {@link Query queries}
	 *          if {@code true}.
	 * @since 3.4
	 */
	public void useEstimatedCount(boolean enabled) {
		useEstimatedCount(enabled, this::countCanBeEstimated);
	}

	/**
	 * Configure whether to use estimated count based on the given {@link BiFunction estimationFilter}.
	 * <p>
	 * When enabled, each count first consults the filter: if it emits {@code true} the estimated count is used,
	 * carrying over a positive max time from the {@link CountOptions}; if it emits {@code false} the exact count is
	 * used. When disabled, counting is always exact.
	 *
	 * @param enabled {@code true} to allow estimated counts, {@code false} to always count exactly.
	 * @param estimationFilter decides per filter {@link Document} and {@link CountOptions} whether the count can be
	 *          estimated.
	 * @since 3.4
	 */
	private void useEstimatedCount(boolean enabled, BiFunction<Document, CountOptions, Mono<Boolean>> estimationFilter) {

		if (!enabled) {
			this.countExecution = this::doExactCount;
			return;
		}

		this.countExecution = (collectionName, filter, options) -> estimationFilter.apply(filter, options)
				.flatMap(canEstimate -> {

					if (!canEstimate) {
						return doExactCount(collectionName, filter, options);
					}

					long maxTimeMillis = options.getMaxTime(TimeUnit.MILLISECONDS);
					EstimatedDocumentCountOptions estimatedOptions = new EstimatedDocumentCountOptions();
					if (maxTimeMillis > 0) {
						estimatedOptions.maxTime(maxTimeMillis, TimeUnit.MILLISECONDS);
					}

					return doEstimatedCount(collectionName, estimatedOptions);
				});
	}

	/**
	 * Looks up all {@link ReactiveMongoPersistentEntityIndexCreator} beans in the given {@link ApplicationContext} and
	 * checks whether one of them is registered for the current {@link MappingContext}. If none is, and the context is
	 * a {@link ConfigurableApplicationContext}, the internally created listener is added as
	 * {@link ApplicationListener} so that indexes get created for entity types persisted through this
	 * {@link ReactiveMongoTemplate} instance.
	 *
	 * @param context must not be {@literal null}.
	 */
	private void prepareIndexCreator(ApplicationContext context) {

		boolean creatorPresent = Arrays
				.stream(context.getBeanNamesForType(ReactiveMongoPersistentEntityIndexCreator.class))
				.map(beanName -> context.getBean(beanName, ReactiveMongoPersistentEntityIndexCreator.class))
				.anyMatch(candidate -> candidate.isIndexCreatorFor(mappingContext));

		if (!creatorPresent && context instanceof ConfigurableApplicationContext configurableContext) {
			configurableContext.addApplicationListener(indexCreatorListener);
		}
	}

	/**
	 * Returns the {@link MongoConverter} used by this template.
	 *
	 * @return the converter this template was created with, or the default converter if none was supplied.
	 */
	@Override
	public MongoConverter getConverter() {
		return this.mongoConverter;
	}

	/**
	 * @return the {@link EntityOperations} used by this template.
	 */
	EntityOperations getEntityOperations() {
		return operations;
	}

	/**
	 * @return the {@link QueryOperations} used by this template.
	 */
	QueryOperations getQueryOperations() {
		return queryOperations;
	}

	/**
	 * Creates {@link ReactiveIndexOperations} for the given collection, backed by this template and its
	 * {@link QueryMapper}.
	 */
	@Override
	public ReactiveIndexOperations indexOps(String collectionName) {
		return new DefaultReactiveIndexOperations(this, collectionName, this.queryMapper);
	}

	/**
	 * Creates {@link ReactiveIndexOperations} for the collection the given entity class maps to, passing the entity
	 * class along.
	 */
	@Override
	public ReactiveIndexOperations indexOps(Class<?> entityClass) {

		String collectionName = getCollectionName(entityClass);
		return new DefaultReactiveIndexOperations(this, collectionName, this.queryMapper, entityClass);
	}

	/**
	 * Resolves the collection name for the given entity class by delegating to {@link EntityOperations}.
	 */
	@Override
	public String getCollectionName(Class<?> entityClass) {
		return operations.determineCollectionName(entityClass);
	}

	/**
	 * Parses the given JSON into a {@link Document} and executes it as a command.
	 *
	 * @param jsonCommand the command in JSON form, must not be {@literal null}.
	 */
	@Override
	public Mono<Document> executeCommand(String jsonCommand) {

		Assert.notNull(jsonCommand, "Command must not be empty");

		Document command = Document.parse(jsonCommand);
		return executeCommand(command);
	}

	/**
	 * Executes the given command without an explicit {@link ReadPreference}.
	 */
	@Override
	public Mono<Document> executeCommand(Document command) {
		return executeCommand(command, null);
	}

	/**
	 * Executes the given command and emits the first result {@link Document}. The {@link ReadPreference} is only
	 * passed to the driver when one is given.
	 *
	 * @param command must not be {@literal null}.
	 * @param readPreference can be {@literal null}.
	 */
	@Override
	public Mono<Document> executeCommand(Document command, @Nullable ReadPreference readPreference) {

		Assert.notNull(command, "Command must not be null");

		return createFlux(db -> {

			if (readPreference == null) {
				return db.runCommand(command, Document.class);
			}

			return db.runCommand(command, readPreference, Document.class);
		}).next();
	}

	/**
	 * Executes the given callback against the collection the entity class maps to.
	 */
	@Override
	public <T> Flux<T> execute(Class<?> entityClass, ReactiveCollectionCallback<T> action) {

		String collectionName = getCollectionName(entityClass);
		return createFlux(collectionName, action);
	}

	/**
	 * Executes the given database callback; the {@literal null} check on the callback happens in
	 * {@link #createFlux(ReactiveDatabaseCallback)}.
	 */
	@Override
	public <T> Flux<T> execute(ReactiveDatabaseCallback<T> action) {
		return createFlux(action);
	}

	/**
	 * Executes the given callback against the named collection. The collection name itself is validated by
	 * {@link #createFlux(String, ReactiveCollectionCallback)}.
	 */
	@Override
	public <T> Flux<T> execute(String collectionName, ReactiveCollectionCallback<T> callback) {

		Assert.notNull(callback, "ReactiveCollectionCallback must not be null");

		return createFlux(collectionName, callback);
	}

	/**
	 * Applies the given callback to the {@link MongoCluster} of the database factory and translates errors of the
	 * resulting {@link Publisher}. Emits an {@link IllegalStateException} if the database factory is not
	 * {@link ReactiveMongoClusterCapable}.
	 *
	 * @param callback function producing a {@link Publisher} from the {@link MongoCluster}.
	 * @return a {@link Mono} of the callback result.
	 */
	<T> Mono<T> doWithCluster(Function<MongoCluster, Publisher<T>> callback) {

		if (mongoDatabaseFactory instanceof ReactiveMongoClusterCapable clusterCapable) {

			Publisher<T> result = callback.apply(clusterCapable.getMongoCluster());
			return Mono.from(result).onErrorMap(translateException());
		}

		return Mono.error(new IllegalStateException(
				"Unable to obtain MongoCluster. Does your database factory implement ReactiveMongoClusterCapable?"));
	}

	/**
	 * Creates a {@link ReactiveSessionScoped} whose executions all run with the {@link ClientSession} obtained from
	 * the given provider. The {@code doFinally} consumer receives the session once an execution terminates.
	 */
	@Override
	public ReactiveSessionScoped withSession(Publisher<ClientSession> sessionProvider) {

		// cache() lets every execute(...) call reuse the session resolved by the first subscription
		Mono<ClientSession> sharedSession = Mono.from(sessionProvider).cache();

		return new ReactiveSessionScoped() {

			@Override
			public <T> Flux<T> execute(ReactiveSessionCallback<T> action, Consumer<ClientSession> doFinally) {

				return sharedSession.flatMapMany(session -> ReactiveMongoTemplate.this.withSession(action, session) //
						.doFinally(signalType -> doFinally.accept(session)));
			}
		};
	}

	/**
	 * Define if {@link ReactiveMongoTemplate} should participate in transactions. Default is set to
	 * {@link SessionSynchronization#ON_ACTUAL_TRANSACTION}.
	 * <p>
	 * <strong>NOTE:</strong> MongoDB transactions require at least MongoDB 4.0.
	 *
	 * @param sessionSynchronization the {@link SessionSynchronization} mode to use.
	 * @since 2.2
	 */
	public void setSessionSynchronization(SessionSynchronization sessionSynchronization) {
		this.sessionSynchronization = sessionSynchronization;
	}

	/**
	 * Runs the given callback against a session-bound template and writes the session into the subscriber context.
	 *
	 * @param action the callback to run.
	 * @param session the {@link ClientSession} to bind.
	 * @return the results emitted by the callback.
	 */
	private <T> Flux<T> withSession(ReactiveSessionCallback<T> action, ClientSession session) {

		ReactiveSessionBoundMongoTemplate sessionBoundTemplate = new ReactiveSessionBoundMongoTemplate(session, this);
		Flux<T> results = Flux.from(action.doInSession(sessionBoundTemplate));

		return results.contextWrite(ctx -> ReactiveMongoContext.setSession(ctx, Mono.just(session)));
	}

	/**
	 * Returns a template bound to the given {@link ClientSession} that is derived from this instance.
	 */
	@Override
	public ReactiveMongoOperations withSession(ClientSession session) {
		return new ReactiveSessionBoundMongoTemplate(session, this);
	}

	/**
	 * Obtains a session with the given options from the database factory and scopes executions to it.
	 */
	@Override
	public ReactiveSessionScoped withSession(ClientSessionOptions sessionOptions) {

		Mono<ClientSession> session = mongoDatabaseFactory.getSession(sessionOptions);
		return withSession(session);
	}

	/**
	 * Create a reusable {@link Flux} for a {@link ReactiveDatabaseCallback}. The database is looked up on each
	 * subscription, and errors are passed through the exception translation of this template. It's up to the
	 * developer to choose to obtain a new {@link Flux} or to reuse the {@link Flux}.
	 *
	 * @param callback must not be {@literal null}
	 * @return a {@link Flux} wrapping the {@link ReactiveDatabaseCallback}.
	 */
	public <T> Flux<T> createFlux(ReactiveDatabaseCallback<T> callback) {

		Assert.notNull(callback, "ReactiveDatabaseCallback must not be null");

		Flux<T> results = Mono.defer(this::doGetDatabase)
				.flatMapMany(database -> callback.doInDB(prepareDatabase(database)));

		return results.onErrorMap(translateException());
	}

	/**
	 * Create a reusable {@link Mono} for a {@link ReactiveDatabaseCallback}. The database is looked up on each
	 * subscription, and errors are passed through the exception translation of this template. It's up to the
	 * developer to choose to obtain a new {@link Mono} or to reuse the {@link Mono}.
	 *
	 * @param callback must not be {@literal null}
	 * @return a {@link Mono} wrapping the {@link ReactiveDatabaseCallback}.
	 */
	public <T> Mono<T> createMono(ReactiveDatabaseCallback<T> callback) {

		Assert.notNull(callback, "ReactiveDatabaseCallback must not be null");

		Mono<T> result = Mono.defer(this::doGetDatabase)
				.flatMap(database -> Mono.from(callback.doInDB(prepareDatabase(database))));

		return result.onErrorMap(translateException());
	}

	/**
	 * Create a reusable {@link Flux} for the {@code collectionName} and {@link ReactiveCollectionCallback}. The
	 * collection is obtained and prepared from the current database before the callback is invoked, and errors are
	 * passed through the exception translation of this template.
	 *
	 * @param collectionName must not be empty or {@literal null}.
	 * @param callback must not be {@literal null}.
	 * @return a reusable {@link Flux} wrapping the {@link ReactiveCollectionCallback}.
	 * @throws IllegalArgumentException if the collection name has no text or the callback is {@literal null}.
	 */
	public <T> Flux<T> createFlux(String collectionName, ReactiveCollectionCallback<T> callback) {

		Assert.hasText(collectionName, "Collection name must not be null or empty");
		// the message names the actual parameter type, matching createMono(String, ReactiveCollectionCallback)
		Assert.notNull(callback, "ReactiveCollectionCallback must not be null");

		Mono<MongoCollection<Document>> collectionPublisher = doGetDatabase()
				.map(database -> getAndPrepareCollection(database, collectionName));

		return collectionPublisher.flatMapMany(callback::doInCollection).onErrorMap(translateException());
	}

	/**
	 * Create a reusable {@link Mono} for the {@code collectionName} and {@link ReactiveCollectionCallback}. The
	 * collection is obtained and prepared from the current database before the callback is invoked, and errors are
	 * passed through the exception translation of this template.
	 *
	 * @param collectionName must not be empty or {@literal null}.
	 * @param callback must not be {@literal null}.
	 * @param <T> the result type emitted by the callback.
	 * @return a reusable {@link Mono} wrapping the {@link ReactiveCollectionCallback}.
	 */
	public <T> Mono<T> createMono(String collectionName, ReactiveCollectionCallback<T> callback) {

		Assert.hasText(collectionName, "Collection name must not be null or empty");
		Assert.notNull(callback, "ReactiveCollectionCallback must not be null");

		return doGetDatabase() //
				.map(database -> getAndPrepareCollection(database, collectionName)) //
				.flatMap(collection -> Mono.from(callback.doInCollection(collection))) //
				.onErrorMap(translateException());
	}

	/**
	 * Writes the given {@link Bulk} through a {@link ReactiveBulkWriter}, using the name of the current database.
	 */
	@Override
	public Mono<BulkWriteResult> bulkWrite(Bulk bulk, BulkWriteOptions options) {

		return doGetDatabase().flatMap(database -> {

			ReactiveBulkWriter writer = new ReactiveBulkWriter(this);
			return writer.write(database.getName(), bulk, options);
		});
	}

	/**
	 * Creates the collection for the given entity class using the {@link CollectionOptions} derived from the entity
	 * type without further customization.
	 */
	@Override
	public <T> Mono<MongoCollection<Document>> createCollection(Class<T> entityClass) {
		return createCollection(entityClass, Function.identity());
	}

	/**
	 * Creates the collection for the given entity class. The {@link CollectionOptions} derived from the entity type
	 * are passed through the given customizer before the collection is created.
	 */
	@Override
	public <T> Mono<MongoCollection<Document>> createCollection(Class<T> entityClass,
			Function<? super CollectionOptions, ? extends CollectionOptions> collectionOptionsCustomizer) {

		Assert.notNull(collectionOptionsCustomizer, "CollectionOptions customizer function must not be null");

		CollectionOptions entityOptions = operations.forType(entityClass).getCollectionOptions();
		CollectionOptions customizedOptions = collectionOptionsCustomizer.apply(entityOptions);

		return createCollection(entityClass, customizedOptions);
	}

	/**
	 * Creates the collection for the given entity class. A collation given in the options takes precedence over the
	 * one defined for the entity type; if neither is present the options are used as they are.
	 */
	@Override
	public <T> Mono<MongoCollection<Document>> createCollection(Class<T> entityClass,
			@Nullable CollectionOptions collectionOptions) {

		Assert.notNull(entityClass, "EntityClass must not be null");

		CollectionOptions baseOptions = collectionOptions != null ? collectionOptions : CollectionOptions.empty();

		// first non-empty collation wins: explicit options first, then the entity type
		CollectionOptions effectiveOptions = Optionals
				.firstNonEmpty(() -> Optional.ofNullable(collectionOptions).flatMap(CollectionOptions::getCollation),
						() -> operations.forType(entityClass).getCollation()) //
				.map(baseOptions::collation).orElse(baseOptions);

		String collectionName = getCollectionName(entityClass);
		return doCreateCollection(collectionName, convertToCreateCollectionOptions(effectiveOptions, entityClass));
	}

	/**
	 * Creates the named collection with freshly created, unmodified {@link CreateCollectionOptions}.
	 */
	@Override
	public Mono<MongoCollection<Document>> createCollection(String collectionName) {
		return doCreateCollection(collectionName, new CreateCollectionOptions());
	}

	/**
	 * Creates the named collection after converting the given {@link CollectionOptions}, which may be
	 * {@literal null}, into driver-level create options.
	 */
	@Override
	public Mono<MongoCollection<Document>> createCollection(String collectionName,
			@Nullable CollectionOptions collectionOptions) {
		return doCreateCollection(collectionName, convertToCreateCollectionOptions(collectionOptions));
	}

	/**
	 * Creates a view with the given name on top of the collection of the given source type. The pipeline operations
	 * are turned into an aggregation that is created against the source type.
	 */
	@Override
	public Mono<MongoCollection<Document>> createView(String name, Class<?> source, AggregationPipeline pipeline,
			@Nullable ViewOptions options) {

		String sourceCollection = getCollectionName(source);
		AggregationDefinition definition = queryOperations
				.createAggregation(Aggregation.newAggregation(source, pipeline.getOperations()), source);

		return createView(name, sourceCollection, definition, options);
	}

	/**
	 * Creates a view with the given name on top of the given source collection. The pipeline operations are turned
	 * into an aggregation that is created without an input type.
	 */
	@Override
	public Mono<MongoCollection<Document>> createView(String name, String source, AggregationPipeline pipeline,
			@Nullable ViewOptions options) {

		AggregationDefinition definition = queryOperations
				.createAggregation(Aggregation.newAggregation(pipeline.getOperations()), (Class<?>) null);

		return createView(name, source, definition, options);
	}

	/**
	 * Extracts the pipeline stages from the given {@link AggregationDefinition} and forwards them to
	 * {@link #doCreateView(String, String, List, ViewOptions)}.
	 */
	private Mono<MongoCollection<Document>> createView(String name, String source, AggregationDefinition aggregation,
			@Nullable ViewOptions options) {

		List<Document> stages = aggregation.getAggregationPipeline();
		return doCreateView(name, source, stages, options);
	}

	/**
	 * Creates the view in the database and emits the collection handle for it once creation completed.
	 *
	 * @param name name of the view to create.
	 * @param source name of the collection or view the view is defined on.
	 * @param pipeline the aggregation stages defining the view.
	 * @param options can be {@literal null}; only its collation (if any) is applied.
	 * @return a {@link Mono} emitting the {@link MongoCollection} obtained for {@code name}.
	 */
	protected Mono<MongoCollection<Document>> doCreateView(String name, String source, List<Document> pipeline,
			@Nullable ViewOptions options) {

		CreateViewOptions driverOptions = new CreateViewOptions();

		if (options != null) {
			options.getCollation() //
					.map(Collation::toMongoCollation) //
					.ifPresent(driverOptions::collation);
		}

		// the collection handle is only looked up after the create command has completed
		return execute(database -> Flux.from(database.createView(name, source, pipeline, driverOptions))
				.then(Mono.fromSupplier(() -> database.getCollection(name)))).next();
	}

	/**
	 * Obtains the {@link MongoCollection} with the given name from the database.
	 *
	 * @param collectionName must not be {@literal null}.
	 */
	@Override
	public Mono<MongoCollection<Document>> getCollection(String collectionName) {

		Assert.notNull(collectionName, "Collection name must not be null");

		return createMono(database -> {

			MongoCollection<Document> collection = database.getCollection(collectionName);
			return Mono.just(collection);
		});
	}

	/**
	 * Checks whether the collection resolved for the given entity type exists.
	 */
	@Override
	public <T> Mono<Boolean> collectionExists(Class<T> entityClass) {

		String collectionName = getCollectionName(entityClass);
		return collectionExists(collectionName);
	}

	/**
	 * Checks whether a collection with the given name is among the collection names listed by the database.
	 *
	 * @return a {@link Mono} emitting {@literal true} if a listed name equals {@code collectionName}, {@literal false}
	 *         otherwise.
	 */
	@Override
	public Mono<Boolean> collectionExists(String collectionName) {

		return createMono(database -> {

			Flux<String> matchingNames = Flux.from(database.listCollectionNames()) //
					.filter(candidate -> candidate.equals(collectionName));

			return matchingNames.map(match -> true).single(false);
		});
	}

	/**
	 * Drops the collection resolved for the given entity type.
	 */
	@Override
	public <T> Mono<Void> dropCollection(Class<T> entityClass) {

		String collectionName = getCollectionName(entityClass);
		return dropCollection(collectionName);
	}

	/**
	 * Drops the collection with the given name and writes a debug log entry once the drop succeeded.
	 */
	@Override
	public Mono<Void> dropCollection(String collectionName) {

		return createMono(collectionName, MongoCollection::drop) //
				.doOnSuccess(ignored -> {

					if (!LOGGER.isDebugEnabled()) {
						return;
					}

					LOGGER.debug("Dropped collection [" + collectionName + "]");
				}) //
				.then();
	}

	/**
	 * Creates {@link ReactiveBulkOperations} for the given collection without an entity type.
	 */
	@Override
	public ReactiveBulkOperations bulkOps(BulkMode mode, String collectionName) {

		Class<?> noEntityType = null;
		return bulkOps(mode, noEntityType, collectionName);
	}

	/**
	 * Creates {@link ReactiveBulkOperations} for the given entity type using the collection resolved for that type.
	 */
	@Override
	public ReactiveBulkOperations bulkOps(BulkMode mode, Class<?> entityClass) {

		String collectionName = getCollectionName(entityClass);
		return bulkOps(mode, entityClass, collectionName);
	}

	/**
	 * Creates {@link ReactiveBulkOperations} bound to the given collection and initializes them with the write
	 * concern configured on this template.
	 *
	 * @param mode must not be {@literal null}.
	 * @param entityType can be {@literal null}.
	 * @param collectionName must not be {@literal null} or empty.
	 */
	@Override
	public ReactiveBulkOperations bulkOps(BulkMode mode, @Nullable Class<?> entityType, String collectionName) {

		Assert.notNull(mode, "BulkMode must not be null");
		Assert.hasText(collectionName, "Collection name must not be null or empty");

		ReactiveBulkOperationContext context = new ReactiveBulkOperationContext(mode,
				Optional.ofNullable(getPersistentEntity(entityType)), queryMapper, updateMapper, eventPublisher,
				entityCallbacks);

		DefaultReactiveBulkOperations bulkOperations = new DefaultReactiveBulkOperations(this, collectionName, context);
		bulkOperations.setDefaultWriteConcern(writeConcern);

		return bulkOperations;
	}

	/**
	 * Emits the collection names listed by the database.
	 */
	@Override
	public Flux<String> getCollectionNames() {

		return createFlux(database -> {

			Publisher<String> names = database.listCollectionNames();
			return names;
		});
	}

	/**
	 * Obtains the {@link MongoDatabase} directly from the configured database factory.
	 *
	 * @return a {@link Mono} emitting the database.
	 */
	public Mono<MongoDatabase> getMongoDatabase() {

		Mono<MongoDatabase> database = mongoDatabaseFactory.getMongoDatabase();
		return database;
	}

	/**
	 * Obtains the {@link MongoDatabase} through {@link ReactiveMongoDatabaseUtils}, passing the configured database
	 * factory and session synchronization setting.
	 *
	 * @return a {@link Mono} emitting the database.
	 */
	protected Mono<MongoDatabase> doGetDatabase() {

		Mono<MongoDatabase> database = ReactiveMongoDatabaseUtils.getDatabase(mongoDatabaseFactory,
				sessionSynchronization);
		return database;
	}

	/**
	 * Prepares the given object for saving using this template's converter as the writer.
	 */
	<T> Mono<SourceAwareDocument<T>> prepareObjectForSaveReactive(String collectionName, T objectToSave) {

		MongoWriter<? super Object> writer = mongoConverter;
		return prepareObjectForSaveReactive(collectionName, objectToSave, writer);
	}

	/**
	 * Runs the pre-save lifecycle for the given object and emits the resulting source/document pair.
	 * <p>
	 * The steps are, in order: emit a {@link BeforeConvertEvent}, invoke the before-convert callback, initialize the
	 * version property, map the entity to a {@link Document} using the given writer, emit a {@link BeforeSaveEvent} and
	 * finally invoke the before-save callback.
	 *
	 * @param collectionName the collection name passed to events, callbacks and the resulting
	 *          {@link SourceAwareDocument}.
	 * @param objectToSave the object to prepare.
	 * @param writer the writer used to create the mapped document.
	 * @return a {@link Mono} emitting a {@link SourceAwareDocument} holding the object returned by the before-save
	 *         callback and the mapped document.
	 */
	<T> Mono<SourceAwareDocument<T>> prepareObjectForSaveReactive(String collectionName, T objectToSave,
			MongoWriter<? super Object> writer) {

		// the event is emitted eagerly, i.e. when this method is called and not upon subscription
		T toConvert = maybeEmitEvent(new BeforeConvertEvent<>(objectToSave, collectionName)).getSource();
		return maybeCallBeforeConvert(toConvert, collectionName).map(initialized -> {

			AdaptibleEntity<T> entity = operations.forEntityUpsert(initialized, mongoConverter.getConversionService());
			T withVersion = entity.initializeVersionProperty();
			Document dbDoc = entity.toMappedDocument(writer).getDocument();
			maybeEmitEvent(new BeforeSaveEvent<>(withVersion, dbDoc, collectionName));
			// carry both values to the next stage: [0] = entity with version, [1] = mapped document
			return new Object[] { withVersion, dbDoc };
		}).flatMap(pair -> maybeCallBeforeSave((T) pair[0], (Document) pair[1], collectionName)
				.map(saved -> new SourceAwareDocument<>(saved, (Document) pair[1], collectionName)));
	}

	/**
	 * Finds a single object matching the query in the collection resolved for the given entity type.
	 */
	@Override
	public <T> Mono<T> findOne(Query query, Class<T> entityClass) {

		String collectionName = getCollectionName(entityClass);
		return findOne(query, entityClass, collectionName);
	}

	/**
	 * Finds a single object matching the query in the given collection.
	 * <p>
	 * A query without sort is handed to {@code doFindOne}. A query defining a sort gets its limit set to {@code 1}
	 * (on the given query instance) and is executed through {@link #find(Query, Class, String)}, emitting the first
	 * element.
	 */
	@Override
	public <T> Mono<T> findOne(Query query, Class<T> entityClass, String collectionName) {

		boolean sorted = !ObjectUtils.isEmpty(query.getSortObject());

		if (sorted) {
			query.limit(1);
			return find(query, entityClass, collectionName).next();
		}

		return doFindOne(collectionName, ReactiveCollectionPreparerDelegate.of(query), query.getQueryObject(),
				query.getFieldsObject(), entityClass, new QueryFindPublisherPreparer(query, entityClass));
	}

	/**
	 * Checks for a match of the query in the collection resolved for the given entity type.
	 */
	@Override
	public Mono<Boolean> exists(Query query, Class<?> entityClass) {

		String collectionName = getCollectionName(entityClass);
		return exists(query, entityClass, collectionName);
	}

	/**
	 * Checks for a match of the query in the given collection without an entity type.
	 */
	@Override
	public Mono<Boolean> exists(Query query, String collectionName) {

		Class<?> noEntityType = null;
		return exists(query, noEntityType, collectionName);
	}

	/**
	 * Checks whether at least one document in the given collection matches the query. The lookup projects the id
	 * field only and is limited to a single document.
	 *
	 * @param query must not be {@literal null}.
	 * @param entityClass can be {@literal null}.
	 * @param collectionName the collection to look into.
	 * @throws InvalidDataAccessApiUsageException if {@code query} is {@literal null}.
	 */
	@Override
	public Mono<Boolean> exists(Query query, @Nullable Class<?> entityClass, String collectionName) {

		if (query == null) {
			throw new InvalidDataAccessApiUsageException("Query passed in to exist can't be null");
		}

		return createFlux(collectionName, collection -> {

			ReactiveCollectionPreparerDelegate preparer = ReactiveCollectionPreparerDelegate.of(query);
			QueryContext context = queryOperations.createQueryContext(query);
			Document mappedFilter = context.getMappedQuery(entityClass, this::getPersistentEntity);

			Document idOnly = new Document(FieldName.ID.name(), 1);
			FindPublisher<Document> publisher = preparer.prepare(collection) //
					.find(mappedFilter, Document.class) //
					.projection(idOnly);

			if (LOGGER.isDebugEnabled()) {
				LOGGER.debug(
						String.format("exists: %s in collection: %s", serializeToJsonSafely(mappedFilter), collectionName));
			}

			context.applyCollation(entityClass, publisher::collation);

			return publisher.limit(1);
		}).hasElements();
	}

	/**
	 * Finds all objects matching the query in the collection resolved for the given entity type.
	 */
	@Override
	public <T> Flux<T> find(Query query, Class<T> entityClass) {

		String collectionName = getCollectionName(entityClass);
		return find(query, entityClass, collectionName);
	}

	/**
	 * Finds all objects matching the query in the given collection. A {@literal null} query is treated as a request
	 * for all objects of the collection.
	 */
	@Override
	public <T> Flux<T> find(@Nullable Query query, Class<T> entityClass, String collectionName) {

		if (query != null) {
			return doFind(collectionName, ReactiveCollectionPreparerDelegate.of(query), query.getQueryObject(),
					query.getFieldsObject(), entityClass, new QueryFindPublisherPreparer(query, entityClass));
		}

		return findAll(entityClass, collectionName);
	}

	/**
	 * Scrolls through the collection resolved for the given entity type.
	 *
	 * @param entityType must not be {@literal null}.
	 */
	@Override
	public <T> Mono<Window<T>> scroll(Query query, Class<T> entityType) {

		Assert.notNull(entityType, "Entity type must not be null");

		String collectionName = getCollectionName(entityType);
		return scroll(query, entityType, collectionName);
	}

	/**
	 * Scrolls through the given collection, using the entity type both as source and as target type and the entity
	 * {@link QueryResultConverter}.
	 */
	@Override
	public <T> Mono<Window<T>> scroll(Query query, Class<T> entityType, String collectionName) {

		return doScroll(query, entityType, entityType, //
				QueryResultConverter.entity(), collectionName);
	}

	/**
	 * Executes the given query as a scroll request and collects the result into a {@link Window}.
	 * <p>
	 * Queries carrying a keyset are rewritten into a keyset pagination query and executed without skip; all other
	 * queries are executed using the sort and skip defined by the query.
	 *
	 * @param query must not be {@literal null}.
	 * @param sourceClass must not be {@literal null}.
	 * @param targetClass must not be {@literal null}.
	 * @param resultConverter the converter applied to each result.
	 * @param collectionName must not be {@literal null}.
	 */
	<T, R> Mono<Window<R>> doScroll(Query query, Class<?> sourceClass, Class<T> targetClass,
			QueryResultConverter<? super T, ? extends R> resultConverter, String collectionName) {

		Assert.notNull(query, "Query must not be null");
		Assert.notNull(collectionName, "CollectionName must not be null");
		Assert.notNull(sourceClass, "Entity type must not be null");
		Assert.notNull(targetClass, "Target type must not be null");

		EntityProjection<T, ?> projection = operations.introspectProjection(targetClass, sourceClass);
		DocumentCallback<R> reader = getResultReader(projection, collectionName, resultConverter);

		// a limited query fetches one element more than requested
		int fetchSize;
		if (query.isLimited()) {
			fetchSize = query.getLimit() + 1;
		} else {
			fetchSize = Integer.MAX_VALUE;
		}

		if (!query.hasKeyset()) {

			Mono<List<R>> elements = doFind(collectionName, ReactiveCollectionPreparerDelegate.of(query),
					query.getQueryObject(), query.getFieldsObject(), sourceClass,
					new QueryFindPublisherPreparer(query, query.getSortObject(), fetchSize, query.getSkip(), sourceClass),
					reader).collectList();

			return elements.map(content -> ScrollUtils.createWindow(content, query.getLimit(),
					OffsetScrollPosition.positionFunction(query.getSkip())));
		}

		KeysetScrollQuery keysetQuery = ScrollUtils.createKeysetPaginationQuery(query,
				operations.getIdPropertyName(sourceClass));

		Mono<List<R>> elements = doFind(collectionName, ReactiveCollectionPreparerDelegate.of(query),
				keysetQuery.query(), keysetQuery.fields(), sourceClass,
				new QueryFindPublisherPreparer(query, keysetQuery.sort(), fetchSize, 0, sourceClass), reader)
				.collectList();

		return elements.map(content -> ScrollUtils.createWindow(query, content, sourceClass, operations));
	}

	/**
	 * Finds the object with the given id in the collection resolved for the given entity type.
	 */
	@Override
	public <T> Mono<T> findById(Object id, Class<T> entityClass) {

		String collectionName = getCollectionName(entityClass);
		return findById(id, entityClass, collectionName);
	}

	/**
	 * Finds the object with the given id in the given collection. The lookup filter consists of the id property name
	 * of the entity type mapped to the given id; neither a field projection nor a collation is applied.
	 */
	@Override
	public <T> Mono<T> findById(Object id, Class<T> entityClass, String collectionName) {

		Document byId = new Document(operations.getIdPropertyName(entityClass), id);

		return doFindOne(collectionName, CollectionPreparer.identity(), byId, null, entityClass, (Collation) null);
	}

	/**
	 * Finds the distinct values of the given field in the collection resolved for the given entity type.
	 */
	@Override
	public <T> Flux<T> findDistinct(Query query, String field, Class<?> entityClass, Class<T> resultClass) {

		String collectionName = getCollectionName(entityClass);
		return findDistinct(query, field, collectionName, entityClass, resultClass);
	}

	/**
	 * Finds the distinct values of the given field for documents matching the query in the given collection.
	 * <p>
	 * Query and field name are mapped against the entity type before the distinct command is issued. If the requested
	 * result type is {@link Object} or differs from the type handed to the driver, each value is converted to the most
	 * specific conversion target type using this template's converter.
	 *
	 * @param query must not be {@literal null}.
	 * @param field must not be {@literal null}.
	 * @param collectionName must not be {@literal null}.
	 * @param entityClass must not be {@literal null}.
	 * @param resultClass must not be {@literal null}.
	 * @return a {@link Flux} emitting the distinct values.
	 */
	@Override
	@SuppressWarnings("unchecked")
	public <T> Flux<T> findDistinct(Query query, String field, String collectionName, Class<?> entityClass,
			Class<T> resultClass) {

		Assert.notNull(query, "Query must not be null");
		Assert.notNull(field, "Field must not be null");
		Assert.notNull(collectionName, "CollectionName must not be null");
		Assert.notNull(entityClass, "EntityClass must not be null");
		Assert.notNull(resultClass, "ResultClass must not be null");

		MongoPersistentEntity<?> entity = getPersistentEntity(entityClass);
		DistinctQueryContext distinctQueryContext = queryOperations.distinctQueryContext(query, field);

		Document mappedQuery = distinctQueryContext.getMappedQuery(entity);
		String mappedFieldName = distinctQueryContext.getMappedFieldName(entity);
		Class<T> mongoDriverCompatibleType = distinctQueryContext.getDriverCompatibleClass(resultClass);
		ReactiveCollectionPreparerDelegate collectionPreparer = ReactiveCollectionPreparerDelegate.of(query);

		Flux<?> result = execute(collectionName, collection -> {

			if (LOGGER.isDebugEnabled()) {
				LOGGER.debug(String.format("Executing findDistinct using query %s for field: %s in collection: %s",
						serializeToJsonSafely(mappedQuery), field, collectionName));
			}

			DistinctPublisher<T> publisher = collectionPreparer.prepare(collection).distinct(mappedFieldName, mappedQuery,
					mongoDriverCompatibleType);
			distinctQueryContext.applyCollation(entityClass, publisher::collation);
			return publisher;
		});

		// values read in a driver compatible type still need conversion into the requested result type
		if (resultClass == Object.class || mongoDriverCompatibleType != resultClass) {

			Class<?> targetType = distinctQueryContext.getMostSpecificConversionTargetType(resultClass, entityClass);
			MongoConverter converter = getConverter();

			result = result.map(it -> converter.mapValueToTargetType(it, targetType, NO_OP_REF_RESOLVER));
		}

		return (Flux<T>) result;
	}

	/**
	 * Runs the typed aggregation against the given collection, using the aggregation's input type.
	 *
	 * @param aggregation must not be {@literal null}.
	 */
	@Override
	public <O> Flux<O> aggregate(TypedAggregation<?> aggregation, String inputCollectionName, Class<O> outputType) {

		Assert.notNull(aggregation, "Aggregation pipeline must not be null");

		Class<?> inputType = aggregation.getInputType();
		return doAggregate(aggregation, inputCollectionName, inputType, outputType);
	}

	/**
	 * Runs the typed aggregation against the collection resolved for the aggregation's input type.
	 *
	 * @param aggregation must not be {@literal null}.
	 */
	@Override
	public <O> Flux<O> aggregate(TypedAggregation<?> aggregation, Class<O> outputType) {

		Assert.notNull(aggregation, "Aggregation pipeline must not be null");

		String inputCollectionName = getCollectionName(aggregation.getInputType());
		return aggregate(aggregation, inputCollectionName, outputType);
	}

	/**
	 * Runs the aggregation against the collection resolved for the given input type.
	 */
	@Override
	public <O> Flux<O> aggregate(Aggregation aggregation, Class<?> inputType, Class<O> outputType) {

		String collectionName = getCollectionName(inputType);
		return doAggregate(aggregation, collectionName, inputType, outputType);
	}

	/**
	 * Runs the aggregation against the given collection without an input type.
	 */
	@Override
	public <O> Flux<O> aggregate(Aggregation aggregation, String collectionName, Class<O> outputType) {

		Class<?> noInputType = null;
		return doAggregate(aggregation, collectionName, noInputType, outputType);
	}

	/**
	 * Creates the {@link AggregationDefinition} for the given aggregation and input type and executes it using the
	 * entity {@link QueryResultConverter}.
	 *
	 * @param aggregation the aggregation to run.
	 * @param collectionName the collection to run the aggregation against.
	 * @param inputType can be {@literal null}.
	 * @param outputType the type to read results into.
	 */
	protected <O> Flux<O> doAggregate(Aggregation aggregation, String collectionName, @Nullable Class<?> inputType,
			Class<O> outputType) {

		AggregationDefinition definition = queryOperations.createAggregation(aggregation, inputType);

		return doAggregate(aggregation, collectionName, outputType, //
				QueryResultConverter.entity(), definition);
	}

	/**
	 * Creates the {@link AggregationDefinition} for the given aggregation and input type and executes it using the
	 * given {@link QueryResultConverter}.
	 */
	<T, O> Flux<O> doAggregate(Aggregation aggregation, String collectionName, @Nullable Class<?> inputType,
			Class<T> outputType, QueryResultConverter<? super T, ? extends O> resultConverter) {

		AggregationDefinition definition = queryOperations.createAggregation(aggregation, inputType);

		return doAggregate(aggregation, collectionName, outputType, resultConverter, definition);
	}

	/**
	 * Executes the pipeline of the given {@link AggregationDefinition} against the given collection and reads each
	 * result through the given {@link QueryResultConverter}.
	 *
	 * @param aggregation must not be {@literal null}; its options must not request explain.
	 * @param collectionName must not be {@literal null} or empty.
	 * @param outputType must not be {@literal null}.
	 * @param resultConverter the converter applied to each result.
	 * @param definition provides the pipeline, the out/merge flag and the input type.
	 */
	<T, O> Flux<O> doAggregate(Aggregation aggregation, String collectionName, Class<T> outputType,
			QueryResultConverter<? super T, ? extends O> resultConverter, AggregationDefinition definition) {

		Assert.notNull(aggregation, "Aggregation pipeline must not be null");
		Assert.hasText(collectionName, "Collection name must not be null or empty");
		Assert.notNull(outputType, "Output type must not be null");

		AggregationOptions aggregationOptions = aggregation.getOptions();
		Assert.isTrue(!aggregationOptions.isExplain(), "Cannot use explain option with streaming");

		if (LOGGER.isDebugEnabled()) {
			LOGGER.debug(String.format("Streaming aggregation: %s in collection %s",
					serializeToJsonSafely(definition.getAggregationPipeline()), collectionName));
		}

		DocumentCallback<O> reader = new QueryResultConverterCallback<>(resultConverter,
				new ReadDocumentCallback<>(mongoConverter, outputType, collectionName));

		return execute(collectionName, collection -> {
			return aggregateAndMap(collection, definition.getAggregationPipeline(), definition.isOutOrMerge(),
					aggregationOptions, reader, definition.getInputType());
		});
	}

	/**
	 * Creates the aggregate publisher for the given pipeline, applies the given {@link AggregationOptions} to it and
	 * maps each resulting {@link Document} through the given callback.
	 *
	 * @param collection the collection to aggregate on; prepared via a {@link ReactiveCollectionPreparerDelegate}
	 *          created from the options.
	 * @param pipeline the aggregation stages.
	 * @param isOutOrMerge whether results are written to a collection; only consulted when results are skipped.
	 * @param options the options to apply.
	 * @param readCallback invoked for every resulting document.
	 * @param inputType can be {@literal null}; used to look up a collation if the options do not define one.
	 * @return the mapped results, or an empty {@link Flux} if the options request to skip results.
	 */
	private <O> Flux<O> aggregateAndMap(MongoCollection<Document> collection, List<Document> pipeline,
			boolean isOutOrMerge, AggregationOptions options, DocumentCallback<O> readCallback,
			@Nullable Class<?> inputType) {

		ReactiveCollectionPreparerDelegate collectionPreparer = ReactiveCollectionPreparerDelegate.of(options);
		AggregatePublisher<Document> cursor = collectionPreparer.prepare(collection).aggregate(pipeline, Document.class);

		// only forward allowDiskUse if it has been set explicitly
		if (options.isAllowDiskUseSet()) {
			cursor = cursor.allowDiskUse(options.isAllowDiskUse());
		}

		if (options.getCursorBatchSize() != null) {
			cursor = cursor.batchSize(options.getCursorBatchSize());
		}

		options.getComment().ifPresent(cursor::comment);

		// the hint is applied through either the string or the document variant of the publisher
		HintFunction hintFunction = options.getHintObject().map(HintFunction::from).orElseGet(HintFunction::empty);
		if (hintFunction.isPresent()) {
			cursor = hintFunction.apply(mongoDatabaseFactory, cursor::hintString, cursor::hint);
		}

		// collation from the options takes precedence over the one defined for the input type
		Optionals.firstNonEmpty(options::getCollation, () -> operations.forType(inputType).getCollation()) //
				.map(Collation::toMongoCollation) //
				.ifPresent(cursor::collation);

		if (options.hasExecutionTimeLimit()) {
			cursor = cursor.maxTime(options.getMaxTime().toMillis(), TimeUnit.MILLISECONDS);
		}

		// still execute the pipeline when results are skipped, but do not emit any element
		if (options.isSkipResults()) {
			return (isOutOrMerge ? Flux.from(cursor.toCollection()) : Flux.from(cursor.first())).thenMany(Mono.empty());
		}

		return Flux.from(cursor).flatMapSequential(readCallback::doWith);
	}

	/**
	 * Runs the near query against the collection resolved for the given entity type.
	 */
	@Override
	public <T> Flux<GeoResult<T>> geoNear(NearQuery near, Class<T> entityClass) {

		String collectionName = getCollectionName(entityClass);
		return geoNear(near, entityClass, collectionName);
	}

	/**
	 * Runs the near query against the given collection, returning results as the entity type itself.
	 */
	@Override
	public <T> Flux<GeoResult<T>> geoNear(NearQuery near, Class<T> entityClass, String collectionName) {

		Class<T> returnType = entityClass;
		return geoNear(near, entityClass, collectionName, returnType);
	}

	/**
	 * Runs the near query against the given collection, returning results as the given return type using the entity
	 * {@link QueryResultConverter}.
	 */
	protected <T> Flux<GeoResult<T>> geoNear(NearQuery near, Class<?> entityClass, String collectionName,
			Class<T> returnType) {

		return doGeoNear(near, entityClass, collectionName, returnType, //
				QueryResultConverter.entity());
	}

	/**
	 * Counts the documents matched by the given {@link NearQuery} by running a {@code $geoNear} aggregation followed
	 * by a count stage writing to {@code _totalCount}.
	 * <p>
	 * Collation, read preference and read concern of the near query are carried over to the aggregation options.
	 *
	 * @param near the near query to count matches for.
	 * @param domainType the type the aggregation is created for.
	 * @param collectionName the collection to run the aggregation against.
	 * @return a {@link Mono} emitting the count, or {@code 0} if the aggregation yields no document.
	 */
	Mono<Long> doGeoNearCount(NearQuery near, Class<?> domainType, String collectionName) {

		Builder optionsBuilder = AggregationOptions.builder().collation(near.getCollation());

		if (near.hasReadPreference()) {
			optionsBuilder.readPreference(near.getReadPreference());
		}

		if (near.hasReadConcern()) {
			optionsBuilder.readConcern(near.getReadConcern());
		}

		String distanceField = operations.nearQueryDistanceFieldName(domainType);
		Aggregation $geoNear = TypedAggregation.newAggregation(domainType,
				Aggregation.geoNear(near, distanceField).skip(-1).limit(-1), Aggregation.count().as("_totalCount"))
				.withOptions(optionsBuilder.build());

		AggregationDefinition definition = queryOperations.createAggregation($geoNear, (AggregationOperationContext) null);

		Flux<Document> results = doAggregate($geoNear, collectionName, Document.class, QueryResultConverter.entity(),
				definition);

		// Flux.last() signals NoSuchElementException for an empty source, which would bypass defaultIfEmpty(..).
		// takeLast(1).next() completes empty instead, so an aggregation without result yields a count of 0.
		return results.takeLast(1).next()
				// read as Number so that a count reported as 64-bit value does not fail the cast
				.map(doc -> NumberUtils.convertNumberToTargetClass(doc.get("_totalCount", Number.class), Long.class))
				.defaultIfEmpty(0L);
	}

	/**
	 * Runs the given {@link NearQuery} as a {@code $geoNear} aggregation and maps every resulting document into a
	 * {@link GeoResult}.
	 * <p>
	 * If the given collection name has no text, the collection resolved for the entity class is used instead. The
	 * resolved name is used both for running the aggregation and for the result reader.
	 *
	 * @param near must not be {@literal null}.
	 * @param entityClass must not be {@literal null}.
	 * @param collectionName the collection to query; may be empty to fall back to the entity's collection.
	 * @param returnType the type results are projected to.
	 * @param resultConverter the converter applied to each result.
	 * @throws InvalidDataAccessApiUsageException if {@code near} or {@code entityClass} is {@literal null}.
	 */
	@SuppressWarnings("unchecked")
	<T, R> Flux<GeoResult<R>> doGeoNear(NearQuery near, Class<?> entityClass, String collectionName, Class<T> returnType,
			QueryResultConverter<? super T, ? extends R> resultConverter) {

		if (near == null) {
			throw new InvalidDataAccessApiUsageException("NearQuery must not be null");
		}

		if (entityClass == null) {
			throw new InvalidDataAccessApiUsageException("Entity class must not be null");
		}

		String collection = StringUtils.hasText(collectionName) ? collectionName : getCollectionName(entityClass);
		String distanceField = operations.nearQueryDistanceFieldName(entityClass);
		EntityProjection<T, ?> projection = operations.introspectProjection(returnType, entityClass);

		// hand the resolved collection name to the reader so it matches the collection actually queried
		GeoNearResultDocumentCallback<R> callback = new GeoNearResultDocumentCallback<>(distanceField,
				getResultReader(projection, collection, resultConverter), near.getMetric());

		Builder optionsBuilder = AggregationOptions.builder();
		if (near.hasReadPreference()) {
			optionsBuilder.readPreference(near.getReadPreference());
		}

		if (near.hasReadConcern()) {
			optionsBuilder.readConcern(near.getReadConcern());
		}

		optionsBuilder.collation(near.getCollation());

		Aggregation $geoNear = TypedAggregation.newAggregation(entityClass, Aggregation.geoNear(near, distanceField))
				.withOptions(optionsBuilder.build());

		return aggregate($geoNear, collection, Document.class) //
				.flatMapSequential(callback::doWith);
	}

	/**
	 * Finds and modifies a single document in the collection resolved for the given entity type, using new
	 * {@link FindAndModifyOptions}.
	 */
	@Override
	public <T> Mono<T> findAndModify(Query query, UpdateDefinition update, Class<T> entityClass) {

		FindAndModifyOptions defaults = new FindAndModifyOptions();
		return findAndModify(query, update, defaults, entityClass, getCollectionName(entityClass));
	}

	/**
	 * Finds and modifies a single document in the given collection, using new {@link FindAndModifyOptions}.
	 */
	@Override
	public <T> Mono<T> findAndModify(Query query, UpdateDefinition update, Class<T> entityClass, String collectionName) {

		FindAndModifyOptions defaults = new FindAndModifyOptions();
		return findAndModify(query, update, defaults, entityClass, collectionName);
	}

	/**
	 * Finds and modifies a single document in the collection resolved for the given entity type, using the given
	 * options.
	 */
	@Override
	public <T> Mono<T> findAndModify(Query query, UpdateDefinition update, FindAndModifyOptions options,
			Class<T> entityClass) {

		String collectionName = getCollectionName(entityClass);
		return findAndModify(query, update, options, entityClass, collectionName);
	}

	/**
	 * Finds and modifies a single document in the given collection and converts the result through the given
	 * {@link QueryResultConverter}.
	 * <p>
	 * The given options are copied before use. A collation may be defined either by the query or by the options, but
	 * not by both. If the options define none, the collation resolved for the entity type and query is applied to the
	 * copy.
	 *
	 * @param options must not be {@literal null}.
	 * @param entityClass must not be {@literal null}.
	 * @throws IllegalArgumentException if both the query and the options define a collation.
	 */
	public <S, T> Mono<T> findAndModify(Query query, UpdateDefinition update, FindAndModifyOptions options,
			Class<S> entityClass, String collectionName, QueryResultConverter<? super S, ? extends T> resultConverter) {

		Assert.notNull(options, "Options must not be null ");
		Assert.notNull(entityClass, "Entity class must not be null");

		FindAndModifyOptions effectiveOptions = FindAndModifyOptions.of(options);

		Optionals.ifAllPresent(query.getCollation(), effectiveOptions.getCollation(), (fromQuery, fromOptions) -> {
			throw new IllegalArgumentException(
					"Both Query and FindAndModifyOptions define a collation; Please provide the collation only via one of the two");
		});

		boolean collationDefinedByOptions = effectiveOptions.getCollation().isPresent();
		if (!collationDefinedByOptions) {
			operations.forType(entityClass).getCollation(query).ifPresent(effectiveOptions::collation);
		}

		return doFindAndModify(collectionName, ReactiveCollectionPreparerDelegate.of(query), query.getQueryObject(),
				query.getFieldsObject(), getMappedSortObject(query, entityClass), entityClass, update, effectiveOptions,
				resultConverter);
	}

	/**
	 * Finds and modifies a single document in the given collection, using the given options and the entity
	 * {@link QueryResultConverter}.
	 */
	@Override
	public <T> Mono<T> findAndModify(Query query, UpdateDefinition update, FindAndModifyOptions options,
			Class<T> entityClass, String collectionName) {

		return findAndModify(query, update, options, entityClass, collectionName, //
				QueryResultConverter.entity());
	}

	/**
	 * Finds and replaces a single document in the given collection, using the entity {@link QueryResultConverter}.
	 */
	@Override
	public <S, T> Mono<T> findAndReplace(Query query, S replacement, FindAndReplaceOptions options, Class<S> entityType,
			String collectionName, Class<T> resultType) {

		return findAndReplace(query, replacement, options, entityType, collectionName, resultType, //
				QueryResultConverter.entity());
	}

	/**
	 * Finds a single document matching the query, replaces it with the mapped form of the given replacement object and
	 * converts the result through the given {@link QueryResultConverter}.
	 * <p>
	 * Query, fields and sort are mapped against the entity type when this method is called. The replacement lifecycle
	 * (before-convert event and callback, mapping, before-save event and callback, the actual replace, after-save event
	 * and callback) is deferred until subscription.
	 *
	 * @param query must not be {@literal null}; must not define a limit greater than 1 nor a positive skip.
	 * @param replacement must not be {@literal null}.
	 * @param options must not be {@literal null}.
	 * @param entityType must not be {@literal null}.
	 * @param collectionName must not be {@literal null}.
	 * @param resultType must not be {@literal null}.
	 * @param resultConverter the converter applied to the result.
	 */
	@SuppressWarnings("NullAway")
	public <S, T, R> Mono<R> findAndReplace(Query query, S replacement, FindAndReplaceOptions options,
			Class<S> entityType, String collectionName, Class<T> resultType,
			QueryResultConverter<? super T, ? extends R> resultConverter) {

		Assert.notNull(query, "Query must not be null");
		Assert.notNull(replacement, "Replacement must not be null");
		Assert.notNull(options, "Options must not be null Use FindAndReplaceOptions#empty() instead");
		Assert.notNull(entityType, "Entity class must not be null");
		Assert.notNull(collectionName, "CollectionName must not be null");
		Assert.notNull(resultType, "ResultType must not be null Use Object.class instead");

		Assert.isTrue(query.getLimit() <= 1, "Query must not define a limit other than 1 ore none");
		Assert.isTrue(query.getSkip() <= 0, "Query must not define skip");

		MongoPersistentEntity<?> entity = mappingContext.getPersistentEntity(entityType);
		QueryContext queryContext = queryOperations.createQueryContext(query);
		EntityProjection<T, S> projection = operations.introspectProjection(resultType, entityType);

		Document mappedQuery = queryContext.getMappedQuery(entity);
		Document mappedFields = queryContext.getMappedFields(entity, projection);
		Document mappedSort = queryContext.getMappedSort(entity);
		ReactiveCollectionPreparerDelegate collectionPreparer = ReactiveCollectionPreparerDelegate.of(query);

		// everything below runs per subscription
		return Mono.defer(() -> {

			PersistableEntityModel<S> pem = PersistableEntityModel.of(replacement, collectionName);

			// NOTE(review): the event returned by maybeEmitEvent(..) is not used here, the callback below receives the
			// original source
			maybeEmitEvent(new BeforeConvertEvent<>(pem.getSource(), pem.getCollection()));

			return maybeCallBeforeConvert(pem.getSource(), pem.getCollection()).map(pem::mutate).flatMap(it -> {
				// map the (potentially replaced) source into the document that is going to be stored
				PersistableEntityModel<S> mapped = it
						.addTargetDocument(operations.forEntity(it.getSource()).toMappedDocument(mongoConverter).getDocument());
				maybeEmitEvent(new BeforeSaveEvent(mapped.getSource(), mapped.getTarget(), mapped.getCollection()));

				return maybeCallBeforeSave(it.getSource(), mapped.getTarget(), mapped.getCollection())
						.map(potentiallyModified -> PersistableEntityModel.of(potentiallyModified, mapped.getTarget(),
								mapped.getCollection()));
			}).flatMap(it -> {

				Mono<R> afterFindAndReplace = doFindAndReplace(it.getCollection(), collectionPreparer, mappedQuery,
						mappedFields, mappedSort, queryContext.getCollation(entityType).orElse(null), entityType, it.getTarget(),
						options, projection, resultConverter);
				// after-save event and callback receive the converted result together with the stored document
				return afterFindAndReplace.flatMap(saved -> {
					maybeEmitEvent(new AfterSaveEvent<>(saved, it.getTarget(), it.getCollection()));
					return maybeCallAfterSave(saved, it.getTarget(), it.getCollection());
				});
			});
		});
	}

	/**
	 * Finds and removes a single document in the collection resolved for the given entity type.
	 */
	@Override
	public <T> Mono<T> findAndRemove(Query query, Class<T> entityClass) {

		String collectionName = getCollectionName(entityClass);
		return findAndRemove(query, entityClass, collectionName);
	}

	/**
	 * Finds and removes a single document matching the query in the given collection.
	 * <p>
	 * Query object, fields object and mapped sort are taken from the given query; the collation is the one resolved
	 * for the entity type and query, or {@literal null} if there is none.
	 *
	 * @param query the query selecting the document to remove.
	 * @param entityClass the type to map the removed document to.
	 * @param collectionName the collection to remove the document from.
	 */
	@Override
	public <T> Mono<T> findAndRemove(Query query, Class<T> entityClass, String collectionName) {

		// the collation is resolved once, as part of the argument list
		return doFindAndRemove(collectionName, ReactiveCollectionPreparerDelegate.of(query), query.getQueryObject(),
				query.getFieldsObject(), getMappedSortObject(query, entityClass),
				operations.forType(entityClass).getCollation(query).orElse(null), entityClass);
	}

	/**
	 * Counts the documents matching the query in the collection resolved for the given entity type.
	 *
	 * @param entityClass must not be {@literal null}.
	 * @see ReactiveMongoOperations#count(Query, Class)
	 */
	@Override
	public Mono<Long> count(Query query, Class<?> entityClass) {

		Assert.notNull(entityClass, "Entity class must not be null");

		String collectionName = getCollectionName(entityClass);
		return count(query, entityClass, collectionName);
	}

	/**
	 * Counts the documents matching the query in the given collection without an entity type.
	 */
	@Override
	public Mono<Long> count(Query query, String collectionName) {

		Class<?> noEntityType = null;
		return count(query, noEntityType, collectionName);
	}

	/**
	 * Counts the documents matching the query in the given collection.
	 * <p>
	 * Count options and the mapped filter are derived from the query and the optional entity type upon subscription
	 * and then handed to {@link #doCount(String, Document, CountOptions)}.
	 *
	 * @param query must not be {@literal null}.
	 * @param entityClass can be {@literal null}.
	 * @param collectionName must not be {@literal null} or empty.
	 * @return a {@link Mono} emitting the number of matching documents.
	 */
	@Override
	public Mono<Long> count(Query query, @Nullable Class<?> entityClass, String collectionName) {

		Assert.notNull(query, "Query must not be null");
		Assert.hasText(collectionName, "Collection name must not be null or empty");

		return createMono(collectionName, collection -> {

			CountContext countContext = queryOperations.countQueryContext(query);
			CountOptions options = countContext.getCountOptions(entityClass);
			Document filter = countContext.getMappedQuery(entityClass, mappingContext::getPersistentEntity);

			// doCount(..) already logs the mapped filter; logging it here as well would duplicate the debug output
			return doCount(collectionName, filter, options);
		});
	}

	/**
	 * Run the actual count operation against the collection with given name.
	 *
	 * @param collectionName the name of the collection to count matching documents in.
	 * @param filter the filter to apply. Must not be {@literal null}.
	 * @param options options to apply. Like collation and the such.
	 * @return a {@link Mono} emitting the count obtained from the configured count execution.
	 */
	protected Mono<Long> doCount(String collectionName, Document filter, CountOptions options) {

		if (LOGGER.isDebugEnabled()) {

			String message = String.format("Executing count: %s in collection: %s", serializeToJsonSafely(filter),
					collectionName);
			LOGGER.debug(message);
		}

		return countExecution.countDocuments(collectionName, filter, options);
	}

	/**
	 * Estimates the number of documents in the given collection using new {@link EstimatedDocumentCountOptions}.
	 *
	 * @see ReactiveMongoOperations#estimatedCount(String)
	 */
	@Override
	public Mono<Long> estimatedCount(String collectionName) {

		EstimatedDocumentCountOptions defaults = new EstimatedDocumentCountOptions();
		return doEstimatedCount(collectionName, defaults);
	}

	/**
	 * Runs the estimated document count against the collection with the given name.
	 *
	 * @param collectionName the name of the collection.
	 * @param options the options passed to the driver.
	 * @return a {@link Mono} emitting the estimated count.
	 */
	protected Mono<Long> doEstimatedCount(String collectionName, EstimatedDocumentCountOptions options) {

		return createMono(collectionName, target -> {
			return target.estimatedDocumentCount(options);
		});
	}

	/**
	 * Counts the documents matching the query in the given collection by delegating to
	 * {@link #doExactCount(String, Document, CountOptions)} with the count options and mapped query derived from the
	 * query and the optional entity type.
	 */
	@Override
	public Mono<Long> exactCount(Query query, @Nullable Class<?> entityClass, String collectionName) {

		CountContext context = queryOperations.countQueryContext(query);

		CountOptions countOptions = context.getCountOptions(entityClass);
		Document filter = context.getMappedQuery(entityClass, mappingContext::getPersistentEntity);

		return doExactCount(collectionName, filter, countOptions);
	}

	/**
	 * Runs {@code countDocuments} against the collection with the given name. The filter is passed through
	 * {@link CountQuery} to obtain the query document handed to the driver.
	 *
	 * @param collectionName the name of the collection.
	 * @param filter the filter to count by.
	 * @param options the options passed to the driver.
	 * @return a {@link Mono} emitting the count.
	 */
	protected Mono<Long> doExactCount(String collectionName, Document filter, CountOptions options) {

		return createMono(collectionName, target -> {

			Document countFilter = CountQuery.of(filter).toQueryDocument();
			return target.countDocuments(countFilter, options);
		});
	}

	/**
	 * Decides whether a count can be served by an estimate.
	 *
	 * @param filter the count filter.
	 * @param options the count options.
	 * @return a {@link Mono} emitting {@literal false} if the filter is not empty or the options define a limit or
	 *         skip; otherwise {@literal true} only if no transaction is active.
	 */
	protected Mono<Boolean> countCanBeEstimated(Document filter, CountOptions options) {

		boolean unrestricted = filter.isEmpty() && isEmptyOptions(options);

		if (unrestricted) {
			return ReactiveMongoDatabaseUtils.isTransactionActive(getMongoDatabaseFactory())
					.map(transactionActive -> !transactionActive);
		}

		return Mono.just(false);
	}

	/**
	 * @return {@literal true} if the given options define neither a positive limit nor a positive skip.
	 */
	private boolean isEmptyOptions(CountOptions options) {

		boolean restricted = options.getLimit() > 0 || options.getSkip() > 0;
		return !restricted;
	}

	/**
	 * Inserts the object emitted by the given {@link Mono}.
	 *
	 * @param objectToSave must not be {@literal null}.
	 * @see ReactiveMongoOperations#insert(Mono)
	 */
	@Override
	public <T> Mono<T> insert(Mono<? extends T> objectToSave) {

		Assert.notNull(objectToSave, "Mono to insert must not be null");

		return objectToSave //
				.flatMap(this::insert);
	}

	/**
	 * Inserts the emitted batch into the collection resolved for the given entity type.
	 */
	@Override
	public <T> Flux<T> insertAll(Mono<? extends Collection<? extends T>> batchToSave, Class<?> entityClass) {

		String collectionName = getCollectionName(entityClass);
		return insertAll(batchToSave, collectionName);
	}

	/**
	 * Inserts the emitted batch into the given collection.
	 *
	 * @param batchToSave must not be {@literal null}.
	 */
	@Override
	public <T> Flux<T> insertAll(Mono<? extends Collection<? extends T>> batchToSave, String collectionName) {

		Assert.notNull(batchToSave, "Batch to insert must not be null");

		return Flux.from(batchToSave) //
				.flatMapSequential(batch -> insert(batch, collectionName));
	}

	/**
	 * Inserts the given object into the collection resolved for its user class.
	 *
	 * @param objectToSave must not be {@literal null} and is checked via {@code ensureNotCollectionLike}.
	 */
	@Override
	public <T> Mono<T> insert(T objectToSave) {

		Assert.notNull(objectToSave, "Object to insert must not be null");

		ensureNotCollectionLike(objectToSave);

		String collectionName = getCollectionName(ClassUtils.getUserClass(objectToSave));
		return insert(objectToSave, collectionName);
	}

	/**
	 * Inserts the given object into the given collection using this template's converter as the writer.
	 *
	 * @param objectToSave must not be {@literal null} and is checked via {@code ensureNotCollectionLike}.
	 */
	@Override
	public <T> Mono<T> insert(T objectToSave, String collectionName) {

		Assert.notNull(objectToSave, "Object to insert must not be null");

		ensureNotCollectionLike(objectToSave);

		MongoWriter<Object> writer = this.mongoConverter;
		return doInsert(collectionName, objectToSave, writer);
	}

	/**
	 * Inserts the given object into the given collection, running the full save lifecycle around the insert.
	 * <p>
	 * The steps are, in order: emit a {@link BeforeConvertEvent}, invoke the before-convert callback, initialize the
	 * version property and map the entity to a {@link Document} using the given writer, emit a {@link BeforeSaveEvent},
	 * invoke the before-save callback, insert the document, populate the id on the entity if necessary, emit an
	 * {@link AfterSaveEvent} and invoke the after-save callback.
	 *
	 * @param collectionName the collection to insert into.
	 * @param objectToSave the object to insert.
	 * @param writer the writer used to create the mapped document.
	 * @return a {@link Mono} emitting the object returned by the after-save callback.
	 */
	@SuppressWarnings("NullAway")
	protected <T> Mono<T> doInsert(String collectionName, T objectToSave, MongoWriter<Object> writer) {

		return Mono.just(PersistableEntityModel.of(objectToSave, collectionName)) //
				.doOnNext(it -> maybeEmitEvent(new BeforeConvertEvent<>(it.getSource(), it.getCollection()))) //
				.flatMap(it -> maybeCallBeforeConvert(it.getSource(), it.getCollection()).map(it::mutate)) //
				.map(it -> {

					// the model is rebuilt from the version-initialized entity and its mapped document
					AdaptibleEntity<T> entity = operations.forEntityUpsert(it.getSource(), mongoConverter.getConversionService());
					PersistableEntityModel<T> model = PersistableEntityModel.of(entity.initializeVersionProperty(),
							entity.toMappedDocument(writer).getDocument(), it.getCollection());

					maybeEmitEvent(new BeforeSaveEvent<>(model.getSource(), model.getTarget(), model.getCollection()));
					return model;
				})//
				.flatMap(it -> {
					return maybeCallBeforeSave(it.getSource(), it.getTarget(), it.getCollection()).map(it::mutate);
				}).flatMap(it -> {

					return insertDocument(it.getCollection(), it.getTarget(), it.getSource().getClass()).flatMap(id -> {

						// hand the id returned by the insert back to the entity
						T saved = operations.forEntity(it.getSource(), mongoConverter.getConversionService())
								.populateIdIfNecessary(id);
						maybeEmitEvent(new AfterSaveEvent<>(saved, it.getTarget(), collectionName));
						return maybeCallAfterSave(saved, it.getTarget(), collectionName);
					});
				});
	}

	/**
	 * Inserts the given batch into the collection resolved for the given entity type, using this template's converter
	 * as the writer.
	 */
	@Override
	public <T> Flux<T> insert(Collection<? extends T> batchToSave, Class<?> entityClass) {

		String collectionName = getCollectionName(entityClass);
		return doInsertBatch(collectionName, batchToSave, this.mongoConverter);
	}

	/**
	 * Inserts the given batch into the given collection, using this template's converter as the writer.
	 */
	@Override
	public <T> Flux<T> insert(Collection<? extends T> batchToSave, String collectionName) {

		MongoWriter<Object> writer = this.mongoConverter;
		return doInsertBatch(collectionName, batchToSave, writer);
	}

	/**
	 * Inserts the given objects, using this template's converter as the writer.
	 */
	@Override
	public <T> Flux<T> insertAll(Collection<? extends T> objectsToSave) {

		MongoWriter<Object> writer = this.mongoConverter;
		return doInsertAll(objectsToSave, writer);
	}

	/**
	 * Inserts the objects of the collection emitted by the given {@link Mono}.
	 */
	@Override
	public <T> Flux<T> insertAll(Mono<? extends Collection<? extends T>> objectsToSave) {

		return Flux.from(objectsToSave) //
				.flatMapSequential(this::insertAll);
	}

	/**
	 * Groups the given objects by the collection name resolved for their runtime class and inserts each group as one
	 * batch, one collection after the other.
	 *
	 * @param listToSave the objects to insert.
	 * @param writer the writer handed to the batch insert.
	 * @return a {@link Flux} emitting the results of the individual batch inserts.
	 */
	@SuppressWarnings("NullAway")
	protected <T> Flux<T> doInsertAll(Collection<? extends T> listToSave, MongoWriter<Object> writer) {

		Map<String, List<T>> batchesByCollection = new HashMap<>();

		for (T candidate : listToSave) {

			String targetCollection = getCollectionName(candidate.getClass());
			batchesByCollection.computeIfAbsent(targetCollection, key -> new ArrayList<>()).add(candidate);
		}

		return Flux.fromIterable(batchesByCollection.entrySet())
				.concatMap(batch -> doInsertBatch(batch.getKey(), batch.getValue(), writer));
	}

	/**
	 * Inserts a batch of objects into a single collection.
	 * <p>
	 * Works in three stages: (1) every element is run through the before-convert/before-save events and callbacks and
	 * mapped to a {@link Document}, (2) all documents are inserted with one {@code insertDocumentList} call, (3) ids are
	 * populated on the entities and the after-save event and callback are invoked per element.
	 *
	 * @param collectionName name of the target collection.
	 * @param batchToSave the objects to insert.
	 * @param writer the writer used to map entities to documents; must not be {@literal null}.
	 * @return the saved entities as returned by the after-save callback.
	 */
	@SuppressWarnings("NullAway")
	protected <T> Flux<T> doInsertBatch(String collectionName, Collection<? extends T> batchToSave,
			MongoWriter<Object> writer) {

		Assert.notNull(writer, "MongoWriter must not be null");

		// Stage 1: prepare (entity, document) pairs; nothing is written to MongoDB yet.
		// NOTE(review): flatMap may interleave results if callbacks complete asynchronously — confirm whether the
		// resulting order has to match the order of batchToSave.
		Mono<List<Tuple2<AdaptibleEntity<T>, Document>>> prepareDocuments = Flux.fromIterable(batchToSave)
				.flatMap(uninitialized -> {

					BeforeConvertEvent<T> event = new BeforeConvertEvent<>(uninitialized, collectionName);
					T toConvert = maybeEmitEvent(event).getSource();

					return maybeCallBeforeConvert(toConvert, collectionName).flatMap(it -> {

						AdaptibleEntity<T> entity = operations.forEntityUpsert(it, mongoConverter.getConversionService());
						T initialized = entity.initializeVersionProperty();
						MappedDocument mapped = entity.toMappedDocument(writer);

						maybeEmitEvent(new BeforeSaveEvent<>(initialized, mapped.getDocument(), collectionName));
						return maybeCallBeforeSave(initialized, mapped.getDocument(), collectionName).map(toSave -> {

							// The id is prepared on the mapped document; the callback result (toSave) is not used here.
							MappedDocument mappedDocument = queryOperations.createInsertContext(mapped)
									.prepareId(uninitialized.getClass());

							return Tuples.of(entity, mappedDocument.getDocument());
						});
					});
				}).collectList();

		// Stage 2: insert all prepared documents at once, then replay the prepared pairs.
		Flux<Tuple2<AdaptibleEntity<T>, Document>> insertDocuments = prepareDocuments.flatMapMany(tuples -> {

			List<Document> documents = tuples.stream().map(Tuple2::getT2).collect(Collectors.toList());

			return insertDocumentList(collectionName, documents).thenMany(Flux.fromIterable(tuples));
		});

		// Stage 3: read the id from each document, populate it on the entity and notify after-save listeners.
		return insertDocuments.flatMapSequential(tuple -> {

			Document document = tuple.getT2();
			Object id = MappedDocument.of(document).getId();

			T saved = tuple.getT1().populateIdIfNecessary(id);
			maybeEmitEvent(new AfterSaveEvent<>(saved, document, collectionName));
			return maybeCallAfterSave(saved, document, collectionName);
		});
	}

	/**
	 * Saves the object emitted by the given {@link Mono} via {@link #save(Object)}.
	 *
	 * @param objectToSave publisher emitting the object to save; must not be {@literal null}.
	 * @return the result of {@link #save(Object)} for the emitted object.
	 */
	@Override
	public <T> Mono<T> save(Mono<? extends T> objectToSave) {

		Assert.notNull(objectToSave, "Mono to save must not be null");

		Mono<T> saved = objectToSave.flatMap(this::save);
		return saved;
	}

	/**
	 * Saves the object emitted by the given {@link Mono} into the named collection via {@link #save(Object, String)}.
	 *
	 * @param objectToSave publisher emitting the object to save; must not be {@literal null}.
	 * @param collectionName name of the target collection.
	 * @return the result of {@link #save(Object, String)} for the emitted object.
	 */
	@Override
	public <T> Mono<T> save(Mono<? extends T> objectToSave, String collectionName) {

		Assert.notNull(objectToSave, "Mono to save must not be null");

		return objectToSave.flatMap(emitted -> save(emitted, collectionName));
	}

	/**
	 * Saves the given object into the collection resolved from its user class.
	 *
	 * @param objectToSave the object to save; must not be {@literal null}.
	 * @return the result of {@link #save(Object, String)}.
	 */
	@Override
	public <T> Mono<T> save(T objectToSave) {

		Assert.notNull(objectToSave, "Object to save must not be null");

		Class<?> userType = ClassUtils.getUserClass(objectToSave);
		return save(objectToSave, getCollectionName(userType));
	}

	/**
	 * Saves the given object into the named collection. Versioned entities go through {@code doSaveVersioned}, all
	 * others through {@code doSave}.
	 *
	 * @param objectToSave the object to save; must not be {@literal null}.
	 * @param collectionName name of the target collection; must not be {@literal null} or empty.
	 * @return the saved object.
	 */
	@Override
	public <T> Mono<T> save(T objectToSave, String collectionName) {

		Assert.notNull(objectToSave, "Object to save must not be null");
		Assert.hasText(collectionName, "Collection name must not be null or empty");

		AdaptibleEntity<T> source = operations.forEntity(objectToSave, mongoConverter.getConversionService());

		if (source.isVersionedEntity()) {
			return doSaveVersioned(source, collectionName);
		}

		return doSave(collectionName, objectToSave, this.mongoConverter);
	}

	/**
	 * Saves a versioned entity. New entities are inserted via {@code doInsert}; existing ones are updated with a query
	 * built from the entity's id and current version, after the version has been incremented.
	 *
	 * @param source the adaptable entity wrapping the object to save.
	 * @param collectionName name of the target collection.
	 * @return the saved object as returned by the after-save callback.
	 */
	private <T> Mono<T> doSaveVersioned(AdaptibleEntity<T> source, String collectionName) {

		if (source.isNew()) {
			return doInsert(collectionName, source.getBean(), this.mongoConverter);
		}

		return createMono(collectionName, collection -> {

			// Create query for entity with the id and old version
			// (must be obtained before the version is incremented below)
			Query query = source.getQueryForVersion();

			// Bump version number
			T toSave = source.incrementVersion();

			source.assertUpdateableIdIfNotSet();

			BeforeConvertEvent<T> event = new BeforeConvertEvent<>(toSave, collectionName);
			T afterEvent = maybeEmitEvent(event).getSource();

			return maybeCallBeforeConvert(afterEvent, collectionName).flatMap(toConvert -> {

				// Re-check the id on the instance returned by the before-convert callback.
				Entity<T> tEntity = operations.forEntity(toConvert);
				tEntity.assertUpdateableIdIfNotSet();

				MappedDocument mapped = tEntity.toMappedDocument(mongoConverter);
				Document document = mapped.getDocument();

				maybeEmitEvent(new BeforeSaveEvent<>(toConvert, document, collectionName));
				return maybeCallBeforeSave(toConvert, document, collectionName).flatMap(it -> {

					// Single, non-upsert update using the id/old-version query captured above.
					return doUpdate(collectionName, query, mapped.updateWithoutId(), it.getClass(), false, false)
							.flatMap(result -> {
								maybeEmitEvent(new AfterSaveEvent<T>(it, document, collectionName));
								return maybeCallAfterSave(it, document, collectionName);
							});
				});
			});
		});
	}

	/**
	 * Saves a non-versioned object: emits the before-convert event and callback, maps the object to a
	 * {@link Document}, emits the before-save event and callback, stores the document via {@code saveDocument} and
	 * finally populates the id and notifies after-save listeners.
	 *
	 * @param collectionName name of the target collection.
	 * @param objectToSave the object to save.
	 * @param writer the writer used to map the object to a document.
	 * @return the saved object as returned by the after-save callback.
	 */
	@SuppressWarnings("NullAway")
	protected <T> Mono<T> doSave(String collectionName, T objectToSave, MongoWriter<Object> writer) {

		return createMono(collectionName, collection -> {

			BeforeConvertEvent<T> beforeConvert = new BeforeConvertEvent<T>(objectToSave, collectionName);
			T candidate = maybeEmitEvent(beforeConvert).getSource();

			return maybeCallBeforeConvert(candidate, collectionName).flatMap(converted -> {

				AdaptibleEntity<T> adapted = operations.forEntityUpsert(converted, mongoConverter.getConversionService());
				Document target = adapted.toMappedDocument(writer).getDocument();
				maybeEmitEvent(new BeforeSaveEvent<T>(converted, target, collectionName));

				return maybeCallBeforeSave(converted, target, collectionName) //
						.flatMap(ready -> saveDocument(collectionName, target, ready.getClass()).flatMap(id -> {

							T persisted = adapted.populateIdIfNecessary(id);
							maybeEmitEvent(new AfterSaveEvent<>(persisted, target, collectionName));
							return maybeCallAfterSave(persisted, target, collectionName);
						}));
			});
		});
	}

	/**
	 * Inserts a single {@link Document} into the given collection after preparing its id for {@code entityClass}.
	 *
	 * @param collectionName name of the target collection.
	 * @param dbDoc the document to insert.
	 * @param entityClass the entity type, used for id preparation and write concern resolution.
	 * @return a {@link Mono} emitting the id of the inserted document.
	 */
	protected Mono<Object> insertDocument(String collectionName, Document dbDoc, Class<?> entityClass) {

		if (LOGGER.isDebugEnabled()) {
			// Pass the values as format arguments: concatenating them into the format string would make any '%'
			// in a field or collection name be interpreted as a format specifier.
			LOGGER.debug(String.format("Inserting Document containing fields: %s in collection: %s", dbDoc.keySet(),
					collectionName));
		}

		MappedDocument document = MappedDocument.of(dbDoc);
		queryOperations.createInsertContext(document).prepareId(entityClass);

		Flux<InsertOneResult> execute = execute(collectionName, collection -> {

			MongoAction mongoAction = new MongoAction(writeConcern, MongoActionOperation.INSERT, collectionName, entityClass,
					dbDoc, null);
			WriteConcern writeConcernToUse = prepareWriteConcern(mongoAction);

			MongoCollection<Document> collectionToUse = prepareCollection(collection, writeConcernToUse);

			return collectionToUse.insertOne(document.getDocument());
		});

		// The id is read from the mapped document once the insert has emitted its result.
		return Flux.from(execute).last().map(success -> document.getId());
	}

	/**
	 * Inserts the given documents into the collection with a single {@code insertMany} call.
	 *
	 * @param collectionName name of the target collection.
	 * @param dbDocList the documents to insert; an empty list results in an empty {@link Flux}.
	 * @return the ids of the inserted documents that are of type {@link ObjectId}; documents with other id types are
	 *         filtered out of the result.
	 */
	protected Flux<ObjectId> insertDocumentList(String collectionName, List<Document> dbDocList) {

		if (dbDocList.isEmpty()) {
			return Flux.empty();
		}

		if (LOGGER.isDebugEnabled()) {
			LOGGER.debug(String.format("Inserting list of Documents containing %d items", dbDocList.size()));
		}

		// Buffer shared between the insert callback and the id extraction below.
		// NOTE(review): the list is created once per method call but filled inside the callback; if the callback runs
		// once per subscription, re-subscribing (e.g. retry) would append the documents again — confirm.
		List<Document> documents = new ArrayList<>(dbDocList.size());

		return execute(collectionName, collection -> {

			MongoAction mongoAction = new MongoAction(writeConcern, MongoActionOperation.INSERT_LIST, collectionName, null,
					null, null);
			WriteConcern writeConcernToUse = prepareWriteConcern(mongoAction);
			MongoCollection<Document> collectionToUse = prepareCollection(collection, writeConcernToUse);

			documents.addAll(toDocuments(dbDocList));

			return collectionToUse.insertMany(documents);

		}).flatMapSequential(s -> {

			// Only ObjectId ids are reported back.
			return Flux.fromStream(documents.stream() //
					.map(MappedDocument::of) //
					.filter(it -> it.isIdPresent(ObjectId.class)) //
					.map(it -> it.getId(ObjectId.class)));
		});
	}

	/**
	 * Applies the given write concern to the collection, if one is provided.
	 *
	 * @param collection the collection to prepare.
	 * @param writeConcernToUse the write concern to apply; can be {@literal null}.
	 * @return the collection with the write concern applied, or the given collection if no write concern was given.
	 */
	private MongoCollection<Document> prepareCollection(MongoCollection<Document> collection,
			@Nullable WriteConcern writeConcernToUse) {
		return writeConcernToUse == null ? collection : collection.withWriteConcern(writeConcernToUse);
	}

	/**
	 * Stores the given document: documents without an id are inserted, documents with an id replace the existing
	 * document using the replace options provided by the update context.
	 *
	 * @param collectionName name of the target collection.
	 * @param document the document to store.
	 * @param entityClass the entity type, used for id preparation, shard key handling and write concern resolution.
	 * @return a {@link Mono} emitting the id of the stored document.
	 */
	@SuppressWarnings("NullAway")
	protected Mono<Object> saveDocument(String collectionName, Document document, Class<?> entityClass) {

		if (LOGGER.isDebugEnabled()) {
			LOGGER.debug(String.format("Saving Document containing fields: %s", document.keySet()));
		}

		return createMono(collectionName, collection -> {

			MongoAction mongoAction = new MongoAction(writeConcern, MongoActionOperation.SAVE, collectionName, entityClass,
					document, null);
			WriteConcern writeConcernToUse = prepareWriteConcern(mongoAction);
			MappedDocument mapped = MappedDocument.of(document);

			MongoCollection<Document> collectionToUse = writeConcernToUse == null //
					? collection //
					: collection.withWriteConcern(writeConcernToUse);

			Publisher<?> publisher;
			if (!mapped.hasId()) {
				// No id yet: plain insert with a prepared id.
				publisher = collectionToUse
						.insertOne(queryOperations.createInsertContext(mapped).prepareId(entityClass).getDocument());
			} else {

				MongoPersistentEntity<?> entity = mappingContext.getPersistentEntity(entityClass);
				UpdateContext updateContext = queryOperations.replaceSingleContext(mapped, true);
				Document filter = updateContext.getReplacementQuery();
				Document replacement = updateContext.getMappedUpdate(entity);

				Mono<Document> deferredFilter;

				if (updateContext.requiresShardKey(filter, entity)) {
					if (entity.getShardKey().isImmutable()) {
						deferredFilter = Mono.just(updateContext.applyShardKey(entity, filter, null));
					} else {
						// Mutable shard key: look up the existing shard key values first, falling back to the
						// replacement document when nothing is found.
						deferredFilter = Mono
								.from(
										collection.find(filter, Document.class).projection(updateContext.getMappedShardKey(entity)).first())
								.defaultIfEmpty(replacement).map(it -> updateContext.applyShardKey(entity, filter, it));
					}
				} else {
					deferredFilter = Mono.just(filter);
				}

				publisher = deferredFilter.flatMapMany(
						it -> collectionToUse.replaceOne(it, replacement, updateContext.getReplaceOptions(entity)));
			}

			return Mono.from(publisher).map(o -> mapped.getId());
		});

	}

	/**
	 * Performs an upsert in the collection resolved from {@code entityClass}; delegates to {@code doUpdate} with
	 * {@code upsert = true} and {@code multi = false}.
	 */
	@Override
	public Mono<UpdateResult> upsert(Query query, UpdateDefinition update, Class<?> entityClass) {

		String collectionName = getCollectionName(entityClass);
		return doUpdate(collectionName, query, update, entityClass, true, false);
	}

	/**
	 * Performs an upsert in the named collection without a domain type; delegates to {@code doUpdate} with
	 * {@code upsert = true} and {@code multi = false}.
	 */
	@Override
	public Mono<UpdateResult> upsert(Query query, UpdateDefinition update, String collectionName) {

		boolean upsert = true;
		boolean multi = false;

		return doUpdate(collectionName, query, update, null, upsert, multi);
	}

	/**
	 * Performs an upsert in the named collection using {@code entityClass} as domain type; delegates to
	 * {@code doUpdate} with {@code upsert = true} and {@code multi = false}.
	 */
	@Override
	public Mono<UpdateResult> upsert(Query query, UpdateDefinition update, Class<?> entityClass, String collectionName) {

		boolean upsert = true;
		boolean multi = false;

		return doUpdate(collectionName, query, update, entityClass, upsert, multi);
	}

	/**
	 * Updates a single document in the collection resolved from {@code entityClass}; delegates to {@code doUpdate}
	 * with {@code upsert = false} and {@code multi = false}.
	 */
	@Override
	public Mono<UpdateResult> updateFirst(Query query, UpdateDefinition update, Class<?> entityClass) {

		String collectionName = getCollectionName(entityClass);
		return doUpdate(collectionName, query, update, entityClass, false, false);
	}

	/**
	 * Updates a single document in the named collection without a domain type; delegates to {@code doUpdate} with
	 * {@code upsert = false} and {@code multi = false}.
	 */
	@Override
	public Mono<UpdateResult> updateFirst(Query query, UpdateDefinition update, String collectionName) {

		boolean upsert = false;
		boolean multi = false;

		return doUpdate(collectionName, query, update, null, upsert, multi);
	}

	/**
	 * Updates a single document in the named collection using {@code entityClass} as domain type; delegates to
	 * {@code doUpdate} with {@code upsert = false} and {@code multi = false}.
	 */
	@Override
	public Mono<UpdateResult> updateFirst(Query query, UpdateDefinition update, Class<?> entityClass,
			String collectionName) {

		boolean upsert = false;
		boolean multi = false;

		return doUpdate(collectionName, query, update, entityClass, upsert, multi);
	}

	/**
	 * Updates all matching documents in the collection resolved from {@code entityClass}; delegates to
	 * {@code doUpdate} with {@code upsert = false} and {@code multi = true}.
	 */
	@Override
	public Mono<UpdateResult> updateMulti(Query query, UpdateDefinition update, Class<?> entityClass) {

		String collectionName = getCollectionName(entityClass);
		return doUpdate(collectionName, query, update, entityClass, false, true);
	}

	/**
	 * Updates all matching documents in the named collection without a domain type; delegates to {@code doUpdate}
	 * with {@code upsert = false} and {@code multi = true}.
	 */
	@Override
	public Mono<UpdateResult> updateMulti(Query query, UpdateDefinition update, String collectionName) {

		boolean upsert = false;
		boolean multi = true;

		return doUpdate(collectionName, query, update, null, upsert, multi);
	}

	/**
	 * Updates all matching documents in the named collection using {@code entityClass} as domain type; delegates to
	 * {@code doUpdate} with {@code upsert = false} and {@code multi = true}.
	 */
	@Override
	public Mono<UpdateResult> updateMulti(Query query, UpdateDefinition update, Class<?> entityClass,
			String collectionName) {

		boolean upsert = false;
		boolean multi = true;

		return doUpdate(collectionName, query, update, entityClass, upsert, multi);
	}

	/**
	 * Runs an update against {@code collectionName} and returns the first {@link UpdateResult} emitted.
	 * <p>
	 * Aggregation updates are sent as an update pipeline. Other updates are sent via {@code updateOne} /
	 * {@code updateMany}, unless the mapped update is not an update object, in which case the matching document is
	 * replaced via {@code replaceOne} (applying the shard key to the filter where required).
	 * <p>
	 * For single-document updates of entities with a version property, an acknowledged result that matched no document
	 * while the query contains the version field raises an {@link OptimisticLockingFailureException} from the result
	 * pipeline.
	 *
	 * @param collectionName name of the target collection.
	 * @param query the query selecting the documents to update.
	 * @param update the update definition.
	 * @param entityClass the domain type used for mapping; can be {@literal null}.
	 * @param upsert whether to insert a document if none matches.
	 * @param multi whether to update all matching documents instead of one.
	 * @return a {@link Mono} emitting the {@link UpdateResult}.
	 */
	@SuppressWarnings("NullAway")
	protected Mono<UpdateResult> doUpdate(String collectionName, Query query, UpdateDefinition update,
			@Nullable Class<?> entityClass, boolean upsert, boolean multi) {

		MongoPersistentEntity<?> entity = entityClass == null ? null : getPersistentEntity(entityClass);

		UpdateContext updateContext = multi ? queryOperations.updateContext(update, query, upsert)
				: queryOperations.updateSingleContext(update, query, upsert);
		updateContext.increaseVersionForUpdateIfNecessary(entity);

		Document queryObj = updateContext.getMappedQuery(entity);
		UpdateOptions updateOptions = updateContext.getUpdateOptions(entity, query);

		Flux<UpdateResult> result;

		if (updateContext.isAggregationUpdate()) {

			List<Document> pipeline = updateContext.getUpdatePipeline(entityClass);
			MongoAction mongoAction = new MongoAction(writeConcern, MongoActionOperation.UPDATE, collectionName, entityClass,
					update.getUpdateObject(), queryObj);
			WriteConcern writeConcernToUse = prepareWriteConcern(mongoAction);

			result = execute(collectionName, collection -> {

				if (LOGGER.isDebugEnabled()) {
					LOGGER.debug(String.format("Calling update using query: %s and update: %s in collection: %s",
							serializeToJsonSafely(queryObj), serializeToJsonSafely(pipeline), collectionName));
				}

				MongoCollection<Document> collectionToUse = prepareCollection(collection, writeConcernToUse);

				return multi ? collectionToUse.updateMany(queryObj, pipeline, updateOptions)
						: collectionToUse.updateOne(queryObj, pipeline, updateOptions);
			});
		} else {

			Document updateObj = updateContext.getMappedUpdate(entity);
			MongoAction mongoAction = new MongoAction(writeConcern, MongoActionOperation.UPDATE, collectionName, entityClass,
					updateObj, queryObj);
			WriteConcern writeConcernToUse = prepareWriteConcern(mongoAction);

			result = execute(collectionName, collection -> {

				if (LOGGER.isDebugEnabled()) {
					LOGGER.debug(String.format("Calling update using query: %s and update: %s in collection: %s",
							serializeToJsonSafely(queryObj), serializeToJsonSafely(updateObj), collectionName));
				}

				MongoCollection<Document> collectionToUse = prepareCollection(collection, writeConcernToUse);

				if (!UpdateMapper.isUpdateObject(updateObj)) {

					// Not an update object: treat it as a full replacement document.
					Document filter = new Document(queryObj);
					Mono<Document> deferredFilter;

					if (updateContext.requiresShardKey(filter, entity)) {
						if (entity.getShardKey().isImmutable()) {
							deferredFilter = Mono.just(updateContext.applyShardKey(entity, filter, null));
						} else {
							// Mutable shard key: look up the existing shard key values first, falling back to the
							// replacement document when nothing is found.
							deferredFilter = Mono.from(
									collection.find(filter, Document.class).projection(updateContext.getMappedShardKey(entity)).first())
									.defaultIfEmpty(updateObj).map(it -> updateContext.applyShardKey(entity, filter, it));
						}
					} else {
						deferredFilter = Mono.just(filter);
					}

					com.mongodb.client.model.ReplaceOptions replaceOptions = updateContext.getReplaceOptions(entity);
					return deferredFilter.flatMap(it -> Mono.from(collectionToUse.replaceOne(it, updateObj, replaceOptions)));
				}

				return multi ? collectionToUse.updateMany(queryObj, updateObj, updateOptions)
						: collectionToUse.updateOne(queryObj, updateObj, updateOptions);
			});
		}

		result = result.doOnNext(updateResult -> {

			// Optimistic locking check: only for single updates of entities that carry a version property.
			if (entity != null && entity.hasVersionProperty() && !multi) {
				if (updateResult.wasAcknowledged() && updateResult.getMatchedCount() == 0) {

					if (containsVersionProperty(queryObj, entity)) {
						throw new OptimisticLockingFailureException("Optimistic lock exception on saving entity %s to collection %s"
								.formatted(entity.getName(), collectionName));
					}
				}
			}
		});

		return result.next();
	}

	/**
	 * Checks whether the given document contains the field backing the entity's version property.
	 *
	 * @param document the document to inspect.
	 * @param persistentEntity the entity describing the version property; can be {@literal null}.
	 * @return {@literal true} if the entity has a version property and the document contains its field name;
	 *         {@literal false} otherwise, including when no entity is given.
	 */
	private boolean containsVersionProperty(Document document, @Nullable MongoPersistentEntity<?> persistentEntity) {

		return persistentEntity != null //
				&& persistentEntity.hasVersionProperty() //
				&& document.containsKey(persistentEntity.getRequiredVersionProperty().getFieldName());
	}

	/**
	 * Removes the object emitted by the given {@link Mono} via {@link #remove(Object)}.
	 *
	 * @param objectToRemove publisher emitting the object to remove.
	 * @return the {@link DeleteResult} of the removal.
	 */
	@Override
	public Mono<DeleteResult> remove(Mono<? extends Object> objectToRemove) {

		Mono<DeleteResult> deletion = objectToRemove.flatMap(this::remove);
		return deletion;
	}

	/**
	 * Removes the object emitted by the given {@link Mono} from the named collection via
	 * {@link #remove(Object, String)}.
	 *
	 * @param objectToRemove publisher emitting the object to remove.
	 * @param collectionName name of the collection to remove from.
	 * @return the {@link DeleteResult} of the removal.
	 */
	@Override
	public Mono<DeleteResult> remove(Mono<? extends Object> objectToRemove, String collectionName) {
		return objectToRemove.flatMap(emitted -> remove(emitted, collectionName));
	}

	/**
	 * Removes the given object using the remove-by query derived from it and the object's runtime class.
	 *
	 * @param object the object to remove; must not be {@literal null}.
	 * @return the {@link DeleteResult} of the removal.
	 */
	@Override
	public Mono<DeleteResult> remove(Object object) {

		Assert.notNull(object, "Object must not be null");

		Query removeByQuery = operations.forEntity(object).getRemoveByQuery();
		return remove(removeByQuery, object.getClass());
	}

	/**
	 * Removes the given object from the named collection using the remove-by query derived from it.
	 *
	 * @param object the object to remove; must not be {@literal null}.
	 * @param collectionName name of the collection to remove from; must not be {@literal null} or empty.
	 * @return the {@link DeleteResult} of the removal.
	 */
	@Override
	public Mono<DeleteResult> remove(Object object, String collectionName) {

		Assert.notNull(object, "Object must not be null");
		Assert.hasText(collectionName, "Collection name must not be null or empty");

		Query removeByQuery = operations.forEntity(object).getRemoveByQuery();
		return doRemove(collectionName, removeByQuery, object.getClass());
	}

	/**
	 * Removes the documents matching the query from the named collection without a domain type.
	 *
	 * @param query the query selecting the documents to remove.
	 * @param collectionName name of the collection to remove from.
	 * @return the {@link DeleteResult} of the removal.
	 */
	@Override
	public Mono<DeleteResult> remove(Query query, String collectionName) {

		Mono<DeleteResult> deletion = remove(query, null, collectionName);
		return deletion;
	}

	/**
	 * Removes the documents matching the query from the collection resolved from {@code entityClass}.
	 *
	 * @param query the query selecting the documents to remove.
	 * @param entityClass the domain type used to resolve the collection name and for mapping.
	 * @return the {@link DeleteResult} of the removal.
	 */
	@Override
	public Mono<DeleteResult> remove(Query query, Class<?> entityClass) {

		String collectionName = getCollectionName(entityClass);
		return remove(query, entityClass, collectionName);
	}

	/**
	 * Removes the documents matching the query from the named collection by delegating to {@code doRemove}.
	 *
	 * @param query the query selecting the documents to remove.
	 * @param entityClass the domain type used for mapping; can be {@literal null}.
	 * @param collectionName name of the collection to remove from.
	 * @return the {@link DeleteResult} of the removal.
	 */
	@Override
	public Mono<DeleteResult> remove(Query query, @Nullable Class<?> entityClass, String collectionName) {

		Mono<DeleteResult> deletion = doRemove(collectionName, query, entityClass);
		return deletion;
	}

	/**
	 * Removes the documents matching the given query from the collection, emitting a {@code BeforeDeleteEvent} before
	 * and an {@code AfterDeleteEvent} after the delete.
	 * <p>
	 * If the query defines a limit or skip, the matching ids are looked up first and the delete is issued for exactly
	 * those ids; otherwise the mapped query is passed to {@code deleteMany} directly.
	 *
	 * @param collectionName name of the collection to remove from; must not be {@literal null} or empty.
	 * @param query the query selecting the documents to remove; must not be {@literal null}.
	 * @param entityClass the domain type used for mapping; can be {@literal null}.
	 * @return a {@link Mono} emitting the {@link DeleteResult}.
	 * @throws InvalidDataAccessApiUsageException if {@code query} is {@literal null}.
	 */
	protected <T> Mono<DeleteResult> doRemove(String collectionName, Query query, @Nullable Class<T> entityClass) {

		if (query == null) {
			throw new InvalidDataAccessApiUsageException("Query passed in to remove can't be null");
		}

		Assert.hasText(collectionName, "Collection name must not be null or empty");

		MongoPersistentEntity<?> entity = getPersistentEntity(entityClass);

		DeleteContext deleteContext = queryOperations.deleteQueryContext(query);
		// queryObject is used for the after-delete event, removeQuery for the before-delete event and the delete itself.
		Document queryObject = deleteContext.getMappedQuery(entity);
		DeleteOptions deleteOptions = deleteContext.getDeleteOptions(entity);
		Document removeQuery = deleteContext.getMappedQuery(entity);
		MongoAction mongoAction = new MongoAction(writeConcern, MongoActionOperation.REMOVE, collectionName, entityClass,
				null, removeQuery);
		WriteConcern writeConcernToUse = prepareWriteConcern(mongoAction);
		ReactiveCollectionPreparerDelegate collectionPreparer = ReactiveCollectionPreparerDelegate.of(query);

		return execute(collectionName, collection -> {

			maybeEmitEvent(new BeforeDeleteEvent<>(removeQuery, entityClass, collectionName));

			MongoCollection<Document> collectionToUse = collectionPreparer
					.prepare(prepareCollection(collection, writeConcernToUse));

			if (LOGGER.isDebugEnabled()) {
				LOGGER.debug(String.format("Remove using query: %s in collection: %s.", serializeToJsonSafely(removeQuery),
						collectionName));
			}

			if (query.getLimit() > 0 || query.getSkip() > 0) {

				// Limit/skip: select the ids of the affected documents first, then delete by id.
				FindPublisher<Document> cursor = new QueryFindPublisherPreparer(query, entityClass)
						.prepare(collection.find(removeQuery)) //
						.projection(MappedDocument.getIdOnlyProjection());

				return Flux.from(cursor) //
						.map(MappedDocument::of) //
						.map(MappedDocument::getId) //
						.collectList() //
						.flatMapMany(val -> {
							return collectionToUse.deleteMany(MappedDocument.getIdIn(val), deleteOptions);
						});
			} else {
				return collectionToUse.deleteMany(removeQuery, deleteOptions);
			}

		}).doOnNext(it -> maybeEmitEvent(new AfterDeleteEvent<>(queryObject, entityClass, collectionName))) //
				.next();
	}

	/**
	 * Finds all documents of the collection resolved from {@code entityClass}.
	 *
	 * @param entityClass the domain type to read into.
	 * @return the result of {@link #findAll(Class, String)}.
	 */
	@Override
	public <T> Flux<T> findAll(Class<T> entityClass) {

		String collectionName = getCollectionName(entityClass);
		return findAll(entityClass, collectionName);
	}

	/**
	 * Finds all documents of the named collection, without query or cursor preparation, and reads them into
	 * {@code entityClass}.
	 *
	 * @param entityClass the domain type to read into.
	 * @param collectionName name of the collection to read from.
	 * @return the converted documents.
	 */
	@Override
	public <T> Flux<T> findAll(Class<T> entityClass, String collectionName) {

		FindCallback findEverything = new FindCallback(CollectionPreparer.identity(), null);
		ReadDocumentCallback<T> readCallback = new ReadDocumentCallback<>(mongoConverter, entityClass, collectionName);

		return executeFindMultiInternal(findEverything, FindPublisherPreparer.NO_OP_PREPARER, readCallback,
				collectionName);
	}

	/**
	 * Finds and removes all documents matching the query in the named collection, reading them as {@link Object}.
	 *
	 * @param query the query selecting the documents.
	 * @param collectionName name of the collection to operate on.
	 * @return the removed objects.
	 */
	@Override
	@SuppressWarnings("unchecked")
	public <T> Flux<T> findAllAndRemove(Query query, String collectionName) {

		Flux<Object> removed = findAllAndRemove(query, Object.class, collectionName);
		return (Flux<T>) removed;
	}

	/**
	 * Finds and removes all documents matching the query in the collection resolved from {@code entityClass}.
	 *
	 * @param query the query selecting the documents.
	 * @param entityClass the domain type to read into.
	 * @return the removed objects.
	 */
	@Override
	public <T> Flux<T> findAllAndRemove(Query query, Class<T> entityClass) {

		String collectionName = getCollectionName(entityClass);
		return findAllAndRemove(query, entityClass, collectionName);
	}

	/**
	 * Finds and removes all documents matching the query in the named collection by delegating to
	 * {@code doFindAndDelete}.
	 *
	 * @param query the query selecting the documents.
	 * @param entityClass the domain type to read into.
	 * @param collectionName name of the collection to operate on.
	 * @return the removed objects.
	 */
	@Override
	public <T> Flux<T> findAllAndRemove(Query query, Class<T> entityClass, String collectionName) {

		Flux<T> removed = doFindAndDelete(collectionName, query, entityClass);
		return removed;
	}

	/**
	 * Replaces a document matching the query with the given replacement, using the replacement's user class as entity
	 * type.
	 *
	 * @param query the query selecting the document to replace.
	 * @param replacement the replacement object; must not be {@literal null}.
	 * @param options the replace options.
	 * @param collectionName name of the collection to operate on.
	 * @return the {@link UpdateResult} of the replacement.
	 */
	@Override
	public <T> Mono<UpdateResult> replace(Query query, T replacement, ReplaceOptions options, String collectionName) {

		Assert.notNull(replacement, "Replacement must not be null");

		Class<T> entityType = (Class<T>) ClassUtils.getUserClass(replacement);
		return replace(query, entityType, replacement, options, collectionName);
	}

	/**
	 * Replaces a document matching the query with the mapped form of {@code replacement} via {@code replaceOne}.
	 *
	 * @param query the query selecting the document to replace.
	 * @param entityType the type used to look up the persistent entity for mapping.
	 * @param replacement the replacement object.
	 * @param options the replace options; their upsert flag is applied to the driver options.
	 * @param collectionName name of the collection to operate on.
	 * @return a {@link Mono} emitting the {@link UpdateResult}.
	 */
	protected <S, T> Mono<UpdateResult> replace(Query query, Class<S> entityType, T replacement, ReplaceOptions options,
			String collectionName) {

		MongoPersistentEntity<?> entity = mappingContext.getPersistentEntity(entityType);
		UpdateContext updateContext = queryOperations.replaceSingleContext(query,
				operations.forEntity(replacement).toMappedDocument(this.mongoConverter), options.isUpsert());

		return createMono(collectionName, collection -> {

			Document mappedUpdate = updateContext.getMappedUpdate(entity);

			MongoAction action = new MongoAction(writeConcern, MongoActionOperation.REPLACE, collectionName, entityType,
					mappedUpdate, updateContext.getQueryObject());

			// Collection prepared from both the query and the action.
			MongoCollection<Document> collectionToUse = createCollectionPreparer(query, action).prepare(collection);

			return collectionToUse.replaceOne(updateContext.getMappedQuery(entity), mappedUpdate,
					updateContext.getReplaceOptions(entity, it -> {
						it.upsert(options.isUpsert());
					}));
		});
	}

	/**
	 * Tails the collection resolved from {@code entityClass}.
	 *
	 * @param query the query to apply.
	 * @param entityClass the domain type to read into.
	 * @return the result of {@link #tail(Query, Class, String)}.
	 */
	@Override
	public <T> Flux<T> tail(Query query, Class<T> entityClass) {

		String collectionName = getCollectionName(entityClass);
		return tail(query, entityClass, collectionName);
	}

	/**
	 * Tails the named collection. Without a query, all documents are read using a
	 * {@code CursorType.TailableAwait} cursor; with a query, the find is prepared by a
	 * {@code TailingQueryFindPublisherPreparer}.
	 *
	 * @param query the query to apply; can be {@literal null}.
	 * @param entityClass the domain type to read into.
	 * @param collectionName name of the collection to tail.
	 * @return the converted documents.
	 */
	@Override
	public <T> Flux<T> tail(@Nullable Query query, Class<T> entityClass, String collectionName) {

		if (query == null) {

			// Guarded like the other debug statements in this class so the message is only formatted when needed.
			if (LOGGER.isDebugEnabled()) {
				LOGGER.debug(String.format("Tail for class: %s in collection: %s", entityClass, collectionName));
			}

			return executeFindMultiInternal(
					collection -> new FindCallback(CollectionPreparer.identity(), null).doInCollection(collection)
							.cursorType(CursorType.TailableAwait),
					FindPublisherPreparer.NO_OP_PREPARER, new ReadDocumentCallback<>(mongoConverter, entityClass, collectionName),
					collectionName);
		}

		ReactiveCollectionPreparerDelegate collectionPreparer = ReactiveCollectionPreparerDelegate.of(query);
		return doFind(collectionName, collectionPreparer, query.getQueryObject(), query.getFieldsObject(), entityClass,
				new TailingQueryFindPublisherPreparer(query, entityClass));
	}

	/**
	 * Opens a change stream on the given collection, or on the whole database when no collection name is given, and
	 * wraps every emitted document in a {@link ChangeStreamEvent}.
	 *
	 * @param database name of the database to watch; can be {@literal null}.
	 * @param collectionName name of the collection to watch; can be {@literal null}.
	 * @param options the change stream options (filter, resume/start token, collation, lookup settings).
	 * @param targetType the type the event body is converted to.
	 * @return the stream of change events.
	 */
	@Override
	public <T> Flux<ChangeStreamEvent<T>> changeStream(@Nullable String database, @Nullable String collectionName,
			ChangeStreamOptions options, Class<T> targetType) {

		List<Document> filter = prepareFilter(options);
		// Default full-document lookup, used below only when the options do not specify one.
		FullDocument fullDocument = ClassUtils.isAssignable(Document.class, targetType) ? FullDocument.DEFAULT
				: FullDocument.UPDATE_LOOKUP;

		return ReactiveMongoDatabaseUtils.getDatabase(database, mongoDatabaseFactory) //
				.map(db -> {
					ChangeStreamPublisher<Document> publisher;
					if (StringUtils.hasText(collectionName)) {
						publisher = filter.isEmpty() ? db.getCollection(collectionName).watch(Document.class)
								: db.getCollection(collectionName).watch(filter, Document.class);

					} else {
						publisher = filter.isEmpty() ? db.watch(Document.class) : db.watch(filter, Document.class);
					}

					// Resume-after and start-after are mutually exclusive ways of applying the resume token.
					if (options.isResumeAfter()) {
						publisher = options.getResumeToken().map(BsonValue::asDocument).map(publisher::resumeAfter)
								.orElse(publisher);
					} else if (options.isStartAfter()) {
						publisher = options.getResumeToken().map(BsonValue::asDocument).map(publisher::startAfter)
								.orElse(publisher);
					}
					publisher = options.getCollation().map(Collation::toMongoCollation).map(publisher::collation)
							.orElse(publisher);
					publisher = options.getResumeBsonTimestamp().map(publisher::startAtOperationTime).orElse(publisher);
					publisher = options.getShowExpandedEvents().map(publisher::showExpandedEvents).orElse(publisher);

					if (options.getFullDocumentBeforeChangeLookup().isPresent()) {
						publisher = publisher.fullDocumentBeforeChange(options.getFullDocumentBeforeChangeLookup().get());
					}
					return publisher.fullDocument(options.getFullDocumentLookup().orElse(fullDocument));
				}) //
				.flatMapMany(publisher -> Flux.from(publisher)
						.map(document -> new ChangeStreamEvent<>(document, targetType, getConverter())));
	}

	/**
	 * Turns the filter of the given options into a change stream pipeline.
	 * <p>
	 * An {@link Aggregation} is rendered to a pipeline whose field references are prefixed with {@code fullDocument},
	 * except for the listed change event fields; a {@link List} is returned as is. Without a filter an empty list is
	 * returned.
	 *
	 * @param options the change stream options holding the filter.
	 * @return the pipeline stages to pass to {@code watch}.
	 * @throws IllegalArgumentException if the filter is neither an {@link Aggregation} nor a {@link List}.
	 */
	List<Document> prepareFilter(ChangeStreamOptions options) {

		Object filter = options.getFilter().orElse(Collections.emptyList());

		if (filter instanceof Aggregation agg) {
			AggregationOperationContext context = agg instanceof TypedAggregation typedAggregation
					? new TypeBasedAggregationOperationContext(typedAggregation.getInputType(),
							getConverter().getMappingContext(), queryMapper)
					: new TypeBasedAggregationOperationContext(Object.class, mappingContext, queryMapper,
							FieldLookupPolicy.relaxed());
			return agg.toPipeline(new PrefixingDelegatingAggregationOperationContext(context, "fullDocument",
					Arrays.asList("operationType", "fullDocument", "documentKey", "updateDescription", "ns")));
		}

		if (filter instanceof List) {

			// Element types cannot be checked at runtime; the list is trusted to contain Documents.
			@SuppressWarnings("unchecked")
			List<Document> documents = (List<Document>) filter;
			return documents;
		}

		throw new IllegalArgumentException(
				"ChangeStreamRequestOptions.filter must be either an Aggregation or a plain list of Documents");
	}

	/**
	 * Runs a map-reduce operation on the collection resolved from {@code domainType}.
	 *
	 * @param filterQuery the query selecting the input documents.
	 * @param domainType the domain type used to resolve the input collection and for mapping.
	 * @param resultType the type results are converted to.
	 * @param mapFunction the map function.
	 * @param reduceFunction the reduce function.
	 * @param options the map-reduce options.
	 * @return the converted map-reduce results.
	 */
	@Override
	public <T> Flux<T> mapReduce(Query filterQuery, Class<?> domainType, Class<T> resultType, String mapFunction,
			String reduceFunction, MapReduceOptions options) {

		String inputCollectionName = getCollectionName(domainType);

		return mapReduce(filterQuery, domainType, inputCollectionName, resultType, mapFunction, reduceFunction, options);
	}

	/**
	 * Runs a map-reduce operation on the named input collection and converts the results to {@code resultType}.
	 * <p>
	 * Filter, sort, max time, limit and collation are taken from {@code filterQuery}; scope, limit, finalize function,
	 * JavaScript mode, output collection/database and collation are taken from {@code options}.
	 *
	 * @param filterQuery the query selecting the input documents; must not be {@literal null}.
	 * @param domainType the domain type used for mapping query and sort; must not be {@literal null}.
	 * @param inputCollectionName name of the input collection; must not be {@literal null} or empty.
	 * @param resultType the type results are converted to; must not be {@literal null}.
	 * @param mapFunction the map function; must not be {@literal null} and must not be a resource URL.
	 * @param reduceFunction the reduce function; must not be {@literal null} and must not be a resource URL.
	 * @param options the map-reduce options; must not be {@literal null}.
	 * @return the converted map-reduce results.
	 * @throws IllegalArgumentException (raised inside the callback) if both the query and the options define a limit
	 *           or a collation.
	 */
	@Override
	public <T> Flux<T> mapReduce(Query filterQuery, Class<?> domainType, String inputCollectionName, Class<T> resultType,
			String mapFunction, String reduceFunction, MapReduceOptions options) {

		Assert.notNull(filterQuery, "Filter query must not be null");
		Assert.notNull(domainType, "Domain type must not be null");
		Assert.hasText(inputCollectionName, "Input collection name must not be null or empty");
		Assert.notNull(resultType, "Result type must not be null");
		Assert.notNull(mapFunction, "Map function must not be null");
		Assert.notNull(reduceFunction, "Reduce function must not be null");
		Assert.notNull(options, "MapReduceOptions must not be null");

		// Functions referenced by URL would require blocking resource access, which is rejected.
		assertLocalFunctionNames(mapFunction, reduceFunction);

		ReactiveCollectionPreparerDelegate collectionPreparer = ReactiveCollectionPreparerDelegate.of(filterQuery);
		return createFlux(inputCollectionName, collection -> {

			Document mappedQuery = queryMapper.getMappedObject(filterQuery.getQueryObject(),
					mappingContext.getPersistentEntity(domainType));

			MapReducePublisher<Document> publisher = collectionPreparer.prepare(collection).mapReduce(mapFunction,
					reduceFunction, Document.class);

			publisher.filter(mappedQuery);

			Document mappedSort = getMappedSortObject(filterQuery, domainType);
			if (mappedSort != null && !mappedSort.isEmpty()) {
				publisher.sort(mappedSort);
			}

			Meta meta = filterQuery.getMeta();
			if (meta.hasMaxTime()) {
				publisher.maxTime(meta.getRequiredMaxTimeMsec(), TimeUnit.MILLISECONDS);
			}

			// A limit may come from the query or from the options, but not from both.
			if (filterQuery.getLimit() > 0 || (options.getLimit() != null)) {

				if (filterQuery.getLimit() > 0 && (options.getLimit() != null)) {
					throw new IllegalArgumentException(
							"Both Query and MapReduceOptions define a limit; Please provide the limit only via one of the two.");
				}

				if (filterQuery.getLimit() > 0) {
					publisher.limit(filterQuery.getLimit());
				}

				if (options.getLimit() != null) {
					publisher.limit(options.getLimit());
				}
			}

			// Same rule for the collation: query or options, not both; the options' collation wins when present.
			Optional<Collation> collation = filterQuery.getCollation();

			Optionals.ifAllPresent(filterQuery.getCollation(), options.getCollation(), (l, r) -> {
				throw new IllegalArgumentException(
						"Both Query and MapReduceOptions define a collation; Please provide the collation only via one of the two.");
			});

			if (options.getCollation().isPresent()) {
				collation = options.getCollation();
			}

			if (!CollectionUtils.isEmpty(options.getScopeVariables())) {
				publisher = publisher.scope(new Document(options.getScopeVariables()));
			}

			// NOTE(review): a positive options limit has already been applied in the limit block above — this looks
			// redundant; confirm before removing.
			if (options.getLimit() != null && options.getLimit() > 0) {
				publisher = publisher.limit(options.getLimit());
			}

			if (options.getFinalizeFunction().filter(StringUtils::hasText).isPresent()) {
				publisher = publisher.finalizeFunction(options.getFinalizeFunction().get());
			}

			if (options.getJavaScriptMode() != null) {
				publisher = publisher.jsMode(options.getJavaScriptMode());
			}

			// Output collection settings only apply when the result is not returned inline.
			if (StringUtils.hasText(options.getOutputCollection()) && !options.usesInlineOutput()) {
				publisher = publisher.collectionName(options.getOutputCollection()).action(options.getMapReduceAction());

				if (options.getOutputDatabase().isPresent()) {
					publisher = publisher.databaseName(options.getOutputDatabase().get());
				}
			}

			publisher = collation.map(Collation::toMongoCollation).map(publisher::collation).orElse(publisher);

			return Flux.from(publisher)
					.flatMapSequential(new ReadDocumentCallback<>(mongoConverter, resultType, inputCollectionName)::doWith);
		});
	}

	/**
	 * Rejects functions given as resource URLs, since loading them would require blocking resource access.
	 *
	 * @param functions the function definitions to check.
	 * @throws IllegalArgumentException for the first function that is a URL according to {@code ResourceUtils.isUrl}.
	 */
	private static void assertLocalFunctionNames(String... functions) {

		Arrays.stream(functions) //
				.filter(ResourceUtils::isUrl) //
				.findFirst() //
				.ifPresent(remote -> {
					throw new IllegalArgumentException(String.format(
							"Blocking accessing to resource %s is not allowed using reactive infrastructure; You may load the resource at startup and cache its value.",
							remote));
				});
	}

	/**
	 * Entry point of the fluent find API for the given domain type, backed by a new
	 * {@code ReactiveFindOperationSupport} bound to this template.
	 */
	@Override
	public <T> ReactiveFind<T> query(Class<T> domainType) {

		ReactiveFindOperationSupport support = new ReactiveFindOperationSupport(this);
		return support.query(domainType);
	}

	/**
	 * Entry point of the fluent update API for the given domain type, backed by a new
	 * {@code ReactiveUpdateOperationSupport} bound to this template.
	 */
	@Override
	public <T> ReactiveUpdate<T> update(Class<T> domainType) {

		ReactiveUpdateOperationSupport support = new ReactiveUpdateOperationSupport(this);
		return support.update(domainType);
	}

	/**
	 * Entry point of the fluent remove API for the given domain type, backed by a new
	 * {@code ReactiveRemoveOperationSupport} bound to this template.
	 */
	@Override
	public <T> ReactiveRemove<T> remove(Class<T> domainType) {

		ReactiveRemoveOperationSupport support = new ReactiveRemoveOperationSupport(this);
		return support.remove(domainType);
	}

	/**
	 * Entry point of the fluent insert API for the given domain type, backed by a new
	 * {@code ReactiveInsertOperationSupport} bound to this template.
	 */
	@Override
	public <T> ReactiveInsert<T> insert(Class<T> domainType) {

		ReactiveInsertOperationSupport support = new ReactiveInsertOperationSupport(this);
		return support.insert(domainType);
	}

	/**
	 * Entry point of the fluent aggregation API for the given domain type, backed by a new
	 * {@code ReactiveAggregationOperationSupport} bound to this template.
	 */
	@Override
	public <T> ReactiveAggregation<T> aggregateAndReturn(Class<T> domainType) {

		ReactiveAggregationOperationSupport support = new ReactiveAggregationOperationSupport(this);
		return support.aggregateAndReturn(domainType);
	}

	/**
	 * Entry point of the fluent map-reduce API for the given domain type, backed by a new
	 * {@code ReactiveMapReduceOperationSupport} bound to this template.
	 */
	@Override
	public <T> ReactiveMapReduce<T> mapReduce(Class<T> domainType) {

		ReactiveMapReduceOperationSupport support = new ReactiveMapReduceOperationSupport(this);
		return support.mapReduce(domainType);
	}

	/**
	 * Entry point of the fluent change stream API for the given domain type, backed by a new
	 * {@code ReactiveChangeStreamOperationSupport} bound to this template.
	 */
	@Override
	public <T> ReactiveChangeStream<T> changeStream(Class<T> domainType) {

		ReactiveChangeStreamOperationSupport support = new ReactiveChangeStreamOperationSupport(this);
		return support.changeStream(domainType);
	}

	/**
	 * Retrieve and remove all documents matching the given {@code query} by calling {@link #find(Query, Class, String)}
	 * and {@link #remove(Query, Class, String)}, whereas the {@link Query} for {@link #remove(Query, Class, String)} is
	 * constructed out of the find result. Nothing is removed, and nothing is emitted, if the find yields no results.
	 *
	 * @param collectionName name of the collection to operate on.
	 * @param query the query selecting the documents to find and remove.
	 * @param entityClass the domain type to read into.
	 * @return the objects that were found, emitted once the remove has produced its result.
	 */
	protected <T> Flux<T> doFindAndDelete(String collectionName, Query query, Class<T> entityClass) {

		Flux<T> matches = find(query, entityClass, collectionName);

		return Flux.from(matches) //
				.collectList() //
				.filter(found -> !found.isEmpty()) //
				.flatMapMany(found -> {

					Mono<DeleteResult> deletion = remove(operations.getByIdInQuery(found), entityClass, collectionName);
					return Flux.from(deletion).flatMapSequential(deleteResult -> Flux.fromIterable(found));
				});
	}

	/**
	 * Finds all documents matching the query, converts them through the given result converter and removes them by
	 * their {@code _id}. The ids are collected from the raw documents while they are read.
	 *
	 * @param collectionName name of the collection to operate on.
	 * @param query the query selecting the documents to find and remove.
	 * @param entityClass the domain type the documents are read as.
	 * @param resultConverter converter applied to each read result.
	 * @return the converted results, emitted once the remove has produced its result; empty if nothing was found.
	 */
	@SuppressWarnings({ "rawtypes", "unchecked", "NullAway" })
	<S, T> Flux<T> doFindAndDelete(String collectionName, Query query, Class<S> entityClass,
			QueryResultConverter<? super S, ? extends T> resultConverter) {

		// Ids of all documents seen by the read callback; used to build the remove query below.
		// NOTE(review): the list is created per method call, not per subscription — subscribing to the returned Flux
		// more than once would keep appending to the same list; confirm whether that can happen.
		List<Object> ids = new ArrayList<>();
		ProjectingReadCallback readCallback = new ProjectingReadCallback(getConverter(),
				EntityProjection.nonProjecting(entityClass), collectionName);

		QueryResultConverterCallback<S, T> callback = new QueryResultConverterCallback<>(resultConverter, readCallback) {

			@Override
			public Mono<T> doWith(Document object) {
				// Record the raw id before the document is converted.
				ids.add(object.get("_id"));
				return super.doWith(object);
			}
		};

		Flux<T> flux = doFind(collectionName, ReactiveCollectionPreparerDelegate.of(query), query.getQueryObject(),
				query.getFieldsObject(), entityClass,
				new QueryFindPublisherPreparer(query, query.getSortObject(), query.getLimit(), query.getSkip(), entityClass),
				callback);

		return Flux.from(flux).collectList().filter(it -> !it.isEmpty()).flatMapMany(list -> {

			Criteria[] criterias = ids.stream() //
					.map(it -> Criteria.where("_id").is(it)) //
					.toArray(Criteria[]::new);

			// A single id is matched directly; several ids are combined with an OR.
			Query removeQuery = new Query(criterias.length == 1 ? criterias[0] : new Criteria().orOperator(criterias));
			if (query.hasReadPreference()) {
				removeQuery.withReadPreference(query.getReadPreference());
			}

			return Flux.from(remove(removeQuery, entityClass, collectionName))
					.flatMapSequential(deleteResult -> Flux.fromIterable(list));
		});
	}

	/**
	 * Create the collection with the given name using the provided options and afterwards obtain it via
	 * {@code getCollection(collectionName)}.
	 *
	 * @param collectionName name of the collection to create.
	 * @param collectionOptions the driver options passed to the create call.
	 * @return the collection that was created.
	 */
	protected Mono<MongoCollection<Document>> doCreateCollection(String collectionName,
			CreateCollectionOptions collectionOptions) {

		return createMono(db -> db.createCollection(collectionName, collectionOptions)) //
				.doOnSuccess(ignored -> {

					// TODO: Emit a collection created event
					if (!LOGGER.isDebugEnabled()) {
						return;
					}

					LOGGER.debug(String.format("Created collection [%s]", collectionName));
				}) //
				.then(getCollection(collectionName));
	}

	/**
	 * Map the result of an ad-hoc query on the given collection to a single object using the template's converter.
	 * The query document is specified as a standard {@link Document} and so is the fields specification. Delegates to
	 * the {@link FindPublisherPreparer} based variant, applying the given {@code collation} (if any) to the find
	 * publisher.
	 *
	 * @param collectionName name of the collection to retrieve the object from.
	 * @param collectionPreparer the preparer to prepare the collection for the actual use.
	 * @param query the query document that specifies the criteria used to find a record.
	 * @param fields the document that specifies the fields to be returned. Can be {@literal null}.
	 * @param entityClass the type of the returned object.
	 * @param collation can be {@literal null}.
	 * @return a {@link Mono} emitting the converted object.
	 */
	protected <T> Mono<T> doFindOne(String collectionName,
			CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query, @Nullable Document fields,
			Class<T> entityClass, @Nullable Collation collation) {

		FindPublisherPreparer collationPreparer = findPublisher -> {

			if (collation == null) {
				return findPublisher;
			}

			return findPublisher.collation(collation.toMongoCollation());
		};

		return doFindOne(collectionName, collectionPreparer, query, fields, entityClass, collationPreparer);
	}

	/**
	 * Map the result of an ad-hoc query on the given collection to a single object using the template's converter.
	 * The query document is specified as a standard {@link Document} and so is the fields specification. Both are
	 * mapped against the persistent entity of {@code entityClass} before being executed.
	 *
	 * @param collectionName name of the collection to retrieve the object from.
	 * @param collectionPreparer the preparer to prepare the collection for the actual use.
	 * @param query the query document that specifies the criteria used to find a record.
	 * @param fields the document that specifies the fields to be returned. An empty {@link Document} is used if
	 *          {@literal null}.
	 * @param entityClass the type of the returned object.
	 * @param preparer the preparer modifying collection and publisher to fit the needs.
	 * @return a {@link Mono} emitting the converted object.
	 * @since 2.2
	 */
	protected <T> Mono<T> doFindOne(String collectionName,
			CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query, @Nullable Document fields,
			Class<T> entityClass, FindPublisherPreparer preparer) {

		MongoPersistentEntity<?> entity = mappingContext.getPersistentEntity(entityClass);

		QueryContext queryContext = queryOperations
				.createQueryContext(new BasicQuery(query, fields != null ? fields : new Document()));
		Document mappedFields = queryContext.getMappedFields(entity, EntityProjection.nonProjecting(entityClass));
		Document mappedQuery = queryContext.getMappedQuery(entity);

		if (LOGGER.isDebugEnabled()) {
			// log the mapped query (the one handed to the callback below), consistent with the doFind variants
			LOGGER.debug(String.format("findOne using query: %s fields: %s for class: %s in collection: %s",
					serializeToJsonSafely(mappedQuery), mappedFields, entityClass, collectionName));
		}

		return executeFindOneInternal(new FindOneCallback(collectionPreparer, mappedQuery, mappedFields, preparer),
				new ReadDocumentCallback<>(this.mongoConverter, entityClass, collectionName), collectionName);
	}

	/**
	 * Map the results of an ad-hoc query on the given collection to objects of {@code entityClass} using the
	 * template's converter. The query document is specified as a standard {@link Document} and so is the fields
	 * specification. No {@link FindPublisherPreparer} is applied.
	 *
	 * @param collectionName name of the collection to retrieve the objects from.
	 * @param collectionPreparer the preparer to prepare the collection for the actual use.
	 * @param query the query document that specifies the criteria used to find a record.
	 * @param fields the document that specifies the fields to be returned.
	 * @param entityClass the type of the returned objects.
	 * @return a {@link Flux} emitting the converted objects.
	 */
	protected <T> Flux<T> doFind(String collectionName, CollectionPreparer<MongoCollection<Document>> collectionPreparer,
			Document query, Document fields, Class<T> entityClass) {

		DocumentCallback<T> readCallback = new ReadDocumentCallback<>(this.mongoConverter, entityClass,
				collectionName);

		return doFind(collectionName, collectionPreparer, query, fields, entityClass, null, readCallback);
	}

	/**
	 * Map the results of an ad-hoc query on the given collection to objects of the specified type. The objects are
	 * converted from the MongoDB native representation using the template's {@link MongoConverter}. The query
	 * document is specified as a standard {@link Document} and so is the fields specification.
	 *
	 * @param collectionName name of the collection to retrieve the objects from.
	 * @param collectionPreparer the preparer to prepare the collection for the actual use.
	 * @param query the query document that specifies the criteria used to find a record.
	 * @param fields the document that specifies the fields to be returned.
	 * @param entityClass the type of the returned objects.
	 * @param preparer allows for customization of the find publisher used when iterating over the result set (apply
	 *          limits, skips and so on).
	 * @return a {@link Flux} emitting the converted objects.
	 */
	protected <T> Flux<T> doFind(String collectionName, CollectionPreparer<MongoCollection<Document>> collectionPreparer,
			Document query, Document fields, Class<T> entityClass, FindPublisherPreparer preparer) {

		DocumentCallback<T> readCallback = new ReadDocumentCallback<>(mongoConverter, entityClass, collectionName);

		return doFind(collectionName, collectionPreparer, query, fields, entityClass, preparer, readCallback);
	}

	/**
	 * Map the query and fields against the persistent entity of {@code entityClass}, execute the find and hand every
	 * resulting {@link Document} to the given {@code objectCallback}.
	 *
	 * @param collectionName name of the collection to retrieve the objects from.
	 * @param collectionPreparer the preparer to prepare the collection for the actual use.
	 * @param query the query document that specifies the criteria used to find a record.
	 * @param fields the document that specifies the fields to be returned.
	 * @param entityClass the type used for mapping query and fields.
	 * @param preparer customizes the find publisher; {@link FindPublisherPreparer#NO_OP_PREPARER} is used if
	 *          {@literal null}.
	 * @param objectCallback the callback turning each {@link Document} into the result.
	 * @return a {@link Flux} emitting the callback results.
	 */
	protected <S, T> Flux<T> doFind(String collectionName,
			CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query, Document fields,
			Class<S> entityClass, @Nullable FindPublisherPreparer preparer, DocumentCallback<T> objectCallback) {

		MongoPersistentEntity<?> persistentEntity = mappingContext.getPersistentEntity(entityClass);

		QueryContext context = queryOperations.createQueryContext(new BasicQuery(query, fields));
		Document fieldsToUse = context.getMappedFields(persistentEntity, EntityProjection.nonProjecting(entityClass));
		Document queryToUse = context.getMappedQuery(persistentEntity);

		if (LOGGER.isDebugEnabled()) {
			LOGGER.debug(String.format("find using query: %s fields: %s for class: %s in collection: %s",
					serializeToJsonSafely(queryToUse), fieldsToUse, entityClass, collectionName));
		}

		FindPublisherPreparer preparerToUse = preparer;
		if (preparerToUse == null) {
			preparerToUse = FindPublisherPreparer.NO_OP_PREPARER;
		}

		FindCallback findCallback = new FindCallback(collectionPreparer, queryToUse, fieldsToUse);

		return executeFindMultiInternal(findCallback, preparerToUse, objectCallback, collectionName);
	}

	/**
	 * Obtain the {@link CollectionPreparer} for the given {@link Query} from
	 * {@link ReactiveCollectionPreparerDelegate#of}.
	 *
	 * @param query the query to derive the preparer from.
	 * @return the preparer for the given query.
	 */
	CollectionPreparer<MongoCollection<Document>> createCollectionPreparer(Query query) {

		CollectionPreparer<MongoCollection<Document>> queryPreparer = ReactiveCollectionPreparerDelegate.of(query);
		return queryPreparer;
	}

	/**
	 * Obtain the {@link CollectionPreparer} for the given {@link Query} and, if a {@link MongoAction} is given,
	 * additionally apply the {@link WriteConcern} resolved for that action to the collection.
	 *
	 * @param query the query to derive the preparer from.
	 * @param action the action used to resolve the {@link WriteConcern}. Can be {@literal null}, in which case the
	 *          query based preparer is returned as is.
	 * @return the preparer to use.
	 */
	CollectionPreparer<MongoCollection<Document>> createCollectionPreparer(Query query, @Nullable MongoAction action) {

		CollectionPreparer<MongoCollection<Document>> queryPreparer = createCollectionPreparer(query);

		return action == null ? queryPreparer : queryPreparer.andThen(collection -> {

			WriteConcern resolved = prepareWriteConcern(action);
			if (resolved == null) {
				return collection;
			}

			return collection.withWriteConcern(resolved);
		});
	}

	/**
	 * Map the results of an ad-hoc query on the given collection to the specified {@code targetClass} while using
	 * {@code sourceClass} for mapping the query. The fields are mapped using the projection introspected for
	 * {@code targetClass} on {@code sourceClass}.
	 *
	 * @param collectionName name of the collection to retrieve the objects from.
	 * @param collectionPreparer the preparer to prepare the collection for the actual use.
	 * @param query the query document that specifies the criteria used to find a record.
	 * @param fields the document that specifies the fields to be returned.
	 * @param sourceClass the type used for mapping query and fields.
	 * @param targetClass the type the results are read into.
	 * @param resultConverter converter applied to each read result.
	 * @param preparer customizes the find publisher.
	 * @return a {@link Flux} emitting the converted results.
	 * @since 2.0
	 */
	<T, R> Flux<R> doFind(String collectionName, CollectionPreparer<MongoCollection<Document>> collectionPreparer,
			Document query, Document fields, Class<?> sourceClass, Class<T> targetClass,
			QueryResultConverter<? super T, ? extends R> resultConverter, FindPublisherPreparer preparer) {

		MongoPersistentEntity<?> persistentEntity = mappingContext.getPersistentEntity(sourceClass);
		EntityProjection<T, ?> entityProjection = operations.introspectProjection(targetClass, sourceClass);

		QueryContext context = queryOperations.createQueryContext(new BasicQuery(query, fields));
		Document fieldsToUse = context.getMappedFields(persistentEntity, entityProjection);
		Document queryToUse = context.getMappedQuery(persistentEntity);

		if (LOGGER.isDebugEnabled()) {
			LOGGER.debug(String.format("find using query: %s fields: %s for class: %s in collection: %s",
					serializeToJsonSafely(queryToUse), fieldsToUse, sourceClass, collectionName));
		}

		FindCallback findCallback = new FindCallback(collectionPreparer, queryToUse, fieldsToUse);

		return executeFindMultiInternal(findCallback, preparer,
				getResultReader(entityProjection, collectionName, resultConverter), collectionName);
	}

	/**
	 * Convert the given {@link CollectionOptions} to driver {@link CreateCollectionOptions} using {@link Object} as
	 * entity type.
	 *
	 * @param collectionOptions can be {@literal null}.
	 * @return the converted options.
	 */
	protected CreateCollectionOptions convertToCreateCollectionOptions(@Nullable CollectionOptions collectionOptions) {

		Class<?> fallbackType = Object.class;
		return convertToCreateCollectionOptions(collectionOptions, fallbackType);
	}

	/**
	 * Convert the given {@link CollectionOptions} to driver {@link CreateCollectionOptions} for the given entity type
	 * by delegating to the entity operations.
	 *
	 * @param collectionOptions can be {@literal null}.
	 * @param entityType the entity type passed along to the conversion.
	 * @return the converted options.
	 */
	protected CreateCollectionOptions convertToCreateCollectionOptions(@Nullable CollectionOptions collectionOptions,
			Class<?> entityType) {

		CreateCollectionOptions converted = operations.convertToCreateCollectionOptions(collectionOptions, entityType);
		return converted;
	}

	/**
	 * Find a single document matching the query, remove it from the collection and map it to an object using the
	 * template's converter. <br />
	 * The query document is specified as a standard {@link Document} and so is the fields specification. The query is
	 * mapped against the persistent entity of {@code entityClass} before execution.
	 *
	 * @param collectionName name of the collection to retrieve the object from.
	 * @param collectionPreparer the preparer to prepare the collection for the actual use.
	 * @param query the query document that specifies the criteria used to find a record.
	 * @param fields the document that specifies the fields to be returned.
	 * @param sort the sort passed on to the find-and-delete options. Can be {@literal null}.
	 * @param collation the collation to apply. Can be {@literal null}.
	 * @param entityClass the type of the returned object.
	 * @return a {@link Mono} emitting the converted object.
	 */
	protected <T> Mono<T> doFindAndRemove(String collectionName,
			CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query, Document fields,
			@Nullable Document sort, @Nullable Collation collation, Class<T> entityClass) {

		if (LOGGER.isDebugEnabled()) {
			LOGGER.debug(String.format("findAndRemove using query: %s fields: %s sort: %s for class: %s in collection: %s",
					serializeToJsonSafely(query), fields, serializeToJsonSafely(sort), entityClass, collectionName));
		}

		MongoPersistentEntity<?> persistentEntity = mappingContext.getPersistentEntity(entityClass);
		Document queryToUse = queryMapper.getMappedObject(query, persistentEntity);

		FindAndRemoveCallback removeCallback = new FindAndRemoveCallback(collectionPreparer, queryToUse, fields, sort,
				collation);
		DocumentCallback<T> readCallback = new ReadDocumentCallback<>(this.mongoConverter, entityClass,
				collectionName);

		return executeFindOneInternal(removeCallback, readCallback, collectionName);
	}

	/**
	 * Find a single document matching the query, apply the given update to it and read the result using the given
	 * {@code resultConverter}.
	 * <p>
	 * The update context is created, and the version increment (if necessary) applied, when this method is called;
	 * mapping of query and update as well as the execution are deferred until subscription.
	 *
	 * @param collectionName name of the collection to operate on.
	 * @param collectionPreparer the preparer to prepare the collection for the actual use.
	 * @param query the query document that specifies the criteria used to find a record.
	 * @param fields the document that specifies the fields to be returned.
	 * @param sort the sort to apply. Can be {@literal null}.
	 * @param entityClass the type used for mapping and reading.
	 * @param update the update to apply; either a plain update or an aggregation pipeline update.
	 * @param options the find-and-modify options.
	 * @param resultConverter converter applied to the read result.
	 * @return a {@link Mono} emitting the converted result.
	 */
	<S, T> Mono<T> doFindAndModify(String collectionName,
			CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query, Document fields,
			@Nullable Document sort, Class<S> entityClass, UpdateDefinition update, FindAndModifyOptions options,
			QueryResultConverter<? super S, ? extends T> resultConverter) {

		MongoPersistentEntity<?> persistentEntity = mappingContext.getPersistentEntity(entityClass);
		UpdateContext context = queryOperations.updateSingleContext(update, query, false);
		context.increaseVersionForUpdateIfNecessary(persistentEntity);

		return Mono.defer(() -> {

			Document queryToUse = context.getMappedQuery(persistentEntity);

			Object updateToUse;
			if (context.isAggregationUpdate()) {
				updateToUse = context.getUpdatePipeline(entityClass);
			} else {
				updateToUse = context.getMappedUpdate(persistentEntity);
			}

			if (LOGGER.isDebugEnabled()) {
				LOGGER.debug(String.format(
						"findAndModify using query: %s fields: %s sort: %s for class: %s and update: %s in collection: %s",
						serializeToJsonSafely(queryToUse), fields, serializeToJsonSafely(sort), entityClass,
						serializeToJsonSafely(updateToUse), collectionName));
			}

			EntityProjection<S, ?> entityProjection = EntityProjection.nonProjecting(entityClass);
			DocumentCallback<T> resultReader = getResultReader(entityProjection, collectionName, resultConverter);

			List<Document> arrayFilters = update.getArrayFilters().stream() //
					.map(ArrayFilter::asDocument) //
					.collect(Collectors.toList());

			FindAndModifyCallback modifyCallback = new FindAndModifyCallback(collectionPreparer, queryToUse, fields, sort,
					updateToUse, arrayFilters, options);

			return executeFindOneInternal(modifyCallback, resultReader, collectionName);
		});
	}

	/**
	 * Customization hook for findAndReplace. Introspects the projection of {@code resultType} on {@code entityType}
	 * and delegates to the projection based variant without additional result conversion.
	 *
	 * @param collectionName The name of the collection to perform the operation in.
	 * @param collectionPreparer the preparer to prepare the collection for the actual use.
	 * @param mappedQuery the query to look up documents.
	 * @param mappedFields the fields to project the result to.
	 * @param mappedSort the sort to be applied when executing the query.
	 * @param collation collation settings for the query. Can be {@literal null}.
	 * @param entityType the source domain type.
	 * @param replacement the replacement {@link Document}.
	 * @param options applicable options.
	 * @param resultType the target domain type.
	 * @return {@link Mono#empty()} if object does not exist, {@link FindAndReplaceOptions#isReturnNew() return new} is
	 *         {@literal false} and {@link FindAndReplaceOptions#isUpsert() upsert} is {@literal false}.
	 * @since 2.1
	 */
	protected <T> Mono<T> doFindAndReplace(String collectionName,
			CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document mappedQuery, Document mappedFields,
			Document mappedSort, com.mongodb.client.model.Collation collation, Class<?> entityType, Document replacement,
			FindAndReplaceOptions options, Class<T> resultType) {

		EntityProjection<T, ?> resultProjection = operations.introspectProjection(resultType, entityType);

		return doFindAndReplace(collectionName, collectionPreparer, mappedQuery, mappedFields, mappedSort, collation,
				entityType, replacement, options, resultProjection, QueryResultConverter.entity());
	}

	/**
	 * Customization hook for findAndReplace. Logging, creation of the result reader and execution are deferred until
	 * subscription.
	 *
	 * @param collectionName The name of the collection to perform the operation in.
	 * @param collectionPreparer the preparer to prepare the collection for the actual use.
	 * @param mappedQuery the query to look up documents.
	 * @param mappedFields the fields to project the result to.
	 * @param mappedSort the sort to be applied when executing the query.
	 * @param collation collation settings for the query. Can be {@literal null}.
	 * @param entityType the source domain type.
	 * @param replacement the replacement {@link Document}.
	 * @param options applicable options.
	 * @param projection the projection descriptor.
	 * @param resultConverter converter applied to the read result.
	 * @return {@link Mono#empty()} if object does not exist, {@link FindAndReplaceOptions#isReturnNew() return new} is
	 *         {@literal false} and {@link FindAndReplaceOptions#isUpsert() upsert} is {@literal false}.
	 * @since 3.4
	 */
	private <S, T> Mono<T> doFindAndReplace(String collectionName,
			CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document mappedQuery, Document mappedFields,
			Document mappedSort, com.mongodb.client.model.Collation collation, Class<?> entityType, Document replacement,
			FindAndReplaceOptions options, EntityProjection<S, ?> projection,
			QueryResultConverter<? super S, ? extends T> resultConverter) {

		return Mono.defer(() -> {

			if (LOGGER.isDebugEnabled()) {
				LOGGER.debug(String.format(
						"findAndReplace using query: %s fields: %s sort: %s for class: %s and replacement: %s in collection: %s",
						serializeToJsonSafely(mappedQuery), mappedFields, serializeToJsonSafely(mappedSort), entityType,
						serializeToJsonSafely(replacement), collectionName));
			}

			DocumentCallback<T> reader = getResultReader(projection, collectionName, resultConverter);

			FindAndReplaceCallback replaceCallback = new FindAndReplaceCallback(collectionPreparer, mappedQuery,
					mappedFields, mappedSort, replacement, collation, options);

			return executeFindOneInternal(replaceCallback, reader, collectionName);
		});
	}

	/**
	 * Hand the given event to the event delegate for publication and return it.
	 *
	 * @param event the event to publish.
	 * @return the very same event.
	 */
	protected <E extends MongoMappingEvent<T>, T> E maybeEmitEvent(E event) {

		this.eventDelegate.publishEvent(event);

		return event;
	}

	/**
	 * Invoke the {@link ReactiveBeforeConvertCallback}s if entity callbacks are configured.
	 *
	 * @param object the object handed to the callbacks.
	 * @param collection the collection name handed to the callbacks.
	 * @return the callback result, or a {@link Mono} emitting the given object if no entity callbacks are configured.
	 */
	protected <T> Mono<T> maybeCallBeforeConvert(T object, String collection) {

		if (entityCallbacks == null) {
			return Mono.just(object);
		}

		return entityCallbacks.callback(ReactiveBeforeConvertCallback.class, object, collection);
	}

	/**
	 * Invoke the {@link ReactiveBeforeSaveCallback}s if entity callbacks are configured.
	 *
	 * @param object the object handed to the callbacks.
	 * @param document the document handed to the callbacks.
	 * @param collection the collection name handed to the callbacks.
	 * @return the callback result, or a {@link Mono} emitting the given object if no entity callbacks are configured.
	 */
	protected <T> Mono<T> maybeCallBeforeSave(T object, Document document, String collection) {

		if (entityCallbacks == null) {
			return Mono.just(object);
		}

		return entityCallbacks.callback(ReactiveBeforeSaveCallback.class, object, document, collection);
	}

	/**
	 * Invoke the {@link ReactiveAfterSaveCallback}s if entity callbacks are configured.
	 *
	 * @param object the object handed to the callbacks.
	 * @param document the document handed to the callbacks.
	 * @param collection the collection name handed to the callbacks.
	 * @return the callback result, or a {@link Mono} emitting the given object if no entity callbacks are configured.
	 */
	protected <T> Mono<T> maybeCallAfterSave(T object, Document document, String collection) {

		if (entityCallbacks == null) {
			return Mono.just(object);
		}

		return entityCallbacks.callback(ReactiveAfterSaveCallback.class, object, document, collection);
	}

	/**
	 * Invoke the {@link ReactiveAfterConvertCallback}s if entity callbacks are configured.
	 *
	 * @param object the object handed to the callbacks.
	 * @param document the document handed to the callbacks.
	 * @param collection the collection name handed to the callbacks.
	 * @return the callback result, or a {@link Mono} emitting the given object if no entity callbacks are configured.
	 */
	protected <T> Mono<T> maybeCallAfterConvert(T object, Document document, String collection) {

		if (entityCallbacks == null) {
			return Mono.just(object);
		}

		return entityCallbacks.callback(ReactiveAfterConvertCallback.class, object, document, collection);
	}

	/**
	 * Obtain the collection with the given name from the database and run it through
	 * {@link #prepareCollection(MongoCollection)}. A {@link RuntimeException} raised on the way is passed through the
	 * exception translator before being rethrown.
	 *
	 * @param db the database to obtain the collection from.
	 * @param collectionName name of the collection.
	 * @return the prepared collection.
	 */
	private MongoCollection<Document> getAndPrepareCollection(MongoDatabase db, String collectionName) {

		try {
			return prepareCollection(db.getCollection(collectionName, Document.class));
		} catch (RuntimeException failure) {
			throw potentiallyConvertRuntimeException(failure, exceptionTranslator);
		}
	}

	/**
	 * Ensure the given {@literal source} is neither collection-like (as determined by
	 * {@link EntityOperations#isCollectionLike(Object)}) nor a {@link Publisher}.
	 *
	 * @param source can be {@literal null}.
	 * @throws IllegalArgumentException if the source is collection-like or a {@link Publisher}.
	 * @since 3.2.
	 */
	protected void ensureNotCollectionLike(@Nullable Object source) {

		boolean collectionLike = EntityOperations.isCollectionLike(source);

		if (!collectionLike && !(source instanceof Publisher)) {
			return;
		}

		throw new IllegalArgumentException("Cannot use a collection here.");
	}

	/**
	 * Prepare the collection before any processing is done using it. This allows a convenient way to apply settings
	 * like withCodecRegistry() etc. Can be overridden in subclasses.
	 * <p>
	 * This implementation applies the template's read preference when one is set and it is not the very instance
	 * already reported by the collection.
	 *
	 * @param collection the collection to prepare.
	 * @return the collection to use.
	 */
	protected MongoCollection<Document> prepareCollection(MongoCollection<Document> collection) {

		if (this.readPreference == null || this.readPreference == collection.getReadPreference()) {
			return collection;
		}

		return collection.withReadPreference(readPreference);
	}

	/**
	 * Prepare the given {@link MongoDatabase}. This implementation returns the database unchanged; being
	 * {@code protected}, it can be overridden in subclasses.
	 *
	 * @param database the database to prepare.
	 * @return the database to use.
	 * @since 2.1
	 */
	protected MongoDatabase prepareDatabase(MongoDatabase database) {
		return database;
	}

	/**
	 * Prepare the WriteConcern before any processing is done using it. This allows a convenient way to apply custom
	 * settings in subclasses. The {@link WriteConcern} resolved for the given action will be defaulted to
	 * {@link WriteConcern#ACKNOWLEDGED} when {@link WriteResultChecking} is set to
	 * {@link WriteResultChecking#EXCEPTION} and the resolved concern is absent or not acknowledging.
	 *
	 * @param mongoAction the action handed to the write concern resolver.
	 * @return The prepared WriteConcern or {@literal null}.
	 * @see #setWriteConcern(WriteConcern)
	 * @see #setWriteConcernResolver(WriteConcernResolver)
	 */
	protected @Nullable WriteConcern prepareWriteConcern(MongoAction mongoAction) {
		return potentiallyForceAcknowledgedWrite(writeConcernResolver.resolve(mongoAction));
	}

	/**
	 * @return the {@link ReactiveMongoDatabaseFactory} in use.
	 * @since 3.1.4
	 */
	public ReactiveMongoDatabaseFactory getMongoDatabaseFactory() {
		return this.mongoDatabaseFactory;
	}

	/**
	 * Return {@link WriteConcern#ACKNOWLEDGED} instead of the given {@link WriteConcern} when
	 * {@link WriteResultChecking#EXCEPTION} is configured and the given concern is {@literal null}, has no
	 * {@code w} value or a numeric {@code w} value below one. Otherwise the given concern is returned as is.
	 *
	 * @param wc can be {@literal null}.
	 * @return the write concern to use. Can be {@literal null}.
	 */
	@Nullable
	private WriteConcern potentiallyForceAcknowledgedWrite(@Nullable WriteConcern wc) {

		if (!ObjectUtils.nullSafeEquals(WriteResultChecking.EXCEPTION, writeResultChecking)) {
			return wc;
		}

		if (wc == null) {
			return WriteConcern.ACKNOWLEDGED;
		}

		Object w = wc.getWObject();
		if (w == null) {
			return WriteConcern.ACKNOWLEDGED;
		}

		if (w instanceof Number concern && concern.intValue() < 1) {
			return WriteConcern.ACKNOWLEDGED;
		}

		return wc;
	}

	/**
	 * Internal method using callbacks to do queries against the datastore that requires reading a single object from
	 * a collection of objects. It will take the following steps
	 * <ol>
	 * <li>Execute the given {@link ReactiveCollectionCallback} for a {@link Document}.</li>
	 * <li>Apply the given {@link DocumentCallback} to the {@link Document} to obtain the result.</li>
	 * </ol>
	 *
	 * @param collectionCallback the callback to retrieve the {@link Document}
	 * @param objectCallback the {@link DocumentCallback} to transform {@link Document}s into the actual domain type
	 * @param collectionName the collection to be queried
	 * @return a {@link Mono} emitting the result of the {@link DocumentCallback}.
	 */
	private <T> Mono<T> executeFindOneInternal(ReactiveCollectionCallback<Document> collectionCallback,
			DocumentCallback<T> objectCallback, String collectionName) {

		return createMono(collectionName, collection -> {

			Mono<Document> document = Mono.from(collectionCallback.doInCollection(collection));
			return document.flatMap(objectCallback::doWith);
		});
	}

	/**
	 * Internal method using callback to do queries against the datastore that requires reading a collection of
	 * objects. It will take the following steps
	 * <ol>
	 * <li>Let the given {@link FindPublisherPreparer} initiate the find, using the given
	 * {@link ReactiveCollectionQueryCallback} to obtain the {@link FindPublisher}.</li>
	 * <li>Apply the given {@link DocumentCallback} via {@link Flux#flatMapSequential(Function)} to each emitted
	 * {@link Document}.</li>
	 * </ol>
	 *
	 * @param collectionCallback the callback to retrieve the {@link FindPublisher} with, must not be {@literal null}.
	 * @param preparer the {@link FindPublisherPreparer} initiating the find, must not be {@literal null}.
	 * @param objectCallback the {@link DocumentCallback} to transform {@link Document}s into the actual domain type,
	 *          must not be {@literal null}.
	 * @param collectionName the collection to be queried, must not be {@literal null}.
	 * @return a {@link Flux} emitting the results of the {@link DocumentCallback}.
	 */
	private <T> Flux<T> executeFindMultiInternal(ReactiveCollectionQueryCallback<Document> collectionCallback,
			FindPublisherPreparer preparer, DocumentCallback<T> objectCallback, String collectionName) {

		return createFlux(collectionName,
				collection -> Flux.from(preparer.initiateFind(collection, collectionCallback::doInCollection))
						.flatMapSequential(objectCallback::doWith));
	}

	/**
	 * Create the {@link DocumentCallback} reading documents for the given projection. When the given
	 * {@code resultConverter} is the {@link QueryResultConverter#entity()} instance, the plain read callback is used;
	 * otherwise it is wrapped so the converter gets applied.
	 *
	 * @param projection the projection descriptor.
	 * @param collectionName name of the collection the documents originate from.
	 * @param resultConverter the converter to apply.
	 * @return the callback to read results with.
	 */
	@SuppressWarnings("unchecked")
	private <T, R> DocumentCallback<R> getResultReader(EntityProjection<T, ?> projection, String collectionName,
			QueryResultConverter<? super T, ? extends R> resultConverter) {

		DocumentCallback<T> projectingReader = new ProjectingReadCallback<>(mongoConverter, projection, collectionName);

		if (resultConverter == QueryResultConverter.entity()) {
			return (DocumentCallback<R>) projectingReader;
		}

		return new QueryResultConverterCallback<>(resultConverter, projectingReader);
	}

	/**
	 * Exception translation {@link Function} intended for {@link Flux#onErrorMap(Function)} usage.
	 * {@link RuntimeException}s are passed through the exception translator, any other {@link Throwable} is returned
	 * as is.
	 *
	 * @return the exception translation {@link Function}
	 */
	private Function<Throwable, Throwable> translateException() {

		return throwable -> throwable instanceof RuntimeException runtimeException
				? potentiallyConvertRuntimeException(runtimeException, exceptionTranslator)
				: throwable;
	}

	/**
	 * Tries to convert the given {@link RuntimeException} into a {@link DataAccessException} but returns the original
	 * exception if the conversion failed. Thus allows safe re-throwing of the return value.
	 *
	 * @param ex the exception to translate
	 * @param exceptionTranslator the {@link PersistenceExceptionTranslator} to be used for translation
	 * @return the translated exception, or {@code ex} if the translator returned {@literal null}.
	 */
	private static RuntimeException potentiallyConvertRuntimeException(RuntimeException ex,
			PersistenceExceptionTranslator exceptionTranslator) {

		RuntimeException translated = exceptionTranslator.translateExceptionIfPossible(ex);

		if (translated != null) {
			return translated;
		}

		return ex;
	}

	/**
	 * Look up the persistent entity for the given type in the mapping context.
	 *
	 * @param type can be {@literal null}.
	 * @return {@literal null} if the given type is {@literal null}, otherwise whatever the mapping context returns.
	 */
	@Nullable
	MongoPersistentEntity<?> getPersistentEntity(@Nullable Class<?> type) {

		if (type == null) {
			return null;
		}

		return mappingContext.getPersistentEntity(type);
	}

	/**
	 * Create a {@link MappingMongoConverter} backed by a new {@link MongoMappingContext} and empty
	 * {@link MongoCustomConversions}, using this template's database factory as codec registry provider.
	 *
	 * @return the initialized converter.
	 */
	private MappingMongoConverter getDefaultMongoConverter() {

		MongoCustomConversions customConversions = new MongoCustomConversions(Collections.emptyList());

		// the mapping context needs the simple types before it gets initialized
		MongoMappingContext defaultContext = new MongoMappingContext();
		defaultContext.setSimpleTypeHolder(customConversions.getSimpleTypeHolder());
		defaultContext.afterPropertiesSet();

		MappingMongoConverter defaultConverter = new MappingMongoConverter(NO_OP_REF_RESOLVER, defaultContext);
		defaultConverter.setCustomConversions(customConversions);
		defaultConverter.setCodecRegistryProvider(this.mongoDatabaseFactory);
		defaultConverter.afterPropertiesSet();

		return defaultConverter;
	}

	/**
	 * Obtain the mapped sort of the given {@link Query} for the given type.
	 *
	 * @param query can be {@literal null}.
	 * @param type the type used for mapping the sort.
	 * @return {@literal null} if the query is {@literal null}, otherwise the result of mapping the query's sort
	 *         object.
	 */
	@Contract("null, _ -> null")
	private @Nullable Document getMappedSortObject(@Nullable Query query, Class<?> type) {
		return query == null ? null : getMappedSortObject(query.getSortObject(), type);
	}

	/**
	 * Map the given sort {@link Document} against the persistent entity of the given type.
	 *
	 * @param sortObject can be {@literal null}.
	 * @param type the type used for mapping the sort.
	 * @return {@literal null} if the sort object is empty, otherwise the mapped sort.
	 */
	@Contract("null, _ -> null")
	private @Nullable Document getMappedSortObject(@Nullable Document sortObject, Class<?> type) {

		return ObjectUtils.isEmpty(sortObject) ? null
				: queryMapper.getMappedSort(sortObject, mappingContext.getPersistentEntity(type));
	}

	// Callback implementations

	/**
	 * Simple {@link ReactiveCollectionCallback} that takes a query {@link Document} plus an optional fields
	 * specification {@link Document} and executes that against the {@link MongoCollection}, limiting the result to
	 * its first document.
	 *
	 * @author Oliver Gierke
	 * @author Thomas Risberg
	 * @author Christoph Strobl
	 */
	private static class FindOneCallback implements ReactiveCollectionCallback<Document> {

		private final CollectionPreparer<MongoCollection<Document>> collectionPreparer;
		private final Document query;
		private final Optional<Document> fields;
		private final FindPublisherPreparer preparer;

		FindOneCallback(CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query,
				@Nullable Document fields, FindPublisherPreparer preparer) {

			this.collectionPreparer = collectionPreparer;
			this.query = query;
			this.fields = Optional.ofNullable(fields);
			this.preparer = preparer;
		}

		@Override
		public Publisher<Document> doInCollection(MongoCollection<Document> collection)
				throws MongoException, DataAccessException {

			MongoCollection<Document> preparedCollection = collectionPreparer.prepare(collection);
			FindPublisher<Document> publisher = preparer.initiateFind(preparedCollection,
					col -> col.find(query, Document.class));

			// apply the projection only when a fields specification was given
			Document projection = fields.orElse(null);
			if (projection != null) {
				publisher = publisher.projection(projection);
			}

			return publisher.limit(1).first();
		}
	}

	/**
	 * Simple {@link ReactiveCollectionQueryCallback} that takes a query {@link Document} plus an optional fields
	 * specification {@link Document} and executes that against the {@link MongoCollection}. An empty query results in
	 * a find without filter, empty fields in a find without projection.
	 *
	 * @author Mark Paluch
	 */
	private static class FindCallback implements ReactiveCollectionQueryCallback<Document> {

		private final CollectionPreparer<MongoCollection<Document>> collectionPreparer;

		private final @Nullable Document query;
		private final @Nullable Document fields;

		FindCallback(CollectionPreparer<MongoCollection<Document>> collectionPreparer, @Nullable Document query) {
			this(collectionPreparer, query, null);
		}

		FindCallback(CollectionPreparer<MongoCollection<Document>> collectionPreparer, @Nullable Document query,
				@Nullable Document fields) {

			this.collectionPreparer = collectionPreparer;
			this.query = query;
			this.fields = fields;
		}

		@Override
		public FindPublisher<Document> doInCollection(MongoCollection<Document> collection) {

			MongoCollection<Document> preparedCollection = collectionPreparer.prepare(collection);

			FindPublisher<Document> publisher = ObjectUtils.isEmpty(query) //
					? preparedCollection.find(Document.class) //
					: preparedCollection.find(query, Document.class);

			return ObjectUtils.isEmpty(fields) ? publisher : publisher.projection(fields);
		}
	}

	/**
	 * Simple {@link ReactiveCollectionCallback} that takes a query {@link Document} plus fields, an optional sort and
	 * an optional {@link Collation} and executes a find-one-and-delete against the prepared
	 * {@link MongoCollection}.
	 *
	 * @author Mark Paluch
	 */
	private static class FindAndRemoveCallback implements ReactiveCollectionCallback<Document> {

		private final CollectionPreparer<MongoCollection<Document>> collectionPreparer;
		private final Document query;
		private final Document fields;
		private final @Nullable Document sort;
		private final Optional<Collation> collation;

		FindAndRemoveCallback(CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query,
				Document fields, @Nullable Document sort, @Nullable Collation collation) {

			this.collectionPreparer = collectionPreparer;
			this.query = query;
			this.fields = fields;
			this.sort = sort;
			this.collation = Optional.ofNullable(collation);
		}

		@Override
		public Publisher<Document> doInCollection(MongoCollection<Document> collection)
				throws MongoException, DataAccessException {

			FindOneAndDeleteOptions deleteOptions = convertToFindOneAndDeleteOptions(fields, sort);

			// the collation is optional, only set it on the options when present
			collation.map(Collation::toMongoCollation).ifPresent(deleteOptions::collation);

			MongoCollection<Document> preparedCollection = collectionPreparer.prepare(collection);

			return preparedCollection.findOneAndDelete(query, deleteOptions);
		}
	}

	/**
	 * {@link ReactiveCollectionCallback} executing a find-and-modify against the prepared {@link MongoCollection}.
	 * Depending on the {@link FindAndModifyOptions} this is either a find-one-and-delete or a find-one-and-update,
	 * the latter accepting a {@link Document} update or a {@link List} based update pipeline.
	 *
	 * @author Mark Paluch
	 */
	private static class FindAndModifyCallback implements ReactiveCollectionCallback<Document> {

		private final CollectionPreparer<MongoCollection<Document>> collectionPreparer;
		private final Document query;
		private final @Nullable Document fields;
		private final @Nullable Document sort;
		private final Object update;
		private final List<Document> arrayFilters;
		private final FindAndModifyOptions options;

		FindAndModifyCallback(CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query,
				@Nullable Document fields, @Nullable Document sort, Object update, List<Document> arrayFilters,
				FindAndModifyOptions options) {

			this.collectionPreparer = collectionPreparer;
			this.query = query;
			this.fields = fields;
			this.sort = sort;
			this.update = update;
			this.arrayFilters = arrayFilters;
			this.options = options;
		}

		/**
		 * Execute the operation against the collection obtained from the {@link CollectionPreparer}.
		 *
		 * @param collection the collection to prepare and operate on.
		 * @return the publisher of the delete/update operation, or an error publisher signalling an
		 *         {@link IllegalArgumentException} if the update is neither a {@link Document} nor a {@link List}.
		 */
		@Override
		public Publisher<Document> doInCollection(MongoCollection<Document> collection)
				throws MongoException, DataAccessException {

			MongoCollection<Document> collectionToUse = collectionPreparer.prepare(collection);
			if (options.isRemove()) {
				FindOneAndDeleteOptions findOneAndDeleteOptions = convertToFindOneAndDeleteOptions(fields, sort);

				findOneAndDeleteOptions = options.getCollation().map(Collation::toMongoCollation)
						.map(findOneAndDeleteOptions::collation).orElse(findOneAndDeleteOptions);

				return collectionToUse.findOneAndDelete(query, findOneAndDeleteOptions);
			}

			FindOneAndUpdateOptions findOneAndUpdateOptions = convertToFindOneAndUpdateOptions(options, fields, sort,
					arrayFilters);
			if (update instanceof Document document) {
				// use the prepared collection here as well so the preparer is honored for plain updates, too
				return collectionToUse.findOneAndUpdate(query, document, findOneAndUpdateOptions);
			} else if (update instanceof List) {
				return collectionToUse.findOneAndUpdate(query, (List<Document>) update, findOneAndUpdateOptions);
			}

			return Flux
					.error(new IllegalArgumentException(String.format("Using %s is not supported in findOneAndUpdate", update)));
		}

		/**
		 * Translate the given {@link FindAndModifyOptions} along with projection, sort and array filters into driver
		 * {@link FindOneAndUpdateOptions}.
		 *
		 * @param options source of the upsert, return-new and collation settings.
		 * @param fields the projection. Can be {@literal null}.
		 * @param sort the sort. Can be {@literal null}.
		 * @param arrayFilters the array filters; only applied when not empty.
		 * @return the driver options.
		 */
		private static FindOneAndUpdateOptions convertToFindOneAndUpdateOptions(FindAndModifyOptions options,
				@Nullable Document fields, @Nullable Document sort, List<Document> arrayFilters) {

			FindOneAndUpdateOptions result = new FindOneAndUpdateOptions();

			result = result.projection(fields).sort(sort).upsert(options.isUpsert());

			if (options.isReturnNew()) {
				result = result.returnDocument(ReturnDocument.AFTER);
			} else {
				result = result.returnDocument(ReturnDocument.BEFORE);
			}

			result = options.getCollation().map(Collation::toMongoCollation).map(result::collation).orElse(result);

			if (!CollectionUtils.isEmpty(arrayFilters)) {
				result.arrayFilters(arrayFilters);
			}

			return result;
		}
	}

	/**
	 * {@link ReactiveCollectionCallback} specific for find and replace operation.
	 *
	 * @author Mark Paluch
	 * @author Christoph Strobl
	 * @since 2.1
	 */
	private static class FindAndReplaceCallback implements ReactiveCollectionCallback<Document> {

		private final CollectionPreparer<MongoCollection<Document>> collectionPreparer;
		private final Document query;
		private final Document fields;
		private final Document sort;
		private final Document update;
		private final com.mongodb.client.model.@Nullable Collation collation;
		private final FindAndReplaceOptions options;

		/**
		 * @param collectionPreparer preparer applied to the collection before the replace is issued.
		 * @param query the filter selecting the document to replace.
		 * @param fields the projection.
		 * @param sort the sort.
		 * @param update the replacement document.
		 * @param collation the driver collation, can be {@literal null}.
		 * @param options the upsert and return-new settings.
		 */
		FindAndReplaceCallback(CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query,
				Document fields, Document sort, Document update, com.mongodb.client.model.@Nullable Collation collation,
				FindAndReplaceOptions options) {

			this.collectionPreparer = collectionPreparer;
			this.query = query;
			this.fields = fields;
			this.sort = sort;
			this.update = update;
			this.collation = collation;
			this.options = options;
		}

		/**
		 * Issues a {@code findOneAndReplace} against the prepared collection.
		 */
		@Override
		public Publisher<Document> doInCollection(MongoCollection<Document> collection)
				throws MongoException, DataAccessException {

			FindOneAndReplaceOptions replaceOptions = convertToFindOneAndReplaceOptions(options, fields, sort);
			MongoCollection<Document> preparedCollection = collectionPreparer.prepare(collection);

			return preparedCollection.findOneAndReplace(query, update, replaceOptions);
		}

		/**
		 * Builds the driver options from the configured collation and the given options, projection and sort.
		 */
		private FindOneAndReplaceOptions convertToFindOneAndReplaceOptions(FindAndReplaceOptions options, Document fields,
				Document sort) {

			ReturnDocument returnDocument = options.isReturnNew() ? ReturnDocument.AFTER : ReturnDocument.BEFORE;

			return new FindOneAndReplaceOptions() //
					.collation(collation) //
					.projection(fields) //
					.sort(sort) //
					.upsert(options.isUpsert()) //
					.returnDocument(returnDocument);
		}
	}

	/**
	 * Creates {@link FindOneAndDeleteOptions} carrying the given projection and sort.
	 *
	 * @param fields the projection, can be {@literal null}.
	 * @param sort the sort, can be {@literal null}.
	 * @return new {@link FindOneAndDeleteOptions}.
	 */
	private static FindOneAndDeleteOptions convertToFindOneAndDeleteOptions(@Nullable Document fields,
			@Nullable Document sort) {
		return new FindOneAndDeleteOptions().projection(fields).sort(sort);
	}

	/**
	 * Simple internal callback to allow operations on a {@link Document}.
	 *
	 * @param <T> the type emitted by the {@link Mono} returned from {@link #doWith(Document)}.
	 * @author Mark Paluch
	 */

	interface DocumentCallback<T> {

		/**
		 * Processes the given {@link Document}.
		 *
		 * @param object the document to operate on.
		 * @return a {@link Mono} of the processing result.
		 */
		Mono<T> doWith(Document object);
	}

	/**
	 * Simple internal callback to allow operations on a {@link MongoDatabase}.
	 *
	 * @param <T> the result type of {@link #doInDatabase(MongoDatabase)}.
	 * @author Mark Paluch
	 */

	interface MongoDatabaseCallback<T> {

		/**
		 * Performs an operation using the given {@link MongoDatabase}.
		 *
		 * @param db the database to operate on.
		 * @return the result of the operation.
		 */
		T doInDatabase(MongoDatabase db);
	}

	/**
	 * Simple internal callback to allow query operations on a {@link MongoCollection}. Narrows the return type of
	 * {@link #doInCollection(MongoCollection)} to {@link FindPublisher}.
	 *
	 * @param <T> the element type of the returned {@link FindPublisher}.
	 * @author Mark Paluch
	 */
	interface ReactiveCollectionQueryCallback<T> extends ReactiveCollectionCallback<T> {

		/**
		 * @param collection the collection to query.
		 * @return the {@link FindPublisher} created for the collection.
		 */
		@Override
		FindPublisher<T> doInCollection(MongoCollection<Document> collection) throws MongoException, DataAccessException;
	}

	/**
	 * {@link DocumentCallback} that first lets a delegate process the {@link Document} and then hands both the source
	 * document and the delegate's result to a {@link QueryResultConverter}.
	 *
	 * @param <T> result type of the delegate.
	 * @param <R> result type after conversion.
	 */
	static class QueryResultConverterCallback<T, R> implements DocumentCallback<R> {

		private final QueryResultConverter<? super T, ? extends R> converter;
		private final DocumentCallback<T> delegate;

		QueryResultConverterCallback(QueryResultConverter<? super T, ? extends R> converter, DocumentCallback<T> delegate) {

			this.delegate = delegate;
			this.converter = converter;
		}

		@Override
		public Mono<R> doWith(Document object) {

			Mono<T> delegateResult = delegate.doWith(object);

			// the converter receives the raw document plus a supplier of the already converted value
			return delegateResult.map(value -> converter.mapDocument(object, () -> value));
		}
	}

	/**
	 * {@link DocumentCallback} reading a {@link Document} into the requested target type via the supplied
	 * {@link EntityReader}. Emits an {@link AfterLoadEvent} before and an {@link AfterConvertEvent} after the read.
	 *
	 * @author Mark Paluch
	 * @author Roman Puchkovskiy
	 */
	class ReadDocumentCallback<T> implements DocumentCallback<T> {

		private final EntityReader<? super T, Bson> reader;
		private final Class<T> type;
		private final String collectionName;

		/**
		 * @param reader must not be {@literal null}.
		 * @param type must not be {@literal null}.
		 * @param collectionName name of the collection passed along with the emitted events.
		 */
		ReadDocumentCallback(EntityReader<? super T, Bson> reader, Class<T> type, String collectionName) {

			Assert.notNull(reader, "EntityReader must not be null");
			Assert.notNull(type, "Entity type must not be null");

			this.collectionName = collectionName;
			this.type = type;
			this.reader = reader;
		}

		@Override
		public Mono<T> doWith(Document document) {

			maybeEmitEvent(new AfterLoadEvent<>(document, type, collectionName));

			T converted = reader.read(type, document);

			if (converted != null) {

				maybeEmitEvent(new AfterConvertEvent<>(document, converted, collectionName));
				return maybeCallAfterConvert(converted, document, collectionName);
			}

			// a reader yielding null is treated as a mapping failure
			throw new MappingException(String.format("EntityReader %s returned null", reader));
		}
	}

	/**
	 * {@link DocumentCallback} that projects a {@link Document} onto the type described by an {@link EntityProjection}
	 * using {@link MongoConverter#project}. Emits an {@link AfterLoadEvent} before and an {@link AfterConvertEvent}
	 * after the projection.
	 *
	 * @param <S>
	 * @param <T>
	 * @author Christoph Strobl
	 * @author Roman Puchkovskiy
	 * @since 2.0
	 */
	private class ProjectingReadCallback<S, T> implements DocumentCallback<T> {

		private final MongoConverter reader;
		private final EntityProjection<T, S> projection;
		private final String collectionName;

		ProjectingReadCallback(MongoConverter reader, EntityProjection<T, S> projection, String collectionName) {

			this.collectionName = collectionName;
			this.projection = projection;
			this.reader = reader;
		}

		@Override
		@SuppressWarnings("unchecked")
		public Mono<T> doWith(Document document) {

			Class<T> mappedType = projection.getMappedType().getType();
			maybeEmitEvent(new AfterLoadEvent<>(document, mappedType, collectionName));

			Object projected = reader.project(projection, document);

			if (projected != null) {

				T result = (T) projected;
				maybeEmitEvent(new AfterConvertEvent<>(document, result, collectionName));
				return maybeCallAfterConvert(result, document, collectionName);
			}

			// a converter yielding null is treated as a mapping failure
			throw new MappingException(String.format("EntityReader %s returned null", reader));
		}
	}

	/**
	 * {@link DocumentCallback} producing {@link GeoResult}s: the content is unmarshalled by a delegate callback while
	 * the distance is read from a configurable field of the source {@link Document}.
	 *
	 * @author Mark Paluch
	 * @author Christoph Strobl
	 * @author Roman Puchkovskiy
	 */
	static class GeoNearResultDocumentCallback<T> implements DocumentCallback<GeoResult<T>> {

		private final String distanceField;
		private final DocumentCallback<T> delegate;
		private final Metric metric;

		/**
		 * Creates a new {@link GeoNearResultDocumentCallback} using the given {@link DocumentCallback} delegate for
		 * {@link GeoResult} content unmarshalling.
		 *
		 * @param distanceField the field to read the distance from.
		 * @param delegate must not be {@literal null}.
		 * @param metric the {@link Metric} to apply to the result distance.
		 */
		GeoNearResultDocumentCallback(String distanceField, DocumentCallback<T> delegate, Metric metric) {

			Assert.notNull(delegate, "DocumentCallback must not be null");

			this.metric = metric;
			this.delegate = delegate;
			this.distanceField = distanceField;
		}

		@Override
		public Mono<GeoResult<T>> doWith(Document object) {

			// read the distance up front; the delegate only deals with the content
			double distanceValue = getDistance(object);
			Mono<T> content = delegate.doWith(object);

			return content.map(it -> new GeoResult<>(it, Distance.of(distanceValue, metric)));
		}

		/**
		 * Reads the distance from the configured field.
		 *
		 * @param object the source document.
		 * @return the distance as {@code double} or {@link Double#NaN} if the document does not contain the field.
		 */
		double getDistance(Document object) {

			if (!object.containsKey(distanceField)) {
				return Double.NaN;
			}

			Number rawDistance = object.get(distanceField, Number.class);
			return NumberUtils.convertNumberToTargetClass(rawDistance, Double.class);
		}
	}

	/**
	 * {@link FindPublisherPreparer} applying the collation, skip, limit, sort, hint and {@link Meta} settings of a
	 * {@link Query} to a {@link FindPublisher}.
	 *
	 * @author Mark Paluch
	 */
	class QueryFindPublisherPreparer implements FindPublisherPreparer {

		private final Query query;
		private final Document sortObject;
		private final int limit;
		private final long skip;
		private final @Nullable Class<?> type;

		/**
		 * Creates a preparer that takes sort, limit and skip directly from the given {@link Query}.
		 */
		QueryFindPublisherPreparer(Query query, @Nullable Class<?> type) {
			this(query, query.getSortObject(), query.getLimit(), query.getSkip(), type);
		}

		/**
		 * Creates a preparer with explicit sort, limit and skip values.
		 */
		QueryFindPublisherPreparer(Query query, Document sortObject, int limit, long skip, @Nullable Class<?> type) {

			this.query = query;
			this.sortObject = sortObject;
			this.limit = limit;
			this.skip = skip;
			this.type = type;
		}

		@Override
		public FindPublisher<Document> prepare(FindPublisher<Document> findPublisher) {

			FindPublisher<Document> prepared = operations.forType(type) //
					.getCollation(query) //
					.map(Collation::toMongoCollation) //
					.map(findPublisher::collation) //
					.orElse(findPublisher);

			HintFunction hint = HintFunction.from(query.getHint());
			Meta meta = query.getMeta();

			boolean needsCustomization = skip > 0 || limit > 0 || !ObjectUtils.isEmpty(sortObject) || !hint.isEmpty()
					|| !meta.isEmpty();

			if (!needsCustomization) {
				return prepared;
			}

			try {

				if (skip > 0) {
					prepared = prepared.skip((int) skip);
				}

				if (limit > 0) {
					prepared = prepared.limit(limit);
				}

				if (!ObjectUtils.isEmpty(sortObject)) {
					// map the sort against the domain type when one is known, otherwise use it as is
					prepared = prepared.sort(type != null ? getMappedSortObject(sortObject, type) : sortObject);
				}

				if (hint.isPresent()) {
					prepared = hint.apply(mongoDatabaseFactory, prepared::hintString, prepared::hint);
				}

				if (meta.hasValues()) {
					prepared = applyMeta(prepared, meta);
				}

				return prepared;
			} catch (RuntimeException e) {
				throw potentiallyConvertRuntimeException(e, exceptionTranslator);
			}
		}

		/**
		 * Applies comment, max time, batch size and allow-disk-use from the given {@link Meta} where set.
		 */
		private FindPublisher<Document> applyMeta(FindPublisher<Document> publisher, Meta meta) {

			FindPublisher<Document> result = publisher;

			if (meta.hasComment()) {
				result = result.comment(meta.getRequiredComment());
			}

			if (meta.hasMaxTime()) {
				result = result.maxTime(meta.getRequiredMaxTimeMsec(), TimeUnit.MILLISECONDS);
			}

			if (meta.getCursorBatchSize() != null) {
				result = result.batchSize(meta.getCursorBatchSize());
			}

			if (meta.getAllowDiskUse() != null) {
				result = result.allowDiskUse(meta.getAllowDiskUse());
			}

			return result;
		}

	}

	/**
	 * {@link QueryFindPublisherPreparer} variant that switches the {@link FindPublisher} to
	 * {@link CursorType#TailableAwait} before applying the regular query preparation.
	 */
	class TailingQueryFindPublisherPreparer extends QueryFindPublisherPreparer {

		TailingQueryFindPublisherPreparer(Query query, Class<?> type) {
			super(query, type);
		}

		@Override
		public FindPublisher<Document> prepare(FindPublisher<Document> findPublisher) {

			FindPublisher<Document> tailing = findPublisher.cursorType(CursorType.TailableAwait);
			return super.prepare(tailing);
		}
	}

	/**
	 * Copies the given documents into a new {@link ArrayList}.
	 *
	 * @param documents the documents to copy.
	 * @return a new list containing the given documents.
	 */
	private static List<? extends Document> toDocuments(Collection<? extends Document> documents) {

		List<Document> copy = new ArrayList<>(documents.size());
		copy.addAll(documents);

		return copy;
	}

	/**
	 * {@link ReactiveMongoTemplate} extension bound to a specific {@link ClientSession} that is applied when interacting
	 * with the server through the driver API. <br />
	 * The prepare steps for {@link MongoDatabase} and {@link MongoCollection} proxy the target and invoke the desired
	 * target method matching the actual arguments plus a {@link ClientSession}.
	 *
	 * @author Christoph Strobl
	 * @since 2.1
	 */
	static class ReactiveSessionBoundMongoTemplate extends ReactiveMongoTemplate {

		// the template this session-bound instance was created from
		private final ReactiveMongoTemplate delegate;
		private final ClientSession session;

		/**
		 * Creates a new template whose database factory is the given template's factory bound to the session.
		 *
		 * @param session must not be {@literal null}.
		 * @param that must not be {@literal null}.
		 */
		ReactiveSessionBoundMongoTemplate(ClientSession session, ReactiveMongoTemplate that) {

			super(that.mongoDatabaseFactory.withSession(session), that);

			this.delegate = that;
			this.session = session;
		}

		/**
		 * Obtains the collection from the original (non session-bound) template.
		 */
		@Override
		public Mono<MongoCollection<Document>> getCollection(String collectionName) {

			// native MongoDB objects that offer methods with ClientSession must not be proxied.
			return delegate.getCollection(collectionName);
		}

		/**
		 * Obtains the database from the original (non session-bound) template.
		 */
		@Override
		public Mono<MongoDatabase> getMongoDatabase() {

			// native MongoDB objects that offer methods with ClientSession must not be proxied.
			return delegate.getMongoDatabase();
		}

		/**
		 * Always emits {@literal false}, i.e. counts are never estimated by this template.
		 */
		@Override
		protected Mono<Boolean> countCanBeEstimated(Document filter, CountOptions options) {
			// NOTE(review): presumably estimated counts are not usable within a session/transaction - confirm
			return Mono.just(false);
		}
	}

	/**
	 * {@link ApplicationListener} reacting to {@link MappingContextEvent}s emitted by this template's mapping context
	 * and triggering an index check for {@link MongoPersistentEntity} instances.
	 */
	class IndexCreatorEventListener implements ApplicationListener<MappingContextEvent<?, ?>> {

		final Consumer<Throwable> subscriptionExceptionHandler;

		/**
		 * @param subscriptionExceptionHandler handler passed on to the index check.
		 */
		public IndexCreatorEventListener(Consumer<Throwable> subscriptionExceptionHandler) {
			this.subscriptionExceptionHandler = subscriptionExceptionHandler;
		}

		@Override
		public void onApplicationEvent(MappingContextEvent<?, ?> event) {

			// Ignore events of foreign mapping contexts. The type is double checked as Spring infrastructure does not
			// consider nested generics.
			if (event.wasEmittedBy(mappingContext)
					&& event.getPersistentEntity() instanceof MongoPersistentEntity<?> mongoEntity) {

				onCheckForIndexes(mongoEntity, subscriptionExceptionHandler);
			}
		}
	}

	/**
	 * Value object chaining together a given source document with its mapped representation and the collection to persist
	 * it to. Instances are immutable; {@link #mutate(Object)} and {@link #addTargetDocument(Document)} return new
	 * instances.
	 *
	 * @param <T>
	 * @author Christoph Strobl
	 * @since 2.2
	 */
	private static class PersistableEntityModel<T> {

		private final T source;
		private final @Nullable Document target;
		private final String collection;

		private PersistableEntityModel(T source, @Nullable Document target, String collection) {

			this.source = source;
			this.target = target;
			this.collection = collection;
		}

		/**
		 * Creates a model without a target document.
		 *
		 * @param source the source object.
		 * @param collection the collection name.
		 */
		static <T> PersistableEntityModel<T> of(T source, String collection) {
			return new PersistableEntityModel<>(source, null, collection);
		}

		/**
		 * Creates a model with the given target document.
		 *
		 * @param source the source object.
		 * @param target the mapped representation of the source.
		 * @param collection the collection name.
		 */
		static <T> PersistableEntityModel<T> of(T source, Document target, String collection) {
			return new PersistableEntityModel<>(source, target, collection);
		}

		/**
		 * @param source the new source object.
		 * @return a new model with the given source, keeping target and collection.
		 */
		PersistableEntityModel<T> mutate(T source) {
			// diamond instead of the raw type keeps the construction type-checked
			return new PersistableEntityModel<>(source, target, collection);
		}

		/**
		 * @param target the target document to set.
		 * @return a new model with the given target, keeping source and collection.
		 */
		PersistableEntityModel<T> addTargetDocument(Document target) {
			return new PersistableEntityModel<>(source, target, collection);
		}

		T getSource() {
			return source;
		}

		@Nullable
		Document getTarget() {
			return target;
		}

		String getCollection() {
			return collection;
		}
	}

	/**
	 * Strategy interface for executing a document count.
	 */
	@FunctionalInterface
	interface CountExecution {

		/**
		 * Counts documents in the given collection.
		 *
		 * @param collection name of the collection.
		 * @param filter the filter to count documents for.
		 * @param options the {@link CountOptions} to apply.
		 * @return a {@link Mono} emitting the count.
		 */
		Mono<Long> countDocuments(String collection, Document filter, CountOptions options);
	}
}