/*
 * Decompiled with CFR 0.152.
 */
package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.threadpool;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.FlowUnitMessage;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.Resource;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.configs.QueueRejectionRcaConfig;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Metric;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Rca;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Resources;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.contexts.ResourceContext;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.flow_units.MetricFlowUnit;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.flow_units.ResourceFlowUnit;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.persist.SQLParsingUtil;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotResourceSummary;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.ResourceUtil;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.RcaConf;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.util.InstanceDetails;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.scheduler.FlowUnitOperationArgWrapper;
import com.google.common.annotations.VisibleForTesting;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class QueueRejectionRca
extends Rca<ResourceFlowUnit<HotNodeSummary>> {
    private static final Logger LOG = LogManager.getLogger(QueueRejectionRca.class);
    private final int rcaPeriod;
    private final List<QueueRejectionCollector> queueRejectionCollectors;
    private int counter;
    protected Clock clock;

    public <M extends Metric> QueueRejectionRca(int rcaPeriod, M threadPool_RejectedReqs) {
        super(5L);
        this.rcaPeriod = rcaPeriod;
        this.counter = 0;
        this.clock = Clock.systemUTC();
        this.queueRejectionCollectors = Collections.unmodifiableList(Arrays.asList(new QueueRejectionCollector(ResourceUtil.WRITE_QUEUE_REJECTION, AllMetrics.ThreadPoolType.WRITE, threadPool_RejectedReqs), new QueueRejectionCollector(ResourceUtil.SEARCH_QUEUE_REJECTION, AllMetrics.ThreadPoolType.SEARCH, threadPool_RejectedReqs)));
    }

    @VisibleForTesting
    public void setClock(Clock clock) {
        this.clock = clock;
    }

    @Override
    public ResourceFlowUnit<HotNodeSummary> operate() {
        ++this.counter;
        long currTimestamp = this.clock.millis();
        for (QueueRejectionCollector collector : this.queueRejectionCollectors) {
            collector.collect(currTimestamp);
        }
        if (this.counter == this.rcaPeriod) {
            this.counter = 0;
            InstanceDetails instanceDetails = this.getInstanceDetails();
            HotNodeSummary nodeSummary = new HotNodeSummary(instanceDetails.getInstanceId(), instanceDetails.getInstanceIp());
            boolean hasUnhealthyQueue = false;
            for (QueueRejectionCollector collector : this.queueRejectionCollectors) {
                if (!collector.isUnhealthy(currTimestamp)) continue;
                nodeSummary.appendNestedSummary(collector.generateSummary(currTimestamp));
                hasUnhealthyQueue = true;
            }
            ResourceContext context = !hasUnhealthyQueue ? new ResourceContext(Resources.State.HEALTHY) : new ResourceContext(Resources.State.UNHEALTHY);
            boolean isDataNode = !instanceDetails.getIsMaster();
            return new ResourceFlowUnit<HotNodeSummary>(currTimestamp, context, nodeSummary, isDataNode);
        }
        return new ResourceFlowUnit<HotNodeSummary>(currTimestamp);
    }

    @Override
    public void generateFlowUnitListFromWire(FlowUnitOperationArgWrapper args) {
        List<FlowUnitMessage> flowUnitMessages = args.getWireHopper().readFromWire(args.getNode());
        ArrayList flowUnitList = new ArrayList();
        LOG.debug("rca: Executing fromWire: {}", (Object)this.getClass().getSimpleName());
        for (FlowUnitMessage flowUnitMessage : flowUnitMessages) {
            flowUnitList.add(ResourceFlowUnit.buildFlowUnitFromWrapper(flowUnitMessage));
        }
        this.setFlowUnits(flowUnitList);
    }

    @Override
    public void readRcaConf(RcaConf conf) {
        QueueRejectionRcaConfig configObj = conf.getQueueRejectionRcaConfig();
        long rejectedTimePeriod = TimeUnit.SECONDS.toMillis(configObj.getRejectionTimePeriodInSeconds());
        this.queueRejectionCollectors.forEach(collector -> collector.setRejectionTimePeriod(rejectedTimePeriod));
    }

    private static class QueueRejectionCollector {
        private final Resource threadPool;
        private final AllMetrics.ThreadPoolType threadPoolMetric;
        private final Metric threadPool_RejectedReqs;
        private boolean hasRejection;
        private long rejectionTimestamp;
        private long rejectionTimePeriodInMillis;

        public QueueRejectionCollector(Resource threadPool, AllMetrics.ThreadPoolType threadPoolMetric, Metric threadPool_RejectedReqs) {
            this.threadPool = threadPool;
            this.threadPoolMetric = threadPoolMetric;
            this.threadPool_RejectedReqs = threadPool_RejectedReqs;
            this.hasRejection = false;
            this.rejectionTimestamp = 0L;
            this.rejectionTimePeriodInMillis = TimeUnit.SECONDS.toMillis(300L);
        }

        public void setRejectionTimePeriod(long rejectionTimePeriodInMillis) {
            this.rejectionTimePeriodInMillis = rejectionTimePeriodInMillis;
        }

        public void collect(long currTimestamp) {
            for (MetricFlowUnit flowUnit : this.threadPool_RejectedReqs.getFlowUnits()) {
                if (flowUnit.isEmpty()) continue;
                double rejectCnt = SQLParsingUtil.readDataFromSqlResult(flowUnit.getData(), AllMetrics.ThreadPoolDimension.THREAD_POOL_TYPE.getField(), this.threadPoolMetric.toString(), "max");
                if (!Double.isNaN(rejectCnt)) {
                    if (rejectCnt > 0.0) {
                        if (!this.hasRejection) {
                            this.rejectionTimestamp = currTimestamp;
                        }
                        this.hasRejection = true;
                        continue;
                    }
                    this.hasRejection = false;
                    continue;
                }
                LOG.error("Failed to parse metric from threadpool {}", (Object)this.threadPool.toString());
            }
        }

        public boolean isUnhealthy(long currTimestamp) {
            return this.hasRejection && currTimestamp - this.rejectionTimestamp >= this.rejectionTimePeriodInMillis;
        }

        public HotResourceSummary generateSummary(long currTimestamp) {
            HotResourceSummary resourceSummary = null;
            if (this.isUnhealthy(currTimestamp)) {
                resourceSummary = new HotResourceSummary(this.threadPool, TimeUnit.MILLISECONDS.toSeconds(this.rejectionTimePeriodInMillis), TimeUnit.MILLISECONDS.toSeconds(currTimestamp - this.rejectionTimestamp), 0);
            }
            return resourceSummary;
        }
    }
}

