package io.r2dbc.postgresql;

import io.r2dbc.postgresql.ExceptionFactory;
import io.r2dbc.postgresql.PostgresqlCopyIn;
import io.r2dbc.postgresql.api.CopyInBuilder;
import io.r2dbc.postgresql.api.ErrorDetails;
import io.r2dbc.postgresql.api.Notification;
import io.r2dbc.postgresql.api.PostgresTransactionDefinition;
import io.r2dbc.postgresql.client.Client;
import io.r2dbc.postgresql.client.ConnectionContext;
import io.r2dbc.postgresql.client.PortalNameSupplier;
import io.r2dbc.postgresql.client.SimpleQueryMessageFlow;
import io.r2dbc.postgresql.client.TransactionStatus;
import io.r2dbc.postgresql.codec.Codecs;
import io.r2dbc.postgresql.message.backend.CommandComplete;
import io.r2dbc.postgresql.message.backend.ErrorResponse;
import io.r2dbc.postgresql.message.backend.NotificationResponse;
import io.r2dbc.postgresql.util.Assert;
import io.r2dbc.postgresql.util.Operators;
import io.r2dbc.spi.IsolationLevel;
import io.r2dbc.spi.Option;
import io.r2dbc.spi.R2dbcException;
import io.r2dbc.spi.TransactionDefinition;
import io.r2dbc.spi.ValidationDepth;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.annotation.Nullable;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/r2dbc/postgresql/PostgresqlConnection.class */
public final class PostgresqlConnection implements io.r2dbc.postgresql.api.PostgresqlConnection {
    /** Protocol client every command of this connection is sent through; checked for {@code null} in the constructor. */
    private final Client client;
    /** Bundle created in the constructor and handed to the statements, batches and copy-in builders this connection creates. */
    private final ConnectionResources resources;
    /** Context obtained from {@code client.getContext()}; used here to format log messages. */
    private final ConnectionContext connectionContext;
    /** Codec registry; within this class it is only passed to {@link ConnectionResources} and printed by {@link #toString()}. */
    private final Codecs codecs;
    /** {@code SELECT 1} pipeline assembled once in the constructor and subscribed to by {@link #validate(ValidationDepth)}. */
    private final Flux<Long> validationQuery;
    /** Isolation level currently reported by {@link #getTransactionIsolationLevel()}. */
    private volatile IsolationLevel isolationLevel;
    /** Level saved when a transaction begins; restored and cleared by {@code cleanupIsolationLevel()}. {@code null} when nothing is saved. */
    private volatile IsolationLevel previousIsolationLevel;
    private final Logger logger = Loggers.getLogger(getClass());
    /** Lazily installed by {@link #getNotifications()} and cleared by {@link #close()}; holds {@code null} until first use. */
    private final AtomicReference<NotificationAdapter> notificationAdapter = new AtomicReference<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/r2dbc/postgresql/PostgresqlConnection$EmptyTransactionDefinition.class */
    /**
     * Transaction definition that reports no value for any attribute.
     *
     * <p>{@link PostgresqlConnection#beginTransaction()} passes {@link #INSTANCE} to the
     * definition-based overload, which then finds no isolation level, read-only flag or
     * deferrable flag and issues a plain {@code BEGIN}.
     */
    public enum EmptyTransactionDefinition implements TransactionDefinition {
        INSTANCE;

