package com.aerospike.client.proxy.grpc;

import com.aerospike.client.AerospikeException;
import com.aerospike.client.Log;
import com.aerospike.proxy.client.Kvs;
import io.grpc.CallOptions;
import io.grpc.MethodDescriptor;
import io.grpc.stub.ClientCalls;
import io.grpc.stub.StreamObserver;
import io.netty.channel.EventLoop;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:com/aerospike/client/proxy/grpc/GrpcStream.class */
/**
 * Response observer for one bidirectional gRPC stream that carries many
 * {@link GrpcStreamingCall}s.
 *
 * <p>Calls wait in {@code pendingCalls} until a slot is free, are then sent on the
 * request observer with a per-stream request id, and are tracked in
 * {@code executingCalls} until their final response (a payload whose
 * {@code hasNext} flag is false) arrives.
 *
 * <p>The three {@link StreamObserver} callbacks re-dispatch themselves onto
 * {@code eventLoop} when invoked from another thread. The remaining public methods
 * do not; presumably callers invoke them from the event loop as well — confirm
 * against {@code GrpcChannelExecutor}.
 */
public class GrpcStream implements StreamObserver<Kvs.AerospikeResponsePayload> {
    /** Milliseconds the pending queue may stay empty before the request side is half-closed. */
    private static final long IDLE_TIMEOUT = 30000;
    private final int id;
    private final EventLoop eventLoop;
    private StreamObserver<Kvs.AerospikeRequestPayload> requestObserver;
    /** Upper bound on the size of {@code executingCalls}. */
    private final int maxConcurrentRequests;
    /** Once {@code requestsSent} reaches this value the request side is half-closed. */
    private final int totalRequestsToExecute;
    private final GrpcChannelExecutor channelExecutor;
    private final MethodDescriptor<Kvs.AerospikeRequestPayload, Kvs.AerospikeResponsePayload> methodDescriptor;
    /** Calls queued but not yet sent; the list instance is supplied by the creator of this stream. */
    private final LinkedList<GrpcStreamingCall> pendingCalls;
    /** Calls that were sent and still await their final response, keyed by request id. */
    private final Map<Integer, GrpcStreamingCall> executingCalls = new HashMap<>();
    private boolean isClosed = false;
    /** Number of request ids handed out so far (regular requests and abort requests). */
    private volatile int requestsSent;
    private volatile int requestsCompleted;
    /** Wall-clock millis at which the pending queue was first seen empty; 0 when not idle. */
    private volatile long streamIdleStartTime;
    /** True once {@code requestObserver.onCompleted()} has been issued. */
    private boolean streamHalfClosed;

    /**
     * Opens a new bidirectional streaming call on the executor's channel and registers
     * this object as its response observer.
     *
     * @param grpcChannelExecutor owner of the channel; notified of completed requests and stream closure
     * @param methodDescriptor    the gRPC method this stream invokes
     * @param linkedList          queue of calls waiting to be sent on this stream
     * @param callOptions         options passed to {@code Channel.newCall}
     * @param i                   identifier of this stream
     * @param eventLoop           event loop on which stream callbacks are processed
     * @param i2                  maximum number of concurrently executing calls
     * @param i3                  total number of requests this stream sends before half-closing
     */
    public GrpcStream(GrpcChannelExecutor grpcChannelExecutor, MethodDescriptor<Kvs.AerospikeRequestPayload, Kvs.AerospikeResponsePayload> methodDescriptor, LinkedList<GrpcStreamingCall> linkedList, CallOptions callOptions, int i, EventLoop eventLoop, int i2, int i3) {
        this.channelExecutor = grpcChannelExecutor;
        this.methodDescriptor = methodDescriptor;
        this.pendingCalls = linkedList;
        this.id = i;
        this.eventLoop = eventLoop;
        this.maxConcurrentRequests = i2;
        this.totalRequestsToExecute = i3;
        setRequestObserver(ClientCalls.asyncBidiStreamingCall(grpcChannelExecutor.getChannel().newCall(methodDescriptor, callOptions), this));
    }

