/*
 * Decompiled with CFR 0.152.
 */
package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.net.handler;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.PerformanceAnalyzerApp;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.StatExceptionCode;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.StatsCollector;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.SubscribeMessage;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.SubscribeResponse;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.RcaGraphMetrics;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.net.SubscriptionManager;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.net.requests.CompositeSubscribeRequest;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.net.tasks.SubscriptionRxTask;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class SubscribeServerHandler {
    private static final Logger LOG = LogManager.getLogger(SubscribeServerHandler.class);
    private final AtomicReference<ExecutorService> executorServiceAtomicReference;
    private final SubscriptionManager subscriptionManager;

    public SubscribeServerHandler(SubscriptionManager subscriptionManager, AtomicReference<ExecutorService> executorServiceAtomicReference) {
        this.executorServiceAtomicReference = executorServiceAtomicReference;
        this.subscriptionManager = subscriptionManager;
    }

    public void handleSubscriptionRequest(SubscribeMessage request, StreamObserver<SubscribeResponse> responseObserver) {
        CompositeSubscribeRequest subscribeRequest = new CompositeSubscribeRequest(request, responseObserver);
        ExecutorService executorService = this.executorServiceAtomicReference.get();
        if (executorService != null) {
            try {
                executorService.execute(new SubscriptionRxTask(this.subscriptionManager, subscribeRequest));
                PerformanceAnalyzerApp.RCA_GRAPH_METRICS_AGGREGATOR.updateStat(RcaGraphMetrics.NET_BYTES_IN, subscribeRequest.getSubscribeMessage().getRequesterGraphNode(), subscribeRequest.getSubscribeMessage().getSerializedSize());
            }
            catch (RejectedExecutionException ree) {
                LOG.warn("Dropped processing subscription request because the network threadpool is full");
                StatsCollector.instance().logException(StatExceptionCode.RCA_NETWORK_THREADPOOL_QUEUE_FULL_ERROR);
            }
        }
    }
}

