/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.datastore.emulator.impl;

import com.google.apphosting.datastore.DatastoreV3Pb;
import com.google.cloud.datastore.core.exception.DatastoreException;
import com.google.cloud.datastore.emulator.impl.CloudFirestoreV1;
import com.google.cloud.datastore.emulator.impl.context.FirestoreEmulatorRequestContext;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.flogger.GoogleLogger;
import com.google.common.time.TimeSource;
import com.google.firestore.v1.WriteRequest;
import com.google.firestore.v1.WriteResponse;
import com.google.protobuf.ByteString;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import javax.annotation.concurrent.NotThreadSafe;

/**
 * Request-side observer for a single Firestore v1 {@code Write} stream served by the emulator.
 *
 * <p>The first request on a stream is a handshake: it must name a database, must not carry a
 * stream token and must not carry writes. It is answered with the stream ID and the first stream
 * token. Every later request either carries writes, which are forwarded to
 * {@code CloudFirestoreV1.write}, or carries no writes, which terminates the stream. Each response
 * carries the next value of a per-stream counter (starting at {@code 0}) encoded as a decimal
 * UTF-8 string in its stream token.
 *
 * <p>Resuming a stream is not supported: any request that sets a stream ID is rejected.
 *
 * <p>Instances keep plain mutable state without synchronization and are therefore not thread-safe.
 */
@NotThreadSafe
public class CloudFirestoreV1WriteStream implements StreamObserver<WriteRequest> {
    private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();

    /** Value of the token counter before any response has been produced. */
    private static final long INITIAL_STREAM_TOKEN = 0L;

    private final CloudFirestoreV1 firestore;
    private final TimeSource timeSource;
    private final StreamObserver<WriteResponse> responseObserver;
    private final FirestoreEmulatorRequestContext context;
    private final String streamId;

    /** Database named by the initial request; unset until the handshake has succeeded. */
    private String databaseId;

    /** Counter used to mint the stream token of the next response. */
    private long nextStreamToken;

    /** Set once a non-initial request without writes has been received. */
    private boolean isTerminated;

    /** Set once this class has called {@code onCompleted} or {@code onError} on the response observer. */
    private boolean responseClosed;

    /**
     * Creates a handler for a new write stream.
     *
     * @param firestore backend that executes the writes of non-initial requests
     * @param timeSource clock used to stamp the request time of each forwarded write
     * @param responseObserver observer that receives the responses of this stream
     * @param streamId ID reported to the client in the handshake response
     * @param context base request context; a copy with the current request time is used per write
     * @return a new stream handler in its initial (pre-handshake) state
     */
    public static CloudFirestoreV1WriteStream create(CloudFirestoreV1 firestore, TimeSource timeSource, StreamObserver<WriteResponse> responseObserver, String streamId, FirestoreEmulatorRequestContext context) {
        return new CloudFirestoreV1WriteStream(firestore, timeSource, responseObserver, streamId, context);
    }

    private CloudFirestoreV1WriteStream(CloudFirestoreV1 firestore, TimeSource timeSource, StreamObserver<WriteResponse> responseObserver, String streamId, FirestoreEmulatorRequestContext context) {
        this.firestore = firestore;
        this.timeSource = timeSource;
        this.responseObserver = responseObserver;
        this.streamId = streamId;
        this.context = context;
        this.nextStreamToken = INITIAL_STREAM_TOKEN;
        this.isTerminated = false;
        this.responseClosed = false;
    }

    /** Returns whether no response has been produced yet, i.e. the next request is the handshake. */
    private boolean isInitialRequest() {
        return this.nextStreamToken == INITIAL_STREAM_TOKEN;
    }

    /**
     * Handles one client request and emits the matching response.
     *
     * <p>A request that terminates the stream is answered and then the response stream is
     * completed. A request that fails validation or execution with a {@link DatastoreException}
     * fails the response stream with that exception. Requests arriving after the response stream
     * has been closed by this class are ignored, so that no write is applied whose outcome could
     * not be reported and no call is made on an already-closed observer.
     *
     * @param value the request received from the client
     */
    @Override
    public void onNext(WriteRequest value) {
        if (this.responseClosed) {
            logger.atFine().log("Ignoring write request received after the response stream was closed.");
            return;
        }
        try {
            this.responseObserver.onNext(this.write(value));
            if (this.isTerminated) {
                this.responseClosed = true;
                this.responseObserver.onCompleted();
            }
        }
        catch (DatastoreException e) {
            // onError is terminal for the response stream; remember that before notifying.
            this.responseClosed = true;
            this.responseObserver.onError(e);
        }
    }

