MongoTemplate.java

/*
 * Copyright 2010-present the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.springframework.data.mongodb.core;

import static org.springframework.data.mongodb.core.query.SerializationUtils.*;

import java.io.IOException;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.jspecify.annotations.Nullable;

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.context.ApplicationListener;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.io.Resource;
import org.springframework.core.io.ResourceLoader;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.dao.OptimisticLockingFailureException;
import org.springframework.dao.support.PersistenceExceptionTranslator;
import org.springframework.data.convert.EntityReader;
import org.springframework.data.domain.OffsetScrollPosition;
import org.springframework.data.domain.Window;
import org.springframework.data.geo.Distance;
import org.springframework.data.geo.GeoResult;
import org.springframework.data.geo.GeoResults;
import org.springframework.data.geo.Metric;
import org.springframework.data.mapping.MappingException;
import org.springframework.data.mapping.callback.EntityCallbacks;
import org.springframework.data.mapping.context.MappingContext;
import org.springframework.data.mongodb.MongoClusterCapable;
import org.springframework.data.mongodb.MongoDatabaseFactory;
import org.springframework.data.mongodb.MongoDatabaseUtils;
import org.springframework.data.mongodb.SessionSynchronization;
import org.springframework.data.mongodb.core.BulkOperations.BulkMode;
import org.springframework.data.mongodb.core.CollectionPreparerSupport.CollectionPreparerDelegate;
import org.springframework.data.mongodb.core.DefaultBulkOperations.BulkOperationContext;
import org.springframework.data.mongodb.core.EntityOperations.AdaptibleEntity;
import org.springframework.data.mongodb.core.QueryOperations.AggregationDefinition;
import org.springframework.data.mongodb.core.QueryOperations.CountContext;
import org.springframework.data.mongodb.core.QueryOperations.DeleteContext;
import org.springframework.data.mongodb.core.QueryOperations.DistinctQueryContext;
import org.springframework.data.mongodb.core.QueryOperations.QueryContext;
import org.springframework.data.mongodb.core.QueryOperations.UpdateContext;
import org.springframework.data.mongodb.core.ScrollUtils.KeysetScrollQuery;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperationContext;
import org.springframework.data.mongodb.core.aggregation.AggregationOptions;
import org.springframework.data.mongodb.core.aggregation.AggregationOptions.Builder;
import org.springframework.data.mongodb.core.aggregation.AggregationPipeline;
import org.springframework.data.mongodb.core.aggregation.AggregationResults;
import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
import org.springframework.data.mongodb.core.bulk.Bulk;
import org.springframework.data.mongodb.core.bulk.BulkWriteOptions;
import org.springframework.data.mongodb.core.bulk.BulkWriteResult;
import org.springframework.data.mongodb.core.convert.DbRefResolver;
import org.springframework.data.mongodb.core.convert.DefaultDbRefResolver;
import org.springframework.data.mongodb.core.convert.JsonSchemaMapper;
import org.springframework.data.mongodb.core.convert.MappingMongoConverter;
import org.springframework.data.mongodb.core.convert.MongoConverter;
import org.springframework.data.mongodb.core.convert.MongoCustomConversions;
import org.springframework.data.mongodb.core.convert.MongoJsonSchemaMapper;
import org.springframework.data.mongodb.core.convert.MongoWriter;
import org.springframework.data.mongodb.core.convert.QueryMapper;
import org.springframework.data.mongodb.core.convert.UpdateMapper;
import org.springframework.data.mongodb.core.index.DefaultSearchIndexOperations;
import org.springframework.data.mongodb.core.index.IndexOperations;
import org.springframework.data.mongodb.core.index.IndexOperationsProvider;
import org.springframework.data.mongodb.core.index.MongoMappingEventPublisher;
import org.springframework.data.mongodb.core.index.MongoPersistentEntityIndexCreator;
import org.springframework.data.mongodb.core.index.SearchIndexOperations;
import org.springframework.data.mongodb.core.index.SearchIndexOperationsProvider;
import org.springframework.data.mongodb.core.mapping.MongoMappingContext;
import org.springframework.data.mongodb.core.mapping.MongoPersistentEntity;
import org.springframework.data.mongodb.core.mapping.MongoPersistentProperty;
import org.springframework.data.mongodb.core.mapping.event.*;
import org.springframework.data.mongodb.core.mapreduce.MapReduceOptions;
import org.springframework.data.mongodb.core.mapreduce.MapReduceResults;
import org.springframework.data.mongodb.core.query.BasicQuery;
import org.springframework.data.mongodb.core.query.Collation;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Meta;
import org.springframework.data.mongodb.core.query.NearQuery;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.UpdateDefinition;
import org.springframework.data.mongodb.core.query.UpdateDefinition.ArrayFilter;
import org.springframework.data.mongodb.core.timeseries.Granularity;
import org.springframework.data.mongodb.core.validation.Validator;
import org.springframework.data.projection.EntityProjection;
import org.springframework.data.util.CloseableIterator;
import org.springframework.data.util.Lazy;
import org.springframework.data.util.Optionals;
import org.springframework.lang.Contract;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.CollectionUtils;
import org.springframework.util.NumberUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.ResourceUtils;
import org.springframework.util.StringUtils;

import com.mongodb.ClientSessionOptions;
import com.mongodb.MongoException;
import com.mongodb.ReadPreference;
import com.mongodb.WriteConcern;
import com.mongodb.client.*;
import com.mongodb.client.model.*;
import com.mongodb.client.result.DeleteResult;
import com.mongodb.client.result.UpdateResult;

/**
 * Primary implementation of {@link MongoOperations}. It simplifies the use of imperative MongoDB usage and helps to
 * avoid common errors. It executes core MongoDB workflow, leaving application code to provide {@link Document} and
 * extract results. This class executes BSON queries or updates, initiating iteration over {@link FindIterable} and
 * catching MongoDB exceptions and translating them to the generic, more informative exception hierarchy defined in the
 * org.springframework.dao package. Can be used within a service implementation via direct instantiation with a
 * {@link MongoDatabaseFactory} reference, or get prepared in an application context and given to services as bean
 * reference.
 * <p>
 * Note: The {@link MongoDatabaseFactory} should always be configured as a bean in the application context, in the first
 * case given to the service directly, in the second case to the prepared template.
 * <h3>{@link ReadPreference} and {@link com.mongodb.ReadConcern}</h3>
 * <p>
 * {@code ReadPreference} and {@code ReadConcern} are generally considered from {@link Query} and
 * {@link AggregationOptions} objects for the action to be executed on a particular {@link MongoCollection}.
 * <p>
 * You can also set the default {@link #setReadPreference(ReadPreference) ReadPreference} on the template level to
 * generally apply a {@link ReadPreference}.
 * <p>
 * When using transactions make sure to create this template with the same {@link MongoDatabaseFactory} that is also
 * used for {@code MongoTransactionManager} creation.
 *
 * @author Thomas Risberg
 * @author Graeme Rocher
 * @author Mark Pollack
 * @author Oliver Gierke
 * @author Amol Nayak
 * @author Patryk Wasik
 * @author Tobias Trelle
 * @author Sebastian Herold
 * @author Thomas Darimont
 * @author Chuong Ngo
 * @author Christoph Strobl
 * @author Dom��nique Tilleuil
 * @author Niko Schmuck
 * @author Mark Paluch
 * @author Laszlo Csontos
 * @author Maninder Singh
 * @author Borislav Rangelov
 * @author duozhilin
 * @author Andreas Zink
 * @author Cimon Lucas
 * @author Michael J. Simons
 * @author Roman Puchkovskiy
 * @author Yadhukrishna S Pai
 * @author Anton Barkan
 * @author Bart��omiej Mazur
 * @author Michael Krog
 * @author Jakub Zurawa
 * @author Florian L��diger
 */
