/*
 * Decompiled with CFR 0.152.
 */
package com.amazon.opendistro.elasticsearch.performanceanalyzer.net;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.PerformanceAnalyzerApp;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.StatExceptionCode;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.StatsCollector;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.FlowUnitMessage;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.InterNodeRpcServiceGrpc;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.MetricsRequest;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.MetricsResponse;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.PublishResponse;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.SubscribeMessage;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.SubscribeResponse;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.net.GRPCConnectionManager;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.RcaGraphMetrics;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.util.InstanceDetails;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class NetClient {
    private static final Logger LOG = LogManager.getLogger(NetClient.class);
    private final GRPCConnectionManager connectionManager;
    private ConcurrentMap<InstanceDetails.Id, AtomicReference<StreamObserver<FlowUnitMessage>>> perHostOpenDataStreamMap = new ConcurrentHashMap<InstanceDetails.Id, AtomicReference<StreamObserver<FlowUnitMessage>>>();

    public NetClient(GRPCConnectionManager connectionManager) {
        this.connectionManager = connectionManager;
    }

    public GRPCConnectionManager getConnectionManager() {
        return this.connectionManager;
    }

    public void subscribe(InstanceDetails remoteHost, SubscribeMessage subscribeMessage, StreamObserver<SubscribeResponse> serverResponseStream) {
        LOG.debug("Trying to send intent message to {}", (Object)remoteHost);
        try {
            this.connectionManager.getClientStubForHost(remoteHost).subscribe(subscribeMessage, serverResponseStream);
            PerformanceAnalyzerApp.RCA_GRAPH_METRICS_AGGREGATOR.updateStat(RcaGraphMetrics.NET_BYTES_OUT, subscribeMessage.getRequesterGraphNode(), subscribeMessage.getSerializedSize());
        }
        catch (StatusRuntimeException sre) {
            LOG.error("Encountered an error trying to subscribe. Status: {}", (Object)sre.getStatus(), (Object)sre);
            StatsCollector.instance().logException(StatExceptionCode.RCA_NETWORK_ERROR);
        }
    }

    public void publish(InstanceDetails remoteHost, FlowUnitMessage flowUnitMessage, StreamObserver<PublishResponse> serverResponseStream) {
        LOG.debug("Publishing {} data to {}", (Object)flowUnitMessage.getGraphNode(), (Object)remoteHost);
        try {
            StreamObserver<FlowUnitMessage> stream = this.getDataStreamForHost(remoteHost, serverResponseStream);
            stream.onNext((Object)flowUnitMessage);
            PerformanceAnalyzerApp.RCA_GRAPH_METRICS_AGGREGATOR.updateStat(RcaGraphMetrics.NET_BYTES_OUT, flowUnitMessage.getGraphNode(), flowUnitMessage.getSerializedSize());
        }
        catch (StatusRuntimeException sre) {
            LOG.error("rca: Encountered an error trying to publish a flow unit. Status: {}", (Object)sre.getStatus(), (Object)sre);
            StatsCollector.instance().logException(StatExceptionCode.RCA_NETWORK_ERROR);
        }
    }

    public void getMetrics(InstanceDetails remoteNodeIP, MetricsRequest request, StreamObserver<MetricsResponse> responseObserver) {
        InterNodeRpcServiceGrpc.InterNodeRpcServiceStub stub = this.connectionManager.getClientStubForHost(remoteNodeIP);
        stub.getMetrics(request, responseObserver);
    }

    public void stop() {
        LOG.debug("Shutting down client streaming connections..");
        this.closeAllDataStreams();
    }

    public void flushStream(InstanceDetails.Id remoteHost) {
        LOG.debug("removing data streams for {} as we are no publishing to it.", (Object)remoteHost);
        this.perHostOpenDataStreamMap.remove(remoteHost);
    }

    private void closeAllDataStreams() {
        for (Map.Entry entry : this.perHostOpenDataStreamMap.entrySet()) {
            LOG.debug("Closing stream for host: {}", entry.getKey());
            ((StreamObserver)((AtomicReference)entry.getValue()).get()).onCompleted();
            this.perHostOpenDataStreamMap.remove(entry.getKey());
        }
    }

    private StreamObserver<FlowUnitMessage> getDataStreamForHost(InstanceDetails remoteHost, StreamObserver<PublishResponse> serverResponseStream) {
        AtomicReference streamObserverAtomicReference = (AtomicReference)this.perHostOpenDataStreamMap.get(remoteHost.getInstanceId());
        if (streamObserverAtomicReference != null) {
            return (StreamObserver)streamObserverAtomicReference.get();
        }
        return this.addOrUpdateDataStreamForHost(remoteHost, serverResponseStream);
    }

    private synchronized StreamObserver<FlowUnitMessage> addOrUpdateDataStreamForHost(InstanceDetails remoteHost, StreamObserver<PublishResponse> serverResponseStream) {
        InterNodeRpcServiceGrpc.InterNodeRpcServiceStub stub = this.connectionManager.getClientStubForHost(remoteHost);
        StreamObserver<FlowUnitMessage> dataStream = stub.publish(serverResponseStream);
        this.perHostOpenDataStreamMap.computeIfAbsent(remoteHost.getInstanceId(), s -> new AtomicReference());
        ((AtomicReference)this.perHostOpenDataStreamMap.get(remoteHost.getInstanceId())).set(dataStream);
        return dataStream;
    }
}

