package com.aerospike.firefly.io.aerospike.query.legacy;

import com.aerospike.client.AerospikeException;
import com.aerospike.client.Key;
import com.aerospike.client.Record;
import com.aerospike.client.async.Monitor;
import com.aerospike.client.listener.RecordSequenceListener;
import com.aerospike.client.query.KeyRecord;
import com.aerospike.firefly.io.aerospike.AerospikeConnection;
import com.aerospike.firefly.util.config.ConfigurationHelper;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/aerospike/firefly/io/aerospike/query/legacy/ConcurrentScanRecordSequenceListener.class */
/**
 * A {@link RecordSequenceListener} that buffers scan records in a queue and exposes them to a
 * consumer through a blocking {@link Iterator}.
 *
 * <p>The listener callbacks ({@link #onRecord}, {@link #onSuccess}, {@link #onFailure}) act as the
 * producer side: each one releases a semaphore permit so that a consumer blocked in
 * {@code hasNext()} wakes up and re-checks the queue and the completion flag. The consumer waits at
 * most {@code maxWaitMs} milliseconds per wake-up before terminating the scan.
 *
 * <p>Note: a permit is released for every record, while the iterator only acquires a permit when
 * it finds the queue empty, so permits can accumulate while the consumer lags behind. Accumulated
 * permits only cause extra, non-blocking loop iterations in {@code hasNext()}.
 */
public class ConcurrentScanRecordSequenceListener implements RecordSequenceListener {
    private static final Logger LOG = LoggerFactory.getLogger(ConcurrentScanRecordSequenceListener.class);

    /** Notified once the scan has finished, successfully or not. */
    private final Monitor scanMonitor;

    /** Maximum time, in milliseconds, that {@code hasNext()} waits for a producer signal. */
    private final int maxWaitMs;

    /** Receives {@code (startTime, System.nanoTime())} whenever timings are reported. */
    private final BiFunction<Long, Long, Void> metricsCallback;

    /** Records received from the scan and not yet consumed by the iterator. */
    private final LinkedBlockingQueue<KeyRecord> results = new LinkedBlockingQueue<>();

    /** Set once {@link #onSuccess()} or {@link #onFailure(AerospikeException)} has been called. */
    private final AtomicBoolean complete = new AtomicBoolean(false);

    /** Set by {@link #terminateScan()}; makes subsequent {@link #onRecord} calls abort the scan. */
    private final AtomicBoolean isClosed = new AtomicBoolean(false);

    /** Starts with zero permits; producers release one permit per event to wake the consumer. */
    private final Semaphore semaphore = new Semaphore(0);

    /**
     * Value of {@link System#nanoTime()} captured by {@link #setStartTime()}, or {@code -1} if not
     * yet set. Volatile because it is written by the caller of {@code setStartTime()} and read from
     * the listener callbacks and the iterator, which presumably run on different threads.
     */
    private volatile long startTime = -1;

    private ConcurrentScanRecordSequenceListener(Monitor monitor, int i, BiFunction<Long, Long, Void> biFunction) {
        this.metricsCallback = biFunction;
        this.scanMonitor = monitor;
        this.maxWaitMs = i;
    }

    /**
     * Creates a listener whose wait timeout is read from {@code SCAN_MAX_WAIT} in the connection's
     * configuration and whose timings are reported to the connection's scan hit counter.
     *
     * @param aerospikeConnection connection providing the configuration and the scan hit counter
     * @param monitor monitor to notify when the scan completes
     * @param uuid identifier under which scan timings are recorded
     * @return a new listener instance
     */
    public static ConcurrentScanRecordSequenceListener create(AerospikeConnection aerospikeConnection, Monitor monitor, UUID uuid) {
        int maxWait = ConfigurationHelper.getOrDefaultInt(ConfigurationHelper.Keys.SCAN_MAX_WAIT, aerospikeConnection.conf);
        return new ConcurrentScanRecordSequenceListener(monitor, maxWait, (startNanos, endNanos) -> {
            aerospikeConnection.getScanHitCounter().setScanTimings(uuid, startNanos, endNanos);
            return null;
        });
    }

