package com.aerospike.flink.streaming.connectors;

import com.aerospike.client.AerospikeClient;
import com.aerospike.client.AerospikeException;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.async.EventLoops;
import com.aerospike.client.listener.WriteListener;
import com.aerospike.flink.connectors.AerospikeClientFactory;
import com.aerospike.flink.connectors.AerospikeConfiguration;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink;
import org.apache.flink.types.Row;

/* loaded from: input_file:com/aerospike/flink/streaming/connectors/AerospikeRowWriteAheadSink.class */
public class AerospikeRowWriteAheadSink extends GenericWriteAheadSink<Row> {
    private static final long serialVersionUID = 1;
    protected transient AerospikeClient client;
    private transient EventLoops eventLoops;
    private final AerospikeConfiguration info;
    private final AerospikeClientFactory clientBuilder;

    /* JADX INFO: Access modifiers changed from: protected */
    public AerospikeRowWriteAheadSink(AerospikeClientFactory aerospikeClientFactory, EventLoops eventLoops, AerospikeConfiguration aerospikeConfiguration, TypeSerializer<Row> typeSerializer, CheckpointCommitter checkpointCommitter) throws Exception {
        super(checkpointCommitter, typeSerializer, UUID.randomUUID().toString().replace("-", "_"));
        this.info = aerospikeConfiguration;
        this.eventLoops = eventLoops;
        this.clientBuilder = aerospikeClientFactory;
    }

    public void open() throws Exception {
        super.open();
        this.client = this.clientBuilder.build();
        if (!getRuntimeContext().isCheckpointingEnabled()) {
            throw new IllegalStateException("The write-ahead log requires checkpointing to be enabled.");
        }
    }

    public void close() throws Exception {
        super.close();
        try {
            if (this.client != null) {
                this.client.close();
            }
        } catch (Exception e) {
            LOG.error("Error while closing session.", e);
        }
    }

    protected boolean sendValues(Iterable<Row> iterable, long j, long j2) throws Exception {
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        final AtomicInteger atomicInteger2 = new AtomicInteger(0);
        final AtomicReference atomicReference = new AtomicReference();
        WriteListener writeListener = new WriteListener() { // from class: com.aerospike.flink.streaming.connectors.AerospikeRowWriteAheadSink.1
            @Override // com.aerospike.client.listener.WriteListener
            public void onSuccess(Key key) {
                atomicInteger2.incrementAndGet();
                if (atomicInteger.get() <= 0 || atomicInteger.get() != atomicInteger2.get()) {
                    return;
                }
                synchronized (atomicInteger2) {
                    atomicInteger2.notifyAll();
                }
            }

            @Override // com.aerospike.client.listener.WriteListener
            public void onFailure(AerospikeException aerospikeException) {
                if (atomicReference.compareAndSet(null, aerospikeException)) {
                    AerospikeRowWriteAheadSink.LOG.error("Error while sending value.", aerospikeException);
                    synchronized (atomicInteger2) {
                        atomicInteger2.notifyAll();
                    }
                }
            }
        };
        int i = 0;
        int arity = this.serializer.getArity();
        Bin[] binArr = new Bin[arity];
        for (Row row : iterable) {
            Key key = row.getField(0) instanceof Integer ? new Key(this.info.getNamespace(), this.info.getSet(), ((Integer) row.getField(0)).intValue()) : row.getField(0) instanceof Long ? new Key(this.info.getNamespace(), this.info.getSet(), ((Long) row.getField(0)).longValue()) : new Key(this.info.getNamespace(), this.info.getSet(), row.getField(0).toString());
            for (int i2 = 0; i2 < arity; i2++) {
                binArr[i2] = new Bin(this.info.getBinNames()[i2], row.getField(i2));
            }
            if (null != this.eventLoops) {
                this.client.put(this.eventLoops.next(), writeListener, null, key, binArr);
            } else {
                this.client.put(null, key, binArr);
            }
            i++;
        }
        atomicInteger.set(i);
        synchronized (atomicInteger2) {
            while (atomicReference.get() == null && i != atomicInteger2.get()) {
                atomicInteger2.wait();
            }
        }
        if (atomicReference.get() == null) {
            return true;
        }
        LOG.warn("Sending a value failed.", (Throwable) atomicReference.get());
        return false;
    }
}
