package com.aerospike.flink.streaming.connectors;

import com.aerospike.client.AerospikeClient;
import com.aerospike.client.AerospikeException;
import com.aerospike.client.Key;
import com.aerospike.client.async.EventLoops;
import com.aerospike.client.listener.WriteListener;
import com.aerospike.client.policy.ClientPolicy;
import com.aerospike.flink.connectors.AerospikeClientFactory;
import com.aerospike.flink.connectors.AerospikeConfiguration;
import com.aerospike.flink.connectors.feature.FeatureKeyException;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/aerospike/flink/streaming/connectors/AerospikeSinkBase.class */
/**
 * Base class for Flink sinks that write records to Aerospike asynchronously.
 *
 * <p>Every record handed to {@link #invoke(Object)} is forwarded to {@link #send(Object)} and
 * counted as a pending update. The {@link #callback} listener created in
 * {@link #open(Configuration)} decrements that counter when a write completes and records the
 * failure when a write fails. {@link #snapshotState(FunctionSnapshotContext)} and
 * {@link #close()} block until the counter reaches zero and then rethrow any recorded failure.
 *
 * @param <IN> type of the records consumed by the sink
 * @param <V> not referenced by this class; kept for subclasses
 */
public abstract class AerospikeSinkBase<IN, V> extends RichSinkFunction<IN> implements CheckpointedFunction {
    private static final long serialVersionUID = -795565681302801719L;

    /** Logger named after the concrete subclass. */
    protected final Logger log;

    /** Aerospike client; built in {@link #open(Configuration)} and closed in {@link #close()}. */
    protected transient AerospikeClient client;

    /**
     * Event loops handed to the client policy in {@link #open(Configuration)}.
     * NOTE(review): the field is transient, so a value passed to the constructor does not survive
     * Java serialization of this function - confirm how subclasses restore it.
     */
    protected transient EventLoops eventLoops;

    /** Optional key selector supplied at construction time; not used by this class itself. */
    protected KeySelector<IN, ?> keySelector;

    /** Most recent asynchronous write failure that has not been rethrown yet, or {@code null}. */
    protected volatile transient Throwable exception;

    /** Listener that tracks completion of asynchronous writes; created in {@link #open(Configuration)}. */
    protected transient WriteListener callback;

    /** Number of writes that were sent but have not completed yet; also used as the wait/notify monitor. */
    private final AtomicInteger updatesPending;

    /** Factory used to build the client; resolved in {@link #open(Configuration)} when none was supplied. */
    private AerospikeClientFactory clientFactory;

    AerospikeSinkBase() throws FeatureKeyException {
        this(null);
    }

    AerospikeSinkBase(AerospikeClientFactory aerospikeClientFactory) throws FeatureKeyException {
        this(aerospikeClientFactory, null);
    }

    AerospikeSinkBase(AerospikeClientFactory aerospikeClientFactory, KeySelector<IN, ?> keySelector) throws FeatureKeyException {
        this(aerospikeClientFactory, keySelector, null);
    }

    /**
     * Creates the sink.
     *
     * @param aerospikeClientFactory factory used to build the client, or {@code null} to derive a
     *     default factory from the global job parameters when the sink is opened
     * @param keySelector optional key selector, may be {@code null}
     * @param eventLoops event loops to set on the client policy, may be {@code null}
     * @throws FeatureKeyException kept in the signature for compatibility with existing callers
     */
    public AerospikeSinkBase(AerospikeClientFactory aerospikeClientFactory, KeySelector<IN, ?> keySelector, EventLoops eventLoops) throws FeatureKeyException {
        this.log = LoggerFactory.getLogger(getClass());
        this.updatesPending = new AtomicInteger();
        // The runtime context is not available during construction, so a missing factory
        // is resolved lazily in open() instead of here.
        this.clientFactory = aerospikeClientFactory;
        this.eventLoops = eventLoops;
        this.keySelector = keySelector;
    }

