ReactiveBulkWriter.java
/*
* Copyright 2026-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.List;
import java.util.Set;
import org.bson.Document;
import org.springframework.data.mongodb.core.QueryOperations.DeleteContext;
import org.springframework.data.mongodb.core.QueryOperations.UpdateContext;
import org.springframework.data.mongodb.core.bulk.Bulk;
import org.springframework.data.mongodb.core.bulk.BulkOperation;
import org.springframework.data.mongodb.core.bulk.BulkOperation.Insert;
import org.springframework.data.mongodb.core.bulk.BulkOperation.Remove;
import org.springframework.data.mongodb.core.bulk.BulkOperation.RemoveFirst;
import org.springframework.data.mongodb.core.bulk.BulkOperation.Replace;
import org.springframework.data.mongodb.core.bulk.BulkOperation.Update;
import org.springframework.data.mongodb.core.bulk.BulkOperation.UpdateFirst;
import org.springframework.data.mongodb.core.bulk.BulkOperationContext.TypedNamespace;
import org.springframework.data.mongodb.core.bulk.BulkWriteOptions;
import org.springframework.data.mongodb.core.bulk.BulkWriteResult;
import org.springframework.data.mongodb.core.mapping.MongoPersistentEntity;
import org.springframework.data.mongodb.core.mapping.event.AfterSaveEvent;
import com.mongodb.MongoNamespace;
import com.mongodb.client.model.DeleteOptions;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.bulk.ClientBulkWriteOptions;
import com.mongodb.client.model.bulk.ClientNamespacedWriteModel;
/**
* Internal API wrapping a {@link ReactiveMongoTemplate} to encapsulate {@link Bulk} handling using a reactive flow.
*
* @author Christoph Strobl
* @author Mark Paluch
* @since 5.1
*/
class ReactiveBulkWriter extends BulkWriterSupport {
private final ReactiveMongoTemplate template;
ReactiveBulkWriter(ReactiveMongoTemplate template) {
super(template.getEntityOperations(), template.getQueryOperations(), template.getConverter().getMappingContext());
this.template = template;
}
public Mono<BulkWriteResult> write(String defaultDatabase, Bulk bulk, BulkWriteOptions options) {
Set<TypedNamespace> namespaces = getTypedNamespaces(bulk);
if (namespaces.size() == 1) {
return writeToSingleCollection(defaultDatabase, bulk, options, namespaces.iterator().next());
}
return writeToMultipleCollections(defaultDatabase, bulk, options);
}
private Mono<BulkWriteResult> writeToSingleCollection(String defaultDatabase, Bulk bulk,
BulkWriteOptions options, TypedNamespace namespace) {
MongoNamespace mongoNamespace = new MongoNamespace(defaultDatabase, resolveCollectionName(namespace));
SingleCollectionCollector collector = new SingleCollectionCollector(mongoNamespace);
return buildWriteModelsReactive(bulk, collector).then(Mono.defer(() -> {
String collectionName = collector.getNamespace().getCollectionName();
List<SourceAwareDocument<Object>> afterSaveCallables = collector.getAfterSaveCallables();
return template
.createMono(collectionName,
col -> col.bulkWrite(collector.getWriteModels(),
new com.mongodb.client.model.BulkWriteOptions()
.ordered(options.getOrder().equals(BulkWriteOptions.Order.ORDERED))))
.map(BulkWriteResult::from)
.doOnSuccess(
v -> afterSaveCallables
.forEach(callable -> template.maybeEmitEvent(new AfterSaveEvent<>(callable.source(),
callable.document(), callable.collectionName()))))
.flatMap(result -> Flux.concat(afterSaveCallables.stream().map(callable -> template
.maybeCallAfterSave(callable.source(), callable.document(), callable.collectionName())).toList())
.then(Mono.just(result)));
}));
}
private Mono<BulkWriteResult> writeToMultipleCollections(String defaultDatabase, Bulk bulk,
BulkWriteOptions options) {
MultiCollectionCollector collector = new MultiCollectionCollector(defaultDatabase);
return buildWriteModelsReactive(bulk, collector).then(Mono.defer(() -> {
List<ClientNamespacedWriteModel> writeModels = collector.getWriteModels();
List<SourceAwareDocument<Object>> afterSaveCallables = collector.getAfterSaveCallables();
return template
.doWithCluster(client -> client.bulkWrite(writeModels,
ClientBulkWriteOptions
.clientBulkWriteOptions().ordered(options.getOrder().equals(BulkWriteOptions.Order.ORDERED))))
.map(BulkWriteResult::from)
.doOnSuccess(
v -> afterSaveCallables
.forEach(callable -> template.maybeEmitEvent(new AfterSaveEvent<>(callable.source(),
callable.document(), callable.collectionName()))))
.flatMap(result -> Flux.concat(afterSaveCallables.stream().map(callable -> template
.maybeCallAfterSave(callable.source(), callable.document(), callable.collectionName())).toList())
.then(Mono.just(result)));
}));
}
private Mono<Void> buildWriteModelsReactive(Bulk bulk, WriteModelCollector collector) {
return Flux.fromIterable(bulk.operations()).concatMap(bulkOp -> addOperationReactive(bulkOp, collector)).then();
}
private Mono<Void> addOperationReactive(BulkOperation bulkOp, WriteModelCollector collector) {
MongoNamespace namespace = collector.resolveNamespace(resolveCollectionName(bulkOp));
MongoPersistentEntity<?> entity = getPersistentEntity(bulkOp.context());
if (bulkOp instanceof Insert insert) {
return template
.prepareObjectForSaveReactive(namespace.getCollectionName(), insert.value())
.doOnNext(sad -> collector.addInsert(namespace, sad.document(), toObject(sad))).then();
}
if (bulkOp instanceof Update update) {
boolean multi = !(bulkOp instanceof UpdateFirst);
UpdateContext updateContext = queryOperations.updateContext(update.update(), update.query(),
update.upsert());
Document mappedQuery = updateContext.getMappedQuery(entity);
Object mappedUpdate = updateContext.isAggregationUpdate() ? updateContext.getUpdatePipeline(entity)
: updateContext.getMappedUpdate(entity);
UpdateOptions updateOptions = updateContext.getUpdateOptions(entity, update.query());
collector.addUpdate(namespace, multi, mappedQuery, mappedUpdate, updateOptions);
return Mono.empty();
}
if (bulkOp instanceof Remove remove) {
DeleteContext deleteContext = queryOperations.deleteQueryContext(remove.query());
Document mappedQuery = deleteContext.getMappedQuery(entity);
DeleteOptions deleteOptions = deleteContext.getDeleteOptions(entity);
collector.addRemove(namespace, remove instanceof RemoveFirst, mappedQuery, deleteOptions);
return Mono.empty();
}
if (bulkOp instanceof Replace replace) {
return template
.prepareObjectForSaveReactive(namespace.getCollectionName(), replace.replacement())
.doOnNext(sad -> {
UpdateContext updateContext = queryOperations.replaceSingleContext(replace.query(),
MappedDocument.of(sad.document()), replace.upsert());
Document mappedQuery = updateContext
.getMappedQuery(entity);
UpdateOptions updateOptions = updateContext.getUpdateOptions(entity,
replace.query());
collector.addReplace(namespace, mappedQuery, sad.document(), updateOptions, toObject(sad));
}).then();
}
return Mono.error(new IllegalStateException("Unknown bulk operation type: " + bulkOp.getClass()));
}
@SuppressWarnings("unchecked")
private static SourceAwareDocument<Object> toObject(SourceAwareDocument<?> sad) {
return (SourceAwareDocument<Object>) sad;
}
}