ReactiveMongoTransactionManager.java
/*
* Copyright 2019-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb;
import reactor.core.publisher.Mono;
import org.jspecify.annotations.Nullable;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionException;
import org.springframework.transaction.TransactionSystemException;
import org.springframework.transaction.reactive.AbstractReactiveTransactionManager;
import org.springframework.transaction.reactive.GenericReactiveTransaction;
import org.springframework.transaction.reactive.TransactionSynchronizationManager;
import org.springframework.transaction.support.SmartTransactionObject;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import com.mongodb.ClientSessionOptions;
import com.mongodb.MongoException;
import com.mongodb.TransactionOptions;
import com.mongodb.reactivestreams.client.ClientSession;
/**
 * A {@link org.springframework.transaction.ReactiveTransactionManager} implementation that manages
 * {@link com.mongodb.reactivestreams.client.ClientSession} based transactions for a single
 * {@link org.springframework.data.mongodb.ReactiveMongoDatabaseFactory}.
 * <p>
 * Binds a {@link ClientSession} from the specified
 * {@link org.springframework.data.mongodb.ReactiveMongoDatabaseFactory} to the subscriber
 * {@link reactor.util.context.Context}. {@link org.springframework.transaction.TransactionDefinition#isReadOnly()
 * Readonly} transactions operate on a {@link ClientSession} and enable causal consistency, and also
 * {@link ClientSession#startTransaction() start},
 * {@link com.mongodb.reactivestreams.client.ClientSession#commitTransaction() commit} or
 * {@link ClientSession#abortTransaction() abort} a transaction.
 * <p>
 * Application code is required to retrieve the {@link com.mongodb.reactivestreams.client.MongoDatabase} via
 * {@link org.springframework.data.mongodb.ReactiveMongoDatabaseUtils#getDatabase(ReactiveMongoDatabaseFactory)} instead
 * of a standard {@link org.springframework.data.mongodb.ReactiveMongoDatabaseFactory#getMongoDatabase()} call. Spring
 * classes such as {@link org.springframework.data.mongodb.core.ReactiveMongoTemplate} use this strategy implicitly. By
 * default, failure of a {@literal commit} operation raises a {@link TransactionSystemException}. You can override
 * {@link #doCommit(TransactionSynchronizationManager, ReactiveMongoTransactionObject)} to implement the
 * <a href="https://docs.mongodb.com/manual/core/transactions/#retry-commit-operation">Retry Commit Operation</a>
 * behavior as outlined in the MongoDB reference manual.
 *
 * @author Christoph Strobl
 * @author Mark Paluch
 * @since 2.2
 * @see <a href="https://www.mongodb.com/transactions">MongoDB Transaction Documentation</a>
 * @see ReactiveMongoDatabaseUtils#getDatabase(ReactiveMongoDatabaseFactory, SessionSynchronization)
 */
public class ReactiveMongoTransactionManager extends AbstractReactiveTransactionManager implements InitializingBean {

	private @Nullable ReactiveMongoDatabaseFactory databaseFactory;
	private MongoTransactionOptions options;
	private final MongoTransactionOptionsResolver transactionOptionsResolver;

	/**
	 * Create a new {@link ReactiveMongoTransactionManager} for bean-style usage. <br />
	 * <strong>Note:</strong> The {@link org.springframework.data.mongodb.ReactiveMongoDatabaseFactory db factory} has to
	 * be {@link #setDatabaseFactory(ReactiveMongoDatabaseFactory) set} before using the instance. Use this constructor
	 * to prepare a {@link ReactiveMongoTransactionManager} via a {@link org.springframework.beans.factory.BeanFactory}.
	 * <br />
	 * Optionally it is possible to set default {@link TransactionOptions transaction options} defining
	 * {@link com.mongodb.ReadConcern} and {@link com.mongodb.WriteConcern}.
	 *
	 * @see #setDatabaseFactory(ReactiveMongoDatabaseFactory)
	 */
	public ReactiveMongoTransactionManager() {

		this.transactionOptionsResolver = MongoTransactionOptionsResolver.defaultResolver();
		this.options = MongoTransactionOptions.NONE;
	}

	/**
	 * Create a new {@link ReactiveMongoTransactionManager} obtaining sessions from the given
	 * {@link ReactiveMongoDatabaseFactory}.
	 *
	 * @param databaseFactory must not be {@literal null}.
	 */
	public ReactiveMongoTransactionManager(ReactiveMongoDatabaseFactory databaseFactory) {
		this(databaseFactory, null);
	}

