EntityOperations.java

/*
 * Copyright 2018-present the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.springframework.data.mongodb.core;

import java.time.Duration;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

import org.bson.BsonNull;
import org.bson.Document;
import org.jspecify.annotations.Nullable;

import org.springframework.core.convert.ConversionService;
import org.springframework.core.env.Environment;
import org.springframework.core.env.EnvironmentCapable;
import org.springframework.core.env.StandardEnvironment;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.data.convert.CustomConversions;
import org.springframework.data.core.PropertyPath;
import org.springframework.data.expression.ValueEvaluationContext;
import org.springframework.data.mapping.IdentifierAccessor;
import org.springframework.data.mapping.MappingException;
import org.springframework.data.mapping.PersistentEntity;
import org.springframework.data.mapping.PersistentPropertyAccessor;
import org.springframework.data.mapping.PersistentPropertyPath;
import org.springframework.data.mapping.context.MappingContext;
import org.springframework.data.mapping.model.ConvertingPropertyAccessor;
import org.springframework.data.mongodb.core.CollectionOptions.EncryptedFieldsOptions;
import org.springframework.data.mongodb.core.CollectionOptions.TimeSeriesOptions;
import org.springframework.data.mongodb.core.convert.MongoConverter;
import org.springframework.data.mongodb.core.convert.MongoJsonSchemaMapper;
import org.springframework.data.mongodb.core.convert.MongoWriter;
import org.springframework.data.mongodb.core.convert.QueryMapper;
import org.springframework.data.mongodb.core.mapping.BasicMongoPersistentEntity;
import org.springframework.data.mongodb.core.mapping.FieldName;
import org.springframework.data.mongodb.core.mapping.MongoPersistentEntity;
import org.springframework.data.mongodb.core.mapping.MongoPersistentProperty;
import org.springframework.data.mongodb.core.mapping.MongoSimpleTypes;
import org.springframework.data.mongodb.core.mapping.TimeSeries;
import org.springframework.data.mongodb.core.query.Collation;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.timeseries.Granularity;
import org.springframework.data.mongodb.core.validation.Validator;
import org.springframework.data.mongodb.util.BsonUtils;
import org.springframework.data.mongodb.util.DurationUtil;
import org.springframework.data.projection.EntityProjection;
import org.springframework.data.projection.EntityProjectionIntrospector;
import org.springframework.data.projection.ProjectionFactory;
import org.springframework.data.projection.TargetAware;
import org.springframework.data.util.Optionals;
import org.springframework.expression.spel.support.SimpleEvaluationContext;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;

import com.mongodb.client.model.ChangeStreamPreAndPostImagesOptions;
import com.mongodb.client.model.CreateCollectionOptions;
import com.mongodb.client.model.TimeSeriesGranularity;
import com.mongodb.client.model.ValidationOptions;

/**
 * Common operations performed on an entity in the context of it's mapping metadata.
 *
 * @author Oliver Gierke
 * @author Mark Paluch
 * @author Christoph Strobl
 * @author Ben Foster
 * @author Ross Lawley
 * @since 2.1
 * @see MongoTemplate
 * @see ReactiveMongoTemplate
 */
class EntityOperations {

	private static final String ID_FIELD = FieldName.ID.name();

	private final MappingContext<? extends MongoPersistentEntity<?>, MongoPersistentProperty> context;
	private final QueryMapper queryMapper;

	private final EntityProjectionIntrospector introspector;

	private final MongoJsonSchemaMapper schemaMapper;

	private @Nullable Environment environment;

	EntityOperations(MongoConverter converter) {
		this(converter, new QueryMapper(converter));
	}

	public EntityOperations(MongoConverter converter, QueryMapper queryMapper) {
		this(converter, converter.getMappingContext(), converter.getCustomConversions(), converter.getProjectionFactory(),
				queryMapper);
	}

	EntityOperations(MongoConverter converter,
			MappingContext<? extends MongoPersistentEntity<?>, MongoPersistentProperty> context,
			CustomConversions conversions, ProjectionFactory projectionFactory, QueryMapper queryMapper) {
		this.context = context;
		this.queryMapper = queryMapper;
		this.introspector = EntityProjectionIntrospector.create(projectionFactory,
				EntityProjectionIntrospector.ProjectionPredicate.typeHierarchy()
						.and(((target, underlyingType) -> !conversions.isSimpleType(target))),
				context);
		this.schemaMapper = new MongoJsonSchemaMapper(converter);
		if (converter instanceof EnvironmentCapable environmentCapable) {
			this.environment = environmentCapable.getEnvironment();
		}
	}

