/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.streaming.runtime;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.MultipleConnectedStreams;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
import org.apache.flink.streaming.api.operators.AbstractInput;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.transformations.AbstractMultipleInputTransformation;
import org.apache.flink.streaming.api.transformations.MultipleInputTransformation;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class SourceNAryInputChainingITCase
extends TestLogger {
    private static final int PARALLELISM = 4;
    @ClassRule
    public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
    @ClassRule
    public static final MiniClusterWithClientResource MINI_CLUSTER = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(1).setNumberSlotsPerTaskManager(4).build());

    @Test
    public void testDirectSourcesOnlyChainCreation() throws Exception {
        DataStream<Long> stream = this.createProgramWithSourcesOnly();
        JobGraph jobGraph = SourceNAryInputChainingITCase.sinkAndCompileJobGraph(stream);
        Assert.assertEquals((long)1L, (long)jobGraph.getNumberOfVertices());
    }

    @Test
    public void testDirectSourcesOnlyExecution() throws Exception {
        DataStream<Long> stream = this.createProgramWithSourcesOnly();
        List result = DataStreamUtils.collectBoundedStream(stream, (String)"N-Ary Source Chaining Test Program");
        SourceNAryInputChainingITCase.verifySequence(result, 1L, 30L);
    }

    @Test
    public void testMixedInputsChainCreation() throws Exception {
        DataStream<Long> stream = this.createProgramWithMixedInputs();
        JobGraph jobGraph = SourceNAryInputChainingITCase.sinkAndCompileJobGraph(stream);
        Assert.assertEquals((long)3L, (long)jobGraph.getNumberOfVertices());
    }

    @Test
    public void testMixedInputsExecution() throws Exception {
        DataStream<Long> stream = this.createProgramWithMixedInputs();
        List result = DataStreamUtils.collectBoundedStream(stream, (String)"N-Ary Source Chaining Test Program");
        SourceNAryInputChainingITCase.verifySequence(result, 1L, 30L);
    }

    @Test
    public void testMixedInputsWithUnionChainCreation() throws Exception {
        DataStream<Long> stream = this.createProgramWithUnionInput();
        JobGraph jobGraph = SourceNAryInputChainingITCase.sinkAndCompileJobGraph(stream);
        Assert.assertEquals((long)4L, (long)jobGraph.getNumberOfVertices());
    }

    @Test
    public void testMixedInputsWithUnionExecution() throws Exception {
        DataStream<Long> stream = this.createProgramWithUnionInput();
        List result = DataStreamUtils.collectBoundedStream(stream, (String)"N-Ary Source Chaining Test Program");
        SourceNAryInputChainingITCase.verifySequence(result, 1L, 40L);
    }

    @Test
    public void testMixedInputsWithMultipleUnionsChainCreation() throws Exception {
        DataStream<Long> stream = this.createProgramWithMultipleUnionInputs();
        JobGraph jobGraph = SourceNAryInputChainingITCase.sinkAndCompileJobGraph(stream);
        Assert.assertEquals((long)6L, (long)jobGraph.getNumberOfVertices());
    }

    @Test
    public void testMixedInputsWithMultipleUnionsExecution() throws Exception {
        DataStream<Long> stream = this.createProgramWithMultipleUnionInputs();
        List result = DataStreamUtils.collectBoundedStream(stream, (String)"N-Ary Source Chaining Test Program");
        SourceNAryInputChainingITCase.verifySequence(result, 1L, 60L);
    }

    private DataStream<Long> createProgramWithSourcesOnly() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        env.getConfig().enableObjectReuse();
        DataStreamSource source1 = env.fromSource((Source)new NumberSequenceSource(1L, 10L), WatermarkStrategy.noWatermarks(), "source-1");
        DataStreamSource source2 = env.fromSource((Source)new NumberSequenceSource(11L, 20L), WatermarkStrategy.noWatermarks(), "source-2");
        DataStreamSource source3 = env.fromSource((Source)new NumberSequenceSource(21L, 30L), WatermarkStrategy.noWatermarks(), "source-3");
        return SourceNAryInputChainingITCase.nAryInputStreamOperation(new DataStream[]{source1, source2, source3});
    }

    private DataStream<Long> createProgramWithMixedInputs() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        env.getConfig().enableObjectReuse();
        DataStreamSource source1 = env.fromSource((Source)new NumberSequenceSource(1L, 10L), WatermarkStrategy.noWatermarks(), "source-1");
        DataStreamSource source2 = env.fromSource((Source)new NumberSequenceSource(11L, 20L), WatermarkStrategy.noWatermarks(), "source-2");
        DataStreamSource source3 = env.fromSource((Source)new NumberSequenceSource(21L, 30L), WatermarkStrategy.noWatermarks(), "source-3");
        SingleOutputStreamOperator stream1 = source1.map((MapFunction & Serializable)v -> v);
        SingleOutputStreamOperator stream3 = source3.map((MapFunction & Serializable)v -> v);
        return SourceNAryInputChainingITCase.nAryInputStreamOperation(new DataStream[]{stream1, source2, stream3});
    }

    private DataStream<Long> createProgramWithUnionInput() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        env.getConfig().enableObjectReuse();
        DataStreamSource source1 = env.fromSource((Source)new NumberSequenceSource(1L, 10L), WatermarkStrategy.noWatermarks(), "source-1");
        DataStreamSource source2 = env.fromSource((Source)new NumberSequenceSource(11L, 20L), WatermarkStrategy.noWatermarks(), "source-2");
        DataStreamSource source3 = env.fromSource((Source)new NumberSequenceSource(21L, 30L), WatermarkStrategy.noWatermarks(), "source-3");
        DataStreamSource source4 = env.fromSource((Source)new NumberSequenceSource(31L, 40L), WatermarkStrategy.noWatermarks(), "source-4");
        return SourceNAryInputChainingITCase.nAryInputStreamOperation(new DataStream[]{source1.map((MapFunction & Serializable)v -> v), source2.union(new DataStream[]{source3}), source4});
    }

    private DataStream<Long> createProgramWithMultipleUnionInputs() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        env.getConfig().enableObjectReuse();
        DataStreamSource source1 = env.fromSource((Source)new NumberSequenceSource(1L, 10L), WatermarkStrategy.noWatermarks(), "source-1");
        DataStreamSource source2 = env.fromSource((Source)new NumberSequenceSource(11L, 20L), WatermarkStrategy.noWatermarks(), "source-2");
        DataStreamSource source3 = env.fromSource((Source)new NumberSequenceSource(21L, 30L), WatermarkStrategy.noWatermarks(), "source-3");
        DataStreamSource source4 = env.fromSource((Source)new NumberSequenceSource(31L, 40L), WatermarkStrategy.noWatermarks(), "source-4");
        DataStreamSource source5 = env.fromSource((Source)new NumberSequenceSource(41L, 50L), WatermarkStrategy.noWatermarks(), "source-5");
        DataStreamSource source6 = env.fromSource((Source)new NumberSequenceSource(51L, 60L), WatermarkStrategy.noWatermarks(), "source-6");
        return SourceNAryInputChainingITCase.nAryInputStreamOperation(new DataStream[]{source1.map((MapFunction & Serializable)v -> v), source2.union(new DataStream[]{source3}), source4.map((MapFunction & Serializable)v -> v).union(new DataStream[]{source5.map((MapFunction & Serializable)v -> v)}), source6});
    }

    private static DataStream<Long> nAryInputStreamOperation(DataStream<?> ... inputs) {
        StreamExecutionEnvironment env = inputs[0].getExecutionEnvironment();
        MultipleInputTransformation transform = new MultipleInputTransformation("MultipleInputOperator", (StreamOperatorFactory)new NAryUnionOpFactory(inputs.length), Types.LONG, env.getParallelism());
        for (DataStream<?> input : inputs) {
            transform.addInput(input.getTransformation());
        }
        transform.setChainingStrategy(ChainingStrategy.HEAD_WITH_SOURCES);
        env.addOperator((Transformation)transform);
        return new MultipleConnectedStreams(env).transform((AbstractMultipleInputTransformation)transform);
    }

    private static JobGraph sinkAndCompileJobGraph(DataStream<?> stream) {
        stream.sinkTo((Sink)new DiscardingSink());
        StreamGraph streamGraph = stream.getExecutionEnvironment().getStreamGraph();
        return StreamingJobGraphGenerator.createJobGraph((StreamGraph)streamGraph);
    }

    private static void verifySequence(List<Long> sequence, long from, long to) {
        if ((long)sequence.size() != to - from + 1L) {
            Assert.fail((String)String.format("Expected: Sequence [%d, %d]. Found: %s", from, to, sequence));
        }
        ArrayList<Long> list = new ArrayList<Long>(sequence);
        list.sort(Long::compareTo);
        int pos = 0;
        long value = from;
        while (value <= to) {
            if (value != list.get(pos)) {
                Assert.fail((String)String.format("Expected: Sequence [%d, %d]. Found: %s", from, to, list));
            }
            ++value;
            ++pos;
        }
    }

    private static final class PassThoughInput<T>
    extends AbstractInput<T, T> {
        PassThoughInput(AbstractStreamOperatorV2<T> owner, int inputId) {
            super(owner, inputId);
        }

        public void processElement(StreamRecord<T> element) throws Exception {
            this.output.collect(element);
        }
    }

    private static final class NAryUnionOpFactory
    extends AbstractStreamOperatorFactory<Long> {
        private final int numInputs;

        NAryUnionOpFactory(int numInputs) {
            this.numInputs = numInputs;
        }

        public <T extends StreamOperator<Long>> T createStreamOperator(StreamOperatorParameters<Long> parameters) {
            NAryUnionOp<Long> operator = new NAryUnionOp<Long>(parameters, this.numInputs);
            return (T)((Object)operator);
        }

        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            return NAryUnionOp.class;
        }
    }

    private static final class NAryUnionOp<T>
    extends AbstractStreamOperatorV2<T>
    implements MultipleInputStreamOperator<T> {
        private final int numInputs;

        public NAryUnionOp(StreamOperatorParameters<T> parameters, int numInputs) {
            super(parameters, numInputs);
            this.numInputs = numInputs;
        }

        public List<Input> getInputs() {
            ArrayList<Input> inputs = new ArrayList<Input>();
            for (int i = 1; i <= this.numInputs; ++i) {
                inputs.add((Input)new PassThoughInput(this, i));
            }
            return inputs;
        }
    }
}

