/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.operators;

import java.io.IOException;
import org.apache.flink.api.common.distributions.DataDistribution;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.MapPartitionOperator;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.operators.util.CollectionDataSets;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

public class CustomDistributionITCase
extends TestLogger {
    /**
     * JUnit class rule providing the mini cluster for this test class, configured with one task
     * manager that offers 8 slots. The tests in this class set the job parallelism to the number
     * of boundaries of the distribution they use (4 for {@code TestDataDist1}, 5 for
     * {@code TestDataDist2}).
     */
    @ClassRule
    public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(1).setNumberSlotsPerTaskManager(8).build());

    /**
     * Range-partitions the data set returned by {@code CollectionDataSets.get3TupleDataSet} on
     * field 0 using {@link TestDataDist1} and checks, inside every parallel subtask, that each
     * record it receives belongs to the bucket with the same index as the subtask.
     *
     * <p>A record with key {@code k} seen by subtask {@code i} is accepted when
     * {@code boundaries[i - 1][0] < k <= boundaries[i][0]}. The first subtask is only checked
     * against its upper boundary and the last subtask only against its lower boundary.
     *
     * @throws Exception if executing the job fails
     */
    @Test
    public void testPartitionWithDistribution1() throws Exception {
        final TestDataDist1 dist = new TestDataDist1();
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // One subtask per boundary row, so the subtask index doubles as the bucket index.
        env.setParallelism(dist.getParallelism());
        DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);

        RichMapPartitionFunction<Tuple3<Integer, Long, String>, Boolean> boundaryCheck =
                new RichMapPartitionFunction<Tuple3<Integer, Long, String>, Boolean>() {

                    public void mapPartition(
                            Iterable<Tuple3<Integer, Long, String>> values, Collector<Boolean> out)
                            throws Exception {
                        int pIdx = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
                        for (Tuple3<Integer, Long, String> s : values) {
                            boolean correctlyPartitioned = true;
                            if (pIdx == 0) {
                                // First bucket: bounded from above only.
                                Integer[] upper = dist.boundaries[0];
                                if (s.f0.compareTo(upper[0]) > 0) {
                                    correctlyPartitioned = false;
                                }
                            } else if (pIdx > 0 && pIdx < dist.getParallelism() - 1) {
                                // Inner bucket: (lower, upper].
                                Integer[] lower = dist.boundaries[pIdx - 1];
                                Integer[] upper = dist.boundaries[pIdx];
                                if (s.f0.compareTo(upper[0]) > 0 || s.f0.compareTo(lower[0]) <= 0) {
                                    correctlyPartitioned = false;
                                }
                            } else {
                                // Last bucket: bounded from below only.
                                Integer[] lower = dist.boundaries[pIdx - 1];
                                if (s.f0.compareTo(lower[0]) <= 0) {
                                    correctlyPartitioned = false;
                                }
                            }
                            if (!correctlyPartitioned) {
                                Assert.fail("Record was not correctly partitioned: " + s.toString());
                            }
                        }
                    }
                };

        MapPartitionOperator<Tuple3<Integer, Long, String>, Boolean> result =
                DataSetUtils.partitionByRange(input, dist, 0).mapPartition(boundaryCheck);
        // The check emits nothing; the sink only exists so that the job has an output to run.
        result.output(new DiscardingOutputFormat<>());
        env.execute();
    }

    /**
     * Range-partitions a fixed set of 15 tuples on fields 0 and 1 using {@link TestDataDist2} and
     * checks, inside every parallel subtask, that each record it receives belongs to the bucket
     * with the same index as the subtask.
     *
     * <p>Keys are compared lexicographically as {@code (f0, f1)}: a record seen by subtask
     * {@code i} is accepted when {@code boundaries[i - 1] < (f0, f1) <= boundaries[i]}. The first
     * subtask is only checked against its upper boundary and the last subtask only against its
     * lower boundary.
     *
     * @throws Exception if executing the job fails
     */
    @Test
    public void testRangeWithDistribution2() throws Exception {
        final TestDataDist2 dist = new TestDataDist2();
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // One subtask per boundary row, so the subtask index doubles as the bucket index.
        env.setParallelism(dist.getParallelism());
        DataSource<Tuple3<Integer, Integer, String>> input =
                env.fromElements(
                        new Tuple3<>(1, 5, "Hi"),
                        new Tuple3<>(1, 6, "Hi"),
                        new Tuple3<>(1, 7, "Hi"),
                        new Tuple3<>(1, 11, "Hello"),
                        new Tuple3<>(2, 3, "World"),
                        new Tuple3<>(2, 4, "World"),
                        new Tuple3<>(2, 5, "World"),
                        new Tuple3<>(2, 13, "Hello World"),
                        new Tuple3<>(3, 8, "Say"),
                        new Tuple3<>(4, 0, "Why"),
                        new Tuple3<>(4, 2, "Java"),
                        new Tuple3<>(4, 11, "Say Hello"),
                        new Tuple3<>(5, 1, "Hi Java!"),
                        new Tuple3<>(5, 2, "Hi Java?"),
                        new Tuple3<>(5, 3, "Hi Java again"));

        RichMapPartitionFunction<Tuple3<Integer, Integer, String>, Boolean> boundaryCheck =
                new RichMapPartitionFunction<Tuple3<Integer, Integer, String>, Boolean>() {

                    public void mapPartition(
                            Iterable<Tuple3<Integer, Integer, String>> values,
                            Collector<Boolean> out)
                            throws Exception {
                        int pIdx = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
                        for (Tuple3<Integer, Integer, String> s : values) {
                            // Evaluated per record; the first violation aborts via Assert.fail.
                            boolean correctlyPartitioned = true;
                            if (pIdx == 0) {
                                // First bucket: bounded from above only.
                                Integer[] upper = dist.boundaries[0];
                                if (s.f0.compareTo(upper[0]) > 0
                                        || (s.f0.compareTo(upper[0]) == 0
                                                && s.f1.compareTo(upper[1]) > 0)) {
                                    correctlyPartitioned = false;
                                }
                            } else if (pIdx > 0 && pIdx < dist.getParallelism() - 1) {
                                // Inner bucket: (lower, upper] in lexicographic (f0, f1) order.
                                Integer[] lower = dist.boundaries[pIdx - 1];
                                Integer[] upper = dist.boundaries[pIdx];
                                if (s.f0.compareTo(upper[0]) > 0
                                        || (s.f0.compareTo(upper[0]) == 0
                                                && s.f1.compareTo(upper[1]) > 0)
                                        || s.f0.compareTo(lower[0]) < 0
                                        || (s.f0.compareTo(lower[0]) == 0
                                                && s.f1.compareTo(lower[1]) <= 0)) {
                                    correctlyPartitioned = false;
                                }
                            } else {
                                // Last bucket: bounded from below only.
                                Integer[] lower = dist.boundaries[pIdx - 1];
                                if (s.f0.compareTo(lower[0]) < 0
                                        || (s.f0.compareTo(lower[0]) == 0
                                                && s.f1.compareTo(lower[1]) <= 0)) {
                                    correctlyPartitioned = false;
                                }
                            }
                            if (!correctlyPartitioned) {
                                Assert.fail("Record was not correctly partitioned: " + s.toString());
                            }
                        }
                    }
                };

        MapPartitionOperator<Tuple3<Integer, Integer, String>, Boolean> result =
                DataSetUtils.partitionByRange(input, dist, 0, 1).mapPartition(boundaryCheck);
        // The check emits nothing; the sink only exists so that the job has an output to run.
        result.output(new DiscardingOutputFormat<>());
        env.execute();
    }

    /**
     * Range-partitions on fewer key fields than the distribution declares: the data set returned
     * by {@code CollectionDataSets.get3TupleDataSet} is partitioned on field 0 only, while
     * {@link TestDataDist2} reports two fields.
     *
     * <p>Each subtask checks its records against the first component of the boundary rows only:
     * a record with key {@code k} seen by subtask {@code i} is accepted when
     * {@code boundaries[i - 1][0] < k <= boundaries[i][0]}. The first subtask is only checked
     * against its upper boundary and the last subtask only against its lower boundary.
     *
     * @throws Exception if executing the job fails
     */
    @Test
    public void testPartitionKeyLessDistribution() throws Exception {
        final TestDataDist2 dist = new TestDataDist2();
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // One subtask per boundary row, so the subtask index doubles as the bucket index.
        env.setParallelism(dist.getParallelism());
        DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);

        RichMapPartitionFunction<Tuple3<Integer, Long, String>, Boolean> boundaryCheck =
                new RichMapPartitionFunction<Tuple3<Integer, Long, String>, Boolean>() {

                    public void mapPartition(
                            Iterable<Tuple3<Integer, Long, String>> values, Collector<Boolean> out)
                            throws Exception {
                        int pIdx = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
                        for (Tuple3<Integer, Long, String> s : values) {
                            boolean correctlyPartitioned = true;
                            if (pIdx == 0) {
                                // First bucket: bounded from above only.
                                Integer[] upper = dist.boundaries[0];
                                if (s.f0.compareTo(upper[0]) > 0) {
                                    correctlyPartitioned = false;
                                }
                            } else if (pIdx > 0 && pIdx < dist.getParallelism() - 1) {
                                // Inner bucket: (lower, upper] on the first boundary component.
                                Integer[] lower = dist.boundaries[pIdx - 1];
                                Integer[] upper = dist.boundaries[pIdx];
                                if (s.f0.compareTo(upper[0]) > 0 || s.f0.compareTo(lower[0]) <= 0) {
                                    correctlyPartitioned = false;
                                }
                            } else {
                                // Last bucket: bounded from below only.
                                Integer[] lower = dist.boundaries[pIdx - 1];
                                if (s.f0.compareTo(lower[0]) <= 0) {
                                    correctlyPartitioned = false;
                                }
                            }
                            if (!correctlyPartitioned) {
                                Assert.fail("Record was not correctly partitioned: " + s.toString());
                            }
                        }
                    }
                };

        MapPartitionOperator<Tuple3<Integer, Long, String>, Boolean> result =
                DataSetUtils.partitionByRange(input, dist, 0).mapPartition(boundaryCheck);
        // The check emits nothing; the sink only exists so that the job has an output to run.
        result.output(new DiscardingOutputFormat<>());
        env.execute();
    }

    /**
     * Requests range partitioning on three key fields (0, 1 and 2) with {@link TestDataDist2},
     * which reports only two fields, and expects an {@link IllegalArgumentException}. The job is
     * never executed; the test only builds the partition operator.
     */
    @Test(expected = IllegalArgumentException.class)
    public void testPartitionMoreThanDistribution() throws Exception {
        DataDistribution twoFieldDistribution = new TestDataDist2();
        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, Long, String>> tuples =
                CollectionDataSets.get3TupleDataSet(environment);
        // One key field more than the distribution declares.
        int[] keyFields = {0, 1, 2};
        DataSetUtils.partitionByRange(tuples, twoFieldDistribution, keyFields);
    }

    /**
     * Test distribution over two integer key fields with five fixed boundary rows. The tests in
     * this file treat row {@code i} as the inclusive upper bound of bucket {@code i}.
     */
    public static class TestDataDist2 implements DataDistribution {
        /** Boundary rows; each row holds the values for the first and the second key field. */
        public Integer[][] boundaries = {
            {1, 6},
            {2, 4},
            {3, 9},
            {4, 1},
            {5, 2}
        };

        /** Returns the number of boundary rows, which the tests use as the job parallelism. */
        public int getParallelism() {
            return boundaries.length;
        }

        /**
         * Returns the boundary row at index {@code bucketNum}; {@code totalNumBuckets} is not
         * consulted.
         */
        public Object[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
            Integer[] boundary = boundaries[bucketNum];
            return boundary;
        }

        /** This distribution always reports two fields. */
        public int getNumberOfFields() {
            return 2;
        }

        /** Both key fields are reported as {@code BasicTypeInfo.INT_TYPE_INFO}. */
        public TypeInformation[] getKeyTypes() {
            TypeInformation[] keyTypes = new TypeInformation[2];
            keyTypes[0] = BasicTypeInfo.INT_TYPE_INFO;
            keyTypes[1] = BasicTypeInfo.INT_TYPE_INFO;
            return keyTypes;
        }

        /** Writes nothing; the boundaries are set by the field initializer. */
        public void write(DataOutputView out) throws IOException {
        }

        /** Reads nothing; the boundaries are set by the field initializer. */
        public void read(DataInputView in) throws IOException {
        }
    }

    /**
     * Test distribution over a single integer key field with four fixed boundary rows. The tests
     * in this file treat row {@code i} as the inclusive upper bound of bucket {@code i}.
     */
    public static class TestDataDist1 implements DataDistribution {
        /** Boundary rows; each row holds the single key value of one boundary. */
        public Integer[][] boundaries = {
            {4},
            {9},
            {13},
            {18}
        };

        /** Returns the number of boundary rows, which the tests use as the job parallelism. */
        public int getParallelism() {
            return boundaries.length;
        }

        /**
         * Returns the boundary row at index {@code bucketNum}; {@code totalNumBuckets} is not
         * consulted.
         */
        public Object[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
            Integer[] boundary = boundaries[bucketNum];
            return boundary;
        }

        /** This distribution always reports one field. */
        public int getNumberOfFields() {
            return 1;
        }

        /** The single key field is reported as {@code BasicTypeInfo.INT_TYPE_INFO}. */
        public TypeInformation[] getKeyTypes() {
            TypeInformation[] keyTypes = new TypeInformation[1];
            keyTypes[0] = BasicTypeInfo.INT_TYPE_INFO;
            return keyTypes;
        }

        /** Writes nothing; the boundaries are set by the field initializer. */
        public void write(DataOutputView out) throws IOException {
        }

        /** Reads nothing; the boundaries are set by the field initializer. */
        public void read(DataInputView in) throws IOException {
        }
    }
}

