/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.streaming.runtime;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.util.TestSequentialReadingStreamOperator;
import org.apache.flink.test.streaming.runtime.util.TestListResultSink;
import org.junit.Assert;
import org.junit.Test;

public class StreamTaskSelectiveReadingITCase {
    /**
     * Runs a job that connects a String source and an Integer source through a
     * {@link TestSequentialReadingStreamOperator} and checks the order of the collected output.
     *
     * <p>The String source runs with parallelism 1 and the Integer source with parallelism 2.
     * Each Integer subtask multiplies its elements by {@code subtaskIndex + 1}, so the two
     * subtasks together are expected to contribute {@code 1, 2, 3} and {@code 2, 4, 6}.
     *
     * <p>The assertions require that all six records derived from the first input appear first
     * and in emission order, followed by the six records derived from the second input. The
     * second group is compared after sorting because the relative order of the two Integer
     * subtasks is not asserted.
     *
     * @throws Exception if building or executing the job fails
     */
    @Test
    public void testSequentialReading() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<String> source0 =
                env.addSource(
                        new TestStringSource(
                                "Source0",
                                new String[] {
                                    "Hello-1", "Hello-2", "Hello-3", "Hello-4", "Hello-5", "Hello-6"
                                }));
        DataStream<Integer> source1 =
                env.addSource(new TestIntegerSource("Source1", new Integer[] {1, 2, 3}))
                        .setParallelism(2);
        TestListResultSink<String> resultSink = new TestListResultSink<>();

        TestSequentialReadingStreamOperator twoInputStreamOperator =
                new TestSequentialReadingStreamOperator("Operator0");
        // Keep the operator in its own task so it is not chained to either source.
        twoInputStreamOperator.setChainingStrategy(ChainingStrategy.NEVER);

        source0.connect(source1)
                .transform(
                        "Custom Operator", BasicTypeInfo.STRING_TYPE_INFO, twoInputStreamOperator)
                .addSink(resultSink);

        env.execute("Selective reading test");

        List<String> result = resultSink.getResult();

        List<String> expected1 =
                Arrays.asList(
                        "[Operator0-1]: [Source0-0]: Hello-1",
                        "[Operator0-1]: [Source0-0]: Hello-2",
                        "[Operator0-1]: [Source0-0]: Hello-3",
                        "[Operator0-1]: [Source0-0]: Hello-4",
                        "[Operator0-1]: [Source0-0]: Hello-5",
                        "[Operator0-1]: [Source0-0]: Hello-6");
        List<String> expected2 =
                Arrays.asList(
                        "[Operator0-2]: 1",
                        "[Operator0-2]: 2",
                        "[Operator0-2]: 3",
                        "[Operator0-2]: 2",
                        "[Operator0-2]: 4",
                        "[Operator0-2]: 6");
        // Arrays.asList is fixed-size but supports in-place sorting.
        Collections.sort(expected2);

        Assert.assertEquals(expected1.size() + expected2.size(), result.size());

        // Records from the first input must come first and keep their emission order.
        Assert.assertEquals(expected1, result.subList(0, expected1.size()));

        // Records from the second input follow; only their multiset is checked.
        List<String> result2 =
                result.subList(expected1.size(), expected1.size() + expected2.size());
        Collections.sort(result2);
        Assert.assertEquals(expected2, result2);
    }

    /**
     * Integer-valued test source whose emitted value depends on the emitting subtask: each
     * input element is multiplied by {@code subTaskIndex + 1}.
     */
    private static class TestIntegerSource extends TestSource<Integer> {

        /**
         * @param name label passed through to the base source
         * @param elements values to emit, in order
         */
        public TestIntegerSource(String name, Integer[] elements) {
            super(name, elements);
        }

        /**
         * Scales the element by the one-based subtask number.
         *
         * @param inValue element taken from the configured array
         * @param subTaskIndex zero-based index of the emitting subtask
         * @return {@code inValue * (subTaskIndex + 1)}
         */
        @Override
        protected Integer outValue(Integer inValue, int subTaskIndex) {
            final int multiplier = subTaskIndex + 1;
            return inValue * multiplier;
        }
    }

    /**
     * String-valued test source that prefixes every element with the source name and the index
     * of the emitting subtask, in the form {@code [name-index]: element}.
     */
    private static class TestStringSource extends TestSource<String> {

        /**
         * @param name label used in the prefix of every emitted record
         * @param elements values to emit, in order
         */
        public TestStringSource(String name, String[] elements) {
            super(name, elements);
        }

        /**
         * Builds the tagged record for one element.
         *
         * @param inValue element taken from the configured array
         * @param subTaskIndex zero-based index of the emitting subtask
         * @return {@code "[" + name + "-" + subTaskIndex + "]: " + inValue}
         */
        @Override
        protected String outValue(String inValue, int subTaskIndex) {
            StringBuilder tagged = new StringBuilder();
            tagged.append("[").append(this.name).append("-").append(subTaskIndex).append("]: ");
            tagged.append(inValue);
            return tagged.toString();
        }
    }

    private static abstract class TestSource<T>
    extends RichParallelSourceFunction<T> {
        private static final long serialVersionUID = 1L;
        protected final String name;
        private volatile boolean running = true;
        private transient RuntimeContext context;
        private final T[] elements;

        public TestSource(String name, T[] elements) {
            this.name = name;
            this.elements = elements;
        }

        public void open(OpenContext openContext) throws Exception {
            this.context = this.getRuntimeContext();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run(SourceFunction.SourceContext<T> ctx) throws Exception {
            for (int elementIndex = 0; this.running && elementIndex < this.elements.length; ++elementIndex) {
                Object object = ctx.getCheckpointLock();
                synchronized (object) {
                    ctx.collect(this.outValue(this.elements[elementIndex], this.context.getTaskInfo().getIndexOfThisSubtask()));
                    continue;
                }
            }
        }

        public void cancel() {
            this.running = false;
        }

        protected abstract T outValue(T var1, int var2);
    }
}