    /**
     * Records the current {@link System#nanoTime()} as the scan start time.
     *
     * @throws IllegalStateException if the start time has already been set
     */
    public void setStartTime() {
        if (this.startTime != -1) {
            throw new IllegalStateException("Failed to set start time of scan monitor because it was already started.");
        }
        this.startTime = System.nanoTime();
    }

    /**
     * Queues the received record and wakes a waiting consumer.
     *
     * @throws AerospikeException.ScanTerminated if {@link #terminateScan()} has been called
     */
    @Override
    public void onRecord(Key key, Record record) throws AerospikeException {
        if (this.isClosed.get()) {
            throw new AerospikeException.ScanTerminated();
        }
        this.results.add(new KeyRecord(key, record));
        this.semaphore.release();
    }

    /** Marks the scan complete, wakes the consumer, reports timings and notifies the monitor. */
    @Override
    public void onSuccess() {
        finishScan();
    }

    /**
     * Logs the failure (unless the scan was terminated locally) and then completes the scan exactly
     * as {@link #onSuccess()} does. The exception is not propagated to iterator consumers: the
     * iterator simply ends after the records received so far.
     */
    @Override
    public void onFailure(AerospikeException aerospikeException) {
        if (!this.isClosed.get()) {
            LOG.error("Error: scan failed with exception", aerospikeException);
        }
        finishScan();
    }

    /**
     * Returns an iterator over the scanned records. {@code hasNext()} blocks until a record is
     * available or the scan completes, and throws {@link RuntimeException} (after calling
     * {@link #terminateScan()}) if no producer signal arrives within {@code maxWaitMs} milliseconds
     * or the waiting thread is interrupted.
     *
     * <p>All iterators returned by this method drain the same underlying queue.
     */
    public Iterator<KeyRecord> iterator() {
        return new Iterator<KeyRecord>() {
            @Override
            public boolean hasNext() {
                // Loop rather than wait once: a permit may stem from an earlier, already consumed record.
                while (!complete.get() && results.isEmpty()) {
                    awaitProducerSignal();
                }
                // Records are queued before the completion flag is set, so this final check sees
                // every record delivered prior to completion.
                return !results.isEmpty();
            }

            @Override
            public KeyRecord next() {
                if (results.isEmpty() && !hasNext()) {
                    throw new NoSuchElementException();
                }
                reportTimings();
                try {
                    return results.take();
                } catch (InterruptedException e) {
                    terminateScan();
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
        };
    }

    /**
     * Reports timings and flags the scan as closed so that the next {@link #onRecord} call throws
     * {@link AerospikeException.ScanTerminated}.
     */
    public void terminateScan() {
        reportTimings();
        this.isClosed.set(true);
    }

    /** Shared completion path for {@link #onSuccess()} and {@link #onFailure(AerospikeException)}. */
    private void finishScan() {
        this.complete.set(true);
        this.semaphore.release();
        try {
            reportTimings();
        } finally {
            // Always notify, even if the metrics callback throws, so monitor waiters are not left hanging.
            this.scanMonitor.notifyComplete();
        }
    }

    /** Passes the recorded start time and the current {@link System#nanoTime()} to the metrics callback. */
    private void reportTimings() {
        this.metricsCallback.apply(this.startTime, System.nanoTime());
    }

    /**
     * Waits up to {@code maxWaitMs} milliseconds for a producer to release a permit. On timeout or
     * interruption the scan is terminated and a {@link RuntimeException} is thrown; on interruption
     * the thread's interrupt status is restored first.
     */
    private void awaitProducerSignal() {
        boolean signalled;
        try {
            signalled = this.semaphore.tryAcquire(this.maxWaitMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            terminateScan();
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        if (!signalled) {
            terminateScan();
            throw new RuntimeException("timeout exceeded waiting for new records");
        }
    }
}
