/*
 * Decompiled with CFR 0.152.
 */
package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.net.tasks;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.AppContext;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.PerformanceAnalyzerApp;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.StatExceptionCode;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.StatsCollector;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.PublishResponse;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.net.NetClient;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.GenericFlowUnit;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.RcaGraphMetrics;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.util.InstanceDetails;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.messages.DataMsg;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.net.SubscriptionManager;
import com.google.common.collect.ImmutableSet;
import io.grpc.stub.StreamObserver;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class FlowUnitTxTask
implements Runnable {
    private static final Logger LOG = LogManager.getLogger(FlowUnitTxTask.class);
    private final NetClient client;
    private final SubscriptionManager subscriptionManager;
    private final DataMsg dataMsg;
    private final AppContext appContext;

    public FlowUnitTxTask(NetClient client, SubscriptionManager subscriptionManager, DataMsg dataMsg, AppContext appContext) {
        this.client = client;
        this.subscriptionManager = subscriptionManager;
        this.dataMsg = dataMsg;
        this.appContext = appContext;
    }

    @Override
    public void run() {
        final String sourceGraphNode = this.dataMsg.getSourceGraphNode();
        InstanceDetails esInstanceDetails = this.appContext.getMyInstanceDetails();
        if (this.subscriptionManager.isNodeSubscribed(sourceGraphNode)) {
            ImmutableSet<InstanceDetails.Id> downstreamHostIds = this.subscriptionManager.getSubscribersFor(sourceGraphNode);
            LOG.debug("{} has downstream subscribers: {}", (Object)sourceGraphNode, downstreamHostIds);
            for (final InstanceDetails.Id downstreamHostId : downstreamHostIds) {
                for (GenericFlowUnit flowUnit : this.dataMsg.getFlowUnits()) {
                    LOG.debug("rca: [pub-tx]: {} -> {}", (Object)sourceGraphNode, (Object)downstreamHostId);
                    this.client.publish(this.appContext.getInstanceById(downstreamHostId), flowUnit.buildFlowUnitMessage(sourceGraphNode, esInstanceDetails.getInstanceId()), new StreamObserver<PublishResponse>(){

                        public void onNext(PublishResponse value) {
                            LOG.debug("rca: Received acknowledgement from the server. status: {}", (Object)value.getDataStatus());
                            if (value.getDataStatus() == PublishResponse.PublishResponseStatus.NODE_SHUTDOWN) {
                                FlowUnitTxTask.this.subscriptionManager.unsubscribeAndTerminateConnection(sourceGraphNode, downstreamHostId);
                                FlowUnitTxTask.this.client.flushStream(downstreamHostId);
                            }
                        }

                        public void onError(Throwable t) {
                            LOG.error("rca: Encountered an exception at the server: ", t);
                            StatsCollector.instance().logException(StatExceptionCode.RCA_NETWORK_ERROR);
                            FlowUnitTxTask.this.subscriptionManager.unsubscribeAndTerminateConnection(sourceGraphNode, downstreamHostId);
                            FlowUnitTxTask.this.client.flushStream(downstreamHostId);
                        }

                        public void onCompleted() {
                            LOG.debug("rca: Server closed the data channel!");
                        }
                    });
                    PerformanceAnalyzerApp.RCA_GRAPH_METRICS_AGGREGATOR.updateStat(RcaGraphMetrics.RCA_NODES_FU_PUBLISH_COUNT, sourceGraphNode, 1);
                }
            }
        } else {
            LOG.debug("No subscribers for {}.", (Object)sourceGraphNode);
        }
    }
}

