/*
 * Decompiled with CFR 0.152.
 */
package org.apache.carbondata.processing.loading.sort.impl;

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.carbondata.common.CarbonIterator;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
import org.apache.carbondata.core.datastore.row.CarbonRow;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonThreadFactory;
import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
import org.apache.carbondata.processing.loading.sort.AbstractMergeSorter;
import org.apache.carbondata.processing.loading.sort.impl.ThreadStatusObserver;
import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeSortDataRows;
import org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeIntermediateMerger;
import org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeSingleThreadFinalSortFilesMerger;
import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
import org.apache.carbondata.processing.sort.sortdata.SortParameters;

/**
 * Merge sorter that drains every input iterator on its own worker thread into a shared
 * {@link UnsafeSortDataRows}, then exposes the merged result as a single iterator of
 * {@link CarbonRowBatch}.
 *
 * <p>Lifecycle as used by this class: {@link #initialize(SortParameters)} creates the
 * intermediate and final mergers, {@link #sort(Iterator[])} runs the parallel read and starts
 * the final merge, and {@link #close()} releases the thread pool and the mergers.
 */
public class UnsafeParallelReadMergeSorterImpl
extends AbstractMergeSorter {
    private static final LogService LOGGER = LogServiceFactory.getLogService(UnsafeParallelReadMergeSorterImpl.class.getName());
    private SortParameters sortParameters;
    private UnsafeIntermediateMerger unsafeIntermediateFileMerger;
    private UnsafeSingleThreadFinalSortFilesMerger finalMerger;
    /** Shared counter incremented by each worker with the number of rows it handed to the sorter. */
    private final AtomicLong rowCounter;
    /** Pool created per {@link #sort(Iterator[])} call; {@code null} until sort() has been invoked. */
    private ExecutorService executorService;

    /**
     * @param rowCounter counter that receives the number of rows added for sorting
     */
    public UnsafeParallelReadMergeSorterImpl(AtomicLong rowCounter) {
        this.rowCounter = rowCounter;
    }

    /**
     * Stores the sort parameters and creates the intermediate and final mergers from them.
     *
     * @param sortParameters configuration used for the mergers and, later, for the sort itself
     */
    @Override
    public void initialize(SortParameters sortParameters) {
        this.sortParameters = sortParameters;
        this.unsafeIntermediateFileMerger = new UnsafeIntermediateMerger(sortParameters);
        this.finalMerger = new UnsafeSingleThreadFinalSortFilesMerger(sortParameters, sortParameters.getTempFileLocation());
    }

    /**
     * Reads all input iterators in parallel (one worker per iterator), sorts the collected rows
     * and starts the final merge.
     *
     * @param iterators input batches; one worker thread is started for each element
     * @return a one-element array whose iterator yields batches of at most the configured batch
     *         size, pulled from the final merger
     * @throws CarbonDataLoadingException if initialisation, reading, sorting or merging fails,
     *         or if the calling thread is interrupted while waiting for the workers
     */
    @Override
    public Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators) throws CarbonDataLoadingException {
        int inMemoryChunkSizeInMB = CarbonProperties.getInstance().getSortMemoryChunkSizeInMB();
        UnsafeSortDataRows sortDataRow = new UnsafeSortDataRows(this.sortParameters, this.unsafeIntermediateFileMerger, inMemoryChunkSizeInMB);
        final int batchSize = CarbonProperties.getInstance().getBatchSize();
        try {
            sortDataRow.initialize();
        }
        catch (Exception e) {
            throw new CarbonDataLoadingException(e);
        }
        this.executorService = Executors.newFixedThreadPool(iterators.length, new CarbonThreadFactory("UnsafeParallelSorterPool:" + this.sortParameters.getTableName()));
        this.threadStatusObserver = new ThreadStatusObserver(this.executorService);
        try {
            for (int i = 0; i < iterators.length; ++i) {
                this.executorService.execute(new SortIteratorThread(iterators[i], sortDataRow, batchSize, this.rowCounter, this.threadStatusObserver));
            }
            // No more tasks will be submitted; wait for the readers to drain their iterators.
            this.executorService.shutdown();
            // NOTE(review): the boolean result (timed out vs. terminated) is ignored, so after a
            // timeout the code below runs while workers may still be active - confirm intended.
            this.executorService.awaitTermination(2L, TimeUnit.DAYS);
            if (!this.sortParameters.getObserver().isFailed()) {
                this.processRowToNextStep(sortDataRow, this.sortParameters);
            }
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can observe it;
            // otherwise the interruption would be lost once it is wrapped below.
            Thread.currentThread().interrupt();
            this.checkError();
            throw new CarbonDataLoadingException("Problem while shutdown the server ", e);
        }
        catch (Exception e) {
            // Give a failure recorded through the observer the chance to be reported first.
            this.checkError();
            throw new CarbonDataLoadingException("Problem while shutdown the server ", e);
        }
        this.checkError();
        try {
            this.unsafeIntermediateFileMerger.finish();
            List<UnsafeCarbonRowPage> rowPages = this.unsafeIntermediateFileMerger.getRowPages();
            this.finalMerger.startFinalMerge(rowPages.toArray(new UnsafeCarbonRowPage[rowPages.size()]), this.unsafeIntermediateFileMerger.getMergedPages());
        }
        catch (CarbonDataWriterException e) {
            throw new CarbonDataLoadingException(e);
        }
        catch (CarbonSortKeyAndGroupByException e) {
            throw new CarbonDataLoadingException(e);
        }
        CarbonIterator<CarbonRowBatch> batchIterator = new CarbonIterator<CarbonRowBatch>(){

            public boolean hasNext() {
                return UnsafeParallelReadMergeSorterImpl.this.finalMerger.hasNext();
            }

            /** Pulls up to {@code batchSize} rows from the final merger into a new batch. */
            public CarbonRowBatch next() {
                CarbonRowBatch rowBatch = new CarbonRowBatch(batchSize);
                for (int counter = 0; UnsafeParallelReadMergeSorterImpl.this.finalMerger.hasNext() && counter < batchSize; ++counter) {
                    rowBatch.addRow(new CarbonRow(UnsafeParallelReadMergeSorterImpl.this.finalMerger.next()));
                }
                return rowBatch;
            }
        };
        // Generic array creation is not allowed in Java; the raw array only ever holds batchIterator.
        @SuppressWarnings("unchecked")
        Iterator<CarbonRowBatch>[] result = new Iterator[]{batchIterator};
        return result;
    }

    /**
     * Stops any workers that are still running and releases both mergers.
     *
     * <p>Safe to call when {@link #initialize(SortParameters)} or {@link #sort(Iterator[])} was
     * never invoked: components that were not created are skipped.
     */
    @Override
    public void close() {
        // Check isTerminated() rather than isShutdown(): sort() always calls shutdown() before
        // waiting, so isShutdown() is already true while workers may still be running.
        if (null != this.executorService && !this.executorService.isTerminated()) {
            this.executorService.shutdownNow();
        }
        if (null != this.unsafeIntermediateFileMerger) {
            this.unsafeIntermediateFileMerger.close();
        }
        if (null != this.finalMerger) {
            this.finalMerger.clear();
        }
    }

    /**
     * Starts sorting of the collected rows and records the sort-step and dictionary-values
     * timestamps for the partition.
     *
     * @param sortDataRows holder of the rows added by the workers
     * @param parameters   supplies the table name for logging and the partition id for statistics
     * @return always {@code false}
     * @throws CarbonDataLoadingException wrapping any exception raised while sorting or recording
     */
    private boolean processRowToNextStep(UnsafeSortDataRows sortDataRows, SortParameters parameters) throws CarbonDataLoadingException {
        try {
            sortDataRows.startSorting();
            LOGGER.info("Record Processed For table: " + parameters.getTableName());
            CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordSortRowsStepTotalTime(parameters.getPartitionID(), Long.valueOf(System.currentTimeMillis()));
            CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordDictionaryValuesTotalTime(parameters.getPartitionID(), Long.valueOf(System.currentTimeMillis()));
            return false;
        }
        catch (Exception e) {
            throw new CarbonDataLoadingException(e);
        }
    }

    /**
     * Worker that drains one input iterator and hands the non-null rows of each batch to the
     * shared {@link UnsafeSortDataRows}.
     */
    private static class SortIteratorThread
    implements Runnable {
        private final Iterator<CarbonRowBatch> iterator;
        private final UnsafeSortDataRows sortDataRows;
        /** Reused across batches; holds at most {@code batchSize} rows. */
        private final Object[][] buffer;
        private final AtomicLong rowCounter;
        private final ThreadStatusObserver threadStatusObserver;

        public SortIteratorThread(Iterator<CarbonRowBatch> iterator, UnsafeSortDataRows sortDataRows, int batchSize, AtomicLong rowCounter, ThreadStatusObserver threadStatusObserver) {
            this.iterator = iterator;
            this.sortDataRows = sortDataRows;
            this.buffer = new Object[batchSize][];
            this.rowCounter = rowCounter;
            this.threadStatusObserver = threadStatusObserver;
        }

        /**
         * Copies each batch's non-null rows into the buffer, passes them on and bumps the row
         * counter. Any exception is logged and reported to the observer instead of propagating.
         */
        @Override
        public void run() {
            try {
                while (this.iterator.hasNext()) {
                    CarbonRowBatch batch = this.iterator.next();
                    int i = 0;
                    // A batch with more than batchSize non-null rows would overflow the buffer;
                    // the resulting exception is reported through the observer below.
                    while (batch.hasNext()) {
                        CarbonRow row = batch.next();
                        if (row == null) {
                            continue;
                        }
                        this.buffer[i++] = row.getData();
                    }
                    if (i <= 0) {
                        continue;
                    }
                    this.sortDataRows.addRowBatch(this.buffer, i);
                    this.rowCounter.getAndAdd(i);
                }
            }
            catch (Exception e) {
                LOGGER.error(e);
                this.threadStatusObserver.notifyFailed(e);
            }
        }
    }
}

