/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing;

import java.io.Serializable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.SerializableSupplier;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;

public class CheckpointStoreITCase
extends TestLogger {
    private static final Configuration CONFIGURATION = new Configuration().set(HighAvailabilityOptions.HA_MODE, (Object)BlockingHighAvailabilityServiceFactory.class.getName());
    @ClassRule
    public static final MiniClusterWithClientResource CLUSTER = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(CONFIGURATION).build());

    @Before
    public void setUp() {
        BlockingHighAvailabilityServiceFactory.reset();
        FailingMapper.reset();
    }

    @Test
    public void testJobClientRemainsResponsiveDuringCompletedCheckpointStoreRecovery() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10L);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart((int)2, (long)0L));
        env.addSource(this.emitUntil((SerializableSupplier<Boolean>)(SerializableSupplier & Serializable)() -> FailingMapper.failedAndProcessed)).map((MapFunction)new FailingMapper()).sinkTo((Sink)new DiscardingSink());
        JobClient jobClient = env.executeAsync();
        BlockingHighAvailabilityServiceFactory.fetchRemoteCheckpointsStart.await();
        for (int i = 0; i < 10; ++i) {
            JobStatus jobStatus = (JobStatus)jobClient.getJobStatus().get();
            Assert.assertEquals((Object)JobStatus.INITIALIZING, (Object)jobStatus);
        }
        BlockingHighAvailabilityServiceFactory.fetchRemoteCheckpointsFinished.countDown();
        jobClient.getJobExecutionResult().get();
        Preconditions.checkState((boolean)FailingMapper.failedAndProcessed);
    }

    private SourceFunction<Integer> emitUntil(final SerializableSupplier<Boolean> until) {
        return new SourceFunction<Integer>(){
            private volatile boolean running = true;

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void run(SourceFunction.SourceContext<Integer> ctx) {
                while (this.running && !((Boolean)until.get()).booleanValue()) {
                    Object object = ctx.getCheckpointLock();
                    synchronized (object) {
                        ctx.collect((Object)0);
                        try {
                            Thread.sleep(100L);
                        }
                        catch (InterruptedException e) {
                            ExceptionUtils.rethrow((Throwable)e);
                        }
                    }
                }
            }

            public void cancel() {
                this.running = false;
            }
        };
    }

    public static class BlockingHighAvailabilityServiceFactory
    implements HighAvailabilityServicesFactory {
        private static volatile CountDownLatch fetchRemoteCheckpointsStart = new CountDownLatch(1);
        private static volatile CountDownLatch fetchRemoteCheckpointsFinished = new CountDownLatch(1);

        static void reset() {
            fetchRemoteCheckpointsStart = new CountDownLatch(1);
            fetchRemoteCheckpointsFinished = new CountDownLatch(1);
        }

        public HighAvailabilityServices createHAServices(Configuration configuration, Executor executor) {
            CheckpointRecoveryFactory checkpointRecoveryFactory = PerJobCheckpointRecoveryFactory.withoutCheckpointStoreRecovery(maxCheckpoints -> {
                fetchRemoteCheckpointsStart.countDown();
                try {
                    fetchRemoteCheckpointsFinished.await();
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return new StandaloneCompletedCheckpointStore(maxCheckpoints);
            });
            return new EmbeddedHaServicesWithLeadershipControl(executor, checkpointRecoveryFactory);
        }
    }

    private static class FailingMapper
    implements MapFunction<Integer, Integer> {
        private static volatile boolean failed = false;
        private static volatile boolean failedAndProcessed = false;

        private FailingMapper() {
        }

        static void reset() {
            failed = false;
            failedAndProcessed = false;
        }

        public Integer map(Integer element) throws Exception {
            if (!failed) {
                failed = true;
                throw new RuntimeException();
            }
            failedAndProcessed = true;
            return element;
        }
    }
}

