/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.streaming.runtime;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.test.checkpointing.utils.MigrationTestUtils;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

public class LatencyMarkerITCase {
    @Test
    public void testBroadcast() throws Exception {
        int inputCount = 100000;
        int parallelism = 4;
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(parallelism);
        env.getConfig().setLatencyTrackingInterval(2000L);
        env.setRestartStrategy(RestartStrategies.noRestart());
        List broadcastData = IntStream.range(0, inputCount).boxed().collect(Collectors.toList());
        DataStreamSource broadcastDataStream = env.fromData(broadcastData).setParallelism(1);
        DataStreamSource dataStream = env.fromData((Object[])new String[]{"test"});
        MapStateDescriptor stateDescriptor = new MapStateDescriptor("BroadcastState", (TypeInformation)BasicTypeInfo.STRING_TYPE_INFO, (TypeInformation)BasicTypeInfo.INT_TYPE_INFO);
        SingleOutputStreamOperator processor = dataStream.connect(broadcastDataStream.broadcast(new MapStateDescriptor[]{stateDescriptor})).process((BroadcastProcessFunction)new BroadcastProcessFunction<String, Integer, Integer>(){
            int expected = 0;

            public void processElement(String value, BroadcastProcessFunction.ReadOnlyContext ctx, Collector<Integer> out) {
            }

            public void processBroadcastElement(Integer value, BroadcastProcessFunction.Context ctx, Collector<Integer> out) {
                if (value != this.expected++) {
                    throw new AssertionError((Object)String.format("Value was supposed to be: '%s', but was: '%s'", this.expected - 1, value));
                }
                out.collect((Object)value);
            }
        });
        processor.addSink(new MigrationTestUtils.AccumulatorCountingSink()).setParallelism(1);
        JobExecutionResult executionResult = env.execute();
        Integer count = (Integer)executionResult.getAccumulatorResult(MigrationTestUtils.AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR);
        Assert.assertEquals((long)(inputCount * parallelism), (long)count.intValue());
    }
}

