package com.aerospike.flink.batch.connectors;

import com.aerospike.client.Key;
import com.aerospike.flink.connectors.AerospikeConfiguration;
import com.aerospike.flink.connectors.AerospikeRecordMapper;
import com.aerospike.flink.connectors.DefaultAerospikeRecordMapper;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.util.Collector;

/* loaded from: input_file:com/aerospike/flink/batch/connectors/AeroJoinFunction.class */
public class AeroJoinFunction<O> implements MapPartitionFunction<Tuple, O> {
    private static final long serialVersionUID = 1;
    protected int keyField;
    protected AerospikeConfiguration info;
    private AerospikeRecordMapper<O> mapper;

    public AeroJoinFunction(AerospikeConfiguration aerospikeConfiguration) {
        this(0, aerospikeConfiguration, null);
    }

    public AeroJoinFunction(int i, AerospikeConfiguration aerospikeConfiguration) {
        this(i, aerospikeConfiguration, null);
    }

    public AeroJoinFunction(int i, AerospikeConfiguration aerospikeConfiguration, AerospikeRecordMapper<O> aerospikeRecordMapper) {
        this.keyField = i;
        this.info = aerospikeConfiguration;
        this.mapper = aerospikeRecordMapper == null ? new DefaultAerospikeRecordMapper<>() : aerospikeRecordMapper;
    }

    public void mapPartition(Iterable<Tuple> iterable, Collector<O> collector) throws Exception {
        new AeroBatchReader().aeroJoin((List) StreamSupport.stream(iterable.spliterator(), true).map(tuple -> {
            return new Key(this.info.getNamespace(), this.info.getSet(), tuple.getField(this.keyField).toString());
        }).collect(Collectors.toList()), this.info.getBatchMax()).forEach(keyRecord -> {
            collector.collect(this.mapper.mapTo(keyRecord.record));
        });
    }
}
