package com.aerospike.firefly.util.concurrency;

import com.aerospike.client.Key;
import com.aerospike.firefly.io.aerospike.AerospikeConnection;
import com.aerospike.firefly.util.exceptions.AerospikeGraphException;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/aerospike/firefly/util/concurrency/FireflyRecordLockHandler.class */
/**
 * Hands out per-{@link Key} record locks whose ownership is established by writing a lock record
 * through an {@link AerospikeConnection}.
 *
 * <p>Lock objects are kept in a single static registry shared by every handler instance in the
 * JVM. All registry mutations, and the pending-request bookkeeping that decides when an entry is
 * removed, happen while synchronized on the registry itself.
 */
public class FireflyRecordLockHandler {
    private static final Logger LOG = LoggerFactory.getLogger(FireflyRecordLockHandler.class);

    /** Registry of live locks, keyed by the record key they guard. Shared across all handlers. */
    private static final ConcurrentHashMap<Key, FireflyRecordLock> RECORD_LOCKS = new ConcurrentHashMap<>();

    private final AerospikeConnection db;

    /**
     * TTL handed to {@code writeKeyLock}; also compared against an elapsed time in milliseconds
     * when unlocking. NOTE(review): the unit expected by {@code writeKeyLock} is not visible here.
     */
    private final int lockTtl;

    /** Maximum time, in milliseconds, that a caller waits for a lock before giving up. */
    private final int lockTimeout;

    /** Period, in milliseconds, between attempts to write the lock record. */
    private final int lockPollIntervalMillis;

    /** When true, the poller restarted by {@code unlock()} waits one poll interval before its first attempt. */
    private final boolean starvationProtectionEnabled;

    /**
     * A lock on a single record key.
     *
     * <p>A background {@link Timer} task repeatedly tries to write the lock record. Once a write
     * succeeds, its result is placed on a one-slot fair queue and the task cancels itself; the
     * first waiting caller that takes the token from the queue owns the lock until it calls
     * {@link #unlock()}.
     */
    public static class FireflyRecordLock {
        private final FireflyRecordLockHandler handler;
        private final Key key;

        /** Number of callers that currently hold or are waiting for this lock. */
        private final AtomicInteger pendingRequests = new AtomicInteger(0);

        /** Wall-clock time (epoch millis) at which the current holder obtained the lock. */
        private final AtomicLong lockAcquireTime = new AtomicLong(0);

        /** Consecutive hot-key failures since the last successful lock write. */
        private final AtomicInteger hotKeyCount = new AtomicInteger(0);

        /** Number of upcoming poll ticks to skip because of hot-key back-off. */
        private final AtomicInteger hotKeyBackoff = new AtomicInteger(0);

        /** Ensures an unexpected acquisition error is logged only once until the next success. */
        private final AtomicBoolean printErrorToLog = new AtomicBoolean(true);

        /** One-slot, fair hand-off queue carrying the token produced by a successful lock write. */
        private final ArrayBlockingQueue<Object> lockQueue = new ArrayBlockingQueue<>(1, true);

        /** Daemon timer that runs the acquisition task for this lock. */
        private final Timer timer = new Timer(true);

        /**
         * Periodic task that attempts to write the lock record and, on success, publishes the
         * result to the owning lock's queue and cancels itself.
         */
        public static class AcquireRecordLockTask extends TimerTask {
            /**
             * Error code treated as "hot key". The value matches the Aerospike client's
             * {@code ResultCode.KEY_BUSY}; presumably that is its origin - confirm against
             * {@code AerospikeGraphException}.
             */
            private static final int HOT_KEY_ERROR_CODE = 14;

            /**
             * Error code that is retried silently on the next tick. The value matches the Aerospike
             * client's {@code ResultCode.KEY_EXISTS_ERROR}, i.e. presumably "lock record already
             * held" - confirm against {@code AerospikeGraphException}.
             */
            private static final int LOCK_HELD_ERROR_CODE = 5;

            /** Upper bound on the shift used for exponential back-off (caps it at 2 << 15 ticks). */
            private static final int MAX_BACKOFF_SHIFT = 15;

            private final FireflyRecordLock lockRecord;

            private AcquireRecordLockTask(FireflyRecordLock fireflyRecordLock) {
                this.lockRecord = fireflyRecordLock;
            }

            /**
             * Performs one acquisition attempt, unless the lock is still backing off from a
             * hot-key error, in which case the tick is consumed and nothing else happens.
             */
            @Override
            public void run() {
                final FireflyRecordLock lock = this.lockRecord;

                // Consume one back-off tick if any remain. The counter is only decremented while
                // positive so that it cannot drift negative and eventually wrap around.
                if (lock.hotKeyBackoff.getAndUpdate(ticks -> ticks > 0 ? ticks - 1 : ticks) > 0) {
                    return;
                }
                try {
                    lock.lockQueue.offer(lock.handler.db.writeKeyLock(lock.key, lock.handler.lockTtl));
                    // Success: reset error/back-off state and stop polling until the next unlock().
                    lock.printErrorToLog.set(true);
                    lock.hotKeyCount.set(0);
                    lock.hotKeyBackoff.set(0);
                    cancel();
                } catch (AerospikeGraphException e) {
                    if (e.errorCode == HOT_KEY_ERROR_CODE) {
                        LOG.warn("Hot key on record lock with Key {}. Exponentially backing off before next grab attempt.", lock.key);
                        int shift = Math.min(lock.hotKeyCount.getAndIncrement(), MAX_BACKOFF_SHIFT);
                        lock.hotKeyBackoff.set(Math.max(1, 2 << shift));
                        return;
                    }
                    // Any other code is retried on the next tick. Only codes other than
                    // LOCK_HELD_ERROR_CODE are logged, and only once until the next success.
                    if (e.errorCode != LOCK_HELD_ERROR_CODE && lock.printErrorToLog.getAndSet(false)) {
                        LOG.error("Unexpected error when attempting to acquire record lock.", e);
                    }
                }
            }
        }

