package com.aerospike.flink.streaming.connectors;

import com.aerospike.client.AerospikeException;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.async.EventLoops;
import com.aerospike.client.policy.CommitLevel;
import com.aerospike.client.policy.GenerationPolicy;
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.client.query.ResultSet;
import com.aerospike.flink.connectors.AerospikeClientFactory;
import com.aerospike.flink.connectors.AerospikeConfiguration;
import com.aerospike.flink.connectors.feature.FeatureKeyException;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;

/* loaded from: input_file:com/aerospike/flink/streaming/connectors/AbstractAerospikeTupleSink.class */
public abstract class AbstractAerospikeTupleSink<IN> extends AerospikeSinkBase<IN, ResultSet> {
    private static final long serialVersionUID = -1838187599588681537L;
    private final AerospikeConfiguration conf;
    private WritePolicy wpolicy;

    public AbstractAerospikeTupleSink(AerospikeConfiguration aerospikeConfiguration, AerospikeClientFactory aerospikeClientFactory, EventLoops eventLoops) throws FeatureKeyException {
        this(aerospikeConfiguration, aerospikeClientFactory, null, eventLoops);
    }

    public AbstractAerospikeTupleSink(AerospikeConfiguration aerospikeConfiguration, AerospikeClientFactory aerospikeClientFactory, KeySelector<IN, ?> keySelector, EventLoops eventLoops) throws FeatureKeyException {
        super(aerospikeClientFactory, keySelector, eventLoops);
        this.conf = aerospikeConfiguration;
    }

    @Override // com.aerospike.flink.streaming.connectors.AerospikeSinkBase
    public void open(Configuration configuration) {
        this.conf.addAll(configuration);
        super.open(this.conf);
        this.wpolicy = new WritePolicy();
        this.wpolicy.recordExistsAction = this.conf.getRecordExistsAction();
        this.wpolicy.commitLevel = CommitLevel.valueOf(this.conf.getString(AerospikeConfiguration.AerospikeConfigEnum.COMMIT_LEVEL.PropertyName, (String) AerospikeConfiguration.AerospikeConfigEnum.COMMIT_LEVEL.DefaultValue));
        this.wpolicy.sendKey = this.conf.getBoolean(AerospikeConfiguration.AerospikeConfigEnum.SEND_KEY.PropertyName, ((Boolean) AerospikeConfiguration.AerospikeConfigEnum.SEND_KEY.DefaultValue).booleanValue());
        this.wpolicy.generationPolicy = GenerationPolicy.valueOf(this.conf.getString(AerospikeConfiguration.AerospikeConfigEnum.GEN_POLICY.PropertyName, AerospikeConfiguration.AerospikeConfigEnum.GEN_POLICY.DefaultValue.toString()));
        this.wpolicy.setTimeout(this.conf.getClientTimeOut());
    }

    @Override // com.aerospike.flink.streaming.connectors.AerospikeSinkBase
    public void send(IN in) {
        Object[] extract = extract(in);
        try {
            Object field = this.keySelector == null ? extract[0] : ((Tuple) this.keySelector.getKey(in)).getField(0);
            Key key = field instanceof Integer ? new Key(this.conf.getNamespace(), this.conf.getSet(), ((Integer) field).intValue()) : new Key(this.conf.getNamespace(), this.conf.getSet(), field.toString());
            if (null != this.eventLoops) {
                this.client.put(this.eventLoops.next(), this.callback, this.wpolicy, key, bind(extract));
            } else {
                this.client.put(this.wpolicy, key, bind(extract));
                this.callback.onSuccess(key);
            }
        } catch (Exception e) {
            e.printStackTrace();
            this.callback.onFailure(new AerospikeException("Send failure: " + in.toString()));
        }
    }

    private Bin[] bind(Object[] objArr) {
        Bin[] binArr = new Bin[this.conf.getBinNames().length];
        int i = this.conf.getBinNames().length < objArr.length ? 1 : 0;
        for (int i2 = 0; i2 < this.conf.getBinNames().length; i2++) {
            binArr[i2] = new Bin(this.conf.getBinNames()[i2], objArr[i2 + i]);
        }
        return binArr;
    }

    protected abstract Object[] extract(IN in);
}