	/**
	 * Create a new {@link ReactiveMongoTransactionManager} obtaining sessions from the given
	 * {@link ReactiveMongoDatabaseFactory} applying the given {@link TransactionOptions options}, if present, when
	 * starting a new transaction.
	 *
	 * @param databaseFactory must not be {@literal null}.
	 * @param options can be {@literal null}. Will default {@link MongoTransactionOptions#NONE} if {@literal null}.
	 */
	public ReactiveMongoTransactionManager(ReactiveMongoDatabaseFactory databaseFactory,
			@Nullable TransactionOptions options) {
		this(databaseFactory, MongoTransactionOptionsResolver.defaultResolver(), MongoTransactionOptions.of(options));
	}

	/**
	 * Create a new {@link ReactiveMongoTransactionManager} obtaining sessions from the given
	 * {@link ReactiveMongoDatabaseFactory} applying the given {@link TransactionOptions options}, if present, when
	 * starting a new transaction.
	 *
	 * @param databaseFactory must not be {@literal null}.
	 * @param transactionOptionsResolver must not be {@literal null}.
	 * @param defaultTransactionOptions can be {@literal null}. Will default {@link MongoTransactionOptions#NONE} if
	 *          {@literal null}.
	 * @since 4.3
	 */
	public ReactiveMongoTransactionManager(ReactiveMongoDatabaseFactory databaseFactory,
			MongoTransactionOptionsResolver transactionOptionsResolver,
			@Nullable MongoTransactionOptions defaultTransactionOptions) {

		Assert.notNull(databaseFactory, "DatabaseFactory must not be null");
		Assert.notNull(transactionOptionsResolver, "MongoTransactionOptionsResolver must not be null");

		this.databaseFactory = databaseFactory;
		this.transactionOptionsResolver = transactionOptionsResolver;
		this.options = defaultTransactionOptions != null ? defaultTransactionOptions : MongoTransactionOptions.NONE;
	}

	/**
	 * Create the transaction object, picking up the {@link ReactiveMongoResourceHolder} currently bound for the
	 * database factory, if any.
	 */
	@Override
	protected Object doGetTransaction(TransactionSynchronizationManager synchronizationManager)
			throws TransactionException {

		ReactiveMongoResourceHolder resourceHolder = (ReactiveMongoResourceHolder) synchronizationManager
				.getResource(getRequiredDatabaseFactory());
		return new ReactiveMongoTransactionObject(resourceHolder);
	}

	/**
	 * A transaction is considered existing as soon as the transaction object carries a resource holder.
	 */
	@Override
	protected boolean isExistingTransaction(Object transaction) throws TransactionException {
		return extractMongoTransaction(transaction).hasResourceHolder();
	}

	/**
	 * Obtain a causally consistent {@link ClientSession}, start a transaction on it and bind the resulting
	 * {@link ReactiveMongoResourceHolder} to the given synchronization manager. Any error raised along the way is
	 * translated into a {@link TransactionSystemException}.
	 */
	@Override
	protected Mono<Void> doBegin(TransactionSynchronizationManager synchronizationManager, Object transaction,
			TransactionDefinition definition) throws TransactionException {

		return Mono.defer(() -> {

			ReactiveMongoTransactionObject mongoTransactionObject = extractMongoTransaction(transaction);

			Mono<ReactiveMongoResourceHolder> holder = newResourceHolder(
					ClientSessionOptions.builder().causallyConsistent(true).build());

			return holder.doOnNext(resourceHolder -> {

				mongoTransactionObject.setResourceHolder(resourceHolder);

				if (logger.isDebugEnabled()) {
					logger.debug(
							String.format("About to start transaction for session %s.", debugString(resourceHolder.getSession())));
				}

			}).doOnNext(resourceHolder -> {

				// options resolved from the definition are merged with the manager-level defaults
				MongoTransactionOptions mongoTransactionOptions = transactionOptionsResolver.resolve(definition)
						.mergeWith(options);
				mongoTransactionObject.startTransaction(mongoTransactionOptions.toDriverOptions());

				if (logger.isDebugEnabled()) {
					logger.debug(String.format("Started transaction for session %s.", debugString(resourceHolder.getSession())));
				}

			})//
					.onErrorMap(
							ex -> new TransactionSystemException(String.format("Could not start Mongo transaction for session %s.",
									debugString(mongoTransactionObject.getSession())), ex))
					// placed after the error mapping, so the holder gets bound only if no step above failed
					.doOnSuccess(resourceHolder -> {
						synchronizationManager.bindResource(getRequiredDatabaseFactory(), resourceHolder);
					}).then();
		});
	}

