package com.aerospike.firefly.process.traversal.step;

import com.aerospike.firefly.process.traversal.step.sideEffect.FireflyGraphStep;
import com.aerospike.firefly.process.traversal.step.util.FireflyBatchReadHelper;
import com.aerospike.firefly.process.traversal.step.util.TraversalUtil;
import com.aerospike.firefly.structure.FireflyGraph;
import com.aerospike.firefly.structure.FireflyVertex;
import com.aerospike.firefly.structure.id.FireflyId;
import com.aerospike.firefly.util.config.ConfigurationHelper;
import com.aerospike.firefly.util.exceptions.GraphError;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.step.LocalBarrier;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.CollectingBarrierStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.EmptyTraverser;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.Element;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
import org.javatuples.Pair;

/* loaded from: input_file:com/aerospike/firefly/process/traversal/step/FireflyBatchVertexReadStep.class */
public class FireflyBatchVertexReadStep extends CollectingBarrierStep<Vertex> implements LocalBarrier<Vertex> {
    private final Direction direction;
    private final Set<String> edgeLabels;
    public final List<HasContainer> fireflyHasContainers;
    public final List<HasContainer> aerospikeHasContainers;
    private final int barrierSize;
    private final List<String> requiredProperties;
    private final int threads;

    public FireflyBatchVertexReadStep(Traversal.Admin admin, Direction direction, String[] strArr, Set<String> set, List<HasContainer> list, int i, List<String> list2) {
        super(admin, i);
        this.direction = direction;
        this.edgeLabels = new HashSet(Arrays.asList(strArr));
        this.labels = new HashSet(set);
        this.barrierSize = i;
        if (list != null) {
            List<FireflyGraphStep.HasContainerWithCardinality> hasContainersWithCardinalityOrder = FireflyBatchReadHelper.getHasContainersWithCardinalityOrder((FireflyGraph) getTraversal().getGraph().get(), Vertex.class, list);
            this.fireflyHasContainers = (List) hasContainersWithCardinalityOrder.stream().map(hasContainerWithCardinality -> {
                return hasContainerWithCardinality.hasContainer;
            }).collect(Collectors.toList());
            this.aerospikeHasContainers = FireflyBatchReadHelper.getAerospikeHasContainers(hasContainersWithCardinalityOrder);
        } else {
            this.fireflyHasContainers = List.of();
            this.aerospikeHasContainers = List.of();
        }
        this.requiredProperties = list2;
        this.threads = ConfigurationHelper.getTraversalOptionInteger(ConfigurationHelper.TraversalOptions.PARALLELIZE, admin, 1, Integer.MAX_VALUE).orElse(-1).intValue();
    }

