BulkWriterSupport.java

/*
 * Copyright 2026-present the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.springframework.data.mongodb.core;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import org.bson.Document;
import org.jspecify.annotations.Nullable;

import org.springframework.data.mapping.context.MappingContext;
import org.springframework.data.mongodb.core.bulk.Bulk;
import org.springframework.data.mongodb.core.bulk.BulkOperation;
import org.springframework.data.mongodb.core.bulk.BulkOperationContext;
import org.springframework.data.mongodb.core.bulk.BulkOperationContext.TypedNamespace;
import org.springframework.data.mongodb.core.mapping.CollectionName;
import org.springframework.data.mongodb.core.mapping.MongoPersistentEntity;
import org.springframework.data.mongodb.core.mapping.MongoPersistentProperty;

import com.mongodb.MongoNamespace;
import com.mongodb.client.model.DeleteOptions;
import com.mongodb.client.model.InsertOneModel;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.WriteModel;
import com.mongodb.client.model.bulk.ClientNamespacedWriteModel;

/**
 * Base class for {@link BulkWriter} and {@link ReactiveBulkWriter}.
 *
 * @author Mark Paluch
 * @author Christoph Strobl
 * @since 5.1
 */
abstract class BulkWriterSupport {

	final EntityOperations entityOperations;
	final QueryOperations queryOperations;
	final MappingContext<? extends MongoPersistentEntity<?>, ? extends MongoPersistentProperty> mappingContext;

	public BulkWriterSupport(EntityOperations entityOperations, QueryOperations queryOperations,
			MappingContext<? extends MongoPersistentEntity<?>, ? extends MongoPersistentProperty> mappingContext) {
		this.entityOperations = entityOperations;
		this.queryOperations = queryOperations;
		this.mappingContext = mappingContext;
	}

	static Set<TypedNamespace> getTypedNamespaces(Bulk bulk) {
		return bulk.operations().stream().map(it -> it.context().namespace()).collect(Collectors.toSet());
	}

	String resolveCollectionName(BulkOperation operation) {
		return resolveCollectionName(operation.context().namespace());
	}

	String resolveCollectionName(TypedNamespace namespace) {

		if (namespace.hasCollectionName()) {
			return namespace.getRequiredCollectionName().getCollectionName(entityOperations::getRequiredPersistentEntity);
		}
		return entityOperations.determineCollectionName(namespace.type());
	}

	@Nullable
	@SuppressWarnings("unchecked")
	MongoPersistentEntity<?> getPersistentEntity(BulkOperationContext context) {

		BulkOperationContext.TypedNamespace namespace = context.namespace();

		if (namespace.type() != null) {
			return mappingContext.getPersistentEntity(namespace.type());
		}

		if (namespace.hasCollectionName()) {
			CollectionName collectionName = namespace.getRequiredCollectionName();
			if (collectionName.getEntityClass() != Object.class) {
				return mappingContext.getPersistentEntity(collectionName.getEntityClass());
			}
		}

		return null;
	}

	/**
	 * Strategy interface to collect {@link WriteModel}s for a {@link Bulk} operation.
	 */
	interface WriteModelCollector {

		List<SourceAwareDocument<Object>> getAfterSaveCallables();

		MongoNamespace resolveNamespace(String collectionName);

		void addInsert(MongoNamespace namespace, Document document, SourceAwareDocument<Object> sourceDoc);

		void addUpdate(MongoNamespace namespace, boolean multi, Document query, Object update, UpdateOptions options);

		void addRemove(MongoNamespace namespace, boolean removeFirst, Document query, DeleteOptions options);

		void addReplace(MongoNamespace namespace, Document query, Document replacement, UpdateOptions options,
				SourceAwareDocument<Object> sourceDoc);

	}

	/**
	 * Collector for single-collection bulk operations.
	 */
	static class SingleCollectionCollector implements WriteModelCollector {