        /**
         * Always returns {@code null}, whatever option is asked for.
         *
         * @param option the attribute being looked up (ignored)
         * @return {@code null}
         */
        @Override // io.r2dbc.spi.TransactionDefinition
        public <T> T getAttribute(Option<T> option) {
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/r2dbc/postgresql/PostgresqlConnection$NotificationAdapter.class */
    /**
     * Bridges the client's notification listener to a multicast sink so that the
     * notifications can be exposed as a {@link Flux}.
     */
    public static class NotificationAdapter {
        private final Sinks.Many<Notification> sink = Sinks.many().multicast().directBestEffort();

        /** Listener installed by {@link #register(Client)}; {@code null} until then. */
        @Nullable
        private volatile Disposable subscription = null;

        NotificationAdapter() {
        }

        /** Disposes the registered listener, unless none was registered or it is already disposed. */
        void dispose() {
            Disposable current = this.subscription;
            if (current != null && !current.isDisposed()) {
                current.dispose();
            }
        }

        /**
         * Creates a listener that forwards every signal to the sink, remembers it for
         * {@link #dispose()} and adds it to the given client.
         *
         * @param client the client to listen on
         */
        void register(Client client) {
            final Sinks.Many<Notification> target = this.sink;
            BaseSubscriber<NotificationResponse> listener = new BaseSubscriber<NotificationResponse>() {
                @Override
                protected void hookOnSubscribe(Subscription subscription) {
                    // Unbounded demand: every notification is forwarded as it arrives.
                    subscription.request(Long.MAX_VALUE);
                }

                @Override
                public void hookOnNext(NotificationResponse notificationResponse) {
                    target.emitNext(new NotificationResponseWrapper(notificationResponse), Sinks.EmitFailureHandler.FAIL_FAST);
                }

                @Override
                public void hookOnError(Throwable th) {
                    target.emitError(th, Sinks.EmitFailureHandler.FAIL_FAST);
                }

                @Override
                public void hookOnComplete() {
                    target.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
                }
            };
            // Publish the handle before attaching so dispose() can see it once the listener is live.
            this.subscription = listener;
            client.addNotificationListener(listener);
        }

        /** @return the sink viewed as a {@link Flux} of notifications */
        Flux<Notification> getEvents() {
            return this.sink.asFlux();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a connection on top of an existing client.
     *
     * <p>Statement order matters here: {@code this} is handed to {@link ConnectionResources}
     * before the remaining fields are assigned, and {@code codecs} is null-checked only
     * after it has already been passed to {@link ConnectionResources}.
     *
     * @param client the protocol client; must not be {@code null}
     * @param codecs the codec registry; must not be {@code null}
     * @param portalNameSupplier passed through to {@link ConnectionResources}
     * @param statementCache passed through to {@link ConnectionResources}
     * @param isolationLevel the initial isolation level to report; must not be {@code null}
     * @param postgresqlConnectionConfiguration passed through to {@link ConnectionResources}
     */
    public PostgresqlConnection(Client client, Codecs codecs, PortalNameSupplier portalNameSupplier, StatementCache statementCache, IsolationLevel isolationLevel, PostgresqlConnectionConfiguration postgresqlConnectionConfiguration) {
        this.client = (Client) Assert.requireNonNull(client, "client must not be null");
        this.resources = new ConnectionResources(client, codecs, this, postgresqlConnectionConfiguration, portalNameSupplier, statementCache);
        this.connectionContext = client.getContext();
        this.codecs = (Codecs) Assert.requireNonNull(codecs, "codecs must not be null");
        this.isolationLevel = (IsolationLevel) Assert.requireNonNull(isolationLevel, "isolationLevel must not be null");
        // Assembled once and reused; validate(...) subscribes to it for each non-LOCAL check.
        this.validationQuery = new PostgresqlStatement(this.resources, "SELECT 1").fetchSize(0).execute().flatMap((v0) -> {
            return v0.getRowsUpdated();
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return the protocol client this connection was created with
     */
    public Client getClient() {
        return this.client;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return the {@link ConnectionResources} instance built in the constructor
     */
    public ConnectionResources getResources() {
        return this.resources;
    }

    /**
     * Begins a transaction without any explicit transaction mode by delegating to
     * {@link #beginTransaction(TransactionDefinition)} with {@link EmptyTransactionDefinition#INSTANCE}.
     *
     * @return a {@link Mono} that completes when the delegate completes
     */
    @Override // io.r2dbc.postgresql.api.PostgresqlConnection, io.r2dbc.spi.Connection
    public Mono<Void> beginTransaction() {
        return beginTransaction((TransactionDefinition) EmptyTransactionDefinition.INSTANCE);
    }

    /**
     * Begins a transaction using the modes described by the given definition.
     *
     * <p>Nothing is sent unless the client reports {@link TransactionStatus#IDLE}. Otherwise a
     * {@code BEGIN} statement is built from the isolation level, read-only flag and deferrable
     * flag of the definition (each only when present). Once the exchange completes, the current
     * isolation level is remembered as the previous one and, if the definition carried a level,
     * that level becomes the current one.
     *
     * @param transactionDefinition the attributes to apply; must not be {@code null}
     * @return a {@link Mono} that completes when the exchange (if any) has completed
     */
    @Override // io.r2dbc.postgresql.api.PostgresqlConnection, io.r2dbc.spi.Connection
    public Mono<Void> beginTransaction(TransactionDefinition transactionDefinition) {
        Assert.requireNonNull(transactionDefinition, "definition must not be null");
        return useTransactionStatus(status -> {
            if (status != TransactionStatus.IDLE) {
                this.logger.debug(this.connectionContext.getMessage("Skipping begin transaction because status is {}"), status);
                return Mono.empty();
            }

            IsolationLevel requestedLevel = transactionDefinition.getAttribute(TransactionDefinition.ISOLATION_LEVEL);
            Boolean readOnly = transactionDefinition.getAttribute(TransactionDefinition.READ_ONLY);
            Boolean deferrable = transactionDefinition.getAttribute(PostgresTransactionDefinition.DEFERRABLE);

            String modes = "";
            if (requestedLevel != null) {
                modes = appendTransactionMode(modes, "ISOLATION LEVEL", requestedLevel.asSql());
            }
            if (readOnly != null) {
                modes = appendTransactionMode(modes, readOnly ? "READ ONLY" : "READ WRITE");
            }
            if (deferrable != null) {
                // The empty first token is skipped by appendTransactionMode, leaving just "DEFERRABLE".
                modes = appendTransactionMode(modes, deferrable ? "" : "NOT", "DEFERRABLE");
            }

            String sql = modes.isEmpty() ? "BEGIN" : "BEGIN " + modes;
            return exchange(sql).doOnComplete(() -> {
                this.previousIsolationLevel = this.isolationLevel;
                if (requestedLevel != null) {
                    this.isolationLevel = requestedLevel;
                }
            });
        });
    }

    /**
     * Appends one transaction mode to a comma-separated list of modes.
     *
     * <p>The non-empty tokens in {@code strArr} are joined with single spaces to form the mode.
     * If {@code str} already holds modes, the new mode is separated from them by {@code ", "}.
     * {@code null} and empty tokens are skipped; when no usable token remains, {@code str} is
     * returned unchanged so that no dangling separator ends up in the statement.
     *
     * @param str the modes collected so far; may be empty
     * @param strArr the words making up the mode to add
     * @return {@code str} extended by the new mode, or {@code str} itself if there was nothing to add
     */
    private static String appendTransactionMode(String str, String... strArr) {
        StringBuilder mode = new StringBuilder();
        for (String token : strArr) {
            if (token == null || token.isEmpty()) {
                continue;
            }
            if (mode.length() != 0) {
                mode.append(' ');
            }
            mode.append(token);
        }
        if (mode.length() == 0) {
            // Nothing to add: returning early avoids emitting a trailing ", ".
            return str;
        }
        return str.isEmpty() ? mode.toString() : str + ", " + mode;
    }

    /**
     * Closes the underlying client.
     *
     * <p>When the returned {@link Mono} is subscribed, the notification adapter (if one was
     * installed) is detached from this connection and disposed. The compare-and-set ensures
     * only the caller that actually cleared the reference disposes it.
     *
     * @return a {@link Mono} that completes once the client has closed
     */
    @Override // io.r2dbc.postgresql.api.PostgresqlConnection, io.r2dbc.spi.Connection, io.r2dbc.spi.Closeable
    public Mono<Void> close() {
        return this.client.close().doOnSubscribe(ignored -> {
            NotificationAdapter active = this.notificationAdapter.get();
            if (active != null && this.notificationAdapter.compareAndSet(active, null)) {
                active.dispose();
            }
        }).then(Mono.empty());
    }

    /**
     * Delegates to {@code client.cancelRequest()}.
     *
     * @return the {@link Mono} returned by the client
     */
    @Override // io.r2dbc.postgresql.api.PostgresqlConnection
    public Mono<Void> cancelRequest() {
        return this.client.cancelRequest();
    }

    /**
     * Commits the current transaction.
     *
     * <p>Nothing is sent when the client reports {@link TransactionStatus#IDLE}. Otherwise
     * {@code COMMIT} is exchanged and, once that exchange completes, the isolation level saved
     * at transaction begin is restored. If the server answers the {@code COMMIT} with a
     * {@link CommandComplete} whose command is {@code ROLLBACK}, the returned {@link Mono}
     * fails with a {@code PostgresqlRollbackException} after the responses have been consumed.
     *
     * <p>The holder for that exception is created per subscription, so subscribing again
     * never observes a failure recorded by an earlier attempt.
     *
     * @return a {@link Mono} that completes when the commit has been processed
     */
    @Override // io.r2dbc.postgresql.api.PostgresqlConnection, io.r2dbc.spi.Connection
    public Mono<Void> commitTransaction() {
        return useTransactionStatus(transactionStatus -> {
            if (TransactionStatus.IDLE == transactionStatus) {
                this.logger.debug(this.connectionContext.getMessage("Skipping commit transaction because status is {}"), transactionStatus);
                return Mono.empty();
            }

            AtomicReference<R2dbcException> rollbackFailure = new AtomicReference<>();
            Flux<Object> responses = exchange("COMMIT");
            return responses
                .doOnComplete(this::cleanupIsolationLevel)
                .filter(CommandComplete.class::isInstance)
                .cast(CommandComplete.class)
                .<CommandComplete>handle((commandComplete, sink) -> {
                    if ("ROLLBACK".equalsIgnoreCase(commandComplete.getCommand())) {
                        // Record instead of failing immediately so the rest of the response is still consumed.
                        rollbackFailure.set(new ExceptionFactory.PostgresqlRollbackException(ErrorDetails.fromMessage("The database returned ROLLBACK, so the transaction cannot be committed. Transaction failure is not known (check server logs?)"), "COMMIT"));
                        return;
                    }
                    sink.next(commandComplete);
                })
                .doOnComplete(() -> {
                    R2dbcException failure = rollbackFailure.get();
                    if (failure != null) {
                        throw failure;
                    }
                });
        });
    }

    /**
     * Creates a copy-in builder bound to this connection's resources.
     *
     * @param str the statement text handed to the builder
     * @return a new {@link PostgresqlCopyIn.Builder}
     */
    @Override // io.r2dbc.postgresql.api.PostgresqlConnection
    public CopyInBuilder copyIn(String str) {
        return new PostgresqlCopyIn.Builder(this.resources, str);
    }

    /**
     * @return a new {@link PostgresqlBatch} bound to this connection's resources
     */
    @Override // io.r2dbc.postgresql.api.PostgresqlConnection, io.r2dbc.spi.Connection
    public PostgresqlBatch createBatch() {
        return new PostgresqlBatch(this.resources);
    }

    /**
     * Creates a savepoint, first beginning a transaction via {@link #beginTransaction()}.
     *
     * <p>After the begin step, {@code SAVEPOINT <name>} is sent only if the client reports
     * {@link TransactionStatus#OPEN}; otherwise the step is skipped and logged at debug level.
     *
     * @param str the savepoint name; must not be {@code null}. It is inserted into the statement as-is.
     * @return a {@link Mono} that completes when both steps have completed
     */
    @Override // io.r2dbc.postgresql.api.PostgresqlConnection, io.r2dbc.spi.Connection
    public Mono<Void> createSavepoint(String str) {
        Assert.requireNonNull(str, "name must not be null");
        Mono<Void> savepoint = useTransactionStatus(status -> {
            if (status != TransactionStatus.OPEN) {
                this.logger.debug(this.connectionContext.getMessage("Skipping create savepoint because status is {}"), status);
                return Mono.empty();
            }
            return exchange(String.format("SAVEPOINT %s", str));
        });
        return beginTransaction().then(savepoint);
    }

    /**
     * Creates a statement bound to this connection's resources.
     *
     * @param str the SQL text; must not be {@code null}
     * @return a new {@link PostgresqlStatement}
     */
    @Override // io.r2dbc.postgresql.api.PostgresqlConnection, io.r2dbc.spi.Connection
    public io.r2dbc.postgresql.api.PostgresqlStatement createStatement(String str) {
        Assert.requireNonNull(str, "sql must not be null");
        return new PostgresqlStatement(this.resources, str);
    }

    /**
     * Returns the stream of notifications for this connection.
     *
     * <p>The adapter is installed on first use: a fresh adapter is registered with the client
     * only by the caller whose compare-and-set wins; any other caller uses the adapter that
     * is already in place.
     *
     * @return the adapter's events as a {@link Flux}
     */
    @Override // io.r2dbc.postgresql.api.PostgresqlConnection
    public Flux<Notification> getNotifications() {
        NotificationAdapter existing = this.notificationAdapter.get();
        if (existing != null) {
            return existing.getEvents();
        }

        NotificationAdapter created = new NotificationAdapter();
        if (this.notificationAdapter.compareAndSet(null, created)) {
            created.register(this.client);
            return created.getEvents();
        }

        // Lost the race: another caller installed an adapter in the meantime.
        return this.notificationAdapter.get().getEvents();
    }

    /**
     * @return new metadata built from the version reported by the client
     */
    @Override // io.r2dbc.postgresql.api.PostgresqlConnection, io.r2dbc.spi.Connection
    public PostgresqlConnectionMetadata getMetadata() {
        return new PostgresqlConnectionMetadata(this.client.getVersion());
    }

    /**
     * @return the isolation level currently tracked by this connection object (no query is issued)
     */
    @Override // io.r2dbc.postgresql.api.PostgresqlConnection, io.r2dbc.spi.Connection
    public IsolationLevel getTransactionIsolationLevel() {
        return this.isolationLevel;
    }

    /**
     * @return {@code true} exactly when the client reports {@link TransactionStatus#IDLE}
     */
    @Override // io.r2dbc.postgresql.api.PostgresqlConnection, io.r2dbc.spi.Connection
    public boolean isAutoCommit() {
        return this.client.getTransactionStatus() == TransactionStatus.IDLE;
    }

    /**
     * Releases a savepoint.
     *
     * <p>{@code RELEASE SAVEPOINT <name>} is sent only while the client reports
     * {@link TransactionStatus#OPEN}; in any other state the call is skipped and logged at debug level.
     *
     * @param str the savepoint name; must not be {@code null}. It is inserted into the statement as-is.
     * @return a {@link Mono} that completes when the exchange (if any) has completed
     */
    @Override // io.r2dbc.postgresql.api.PostgresqlConnection, io.r2dbc.spi.Connection
    public Mono<Void> releaseSavepoint(String str) {
        Assert.requireNonNull(str, "name must not be null");
        return useTransactionStatus(status -> {
            if (status != TransactionStatus.OPEN) {
                this.logger.debug(this.connectionContext.getMessage("Skipping release savepoint because status is {}"), status);
                return Mono.empty();
            }
            return exchange(String.format("RELEASE SAVEPOINT %s", str));
        });
    }

    /**
     * Rolls back the current transaction.
     *
     * <p>When the client reports {@link TransactionStatus#IDLE} the call is skipped and logged
     * at debug level. Otherwise {@code ROLLBACK} is exchanged and, on completion, the isolation
     * level saved at transaction begin is restored.
     *
     * @return a {@link Mono} that completes when the exchange (if any) has completed
     */
    @Override // io.r2dbc.postgresql.api.PostgresqlConnection, io.r2dbc.spi.Connection
    public Mono<Void> rollbackTransaction() {
        return useTransactionStatus(status -> {
            if (status == TransactionStatus.IDLE) {
                this.logger.debug(this.connectionContext.getMessage("Skipping rollback transaction because status is {}"), status);
                return Mono.empty();
            }
            return exchange("ROLLBACK").doOnComplete(this::cleanupIsolationLevel);
        });
    }

    /**
     * Rolls the current transaction back to a savepoint.
     *
     * <p>When the client reports {@link TransactionStatus#IDLE} the call is skipped and logged
     * at debug level; in every other state {@code ROLLBACK TO SAVEPOINT <name>} is sent.
     *
     * @param str the savepoint name; must not be {@code null}. It is inserted into the statement as-is.
     * @return a {@link Mono} that completes when the exchange (if any) has completed
     */
    @Override // io.r2dbc.postgresql.api.PostgresqlConnection, io.r2dbc.spi.Connection
    public Mono<Void> rollbackTransactionToSavepoint(String str) {
        Assert.requireNonNull(str, "name must not be null");
        return useTransactionStatus(status -> {
            if (status == TransactionStatus.IDLE) {
                this.logger.debug(this.connectionContext.getMessage("Skipping rollback transaction to savepoint because status is {}"), status);
                return Mono.empty();
            }
            return exchange(String.format("ROLLBACK TO SAVEPOINT %s", str));
        });
    }

    /**
     * Switches between auto-commit and explicit-transaction mode.
     *
     * <p>Turning auto-commit off while in auto-commit mode begins a transaction; turning it on
     * while a transaction is in progress commits it. If the requested mode already matches
     * {@link #isAutoCommit()}, nothing is sent.
     *
     * @param z {@code true} to enable auto-commit, {@code false} to disable it
     * @return a {@link Mono} that completes when the resulting begin or commit (if any) has completed
     */
    @Override // io.r2dbc.postgresql.api.PostgresqlConnection, io.r2dbc.spi.Connection
    public Mono<Void> setAutoCommit(boolean z) {
        return useTransactionStatus(ignoredStatus -> {
            this.logger.debug(this.connectionContext.getMessage(String.format("Setting auto-commit mode to [%s]", z)));

            boolean currentlyAutoCommit = isAutoCommit();
            if (currentlyAutoCommit && !z) {
                this.logger.debug(this.connectionContext.getMessage("Beginning transaction"));
                return beginTransaction();
            }
            if (!currentlyAutoCommit && z) {
                this.logger.debug(this.connectionContext.getMessage("Committing pending transactions"));
                return commitTransaction();
            }
            return Mono.empty();
        });
    }

    /**
     * Changes the isolation level.
     *
     * <p>The statement is chosen at subscription time from the client's transaction status
     * (see {@code getTransactionIsolationLevelQuery}). The level tracked by this connection is
     * updated only after the exchange has succeeded.
     *
     * @param isolationLevel the level to apply; must not be {@code null}
     * @return a {@link Mono} that completes when the statement has been processed
     */
    @Override // io.r2dbc.postgresql.api.PostgresqlConnection, io.r2dbc.spi.Connection
    public Mono<Void> setTransactionIsolationLevel(IsolationLevel isolationLevel) {
        Assert.requireNonNull(isolationLevel, "isolationLevel must not be null");
        Function<TransactionStatus, String> queryForStatus = getTransactionIsolationLevelQuery(isolationLevel);
        Mono<String> sql = withTransactionStatus(queryForStatus);
        return sql
            .flatMapMany(this::exchange)
            .then()
            .doOnSuccess(ignored -> this.isolationLevel = isolationLevel);
    }

    /**
     * @return a string containing the client and codecs of this connection
     */
    @Override // io.r2dbc.postgresql.api.PostgresqlConnection
    public String toString() {
        return "PostgresqlConnection{client=" + this.client + ", codecs=" + this.codecs + '}';
    }

    /**
     * Checks whether this connection is usable.
     *
     * <p>For {@link ValidationDepth#LOCAL} only the client's connected flag is consulted. For
     * any other depth the connected flag is checked first and, if set, the prepared
     * {@code SELECT 1} query is run: completion yields {@code true}, an error is logged at
     * debug level and yields {@code false}.
     *
     * @param validationDepth how thorough the check should be
     * @return a {@link Mono} emitting the validation result; it does not signal an error for a failed query
     */
    @Override // io.r2dbc.postgresql.api.PostgresqlConnection, io.r2dbc.spi.Connection
    public Mono<Boolean> validate(ValidationDepth validationDepth) {
        if (validationDepth == ValidationDepth.LOCAL) {
            return Mono.fromSupplier(this.client::isConnected);
        }

        return Mono.create(sink -> {
            if (!this.client.isConnected()) {
                sink.success(false);
                return;
            }

            this.validationQuery.subscribe(new CoreSubscriber<Long>() {
                @Override
                public void onSubscribe(Subscription subscription) {
                    subscription.request(Integer.MAX_VALUE);
                }

                @Override
                public void onNext(Long l) {
                    // The row count itself is irrelevant; only completion or error matters.
                }

                @Override
                public void onError(Throwable th) {
                    PostgresqlConnection.this.logger.debug(PostgresqlConnection.this.connectionContext.getMessage("Validation failed"), th);
                    sink.success(false);
                }

                @Override
                public void onComplete() {
                    sink.success(true);
                }
            });
        });
    }

    /**
     * Builds a function that selects the isolation-level statement for a given transaction status.
     *
     * <p>For {@link TransactionStatus#OPEN} the statement is {@code SET TRANSACTION ...};
     * for every other status it is {@code SET SESSION CHARACTERISTICS AS TRANSACTION ...}.
     *
     * @param isolationLevel the level whose SQL form is inserted into the statement
     * @return a function from transaction status to statement text
     */
    private static Function<TransactionStatus, String> getTransactionIsolationLevelQuery(IsolationLevel isolationLevel) {
        return status -> {
            String template = status == TransactionStatus.OPEN
                ? "SET TRANSACTION ISOLATION LEVEL %s"
                : "SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL %s";
            return String.format(template, isolationLevel.asSql());
        };
    }

    /**
     * Sends {@code SET LOCK_TIMEOUT} with the given duration expressed in milliseconds.
     *
     * @param duration the timeout; must not be {@code null}
     * @return a {@link Mono} that completes when the statement has been processed
     */
    @Override // io.r2dbc.postgresql.api.PostgresqlConnection, io.r2dbc.spi.Connection
    public Mono<Void> setLockWaitTimeout(Duration duration) {
        Assert.requireNonNull(duration, "lockTimeout must not be null");
        return Mono.defer(() -> {
            String sql = String.format("SET LOCK_TIMEOUT = %s", duration.toMillis());
            return Mono.from(exchange(sql)).then();
        });
    }

    /**
     * Sends {@code SET STATEMENT_TIMEOUT} with the given duration expressed in milliseconds.
     *
     * @param duration the timeout; must not be {@code null}
     * @return a {@link Mono} that completes when the statement has been processed
     */
    @Override // io.r2dbc.postgresql.api.PostgresqlConnection, io.r2dbc.spi.Connection
    public Mono<Void> setStatementTimeout(Duration duration) {
        Assert.requireNonNull(duration, "statementTimeout must not be null");
        return Mono.defer(() -> {
            String sql = String.format("SET STATEMENT_TIMEOUT = %s", duration.toMillis());
            return Mono.from(exchange(sql)).then();
        });
    }

    /**
     * Runs an action that depends on the transaction status read at subscription time.
     *
     * <p>The status is read from the client when the returned {@link Mono} is subscribed,
     * not when this method is called. The resulting publisher is passed through
     * {@code Operators.discardOnCancel} and its elements are ignored.
     *
     * @param function maps the current status to the publisher to run
     * @return a {@link Mono} that completes when that publisher completes
     */
    private Mono<Void> useTransactionStatus(Function<TransactionStatus, Publisher<?>> function) {
        return Flux.defer(() -> function.apply(this.client.getTransactionStatus()))
            .as(Operators::discardOnCancel)
            .then();
    }

    /**
     * Computes a value from the transaction status read at subscription time.
     *
     * @param function maps the current status to the value to emit
     * @param <T> the value type
     * @return a {@link Mono} emitting the value the function returned
     */
    private <T> Mono<T> withTransactionStatus(Function<TransactionStatus, T> function) {
        return Mono.defer(() -> {
            TransactionStatus current = this.client.getTransactionStatus();
            return Mono.just(function.apply(current));
        });
    }

    /**
     * Sends a statement through the simple query flow and turns a server error into a failure.
     *
     * <p>An {@link ErrorResponse} is not passed downstream. It is converted into an exception
     * and remembered, and that exception is thrown once the upstream flow completes, so the
     * remaining responses are still consumed first. Every other message is passed on unchanged.
     *
     * @param str the SQL text to send
     * @param <T> the element type the caller wants to see. The cast is unchecked: the stream
     *            carries whatever backend messages the flow produces.
     * @return the backend messages of the exchange, without error responses
     */
    @SuppressWarnings("unchecked")
    private <T> Flux<T> exchange(String str) {
        AtomicReference<R2dbcException> failure = new AtomicReference<>();
        Flux<Object> responses = SimpleQueryMessageFlow.exchange(this.client, str)
            .<Object>handle((backendMessage, sink) -> {
                if (backendMessage instanceof ErrorResponse) {
                    failure.set(ExceptionFactory.createException((ErrorResponse) backendMessage, str));
                } else {
                    sink.next(backendMessage);
                }
            })
            .doOnComplete(() -> {
                R2dbcException error = failure.get();
                if (error != null) {
                    throw error;
                }
            });
        // Callers pick T for convenience; elements are never inspected as T in this method.
        return (Flux<T>) responses;
    }

    /**
     * Restores the isolation level saved when the transaction began and clears the saved value.
     *
     * <p>The volatile field is read once into a local so that the null check and the
     * assignment see the same value; a concurrent clear can therefore never cause
     * {@code null} to be written into {@code isolationLevel}.
     */
    private void cleanupIsolationLevel() {
        IsolationLevel saved = this.previousIsolationLevel;
        if (saved != null) {
            this.isolationLevel = saved;
        }
        this.previousIsolationLevel = null;
    }
}
