/*
 * Decompiled with CFR 0.152.
 */
package org.apache.carbondata.processing.loading.sort.impl;

import java.io.File;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.carbondata.common.CarbonIterator;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
import org.apache.carbondata.core.datastore.row.CarbonRow;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonThreadFactory;
import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
import org.apache.carbondata.processing.loading.sort.AbstractMergeSorter;
import org.apache.carbondata.processing.loading.sort.impl.ThreadStatusObserver;
import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
import org.apache.carbondata.processing.sort.sortdata.SingleThreadFinalSortFilesMerger;
import org.apache.carbondata.processing.sort.sortdata.SortDataRows;
import org.apache.carbondata.processing.sort.sortdata.SortIntermediateFileMerger;
import org.apache.carbondata.processing.sort.sortdata.SortParameters;
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;

/**
 * Merge sorter that drains several input iterators in parallel, one worker thread per iterator,
 * pushes every row into a single shared {@link SortDataRows}, and then exposes the output of the
 * final merger as one iterator of {@link CarbonRowBatch}.
 *
 * <p>Lifecycle: {@link #initialize(SortParameters)} must run before
 * {@link #sort(Iterator[])}; {@link #close()} releases the intermediate merger and cancels any
 * worker that is still running.
 */
public class ParallelReadMergeSorterImpl
extends AbstractMergeSorter {
    private static final LogService LOGGER = LogServiceFactory.getLogService(ParallelReadMergeSorterImpl.class.getName());
    /** Sort configuration; assigned in {@link #initialize(SortParameters)}. */
    private SortParameters sortParameters;
    /** Intermediate merger; set to {@code null} by {@link #sort(Iterator[])} once it has finished. */
    private SortIntermediateFileMerger intermediateFileMerger;
    /** Final merger whose output backs the iterator returned by {@link #sort(Iterator[])}. */
    private SingleThreadFinalSortFilesMerger finalMerger;
    /** Counter shared with the caller; incremented by the number of rows each worker hands over. */
    private final AtomicLong rowCounter;
    /** Pool running the reader workers; created per {@link #sort(Iterator[])} call. */
    private ExecutorService executorService;

    /**
     * @param rowCounter counter that the worker threads increment by the number of rows they add
     */
    public ParallelReadMergeSorterImpl(AtomicLong rowCounter) {
        this.rowCounter = rowCounter;
    }

    /**
     * Stores the parameters and builds the intermediate and final mergers. The final merger is
     * pointed at the local data folder locations with {@code File.separator + "sortrowtmp"}
     * appended.
     *
     * @param sortParameters sort configuration used by this sorter and both mergers
     */
    @Override
    public void initialize(SortParameters sortParameters) {
        this.sortParameters = sortParameters;
        this.intermediateFileMerger = new SortIntermediateFileMerger(sortParameters);
        String[] storeLocations = CarbonDataProcessorUtil.getLocalDataFolderLocation(
                sortParameters.getDatabaseName(),
                sortParameters.getTableName(),
                String.valueOf(sortParameters.getTaskNo()),
                sortParameters.getSegmentId(),
                false,
                false);
        String[] dataFolderLocations = CarbonDataProcessorUtil.arrayAppend(storeLocations, File.separator, "sortrowtmp");
        this.finalMerger = new SingleThreadFinalSortFilesMerger(dataFolderLocations, sortParameters.getTableName(), sortParameters);
    }

    /**
     * Reads all input iterators concurrently, sorts the rows, starts the final merge and returns
     * a one-element array holding an iterator over the merged result.
     *
     * @param iterators input row-batch iterators; one worker thread is started per element
     * @return array with exactly one iterator that yields batches of at most the configured
     *         batch size, read from the final merger
     * @throws CarbonDataLoadingException if initialization, reading, sorting or merging fails, or
     *         if the calling thread is interrupted while waiting for the workers (the interrupt
     *         status is restored in that case)
     */
    @Override
    public Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators) throws CarbonDataLoadingException {
        SortDataRows sortDataRow = new SortDataRows(this.sortParameters, this.intermediateFileMerger);
        final int batchSize = CarbonProperties.getInstance().getBatchSize();
        try {
            sortDataRow.initialize();
        }
        catch (CarbonSortKeyAndGroupByException e) {
            throw new CarbonDataLoadingException(e);
        }
        this.executorService = Executors.newFixedThreadPool(iterators.length, new CarbonThreadFactory("SafeParallelSorterPool:" + this.sortParameters.getTableName()));
        this.threadStatusObserver = new ThreadStatusObserver(this.executorService);
        try {
            for (Iterator<CarbonRowBatch> input : iterators) {
                this.executorService.execute(new SortIteratorThread(input, sortDataRow, batchSize, this.rowCounter, this.threadStatusObserver));
            }
            // No further tasks are submitted; wait for the readers to drain their inputs.
            this.executorService.shutdown();
            this.executorService.awaitTermination(2L, TimeUnit.DAYS);
            this.processRowToNextStep(sortDataRow, this.sortParameters);
        }
        catch (InterruptedException e) {
            // Restore the interrupt status so callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
            this.checkError();
            throw new CarbonDataLoadingException("Problem while shutdown the server ", e);
        }
        catch (Exception e) {
            // Prefer a failure reported by a worker, if any, over the local exception.
            this.checkError();
            throw new CarbonDataLoadingException("Problem while shutdown the server ", e);
        }
        this.checkError();
        try {
            this.intermediateFileMerger.finish();
            // Cleared so that close() does not touch the merger after it has finished.
            this.intermediateFileMerger = null;
            this.finalMerger.startFinalMerge();
        }
        catch (CarbonDataWriterException e) {
            throw new CarbonDataLoadingException(e);
        }
        catch (CarbonSortKeyAndGroupByException e) {
            throw new CarbonDataLoadingException(e);
        }
        CarbonIterator<CarbonRowBatch> batchIterator = new CarbonIterator<CarbonRowBatch>(){

            public boolean hasNext() {
                return ParallelReadMergeSorterImpl.this.finalMerger.hasNext();
            }

            /** Collects up to {@code batchSize} rows from the final merger into one batch. */
            public CarbonRowBatch next() {
                CarbonRowBatch rowBatch = new CarbonRowBatch(batchSize);
                for (int counter = 0; ParallelReadMergeSorterImpl.this.finalMerger.hasNext() && counter < batchSize; ++counter) {
                    rowBatch.addRow(new CarbonRow(ParallelReadMergeSorterImpl.this.finalMerger.next()));
                }
                return rowBatch;
            }
        };
        // Generic array creation is not expressible in Java; the array only ever holds batchIterator.
        @SuppressWarnings("unchecked")
        Iterator<CarbonRowBatch>[] result = new Iterator[]{batchIterator};
        return result;
    }

    /**
     * Closes the intermediate merger, if it is still held, and cancels worker threads that have
     * not finished.
     */
    @Override
    public void close() {
        if (this.intermediateFileMerger != null) {
            this.intermediateFileMerger.close();
        }
        // sort() calls shutdown() right after submitting the workers, so isShutdown() is already
        // true while they run; isTerminated() is the check that tells whether any are still alive.
        if (this.executorService != null && !this.executorService.isTerminated()) {
            this.executorService.shutdownNow();
        }
    }

    /**
     * Starts sorting on the collected rows and records the current time in the load statistics.
     *
     * @param sortDataRows holder of the rows added by the worker threads
     * @param parameters   supplies the table name for logging and the partition id for statistics
     * @return always {@code false}; {@link #sort(Iterator[])} ignores the value
     * @throws CarbonDataLoadingException if starting the sort fails
     */
    private boolean processRowToNextStep(SortDataRows sortDataRows, SortParameters parameters) throws CarbonDataLoadingException {
        try {
            sortDataRows.startSorting();
            LOGGER.info("Record Processed For table: " + parameters.getTableName());
            CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordSortRowsStepTotalTime(parameters.getPartitionID(), Long.valueOf(System.currentTimeMillis()));
            CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordDictionaryValuesTotalTime(parameters.getPartitionID(), Long.valueOf(System.currentTimeMillis()));
            return false;
        }
        catch (CarbonSortKeyAndGroupByException e) {
            throw new CarbonDataLoadingException(e);
        }
    }

    /**
     * Worker that drains one input iterator and hands its rows, batch by batch, to the shared
     * {@link SortDataRows}. Any exception is logged and reported to the observer instead of
     * propagating out of {@link #run()}.
     */
    private static class SortIteratorThread
    implements Runnable {
        private final Iterator<CarbonRowBatch> iterator;
        private final SortDataRows sortDataRows;
        /** Reused staging array, one slot per row; sized to the configured batch size. */
        private final Object[][] buffer;
        private final AtomicLong rowCounter;
        private final ThreadStatusObserver observer;

        /**
         * @param iterator     input batches read by this worker
         * @param sortDataRows shared destination for the rows
         * @param batchSize    capacity of the staging buffer
         * @param rowCounter   counter incremented by the number of rows handed over
         * @param observer     notified when this worker fails
         */
        public SortIteratorThread(Iterator<CarbonRowBatch> iterator, SortDataRows sortDataRows, int batchSize, AtomicLong rowCounter, ThreadStatusObserver observer) {
            this.iterator = iterator;
            this.sortDataRows = sortDataRows;
            this.buffer = new Object[batchSize][];
            this.rowCounter = rowCounter;
            this.observer = observer;
        }

        @Override
        public void run() {
            try {
                while (this.iterator.hasNext()) {
                    CarbonRowBatch batch = this.iterator.next();
                    int filled = 0;
                    // NOTE(review): assumes a batch never holds more rows than the buffer
                    // capacity; otherwise the store below fails and is reported as a worker error.
                    while (batch.hasNext()) {
                        CarbonRow row = batch.next();
                        if (row != null) {
                            this.buffer[filled++] = row.getData();
                        }
                    }
                    if (filled > 0) {
                        this.sortDataRows.addRowBatch(this.buffer, filled);
                        this.rowCounter.getAndAdd(filled);
                    }
                }
            }
            catch (Exception e) {
                LOGGER.error(e);
                this.observer.notifyFailed(e);
            }
        }
    }
}