		private final List<WriteModel<Document>> writeModels = new ArrayList<>();
		private final List<SourceAwareDocument<Object>> afterSaveCallables = new ArrayList<>();
		private final MongoNamespace namespace;

		public SingleCollectionCollector(MongoNamespace namespace) {
			this.namespace = namespace;
		}

		MongoNamespace getNamespace() {
			return namespace;
		}

		List<WriteModel<Document>> getWriteModels() {
			return writeModels;
		}

		@Override
		public MongoNamespace resolveNamespace(String collectionName) {
			return namespace;
		}

		@Override
		public void addInsert(MongoNamespace namespace, Document document, SourceAwareDocument<Object> sourceDoc) {
			writeModels.add(new InsertOneModel<>(document));
			afterSaveCallables.add(sourceDoc);
		}

		@Override
		public void addUpdate(MongoNamespace namespace, boolean multi, Document query, Object update,
				UpdateOptions options) {
			if (multi) {
				writeModels.add(BulkWriteSupport.updateMany(query, update, options));
			} else {
				writeModels.add(BulkWriteSupport.updateOne(query, update, options));
			}
		}

		@Override
		public void addRemove(MongoNamespace namespace, boolean removeFirst, Document query, DeleteOptions options) {
			if (removeFirst) {
				writeModels.add(BulkWriteSupport.removeOne(query, options));
			} else {
				writeModels.add(BulkWriteSupport.removeMany(query, options));
			}
		}

		@Override
		public void addReplace(MongoNamespace namespace, Document query, Document replacement, UpdateOptions options,
				SourceAwareDocument<Object> sourceDoc) {
			writeModels.add(BulkWriteSupport.replaceOne(query, replacement, options));
			afterSaveCallables.add(sourceDoc);
		}

		@Override
		public List<SourceAwareDocument<Object>> getAfterSaveCallables() {
			return afterSaveCallables;
		}

	}

	/**
	 * Collector for multi-collection bulk operations.
	 */
	static class MultiCollectionCollector implements WriteModelCollector {

		private final List<ClientNamespacedWriteModel> writeModels = new ArrayList<>();
		private final List<SourceAwareDocument<Object>> afterSaveCallables = new ArrayList<>();
		private final String defaultDatabaseName;

		public MultiCollectionCollector(String defaultDatabaseName) {
			this.defaultDatabaseName = defaultDatabaseName;
		}

		@Override
		public List<SourceAwareDocument<Object>> getAfterSaveCallables() {
			return afterSaveCallables;
		}

		List<ClientNamespacedWriteModel> getWriteModels() {
			return writeModels;
		}

		@Override
		public MongoNamespace resolveNamespace(String collectionName) {
			return new MongoNamespace(defaultDatabaseName, collectionName);
		}

		@Override
		public void addInsert(MongoNamespace namespace, Document document, SourceAwareDocument<Object> sourceDoc) {
			writeModels.add(ClientNamespacedWriteModel.insertOne(namespace, document));
			afterSaveCallables.add(sourceDoc);
		}

		@Override
		public void addUpdate(MongoNamespace namespace, boolean multi, Document query, Object update,
				UpdateOptions options) {
			if (multi) {
				writeModels.add(BulkWriteSupport.updateMany(namespace, query, update, options));
			} else {
				writeModels.add(BulkWriteSupport.updateOne(namespace, query, update, options));
			}
		}

		@Override
		public void addRemove(MongoNamespace namespace, boolean removeFirst, Document query, DeleteOptions options) {
			if (removeFirst) {
				writeModels.add(BulkWriteSupport.removeOne(namespace, query, options));
			} else {
				writeModels.add(BulkWriteSupport.removeMany(namespace, query, options));
			}
		}

		@Override
		public void addReplace(MongoNamespace namespace, Document query, Document replacement, UpdateOptions options,
				SourceAwareDocument<Object> sourceDoc) {
			writeModels.add(BulkWriteSupport.replaceOne(namespace, query, replacement, options));
			afterSaveCallables.add(sourceDoc);
		}

	}

}