	/**
	 * Creates a new {@link Entity} for the given bean.
	 *
	 * @param entity must not be {@literal null}.
	 * @return new instance of {@link Entity}.
	 */
	@SuppressWarnings({ "unchecked", "rawtypes" })
	<T> Entity<T> forEntity(T entity) {

		Assert.notNull(entity, "Bean must not be null");

		if (entity instanceof TargetAware targetAware) {
			return new SimpleMappedEntity((Map<String, Object>) targetAware.getTarget(), this);
		}

		if (entity instanceof String) {
			return new UnmappedEntity(parse(entity.toString()), this);
		}

		if (entity instanceof Map) {
			return new SimpleMappedEntity((Map<String, Object>) entity, this);
		}

		return MappedEntity.of(entity, context, this);
	}

	/**
	 * Creates a new {@link AdaptibleEntity} for the given bean and {@link ConversionService}.
	 *
	 * @param entity must not be {@literal null}.
	 * @param conversionService must not be {@literal null}.
	 * @return new instance of {@link AdaptibleEntity}.
	 */
	@SuppressWarnings({ "unchecked", "rawtypes" })
	<T> AdaptibleEntity<T> forEntity(T entity, ConversionService conversionService) {

		Assert.notNull(entity, "Bean must not be null");
		Assert.notNull(conversionService, "ConversionService must not be null");

		if (entity instanceof String) {
			return new UnmappedEntity(parse(entity.toString()), this);
		}

		if (entity instanceof Map) {
			return new SimpleMappedEntity((Map<String, Object>) entity, this);
		}

		return AdaptibleMappedEntity.of(entity, context, conversionService, this);
	}

	/**
	 * Creates a new {@link AdaptibleEntity} for the given bean and {@link ConversionService} to be used for a save
	 * operation including a {@link AdaptibleEntity#assertUpdateableIdIfNotSet()} check.
	 *
	 * @param entity must not be {@literal null}.
	 * @param conversionService must not be {@literal null}.
	 * @return new instance of {@link AdaptibleEntity}.
	 * @since 5.0.3
	 */
	<T> AdaptibleEntity<T> forEntityUpsert(T entity, ConversionService conversionService) {

		AdaptibleEntity<T> adaptibleEntity = forEntity(entity, conversionService);
		adaptibleEntity.assertUpdateableIdIfNotSet();
		return adaptibleEntity;
	}

	/**
	 * @param source can be {@literal null}.
	 * @return {@literal true} if the given value is an {@literal array}, {@link Collection} or {@link Iterator}.
	 * @since 3.2
	 */
	static boolean isCollectionLike(@Nullable Object source) {

		if (source == null) {
			return false;
		}

		return ObjectUtils.isArray(source) || source instanceof Collection || source instanceof Iterator;
	}

	/**
	 * @param entityClass should not be null.
	 * @return the {@link MongoPersistentEntity#getCollection() collection name}.
	 */
	public String determineCollectionName(@Nullable Class<?> entityClass) {

		if (entityClass == null) {
			throw new InvalidDataAccessApiUsageException(
					"No class parameter provided, entity collection can't be determined");
		}

		return getRequiredPersistentEntity(entityClass).getCollection();
	}

	MongoPersistentEntity<?> getRequiredPersistentEntity(Class<?> entityClass) {

		MongoPersistentEntity<?> persistentEntity = context.getPersistentEntity(entityClass);

		if (persistentEntity == null) {
			throw new MappingException(String.format(
					"Cannot determine collection name from type '%s'. Is it a store native type?", entityClass.getName()));
		}

		return persistentEntity;
	}

	public Query getByIdInQuery(Collection<?> entities) {

		MultiValueMap<String, Object> byIds = new LinkedMultiValueMap<>();

		entities.stream() //
				.map(this::forEntity) //
				.forEach(it -> byIds.add(it.getIdFieldName(), it.getId()));

		Criteria[] criterias = byIds.entrySet().stream() //
				.map(it -> Criteria.where(it.getKey()).in(it.getValue())) //
				.toArray(Criteria[]::new);

		return new Query(criterias.length == 1 ? criterias[0] : new Criteria().orOperator(criterias));
	}

	/**
	 * Returns the name of the identifier property. Considers mapping information but falls back to the MongoDB default of
	 * {@code _id} if no identifier property can be found.
	 *
	 * @param type must not be {@literal null}.
	 * @return never {@literal null}.
	 */
	public String getIdPropertyName(Class<?> type) {

		Assert.notNull(type, "Type must not be null");

		MongoPersistentEntity<?> persistentEntity = context.getPersistentEntity(type);

		if (persistentEntity != null && persistentEntity.getIdProperty() != null) {
			return persistentEntity.getRequiredIdProperty().getName();
		}

		return ID_FIELD;
	}