public class MongoTemplate implements MongoOperations, ApplicationContextAware, IndexOperationsProvider,
		SearchIndexOperationsProvider, ReadPreferenceAware {

	private static final Log LOGGER = LogFactory.getLog(MongoTemplate.class);
	private static final WriteResultChecking DEFAULT_WRITE_RESULT_CHECKING = WriteResultChecking.NONE;

	private final MongoConverter mongoConverter;
	private final MappingContext<? extends MongoPersistentEntity<?>, MongoPersistentProperty> mappingContext;
	private final MongoDatabaseFactory mongoDbFactory;
	private final PersistenceExceptionTranslator exceptionTranslator;
	private final QueryMapper queryMapper;
	private final UpdateMapper updateMapper;
	private final JsonSchemaMapper schemaMapper;
	private final EntityOperations operations;
	private final PropertyOperations propertyOperations;
	private final QueryOperations queryOperations;
	private final EntityLifecycleEventDelegate eventDelegate;

	private @Nullable WriteConcern writeConcern;
	private WriteConcernResolver writeConcernResolver = DefaultWriteConcernResolver.INSTANCE;
	private WriteResultChecking writeResultChecking = WriteResultChecking.NONE;
	private @Nullable ReadPreference readPreference;
	private @Nullable ApplicationEventPublisher eventPublisher;
	private @Nullable EntityCallbacks entityCallbacks;
	private @Nullable ResourceLoader resourceLoader;
	private @Nullable MongoPersistentEntityIndexCreator indexCreator;

	private SessionSynchronization sessionSynchronization = SessionSynchronization.ON_ACTUAL_TRANSACTION;

	private CountExecution countExecution = this::doExactCount;

	/**
	 * Constructor used for a basic template configuration.
	 * <p>
	 * If you intend to use transactions, make sure to use {@link #MongoTemplate(MongoDatabaseFactory)} or
	 * {@link #MongoTemplate(MongoDatabaseFactory, MongoConverter)} constructors, otherwise, this template will not
	 * participate in transactions using the default {@code SessionSynchronization.ON_ACTUAL_TRANSACTION} setting as
	 * {@code MongoTransactionManager} uses strictly its configured {@link MongoDatabaseFactory} for transaction
	 * participation.
	 *
	 * @param mongoClient must not be {@literal null}.
	 * @param databaseName must not be {@literal null} or empty.
	 * @since 2.1
	 */
	public MongoTemplate(MongoClient mongoClient, String databaseName) {
		this(new SimpleMongoClientDatabaseFactory(mongoClient, databaseName), (MongoConverter) null);
	}

	/**
	 * Constructor used for a basic template configuration.
	 *
	 * @param mongoDbFactory must not be {@literal null}.
	 */
	public MongoTemplate(MongoDatabaseFactory mongoDbFactory) {
		this(mongoDbFactory, (MongoConverter) null);
	}

	/**
	 * Constructor used for a basic template configuration.
	 *
	 * @param mongoDbFactory must not be {@literal null}.
	 * @param mongoConverter
	 */
	public MongoTemplate(MongoDatabaseFactory mongoDbFactory, @Nullable MongoConverter mongoConverter) {

		Assert.notNull(mongoDbFactory, "MongoDbFactory must not be null");

		this.mongoDbFactory = mongoDbFactory;
		this.exceptionTranslator = mongoDbFactory.getExceptionTranslator();
		this.mongoConverter = mongoConverter == null ? getDefaultMongoConverter(mongoDbFactory) : mongoConverter;
		this.queryMapper = new QueryMapper(this.mongoConverter);
		this.updateMapper = new UpdateMapper(this.mongoConverter);
		this.schemaMapper = new MongoJsonSchemaMapper(this.mongoConverter);
		this.operations = new EntityOperations(this.mongoConverter, this.queryMapper);
		this.propertyOperations = new PropertyOperations(this.mongoConverter.getMappingContext());
		this.queryOperations = new QueryOperations(queryMapper, updateMapper, operations, propertyOperations,
				mongoDbFactory);
		this.eventDelegate = new EntityLifecycleEventDelegate();

		// We always have a mapping context in the converter, whether it's a simple one or not
		mappingContext = this.mongoConverter.getMappingContext();
		// We create indexes based on mapping events
		if (mappingContext instanceof MongoMappingContext mappingContext) {

			if (mappingContext.isAutoIndexCreation()) {

				indexCreator = new MongoPersistentEntityIndexCreator(mappingContext, this);
				eventPublisher = new MongoMappingEventPublisher(indexCreator);
				mappingContext.setApplicationEventPublisher(eventPublisher);
			}
		}
	}

	private MongoTemplate(MongoDatabaseFactory dbFactory, MongoTemplate that) {

		this.mongoDbFactory = dbFactory;
		this.exceptionTranslator = that.exceptionTranslator;
		this.sessionSynchronization = that.sessionSynchronization;

		// we need to (re)create the MappingMongoConverter as we need to have it use a DbRefResolver that operates within
		// the sames session. Otherwise loading referenced objects would happen outside of it.
		if (that.mongoConverter instanceof MappingMongoConverter mappingMongoConverter) {
			this.mongoConverter = mappingMongoConverter.with(dbFactory);
		} else {
			this.mongoConverter = that.mongoConverter;
		}

		this.queryMapper = that.queryMapper;
		this.updateMapper = that.updateMapper;
		this.schemaMapper = that.schemaMapper;
		this.mappingContext = that.mappingContext;
		this.operations = that.operations;
		this.propertyOperations = that.propertyOperations;
		this.queryOperations = that.queryOperations;
		this.eventDelegate = that.eventDelegate;
	}

	/**
	 * Configures the {@link WriteResultChecking} to be used with the template. Setting {@literal null} will reset the
	 * default of {@link #DEFAULT_WRITE_RESULT_CHECKING}.
	 *
	 * @param resultChecking
	 */
	public void setWriteResultChecking(@Nullable WriteResultChecking resultChecking) {
		this.writeResultChecking = resultChecking == null ? DEFAULT_WRITE_RESULT_CHECKING : resultChecking;
	}

	/**
	 * Configures the {@link WriteConcern} to be used with the template. If none is configured the {@link WriteConcern}
	 * configured on the {@link MongoDatabaseFactory} will apply.
	 *
	 * @param writeConcern
	 */
	public void setWriteConcern(@Nullable WriteConcern writeConcern) {
		this.writeConcern = writeConcern;
	}

	/**
	 * Configures the {@link WriteConcernResolver} to be used with the template.
	 *
	 * @param writeConcernResolver
	 */
	public void setWriteConcernResolver(@Nullable WriteConcernResolver writeConcernResolver) {
		this.writeConcernResolver = writeConcernResolver == null ? DefaultWriteConcernResolver.INSTANCE
				: writeConcernResolver;
	}

	/**
	 * Used by @{link {@link #prepareCollection(MongoCollection)} to set the {@link ReadPreference} before any operations
	 * are performed.
	 *
	 * @param readPreference
	 */
	public void setReadPreference(@Nullable ReadPreference readPreference) {
		this.readPreference = readPreference;
	}

	@Override
	public boolean hasReadPreference() {
		return this.readPreference != null;
	}

	@Override
	public @Nullable ReadPreference getReadPreference() {
		return this.readPreference;
	}

	/**
	 * Configure whether lifecycle events such as {@link AfterLoadEvent}, {@link BeforeSaveEvent}, etc. should be
	 * published or whether emission should be suppressed. Enabled by default.
	 *
	 * @param enabled {@code true} to enable entity lifecycle events; {@code false} to disable entity lifecycle events.
	 * @since 4.0
	 * @see MongoMappingEvent
	 */
	public void setEntityLifecycleEventsEnabled(boolean enabled) {
		this.eventDelegate.setEventsEnabled(enabled);
	}

	@Override
	public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {

		prepareIndexCreator(applicationContext);
		setApplicationEventPublisher(applicationContext);

		if (entityCallbacks == null) {
			setEntityCallbacks(EntityCallbacks.create(applicationContext));
		}

		if (eventPublisher != null && mappingContext instanceof ApplicationEventPublisherAware applicationEventPublisherAware) {
			applicationEventPublisherAware.setApplicationEventPublisher(eventPublisher);
		}

		resourceLoader = applicationContext;
	}

	void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
		this.eventPublisher = applicationEventPublisher;
		eventDelegate.setPublisher(this.eventPublisher);
	}

	/**
	 * Set the {@link EntityCallbacks} instance to use when invoking
	 * {@link org.springframework.data.mapping.callback.EntityCallback callbacks} like the {@link BeforeSaveCallback}.
	 * <br />
	 * Overrides potentially existing {@link EntityCallbacks}.
	 *
	 * @param entityCallbacks must not be {@literal null}.
	 * @throws IllegalArgumentException if the given instance is {@literal null}.
	 * @since 2.2
	 */
	public void setEntityCallbacks(EntityCallbacks entityCallbacks) {

		Assert.notNull(entityCallbacks, "EntityCallbacks must not be null");
		this.entityCallbacks = entityCallbacks;
	}

	/**
	 * Configure whether to use estimated count. Defaults to exact counting.
	 *
	 * @param enabled use {@link com.mongodb.client.MongoCollection#estimatedDocumentCount()} for unpaged and empty
	 *          {@link Query queries} if {@code true}.
	 * @since 3.4
	 */
	public void useEstimatedCount(boolean enabled) {
		useEstimatedCount(enabled, this::countCanBeEstimated);
	}

	/**
	 * Configure whether to use estimated count based on the given {@link BiPredicate estimationFilter}.
	 *
	 * @param enabled use {@link com.mongodb.client.MongoCollection#estimatedDocumentCount()} for unpaged and empty
	 *          {@link Query queries} if {@code true}.
	 * @param estimationFilter the {@link BiPredicate filter}.
	 * @since 3.4
	 */
	private void useEstimatedCount(boolean enabled, BiPredicate<Document, CountOptions> estimationFilter) {

		if (enabled) {

			this.countExecution = (collectionPreparer, collectionName, filter, options) -> {

				if (!estimationFilter.test(filter, options)) {
					return doExactCount(collectionPreparer, collectionName, filter, options);
				}

				EstimatedDocumentCountOptions estimatedDocumentCountOptions = new EstimatedDocumentCountOptions();
				if (options.getMaxTime(TimeUnit.MILLISECONDS) > 0) {
					estimatedDocumentCountOptions.maxTime(options.getMaxTime(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
				}

				return doEstimatedCount(collectionPreparer, collectionName, estimatedDocumentCountOptions);
			};
		} else {
			this.countExecution = this::doExactCount;
		}
	}

	/**
	 * Inspects the given {@link ApplicationContext} for {@link MongoPersistentEntityIndexCreator} and those in turn if
	 * they were registered for the current {@link MappingContext}. If no creator for the current {@link MappingContext}
	 * can be found we manually add the internally created one as {@link ApplicationListener} to make sure indexes get
	 * created appropriately for entity types persisted through this {@link MongoTemplate} instance.
	 *
	 * @param context must not be {@literal null}.
	 */
	private void prepareIndexCreator(ApplicationContext context) {

		String[] indexCreators = context.getBeanNamesForType(MongoPersistentEntityIndexCreator.class);

		for (String creator : indexCreators) {
			MongoPersistentEntityIndexCreator creatorBean = context.getBean(creator, MongoPersistentEntityIndexCreator.class);
			if (creatorBean.isIndexCreatorFor(mappingContext)) {
				return;
			}
		}

		if (context instanceof ConfigurableApplicationContext configurableApplicationContext && indexCreator != null) {
			configurableApplicationContext.addApplicationListener(indexCreator);
		}
	}

	/**
	 * Returns the default {@link org.springframework.data.mongodb.core.convert.MongoConverter}.
	 *
	 * @return
	 */
	@Override
	public MongoConverter getConverter() {
		return this.mongoConverter;
	}

	EntityOperations getEntityOperations() {
		return operations;
	}

	QueryOperations getQueryOperations() {
		return queryOperations;
	}

	@Override
	public <T> Stream<T> stream(Query query, Class<T> entityType) {
		return stream(query, entityType, getCollectionName(entityType));
	}

	@Override
	public <T> Stream<T> stream(Query query, Class<T> entityType, String collectionName) {
		return doStream(query, entityType, collectionName, entityType);
	}

	@SuppressWarnings({ "ConstantConditions", "NullAway" })
	protected <T> Stream<T> doStream(Query query, Class<?> entityType, String collectionName, Class<T> returnType) {
		return doStream(query, entityType, collectionName, returnType, QueryResultConverter.entity());
	}

	@SuppressWarnings({ "ConstantConditions", "NullAway" })
	<T, R> Stream<R> doStream(Query query, Class<?> entityType, String collectionName, Class<T> returnType,
			QueryResultConverter<? super T, ? extends R> resultConverter) {

		Assert.notNull(query, "Query must not be null");
		Assert.notNull(entityType, "Entity type must not be null");
		Assert.hasText(collectionName, "Collection name must not be null or empty");
		Assert.notNull(returnType, "ReturnType must not be null");

		return execute(collectionName, (CollectionCallback<Stream<R>>) collection -> {

			MongoPersistentEntity<?> persistentEntity = mappingContext.getPersistentEntity(entityType);

			QueryContext queryContext = queryOperations.createQueryContext(query);
			EntityProjection<T, ?> projection = operations.introspectProjection(returnType, entityType);

			Document mappedQuery = queryContext.getMappedQuery(persistentEntity);
			Document mappedFields = queryContext.getMappedFields(persistentEntity, projection);

			CollectionPreparerDelegate readPreference = createDelegate(query);
			FindIterable<Document> cursor = new QueryCursorPreparer(query, entityType).initiateFind(collection,
					col -> readPreference.prepare(col).find(mappedQuery, Document.class).projection(mappedFields));

			DocumentCallback<R> resultReader = getResultReader(projection, collectionName, resultConverter);

			return new CloseableIterableCursorAdapter<>(cursor, exceptionTranslator, resultReader).stream();
		});
	}

	@Override
	public String getCollectionName(Class<?> entityClass) {
		return this.operations.determineCollectionName(entityClass);
	}

	@Override
	@SuppressWarnings({ "ConstantConditions", "NullAway" })
	public Document executeCommand(String jsonCommand) {

		Assert.hasText(jsonCommand, "JsonCommand must not be null nor empty");

		return execute(db -> db.runCommand(Document.parse(jsonCommand), Document.class));
	}

	@Override
	@SuppressWarnings({ "ConstantConditions", "NullAway" })
	public Document executeCommand(Document command) {

		Assert.notNull(command, "Command must not be null");

		return execute(db -> db.runCommand(command, Document.class));
	}

	@Override
	@SuppressWarnings({ "ConstantConditions", "NullAway" })
	public Document executeCommand(Document command, @Nullable ReadPreference readPreference) {

		Assert.notNull(command, "Command must not be null");

		return execute(db -> readPreference != null //
				? db.runCommand(command, readPreference, Document.class) //
				: db.runCommand(command, Document.class));
	}

	@Override
	public void executeQuery(Query query, String collectionName, DocumentCallbackHandler dch) {
		executeQuery(query, collectionName, dch, new QueryCursorPreparer(query, null));
	}

	/**
	 * Execute a MongoDB query and iterate over the query results on a per-document basis with a
	 * {@link DocumentCallbackHandler} using the provided CursorPreparer.
	 *
	 * @param query the query class that specifies the criteria used to find a record and also an optional fields
	 *          specification, must not be {@literal null}.
	 * @param collectionName name of the collection to retrieve the objects from
	 * @param documentCallbackHandler the handler that will extract results, one document at a time
	 * @param preparer allows for customization of the {@link FindIterable} used when iterating over the result set,
	 *          (apply limits, skips and so on).
	 */
	protected void executeQuery(Query query, String collectionName, DocumentCallbackHandler documentCallbackHandler,
			@Nullable CursorPreparer preparer) {

		Assert.notNull(query, "Query must not be null");
		Assert.notNull(collectionName, "CollectionName must not be null");
		Assert.notNull(documentCallbackHandler, "DocumentCallbackHandler must not be null");

		Document queryObject = queryMapper.getMappedObject(query.getQueryObject(), Optional.empty());
		Document sortObject = query.getSortObject();
		Document fieldsObject = query.getFieldsObject();

		if (LOGGER.isDebugEnabled()) {
			LOGGER.debug(String.format("Executing query: %s fields: %s sort: %s in collection: %s",
					serializeToJsonSafely(queryObject), fieldsObject, serializeToJsonSafely(sortObject), collectionName));
		}

		this.executeQueryInternal(new FindCallback(createDelegate(query), queryObject, fieldsObject, null),
				preparer != null ? preparer : CursorPreparer.NO_OP_PREPARER, documentCallbackHandler, collectionName);
	}

	@Override
	public <T> @Nullable T execute(DbCallback<T> action) {

		Assert.notNull(action, "DbCallback must not be null");

		try {
			MongoDatabase db = prepareDatabase(this.doGetDatabase());
			return action.doInDB(db);
		} catch (RuntimeException e) {
			throw potentiallyConvertRuntimeException(e, exceptionTranslator);
		}
	}

	@Override
	public <T> @Nullable T execute(Class<?> entityClass, CollectionCallback<T> callback) {

		Assert.notNull(entityClass, "EntityClass must not be null");
		return execute(getCollectionName(entityClass), callback);
	}

	@Override
	public <T> @Nullable T execute(String collectionName, CollectionCallback<T> callback) {

		Assert.notNull(collectionName, "CollectionName must not be null");
		Assert.notNull(callback, "CollectionCallback must not be null");

		try {
			MongoCollection<Document> collection = getAndPrepareCollection(doGetDatabase(), collectionName);
			return callback.doInCollection(collection);
		} catch (RuntimeException e) {
			throw potentiallyConvertRuntimeException(e, exceptionTranslator);
		}
	}

	<T> @Nullable T doWithClient(Function<MongoCluster, T> callback) {

		if (!(getMongoDatabaseFactory() instanceof MongoClusterCapable client)) {
			throw new IllegalStateException(
					"Unable to obtain MongoCluster. Does your database factory implement MongoClusterCapable?");
		}

		try {
			return callback.apply(client.getMongoCluster());
		} catch (RuntimeException e) {
			throw potentiallyConvertRuntimeException(e, exceptionTranslator);
		}
	}

	@Override
	public BulkWriteResult bulkWrite(Bulk bulk, BulkWriteOptions options) {
		return new BulkWriter(this).write(getDb().getName(), bulk, options);
	}

	@Override
	public SessionScoped withSession(ClientSessionOptions options) {

		Assert.notNull(options, "ClientSessionOptions must not be null");

		return withSession(() -> mongoDbFactory.getSession(options));
	}

	@Override
	@Contract("_ -> new")
	public MongoTemplate withSession(ClientSession session) {

		Assert.notNull(session, "ClientSession must not be null");

		return new SessionBoundMongoTemplate(session, MongoTemplate.this);
	}

	/**
	 * Define if {@link MongoTemplate} should participate in transactions. Default is set to
	 * {@link SessionSynchronization#ON_ACTUAL_TRANSACTION}.
	 * <p>
	 * <strong>NOTE:</strong> MongoDB transactions require at least MongoDB 4.0.
	 *
	 * @since 2.1
	 */
	public void setSessionSynchronization(SessionSynchronization sessionSynchronization) {
		this.sessionSynchronization = sessionSynchronization;
	}

	@Override
	public <T> MongoCollection<Document> createCollection(Class<T> entityClass) {
		return createCollection(entityClass, Function.identity());
	}

	@Override
	public <T> MongoCollection<Document> createCollection(Class<T> entityClass,
			Function<? super CollectionOptions, ? extends CollectionOptions> collectionOptionsCustomizer) {

		Assert.notNull(collectionOptionsCustomizer, "CollectionOptions customizer function must not be null");

		return createCollection(entityClass,
				collectionOptionsCustomizer.apply(operations.forType(entityClass).getCollectionOptions()));
	}

	@Override
	public <T> MongoCollection<Document> createCollection(Class<T> entityClass,
			@Nullable CollectionOptions collectionOptions) {

		Assert.notNull(entityClass, "EntityClass must not be null");

		return doCreateCollection(getCollectionName(entityClass),
				operations.convertToCreateCollectionOptions(collectionOptions, entityClass));
	}

	@Override
	public MongoCollection<Document> createCollection(String collectionName) {

		Assert.notNull(collectionName, "CollectionName must not be null");

		return doCreateCollection(collectionName, new Document());
	}

	@Override
	public MongoCollection<Document> createCollection(String collectionName,
			@Nullable CollectionOptions collectionOptions) {

		Assert.notNull(collectionName, "CollectionName must not be null");
		return doCreateCollection(collectionName,
				operations.convertToCreateCollectionOptions(collectionOptions, Object.class));
	}

	@Override
	public MongoCollection<Document> createView(String name, Class<?> source, AggregationPipeline pipeline,
			@Nullable ViewOptions options) {

		return createView(name, getCollectionName(source),
				queryOperations.createAggregation(Aggregation.newAggregation(source, pipeline.getOperations()), source),
				options);
	}

	@Override
	public MongoCollection<Document> createView(String name, String source, AggregationPipeline pipeline,
			@Nullable ViewOptions options) {

		return createView(name, source,
				queryOperations.createAggregation(Aggregation.newAggregation(pipeline.getOperations()), (Class<?>) null),
				options);
	}

	private MongoCollection<Document> createView(String name, String source, AggregationDefinition aggregation,
			@Nullable ViewOptions options) {
		return doCreateView(name, source, aggregation.getAggregationPipeline(), options);
	}

	@SuppressWarnings("NullAway")
	protected MongoCollection<Document> doCreateView(String name, String source, List<Document> pipeline,
			@Nullable ViewOptions options) {

		CreateViewOptions viewOptions = new CreateViewOptions();
		if (options != null) {
			options.getCollation().map(Collation::toMongoCollation).ifPresent(viewOptions::collation);
		}

		return execute(db -> {
			db.createView(name, source, pipeline, viewOptions);
			return db.getCollection(name);
		});
	}

	@Override
	@SuppressWarnings({ "ConstantConditions", "NullAway" })
	@Contract("null -> fail")
	public MongoCollection<Document> getCollection(@Nullable String collectionName) {

		Assert.notNull(collectionName, "CollectionName must not be null");

		return execute(db -> db.getCollection(collectionName, Document.class));
	}

	@Override
	public <T> boolean collectionExists(Class<T> entityClass) {
		return collectionExists(getCollectionName(entityClass));
	}

	@Override
	@SuppressWarnings({ "ConstantConditions", "NullAway" })
	public boolean collectionExists(String collectionName) {

		Assert.notNull(collectionName, "CollectionName must not be null");

		return execute(db -> {

			for (String name : db.listCollectionNames()) {
				if (name.equals(collectionName)) {
					return true;
				}
			}
			return false;
		});
	}

	@Override
	public <T> void dropCollection(Class<T> entityClass) {
		dropCollection(getCollectionName(entityClass));
	}

	@Override
	public void dropCollection(String collectionName) {

		Assert.notNull(collectionName, "CollectionName must not be null");

		execute(collectionName, (CollectionCallback<Void>) collection -> {
			collection.drop();
			if (LOGGER.isDebugEnabled()) {
				LOGGER.debug(String.format("Dropped collection [%s]",
						collection.getNamespace() != null ? collection.getNamespace().getCollectionName() : collectionName));
			}
			return null;
		});
	}

	@Override
	public IndexOperations indexOps(String collectionName) {
		return indexOps(collectionName, null);
	}

	@Override
	public IndexOperations indexOps(String collectionName, @Nullable Class<?> type) {
		return new DefaultIndexOperations(this, collectionName, type);
	}

	@Override
	public IndexOperations indexOps(Class<?> entityClass) {
		return indexOps(getCollectionName(entityClass), entityClass);
	}

	@Override
	public SearchIndexOperations searchIndexOps(String collectionName) {
		return searchIndexOps(null, collectionName);
	}

	@Override
	public SearchIndexOperations searchIndexOps(Class<?> type) {
		return new DefaultSearchIndexOperations(this, type);
	}

	@Override
	public SearchIndexOperations searchIndexOps(@Nullable Class<?> type, String collectionName) {
		return new DefaultSearchIndexOperations(this, collectionName, type);
	}

	@Override
	public BulkOperations bulkOps(BulkMode mode, String collectionName) {
		return bulkOps(mode, null, collectionName);
	}

	@Override
	public BulkOperations bulkOps(BulkMode bulkMode, Class<?> entityClass) {
		return bulkOps(bulkMode, entityClass, getCollectionName(entityClass));
	}

	@Override
	public BulkOperations bulkOps(BulkMode mode, @Nullable Class<?> entityType, String collectionName) {

		Assert.notNull(mode, "BulkMode must not be null");
		Assert.hasText(collectionName, "Collection name must not be null or empty");

		DefaultBulkOperations operations = new DefaultBulkOperations(this, collectionName,
				new BulkOperationContext(mode, Optional.ofNullable(getPersistentEntity(entityType)), queryMapper, updateMapper,
						eventPublisher, entityCallbacks));

		operations.setDefaultWriteConcern(writeConcern);

		return operations;
	}

	@Override
	public ScriptOperations scriptOps() {
		return new DefaultScriptOperations(this);
	}

	// Find methods that take a Query to express the query and that return a single object.

	@Nullable
	@Override
	public <T> T findOne(Query query, Class<T> entityClass) {
		return findOne(query, entityClass, getCollectionName(entityClass));
	}

	@Nullable
	@Override
	public <T> T findOne(Query query, Class<T> entityClass, String collectionName) {

		Assert.notNull(query, "Query must not be null");
		Assert.notNull(entityClass, "EntityClass must not be null");
		Assert.notNull(collectionName, "CollectionName must not be null");

		if (ObjectUtils.isEmpty(query.getSortObject())) {

			return doFindOne(collectionName, createDelegate(query), query.getQueryObject(), query.getFieldsObject(),
					new QueryCursorPreparer(query, entityClass), entityClass);
		} else {
			query.limit(1);
			List<T> results = find(query, entityClass, collectionName);
			return results.isEmpty() ? null : results.get(0);
		}
	}

	@Override
	public boolean exists(Query query, Class<?> entityClass) {
		return exists(query, entityClass, getCollectionName(entityClass));
	}

	@Override
	public boolean exists(Query query, String collectionName) {
		return exists(query, null, collectionName);
	}

	@Override
	@SuppressWarnings({ "ConstantConditions", "NullAway" })
	public boolean exists(Query query, @Nullable Class<?> entityClass, String collectionName) {

		if (query == null) {
			throw new InvalidDataAccessApiUsageException("Query passed in to exist can't be null");
		}
		Assert.notNull(collectionName, "CollectionName must not be null");

		QueryContext queryContext = queryOperations.createQueryContext(query);
		Document mappedQuery = queryContext.getMappedQuery(entityClass, this::getPersistentEntity);

		return execute(collectionName,
				new ExistsCallback(createDelegate(query), mappedQuery, queryContext.getCollation(entityClass).orElse(null)));
	}

	// Find methods that take a Query to express the query and that return a List of objects.

	@Override
	public <T> List<T> find(Query query, Class<T> entityClass) {
		return find(query, entityClass, getCollectionName(entityClass));
	}

	@Override
	public <T> List<T> find(Query query, Class<T> entityClass, String collectionName) {

		Assert.notNull(query, "Query must not be null");
		Assert.notNull(collectionName, "CollectionName must not be null");
		Assert.notNull(entityClass, "EntityClass must not be null");

		return doFind(collectionName, createDelegate(query), query.getQueryObject(), query.getFieldsObject(), entityClass,
				new QueryCursorPreparer(query, entityClass));
	}

	@Override
	public <T> Window<T> scroll(Query query, Class<T> entityType) {

		Assert.notNull(entityType, "Entity type must not be null");

		return scroll(query, entityType, getCollectionName(entityType));
	}

	@Override
	public <T> Window<T> scroll(Query query, Class<T> entityType, String collectionName) {
		return doScroll(query, entityType, entityType, QueryResultConverter.entity(), collectionName);
	}

	<T, R> Window<R> doScroll(Query query, Class<?> sourceClass, Class<T> targetClass,
			QueryResultConverter<? super T, ? extends R> resultConverter, String collectionName) {

		Assert.notNull(query, "Query must not be null");
		Assert.notNull(collectionName, "CollectionName must not be null");
		Assert.notNull(sourceClass, "Entity type must not be null");
		Assert.notNull(targetClass, "Target type must not be null");

		EntityProjection<T, ?> projection = operations.introspectProjection(targetClass, sourceClass);
		DocumentCallback<R> callback = getResultReader(projection, collectionName, resultConverter);
		int limit = query.isLimited() ? query.getLimit() + 1 : Integer.MAX_VALUE;

		if (query.hasKeyset()) {

			KeysetScrollQuery keysetPaginationQuery = ScrollUtils.createKeysetPaginationQuery(query,
					operations.getIdPropertyName(sourceClass));

			List<R> result = doFind(collectionName, createDelegate(query), keysetPaginationQuery.query(),
					keysetPaginationQuery.fields(), sourceClass,
					new QueryCursorPreparer(query, keysetPaginationQuery.sort(), limit, 0, sourceClass), callback);

			return ScrollUtils.createWindow(query, result, sourceClass, operations);
		}

		List<R> result = doFind(collectionName, createDelegate(query), query.getQueryObject(), query.getFieldsObject(),
				sourceClass, new QueryCursorPreparer(query, query.getSortObject(), limit, query.getSkip(), sourceClass),
				callback);

		return ScrollUtils.createWindow(result, query.getLimit(), OffsetScrollPosition.positionFunction(query.getSkip()));
	}

	@Nullable
	@Override
	public <T> T findById(Object id, Class<T> entityClass) {
		return findById(id, entityClass, getCollectionName(entityClass));
	}

	@Nullable
	@Override
	public <T> T findById(Object id, Class<T> entityClass, String collectionName) {

		Assert.notNull(id, "Id must not be null");
		Assert.notNull(entityClass, "EntityClass must not be null");
		Assert.notNull(collectionName, "CollectionName must not be null");

		String idKey = operations.getIdPropertyName(entityClass);

		return doFindOne(collectionName, CollectionPreparer.identity(), new Document(idKey, id), new Document(),
				entityClass);
	}

	@Override
	public <T> List<T> findDistinct(Query query, String field, Class<?> entityClass, Class<T> resultClass) {
		return findDistinct(query, field, getCollectionName(entityClass), entityClass, resultClass);
	}

	@Override
	@SuppressWarnings({ "unchecked", "NullAway" })
	public <T> List<T> findDistinct(Query query, String field, String collectionName, Class<?> entityClass,
			Class<T> resultClass) {

		Assert.notNull(query, "Query must not be null");
		Assert.notNull(field, "Field must not be null");
		Assert.notNull(collectionName, "CollectionName must not be null");
		Assert.notNull(entityClass, "EntityClass must not be null");
		Assert.notNull(resultClass, "ResultClass must not be null");

		MongoPersistentEntity<?> entity = entityClass != Object.class ? getPersistentEntity(entityClass) : null;
		DistinctQueryContext distinctQueryContext = queryOperations.distinctQueryContext(query, field);

		Document mappedQuery = distinctQueryContext.getMappedQuery(entity);
		String mappedFieldName = distinctQueryContext.getMappedFieldName(entity);
		Class<T> mongoDriverCompatibleType = distinctQueryContext.getDriverCompatibleClass(resultClass);

		MongoIterable<?> result = execute(collectionName, (collection) -> {

			if (LOGGER.isDebugEnabled()) {
				LOGGER.debug(String.format("Executing findDistinct using query %s for field: %s in collection: %s",
						serializeToJsonSafely(mappedQuery), field, collectionName));
			}

			collection = createDelegate(query).prepare(collection);

			DistinctIterable<T> iterable = collection.distinct(mappedFieldName, mappedQuery, mongoDriverCompatibleType);
			distinctQueryContext.applyCollation(entityClass, iterable::collation);

			return iterable;
		});

		if (resultClass == Object.class || mongoDriverCompatibleType != resultClass) {

			MongoConverter converter = getConverter();
			DefaultDbRefResolver dbRefResolver = new DefaultDbRefResolver(mongoDbFactory);

			result = result.map((source) -> converter.mapValueToTargetType(source,
					distinctQueryContext.getMostSpecificConversionTargetType(resultClass, entityClass), dbRefResolver));
		}

		try {
			return (List<T>) result.into(new ArrayList<>());
		} catch (RuntimeException e) {
			throw potentiallyConvertRuntimeException(e, exceptionTranslator);
		}
	}

	@Override
	public <T> GeoResults<T> geoNear(NearQuery near, Class<T> entityClass) {
		return geoNear(near, entityClass, getCollectionName(entityClass));
	}

	@Override
	public <T> GeoResults<T> geoNear(NearQuery near, Class<T> domainType, String collectionName) {
		return geoNear(near, domainType, collectionName, domainType);
	}

	public <T> GeoResults<T> geoNear(NearQuery near, Class<?> domainType, String collectionName, Class<T> returnType) {
		return doGeoNear(near, domainType, collectionName, returnType, QueryResultConverter.entity());
	}

	long doGeoNearCount(NearQuery near, Class<?> domainType, String collectionName) {

		Builder optionsBuilder = AggregationOptions.builder().collation(near.getCollation());

		if (near.hasReadPreference()) {
			optionsBuilder.readPreference(near.getReadPreference());
		}

		if (near.hasReadConcern()) {
			optionsBuilder.readConcern(near.getReadConcern());
		}

		String distanceField = operations.nearQueryDistanceFieldName(domainType);
		Aggregation $geoNear = TypedAggregation.newAggregation(domainType,
				Aggregation.geoNear(near, distanceField).skip(-1).limit(-1), Aggregation.count().as("_totalCount"))
				.withOptions(optionsBuilder.build());

		AggregationResults<Document> results = doAggregate($geoNear, collectionName, Document.class,
				queryOperations.createAggregation($geoNear, (AggregationOperationContext) null));
		Iterator<Document> iterator = results.iterator();
		return iterator.hasNext()
				? NumberUtils.convertNumberToTargetClass(iterator.next().get("_totalCount", Integer.class), Long.class)
				: 0L;
	}

	<T, R> GeoResults<R> doGeoNear(NearQuery near, Class<?> domainType, String collectionName, Class<T> returnType,
			QueryResultConverter<? super T, ? extends R> resultConverter) {

		if (near == null) {
			throw new InvalidDataAccessApiUsageException("NearQuery must not be null");
		}

		if (domainType == null) {
			throw new InvalidDataAccessApiUsageException("Entity class must not be null");
		}

		Assert.notNull(collectionName, "CollectionName must not be null");
		Assert.notNull(returnType, "ReturnType must not be null");

		String collection = StringUtils.hasText(collectionName) ? collectionName : getCollectionName(domainType);
		String distanceField = operations.nearQueryDistanceFieldName(domainType);

		Builder optionsBuilder = AggregationOptions.builder().collation(near.getCollation());

		if (near.hasReadPreference()) {
			optionsBuilder.readPreference(near.getReadPreference());
		}

		if (near.hasReadConcern()) {
			optionsBuilder.readConcern(near.getReadConcern());
		}

		Aggregation $geoNear = TypedAggregation.newAggregation(domainType, Aggregation.geoNear(near, distanceField))
				.withOptions(optionsBuilder.build());

		AggregationResults<Document> results = aggregate($geoNear, collection, Document.class);
		EntityProjection<T, ?> projection = operations.introspectProjection(returnType, domainType);

		DocumentCallback<GeoResult<R>> callback = new GeoNearResultDocumentCallback<>(distanceField,
				getResultReader(projection, collectionName, resultConverter), near.getMetric());

		List<GeoResult<R>> result = new ArrayList<>(results.getMappedResults().size());

		BigDecimal aggregate = BigDecimal.ZERO;
		for (Document element : results) {

			GeoResult<R> geoResult = callback.doWith(element);
			aggregate = aggregate.add(BigDecimal.valueOf(geoResult.getDistance().getValue()));
			result.add(geoResult);
		}

		Distance avgDistance = Distance.of(
				result.size() == 0 ? 0 : aggregate.divide(new BigDecimal(result.size()), RoundingMode.HALF_UP).doubleValue(),
				near.getMetric());

		return new GeoResults<>(result, avgDistance);
	}

	public <T> @Nullable T findAndModify(Query query, UpdateDefinition update, Class<T> entityClass) {
		return findAndModify(query, update, new FindAndModifyOptions(), entityClass, getCollectionName(entityClass));
	}

	@Override
	public <T> @Nullable T findAndModify(Query query, UpdateDefinition update, Class<T> entityClass,
			String collectionName) {
		return findAndModify(query, update, new FindAndModifyOptions(), entityClass, collectionName);
	}

	@Override
	public <T> @Nullable T findAndModify(Query query, UpdateDefinition update, FindAndModifyOptions options,
			Class<T> entityClass) {
		return findAndModify(query, update, options, entityClass, getCollectionName(entityClass));
	}

	@Override
	public <T> @Nullable T findAndModify(Query query, UpdateDefinition update, FindAndModifyOptions options,
			Class<T> entityClass, String collectionName) {
		return findAndModify(query, update, options, entityClass, collectionName, QueryResultConverter.entity());
	}

	<S, T> @Nullable T findAndModify(Query query, UpdateDefinition update, FindAndModifyOptions options,
			Class<S> entityClass, String collectionName, QueryResultConverter<? super S, ? extends T> resultConverter) {

		Assert.notNull(query, "Query must not be null");
		Assert.notNull(update, "Update must not be null");
		Assert.notNull(options, "Options must not be null");
		Assert.notNull(entityClass, "EntityClass must not be null");
		Assert.notNull(collectionName, "CollectionName must not be null");

		FindAndModifyOptions optionsToUse = FindAndModifyOptions.of(options);

		Optionals.ifAllPresent(query.getCollation(), optionsToUse.getCollation(), (l, r) -> {
			throw new IllegalArgumentException(
					"Both Query and FindAndModifyOptions define a collation; Please provide the collation only via one of the two");
		});

		if (!options.getCollation().isPresent()) {
			operations.forType(entityClass).getCollation(query).ifPresent(optionsToUse::collation);
		}

		return doFindAndModify(createDelegate(query), collectionName, query.getQueryObject(), query.getFieldsObject(),
				getMappedSortObject(query, entityClass), entityClass, update, optionsToUse, resultConverter);
	}

	@Override
	public <S, T> @Nullable T findAndReplace(Query query, S replacement, FindAndReplaceOptions options,
			Class<S> entityType, String collectionName, Class<T> resultType) {
		return findAndReplace(query, replacement, options, entityType, collectionName, resultType,
				QueryResultConverter.entity());
	}

	<S, T, R> @Nullable R findAndReplace(Query query, S replacement, FindAndReplaceOptions options, Class<S> entityType,
			String collectionName, Class<T> resultType, QueryResultConverter<? super T, ? extends R> resultConverter) {

		Assert.notNull(query, "Query must not be null");
		Assert.notNull(replacement, "Replacement must not be null");
		Assert.notNull(options, "Options must not be null Use FindAndReplaceOptions#empty() instead");
		Assert.notNull(entityType, "EntityType must not be null");
		Assert.notNull(collectionName, "CollectionName must not be null");
		Assert.notNull(resultType, "ResultType must not be null Use Object.class instead");

		Assert.isTrue(query.getLimit() <= 1, "Query must not define a limit other than 1 ore none");
		Assert.isTrue(query.getSkip() <= 0, "Query must not define skip");

		MongoPersistentEntity<?> entity = mappingContext.getPersistentEntity(entityType);
		QueryContext queryContext = queryOperations.createQueryContext(query);

		EntityProjection<T, S> projection = operations.introspectProjection(resultType, entityType);
		CollectionPreparerDelegate collectionPreparer = createDelegate(query);
		Document mappedQuery = queryContext.getMappedQuery(entity);
		Document mappedFields = queryContext.getMappedFields(entity, projection);
		Document mappedSort = queryContext.getMappedSort(entity);

		replacement = maybeCallBeforeConvert(replacement, collectionName);
		Document mappedReplacement = operations.forEntity(replacement).toMappedDocument(this.mongoConverter).getDocument();

		maybeEmitEvent(new BeforeSaveEvent<>(replacement, mappedReplacement, collectionName));
		maybeCallBeforeSave(replacement, mappedReplacement, collectionName);

		R saved = doFindAndReplace(collectionPreparer, collectionName, mappedQuery, mappedFields, mappedSort,
				queryContext.getCollation(entityType).orElse(null), entityType, mappedReplacement, options, projection,
				resultConverter);

		if (saved != null) {
			maybeEmitEvent(new AfterSaveEvent<>(saved, mappedReplacement, collectionName));
			return maybeCallAfterSave(saved, mappedReplacement, collectionName);
		}

		return saved;
	}

	// Find methods that take a Query to express the query and that return a single object that is also removed from the
	// collection in the database.

	@Override
	public <T> @Nullable T findAndRemove(Query query, Class<T> entityClass) {
		return findAndRemove(query, entityClass, getCollectionName(entityClass));
	}

	@Override
	public <T> @Nullable T findAndRemove(Query query, Class<T> entityClass, String collectionName) {

		Assert.notNull(query, "Query must not be null");
		Assert.notNull(entityClass, "EntityClass must not be null");
		Assert.notNull(collectionName, "CollectionName must not be null");

		return doFindAndRemove(createDelegate(query), collectionName, query.getQueryObject(), query.getFieldsObject(),
				getMappedSortObject(query, entityClass), operations.forType(entityClass).getCollation(query).orElse(null),
				entityClass);
	}

	@Override
	public long count(Query query, Class<?> entityClass) {

		Assert.notNull(entityClass, "Entity class must not be null");
		return count(query, entityClass, getCollectionName(entityClass));
	}

	@Override
	public long count(Query query, String collectionName) {
		return count(query, null, collectionName);
	}

	/*
	 * (non-Javadoc)
	 * @see org.springframework.data.mongodb.core.MongoOperations#count(org.springframework.data.mongodb.core.query.Query, java.lang.Class, java.lang.String)
	 */
	@Override
	public long count(Query query, @Nullable Class<?> entityClass, String collectionName) {

		Assert.notNull(query, "Query must not be null");
		Assert.hasText(collectionName, "Collection name must not be null or empty");

		CountContext countContext = queryOperations.countQueryContext(query);

		CountOptions options = countContext.getCountOptions(entityClass);
		Document mappedQuery = countContext.getMappedQuery(entityClass, mappingContext::getPersistentEntity);

		CollectionPreparerDelegate readPreference = createDelegate(query);
		return doCount(readPreference, collectionName, mappedQuery, options);
	}

	protected long doCount(CollectionPreparer collectionPreparer, String collectionName, Document filter,
			CountOptions options) {

		if (LOGGER.isDebugEnabled()) {
			LOGGER
					.debug(String.format("Executing count: %s in collection: %s", serializeToJsonSafely(filter), collectionName));
		}

		return countExecution.countDocuments(collectionPreparer, collectionName, filter, options);
	}

	/*
	 * (non-Javadoc)
	 * @see org.springframework.data.mongodb.core.MongoOperations#estimatedCount(java.lang.String)
	 */
	@Override
	public long estimatedCount(String collectionName) {
		return doEstimatedCount(CollectionPreparerDelegate.of(this), collectionName, new EstimatedDocumentCountOptions());
	}

	@SuppressWarnings("NullAway")
	protected long doEstimatedCount(CollectionPreparer<MongoCollection<Document>> collectionPreparer,
			String collectionName, EstimatedDocumentCountOptions options) {
		return execute(collectionName,
				collection -> collectionPreparer.prepare(collection).estimatedDocumentCount(options));
	}

	@Override
	public long exactCount(Query query, @Nullable Class<?> entityClass, String collectionName) {

		CountContext countContext = queryOperations.countQueryContext(query);

		CountOptions options = countContext.getCountOptions(entityClass);
		Document mappedQuery = countContext.getMappedQuery(entityClass, mappingContext::getPersistentEntity);

		return doExactCount(createDelegate(query), collectionName, mappedQuery, options);
	}

	@SuppressWarnings("NullAway")
	protected long doExactCount(CollectionPreparer<MongoCollection<Document>> collectionPreparer, String collectionName,
			Document filter, CountOptions options) {
		return execute(collectionName, collection -> collectionPreparer.prepare(collection)
				.countDocuments(CountQuery.of(filter).toQueryDocument(), options));
	}

	protected boolean countCanBeEstimated(Document filter, CountOptions options) {

		return
		// only empty filter for estimatedCount
		filter.isEmpty() &&
		// no skip, no limit,...
				isEmptyOptions(options) &&
				// transaction active?
				!MongoDatabaseUtils.isTransactionActive(getMongoDatabaseFactory());
	}

	private boolean isEmptyOptions(CountOptions options) {
		return options.getLimit() <= 0 && options.getSkip() <= 0;
	}

	@Override
	public <T> T insert(T objectToSave) {

		Assert.notNull(objectToSave, "ObjectToSave must not be null");

		ensureNotCollectionLike(objectToSave);
		return insert(objectToSave, getCollectionName(ClassUtils.getUserClass(objectToSave)));
	}

	@Override
	@SuppressWarnings("unchecked")
	public <T> T insert(T objectToSave, String collectionName) {

		Assert.notNull(objectToSave, "ObjectToSave must not be null");
		Assert.notNull(collectionName, "CollectionName must not be null");

		ensureNotCollectionLike(objectToSave);
		return (T) doInsert(collectionName, objectToSave);
	}

	/**
	 * Ensure the given {@literal source} is not an {@link java.lang.reflect.Array}, {@link Collection} or
	 * {@link Iterator}.
	 *
	 * @param source can be {@literal null}.
	 * @since 3.2.
	 */
	protected void ensureNotCollectionLike(@Nullable Object source) {

		if (EntityOperations.isCollectionLike(source)) {
			throw new IllegalArgumentException("Cannot use a collection here");
		}
	}

	/**
	 * Prepare the collection before any processing is done using it. This allows a convenient way to apply settings like
	 * withCodecRegistry() etc. Can be overridden in subclasses.
	 *
	 * @param collection
	 */
	protected MongoCollection<Document> prepareCollection(MongoCollection<Document> collection) {

		if (this.readPreference != null && this.readPreference != collection.getReadPreference()) {
			return collection.withReadPreference(readPreference);
		}

		return collection;
	}

	/**
	 * Prepare the WriteConcern before any processing is done using it. This allows a convenient way to apply custom
	 * settings in subclasses. In case of using MongoDB Java driver version 3 the returned {@link WriteConcern} will be
	 * defaulted to {@link WriteConcern#ACKNOWLEDGED} when {@link WriteResultChecking} is set to
	 * {@link WriteResultChecking#EXCEPTION}.
	 *
	 * @param mongoAction any MongoAction already configured or null
	 * @return The prepared WriteConcern or null
	 */
	@Nullable
	protected WriteConcern prepareWriteConcern(MongoAction mongoAction) {

		WriteConcern wc = writeConcernResolver.resolve(mongoAction);
		return potentiallyForceAcknowledgedWrite(wc);
	}

	@Nullable
	private WriteConcern potentiallyForceAcknowledgedWrite(@Nullable WriteConcern wc) {

		if (ObjectUtils.nullSafeEquals(WriteResultChecking.EXCEPTION, writeResultChecking)) {
			if (wc == null || wc.getWObject() == null
					|| (wc.getWObject() instanceof Number concern && concern.intValue() < 1)) {
				return WriteConcern.ACKNOWLEDGED;
			}
		}
		return wc;
	}

	<T> SourceAwareDocument<T> prepareObjectForSave(String collectionName, T objectToSave) {

		BeforeConvertEvent<T> event = new BeforeConvertEvent<>(objectToSave, collectionName);
		T toConvert = maybeEmitEvent(event).getSource();
		toConvert = maybeCallBeforeConvert(toConvert, collectionName);

		AdaptibleEntity<T> entity = operations.forEntityUpsert(toConvert, mongoConverter.getConversionService());
		T initialized = entity.initializeVersionProperty();
		Document dbDoc = entity.toMappedDocument(mongoConverter).getDocument();

		maybeEmitEvent(new BeforeSaveEvent<>(initialized, dbDoc, collectionName));
		return new SourceAwareDocument<>(maybeCallBeforeSave(initialized, dbDoc, collectionName), dbDoc, collectionName);
	}

	protected <T> T doInsert(String collectionName, T objectToSave) {

		SourceAwareDocument<T> initialized = prepareObjectForSave(collectionName, objectToSave);

		Document dbDoc = initialized.document();
		Object id = insertDocument(collectionName, dbDoc, initialized.source().getClass());

		T saved = populateIdIfNecessary(initialized.source(), id);
		maybeEmitEvent(new AfterSaveEvent<>(saved, dbDoc, collectionName));
		return maybeCallAfterSave(saved, dbDoc, collectionName);
	}

	@Override
	@SuppressWarnings("unchecked")
	public <T> Collection<T> insert(Collection<? extends T> batchToSave, Class<?> entityClass) {

		Assert.notNull(batchToSave, "BatchToSave must not be null");

		return (Collection<T>) doInsertBatch(getCollectionName(entityClass), batchToSave, this.mongoConverter);
	}

	@Override
	@SuppressWarnings("unchecked")
	public <T> Collection<T> insert(Collection<? extends T> batchToSave, String collectionName) {

		Assert.notNull(batchToSave, "BatchToSave must not be null");
		Assert.notNull(collectionName, "CollectionName must not be null");

		return (Collection<T>) doInsertBatch(collectionName, batchToSave, this.mongoConverter);
	}

	@Override
	@SuppressWarnings("unchecked")
	public <T> Collection<T> insertAll(Collection<? extends T> objectsToSave) {

		Assert.notNull(objectsToSave, "ObjectsToSave must not be null");
		return (Collection<T>) doInsertAll(objectsToSave, this.mongoConverter);
	}

	@SuppressWarnings("unchecked")
	protected <T> Collection<T> doInsertAll(Collection<? extends T> listToSave, MongoWriter<T> writer) {

		Map<String, List<T>> elementsByCollection = new HashMap<>();
		List<T> savedObjects = new ArrayList<>(listToSave.size());

		for (T element : listToSave) {

			if (element == null) {
				continue;
			}

			String collection = getCollectionName(ClassUtils.getUserClass(element));
			List<T> collectionElements = elementsByCollection.computeIfAbsent(collection, k -> new ArrayList<>());

			collectionElements.add(element);
		}

		for (Map.Entry<String, List<T>> entry : elementsByCollection.entrySet()) {
			savedObjects.addAll((Collection<T>) doInsertBatch(entry.getKey(), entry.getValue(), this.mongoConverter));
		}

		return savedObjects;
	}

	protected <T> Collection<T> doInsertBatch(String collectionName, Collection<? extends T> batchToSave,
			MongoWriter<T> writer) {

		Assert.notNull(writer, "MongoWriter must not be null");

		List<Document> documentList = new ArrayList<>(batchToSave.size());
		List<T> initializedBatchToSave = new ArrayList<>(batchToSave.size());
		for (T uninitialized : batchToSave) {

			BeforeConvertEvent<T> event = new BeforeConvertEvent<>(uninitialized, collectionName);
			T toConvert = maybeEmitEvent(event).getSource();
			toConvert = maybeCallBeforeConvert(toConvert, collectionName);

			AdaptibleEntity<T> entity = operations.forEntityUpsert(toConvert, mongoConverter.getConversionService());
			T initialized = entity.initializeVersionProperty();
			Document document = entity.toMappedDocument(writer).getDocument();
			maybeEmitEvent(new BeforeSaveEvent<>(initialized, document, collectionName));
			initialized = maybeCallBeforeSave(initialized, document, collectionName);

			MappedDocument mappedDocument = queryOperations.createInsertContext(MappedDocument.of(document))
					.prepareId(uninitialized.getClass());

			documentList.add(mappedDocument.getDocument());
			initializedBatchToSave.add(initialized);
		}

		List<Object> ids = insertDocumentList(collectionName, documentList);
		List<T> savedObjects = new ArrayList<>(documentList.size());

		int i = 0;
		for (T obj : initializedBatchToSave) {

			if (i < ids.size()) {
				T saved = populateIdIfNecessary(obj, ids.get(i));
				Document doc = documentList.get(i);
				maybeEmitEvent(new AfterSaveEvent<>(saved, doc, collectionName));
				savedObjects.add(maybeCallAfterSave(saved, doc, collectionName));
			} else {
				savedObjects.add(obj);
			}
			i++;
		}

		return savedObjects;
	}

	@Override
	public <T> T save(T objectToSave) {

		Assert.notNull(objectToSave, "Object to save must not be null");
		return save(objectToSave, getCollectionName(ClassUtils.getUserClass(objectToSave)));
	}

	@Override
	@SuppressWarnings("unchecked")
	public <T> T save(T objectToSave, String collectionName) {

		Assert.notNull(objectToSave, "Object to save must not be null");
		Assert.hasText(collectionName, "Collection name must not be null or empty");
		ensureNotCollectionLike(objectToSave);

		AdaptibleEntity<T> source = operations.forEntity(objectToSave, mongoConverter.getConversionService());

		return source.isVersionedEntity() //
				? doSaveVersioned(source, collectionName) //
				: (T) doSave(collectionName, objectToSave, this.mongoConverter);
	}

	@SuppressWarnings("unchecked")
	private <T> T doSaveVersioned(AdaptibleEntity<T> source, String collectionName) {

		if (source.isNew()) {
			return (T) doInsert(collectionName, source.getBean());
		}

		// Create query for entity with the id and old version
		Query query = source.getQueryForVersion();

		// Bump version number
		T toSave = source.incrementVersion();

		toSave = maybeEmitEvent(new BeforeConvertEvent<T>(toSave, collectionName)).getSource();
		toSave = maybeCallBeforeConvert(toSave, collectionName);

		if (source.getBean() != toSave) {
			source = operations.forEntity(toSave, mongoConverter.getConversionService());
		}

		source.assertUpdateableIdIfNotSet();

		MappedDocument mapped = source.toMappedDocument(mongoConverter);

		maybeEmitEvent(new BeforeSaveEvent<>(toSave, mapped.getDocument(), collectionName));
		toSave = maybeCallBeforeSave(toSave, mapped.getDocument(), collectionName);
		UpdateDefinition update = mapped.updateWithoutId();

		UpdateResult result = doUpdate(collectionName, query, update, toSave.getClass(), false, false);

		if (result.getModifiedCount() == 0) {

			throw new OptimisticLockingFailureException(
					String.format("Cannot save entity %s with version %s to collection %s; Has it been modified meanwhile",
							source.getId(), source.getVersion(), collectionName));
		}
		maybeEmitEvent(new AfterSaveEvent<>(toSave, mapped.getDocument(), collectionName));

		return maybeCallAfterSave(toSave, mapped.getDocument(), collectionName);
	}

	protected <T> T doSave(String collectionName, T objectToSave, MongoWriter<T> writer) {

		objectToSave = maybeEmitEvent(new BeforeConvertEvent<>(objectToSave, collectionName)).getSource();
		objectToSave = maybeCallBeforeConvert(objectToSave, collectionName);

		AdaptibleEntity<T> entity = operations.forEntityUpsert(objectToSave, mongoConverter.getConversionService());
		MappedDocument mapped = entity.toMappedDocument(writer);
		Document dbDoc = mapped.getDocument();

		maybeEmitEvent(new BeforeSaveEvent<>(objectToSave, dbDoc, collectionName));
		objectToSave = maybeCallBeforeSave(objectToSave, dbDoc, collectionName);
		Object id = saveDocument(collectionName, dbDoc, objectToSave.getClass());

		T saved = populateIdIfNecessary(objectToSave, id);
		maybeEmitEvent(new AfterSaveEvent<>(saved, dbDoc, collectionName));

		return maybeCallAfterSave(saved, dbDoc, collectionName);
	}

	@SuppressWarnings({ "ConstantConditions", "NullAway" })
	protected Object insertDocument(String collectionName, Document document, Class<?> entityClass) {

		if (LOGGER.isDebugEnabled()) {
			LOGGER.debug(String.format("Inserting Document containing fields: %s in collection: %s", document.keySet(),
					collectionName));
		}

		MappedDocument mappedDocument = queryOperations.createInsertContext(MappedDocument.of(document))
				.prepareId(entityClass);

		return execute(collectionName, collection -> {
			MongoAction mongoAction = new MongoAction(writeConcern, MongoActionOperation.INSERT, collectionName, entityClass,
					mappedDocument.getDocument(), null);
			WriteConcern writeConcernToUse = prepareWriteConcern(mongoAction);

			if (writeConcernToUse == null) {
				collection.insertOne(mappedDocument.getDocument());
			} else {
				collection.withWriteConcern(writeConcernToUse).insertOne(mappedDocument.getDocument());
			}

			return operations.forEntity(mappedDocument.getDocument()).getId();
		});
	}

	protected List<Object> insertDocumentList(String collectionName, List<Document> documents) {

		if (documents.isEmpty()) {
			return Collections.emptyList();
		}

		if (LOGGER.isDebugEnabled()) {
			LOGGER.debug(String.format("Inserting list of Documents containing %s items", documents.size()));
		}

		execute(collectionName, collection -> {

			MongoAction mongoAction = new MongoAction(writeConcern, MongoActionOperation.INSERT_LIST, collectionName, null,
					null, null);
			WriteConcern writeConcernToUse = prepareWriteConcern(mongoAction);

			if (writeConcernToUse == null) {
				collection.insertMany(documents);
			} else {
				collection.withWriteConcern(writeConcernToUse).insertMany(documents);
			}

			return null;
		});

		return MappedDocument.toIds(documents);
	}

	@SuppressWarnings("NullAway")
	protected Object saveDocument(String collectionName, Document dbDoc, Class<?> entityClass) {

		if (LOGGER.isDebugEnabled()) {
			LOGGER.debug(String.format("Saving Document containing fields: %s", dbDoc.keySet()));
		}

		return execute(collectionName, collection -> {

			MongoAction mongoAction = new MongoAction(writeConcern, MongoActionOperation.SAVE, collectionName, entityClass,
					dbDoc, null);
			WriteConcern writeConcernToUse = prepareWriteConcern(mongoAction);

			MappedDocument mapped = MappedDocument.of(dbDoc);

			MongoCollection<Document> collectionToUse = writeConcernToUse == null //
					? collection //
					: collection.withWriteConcern(writeConcernToUse);

			if (!mapped.hasId()) {

				mapped = queryOperations.createInsertContext(mapped).prepareId(mappingContext.getPersistentEntity(entityClass));
				collectionToUse.insertOne(mapped.getDocument());
			} else {

				MongoPersistentEntity<?> entity = mappingContext.getPersistentEntity(entityClass);
				UpdateContext updateContext = queryOperations.replaceSingleContext(mapped, true);
				Document replacement = updateContext.getMappedUpdate(entity);
				Document filter = updateContext.getReplacementQuery();
				if (updateContext.requiresShardKey(filter, entity)) {

					if (entity.getShardKey().isImmutable()) {
						filter = updateContext.applyShardKey(entity, filter, null);
					} else {
						filter = updateContext.applyShardKey(entity, filter,
								collection.find(filter, Document.class).projection(updateContext.getMappedShardKey(entity)).first());
					}
				}

				collectionToUse.replaceOne(filter, replacement, new com.mongodb.client.model.ReplaceOptions().upsert(true));
			}
			return mapped.getId();
		});
	}

	@Override
	public UpdateResult upsert(Query query, UpdateDefinition update, Class<?> entityClass) {
		return doUpdate(getCollectionName(entityClass), query, update, entityClass, true, false);
	}

	@Override
	public UpdateResult upsert(Query query, UpdateDefinition update, String collectionName) {
		return doUpdate(collectionName, query, update, null, true, false);
	}

	@Override
	public UpdateResult upsert(Query query, UpdateDefinition update, Class<?> entityClass, String collectionName) {

		Assert.notNull(entityClass, "EntityClass must not be null");

		return doUpdate(collectionName, query, update, entityClass, true, false);
	}

	@Override
	public UpdateResult updateFirst(Query query, UpdateDefinition update, Class<?> entityClass) {
		return doUpdate(getCollectionName(entityClass), query, update, entityClass, false, false);
	}

	@Override
	public UpdateResult updateFirst(Query query, UpdateDefinition update, String collectionName) {
		return doUpdate(collectionName, query, update, null, false, false);
	}

	@Override
	public UpdateResult updateFirst(Query query, UpdateDefinition update, Class<?> entityClass, String collectionName) {

		Assert.notNull(entityClass, "EntityClass must not be null");

		return doUpdate(collectionName, query, update, entityClass, false, false);
	}

	@Override
	public UpdateResult updateMulti(Query query, UpdateDefinition update, Class<?> entityClass) {
		return doUpdate(getCollectionName(entityClass), query, update, entityClass, false, true);
	}

	@Override
	public UpdateResult updateMulti(Query query, UpdateDefinition update, String collectionName) {
		return doUpdate(collectionName, query, update, null, false, true);
	}

	@Override
	public UpdateResult updateMulti(Query query, UpdateDefinition update, Class<?> entityClass, String collectionName) {

		Assert.notNull(entityClass, "EntityClass must not be null");

		return doUpdate(collectionName, query, update, entityClass, false, true);
	}

	@SuppressWarnings({ "ConstantConditions", "NullAway" })
	protected UpdateResult doUpdate(String collectionName, Query query, UpdateDefinition update,
			@Nullable Class<?> entityClass, boolean upsert, boolean multi) {

		Assert.notNull(collectionName, "CollectionName must not be null");
		Assert.notNull(query, "Query must not be null");
		Assert.notNull(update, "Update must not be null");

		MongoPersistentEntity<?> entity = entityClass == null ? null : getPersistentEntity(entityClass);

		UpdateContext updateContext = multi ? queryOperations.updateContext(update, query, upsert)
				: queryOperations.updateSingleContext(update, query, upsert);
		updateContext.increaseVersionForUpdateIfNecessary(entity);

		Document queryObj = updateContext.getMappedQuery(entity);
		UpdateOptions opts = updateContext.getUpdateOptions(entity, query);

		if (updateContext.isAggregationUpdate()) {

			List<Document> pipeline = updateContext.getUpdatePipeline(entityClass);
			MongoAction mongoAction = new MongoAction(writeConcern, MongoActionOperation.UPDATE, collectionName, entityClass,
					update.getUpdateObject(), queryObj);
			WriteConcern writeConcernToUse = prepareWriteConcern(mongoAction);

			return execute(collectionName, collection -> {

				if (LOGGER.isDebugEnabled()) {
					LOGGER.debug(String.format("Calling update using query: %s and update: %s in collection: %s",
							serializeToJsonSafely(queryObj), serializeToJsonSafely(pipeline), collectionName));
				}

				collection = writeConcernToUse != null ? collection.withWriteConcern(writeConcernToUse) : collection;

				return multi ? collection.updateMany(queryObj, pipeline, opts) : collection.updateOne(queryObj, pipeline, opts);
			});
		}

		Document updateObj = updateContext.getMappedUpdate(entity);
		MongoAction mongoAction = new MongoAction(writeConcern, MongoActionOperation.UPDATE, collectionName, entityClass,
				updateObj, queryObj);
		WriteConcern writeConcernToUse = prepareWriteConcern(mongoAction);

		return execute(collectionName, collection -> {

			if (LOGGER.isDebugEnabled()) {
				LOGGER.debug(String.format("Calling update using query: %s and update: %s in collection: %s",
						serializeToJsonSafely(queryObj), serializeToJsonSafely(updateObj), collectionName));
			}

			collection = writeConcernToUse != null ? collection.withWriteConcern(writeConcernToUse) : collection;

			if (!UpdateMapper.isUpdateObject(updateObj)) {

				Document filter = new Document(queryObj);

				if (updateContext.requiresShardKey(filter, entity)) {

					if (entity.getShardKey().isImmutable()) {
						filter = updateContext.applyShardKey(entity, filter, null);
					} else {
						filter = updateContext.applyShardKey(entity, filter,
								collection.find(filter, Document.class).projection(updateContext.getMappedShardKey(entity)).first());
					}
				}

				com.mongodb.client.model.ReplaceOptions replaceOptions = updateContext.getReplaceOptions(entity);
				return collection.replaceOne(filter, updateObj, replaceOptions);
			} else {
				return multi ? collection.updateMany(queryObj, updateObj, opts)
						: collection.updateOne(queryObj, updateObj, opts);
			}
		});
	}

	@Override
	public DeleteResult remove(Object object) {

		Assert.notNull(object, "Object must not be null");

		return remove(object, getCollectionName(object.getClass()));
	}

	@Override
	public DeleteResult remove(Object object, String collectionName) {

		Assert.notNull(object, "Object must not be null");
		Assert.hasText(collectionName, "Collection name must not be null or empty");

		Query query = operations.forEntity(object).getRemoveByQuery();

		return doRemove(collectionName, query, object.getClass(), false);
	}

	@Override
	public DeleteResult remove(Query query, String collectionName) {
		return doRemove(collectionName, query, null, true);
	}

	@Override
	public DeleteResult remove(Query query, Class<?> entityClass) {
		return remove(query, entityClass, getCollectionName(entityClass));
	}

	@Override
	public DeleteResult remove(Query query, Class<?> entityClass, String collectionName) {

		Assert.notNull(entityClass, "EntityClass must not be null");
		return doRemove(collectionName, query, entityClass, true);
	}

	@SuppressWarnings({ "ConstantConditions", "NullAway" })
	protected <T> DeleteResult doRemove(String collectionName, Query query, @Nullable Class<T> entityClass,
			boolean multi) {

		Assert.notNull(query, "Query must not be null");
		Assert.hasText(collectionName, "Collection name must not be null or empty");

		MongoPersistentEntity<?> entity = getPersistentEntity(entityClass);

		DeleteContext deleteContext = multi ? queryOperations.deleteQueryContext(query)
				: queryOperations.deleteSingleContext(query);
		Document queryObject = deleteContext.getMappedQuery(entity);
		DeleteOptions options = deleteContext.getDeleteOptions(entity);

		MongoAction mongoAction = new MongoAction(writeConcern, MongoActionOperation.REMOVE, collectionName, entityClass,
				null, queryObject);

		WriteConcern writeConcernToUse = prepareWriteConcern(mongoAction);

		return execute(collectionName, collection -> {

			maybeEmitEvent(new BeforeDeleteEvent<>(queryObject, entityClass, collectionName));

			Document removeQuery = queryObject;

			if (LOGGER.isDebugEnabled()) {
				LOGGER.debug(String.format("Remove using query: %s in collection: %s.", serializeToJsonSafely(removeQuery),
						collectionName));
			}

			if (query.getLimit() > 0 || query.getSkip() > 0) {

				MongoCursor<Document> cursor = new QueryCursorPreparer(query, entityClass)
						.prepare(collection.find(removeQuery).projection(MappedDocument.getIdOnlyProjection())) //
						.iterator();

				Set<Object> ids = new LinkedHashSet<>();
				while (cursor.hasNext()) {
					ids.add(MappedDocument.of(cursor.next()).getId());
				}

				removeQuery = MappedDocument.getIdIn(ids);
			}

			MongoCollection<Document> collectionToUse = writeConcernToUse != null
					? collection.withWriteConcern(writeConcernToUse)
					: collection;

			DeleteResult result = multi ? collectionToUse.deleteMany(removeQuery, options)
					: collectionToUse.deleteOne(removeQuery, options);

			maybeEmitEvent(new AfterDeleteEvent<>(queryObject, entityClass, collectionName));

			return result;
		});
	}

	@Override
	public <T> List<T> findAll(Class<T> entityClass) {
		return findAll(entityClass, getCollectionName(entityClass));
	}

	@Override
	public <T> List<T> findAll(Class<T> entityClass, String collectionName) {
		return executeFindMultiInternal(
				new FindCallback(CollectionPreparer.identity(), new Document(), new Document(),
						operations.forType(entityClass).getCollation().map(Collation::toMongoCollation).orElse(null)),
				CursorPreparer.NO_OP_PREPARER, new ReadDocumentCallback<>(mongoConverter, entityClass, collectionName),
				collectionName);
	}

	@Override
	public <T> MapReduceResults<T> mapReduce(String inputCollectionName, String mapFunction, String reduceFunction,
			Class<T> entityClass) {
		return mapReduce(new Query(), inputCollectionName, mapFunction, reduceFunction, new MapReduceOptions(),
				entityClass);
	}

	@Override
	public <T> MapReduceResults<T> mapReduce(String inputCollectionName, String mapFunction, String reduceFunction,
			@Nullable MapReduceOptions mapReduceOptions, Class<T> entityClass) {
		return mapReduce(new Query(), inputCollectionName, mapFunction, reduceFunction, mapReduceOptions, entityClass);
	}

	@Override
	public <T> MapReduceResults<T> mapReduce(Query query, String inputCollectionName, String mapFunction,
			String reduceFunction, Class<T> entityClass) {
		return mapReduce(query, inputCollectionName, mapFunction, reduceFunction, new MapReduceOptions(), entityClass);
	}

	@Override
	public <T> MapReduceResults<T> mapReduce(Query query, String inputCollectionName, String mapFunction,
			String reduceFunction, @Nullable MapReduceOptions mapReduceOptions, Class<T> entityClass) {

		return new MapReduceResults<>(
				mapReduce(query, entityClass, inputCollectionName, mapFunction, reduceFunction, mapReduceOptions, entityClass),
				new Document());
	}

	/**
	 * @param query
	 * @param domainType
	 * @param inputCollectionName
	 * @param mapFunction
	 * @param reduceFunction
	 * @param mapReduceOptions
	 * @param resultType
	 * @return
	 * @since 2.1
	 * @deprecated since 3.4 in favor of {@link #aggregate(TypedAggregation, Class)}.
	 */
	@Deprecated
	@SuppressWarnings("NullAway")
	public <T> List<T> mapReduce(Query query, Class<?> domainType, String inputCollectionName, String mapFunction,
			String reduceFunction, @Nullable MapReduceOptions mapReduceOptions, Class<T> resultType) {

		Assert.notNull(domainType, "Domain type must not be null");
		Assert.notNull(inputCollectionName, "Input collection name must not be null");
		Assert.notNull(resultType, "Result type must not be null");
		Assert.notNull(mapFunction, "Map function must not be null");
		Assert.notNull(reduceFunction, "Reduce function must not be null");

		String mapFunc = replaceWithResourceIfNecessary(mapFunction);
		String reduceFunc = replaceWithResourceIfNecessary(reduceFunction);
		CollectionPreparerDelegate readPreference = createDelegate(query);
		MongoCollection<Document> inputCollection = readPreference
				.prepare(getAndPrepareCollection(doGetDatabase(), inputCollectionName));

		// MapReduceOp
		MapReduceIterable<Document> mapReduce = inputCollection.mapReduce(mapFunc, reduceFunc, Document.class);

		if (query.getLimit() > 0 && mapReduceOptions != null && mapReduceOptions.getLimit() == null) {
			mapReduce = mapReduce.limit(query.getLimit());
		}
		if (query.getMeta().hasMaxTime()) {
			mapReduce = mapReduce.maxTime(query.getMeta().getMaxTimeMsec(), TimeUnit.MILLISECONDS);
		}

		Document mappedSort = getMappedSortObject(query, domainType);
		if (mappedSort != null && !mappedSort.isEmpty()) {
			mapReduce = mapReduce.sort(mappedSort);
		}

		mapReduce = mapReduce
				.filter(queryMapper.getMappedObject(query.getQueryObject(), mappingContext.getPersistentEntity(domainType)));

		Optional<Collation> collation = query.getCollation();

		if (mapReduceOptions != null) {

			Optionals.ifAllPresent(collation, mapReduceOptions.getCollation(), (l, r) -> {
				throw new IllegalArgumentException(
						"Both Query and MapReduceOptions define a collation; Please provide the collation only via one of the two.");
			});

			if (mapReduceOptions.getCollation().isPresent()) {
				collation = mapReduceOptions.getCollation();
			}

			if (!CollectionUtils.isEmpty(mapReduceOptions.getScopeVariables())) {
				mapReduce = mapReduce.scope(new Document(mapReduceOptions.getScopeVariables()));
			}

			if (mapReduceOptions.getLimit() != null && mapReduceOptions.getLimit() > 0) {
				mapReduce = mapReduce.limit(mapReduceOptions.getLimit());
			}

			if (mapReduceOptions.getFinalizeFunction().filter(StringUtils::hasText).isPresent()) {
				mapReduce = mapReduce.finalizeFunction(mapReduceOptions.getFinalizeFunction().get());
			}

			if (mapReduceOptions.getJavaScriptMode() != null) {
				mapReduce = mapReduce.jsMode(mapReduceOptions.getJavaScriptMode());
			}

			if (StringUtils.hasText(mapReduceOptions.getOutputCollection()) && !mapReduceOptions.usesInlineOutput()) {

				mapReduce = mapReduce.collectionName(mapReduceOptions.getOutputCollection())
						.action(mapReduceOptions.getMapReduceAction());

				if (mapReduceOptions.getOutputDatabase().isPresent()) {
					mapReduce = mapReduce.databaseName(mapReduceOptions.getOutputDatabase().get());
				}
			}
		}

		if (!collation.isPresent()) {
			collation = operations.forType(domainType).getCollation();
		}

		mapReduce = collation.map(Collation::toMongoCollation).map(mapReduce::collation).orElse(mapReduce);

		List<T> mappedResults = new ArrayList<>();
		DocumentCallback<T> callback = new ReadDocumentCallback<>(mongoConverter, resultType, inputCollectionName);

		for (Document document : mapReduce) {
			mappedResults.add(callback.doWith(document));
		}

		return mappedResults;
	}

	@Override
	public <O> AggregationResults<O> aggregate(TypedAggregation<?> aggregation, Class<O> outputType) {

		Assert.notNull(aggregation, "Aggregation pipeline must not be null");
		return aggregate(aggregation, getCollectionName(aggregation.getInputType()), outputType);
	}

	@Override
	public <O> AggregationResults<O> aggregate(TypedAggregation<?> aggregation, String inputCollectionName,
			Class<O> outputType) {
		return aggregate(aggregation, inputCollectionName, outputType, (AggregationOperationContext) null);
	}

	@Override
	public <O> AggregationResults<O> aggregate(Aggregation aggregation, Class<?> inputType, Class<O> outputType) {

		Assert.notNull(aggregation, "Aggregation pipeline must not be null");
		return aggregate(aggregation, getCollectionName(inputType), outputType,
				queryOperations.createAggregation(aggregation, inputType).getAggregationOperationContext());
	}

	@Override
	public <O> AggregationResults<O> aggregate(Aggregation aggregation, String collectionName, Class<O> outputType) {
		return doAggregate(aggregation, collectionName, outputType, QueryResultConverter.entity());
	}

	@Override
	public <O> Stream<O> aggregateStream(TypedAggregation<?> aggregation, String inputCollectionName,
			Class<O> outputType) {
		return aggregateStream(aggregation, inputCollectionName, outputType, null);
	}

	@Override
	public <O> Stream<O> aggregateStream(TypedAggregation<?> aggregation, Class<O> outputType) {

		Assert.notNull(aggregation, "Aggregation pipeline must not be null");
		return aggregateStream(aggregation, getCollectionName(aggregation.getInputType()), outputType);
	}

	@Override
	public <O> Stream<O> aggregateStream(Aggregation aggregation, Class<?> inputType, Class<O> outputType) {

		Assert.notNull(aggregation, "Aggregation pipeline must not be null");
		return aggregateStream(aggregation, getCollectionName(inputType), outputType,
				queryOperations.createAggregation(aggregation, inputType).getAggregationOperationContext());
	}

	@Override
	public <O> Stream<O> aggregateStream(Aggregation aggregation, String collectionName, Class<O> outputType) {
		return aggregateStream(aggregation, collectionName, outputType, null);
	}

	@Override
	@SuppressWarnings("unchecked")
	public <T> List<T> findAllAndRemove(Query query, String collectionName) {
		return (List<T>) findAllAndRemove(query, Object.class, collectionName);
	}

	@Override
	public <T> List<T> findAllAndRemove(Query query, Class<T> entityClass) {
		return findAllAndRemove(query, entityClass, getCollectionName(entityClass));
	}

	@Override
	public <T> List<T> findAllAndRemove(Query query, Class<T> entityClass, String collectionName) {
		return doFindAndDelete(collectionName, query, entityClass);
	}

	@Override
	public <T> UpdateResult replace(Query query, T replacement, ReplaceOptions options, String collectionName) {

		Assert.notNull(replacement, "Replacement must not be null");
		return replace(query, (Class<T>) ClassUtils.getUserClass(replacement), replacement, options, collectionName);
	}

	protected <S, T> UpdateResult replace(Query query, Class<S> entityType, T replacement, ReplaceOptions options,
			String collectionName) {

		Assert.notNull(query, "Query must not be null");
		Assert.notNull(replacement, "Replacement must not be null");
		Assert.notNull(options, "Options must not be null Use ReplaceOptions#none() instead");
		Assert.notNull(entityType, "EntityType must not be null");
		Assert.notNull(collectionName, "CollectionName must not be null");

		Assert.isTrue(query.getLimit() <= 1, "Query must not define a limit other than 1 ore none");
		Assert.isTrue(query.getSkip() <= 0, "Query must not define skip");

		UpdateContext updateContext = queryOperations.replaceSingleContext(query,
				operations.forEntity(replacement).toMappedDocument(this.mongoConverter), options.isUpsert());

		replacement = maybeCallBeforeConvert(replacement, collectionName);
		Document mappedReplacement = updateContext.getMappedUpdate(mappingContext.getPersistentEntity(entityType));
		maybeEmitEvent(new BeforeSaveEvent<>(replacement, mappedReplacement, collectionName));
		replacement = maybeCallBeforeSave(replacement, mappedReplacement, collectionName);

		MongoAction action = new MongoAction(writeConcern, MongoActionOperation.REPLACE, collectionName, entityType,
				mappedReplacement, updateContext.getQueryObject());

		UpdateResult result = doReplace(options, entityType, collectionName, updateContext,
				createCollectionPreparer(query, action), mappedReplacement);

		if (result.wasAcknowledged()) {

			maybeEmitEvent(new AfterSaveEvent<>(replacement, mappedReplacement, collectionName));
			maybeCallAfterSave(replacement, mappedReplacement, collectionName);
		}

		return result;
	}

	/**
	 * Retrieve and remove all documents matching the given {@code query} by calling {@link #find(Query, Class, String)}
	 * and {@link #remove(Query, Class, String)}, whereas the {@link Query} for {@link #remove(Query, Class, String)} is
	 * constructed out of the find result.
	 *
	 * @param collectionName
	 * @param query
	 * @param entityClass
	 * @return
	 */
	protected <T> List<T> doFindAndDelete(String collectionName, Query query, Class<T> entityClass) {
		return doFindAndDelete(collectionName, query, entityClass, QueryResultConverter.entity());
	}

	@SuppressWarnings("NullAway")
	<S, T> List<T> doFindAndDelete(String collectionName, Query query, Class<S> entityClass,
			QueryResultConverter<? super S, ? extends T> resultConverter) {

		List<Object> ids = new ArrayList<>();

		QueryResultConverterCallback<S, T> callback = new QueryResultConverterCallback<>(resultConverter,
				new ProjectingReadCallback<>(getConverter(), EntityProjection.nonProjecting(entityClass), collectionName)) {
			@Override
			public T doWith(Document object) {
				ids.add(object.get("_id"));
				return super.doWith(object);
			}
		};

		List<T> result = doFind(collectionName, createDelegate(query), query.getQueryObject(), query.getFieldsObject(),
				entityClass, new QueryCursorPreparer(query, entityClass), callback);

		if (!CollectionUtils.isEmpty(result)) {

			Criteria[] criterias = ids.stream() //
					.map(it -> Criteria.where("_id").is(it)) //
					.toArray(Criteria[]::new);

			Query removeQuery = new Query(criterias.length == 1 ? criterias[0] : new Criteria().orOperator(criterias));
			if (query.hasReadPreference()) {
				removeQuery.withReadPreference(query.getReadPreference());
			}

			remove(removeQuery, entityClass, collectionName);
		}

		return result;
	}

	protected <O> AggregationResults<O> aggregate(Aggregation aggregation, String collectionName, Class<O> outputType,
			@Nullable AggregationOperationContext context) {

		Assert.hasText(collectionName, "Collection name must not be null or empty");
		Assert.notNull(aggregation, "Aggregation pipeline must not be null");
		Assert.notNull(outputType, "Output type must not be null");

		return doAggregate(aggregation, collectionName, outputType,
				queryOperations.createAggregation(aggregation, context));
	}

	private <O> AggregationResults<O> doAggregate(Aggregation aggregation, String collectionName, Class<O> outputType,
			AggregationDefinition context) {
		return doAggregate(aggregation, collectionName, outputType, context.getAggregationOperationContext());
	}

	<T, O> AggregationResults<O> doAggregate(Aggregation aggregation, String collectionName, Class<T> outputType,
			QueryResultConverter<? super T, ? extends O> resultConverter) {

		return doAggregate(aggregation, collectionName, outputType, resultConverter, queryOperations
				.createAggregation(aggregation, (AggregationOperationContext) null).getAggregationOperationContext());
	}

	@SuppressWarnings({ "ConstantConditions", "NullAway" })
	protected <O> AggregationResults<O> doAggregate(Aggregation aggregation, String collectionName, Class<O> outputType,
			AggregationOperationContext context) {
		return doAggregate(aggregation, collectionName, outputType, QueryResultConverter.entity(), context);
	}

	@SuppressWarnings({ "ConstantConditions", "NullAway" })
	<T, O> AggregationResults<O> doAggregate(Aggregation aggregation, String collectionName, Class<T> outputType,
			QueryResultConverter<? super T, ? extends O> resultConverter, AggregationOperationContext context) {

		final DocumentCallback<O> callback;
		if (aggregation instanceof TypedAggregation<?> ta && outputType.isInterface()) {
			EntityProjection<T, ?> projection = operations.introspectProjection(outputType, ta.getInputType());
			ProjectingReadCallback cb = new ProjectingReadCallback(mongoConverter, projection, collectionName);
			callback = new QueryResultConverterCallback<>(resultConverter, cb);
		} else {

			callback = new QueryResultConverterCallback<>(resultConverter,
					new ReadDocumentCallback<>(mongoConverter, outputType, collectionName));
		}

		AggregationOptions options = aggregation.getOptions();
		AggregationUtil aggregationUtil = new AggregationUtil(queryMapper, mappingContext);

		if (options.isExplain()) {

			Document command = aggregationUtil.createCommand(collectionName, aggregation, context);

			if (LOGGER.isDebugEnabled()) {
				LOGGER.debug(String.format("Executing aggregation: %s", serializeToJsonSafely(command)));
			}

			Document commandResult = executeCommand(command);
			return new AggregationResults<>(commandResult.get("results", new ArrayList<Document>(0)).stream()
					.map(callback::doWith).collect(Collectors.toList()), commandResult);
		}

		List<Document> pipeline = aggregationUtil.createPipeline(aggregation, context);

		if (LOGGER.isDebugEnabled()) {
			LOGGER.debug(
					String.format("Executing aggregation: %s in collection %s", serializeToJsonSafely(pipeline), collectionName));
		}

		return execute(collectionName, collection -> {

			List<Document> rawResult = new ArrayList<>();
			CollectionPreparerDelegate delegate = CollectionPreparerDelegate.of(options);
			Class<?> domainType = aggregation instanceof TypedAggregation ? ((TypedAggregation<?>) aggregation).getInputType()
					: null;

			Optional<Collation> collation = Optionals.firstNonEmpty(options::getCollation,
					() -> operations.forType(domainType) //
							.getCollation());

			AggregateIterable<Document> aggregateIterable = delegate.prepare(collection).aggregate(pipeline, Document.class) //
					.collation(collation.map(Collation::toMongoCollation).orElse(null));

			if (options.isAllowDiskUseSet()) {
				aggregateIterable = aggregateIterable.allowDiskUse(options.isAllowDiskUse());
			}

			if (options.getCursorBatchSize() != null) {
				aggregateIterable = aggregateIterable.batchSize(options.getCursorBatchSize());
			}

			options.getComment().ifPresent(aggregateIterable::comment);
			HintFunction hintFunction = options.getHintObject().map(HintFunction::from).orElseGet(HintFunction::empty);
			if (hintFunction.isPresent()) {
				aggregateIterable = hintFunction.apply(mongoDbFactory, aggregateIterable::hintString, aggregateIterable::hint);
			}

			if (options.hasExecutionTimeLimit()) {
				aggregateIterable = aggregateIterable.maxTime(options.getMaxTime().toMillis(), TimeUnit.MILLISECONDS);
			}

			if (options.isSkipResults()) {

				// toCollection only allowed for $out and $merge if those are the last stages
				if (aggregation.getPipeline().isOutOrMerge()) {
					aggregateIterable.toCollection();
				} else {
					aggregateIterable.first();
				}
				return new AggregationResults<>(Collections.emptyList(), new Document());
			}

			MongoIterable<O> iterable = aggregateIterable.map(val -> {

				rawResult.add(val);
				return callback.doWith(val);
			});

			return new AggregationResults<>(iterable.into(new ArrayList<>()),
					new Document("results", rawResult).append("ok", 1.0D));
		});
	}

	protected <O> Stream<O> aggregateStream(Aggregation aggregation, String collectionName, Class<O> outputType,
			@Nullable AggregationOperationContext context) {
		return doAggregateStream(aggregation, collectionName, outputType, QueryResultConverter.entity(), context);
	}

	@SuppressWarnings({ "ConstantConditions", "NullAway" })
	<T, O> Stream<O> doAggregateStream(Aggregation aggregation, String collectionName, Class<T> outputType,
			QueryResultConverter<? super T, ? extends O> resultConverter, @Nullable AggregationOperationContext context) {

		Assert.notNull(aggregation, "Aggregation pipeline must not be null");
		Assert.hasText(collectionName, "Collection name must not be null or empty");
		Assert.notNull(outputType, "Output type must not be null");
		Assert.isTrue(!aggregation.getOptions().isExplain(), "Can't use explain option with streaming");

		AggregationDefinition aggregationDefinition = queryOperations.createAggregation(aggregation, context);

		AggregationOptions options = aggregation.getOptions();
		List<Document> pipeline = aggregationDefinition.getAggregationPipeline();

		if (LOGGER.isDebugEnabled()) {
			LOGGER.debug(
					String.format("Streaming aggregation: %s in collection %s", serializeToJsonSafely(pipeline), collectionName));
		}

		DocumentCallback<O> readCallback = new QueryResultConverterCallback<>(resultConverter,
				new ReadDocumentCallback<>(mongoConverter, outputType, collectionName));

		return execute(collectionName, (CollectionCallback<Stream<O>>) collection -> {

			CollectionPreparerDelegate delegate = CollectionPreparerDelegate.of(options);

			AggregateIterable<Document> cursor = delegate.prepare(collection).aggregate(pipeline, Document.class);

			if (options.isAllowDiskUseSet()) {
				cursor = cursor.allowDiskUse(options.isAllowDiskUse());
			}

			if (options.getCursorBatchSize() != null) {
				cursor = cursor.batchSize(options.getCursorBatchSize());
			}

			options.getComment().ifPresent(cursor::comment);
			HintFunction hintFunction = options.getHintObject().map(HintFunction::from).orElseGet(HintFunction::empty);
			if (options.getHintObject().isPresent()) {
				cursor = hintFunction.apply(mongoDbFactory, cursor::hintString, cursor::hint);
			}

			if (options.hasExecutionTimeLimit()) {
				cursor = cursor.maxTime(options.getMaxTime().toMillis(), TimeUnit.MILLISECONDS);
			}

			Class<?> domainType = aggregation instanceof TypedAggregation<?> typedAggregation
					? typedAggregation.getInputType()
					: null;

			Optionals.firstNonEmpty(options::getCollation, //
					() -> operations.forType(domainType).getCollation()) //
					.map(Collation::toMongoCollation) //
					.ifPresent(cursor::collation);

			return new CloseableIterableCursorAdapter<>(cursor, exceptionTranslator, readCallback).stream();
		});
	}

	@Override
	public <T> ExecutableFind<T> query(Class<T> domainType) {
		return new ExecutableFindOperationSupport(this).query(domainType);
	}

	@Override
	public <T> ExecutableUpdate<T> update(Class<T> domainType) {
		return new ExecutableUpdateOperationSupport(this).update(domainType);
	}

	@Override
	public <T> ExecutableRemove<T> remove(Class<T> domainType) {
		return new ExecutableRemoveOperationSupport(this).remove(domainType);
	}

	@Override
	public <T> ExecutableAggregation<T> aggregateAndReturn(Class<T> domainType) {
		return new ExecutableAggregationOperationSupport(this).aggregateAndReturn(domainType);
	}

	@Override
	public <T> ExecutableMapReduce<T> mapReduce(Class<T> domainType) {
		return new ExecutableMapReduceOperationSupport(this).mapReduce(domainType);
	}

	@Override
	public <T> ExecutableInsert<T> insert(Class<T> domainType) {
		return new ExecutableInsertOperationSupport(this).insert(domainType);
	}

	protected String replaceWithResourceIfNecessary(String function) {

		if (this.resourceLoader != null && ResourceUtils.isUrl(function)) {

			Resource functionResource = resourceLoader.getResource(function);

			if (!functionResource.exists()) {
				throw new InvalidDataAccessApiUsageException(String.format("Resource %s not found", function));
			}

			Scanner scanner = null;

			try {
				scanner = new Scanner(functionResource.getInputStream());
				return scanner.useDelimiter("\\A").next();
			} catch (IOException e) {
				throw new InvalidDataAccessApiUsageException(String.format("Cannot read map-reduce file %s", function), e);
			} finally {
				if (scanner != null) {
					scanner.close();
				}
			}
		}

		return function;
	}

	@Override
	@SuppressWarnings({ "ConstantConditions", "NullAway" })
	public Set<String> getCollectionNames() {
		return execute(db -> {
			Set<String> result = new LinkedHashSet<>();
			for (String name : db.listCollectionNames()) {
				result.add(name);
			}
			return result;
		});
	}

	public MongoDatabase getDb() {
		return doGetDatabase();
	}

	protected MongoDatabase doGetDatabase() {
		return MongoDatabaseUtils.getDatabase(mongoDbFactory, sessionSynchronization);
	}

	protected MongoDatabase prepareDatabase(MongoDatabase database) {
		return database;
	}

	protected <E extends MongoMappingEvent<T>, T> E maybeEmitEvent(E event) {
		eventDelegate.publishEvent(event);
		return event;
	}

	protected <T> T maybeCallBeforeConvert(T object, String collection) {

		if (entityCallbacks != null) {
			return entityCallbacks.callback(BeforeConvertCallback.class, object, collection);
		}

		return object;
	}

	protected <T> T maybeCallBeforeSave(T object, Document document, String collection) {

		if (entityCallbacks != null) {
			return entityCallbacks.callback(BeforeSaveCallback.class, object, document, collection);
		}

		return object;
	}

	protected <T> T maybeCallAfterSave(T object, Document document, String collection) {

		if (entityCallbacks != null) {
			return entityCallbacks.callback(AfterSaveCallback.class, object, document, collection);
		}

		return object;
	}

	protected <T> T maybeCallAfterConvert(T object, Document document, String collection) {

		if (entityCallbacks != null) {
			return entityCallbacks.callback(AfterConvertCallback.class, object, document, collection);
		}

		return object;
	}

	/**
	 * Create the specified collection using the provided options
	 *
	 * @param collectionName
	 * @param collectionOptions
	 * @return the collection that was created
	 */
	@SuppressWarnings("ConstantConditions")
	protected MongoCollection<Document> doCreateCollection(String collectionName, Document collectionOptions) {
		return doCreateCollection(collectionName, getCreateCollectionOptions(collectionOptions));
	}

	/**
	 * Create the specified collection using the provided options
	 *
	 * @param collectionName
	 * @param collectionOptions
	 * @return the collection that was created
	 * @since 3.3.3
	 */
	@SuppressWarnings({ "ConstantConditions", "NullAway" })
	protected MongoCollection<Document> doCreateCollection(String collectionName,
			CreateCollectionOptions collectionOptions) {

		return execute(db -> {

			db.createCollection(collectionName, collectionOptions);

			MongoCollection<Document> coll = db.getCollection(collectionName, Document.class);

			// TODO: Emit a collection created event
			if (LOGGER.isDebugEnabled()) {
				LOGGER.debug(String.format("Created collection [%s]",
						coll.getNamespace() != null ? coll.getNamespace().getCollectionName() : collectionName));
			}
			return coll;
		});
	}

	private CreateCollectionOptions getCreateCollectionOptions(Document document) {

		CreateCollectionOptions options = new CreateCollectionOptions();

		if (document.containsKey("capped")) {
			options.capped((Boolean) document.get("capped"));
		}
		if (document.containsKey("size")) {
			options.sizeInBytes(((Number) document.get("size")).longValue());
		}
		if (document.containsKey("max")) {
			options.maxDocuments(((Number) document.get("max")).longValue());
		}

		if (document.containsKey("collation")) {
			options.collation(IndexConverters.fromDocument(document.get("collation", Document.class)));
		}

		if (document.containsKey("validator")) {

			ValidationOptions validation = new ValidationOptions();

			if (document.containsKey("validationLevel")) {
				validation.validationLevel(ValidationLevel.fromString(document.getString("validationLevel")));
			}
			if (document.containsKey("validationAction")) {
				validation.validationAction(ValidationAction.fromString(document.getString("validationAction")));
			}

			validation.validator(document.get("validator", Document.class));
			options.validationOptions(validation);
		}

		if (document.containsKey("timeseries")) {

			Document timeSeries = document.get("timeseries", Document.class);
			TimeSeriesOptions timeseries = new TimeSeriesOptions(timeSeries.getString("timeField"));
			if (timeSeries.containsKey("metaField")) {
				timeseries.metaField(timeSeries.getString("metaField"));
			}
			if (timeSeries.containsKey("granularity")) {
				timeseries.granularity(TimeSeriesGranularity.valueOf(timeSeries.getString("granularity").toUpperCase()));
			}
			options.timeSeriesOptions(timeseries);
		}
		return options;
	}

	/**
	 * Map the results of an ad-hoc query on the default MongoDB collection to an object using the template's converter.
	 * The query document is specified as a standard {@link Document} and so is the fields specification.
	 *
	 * @param collectionName name of the collection to retrieve the objects from.
	 * @param collectionPreparer the preparer to prepare the collection for the actual use.
	 * @param query the query document that specifies the criteria used to find a record.
	 * @param fields the document that specifies the fields to be returned.
	 * @param entityClass the parameterized type of the returned list.
	 * @return the converted object or {@literal null} if none exists.
	 */
	@Nullable
	protected <T> T doFindOne(String collectionName, CollectionPreparer<MongoCollection<Document>> collectionPreparer,
			Document query, Document fields, Class<T> entityClass) {
		return doFindOne(collectionName, collectionPreparer, query, fields, CursorPreparer.NO_OP_PREPARER, entityClass);
	}

	/**
	 * Map the results of an ad-hoc query on the default MongoDB collection to an object using the template's converter.
	 * The query document is specified as a standard {@link Document} and so is the fields specification.
	 *
	 * @param collectionName name of the collection to retrieve the objects from.
	 * @param collectionPreparer the preparer to prepare the collection for the actual use.
	 * @param query the query document that specifies the criteria used to find a record.
	 * @param fields the document that specifies the fields to be returned.
	 * @param preparer the preparer used to modify the cursor on execution.
	 * @param entityClass the parameterized type of the returned list.
	 * @return the converted object or {@literal null} if none exists.
	 * @since 2.2
	 */
	@Nullable
	@SuppressWarnings("ConstantConditions")
	protected <T> T doFindOne(String collectionName, CollectionPreparer<MongoCollection<Document>> collectionPreparer,
			Document query, Document fields, CursorPreparer preparer, Class<T> entityClass) {

		MongoPersistentEntity<?> entity = mappingContext.getPersistentEntity(entityClass);

		QueryContext queryContext = queryOperations.createQueryContext(new BasicQuery(query, fields));
		Document mappedFields = queryContext.getMappedFields(entity, EntityProjection.nonProjecting(entityClass));
		Document mappedQuery = queryContext.getMappedQuery(entity);

		if (LOGGER.isDebugEnabled()) {
			LOGGER.debug(String.format("findOne using query: %s fields: %s for class: %s in collection: %s",
					serializeToJsonSafely(query), mappedFields, entityClass, collectionName));
		}

		return executeFindOneInternal(new FindOneCallback(collectionPreparer, mappedQuery, mappedFields, preparer),
				new ReadDocumentCallback<>(this.mongoConverter, entityClass, collectionName), collectionName);
	}

	/**
	 * Map the results of an ad-hoc query on the default MongoDB collection to a List using the template's converter. The
	 * query document is specified as a standard Document and so is the fields specification.
	 *
	 * @param collectionName name of the collection to retrieve the objects from
	 * @param collectionPreparer the preparer to prepare the collection for the actual use.
	 * @param query the query document that specifies the criteria used to find a record
	 * @param fields the document that specifies the fields to be returned
	 * @param entityClass the parameterized type of the returned list.
	 * @return the List of converted objects.
	 */
	protected <T> List<T> doFind(String collectionName, CollectionPreparer<MongoCollection<Document>> collectionPreparer,
			Document query, Document fields, Class<T> entityClass) {
		return doFind(collectionName, collectionPreparer, query, fields, entityClass, null,
				new ReadDocumentCallback<>(this.mongoConverter, entityClass, collectionName));
	}

	/**
	 * Map the results of an ad-hoc query on the default MongoDB collection to a List of the specified type. The object is
	 * converted from the MongoDB native representation using an instance of {@see MongoConverter}. The query document is
	 * specified as a standard Document and so is the fields specification.
	 *
	 * @param collectionName name of the collection to retrieve the objects from.
	 * @param collectionPreparer the preparer to prepare the collection for the actual use.
	 * @param query the query document that specifies the criteria used to find a record.
	 * @param fields the document that specifies the fields to be returned.
	 * @param entityClass the parameterized type of the returned list.
	 * @param preparer allows for customization of the {@link FindIterable} used when iterating over the result set,
	 *          (apply limits, skips and so on).
	 * @return the {@link List} of converted objects.
	 */
	protected <T> List<T> doFind(String collectionName, CollectionPreparer<MongoCollection<Document>> collectionPreparer,
			Document query, Document fields, Class<T> entityClass, CursorPreparer preparer) {
		return doFind(collectionName, collectionPreparer, query, fields, entityClass, preparer,
				new ReadDocumentCallback<>(mongoConverter, entityClass, collectionName));
	}

	protected <S, T> List<T> doFind(String collectionName,
			CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query, Document fields,
			Class<S> entityClass, @Nullable CursorPreparer preparer, DocumentCallback<T> objectCallback) {

		MongoPersistentEntity<?> entity = mappingContext.getPersistentEntity(entityClass);

		QueryContext queryContext = queryOperations.createQueryContext(new BasicQuery(query, fields));
		Document mappedFields = queryContext.getMappedFields(entity, EntityProjection.nonProjecting(entityClass));
		Document mappedQuery = queryContext.getMappedQuery(entity);

		if (LOGGER.isDebugEnabled()) {

			Document mappedSort = preparer instanceof SortingQueryCursorPreparer sqcp
					? getMappedSortObject(sqcp.getSortObject(), entity)
					: null;
			LOGGER.debug(String.format("find using query: %s fields: %s sort: %s for class: %s in collection: %s",
					serializeToJsonSafely(mappedQuery), mappedFields, serializeToJsonSafely(mappedSort), entityClass,
					collectionName));
		}

		return executeFindMultiInternal(new FindCallback(collectionPreparer, mappedQuery, mappedFields, null),
				preparer != null ? preparer : CursorPreparer.NO_OP_PREPARER, objectCallback, collectionName);
	}

	/**
	 * Map the results of an ad-hoc query on the default MongoDB collection to a List of the specified targetClass while
	 * using sourceClass for mapping the query.
	 *
	 * @since 2.0
	 */
	<T, R> List<R> doFind(CollectionPreparer<MongoCollection<Document>> collectionPreparer, String collectionName,
			Document query, Document fields, Class<?> sourceClass, Class<T> targetClass,
			QueryResultConverter<? super T, ? extends R> resultConverter, CursorPreparer preparer) {

		MongoPersistentEntity<?> entity = mappingContext.getPersistentEntity(sourceClass);
		EntityProjection<T, ?> projection = operations.introspectProjection(targetClass, sourceClass);

		QueryContext queryContext = queryOperations.createQueryContext(new BasicQuery(query, fields));
		Document mappedFields = queryContext.getMappedFields(entity, projection);
		Document mappedQuery = queryContext.getMappedQuery(entity);

		if (LOGGER.isDebugEnabled()) {

			Document mappedSort = preparer instanceof SortingQueryCursorPreparer sqcp
					? getMappedSortObject(sqcp.getSortObject(), entity)
					: null;
			LOGGER.debug(String.format("find using query: %s fields: %s sort: %s for class: %s in collection: %s",
					serializeToJsonSafely(mappedQuery), mappedFields, serializeToJsonSafely(mappedSort), sourceClass,
					collectionName));
		}

		DocumentCallback<R> callback = getResultReader(projection, collectionName, resultConverter);
		return executeFindMultiInternal(new FindCallback(collectionPreparer, mappedQuery, mappedFields, null), preparer,
				callback, collectionName);
	}

	/**
	 * Convert given {@link CollectionOptions} to a document and take the domain type information into account when
	 * creating a mapped schema for validation. <br />
	 *
	 * @param collectionOptions can be {@literal null}.
	 * @param targetType must not be {@literal null}. Use {@link Object} type instead.
	 * @return never {@literal null}.
	 * @since 2.1
	 */
	protected Document convertToDocument(@Nullable CollectionOptions collectionOptions, Class<?> targetType) {

		if (collectionOptions == null) {
			return new Document();
		}

		Document doc = new Document();
		collectionOptions.getCapped().ifPresent(val -> doc.put("capped", val));
		collectionOptions.getSize().ifPresent(val -> doc.put("size", val));
		collectionOptions.getMaxDocuments().ifPresent(val -> doc.put("max", val));
		collectionOptions.getCollation().ifPresent(val -> doc.append("collation", val.toDocument()));

		collectionOptions.getValidationOptions().ifPresent(it -> {

			it.getValidationLevel().ifPresent(val -> doc.append("validationLevel", val.getValue()));
			it.getValidationAction().ifPresent(val -> doc.append("validationAction", val.getValue()));
			it.getValidator().ifPresent(val -> doc.append("validator", getMappedValidator(val, targetType)));
		});

		collectionOptions.getTimeSeriesOptions().map(operations.forType(targetType)::mapTimeSeriesOptions).ifPresent(it -> {

			Document timeseries = new Document("timeField", it.getTimeField());
			if (StringUtils.hasText(it.getMetaField())) {
				timeseries.append("metaField", it.getMetaField());
			}
			if (!Granularity.DEFAULT.equals(it.getGranularity())) {
				timeseries.append("granularity", it.getGranularity().name().toLowerCase());
			}
			doc.put("timeseries", timeseries);
		});

		collectionOptions.getChangeStreamOptions().map(it -> new Document("enabled", it.getPreAndPostImages()))
				.ifPresent(it -> {
					doc.put("changeStreamPreAndPostImages", it);
				});

		return doc;
	}

	Document getMappedValidator(Validator validator, Class<?> domainType) {

		Document validationRules = validator.toDocument();

		if (validationRules.containsKey("$jsonSchema")) {
			return schemaMapper.mapSchema(validationRules, domainType);
		}

		return queryMapper.getMappedObject(validationRules, mappingContext.getPersistentEntity(domainType));
	}

	/**
	 * Map the results of an ad-hoc query on the default MongoDB collection to an object using the template's converter.
	 * The first document that matches the query is returned and also removed from the collection in the database. <br />
	 * The query document is specified as a standard Document and so is the fields specification.
	 *
	 * @param collectionName name of the collection to retrieve the objects from
	 * @param query the query document that specifies the criteria used to find a record
	 * @param entityClass the parameterized type of the returned list.
	 * @return the List of converted objects.
	 */
	@SuppressWarnings("ConstantConditions")
	protected <T> @Nullable T doFindAndRemove(CollectionPreparer collectionPreparer, String collectionName,
			Document query, @Nullable Document fields, @Nullable Document sort, @Nullable Collation collation,
			Class<T> entityClass) {

		if (LOGGER.isDebugEnabled()) {
			LOGGER.debug(String.format("findAndRemove using query: %s fields: %s sort: %s for class: %s in collection: %s",
					serializeToJsonSafely(query), fields, serializeToJsonSafely(sort), entityClass, collectionName));
		}

		MongoPersistentEntity<?> entity = mappingContext.getPersistentEntity(entityClass);

		return executeFindOneInternal(new FindAndRemoveCallback(collectionPreparer,
				queryMapper.getMappedObject(query, entity), fields, sort, collation),
				new ReadDocumentCallback<>(this.mongoConverter, entityClass, collectionName), collectionName);
	}

	@SuppressWarnings("ConstantConditions")
	<S, T> @Nullable T doFindAndModify(CollectionPreparer<MongoCollection<Document>> collectionPreparer,
			String collectionName, Document query, @Nullable Document fields, @Nullable Document sort, Class<S> entityClass,
			UpdateDefinition update, @Nullable FindAndModifyOptions options,
			QueryResultConverter<? super S, ? extends T> resultConverter) {

		if (options == null) {
			options = new FindAndModifyOptions();
		}

		MongoPersistentEntity<?> entity = mappingContext.getPersistentEntity(entityClass);

		UpdateContext updateContext = queryOperations.updateSingleContext(update, query, false);
		updateContext.increaseVersionForUpdateIfNecessary(entity);

		Document mappedQuery = updateContext.getMappedQuery(entity);
		Object mappedUpdate = updateContext.isAggregationUpdate() ? updateContext.getUpdatePipeline(entityClass)
				: updateContext.getMappedUpdate(entity);

		if (LOGGER.isDebugEnabled()) {
			LOGGER.debug(String.format(
					"findAndModify using query: %s fields: %s sort: %s for class: %s and update: %s in collection: %s",
					serializeToJsonSafely(mappedQuery), fields, serializeToJsonSafely(sort), entityClass,
					serializeToJsonSafely(mappedUpdate), collectionName));
		}

		DocumentCallback<T> callback = getResultReader(EntityProjection.nonProjecting(entityClass), collectionName,
				resultConverter);

		return executeFindOneInternal(
				new FindAndModifyCallback(collectionPreparer, mappedQuery, fields, sort, mappedUpdate,
						update.getArrayFilters().stream().map(ArrayFilter::asDocument).collect(Collectors.toList()), options),
				callback, collectionName);
	}

	/**
	 * Customize this part for findAndReplace.
	 *
	 * @param collectionName The name of the collection to perform the operation in.
	 * @param mappedQuery the query to look up documents.
	 * @param mappedFields the fields to project the result to.
	 * @param mappedSort the sort to be applied when executing the query.
	 * @param collation collation settings for the query. Can be {@literal null}.
	 * @param entityType the source domain type.
	 * @param replacement the replacement {@link Document}.
	 * @param options applicable options.
	 * @param resultType the target domain type.
	 * @return {@literal null} if object does not exist, {@link FindAndReplaceOptions#isReturnNew() return new} is
	 *         {@literal false} and {@link FindAndReplaceOptions#isUpsert() upsert} is {@literal false}.
	 */
	@Nullable
	protected <S, T> T doFindAndReplace(CollectionPreparer<MongoCollection<Document>> collectionPreparer,
			String collectionName, Document mappedQuery, Document mappedFields, Document mappedSort,
			com.mongodb.client.model.@Nullable Collation collation, Class<S> entityType, Document replacement,
			FindAndReplaceOptions options, Class<T> resultType) {

		EntityProjection<T, S> projection = operations.introspectProjection(resultType, entityType);

		return doFindAndReplace(collectionPreparer, collectionName, mappedQuery, mappedFields, mappedSort, collation,
				entityType, replacement, options, projection, QueryResultConverter.entity());
	}

	CollectionPreparerDelegate createDelegate(Query query) {
		return CollectionPreparerDelegate.of(query);
	}

	CollectionPreparer<MongoCollection<Document>> createCollectionPreparer(Query query, @Nullable MongoAction action) {
		CollectionPreparer<MongoCollection<Document>> collectionPreparer = createDelegate(query);
		if (action == null) {
			return collectionPreparer;
		}
		return collectionPreparer.andThen(collection -> {
			WriteConcern writeConcern = prepareWriteConcern(action);
			return writeConcern != null ? collection.withWriteConcern(writeConcern) : collection;
		});
	}

	/**
	 * Customize this part for findAndReplace.
	 *
	 * @param collectionName The name of the collection to perform the operation in.
	 * @param mappedQuery the query to look up documents.
	 * @param mappedFields the fields to project the result to.
	 * @param mappedSort the sort to be applied when executing the query.
	 * @param collation collation settings for the query. Can be {@literal null}.
	 * @param entityType the source domain type.
	 * @param replacement the replacement {@link Document}.
	 * @param options applicable options.
	 * @param projection the projection descriptor.
	 * @return {@literal null} if object does not exist, {@link FindAndReplaceOptions#isReturnNew() return new} is
	 *         {@literal false} and {@link FindAndReplaceOptions#isUpsert() upsert} is {@literal false}.
	 * @since 3.4
	 */
	@Nullable
	private <S, T, R> R doFindAndReplace(CollectionPreparer<MongoCollection<Document>> collectionPreparer,
			String collectionName, Document mappedQuery, Document mappedFields, Document mappedSort,
			com.mongodb.client.model.@Nullable Collation collation, Class<T> entityType, Document replacement,
			FindAndReplaceOptions options, EntityProjection<S, T> projection,
			QueryResultConverter<? super S, ? extends R> resultConverter) {

		if (LOGGER.isDebugEnabled()) {
			LOGGER
					.debug(String.format(
							"findAndReplace using query: %s fields: %s sort: %s for class: %s and replacement: %s "
									+ "in collection: %s",
							serializeToJsonSafely(mappedQuery), serializeToJsonSafely(mappedFields),
							serializeToJsonSafely(mappedSort), entityType, serializeToJsonSafely(replacement), collectionName));
		}

		DocumentCallback<R> callback = getResultReader(projection, collectionName, resultConverter);
		return executeFindOneInternal(new FindAndReplaceCallback(collectionPreparer, mappedQuery, mappedFields, mappedSort,
				replacement, collation, options), callback, collectionName);
	}

	@SuppressWarnings("NullAway")
	private <S> UpdateResult doReplace(ReplaceOptions options, Class<S> entityType, String collectionName,
			UpdateContext updateContext, CollectionPreparer<MongoCollection<Document>> collectionPreparer,
			Document replacement) {

		MongoPersistentEntity<?> entity = mappingContext.getPersistentEntity(entityType);

		ReplaceCallback replaceCallback = new ReplaceCallback(collectionPreparer,
				updateContext.getMappedQuery(entity), replacement, updateContext.getReplaceOptions(entity, it -> {
					it.upsert(options.isUpsert());
				}));
		if (LOGGER.isDebugEnabled()) {
			LOGGER.debug(String.format("replace one using query: %s for class: %s in collection: %s",
					serializeToJsonSafely(updateContext.getMappedQuery(entity)), entityType, collectionName));
		}

		return execute(collectionName, replaceCallback);
	}

	/**
	 * Populates the id property of the saved object, if it's not set already.
	 *
	 * @param savedObject
	 * @param id
	 */
	protected <T> T populateIdIfNecessary(T savedObject, Object id) {

		return operations.forEntity(savedObject, mongoConverter.getConversionService()) //
				.populateIdIfNecessary(id);
	}

	private MongoCollection<Document> getAndPrepareCollection(MongoDatabase db, String collectionName) {
		try {
			MongoCollection<Document> collection = db.getCollection(collectionName, Document.class);
			collection = prepareCollection(collection);
			return collection;
		} catch (RuntimeException e) {
			throw potentiallyConvertRuntimeException(e, exceptionTranslator);
		}
	}

	/**
	 * Internal method using callbacks to do queries against the datastore that requires reading a single object from a
	 * collection of objects. It will take the following steps
	 * <ol>
	 * <li>Execute the given {@link CollectionCallback} for a {@link Document}.</li>
	 * <li>Apply the given {@link DocumentCallback} to each of the {@link Document}s to obtain the result.</li>
	 * <ol>
	 *
	 * @param <T>
	 * @param collectionCallback the callback to retrieve the {@link Document} with
	 * @param documentCallback the {@link DocumentCallback} to transform {@link Document}s into the actual domain type
	 * @param collectionName the collection to be queried
	 * @return
	 */
	@Nullable
	private <T> T executeFindOneInternal(CollectionCallback<Document> collectionCallback,
			DocumentCallback<T> documentCallback, String collectionName) {

		try {

			Document document = collectionCallback.doInCollection(getAndPrepareCollection(doGetDatabase(), collectionName));
			return document != null ? documentCallback.doWith(document) : null;
		} catch (RuntimeException e) {
			throw potentiallyConvertRuntimeException(e, exceptionTranslator);
		}
	}

	/**
	 * Internal method using callback to do queries against the datastore that requires reading a collection of objects.
	 * It will take the following steps
	 * <ol>
	 * <li>Execute the given {@link CollectionCallback} for a {@link FindIterable}.</li>
	 * <li>Prepare that {@link FindIterable} with the given {@link CursorPreparer} (will be skipped if
	 * {@link CursorPreparer} is {@literal null}</li>
	 * <li>Iterate over the {@link FindIterable} and applies the given {@link DocumentCallback} to each of the
	 * {@link Document}s collecting the actual result {@link List}.</li>
	 * <ol>
	 *
	 * @param <T>
	 * @param collectionCallback the callback to retrieve the {@link FindIterable} with
	 * @param preparer the {@link CursorPreparer} to potentially modify the {@link FindIterable} before iterating over it
	 * @param documentCallback the {@link DocumentCallback} to transform {@link Document}s into the actual domain type
	 * @param collectionName the collection to be queried
	 * @return
	 */
	private <T> List<T> executeFindMultiInternal(CollectionCallback<FindIterable<Document>> collectionCallback,
			CursorPreparer preparer, DocumentCallback<T> documentCallback, String collectionName) {

		try {

			try (MongoCursor<Document> cursor = preparer
					.initiateFind(getAndPrepareCollection(doGetDatabase(), collectionName), collectionCallback::doInCollection)
					.iterator()) {

				int available = cursor.available();
				List<T> result = available > 0 ? new ArrayList<>(available) : new ArrayList<>();

				while (cursor.hasNext()) {
					Document object = cursor.next();
					result.add(documentCallback.doWith(object));
				}

				return result;
			}
		} catch (RuntimeException e) {
			throw potentiallyConvertRuntimeException(e, exceptionTranslator);
		}
	}

	private void executeQueryInternal(CollectionCallback<FindIterable<Document>> collectionCallback,
			CursorPreparer preparer, DocumentCallbackHandler callbackHandler, String collectionName) {

		try (MongoCursor<Document> cursor = preparer
				.initiateFind(getAndPrepareCollection(doGetDatabase(), collectionName), collectionCallback::doInCollection)
				.iterator()) {

			while (cursor.hasNext()) {
				callbackHandler.processDocument(cursor.next());
			}
		} catch (RuntimeException e) {
			throw potentiallyConvertRuntimeException(e, exceptionTranslator);
		}
	}

	@SuppressWarnings("unchecked")
	private <T, R> DocumentCallback<R> getResultReader(EntityProjection<T, ?> projection, String collectionName,
			QueryResultConverter<? super T, ? extends R> resultConverter) {

		DocumentCallback<T> readCallback = new ProjectingReadCallback<>(mongoConverter, projection, collectionName);

		return resultConverter == QueryResultConverter.entity() ? (DocumentCallback<R>) readCallback
				: new QueryResultConverterCallback<T, R>(resultConverter, readCallback);
	}

	public PersistenceExceptionTranslator getExceptionTranslator() {
		return exceptionTranslator;
	}

	@Nullable
	MongoPersistentEntity<?> getPersistentEntity(@Nullable Class<?> type) {
		return type != null ? mappingContext.getPersistentEntity(type) : null;
	}

	private static MongoConverter getDefaultMongoConverter(MongoDatabaseFactory factory) {

		DbRefResolver dbRefResolver = new DefaultDbRefResolver(factory);
		MongoCustomConversions conversions = new MongoCustomConversions(Collections.emptyList());

		MongoMappingContext mappingContext = new MongoMappingContext();
		mappingContext.setSimpleTypeHolder(conversions.getSimpleTypeHolder());
		mappingContext.afterPropertiesSet();

		MappingMongoConverter converter = new MappingMongoConverter(dbRefResolver, mappingContext);
		converter.setCustomConversions(conversions);
		converter.setCodecRegistryProvider(factory);
		converter.afterPropertiesSet();

		return converter;
	}

	@Nullable
	private Document getMappedSortObject(@Nullable Query query, Class<?> type) {

		if (query == null) {
			return null;
		}

		return getMappedSortObject(query.getSortObject(), type);
	}

	private @Nullable Document getMappedSortObject(@Nullable Document sortObject, Class<?> type) {
		return getMappedSortObject(sortObject, mappingContext.getPersistentEntity(type));
	}

	private @Nullable Document getMappedSortObject(@Nullable Document sortObject,
			@Nullable MongoPersistentEntity<?> entity) {

		if (ObjectUtils.isEmpty(sortObject)) {
			return null;
		}

		return queryMapper.getMappedSort(sortObject, entity);
	}

	/**
	 * Tries to convert the given {@link RuntimeException} into a {@link DataAccessException} but returns the original
	 * exception if the conversation failed. Thus allows safe re-throwing of the return value.
	 *
	 * @param ex the exception to translate
	 * @param exceptionTranslator the {@link PersistenceExceptionTranslator} to be used for translation
	 * @return
	 */
	static RuntimeException potentiallyConvertRuntimeException(RuntimeException ex,
			PersistenceExceptionTranslator exceptionTranslator) {
		RuntimeException resolved = exceptionTranslator.translateExceptionIfPossible(ex);
		return resolved == null ? ex : resolved;
	}

	// Callback implementations

	/**
	 * Simple {@link CollectionCallback} that takes a query {@link Document} plus an optional fields specification
	 * {@link Document} and executes that against the {@link MongoCollection}.
	 *
	 * @author Oliver Gierke
	 * @author Thomas Risberg
	 * @author Christoph Strobl
	 */
	private static class FindOneCallback implements CollectionCallback<Document> {

		private final CollectionPreparer<MongoCollection<Document>> collectionPreparer;
		private final Document query;
		private final Optional<Document> fields;
		private final CursorPreparer cursorPreparer;

		FindOneCallback(CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query, Document fields,
				CursorPreparer preparer) {

			this.collectionPreparer = collectionPreparer;
			this.query = query;
			this.fields = Optional.of(fields).filter(it -> !ObjectUtils.isEmpty(fields));
			this.cursorPreparer = preparer;
		}

		@Override
		public Document doInCollection(MongoCollection<Document> collection) throws MongoException, DataAccessException {

			FindIterable<Document> iterable = cursorPreparer.initiateFind(collection,
					col -> collectionPreparer.prepare(col).find(query, Document.class));

			if (fields.isPresent()) {
				iterable = iterable.projection(fields.get());
			}

			return iterable.first();
		}
	}

	/**
	 * Simple {@link CollectionCallback} that takes a query {@link Document} plus an optional fields specification
	 * {@link Document} and executes that against the {@link MongoCollection}.
	 *
	 * @author Oliver Gierke
	 * @author Thomas Risberg
	 * @author Christoph Strobl
	 */
	private static class FindCallback implements CollectionCallback<FindIterable<Document>> {

		private final CollectionPreparer<MongoCollection<Document>> collectionPreparer;
		private final Document query;
		private final Document fields;
		private final com.mongodb.client.model.@Nullable Collation collation;

		public FindCallback(CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query,
				Document fields, com.mongodb.client.model.@Nullable Collation collation) {

			Assert.notNull(query, "Query must not be null");
			Assert.notNull(fields, "Fields must not be null");

			this.collectionPreparer = collectionPreparer;
			this.query = query;
			this.fields = fields;
			this.collation = collation;
		}

		@Override
		public FindIterable<Document> doInCollection(MongoCollection<Document> collection)
				throws MongoException, DataAccessException {

			FindIterable<Document> findIterable = collectionPreparer.prepare(collection).find(query, Document.class)
					.projection(fields);

			if (collation != null) {
				findIterable = findIterable.collation(collation);
			}
			return findIterable;
		}
	}

	/**
	 * Optimized {@link CollectionCallback} that takes an already mapped query and a nullable
	 * {@link com.mongodb.client.model.Collation} to execute a count query limited to one element.
	 *
	 * @author Christoph Strobl
	 * @since 2.0
	 */
	private class ExistsCallback implements CollectionCallback<Boolean> {

		private final CollectionPreparer collectionPreparer;
		private final Document mappedQuery;
		private final com.mongodb.client.model.@Nullable Collation collation;

		ExistsCallback(CollectionPreparer collectionPreparer, Document mappedQuery,
				com.mongodb.client.model.@Nullable Collation collation) {

			this.collectionPreparer = collectionPreparer;
			this.mappedQuery = mappedQuery;
			this.collation = collation;
		}

		@Override
		public Boolean doInCollection(MongoCollection<Document> collection) throws MongoException, DataAccessException {

			return doCount(collectionPreparer, collection.getNamespace().getCollectionName(), mappedQuery,
					new CountOptions().limit(1).collation(collation)) > 0;
		}
	}

	/**
	 * Simple {@link CollectionCallback} that takes a query {@link Document} plus an optional fields specification
	 * {@link Document} and executes that against the {@link MongoCollection}.
	 *
	 * @author Thomas Risberg
	 */
	private static class FindAndRemoveCallback implements CollectionCallback<Document> {

		private final CollectionPreparer<MongoCollection<Document>> collectionPreparer;
		private final Document query;
		private final @Nullable Document fields;
		private final @Nullable Document sort;
		private final Optional<Collation> collation;

		FindAndRemoveCallback(CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query,
				@Nullable Document fields, @Nullable Document sort, @Nullable Collation collation) {
			this.collectionPreparer = collectionPreparer;

			this.query = query;
			this.fields = fields;
			this.sort = sort;
			this.collation = Optional.ofNullable(collation);
		}

		@Override
		public Document doInCollection(MongoCollection<Document> collection) throws MongoException, DataAccessException {

			FindOneAndDeleteOptions opts = new FindOneAndDeleteOptions().sort(sort).projection(fields);
			collation.map(Collation::toMongoCollation).ifPresent(opts::collation);

			return collectionPreparer.prepare(collection).findOneAndDelete(query, opts);
		}
	}

	private static class FindAndModifyCallback implements CollectionCallback<Document> {

		private final CollectionPreparer<MongoCollection<Document>> collectionPreparer;
		private final Document query;
		private final @Nullable Document fields;
		private final @Nullable Document sort;
		private final Object update;
		private final List<Document> arrayFilters;
		private final FindAndModifyOptions options;

		FindAndModifyCallback(CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query,
				@Nullable Document fields, @Nullable Document sort, Object update, List<Document> arrayFilters,
				FindAndModifyOptions options) {

			this.collectionPreparer = collectionPreparer;
			this.query = query;
			this.fields = fields;
			this.sort = sort;
			this.update = update;
			this.arrayFilters = arrayFilters;
			this.options = options;
		}

		@Override
		public Document doInCollection(MongoCollection<Document> collection) throws MongoException, DataAccessException {

			FindOneAndUpdateOptions opts = new FindOneAndUpdateOptions();
			opts.sort(sort);
			if (options.isUpsert()) {
				opts.upsert(true);
			}
			opts.projection(fields);
			if (options.isReturnNew()) {
				opts.returnDocument(ReturnDocument.AFTER);
			}

			options.getCollation().map(Collation::toMongoCollation).ifPresent(opts::collation);

			if (!arrayFilters.isEmpty()) {
				opts.arrayFilters(arrayFilters);
			}

			if (update instanceof Document document) {
				return collectionPreparer.prepare(collection).findOneAndUpdate(query, document, opts);
			} else if (update instanceof List) {
				return collectionPreparer.prepare(collection).findOneAndUpdate(query, (List<Document>) update, opts);
			}

			throw new IllegalArgumentException(String.format("Using %s is not supported in findOneAndUpdate", update));
		}
	}

	/**
	 * {@link CollectionCallback} specific for find and remove operation.
	 *
	 * @author Mark Paluch
	 * @author Christoph Strobl
	 * @since 2.1
	 */
	private static class FindAndReplaceCallback implements CollectionCallback<Document> {

		private final CollectionPreparer<MongoCollection<Document>> collectionPreparer;
		private final Document query;
		private final Document fields;
		private final Document sort;
		private final Document update;
		private final com.mongodb.client.model.@Nullable Collation collation;
		private final FindAndReplaceOptions options;

		FindAndReplaceCallback(CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query,
				Document fields, Document sort, Document update, com.mongodb.client.model.@Nullable Collation collation,
				FindAndReplaceOptions options) {
			this.collectionPreparer = collectionPreparer;
			this.query = query;
			this.fields = fields;
			this.sort = sort;
			this.update = update;
			this.options = options;
			this.collation = collation;
		}

		@Override
		public Document doInCollection(MongoCollection<Document> collection) throws MongoException, DataAccessException {

			FindOneAndReplaceOptions opts = new FindOneAndReplaceOptions();
			opts.sort(sort);
			opts.collation(collation);
			opts.projection(fields);

			if (options.isUpsert()) {
				opts.upsert(true);
			}

			if (options.isReturnNew()) {
				opts.returnDocument(ReturnDocument.AFTER);
			}

			return collectionPreparer.prepare(collection).findOneAndReplace(query, update, opts);
		}
	}

	/**
	 * Simple internal callback to allow operations on a {@link Document}.
	 *
	 * @author Oliver Gierke
	 * @author Thomas Darimont
	 */

	protected interface DocumentCallback<T> {

		T doWith(Document object);
	}

	/**
	 * Simple {@link DocumentCallback} that will transform {@link Document} into the given target type using the given
	 * {@link EntityReader}.
	 *
	 * @author Oliver Gierke
	 * @author Christoph Strobl
	 * @author Roman Puchkovskiy
	 */
	private class ReadDocumentCallback<T> implements DocumentCallback<T> {

		private final EntityReader<? super T, Bson> reader;
		private final Class<T> type;
		private final String collectionName;

		ReadDocumentCallback(EntityReader<? super T, Bson> reader, Class<T> type, String collectionName) {

			this.reader = reader;
			this.type = type;
			this.collectionName = collectionName;
		}

		@Override
		public T doWith(Document document) {

			maybeEmitEvent(new AfterLoadEvent<>(document, type, collectionName));
			T entity = reader.read(type, document);

			if (entity == null) {
				throw new MappingException(String.format("EntityReader %s returned null", reader));
			}

			maybeEmitEvent(new AfterConvertEvent<>(document, entity, collectionName));
			entity = maybeCallAfterConvert(entity, document, collectionName);

			return entity;
		}
	}

	static class QueryResultConverterCallback<T, R> implements DocumentCallback<R> {

		private final QueryResultConverter<? super T, ? extends R> converter;
		private final DocumentCallback<T> delegate;

		QueryResultConverterCallback(QueryResultConverter<? super T, ? extends R> converter, DocumentCallback<T> delegate) {
			this.converter = converter;
			this.delegate = delegate;
		}

		@Override
		public R doWith(Document object) {

			Lazy<T> lazy = Lazy.of(() -> delegate.doWith(object));
			return converter.mapDocument(object, lazy::get);
		}
	}

	/**
	 * {@link DocumentCallback} transforming {@link Document} into the given {@code targetType} or decorating the
	 * {@code sourceType} with a {@literal projection} in case the {@code targetType} is an {@literal interface}.
	 *
	 * @param <S>
	 * @param <T>
	 * @since 2.0
	 */
	private class ProjectingReadCallback<S, T> implements DocumentCallback<T> {

		private final MongoConverter mongoConverter;
		private final EntityProjection<T, S> projection;
		private final String collectionName;

		ProjectingReadCallback(MongoConverter mongoConverter, EntityProjection<T, S> projection, String collectionName) {

			this.mongoConverter = mongoConverter;
			this.projection = projection;
			this.collectionName = collectionName;
		}

		@Override
		@SuppressWarnings("unchecked")
		public T doWith(Document document) {

			maybeEmitEvent(new AfterLoadEvent<>(document, projection.getMappedType().getType(), collectionName));

			Object entity = mongoConverter.project(projection, document);

			if (entity == null) {
				throw new MappingException(String.format("EntityReader %s returned null", mongoConverter));
			}

			maybeEmitEvent(new AfterConvertEvent<>(document, entity, collectionName));
			return (T) maybeCallAfterConvert(entity, document, collectionName);
		}
	}

	class QueryCursorPreparer implements SortingQueryCursorPreparer {

		private final Query query;
		private final Document sortObject;
		private final int limit;
		private final long skip;
		private final @Nullable Class<?> type;

		QueryCursorPreparer(Query query, @Nullable Class<?> type) {
			this(query, query.getSortObject(), query.getLimit(), query.getSkip(), type);
		}

		QueryCursorPreparer(Query query, Document sortObject, int limit, long skip, @Nullable Class<?> type) {
			this.query = query;
			this.sortObject = sortObject;
			this.limit = limit;
			this.skip = skip;
			this.type = type;
		}

		@Override
		public FindIterable<Document> prepare(FindIterable<Document> iterable) {

			FindIterable<Document> cursorToUse = iterable;

			operations.forType(type).getCollation(query) //
					.map(Collation::toMongoCollation) //
					.ifPresent(cursorToUse::collation);

			Meta meta = query.getMeta();
			HintFunction hintFunction = HintFunction.from(query.getHint());
			if (skip <= 0 && limit <= 0 && ObjectUtils.isEmpty(sortObject) && hintFunction.isEmpty() && meta.isEmpty()
					&& query.getCollation().isEmpty()) {
				return cursorToUse;
			}

			try {
				if (skip > 0) {
					cursorToUse = cursorToUse.skip((int) skip);
				}
				if (limit > 0) {
					cursorToUse = cursorToUse.limit(limit);
				}
				if (!ObjectUtils.isEmpty(sortObject)) {
					Document sort = type != null ? getMappedSortObject(sortObject, type) : sortObject;
					cursorToUse = cursorToUse.sort(sort);
				}

				if (hintFunction.isPresent()) {
					cursorToUse = hintFunction.apply(mongoDbFactory, cursorToUse::hintString, cursorToUse::hint);
				}

				if (meta.hasValues()) {

					if (meta.hasComment()) {
						cursorToUse = cursorToUse.comment(meta.getRequiredComment());
					}

					if (meta.hasMaxTime()) {
						cursorToUse = cursorToUse.maxTime(meta.getRequiredMaxTimeMsec(), TimeUnit.MILLISECONDS);
					}

					if (meta.getCursorBatchSize() != null) {
						cursorToUse = cursorToUse.batchSize(meta.getCursorBatchSize());
					}

					if (meta.getAllowDiskUse() != null) {
						cursorToUse = cursorToUse.allowDiskUse(meta.getAllowDiskUse());
					}

					for (Meta.CursorOption option : meta.getFlags()) {

						switch (option) {

							case NO_TIMEOUT:
								cursorToUse = cursorToUse.noCursorTimeout(true);
								break;
							case PARTIAL:
								cursorToUse = cursorToUse.partial(true);
								break;
							case SECONDARY_READS:
								break;
							default:
								throw new IllegalArgumentException(String.format("%s is no supported flag.", option));
						}
					}
				}

			} catch (RuntimeException e) {
				throw potentiallyConvertRuntimeException(e, exceptionTranslator);
			}

			return cursorToUse;
		}

		@Nullable
		@Override
		public Document getSortObject() {
			return sortObject;
		}
	}

	/**
	 * {@link DocumentCallback} that assumes a {@link GeoResult} to be created, delegates actual content unmarshalling to
	 * a delegate and creates a {@link GeoResult} from the result.
	 *
	 * @author Oliver Gierke
	 * @author Christoph Strobl
	 */
	static class GeoNearResultDocumentCallback<T> implements DocumentCallback<GeoResult<T>> {

		private final String distanceField;
		private final DocumentCallback<T> delegate;
		private final Metric metric;

		/**
		 * Creates a new {@link GeoNearResultDocumentCallback} using the given {@link DocumentCallback} delegate for
		 * {@link GeoResult} content unmarshalling.
		 *
		 * @param distanceField the field to read the distance from.
		 * @param delegate must not be {@literal null}.
		 * @param metric the {@link Metric} to apply to the result distance.
		 */
		GeoNearResultDocumentCallback(String distanceField, DocumentCallback<T> delegate, Metric metric) {

			Assert.notNull(delegate, "DocumentCallback must not be null");

			this.distanceField = distanceField;
			this.delegate = delegate;
			this.metric = metric;
		}

		@Override
		public GeoResult<T> doWith(Document object) {

			double distance = Double.NaN;
			if (object.containsKey(distanceField)) {
				distance = NumberUtils.convertNumberToTargetClass(object.get(distanceField, Number.class), Double.class);
			}

			T doWith = delegate.doWith(object);

			return new GeoResult<>(doWith, Distance.of(distance, metric));
		}
	}

	/**
	 * @return the {@link MongoDatabaseFactory} in use.
	 * @since 3.1.4
	 */
	public MongoDatabaseFactory getMongoDatabaseFactory() {
		return mongoDbFactory;
	}

	/**
	 * A {@link CloseableIterator} that is backed by a MongoDB {@link MongoCollection}.
	 *
	 * @author Thomas Darimont
	 * @since 1.7
	 */
	static class CloseableIterableCursorAdapter<T> implements CloseableIterator<T> {

		private volatile @Nullable MongoCursor<Document> cursor;
		private PersistenceExceptionTranslator exceptionTranslator;
		private DocumentCallback<T> objectReadCallback;

		/**
		 * Creates a new {@link CloseableIterableCursorAdapter} backed by the given {@link MongoCollection}.
		 */
		CloseableIterableCursorAdapter(MongoIterable<Document> cursor, PersistenceExceptionTranslator exceptionTranslator,
				DocumentCallback<T> objectReadCallback) {

			this.cursor = cursor.iterator();
			this.exceptionTranslator = exceptionTranslator;
			this.objectReadCallback = objectReadCallback;
		}

		CloseableIterableCursorAdapter(MongoCursor<Document> cursor, PersistenceExceptionTranslator exceptionTranslator,
				DocumentCallback<T> objectReadCallback) {

			this.cursor = cursor;
			this.exceptionTranslator = exceptionTranslator;
			this.objectReadCallback = objectReadCallback;
		}

		@Override
		public boolean hasNext() {

			MongoCursor<Document> cursor = this.cursor;

			if (cursor == null) {
				return false;
			}

			try {
				return cursor.hasNext();
			} catch (RuntimeException ex) {
				throw potentiallyConvertRuntimeException(ex, exceptionTranslator);
			}
		}

		@Nullable
		@Override
		public T next() {

			if (cursor == null) {
				return null;
			}

			try {
				Document item = cursor.next();
				return objectReadCallback.doWith(item);
			} catch (RuntimeException ex) {
				throw potentiallyConvertRuntimeException(ex, exceptionTranslator);
			}
		}

		@Override
		public void close() {

			MongoCursor<Document> c = cursor;

			try {

				if (c != null) {
					c.close();
				}
			} catch (RuntimeException ex) {
				throw potentiallyConvertRuntimeException(ex, exceptionTranslator);
			} finally {
				cursor = null;
			}
		}
	}

	/**
	 * {@link MongoTemplate} extension bound to a specific {@link ClientSession} that is applied when interacting with the
	 * server through the driver API. <br />
	 * The prepare steps for {@link MongoDatabase} and {@link MongoCollection} proxy the target and invoke the desired
	 * target method matching the actual arguments plus a {@link ClientSession}.
	 *
	 * @author Christoph Strobl
	 * @since 2.1
	 */
	static class SessionBoundMongoTemplate extends MongoTemplate {

		private final MongoTemplate delegate;
		private final ClientSession session;

		/**
		 * @param session must not be {@literal null}.
		 * @param that must not be {@literal null}.
		 */
		SessionBoundMongoTemplate(ClientSession session, MongoTemplate that) {

			super(that.getMongoDatabaseFactory().withSession(session), that);

			this.delegate = that;
			this.session = session;
		}

		@Override
		public MongoCollection<Document> getCollection(@Nullable String collectionName) {

			// native MongoDB objects that offer methods with ClientSession must not be proxied.
			return delegate.getCollection(collectionName);
		}

		@Override
		public MongoDatabase getDb() {

			// native MongoDB objects that offer methods with ClientSession must not be proxied.
			return delegate.getDb();
		}

		@Override
		protected boolean countCanBeEstimated(Document filter, CountOptions options) {
			return false;
		}
	}

	@FunctionalInterface
	interface CountExecution {
		long countDocuments(CollectionPreparer collectionPreparer, String collection, Document filter,
				CountOptions options);
	}

	private static class ReplaceCallback implements CollectionCallback<UpdateResult> {

		private final CollectionPreparer<MongoCollection<Document>> collectionPreparer;
		private final Document query;
		private final Document update;
		private final com.mongodb.client.model.ReplaceOptions options;

		ReplaceCallback(CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query, Document update,
				com.mongodb.client.model.ReplaceOptions options) {
			this.collectionPreparer = collectionPreparer;
			this.query = query;
			this.update = update;
			this.options = options;
		}

		@Override
		public UpdateResult doInCollection(MongoCollection<Document> collection)
				throws MongoException, DataAccessException {
			return collectionPreparer.prepare(collection).replaceOne(query, update, options);
		}
	}
}