/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.datastore.emulator.impl.watch;

import com.google.cloud.datastore.core.appengv3.converter.AppEngV3ResourceRefFromRepConverter;
import com.google.cloud.datastore.core.exception.DatastoreException;
import com.google.cloud.datastore.core.rep.EntityRef;
import com.google.cloud.datastore.core.rep.PropertyMask;
import com.google.cloud.datastore.core.rep.Query;
import com.google.cloud.datastore.core.rep.ReadResult;
import com.google.cloud.datastore.emulator.impl.context.FirestoreEmulatorRequestContext;
import com.google.cloud.datastore.emulator.impl.util.BiSetMultimap;
import com.google.cloud.datastore.emulator.impl.util.FirestoreEmulatorConverters;
import com.google.cloud.datastore.emulator.impl.util.StreamTransportClosedException;
import com.google.cloud.datastore.emulator.impl.watch.CollectionRef;
import com.google.cloud.datastore.emulator.impl.watch.CollectionSelector;
import com.google.cloud.datastore.emulator.impl.watch.FirestoreSyncManager;
import com.google.cloud.datastore.emulator.impl.watch.IncreasingTimeSource;
import com.google.cloud.datastore.emulator.impl.watch.ListenStream;
import com.google.cloud.datastore.emulator.impl.watch.StreamId;
import com.google.cloud.datastore.emulator.impl.watch.StreamTarget;
import com.google.cloud.datastore.emulator.impl.watch.TargetId;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.common.time.TimeSource;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;

/**
 * Tracks which listen-stream targets are interested in which documents and collections, and
 * pushes fresh results to the owning {@link ListenStream} when documents change.
 *
 * <p>Three kinds of registrations are kept, all keyed by {@link StreamTarget} (a stream id paired
 * with a target id):
 *
 * <ul>
 *   <li>document targets: the individual {@link EntityRef}s a target is watching (for query
 *       targets this is the set of documents returned by the most recent query run);
 *   <li>collection query targets: queries registered against one exact {@link CollectionRef};
 *   <li>collection group query targets: queries registered with {@code allDescendants}, matched
 *       against every ancestor-derived {@link CollectionRef} of a changed document.
 * </ul>
 *
 * <p>All mutable state is guarded by the instance monitor; every public mutating method is
 * {@code synchronized}.
 *
 * @param <T> type parameter forwarded to {@link ListenStream}
 */
@ThreadSafe
public class ListenStreamManager<T> {
    @GuardedBy(value="this")
    private final BiSetMultimap<EntityRef, StreamTarget> documentTargets = new BiSetMultimap<>();
    @GuardedBy(value="this")
    private final BiSetMultimap<CollectionRef, StreamTarget> collectionQueryTargets = new BiSetMultimap<>();
    @GuardedBy(value="this")
    private final BiSetMultimap<CollectionRef, StreamTarget> collectionGroupQueryTargets = new BiSetMultimap<>();
    @GuardedBy(value="this")
    private final Map<StreamTarget, Query> streamTargetToQueries = new HashMap<>();
    @GuardedBy(value="this")
    private final Map<StreamId, ListenStream<T>> streamIdToStream = new HashMap<>();
    private final FirestoreSyncManager syncManager;
    private final TimeSource timeSource;

    /**
     * Creates a manager that reads snapshots through {@code syncManager}.
     *
     * @param syncManager used for snapshot lookups and snapshot queries
     * @param timeSource base clock; it is wrapped in an {@link IncreasingTimeSource} configured
     *     with a 1000 ns duration (NOTE(review): presumably the minimum step between successive
     *     timestamps -- confirm against {@code IncreasingTimeSource})
     */
    public ListenStreamManager(FirestoreSyncManager syncManager, TimeSource timeSource) {
        this.syncManager = syncManager;
        this.timeSource = new IncreasingTimeSource(timeSource, Duration.ofNanos(1000L));
    }

    /** Returns the wrapped time source used to stamp reads issued by this manager. */
    public TimeSource timeSource() {
        return this.timeSource;
    }

