/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.sort;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
import org.apache.flink.streaming.api.operators.sort.CollectingDataOutput;
import org.apache.flink.streaming.api.operators.sort.CollectionDataInput;
import org.apache.flink.streaming.api.operators.sort.MultiInputSortingDataInput;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.MultipleInputSelectionHandler;
import org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor;
import org.apache.flink.streaming.runtime.io.StreamOneInputProcessor;
import org.apache.flink.streaming.runtime.io.StreamTaskInput;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.Assert;
import org.junit.Test;

public class MultiInputSortingDataInputsTest {
    @Test
    public void passThroughThenSortedInput() throws Exception {
        this.twoInputOrderTest(1, 0);
    }

    @Test
    public void sortedThenPassThroughInput() throws Exception {
        this.twoInputOrderTest(0, 1);
    }

    public void twoInputOrderTest(int preferredIndex, int sortedIndex) throws Exception {
        CollectingDataOutput collectingDataOutput = new CollectingDataOutput();
        List<StreamElement> sortedInputElements = Arrays.asList(new StreamRecord((Object)1, 3L), new StreamRecord((Object)1, 1L), new StreamRecord((Object)2, 1L), new StreamRecord((Object)2, 3L), new StreamRecord((Object)1, 2L), new StreamRecord((Object)2, 2L), Watermark.MAX_WATERMARK);
        CollectionDataInput sortedInput = new CollectionDataInput(sortedInputElements, sortedIndex);
        List<StreamElement> preferredInputElements = Arrays.asList(new StreamRecord((Object)99, 3L), new StreamRecord((Object)99, 1L), new Watermark(99L));
        CollectionDataInput preferredInput = new CollectionDataInput(preferredInputElements, preferredIndex);
        KeySelector & Serializable keySelector = (KeySelector & Serializable)value -> value;
        try (MockEnvironment environment = MockEnvironment.builder().build();){
            MultiInputSortingDataInput.SelectableSortingInputs selectableSortingInputs = MultiInputSortingDataInput.wrapInputs((AbstractInvokable)new DummyInvokable(), (StreamTaskInput[])new StreamTaskInput[]{sortedInput}, (KeySelector[])new KeySelector[]{keySelector}, (TypeSerializer[])new TypeSerializer[]{new IntSerializer()}, (TypeSerializer)new IntSerializer(), (StreamTaskInput[])new StreamTaskInput[]{preferredInput}, (MemoryManager)environment.getMemoryManager(), (IOManager)environment.getIOManager(), (boolean)true, (double)1.0, (Configuration)new Configuration());
            StreamTaskInput[] sortingDataInputs = selectableSortingInputs.getSortedInputs();
            StreamTaskInput[] preferredDataInputs = selectableSortingInputs.getPassThroughInputs();
            try (StreamTaskInput preferredTaskInput = preferredDataInputs[0];
                 StreamTaskInput sortedTaskInput = sortingDataInputs[0];){
                InputStatus inputStatus;
                MultipleInputSelectionHandler selectionHandler = new MultipleInputSelectionHandler(selectableSortingInputs.getInputSelectable(), 2);
                StreamOneInputProcessor[] inputProcessors = new StreamOneInputProcessor[2];
                inputProcessors[preferredIndex] = new StreamOneInputProcessor(preferredTaskInput, collectingDataOutput, (BoundedMultiInput)new DummyOperatorChain());
                inputProcessors[sortedIndex] = new StreamOneInputProcessor(sortedTaskInput, collectingDataOutput, (BoundedMultiInput)new DummyOperatorChain());
                StreamMultipleInputProcessor processor = new StreamMultipleInputProcessor(selectionHandler, inputProcessors);
                while ((inputStatus = processor.processInput()) != InputStatus.END_OF_INPUT) {
                }
            }
        }
        Assert.assertThat(collectingDataOutput.events, (Matcher)CoreMatchers.equalTo(Arrays.asList(new StreamRecord((Object)99, 3L), new StreamRecord((Object)99, 1L), new Watermark(99L), new StreamRecord((Object)1, 1L), new StreamRecord((Object)1, 2L), new StreamRecord((Object)1, 3L), new StreamRecord((Object)2, 1L), new StreamRecord((Object)2, 2L), new StreamRecord((Object)2, 3L), Watermark.MAX_WATERMARK)));
    }

