/*
 * Decompiled with CFR 0.152.
 */
package org.apache.atlas.pc;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.atlas.pc.WorkItemBuilder;
import org.apache.atlas.pc.WorkItemConsumer;
import org.apache.curator.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Feeds work items to a fixed set of {@link WorkItemConsumer} workers through a
 * shared, bounded {@link BlockingQueue}.
 *
 * <p>The manager builds {@code numWorkers} consumers from the supplied
 * {@link WorkItemBuilder}, hands each of them the same work queue, and runs them
 * on a fixed-size thread pool. Producers enqueue items with {@link #produce(Object)}
 * or {@link #checkProduce(Object)}; {@link #drain()} waits on the latch shared with
 * the consumers and {@link #shutdown()} stops the pool.
 *
 * @param <T> type of the items placed on the work queue
 * @param <U> concrete consumer type produced by the builder
 */
public class WorkItemManager<T, U extends WorkItemConsumer> {
    private static final Logger LOG = LoggerFactory.getLogger(WorkItemManager.class);

    private final int numWorkers;
    private final BlockingQueue<T> workQueue;
    private final ExecutorService service;
    private final List<U> consumers = new ArrayList<U>();
    private CountDownLatch countdownLatch;
    private BlockingQueue<Object> resultsQueue;

    /**
     * Creates the work queue, the worker pool and the consumers, then starts the consumers.
     *
     * @param builder        factory invoked once per worker with the shared work queue
     * @param namePrefix     prefix for worker thread names; threads are named {@code <prefix>-<n>}
     * @param batchSize      per-worker share of the queue; queue capacity is {@code batchSize * numWorkers}
     * @param numWorkers     number of consumers and size of the thread pool
     * @param collectResults when {@code true}, a results queue is created and passed to every consumer
     */
    public WorkItemManager(WorkItemBuilder builder, String namePrefix, int batchSize, int numWorkers, boolean collectResults) {
        this.numWorkers = numWorkers;
        this.workQueue = new LinkedBlockingQueue<T>(batchSize * numWorkers);
        this.service = Executors.newFixedThreadPool(numWorkers, new ThreadFactoryBuilder().setNameFormat(namePrefix + "-%d").build());
        this.createConsumers(builder, numWorkers, collectResults);
        this.start();
    }

    /**
     * Convenience constructor: threads are prefixed {@code workItemConsumer} and
     * results are not collected.
     *
     * @param builder    factory invoked once per worker with the shared work queue
     * @param batchSize  per-worker share of the queue capacity
     * @param numWorkers number of consumers and size of the thread pool
     */
    public WorkItemManager(WorkItemBuilder builder, int batchSize, int numWorkers) {
        this(builder, "workItemConsumer", batchSize, numWorkers, false);
    }

    /**
     * Replaces the queue returned by {@link #getResults()}.
     *
     * <p>This only updates the manager's own reference; consumers that were already
     * handed a results queue are not updated here.
     *
     * @param resultsQueue queue to expose as the results collection
     */
    public void setResultsCollection(BlockingQueue<Object> resultsQueue) {
        this.resultsQueue = resultsQueue;
    }

    /**
     * Builds {@code numWorkers} consumers bound to the shared work queue and, when
     * requested, wires each of them to a freshly created results queue.
     */
    private void createConsumers(WorkItemBuilder builder, int numWorkers, boolean collectResults) {
        if (collectResults) {
            this.setResultsCollection(new LinkedBlockingQueue<Object>());
        }
        for (int i = 0; i < numWorkers; ++i) {
            // The builder is a raw type, so its result has to be narrowed to U. The cast is
            // unchecked; callers are responsible for pairing the builder with a matching U.
            @SuppressWarnings("unchecked")
            U consumer = (U) builder.build(this.workQueue);
            this.consumers.add(consumer);
            if (collectResults) {
                consumer.setResults(this.resultsQueue);
            }
        }
    }

    /**
     * Installs a fresh latch sized to the number of workers on every consumer and
     * submits each consumer to the thread pool.
     *
     * <p>NOTE(review): presumably each consumer counts the latch down when its run
     * ends - confirm in {@code WorkItemConsumer}.
     */
    public void start() {
        this.countdownLatch = new CountDownLatch(this.numWorkers);
        for (U consumer : this.consumers) {
            consumer.setCountDownLatch(this.countdownLatch);
            this.service.execute(consumer);
        }
    }

    /**
     * Enqueues an item, blocking while the work queue is full.
     *
     * <p>If the calling thread is interrupted while waiting, the interrupt flag is
     * restored and the method returns without having enqueued the item.
     *
     * @param item item to hand to the consumers
     */
    public void produce(T item) {
        try {
            this.workQueue.put(item);
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Enqueues an item after making sure the full set of workers is running.
     *
     * <p>When the latch count has dropped below the number of workers, this waits for
     * the latch to reach zero via {@link #drain()} and then resubmits all consumers
     * via {@link #start()} before enqueuing.
     *
     * @param item item to hand to the consumers
     */
    public void checkProduce(T item) {
        long activeCount = this.countdownLatch.getCount();
        if (activeCount < (long) this.numWorkers) {
            LOG.info("Fewer workers detected: {}", activeCount);
            this.drain();
            this.start();
        }
        this.produce(item);
    }

    /**
     * Blocks until the current latch reaches zero.
     *
     * <p>Returns immediately when no latch exists or it is already at zero. If the
     * calling thread is interrupted while waiting, the interrupt flag is restored and
     * the method returns early.
     */
    public void drain() {
        try {
            if (this.countdownLatch == null || this.countdownLatch.getCount() == 0L) {
                return;
            }
            LOG.debug("Drain: Stated! Queue size: {}", this.workQueue.size());
            this.countdownLatch.await();
            LOG.debug("Drain: Done! Queue size: {}", this.workQueue.size());
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Stops accepting new tasks on the thread pool and waits for running consumers to finish.
     *
     * <p>The wait is bounded by twice the average of the consumers' reported maximum
     * commit times. A warning is logged if the pool has not terminated by then.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void shutdown() throws InterruptedException {
        // NOTE(review): the helper yields a value in seconds, but it has always been applied
        // as minutes here. The unit is kept as-is so existing callers keep the same (generous)
        // upper bound on the wait - confirm which unit was intended.
        int timeoutMinutes = this.getAvgCommitTimeSeconds() * 2;
        LOG.info("WorkItemManager: Shutdown started. Will wait for: {} minutes...", timeoutMinutes);
        this.service.shutdown();
        boolean terminated = this.service.awaitTermination(timeoutMinutes, TimeUnit.MINUTES);
        if (terminated) {
            LOG.info("WorkItemManager: Shutdown done!");
        } else {
            LOG.warn("WorkItemManager: Shutdown timed out after {} minutes; workers may still be running.", timeoutMinutes);
        }
    }

    /**
     * Returns the results queue, or {@code null} when none was created or set.
     *
     * @return the queue passed to the consumers for results, possibly {@code null}
     */
    @SuppressWarnings("rawtypes") // raw return type kept for source compatibility with existing callers
    public BlockingQueue getResults() {
        return this.resultsQueue;
    }

    /**
     * Averages the consumers' reported maximum commit times and converts the result
     * from milliseconds to whole seconds.
     *
     * @return the average in seconds, {@code 0} when there are no consumers
     */
    private int getAvgCommitTimeSeconds() {
        if (this.consumers.isEmpty()) {
            return 0;
        }
        // Accumulate in a long so a large millisecond total is not truncated on each addition.
        long totalCommitTimeMs = 0L;
        for (U consumer : this.consumers) {
            totalCommitTimeMs += consumer.getMaxCommitTimeInMs();
        }
        long avgSeconds = totalCommitTimeMs / this.consumers.size() / 1000L;
        return (int) Math.min(avgSeconds, (long) Integer.MAX_VALUE);
    }
}

