/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.scheduling;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.MiniClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.reader.RecordReader;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.types.IntValue;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.SerializedThrowable;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.Assert;
import org.junit.Test;

/**
 * Integration tests that submit small batch job graphs to a {@link MiniCluster} configured with
 * {@link JobManagerOptions.SchedulerType#Default} and inspect the resulting {@link JobResult}.
 *
 * <p>Every test runs on a single TaskManager; only the number of slots on that TaskManager and
 * the submitted job graph vary.
 */
public class PipelinedRegionSchedulingITCase extends TestLogger {

    /** Runs the two-source job on 2 slots and expects the job result to carry no failure. */
    @Test
    public void testSuccessWithSlotsNoFewerThanTheMaxRegionRequired() throws Exception {
        final JobResult jobResult = executeSchedulingTest(2);
        Assert.assertThat(jobResult.getSerializedThrowable().isPresent(), CoreMatchers.is(false));
    }

    /**
     * Runs the two-source job on 1 slot and expects it to fail with a
     * {@link NoResourceAvailableException} whose message reports an unfulfillable slot request
     * bulk.
     */
    @Test
    public void testFailsOnInsufficientSlots() throws Exception {
        final JobResult jobResult = executeSchedulingTest(1);
        Assert.assertThat(jobResult.getSerializedThrowable().isPresent(), CoreMatchers.is(true));

        // Presence was asserted just above, so get() cannot throw here.
        final SerializedThrowable serializedFailure = jobResult.getSerializedThrowable().get();
        final Throwable jobFailure =
                serializedFailure.deserializeError(ClassLoader.getSystemClassLoader());

        final Optional<NoResourceAvailableException> cause =
                ExceptionUtils.findThrowable(jobFailure, NoResourceAvailableException.class);
        Assert.assertThat(cause.isPresent(), CoreMatchers.is(true));
        Assert.assertThat(
                cause.get().getMessage(),
                CoreMatchers.containsString("Slot request bulk is not fulfillable!"));
    }

    /**
     * Runs the three-stage job with a fixed-delay restart strategy limited to one attempt. The
     * sink throws a {@link PartitionNotFoundException} exactly once; the job is expected to
     * finish without a failure in its result.
     */
    @Test(timeout = 120000L)
    public void testRecoverFromPartitionException() throws Exception {
        final Configuration configuration = new Configuration();
        configuration.set(
                RestartStrategyOptions.RESTART_STRATEGY,
                RestartStrategyOptions.RestartStrategyType.FIXED_DELAY.getMainValue());
        configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);

        // The flag is static, so re-arm it explicitly in case the invokable class was used before.
        OneTimeFailingReceiverWithPartitionException.hasFailed.set(false);

