/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.util.Arrays;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
import org.apache.flink.streaming.api.operators.InputSelectable;
import org.apache.flink.streaming.api.operators.InputSelection;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
import org.apache.flink.streaming.util.TestAnyModeReadingStreamOperator;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.streaming.util.TestSequentialReadingStreamOperator;
import org.apache.flink.util.ExceptionUtils;
import org.junit.Assert;
import org.junit.Test;

public class StreamTaskSelectiveReadingTest {
    @Test
    public void testAnyOrderedReading() throws Exception {
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        expectedOutput.add(new StreamRecord((Object)"[Operator0-1]: Hello-1"));
        expectedOutput.add(new StreamRecord((Object)"[Operator0-2]: 1"));
        expectedOutput.add(new StreamRecord((Object)"[Operator0-1]: Hello-2"));
        expectedOutput.add(new StreamRecord((Object)"[Operator0-2]: 2"));
        expectedOutput.add(new StreamRecord((Object)"[Operator0-1]: Hello-3"));
        expectedOutput.add(new StreamRecord((Object)"[Operator0-2]: 3"));
        expectedOutput.add(new StreamRecord((Object)"[Operator0-2]: 4"));
        this.testBase(new TestAnyModeReadingStreamOperator("Operator0"), true, expectedOutput, true);
    }

    @Test
    public void testAnyUnorderedReading() throws Exception {
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        expectedOutput.add(new StreamRecord((Object)"[Operator0-1]: Hello-1"));
        expectedOutput.add(new StreamRecord((Object)"[Operator0-2]: 1"));
        expectedOutput.add(new StreamRecord((Object)"[Operator0-1]: Hello-2"));
        expectedOutput.add(new StreamRecord((Object)"[Operator0-2]: 2"));
        expectedOutput.add(new StreamRecord((Object)"[Operator0-1]: Hello-3"));
        expectedOutput.add(new StreamRecord((Object)"[Operator0-2]: 3"));
        expectedOutput.add(new StreamRecord((Object)"[Operator0-2]: 4"));
        this.testBase(new TestAnyModeReadingStreamOperator("Operator0"), false, expectedOutput, false);
    }

    @Test
    public void testSequentialReading() throws Exception {
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        expectedOutput.add(new StreamRecord((Object)"[Operator0-1]: Hello-1"));
        expectedOutput.add(new StreamRecord((Object)"[Operator0-1]: Hello-2"));
        expectedOutput.add(new StreamRecord((Object)"[Operator0-1]: Hello-3"));
        expectedOutput.add(new StreamRecord((Object)"[Operator0-2]: 1"));
        expectedOutput.add(new StreamRecord((Object)"[Operator0-2]: 2"));
        expectedOutput.add(new StreamRecord((Object)"[Operator0-2]: 3"));
        expectedOutput.add(new StreamRecord((Object)"[Operator0-2]: 4"));
        this.testBase(new TestSequentialReadingStreamOperator("Operator0"), false, expectedOutput, true);
    }

    @Test
    public void testSpecialRuleReading() throws Exception {
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        expectedOutput.add(new StreamRecord((Object)"[Operator0-1]: Hello-1"));
        expectedOutput.add(new StreamRecord((Object)"[Operator0-1]: Hello-2"));
        expectedOutput.add(new StreamRecord((Object)"[Operator0-2]: 1"));
        expectedOutput.add(new StreamRecord((Object)"[Operator0-2]: 2"));
        expectedOutput.add(new StreamRecord((Object)"[Operator0-1]: Hello-3"));
        expectedOutput.add(new StreamRecord((Object)"[Operator0-2]: 3"));
        expectedOutput.add(new StreamRecord((Object)"[Operator0-2]: 4"));
        this.testBase(new SpecialRuleReadingStreamOperator("Operator0", 3, 4, 2), false, expectedOutput, true);
    }

    @Test
    public void testReadFinishedInput() throws Exception {
        block2: {
            try {
                this.testBase(new TestReadFinishedInputStreamOperator(), false, new ConcurrentLinkedQueue<Object>(), true);
                Assert.fail((String)"should throw an IOException");
            }
            catch (Exception t) {
                if (ExceptionUtils.findThrowableWithMessage((Throwable)t, (String)"only first input is selected but it is already finished").isPresent()) break block2;
                throw t;
            }
        }
    }

