/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.bigquery.connector.common;

import com.google.cloud.bigquery.connector.common.BigQueryClientFactory;
import com.google.cloud.bigquery.connector.common.BigQueryConnectorException;
import com.google.cloud.bigquery.connector.common.WriteStreamStatistics;
import com.google.cloud.spark.bigquery.repackaged.com.google.api.client.util.Sleeper;
import com.google.cloud.spark.bigquery.repackaged.com.google.api.core.ApiFuture;
import com.google.cloud.spark.bigquery.repackaged.com.google.api.core.ApiFutures;
import com.google.cloud.spark.bigquery.repackaged.com.google.api.core.NanoClock;
import com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.retrying.DirectRetryingExecutor;
import com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.retrying.ExponentialRetryAlgorithm;
import com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.retrying.RetryAlgorithm;
import com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.retrying.RetryingFuture;
import com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.retrying.TimedRetryAlgorithm;
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.storage.v1.Exceptions;
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamRequest;
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamResponse;
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.storage.v1.ProtoSchema;
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.storage.v1.StreamWriter;
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.storage.v1.stub.readrows.ApiResultRetryAlgorithm;
import com.google.cloud.spark.bigquery.repackaged.com.google.common.base.Optional;
import com.google.cloud.spark.bigquery.repackaged.com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Buffers serialized proto rows and appends them to a single BigQuery Storage Write API stream.
 *
 * <p>Rows handed to {@link #addRow(ByteString)} are accumulated until the next row would push the
 * batch over {@code MAX_APPEND_ROWS_REQUEST_SIZE}; the batch is then sent asynchronously through a
 * {@link StreamWriter}. Responses are validated on a dedicated single-thread executor and checked
 * lazily on subsequent sends, and exhaustively in {@link #finalizeStream()}.
 *
 * <p>When {@code writeAtLeastOnce} is set the helper writes to the table's {@code _default} stream
 * without offsets and without finalization; otherwise it creates a {@code PENDING} stream, appends
 * with explicit offsets and finalizes the stream at the end.
 *
 * <p>The mutable state is not synchronized, so an instance is presumably meant to be driven by a
 * single thread — NOTE(review): confirm against callers.
 */
public class BigQueryDirectDataWriterHelper {
    // Kept as package-private instance fields (rather than static constants) so that any existing
    // same-package access keeps working unchanged.
    final Logger logger = LoggerFactory.getLogger(BigQueryDirectDataWriterHelper.class);
    /** Upper bound for one append request: 95% of the API's maximum request size. */
    final long MAX_APPEND_ROWS_REQUEST_SIZE = (long)((double)StreamWriter.getApiMaxRequestBytes() * 0.95);
    private final BigQueryWriteClient writeClient;
    private final String tablePath;
    private final ProtoSchema protoSchema;
    private final RetrySettings retrySettings;
    private final Optional<String> traceId;
    private final int partitionId;
    private final boolean writeAtLeastOnce;
    private String writeStreamName;
    private StreamWriter streamWriter;
    private ProtoRows.Builder protoRows;
    // Counters for the batch currently being accumulated.
    private long appendRequestRowCount = 0L;
    private long appendRequestSizeBytes = 0L;
    // Running totals of everything already handed to the stream writer.
    private long writeStreamRowCount = 0L;
    private long writeStreamTotalBytes = 0L;
    // Runs the response-validation callbacks attached to each append future.
    private final ExecutorService appendRowsExecutor = Executors.newSingleThreadExecutor();
    // Validated append futures in send order; drained by checkForFailedResponse.
    private final Queue<ApiFuture<AppendRowsResponse>> appendRowsFuturesQueue = new LinkedList<>();

    /**
     * Creates the helper, resolves the target write stream and opens a stream writer on it.
     *
     * @param writeClientFactory factory supplying the {@link BigQueryWriteClient}
     * @param tablePath path of the destination table; used as the parent of the write stream
     * @param protoSchema schema set on the stream writer
     * @param bigqueryDataWriterHelperRetrySettings retry settings used for stream creation,
     *     finalization and the stream writer
     * @param traceId optional trace id set on the stream writer when present
     * @param partitionId partition identifier, used for logging only
     * @param writeAtLeastOnce when true, write to the {@code _default} stream instead of creating
     *     a dedicated {@code PENDING} stream
     * @throws BigQueryConnectorException if the write stream or the stream writer cannot be created
     */
    public BigQueryDirectDataWriterHelper(BigQueryClientFactory writeClientFactory, String tablePath, ProtoSchema protoSchema, RetrySettings bigqueryDataWriterHelperRetrySettings, Optional<String> traceId, int partitionId, boolean writeAtLeastOnce) {
        this.writeClient = writeClientFactory.getBigQueryWriteClient();
        this.tablePath = tablePath;
        this.protoSchema = protoSchema;
        this.retrySettings = bigqueryDataWriterHelperRetrySettings;
        this.traceId = traceId;
        this.partitionId = partitionId;
        this.writeAtLeastOnce = writeAtLeastOnce;
        if (writeAtLeastOnce) {
            this.writeStreamName = this.tablePath + "/_default";
        } else {
            try {
                this.writeStreamName = this.retryCreateWriteStream();
            } catch (InterruptedException | ExecutionException e) {
                if (e instanceof InterruptedException) {
                    // Preserve the interrupt for callers further up the stack.
                    Thread.currentThread().interrupt();
                }
                throw new BigQueryConnectorException("Could not create write-stream after multiple retries", e);
            }
        }
        this.streamWriter = this.createStreamWriter(this.writeStreamName);
        this.protoRows = ProtoRows.newBuilder();
    }

    /**
     * Creates a {@code PENDING} write stream under the table path, retrying per the retry settings.
     *
     * @return the name of the created write stream
     */
    private String retryCreateWriteStream() throws ExecutionException, InterruptedException {
        CreateWriteStreamRequest request = CreateWriteStreamRequest.newBuilder()
                .setParent(this.tablePath)
                .setWriteStream(WriteStream.newBuilder().setType(WriteStream.Type.PENDING).build())
                .build();
        return this.retryCallable(() -> this.writeClient.createWriteStream(request).getName());
    }

    /**
     * Runs {@code callable} on the calling thread through a {@link DirectRetryingExecutor}
     * configured with exponential backoff built from this helper's retry settings.
     *
     * @return the callable's result
     * @throws ExecutionException if the retrying future completes exceptionally
     * @throws InterruptedException if interrupted while waiting for the result
     */
    private <V> V retryCallable(Callable<V> callable) throws ExecutionException, InterruptedException {
        DirectRetryingExecutor<V> directRetryingExecutor = new DirectRetryingExecutor<>(
                new RetryAlgorithm<>(
                        new ApiResultRetryAlgorithm<>(),
                        new ExponentialRetryAlgorithm(this.retrySettings, NanoClock.getDefaultClock())));
        RetryingFuture<V> retryingFuture = directRetryingExecutor.createFuture(callable);
        return directRetryingExecutor.submit(retryingFuture).get();
    }

    /**
     * Builds a {@link StreamWriter} for the given stream. The connection pool is enabled only in
     * at-least-once mode, and the trace id is applied when present.
     *
     * @throws BigQueryConnectorException if the writer cannot be built
     */
    private StreamWriter createStreamWriter(String writeStreamName) {
        try {
            StreamWriter.Builder builder = StreamWriter.newBuilder(writeStreamName, this.writeClient)
                    .setWriterSchema(this.protoSchema)
                    .setEnableConnectionPool(this.writeAtLeastOnce)
                    .setRetrySettings(this.retrySettings);
            if (this.traceId.isPresent()) {
                builder.setTraceId(this.traceId.get());
            }
            return builder.build();
        } catch (IOException e) {
            throw new BigQueryConnectorException("Could not build stream-writer", e);
        }
    }

    /**
     * Adds one serialized row to the current batch, first sending the batch if the row would not
     * fit into it.
     *
     * @param message the serialized row
     * @throws IOException if the row alone exceeds the maximum append-request size, or if sending
     *     the pending batch fails
     */
    public void addRow(ByteString message) throws IOException {
        // NOTE(review): the extra 2 bytes presumably account for per-row encoding overhead — confirm.
        int messageSize = message.size() + 2;
        if (this.appendRequestSizeBytes + (long)messageSize > this.MAX_APPEND_ROWS_REQUEST_SIZE) {
            if ((long)messageSize > this.MAX_APPEND_ROWS_REQUEST_SIZE) {
                throw new IOException(String.format("A single row of size %d bytes exceeded the maximum of %d bytes for an append-rows-request size", messageSize, this.MAX_APPEND_ROWS_REQUEST_SIZE));
            }
            this.sendAppendRowsRequest();
        }
        this.protoRows.addSerializedRows(message);
        this.appendRequestSizeBytes += (long)messageSize;
        ++this.appendRequestRowCount;
    }

    /**
     * Drains validated append futures from the head of the queue and fails on the first error.
     *
     * <p>An {@code OffsetAlreadyExists} failure is logged and treated as success. On any other
     * failure the validation executor is shut down before the exception propagates.
     *
     * @param waitForResponse when true, block on every queued future; when false, only consume
     *     futures that are already done and stop at the first pending one
     * @throws BigQueryConnectorException if a response failed or the wait was interrupted
     */
    private void checkForFailedResponse(boolean waitForResponse) {
        ApiFuture<AppendRowsResponse> validatedAppendRowsFuture;
        while ((validatedAppendRowsFuture = this.appendRowsFuturesQueue.peek()) != null
                && (waitForResponse || validatedAppendRowsFuture.isDone())) {
            this.appendRowsFuturesQueue.poll();
            boolean succeeded = false;
            try {
                validatedAppendRowsFuture.get();
                succeeded = true;
            } catch (ExecutionException e) {
                // instanceof is null-safe, unlike comparing getCause().getClass().
                if (e.getCause() instanceof Exceptions.OffsetAlreadyExists) {
                    this.logger.warn("Ignoring OffsetAlreadyExists exception: {}", e.getCause().getMessage());
                    succeeded = true;
                } else {
                    this.logger.error("Write-stream {} with name {} exception while retrieving AppendRowsResponse", this.partitionId, this.writeStreamName);
                    throw new BigQueryConnectorException("Execution Exception while retrieving AppendRowsResponse", e);
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the caller can observe the interruption.
                Thread.currentThread().interrupt();
                this.logger.error("Write-stream {} with name {} interrupted exception while retrieving AppendRowsResponse", this.partitionId, this.writeStreamName);
                throw new BigQueryConnectorException("Interrupted Exception while retrieving AppendRowsResponse", e);
            } finally {
                if (!succeeded) {
                    this.appendRowsExecutor.shutdown();
                }
            }
        }
    }

    /**
     * Sends the accumulated batch as one append request and resets the batch counters.
     *
     * <p>Already-completed responses are checked first. In exactly-once mode the append carries
     * the number of rows sent so far as its offset; in at-least-once mode the offset is -1.
     */
    private void sendAppendRowsRequest() throws IOException {
        this.checkForFailedResponse(false);
        long offset = this.writeAtLeastOnce ? -1L : this.writeStreamRowCount;
        ApiFuture<AppendRowsResponse> appendRowsResponseApiFuture = this.streamWriter.append(this.protoRows.build(), offset);
        ApiFuture<AppendRowsResponse> validatedAppendRowsFuture = ApiFutures.transformAsync(
                appendRowsResponseApiFuture,
                appendRowsResponse -> this.validateAppendRowsResponse(appendRowsResponse, offset),
                this.appendRowsExecutor);
        this.appendRowsFuturesQueue.add(validatedAppendRowsFuture);
        this.clearProtoRows();
        this.writeStreamRowCount += this.appendRequestRowCount;
        this.writeStreamTotalBytes += this.appendRequestSizeBytes;
        this.appendRequestRowCount = 0L;
        this.appendRequestSizeBytes = 0L;
    }

    /**
     * Validates one append response: it must carry no error and, in exactly-once mode, its offset
     * must equal the offset the request was sent with.
     *
     * @return an already-completed future holding the response
     * @throws IOException if the response has an error or the offsets differ
     */
    private ApiFuture<AppendRowsResponse> validateAppendRowsResponse(AppendRowsResponse appendRowsResponse, long expectedOffset) throws IOException {
        if (appendRowsResponse.hasError()) {
            throw new IOException("Append request failed with error: " + appendRowsResponse.getError().getMessage());
        }
        if (!this.writeAtLeastOnce) {
            long responseOffset = appendRowsResponse.getAppendResult().getOffset().getValue();
            if (expectedOffset != responseOffset) {
                throw new IOException(String.format("On stream %s append-rows response, offset %d did not match expected offset %d", this.writeStreamName, responseOffset, expectedOffset));
            }
        }
        return ApiFutures.immediateFuture(appendRowsResponse);
    }

    /**
     * Flushes any buffered rows, waits for all append responses and, in exactly-once mode,
     * finalizes the write stream and verifies the finalized row count. Resources are released on
     * success.
     *
     * @return the row count and byte count written through this helper
     * @throws IOException if flushing fails or the finalized row count differs from the number of
     *     rows sent
     * @throws BigQueryConnectorException if an append response failed or finalization failed
     */
    public WriteStreamStatistics finalizeStream() throws IOException {
        if (this.protoRows.getSerializedRowsCount() != 0) {
            this.sendAppendRowsRequest();
        }
        try {
            this.checkForFailedResponse(true);
        } finally {
            this.appendRowsExecutor.shutdown();
        }
        long responseFinalizedRowCount = this.writeStreamRowCount;
        long responseFinalizedBytesWritten = this.writeStreamTotalBytes;
        if (!this.writeAtLeastOnce) {
            this.waitBeforeFinalization();
            FinalizeWriteStreamRequest finalizeWriteStreamRequest = FinalizeWriteStreamRequest.newBuilder().setName(this.writeStreamName).build();
            FinalizeWriteStreamResponse finalizeResponse = this.retryFinalizeWriteStream(finalizeWriteStreamRequest);
            long expectedFinalizedRowCount = this.writeStreamRowCount;
            responseFinalizedRowCount = finalizeResponse.getRowCount();
            if (responseFinalizedRowCount != expectedFinalizedRowCount) {
                throw new IOException(String.format("On stream %s finalization, expected finalized row count %d but received %d", this.writeStreamName, expectedFinalizedRowCount, responseFinalizedRowCount));
            }
        }
        this.logger.debug("Write-stream {} with name {} finalized with row-count {}", new Object[]{this.partitionId, this.writeStreamName, responseFinalizedRowCount});
        this.clean();
        return new WriteStreamStatistics(responseFinalizedRowCount, responseFinalizedBytesWritten);
    }

    /**
     * Finalizes the write stream, retrying per the retry settings.
     *
     * @throws BigQueryConnectorException if finalization fails or the thread is interrupted
     */
    private FinalizeWriteStreamResponse retryFinalizeWriteStream(FinalizeWriteStreamRequest finalizeWriteStreamRequest) {
        try {
            return this.retryCallable(() -> this.writeClient.finalizeWriteStream(finalizeWriteStreamRequest));
        } catch (InterruptedException | ExecutionException e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw new BigQueryConnectorException(String.format("Could not finalize stream %s.", this.writeStreamName), e);
        }
    }

    /**
     * Sleeps for 500 ms before the stream is finalized.
     *
     * <p>NOTE(review): presumably gives the service time to settle the last append — confirm.
     *
     * @throws BigQueryConnectorException if the sleep is interrupted
     */
    private void waitBeforeFinalization() {
        try {
            Sleeper.DEFAULT.sleep(500L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new BigQueryConnectorException(String.format("Interrupted while sleeping before finalizing write-stream %s", this.writeStreamName), e);
        }
    }

    /**
     * Abandons the write: releases the stream writer and the validation executor, and drops the
     * buffered rows and the stream name. The helper must not be used for writing afterwards.
     */
    public void abort() {
        this.clean();
        this.protoRows = null;
        this.writeStreamName = null;
    }

    /**
     * Releases resources: clears buffered rows, closes the stream writer and shuts down the
     * validation executor so its worker thread does not outlive the helper.
     */
    private void clean() {
        this.clearProtoRows();
        try {
            if (this.streamWriter != null) {
                this.streamWriter.close();
            }
        } finally {
            // Shut down after closing the writer so callbacks already queued can still run;
            // shutdown() is a no-op if the executor was shut down earlier.
            this.appendRowsExecutor.shutdown();
        }
    }

    /** Clears the row builder, if one is still present. */
    private void clearProtoRows() {
        if (this.protoRows != null) {
            this.protoRows.clear();
        }
    }

    /**
     * Returns the name of the write stream this helper writes to.
     *
     * @return the stream name, or {@code null} after {@link #abort()}
     */
    public String getWriteStreamName() {
        return this.writeStreamName;
    }
}

