package com.aerospike.firefly.process.computer.local;

import com.aerospike.firefly.io.aerospike.query.paged.PartitionIterator;
import com.aerospike.firefly.process.computer.local.LocalGraphComputer;
import com.aerospike.firefly.structure.FireflyGraph;
import com.aerospike.firefly.structure.FireflyVertex;
import com.aerospike.firefly.util.config.ConfigurationHelper;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.commons.collections.IteratorUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.tinkerpop.gremlin.process.computer.GraphFilter;
import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.util.MapReducePool;
import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramPool;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer;
import org.apache.tinkerpop.gremlin.structure.Element;
import org.apache.tinkerpop.gremlin.structure.util.CloseableIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/aerospike/firefly/process/computer/local/LocalWorkerPool.class */
public class LocalWorkerPool implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) LocalWorkerPool.class);
    private static final BasicThreadFactory THREAD_FACTORY_WORKER = new BasicThreadFactory.Builder().namingPattern("firefly-worker-%d").build();
    private final int numberOfWorkers;
    private final ExecutorService workerPool;
    private final CompletionService<Object> completionService;
    private VertexProgramPool vertexProgramPool;
    private MapReducePool mapReducePool;
    private final Queue<LocalWorkerMemory> workerMemoryPool = new ConcurrentLinkedQueue();
    private final FireflyGraph graph;

    public LocalWorkerPool(FireflyGraph fireflyGraph, LocalMemory localMemory, int i) {
        this.graph = fireflyGraph;
        this.numberOfWorkers = i;
        this.workerPool = Executors.newFixedThreadPool(i, THREAD_FACTORY_WORKER);
        this.completionService = new ExecutorCompletionService(this.workerPool);
        for (int i2 = 0; i2 < this.numberOfWorkers; i2++) {
            this.workerMemoryPool.add(new LocalWorkerMemory(localMemory));
        }
    }

    public void setVertexProgram(VertexProgram vertexProgram) {
        this.vertexProgramPool = new VertexProgramPool(vertexProgram, this.numberOfWorkers);
    }

    public void setMapReduce(MapReduce mapReduce) {
        this.mapReducePool = new MapReducePool(mapReduce, this.numberOfWorkers);
    }

    public List<Element> executeVertexProgram(LocalGraphComputer.ExecuteVertexProgram executeVertexProgram, boolean z, List<Element> list, GraphFilter graphFilter, List<HasContainer> list2) throws InterruptedException {
        long longValue = ((Long) ((Map) this.graph.traversal().call("aerospike.graph.admin.metadata.summary").next()).get("Total vertex count")).longValue();
        int max = Math.max((int) Math.ceil(longValue / this.numberOfWorkers), ConfigurationHelper.getOrDefaultInt(ConfigurationHelper.Keys.PAGINATION_PAGE_SIZE, this.graph.configuration()));
        LOG.debug("VERTEX PROGRAM STAGE PARTITION CONFIGURATION\n\tVertices in summary metadata: {}\n\tComputed partition size: {}\n\tPartition queue size: {}\n\tPartition max wait: {}\n", Long.valueOf(longValue), Integer.valueOf(max), Integer.valueOf(ConfigurationHelper.getOrDefaultInt(ConfigurationHelper.Keys.PAGINATION_PAGE_QUEUE_SIZE, this.graph.configuration())), Integer.valueOf(ConfigurationHelper.getOrDefaultInt(ConfigurationHelper.Keys.PAGINATION_PAGE_MAX_WAIT, this.graph.configuration())));
        AtomicLong atomicLong = new AtomicLong(0L);
        PartitionIterator.Builder partitionSize = PartitionIterator.build(this.graph).partitionSize(max);
        if (z) {
            partitionSize.containers(list2);
        } else if (list.isEmpty()) {
            partitionSize.filters(graphFilter);
        } else {
            partitionSize.vertices(list);
        }
        List<Element> synchronizedList = Collections.synchronizedList(new ArrayList());
        PartitionIterator create = partitionSize.create();
        for (int i = 0; i < this.numberOfWorkers; i++) {
            try {
                int i2 = i;
                this.completionService.submit(() -> {
                    List<Element> execute;
                    long j = 0;
                    VertexProgram<?> take = this.vertexProgramPool.take();
                    LocalWorkerMemory poll = this.workerMemoryPool.poll();
                    while (true) {
                        Optional<CloseableIterator<FireflyVertex>> next = create.next();
                        if (!next.isPresent()) {
                            this.vertexProgramPool.offer(take);
                            this.workerMemoryPool.offer(poll);
                            return null;
                        }
                        try {
                            List list3 = IteratorUtils.toList(next.get());
                            j += list3.size();
                            List list4 = (List) list3.stream().map(fireflyVertex -> {
                                return fireflyVertex.id();
                            }).collect(Collectors.toList());
                            if (!list3.isEmpty() && (execute = executeVertexProgram.execute(null, poll, obj -> {
                                return list4.contains(obj);
                            })) != null) {
                                synchronizedList.addAll(execute);
                                atomicLong.addAndGet(execute.size());
                            }
                            LOG.info("Worker {} processed {} vertices, total={}", Integer.valueOf(i2), Long.valueOf(j), Long.valueOf(atomicLong.get()));
                        } catch (Exception e) {
                            LOG.error("Worker {} failed on {} vertex of partition", Integer.valueOf(i2), Long.valueOf(j), e);
                        }
                    }
                });
            } catch (Throwable th) {
                if (create != null) {
                    try {
                        create.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
        if (create != null) {
            create.close();
        }
        for (int i3 = 0; i3 < this.numberOfWorkers; i3++) {
            try {
                this.completionService.take().get();
            } catch (InterruptedException e) {
                throw e;
            } catch (Exception e2) {
                throw new IllegalStateException(e2.getMessage(), e2);
            }
        }
        return synchronizedList;
    }

    public void executeMapReduce(Consumer<MapReduce> consumer) throws InterruptedException {
        for (int i = 0; i < this.numberOfWorkers; i++) {
            this.completionService.submit(() -> {
                MapReduce<?, ?, ?, ?, ?> take = this.mapReducePool.take();
                consumer.accept(take);
                this.mapReducePool.offer(take);
                return null;
            });
        }
        for (int i2 = 0; i2 < this.numberOfWorkers; i2++) {
            try {
                this.completionService.take().get();
            } catch (InterruptedException e) {
                throw e;
            } catch (Exception e2) {
                throw new IllegalStateException(e2.getMessage(), e2);
            }
        }
    }

    public void closeNow() {
        this.workerPool.shutdownNow();
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        this.workerPool.shutdown();
    }
}