	/**
	 * Detach the resource holder from the transaction object and unbind it from the synchronization manager. The
	 * unbound resource is emitted so that it can be handed back via
	 * {@link #doResume(TransactionSynchronizationManager, Object, Object)}.
	 */
	@Override
	protected Mono<Object> doSuspend(TransactionSynchronizationManager synchronizationManager, Object transaction)
			throws TransactionException {

		return Mono.fromSupplier(() -> {

			ReactiveMongoTransactionObject mongoTransactionObject = extractMongoTransaction(transaction);
			mongoTransactionObject.setResourceHolder(null);

			return synchronizationManager.unbindResource(getRequiredDatabaseFactory());
		});
	}

	/**
	 * Re-bind the previously suspended resources for the database factory.
	 */
	@Override
	protected Mono<Void> doResume(TransactionSynchronizationManager synchronizationManager, @Nullable Object transaction,
			Object suspendedResources) {
		return Mono
				.fromRunnable(() -> synchronizationManager.bindResource(getRequiredDatabaseFactory(), suspendedResources));
	}

	/**
	 * Commit the transaction by delegating to
	 * {@link #doCommit(TransactionSynchronizationManager, ReactiveMongoTransactionObject)}. Any error emitted by the
	 * delegate is wrapped into a {@link TransactionSystemException}.
	 */
	@Override
	protected final Mono<Void> doCommit(TransactionSynchronizationManager synchronizationManager,
			GenericReactiveTransaction status) throws TransactionException {

		return Mono.defer(() -> {

			ReactiveMongoTransactionObject mongoTransactionObject = extractMongoTransaction(status);

			if (logger.isDebugEnabled()) {
				logger.debug(String.format("About to commit transaction for session %s.",
						debugString(mongoTransactionObject.getSession())));
			}

			return doCommit(synchronizationManager, mongoTransactionObject).onErrorMap(
					ex -> new TransactionSystemException(String.format("Could not commit Mongo transaction for session %s.",
							debugString(mongoTransactionObject.getSession())), ex));
		});
	}

	/**
	 * Customization hook to perform an actual commit of the given transaction.<br />
	 * If a commit operation encounters an error, the MongoDB driver throws a {@link MongoException} holding
	 * {@literal error labels}. <br />
	 * By default those labels are ignored, nevertheless one might check for
	 * {@link MongoException#UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL transient commit errors labels} and retry the
	 * commit.
	 *
	 * @param synchronizationManager reactive synchronization manager.
	 * @param transactionObject never {@literal null}.
	 * @return a {@link Mono} completing once the commit is done.
	 */
	protected Mono<Void> doCommit(TransactionSynchronizationManager synchronizationManager,
			ReactiveMongoTransactionObject transactionObject) {
		return transactionObject.commitTransaction();
	}

	/**
	 * Abort the transaction. Only a {@link MongoException} is translated into a {@link TransactionSystemException};
	 * other errors are propagated as they are.
	 */
	@Override
	protected Mono<Void> doRollback(TransactionSynchronizationManager synchronizationManager,
			GenericReactiveTransaction status) {

		return Mono.defer(() -> {

			ReactiveMongoTransactionObject mongoTransactionObject = extractMongoTransaction(status);

			if (logger.isDebugEnabled()) {
				logger.debug(String.format("About to abort transaction for session %s.",
						debugString(mongoTransactionObject.getSession())));
			}

			return mongoTransactionObject.abortTransaction().onErrorMap(MongoException.class,
					ex -> new TransactionSystemException(String.format("Could not abort Mongo transaction for session %s.",
							debugString(mongoTransactionObject.getSession())), ex));
		});
	}

	/**
	 * Flag the resource holder of the current transaction as rollback-only.
	 */
	@Override
	protected Mono<Void> doSetRollbackOnly(TransactionSynchronizationManager synchronizationManager,
			GenericReactiveTransaction status) throws TransactionException {

		return Mono.fromRunnable(() -> {

			ReactiveMongoTransactionObject transactionObject = extractMongoTransaction(status);
			transactionObject.getRequiredResourceHolder().setRollbackOnly();
		});
	}

