/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.graph;

import java.io.Serializable;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.operators.util.UserCodeWrapper;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.api.connector.source.mocks.MockSource;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.io.TypeSerializerInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.InputOutputFormatContainer;
import org.apache.flink.runtime.jobgraph.InputOutputFormatVertex;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
import org.apache.flink.streaming.api.operators.MailboxExecutor;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.operators.YieldingOperatorFactory;
import org.apache.flink.streaming.api.transformations.MultipleInputTransformation;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.transformations.ShuffleMode;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.tasks.MultipleInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
import org.apache.flink.streaming.util.TestAnyModeReadingStreamOperator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.FeatureMatcher;
import org.hamcrest.Matcher;
import org.junit.Assert;
import org.junit.Test;

public class StreamingJobGraphGeneratorTest
extends TestLogger {
    @Test
    public void testParallelismOneNotChained() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator input = env.fromElements((Object[])new String[]{"a", "b", "c", "d", "e", "f"}).map((MapFunction)new MapFunction<String, Tuple2<String, String>>(){

            public Tuple2<String, String> map(String value) {
                return new Tuple2((Object)value, (Object)value);
            }
        });
        SingleOutputStreamOperator result = input.keyBy(new int[]{0}).map((MapFunction)new MapFunction<Tuple2<String, String>, Tuple2<String, String>>(){

            public Tuple2<String, String> map(Tuple2<String, String> value) {
                return value;
            }
        });
        result.addSink((SinkFunction)new SinkFunction<Tuple2<String, String>>(){

            public void invoke(Tuple2<String, String> value) {
            }
        });
        StreamGraph streamGraph = env.getStreamGraph("test job");
        JobGraph jobGraph = streamGraph.getJobGraph();
        List verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
        Assert.assertEquals((long)2L, (long)jobGraph.getNumberOfVertices());
        Assert.assertEquals((long)1L, (long)((JobVertex)verticesSorted.get(0)).getParallelism());
        Assert.assertEquals((long)1L, (long)((JobVertex)verticesSorted.get(1)).getParallelism());
        JobVertex sourceVertex = (JobVertex)verticesSorted.get(0);
        JobVertex mapSinkVertex = (JobVertex)verticesSorted.get(1);
        Assert.assertEquals((Object)ResultPartitionType.PIPELINED_BOUNDED, (Object)((IntermediateDataSet)sourceVertex.getProducedDataSets().get(0)).getResultType());
        Assert.assertEquals((Object)ResultPartitionType.PIPELINED_BOUNDED, (Object)((JobEdge)mapSinkVertex.getInputs().get(0)).getSource().getResultType());
    }

    @Test
    public void testDisabledCheckpointing() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements((Object[])new Integer[]{0}).print();
        StreamGraph streamGraph = env.getStreamGraph();
        Assert.assertFalse((String)"Checkpointing enabled", (boolean)streamGraph.getCheckpointConfig().isCheckpointingEnabled());
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)streamGraph);
        JobCheckpointingSettings snapshottingSettings = jobGraph.getCheckpointingSettings();
        Assert.assertEquals((long)Long.MAX_VALUE, (long)snapshottingSettings.getCheckpointCoordinatorConfiguration().getCheckpointInterval());
        Assert.assertFalse((boolean)snapshottingSettings.getCheckpointCoordinatorConfiguration().isExactlyOnce());
        List verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
        StreamConfig streamConfig = new StreamConfig(((JobVertex)verticesSorted.get(0)).getConfiguration());
        Assert.assertEquals((Object)CheckpointingMode.AT_LEAST_ONCE, (Object)streamConfig.getCheckpointMode());
    }

    @Test
    public void testEnabledUnalignedCheckAndDisabledCheckpointing() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements((Object[])new Integer[]{0}).print();
        StreamGraph streamGraph = env.getStreamGraph();
        Assert.assertFalse((String)"Checkpointing enabled", (boolean)streamGraph.getCheckpointConfig().isCheckpointingEnabled());
        env.getCheckpointConfig().enableUnalignedCheckpoints(true);
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)streamGraph);
        List verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
        StreamConfig streamConfig = new StreamConfig(((JobVertex)verticesSorted.get(0)).getConfiguration());
        Assert.assertEquals((Object)CheckpointingMode.AT_LEAST_ONCE, (Object)streamConfig.getCheckpointMode());
        Assert.assertFalse((boolean)streamConfig.isUnalignedCheckpointsEnabled());
    }

    @Test
    public void testUnalignedCheckAndAtLeastOnce() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements((Object[])new Integer[]{0}).print();
        StreamGraph streamGraph = env.getStreamGraph();
        env.enableCheckpointing(1000L, CheckpointingMode.AT_LEAST_ONCE);
        env.getCheckpointConfig().enableUnalignedCheckpoints(true);
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)streamGraph);
        List verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
        StreamConfig streamConfig = new StreamConfig(((JobVertex)verticesSorted.get(0)).getConfiguration());
        Assert.assertEquals((Object)CheckpointingMode.AT_LEAST_ONCE, (Object)streamConfig.getCheckpointMode());
        Assert.assertFalse((boolean)streamConfig.isUnalignedCheckpointsEnabled());
    }

    @Test
    public void generatorForwardsSavepointRestoreSettings() {
        StreamGraph streamGraph = new StreamGraph(new ExecutionConfig(), new CheckpointConfig(), SavepointRestoreSettings.forPath((String)"hello"));
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)streamGraph);
        SavepointRestoreSettings savepointRestoreSettings = jobGraph.getSavepointRestoreSettings();
        Assert.assertThat((Object)savepointRestoreSettings.getRestorePath(), (Matcher)CoreMatchers.is((Object)"hello"));
    }

    @Test
    public void testChainStartEndSetting() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        env.fromElements((Object[])new Integer[]{1, 2, 3}).map((MapFunction)new MapFunction<Integer, Integer>(){

            public Integer map(Integer value) throws Exception {
                return value;
            }
        }).print();
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)env.getStreamGraph());
        List verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
        JobVertex sourceVertex = (JobVertex)verticesSorted.get(0);
        JobVertex mapPrintVertex = (JobVertex)verticesSorted.get(1);
        Assert.assertEquals((Object)ResultPartitionType.PIPELINED_BOUNDED, (Object)((IntermediateDataSet)sourceVertex.getProducedDataSets().get(0)).getResultType());
        Assert.assertEquals((Object)ResultPartitionType.PIPELINED_BOUNDED, (Object)((JobEdge)mapPrintVertex.getInputs().get(0)).getSource().getResultType());
        StreamConfig sourceConfig = new StreamConfig(sourceVertex.getConfiguration());
        StreamConfig mapConfig = new StreamConfig(mapPrintVertex.getConfiguration());
        Map chainedConfigs = mapConfig.getTransitiveChainedTaskConfigs(((Object)((Object)this)).getClass().getClassLoader());
        StreamConfig printConfig = (StreamConfig)chainedConfigs.values().iterator().next();
        Assert.assertTrue((boolean)sourceConfig.isChainStart());
        Assert.assertTrue((boolean)sourceConfig.isChainEnd());
        Assert.assertTrue((boolean)mapConfig.isChainStart());
        Assert.assertFalse((boolean)mapConfig.isChainEnd());
        Assert.assertFalse((boolean)printConfig.isChainStart());
        Assert.assertTrue((boolean)printConfig.isChainEnd());
    }

    @Test
    public void testOperatorCoordinatorAddedToJobVertex() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource stream = env.fromSource((Source)new MockSource(Boundedness.BOUNDED, 1), WatermarkStrategy.noWatermarks(), "TestingSource");
        OneInputTransformation resultTransform = new OneInputTransformation(stream.getTransformation(), "AnyName", (StreamOperatorFactory)new CoordinatedTransformOperatorFactory(), (TypeInformation)BasicTypeInfo.INT_TYPE_INFO, env.getParallelism());
        new TestingSingleOutputStreamOperator(env, resultTransform).print();
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)env.getStreamGraph());
        Assert.assertEquals((long)2L, (long)jobGraph.getVerticesAsArray()[0].getOperatorCoordinators().size());
    }

    @Test
    public void testResourcesForChainedSourceSink() throws Exception {
        ResourceSpec resource1 = ResourceSpec.newBuilder((double)0.1, (int)100).build();
        ResourceSpec resource2 = ResourceSpec.newBuilder((double)0.2, (int)200).build();
        ResourceSpec resource3 = ResourceSpec.newBuilder((double)0.3, (int)300).build();
        ResourceSpec resource4 = ResourceSpec.newBuilder((double)0.4, (int)400).build();
        ResourceSpec resource5 = ResourceSpec.newBuilder((double)0.5, (int)500).build();
        Method opMethod = StreamingJobGraphGeneratorTest.getSetResourcesMethodAndSetAccessible(SingleOutputStreamOperator.class);
        Method sinkMethod = StreamingJobGraphGeneratorTest.getSetResourcesMethodAndSetAccessible(DataStreamSink.class);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource source = env.addSource((SourceFunction)new ParallelSourceFunction<Tuple2<Integer, Integer>>(){

            public void run(SourceFunction.SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
            }

            public void cancel() {
            }
        });
        opMethod.invoke((Object)source, resource1);
        SingleOutputStreamOperator map = source.map((MapFunction)new MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>(){

            public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception {
                return value;
            }
        });
        opMethod.invoke((Object)map, resource2);
        SingleOutputStreamOperator filter = map.filter((FilterFunction)new FilterFunction<Tuple2<Integer, Integer>>(){

            public boolean filter(Tuple2<Integer, Integer> value) throws Exception {
                return false;
            }
        });
        opMethod.invoke((Object)filter, resource3);
        SingleOutputStreamOperator reduce = filter.keyBy(new int[]{0}).reduce((ReduceFunction)new ReduceFunction<Tuple2<Integer, Integer>>(){

            public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1, Tuple2<Integer, Integer> value2) throws Exception {
                return new Tuple2(value1.f0, (Object)((Integer)value1.f1 + (Integer)value2.f1));
            }
        });
        opMethod.invoke((Object)reduce, resource4);
        DataStreamSink sink = reduce.addSink((SinkFunction)new SinkFunction<Tuple2<Integer, Integer>>(){

            public void invoke(Tuple2<Integer, Integer> value) throws Exception {
            }
        });
        sinkMethod.invoke((Object)sink, resource5);
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)env.getStreamGraph());
        JobVertex sourceMapFilterVertex = (JobVertex)jobGraph.getVerticesSortedTopologicallyFromSources().get(0);
        JobVertex reduceSinkVertex = (JobVertex)jobGraph.getVerticesSortedTopologicallyFromSources().get(1);
        Assert.assertTrue((boolean)sourceMapFilterVertex.getMinResources().equals((Object)resource3.merge(resource2).merge(resource1)));
        Assert.assertTrue((boolean)reduceSinkVertex.getPreferredResources().equals((Object)resource4.merge(resource5)));
    }

    @Test
    public void testResourcesForIteration() throws Exception {
        ResourceSpec resource1 = ResourceSpec.newBuilder((double)0.1, (int)100).build();
        ResourceSpec resource2 = ResourceSpec.newBuilder((double)0.2, (int)200).build();
        ResourceSpec resource3 = ResourceSpec.newBuilder((double)0.3, (int)300).build();
        ResourceSpec resource4 = ResourceSpec.newBuilder((double)0.4, (int)400).build();
        ResourceSpec resource5 = ResourceSpec.newBuilder((double)0.5, (int)500).build();
        Method opMethod = StreamingJobGraphGeneratorTest.getSetResourcesMethodAndSetAccessible(SingleOutputStreamOperator.class);
        Method sinkMethod = StreamingJobGraphGeneratorTest.getSetResourcesMethodAndSetAccessible(DataStreamSink.class);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        SingleOutputStreamOperator source = env.addSource((SourceFunction)new ParallelSourceFunction<Integer>(){

            public void run(SourceFunction.SourceContext<Integer> ctx) throws Exception {
            }

            public void cancel() {
            }
        }).name("test_source");
        opMethod.invoke((Object)source, resource1);
        IterativeStream iteration = source.iterate(3000L);
        opMethod.invoke((Object)iteration, resource2);
        SingleOutputStreamOperator flatMap = iteration.flatMap((FlatMapFunction)new FlatMapFunction<Integer, Integer>(){

            public void flatMap(Integer value, Collector<Integer> out) throws Exception {
                out.collect((Object)value);
            }
        }).name("test_flatMap");
        opMethod.invoke((Object)flatMap, resource3);
        SingleOutputStreamOperator increment = flatMap.filter((FilterFunction)new FilterFunction<Integer>(){

            public boolean filter(Integer value) throws Exception {
                return false;
            }
        }).name("test_filter");
        opMethod.invoke((Object)increment, resource4);
        DataStreamSink sink = iteration.closeWith((DataStream)increment).addSink((SinkFunction)new SinkFunction<Integer>(){

            public void invoke(Integer value) throws Exception {
            }
        }).disableChaining().name("test_sink");
        sinkMethod.invoke((Object)sink, resource5);
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)env.getStreamGraph());
        for (JobVertex jobVertex : jobGraph.getVertices()) {
            if (jobVertex.getName().contains("test_source")) {
                Assert.assertTrue((boolean)jobVertex.getMinResources().equals((Object)resource1));
                continue;
            }
            if (jobVertex.getName().contains("Iteration_Source")) {
                Assert.assertTrue((boolean)jobVertex.getPreferredResources().equals((Object)resource2));
                continue;
            }
            if (jobVertex.getName().contains("test_flatMap")) {
                Assert.assertTrue((boolean)jobVertex.getMinResources().equals((Object)resource3.merge(resource4)));
                continue;
            }
            if (jobVertex.getName().contains("Iteration_Tail")) {
                Assert.assertTrue((boolean)jobVertex.getPreferredResources().equals((Object)ResourceSpec.DEFAULT));
                continue;
            }
            if (!jobVertex.getName().contains("test_sink")) continue;
            Assert.assertTrue((boolean)jobVertex.getMinResources().equals((Object)resource5));
        }
    }

    @Test
    public void testInputOutputFormat() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        SingleOutputStreamOperator source = env.addSource((SourceFunction)new InputFormatSourceFunction((InputFormat)new TypeSerializerInputFormat(TypeInformation.of(Long.class)), TypeInformation.of(Long.class)), TypeInformation.of(Long.class)).name("source");
        source.writeUsingOutputFormat((OutputFormat)new DiscardingOutputFormat()).name("sink1");
        source.writeUsingOutputFormat((OutputFormat)new DiscardingOutputFormat()).name("sink2");
        StreamGraph streamGraph = env.getStreamGraph();
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)streamGraph);
        Assert.assertEquals((long)1L, (long)jobGraph.getNumberOfVertices());
        JobVertex jobVertex = (JobVertex)jobGraph.getVertices().iterator().next();
        Assert.assertTrue((boolean)(jobVertex instanceof InputOutputFormatVertex));
        InputOutputFormatContainer formatContainer = new InputOutputFormatContainer(new TaskConfig(jobVertex.getConfiguration()), Thread.currentThread().getContextClassLoader());
        Map inputFormats = formatContainer.getInputFormats();
        Map outputFormats = formatContainer.getOutputFormats();
        Assert.assertEquals((long)1L, (long)inputFormats.size());
        Assert.assertEquals((long)2L, (long)outputFormats.size());
        HashMap<String, OperatorID> nameToOperatorIds = new HashMap<String, OperatorID>();
        StreamConfig headConfig = new StreamConfig(jobVertex.getConfiguration());
        nameToOperatorIds.put(headConfig.getOperatorName(), headConfig.getOperatorID());
        Map chainedConfigs = headConfig.getTransitiveChainedTaskConfigs(Thread.currentThread().getContextClassLoader());
        for (StreamConfig config : chainedConfigs.values()) {
            nameToOperatorIds.put(config.getOperatorName(), config.getOperatorID());
        }
        InputFormat sourceFormat = (InputFormat)((UserCodeWrapper)inputFormats.get(nameToOperatorIds.get("Source: source"))).getUserCodeObject();
        Assert.assertTrue((boolean)(sourceFormat instanceof TypeSerializerInputFormat));
        OutputFormat sinkFormat1 = (OutputFormat)((UserCodeWrapper)outputFormats.get(nameToOperatorIds.get("Sink: sink1"))).getUserCodeObject();
        Assert.assertTrue((boolean)(sinkFormat1 instanceof DiscardingOutputFormat));
        OutputFormat sinkFormat2 = (OutputFormat)((UserCodeWrapper)outputFormats.get(nameToOperatorIds.get("Sink: sink2"))).getUserCodeObject();
        Assert.assertTrue((boolean)(sinkFormat2 instanceof DiscardingOutputFormat));
    }

    @Test
    public void testCoordinatedOperator() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource source = env.fromSource((Source)new MockSource(Boundedness.BOUNDED, 1), WatermarkStrategy.noWatermarks(), "TestSource");
        source.addSink((SinkFunction)new DiscardingSink());
        StreamGraph streamGraph = env.getStreamGraph();
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)streamGraph);
        Assert.assertEquals((long)1L, (long)jobGraph.getNumberOfVertices());
        JobVertex jobVertex = jobGraph.getVerticesAsArray()[0];
        List coordinatorProviders = jobVertex.getOperatorCoordinators();
        Assert.assertEquals((long)1L, (long)coordinatorProviders.size());
        ClassLoader classLoader = ((Object)((Object)this)).getClass().getClassLoader();
        Assert.assertEquals(SourceOperatorStreamTask.class, (Object)jobVertex.getInvokableClass(classLoader));
        StreamOperatorFactory operatorFactory = new StreamConfig(jobVertex.getConfiguration()).getStreamOperatorFactory(classLoader);
        Assert.assertTrue((boolean)(operatorFactory instanceof SourceOperatorFactory));
    }

    @Test
    public void testShuffleModePipelined() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource sourceDataStream = env.fromElements((Object[])new Integer[]{1, 2, 3});
        DataStream partitionAfterSourceDataStream = new DataStream(env, (Transformation)new PartitionTransformation(sourceDataStream.getTransformation(), (StreamPartitioner)new ForwardPartitioner(), ShuffleMode.PIPELINED));
        SingleOutputStreamOperator mapDataStream = partitionAfterSourceDataStream.map((MapFunction & Serializable)value -> value).setParallelism(1);
        DataStream partitionAfterMapDataStream = new DataStream(env, (Transformation)new PartitionTransformation(mapDataStream.getTransformation(), (StreamPartitioner)new RescalePartitioner(), ShuffleMode.PIPELINED));
        partitionAfterMapDataStream.print().setParallelism(2);
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)env.getStreamGraph());
        List verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
        Assert.assertEquals((long)2L, (long)verticesSorted.size());
        JobVertex sourceAndMapVertex = (JobVertex)verticesSorted.get(0);
        Assert.assertEquals((Object)ResultPartitionType.PIPELINED_BOUNDED, (Object)((IntermediateDataSet)sourceAndMapVertex.getProducedDataSets().get(0)).getResultType());
    }

    @Test
    public void testShuffleModeBatch() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource sourceDataStream = env.fromElements((Object[])new Integer[]{1, 2, 3});
        DataStream partitionAfterSourceDataStream = new DataStream(env, (Transformation)new PartitionTransformation(sourceDataStream.getTransformation(), (StreamPartitioner)new ForwardPartitioner(), ShuffleMode.BATCH));
        SingleOutputStreamOperator mapDataStream = partitionAfterSourceDataStream.map((MapFunction & Serializable)value -> value).setParallelism(1);
        DataStream partitionAfterMapDataStream = new DataStream(env, (Transformation)new PartitionTransformation(mapDataStream.getTransformation(), (StreamPartitioner)new RescalePartitioner(), ShuffleMode.BATCH));
        partitionAfterMapDataStream.print().setParallelism(2);
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)env.getStreamGraph());
        List verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
        Assert.assertEquals((long)3L, (long)verticesSorted.size());
        JobVertex sourceVertex = (JobVertex)verticesSorted.get(0);
        JobVertex mapVertex = (JobVertex)verticesSorted.get(1);
        Assert.assertEquals((Object)ResultPartitionType.BLOCKING, (Object)((IntermediateDataSet)sourceVertex.getProducedDataSets().get(0)).getResultType());
        Assert.assertEquals((Object)ResultPartitionType.BLOCKING, (Object)((IntermediateDataSet)mapVertex.getProducedDataSets().get(0)).getResultType());
    }

    @Test
    public void testShuffleModeUndefined() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource sourceDataStream = env.fromElements((Object[])new Integer[]{1, 2, 3});
        DataStream partitionAfterSourceDataStream = new DataStream(env, (Transformation)new PartitionTransformation(sourceDataStream.getTransformation(), (StreamPartitioner)new ForwardPartitioner(), ShuffleMode.UNDEFINED));
        SingleOutputStreamOperator mapDataStream = partitionAfterSourceDataStream.map((MapFunction & Serializable)value -> value).setParallelism(1);
        DataStream partitionAfterMapDataStream = new DataStream(env, (Transformation)new PartitionTransformation(mapDataStream.getTransformation(), (StreamPartitioner)new RescalePartitioner(), ShuffleMode.UNDEFINED));
        partitionAfterMapDataStream.print().setParallelism(2);
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)env.getStreamGraph());
        List verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
        Assert.assertEquals((long)2L, (long)verticesSorted.size());
        JobVertex sourceAndMapVertex = (JobVertex)verticesSorted.get(0);
        Assert.assertEquals((Object)ResultPartitionType.PIPELINED_BOUNDED, (Object)((IntermediateDataSet)sourceAndMapVertex.getProducedDataSets().get(0)).getResultType());
    }

    @Test
    public void testStreamingJobTypeByDefault() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements((Object[])new String[]{"test"}).addSink((SinkFunction)new DiscardingSink());
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)env.getStreamGraph());
        Assert.assertEquals((Object)JobType.STREAMING, (Object)jobGraph.getJobType());
    }

    @Test
    public void testBatchJobType() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        env.fromElements((Object[])new String[]{"test"}).addSink((SinkFunction)new DiscardingSink());
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)env.getStreamGraph());
        Assert.assertEquals((Object)JobType.BATCH, (Object)jobGraph.getJobType());
    }

    @Test
    public void testPartitionTypesInBatchMode() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        env.setParallelism(4);
        env.disableOperatorChaining();
        DataStreamSource source = env.fromElements((Object[])new Integer[]{1});
        source.map((MapFunction & Serializable)value -> value).setParallelism(1).rescale().map((MapFunction & Serializable)value -> value).rebalance().map((MapFunction & Serializable)value -> value).keyBy((KeySelector & Serializable)value -> value).map((MapFunction & Serializable)value -> value).addSink((SinkFunction)new DiscardingSink());
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)env.getStreamGraph());
        List verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
        Assert.assertThat(verticesSorted.get(0), this.hasOutputPartitionType(ResultPartitionType.PIPELINED_BOUNDED));
        Assert.assertThat(verticesSorted.get(1), this.hasOutputPartitionType(ResultPartitionType.BLOCKING));
        Assert.assertThat(verticesSorted.get(2), this.hasOutputPartitionType(ResultPartitionType.BLOCKING));
        Assert.assertThat(verticesSorted.get(3), this.hasOutputPartitionType(ResultPartitionType.BLOCKING));
        Assert.assertThat(verticesSorted.get(4), this.hasOutputPartitionType(ResultPartitionType.PIPELINED_BOUNDED));
    }

    private Matcher<JobVertex> hasOutputPartitionType(ResultPartitionType partitionType) {
        return new FeatureMatcher<JobVertex, ResultPartitionType>(CoreMatchers.equalTo((Object)partitionType), "output partition type", "output partition type"){

            protected ResultPartitionType featureValueOf(JobVertex actual) {
                return ((IntermediateDataSet)actual.getProducedDataSets().get(0)).getResultType();
            }
        };
    }

    @Test(expected=UnsupportedOperationException.class)
    public void testConflictShuffleModeWithBufferTimeout() {
        this.testCompatibleShuffleModeWithBufferTimeout(ShuffleMode.BATCH);
    }

    @Test
    public void testNormalShuffleModeWithBufferTimeout() {
        this.testCompatibleShuffleModeWithBufferTimeout(ShuffleMode.PIPELINED);
    }

    private void testCompatibleShuffleModeWithBufferTimeout(ShuffleMode shuffleMode) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setBufferTimeout(100L);
        DataStreamSource sourceDataStream = env.fromElements((Object[])new Integer[]{1, 2, 3});
        PartitionTransformation transformation = new PartitionTransformation(sourceDataStream.getTransformation(), (StreamPartitioner)new RebalancePartitioner(), shuffleMode);
        DataStream partitionStream = new DataStream(env, (Transformation)transformation);
        partitionStream.map((MapFunction & Serializable)value -> value).print();
        StreamingJobGraphGenerator.createJobGraph((StreamGraph)env.getStreamGraph());
    }

    @Test
    public void testIteration() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        SingleOutputStreamOperator source = env.fromElements((Object[])new Integer[]{1, 2, 3}).name("source");
        IterativeStream iteration = source.iterate(3000L);
        iteration.name("iteration").setParallelism(2);
        SingleOutputStreamOperator map = iteration.map((MapFunction & Serializable)x -> x + 1).name("map").setParallelism(2);
        SingleOutputStreamOperator filter = map.filter((FilterFunction & Serializable)x -> false).name("filter").setParallelism(2);
        iteration.closeWith((DataStream)filter).print();
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)env.getStreamGraph());
        SlotSharingGroup slotSharingGroup = jobGraph.getVerticesAsArray()[0].getSlotSharingGroup();
        Assert.assertNotNull((Object)slotSharingGroup);
        CoLocationGroup iterationSourceCoLocationGroup = null;
        CoLocationGroup iterationSinkCoLocationGroup = null;
        for (JobVertex jobVertex : jobGraph.getVertices()) {
            Assert.assertEquals((Object)slotSharingGroup, (Object)jobVertex.getSlotSharingGroup());
            if (jobVertex.getName().startsWith("IterationSource")) {
                iterationSourceCoLocationGroup = jobVertex.getCoLocationGroup();
                Assert.assertTrue((boolean)iterationSourceCoLocationGroup.getVertexIds().contains(jobVertex.getID()));
                continue;
            }
            if (jobVertex.getName().startsWith("IterationSink")) {
                iterationSinkCoLocationGroup = jobVertex.getCoLocationGroup();
                Assert.assertTrue((boolean)iterationSinkCoLocationGroup.getVertexIds().contains(jobVertex.getID()));
                continue;
            }
            Assert.assertNull((Object)jobVertex.getCoLocationGroup());
        }
        Assert.assertNotNull(iterationSourceCoLocationGroup);
        Assert.assertNotNull(iterationSinkCoLocationGroup);
        Assert.assertEquals(iterationSourceCoLocationGroup, iterationSinkCoLocationGroup);
    }

    @Test
    public void testDefaultJobType() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamGraph streamGraph = new StreamGraphGenerator(Collections.emptyList(), env.getConfig(), env.getCheckpointConfig()).generate();
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)streamGraph);
        Assert.assertEquals((Object)JobType.STREAMING, (Object)jobGraph.getJobType());
    }

    @Test
    public void testYieldingOperatorNotChainableToTaskChainedToLegacySource() {
        LocalStreamEnvironment chainEnv = StreamExecutionEnvironment.createLocalEnvironment((int)1);
        chainEnv.fromElements((Object[])new Integer[]{1}).map((MapFunction & Serializable)x -> x).transform("test", (TypeInformation)BasicTypeInfo.INT_TYPE_INFO, new YieldingTestOperatorFactory());
        StreamGraph streamGraph = chainEnv.getStreamGraph();
        List streamNodes = streamGraph.getStreamNodes().stream().sorted(Comparator.comparingInt(StreamNode::getId)).collect(Collectors.toList());
        Assert.assertTrue((boolean)StreamingJobGraphGenerator.areOperatorsChainable((StreamNode)((StreamNode)streamNodes.get(0)), (StreamNode)((StreamNode)streamNodes.get(1)), (StreamGraph)streamGraph));
        Assert.assertFalse((boolean)StreamingJobGraphGenerator.areOperatorsChainable((StreamNode)((StreamNode)streamNodes.get(1)), (StreamNode)((StreamNode)streamNodes.get(2)), (StreamGraph)streamGraph));
    }

    @Test
    public void testYieldingOperatorChainableToTaskNotChainedToLegacySource() {
        LocalStreamEnvironment chainEnv = StreamExecutionEnvironment.createLocalEnvironment((int)1);
        chainEnv.fromElements((Object[])new Integer[]{1}).disableChaining().map((MapFunction & Serializable)x -> x).transform("test", (TypeInformation)BasicTypeInfo.INT_TYPE_INFO, new YieldingTestOperatorFactory());
        StreamGraph streamGraph = chainEnv.getStreamGraph();
        List streamNodes = streamGraph.getStreamNodes().stream().sorted(Comparator.comparingInt(StreamNode::getId)).collect(Collectors.toList());
        Assert.assertFalse((boolean)StreamingJobGraphGenerator.areOperatorsChainable((StreamNode)((StreamNode)streamNodes.get(0)), (StreamNode)((StreamNode)streamNodes.get(1)), (StreamGraph)streamGraph));
        Assert.assertTrue((boolean)StreamingJobGraphGenerator.areOperatorsChainable((StreamNode)((StreamNode)streamNodes.get(1)), (StreamNode)((StreamNode)streamNodes.get(2)), (StreamGraph)streamGraph));
    }

    @Test
    public void testYieldingOperatorProperlyChainedOnLegacySources() {
        LocalStreamEnvironment chainEnv = StreamExecutionEnvironment.createLocalEnvironment((int)1);
        chainEnv.fromElements((Object[])new Integer[]{1}).map((MapFunction & Serializable)x -> x).transform("test", (TypeInformation)BasicTypeInfo.INT_TYPE_INFO, new YieldingTestOperatorFactory()).map((MapFunction & Serializable)x -> x).transform("test", (TypeInformation)BasicTypeInfo.INT_TYPE_INFO, new YieldingTestOperatorFactory()).map((MapFunction & Serializable)x -> x).addSink((SinkFunction)new DiscardingSink());
        JobGraph jobGraph = chainEnv.getStreamGraph().getJobGraph();
        List vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
        Assert.assertEquals((long)2L, (long)vertices.size());
        Assert.assertEquals((long)2L, (long)((JobVertex)vertices.get(0)).getOperatorIDs().size());
        Assert.assertEquals((long)5L, (long)((JobVertex)vertices.get(1)).getOperatorIDs().size());
    }

    @Test
    public void testYieldingOperatorProperlyChainedOnNewSources() {
        LocalStreamEnvironment chainEnv = StreamExecutionEnvironment.createLocalEnvironment((int)1);
        chainEnv.fromSource((Source)new NumberSequenceSource(0L, 10L), WatermarkStrategy.noWatermarks(), "input").map((MapFunction & Serializable)x -> x).transform("test", (TypeInformation)BasicTypeInfo.LONG_TYPE_INFO, new YieldingTestOperatorFactory()).addSink((SinkFunction)new DiscardingSink());
        JobGraph jobGraph = chainEnv.getStreamGraph().getJobGraph();
        List vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
        Assert.assertEquals((long)1L, (long)vertices.size());
        Assert.assertEquals((long)4L, (long)((JobVertex)vertices.get(0)).getOperatorIDs().size());
    }

    @Test
    public void testDeterministicUnionOrder() {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment((int)1);
        JobGraph jobGraph = this.getUnionJobGraph((StreamExecutionEnvironment)env);
        JobVertex jobSink = (JobVertex)Iterables.getLast((Iterable)jobGraph.getVerticesSortedTopologicallyFromSources());
        List expectedSourceOrder = jobSink.getInputs().stream().map(edge -> edge.getSource().getProducer().getName()).collect(Collectors.toList());
        for (int i = 0; i < 100; ++i) {
            JobGraph jobGraph2 = this.getUnionJobGraph((StreamExecutionEnvironment)env);
            JobVertex jobSink2 = (JobVertex)Iterables.getLast((Iterable)jobGraph2.getVerticesSortedTopologicallyFromSources());
            Assert.assertNotEquals((String)"Different runs should yield different vertexes", (Object)jobSink, (Object)jobSink2);
            List actualSourceOrder = jobSink2.getInputs().stream().map(edge -> edge.getSource().getProducer().getName()).collect(Collectors.toList());
            Assert.assertEquals((String)"Union inputs reordered", expectedSourceOrder, actualSourceOrder);
        }
    }

    private JobGraph getUnionJobGraph(StreamExecutionEnvironment env) {
        this.createSource(env, 1).union(new DataStream[]{this.createSource(env, 2)}).union(new DataStream[]{this.createSource(env, 3)}).union(new DataStream[]{this.createSource(env, 4)}).addSink((SinkFunction)new DiscardingSink());
        return StreamingJobGraphGenerator.createJobGraph((StreamGraph)env.getStreamGraph());
    }

    private DataStream<Integer> createSource(StreamExecutionEnvironment env, int index) {
        return env.fromElements((Object[])new Integer[]{index}).name("source" + index).map((MapFunction & Serializable)i -> i).name("map" + index);
    }

    @Test(expected=UnsupportedOperationException.class)
    public void testNotSupportInputSelectableOperatorIfCheckpointing() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60000L);
        DataStreamSource source1 = env.fromElements((Object[])new String[]{"1"});
        DataStreamSource source2 = env.fromElements((Object[])new Integer[]{1});
        source1.connect((DataStream)source2).transform("test", (TypeInformation)BasicTypeInfo.STRING_TYPE_INFO, (TwoInputStreamOperator)new TestAnyModeReadingStreamOperator("test operator")).print();
        StreamingJobGraphGenerator.createJobGraph((StreamGraph)env.getStreamGraph());
    }

    @Test
    public void testManagedMemoryFractionForUnknownResourceSpec() throws Exception {
        ResourceSpec resource = ResourceSpec.UNKNOWN;
        List<ResourceSpec> resourceSpecs = Arrays.asList(resource, resource, resource, resource);
        Configuration taskManagerConfig = new Configuration(){
            {
                this.set(TaskManagerOptions.MANAGED_MEMORY_CONSUMER_WEIGHTS, new HashMap<String, String>(){
                    {
                        this.put("DATAPROC", "6");
                        this.put("PYTHON", "4");
                    }
                });
            }
        };
        ArrayList<Map<ManagedMemoryUseCase, Integer>> operatorScopeManagedMemoryUseCaseWeights = new ArrayList<Map<ManagedMemoryUseCase, Integer>>();
        ArrayList<Set<ManagedMemoryUseCase>> slotScopeManagedMemoryUseCases = new ArrayList<Set<ManagedMemoryUseCase>>();
        operatorScopeManagedMemoryUseCaseWeights.add(Collections.singletonMap(ManagedMemoryUseCase.OPERATOR, 1));
        slotScopeManagedMemoryUseCases.add(Collections.emptySet());
        operatorScopeManagedMemoryUseCaseWeights.add(Collections.singletonMap(ManagedMemoryUseCase.OPERATOR, 1));
        slotScopeManagedMemoryUseCases.add(Collections.singleton(ManagedMemoryUseCase.PYTHON));
        operatorScopeManagedMemoryUseCaseWeights.add(Collections.emptyMap());
        slotScopeManagedMemoryUseCases.add(Collections.singleton(ManagedMemoryUseCase.PYTHON));
        operatorScopeManagedMemoryUseCaseWeights.add(Collections.singletonMap(ManagedMemoryUseCase.OPERATOR, 1));
        slotScopeManagedMemoryUseCases.add(Collections.emptySet());
        JobGraph jobGraph = this.createJobGraphForManagedMemoryFractionTest(resourceSpecs, operatorScopeManagedMemoryUseCaseWeights, slotScopeManagedMemoryUseCases);
        JobVertex vertex1 = (JobVertex)jobGraph.getVerticesSortedTopologicallyFromSources().get(0);
        JobVertex vertex2 = (JobVertex)jobGraph.getVerticesSortedTopologicallyFromSources().get(1);
        JobVertex vertex3 = (JobVertex)jobGraph.getVerticesSortedTopologicallyFromSources().get(2);
        StreamConfig sourceConfig = new StreamConfig(vertex1.getConfiguration());
        this.verifyFractions(sourceConfig, 0.3, 0.0, 0.0, taskManagerConfig);
        StreamConfig map1Config = (StreamConfig)Iterables.getOnlyElement(sourceConfig.getTransitiveChainedTaskConfigs(StreamingJobGraphGeneratorTest.class.getClassLoader()).values());
        this.verifyFractions(map1Config, 0.3, 0.4, 0.0, taskManagerConfig);
        StreamConfig map2Config = new StreamConfig(vertex2.getConfiguration());
        this.verifyFractions(map2Config, 0.0, 0.4, 0.0, taskManagerConfig);
        StreamConfig map3Config = new StreamConfig(vertex3.getConfiguration());
        this.verifyFractions(map3Config, 1.0, 0.0, 0.0, taskManagerConfig);
    }

    private JobGraph createJobGraphForManagedMemoryFractionTest(List<ResourceSpec> resourceSpecs, List<Map<ManagedMemoryUseCase, Integer>> operatorScopeUseCaseWeights, List<Set<ManagedMemoryUseCase>> slotScopeUseCases) throws Exception {
        Method opMethod = StreamingJobGraphGeneratorTest.getSetResourcesMethodAndSetAccessible(SingleOutputStreamOperator.class);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource source = env.addSource((SourceFunction)new ParallelSourceFunction<Integer>(){

            public void run(SourceFunction.SourceContext<Integer> ctx) {
            }

            public void cancel() {
            }
        });
        opMethod.invoke((Object)source, resourceSpecs.get(0));
        SingleOutputStreamOperator map1 = source.map((MapFunction & Serializable)value -> value);
        opMethod.invoke((Object)map1, resourceSpecs.get(1));
        SingleOutputStreamOperator map2 = map1.rebalance().map((MapFunction & Serializable)value -> value);
        opMethod.invoke((Object)map2, resourceSpecs.get(2));
        SingleOutputStreamOperator map3 = map2.rebalance().map((MapFunction & Serializable)value -> value).slotSharingGroup("test");
        opMethod.invoke((Object)map3, resourceSpecs.get(3));
        this.declareManagedMemoryUseCaseForTranformation(source.getTransformation(), operatorScopeUseCaseWeights.get(0), slotScopeUseCases.get(0));
        this.declareManagedMemoryUseCaseForTranformation(map1.getTransformation(), operatorScopeUseCaseWeights.get(1), slotScopeUseCases.get(1));
        this.declareManagedMemoryUseCaseForTranformation(map2.getTransformation(), operatorScopeUseCaseWeights.get(2), slotScopeUseCases.get(2));
        this.declareManagedMemoryUseCaseForTranformation(map3.getTransformation(), operatorScopeUseCaseWeights.get(3), slotScopeUseCases.get(3));
        return StreamingJobGraphGenerator.createJobGraph((StreamGraph)env.getStreamGraph());
    }

    private void declareManagedMemoryUseCaseForTranformation(Transformation<?> transformation, Map<ManagedMemoryUseCase, Integer> operatorScopeUseCaseWeights, Set<ManagedMemoryUseCase> slotScopeUseCases) {
        for (Map.Entry<ManagedMemoryUseCase, Integer> entry : operatorScopeUseCaseWeights.entrySet()) {
            transformation.declareManagedMemoryUseCaseAtOperatorScope(entry.getKey(), entry.getValue().intValue());
        }
        for (ManagedMemoryUseCase useCase : slotScopeUseCases) {
            transformation.declareManagedMemoryUseCaseAtSlotScope(useCase);
        }
    }

    private void verifyFractions(StreamConfig streamConfig, double expectedBatchFrac, double expectedPythonFrac, double expectedStateBackendFrac, Configuration tmConfig) {
        double delta = 1.0E-6;
        Assert.assertEquals((double)expectedStateBackendFrac, (double)streamConfig.getManagedMemoryFractionOperatorUseCaseOfSlot(ManagedMemoryUseCase.STATE_BACKEND, tmConfig, ClassLoader.getSystemClassLoader()), (double)1.0E-6);
        Assert.assertEquals((double)expectedPythonFrac, (double)streamConfig.getManagedMemoryFractionOperatorUseCaseOfSlot(ManagedMemoryUseCase.PYTHON, tmConfig, ClassLoader.getSystemClassLoader()), (double)1.0E-6);
        Assert.assertEquals((double)expectedBatchFrac, (double)streamConfig.getManagedMemoryFractionOperatorUseCaseOfSlot(ManagedMemoryUseCase.OPERATOR, tmConfig, ClassLoader.getSystemClassLoader()), (double)1.0E-6);
    }

    @Test
    public void testSlotSharingOnAllVerticesInSameSlotSharingGroupByDefaultEnabled() {
        StreamGraph streamGraph = this.createStreamGraphForSlotSharingTest();
        streamGraph.getStreamNodes().stream().filter(n -> "map1".equals(n.getOperatorName())).findFirst().get().setSlotSharingGroup("testSlotSharingGroup");
        streamGraph.setAllVerticesInSameSlotSharingGroupByDefault(true);
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)streamGraph);
        List verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
        Assert.assertEquals((long)4L, (long)verticesSorted.size());
        List<JobVertex> verticesMatched = StreamingJobGraphGeneratorTest.getExpectedVerticesList(verticesSorted);
        JobVertex source1Vertex = verticesMatched.get(0);
        JobVertex source2Vertex = verticesMatched.get(1);
        JobVertex map1Vertex = verticesMatched.get(2);
        JobVertex map2Vertex = verticesMatched.get(3);
        this.assertSameSlotSharingGroup(source1Vertex, source2Vertex, map2Vertex);
        this.assertDistinctSharingGroups(source1Vertex, map1Vertex);
    }

    @Test
    public void testSlotSharingOnAllVerticesInSameSlotSharingGroupByDefaultDisabled() {
        StreamGraph streamGraph = this.createStreamGraphForSlotSharingTest();
        streamGraph.setAllVerticesInSameSlotSharingGroupByDefault(false);
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)streamGraph);
        List verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
        Assert.assertEquals((long)4L, (long)verticesSorted.size());
        List<JobVertex> verticesMatched = StreamingJobGraphGeneratorTest.getExpectedVerticesList(verticesSorted);
        JobVertex source1Vertex = verticesMatched.get(0);
        JobVertex source2Vertex = verticesMatched.get(1);
        JobVertex map1Vertex = verticesMatched.get(2);
        JobVertex map2Vertex = verticesMatched.get(3);
        this.assertSameSlotSharingGroup(source1Vertex, map1Vertex);
        this.assertDistinctSharingGroups(source1Vertex, source2Vertex, map2Vertex);
    }

    @Test
    public void testSlotSharingResourceConfiguration() {
        String slotSharingGroup1 = "slot-a";
        String slotSharingGroup2 = "slot-b";
        ResourceProfile resourceProfile1 = ResourceProfile.fromResources((double)1.0, (int)10);
        ResourceProfile resourceProfile2 = ResourceProfile.fromResources((double)2.0, (int)20);
        ResourceProfile resourceProfile3 = ResourceProfile.fromResources((double)3.0, (int)30);
        HashMap<String, ResourceProfile> slotSharingGroupResource = new HashMap<String, ResourceProfile>();
        slotSharingGroupResource.put("slot-a", resourceProfile1);
        slotSharingGroupResource.put("slot-b", resourceProfile2);
        slotSharingGroupResource.put("default", resourceProfile3);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements((Object[])new Integer[]{1, 2, 3}).name("slot-a").slotSharingGroup("slot-a").map((MapFunction & Serializable)x -> x + 1).name("slot-b").slotSharingGroup("slot-b").map((MapFunction & Serializable)x -> x * x).name("default").slotSharingGroup("default");
        StreamGraph streamGraph = env.getStreamGraph();
        streamGraph.setSlotSharingGroupResource(slotSharingGroupResource);
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)streamGraph);
        int numVertex = 0;
        for (JobVertex jobVertex : jobGraph.getVertices()) {
            ++numVertex;
            if (jobVertex.getName().contains("slot-a")) {
                Assert.assertEquals((Object)jobVertex.getSlotSharingGroup().getResourceProfile(), (Object)resourceProfile1);
                continue;
            }
            if (jobVertex.getName().contains("slot-b")) {
                Assert.assertEquals((Object)jobVertex.getSlotSharingGroup().getResourceProfile(), (Object)resourceProfile2);
                continue;
            }
            if (jobVertex.getName().contains("default")) {
                Assert.assertEquals((Object)jobVertex.getSlotSharingGroup().getResourceProfile(), (Object)resourceProfile3);
                continue;
            }
            Assert.fail();
        }
        Assert.assertThat((Object)numVertex, (Matcher)CoreMatchers.is((Object)3));
    }

    @Test
    public void testSlotSharingResourceConfigurationWithDefaultSlotSharingGroup() {
        ResourceProfile resourceProfile = ResourceProfile.fromResources((double)1.0, (int)10);
        HashMap<String, ResourceProfile> slotSharingGroupResource = new HashMap<String, ResourceProfile>();
        slotSharingGroupResource.put("default", resourceProfile);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements((Object[])new Integer[]{1, 2, 3}).map((MapFunction & Serializable)x -> x + 1);
        StreamGraph streamGraph = env.getStreamGraph();
        streamGraph.setSlotSharingGroupResource(slotSharingGroupResource);
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)streamGraph);
        int numVertex = 0;
        for (JobVertex jobVertex : jobGraph.getVertices()) {
            ++numVertex;
            Assert.assertEquals((Object)jobVertex.getSlotSharingGroup().getResourceProfile(), (Object)resourceProfile);
        }
        Assert.assertThat((Object)numVertex, (Matcher)CoreMatchers.is((Object)2));
    }

    @Test
    public void testNamingOfChainedMultipleInputs() {
        String[] sources = new String[]{"source-1", "source-2", "source-3"};
        JobGraph graph = this.createGraphWithMultipleInputs(true, sources);
        JobVertex head = (JobVertex)graph.getVerticesSortedTopologicallyFromSources().iterator().next();
        Arrays.stream(sources).forEach(source -> Assert.assertTrue((boolean)head.getName().contains((CharSequence)source)));
    }

    @Test
    public void testNamingOfNonChainedMultipleInputs() {
        String[] sources = new String[]{"source-1", "source-2", "source-3"};
        JobGraph graph = this.createGraphWithMultipleInputs(false, sources);
        JobVertex head = (JobVertex)Iterables.find((Iterable)graph.getVertices(), vertex -> vertex.getInvokableClassName().equals(MultipleInputStreamTask.class.getName()));
        Assert.assertFalse((String)head.getName(), (boolean)head.getName().contains("source-1"));
    }

    public JobGraph createGraphWithMultipleInputs(boolean chain, String ... inputNames) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        MultipleInputTransformation transform = new MultipleInputTransformation("mit", (StreamOperatorFactory)new UnusedOperatorFactory(), Types.LONG, env.getParallelism());
        Arrays.stream(inputNames).map(name -> env.fromSource((Source)new NumberSequenceSource(1L, 2L), WatermarkStrategy.noWatermarks(), name).getTransformation()).forEach(arg_0 -> ((MultipleInputTransformation)transform).addInput(arg_0));
        transform.setChainingStrategy(chain ? ChainingStrategy.HEAD_WITH_SOURCES : ChainingStrategy.NEVER);
        env.addOperator((Transformation)transform);
        return StreamingJobGraphGenerator.createJobGraph((StreamGraph)env.getStreamGraph());
    }

    private static List<JobVertex> getExpectedVerticesList(List<JobVertex> vertices) {
        ArrayList<JobVertex> verticesMatched = new ArrayList<JobVertex>();
        List<String> expectedOrder = Arrays.asList("source1", "source2", "map1", "map2");
        for (int i = 0; i < expectedOrder.size(); ++i) {
            for (JobVertex vertex : vertices) {
                if (!vertex.getName().contains(expectedOrder.get(i))) continue;
                verticesMatched.add(vertex);
            }
        }
        return verticesMatched;
    }

    private StreamGraph createStreamGraphForSlotSharingTest() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        SingleOutputStreamOperator source1 = env.fromElements((Object[])new Integer[]{1, 2, 3}).name("source1");
        source1.rebalance().map((MapFunction & Serializable)v -> v).name("map1");
        SingleOutputStreamOperator source2 = env.fromElements((Object[])new Integer[]{4, 5, 6}).name("source2");
        DataStream partitioned = new DataStream(env, (Transformation)new PartitionTransformation(source2.getTransformation(), (StreamPartitioner)new RebalancePartitioner(), ShuffleMode.BATCH));
        partitioned.map((MapFunction & Serializable)v -> v).name("map2");
        return env.getStreamGraph();
    }

    private void assertSameSlotSharingGroup(JobVertex ... vertices) {
        for (int i = 0; i < vertices.length - 1; ++i) {
            Assert.assertEquals((Object)vertices[i].getSlotSharingGroup(), (Object)vertices[i + 1].getSlotSharingGroup());
        }
    }

    private void assertDistinctSharingGroups(JobVertex ... vertices) {
        for (int i = 0; i < vertices.length - 1; ++i) {
            for (int j = i + 1; j < vertices.length; ++j) {
                Assert.assertNotEquals((Object)vertices[i].getSlotSharingGroup(), (Object)vertices[j].getSlotSharingGroup());
            }
        }
    }

    private static Method getSetResourcesMethodAndSetAccessible(Class<?> clazz) throws NoSuchMethodException {
        Method setResourcesMethod = clazz.getDeclaredMethod("setResources", ResourceSpec.class);
        setResourcesMethod.setAccessible(true);
        return setResourcesMethod;
    }

    private static class TestingSingleOutputStreamOperator<OUT>
    extends SingleOutputStreamOperator<OUT> {
        public TestingSingleOutputStreamOperator(StreamExecutionEnvironment environment, Transformation<OUT> transformation) {
            super(environment, transformation);
        }
    }

    private static class CoordinatedTransformOperatorFactory
    extends AbstractStreamOperatorFactory<Integer>
    implements CoordinatedOperatorFactory<Integer>,
    OneInputStreamOperatorFactory<Integer, Integer> {
        private CoordinatedTransformOperatorFactory() {
        }

        public OperatorCoordinator.Provider getCoordinatorProvider(String operatorName, OperatorID operatorID) {
            return new OperatorCoordinator.Provider(){

                public OperatorID getOperatorId() {
                    return null;
                }

                public OperatorCoordinator create(OperatorCoordinator.Context context) {
                    return null;
                }
            };
        }

        public <T extends StreamOperator<Integer>> T createStreamOperator(StreamOperatorParameters<Integer> parameters) {
            return null;
        }

        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            return null;
        }
    }

    private static class YieldingTestOperatorFactory<T>
    extends SimpleOperatorFactory<T>
    implements YieldingOperatorFactory<T>,
    OneInputStreamOperatorFactory<T, T> {
        private YieldingTestOperatorFactory() {
            super((StreamOperator)new StreamMap((MapFunction & Serializable)x -> x));
        }

        public void setMailboxExecutor(MailboxExecutor mailboxExecutor) {
        }
    }

    private static final class UnusedOperatorFactory
    extends AbstractStreamOperatorFactory<Long> {
        private UnusedOperatorFactory() {
        }

        public <T extends StreamOperator<Long>> T createStreamOperator(StreamOperatorParameters<Long> parameters) {
            throw new UnsupportedOperationException();
        }

        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            throw new UnsupportedOperationException();
        }
    }
}

