package com.aerospike.firefly.structure.util;

import com.aerospike.client.Record;
import com.aerospike.client.cdt.CTX;
import com.aerospike.client.policy.QueryPolicy;
import com.aerospike.client.query.Filter;
import com.aerospike.client.query.IndexCollectionType;
import com.aerospike.client.query.KeyRecord;
import com.aerospike.firefly.io.aerospike.AerospikeConnection;
import com.aerospike.firefly.structure.FireflyEdge;
import com.aerospike.firefly.structure.FireflyEdgeFactory;
import com.aerospike.firefly.structure.FireflyGraph;
import com.aerospike.firefly.structure.FireflyVertex;
import com.aerospike.firefly.util.exceptions.AerospikeGraphException;
import java.io.Closeable;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/aerospike/firefly/structure/util/FireflyTtlHandler.class */
public class FireflyTtlHandler implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) FireflyTtlHandler.class);
    private static final QueryPolicy INDEX_POLICY = new QueryPolicy();
    public static final String TTL_TIME_KEY = "TTL_TIME_KEY";
    private final FireflyGraph graph;
    private final boolean isTtlEnabled;
    private final int ttlPurgeIntervalMillis;
    private final AtomicLong thisRunTime;
    private final AtomicBoolean timerGetFailed = new AtomicBoolean(false);
    private ScheduledExecutorService scheduler;

    public FireflyTtlHandler(FireflyGraph fireflyGraph) {
        this.graph = fireflyGraph;
        this.isTtlEnabled = (!fireflyGraph.getBaseGraph().TTL_ENABLED_FLAG || fireflyGraph.bulkLoaderFlag || fireflyGraph.getBaseGraph().WARMUP_MODE) ? false : true;
        this.ttlPurgeIntervalMillis = fireflyGraph.getBaseGraph().TTL_PURGE_INTERVAL_SECONDS * 1000;
        this.thisRunTime = new AtomicLong();
        if (this.isTtlEnabled) {
            this.scheduler = Executors.newSingleThreadScheduledExecutor();
            this.scheduler.schedule(this::startPurge, 0L, TimeUnit.MILLISECONDS);
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        if (this.isTtlEnabled) {
            this.scheduler.shutdownNow();
        }
    }

    private long getMillisToNextRun() {
        long currentTimeMillis = (this.thisRunTime.get() + this.ttlPurgeIntervalMillis) - System.currentTimeMillis();
        if (currentTimeMillis < 0) {
            currentTimeMillis = 0;
        }
        return currentTimeMillis;
    }

    private void startPurge() {
        this.thisRunTime.set(System.currentTimeMillis());
        try {
            long andSetTtlTime = this.graph.getBaseGraph().getAndSetTtlTime(this.thisRunTime.get());
            if (this.timerGetFailed.getAndSet(false)) {
                LOG.warn("Getting last TTL purge runtime succeeded. TTL recovering to normal operation.");
            }
            long[] purgeElements = purgeElements(andSetTtlTime, this.thisRunTime.get());
            if (getMillisToNextRun() > 0) {
                LOG.debug("Extra time available until next TTL purge cycle. Running cleanup cycle.");
                long[] purgeElements2 = purgeElements(0L, andSetTtlTime);
                Logger logger = LOG;
                long j = purgeElements2[0];
                long j2 = purgeElements2[1];
                logger.debug("Cleanup TTL cycle removed " + j + " expired Vertices and " + logger + " expired Edges.");
            } else {
                Logger logger2 = LOG;
                long j3 = purgeElements[0];
                long j4 = purgeElements[1];
                int i = this.graph.getBaseGraph().TTL_PURGE_INTERVAL_SECONDS;
                logger2.warn("The latest TTL purge had " + j3 + " expired Vertices and " + logger2 + " expired Edges which took longer than the configured time of " + j4 + " seconds to remove. Upcoming expiring elements' removal may be delayed.");
            }
            this.scheduler.schedule(this::startPurge, getMillisToNextRun(), TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            if (!this.timerGetFailed.getAndSet(true)) {
                LOG.warn("Getting last TTL purge runtime failed. Automatically retrying after TTL interval.", (Throwable) e);
            }
            this.scheduler.schedule(this::startPurge, getMillisToNextRun(), TimeUnit.MILLISECONDS);
        }
    }

    private long[] purgeElements(long j, long j2) {
        return new long[]{purgeVertices(j, j2), purgeEdges(j, j2)};
    }

    private long purgeVertices(long j, long j2) {
        AerospikeConnection baseGraph = this.graph.getBaseGraph();
        long j3 = 0;
        try {
            Iterator querySIndex = this.graph.graphQuery.querySIndex(baseGraph.VERTEX_AERO_SET, baseGraph.TTL_VERTEX_INDEX_NAME, Filter.range(baseGraph.TTL_BIN, j, j2, new CTX[0]), INDEX_POLICY);
            while (querySIndex.hasNext()) {
                FireflyVertex vertexFromRecord = this.graph.vertexFromRecord((KeyRecord) querySIndex.next());
                if (vertexFromRecord != null) {
                    try {
                        vertexFromRecord.remove();
                        j3++;
                    } catch (AerospikeGraphException e) {
                        LOG.error("Unexpected error occurred when removing TTL Vertex ID {}: {}", vertexFromRecord.id(), e.getMessage());
                    }
                }
            }
        } catch (AerospikeGraphException e2) {
            LOG.error("Unexpected error occurred when running index to grab TTL expired Vertices: {}", e2.getMessage());
        } catch (Exception e3) {
            LOG.error("Unexpected exception when TTL purging Vertices.", (Throwable) e3);
        }
        LOG.debug("TTL purge removed {}} expired Vertices.", Long.valueOf(j3));
        return j3;
    }

    private long purgeEdges(long j, long j2) {
        FireflyEdge create;
        AerospikeConnection baseGraph = this.graph.getBaseGraph();
        long j3 = 0;
        try {
            Iterator querySIndex = this.graph.graphQuery.querySIndex(baseGraph.EDGE_AERO_SET, baseGraph.TTL_EDGE_INDEX_NAME, Filter.range(baseGraph.TTL_BIN, IndexCollectionType.MAPVALUES, j, j2, new CTX[0]), INDEX_POLICY);
            long currentTimeMillis = System.currentTimeMillis();
            while (querySIndex.hasNext()) {
                Record record = ((KeyRecord) querySIndex.next()).record;
                for (Map.Entry<?, ?> entry : record.getMap(baseGraph.TTL_BIN).entrySet()) {
                    if (((Long) entry.getValue()).longValue() <= currentTimeMillis && (create = FireflyEdgeFactory.create(baseGraph.getIdFactory().createEdgeId(entry.getKey()), record, this.graph)) != null) {
                        try {
                            create.remove();
                            currentTimeMillis = System.currentTimeMillis();
                            j3++;
                        } catch (AerospikeGraphException e) {
                            LOG.error("Unexpected error occurred when removing TTL Edge ID {}: {}", create.id(), e.getMessage());
                        }
                    }
                }
            }
        } catch (AerospikeGraphException e2) {
            LOG.error("Unexpected error occurred when running index to grab TTL expired Edges: {}", e2.getMessage());
        } catch (Exception e3) {
            LOG.error("Unexpected exception when TTL purging Edges.", (Throwable) e3);
        }
        LOG.debug("TTL purge removed {} expired Edges.", Long.valueOf(j3));
        return j3;
    }

    static {
        INDEX_POLICY.sendKey = true;
        INDEX_POLICY.includeBinData = true;
    }
}
