package org.mule.runtime.core.internal.processor.strategy;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.IntUnaryOperator;
import java.util.function.LongUnaryOperator;
import java.util.function.Supplier;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.api.i18n.I18nMessageFactory;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.scheduler.SchedulerConfig;
import org.mule.runtime.api.util.LazyValue;
import org.mule.runtime.api.util.MuleSystemProperties;
import org.mule.runtime.api.util.concurrent.Latch;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.construct.BackPressureReason;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.Sink;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.internal.processor.strategy.AbstractProcessingStrategy;
import org.mule.runtime.core.internal.processor.strategy.reactor.builder.PipelineProcessingStrategyReactiveProcessorBuilder;
import org.mule.runtime.core.internal.processor.strategy.util.ProfilingUtils;
import org.mule.runtime.core.internal.util.rx.RejectionCallbackExecutorServiceDecorator;
import org.mule.runtime.core.privileged.event.BaseEventContext;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

/* loaded from: input_file:repository/org/mule/runtime/mule-core/4.5.0-20220622/mule-core-4.5.0-20220622.jar:org/mule/runtime/core/internal/processor/strategy/StreamEmitterProcessingStrategyFactory.class */
public class StreamEmitterProcessingStrategyFactory extends AbstractStreamProcessingStrategyFactory {