    /**
     * Notifies every registered target affected by the given changed documents, then signals
     * {@code notifySynced()} on the affected streams.
     *
     * <p>Document listeners are notified first, query listeners second. The streams that receive
     * {@code notifySynced()} are the union of the streams reported by each of the two passes.
     *
     * @param readResults the changed documents, keyed by reference; only the key set is used to
     *     select impacted targets
     */
    public synchronized void notifyListeners(ImmutableMap<EntityRef, ReadResult> readResults) {
        // Evaluated in this order on purpose: the document pass may drop failing targets before
        // the query pass selects its impacted targets.
        Set<StreamId> documentStreams = this.notifyDocumentListeners(readResults);
        Set<StreamId> queryStreams = this.notifyQueryListeners(readResults);
        this.notifyListenersSynced(Sets.union(documentStreams, queryStreams));
    }

    /**
     * Registers {@code refs} as watched documents for the given target and returns their state
     * as of {@code readTime}.
     *
     * @param stream the stream owning the target; remembered by its stream id if not yet known
     * @param targetId the target within {@code stream}
     * @param refs documents to watch
     * @param readTime snapshot time for the initial lookup
     * @return the lookup results for {@code refs}
     * @throws DatastoreException if the snapshot lookup fails
     */
    public synchronized ImmutableList<ReadResult> addDocumentTargets(ListenStream<T> stream, TargetId targetId, ImmutableSet<EntityRef> refs, Instant readTime) throws DatastoreException {
        StreamTarget streamTarget = StreamTarget.of(stream.streamId(), targetId);
        this.streamIdToStream.putIfAbsent(stream.streamId(), stream);
        this.documentTargets.addKeysByValue(streamTarget, refs);
        return this.syncManager.snapshotLookup(stream.context(), refs.stream().map(AppEngV3ResourceRefFromRepConverter.INSTANCE::convertEntityRefUnchecked).collect(ImmutableList.toImmutableList()), PropertyMask.FULL, readTime);
    }

    /**
     * Registers a query for the given target and returns its results as of {@code readTime}.
     *
     * <p>The target is indexed under {@code CollectionRef.of(parent, collectionId)} in the
     * collection-group index when {@code collectionSelector.allDescendants()} is true, and in the
     * exact-collection index otherwise. The documents returned by the query are also recorded as
     * document targets.
     *
     * @param stream the stream owning the target; remembered by its stream id if not yet known
     * @param targetId the target within {@code stream}
     * @param query the query to run now and re-run on relevant changes
     * @param parent parent reference of the queried collection
     * @param collectionSelector supplies the collection id and the all-descendants flag
     * @param readTime snapshot time for the initial query
     * @return the initial query results
     * @throws DatastoreException if the snapshot query fails
     */
    public synchronized ImmutableList<ReadResult> addQueryTargets(ListenStream<T> stream, TargetId targetId, Query query, EntityRef parent, CollectionSelector collectionSelector, Instant readTime) throws DatastoreException {
        StreamTarget streamTarget = StreamTarget.of(stream.streamId(), targetId);
        this.streamIdToStream.putIfAbsent(stream.streamId(), stream);
        this.streamTargetToQueries.put(streamTarget, query);
        CollectionRef collectionRef = CollectionRef.of(parent, collectionSelector.collectionId());
        if (collectionSelector.allDescendants().booleanValue()) {
            this.collectionGroupQueryTargets.addEntry(collectionRef, streamTarget);
        } else {
            this.collectionQueryTargets.addEntry(collectionRef, streamTarget);
        }
        return this.runQuery(stream.context(), query, readTime, streamTarget);
    }

    /**
     * Removes every registration held for {@code streamTarget}: its watched documents and, if it
     * is a query target, its query and collection index entries.
     *
     * <p>Clearing the query registration here matters: otherwise a removed query target would
     * still be selected by later changes and be reset/re-run even though it was removed.
     *
     * @param streamTarget the target to forget
     */
    public synchronized void removeTarget(StreamTarget streamTarget) {
        this.removeQuery(streamTarget);
        this.documentTargets.removeByValue(streamTarget);
    }

