/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.state;

import java.io.File;
import java.io.Serializable;
import java.time.Duration;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader;
import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.changelog.fs.FsStateChangelogOptions;
import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.configuration.StateChangelogOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class ChangelogRescalingITCase
extends TestLogger {
    private static final int EVENTS_PER_SECOND_PER_READER = 100;
    private static final int PAYLOAD_SIZE = 1000;
    private static final Time WINDOW_SIZE = Time.milliseconds((long)100L);
    private static final Time WINDOW_SLIDE = Time.milliseconds((long)10L);
    private static final int ACCUMULATE_TIME_MILLIS = 5000;
    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();
    private final int parallelism1;
    private final int parallelism2;
    private MiniClusterWithClientResource cluster;

    @Parameterized.Parameters(name="Rescale {0} -> {1}")
    public static Object[] parameters() {
        return new Object[][]{{6, 4}, {4, 6}};
    }

    public ChangelogRescalingITCase(int parallelism1, int parallelism2) {
        this.parallelism1 = parallelism1;
        this.parallelism2 = parallelism2;
    }

    @Before
    public void before() throws Exception {
        Configuration configuration = new Configuration();
        FsStateChangelogStorageFactory.configure((Configuration)configuration, (File)this.temporaryFolder.newFolder(), (Duration)Duration.ofMinutes(1L), (int)10);
        this.cluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(configuration).setNumberSlotsPerTaskManager(Math.max(this.parallelism1, this.parallelism2)).build());
        this.cluster.before();
    }

    @After
    public void after() {
        if (this.cluster != null) {
            this.cluster.after();
            this.cluster = null;
        }
    }

    @Test
    public void test() throws Exception {
        JobID jobID1 = this.submit(this.configureJob(this.parallelism1, this.temporaryFolder.newFolder()), graph -> {});
        Thread.sleep(5000L);
        String cpLocation = this.checkpointAndCancel(jobID1);
        JobID jobID2 = this.submit(this.configureJob(this.parallelism2, this.temporaryFolder.newFolder()), graph -> graph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath((String)cpLocation)));
        CommonTestUtils.waitForAllTaskRunning((MiniCluster)this.cluster.getMiniCluster(), (JobID)jobID2, (boolean)true);
        this.cluster.getClusterClient().cancel(jobID2).get();
    }

    private JobID submit(Configuration conf, Consumer<JobGraph> updateGraph) throws InterruptedException, ExecutionException {
        JobGraph jobGraph = this.createJobGraph(conf);
        updateGraph.accept(jobGraph);
        return (JobID)this.cluster.getClusterClient().submitJob(jobGraph).get();
    }

    private JobGraph createJobGraph(Configuration conf) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment((Configuration)conf);
        SingleOutputStreamOperator map = env.fromSource((Source)new ThrottlingNumberSequenceSource(0L, Long.MAX_VALUE, 100), WatermarkStrategy.noWatermarks(), "Sequence Source").keyBy(ChangelogRescalingITCase::key).map((MapFunction & Serializable)el -> {
            byte[] bytes = new byte[1000];
            ThreadLocalRandom.current().nextBytes(bytes);
            return new TestEvent((long)el, bytes);
        });
        DataStreamUtils.reinterpretAsKeyedStream((DataStream)map, (KeySelector & Serializable)e -> ChangelogRescalingITCase.key(((TestEvent)e).id)).window((WindowAssigner)SlidingProcessingTimeWindows.of((Time)WINDOW_SIZE, (Time)WINDOW_SLIDE)).process((ProcessWindowFunction)new ProcessWindowFunction<TestEvent, String, Long, TimeWindow>(){

            public void process(Long key, ProcessWindowFunction.Context context, Iterable<TestEvent> elements, Collector<String> out) {
            }
        }).sinkTo((Sink)new DiscardingSink());
        return env.getStreamGraph().getJobGraph();
    }

    private static long key(Long num) {
        return num % 1000L;
    }

    private Configuration configureJob(int parallelism, File cpDir) {
        Configuration conf = new Configuration();
        conf.set(ExecutionCheckpointingOptions.EXTERNALIZED_CHECKPOINT, (Object)CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        conf.set(CoreOptions.DEFAULT_PARALLELISM, (Object)parallelism);
        conf.set(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, (Object)true);
        conf.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, (Object)CheckpointingMode.EXACTLY_ONCE);
        conf.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, (Object)Duration.ofMillis(10L));
        conf.set(CheckpointingOptions.CHECKPOINT_STORAGE, (Object)"filesystem");
        conf.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, (Object)cpDir.toURI().toString());
        conf.set(StateBackendOptions.STATE_BACKEND, (Object)"hashmap");
        conf.set(CheckpointingOptions.LOCAL_RECOVERY, (Object)false);
        conf.set(FsStateChangelogOptions.PREEMPTIVE_PERSIST_THRESHOLD, (Object)MemorySize.ofMebiBytes((long)10L));
        conf.set(StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL, (Object)Duration.ofMinutes(3L));
        conf.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, (Object)MemorySize.ofMebiBytes((long)1L));
        conf.set(PipelineOptions.OBJECT_REUSE, (Object)true);
        conf.set(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, (Object)true);
        conf.set(ExecutionCheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT, (Object)Duration.ZERO);
        conf.set(TaskManagerOptions.BUFFER_DEBLOAT_ENABLED, (Object)false);
        conf.set(RestartStrategyOptions.RESTART_STRATEGY, (Object)"none");
        return conf;
    }

    private String checkpointAndCancel(JobID jobID) throws Exception {
        CommonTestUtils.waitForCheckpoint((JobID)jobID, (MiniCluster)this.cluster.getMiniCluster(), (int)1);
        this.cluster.getClusterClient().cancel(jobID).get();
        this.waitForSuccessfulTermination(jobID);
        return (String)CommonTestUtils.getLatestCompletedCheckpointPath((JobID)jobID, (MiniCluster)this.cluster.getMiniCluster()).orElseThrow(() -> {
            throw new NoSuchElementException("No checkpoint was created yet");
        });
    }

    private void waitForSuccessfulTermination(JobID jobID) throws Exception {
        CommonTestUtils.waitUntilCondition(() -> ((JobStatus)this.cluster.getClusterClient().getJobStatus(jobID).get()).isGloballyTerminalState());
        if (((JobStatus)this.cluster.getClusterClient().getJobStatus(jobID).get()).isGloballyTerminalState()) {
            ((JobResult)this.cluster.getClusterClient().requestJobResult(jobID).get()).getSerializedThrowable().ifPresent(serializedThrowable -> {
                throw new RuntimeException((Throwable)serializedThrowable);
            });
        }
    }

    private static final class SourceRateLimiter {
        private final AtomicBoolean newTokensAdded = new AtomicBoolean(false);
        private final int tokensToAdd;
        private int tokensAvailable;

        public SourceRateLimiter(int tokensPerSecond) {
            this(tokensPerSecond < 10 ? 1000 : 100, tokensPerSecond < 10 ? tokensPerSecond : tokensPerSecond / 10);
        }

        public SourceRateLimiter(int intervalMs, int tokensToAdd) {
            Preconditions.checkArgument((intervalMs > 0 ? 1 : 0) != 0);
            Preconditions.checkArgument((tokensToAdd > 0 ? 1 : 0) != 0);
            this.tokensToAdd = tokensToAdd;
            this.tokensAvailable = tokensToAdd;
            new Timer("source-limiter", true).scheduleAtFixedRate(new TimerTask(){

                @Override
                public void run() {
                    newTokensAdded.set(true);
                }
            }, intervalMs, (long)intervalMs);
        }

        public boolean request() {
            if (this.tokensAvailable == 0 && this.newTokensAdded.compareAndSet(true, false)) {
                this.tokensAvailable = this.tokensToAdd;
            }
            if (this.tokensAvailable > 0) {
                --this.tokensAvailable;
                return true;
            }
            return false;
        }
    }

    private static class ThrottlingIteratorSourceReader<E, IterT extends Iterator<E>, SplitT extends IteratorSourceSplit<E, IterT>>
    extends IteratorSourceReader<E, IterT, SplitT> {
        private final SourceRateLimiter rateLimiter;

        public ThrottlingIteratorSourceReader(SourceReaderContext context, SourceRateLimiter rateLimiter) {
            super(context);
            this.rateLimiter = rateLimiter;
        }

        public InputStatus pollNext(ReaderOutput<E> output) {
            if (this.rateLimiter.request()) {
                return super.pollNext(output);
            }
            return InputStatus.NOTHING_AVAILABLE;
        }
    }

    private static class ThrottlingNumberSequenceSource
    extends NumberSequenceSource {
        private final int numbersPerSecond;

        public ThrottlingNumberSequenceSource(long from, long to, int numbersPerSecondPerReader) {
            super(from, to);
            this.numbersPerSecond = numbersPerSecondPerReader;
        }

        public SourceReader<Long, NumberSequenceSource.NumberSequenceSplit> createReader(SourceReaderContext readerContext) {
            return new ThrottlingIteratorSourceReader(readerContext, new SourceRateLimiter(this.numbersPerSecond));
        }
    }

    private static final class TestEvent
    implements Serializable {
        private final long id;
        private final byte[] payload;

        private TestEvent(long id, byte[] payload) {
            this.id = id;
            this.payload = payload;
        }
    }
}

