/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.spark.bigquery.write;

import com.google.cloud.spark.bigquery.repackaged.com.google.common.collect.ImmutableList;
import com.google.cloud.spark.bigquery.repackaged.com.google.common.collect.Iterators;
import com.google.cloud.spark.bigquery.write.context.DataWriterContext;
import com.google.cloud.spark.bigquery.write.context.DataWriterContextFactory;
import com.google.cloud.spark.bigquery.write.context.WriterCommitMessageContext;
import java.io.Serializable;
import java.util.Iterator;
import java.util.Optional;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSqlUtils;
import org.apache.spark.sql.catalyst.InternalRow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Per-partition write function: converts every {@link Row} of a partition to an
 * {@link InternalRow}, feeds it to a {@link DataWriterContext} obtained from the
 * supplied {@link DataWriterContextFactory}, and returns the resulting commit
 * message(s) as an iterator.
 *
 * <p>If writing or committing throws an {@link Exception}, the writer is aborted and a
 * single commit message carrying that exception (via
 * {@link WriterCommitMessageContext#getError()}) is returned instead of propagating it.
 */
public class DataSourceWriterContextPartitionHandler
implements Function2<Integer, Iterator<Row>, Iterator<WriterCommitMessageContext>>,
Serializable {
    private static final Logger logger = LoggerFactory.getLogger(DataSourceWriterContextPartitionHandler.class);
    private final DataWriterContextFactory dataWriterContextFactory;
    private final long epoch;
    private final long taskId;

    /**
     * Creates a handler bound to the given factory and epoch.
     *
     * @param dataWriterContextFactory factory used in {@link #call} to create the per-partition writer
     * @param epoch epoch value forwarded unchanged to the factory
     */
    public DataSourceWriterContextPartitionHandler(DataWriterContextFactory dataWriterContextFactory, long epoch) {
        this.dataWriterContextFactory = dataWriterContextFactory;
        this.epoch = epoch;
        // The task id is captured once, at construction time; it falls back to 0 when no
        // TaskContext is attached to the constructing thread.
        // NOTE(review): if this object is constructed outside of a running task the id will
        // always be 0 -- confirm that this is the intended behaviour.
        TaskContext tc = TaskContext.get();
        this.taskId = tc != null ? tc.taskAttemptId() : 0L;
    }

    /**
     * Writes all rows of one partition and commits them.
     *
     * @param partitionId id of the partition, forwarded to the writer factory
     * @param rowIterator rows of the partition; consumed until exhausted or until a write fails
     * @return an iterator over the commit message(s) returned by the writer on success, or an
     *     iterator over a single message whose {@code getError()} holds the failure
     * @throws Exception if creating or closing the writer context fails; failures while
     *     writing/committing are reported through the returned message instead
     */
    @Override
    public Iterator<WriterCommitMessageContext> call(Integer partitionId, Iterator<Row> rowIterator) throws Exception {
        try (DataWriterContext<InternalRow> dataWriterContext = this.dataWriterContextFactory.createDataWriterContext(partitionId, this.taskId, this.epoch)) {
            // The inner try keeps the writer in scope (and still open) for abort();
            // a catch attached to the try-with-resources itself cannot see the resource.
            try {
                while (rowIterator.hasNext()) {
                    Row row = rowIterator.next();
                    InternalRow internalRow = SparkSqlUtils.getInstance().rowToInternalRow(row);
                    dataWriterContext.write(internalRow);
                }
                return Iterators.forArray(dataWriterContext.commit());
            }
            catch (Exception e) {
                // The trailing Throwable argument is logged by SLF4J as the exception.
                logger.warn("Encountered error writing partition {} in task id {} for epoch {}. Calling DataWriter.abort()", partitionId, this.taskId, this.epoch, e);
                dataWriterContext.abort();
                WriterCommitMessageContext writerCommitMessage = new WriterCommitMessageContext(){

                    @Override
                    public Optional<Exception> getError() {
                        return Optional.of(e);
                    }
                };
                return ImmutableList.of(writerCommitMessage).iterator();
            }
        }
    }
}

