InMemoryTransactionManager.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.transaction;

import com.facebook.airlift.concurrent.BoundedExecutor;
import com.facebook.airlift.concurrent.ExecutorServiceAdapter;
import com.facebook.airlift.log.Logger;
import com.facebook.presto.common.transaction.TransactionId;
import com.facebook.presto.metadata.Catalog;
import com.facebook.presto.metadata.CatalogManager;
import com.facebook.presto.metadata.CatalogMetadata;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.connector.Connector;
import com.facebook.presto.spi.connector.ConnectorCommitHandle;
import com.facebook.presto.spi.connector.ConnectorMetadata;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.facebook.presto.spi.function.FunctionNamespaceManager;
import com.facebook.presto.spi.function.FunctionNamespaceTransactionHandle;
import com.facebook.presto.spi.transaction.IsolationLevel;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import io.airlift.units.Duration;

import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;

import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Stream;

import static com.facebook.airlift.concurrent.MoreFutures.addExceptionCallback;
import static com.facebook.presto.spi.StandardErrorCode.AUTOCOMMIT_WRITE_CONFLICT;
import static com.facebook.presto.spi.StandardErrorCode.MULTI_CATALOG_WRITE_CONFLICT;
import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND;
import static com.facebook.presto.spi.StandardErrorCode.READ_ONLY_VIOLATION;
import static com.facebook.presto.spi.StandardErrorCode.TRANSACTION_ALREADY_ABORTED;
import static com.facebook.presto.spi.StandardErrorCode.UNKNOWN_TRANSACTION;
import static com.facebook.presto.spi.connector.EmptyConnectorCommitHandle.INSTANCE;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator;
import static java.lang.String.format;
import static java.lang.System.currentTimeMillis;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.stream.Collectors.toList;