    @Test
    public void simpleFixedLengthKeySorting() throws Exception {
        CollectingDataOutput collectingDataOutput = new CollectingDataOutput();
        List<StreamElement> elements = Arrays.asList(new StreamRecord((Object)1, 3L), new StreamRecord((Object)1, 1L), new StreamRecord((Object)2, 1L), new StreamRecord((Object)2, 3L), new StreamRecord((Object)1, 2L), new StreamRecord((Object)2, 2L), Watermark.MAX_WATERMARK);
        CollectionDataInput dataInput1 = new CollectionDataInput(elements, 0);
        CollectionDataInput dataInput2 = new CollectionDataInput(elements, 1);
        KeySelector & Serializable keySelector = (KeySelector & Serializable)value -> value;
        try (MockEnvironment environment = MockEnvironment.builder().build();){
            MultiInputSortingDataInput.SelectableSortingInputs selectableSortingInputs = MultiInputSortingDataInput.wrapInputs((AbstractInvokable)new DummyInvokable(), (StreamTaskInput[])new StreamTaskInput[]{dataInput1, dataInput2}, (KeySelector[])new KeySelector[]{keySelector, keySelector}, (TypeSerializer[])new TypeSerializer[]{new IntSerializer(), new IntSerializer()}, (TypeSerializer)new IntSerializer(), (StreamTaskInput[])new StreamTaskInput[0], (MemoryManager)environment.getMemoryManager(), (IOManager)environment.getIOManager(), (boolean)true, (double)1.0, (Configuration)new Configuration());
            StreamTaskInput[] sortingDataInputs = selectableSortingInputs.getSortedInputs();
            try (StreamTaskInput input1 = sortingDataInputs[0];
                 StreamTaskInput input2 = sortingDataInputs[1];){
                InputStatus inputStatus;
                MultipleInputSelectionHandler selectionHandler = new MultipleInputSelectionHandler(selectableSortingInputs.getInputSelectable(), 2);
                StreamMultipleInputProcessor processor = new StreamMultipleInputProcessor(selectionHandler, new StreamOneInputProcessor[]{new StreamOneInputProcessor(input1, collectingDataOutput, (BoundedMultiInput)new DummyOperatorChain()), new StreamOneInputProcessor(input2, collectingDataOutput, (BoundedMultiInput)new DummyOperatorChain())});
                while ((inputStatus = processor.processInput()) != InputStatus.END_OF_INPUT) {
                }
            }
        }
        Assert.assertThat(collectingDataOutput.events, (Matcher)CoreMatchers.equalTo(Arrays.asList(new StreamRecord((Object)1, 1L), new StreamRecord((Object)1, 1L), new StreamRecord((Object)1, 2L), new StreamRecord((Object)1, 2L), new StreamRecord((Object)1, 3L), new StreamRecord((Object)1, 3L), new StreamRecord((Object)2, 1L), new StreamRecord((Object)2, 1L), new StreamRecord((Object)2, 2L), new StreamRecord((Object)2, 2L), new StreamRecord((Object)2, 3L), Watermark.MAX_WATERMARK, new StreamRecord((Object)2, 3L), Watermark.MAX_WATERMARK)));
    }

    @Test
    public void watermarkPropagation() throws Exception {
        CollectingDataOutput collectingDataOutput = new CollectingDataOutput();
        List<StreamElement> elements1 = Arrays.asList(new StreamRecord((Object)2, 3L), new Watermark(3L), new StreamRecord((Object)3, 3L), new Watermark(7L));
        List<StreamElement> elements2 = Arrays.asList(new StreamRecord((Object)0, 3L), new Watermark(1L), new StreamRecord((Object)1, 3L), new Watermark(3L));
        CollectionDataInput dataInput1 = new CollectionDataInput(elements1, 0);
        CollectionDataInput dataInput2 = new CollectionDataInput(elements2, 1);
        KeySelector & Serializable keySelector = (KeySelector & Serializable)value -> value;
        try (MockEnvironment environment = MockEnvironment.builder().build();){
            MultiInputSortingDataInput.SelectableSortingInputs selectableSortingInputs = MultiInputSortingDataInput.wrapInputs((AbstractInvokable)new DummyInvokable(), (StreamTaskInput[])new StreamTaskInput[]{dataInput1, dataInput2}, (KeySelector[])new KeySelector[]{keySelector, keySelector}, (TypeSerializer[])new TypeSerializer[]{new IntSerializer(), new IntSerializer()}, (TypeSerializer)new IntSerializer(), (StreamTaskInput[])new StreamTaskInput[0], (MemoryManager)environment.getMemoryManager(), (IOManager)environment.getIOManager(), (boolean)true, (double)1.0, (Configuration)new Configuration());
            StreamTaskInput[] sortingDataInputs = selectableSortingInputs.getSortedInputs();
            try (StreamTaskInput input1 = sortingDataInputs[0];
                 StreamTaskInput input2 = sortingDataInputs[1];){
                InputStatus inputStatus;
                MultipleInputSelectionHandler selectionHandler = new MultipleInputSelectionHandler(selectableSortingInputs.getInputSelectable(), 2);
                StreamMultipleInputProcessor processor = new StreamMultipleInputProcessor(selectionHandler, new StreamOneInputProcessor[]{new StreamOneInputProcessor(input1, collectingDataOutput, (BoundedMultiInput)new DummyOperatorChain()), new StreamOneInputProcessor(input2, collectingDataOutput, (BoundedMultiInput)new DummyOperatorChain())});
                while ((inputStatus = processor.processInput()) != InputStatus.END_OF_INPUT) {
                }
            }
        }
        Assert.assertThat(collectingDataOutput.events, (Matcher)CoreMatchers.equalTo(Arrays.asList(new StreamRecord((Object)0, 3L), new StreamRecord((Object)1, 3L), new Watermark(3L), new StreamRecord((Object)2, 3L), new StreamRecord((Object)3, 3L), new Watermark(7L))));
    }

    private static class DummyOperatorChain
    implements BoundedMultiInput {
        private DummyOperatorChain() {
        }

        public void endInput(int inputId) {
        }
    }
}