	/**
	 * Return the name used for {@code $geoNear.distanceField} avoiding clashes with potentially existing properties.
	 *
	 * @param domainType must not be {@literal null}.
	 * @return the name of the distanceField to use. {@literal dis} by default.
	 * @since 2.2
	 */
	public String nearQueryDistanceFieldName(Class<?> domainType) {

		MongoPersistentEntity<?> persistentEntity = context.getPersistentEntity(domainType);
		if (persistentEntity == null || persistentEntity.getPersistentProperty("dis") == null) {
			return "dis";
		}

		String distanceFieldName = "calculated-distance";
		int counter = 0;
		while (persistentEntity.getPersistentProperty(distanceFieldName) != null) {
			distanceFieldName += "-" + (counter++);
		}

		return distanceFieldName;
	}

	private static Document parse(String source) {

		try {
			return Document.parse(source);
		} catch (org.bson.json.JsonParseException o_O) {
			throw new MappingException("Could not parse given String to save into a JSON document", o_O);
		} catch (RuntimeException o_O) {

			// legacy 3.x exception
			if (ClassUtils.matchesTypeName(o_O.getClass(), "JSONParseException")) {
				throw new MappingException("Could not parse given String to save into a JSON document", o_O);
			}
			throw o_O;
		}
	}

	public <T> TypedOperations<T> forType(@Nullable Class<T> entityClass) {

		if (entityClass != null) {

			MongoPersistentEntity<?> entity = context.getPersistentEntity(entityClass);
			return forType(entity);
		}

		return UntypedOperations.instance();
	}

	public <T> TypedOperations<T> forType(@Nullable MongoPersistentEntity<?> entity) {

		if (entity != null) {
			return new TypedEntityOperations(entity, environment);
		}
		return UntypedOperations.instance();
	}

	/**
	 * Introspect the given {@link Class result type} in the context of the {@link Class entity type} whether the returned
	 * type is a projection and what property paths are participating in the projection.
	 *
	 * @param resultType the type to project on. Must not be {@literal null}.
	 * @param entityType the source domain type. Must not be {@literal null}.
	 * @return the introspection result.
	 * @since 3.4
	 * @see EntityProjectionIntrospector#introspect(Class, Class)
	 */
	public <M, D> EntityProjection<M, D> introspectProjection(Class<M> resultType, Class<D> entityType) {

		MongoPersistentEntity<?> persistentEntity = queryMapper.getMappingContext().getPersistentEntity(entityType);
		if (persistentEntity == null && !resultType.isInterface() || ClassUtils.isAssignable(Map.class, resultType)) {
			return (EntityProjection) EntityProjection.nonProjecting(resultType);
		}
		return introspector.introspect(resultType, entityType);
	}

	/**
	 * Convert {@link CollectionOptions} to {@link CreateCollectionOptions} using {@link Class entityType} to obtain
	 * mapping metadata.
	 *
	 * @param collectionOptions
	 * @param entityType
	 * @return
	 * @since 3.4
	 */
	public CreateCollectionOptions convertToCreateCollectionOptions(@Nullable CollectionOptions collectionOptions,
			Class<?> entityType) {

		Optional<Collation> collation = Optionals.firstNonEmpty(
				() -> Optional.ofNullable(collectionOptions).flatMap(CollectionOptions::getCollation),
				() -> forType(entityType).getCollation());//

		CreateCollectionOptions result = new CreateCollectionOptions();
		collation.map(Collation::toMongoCollation).ifPresent(result::collation);

		if (collectionOptions == null) {
			return result;
		}

		collectionOptions.getCapped().ifPresent(result::capped);
		collectionOptions.getSize().ifPresent(result::sizeInBytes);
		collectionOptions.getMaxDocuments().ifPresent(result::maxDocuments);
		collectionOptions.getCollation().map(Collation::toMongoCollation).ifPresent(result::collation);

		collectionOptions.getValidationOptions().ifPresent(it -> {

			ValidationOptions validationOptions = new ValidationOptions();

			it.getValidationAction().ifPresent(validationOptions::validationAction);
			it.getValidationLevel().ifPresent(validationOptions::validationLevel);

			it.getValidator().ifPresent(val -> validationOptions.validator(getMappedValidator(val, entityType)));

			result.validationOptions(validationOptions);
		});

		collectionOptions.getTimeSeriesOptions().map(forType(entityType)::mapTimeSeriesOptions).ifPresent(it -> {

			com.mongodb.client.model.TimeSeriesOptions options = new com.mongodb.client.model.TimeSeriesOptions(
					it.getTimeField());

			if (StringUtils.hasText(it.getMetaField())) {
				options.metaField(it.getMetaField());
			}
			if (!Granularity.DEFAULT.equals(it.getGranularity())) {
				options.granularity(TimeSeriesGranularity.valueOf(it.getGranularity().name().toUpperCase()));
			}
			if (it.getSpan() != null) {

				long bucketMaxSpanInSeconds = it.getSpan().time().toSeconds();
				// right now there's only one value since the two options must have the same value.
				options.bucketMaxSpan(bucketMaxSpanInSeconds, TimeUnit.SECONDS);
				options.bucketRounding(bucketMaxSpanInSeconds, TimeUnit.SECONDS);
			}

			if (!it.getExpireAfter().isNegative()) {
				result.expireAfter(it.getExpireAfter().toSeconds(), TimeUnit.SECONDS);
			}

			result.timeSeriesOptions(options);
		});

		collectionOptions.getChangeStreamOptions() //
				.map(CollectionOptions.CollectionChangeStreamOptions::getPreAndPostImages) //
				.map(ChangeStreamPreAndPostImagesOptions::new) //
				.ifPresent(result::changeStreamPreAndPostImagesOptions);

		collectionOptions.getEncryptedFieldsOptions() //
				.map(EncryptedFieldsOptions::toDocument) //
				.filter(Predicate.not(Document::isEmpty)) //
				.ifPresent(result::encryptedFields);

		return result;
	}