    public void parallelBarrierConsumer(TraverserSet<Vertex> traverserSet) {
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(this.threads, runnable -> {
            Thread thread = new Thread(runnable);
            thread.setName("Aerospike-Graph-BatchVertexRead-Worker-" + thread.getId());
            thread.setDaemon(true);
            return thread;
        });
        FireflyGraph fireflyGraph = (FireflyGraph) getTraversal().getGraph().get();
        FireflyBatchReadHelper.pullFromLeft(this.traversal, fireflyGraph, traverserSet, this.barrierSize);
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        ArrayList<Pair> arrayList = new ArrayList();
        while (!traverserSet.isEmpty()) {
            Traverser.Admin<Vertex> remove = traverserSet.remove();
            FireflyVertex fireflyVertex = (FireflyVertex) remove.get();
            arrayList.add(new Pair(fireflyVertex, remove));
            if (fireflyVertex.isEdgeCacheOverflowed()) {
                if (!hashMap2.containsKey(fireflyVertex)) {
                    hashMap2.put(fireflyVertex, newFixedThreadPool.submit(() -> {
                        return fireflyVertex.getVertexIdsFromVertex(this.direction, this.edgeLabels);
                    }));
                }
            } else if (!hashMap.containsKey(fireflyVertex) || hashMap.get(fireflyVertex) == null) {
                ArrayList arrayList2 = new ArrayList();
                Iterator<FireflyId> vertexIdsFromVertex = fireflyVertex.getVertexIdsFromVertex(this.direction, this.edgeLabels);
                while (vertexIdsFromVertex.hasNext()) {
                    arrayList2.add(vertexIdsFromVertex.next());
                }
                hashMap.put(fireflyVertex, arrayList2);
            }
        }
        HashSet hashSet = new HashSet();
        ArrayList arrayList3 = new ArrayList();
        Iterator it = hashMap.values().iterator();
        while (it.hasNext()) {
            for (FireflyId fireflyId : (List) it.next()) {
                if (!concurrentHashMap.containsKey(fireflyId)) {
                    hashSet.add(fireflyId);
                }
            }
            if (hashSet.size() > fireflyGraph.getBaseGraph().AEROSPIKE_BATCH_READ_SIZE) {
                ArrayList arrayList4 = new ArrayList(hashSet);
                arrayList3.add(newFixedThreadPool.submit(() -> {
                    for (FireflyVertex fireflyVertex2 : fireflyGraph.readVertices(this.aerospikeHasContainers, arrayList4, this.requiredProperties)) {
                        concurrentHashMap.put(fireflyVertex2.id, fireflyVertex2);
                    }
                }));
                hashSet.clear();
            }
        }
        if (!hashSet.isEmpty()) {
            ArrayList arrayList5 = new ArrayList(hashSet);
            arrayList3.add(newFixedThreadPool.submit(() -> {
                for (FireflyVertex fireflyVertex2 : fireflyGraph.readVertices(this.aerospikeHasContainers, arrayList5, this.requiredProperties)) {
                    concurrentHashMap.put(fireflyVertex2.id, fireflyVertex2);
                }
            }));
        }
        while (!hashMap2.isEmpty()) {
            Iterator it2 = hashMap2.entrySet().iterator();
            while (it2.hasNext()) {
                Map.Entry entry = (Map.Entry) it2.next();
                if (((Future) entry.getValue()).isDone()) {
                    try {
                        Iterator it3 = (Iterator) ((Future) entry.getValue()).get();
                        ArrayList arrayList6 = new ArrayList();
                        while (it3.hasNext()) {
                            FireflyId fireflyId2 = (FireflyId) it3.next();
                            arrayList6.add(fireflyId2);
                            if (!concurrentHashMap.containsKey(fireflyId2)) {
                                hashSet.add(fireflyId2);
                            }
                        }
                        hashMap.putIfAbsent((Element) entry.getKey(), arrayList6);
                        if (hashSet.size() > fireflyGraph.getBaseGraph().AEROSPIKE_BATCH_READ_SIZE) {
                            ArrayList arrayList7 = new ArrayList(hashSet);
                            arrayList3.add(newFixedThreadPool.submit(() -> {
                                for (FireflyVertex fireflyVertex2 : fireflyGraph.readVertices(this.aerospikeHasContainers, arrayList7, this.requiredProperties)) {
                                    concurrentHashMap.putIfAbsent(fireflyVertex2.id, fireflyVertex2);
                                }
                            }));
                            hashSet.clear();
                        }
                        it2.remove();
                    } catch (InterruptedException e) {
                        throw new TraversalInterruptedException();
                    } catch (ExecutionException e2) {
                        GraphError.sneakyThrow(e2);
                    }
                }
            }
        }
        if (!hashSet.isEmpty()) {
            ArrayList arrayList8 = new ArrayList(hashSet);
            arrayList3.add(newFixedThreadPool.submit(() -> {
                for (FireflyVertex fireflyVertex2 : fireflyGraph.readVertices(this.aerospikeHasContainers, arrayList8, this.requiredProperties)) {
                    concurrentHashMap.put(fireflyVertex2.id, fireflyVertex2);
                }
            }));
        }
        try {
            Iterator it4 = arrayList3.iterator();
            while (it4.hasNext()) {
                ((Future) it4.next()).get();
            }
        } catch (InterruptedException e3) {
            throw new TraversalInterruptedException();
        } catch (ExecutionException e4) {
            GraphError.sneakyThrow(e4);
        }
        for (Pair pair : arrayList) {
            Element element = (Element) pair.getValue0();
            Traverser.Admin admin = (Traverser.Admin) pair.getValue1();
            Stream stream = ((List) hashMap.get(element)).stream();
            Objects.requireNonNull(concurrentHashMap);
            for (FireflyVertex fireflyVertex2 : (List) stream.map((v1) -> {
                return r1.get(v1);
            }).collect(Collectors.toList())) {
                if (fireflyVertex2 != null && HasContainer.testAll(fireflyVertex2, this.fireflyHasContainers)) {
                    traverserSet.add(admin.split(fireflyVertex2, this));
                }
            }
        }
        if (traverserSet.isEmpty()) {
            traverserSet.add((Traverser.Admin<Vertex>) EmptyTraverser.instance());
        }
    }