    private void testBase(TwoInputStreamOperator<String, Integer, String> streamOperator, boolean prepareDataBeforeProcessing, ConcurrentLinkedQueue<Object> expectedOutput, boolean orderedCheck) throws Exception {
        TwoInputStreamTaskTestHarness testHarness = new TwoInputStreamTaskTestHarness(TestSelectiveReadingTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setupOutputForSingletonOperatorChain();
        StreamConfig streamConfig = testHarness.getStreamConfig();
        streamConfig.setStreamOperator(streamOperator);
        streamConfig.setOperatorID(new OperatorID());
        testHarness.invoke();
        testHarness.waitForTaskRunning();
        boolean isProcessing = false;
        if (!prepareDataBeforeProcessing) {
            ((TestSelectiveReadingTask)testHarness.getTask()).startProcessing();
            isProcessing = true;
        }
        testHarness.processElement(new StreamRecord((Object)"Hello-1"), 0, 0);
        if (!prepareDataBeforeProcessing) {
            testHarness.waitForInputProcessing();
        }
        testHarness.processElement(new StreamRecord((Object)"Hello-2"), 0, 0);
        testHarness.processElement(new StreamRecord((Object)"Hello-3"), 0, 0);
        testHarness.processElement(new StreamRecord((Object)1), 1, 0);
        testHarness.processElement(new StreamRecord((Object)2), 1, 0);
        testHarness.processElement(new StreamRecord((Object)3), 1, 0);
        testHarness.processElement(new StreamRecord((Object)4), 1, 0);
        testHarness.endInput();
        if (!isProcessing) {
            ((TestSelectiveReadingTask)testHarness.getTask()).startProcessing();
        }
        testHarness.waitForTaskCompletion(10000L);
        LinkedBlockingQueue<Object> output = testHarness.getOutput();
        if (orderedCheck) {
            TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, output);
        } else {
            Object[] expectedResult = (String[])expectedOutput.stream().map(record -> ((StreamRecord)record).getValue().toString()).toArray(String[]::new);
            Arrays.sort(expectedResult);
            Object[] result = (String[])output.stream().map(record -> ((StreamRecord)record).getValue().toString()).toArray(String[]::new);
            Arrays.sort(result);
            Assert.assertArrayEquals((String)"Output was not correct.", (Object[])expectedResult, (Object[])result);
        }
    }

    private static class TestReadFinishedInputStreamOperator
    extends AbstractStreamOperator<String>
    implements TwoInputStreamOperator<String, Integer, String>,
    InputSelectable {
        private InputSelection inputSelection = InputSelection.FIRST;

        TestReadFinishedInputStreamOperator() {
        }

        public InputSelection nextSelection() {
            return this.inputSelection;
        }

        public void processElement1(StreamRecord<String> element) {
        }

        public void processElement2(StreamRecord<Integer> element) {
        }
    }

    private static class SpecialRuleReadingStreamOperator
    extends AbstractStreamOperator<String>
    implements TwoInputStreamOperator<String, Integer, String>,
    InputSelectable,
    BoundedMultiInput {
        private final String name;
        private final int input1Records;
        private final int input2Records;
        private final int maxContinuousReadingRecords;
        private int input1ReadingRecords;
        private int input2ReadingRecords;
        private int continuousReadingRecords;
        private InputSelection inputSelection;

        SpecialRuleReadingStreamOperator(String name, int input1Records, int input2Records, int maxContinuousReadingRecords) {
            this.name = name;
            this.input1Records = input1Records;
            this.input2Records = input2Records;
            this.maxContinuousReadingRecords = maxContinuousReadingRecords;
            this.input1ReadingRecords = 0;
            this.input2ReadingRecords = 0;
            this.continuousReadingRecords = 0;
            this.inputSelection = InputSelection.FIRST;
        }

        public InputSelection nextSelection() {
            return this.inputSelection;
        }

        public void processElement1(StreamRecord<String> element) {
            this.output.collect((Object)element.replace((Object)("[" + this.name + "-1]: " + (String)element.getValue())));
            ++this.input1ReadingRecords;
            ++this.continuousReadingRecords;
            if (this.continuousReadingRecords == this.maxContinuousReadingRecords) {
                this.continuousReadingRecords = 0;
                if (this.input2ReadingRecords < this.input2Records) {
                    this.inputSelection = InputSelection.SECOND;
                    return;
                }
            }
            this.inputSelection = InputSelection.FIRST;
        }

        public void processElement2(StreamRecord<Integer> element) {
            this.output.collect((Object)element.replace((Object)("[" + this.name + "-2]: " + element.getValue())));
            ++this.input2ReadingRecords;
            ++this.continuousReadingRecords;
            if (this.continuousReadingRecords == this.maxContinuousReadingRecords) {
                this.continuousReadingRecords = 0;
                if (this.input1ReadingRecords < this.input1Records) {
                    this.inputSelection = InputSelection.FIRST;
                    return;
                }
            }
            this.inputSelection = InputSelection.SECOND;
        }

        public void endInput(int inputId) {
            this.inputSelection = inputId == 1 ? InputSelection.SECOND : InputSelection.FIRST;
        }
    }

    private static class TestSelectiveReadingTask<IN1, IN2, OUT>
    extends TwoInputStreamTask<IN1, IN2, OUT> {
        private volatile boolean started = false;

        TestSelectiveReadingTask(Environment env) throws Exception {
            super(env);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
            if (!this.started) {
                TestSelectiveReadingTask testSelectiveReadingTask = this;
                synchronized (testSelectiveReadingTask) {
                    ((Object)((Object)this)).wait();
                }
            }
            super.processInput(controller);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void startProcessing() {
            this.started = true;
            TestSelectiveReadingTask testSelectiveReadingTask = this;
            synchronized (testSelectiveReadingTask) {
                ((Object)((Object)this)).notifyAll();
            }
        }
    }
}