	private Document getMappedValidator(Validator validator, Class<?> domainType) {

		Document validationRules = validator.toDocument();

		if (validationRules.containsKey("$jsonSchema")) {
			return schemaMapper.mapSchema(validationRules, domainType);
		}

		return queryMapper.getMappedObject(validationRules, context.getPersistentEntity(domainType));
	}

	/**
	 * A representation of information about an entity.
	 *
	 * @author Oliver Gierke
	 * @author Christoph Strobl
	 * @since 2.1
	 */
	interface Entity<T> {

		/**
		 * Returns the field name of the identifier of the entity.
		 *
		 * @return
		 */
		String getIdFieldName();

		/**
		 * Returns the identifier of the entity.
		 *
		 * @return
		 */
		@Nullable
		Object getId();

		/**
		 * Returns the property value for {@code key}.
		 *
		 * @param key
		 * @return
		 * @since 4.1
		 */
		@Nullable
		Object getPropertyValue(String key);

		/**
		 * Returns the {@link Query} to find the entity by its identifier.
		 *
		 * @return
		 */
		Query getByIdQuery();

		/**
		 * Returns the {@link Query} to remove an entity by its {@literal id} and if applicable {@literal version}.
		 *
		 * @return the {@link Query} to use for removing the entity. Never {@literal null}.
		 * @since 2.2
		 */
		default Query getRemoveByQuery() {
			return isVersionedEntity() ? getQueryForVersion() : getByIdQuery();
		}

		/**
		 * Returns the {@link Query} to find the entity in its current version.
		 *
		 * @return
		 */
		Query getQueryForVersion();

		/**
		 * Maps the backing entity into a {@link MappedDocument} using the given {@link MongoWriter}.
		 *
		 * @param writer must not be {@literal null}.
		 * @return
		 */
		MappedDocument toMappedDocument(MongoWriter<? super T> writer);

		/**
		 * Asserts that the identifier type is updatable in case it is not already set.
		 */
		default void assertUpdateableIdIfNotSet() {}

		/**
		 * Returns whether the entity is versioned, i.e. if it contains a version property.
		 *
		 * @return
		 */
		default boolean isVersionedEntity() {
			return false;
		}

		/**
		 * Returns the value of the version if the entity {@link #isVersionedEntity() has a version property}.
		 *
		 * @return the entity version. Can be {@literal null}.
		 * @throws IllegalStateException if the entity does not define a {@literal version} property. Make sure to check
		 *           {@link #isVersionedEntity()}.
		 */
		@Nullable
		Object getVersion();

		/**
		 * Returns the underlying bean.
		 *
		 * @return
		 */
		T getBean();

		/**
		 * Returns whether the entity is considered to be new.
		 *
		 * @return
		 * @since 2.1.2
		 */
		boolean isNew();

		/**
		 * @param sortObject
		 * @return
		 * @since 4.1
		 * @throws IllegalStateException if a sort key yields {@literal null}.
		 */
		Map<String, Object> extractKeys(Document sortObject, Class<?> sourceType);

	}

	/**
	 * Information and commands on an entity.
	 *
	 * @author Oliver Gierke
	 * @since 2.1
	 */
	interface AdaptibleEntity<T> extends Entity<T> {

		/**
		 * Populates the identifier of the backing entity if it has an identifier property and there's no identifier
		 * currently present.
		 *
		 * @param id can be {@literal null}.
		 * @return
		 */
		T populateIdIfNecessary(@Nullable Object id);

