/*
 * Decompiled with CFR 0.152.
 */
package com.amazon.opendistroforelasticsearch.sql.legacy.query.planner.physical.node;

import com.amazon.opendistroforelasticsearch.sql.legacy.query.planner.core.ExecuteParams;
import com.amazon.opendistroforelasticsearch.sql.legacy.query.planner.physical.PhysicalOperator;
import com.amazon.opendistroforelasticsearch.sql.legacy.query.planner.physical.Row;
import com.amazon.opendistroforelasticsearch.sql.legacy.query.planner.resource.ResourceManager;
import java.util.Collection;
import java.util.Iterator;
import java.util.Objects;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * Base class for physical operators that serve rows out of batches.
 *
 * <p>Subclasses supply each batch through {@link #prefetch()}. This class keeps an iterator over
 * the current batch, asks for a new batch whenever the current one is missing or exhausted, and
 * checks the {@link ResourceManager} before every prefetch.
 *
 * @param <T> type parameter of the {@link Row}s produced by this operator
 */
public abstract class BatchPhysicalOperator<T>
implements PhysicalOperator<T> {
    protected static final Logger LOG = LogManager.getLogger();

    /** Resource manager obtained in {@link #open(ExecuteParams)}; consulted before each prefetch. */
    private ResourceManager resourceMgr;

    /** Iterator over the batch currently being served; {@code null} until the first prefetch. */
    private Iterator<Row<T>> curBatch;

    /**
     * Stores the resource manager found in the execution parameters.
     *
     * @param params execution parameters; the {@code RESOURCE_MANAGER} entry is read from it
     * @throws Exception declared by the overridden signature
     */
    @Override
    public void open(ExecuteParams params) throws Exception {
        this.resourceMgr = (ResourceManager)params.get(ExecuteParams.ExecuteParamType.RESOURCE_MANAGER);
    }

    /**
     * Reports whether another row is available, pre-fetching the next batch when the current one
     * is missing or exhausted.
     *
     * <p>Only one prefetch is attempted per call: if the newly fetched batch is empty this method
     * returns {@code false}.
     *
     * @return {@code true} if the (possibly just fetched) current batch has a row left
     * @throws NullPointerException if no resource manager has been set
     * @throws IllegalStateException if the resource check fails or the prefetch itself throws
     */
    @Override
    public boolean hasNext() {
        if (this.isNoMoreDataInCurrentBatch()) {
            LOG.debug("{} No more data in current batch, pre-fetching next batch", this);
            Collection<Row<T>> nextBatch = this.prefetchSafely();
            LOG.debug("{} Pre-fetched {} rows", this, nextBatch.size());
            // Guarded so the per-row logging loop is skipped entirely unless trace is on.
            if (LOG.isTraceEnabled()) {
                nextBatch.forEach(row -> LOG.trace("Row pre-fetched: {}", row));
            }
            this.curBatch = nextBatch.iterator();
        }
        return this.curBatch.hasNext();
    }

    /**
     * Returns the next row of the current batch.
     *
     * <p>The current batch is only assigned inside {@link #hasNext()}, so this method does not
     * fetch anything itself; callers are expected to call {@link #hasNext()} first.
     *
     * @return the next row from the current batch
     */
    @Override
    public Row<T> next() {
        return this.curBatch.next();
    }

    /**
     * Runs {@link #prefetch()} only if the resource manager reports healthy and not timed out.
     *
     * @return the batch returned by {@link #prefetch()}
     * @throws NullPointerException if the resource manager has not been set
     * @throws IllegalStateException if the resource manager is unhealthy or timed out, or if
     *     {@link #prefetch()} throws (the original exception is kept as the cause)
     */
    private Collection<Row<T>> prefetchSafely() {
        Objects.requireNonNull(this.resourceMgr, "ResourceManager is not set so unable to do sanity check");
        boolean isHealthy = this.resourceMgr.isHealthy();
        boolean isTimeout = this.resourceMgr.isTimeout();
        if (isHealthy && !isTimeout) {
            try {
                return this.prefetch();
            }
            catch (Exception e) {
                throw new IllegalStateException("Failed to prefetch next batch", e);
            }
        }
        // Reaching here with isHealthy == true means the timeout flag must be the one that tripped.
        throw new IllegalStateException("Exit due to " + (isHealthy ? "time out" : "insufficient resource"));
    }

    /**
     * Produces the next batch of rows.
     *
     * @return the next batch; its {@code size()} and {@code iterator()} are used by
     *     {@link #hasNext()}
     * @throws Exception on any failure; {@link #hasNext()} surfaces it as an
     *     {@link IllegalStateException}
     */
    protected abstract Collection<Row<T>> prefetch() throws Exception;

    /**
     * Tells whether a new batch is needed.
     *
     * @return {@code true} if no batch has been fetched yet or the current one has no rows left
     */
    private boolean isNoMoreDataInCurrentBatch() {
        return this.curBatch == null || !this.curBatch.hasNext();
    }
}

