/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.spark.bigquery.v2.context;

import com.google.cloud.bigquery.connector.common.ArrowUtil;
import com.google.cloud.bigquery.connector.common.BigQueryStorageReadRowsTracer;
import com.google.cloud.bigquery.connector.common.IteratorMultiplexer;
import com.google.cloud.bigquery.connector.common.NonInterruptibleBlockingBytesChannel;
import com.google.cloud.bigquery.connector.common.ParallelArrowReader;
import com.google.cloud.bigquery.connector.common.ReadRowsHelper;
import com.google.cloud.bigquery.connector.common.ReadRowsResponseInputStreamEnumeration;
import com.google.cloud.spark.bigquery.ArrowSchemaConverter;
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.storage.v1.ReadSession;
import com.google.cloud.spark.bigquery.repackaged.com.google.common.collect.ImmutableList;
import com.google.cloud.spark.bigquery.repackaged.com.google.common.util.concurrent.MoreExecutors;
import com.google.cloud.spark.bigquery.repackaged.com.google.protobuf.ByteString;
import com.google.cloud.spark.bigquery.repackaged.org.apache.arrow.compression.CommonsCompressionFactory;
import com.google.cloud.spark.bigquery.repackaged.org.apache.arrow.memory.BufferAllocator;
import com.google.cloud.spark.bigquery.repackaged.org.apache.arrow.util.AutoCloseables;
import com.google.cloud.spark.bigquery.repackaged.org.apache.arrow.vector.VectorLoader;
import com.google.cloud.spark.bigquery.repackaged.org.apache.arrow.vector.VectorSchemaRoot;
import com.google.cloud.spark.bigquery.repackaged.org.apache.arrow.vector.compression.CompressionCodec;
import com.google.cloud.spark.bigquery.repackaged.org.apache.arrow.vector.ipc.ArrowReader;
import com.google.cloud.spark.bigquery.repackaged.org.apache.arrow.vector.ipc.ArrowStreamReader;
import com.google.cloud.spark.bigquery.repackaged.org.apache.arrow.vector.types.pojo.Schema;
import com.google.cloud.spark.bigquery.v2.context.InputPartitionReaderContext;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;