@ThreadSafe
public class InMemoryTransactionManager
        implements TransactionManager
{
    private static final Logger log = Logger.get(InMemoryTransactionManager.class);

    private final Duration idleTimeout;
    private final int maxFinishingConcurrency;

    private final ConcurrentMap<TransactionId, TransactionMetadata> transactions = new ConcurrentHashMap<>();
    private final CatalogManager catalogManager;
    private final Executor finishingExecutor;

    private final Map<String, FunctionNamespaceManager<?>> functionNamespaceManagers = new HashMap<>();
    private final Map<String, String> companionCatalogs;

    private InMemoryTransactionManager(
            Duration idleTimeout,
            int maxFinishingConcurrency,
            CatalogManager catalogManager,
            Executor finishingExecutor,
            Map<String, String> companionCatalogs)
    {
        this.catalogManager = catalogManager;
        requireNonNull(idleTimeout, "idleTimeout is null");
        checkArgument(maxFinishingConcurrency > 0, "maxFinishingConcurrency must be at least 1");
        requireNonNull(finishingExecutor, "finishingExecutor is null");

        this.idleTimeout = idleTimeout;
        this.maxFinishingConcurrency = maxFinishingConcurrency;
        this.finishingExecutor = finishingExecutor;
        this.companionCatalogs = ImmutableMap.copyOf(companionCatalogs);
    }

    public static TransactionManager create(
            TransactionManagerConfig config,
            ScheduledExecutorService idleCheckExecutor,
            CatalogManager catalogManager,
            ExecutorService finishingExecutor)
    {
        InMemoryTransactionManager transactionManager = new InMemoryTransactionManager(
                config.getIdleTimeout(),
                config.getMaxFinishingConcurrency(),
                catalogManager,
                finishingExecutor,
                config.getCompanionCatalogs());
        transactionManager.scheduleIdleChecks(config.getIdleCheckInterval(), idleCheckExecutor);
        return transactionManager;
    }

    public static TransactionManager createTestTransactionManager()
    {
        return createTestTransactionManager(new CatalogManager());
    }

    public static TransactionManager createTestTransactionManager(CatalogManager catalogManager)
    {
        // No idle checks needed
        return new InMemoryTransactionManager(new Duration(1, TimeUnit.DAYS), 1, catalogManager, directExecutor(), ImmutableMap.of());
    }

    private void scheduleIdleChecks(Duration idleCheckInterval, ScheduledExecutorService idleCheckExecutor)
    {
        idleCheckExecutor.scheduleWithFixedDelay(() -> {
            try {
                cleanUpExpiredTransactions();
            }
            catch (Throwable t) {
                log.error(t, "Unexpected exception while cleaning up expired transactions");
            }
        }, idleCheckInterval.toMillis(), idleCheckInterval.toMillis(), MILLISECONDS);
    }

    synchronized void cleanUpExpiredTransactions()
    {
        Iterator<Entry<TransactionId, TransactionMetadata>> iterator = transactions.entrySet().iterator();
        while (iterator.hasNext()) {
            Entry<TransactionId, TransactionMetadata> entry = iterator.next();
            if (entry.getValue().isExpired(idleTimeout)) {
                iterator.remove();
                log.info("Removing expired transaction: %s", entry.getKey());
                entry.getValue().asyncAbort();
            }
        }
    }

    @Override
    public TransactionInfo getTransactionInfo(TransactionId transactionId)
    {
        return getTransactionMetadata(transactionId).getTransactionInfo();
    }

    @Override
    public Optional<TransactionInfo> getOptionalTransactionInfo(TransactionId transactionId)
    {
        return tryGetTransactionMetadata(transactionId).map(TransactionMetadata::getTransactionInfo);
    }

    @Override
    public List<TransactionInfo> getAllTransactionInfos()
    {
        return transactions.values().stream()
                .map(TransactionMetadata::getTransactionInfo)
                .collect(toImmutableList());
    }

    @Override
    public void tryRegisterTransaction(TransactionInfo transactionInfo)
    {
        TransactionId transactionId = transactionInfo.getTransactionId();
        if (transactions.containsKey(transactionId)) {
            return;
        }
        registerTransaction(transactionId, transactionInfo.getIsolationLevel(), transactionInfo.isReadOnly(), transactionInfo.isAutoCommitContext());
    }

    @Override
    public TransactionId beginTransaction(boolean autoCommitContext)
    {
        return beginTransaction(DEFAULT_ISOLATION, DEFAULT_READ_ONLY, autoCommitContext);
    }

    @Override
    public TransactionId beginTransaction(IsolationLevel isolationLevel, boolean readOnly, boolean autoCommitContext)
    {
        TransactionId transactionId = TransactionId.create();
        registerTransaction(transactionId, isolationLevel, readOnly, autoCommitContext);
        return transactionId;
    }

    @Override
    public Map<String, ConnectorId> getCatalogNames(TransactionId transactionId)
    {
        return getTransactionMetadata(transactionId).getCatalogNames();
    }

    @Override
    public Optional<CatalogMetadata> getOptionalCatalogMetadata(TransactionId transactionId, String catalogName)
    {
        TransactionMetadata transactionMetadata = getTransactionMetadata(transactionId);
        return transactionMetadata.getConnectorId(catalogName)
                .map(transactionMetadata::getTransactionCatalogMetadata);
    }

    @Override
    public void enableRollback(TransactionId transactionId)
    {
        TransactionMetadata transactionMetadata = getTransactionMetadata(transactionId);
        transactionMetadata.enableRollback(true);
    }

    @Override
    public CatalogMetadata getCatalogMetadata(TransactionId transactionId, ConnectorId connectorId)
    {
        return getTransactionMetadata(transactionId).getTransactionCatalogMetadata(connectorId);
    }

    @Override
    public CatalogMetadata getCatalogMetadataForWrite(TransactionId transactionId, ConnectorId connectorId)
    {
        CatalogMetadata catalogMetadata = getCatalogMetadata(transactionId, connectorId);
        checkConnectorWrite(transactionId, connectorId);
        return catalogMetadata;
    }

    @Override
    public CatalogMetadata getCatalogMetadataForWrite(TransactionId transactionId, String catalogName)
    {
        TransactionMetadata transactionMetadata = getTransactionMetadata(transactionId);

        // there is no need to ask for a connector specific id since the overlay connectors are read only
        ConnectorId connectorId = transactionMetadata.getConnectorId(catalogName)
                .orElseThrow(() -> new PrestoException(NOT_FOUND, "Catalog does not exist: " + catalogName));

        return getCatalogMetadataForWrite(transactionId, connectorId);
    }

    @Override
    public ConnectorTransactionHandle getConnectorTransaction(TransactionId transactionId, ConnectorId connectorId)
    {
        return getCatalogMetadata(transactionId, connectorId).getTransactionHandleFor(connectorId);
    }

    @Override
    public synchronized void registerFunctionNamespaceManager(String catalogName, FunctionNamespaceManager<?> functionNamespaceManager)
    {
        checkArgument(!functionNamespaceManagers.containsKey(catalogName), "FunctionNamespaceManager is already registered for catalog [%s]", catalogName);
        functionNamespaceManagers.put(catalogName, functionNamespaceManager);
    }

    @Override
    public FunctionNamespaceTransactionHandle getFunctionNamespaceTransaction(TransactionId transactionId, String catalogName)
    {
        return getTransactionMetadata(transactionId).getFunctionNamespaceTransaction(catalogName).getTransactionHandle();
    }

    private void checkConnectorWrite(TransactionId transactionId, ConnectorId connectorId)
    {
        getTransactionMetadata(transactionId).checkConnectorWrite(connectorId);
    }

    @Override
    public void checkAndSetActive(TransactionId transactionId)
    {
        TransactionMetadata metadata = getTransactionMetadata(transactionId);
        metadata.checkOpenTransaction();
        metadata.setActive();
    }

    @Override
    public void trySetActive(TransactionId transactionId)
    {
        tryGetTransactionMetadata(transactionId).ifPresent(TransactionMetadata::setActive);
    }

    @Override
    public void trySetInactive(TransactionId transactionId)
    {
        tryGetTransactionMetadata(transactionId).ifPresent(TransactionMetadata::setInactive);
    }

    private TransactionMetadata getTransactionMetadata(TransactionId transactionId)
    {
        TransactionMetadata transactionMetadata = transactions.get(transactionId);
        if (transactionMetadata == null) {
            throw unknownTransactionError(transactionId);
        }
        return transactionMetadata;
    }

    private void registerTransaction(TransactionId transactionId, IsolationLevel isolationLevel, boolean readOnly, boolean autoCommitContext)
    {
        BoundedExecutor executor = new BoundedExecutor(finishingExecutor, maxFinishingConcurrency);
        TransactionMetadata transactionMetadata = new TransactionMetadata(
                transactionId,
                isolationLevel,
                readOnly,
                autoCommitContext,
                catalogManager,
                executor,
                functionNamespaceManagers,
                companionCatalogs);
        checkState(transactions.put(transactionId, transactionMetadata) == null, "Duplicate transaction ID: %s", transactionId);
    }

    private Optional<TransactionMetadata> tryGetTransactionMetadata(TransactionId transactionId)
    {
        return Optional.ofNullable(transactions.get(transactionId));
    }

    private ListenableFuture<TransactionMetadata> removeTransactionMetadataAsFuture(TransactionId transactionId)
    {
        TransactionMetadata transactionMetadata = transactions.remove(transactionId);
        if (transactionMetadata == null) {
            return immediateFailedFuture(unknownTransactionError(transactionId));
        }
        return immediateFuture(transactionMetadata);
    }

    private static PrestoException unknownTransactionError(TransactionId transactionId)
    {
        return new PrestoException(UNKNOWN_TRANSACTION, format("Unknown transaction ID: %s. Possibly expired? Commands ignored until end of transaction block", transactionId));
    }

    @Override
    public ListenableFuture<?> asyncCommit(TransactionId transactionId)
    {
        return nonCancellationPropagating(Futures.transformAsync(removeTransactionMetadataAsFuture(transactionId), TransactionMetadata::asyncCommit, directExecutor()));
    }

    @Override
    public ListenableFuture<?> asyncAbort(TransactionId transactionId)
    {
        return nonCancellationPropagating(Futures.transformAsync(removeTransactionMetadataAsFuture(transactionId), TransactionMetadata::asyncAbort, directExecutor()));
    }

    @Override
    public void fail(TransactionId transactionId)
    {
        // Mark transaction as failed, but don't remove it.
        tryGetTransactionMetadata(transactionId).ifPresent(TransactionMetadata::asyncAbort);
    }

    @ThreadSafe
    private static class TransactionMetadata
    {
        private final long createTimeInMillis = currentTimeMillis();
        private final CatalogManager catalogManager;
        private final TransactionId transactionId;
        private final IsolationLevel isolationLevel;
        private final boolean readOnly;
        private final boolean autoCommitContext;
        @GuardedBy("this")
        private final Map<ConnectorId, ConnectorTransactionMetadata> connectorIdToMetadata = new ConcurrentHashMap<>();
        @GuardedBy("this")
        private final AtomicReference<ConnectorId> writtenConnectorId = new AtomicReference<>();
        private final ListeningExecutorService finishingExecutor;
        private final AtomicReference<Boolean> completedSuccessfully = new AtomicReference<>();
        private final AtomicReference<Long> idleStartTime = new AtomicReference<>();

        @GuardedBy("this")
        private final Map<String, Optional<Catalog>> catalogByName = new ConcurrentHashMap<>();
        @GuardedBy("this")
        private final Map<ConnectorId, Catalog> catalogsByConnectorId = new ConcurrentHashMap<>();
        @GuardedBy("this")
        private final Map<ConnectorId, CatalogMetadata> catalogMetadata = new ConcurrentHashMap<>();

        private final Map<String, FunctionNamespaceManager<?>> functionNamespaceManagers;
        @GuardedBy("this")
        private final Map<String, FunctionNamespaceTransactionMetadata> functionNamespaceTransactions = new ConcurrentHashMap<>();
        private final Map<String, String> companionCatalogs;

        private boolean enableRollback;

        public TransactionMetadata(
                TransactionId transactionId,
                IsolationLevel isolationLevel,
                boolean readOnly,
                boolean autoCommitContext,
                CatalogManager catalogManager,
                Executor finishingExecutor,
                Map<String, FunctionNamespaceManager<?>> functionNamespaceManagers,
                Map<String, String> companionCatalogs)
        {
            this.transactionId = requireNonNull(transactionId, "transactionId is null");
            this.isolationLevel = requireNonNull(isolationLevel, "isolationLevel is null");
            this.readOnly = readOnly;
            this.autoCommitContext = autoCommitContext;
            this.catalogManager = requireNonNull(catalogManager, "catalogManager is null");
            this.finishingExecutor = listeningDecorator(ExecutorServiceAdapter.from(requireNonNull(finishingExecutor, "finishingExecutor is null")));
            this.functionNamespaceManagers = requireNonNull(functionNamespaceManagers, "functionNamespaceManagers is null");
            this.companionCatalogs = requireNonNull(companionCatalogs, "companionCatalogs is null");
        }

        public void setActive()
        {
            idleStartTime.set(null);
        }

        public void setInactive()
        {
            idleStartTime.set(System.nanoTime());
        }

        public boolean isExpired(Duration idleTimeout)
        {
            Long idleStartTime = this.idleStartTime.get();
            return idleStartTime != null && Duration.nanosSince(idleStartTime).compareTo(idleTimeout) > 0;
        }

        public void enableRollback(boolean enableRollback)
        {
            this.enableRollback = enableRollback;
        }

        public void checkOpenTransaction()
        {
            Boolean completedStatus = this.completedSuccessfully.get();
            if (completedStatus != null) {
                if (completedStatus) {
                    // Should not happen normally
                    throw new IllegalStateException("Current transaction already committed");
                }
                else {
                    throw new PrestoException(TRANSACTION_ALREADY_ABORTED, "Current transaction is aborted, commands ignored until end of transaction block");
                }
            }
        }

        private synchronized Map<String, ConnectorId> getCatalogNames()
        {
            // todo if repeatable read, this must be recorded
            Map<String, ConnectorId> catalogNames = new HashMap<>();
            catalogByName.values().stream()
                    .filter(Optional::isPresent)
                    .map(Optional::get)
                    .forEach(catalog -> catalogNames.put(catalog.getCatalogName(), catalog.getConnectorId()));

            catalogManager.getCatalogs().stream()
                    .forEach(catalog -> catalogNames.putIfAbsent(catalog.getCatalogName(), catalog.getConnectorId()));

            return ImmutableMap.copyOf(catalogNames);
        }

        private synchronized Optional<ConnectorId> getConnectorId(String catalogName)
        {
            Optional<Catalog> catalog = catalogByName.get(catalogName);
            if (catalog == null) {
                catalog = catalogManager.getCatalog(catalogName);
                catalogByName.put(catalogName, catalog);
                if (catalog.isPresent()) {
                    registerCatalog(catalog.get());
                }

                if (companionCatalogs.containsKey(catalogName)) {
                    Optional<Catalog> companionCatalog = catalogManager.getCatalog(companionCatalogs.get(catalogName));
                    checkArgument(
                            companionCatalog.isPresent(),
                            format("Invalid config, no catalog exists for catalog name %s: %s", catalogName, companionCatalogs.get(catalogName)));
                    registerCatalog(companionCatalog.get());
                }
            }
            return catalog.map(Catalog::getConnectorId);
        }

        private synchronized void registerCatalog(Catalog catalog)
        {
            catalogsByConnectorId.put(catalog.getConnectorId(), catalog);
            catalogsByConnectorId.put(catalog.getInformationSchemaId(), catalog);
            catalogsByConnectorId.put(catalog.getSystemTablesId(), catalog);
        }

        private synchronized CatalogMetadata getTransactionCatalogMetadata(ConnectorId connectorId)
        {
            if (!this.enableRollback) {
                checkOpenTransaction();
            }

            CatalogMetadata catalogMetadata = this.catalogMetadata.get(connectorId);
            if (catalogMetadata == null) {
                Catalog catalog = catalogsByConnectorId.get(connectorId);
                verify(catalog != null, "Unknown connectorId: %s", connectorId);
                Connector connector = catalog.getConnector(connectorId);

                ConnectorTransactionMetadata metadata = createConnectorTransactionMetadata(catalog.getConnectorId(), catalog);
                ConnectorTransactionMetadata informationSchema = createConnectorTransactionMetadata(catalog.getInformationSchemaId(), catalog);
                ConnectorTransactionMetadata systemTables = createConnectorTransactionMetadata(catalog.getSystemTablesId(), catalog);

                catalogMetadata = new CatalogMetadata(
                        metadata.getConnectorId(),
                        metadata.getConnectorMetadata(),
                        metadata.getTransactionHandle(),
                        informationSchema.getConnectorId(),
                        informationSchema.getConnectorMetadata(),
                        informationSchema.getTransactionHandle(),
                        systemTables.getConnectorId(),
                        systemTables.getConnectorMetadata(),
                        systemTables.getTransactionHandle(),
                        connector.getCapabilities());

                this.catalogMetadata.put(catalog.getConnectorId(), catalogMetadata);
                this.catalogMetadata.put(catalog.getInformationSchemaId(), catalogMetadata);
                this.catalogMetadata.put(catalog.getSystemTablesId(), catalogMetadata);
            }
            return catalogMetadata;
        }

        private synchronized FunctionNamespaceTransactionMetadata getFunctionNamespaceTransaction(String catalogName)
        {
            checkOpenTransaction();

            return functionNamespaceTransactions.computeIfAbsent(
                    catalogName, catalog -> {
                        verify(catalog != null, "catalog is null");
                        FunctionNamespaceManager<?> functionNamespaceManager = functionNamespaceManagers.get(catalog);
                        FunctionNamespaceTransactionHandle transactionHandle = functionNamespaceManager.beginTransaction();
                        return new FunctionNamespaceTransactionMetadata(functionNamespaceManager, transactionHandle);
                    });
        }

        public synchronized ConnectorTransactionMetadata createConnectorTransactionMetadata(ConnectorId connectorId, Catalog catalog)
        {
            Connector connector = catalog.getConnector(connectorId);
            ConnectorTransactionMetadata transactionMetadata = new ConnectorTransactionMetadata(connectorId, connector, beginTransaction(connector));
            checkState(connectorIdToMetadata.put(connectorId, transactionMetadata) == null);
            return transactionMetadata;
        }

        private ConnectorTransactionHandle beginTransaction(Connector connector)
        {
            if (connector instanceof InternalConnector) {
                return ((InternalConnector) connector).beginTransaction(transactionId, isolationLevel, readOnly);
            }
            else {
                return connector.beginTransaction(isolationLevel, readOnly);
            }
        }

        public synchronized void checkConnectorWrite(ConnectorId connectorId)
        {
            checkOpenTransaction();
            ConnectorTransactionMetadata transactionMetadata = connectorIdToMetadata.get(connectorId);
            checkArgument(transactionMetadata != null, "Cannot record write for connector not part of transaction");
            if (readOnly) {
                throw new PrestoException(READ_ONLY_VIOLATION, "Cannot execute write in a read-only transaction");
            }
            if (!writtenConnectorId.compareAndSet(null, connectorId) && !writtenConnectorId.get().equals(connectorId)) {
                throw new PrestoException(
                        MULTI_CATALOG_WRITE_CONFLICT,
                        format(
                                "Multi-catalog writes not supported in a single transaction. Attempt write to catalog %s, but already wrote to catalog %s",
                                connectorId,
                                writtenConnectorId.get()));
            }
            if (transactionMetadata.isSingleStatementWritesOnly() && !autoCommitContext) {
                throw new PrestoException(AUTOCOMMIT_WRITE_CONFLICT, "Catalog " + connectorId + " only supports writes using autocommit");
            }
        }

        public synchronized ListenableFuture<?> asyncCommit()
        {
            if (!completedSuccessfully.compareAndSet(null, true)) {
                if (completedSuccessfully.get()) {
                    // Already done
                    return immediateFuture(null);
                }
                // Transaction already aborted
                return immediateFailedFuture(new PrestoException(TRANSACTION_ALREADY_ABORTED, "Current transaction has already been aborted"));
            }

            ListenableFuture<?> functionNamespaceFuture = Futures.allAsList(functionNamespaceTransactions.values().stream()
                    .map(transactionMetadata -> finishingExecutor.submit(transactionMetadata::commit))
                    .collect(toImmutableList()));

            ConnectorId writeConnectorId = this.writtenConnectorId.get();
            if (writeConnectorId == null) {
                // for read-only transaction, we return the commit handle for the read query.
                Supplier<ListenableFuture<?>> commitReadOnlyConnectors = () -> {
                    ListenableFuture<? extends List<?>> future = Futures.allAsList(connectorIdToMetadata.values().stream()
                            .map(transactionMetadata -> finishingExecutor.submit(transactionMetadata::commit))
                            .collect(toList()));
                    addExceptionCallback(future, throwable -> log.error(throwable, "Read-only connector should not throw exception on commit"));
                    return future;
                };

                ListenableFuture<?> readOnlyCommitFuture = Futures.transformAsync(functionNamespaceFuture, ignored -> commitReadOnlyConnectors.get(), directExecutor());
                addExceptionCallback(readOnlyCommitFuture, this::abortInternal);
                return nonCancellationPropagating(readOnlyCommitFuture);
            }

            Supplier<ListenableFuture<?>> commitReadOnlyConnectors = () -> {
                ListenableFuture<? extends List<?>> future = Futures.allAsList(connectorIdToMetadata.entrySet().stream()
                        .filter(entry -> !entry.getKey().equals(writeConnectorId))
                        .map(Entry::getValue)
                        .map(transactionMetadata -> finishingExecutor.submit(transactionMetadata::commit))
                        .collect(toList()));
                addExceptionCallback(future, throwable -> log.error(throwable, "Read-only connector should not throw exception on commit"));
                return future;
            };

            // for transactions with read and write, we only return the commit handle for write query.
            ConnectorTransactionMetadata writeConnector = connectorIdToMetadata.get(writeConnectorId);
            Supplier<ListenableFuture<?>> commitFunctionNamespaceTransactions = () -> functionNamespaceFuture;
            ListenableFuture<?> readOnlyCommitFuture = Futures.transformAsync(
                    commitFunctionNamespaceTransactions.get(),
                    ignored -> commitReadOnlyConnectors.get(),
                    directExecutor());
            ListenableFuture<?> writeCommitFuture = Futures.transformAsync(readOnlyCommitFuture, ignored -> finishingExecutor.submit(writeConnector::commit), directExecutor());
            addExceptionCallback(writeCommitFuture, this::abortInternal);

            return nonCancellationPropagating(writeCommitFuture);
        }

        public synchronized ListenableFuture<?> asyncAbort()
        {
            if (!completedSuccessfully.compareAndSet(null, false)) {
                if (completedSuccessfully.get()) {
                    // Should not happen normally
                    return immediateFailedFuture(new IllegalStateException("Current transaction already committed"));
                }
                // Already done
                return immediateFuture(null);
            }
            return abortInternal();
        }

        private synchronized ListenableFuture<?> abortInternal()
        {
            // the callbacks in statement performed on another thread so are safe
            List<ListenableFuture<?>> abortFutures = Stream.concat(
                            functionNamespaceTransactions.values().stream()
                                    .map(transactionMetadata -> finishingExecutor.submit(() -> safeAbort(transactionMetadata))),
                            connectorIdToMetadata.values().stream()
                                    .map(connection -> finishingExecutor.submit(() -> safeAbort(connection))))
                    .collect(toList());
            return nonCancellationPropagating(Futures.allAsList(abortFutures));
        }

        private static void safeAbort(ConnectorTransactionMetadata connection)
        {
            try {
                connection.abort();
            }
            catch (Exception e) {
                log.error(e, "Connector threw exception on abort");
            }
        }

        private static void safeAbort(FunctionNamespaceTransactionMetadata transactionMetadata)
        {
            try {
                transactionMetadata.abort();
            }
            catch (Exception e) {
                log.error(e, "Function namespace transaction threw exception on abort");
            }
        }

        public TransactionInfo getTransactionInfo()
        {
            Duration idleTime = Optional.ofNullable(idleStartTime.get())
                    .map(Duration::nanosSince)
                    .orElseGet(() -> new Duration(0, MILLISECONDS));

            // dereferencing this field is safe because the field is atomic
            @SuppressWarnings("FieldAccessNotGuarded") Optional<ConnectorId> writtenConnectorId = Optional.ofNullable(this.writtenConnectorId.get());

            // copying the key set is safe here because the map is concurrent
            @SuppressWarnings("FieldAccessNotGuarded") List<ConnectorId> connectorIds = ImmutableList.copyOf(connectorIdToMetadata.keySet());

            return new TransactionInfo(transactionId, isolationLevel, readOnly, autoCommitContext, createTimeInMillis, idleTime, connectorIds, writtenConnectorId);
        }

        private static class ConnectorTransactionMetadata
        {
            private final ConnectorId connectorId;
            private final Connector connector;
            private final ConnectorTransactionHandle transactionHandle;
            private final ConnectorMetadata connectorMetadata;
            private final AtomicBoolean finished = new AtomicBoolean();

            public ConnectorTransactionMetadata(ConnectorId connectorId, Connector connector, ConnectorTransactionHandle transactionHandle)
            {
                this.connectorId = requireNonNull(connectorId, "connectorId is null");
                this.connector = requireNonNull(connector, "connector is null");
                this.transactionHandle = requireNonNull(transactionHandle, "transactionHandle is null");
                this.connectorMetadata = connector.getMetadata(transactionHandle);
            }

            public ConnectorId getConnectorId()
            {
                return connectorId;
            }

            public boolean isSingleStatementWritesOnly()
            {
                return connector.isSingleStatementWritesOnly();
            }

            public synchronized ConnectorMetadata getConnectorMetadata()
            {
                checkState(!finished.get(), "Already finished");
                return connectorMetadata;
            }

            public ConnectorTransactionHandle getTransactionHandle()
            {
                checkState(!finished.get(), "Already finished");
                return transactionHandle;
            }

            public ConnectorCommitHandle commit()
            {
                if (finished.compareAndSet(false, true)) {
                    return connector.commit(transactionHandle);
                }
                return INSTANCE;
            }

            public void abort()
            {
                if (finished.compareAndSet(false, true)) {
                    connector.rollback(transactionHandle);
                }
            }
        }

        private static class FunctionNamespaceTransactionMetadata
        {
            private final FunctionNamespaceManager<?> functionNamespaceManager;
            private final FunctionNamespaceTransactionHandle transactionHandle;
            private final AtomicBoolean finished = new AtomicBoolean();

            public FunctionNamespaceTransactionMetadata(FunctionNamespaceManager<?> functionNamespaceManager, FunctionNamespaceTransactionHandle transactionHandle)
            {
                this.functionNamespaceManager = requireNonNull(functionNamespaceManager, "functionNamespaceManager is null");
                this.transactionHandle = requireNonNull(transactionHandle, "transactionHandle is null");
            }

            public FunctionNamespaceManager<?> getFunctionNamespaceManager()
            {
                return functionNamespaceManager;
            }

            public FunctionNamespaceTransactionHandle getTransactionHandle()
            {
                return transactionHandle;
            }

            public void commit()
            {
                if (finished.compareAndSet(false, true)) {
                    functionNamespaceManager.commit(transactionHandle);
                }
            }

            public void abort()
            {
                if (finished.compareAndSet(false, true)) {
                    functionNamespaceManager.abort(transactionHandle);
                }
            }
        }
    }
}