    /* loaded from: input_file:repository/org/mule/runtime/mule-core/4.5.0-20220622/mule-core-4.5.0-20220622.jar:org/mule/runtime/core/internal/processor/strategy/StreamEmitterProcessingStrategyFactory$StreamEmitterProcessingStrategy.class */
    static class StreamEmitterProcessingStrategy extends AbstractReactorStreamProcessingStrategy {
        // Message of the MuleRuntimeException built by resolveSubscriptionErrorCause(...).
        private static final String NO_SUBSCRIPTIONS_ACTIVE_FOR_PROCESSOR = "No subscriptions active for processor.";
        // Total buffer size received in the constructor; getBufferQueueSize() divides it by sinksCount.
        private final int bufferSize;
        // Flow dispatch scheduler, obtained lazily from the supplier given to the constructor.
        // stopSchedulersIfNeeded() stops it only if it was actually computed.
        private final LazyValue<Scheduler> flowDispatchSchedulerLazy;
        // System.nanoTime() of the last scheduler rejection (set in onRejected), or Long.MIN_VALUE when there is none.
        private final AtomicLong lastRetryTimestamp;
        // Number of events accepted by checkCapacity(...) while a rejection is still recent.
        private final AtomicInteger queuedEvents;
        // Decrements queuedEvents; registered through onResponse(...) on the event context in checkCapacity(...).
        private final BiConsumer<CoreEvent, Throwable> queuedDecrementCallback;
        // Keeps the stored timestamp while it is younger than twice SCHEDULER_BUSY_RETRY_INTERVAL_NS,
        // otherwise resets it to Long.MIN_VALUE.
        private final LongUnaryOperator lastRetryTimestampCheckOperator;
        // Number of sinks created per createSink(...) call; computed once in the constructor via getSinksCount().
        private final int sinksCount;
        // Supplies the shutdown timeout; dispose() compares it against System.currentTimeMillis() differences.
        private final Supplier<Long> shutdownTimeoutSupplier;
        // Incremented when sinks are created/registered/subscribed, decremented by decrementActiveSinks().
        // dispose() resets it to 0, after which it may become negative.
        private final AtomicInteger activeSinksCount;
        private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) StreamEmitterProcessingStrategy.class);
        // 2 milliseconds expressed in nanoseconds.
        private static final long SCHEDULER_BUSY_RETRY_INTERVAL_NS = TimeUnit.MILLISECONDS.toNanos(2);

        /* loaded from: input_file:repository/org/mule/runtime/mule-core/4.5.0-20220622/mule-core-4.5.0-20220622.jar:org/mule/runtime/core/internal/processor/strategy/StreamEmitterProcessingStrategyFactory$StreamEmitterProcessingStrategy$RoundRobinReactorSink.class */
        /**
         * {@link AbstractProcessingStrategy.ReactorSink} that delegates every event to one sink out of a fixed list,
         * choosing the delegate in round-robin order.
         *
         * @param <E> the element type returned by {@link #intoSink(CoreEvent)}
         */
        static class RoundRobinReactorSink<E> implements AbstractProcessingStrategy.ReactorSink<E> {
            private final List<AbstractProcessingStrategy.ReactorSink<E>> fluxSinks;
            private final AtomicInteger index = new AtomicInteger(0);
            // Stored in a field so the same operator instance is reused for every event.
            private final IntUnaryOperator update;

            /**
             * @param list the delegate sinks; the list is stored as-is (not copied)
             */
            public RoundRobinReactorSink(List<AbstractProcessingStrategy.ReactorSink<E>> list) {
                this.fluxSinks = list;
                // Assigned here, after fluxSinks, so the lambda never observes an unassigned final field.
                this.update = current -> (current + 1) % this.fluxSinks.size();
            }

            /**
             * Calls {@code prepareDispose()} on every delegate, in list order.
             */
            @Override
            public void prepareDispose() {
                for (AbstractProcessingStrategy.ReactorSink<E> delegate : this.fluxSinks) {
                    delegate.prepareDispose();
                }
            }

            /**
             * Calls {@code prepareDispose()} on every delegate first, and only then {@code dispose()} on every delegate.
             */
            @Override
            public void dispose() {
                for (AbstractProcessingStrategy.ReactorSink<E> delegate : this.fluxSinks) {
                    delegate.prepareDispose();
                }
                for (AbstractProcessingStrategy.ReactorSink<E> delegate : this.fluxSinks) {
                    delegate.dispose();
                }
            }

            /**
             * Hands the event to the next delegate in round-robin order.
             *
             * @param coreEvent the event to accept
             */
            @Override
            public void accept(CoreEvent coreEvent) {
                this.fluxSinks.get(nextIndex()).accept(coreEvent);
            }

            /**
             * Returns the current index and atomically advances it, wrapping around at the number of delegates.
             */
            private int nextIndex() {
                return this.index.getAndUpdate(this.update);
            }

            /**
             * Emits the event through the next delegate in round-robin order.
             *
             * @param coreEvent the event to emit
             * @return whatever the selected delegate's {@code emit} returns
             */
            @Override
            public BackPressureReason emit(CoreEvent coreEvent) {
                return this.fluxSinks.get(nextIndex()).emit(coreEvent);
            }

            /**
             * Returns the event itself as the sink element.
             *
             * @param coreEvent the event to convert
             * @return {@code coreEvent}, viewed as {@code E}
             */
            @Override
            @SuppressWarnings("unchecked")
            public E intoSink(CoreEvent coreEvent) {
                // The event is passed through unchanged; the cast is unchecked because E is erased at runtime.
                return (E) coreEvent;
            }
        }

        public StreamEmitterProcessingStrategy(int i, int i2, Supplier<Scheduler> supplier, Supplier<Scheduler> supplier2, int i3, int i4, boolean z, Supplier<Long> supplier3) {
            super(i2, supplier2, i3, i4, z);
            this.lastRetryTimestamp = new AtomicLong(Long.MIN_VALUE);
            this.queuedEvents = new AtomicInteger();
            this.queuedDecrementCallback = (coreEvent, th) -> {
                this.queuedEvents.decrementAndGet();
            };
            this.lastRetryTimestampCheckOperator = j -> {
                if (System.nanoTime() - j < SCHEDULER_BUSY_RETRY_INTERVAL_NS * 2) {
                    return j;
                }
                return Long.MIN_VALUE;
            };
            this.activeSinksCount = new AtomicInteger(0);
            this.bufferSize = i;
            this.flowDispatchSchedulerLazy = new LazyValue<>((Supplier) supplier);
            this.sinksCount = getSinksCount();
            this.shutdownTimeoutSupplier = supplier3;
        }

        /**
         * Stops the strategy. When the active sinks counter is already zero or below, the schedulers are stopped here
         * through {@link #stopSchedulersIfNeeded()}; the superclass {@code stop()} always runs afterwards.
         */
        @Override
        public void stop() {
            final boolean noActiveSinks = allSchedulersStopped();
            if (noActiveSinks) {
                stopSchedulersIfNeeded();
            }
            super.stop();
        }

        /**
         * Disposes the strategy in two timed phases, both bounded by the value of the shutdown timeout supplier:
         * <ol>
         * <li>polls every 10 ms until the active sinks counter is zero or below;</li>
         * <li>resets the counter to 0, calls the superclass {@code dispose()} and polls again until the counter drops
         * below the negated previous value.</li>
         * </ol>
         * If a phase times out, a warning is logged, or an {@link IllegalStateException} is thrown when the
         * {@code MULE_LIFECYCLE_FAIL_ON_FIRST_DISPOSE_ERROR} system property is set. If the thread is interrupted while
         * polling, the interrupt flag is restored and the method returns immediately (during the first phase this means
         * the superclass {@code dispose()} is not called).
         *
         * @throws IllegalStateException on timeout when the fail-on-dispose-error system property is set
         */
        @Override
        public void dispose() {
            final long schedulersWaitStart = System.currentTimeMillis();
            final long timeoutMillis = this.shutdownTimeoutSupplier.get().longValue();

            // Phase 1: wait for the schedulers to be stopped.
            while (!allSchedulersStopped() && System.currentTimeMillis() - schedulersWaitStart < timeoutMillis) {
                try {
                    Thread.sleep(10L);
                } catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            if (!allSchedulersStopped()) {
                final String schedulersMessage = "Schedulers of ProcessingStrategy not stopped before shutdown timeout.";
                if (System.getProperty(MuleSystemProperties.MULE_LIFECYCLE_FAIL_ON_FIRST_DISPOSE_ERROR) != null) {
                    throw new IllegalStateException(schedulersMessage);
                }
                LOGGER.warn(schedulersMessage);
            }

            // Phase 2: reset the counter, dispose the superclass and wait for the counter to pass the threshold.
            final int countBeforeDispose = this.activeSinksCount.getAndSet(0);
            final int threshold = -countBeforeDispose;
            super.dispose();
            final long sinksWaitStart = System.currentTimeMillis();
            while (this.activeSinksCount.get() >= threshold && System.currentTimeMillis() - sinksWaitStart < timeoutMillis) {
                try {
                    Thread.sleep(10L);
                } catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            if (this.activeSinksCount.get() >= threshold) {
                final String sinksMessage = "Completion of ProcessingStrategy sinks not complete/cancelled before shutdown timeout.";
                if (System.getProperty(MuleSystemProperties.MULE_LIFECYCLE_FAIL_ON_FIRST_DISPOSE_ERROR) != null) {
                    throw new IllegalStateException(sinksMessage);
                }
                LOGGER.warn(sinksMessage);
            }
        }

        /**
         * @return {@code true} when the active sinks counter is zero or negative
         */
        private boolean allSchedulersStopped() {
            final int active = this.activeSinksCount.get();
            return active <= 0;
        }

        /**
         * Decrements the active sinks counter and, when it reaches zero or below, stops the superclass schedulers and
         * then the flow dispatch scheduler (only if that scheduler was ever computed).
         *
         * @return {@code true} when the decrement brought the counter to zero or below
         */
        @Override
        public boolean stopSchedulersIfNeeded() {
            if (!decrementActiveSinks()) {
                return false;
            }
            try {
                super.stopSchedulersIfNeeded();
            } finally {
                // Runs even when the superclass call throws.
                this.flowDispatchSchedulerLazy.ifComputed(Scheduler::stop);
            }
            return true;
        }

        /**
         * Atomically decrements the active sinks counter.
         *
         * @return {@code true} when the counter is zero or negative after the decrement
         */
        private boolean decrementActiveSinks() {
            final int remaining = this.activeSinksCount.decrementAndGet();
            return remaining <= 0;
        }

        /**
         * Creates {@code sinksCount} emitter-backed sinks, each subscribed to {@code reactiveProcessor}, and wraps them
         * in a {@link RoundRobinReactorSink}. The active sinks counter is increased by {@code sinksCount} once all
         * sinks have been created.
         *
         * @param flowConstruct     the flow the sink belongs to
         * @param reactiveProcessor the processor applied to each emitter
         * @return a sink that distributes events over the created sinks in round-robin order
         * @throws MuleRuntimeException if an emitter has no downstream subscribers after subscribing
         */
        @Override
        public Sink createSink(FlowConstruct flowConstruct, ReactiveProcessor reactiveProcessor) {
            final List<AbstractProcessingStrategy.ReactorSink<CoreEvent>> sinks = new ArrayList<>();
            final int bufferQueueSize = getBufferQueueSize();
            for (int i = 0; i < this.sinksCount; i++) {
                final Latch completionLatch = new Latch();
                final EmitterProcessor<CoreEvent> processor = EmitterProcessor.create(bufferQueueSize);
                final AtomicReference<Throwable> failedSubscriptionCause = new AtomicReference<>();

                // The latch is released on both error (inside the throwable consumer) and completion.
                processor.transform(reactiveProcessor)
                        .subscribe(null,
                                   getThrowableConsumer(flowConstruct, completionLatch, failedSubscriptionCause),
                                   () -> completionLatch.release());
                if (!processor.hasDownstreams()) {
                    throw resolveSubscriptionErrorCause(failedSubscriptionCause);
                }

                // NOTE(review): the disposer argument is presumably a timestamp taken by the sink — confirm against
                // DefaultReactorSink.
                sinks.add(new AbstractProcessingStrategy.DefaultReactorSink<CoreEvent>(
                        processor.sink(FluxSink.OverflowStrategy.BUFFER),
                        timestamp -> {
                            awaitSubscribersCompletion(flowConstruct, this.shutdownTimeoutSupplier.get().longValue(),
                                                       completionLatch, timestamp.longValue());
                            stopSchedulersIfNeeded();
                        },
                        this.onEventConsumer,
                        bufferQueueSize));
            }
            this.activeSinksCount.addAndGet(this.sinksCount);
            return new RoundRobinReactorSink<CoreEvent>(sinks);
        }

        /**
         * Subscribes to {@code publisher} and then increments the active sinks counter. Both the error and the
         * completion signal call {@link #stopSchedulersIfNeeded()}; errors are logged first.
         *
         * @param publisher the publisher to subscribe to
         * @param str       text appended to the error log message
         */
        @Override
        public void registerInternalSink(Publisher<CoreEvent> publisher, String str) {
            final Consumer<Throwable> onError = error -> {
                LOGGER.error("Exception reached PS subscriber for " + str, error);
                stopSchedulersIfNeeded();
            };
            final Runnable onComplete = this::stopSchedulersIfNeeded;

            Flux.from(publisher).subscribe(null, onError, onComplete);
            this.activeSinksCount.incrementAndGet();
        }

        /**
         * Decorates {@code publisher} so that the active sinks counter is incremented on subscription and decremented
         * after the publisher terminates.
         *
         * @param publisher the publisher to decorate
         * @return the decorated publisher
         */
        @Override
        public Publisher<CoreEvent> configureInternalPublisher(Publisher<CoreEvent> publisher) {
            final Flux<CoreEvent> source = Flux.from(publisher);
            return source
                    .doAfterTerminate(this::decrementActiveSinks)
                    .doOnSubscribe(subscription -> this.activeSinksCount.incrementAndGet());
        }

        /**
         * @return the superclass' non-blocking task scheduler wrapped by {@link #getRetryScheduler(ScheduledExecutorService)}
         */
        @Override
        public ScheduledExecutorService getNonBlockingTaskScheduler() {
            final ScheduledExecutorService delegate = super.getNonBlockingTaskScheduler();
            return getRetryScheduler(delegate);
        }

        /**
         * Wraps {@code scheduledExecutorService} in a {@link RejectionCallbackExecutorServiceDecorator}. The same
         * executor is passed as both of the decorator's executor arguments. The first callback records the rejection
         * through {@link #onRejected(ScheduledExecutorService)}; the second one clears the recorded rejection timestamp.
         * The duration handed to the decorator is {@code SCHEDULER_BUSY_RETRY_INTERVAL_NS}.
         *
         * @param scheduledExecutorService the executor to decorate
         * @return the decorating executor
         */
        public ScheduledExecutorService getRetryScheduler(ScheduledExecutorService scheduledExecutorService) {
            return new RejectionCallbackExecutorServiceDecorator(
                    scheduledExecutorService,
                    scheduledExecutorService,
                    () -> onRejected(scheduledExecutorService),
                    () -> this.lastRetryTimestamp.set(Long.MIN_VALUE),
                    // Derived from the shared constant instead of repeating the literal 2 ms.
                    Duration.ofNanos(SCHEDULER_BUSY_RETRY_INTERVAL_NS));
        }

        /**
         * Records the current {@link System#nanoTime()} as the last rejection timestamp and, at TRACE level, logs that
         * the scheduler is busy.
         *
         * @param scheduledExecutorService the executor that rejected the task; its name is logged when it is a
         *                                 {@link Scheduler}, otherwise its {@code toString()}
         */
        protected void onRejected(ScheduledExecutorService scheduledExecutorService) {
            // Only build the log arguments when TRACE is enabled.
            if (LOGGER.isTraceEnabled()) {
                final Object schedulerName = scheduledExecutorService instanceof Scheduler
                        ? ((Scheduler) scheduledExecutorService).getName()
                        : scheduledExecutorService.toString();
                LOGGER.trace("Shared scheduler {} is busy. Scheduling of the current event will be retried after {}ms.",
                             schedulerName,
                             (Object) TimeUnit.NANOSECONDS.toMillis(SCHEDULER_BUSY_RETRY_INTERVAL_NS));
            }
            this.lastRetryTimestamp.set(System.nanoTime());
        }

        /**
         * @return the smaller of {@code maxConcurrency} and twice {@code AbstractStreamProcessingStrategyFactory.CORES}
         */
        protected int getSinksCount() {
            final int coreBasedLimit = AbstractStreamProcessingStrategyFactory.CORES * 2;
            return Math.min(this.maxConcurrency, coreBasedLimit);
        }

        /**
         * Builds the exception thrown when a processor ends up with no active subscriptions.
         *
         * @param atomicReference holder of the error captured while subscribing, if any
         * @return a {@link MuleRuntimeException} carrying the captured error as its cause when one is present
         */
        protected MuleRuntimeException resolveSubscriptionErrorCause(AtomicReference<Throwable> atomicReference) {
            // Read once so the null check and the cause passed on refer to the same value.
            final Throwable cause = atomicReference.get();
            if (cause != null) {
                return new MuleRuntimeException(I18nMessageFactory.createStaticMessage(NO_SUBSCRIPTIONS_ACTIVE_FOR_PROCESSOR), cause);
            }
            return new MuleRuntimeException(I18nMessageFactory.createStaticMessage(NO_SUBSCRIPTIONS_ACTIVE_FOR_PROCESSOR));
        }

        /**
         * Creates the error handler used when subscribing a sink: it logs the error, stores it in
         * {@code atomicReference} and then releases {@code latch}.
         *
         * @param flowConstruct   the flow whose name appears in the log message
         * @param latch           the latch released once the error has been stored
         * @param atomicReference receives the error
         * @return the error consumer
         */
        protected Consumer<Throwable> getThrowableConsumer(FlowConstruct flowConstruct, Latch latch, AtomicReference<Throwable> atomicReference) {
            return error -> {
                final String message = "Exception reached PS subscriber for flow '" + flowConstruct.getName() + "'";
                LOGGER.error(message, error);
                atomicReference.set(error);
                latch.release();
            };
        }

        /**
         * Builds the pipeline-level processor for {@code reactiveProcessor} using the decorated flow dispatcher
         * scheduler and the profiling service.
         *
         * @param reactiveProcessor the pipeline to wrap
         * @return the processor produced by the pipeline processing strategy builder
         */
        @Override
        public ReactiveProcessor onPipeline(ReactiveProcessor reactiveProcessor) {
            return PipelineProcessingStrategyReactiveProcessorBuilder
                    .pipelineProcessingStrategyReactiveProcessorFrom(reactiveProcessor,
                                                                     this.executionClassloader,
                                                                     ProfilingUtils.getArtifactId(this.muleContext),
                                                                     ProfilingUtils.getArtifactType(this.muleContext))
                    .withScheduler(decorateScheduler(getFlowDispatcherScheduler()))
                    .withProfilingService(getProfilingService())
                    .build();
        }

        /**
         * Applies back-pressure while a scheduler rejection is still recent, then defers to the superclass check.
         * <p>
         * A rejection counts as recent when a timestamp is recorded and the check operator does not reset it. In that
         * case: with the eager max-concurrency check enabled the event is rejected right away; otherwise the event is
         * counted as queued and rejected only if the count exceeds {@link #getBufferQueueSize()}. Accepted queued
         * events are un-counted through a response callback on their context.
         *
         * @param coreEvent the event about to be processed
         * @return a back-pressure reason from this check, or the result of the superclass check
         */
        @Override
        public BackPressureReason checkCapacity(CoreEvent coreEvent) {
            final boolean rejectionIsRecent = this.lastRetryTimestamp.get() != Long.MIN_VALUE
                    && this.lastRetryTimestamp.updateAndGet(this.lastRetryTimestampCheckOperator) != Long.MIN_VALUE;
            if (!rejectionIsRecent) {
                return super.checkCapacity(coreEvent);
            }

            if (this.maxConcurrencyEagerCheck) {
                return BackPressureReason.REQUIRED_SCHEDULER_BUSY;
            }

            final int queuedNow = this.queuedEvents.incrementAndGet();
            if (queuedNow > getBufferQueueSize()) {
                // Undo the increment: this event is not going to be queued.
                this.queuedEvents.decrementAndGet();
                return BackPressureReason.REQUIRED_SCHEDULER_BUSY_WITH_FULL_BUFFER;
            }

            ((BaseEventContext) coreEvent.getContext()).onResponse(this.queuedDecrementCallback);
            return super.checkCapacity(coreEvent);
        }

        /**
         * @return the flow dispatch scheduler held by the lazy value (obtained via its {@code get()})
         */
        protected Scheduler getFlowDispatcherScheduler() {
            final Scheduler dispatcher = this.flowDispatchSchedulerLazy.get();
            return dispatcher;
        }

        /**
         * @return the total buffer size divided by the sinks count (integer division)
         */
        @Override
        protected int getBufferQueueSize() {
            final int perSinkSize = this.bufferSize / this.sinksCount;
            return perSinkSize;
        }
    }

    /**
     * Creates a {@link StreamEmitterProcessingStrategy} configured from this factory's settings. The shutdown timeout
     * is read from the Mule configuration each time the strategy asks for it.
     *
     * @param muleContext the context providing schedulers and configuration
     * @param str         prefix used for the scheduler names
     * @return the new processing strategy
     */
    @Override
    public ProcessingStrategy create(MuleContext muleContext, String str) {
        final Supplier<Long> shutdownTimeout = () -> Long.valueOf(muleContext.getConfiguration().getShutdownTimeout());
        return new StreamEmitterProcessingStrategy(getBufferSize(),
                                                   getSubscriberCount(),
                                                   getFlowDispatchSchedulerSupplier(muleContext, str),
                                                   getCpuLightSchedulerSupplier(muleContext, str),
                                                   resolveParallelism(),
                                                   getMaxConcurrency(),
                                                   isMaxConcurrencyEagerCheck(),
                                                   shutdownTimeout);
    }

    /**
     * Returns a supplier that, when invoked, requests a CPU light scheduler named {@code str + "." + CPU_LITE}.
     *
     * @param muleContext the context providing the scheduler service and base scheduler configuration
     * @param str         prefix of the scheduler name
     * @return the scheduler supplier; no scheduler is requested until it is invoked
     */
    @Override
    public Supplier<Scheduler> getCpuLightSchedulerSupplier(MuleContext muleContext, String str) {
        return () -> muleContext.getSchedulerService()
                .cpuLightScheduler(muleContext.getSchedulerBaseConfig()
                        .withName(str + "." + ReactiveProcessor.ProcessingType.CPU_LITE.name()));
    }

    /**
     * Returns a supplier that, when invoked, requests a CPU light scheduler named {@code str + ".dispatch"}. When
     * {@code FLOW_DISPATCH_WORKERS} is positive it is applied as the scheduler's max concurrent tasks.
     *
     * @param muleContext the context providing the scheduler service and base scheduler configuration
     * @param str         prefix of the scheduler name
     * @return the scheduler supplier; no scheduler is requested until it is invoked
     */
    private Supplier<Scheduler> getFlowDispatchSchedulerSupplier(MuleContext muleContext, String str) {
        return () -> {
            final SchedulerConfig namedConfig = muleContext.getSchedulerBaseConfig().withName(str + ".dispatch");
            final SchedulerConfig effectiveConfig = FLOW_DISPATCH_WORKERS > 0
                    ? namedConfig.withMaxConcurrentTasks(FLOW_DISPATCH_WORKERS)
                    : namedConfig;
            return muleContext.getSchedulerService().cpuLightScheduler(effectiveConfig);
        };
    }

    /**
     * @return the class of the strategies created by this factory, {@link StreamEmitterProcessingStrategy}
     */
    @Override
    public Class<? extends ProcessingStrategy> getProcessingStrategyType() {
        final Class<? extends ProcessingStrategy> strategyType = StreamEmitterProcessingStrategy.class;
        return strategyType;
    }
}
