package io.asyncer.r2dbc.mysql.client;

import io.asyncer.r2dbc.mysql.ConnectionContext;
import io.asyncer.r2dbc.mysql.MySqlSslConfiguration;
import io.asyncer.r2dbc.mysql.internal.util.AssertUtils;
import io.asyncer.r2dbc.mysql.internal.util.OperatorUtils;
import io.asyncer.r2dbc.mysql.message.client.ClientMessage;
import io.asyncer.r2dbc.mysql.message.client.ExitMessage;
import io.asyncer.r2dbc.mysql.message.server.ServerMessage;
import io.asyncer.r2dbc.mysql.message.server.WarningMessage;
import io.netty.buffer.ByteBufAllocator;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.ReferenceCounted;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import io.r2dbc.spi.R2dbcException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.SynchronousSink;
import reactor.netty.Connection;
import reactor.netty.FutureMono;
import reactor.util.context.Context;
import reactor.util.context.ContextView;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/asyncer/r2dbc/mysql/client/ReactorNettyClient.class */
public final class ReactorNettyClient implements Client {
    /** Logger shared by this client and its nested sink and subscriber classes. */
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(ReactorNettyClient.class);

    /** Whether debug logging was enabled when this class was initialized. */
    private static final boolean DEBUG_ENABLED = logger.isDebugEnabled();

    /** Whether info logging was enabled when this class was initialized. */
    private static final boolean INFO_ENABLED = logger.isInfoEnabled();

    /** Discard hook that releases reference-counted objects dropped from a response stream. */
    private static final Consumer<ReferenceCounted> RELEASE = ReferenceCounted::release;

    /** Underlying Reactor Netty connection. */
    private final Connection connection;

    /** Connection context; supplies the connection id reported by {@link #toString()}. */
    private final ConnectionContext context;

    /** Outbound requests; drained by the sending pipeline that the constructor subscribes. */
    private final Sinks.Many<ClientMessage> requests = Sinks.many().unicast().onBackpressureBuffer();

    /** Inbound server messages, multicast to the exchange currently listening. */
    private final Sinks.Many<ServerMessage> responseProcessor = Sinks.many().multicast().onBackpressureBuffer(512, false);

    /** Queue of pending exchanges; also handed to the message codec by the constructor. */
    private final RequestQueue requestQueue = new RequestQueue();

    /** Flipped to {@code true} once closing starts, whether locally requested or detected. */
    private final AtomicBoolean closing = new AtomicBoolean();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/asyncer/r2dbc/mysql/client/ReactorNettyClient$ResponseSink.class */
    /**
     * A {@link SynchronousSink} facade over the response processor of the enclosing client.
     * <p>
     * Messages passed to {@link #next(ServerMessage)} are logged and then emitted into the response
     * processor; errors are wrapped by {@code ClientExceptions.wrap} before being emitted. Completion
     * is not supported through this sink.
     */
    public final class ResponseSink implements SynchronousSink<ServerMessage> {
        private ResponseSink() {
        }

        /**
         * Always fails: completion is never signalled through this sink.
         *
         * @throws UnsupportedOperationException always
         */
        @Override
        public void complete() {
            throw new UnsupportedOperationException();
        }

        /** @return an empty context; this sink carries no subscriber context */
        @Override
        @Deprecated
        public Context currentContext() {
            return Context.empty();
        }

        /** @return an empty context view; this sink carries no subscriber context */
        @Override
        public ContextView contextView() {
            return Context.empty();
        }

        /**
         * Wraps the failure and emits it as the terminal error of the response processor.
         *
         * @param th the failure to propagate
         */
        @Override
        public void error(Throwable th) {
            ReactorNettyClient.this.responseProcessor.emitError(
                ClientExceptions.wrap(th), Sinks.EmitFailureHandler.FAIL_FAST);
        }

        /**
         * Logs the message and emits it into the response processor.
         *
         * @param serverMessage the decoded server message
         */
        @Override
        public void next(ServerMessage serverMessage) {
            logResponse(serverMessage);
            ReactorNettyClient.this.responseProcessor.emitNext(serverMessage, Sinks.EmitFailureHandler.FAIL_FAST);
        }

        /**
         * Logs a response at INFO when it reports a non-zero warning count, otherwise at DEBUG.
         * Each level is only used when it was enabled at class initialization.
         */
        private void logResponse(ServerMessage message) {
            // Messages that are not WarningMessage are treated as reporting zero warnings.
            int warnings = message instanceof WarningMessage ? ((WarningMessage) message).getWarnings() : 0;

            if (warnings != 0) {
                if (INFO_ENABLED) {
                    logger.info("Response: {}, reports {} warning(s)", message, warnings);
                }
                return;
            }
            if (DEBUG_ENABLED) {
                logger.debug("Response: {}", message);
            }
        }
    }

