/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.streaming.api.datastream;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

public class DataStreamWithSharedPartitionNodeITCase {
    @ClassRule
    public static MiniClusterWithClientResource flinkCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setNumberSlotsPerTaskManager(3).setNumberTaskManagers(1).build());

    @Test
    public void testJobWithSharePartitionNode() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream source = env.fromData((Object[])new Integer[]{1, 2, 3, 4}).partitionCustom((Partitioner)new TestPartitioner(), (KeySelector & Serializable)f -> f);
        source.addSink((SinkFunction)new CollectSink("first"));
        source.addSink((SinkFunction)new CollectSink("second")).setParallelism(2);
        env.execute();
        this.checkSinkResult("first-0", Arrays.asList(1, 2, 3, 4));
        this.checkSinkResult("second-0", Arrays.asList(1, 3));
        this.checkSinkResult("second-1", Arrays.asList(2, 4));
    }

    private void checkSinkResult(String nameAndIndex, List<Integer> expected) {
        List actualResult = (List)CollectSink.result.get(nameAndIndex);
        Assert.assertEquals(expected, (Object)actualResult);
    }

    private static class CollectSink
    extends RichSinkFunction<Integer> {
        private static final Object resultLock = new Object();
        @GuardedBy(value="resultLock")
        private static final Map<String, List<Integer>> result = new HashMap<String, List<Integer>>();
        private final String name;

        public CollectSink(String name) {
            this.name = name;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void invoke(Integer value, SinkFunction.Context context) throws Exception {
            Object object = resultLock;
            synchronized (object) {
                String key = this.name + "-" + this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
                result.compute(key, (k, v) -> v == null ? new ArrayList() : v).add(value);
            }
        }
    }

    private static class TestPartitioner
    implements Partitioner<Integer> {
        private int nextChannelToSendTo = -1;

        private TestPartitioner() {
        }

        public int partition(Integer key, int numPartitions) {
            this.nextChannelToSendTo = (this.nextChannelToSendTo + 1) % numPartitions;
            return this.nextChannelToSendTo;
        }
    }
}