	/**
	 * Unbind the resource holder, clear it and close the {@link ClientSession} used by the completed transaction.
	 */
	@Override
	protected Mono<Void> doCleanupAfterCompletion(TransactionSynchronizationManager synchronizationManager,
			Object transaction) {

		// the type check happens right away, not on subscription
		ReactiveMongoTransactionObject mongoTransactionObject = extractMongoTransaction(transaction);

		return Mono.fromRunnable(() -> {

			// Remove the resource holder from the synchronization manager.
			synchronizationManager.unbindResource(getRequiredDatabaseFactory());
			mongoTransactionObject.getRequiredResourceHolder().clear();

			if (logger.isDebugEnabled()) {
				logger.debug(String.format("About to release Session %s after transaction.",
						debugString(mongoTransactionObject.getSession())));
			}

			mongoTransactionObject.closeSession();
		});
	}

	/**
	 * Set the {@link ReactiveMongoDatabaseFactory} that this instance should manage transactions for.
	 *
	 * @param databaseFactory must not be {@literal null}.
	 */
	public void setDatabaseFactory(ReactiveMongoDatabaseFactory databaseFactory) {

		Assert.notNull(databaseFactory, "DatabaseFactory must not be null");
		this.databaseFactory = databaseFactory;
	}

	/**
	 * Set the {@link TransactionOptions} to be applied when starting transactions.
	 *
	 * @param options can be {@literal null}.
	 */
	public void setOptions(@Nullable TransactionOptions options) {
		this.options = MongoTransactionOptions.of(options);
	}

	/**
	 * Get the {@link ReactiveMongoDatabaseFactory} that this instance manages transactions for.
	 *
	 * @return can be {@literal null}.
	 */
	public @Nullable ReactiveMongoDatabaseFactory getDatabaseFactory() {
		return databaseFactory;
	}

	/**
	 * Verify that a {@link ReactiveMongoDatabaseFactory} has been provided.
	 *
	 * @throws IllegalStateException if no database factory is set.
	 */
	@Override
	public void afterPropertiesSet() {
		getRequiredDatabaseFactory();
	}

	/**
	 * Obtain a new {@link ClientSession} from the database factory and wrap it into a
	 * {@link ReactiveMongoResourceHolder}.
	 *
	 * @param sessionOptions options used to obtain the session.
	 */
	private Mono<ReactiveMongoResourceHolder> newResourceHolder(ClientSessionOptions sessionOptions) {

		ReactiveMongoDatabaseFactory dbFactory = getRequiredDatabaseFactory();
		return dbFactory.getSession(sessionOptions).map(session -> new ReactiveMongoResourceHolder(session, dbFactory));
	}

	/**
	 * @throws IllegalStateException if {@link #databaseFactory} is {@literal null}.
	 */
	private ReactiveMongoDatabaseFactory getRequiredDatabaseFactory() {

		Assert.state(databaseFactory != null,
				"ReactiveMongoTransactionManager operates upon a ReactiveMongoDatabaseFactory; Did you forget to provide one; It's required");

		return databaseFactory;
	}

	/**
	 * Cast the given transaction object to {@link ReactiveMongoTransactionObject}.
	 *
	 * @throws IllegalArgumentException if the object is of a different type.
	 */
	private static ReactiveMongoTransactionObject extractMongoTransaction(Object transaction) {

		Assert.isInstanceOf(ReactiveMongoTransactionObject.class, transaction,
				() -> String.format("Expected to find a %s but it turned out to be %s.", ReactiveMongoTransactionObject.class,
						transaction.getClass()));

		return (ReactiveMongoTransactionObject) transaction;
	}

	/**
	 * Extract the {@link ReactiveMongoTransactionObject} held by the given transaction status.
	 *
	 * @throws IllegalArgumentException if the held transaction object is of a different type.
	 */
	private static ReactiveMongoTransactionObject extractMongoTransaction(GenericReactiveTransaction status) {

		// typed as Object on purpose so the call below resolves to the Object overload
		Object transaction = status.getTransaction();
		return extractMongoTransaction(transaction);
	}