		/**
		 * Initializes the version property of the current entity if available.
		 *
		 * @return the entity with the version property updated if available.
		 */
		T initializeVersionProperty();

		/**
		 * Increments the value of the version property if available.
		 *
		 * @return the entity with the version property incremented if available.
		 */
		T incrementVersion();

		/**
		 * Returns the current version value if the entity has a version property.
		 *
		 * @return the current version or {@literal null} in case it's uninitialized.
		 * @throws IllegalStateException if the entity does not define a {@literal version} property.
		 */
		@Nullable
		Number getVersion();
	}

	private static class UnmappedEntity<T extends Map<String, Object>> implements AdaptibleEntity<T> {

		private final T map;
		private final EntityOperations entityOperations;

		protected UnmappedEntity(T map, EntityOperations entityOperations) {
			this.map = map;
			this.entityOperations = entityOperations;
		}

		@Override
		public String getIdFieldName() {
			return ID_FIELD;
		}

		@Override
		public @Nullable Object getId() {
			return getPropertyValue(ID_FIELD);
		}

		@Override
		public @Nullable Object getPropertyValue(String key) {
			return map.get(key);
		}

		@Override
		public Query getByIdQuery() {
			return Query.query(Criteria.where(ID_FIELD).is(map.get(ID_FIELD)));
		}

		@Override
		public T populateIdIfNecessary(@Nullable Object id) {

			map.put(ID_FIELD, id);

			return map;
		}

		@Override
		public Query getQueryForVersion() {
			throw new MappingException("Cannot query for version on plain Documents");
		}

		@Override
		public MappedDocument toMappedDocument(MongoWriter<? super T> writer) {
			return MappedDocument.of(map instanceof Document document //
					? document //
					: new Document(map));
		}

		@Override
		public T initializeVersionProperty() {
			return map;
		}

		@Override
		public @Nullable Number getVersion() {
			return null;
		}

		@Override
		public T incrementVersion() {
			return map;
		}

		@Override
		public T getBean() {
			return map;
		}

		@Override
		public boolean isNew() {
			return map.get(ID_FIELD) != null;
		}

		@Override
		public Map<String, Object> extractKeys(Document sortObject, Class<?> sourceType) {

			Map<String, Object> keyset = new LinkedHashMap<>();
			MongoPersistentEntity<?> sourceEntity = entityOperations.context.getPersistentEntity(sourceType);
			if (sourceEntity != null && sourceEntity.hasIdProperty()) {
				keyset.put(sourceEntity.getRequiredIdProperty().getName(), getId());
			} else {
				keyset.put(ID_FIELD, getId());
			}

			for (String key : sortObject.keySet()) {

				Object value = resolveValue(key, sourceEntity);

				if (value == null) {
					throw new IllegalStateException(
							String.format("Cannot extract value for key %s because its value is null", key));
				}

				keyset.put(key, value);
			}

			return keyset;
		}

		@Nullable
		private Object resolveValue(String key, @Nullable MongoPersistentEntity<?> sourceEntity) {

			if (sourceEntity == null) {
				return BsonUtils.resolveValue(map, key);
			}
			PropertyPath from = PropertyPath.from(key, sourceEntity.getTypeInformation());
			PersistentPropertyPath<MongoPersistentProperty> persistentPropertyPath = entityOperations.context
					.getPersistentPropertyPath(from);
			return BsonUtils.resolveValue(map, persistentPropertyPath.toDotPath(MongoPersistentProperty::getFieldName));
		}
	}

	private static class SimpleMappedEntity<T extends Map<String, Object>> extends UnmappedEntity<T> {

		protected SimpleMappedEntity(T map, EntityOperations entityOperations) {
			super(map, entityOperations);
		}

		@Override
		@SuppressWarnings("unchecked")
		public MappedDocument toMappedDocument(MongoWriter<? super T> writer) {

			T bean = getBean();
			bean = (T) (bean instanceof Document document//
					? document //
					: new Document(bean));
			Document document = new Document();
			writer.write(bean, document);

			return MappedDocument.of(document);
		}
	}

	private static class MappedEntity<T> implements Entity<T> {

		private final MongoPersistentEntity<?> entity;
		private final IdentifierAccessor idAccessor;
		private final PersistentPropertyAccessor<T> propertyAccessor;
		private final EntityOperations entityOperations;

		protected MappedEntity(MongoPersistentEntity<?> entity, IdentifierAccessor idAccessor,
				PersistentPropertyAccessor<T> propertyAccessor, EntityOperations entityOperations) {

			this.entity = entity;
			this.idAccessor = idAccessor;
			this.propertyAccessor = propertyAccessor;
			this.entityOperations = entityOperations;
		}