    /**
     * Logs an error reported on the request stream, unless it maps to {@code CANCELLED}.
     *
     * @param t the error reported for the request stream
     */
    @Override
    public void onError(Throwable t) {
        Status status = Status.fromThrowable(t);
        if (status.getCode() != Status.Code.CANCELLED) {
            logger.atSevere().withCause(t).log("Client responded with error.");
        }
    }

    /**
     * Completes the response stream when the client finishes the request stream, unless this
     * class has already completed or failed it.
     */
    @Override
    public void onCompleted() {
        if (this.responseClosed) {
            return;
        }
        this.responseClosed = true;
        try {
            this.responseObserver.onCompleted();
        }
        catch (IllegalStateException ignored) {
            // Best effort: presumably the observer was already closed by other means (TODO confirm
            // against the observer implementation in use); there is nothing left to complete.
        }
    }

    /**
     * Validates and executes a single request, returning the response to send.
     *
     * @param request the request to process
     * @return the response for {@code request}, carrying the next stream token
     * @throws DatastoreException with {@code BAD_REQUEST} if the request sets a stream ID or fails
     *     the handshake or follow-up validation; failures raised by the backend write are
     *     propagated as well
     */
    @VisibleForTesting
    WriteResponse write(WriteRequest request) throws DatastoreException {
        if (!request.getStreamId().isEmpty()) {
            throw new DatastoreException("Resuming streams is not supported, do not set stream ID.", DatastoreV3Pb.Error.ErrorCode.BAD_REQUEST, null);
        }
        if (this.isInitialRequest()) {
            return this.handleInitialRequest(request);
        }
        return this.handleRequest(request);
    }

    /**
     * Processes the handshake request: records the database and answers with the stream ID and
     * the first stream token.
     *
     * @throws DatastoreException with {@code BAD_REQUEST} if the database is missing, or a stream
     *     token or writes are present
     */
    private WriteResponse handleInitialRequest(WriteRequest request) throws DatastoreException {
        if (request.getDatabase().isEmpty()) {
            throw new DatastoreException("'database' must be set on an initial write request.", DatastoreV3Pb.Error.ErrorCode.BAD_REQUEST, null);
        }
        if (!request.getStreamToken().isEmpty()) {
            throw new DatastoreException("Resuming streams is not supported, 'stream_token' must not be set on an initial write request.", DatastoreV3Pb.Error.ErrorCode.BAD_REQUEST, null);
        }
        if (request.getWritesCount() != 0) {
            throw new DatastoreException("'writes' must not be set on an initial write request.", DatastoreV3Pb.Error.ErrorCode.BAD_REQUEST, null);
        }
        this.databaseId = request.getDatabase();
        return WriteResponse.newBuilder().setStreamId(this.streamId).setStreamToken(this.getNextStreamToken()).build();
    }

    /**
     * Processes a request that follows the handshake.
     *
     * <p>A request without a database inherits the one from the handshake; a request naming a
     * different database is rejected. A request without writes marks the stream as terminated and
     * is answered with a token-only response; otherwise the writes are forwarded to the backend
     * with the current time as request time.
     *
     * @throws DatastoreException with {@code BAD_REQUEST} if the stream is already terminated or
     *     the database does not match; failures raised by the backend write are propagated
     */
    private WriteResponse handleRequest(WriteRequest request) throws DatastoreException {
        if (this.isTerminated) {
            throw new DatastoreException("A non-initial request with empty writes was received. Not accepting further requests", DatastoreV3Pb.Error.ErrorCode.BAD_REQUEST, null);
        }
        if (request.getDatabase().isEmpty()) {
            request = request.toBuilder().setDatabase(this.databaseId).build();
        } else if (!request.getDatabase().equals(this.databaseId)) {
            throw new DatastoreException(String.format("Request specified a database (%s) that did not match the expected database (%s)", request.getDatabase(), this.databaseId), DatastoreV3Pb.Error.ErrorCode.BAD_REQUEST, null);
        }
        if (request.getWritesCount() == 0) {
            this.isTerminated = true;
            return WriteResponse.newBuilder().setStreamToken(this.getNextStreamToken()).build();
        }
        // The token is minted only after the backend write returned, so a failed write does not
        // advance the token counter.
        return this.firestore.write(this.context.toBuilder().requestTime(this.timeSource.now()).build(), request).toBuilder().setStreamToken(this.getNextStreamToken()).build();
    }

    /** Returns the current counter value as a decimal UTF-8 token and advances the counter. */
    private ByteString getNextStreamToken() {
        ByteString token = ByteString.copyFromUtf8(Long.toString(this.nextStreamToken));
        ++this.nextStreamToken;
        return token;
    }
}

