/*
 * Decompiled with CFR 0.152.
 */
package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.net.handler;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.PerformanceAnalyzerApp;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.StatExceptionCode;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.StatsCollector;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.FlowUnitMessage;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.PublishResponse;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.RcaGraphMetrics;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.net.NodeStateManager;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.net.ReceivedFlowUnitStore;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.net.tasks.FlowUnitRxTask;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class PublishRequestHandler {
    private static final Logger LOG = LogManager.getLogger(PublishRequestHandler.class);
    private final AtomicReference<ExecutorService> executorReference;
    private final NodeStateManager nodeStateManager;
    private final ReceivedFlowUnitStore receivedFlowUnitStore;
    private List<StreamObserver<PublishResponse>> upstreamResponseStreamList = Collections.synchronizedList(new ArrayList());

    public PublishRequestHandler(NodeStateManager nodeStateManager, ReceivedFlowUnitStore receivedFlowUnitStore, AtomicReference<ExecutorService> executorReference) {
        this.executorReference = executorReference;
        this.nodeStateManager = nodeStateManager;
        this.receivedFlowUnitStore = receivedFlowUnitStore;
    }

    public StreamObserver<FlowUnitMessage> getClientStream(StreamObserver<PublishResponse> serviceResponse) {
        this.upstreamResponseStreamList.add(serviceResponse);
        return new SendDataClientStreamUpdateConsumer(serviceResponse);
    }

    public void terminateUpstreamConnections() {
        for (StreamObserver<PublishResponse> responseStream : this.upstreamResponseStreamList) {
            responseStream.onNext((Object)PublishResponse.newBuilder().setDataStatus(PublishResponse.PublishResponseStatus.NODE_SHUTDOWN).build());
            responseStream.onCompleted();
        }
    }

    private class SendDataClientStreamUpdateConsumer
    implements StreamObserver<FlowUnitMessage> {
        private final StreamObserver<PublishResponse> serviceResponse;

        SendDataClientStreamUpdateConsumer(StreamObserver<PublishResponse> serviceResponse) {
            this.serviceResponse = serviceResponse;
        }

        public void onNext(FlowUnitMessage flowUnitMessage) {
            ExecutorService executorService = (ExecutorService)PublishRequestHandler.this.executorReference.get();
            if (executorService != null) {
                try {
                    executorService.execute(new FlowUnitRxTask(PublishRequestHandler.this.nodeStateManager, PublishRequestHandler.this.receivedFlowUnitStore, flowUnitMessage));
                    PerformanceAnalyzerApp.RCA_GRAPH_METRICS_AGGREGATOR.updateStat(RcaGraphMetrics.NET_BYTES_IN, flowUnitMessage.getGraphNode(), flowUnitMessage.getSerializedSize());
                }
                catch (RejectedExecutionException ree) {
                    LOG.warn("Dropped handling received flow unit because the netwwork threadpool queue is full");
                    StatsCollector.instance().logException(StatExceptionCode.RCA_NETWORK_THREADPOOL_QUEUE_FULL_ERROR);
                }
            }
        }

        public void onError(Throwable throwable) {
            LOG.error("Client ran into an error while streaming flow units: {}", (Object)throwable.getMessage());
            throwable.printStackTrace();
        }

        public void onCompleted() {
            LOG.debug("Client finished streaming flow units");
            this.serviceResponse.onNext((Object)this.buildDataResponse(PublishResponse.PublishResponseStatus.SUCCESS));
            this.serviceResponse.onCompleted();
        }

        private PublishResponse buildDataResponse(PublishResponse.PublishResponseStatus status) {
            return PublishResponse.newBuilder().setDataStatus(status).build();
        }
    }
}