		private static <T> MappedEntity<T> of(T bean,
				MappingContext<? extends MongoPersistentEntity<?>, MongoPersistentProperty> context,
				EntityOperations entityOperations) {

			MongoPersistentEntity<?> entity = context.getRequiredPersistentEntity(bean.getClass());
			IdentifierAccessor identifierAccessor = entity.getIdentifierAccessor(bean);
			PersistentPropertyAccessor<T> propertyAccessor = entity.getPropertyAccessor(bean);

			return new MappedEntity<>(entity, identifierAccessor, propertyAccessor, entityOperations);
		}

		@Override
		public String getIdFieldName() {
			return entity.getRequiredIdProperty().getFieldName();
		}

		@Override
		public Object getId() {
			return idAccessor.getRequiredIdentifier();
		}

		@Override
		public @Nullable Object getPropertyValue(String key) {
			return propertyAccessor.getProperty(entity.getRequiredPersistentProperty(key));
		}

		@Override
		public Query getByIdQuery() {

			if (!entity.hasIdProperty()) {
				throw new MappingException("No id property found for object of type " + entity.getType());
			}

			MongoPersistentProperty idProperty = entity.getRequiredIdProperty();

			return Query.query(Criteria.where(idProperty.getName()).is(getId()));
		}

		@Override
		public Query getQueryForVersion() {

			MongoPersistentProperty idProperty = entity.getRequiredIdProperty();
			MongoPersistentProperty versionProperty = entity.getRequiredVersionProperty();

			return new Query(Criteria.where(idProperty.getName()).is(getId())//
					.and(versionProperty.getName()).is(getVersion()));
		}

		@Override
		public MappedDocument toMappedDocument(MongoWriter<? super T> writer) {

			T bean = propertyAccessor.getBean();

			Document document = new Document();
			writer.write(bean, document);

			if (document.containsKey(ID_FIELD) && document.get(ID_FIELD) == null) {
				document.remove(ID_FIELD);
			}

			return MappedDocument.of(document);
		}

		public void assertUpdateableIdIfNotSet() {

			if (!entity.hasIdProperty()) {
				return;
			}

			MongoPersistentProperty property = entity.getRequiredIdProperty();
			Object propertyValue = idAccessor.getIdentifier();

			if (propertyValue != null) {
				return;
			}

			if (!MongoSimpleTypes.AUTOGENERATED_ID_TYPES.contains(property.getType())) {
				throw new InvalidDataAccessApiUsageException(
						String.format("Cannot autogenerate id of type %s for entity of type %s", property.getType().getName(),
								entity.getType().getName()));
			}
		}

		@Override
		public boolean isVersionedEntity() {
			return entity.hasVersionProperty();
		}

		@Override
		public @Nullable Object getVersion() {
			return propertyAccessor.getProperty(entity.getRequiredVersionProperty());
		}

		@Override
		public T getBean() {
			return propertyAccessor.getBean();
		}

		@Override
		public boolean isNew() {
			return entity.isNew(propertyAccessor.getBean());
		}

		@Override
		public Map<String, Object> extractKeys(Document sortObject, Class<?> sourceType) {

			Map<String, Object> keyset = new LinkedHashMap<>();
			MongoPersistentEntity<?> sourceEntity = entityOperations.context.getPersistentEntity(sourceType);
			if (sourceEntity != null && sourceEntity.hasIdProperty()) {
				keyset.put(sourceEntity.getRequiredIdProperty().getName(), getId());
			} else {
				keyset.put(entity.getRequiredIdProperty().getName(), getId());
			}

			for (String key : sortObject.keySet()) {

				Object value;
				if (key.indexOf('.') != -1) {

					// follow the path across nested levels.
					// TODO: We should have a MongoDB-specific property path abstraction to allow diving into Document.
					value = getNestedPropertyValue(key);
				} else {
					value = getPropertyValue(key);
				}

				if (value == null) {
					throw new IllegalStateException(
							String.format("Cannot extract value for key %s because its value is null", key));
				}

				keyset.put(key, value);
			}

			return keyset;
		}

		private Object getNestedPropertyValue(String key) {

			String[] segments = key.split("\\.");
			Entity<?> currentEntity = this;
			Object currentValue = BsonNull.VALUE;

			for (int i = 0; i < segments.length; i++) {

				String segment = segments[i];
				currentValue = currentEntity.getPropertyValue(segment);

				if (i < segments.length - 1) {
					if (currentValue == null) {
						return BsonNull.VALUE;
					}

					currentEntity = entityOperations.forEntity(currentValue);
				}
			}

			return currentValue != null ? currentValue : BsonNull.VALUE;
		}
	}

	private static class AdaptibleMappedEntity<T> extends MappedEntity<T> implements AdaptibleEntity<T> {

