/*
 * Decompiled with CFR 0.152.
 */
package com.amazon.opendistro.elasticsearch.performanceanalyzer.net;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.CertificateUtils;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.FlowUnitMessage;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.InterNodeRpcServiceGrpc;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.MetricsRequest;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.MetricsResponse;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.PublishResponse;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.SubscribeMessage;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.SubscribeResponse;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.handler.MetricsServerHandler;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.net.handler.PublishRequestHandler;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.net.handler.SubscribeServerHandler;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.BindableService;
import io.grpc.Server;
import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
import io.grpc.netty.shaded.io.netty.channel.EventLoopGroup;
import io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoopGroup;
import io.grpc.netty.shaded.io.netty.channel.socket.nio.NioServerSocketChannel;
import io.grpc.netty.shaded.io.netty.handler.ssl.ClientAuth;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContextBuilder;
import io.grpc.stub.StreamObserver;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * gRPC server for the inter-node RPC service.
 *
 * <p>The instance is both the gRPC service implementation (it is registered with the
 * server via {@code addService(this)}) and a {@link Runnable} whose {@link #run()} builds
 * the server, starts it and blocks until it terminates. Incoming RPCs are delegated to
 * handlers installed through the setters; an RPC that arrives while its handler is
 * missing is rejected.
 *
 * <p>The handler fields and the server reference are written by one thread (setters,
 * {@link #stop()}, {@link #run()}) and read by others (RPC callbacks,
 * {@link #shutdown()}), so they are {@code volatile} and every method reads a field once
 * into a local before using it.
 */
public class NetServer
extends InterNodeRpcServiceGrpc.InterNodeRpcServiceImplBase
implements Runnable {
    private static final Logger LOG = LogManager.getLogger(NetServer.class);
    private final int port;
    private final int numServerThreads;
    private final boolean useHttps;
    private volatile PublishRequestHandler sendDataHandler;
    private volatile SubscribeServerHandler subscribeHandler;
    private volatile MetricsServerHandler metricsServerHandler;
    protected volatile Server server;
    private volatile boolean attemptedShutdown;

    /**
     * Creates a server that is not started until {@link #run()} is invoked.
     *
     * @param port the port the gRPC server binds to
     * @param numServerThreads the thread count given to both the boss and the worker
     *     event loop groups
     * @param useHttps whether to build the server with an SSL context
     */
    public NetServer(int port, int numServerThreads, boolean useHttps) {
        this.port = port;
        this.numServerThreads = numServerThreads;
        this.useHttps = useHttps;
        this.attemptedShutdown = false;
    }

    /** Extension point invoked by {@link #run()} right after the server has started. */
    protected void postStartHook() {
    }

    /**
     * Extension point invoked by {@link #run()} after the server was torn down because
     * start-up failed or the waiting thread was interrupted.
     */
    protected void shutdownHook() {
    }

    /**
     * Builds and starts the server, then blocks until it terminates.
     *
     * <p>If start-up fails with an {@link IOException} or the thread is interrupted while
     * waiting, the server (when one was built) is shut down immediately and
     * {@link #shutdownHook()} is called. The failure is logged unless
     * {@link #setAttemptedShutdown()} was called beforehand.
     */
    @Override
    public void run() {
        LOG.info("Starting the gRPC server on port {} with {} threads. Using HTTPS: {}", this.port, this.numServerThreads, this.useHttps);
        try {
            Server built = this.useHttps
                ? this.buildHttpsServer(CertificateUtils.getTrustedCasFile(), CertificateUtils.getCertificateFile(), CertificateUtils.getPrivateKeyFile())
                : this.buildHttpServer();
            this.server = built;
            built.start();
            LOG.info("gRPC server started successfully!");
            this.postStartHook();
            built.awaitTermination();
            LOG.info("gRPC server terminating..");
        }
        catch (IOException | InterruptedException e) {
            if (!this.attemptedShutdown) {
                LOG.error("GrpcServer interrupted", e);
            }
            // Building the server can fail (e.g. SSLException) before the field is
            // assigned; guard so that the original failure is not masked by an NPE.
            Server current = this.server;
            if (current != null) {
                current.shutdownNow();
            }
            this.shutdownHook();
            if (e instanceof InterruptedException) {
                // Restore the interrupt status for whoever owns this thread.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Creates the builder shared by the plain and the TLS server: bound to the
     * configured port, serving this instance, with NIO boss and worker event loop
     * groups of {@code numServerThreads} threads each.
     */
    private NettyServerBuilder buildBaseServer() {
        NettyServerBuilder builder = (NettyServerBuilder)NettyServerBuilder.forPort(this.port).addService((BindableService)this);
        return builder
            .bossEventLoopGroup((EventLoopGroup)new NioEventLoopGroup(this.numServerThreads))
            .workerEventLoopGroup((EventLoopGroup)new NioEventLoopGroup(this.numServerThreads))
            .channelType(NioServerSocketChannel.class);
    }

    /** Builds the server without TLS; calls are dispatched on a single-thread executor. */
    private Server buildHttpServer() {
        // NOTE(review): this executor is never shut down by this class - confirm whether
        // its lifetime should be tied to the server.
        Executor rpcExecutor = Executors.newSingleThreadExecutor();
        return ((NettyServerBuilder)this.buildBaseServer().executor(rpcExecutor)).build();
    }

    /**
     * Builds the server with an SSL context created from the given certificate and key.
     *
     * @param trustedCasFile trusted CA file; when non-null it is installed as the trust
     *     manager and client authentication is required, when {@code null} no client
     *     authentication is configured
     * @param certFile the server certificate chain file
     * @param pkeyFile the server private key file
     * @return the built, not yet started, server
     * @throws SSLException if the SSL context cannot be built
     */
    protected Server buildHttpsServer(File trustedCasFile, File certFile, File pkeyFile) throws SSLException {
        SslContextBuilder sslContextBuilder = GrpcSslContexts.forServer(certFile, pkeyFile);
        if (trustedCasFile != null) {
            sslContextBuilder.trustManager(trustedCasFile).clientAuth(ClientAuth.REQUIRE);
        }
        return this.buildBaseServer().sslContext(sslContextBuilder.build()).build();
    }

    /**
     * Handles the streaming publish RPC by delegating to the send-data handler.
     *
     * @param responseObserver observer used to answer the publishing client
     * @return the observer that receives the client's flow unit messages
     * @throws UnsupportedOperationException if no send-data handler is set
     */
    @Override
    public StreamObserver<FlowUnitMessage> publish(StreamObserver<PublishResponse> responseObserver) {
        LOG.debug("publish received");
        // Read once: stop() may null the field concurrently.
        PublishRequestHandler handler = this.sendDataHandler;
        if (handler != null) {
            return handler.getClientStream(responseObserver);
        }
        throw new UnsupportedOperationException("No rpc handler found for publish/");
    }

    /**
     * Handles the subscribe RPC by delegating to the subscribe handler. When no handler
     * is set the call is failed through {@code responseObserver.onError}.
     *
     * @param request the subscription request
     * @param responseObserver observer used to answer the subscriber
     */
    @Override
    public void subscribe(SubscribeMessage request, StreamObserver<SubscribeResponse> responseObserver) {
        // Read once: stop() may null the field concurrently.
        SubscribeServerHandler handler = this.subscribeHandler;
        if (handler != null) {
            handler.handleSubscriptionRequest(request, responseObserver);
        } else {
            LOG.error("Subscribe request received before handler is set.");
            responseObserver.onError((Throwable)new UnsupportedOperationException("No rpc handler found for subscribe/"));
        }
    }

    /**
     * Handles the metrics RPC by delegating to the metrics handler. When no handler is
     * set the call is failed through {@code responseObserver.onError} so that the caller
     * is not left waiting for a response that never arrives.
     *
     * @param request the metrics request
     * @param responseObserver observer used to answer the caller
     */
    @Override
    public void getMetrics(MetricsRequest request, StreamObserver<MetricsResponse> responseObserver) {
        MetricsServerHandler handler = this.metricsServerHandler;
        if (handler != null) {
            handler.collectAPIData(request, responseObserver);
        } else {
            LOG.error("Metrics request received before handler is set.");
            responseObserver.onError((Throwable)new UnsupportedOperationException("No rpc handler found for getMetrics/"));
        }
    }

    /** Installs the handler used by {@link #subscribe}. */
    public void setSubscribeHandler(SubscribeServerHandler subscribeHandler) {
        this.subscribeHandler = subscribeHandler;
    }

    /** Installs the handler used by {@link #publish}. */
    public void setSendDataHandler(PublishRequestHandler sendDataHandler) {
        this.sendDataHandler = sendDataHandler;
    }

    /** Installs the handler used by {@link #getMetrics}. */
    public void setMetricsHandler(MetricsServerHandler metricsServerHandler) {
        this.metricsServerHandler = metricsServerHandler;
    }

    /** Returns the current metrics handler, or {@code null} if none is set. */
    @VisibleForTesting
    public MetricsServerHandler getMetricsServerHandler() {
        return this.metricsServerHandler;
    }

    /** Returns the current send-data handler, or {@code null} if none is set. */
    @VisibleForTesting
    public PublishRequestHandler getSendDataHandler() {
        return this.sendDataHandler;
    }

    /** Returns the current subscribe handler, or {@code null} if none is set. */
    @VisibleForTesting
    public SubscribeServerHandler getSubscribeHandler() {
        return this.subscribeHandler;
    }

    /**
     * Asks the send-data handler, if any, to terminate its upstream connections and then
     * clears the send-data and subscribe handlers. The metrics handler is left in place
     * and the server itself keeps running.
     */
    public void stop() {
        LOG.debug("indicating upstream nodes that current node is going down..");
        PublishRequestHandler handler = this.sendDataHandler;
        if (handler != null) {
            handler.terminateUpstreamConnections();
        }
        this.sendDataHandler = null;
        this.subscribeHandler = null;
    }

    /**
     * Calls {@link #stop()} and then shuts the server down, if it was built: an orderly
     * shutdown is requested and given up to one minute to complete. If it has not
     * completed by then, or the waiting thread is interrupted, the server is shut down
     * forcibly; on interruption the interrupt status is restored.
     */
    public void shutdown() {
        this.stop();
        Server current = this.server;
        if (current == null) {
            return;
        }
        current.shutdown();
        try {
            if (!current.awaitTermination(1L, TimeUnit.MINUTES)) {
                // Grace period elapsed with calls still in flight: force termination.
                current.shutdownNow();
            }
        }
        catch (InterruptedException e) {
            current.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Marks that a shutdown was requested, so that a subsequent failure or interruption
     * in {@link #run()} is not logged as an error.
     */
    public void setAttemptedShutdown() {
        this.attemptedShutdown = true;
    }
}