    /* loaded from: input_file:io/asyncer/r2dbc/mysql/client/ReactorNettyClient$ResponseSubscriber.class */
    /**
     * Terminal subscriber of the inbound pipeline assembled in the constructor.
     * <p>
     * It forwards the subscription to the response processor, routes errors to the
     * {@link ResponseSink}, and treats completion of the inbound stream as a connection close.
     */
    private final class ResponseSubscriber implements CoreSubscriber<Object> {
        private final ResponseSink sink;

        private ResponseSubscriber(ResponseSink responseSink) {
            this.sink = responseSink;
        }

        /**
         * Hands the inbound subscription to the response processor. The constructor of the enclosing
         * client asserts that the processor's flux is a {@link Subscriber}, so the cast is expected
         * to succeed.
         */
        @Override
        public void onSubscribe(Subscription subscription) {
            Subscriber<?> processor = (Subscriber<?>) ReactorNettyClient.this.responseProcessor.asFlux();
            processor.onSubscribe(subscription);
        }

        /**
         * Intentionally a no-op: messages are already pushed into the sink by the {@code doOnNext}
         * stage placed in front of this subscriber.
         */
        @Override
        public void onNext(Object obj) {
        }

        /** Routes the failure to the response sink. */
        @Override
        public void onError(Throwable th) {
            this.sink.error(th);
        }