		private final MongoPersistentEntity<?> entity;
		private final ConvertingPropertyAccessor<T> propertyAccessor;
		private final IdentifierAccessor identifierAccessor;

		private AdaptibleMappedEntity(MongoPersistentEntity<?> entity, IdentifierAccessor identifierAccessor,
				ConvertingPropertyAccessor<T> propertyAccessor, EntityOperations entityOperations) {

			super(entity, identifierAccessor, propertyAccessor, entityOperations);

			this.entity = entity;
			this.propertyAccessor = propertyAccessor;
			this.identifierAccessor = identifierAccessor;
		}

		private static <T> AdaptibleEntity<T> of(T bean,
				MappingContext<? extends MongoPersistentEntity<?>, MongoPersistentProperty> context,
				ConversionService conversionService, EntityOperations entityOperations) {

			MongoPersistentEntity<?> entity = context.getRequiredPersistentEntity(bean.getClass());
			IdentifierAccessor identifierAccessor = entity.getIdentifierAccessor(bean);
			PersistentPropertyAccessor<T> propertyAccessor = entity.getPropertyAccessor(bean);

			return new AdaptibleMappedEntity<>(entity, identifierAccessor,
					new ConvertingPropertyAccessor<>(propertyAccessor, conversionService), entityOperations);
		}

		@Override
		public T populateIdIfNecessary(@Nullable Object id) {

			if (id == null) {
				return propertyAccessor.getBean();
			}

			MongoPersistentProperty idProperty = entity.getIdProperty();
			if (idProperty == null) {
				return propertyAccessor.getBean();
			}

			if (identifierAccessor.getIdentifier() != null) {
				return propertyAccessor.getBean();
			}

			propertyAccessor.setProperty(idProperty, id);
			return propertyAccessor.getBean();
		}

		@Override
		public @Nullable Number getVersion() {

			MongoPersistentProperty versionProperty = entity.getRequiredVersionProperty();

			return propertyAccessor.getProperty(versionProperty, Number.class);
		}

		@Override
		public T initializeVersionProperty() {

			if (!entity.hasVersionProperty()) {
				return propertyAccessor.getBean();
			}

			MongoPersistentProperty versionProperty = entity.getRequiredVersionProperty();

			propertyAccessor.setProperty(versionProperty, versionProperty.getType().isPrimitive() ? 1 : 0);

			return propertyAccessor.getBean();
		}

		@Override
		public T incrementVersion() {

			MongoPersistentProperty versionProperty = entity.getRequiredVersionProperty();
			Number version = getVersion();
			Number nextVersion = version == null ? 0 : version.longValue() + 1;

			propertyAccessor.setProperty(versionProperty, nextVersion);

			return propertyAccessor.getBean();
		}
	}

	/**
	 * Type-specific operations abstraction.
	 *
	 * @author Mark Paluch
	 * @param <T>
	 * @since 2.2
	 */
	interface TypedOperations<T> {

		/**
		 * Return the optional {@link Collation} for the underlying entity.
		 *
		 * @return
		 */
		Optional<Collation> getCollation();

		/**
		 * Return the optional {@link Collation} from the given {@link Query} and fall back to the collation configured for
		 * the underlying entity.
		 *
		 * @return
		 */
		Optional<Collation> getCollation(Query query);

		/**
		 * Derive the applicable {@link CollectionOptions} for the given type.
		 *
		 * @return never {@literal null}.
		 * @since 3.3
		 */
		CollectionOptions getCollectionOptions();

		/**
		 * Map the fields of a given {@link TimeSeriesOptions} against the target domain type to consider potentially
		 * annotated field names.
		 *
		 * @param options must not be {@literal null}.
		 * @return never {@literal null}.
		 * @since 3.3
		 */
		TimeSeriesOptions mapTimeSeriesOptions(TimeSeriesOptions options);

		/**
		 * @return the name of the id field.
		 * @since 4.1
		 */
		default String getIdKeyName() {
			return ID_FIELD;
		}
	}

	/**
	 * {@link TypedOperations} for generic entities that are not represented with {@link PersistentEntity} (e.g. custom
	 * conversions).
	 */
	enum UntypedOperations implements TypedOperations<Object> {

		INSTANCE;

		UntypedOperations() {}

		@SuppressWarnings({ "unchecked", "rawtypes" })
		public static <T> TypedOperations<T> instance() {
			return (TypedOperations) INSTANCE;
		}

		@Override
		public Optional<Collation> getCollation() {
			return Optional.empty();
		}

		@Override
		public Optional<Collation> getCollation(Query query) {

			if (query == null) {
				return Optional.empty();
			}

			return query.getCollation();
		}