    @Override // org.apache.tinkerpop.gremlin.process.traversal.step.util.CollectingBarrierStep
    public void barrierConsumer(TraverserSet<Vertex> traverserSet) {
        if (this.threads != -1) {
            parallelBarrierConsumer(traverserSet);
            return;
        }
        FireflyGraph fireflyGraph = (FireflyGraph) getTraversal().getGraph().get();
        FireflyBatchReadHelper.pullFromLeft(this.traversal, fireflyGraph, traverserSet, this.barrierSize);
        TraverserSet traverserSet2 = new TraverserSet();
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        HashSet hashSet = new HashSet();
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        while (!traverserSet.isEmpty()) {
            Traverser.Admin<Vertex> remove = traverserSet.remove();
            FireflyVertex fireflyVertex = (FireflyVertex) remove.get();
            int size = arrayList2.size();
            TraversalUtil.supernodeTraversalWarning(fireflyGraph, this.traversal, fireflyVertex);
            if (hashMap2.containsKey(fireflyVertex)) {
                arrayList2.addAll((Collection) hashMap2.get(fireflyVertex));
            } else {
                ArrayList arrayList3 = new ArrayList();
                Iterator<FireflyId> vertexIdsFromVertex = fireflyVertex.getVertexIdsFromVertex(this.direction, this.edgeLabels);
                while (vertexIdsFromVertex.hasNext()) {
                    arrayList3.add(vertexIdsFromVertex.next());
                }
                hashMap2.put(fireflyVertex, arrayList3);
                arrayList2.addAll(arrayList3);
            }
            for (int i = size; i < arrayList2.size(); i++) {
                FireflyId fireflyId = (FireflyId) arrayList2.get(i);
                if (!hashMap.containsKey(fireflyId)) {
                    hashSet.add(fireflyId);
                }
            }
            arrayList.add(new FireflyBatchReadHelper.ReadStepInfo(remove, Integer.valueOf(arrayList2.size() - size)));
            if (hashSet.size() >= fireflyGraph.getBaseGraph().AEROSPIKE_BATCH_READ_SIZE || arrayList2.size() >= 5 * fireflyGraph.getBaseGraph().AEROSPIKE_BATCH_READ_SIZE) {
                List<HasContainer> list = this.aerospikeHasContainers;
                List<HasContainer> list2 = this.fireflyHasContainers;
                Objects.requireNonNull(fireflyGraph);
                FireflyBatchReadHelper.drainDataToOutput(this, arrayList2, hashSet, hashMap, arrayList, list, list2, traverserSet2, fireflyGraph::readVertices, this.requiredProperties);
            }
        }
        List<HasContainer> list3 = this.aerospikeHasContainers;
        List<HasContainer> list4 = this.fireflyHasContainers;
        Objects.requireNonNull(fireflyGraph);
        FireflyBatchReadHelper.drainDataToOutput(this, arrayList2, hashSet, hashMap, arrayList, list3, list4, traverserSet2, fireflyGraph::readVertices, this.requiredProperties);
        if (traverserSet2.isEmpty()) {
            traverserSet.add((Traverser.Admin<Vertex>) EmptyTraverser.instance());
        } else {
            traverserSet.addAll(traverserSet2);
            traverserSet2.clear();
        }
    }

    @Override // org.apache.tinkerpop.gremlin.process.traversal.step.util.CollectingBarrierStep, org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep
    public String toString() {
        return StringFactory.stepString(this, this.direction, this.edgeLabels, Integer.valueOf(this.barrierSize));
    }
}