    private void setRequestObserver(StreamObserver<Kvs.AerospikeRequestPayload> streamObserver) {
        this.requestObserver = streamObserver;
    }

    /**
     * Routes a response payload to the call registered under the payload's id and then
     * tries to send more pending calls.
     *
     * <p>A payload without {@code hasNext} is the final one for its id: the call is
     * removed from the executing set and the completion counters are updated, even when
     * no call is registered under that id any more.
     *
     * @param aerospikeResponsePayload the response received from the server
     */
    @Override
    public void onNext(Kvs.AerospikeResponsePayload aerospikeResponsePayload) {
        if (!this.eventLoop.inEventLoop()) {
            // Invoked from a foreign thread: hop onto the event loop.
            this.eventLoop.schedule(() -> {
                onNext(aerospikeResponsePayload);
            }, 0L, TimeUnit.NANOSECONDS);
            return;
        }
        int callId = aerospikeResponsePayload.getId();
        GrpcStreamingCall call;
        if (aerospikeResponsePayload.getHasNext()) {
            call = this.executingCalls.get(Integer.valueOf(callId));
        } else {
            call = this.executingCalls.remove(Integer.valueOf(callId));
            this.requestsCompleted++;
            this.channelExecutor.onRequestCompleted();
        }
        // The call is absent when it already expired and was removed by onCallExpired.
        if (call != null && !call.isAborted()) {
            try {
                call.onNext(aerospikeResponsePayload);
            } catch (Throwable th) {
                Log.debug("Exception in invoking onNext: " + th);
                // More responses would follow for this id; ask the server to stop producing them.
                if (aerospikeResponsePayload.getHasNext()) {
                    abortCallAtServer(call, callId);
                }
            }
        }
        executePendingCalls();
    }

    /**
     * Marks the call aborted and, while the request side is still open, sends an abort
     * request for {@code callId} under a fresh request id.
     */
    private void abortCallAtServer(GrpcStreamingCall call, int callId) {
        call.markAborted();
        if (this.streamHalfClosed) {
            // onCompleted() was already issued on the request observer; StreamObserver's
            // contract forbids a further onNext, so the abort cannot be delivered.
            return;
        }
        int abortRequestId = this.requestsSent;
        this.requestsSent = abortRequestId + 1;
        Kvs.AerospikeRequestPayload.Builder abortBuilder = Kvs.AerospikeRequestPayload.newBuilder();
        abortBuilder.setId(abortRequestId);
        abortBuilder.setAbortRequest(Kvs.AbortRequest.newBuilder().setAbortId(callId));
        this.requestObserver.onNext(abortBuilder.build());
    }

    /** Fails every executing call with {@code cause} and closes the stream. */
    private void abortExecutingCalls(Throwable cause) {
        // Set before invoking callbacks so the stream already reads as closed while they run.
        this.isClosed = true;
        for (GrpcStreamingCall call : this.executingCalls.values()) {
            try {
                call.onError(cause);
            } catch (Exception e) {
                Log.debug("Exception in invoking onError: " + e);
            }
        }
        markClosed();
    }

    /** Flags the stream closed, forgets executing calls and notifies the channel executor. */
    private void markClosed() {
        this.isClosed = true;
        this.executingCalls.clear();
        this.channelExecutor.onStreamClosed(this);
    }

    /**
     * Handles a stream-level failure. When nothing is executing, the pending calls are
     * failed with {@code th} and the pending queue is cleared; executing calls are
     * always failed and the stream is closed.
     *
     * @param th the error reported for the stream
     */
    @Override
    public void onError(Throwable th) {
        if (!this.eventLoop.inEventLoop()) {
            this.eventLoop.schedule(() -> {
                onError(th);
            }, 0L, TimeUnit.NANOSECONDS);
            return;
        }
        if (this.executingCalls.isEmpty()) {
            for (GrpcStreamingCall pending : this.pendingCalls) {
                try {
                    pending.onError(th);
                } catch (Exception e) {
                    Log.debug("Exception in invoking onError: " + e);
                }
            }
            this.pendingCalls.clear();
        }
        abortExecutingCalls(th);
    }