        final JobResult jobResult =
                executeSchedulingTest(createJobGraphWithThreeStages(2), 2, configuration);
        Assert.assertThat(jobResult.getSerializedThrowable().isPresent(), CoreMatchers.is(false));
    }

    /**
     * Executes the job built by {@link #createJobGraph(int)} with parallelism 2 and a fresh
     * {@link Configuration}.
     *
     * @param numSlots number of slots offered by the single TaskManager
     * @return the result of the finished job
     */
    private JobResult executeSchedulingTest(int numSlots) throws Exception {
        return executeSchedulingTest(createJobGraph(2), numSlots, new Configuration());
    }

    /**
     * Starts a {@link MiniCluster} with one TaskManager, submits the given job and blocks until
     * its result is available. The cluster is closed before this method returns.
     *
     * @param jobGraph the job to submit
     * @param numSlots number of slots offered by the single TaskManager
     * @param configuration base configuration; this method additionally sets the slot request
     *     timeout and the scheduler type on it (the passed instance is modified)
     * @return the result of the finished job
     */
    private JobResult executeSchedulingTest(
            JobGraph jobGraph, int numSlots, Configuration configuration) throws Exception {
        // NOTE(review): 30000L is presumably milliseconds - confirm against the option's docs.
        configuration.set(JobManagerOptions.SLOT_REQUEST_TIMEOUT, 30000L);
        configuration.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Default);

        final MiniClusterConfiguration miniClusterConfiguration =
                new MiniClusterConfiguration.Builder()
                        .withRandomPorts()
                        .setConfiguration(configuration)
                        .setNumTaskManagers(1)
                        .setNumSlotsPerTaskManager(numSlots)
                        .build();

        try (MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration)) {
            miniCluster.start();

            final MiniClusterClient miniClusterClient =
                    new MiniClusterClient(configuration, miniCluster);
            final JobID jobId = miniClusterClient.submitJob(jobGraph).get();
            final CompletableFuture<JobResult> resultFuture =
                    miniClusterClient.requestJobResult(jobId);
            return resultFuture.get();
        }
    }

    /**
     * Builds a batch job graph with two sources feeding one sink.
     *
     * <ul>
     *   <li>{@code source1}: {@link PipelinedSender}, parallelism {@code 2 * parallelism}, slot
     *       sharing group 1, connected to the sink POINTWISE via a PIPELINED result.
     *   <li>{@code source2}: {@link NoOpInvokable}, parallelism {@code parallelism}, slot sharing
     *       group 2, connected to the sink ALL_TO_ALL via a BLOCKING result.
     *   <li>{@code sink}: {@link Receiver}, parallelism {@code parallelism}, slot sharing group 1.
     * </ul>
     *
     * @param parallelism base parallelism of the sink and of {@code source2}
     * @return the assembled job graph
     */
    private JobGraph createJobGraph(int parallelism) {
        final SlotSharingGroup group1 = new SlotSharingGroup();
        final SlotSharingGroup group2 = new SlotSharingGroup();

        final JobVertex source1 = new JobVertex("source1");
        source1.setInvokableClass(PipelinedSender.class);
        source1.setParallelism(parallelism * 2);
        source1.setSlotSharingGroup(group1);

        final JobVertex source2 = new JobVertex("source2");
        source2.setInvokableClass(NoOpInvokable.class);
        source2.setParallelism(parallelism);
        source2.setSlotSharingGroup(group2);

        final JobVertex sink = new JobVertex("sink");
        sink.setInvokableClass(Receiver.class);
        sink.setParallelism(parallelism);
        sink.setSlotSharingGroup(group1);

        sink.connectNewDataSetAsInput(
                source1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        sink.connectNewDataSetAsInput(
                source2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);

        return JobGraphTestUtils.batchJobGraph(source1, source2, sink);
    }

    /**
     * Builds a batch job graph {@code source -> map -> sink} in which every vertex has the given
     * parallelism and its own slot sharing group, and both edges use BLOCKING results
     * ({@code source -> map} POINTWISE, {@code map -> sink} ALL_TO_ALL). The sink runs
     * {@link OneTimeFailingReceiverWithPartitionException}.
     *
     * @param parallelism parallelism of every vertex
     * @return the assembled job graph
     */
    private JobGraph createJobGraphWithThreeStages(int parallelism) {
        final JobVertex source = new JobVertex("source");
        source.setInvokableClass(NoOpInvokable.class);
        source.setParallelism(parallelism);
        source.setSlotSharingGroup(new SlotSharingGroup());

        final JobVertex map = new JobVertex("map");
        map.setInvokableClass(NoOpInvokable.class);
        map.setParallelism(parallelism);
        map.setSlotSharingGroup(new SlotSharingGroup());

        final JobVertex sink = new JobVertex("sink");
        sink.setInvokableClass(OneTimeFailingReceiverWithPartitionException.class);
        sink.setParallelism(parallelism);
        sink.setSlotSharingGroup(new SlotSharingGroup());

        map.connectNewDataSetAsInput(
                source, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
        sink.connectNewDataSetAsInput(
                map, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);

        return JobGraphTestUtils.batchJobGraph(source, map, sink);
    }

    /**
     * Invokable that throws a {@link PartitionNotFoundException} on the first invocation seen
     * across all instances in this JVM and does nothing on later invocations.
     */
    public static class OneTimeFailingReceiverWithPartitionException extends AbstractInvokable {
        /** Shared across all instances; flipped to {@code true} by the first failing invocation. */
        private static final AtomicBoolean hasFailed = new AtomicBoolean(false);

        public OneTimeFailingReceiverWithPartitionException(Environment environment) {
            super(environment);
        }

        /**
         * Throws once, reporting the partition of channel 1 on input gate 0 as missing.
         *
         * @throws PartitionNotFoundException on the first invocation only
         */
        @Override
        public void invoke() throws Exception {
            // compareAndSet guarantees that exactly one caller observes false -> true.
            if (hasFailed.compareAndSet(false, true)) {
                throw new PartitionNotFoundException(
                        getEnvironment().getInputGate(0).getChannel(1).getPartitionId());
            }
        }
    }

    /** Invokable that drains {@link IntValue} records from every input gate, one gate at a time. */
    public static class Receiver extends AbstractInvokable {
        public Receiver(Environment environment) {
            super(environment);
        }

        /**
         * Reads and discards all records from each input gate.
         *
         * @throws IllegalStateException if fewer than two input gates are present
         */
        @Override
        public void invoke() throws Exception {
            final InputGate[] inputGates = getEnvironment().getAllInputGates();
            if (inputGates.length < 2) {
                throw new IllegalStateException();
            }

            final String[] tmpDirs = getEnvironment().getTaskManagerInfo().getTmpDirectories();
            final List<RecordReader<IntValue>> readers =
                    Arrays.stream(inputGates)
                            .map(gate -> new RecordReader<IntValue>(gate, IntValue.class, tmpDirs))
                            .collect(Collectors.toList());

            for (RecordReader<IntValue> reader : readers) {
                while (reader.hasNext()) {
                    reader.next();
                }
            }
        }
    }

    /** Invokable that emits a single {@link IntValue} (42) on its first writer and flushes it. */
    public static class PipelinedSender extends AbstractInvokable {
        public PipelinedSender(Environment environment) {
            super(environment);
        }

        /**
         * Emits one record, flushes, closes the writer, and - for subtask index 0 only - sleeps
         * for 2000 ms before returning.
         *
         * @throws IllegalStateException if no writer is present
         */
        @Override
        public void invoke() throws Exception {
            if (getEnvironment().getAllWriters().length < 1) {
                throw new IllegalStateException();
            }

            try (RecordWriter<IntValue> writer =
                    new RecordWriterBuilder<IntValue>().build(getEnvironment().getWriter(0))) {
                writer.emit(new IntValue(42));
                writer.flushAll();
            }

            // NOTE(review): presumably keeps subtask 0 running a little longer so the sender
            // does not finish too early - confirm the intent with the test author.
            if (getIndexInSubtaskGroup() == 0) {
                Thread.sleep(2000L);
            }
        }
    }
}