		@Override
		public CollectionOptions getCollectionOptions() {
			return CollectionOptions.empty();
		}

		@Override
		public TimeSeriesOptions mapTimeSeriesOptions(TimeSeriesOptions options) {
			return options;
		}
	}

	/**
	 * {@link TypedOperations} backed by {@link MongoPersistentEntity}.
	 *
	 * @param <T>
	 */
	static class TypedEntityOperations<T> implements TypedOperations<T> {

		private final MongoPersistentEntity<T> entity;

		@Nullable private final Environment environment;

		protected TypedEntityOperations(MongoPersistentEntity<T> entity, @Nullable Environment environment) {

			this.entity = entity;
			this.environment = environment;
		}

		@Override
		public Optional<Collation> getCollation() {
			return Optional.ofNullable(entity.getCollation());
		}

		@Override
		public Optional<Collation> getCollation(Query query) {

			if (query.getCollation().isPresent()) {
				return query.getCollation();
			}

			return Optional.ofNullable(entity.getCollation());
		}

		@Override
		public CollectionOptions getCollectionOptions() {

			CollectionOptions collectionOptions = CollectionOptions.empty();
			if (entity.hasCollation()) {
				collectionOptions = collectionOptions.collation(entity.getCollation());
			}

			if (entity.isAnnotationPresent(TimeSeries.class)) {

				TimeSeries timeSeries = entity.getRequiredAnnotation(TimeSeries.class);

				if (entity.getPersistentProperty(timeSeries.timeField()) == null) {
					throw new MappingException(String.format("Time series field '%s' does not exist in type %s",
							timeSeries.timeField(), entity.getName()));
				}

				TimeSeriesOptions options = TimeSeriesOptions.timeSeries(timeSeries.timeField());
				if (StringUtils.hasText(timeSeries.metaField())) {

					if (entity.getPersistentProperty(timeSeries.metaField()) == null) {
						throw new MappingException(
								String.format("Meta field '%s' does not exist in type %s", timeSeries.metaField(), entity.getName()));
					}

					options = options.metaField(timeSeries.metaField());
				}
				if (!Granularity.DEFAULT.equals(timeSeries.granularity())) {
					options = options.granularity(timeSeries.granularity());
				}

				if (StringUtils.hasText(timeSeries.expireAfter())) {

					Duration timeout = computeIndexTimeout(timeSeries.expireAfter(), getEvaluationContextForEntity(entity));
					if (!timeout.isNegative()) {
						options = options.expireAfter(timeout);
					}
				}

				collectionOptions = collectionOptions.timeSeries(options);
			}

			return collectionOptions;
		}

		@Override
		public TimeSeriesOptions mapTimeSeriesOptions(TimeSeriesOptions source) {

			TimeSeriesOptions target = TimeSeriesOptions.timeSeries(mappedNameOrDefault(source.getTimeField()));

			if (StringUtils.hasText(source.getMetaField())) {
				target = target.metaField(mappedNameOrDefault(source.getMetaField()));
			}
			return target.granularity(source.getGranularity()).expireAfter(source.getExpireAfter()).span(source.getSpan());
		}

		@Override
		public String getIdKeyName() {
			return entity.getIdProperty() != null ? entity.getIdProperty().getName() : ID_FIELD;
		}

		private String mappedNameOrDefault(String name) {
			MongoPersistentProperty persistentProperty = entity.getPersistentProperty(name);
			return persistentProperty != null ? persistentProperty.getFieldName() : name;
		}

		/**
		 * Get the {@link ValueEvaluationContext} for a given {@link PersistentEntity entity} the default one.
		 *
		 * @param persistentEntity can be {@literal null}
		 * @return the context to use.
		 */
		private ValueEvaluationContext getEvaluationContextForEntity(@Nullable PersistentEntity<?, ?> persistentEntity) {

			if (persistentEntity instanceof BasicMongoPersistentEntity<?> mongoEntity) {
				return mongoEntity.getValueEvaluationContext(null);
			}

			return ValueEvaluationContext.of(this.environment != null ? this.environment : new StandardEnvironment(),
					SimpleEvaluationContext.forReadOnlyDataBinding().build());
		}

		/**
		 * Compute the index timeout value by evaluating a potential
		 * {@link org.springframework.expression.spel.standard.SpelExpression} and parsing the final value.
		 *
		 * @param timeoutValue must not be {@literal null}.
		 * @param evaluationContext must not be {@literal null}.
		 * @return never {@literal null}
		 * @throws IllegalArgumentException for invalid duration values.
		 */
		private static Duration computeIndexTimeout(String timeoutValue, ValueEvaluationContext evaluationContext) {
			return DurationUtil.evaluate(timeoutValue, evaluationContext);
		}
	}
}