    /**
     * Handles completion of the response side: any call still executing is failed and
     * the stream is closed.
     */
    @Override
    public void onCompleted() {
        if (this.eventLoop.inEventLoop()) {
            abortExecutingCalls(new AerospikeException(1, "stream completed before all responses have been received"));
        } else {
            this.eventLoop.schedule(this::onCompleted, 0L, TimeUnit.NANOSECONDS);
        }
    }

    /** @return the live queue of calls waiting to be sent (not a copy) */
    public LinkedList<GrpcStreamingCall> getPendingCalls() {
        return this.pendingCalls;
    }

    /** @return the gRPC method this stream invokes */
    public MethodDescriptor<Kvs.AerospikeRequestPayload, Kvs.AerospikeResponsePayload> getMethodDescriptor() {
        return this.methodDescriptor;
    }

    /** @return number of calls currently executing plus those still pending */
    public int getOngoingRequests() {
        return this.executingCalls.size() + this.pendingCalls.size();
    }

    /** @return identifier of this stream */
    public int getId() {
        return this.id;
    }

    /** @return number of final responses received on this stream */
    public int getRequestsCompleted() {
        return this.requestsCompleted;
    }

    /** @return maximum number of concurrently executing calls */
    public int getMaxConcurrentRequests() {
        return this.maxConcurrentRequests;
    }

    /** @return total number of requests after which the request side is half-closed */
    public int getTotalRequestsToExecute() {
        return this.totalRequestsToExecute;
    }

    @Override
    public String toString() {
        return "GrpcStream{id=" + this.id + ", channelExecutor=" + this.channelExecutor + '}';
    }

    /** @return completed requests plus ongoing (executing and pending) requests */
    public int getExecutedRequests() {
        return getRequestsCompleted() + getOngoingRequests();
    }

    /**
     * Drives the stream forward: tracks idleness, half-closes after
     * {@link #IDLE_TIMEOUT} ms with an empty pending queue, closes the stream once it
     * is half-closed with nothing executing, and otherwise times out or sends pending
     * calls while the concurrency and total-request limits allow.
     */
    public void executePendingCalls() {
        if (this.isClosed) {
            return;
        }
        if (this.pendingCalls.isEmpty()) {
            if (this.streamIdleStartTime == 0) {
                this.streamIdleStartTime = System.currentTimeMillis();
            }
            if (this.streamIdleStartTime + IDLE_TIMEOUT <= System.currentTimeMillis() && !this.streamHalfClosed) {
                this.streamHalfClosed = true;
                this.requestObserver.onCompleted();
            }
        } else if (this.streamIdleStartTime != 0) {
            // Work arrived: reset the idle clock.
            this.streamIdleStartTime = 0L;
        }
        if (this.streamHalfClosed) {
            // Nothing more can be sent; close as soon as the in-flight calls have drained.
            if (this.executingCalls.isEmpty()) {
                markClosed();
            }
            return;
        }
        Iterator<GrpcStreamingCall> it = this.pendingCalls.iterator();
        while (it.hasNext()) {
            GrpcStreamingCall next = it.next();
            if (next.hasSendDeadlineExpired() || next.hasExpired()) {
                next.onError(new AerospikeException.Timeout(next.getPolicy(), next.getIteration()));
                it.remove();
            } else if (this.executingCalls.size() < this.maxConcurrentRequests && this.requestsSent < this.totalRequestsToExecute) {
                execute(next);
                it.remove();
            }
        }
    }