    /**
     * Builds the Aerospike client and installs the write listener.
     *
     * @param configuration configuration passed by the caller; the client timeout is taken from it
     *     only when it is an {@link AerospikeConfiguration}, otherwise the {@link ClientPolicy}
     *     default timeout is kept
     * @throws IllegalStateException if no factory was supplied and the default factory cannot be created
     */
    public void open(Configuration configuration) {
        if (this.clientFactory == null) {
            this.clientFactory = createDefaultClientFactory();
        }
        ClientPolicy clientPolicy = new ClientPolicy();
        if (configuration instanceof AerospikeConfiguration) {
            clientPolicy.timeout = ((AerospikeConfiguration) configuration).getClientTimeOut();
        } else {
            this.log.debug("Open configuration is not an AerospikeConfiguration; using the default client timeout.");
        }
        clientPolicy.eventLoops = this.eventLoops;
        this.client = this.clientFactory.build(clientPolicy);
        this.callback = new WriteListener() {
            @Override // com.aerospike.client.listener.WriteListener
            public void onSuccess(Key key) {
                AerospikeSinkBase.this.completePendingUpdate();
            }

            @Override // com.aerospike.client.listener.WriteListener
            public void onFailure(AerospikeException aerospikeException) {
                // Publish the failure BEFORE releasing waiters; otherwise a thread woken in
                // snapshotState()/close() could check for errors before the failure is visible.
                AerospikeSinkBase.this.exception = aerospikeException;
                AerospikeSinkBase.this.log.error("Error while sending value.", aerospikeException);
                AerospikeSinkBase.this.completePendingUpdate();
            }
        };
    }

    /**
     * Sends one record after rethrowing any failure recorded by an earlier asynchronous write.
     *
     * @param in record to write
     * @throws Exception if an earlier write failed or {@link #send(Object)} throws
     */
    public void invoke(IN in) throws Exception {
        checkAsyncErrors();
        // Count the update before sending so that a callback completing immediately
        // cannot decrement the counter ahead of the matching increment.
        this.updatesPending.incrementAndGet();
        try {
            send(in);
        } catch (RuntimeException | Error e) {
            // The write was never handed over, so undo the increment to keep waiters from hanging.
            completePendingUpdate();
            throw e;
        }
    }

    /**
     * Issues the write for a single record.
     *
     * <p>Implementations are expected to report completion through {@link #callback} so that the
     * pending-update counter is decremented - presumably by passing it to the asynchronous client
     * call; verify against the concrete subclasses.
     *
     * @param in record to write
     */
    public abstract void send(IN in);

    /**
     * Waits for all pending writes, rethrows any recorded failure and closes the client.
     * The client is closed even when waiting or the error check fails.
     *
     * @throws Exception if a write failed or the wait was interrupted
     */
    public void close() throws Exception {
        try {
            checkAsyncErrors();
            waitForPendingUpdates();
            checkAsyncErrors();
        } finally {
            try {
                if (this.client != null) {
                    this.client.close();
                }
            } catch (Exception e) {
                this.log.error("Error while closing session.", e);
            }
        }
    }

    /** No state is restored by this sink. */
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
    }

    /**
     * Blocks until every pending write has completed and rethrows any recorded failure.
     *
     * @throws Exception if a write failed or the wait was interrupted
     */
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        checkAsyncErrors();
        waitForPendingUpdates();
        checkAsyncErrors();
    }

    /** Derives the default client factory from the global job parameters of the running job. */
    private AerospikeClientFactory createDefaultClientFactory() {
        try {
            return AerospikeClientFactory.getInstance().withParameterTool((ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters());
        } catch (RuntimeException e) {
            throw e;
        } catch (Exception e) {
            // open() declares no checked exceptions, so wrap while preserving the cause.
            throw new IllegalStateException("Unable to create the default Aerospike client factory.", e);
        }
    }

    /** Marks one write as finished and wakes waiting threads once no writes remain pending. */
    private void completePendingUpdate() {
        if (this.updatesPending.decrementAndGet() == 0) {
            synchronized (this.updatesPending) {
                this.updatesPending.notifyAll();
            }
        }
    }

    /** Blocks the calling thread until the pending-update counter is no longer positive. */
    private void waitForPendingUpdates() throws InterruptedException {
        synchronized (this.updatesPending) {
            while (this.updatesPending.get() > 0) {
                this.updatesPending.wait();
            }
        }
    }

    /** Rethrows (and clears) a failure recorded by the write listener, wrapped in an {@link IOException}. */
    private void checkAsyncErrors() throws IOException {
        Throwable th = this.exception;
        if (th != null) {
            this.exception = null;
            throw new IOException("Error while sending value.", th);
        }
    }

    /** Returns the current number of pending writes. */
    @VisibleForTesting
    int getNumOfPendingRecords() {
        return this.updatesPending.get();
    }
}
