package com.aerospike.client.query;

import com.aerospike.client.AerospikeException;
import com.aerospike.client.Log;
import com.aerospike.client.Value;
import com.aerospike.client.cluster.Cluster;
import com.aerospike.client.cluster.Node;
import com.aerospike.client.command.MultiCommand;
import com.aerospike.client.lua.LuaCache;
import com.aerospike.client.lua.LuaInputStream;
import com.aerospike.client.lua.LuaInstance;
import com.aerospike.client.lua.LuaOutputStream;
import com.aerospike.client.policy.QueryPolicy;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.luaj.vm2.LuaInteger;
import org.luaj.vm2.LuaValue;

/* loaded from: input_file:com/aerospike/client/query/QueryAggregateExecutor.class */
/**
 * Query executor that routes query output through a Lua {@code apply_stream} call.
 *
 * <p>The executor owns a bounded {@link BlockingQueue} of {@link LuaValue}s that is handed to
 * every {@link QueryAggregateCommand} it creates and is also wrapped in the
 * {@link LuaInputStream} passed to Lua. Output is written through a {@link LuaOutputStream}
 * wrapping the {@link ResultSet} returned by {@link #getResultSet()}.
 *
 * <p>The constructor borrows a {@link LuaInstance} from {@link LuaCache} and schedules this
 * object on the inherited thread pool; the instance is handed back to the cache when
 * {@link #run()} finishes, or immediately if scheduling fails.
 */
public final class QueryAggregateExecutor extends QueryExecutor implements Runnable {
    /** Capacity of the queue that carries values into the Lua input stream. */
    private static final int INPUT_QUEUE_CAPACITY = 500;

    private final BlockingQueue<LuaValue> inputQueue;
    private final ResultSet resultSet;
    private final LuaInstance lua;

    /**
     * Creates the executor, borrows a Lua instance and submits this executor to the thread pool.
     *
     * @param cluster     cluster forwarded to the base executor
     * @param queryPolicy query policy; {@code recordQueueSize} sizes the result set
     * @param statement   query statement forwarded to the base executor
     * @param node        node forwarded to the base executor
     * @throws AerospikeException declared by the original signature; any
     *                            {@link RuntimeException} raised while initializing or
     *                            scheduling is rethrown after the Lua instance is returned
     */
    public QueryAggregateExecutor(Cluster cluster, QueryPolicy queryPolicy, Statement statement, Node node) throws AerospikeException {
        super(cluster, queryPolicy, statement, node);
        this.inputQueue = new ArrayBlockingQueue<>(INPUT_QUEUE_CAPACITY);
        this.resultSet = new ResultSet(this, queryPolicy.recordQueueSize);

        // The result is intentionally discarded.
        // NOTE(review): presumably this forces luaj class initialization through LuaValue
        // before LuaInteger is touched directly in runThreads() - confirm before removing.
        LuaValue.valueOf(0);

        this.lua = LuaCache.getInstance();
        try {
            initializeThreads();
            this.threadPool.execute(this);
        } catch (RuntimeException e) {
            // run() will never execute, so its finally block cannot return the instance.
            LuaCache.putInstance(this.lua);
            throw e;
        }
    }

    /**
     * Thread-pool entry point: runs the aggregation, reports any failure to the base executor
     * via {@code stopThreads}, and always returns the Lua instance to the cache.
     */
    @Override // java.lang.Runnable
    public void run() {
        try {
            runThreads();
        } catch (Exception e) {
            super.stopThreads(e);
        } finally {
            LuaCache.putInstance(this.lua);
        }
    }

    /**
     * Starts the query threads, loads the statement's Lua package and invokes
     * {@code apply_stream} with the statement's function, the input and output streams and
     * the converted function arguments.
     *
     * <p>{@link ResultSet#END} is always put on the result set afterwards, whether the call
     * succeeded or threw.
     *
     * @throws AerospikeException declared by the original signature
     */
    public void runThreads() throws AerospikeException {
        try {
            startThreads();
            this.lua.loadPackage(this.statement);

            Value[] functionArgs = this.statement.getFunctionArgs();
            // Four fixed leading slots, followed by the user-supplied arguments.
            LuaValue[] args = new LuaValue[4 + functionArgs.length];
            args[0] = this.lua.getFunction(this.statement.getFunctionName());
            args[1] = LuaInteger.valueOf(2);
            args[2] = new LuaInputStream(this.inputQueue);
            args[3] = new LuaOutputStream(this.resultSet);

            int next = 4;
            for (Value functionArg : functionArgs) {
                args[next++] = functionArg.getLuaValue(this.lua);
            }
            this.lua.call("apply_stream", args);
        } finally {
            // Emit the end marker exactly once on both the success and the failure path.
            this.resultSet.put(ResultSet.END);
        }
    }

    /**
     * Creates the per-node command, sharing this executor's Lua instance and input queue.
     */
    @Override // com.aerospike.client.query.QueryExecutor
    protected MultiCommand createCommand(Node node, long j, boolean z) {
        return new QueryAggregateCommand(node, this.policy, this.statement, this.lua, this.inputQueue, j, z);
    }

    /**
     * Cancels the aggregation: clears pending input, aborts the result set, then makes a
     * non-blocking best-effort attempt to queue a {@link LuaValue#NIL} sentinel.
     */
    @Override // com.aerospike.client.query.QueryExecutor
    protected void sendCancel() {
        this.inputQueue.clear();
        this.resultSet.abort();

        // If the queue has filled up again, drop one element per iteration to make room.
        while (!this.inputQueue.offer(LuaValue.NIL)) {
            if (this.inputQueue.poll() == null) {
                // Neither offer nor poll succeeded; nothing more can be done here.
                if (Log.debugEnabled()) {
                    Log.debug("Lua input queue " + this.statement.taskId + " both offer and poll failed on abort");
                }
                return;
            }
        }
    }

    /**
     * Queues a {@link LuaValue#NIL} sentinel on the input queue, blocking until there is room.
     *
     * <p>The put is retried when interrupted so the sentinel is always delivered. The
     * thread's interrupt status is restored before returning so callers further up the stack
     * can still observe the interruption.
     */
    @Override // com.aerospike.client.query.QueryExecutor
    protected void sendCompleted() {
        boolean interrupted = false;
        try {
            while (true) {
                try {
                    this.inputQueue.put(LuaValue.NIL);
                    return;
                } catch (InterruptedException e) {
                    // Catching clears the flag, so the retry can block again; remember it.
                    interrupted = true;
                    if (Log.debugEnabled()) {
                        Log.debug("Lua input queue " + this.statement.taskId + " put interrupted");
                    }
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Returns the result set that receives the aggregation output.
     *
     * @return the result set created by this executor
     */
    public ResultSet getResultSet() {
        return this.resultSet;
    }
}
