/*
 * Decompiled with CFR 0.152.
 */
package org.apache.carbondata.processing.loading.steps;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.carbondata.common.CarbonIterator;
import org.apache.carbondata.core.datastore.row.CarbonRow;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonThreadFactory;
import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
import org.apache.carbondata.processing.loading.DataField;
import org.apache.carbondata.processing.loading.parser.RowParser;
import org.apache.carbondata.processing.loading.parser.impl.RowParserImpl;
import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;

public class InputProcessorStepImpl
extends AbstractDataLoadProcessorStep {
    private RowParser rowParser;
    private CarbonIterator<Object[]>[] inputIterators;
    private short sdkWriterCores;
    public ExecutorService executorService;
    boolean isRawDataRequired = false;

    public InputProcessorStepImpl(CarbonDataLoadConfiguration configuration, CarbonIterator<Object[]>[] inputIterators) {
        super(configuration, null);
        this.inputIterators = inputIterators;
        this.sdkWriterCores = configuration.getWritingCoresCount();
    }

    @Override
    public DataField[] getOutput() {
        return this.configuration.getDataFields();
    }

    @Override
    public void initialize() throws IOException {
        super.initialize();
        this.rowParser = new RowParserImpl(this.getOutput(), this.configuration);
        this.executorService = Executors.newCachedThreadPool((ThreadFactory)new CarbonThreadFactory("InputProcessorPool:" + this.configuration.getTableIdentifier().getCarbonTableIdentifier().getTableName()));
        this.isRawDataRequired = CarbonDataProcessorUtil.isRawDataRequired(this.configuration);
    }

    @Override
    public Iterator<CarbonRowBatch>[] execute() {
        int batchSize = CarbonProperties.getInstance().getBatchSize();
        List<CarbonIterator<Object[]>>[] readerIterators = CarbonDataProcessorUtil.partitionInputReaderIterators(this.inputIterators, this.sdkWriterCores);
        Iterator[] outIterators = new Iterator[readerIterators.length];
        for (int i = 0; i < outIterators.length; ++i) {
            outIterators[i] = new InputProcessorIterator(readerIterators[i], this.rowParser, batchSize, this.configuration.isPreFetch(), this.executorService, this.rowCounter, this.isRawDataRequired);
        }
        return outIterators;
    }

    @Override
    public void close() {
        if (!this.closed) {
            super.close();
            if (null != this.executorService) {
                this.executorService.shutdownNow();
            }
            for (CarbonIterator<Object[]> inputIterator : this.inputIterators) {
                inputIterator.close();
            }
        }
    }

    @Override
    protected String getStepName() {
        return "Input Processor";
    }

    public static class InputProcessorIterator
    extends CarbonIterator<CarbonRowBatch> {
        private List<CarbonIterator<Object[]>> inputIterators;
        private CarbonIterator<Object[]> currentIterator;
        private int counter;
        private int batchSize;
        private RowParser rowParser;
        private Future<CarbonRowBatch> future;
        private ExecutorService executorService;
        private boolean nextBatch;
        private boolean firstTime;
        private boolean preFetch;
        private AtomicLong rowCounter;
        private boolean isRawDataRequired = false;

        public InputProcessorIterator(List<CarbonIterator<Object[]>> inputIterators, RowParser rowParser, int batchSize, boolean preFetch, ExecutorService executorService, AtomicLong rowCounter, boolean isRawDataRequired) {
            this.inputIterators = inputIterators;
            this.batchSize = batchSize;
            this.rowParser = rowParser;
            this.counter = 0;
            this.currentIterator = inputIterators.get(this.counter++);
            this.executorService = executorService;
            this.rowCounter = rowCounter;
            this.preFetch = preFetch;
            this.nextBatch = false;
            this.firstTime = true;
            this.isRawDataRequired = isRawDataRequired;
        }

        public boolean hasNext() {
            return this.nextBatch || this.internalHasNext();
        }

        private boolean internalHasNext() {
            boolean hasNext;
            if (this.firstTime) {
                this.firstTime = false;
                this.currentIterator.initialize();
            }
            if (!(hasNext = this.currentIterator.hasNext())) {
                this.currentIterator.close();
                if (this.counter < this.inputIterators.size()) {
                    this.currentIterator = this.inputIterators.get(this.counter++);
                    this.currentIterator.initialize();
                    hasNext = this.internalHasNext();
                }
            }
            return hasNext;
        }

        public CarbonRowBatch next() {
            if (this.preFetch) {
                return this.getCarbonRowBatchWithPreFetch();
            }
            return this.getBatch();
        }

        private CarbonRowBatch getCarbonRowBatchWithPreFetch() {
            CarbonRowBatch result = null;
            if (this.future == null) {
                this.future = this.getCarbonRowBatch();
            }
            try {
                result = this.future.get();
            }
            catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            catch (ExecutionException e) {
                throw new RuntimeException(e);
            }
            this.nextBatch = false;
            if (this.hasNext()) {
                this.nextBatch = true;
                this.future = this.getCarbonRowBatch();
            }
            return result;
        }

        private Future<CarbonRowBatch> getCarbonRowBatch() {
            return this.executorService.submit(new Callable<CarbonRowBatch>(){

                @Override
                public CarbonRowBatch call() throws Exception {
                    return InputProcessorIterator.this.getBatch();
                }
            });
        }

        private CarbonRowBatch getBatch() {
            int count;
            CarbonRowBatch carbonRowBatch = new CarbonRowBatch(this.batchSize);
            if (this.isRawDataRequired) {
                for (count = 0; this.internalHasNext() && count < this.batchSize; ++count) {
                    Object[] rawRow = (Object[])this.currentIterator.next();
                    carbonRowBatch.addRow(new CarbonRow(this.rowParser.parseRow(rawRow), rawRow));
                }
            } else {
                while (this.internalHasNext() && count < this.batchSize) {
                    carbonRowBatch.addRow(new CarbonRow(this.rowParser.parseRow((Object[])this.currentIterator.next())));
                    ++count;
                }
            }
            this.rowCounter.getAndAdd(carbonRowBatch.getSize());
            return carbonRowBatch;
        }
    }
}