        /** The inbound stream ended, so run the client's close handling. */
        @Override
        public void onComplete() {
            ReactorNettyClient.this.handleClose();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ReactorNettyClient(Connection connection, MySqlSslConfiguration mySqlSslConfiguration, ConnectionContext connectionContext) {
        AssertUtils.requireNonNull(connection, "connection must not be null");
        AssertUtils.requireNonNull(connectionContext, "context must not be null");
        AssertUtils.requireNonNull(mySqlSslConfiguration, "ssl must not be null");
        AssertUtils.require(this.responseProcessor.asFlux() instanceof Subscriber, "responseProcessor(" + this.responseProcessor + ") must be a Subscriber");
        this.connection = connection;
        this.context = connectionContext;
        connection.addHandlerLast("R2dbcMySqlEnvelopeSlicer", new EnvelopeSlicer()).addHandlerLast("R2dbcMySqlMessageDuplexCodec", new MessageDuplexCodec(connectionContext, this.closing, this.requestQueue));
        if (mySqlSslConfiguration.getSslMode().startSsl()) {
            connection.addHandlerFirst("R2dbcMySqlSslBridgeHandler", new SslBridgeHandler(connectionContext, mySqlSslConfiguration));
        }
        if (logger.isTraceEnabled()) {
            logger.debug("Connection tracking logging is enabled");
            connection.addHandlerFirst(LoggingHandler.class.getSimpleName(), new LoggingHandler((Class<?>) ReactorNettyClient.class, LogLevel.TRACE));
        }
        ResponseSink responseSink = new ResponseSink();
        connection.inbound().receiveObject().doOnNext(obj -> {
            if (!(obj instanceof ServerMessage)) {
                throw ClientExceptions.unsupportedProtocol(obj.getClass().getTypeName());
            }
            if (obj instanceof ReferenceCounted) {
                ((ReferenceCounted) obj).retain();
            }
            responseSink.next((ServerMessage) obj);
        }).onErrorResume(this::resumeError).subscribe((CoreSubscriber<? super Object>) new ResponseSubscriber(responseSink));
        this.requests.asFlux().concatMap(clientMessage -> {
            if (DEBUG_ENABLED) {
                logger.debug("Request: {}", clientMessage);
            }
            return connection.outbound().sendObject(clientMessage);
        }).onErrorResume(this::resumeError).doAfterTerminate(this::handleClose).subscribe();
    }

    /**
     * Sends a single request and returns the responses produced for it.
     * <p>
     * The exchange is submitted to the request queue; the request itself is emitted only when the
     * returned response stream is subscribed. If the client is no longer connected, the request is
     * disposed (when it is {@link Disposable}) and the returned flux fails with
     * {@code ClientExceptions.exchangeClosed()}.
     *
     * @param clientMessage the request to send, must not be null
     * @param biConsumer    handler that maps server messages to response elements
     * @param <T>           the response element type
     * @return the handled responses of this exchange
     */
    @Override // io.asyncer.r2dbc.mysql.client.Client
    public <T> Flux<T> exchange(ClientMessage clientMessage, BiConsumer<ServerMessage, SynchronousSink<T>> biConsumer) {
        AssertUtils.requireNonNull(clientMessage, "request must not be null");
        // The explicit type witness keeps the sink typed as MonoSink<Flux<T>>, so no raw cast is
        // needed and flatMapMany(identity()) type-checks to Flux<T>.
        return Mono.<Flux<T>>create(sink -> {
            if (!isConnected()) {
                if (clientMessage instanceof Disposable) {
                    ((Disposable) clientMessage).dispose();
                }
                sink.error(ClientExceptions.exchangeClosed());
                return;
            }

            Flux<T> responses = OperatorUtils.discardOnCancel(
                    this.responseProcessor.asFlux()
                        .doOnSubscribe(ignored -> emitNextRequest(clientMessage))
                        .handle(biConsumer)
                        // The request queue is run when this exchange terminates.
                        .doOnTerminate(this.requestQueue))
                .doOnDiscard(ReferenceCounted.class, RELEASE);

            this.requestQueue.submit(RequestTask.wrap(clientMessage, sink, responses));
        }).flatMapMany(Function.identity());
    }

    /**
     * Runs a multi-request exchange and returns the responses produced for it.
     * <p>
     * The exchangeable is both the source of requests and the response handler. It is subscribed
     * (so its requests start flowing) only when the returned response stream is subscribed, and it
     * is disposed when the stream terminates or is cancelled. If the client is no longer connected,
     * the exchangeable is drained so every {@link Disposable} request is disposed, and the returned
     * flux fails with {@code ClientExceptions.exchangeClosed()}.
     *
     * @param fluxExchangeable the exchange to run, must not be null
     * @param <T>              the response element type
     * @return the handled responses of this exchange
     */
    @Override // io.asyncer.r2dbc.mysql.client.Client
    public <T> Flux<T> exchange(FluxExchangeable<T> fluxExchangeable) {
        AssertUtils.requireNonNull(fluxExchangeable, "exchangeable must not be null");
        // The explicit type witness keeps the sink typed as MonoSink<Flux<T>>, so no raw cast is
        // needed and flatMapMany(identity()) type-checks to Flux<T>.
        return Mono.<Flux<T>>create(sink -> {
            if (!isConnected()) {
                // Drain the pending requests only to dispose them; nothing is sent.
                fluxExchangeable.subscribe(clientMessage -> {
                    if (clientMessage instanceof Disposable) {
                        ((Disposable) clientMessage).dispose();
                    }
                }, th -> this.requests.emitError(th, Sinks.EmitFailureHandler.FAIL_FAST));
                sink.error(ClientExceptions.exchangeClosed());
                return;
            }

            Flux<T> responses = this.responseProcessor.asFlux()
                .doOnSubscribe(ignored -> fluxExchangeable.subscribe(
                    this::emitNextRequest,
                    th -> this.requests.emitError(th, Sinks.EmitFailureHandler.FAIL_FAST)))
                .handle(fluxExchangeable)
                .doOnTerminate(() -> {
                    fluxExchangeable.dispose();
                    this.requestQueue.run();
                });

            this.requestQueue.submit(RequestTask.wrap(
                fluxExchangeable,
                sink,
                OperatorUtils.discardOnCancel(responses)
                    .doOnDiscard(ReferenceCounted.class, RELEASE)
                    .doOnCancel(fluxExchangeable::dispose)));
        }).flatMapMany(Function.identity());
    }

    /**
     * Closes the client gracefully: queues an exit message, then closes the channel.
     * <p>
     * Only the first caller to flip the closing flag queues the exit message; later callers skip
     * straight to {@link #forceClose()}. A failure to send the exit message is logged and does not
     * prevent the channel from being closed.
     *
     * @return a mono that completes when the channel close future completes
     */
    @Override // io.asyncer.r2dbc.mysql.client.Client
    public Mono<Void> close() {
        // The explicit type witness lets flatMap(identity()) unwrap the queued Mono<Void>.
        return Mono.<Mono<Void>>create(sink -> {
            if (!this.closing.compareAndSet(false, true)) {
                // Closing already started elsewhere; nothing to queue.
                sink.success();
                return;
            }

            this.requestQueue.submit(RequestTask.wrap(sink, Mono.fromRunnable(() -> {
                Sinks.EmitResult result = this.requests.tryEmitNext(ExitMessage.INSTANCE);
                if (result != Sinks.EmitResult.OK) {
                    logger.error("Exit message sending failed due to {}, force closing", result);
                }
            })));
        }).flatMap(Function.identity()).onErrorResume(th -> {
            logger.error("Exit message sending failed, force closing", th);
            return Mono.empty();
        }).then(forceClose());
    }

    /**
     * Closes the underlying channel without sending an exit message.
     *
     * @return a mono that defers {@code channel().close()} until subscription and mirrors its future
     */
    @Override
    public Mono<Void> forceClose() {
        return FutureMono.deferFuture(() -> connection.channel().close());
    }

    /**
     * Returns the buffer allocator of the connection's outbound.
     *
     * @return the allocator reported by {@code connection.outbound().alloc()}
     */
    @Override
    public ByteBufAllocator getByteBufAllocator() {
        return connection.outbound().alloc();
    }

    /**
     * Tells whether the client can still be used for exchanges.
     *
     * @return {@code true} when closing has not started and the channel is open
     */
    @Override
    public boolean isConnected() {
        if (closing.get()) {
            return false;
        }
        return connection.channel().isOpen();
    }

    /**
     * Fires {@code SslState.UNSUPPORTED} as a user event through the channel pipeline.
     */
    @Override
    public void sslUnsupported() {
        connection.channel()
            .pipeline()
            .fireUserEventTriggered(SslState.UNSUPPORTED);
    }

    /**
     * Fires {@code Lifecycle.COMMAND} as a user event through the channel pipeline.
     */
    @Override
    public void loginSuccess() {
        connection.channel()
            .pipeline()
            .fireUserEventTriggered(Lifecycle.COMMAND);
    }

    /**
     * Describes the client by its closing state and connection id.
     *
     * @return {@code ReactorNettyClient(<state>){connectionId=<id>}}
     */
    @Override
    public String toString() {
        String state = closing.get() ? "closing or closed" : "activating";
        return String.format("ReactorNettyClient(%s){connectionId=%d}", state, context.getConnectionId());
    }

    /**
     * Emits a request into the outbound sink, disposing it if it cannot be sent.
     * <p>
     * The request is dropped when the client is no longer connected or the sink rejects it; a
     * dropped request that is {@link Disposable} is disposed.
     *
     * @param clientMessage the request to emit
     */
    private void emitNextRequest(ClientMessage clientMessage) {
        if (isConnected() && requests.tryEmitNext(clientMessage) == Sinks.EmitResult.OK) {
            return;
        }
        if (clientMessage instanceof Disposable) {
            ((Disposable) clientMessage).dispose();
        }
    }

    /**
     * Error fallback for both the inbound and the outbound pipelines.
     * <p>
     * It fails the pending exchanges with the wrapped error, completes the request sink, logs the
     * failure, and then closes the client.
     *
     * @param th  the failure raised by a pipeline
     * @param <T> element type of the pipeline being resumed
     * @return the mono returned by {@link #close()}, viewed as {@code Mono<T>}
     */
    @SuppressWarnings("unchecked")
    private <T> Mono<T> resumeError(Throwable th) {
        drainError(ClientExceptions.wrap(th));

        // Never retry the completion signal; a failed emission is only logged.
        requests.emitComplete((signalType, emitResult) -> {
            if (emitResult.isFailure()) {
                logger.error("Error: {}", emitResult);
            }
            return false;
        });

        logger.error("Error: {}", th.getLocalizedMessage(), th);

        Mono<Void> closed = close();
        return (Mono<T>) closed;
    }

    /**
     * Disposes the request queue and then emits the error into the response processor.
     *
     * @param cause the error to emit to response subscribers
     */
    private void drainError(R2dbcException cause) {
        requestQueue.dispose();
        responseProcessor.emitError(cause, Sinks.EmitFailureHandler.FAIL_FAST);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Reacts to termination of the inbound stream or the outbound pipeline.
     * <p>
     * If closing had not started yet, the close is logged as initiated by the peer and pending
     * work fails with {@code ClientExceptions.unexpectedClosed()}; otherwise it fails with
     * {@code ClientExceptions.expectedClosed()}.
     */
    public void handleClose() {
        boolean closedByPeer = closing.compareAndSet(false, true);
        if (closedByPeer) {
            logger.warn("Connection has been closed by peer");
            drainError(ClientExceptions.unexpectedClosed());
            return;
        }
        drainError(ClientExceptions.expectedClosed());
    }
}