    /**
     * Forgets the stream with the given id and all targets it reports via {@code targetIds()}.
     * Does nothing if the stream is not known.
     *
     * @param streamId id of the stream to remove
     */
    public synchronized void removeStream(StreamId streamId) {
        ListenStream<T> stream = this.streamIdToStream.remove(streamId);
        if (stream == null) {
            return;
        }
        for (TargetId targetId : stream.targetIds()) {
            this.removeTarget(StreamTarget.of(streamId, targetId));
        }
    }

    /** Drops the stored query and both collection index entries for {@code streamTarget}. */
    @GuardedBy(value="this")
    private void removeQuery(StreamTarget streamTarget) {
        this.streamTargetToQueries.remove(streamTarget);
        this.collectionQueryTargets.removeByValue(streamTarget);
        this.collectionGroupQueryTargets.removeByValue(streamTarget);
    }

    /**
     * Re-reads, for each target watching one of the changed documents, all documents that target
     * watches (using the target's own stream context) and pushes the results to its stream.
     *
     * <p>A target whose lookup or notification fails is removed; on {@link DatastoreException}
     * the stream is additionally told about the removal.
     *
     * @return ids of streams with an impacted target, minus streams that had a target removed
     *     during this pass
     */
    @GuardedBy(value="this")
    private Set<StreamId> notifyDocumentListeners(ImmutableMap<EntityRef, ReadResult> readResults) {
        ImmutableSet<StreamTarget> impactedStreamTargets = this.documentTargets.getByKeys(readResults.keySet());
        Set<StreamId> removedStreamIds = new HashSet<>();
        for (StreamTarget streamTarget : impactedStreamTargets) {
            ListenStream<T> stream = this.streamIdToStream.get(streamTarget.streamId());
            ImmutableSet<EntityRef> refsForStream = this.documentTargets.getByValue(streamTarget);
            try {
                // Look the documents up again under the listener's own context rather than
                // forwarding the caller-supplied read results.
                ImmutableList<ReadResult> readResultsWithAuth = this.syncManager.snapshotLookup(stream.context(), refsForStream.stream().map(FirestoreEmulatorConverters::toReference).collect(ImmutableList.toImmutableList()), PropertyMask.FULL, this.timeSource.now());
                stream.notify(streamTarget.targetId(), readResultsWithAuth);
            }
            catch (DatastoreException e) {
                this.removeTarget(streamTarget);
                removedStreamIds.add(streamTarget.streamId());
                stream.notifyTargetRemove(streamTarget.targetId(), e);
            }
            catch (StreamTransportClosedException e) {
                // No removal notice is sent on this path (presumably because the transport is
                // already closed); the target is simply dropped.
                this.removeTarget(streamTarget);
                removedStreamIds.add(streamTarget.streamId());
            }
        }
        return this.compareImpactedStreamTargetsWithRemoved(impactedStreamTargets, removedStreamIds);
    }

    /**
     * For each query target whose collection (or, for collection-group queries, an ancestor
     * derived collection) contains a changed document: resets the target, re-runs its query at
     * the current time, pushes the new results and marks the target current.
     *
     * <p>A target whose query or notification fails is removed; on {@link DatastoreException}
     * the stream is additionally told about the removal.
     *
     * @return ids of streams with an impacted target, minus streams that had a target removed
     *     during this pass
     */
    @GuardedBy(value="this")
    private Set<StreamId> notifyQueryListeners(ImmutableMap<EntityRef, ReadResult> readResults) {
        Sets.SetView<StreamTarget> impactedStreamTargets = this.getImpactedQueryStreamTargets(readResults);
        Set<StreamId> removedStreamIds = new HashSet<>();
        for (StreamTarget streamTarget : impactedStreamTargets) {
            ListenStream<T> stream = this.streamIdToStream.get(streamTarget.streamId());
            Query query = this.streamTargetToQueries.get(streamTarget);
            try {
                stream.notifyReset(streamTarget.targetId());
                ImmutableList<ReadResult> newQueryResults = this.runQuery(stream.context(), query, this.timeSource.now(), streamTarget);
                stream.notify(streamTarget.targetId(), newQueryResults);
                stream.notifyCurrent(streamTarget.targetId());
            }
            catch (DatastoreException e) {
                this.removeTarget(streamTarget);
                removedStreamIds.add(streamTarget.streamId());
                stream.notifyTargetRemove(streamTarget.targetId(), e);
            }
            catch (StreamTransportClosedException e) {
                // No removal notice is sent on this path (presumably because the transport is
                // already closed); the target is simply dropped.
                this.removeTarget(streamTarget);
                removedStreamIds.add(streamTarget.streamId());
            }
        }
        return this.compareImpactedStreamTargetsWithRemoved(impactedStreamTargets, removedStreamIds);
    }