public class ArrowColumnBatchPartitionReaderContext
implements InputPartitionReaderContext<ColumnarBatch> {
    private static final long maxAllocation = 524288000L;
    private final ReadRowsHelper readRowsHelper;
    private final ArrowReaderAdapter reader;
    private final BufferAllocator allocator;
    private final List<String> namesInOrder;
    private ColumnarBatch currentBatch;
    private final BigQueryStorageReadRowsTracer tracer;
    private boolean closed = false;
    private final Map<String, StructField> userProvidedFieldMap;
    private final List<AutoCloseable> closeables = new ArrayList<AutoCloseable>();

    ArrowColumnBatchPartitionReaderContext(Iterator<ReadRowsResponse> readRowsResponses, ByteString schema, ReadRowsHelper readRowsHelper, List<String> namesInOrder, BigQueryStorageReadRowsTracer tracer, Optional<StructType> userProvidedSchema, int numBackgroundThreads, ReadSession.TableReadOptions.ResponseCompressionCodec responseCompressionCodec) {
        this.allocator = ArrowUtil.newRootAllocator(524288000L);
        this.readRowsHelper = readRowsHelper;
        this.namesInOrder = namesInOrder;
        this.tracer = tracer;
        this.closeables.add(null);
        List userProvidedFieldList = Arrays.stream(userProvidedSchema.orElse(new StructType()).fields()).collect(Collectors.toList());
        this.userProvidedFieldMap = userProvidedFieldList.stream().collect(Collectors.toMap(StructField::name, field -> field));
        if (numBackgroundThreads == 1) {
            InputStream fullStream = this.makeSingleInputStream(readRowsResponses, schema, tracer, responseCompressionCodec);
            this.reader = new ParallelReaderAdapter(this.allocator, ImmutableList.of(this.newArrowStreamReader(fullStream)), MoreExecutors.newDirectExecutorService(), tracer.forkWithPrefix("BackgroundReader"), null);
        } else if (numBackgroundThreads > 1) {
            ThreadPoolExecutor backgroundParsingService = new ThreadPoolExecutor(1, numBackgroundThreads - 1, 2L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
            IteratorMultiplexer<ReadRowsResponse> multiplexer = new IteratorMultiplexer<ReadRowsResponse>(readRowsResponses, numBackgroundThreads);
            ArrayList<ArrowReader> readers = new ArrayList<ArrowReader>();
            for (int x = 0; x < numBackgroundThreads; ++x) {
                BigQueryStorageReadRowsTracer multiplexedTracer = tracer.forkWithPrefix("multiplexed-" + x);
                SequenceInputStream responseStream = new SequenceInputStream(new ReadRowsResponseInputStreamEnumeration(multiplexer.getSplit(x), multiplexedTracer, responseCompressionCodec));
                SequenceInputStream schemaAndBatches = new SequenceInputStream(schema.newInput(), responseStream);
                this.closeables.add(multiplexedTracer::finished);
                readers.add(this.newArrowStreamReader(schemaAndBatches));
            }
            this.reader = new ParallelReaderAdapter(this.allocator, readers, backgroundParsingService, tracer.forkWithPrefix("MultithreadReader"), multiplexer);
        } else {
            InputStream fullStream = this.makeSingleInputStream(readRowsResponses, schema, tracer, responseCompressionCodec);
            this.reader = new SimpleAdapter(this.newArrowStreamReader(fullStream));
        }
    }

    private InputStream makeSingleInputStream(Iterator<ReadRowsResponse> readRowsResponses, ByteString schema, BigQueryStorageReadRowsTracer tracer, ReadSession.TableReadOptions.ResponseCompressionCodec responseCompressionCodec) {
        SequenceInputStream batchStream = new SequenceInputStream(new ReadRowsResponseInputStreamEnumeration(readRowsResponses, tracer, responseCompressionCodec));
        return new SequenceInputStream(schema.newInput(), batchStream);
    }

    @Override
    public boolean next() throws IOException {
        this.tracer.nextBatchNeeded();
        if (this.closed) {
            return false;
        }
        this.tracer.rowsParseStarted();
        boolean bl = this.closed = !this.reader.loadNextBatch();
        if (this.closed) {
            return false;
        }
        VectorSchemaRoot root = this.reader.root();
        if (this.currentBatch == null) {
            ColumnVector[] columns = (ColumnVector[])this.namesInOrder.stream().map(root::getVector).map(vector -> ArrowSchemaConverter.newArrowSchemaConverter(vector, this.userProvidedFieldMap.get(vector.getName()))).toArray(ColumnVector[]::new);
            this.currentBatch = new ColumnarBatch(columns);
        }
        this.currentBatch.setNumRows(root.getRowCount());
        this.tracer.rowsParseFinished(this.currentBatch.numRows());
        return true;
    }

    @Override
    public ColumnarBatch get() {
        return this.currentBatch;
    }

    @Override
    public Optional<BigQueryStorageReadRowsTracer> getBigQueryStorageReadRowsTracer() {
        return Optional.of(this.tracer);
    }

    @Override
    public void close() throws IOException {
        this.closed = true;
        try {
            this.tracer.finished();
            this.closeables.set(0, this.reader);
            this.closeables.add(this.allocator);
            AutoCloseables.close(this.closeables);
        }
        catch (Exception e) {
            throw new IOException("Failure closing arrow components. stream: " + this.readRowsHelper, e);
        }
        finally {
            try {
                this.readRowsHelper.close();
            }
            catch (Exception e) {
                throw new IOException("Failure closing stream: " + this.readRowsHelper, e);
            }
        }
    }

    private ArrowStreamReader newArrowStreamReader(InputStream fullStream) {
        BufferAllocator childAllocator = this.allocator.newChildAllocator("readerAllocator" + (this.closeables.size() - 1), 0L, 524288000L);
        this.closeables.add(childAllocator);
        return new ArrowStreamReader(new NonInterruptibleBlockingBytesChannel(fullStream), childAllocator, (CompressionCodec.Factory)CommonsCompressionFactory.INSTANCE);
    }

    static class ParallelReaderAdapter
    implements ArrowReaderAdapter {
        private final ParallelArrowReader reader;
        private final VectorLoader loader;
        private final VectorSchemaRoot root;
        private final List<AutoCloseable> closeables = new ArrayList<AutoCloseable>();
        private IOException initialException;

        ParallelReaderAdapter(BufferAllocator allocator, List<ArrowReader> readers, ExecutorService executor, BigQueryStorageReadRowsTracer tracer, AutoCloseable closeable) {
            Schema schema = null;
            this.closeables.add(closeable);
            try {
                schema = readers.get(0).getVectorSchemaRoot().getSchema();
            }
            catch (IOException e) {
                this.initialException = e;
                this.closeables.addAll(readers);
                this.reader = null;
                this.loader = null;
                this.root = null;
                return;
            }
            BufferAllocator readerAllocator = allocator.newChildAllocator("ParallelReaderAllocator", 0L, 524288000L);
            this.root = VectorSchemaRoot.create(schema, readerAllocator);
            this.closeables.add(this.root);
            this.loader = new VectorLoader(this.root);
            this.reader = new ParallelArrowReader(readers, executor, this.loader, tracer);
            this.closeables.add(0, this.reader);
            this.closeables.add(readerAllocator);
        }

        @Override
        public boolean loadNextBatch() throws IOException {
            if (this.initialException != null) {
                throw new IOException(this.initialException);
            }
            return this.reader.next();
        }

        @Override
        public VectorSchemaRoot root() throws IOException {
            return this.root;
        }

        @Override
        public void close() throws Exception {
            AutoCloseables.close(this.closeables);
        }
    }

    static class SimpleAdapter
    implements ArrowReaderAdapter {
        private final ArrowReader reader;

        SimpleAdapter(ArrowReader reader) {
            this.reader = reader;
        }

        @Override
        public boolean loadNextBatch() throws IOException {
            return this.reader.loadNextBatch();
        }

        @Override
        public VectorSchemaRoot root() throws IOException {
            return this.reader.getVectorSchemaRoot();
        }

        @Override
        public void close() throws Exception {
            this.reader.close(false);
        }
    }

    static interface ArrowReaderAdapter
    extends AutoCloseable {
        public boolean loadNextBatch() throws IOException;

        public VectorSchemaRoot root() throws IOException;
    }
}

