TransactionalRepositoryConnectionFactory.java
/*******************************************************************************
* Copyright (c) 2021 Eclipse RDF4J contributors.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
package org.eclipse.rdf4j.spring.tx;
import static org.eclipse.rdf4j.spring.util.RepositoryConnectionWrappingUtils.findWrapper;
import static org.eclipse.rdf4j.spring.util.RepositoryConnectionWrappingUtils.wrapOnce;
import java.lang.invoke.MethodHandles;
import org.eclipse.rdf4j.repository.RepositoryConnection;
import org.eclipse.rdf4j.sail.shacl.ShaclSailValidationReportHelper;
import org.eclipse.rdf4j.spring.support.connectionfactory.RepositoryConnectionFactory;
import org.eclipse.rdf4j.spring.tx.exception.CommitException;
import org.eclipse.rdf4j.spring.tx.exception.ConnectionClosedException;
import org.eclipse.rdf4j.spring.tx.exception.NoTransactionException;
import org.eclipse.rdf4j.spring.tx.exception.RDF4JTransactionException;
import org.eclipse.rdf4j.spring.tx.exception.RollbackException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author ameingast@gmail.com
* @author Florian Kleedorfer
* @since 4.0.0
*/
public class TransactionalRepositoryConnectionFactory implements RepositoryConnectionFactory {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final RepositoryConnectionFactory delegateFactory;
private final ThreadLocal<TransactionObject> transactionData = new ThreadLocal<>();
public TransactionalRepositoryConnectionFactory(RepositoryConnectionFactory delegateFactory) {
this.delegateFactory = delegateFactory;
}
public TransactionObject getTransactionData() {
return transactionData.get();
}
public RepositoryConnection getConnection() {
logger.debug("Trying to obtain connection");
TransactionObject data = getTransactionData();
if (data == null) {
throw new NoTransactionException("Cannot obtain connection: no transaction");
}
RepositoryConnection con = data.getConnection();
if (con == null) {
throw new RDF4JTransactionException(
"Cannot obtain connection: transaction started but no connection found");
}
if (!con.isOpen()) {
throw new ConnectionClosedException("Cannot obtain connection: connection closed");
}
if (data.isReadOnly()) {
logger.debug("transaction is readonly, not starting a database transaction");
} else {
if (!con.isActive()) {
logger.debug(
"connection does not have an active transaction yet, starting transaction");
con.begin();
logger.debug("con.begin() called");
}
}
logger.debug("returning connection");
return con;
}
public void closeConnection() {
logger.debug("Trying to close connection");
RepositoryConnection con = null;
try {
TransactionObject data = getTransactionData();
if (data == null) {
throw new NoTransactionException("Cannot close connection: no transaction");
}
con = data.getConnection();
if (con == null) {
throw new RDF4JTransactionException(
"Cannot close connection: transaction started but no connection found");
}
if (!con.isOpen()) {
throw new ConnectionClosedException("Cannot close connection: connection closed");
}
} finally {
try {
if (con != null && con.isActive()) {
logger.warn(
"Encountered active transaction when closing connection - rolling back!");
con.rollback();
logger.debug("con.rollback() called");
}
} catch (Throwable t) {
logger.error("Error rolling back transaction", t);
}
try {
if (con != null) {
con.close();
logger.debug("con.close() called");
}
} catch (Throwable t) {
logger.error("Error closing connection", t);
}
this.transactionData.remove();
logger.debug("Thread-local transaction data removed");
}
}
public TransactionObject createTransaction() {
logger.debug("Trying to create new transaction");
RepositoryConnection delegate = delegateFactory.getConnection();
RepositoryConnection wrappedCon = wrapOnce(
delegate,
con -> new TransactionalRepositoryConnection(con.getRepository(), con),
TransactionalRepositoryConnection.class);
TransactionObject txObj = new TransactionObject(wrappedCon);
transactionData.set(txObj);
TransactionalRepositoryConnection txCon = findWrapper(wrappedCon, TransactionalRepositoryConnection.class)
.get();
txCon.setTransactionObject(txObj);
logger.debug("Transaction created");
return txObj;
}
public void endTransaction(boolean rollback) {
logger.debug("Trying to end transaction");
TransactionObject data = getTransactionData();
if (data == null) {
throw new NoTransactionException("Cannot obtain connection: no transaction");
}
RepositoryConnection con = data.getConnection();
if (con == null) {
throw new RDF4JTransactionException(
"Cannot obtain connection: transaction started but no connection found");
}
if (!con.isOpen()) {
throw new ConnectionClosedException("Cannot obtain connection: connection closed");
}
if (data.isReadOnly()) {
logger.debug("transaction is readonly");
if (con.isActive()) {
logger.debug("however, the connection is active - rolling back");
try {
con.rollback();
} catch (Exception e) {
throw new RollbackException(
"Cannot rollback changes in readonly transaction: an error occurred",
e);
}
} else {
logger.debug("The connection is inactive, no updates have been attempted.");
}
} else {
if (con.isActive()) {
if (rollback) {
try {
logger.debug("rolling back transaction...");
con.rollback();
logger.debug("con.rollback() called");
} catch (Throwable t) {
throw new RollbackException(
"Cannot rollback transaction: an error occurred", t);
}
} else {
try {
logger.debug("committing transaction...");
con.commit();
logger.debug("con.commit() called");
} catch (Throwable t) {
ShaclSailValidationReportHelper
.getValidationReportAsString(t)
.ifPresent(report -> logger.error(
"SHACL validation failed, cannot commit. Validation report:\n{}", report));
throw new CommitException(
"Cannot commit transaction: an error occurred", t);
}
}
}
}
}
}