        /**
         * Creates the lock and immediately starts polling for the lock record.
         *
         * @param fireflyRecordLockHandler handler supplying the connection and timing settings
         * @param key record key this lock guards
         */
        private FireflyRecordLock(FireflyRecordLockHandler fireflyRecordLockHandler, Key key) {
            this.handler = fireflyRecordLockHandler;
            this.key = key;
            startLockPoller(0);
        }

        /**
         * Blocks until the acquisition task hands over the lock token or the handler's lock
         * timeout elapses.
         *
         * <p>On failure the caller's pending request is released before the exception is thrown.
         *
         * @return this lock, now held by the caller
         * @throws RuntimeException if the timeout is exceeded, or if the waiting thread is
         *     interrupted (the interrupt flag is restored and the cause is attached)
         */
        private FireflyRecordLock lock() {
            InterruptedException interruption = null;
            try {
                if (this.lockQueue.poll(this.handler.lockTimeout, TimeUnit.MILLISECONDS) != null) {
                    this.lockAcquireTime.set(System.currentTimeMillis());
                    return this;
                }
            } catch (InterruptedException e) {
                LOG.error("Unexpected error: interrupted when trying to acquire record lock. Contact support for additional help if needed.", e);
                // Restore the flag so code further up the stack can still observe the interrupt.
                Thread.currentThread().interrupt();
                interruption = e;
            }

            releasePendingRequest();

            if (interruption != null) {
                throw new RuntimeException("Interrupted while attempting to acquire record lock.", interruption);
            }
            String message = "Timeout of " + this.handler.lockTimeout + " milliseconds exceeded when attempting to acquire record lock.";
            LOG.error(message);
            throw new RuntimeException(message);
        }

        /**
         * Releases the lock held by the caller.
         *
         * <p>The lock record is deleted only if less than {@code lockTtl} has elapsed since the
         * lock was acquired (NOTE(review): this compares elapsed milliseconds with the TTL value;
         * confirm the TTL is expressed in milliseconds). Failures while deleting are logged and
         * otherwise ignored. If other requests are still pending, polling is restarted, delayed by
         * one poll interval when starvation protection is enabled.
         */
        public void unlock() {
            int remaining = releasePendingRequest();
            try {
                if (System.currentTimeMillis() - this.lockAcquireTime.get() < this.handler.lockTtl) {
                    this.handler.db.delete(this.key, null);
                }
            } catch (Exception e) {
                LOG.error("Unexpected error when unlocking record lock.", e);
            }
            if (remaining != 0) {
                startLockPoller(this.handler.starvationProtectionEnabled ? this.handler.lockPollIntervalMillis : 0);
            }
        }

        /**
         * Decrements the pending-request count and, when it reaches zero, removes this lock from
         * the registry and cancels its timer.
         *
         * @return the number of requests still pending after the decrement
         */
        private int releasePendingRequest() {
            synchronized (RECORD_LOCKS) {
                int remaining = this.pendingRequests.decrementAndGet();
                if (remaining == 0) {
                    RECORD_LOCKS.remove(this.key);
                    this.timer.cancel();
                }
                return remaining;
            }
        }

        /**
         * Schedules a new acquisition task at the handler's poll interval.
         *
         * <p>If the timer can no longer accept tasks, the failure is logged and this lock is
         * removed from the registry.
         *
         * @param i delay in milliseconds before the first attempt
         */
        private void startLockPoller(int i) {
            try {
                this.timer.schedule(new AcquireRecordLockTask(this), i, this.handler.lockPollIntervalMillis);
            } catch (IllegalStateException e) {
                LOG.error("Unexpected error: record lock in illegal state. Contact support for additional help if needed.", e);
                synchronized (RECORD_LOCKS) {
                    RECORD_LOCKS.remove(this.key);
                    this.timer.cancel();
                }
            }
        }
    }

    /**
     * Creates a handler that takes its connection and timing settings from the given connection.
     *
     * @param aerospikeConnection connection used to write and delete lock records; its
     *     {@code MERGE_EDGE_*} fields supply the TTL, timeout, poll interval and starvation
     *     protection flag
     */
    public FireflyRecordLockHandler(AerospikeConnection aerospikeConnection) {
        this.db = aerospikeConnection;
        this.lockTtl = aerospikeConnection.MERGE_EDGE_TTL;
        this.lockTimeout = aerospikeConnection.MERGE_EDGE_EVAL_TIMEOUT;
        this.lockPollIntervalMillis = aerospikeConnection.MERGE_EDGE_POLL_INTERVAL;
        this.starvationProtectionEnabled = aerospikeConnection.MERGE_EDGE_STARVATION_PROTECTION;
    }

    /**
     * Acquires the record lock for {@code key}, creating and registering it if necessary, and
     * blocks until it is held or the lock timeout elapses.
     *
     * @param key record key to lock
     * @return the held lock; the caller must call {@link FireflyRecordLock#unlock()} when done
     * @throws RuntimeException if the lock cannot be acquired within the timeout or the calling
     *     thread is interrupted while waiting
     */
    public FireflyRecordLock getLock(Key key) {
        FireflyRecordLock recordLock;
        // Lookup/creation and the pending-request increment must be atomic with respect to the
        // removal performed when a lock's pending count drops to zero.
        synchronized (RECORD_LOCKS) {
            recordLock = RECORD_LOCKS.computeIfAbsent(key, k -> new FireflyRecordLock(this, k));
            recordLock.pendingRequests.incrementAndGet();
        }
        return recordLock.lock();
    }
}
