/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kudu.mapreduce.tools;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kudu.Schema;
import org.apache.kudu.client.AbstractKuduScannerBuilder;
import org.apache.kudu.client.Bytes;
import org.apache.kudu.client.CreateTableOptions;
import org.apache.kudu.client.Insert;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduScanner;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.Operation;
import org.apache.kudu.client.PartialRow;
import org.apache.kudu.client.RowResult;
import org.apache.kudu.client.RowResultIterator;
import org.apache.kudu.client.SessionConfiguration;
import org.apache.kudu.client.Update;
import org.apache.kudu.mapreduce.CommandLineParser;
import org.apache.kudu.mapreduce.KuduTableMapReduceUtil;
import org.apache.kudu.mapreduce.tools.BigLinkedListCommon;
import org.apache.kudu.shaded.com.google.common.base.Joiner;
import org.apache.kudu.shaded.com.google.common.collect.ImmutableList;
import org.apache.kudu.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
@InterfaceStability.Unstable
public class IntegrationTestBigLinkedList
extends Configured
implements Tool {
    // One-element, zero-initialized byte array. Nested classes reach it through the
    // synthetic accessor access$300().
    private static final byte[] NO_KEY = new byte[1];
    // Job-configuration keys written by setJobConf().
    private static final String GENERATOR_NUM_ROWS_PER_MAP_KEY = "IntegrationTestBigLinkedList.generator.num_rows";
    private static final String GENERATOR_NUM_MAPPERS_KEY = "IntegrationTestBigLinkedList.generator.map.tasks";
    private static final String GENERATOR_WIDTH_KEY = "IntegrationTestBigLinkedList.generator.width";
    private static final String GENERATOR_WRAP_KEY = "IntegrationTestBigLinkedList.generator.wrap";
    // NOTE(review): the three constants below are not referenced in this part of the file;
    // presumably they are the generator's width/wrap defaults and the row-key byte length - confirm.
    private static final int WIDTH_DEFAULT = 1000000;
    private static final int WRAP_DEFAULT = 25;
    private static final int ROWKEY_LENGTH = 16;
    // Name of the sub-command to run (args[0]); set by processOptions().
    private String toRun;
    // Arguments that follow the sub-command name; set by processOptions().
    private String[] otherArgs;

    /**
     * Restricts {@code builder} to the single row whose composite key is
     * {@code (keyOne, keyTwo)}, where the key components are the columns at
     * index 0 and 1 of {@code table}'s schema.
     *
     * <p>The scan range is {@code [(keyOne, keyTwo), (keyOne, keyTwo + 1))}. Composite
     * keys compare lexicographically, so only the second component is advanced for the
     * exclusive upper bound; advancing the first component as well would admit every row
     * between {@code (keyOne, keyTwo)} and {@code (keyOne + 1, keyTwo)}.
     *
     * @param builder scanner builder to configure
     * @param table   table whose schema is used to create the bound rows
     * @param keyOne  first key component of the wanted row
     * @param keyTwo  second key component of the wanted row
     */
    private static void configureScannerForRandomRead(AbstractKuduScannerBuilder builder, KuduTable table, long keyOne, long keyTwo) {
        PartialRow lowerBound = table.getSchema().newPartialRow();
        lowerBound.addLong(0, keyOne);
        lowerBound.addLong(1, keyTwo);
        builder.lowerBound(lowerBound);

        if (keyTwo != Long.MAX_VALUE) {
            // Common case: the immediate successor key shares the first component.
            PartialRow upperBound = table.getSchema().newPartialRow();
            upperBound.addLong(0, keyOne);
            upperBound.addLong(1, keyTwo + 1L);
            builder.exclusiveUpperBound(upperBound);
        } else if (keyOne != Long.MAX_VALUE) {
            // keyTwo + 1 would overflow; the successor is the smallest key of the next first component.
            PartialRow upperBound = table.getSchema().newPartialRow();
            upperBound.addLong(0, keyOne + 1L);
            upperBound.addLong(1, Long.MIN_VALUE);
            builder.exclusiveUpperBound(upperBound);
        }
        // Otherwise (MAX_VALUE, MAX_VALUE) is the largest possible key: no upper bound is needed.
    }

    /**
     * Looks up the linked-list table name in {@code conf} under the key
     * {@code IntegrationTestBigLinkedList.table}.
     *
     * @param conf configuration to read
     * @return the configured name, or {@code IntegrationTestBigLinkedList} when the key is unset
     */
    private static String getTableName(Configuration conf) {
        final String tableName = conf.get("IntegrationTestBigLinkedList.table", "IntegrationTestBigLinkedList");
        return tableName;
    }

    /**
     * Looks up the heads table name in {@code conf} under the key
     * {@code IntegrationTestBigLinkedList.heads_table}.
     *
     * @param conf configuration to read
     * @return the configured name, or {@code IntegrationTestBigLinkedListHeads} when the key is unset
     */
    private static String getHeadsTable(Configuration conf) {
        final String headsTableName = conf.get("IntegrationTestBigLinkedList.heads_table", "IntegrationTestBigLinkedListHeads");
        return headsTableName;
    }

    /**
     * Copies the columns of {@code result} into {@code node} and returns that same node.
     *
     * <p>Columns 0/1 form the node key and columns 2/3 the reference to the previous node;
     * when either reference column is null the reference is recorded as the literal
     * {@code NO_REFERENCE}. Columns 4, 5 and 6 fill {@code rowId}, {@code client} and
     * {@code updateCount}.
     *
     * @param result row to read from
     * @param node   node to overwrite
     * @return {@code node}
     */
    private static CINode getCINode(RowResult result, CINode node) {
        node.key = getStringFromKeys(result.getLong(0), result.getLong(1));
        boolean missingReference = result.isNull(2) || result.isNull(3);
        if (missingReference) {
            node.prev = "NO_REFERENCE";
        } else {
            node.prev = getStringFromKeys(result.getLong(2), result.getLong(3));
        }
        node.rowId = result.getInt(4);
        node.client = result.getString(5);
        node.updateCount = result.getInt(6);
        return node;
    }

    /**
     * Writes one line describing {@code node} to standard output, as
     * {@code key:prev:rowId:client:updateCount} with {@code rowId} zero-padded to 12 digits.
     *
     * @param node node to print
     */
    private static void printCINodeString(CINode node) {
        final String layout = "%s:%s:%012d:%s:%s\n";
        System.out.format(layout, node.key, node.prev, node.rowId, node.client, node.updateCount);
    }

    /**
     * Renders a two-component key as {@code "key1,key2"}.
     *
     * @param key1 first key component
     * @param key2 second key component
     * @return both components in decimal, separated by a comma
     */
    private static String getStringFromKeys(long key1, long key2) {
        StringBuilder text = new StringBuilder();
        text.append(key1).append(',').append(key2);
        return text.toString();
    }

    /**
     * Fetches the next batch from {@code scanner} and returns its only row.
     *
     * @param scanner scanner expected to match at most one row
     * @return the row, or {@code null} when the batch is empty
     * @throws KuduException    if fetching the batch fails
     * @throws RuntimeException if the batch holds more than one row
     */
    private static RowResult getOneRowResult(KuduScanner scanner) throws KuduException {
        RowResultIterator batch = scanner.nextRows();
        int rowCount = batch.getNumRows();
        if (rowCount > 1) {
            throw new RuntimeException("Received too many rows from scanner " + scanner);
        }
        return rowCount == 0 ? null : batch.next();
    }

    /**
     * Prints the list of supported sub-commands to standard error.
     */
    private void usage() {
        System.err.println("Usage: " + this.getClass().getSimpleName() + " COMMAND [COMMAND options]");
        System.err.println("  where COMMAND is one of:");
        System.err.println("");
        System.err.println("  Generator                  A map only job that generates data.");
        System.err.println("  Verify                     A map reduce job that looks for holes");
        System.err.println("                             Look at the counts after running");
        System.err.println("                             REFERENCED and UNREFERENCED are ok");
        System.err.println("                             any UNDEFINED counts are bad. Do not");
        System.err.println("                             run at the same time as the Generator.");
        System.err.println("  Print                      A standalone program that prints nodes");
        System.err.println("                             in the linked list.");
        System.err.println("  Loop                       A program to Loop through Generator and");
        System.err.println("                             Verify steps");
        System.err.println("  Update                     A program to update the nodes");
        System.err.println("  Walker                     A standalone program that starts ");
        System.err.println("                             following a linked list");
        System.err.println("\t  ");
        System.err.flush();
    }

    /**
     * Splits the command line into the sub-command name ({@code toRun}) and the
     * remaining arguments ({@code otherArgs}).
     *
     * @param args full command line; must contain at least the sub-command name
     * @throws RuntimeException if {@code args} is empty (usage is printed first)
     */
    protected void processOptions(String[] args) {
        if (args.length >= 1) {
            this.toRun = args[0];
            this.otherArgs = Arrays.copyOfRange(args, 1, args.length);
            return;
        }
        this.usage();
        throw new RuntimeException("Incorrect Number of args.");
    }

    /**
     * Dispatches to the sub-command named by the first argument and runs it through
     * {@link ToolRunner} with the remaining arguments.
     *
     * @param args sub-command name followed by its own arguments
     * @return the exit code of the selected tool
     * @throws RuntimeException if no sub-command is given or its name is unknown
     */
    public int run(String[] args) throws Exception {
        this.processOptions(args);
        Configured tool;
        switch (this.toRun) {
            case "Generator":
                tool = new Generator();
                break;
            case "Verify":
                tool = new Verify();
                break;
            case "Loop": {
                // Loop keeps a back-reference to this instance.
                Loop loop = new Loop();
                loop.it = this;
                tool = loop;
                break;
            }
            case "Print":
                tool = new Print();
                break;
            case "Update":
                tool = new Updater();
                break;
            case "Walker":
                tool = new Walker();
                break;
            default:
                this.usage();
                throw new RuntimeException("Unknown arg");
        }
        return ToolRunner.run(this.getConf(), (Tool) tool, this.otherArgs);
    }

    /**
     * Stores the generator parameters in the configuration of {@code job}.
     *
     * @param job            job whose configuration is updated
     * @param numMappers     value stored under the number-of-mappers key
     * @param numNodes       value stored under the rows-per-mapper key
     * @param width          value stored under the width key; skipped when {@code null}
     * @param wrapMultiplier value stored under the wrap key; skipped when {@code null}
     */
    private static void setJobConf(Job job, int numMappers, long numNodes, Integer width, Integer wrapMultiplier) {
        Configuration jobConf = job.getConfiguration();
        jobConf.setInt(GENERATOR_NUM_MAPPERS_KEY, numMappers);
        jobConf.setLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, numNodes);
        if (width != null) {
            int widthValue = width;
            jobConf.setInt(GENERATOR_WIDTH_KEY, widthValue);
        }
        if (wrapMultiplier != null) {
            int wrapValue = wrapMultiplier;
            jobConf.setInt(GENERATOR_WRAP_KEY, wrapValue);
        }
    }

    /**
     * Command-line entry point: runs this tool through {@link ToolRunner} and exits the
     * JVM with the tool's return code.
     *
     * @param args command-line arguments
     */
    public static void main(String[] args) throws Exception {
        IntegrationTestBigLinkedList tool = new IntegrationTestBigLinkedList();
        System.exit(ToolRunner.run(tool, args));
    }

    /**
     * Compiler-generated (synthetic) accessor that exposes the private {@code NO_KEY}
     * array to nested classes. It returns the shared array itself, not a copy.
     */
    static /* synthetic */ byte[] access$300() {
        return NO_KEY;
    }

    private static class Walker
    extends Configured
    implements Tool {
        private KuduClient client;
        private KuduTable table;

        private Walker() {
        }

        public int run(String[] args) throws IOException {
            if (args.length < 1) {
                System.err.println("Usage: Walker <start key> [<num nodes>]");
                System.err.println(" where <num nodes> defaults to 100 nodes that will be printed out");
                return 1;
            }
            int maxNumNodes = 100;
            if (args.length == 2) {
                maxNumNodes = Integer.parseInt(args[1]);
            }
            System.out.println("Running Walker with args:" + Arrays.deepToString(args));
            String[] keys = args[0].split(",", -1);
            if (keys.length != 2) {
                System.err.println("The row key must be formatted like key1,key2");
                return 1;
            }
            long keyOne = Long.parseLong(keys[0]);
            long keyTwo = Long.parseLong(keys[1]);
            System.out.println("Walking with " + IntegrationTestBigLinkedList.getStringFromKeys(keyOne, keyTwo));
            this.walk(keyOne, keyTwo, maxNumNodes);
            return 0;
        }

        private void walk(long headKeyOne, long headKeyTwo, int maxNumNodes) throws KuduException {
            CommandLineParser parser = new CommandLineParser(this.getConf());
            this.client = parser.getClient();
            this.table = this.client.openTable(IntegrationTestBigLinkedList.getTableName(this.getConf()));
            long prevKeyOne = headKeyOne;
            long prevKeyTwo = headKeyTwo;
            CINode node = new CINode();
            int nodesCount = 0;
            do {
                RowResult rr;
                if ((rr = this.nextNode(prevKeyOne, prevKeyTwo)) == null) {
                    System.err.println(IntegrationTestBigLinkedList.getStringFromKeys(prevKeyOne, prevKeyTwo) + " doesn't exist!");
                    break;
                }
                IntegrationTestBigLinkedList.getCINode(rr, node);
                IntegrationTestBigLinkedList.printCINodeString(node);
                if (rr.isNull(2) || rr.isNull(3)) {
                    System.err.println("Last node didn't have a reference, breaking");
                    break;
                }
                prevKeyOne = rr.getLong(2);
                prevKeyTwo = rr.getLong(3);
            } while (headKeyOne != prevKeyOne && headKeyTwo != prevKeyTwo && ++nodesCount < maxNumNodes);
        }

        private RowResult nextNode(long keyOne, long keyTwo) throws KuduException {
            KuduScanner.KuduScannerBuilder builder = this.client.newScannerBuilder(this.table);
            IntegrationTestBigLinkedList.configureScannerForRandomRead(builder, this.table, keyOne, keyTwo);
            return IntegrationTestBigLinkedList.getOneRowResult(builder.build());
        }
    }

    /**
     * Tool that repeatedly launches a map-only "Link Updater" job. Each mapper reads list
     * heads from the heads table, walks every list through its "previous" references and
     * rewrites {@code update_count} on each visited node, counting inconsistencies in job
     * counters.
     */
    private static class Updater
    extends Configured
    implements Tool {
        private static final Logger LOG = LoggerFactory.getLogger(Updater.class);
        /** Job-configuration key holding the maximum number of lists one mapper updates. */
        private static final String MAX_LINK_UPDATES_PER_MAPPER = "kudu.updates.per.mapper";

        private Updater() {
        }

        /**
         * Runs a single update job.
         *
         * @param maxLinkUpdatesPerMapper maximum number of lists each mapper processes
         * @return 0 when the job succeeded and reported neither broken links nor bad update
         *         counts, 1 otherwise
         */
        public int run(long maxLinkUpdatesPerMapper) throws Exception {
            LOG.info("Running Updater with maxLinkUpdatesPerMapper=" + maxLinkUpdatesPerMapper);
            Job job = new Job(this.getConf());
            job.setJobName("Link Updater");
            job.setNumReduceTasks(0);
            job.setJarByClass(this.getClass());
            Joiner columnsToQuery = Joiner.on(",");
            new KuduTableMapReduceUtil.TableInputFormatConfiguratorWithCommandLineParser(job, IntegrationTestBigLinkedList.getHeadsTable(this.getConf()), columnsToQuery.join("key1", "key2")).configure();
            job.setMapperClass(UpdaterMapper.class);
            job.setMapOutputKeyClass(BytesWritable.class);
            job.setMapOutputValueClass(BytesWritable.class);
            // One attempt per map task, no speculative duplicates and no task timeout.
            job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false);
            job.getConfiguration().setInt("mapreduce.map.maxattempts", 1);
            job.getConfiguration().setInt("mapreduce.task.timeout", 0);
            job.getConfiguration().setLong(MAX_LINK_UPDATES_PER_MAPPER, maxLinkUpdatesPerMapper);
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(NullWritable.class);
            job.setOutputFormatClass(NullOutputFormat.class);
            KuduTableMapReduceUtil.addDependencyJars(job);
            boolean success = job.waitForCompletion(true);
            Counters counters = job.getCounters();
            if (success) {
                Counter brokenLinks = counters.findCounter(Counts.BROKEN_LINKS);
                Counter badUpdates = counters.findCounter(Counts.BAD_UPDATE_COUNTS);
                if (brokenLinks.getValue() > 0L || badUpdates.getValue() > 0L) {
                    LOG.error("Corruption was detected, see the job's counters. Ending the update loop.");
                    success = false;
                }
            }
            return success ? 0 : 1;
        }

        /**
         * Parses {@code <num iterations> <max link updates per mapper>} and runs the update
         * job that many times, stopping at the first failing run.
         *
         * @param args iteration count (zero or negative means effectively unbounded) and the
         *             per-mapper update limit (values below 1 are raised to 1)
         * @return 0 when every iteration succeeded, otherwise the failing run's code, or 1
         *         when too few arguments are given
         */
        @Override
        public int run(String[] args) throws Exception {
            if (args.length < 2) {
                System.err.println("Usage: Update <num iterations> <max link updates per mapper>");
                System.err.println(" where <num iterations> will be 'infinite' if passed a negative value or zero");
                return 1;
            }
            LOG.info("Running Updater with args:" + Arrays.deepToString(args));
            int numIterations = Integer.parseInt(args[0]);
            long maxUpdates = Long.parseLong(args[1]);
            if (numIterations <= 0) {
                numIterations = Integer.MAX_VALUE;
            }
            if (maxUpdates < 1L) {
                maxUpdates = 1L;
            }
            for (int i = 0; i < numIterations; ++i) {
                LOG.info("Starting iteration = " + i);
                int ret = this.run(maxUpdates);
                if (ret != 0) {
                    LOG.error("Can't continue updating, last run failed.");
                    return ret;
                }
            }
            return 0;
        }

        /**
         * Mapper that collects list heads from its input and then walks and updates each list.
         */
        public static class UpdaterMapper
        extends Mapper<NullWritable, RowResult, NullWritable, NullWritable> {
            private KuduClient client;
            private KuduTable table;
            private KuduSession session;
            /** Projection used when reading a node; the indices used in processHead() follow this order. */
            private static final ImmutableList<String> SCAN_COLUMN_NAMES = ImmutableList.of("prev1", "prev2", "update_count", "client");
            private int numUpdatesPerMapper;
            private List<Pair<Long, Long>> headsCache;

            @Override
            protected void setup(Context context) throws KuduException {
                Configuration conf = context.getConfiguration();
                CommandLineParser parser = new CommandLineParser(conf);
                this.client = parser.getClient();
                this.table = this.client.openTable(IntegrationTestBigLinkedList.getTableName(conf));
                this.session = this.client.newSession();
                // The limit is stored as a long by Updater.run(long); read it as a long and
                // clamp so that values above Integer.MAX_VALUE do not fail to parse.
                long configuredLimit = conf.getLong(Updater.MAX_LINK_UPDATES_PER_MAPPER, 1L);
                this.numUpdatesPerMapper = (int) Math.min(configuredLimit, (long) Integer.MAX_VALUE);
                // Not pre-sized: the limit is only an upper bound and may be very large.
                this.headsCache = new ArrayList<Pair<Long, Long>>();
            }

            /**
             * Invoked for the first input record; consumes the rest of the input itself,
             * caching at most {@code numUpdatesPerMapper} heads, then processes the cached heads.
             */
            @Override
            protected void map(NullWritable key, RowResult value, Context context) throws IOException, InterruptedException {
                do {
                    if (this.headsCache.size() < this.numUpdatesPerMapper) {
                        RowResult current = context.getCurrentValue();
                        this.headsCache.add(new Pair<Long, Long>(current.getLong(0), current.getLong(1)));
                    }
                    // Records beyond the limit are still read, just not cached.
                } while (context.nextKeyValue());
                LOG.info("Processing " + this.headsCache.size() + " linked lists, out of " + this.numUpdatesPerMapper);
                this.processAllHeads(context);
            }

            /** Walks and updates every cached head in insertion order. */
            private void processAllHeads(Context context) throws IOException {
                for (Pair<Long, Long> head : this.headsCache) {
                    this.processHead(head, context);
                }
            }

            /**
             * Walks one list from {@code head}, bumping {@code update_count} on each node.
             * The walk stops at a missing node, a node without a reference, a node whose
             * update count or client differs from the first node's, or on return to the head.
             */
            private void processHead(Pair<Long, Long> head, Context context) throws IOException {
                long headKeyOne = head.getFirst();
                long headKeyTwo = head.getSecond();
                long prevKeyOne = headKeyOne;
                long prevKeyTwo = headKeyTwo;
                // -1 marks "not yet initialised from the first node".
                int currentCount = -1;
                int newCount = -1;
                String client = null;
                LOG.info("Head: " + IntegrationTestBigLinkedList.getStringFromKeys(headKeyOne, headKeyTwo));
                boolean backAtHead;
                do {
                    RowResult prev = this.nextNode(prevKeyOne, prevKeyTwo);
                    if (prev == null) {
                        context.getCounter(Counts.BROKEN_LINKS).increment(1L);
                        LOG.warn(IntegrationTestBigLinkedList.getStringFromKeys(prevKeyOne, prevKeyTwo) + " doesn't exist");
                        break;
                    }
                    if (prev.isNull(0) || prev.isNull(1)) {
                        context.getCounter(Counts.BROKEN_LINKS).increment(1L);
                        LOG.warn(IntegrationTestBigLinkedList.getStringFromKeys(prevKeyOne, prevKeyTwo) + " isn't referencing anywhere");
                        break;
                    }
                    int prevCount = prev.getInt(2);
                    String prevClient = prev.getString(3);
                    if (currentCount == -1) {
                        // First node of the list fixes the expected count and client.
                        currentCount = prevCount;
                        newCount = currentCount + 1;
                        client = prevClient;
                    }
                    if (prevCount != currentCount) {
                        context.getCounter(Counts.BAD_UPDATE_COUNTS).increment(1L);
                        LOG.warn(IntegrationTestBigLinkedList.getStringFromKeys(prevKeyOne, prevKeyTwo) + " has a wrong updateCount, " + prevCount + " instead of " + currentCount);
                        break;
                    }
                    if (!prevClient.equals(client)) {
                        context.getCounter(Counts.BROKEN_LINKS).increment(1L);
                        LOG.warn(IntegrationTestBigLinkedList.getStringFromKeys(prevKeyOne, prevKeyTwo) + " has the wrong client, bad reference? Bad client= " + prevClient);
                        break;
                    }
                    this.updateRow(prevKeyOne, prevKeyTwo, newCount);
                    context.getCounter(Counts.UPDATED_NODES).increment(1L);
                    if (prevKeyOne % 10L == 0L) {
                        // Periodically report liveness to the framework.
                        context.progress();
                    }
                    prevKeyOne = prev.getLong(0);
                    prevKeyTwo = prev.getLong(1);
                    // Back at the head only when BOTH key components match.
                    backAtHead = headKeyOne == prevKeyOne && headKeyTwo == prevKeyTwo;
                } while (!backAtHead);
                this.updateStatCounters(context, newCount);
                context.getCounter(Counts.UPDATED_LINKS).increment(1L);
            }

            /** Reads the projected columns of the row keyed by the given pair, or {@code null} if absent. */
            private RowResult nextNode(long prevKeyOne, long prevKeyTwo) throws KuduException {
                KuduScanner.KuduScannerBuilder builder = (KuduScanner.KuduScannerBuilder)this.client.newScannerBuilder(this.table).setProjectedColumnNames(SCAN_COLUMN_NAMES);
                IntegrationTestBigLinkedList.configureScannerForRandomRead(builder, this.table, prevKeyOne, prevKeyTwo);
                return IntegrationTestBigLinkedList.getOneRowResult(builder.build());
            }

            /** Applies an update setting {@code update_count} to {@code newCount} on the given row. */
            private void updateRow(long keyOne, long keyTwo, int newCount) throws IOException {
                Update update = this.table.newUpdate();
                PartialRow row = update.getRow();
                row.addLong("key1", keyOne);
                row.addLong("key2", keyTwo);
                row.addInt("update_count", newCount);
                // NOTE(review): the value returned by apply() is ignored, so a per-row error
                // reported there would go unnoticed - confirm whether that is intended.
                this.session.apply(update);
            }

            /** Increments the counter matching how many times the list has now been updated. */
            private void updateStatCounters(Context context, int newCount) {
                switch (newCount) {
                    case -1:
                    case 0: {
                        // Nothing was updated for this list.
                        break;
                    }
                    case 1: {
                        context.getCounter(Counts.FIRST_UPDATE).increment(1L);
                        break;
                    }
                    case 2: {
                        context.getCounter(Counts.SECOND_UPDATE).increment(1L);
                        break;
                    }
                    case 3: {
                        context.getCounter(Counts.THIRD_UPDATE).increment(1L);
                        break;
                    }
                    case 4: {
                        context.getCounter(Counts.FOURTH_UPDATE).increment(1L);
                        break;
                    }
                    default: {
                        context.getCounter(Counts.MORE_THAN_FOUR_UPDATES).increment(1L);
                    }
                }
            }

            @Override
            protected void cleanup(Context context) throws KuduException {
                try {
                    this.session.close();
                } finally {
                    // Shut the client down even when closing the session fails.
                    this.client.shutdown();
                }
            }
        }

        /** Job counters reported by {@link UpdaterMapper}. */
        public static enum Counts {
            UPDATED_LINKS,
            UPDATED_NODES,
            FIRST_UPDATE,
            SECOND_UPDATE,
            THIRD_UPDATE,
            FOURTH_UPDATE,
            MORE_THAN_FOUR_UPDATES,
            BROKEN_LINKS,
            BAD_UPDATE_COUNTS;

        }
    }

    private static class Print
    extends Configured
    implements Tool {
        private Print() {
        }

        public int run(String[] args) throws Exception {
            RowResultIterator rowResults;
            PartialRow row;
            Options options = new Options();
            options.addOption("s", "start", true, "start key, only the first component");
            options.addOption("e", "end", true, "end key (exclusive), only the first component");
            options.addOption("l", "limit", true, "number to print");
            GnuParser parser = new GnuParser();
            CommandLine cmd = null;
            try {
                cmd = parser.parse(options, args);
                if (cmd.getArgs().length != 0) {
                    throw new ParseException("Command takes no arguments");
                }
            }
            catch (ParseException e) {
                System.err.println("Failed to parse command line " + e.getMessage());
                System.err.println();
                HelpFormatter formatter = new HelpFormatter();
                formatter.printHelp(((Object)((Object)this)).getClass().getSimpleName(), options);
                System.exit(-1);
            }
            CommandLineParser cmdLineParser = new CommandLineParser(this.getConf());
            long timeout = cmdLineParser.getOperationTimeoutMs();
            KuduClient client = cmdLineParser.getClient();
            KuduTable table = client.openTable(IntegrationTestBigLinkedList.getTableName(this.getConf()));
            KuduScanner.KuduScannerBuilder builder = (KuduScanner.KuduScannerBuilder)client.newScannerBuilder(table).scanRequestTimeout(timeout);
            if (cmd.hasOption("s")) {
                row = table.getSchema().newPartialRow();
                row.addLong(0, Long.parseLong(cmd.getOptionValue("s")));
                builder.lowerBound(row);
            }
            if (cmd.hasOption("e")) {
                row = table.getSchema().newPartialRow();
                row.addLong(0, Long.parseLong(cmd.getOptionValue("e")));
                builder.exclusiveUpperBound(row);
            }
            int limit = cmd.hasOption("l") ? Integer.parseInt(cmd.getOptionValue("l")) : 100;
            int count = 0;
            KuduScanner scanner = builder.build();
            while (scanner.hasMoreRows() && count < limit) {
                rowResults = scanner.nextRows();
                count = Print.printNodesAndGetNewCount(count, limit, rowResults);
            }
            rowResults = scanner.close();
            Print.printNodesAndGetNewCount(count, limit, rowResults);
            client.shutdown();
            return 0;
        }

        private static int printNodesAndGetNewCount(int oldCount, int limit, RowResultIterator rowResults) {
            int newCount = oldCount;
            if (rowResults == null) {
                return newCount;
            }
            CINode node = new CINode();
            for (RowResult result : rowResults) {
                node = IntegrationTestBigLinkedList.getCINode(result, node);
                IntegrationTestBigLinkedList.printCINodeString(node);
                if (++newCount != limit) continue;
                break;
            }
            return newCount;
        }
    }

    static class Loop
    extends Configured
    implements Tool {
        private static final Logger LOG = LoggerFactory.getLogger(Loop.class);
        IntegrationTestBigLinkedList it;
        FileSystem fs;

        Loop() {
        }

        protected void runGenerator(int numMappers, long numNodes, int numTablets, String outputDir, Integer width, Integer wrapMultiplier) throws Exception {
            Path outputPath = new Path(outputDir);
            UUID uuid = UUID.randomUUID();
            Path generatorOutput = new Path(outputPath, uuid.toString());
            Generator generator = new Generator();
            generator.setConf(this.getConf());
            int retCode = generator.run(numMappers, numNodes, numTablets, generatorOutput, width, wrapMultiplier);
            if (retCode > 0) {
                throw new RuntimeException("Generator failed with return code: " + retCode);
            }
            this.fs.delete(generatorOutput, true);
        }

        protected void runVerify(String outputDir, int numReducers, long expectedNumNodes, int retries) throws Exception {
            for (int i = 0; i < retries; ++i) {
                if (i > 0) {
                    long sleep = 60000L;
                    LOG.info("Retrying in " + sleep + "ms");
                    Thread.sleep(sleep);
                }
                Path outputPath = new Path(outputDir);
                UUID uuid = UUID.randomUUID();
                Path iterationOutput = new Path(outputPath, uuid.toString());
                Verify verify = new Verify();
                verify.setConf(this.getConf());
                int retCode = verify.run(iterationOutput, numReducers);
                if (retCode > 0) {
                    LOG.warn("Verify.run failed with return code: " + retCode);
                    continue;
                }
                if (!verify.verify(expectedNumNodes)) {
                    LOG.warn("Verify.verify failed");
                    continue;
                }
                this.fs.delete(iterationOutput, true);
                LOG.info("Verify finished with success. Total nodes=" + expectedNumNodes);
                return;
            }
            throw new RuntimeException("Ran out of retries to verify");
        }

        public int run(String[] args) throws Exception {
            int numVerifyRetries;
            if (args.length < 6) {
                System.err.println("Usage: Loop <num iterations> <num mappers> <num nodes per mapper> <num_tablets> <output dir> <num reducers> [<width> <wrap multiplier><start expected nodes> <num_verify_retries>]");
                return 1;
            }
            LOG.info("Running Loop with args:" + Arrays.deepToString(args));
            int numIterations = Integer.parseInt(args[0]);
            int numMappers = Integer.parseInt(args[1]);
            long numNodes = Long.parseLong(args[2]);
            int numTablets = Integer.parseInt(args[3]);
            String outputDir = args[4];
            int numReducers = Integer.parseInt(args[5]);
            Integer width = args.length < 7 ? null : Integer.valueOf(Integer.parseInt(args[6]));
            Integer wrapMultiplier = args.length < 8 ? null : Integer.valueOf(Integer.parseInt(args[7]));
            long expectedNumNodes = args.length < 9 ? 0L : Long.parseLong(args[8]);
            int n = numVerifyRetries = args.length < 10 ? 3 : Integer.parseInt(args[9]);
            if (numIterations < 0) {
                numIterations = Integer.MAX_VALUE;
            }
            this.fs = FileSystem.get((Configuration)this.getConf());
            for (int i = 0; i < numIterations; ++i) {
                LOG.info("Starting iteration = " + i);
                this.runGenerator(numMappers, numNodes, numTablets, outputDir, width, wrapMultiplier);
                this.runVerify(outputDir, numReducers, expectedNumNodes += (long)numMappers * numNodes, numVerifyRetries);
            }
            return 0;
        }
    }

    static class Verify
    extends Configured
    implements Tool {
        private static final Logger LOG = LoggerFactory.getLogger(Verify.class);
        // Wraps the outer class's NO_KEY array, obtained through its synthetic accessor.
        private static final BytesWritable DEF = new BytesWritable(IntegrationTestBigLinkedList.access$300());
        private static final Joiner COMMA_JOINER = Joiner.on(",");
        // NOTE(review): the two 16-byte buffers below are static and mutable and are not used
        // in the visible part of this class; presumably scratch space for key bytes - confirm.
        private static final byte[] rowKey = new byte[16];
        private static final byte[] prev = new byte[16];
        // Job created by run(Path, int); verify() requires it to be non-null.
        private Job job;

        Verify() {
        }

        /**
         * Parses {@code <output dir> <num reducers>} and runs the verification job.
         *
         * @param args exactly two arguments: the output directory and the number of reducers
         * @return the job's exit code, or 1 when the argument count is wrong
         */
        public int run(String[] args) throws Exception {
            if (args.length != 2) {
                // A usage error is a failure: report it on stderr with a non-zero code,
                // like the other tools in this file.
                System.err.println("Usage : " + Verify.class.getSimpleName() + " <output dir> <num reducers>");
                return 1;
            }
            String outputDir = args[0];
            int numReducers = Integer.parseInt(args[1]);
            return this.run(outputDir, numReducers);
        }

        /**
         * Convenience overload that converts {@code outputDir} to a {@link Path} and
         * delegates to {@code run(Path, int)}.
         *
         * @param outputDir   output directory of the job
         * @param numReducers number of reduce tasks
         * @return the job's exit code
         */
        public int run(String outputDir, int numReducers) throws Exception {
            Path outputPath = new Path(outputDir);
            return this.run(outputPath, numReducers);
        }

        /**
         * Configures and runs the "Link Verifier" job over the key and reference columns
         * of the linked-list table, writing text output to {@code outputDir}. The job is
         * kept in {@code this.job} for later inspection.
         *
         * @param outputDir   output directory of the job
         * @param numReducers number of reduce tasks
         * @return 0 when the job completed successfully, 1 otherwise
         */
        public int run(Path outputDir, int numReducers) throws Exception {
            LOG.info("Running Verify with outputDir=" + outputDir + ", numReducers=" + numReducers);
            // Store the job first so the field is set even if configuration fails below.
            this.job = new Job(this.getConf());
            final Job verifyJob = this.job;
            verifyJob.setJobName("Link Verifier");
            verifyJob.setNumReduceTasks(numReducers);
            verifyJob.setJarByClass(this.getClass());
            String projectedColumns = Joiner.on(",").join("key1", "key2", "prev1", "prev2");
            String tableName = IntegrationTestBigLinkedList.getTableName(this.getConf());
            new KuduTableMapReduceUtil.TableInputFormatConfiguratorWithCommandLineParser(verifyJob, tableName, projectedColumns).configure();
            verifyJob.setMapperClass(VerifyMapper.class);
            verifyJob.setMapOutputKeyClass(BytesWritable.class);
            verifyJob.setMapOutputValueClass(BytesWritable.class);
            verifyJob.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false);
            verifyJob.setReducerClass(VerifyReducer.class);
            verifyJob.setOutputFormatClass(TextOutputFormat.class);
            TextOutputFormat.setOutputPath(verifyJob, outputDir);
            if (verifyJob.waitForCompletion(true)) {
                return 0;
            }
            return 1;
        }

        /**
         * Checks the counters of the job started by {@code run(...)} against the expected
         * number of referenced nodes.
         *
         * <p>Verification fails when the REFERENCED counter differs from
         * {@code expectedReferenced}, or when the UNREFERENCED or UNDEFINED counter is
         * positive. On failure the names of all counters in the "undef" and "unref" groups
         * are logged as well.
         *
         * @param expectedReferenced number of nodes that should have been counted as referenced
         * @return {@code true} when every check passed
         * @throws IllegalStateException if no job has been run yet
         * @throws Exception anything raised while fetching the job counters
         */
        public boolean verify(long expectedReferenced) throws Exception {
            if (this.job == null) {
                throw new IllegalStateException("You should call run() first");
            }
            Counters counters = this.job.getCounters();
            long referenced = counters.findCounter(BigLinkedListCommon.Counts.REFERENCED).getValue();
            long unreferenced = counters.findCounter(BigLinkedListCommon.Counts.UNREFERENCED).getValue();
            long undefined = counters.findCounter(BigLinkedListCommon.Counts.UNDEFINED).getValue();
            long extraReferences = counters.findCounter(BigLinkedListCommon.Counts.EXTRAREFERENCES).getValue();

            boolean referencedMatches = expectedReferenced == referenced;
            if (!referencedMatches) {
                LOG.error("Expected referenced count does not match with actual referenced count. Expected referenced=" + expectedReferenced + ", actual=" + referenced);
            }

            boolean hasUnreferenced = unreferenced > 0L;
            if (hasUnreferenced) {
                // Equal counts hint that the unreferenced nodes are the flip side of extra references.
                String hint = extraReferences == unreferenced ? "; could be due to duplicate random numbers" : "";
                LOG.error("Unreferenced nodes were not expected. Unreferenced count=" + unreferenced + hint);
            }

            boolean hasUndefined = undefined > 0L;
            if (hasUndefined) {
                LOG.error("Found an undefined node. Undefined count=" + undefined);
            }

            boolean success = referencedMatches && !hasUnreferenced && !hasUndefined;
            if (!success) {
                Verify.logCounterNames(counters, "undef", "undefined row ");
                Verify.logCounterNames(counters, "unref", "unreferred row ");
            }
            return success;
        }

        /** Logs, at error level, the name of every counter in the given group behind the given prefix. */
        private static void logCounterNames(Counters counters, String groupName, String messagePrefix) {
            CounterGroup group = counters.getGroup(groupName);
            for (Counter counter : group) {
                LOG.error(messagePrefix + counter.getName());
            }
        }

        public static class VerifyReducer
        extends Reducer<BytesWritable, BytesWritable, Text, Text> {
            private ArrayList<byte[]> refs = new ArrayList();

            public void reduce(BytesWritable key, Iterable<BytesWritable> values, Reducer.Context context) throws IOException, InterruptedException {
                int defCount = 0;
                this.refs.clear();
                for (BytesWritable type : values) {
                    if (type.getLength() == DEF.getLength()) {
                        ++defCount;
                        continue;
                    }
                    byte[] bytes = new byte[type.getLength()];
                    System.arraycopy(type.getBytes(), 0, bytes, 0, type.getLength());
                    this.refs.add(bytes);
                }
                ArrayList<String> refsList = new ArrayList<String>(this.refs.size());
                String keyString = null;
                if (defCount == 0 || this.refs.size() != 1) {
                    for (byte[] ref : this.refs) {
                        refsList.add(COMMA_JOINER.join(Bytes.getLong(ref), Bytes.getLong(ref, 8), new Object[0]));
                    }
                    keyString = COMMA_JOINER.join(Bytes.getLong(key.getBytes()), Bytes.getLong(key.getBytes(), 8), new Object[0]);
                    LOG.error("Linked List error: Key = " + keyString + " References = " + refsList);
                }
                if (defCount == 0 && this.refs.size() > 0) {
                    context.write((Object)new Text(keyString), (Object)new Text(((Object)refsList).toString()));
                    context.getCounter((Enum)BigLinkedListCommon.Counts.UNDEFINED).increment(1L);
                } else if (defCount > 0 && this.refs.size() == 0) {
                    context.write((Object)new Text(keyString), (Object)new Text("none"));
                    context.getCounter((Enum)BigLinkedListCommon.Counts.UNREFERENCED).increment(1L);
                } else {
                    if (this.refs.size() > 1) {
                        if (refsList != null) {
                            context.write((Object)new Text(keyString), (Object)new Text(((Object)refsList).toString()));
                        }
                        context.getCounter((Enum)BigLinkedListCommon.Counts.EXTRAREFERENCES).increment((long)(this.refs.size() - 1));
                    }
                    context.getCounter((Enum)BigLinkedListCommon.Counts.REFERENCED).increment(1L);
                }
            }
        }

        /**
         * Mapper of the verification job.
         *
         * <p>For every scanned row it emits {@code (rowKey, DEF)} to mark the node as defined
         * and, when the row has a previous pointer, {@code (prevKey, rowKey)} to record that
         * the previous node is referenced by this row. Columns 0-3 of the row are read as
         * key1, key2, prev1 and prev2, matching the projection configured by the job.
         *
         * <p>The scratch buffers are per-instance. Static buffers shared by every mapper in
         * the JVM would let concurrently running map tasks overwrite each other's keys
         * between packing the longs and copying them into the writable.
         */
        public static class VerifyMapper
        extends Mapper<NullWritable, RowResult, BytesWritable, BytesWritable> {
            /** Size in bytes of a key made of two packed longs. */
            private static final int KEY_BYTES = 16;
            /** Scratch space for the current row's key. */
            private final byte[] rowKeyBuffer = new byte[KEY_BYTES];
            /** Scratch space for the current row's previous-node key. */
            private final byte[] prevKeyBuffer = new byte[KEY_BYTES];
            private BytesWritable row = new BytesWritable();
            private BytesWritable ref = new BytesWritable();

            /**
             * Emits the definition marker and, if present, the back-reference for one row.
             *
             * @param key     unused
             * @param value   scanned row holding key1, key2, prev1, prev2 at indexes 0-3
             * @param context mapper context receiving the emitted pairs
             */
            protected void map(NullWritable key, RowResult value, Mapper.Context context) throws IOException, InterruptedException {
                Bytes.setLong(this.rowKeyBuffer, value.getLong(0));
                Bytes.setLong(this.rowKeyBuffer, value.getLong(1), 8);
                this.row.set(this.rowKeyBuffer, 0, this.rowKeyBuffer.length);
                context.write(this.row, DEF);

                if (value.isNull(2)) {
                    // A row without a previous pointer contributes no reference.
                    LOG.warn(String.format("Prev is not set for: %s", Bytes.pretty(this.rowKeyBuffer)));
                    return;
                }
                Bytes.setLong(this.prevKeyBuffer, value.getLong(2));
                Bytes.setLong(this.prevKeyBuffer, value.getLong(3), 8);
                this.ref.set(this.prevKeyBuffer, 0, this.prevKeyBuffer.length);
                context.write(this.ref, this.row);
            }
        }
    }

    static class Generator
    extends Configured
    implements Tool {
        private static final Logger LOG = LoggerFactory.getLogger(Generator.class);
        private CommandLineParser parser;
        private KuduClient client;

        /** Package-private no-argument constructor; parser and client are only set up when a run starts. */
        Generator() {
        }

        /**
         * Command-line entry point of the generator.
         *
         * <p>At least four arguments are required; the fifth and sixth (width and wrap
         * multiplier) are optional and passed on as {@code null} when absent. With fewer
         * than four arguments the usage text is printed and 0 is returned.
         *
         * @param args {@code <num mappers> <num nodes per map> <num_tablets> <tmp output dir> [<width> <wrap multiplier>]}
         * @return the status of the generator run, or 0 when only the usage was printed
         * @throws Exception anything raised while parsing the numbers or running the jobs
         */
        public int run(String[] args) throws Exception {
            if (args.length < 4) {
                System.out.println("Usage : " + Generator.class.getSimpleName() + " <num mappers> <num nodes per map> <num_tablets> <tmp output dir> [<width> <wrap multiplier>]");
                System.out.println("   where <num nodes per map> should be a multiple of  width*wrap multiplier, 25M by default");
                return 0;
            }
            int numMappers = Integer.parseInt(args[0]);
            long numNodes = Long.parseLong(args[1]);
            int numTablets = Integer.parseInt(args[2]);
            Path tmpOutput = new Path(args[3]);

            Integer width = null;
            if (args.length >= 5) {
                width = Integer.valueOf(args[4]);
            }
            Integer wrapMultiplier = null;
            if (args.length >= 6) {
                wrapMultiplier = Integer.valueOf(args[5]);
            }
            return this.run(numMappers, numNodes, numTablets, tmpOutput, width, wrapMultiplier);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        /**
         * Runs both generator phases: first the random-input job, then, unless that job
         * reported a positive status, the link-generating job.
         *
         * <p>A Kudu client is obtained from a fresh command-line parser for the duration of
         * the call and is always closed (and the field cleared) before returning.
         *
         * @return the positive status of the random-input job if it failed, otherwise the
         *         status of the link-generating job
         * @throws Exception anything raised by either phase or by closing the client
         */
        public int run(int numMappers, long numNodes, int numTablets, Path tmpOutput, Integer width, Integer wrapMultiplier) throws Exception {
            this.parser = new CommandLineParser(this.getConf());
            this.client = this.parser.getClient();
            try {
                int status = this.runRandomInputGenerator(numMappers, numNodes, tmpOutput, width, wrapMultiplier);
                if (status <= 0) {
                    status = this.runGenerator(numMappers, numNodes, numTablets, tmpOutput, width, wrapMultiplier);
                }
                return status;
            }
            finally {
                this.client.close();
                this.client = null;
            }
        }

        /**
         * Ensures that the linked-list table and the heads table exist, creating each with
         * {@code numTablets} as its tablet count when it is missing.
         */
        private void createTables(int numTablets) throws Exception {
            String linksTable = IntegrationTestBigLinkedList.getTableName(this.getConf());
            Schema linksSchema = BigLinkedListCommon.getTableSchema();
            this.createSchema(linksTable, linksSchema, numTablets);

            String headsTable = IntegrationTestBigLinkedList.getHeadsTable(this.getConf());
            Schema headsSchema = BigLinkedListCommon.getHeadsTableSchema();
            this.createSchema(headsTable, headsSchema, numTablets);
        }

        /**
         * Creates {@code tableName} with the given schema unless the client reports that it
         * already exists. The replica count comes from the command-line parser.
         */
        private void createSchema(String tableName, Schema schema, int numTablets) throws Exception {
            boolean alreadyThere = this.client.tableExists(tableName);
            if (!alreadyThere) {
                int replicas = this.parser.getNumReplicas();
                CreateTableOptions options = BigLinkedListCommon.getCreateTableOptions(schema, replicas, numTablets, 1);
                this.client.createTable(tableName, schema, options);
            }
        }

        /**
         * Runs the map-only "Random Input Generator" job, which writes the keys produced by
         * {@code GeneratorInputFormat} to sequence files under {@code tmpOutput}.
         *
         * @return 0 when the job completed successfully, 1 otherwise
         * @throws Exception anything raised while configuring or running the job
         */
        public int runRandomInputGenerator(int numMappers, long numNodes, Path tmpOutput, Integer width, Integer wrapMultiplier) throws Exception {
            LOG.info("Running RandomInputGenerator with numMappers=" + numMappers + ", numNodes=" + numNodes);
            Job inputJob = new Job(this.getConf());
            inputJob.setJobName("Random Input Generator");
            inputJob.setNumReduceTasks(0);
            inputJob.setJarByClass(this.getClass());

            // Keys come from the synthetic input format; the base Mapper class is used as the mapper.
            inputJob.setInputFormatClass(GeneratorInputFormat.class);
            inputJob.setOutputKeyClass(BytesWritable.class);
            inputJob.setOutputValueClass(NullWritable.class);
            IntegrationTestBigLinkedList.setJobConf(inputJob, numMappers, numNodes, width, wrapMultiplier);
            inputJob.setMapperClass(Mapper.class);

            FileOutputFormat.setOutputPath(inputJob, tmpOutput);
            inputJob.setOutputFormatClass(SequenceFileOutputFormat.class);

            if (inputJob.waitForCompletion(true)) {
                return 0;
            }
            return 1;
        }

        /**
         * Creates the tables if needed and runs the map-only "Link Generator" job, which reads
         * the sequence files under {@code tmpOutput} (one unsplit file per mapper) and lets
         * {@code GeneratorMapper} write the linked lists to Kudu.
         *
         * <p>Speculative map execution is disabled, map attempts are limited to one and the
         * task timeout is set to 0.
         *
         * @return 0 when the job completed successfully, 1 otherwise
         * @throws Exception anything raised while creating tables, configuring or running the job
         */
        public int runGenerator(int numMappers, long numNodes, int numTablets, Path tmpOutput, Integer width, Integer wrapMultiplier) throws Exception {
            LOG.info("Running Generator with numMappers=" + numMappers + ", numNodes=" + numNodes);
            this.createTables(numTablets);

            Job linkJob = new Job(this.getConf());
            linkJob.setJobName("Link Generator");
            linkJob.setNumReduceTasks(0);
            linkJob.setJarByClass(this.getClass());

            FileInputFormat.setInputPaths(linkJob, tmpOutput);
            linkJob.setInputFormatClass(OneFilePerMapperSFIF.class);
            linkJob.setOutputKeyClass(NullWritable.class);
            linkJob.setOutputValueClass(NullWritable.class);
            IntegrationTestBigLinkedList.setJobConf(linkJob, numMappers, numNodes, width, wrapMultiplier);
            linkJob.setMapperClass(GeneratorMapper.class);
            linkJob.setOutputFormatClass(NullOutputFormat.class);

            Configuration jobConf = linkJob.getConfiguration();
            jobConf.setBoolean("mapred.map.tasks.speculative.execution", false);
            jobConf.setInt("mapreduce.map.maxattempts", 1);
            jobConf.setInt("mapreduce.task.timeout", 0);

            KuduTableMapReduceUtil.addDependencyJars(linkJob);
            KuduTableMapReduceUtil.addCredentialsToJob(this.client, linkJob);

            if (linkJob.waitForCompletion(true)) {
                return 0;
            }
            return 1;
        }

        /**
         * Mapper that turns a stream of random 16-byte keys into linked lists stored in Kudu.
         *
         * <p>Keys are buffered in batches of {@code width}. Each full batch is inserted with
         * its rows pointing at the row with the same index in the previous batch (or at
         * nothing for the first batch). After every {@code wrap} rows the first batch is
         * rotated left by one position and updated so that it points at the last batch, and
         * a row for the first key of the rotated batch is inserted into the heads table.
         * Keys left in a partially filled batch when the input ends are not written.
         */
        static class GeneratorMapper
        extends Mapper<BytesWritable, NullWritable, NullWritable, NullWritable> {
            /** First batch since the last wrap; {@code null} until one has been persisted. */
            private byte[][] first = null;
            /** Most recently persisted batch; {@code null} right after a wrap. */
            private byte[][] prev = null;
            /** Batch currently being filled. */
            private byte[][] current = null;
            /** Job/task identification written to the "client" column. */
            private String id;
            /** Number of rows persisted so far by this mapper. */
            private long rowId = 0L;
            /** Next free slot in {@link #current}. */
            private int i;
            private KuduClient client;
            private KuduTable table;
            private KuduSession session;
            private KuduTable headsTable;
            private long numNodes;
            private long wrap;
            private int width;

            GeneratorMapper() {
            }

            /**
             * Opens the Kudu client, both tables and a manually flushed session that ignores
             * duplicate rows, and reads width, wrap multiplier and node count from the job
             * configuration. The wrap length is {@code wrapMultiplier * width}, capped at the
             * node count.
             */
            protected void setup(Mapper.Context context) throws KuduException {
                this.id = "Job: " + context.getJobID() + " Task: " + context.getTaskAttemptID();
                Configuration conf = context.getConfiguration();
                CommandLineParser parser = new CommandLineParser(conf);
                this.client = parser.getClient();
                this.table = this.client.openTable(IntegrationTestBigLinkedList.getTableName(conf));
                this.headsTable = this.client.openTable(IntegrationTestBigLinkedList.getHeadsTable(conf));
                this.session = this.client.newSession();
                this.session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
                this.session.setMutationBufferSpace(1000000);
                this.session.setIgnoreAllDuplicateRows(true);
                this.width = context.getConfiguration().getInt(IntegrationTestBigLinkedList.GENERATOR_WIDTH_KEY, 1000000);
                this.current = new byte[this.width][];
                int wrapMultiplier = context.getConfiguration().getInt(IntegrationTestBigLinkedList.GENERATOR_WRAP_KEY, 25);
                this.wrap = (long)wrapMultiplier * (long)this.width;
                this.numNodes = context.getConfiguration().getLong(IntegrationTestBigLinkedList.GENERATOR_NUM_ROWS_PER_MAP_KEY, 25000000L);
                if (this.numNodes < this.wrap) {
                    this.wrap = this.numNodes;
                }
            }

            /**
             * Closes the session and shuts the client down. The shutdown runs in a
             * {@code finally} block so the client is released even when closing the session
             * fails.
             */
            protected void cleanup(Mapper.Context context) throws KuduException {
                try {
                    this.session.close();
                }
                finally {
                    this.client.shutdown();
                }
            }

            /**
             * Buffers one key and, when the batch is full, persists it and performs the wrap
             * bookkeeping described on the class.
             *
             * @param key    random node key; its valid bytes are copied
             * @param value  unused
             * @param output mapper context used for status and progress reporting
             */
            protected void map(BytesWritable key, NullWritable value, Mapper.Context output) throws IOException {
                this.current[this.i] = new byte[key.getLength()];
                System.arraycopy(key.getBytes(), 0, this.current[this.i], 0, key.getLength());
                if (++this.i == this.current.length) {
                    this.persist(output, this.current, false);
                    this.i = 0;
                    if (this.first == null) {
                        this.first = this.current;
                    }
                    this.prev = this.current;
                    this.current = new byte[this.width][];
                    this.rowId += (long)this.current.length;
                    output.setStatus("Count " + this.rowId);
                    if (this.rowId % this.wrap == 0L) {
                        // Close the loops: the rotated first batch is updated to point at the last batch.
                        GeneratorMapper.circularLeftShift(this.first);
                        this.persist(output, this.first, true);
                        Insert insert = this.headsTable.newInsert();
                        PartialRow row = insert.getRow();
                        row.addLong("key1", Bytes.getLong(this.first[0]));
                        row.addLong("key2", Bytes.getLong(this.first[0], 8));
                        this.session.apply(insert);
                        this.session.flush();
                        this.first = null;
                        this.prev = null;
                    }
                }
            }

            /** Rotates the array one position to the left in place; element 0 moves to the end. */
            private static <T> void circularLeftShift(T[] first) {
                T ez = first[0];
                System.arraycopy(first, 1, first, 0, first.length - 1);
                first[first.length - 1] = ez;
            }

            /**
             * Applies one operation per key in {@code data} and flushes the session.
             *
             * <p>Each row's prev columns are taken from the same index of {@link #prev}, or set
             * to null when there is no previous batch. Inserts additionally carry the row id,
             * the client id and an update count of 0.
             *
             * @param output context whose {@code progress()} is called every 1000 rows
             * @param data   batch of 16-byte keys
             * @param update {@code true} to issue updates, {@code false} to issue inserts
             */
            private void persist(Mapper.Context output, byte[][] data, boolean update) throws KuduException {
                for (int i = 0; i < data.length; ++i) {
                    Operation put = update ? this.table.newUpdate() : this.table.newInsert();
                    PartialRow row = put.getRow();
                    long keyOne = Bytes.getLong(data[i]);
                    long keyTwo = Bytes.getLong(data[i], 8);
                    row.addLong("key1", keyOne);
                    row.addLong("key2", keyTwo);
                    if (this.prev == null) {
                        row.setNull("prev1");
                        row.setNull("prev2");
                    } else {
                        row.addLong("prev1", Bytes.getLong(this.prev[i]));
                        row.addLong("prev2", Bytes.getLong(this.prev[i], 8));
                    }
                    if (!update) {
                        row.addLong("row_id", this.rowId + (long)i);
                        row.addString("client", this.id);
                        row.addInt("update_count", 0);
                    }
                    this.session.apply(put);
                    if (i % 1000 == 0) {
                        // Report progress regularly while a large batch is being applied.
                        output.progress();
                    }
                }
                this.session.flush();
            }
        }

        /**
         * Sequence-file input format that reports every file as non-splittable, so a file is
         * never divided into several splits.
         */
        static class OneFilePerMapperSFIF<K, V>
        extends SequenceFileInputFormat<K, V> {
            OneFilePerMapperSFIF() {
            }

            /** Always returns {@code false}, regardless of the context or file. */
            protected boolean isSplitable(JobContext context, Path filename) {
                return false;
            }
        }

        /**
         * Synthetic input format that produces random 16-byte keys instead of reading data.
         *
         * <p>It creates one empty split per configured mapper; each split's record reader
         * yields the configured number of rows, each keyed by fresh random bytes.
         */
        static class GeneratorInputFormat
        extends InputFormat<BytesWritable, NullWritable> {
            GeneratorInputFormat() {
            }

            /** Creates a record reader and initializes it before handing it back. */
            public RecordReader<BytesWritable, NullWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
                GeneratorRecordReader rr = new GeneratorRecordReader();
                rr.initialize(split, context);
                return rr;
            }

            /** Returns as many placeholder splits as the job configuration asks for mappers (default 1). */
            public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
                int numMappers = job.getConfiguration().getInt(IntegrationTestBigLinkedList.GENERATOR_NUM_MAPPERS_KEY, 1);
                ArrayList<InputSplit> splits = new ArrayList<InputSplit>(numMappers);
                for (int i = 0; i < numMappers; ++i) {
                    splits.add(new GeneratorInputSplit());
                }
                return splits;
            }

            /** Record reader that generates {@code numNodes} random keys. */
            static class GeneratorRecordReader
            extends RecordReader<BytesWritable, NullWritable> {
                /** Number of records handed out so far; never exceeds {@link #numNodes}. */
                private long count;
                /** Total number of records to produce. */
                private long numNodes;
                private BigLinkedListCommon.Xoroshiro128PlusRandom rand;

                GeneratorRecordReader() {
                }

                public void close() throws IOException {
                }

                /** Returns a new writable filled with 16 fresh random bytes on every call. */
                public BytesWritable getCurrentKey() throws IOException, InterruptedException {
                    byte[] bytes = new byte[16];
                    this.rand.nextBytes(bytes);
                    return new BytesWritable(bytes);
                }

                public NullWritable getCurrentValue() throws IOException, InterruptedException {
                    return NullWritable.get();
                }

                /**
                 * Returns the fraction of records produced, always within [0, 1]. When there
                 * is nothing to produce the reader reports itself as complete.
                 */
                public float getProgress() throws IOException, InterruptedException {
                    if (this.numNodes <= 0L) {
                        return 1.0f;
                    }
                    float fraction = (float)((double)this.count / (double)this.numNodes);
                    return Math.min(1.0f, fraction);
                }

                /** Reads the number of rows per mapper (default 25000000) and creates the random source. */
                public void initialize(InputSplit arg0, TaskAttemptContext context) throws IOException, InterruptedException {
                    this.numNodes = context.getConfiguration().getLong(IntegrationTestBigLinkedList.GENERATOR_NUM_ROWS_PER_MAP_KEY, 25000000L);
                    this.rand = new BigLinkedListCommon.Xoroshiro128PlusRandom();
                }

                /**
                 * Advances to the next record. Returns {@code true} exactly {@code numNodes}
                 * times; the counter is not advanced once the limit is reached, so it cannot
                 * run past the total.
                 */
                public boolean nextKeyValue() throws IOException, InterruptedException {
                    if (this.count >= this.numNodes) {
                        return false;
                    }
                    ++this.count;
                    return true;
                }
            }

            /** Placeholder split: it carries no data, has a nominal length of 1 and no locations. */
            static class GeneratorInputSplit
            extends InputSplit
            implements Writable {
                GeneratorInputSplit() {
                }

                public long getLength() throws IOException, InterruptedException {
                    return 1L;
                }

                public String[] getLocations() throws IOException, InterruptedException {
                    return new String[0];
                }

                /** Nothing to read: the split has no state. */
                public void readFields(DataInput arg0) throws IOException {
                }

                /** Nothing to write: the split has no state. */
                public void write(DataOutput arg0) throws IOException {
                }
            }
        }
    }

    /**
     * Plain mutable holder with package-private fields and no behavior of its own.
     *
     * <p>NOTE(review): the field names line up with the columns the generator writes
     * (key, prev, client, row_id, update_count), so this presumably represents one decoded
     * list node — confirm against the code that populates it.
     */
    static class CINode {
        String key;
        String prev;
        String client;
        long rowId;
        int updateCount;

        CINode() {
        }
    }
}