    /**
     * Computes the query targets affected by the changed documents: exact-collection targets for
     * the collections derived from the changed references, plus collection-group targets for
     * every {@link #ancestors ancestor-derived} collection of those.
     */
    @GuardedBy(value="this")
    private Sets.SetView<StreamTarget> getImpactedQueryStreamTargets(ImmutableMap<EntityRef, ReadResult> readResults) {
        ImmutableSet<CollectionRef> directlyModified = readResults.keySet().stream().map(CollectionRef::from).collect(ImmutableSet.toImmutableSet());
        ImmutableSet<CollectionRef> descendantModified = directlyModified.stream().flatMap(ListenStreamManager::ancestors).collect(ImmutableSet.toImmutableSet());
        return Sets.union(this.collectionQueryTargets.getByKeys(directlyModified), this.collectionGroupQueryTargets.getByKeys(descendantModified));
    }

    /**
     * Returns one {@link CollectionRef} per prefix of {@code cr}'s parent path -- from the empty
     * prefix up to and including the full path -- each keeping {@code cr}'s partition and
     * collection id.
     */
    private static Stream<CollectionRef> ancestors(CollectionRef cr) {
        ImmutableList<EntityRef.PathElement> pathElements = cr.parent().pathElements();
        // range end is size + 1 so that the full parent path itself is included.
        Stream<ImmutableList<EntityRef.PathElement>> prefixes = IntStream.range(0, pathElements.size() + 1).mapToObj(i -> pathElements.subList(0, i));
        return prefixes.map(path -> CollectionRef.of(EntityRef.create(cr.parent().partitionRef(), path), cr.collectionId()));
    }

    /**
     * Returns the stream ids owning {@code impactedStreamTargets}, excluding
     * {@code removedStreamIds}.
     */
    @GuardedBy(value="this")
    private Set<StreamId> compareImpactedStreamTargetsWithRemoved(Set<StreamTarget> impactedStreamTargets, Set<StreamId> removedStreamIds) {
        ImmutableSet<StreamId> impactedStreamIds = impactedStreamTargets.stream().map(StreamTarget::streamId).collect(ImmutableSet.toImmutableSet());
        return Sets.difference(impactedStreamIds, removedStreamIds);
    }

    /**
     * Runs {@code query} at {@code readTime} and records every returned document as a document
     * target of {@code streamTarget}.
     *
     * @return the query results
     * @throws DatastoreException if the snapshot query fails
     */
    @GuardedBy(value="this")
    private ImmutableList<ReadResult> runQuery(FirestoreEmulatorRequestContext context, Query query, Instant readTime, StreamTarget streamTarget) throws DatastoreException {
        ImmutableList<ReadResult> queryResults = this.syncManager.snapshotQuery(context, query, readTime);
        ImmutableSet<EntityRef> resultRefs = queryResults.stream().map(readResult -> FirestoreEmulatorConverters.toEntityRef(readResult.reference())).collect(ImmutableSet.toImmutableSet());
        this.documentTargets.addKeysByValue(streamTarget, resultRefs);
        return queryResults;
    }

    /** Calls {@code notifySynced()} on each of the given streams. */
    @GuardedBy(value="this")
    private void notifyListenersSynced(Set<StreamId> streamIds) {
        for (StreamId streamId : streamIds) {
            ListenStream<T> stream = this.streamIdToStream.get(streamId);
            stream.notifySynced();
        }
    }
}