	/**
	 * Render a description of the given session for log and error messages. Runtime exceptions raised while reading
	 * session details are reported as part of the returned text instead of being propagated.
	 *
	 * @param session can be {@literal null}.
	 * @return the literal {@code "null"} if no session is given, a bracketed description otherwise.
	 */
	private static String debugString(@Nullable ClientSession session) {

		if (session == null) {
			return "null";
		}

		StringBuilder debugString = new StringBuilder(String.format("[%s@%s ",
				ClassUtils.getShortName(session.getClass()), Integer.toHexString(session.hashCode())));

		try {
			if (session.getServerSession() != null) {
				debugString.append(String.format("id = %s, ", session.getServerSession().getIdentifier()));
				debugString.append(String.format("causallyConsistent = %s, ", session.isCausallyConsistent()));
				debugString.append(String.format("txActive = %s, ", session.hasActiveTransaction()));
				debugString.append(String.format("txNumber = %d, ", session.getServerSession().getTransactionNumber()));
				debugString.append(String.format("closed = %b, ", session.getServerSession().isClosed()));
				debugString.append(String.format("clusterTime = %s", session.getClusterTime()));
			} else {
				// trailing separator keeps the attributes apart, same as in the branch above
				debugString.append("id = n/a, ");
				debugString.append(String.format("causallyConsistent = %s, ", session.isCausallyConsistent()));
				debugString.append(String.format("txActive = %s, ", session.hasActiveTransaction()));
				debugString.append(String.format("clusterTime = %s", session.getClusterTime()));
			}
		} catch (RuntimeException e) {
			debugString.append(String.format("error = %s", e.getMessage()));
		}

		return debugString.append("]").toString();
	}

	/**
	 * MongoDB specific transaction object, representing a {@link ReactiveMongoResourceHolder}. Used as transaction
	 * object by {@link ReactiveMongoTransactionManager}.
	 *
	 * @author Christoph Strobl
	 * @author Mark Paluch
	 * @since 2.2
	 * @see ReactiveMongoResourceHolder
	 */
	protected static class ReactiveMongoTransactionObject implements SmartTransactionObject {

		private @Nullable ReactiveMongoResourceHolder resourceHolder;

		ReactiveMongoTransactionObject(@Nullable ReactiveMongoResourceHolder resourceHolder) {
			this.resourceHolder = resourceHolder;
		}

		/**
		 * Set the {@link ReactiveMongoResourceHolder}.
		 *
		 * @param resourceHolder can be {@literal null}.
		 */
		void setResourceHolder(@Nullable ReactiveMongoResourceHolder resourceHolder) {
			this.resourceHolder = resourceHolder;
		}

		/**
		 * @return {@literal true} if a {@link ReactiveMongoResourceHolder} is set.
		 */
		final boolean hasResourceHolder() {
			return resourceHolder != null;
		}

		/**
		 * Start a MongoDB transaction optionally given {@link TransactionOptions}.
		 *
		 * @param options can be {@literal null}
		 * @throws IllegalStateException if no session is available.
		 */
		void startTransaction(@Nullable TransactionOptions options) {

			ClientSession session = getRequiredSession();
			if (options != null) {
				session.startTransaction(options);
			} else {
				session.startTransaction();
			}
		}

		/**
		 * Commit the transaction.
		 *
		 * @throws IllegalStateException if no session is available.
		 */
		public Mono<Void> commitTransaction() {
			return Mono.from(getRequiredSession().commitTransaction());
		}

		/**
		 * Rollback (abort) the transaction.
		 *
		 * @throws IllegalStateException if no session is available.
		 */
		public Mono<Void> abortTransaction() {
			return Mono.from(getRequiredSession().abortTransaction());
		}

		/**
		 * Close a {@link ClientSession} without regard to its transactional state. Nothing happens if the session has
		 * no server session or the server session is already closed.
		 *
		 * @throws IllegalStateException if no session is available.
		 */
		void closeSession() {

			ClientSession session = getRequiredSession();
			if (session.getServerSession() != null && !session.getServerSession().isClosed()) {
				session.close();
			}
		}

		/**
		 * @return the {@link ClientSession} of the resource holder or {@literal null} if no resource holder is set.
		 */
		public @Nullable ClientSession getSession() {
			return resourceHolder != null ? resourceHolder.getSession() : null;
		}

		/**
		 * @throws IllegalStateException if no resource holder is set.
		 */
		private ReactiveMongoResourceHolder getRequiredResourceHolder() {

			Assert.state(resourceHolder != null, "ReactiveMongoResourceHolder is required but not present; o_O");
			return resourceHolder;
		}

		/**
		 * @throws IllegalStateException if {@link #getSession()} returns {@literal null}.
		 */
		private ClientSession getRequiredSession() {

			ClientSession session = getSession();
			Assert.state(session != null, "A Session is required but it turned out to be null");
			return session;
		}

		/**
		 * @return {@literal true} if a resource holder is set and it is flagged rollback-only.
		 */
		@Override
		public boolean isRollbackOnly() {
			return this.resourceHolder != null && this.resourceHolder.isRollbackOnly();
		}

		/**
		 * Not supported.
		 *
		 * @throws UnsupportedOperationException always.
		 */
		@Override
		public void flush() {
			throw new UnsupportedOperationException("flush() not supported");
		}
	}
}