    /**
     * Sends one call on the stream: assigns the next request id, registers the call as
     * executing, writes the request, half-closes when the total-request budget is
     * reached, and schedules an expiry check when the call has an expiry.
     *
     * <p>Any exception is reported to the call through {@code onError}; a call that was
     * already registered is unregistered first so it neither occupies a concurrency
     * slot nor receives a second error when the stream is later aborted.
     */
    private void execute(GrpcStreamingCall call) {
        // Id under which the call was put into executingCalls; null until that happens.
        Integer registeredId = null;
        try {
            if (call.hasExpired()) {
                call.onError(new AerospikeException.Timeout(call.getPolicy(), call.getIteration()));
                return;
            }
            Kvs.AerospikeRequestPayload.Builder requestBuilder = call.getRequestBuilder();
            final int requestId = this.requestsSent;
            this.requestsSent = requestId + 1;
            requestBuilder.setId(requestId).setIteration(call.getIteration());
            GrpcConversions.setRequestPolicy(call.getPolicy(), requestBuilder);
            Kvs.AerospikeRequestPayload request = requestBuilder.build();
            this.executingCalls.put(Integer.valueOf(requestId), call);
            registeredId = Integer.valueOf(requestId);
            this.requestObserver.onNext(request);
            if (this.requestsSent >= this.totalRequestsToExecute) {
                this.requestObserver.onCompleted();
                this.streamHalfClosed = true;
            }
            if (call.hasExpiry()) {
                this.eventLoop.schedule(() -> {
                    onCallExpired(requestId);
                }, call.nanosTillExpiry(), TimeUnit.NANOSECONDS);
            }
        } catch (Exception e) {
            if (registeredId != null) {
                this.executingCalls.remove(registeredId);
            }
            call.onError(e);
        }
    }

    /**
     * Expiry task for a sent call: if the call is still executing it is removed and
     * failed with a timeout; calls that are not single-response are additionally
     * aborted at the server.
     */
    private void onCallExpired(int requestId) {
        GrpcStreamingCall expired = this.executingCalls.remove(Integer.valueOf(requestId));
        if (expired == null) {
            // Already completed or otherwise removed.
            return;
        }
        expired.onError(new AerospikeException.Timeout(expired.getPolicy(), expired.getIteration()));
        if (!expired.isSingleResponse()) {
            abortCallAtServer(expired, requestId);
        }
    }

    /**
     * @return {@code true} when the stream is neither closed nor half-closed and has
     *         not yet used up its total-request budget
     */
    public boolean canEnqueue() {
        return !(this.isClosed || this.streamHalfClosed || this.requestsSent >= this.totalRequestsToExecute);
    }

    /**
     * Appends a call to the pending queue; it is not sent until
     * {@link #executePendingCalls()} runs.
     *
     * @param grpcStreamingCall the call to queue
     */
    public void enqueue(GrpcStreamingCall grpcStreamingCall) {
        this.pendingCalls.add(grpcStreamingCall);
    }

    /**
     * Shuts the stream down on a best-effort basis: invokes {@code failIfNotComplete(-1)}
     * on every pending and executing call, clears both collections and half-closes the
     * request side if that has not happened yet. Failures are logged, never thrown.
     */
    public void closePendingCalls() {
        this.pendingCalls.forEach(this::failIfNotCompleteQuietly);
        this.pendingCalls.clear();
        this.executingCalls.values().forEach(this::failIfNotCompleteQuietly);
        this.executingCalls.clear();
        if (this.streamHalfClosed) {
            // Request side already completed; a second onCompleted() is not permitted.
            return;
        }
        try {
            this.requestObserver.onCompleted();
            this.streamHalfClosed = true;
        } catch (Throwable th) {
            Log.debug("Error half-closing " + getClass() + ": " + th);
        }
    }

    /** Invokes {@code failIfNotComplete(-1)} on the call, logging instead of propagating exceptions. */
    private void failIfNotCompleteQuietly(GrpcStreamingCall call) {
        try {
            call.failIfNotComplete(-1);
        } catch (Exception e) {
            Log.error("Error shutting down " + getClass() + ": " + e.getMessage());
        }
    }
}
