/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.io.Serializable;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.ExternallyInducedSourceReader;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.mocks.MockSource;
import org.apache.flink.api.connector.source.mocks.MockSourceReader;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.source.event.AddSplitEvent;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.streaming.api.operators.SourceOperator;
import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
import org.apache.flink.streaming.runtime.tasks.SourceStreamTaskTestBase;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.function.FunctionWithException;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

public class SourceOperatorStreamTaskTest
extends SourceStreamTaskTestBase {
    /**
     * Operator ID shared by the tests: it is handed to the harness builder when the source
     * operator chain is set up, and it is the target of the operator events dispatched to the task.
     */
    private static final OperatorID OPERATOR_ID = new OperatorID();
    /** Number of records (10) that one snapshot/restore run adds to the split and expects back. */
    private static final int NUM_RECORDS = 10;

    /**
     * Runs the three-argument {@code testMetrics} scenario (not defined in this file; presumably
     * inherited from {@link SourceStreamTaskTestBase}) for a {@link SourceOperatorStreamTask} that
     * reads from a bounded {@link MockSource}.
     *
     * @throws Exception if the delegated scenario fails
     */
    @Test
    public void testMetrics() throws Exception {
        // No raw-type casts here: the parameter types of testMetrics(...) drive inference for the
        // constructor reference, the factory's type argument and the Double matcher.
        testMetrics(
                SourceOperatorStreamTask::new,
                new SourceOperatorFactory<>(
                        new MockSource(Boundedness.BOUNDED, 1), WatermarkStrategy.noWatermarks()),
                Matchers.lessThanOrEqualTo(1_000_000d));
    }

    /**
     * Takes a checkpoint in a first run that starts without state, then starts a second run from
     * that snapshot and takes another checkpoint.
     *
     * <p>The first run is expected to emit the values {@code [0, NUM_RECORDS)}, the second run
     * the values {@code [NUM_RECORDS, 2 * NUM_RECORDS)}.
     *
     * @throws Exception if either run fails
     */
    @Test
    public void testSnapshotAndRestore() throws Exception {
        // Both ranges are derived from NUM_RECORDS so they cannot drift from the batch size.
        final TaskStateSnapshot firstSnapshot =
                executeAndWaitForCheckpoint(1L, null, IntStream.range(0, NUM_RECORDS));
        executeAndWaitForCheckpoint(
                2L, firstSnapshot, IntStream.range(NUM_RECORDS, 2 * NUM_RECORDS));
    }

    /**
     * Triggers a {@link CheckpointType#SAVEPOINT_TERMINATE} checkpoint on a task with an assigned
     * (empty) split and verifies that the output consists of {@link Watermark#MAX_WATERMARK}
     * followed by the matching {@link CheckpointBarrier}.
     *
     * @throws Exception if the harness or the checkpoint fails
     */
    @Test
    public void testSnapshotAndAdvanceToEndOfEventTime() throws Exception {
        // Single source of truth for the id used by the harness, the trigger and the barrier.
        final long checkpointId = 1L;
        try (StreamTaskMailboxTestHarness<Integer> testHarness =
                createTestHarness(checkpointId, null)) {
            getAndMaybeAssignSplit(testHarness);

            final CheckpointOptions checkpointOptions =
                    new CheckpointOptions(
                            CheckpointType.SAVEPOINT_TERMINATE,
                            CheckpointStorageLocationReference.getDefault());
            triggerCheckpointWaitForFinish(testHarness, checkpointId, checkpointOptions);

            final LinkedList<Object> expectedOutput = new LinkedList<>();
            expectedOutput.add(Watermark.MAX_WATERMARK);
            expectedOutput.add(
                    new CheckpointBarrier(checkpointId, checkpointId, checkpointOptions));

            TestHarnessUtil.assertOutputEquals(
                    "Output was not correct.", expectedOutput, testHarness.getOutput());
        }
    }

    /**
     * Processes everything a default harness has to offer, finishes processing, and checks that
     * the only element in the output is {@link Watermark#MAX_WATERMARK}.
     *
     * @throws Exception if the harness fails
     */
    @Test
    public void testEmittingMaxWatermarkAfterReadingAllRecords() throws Exception {
        try (StreamTaskMailboxTestHarness<Integer> harness = createTestHarness()) {
            harness.processAll();
            harness.finishProcessing();

            // Compare as arrays so the check does not depend on the concrete queue type.
            final Object[] actual = harness.getOutput().toArray();
            final Object[] expected =
                    Collections.singletonList(Watermark.MAX_WATERMARK).toArray();
            Assert.assertThat(actual, CoreMatchers.equalTo(expected));
        }
    }

    /**
     * Cancels the task of a default harness before finishing processing and checks that nothing
     * (in particular no max watermark) was written to the output.
     *
     * @throws Exception if the harness fails
     */
    @Test
    public void testNotEmittingMaxWatermarkAfterCancelling() throws Exception {
        try (StreamTaskMailboxTestHarness<Integer> harness = createTestHarness()) {
            harness.getStreamTask().cancel();
            harness.finishProcessing();

            Assert.assertThat(harness.getOutput(), Matchers.hasSize(0));
        }
    }

    /**
     * Runs a task whose reader is a {@link TestingExternallyInducedSourceReader} and verifies
     * that the reader polled the configured total number of events, was checkpointed once with
     * the reader's own checkpoint ID, and that the snapshot happened after exactly
     * {@code numEventsBeforeCheckpoint} polls.
     *
     * @throws Exception if the harness fails
     */
    @Test
    public void testExternallyInducedSource() throws Exception {
        final int numEventsBeforeCheckpoint = 10;
        final int totalNumEvents = 20;
        final TestingExternallyInducedSourceReader testingReader =
                new TestingExternallyInducedSourceReader(numEventsBeforeCheckpoint, totalNumEvents);

        try (StreamTaskMailboxTestHarness<Integer> testHarness =
                createTestHarness(new TestingExternallyInducedSource(testingReader), 0L, null)) {
            // Assert on the reader instance the operator actually holds.
            // NOTE(review): presumably this can differ from testingReader (e.g. a deserialized
            // copy) — confirm.
            final TestingExternallyInducedSourceReader runtimeTestingReader =
                    (TestingExternallyInducedSourceReader)
                            ((SourceOperator<?, ?>) testHarness.getStreamTask().mainOperator)
                                    .getSourceReader();

            testHarness.processAll();

            Assert.assertEquals(totalNumEvents, runtimeTestingReader.numEmittedEvents);
            Assert.assertTrue(runtimeTestingReader.checkpointed);
            Assert.assertEquals(
                    TestingExternallyInducedSourceReader.CHECKPOINT_ID,
                    runtimeTestingReader.checkpointedId);
            Assert.assertEquals(numEventsBeforeCheckpoint, runtimeTestingReader.checkpointedAt);
        }
    }

    /**
     * Starts a task (optionally from {@code initialSnapshot}), feeds {@link #NUM_RECORDS} records
     * into its split, processes them, takes a checkpoint and verifies both the reported
     * checkpoint ID and the produced output.
     *
     * @param checkpointId ID of the checkpoint to trigger; also passed to the harness together
     *     with the initial snapshot
     * @param initialSnapshot state to start from, or {@code null} to start without state
     * @param expectedRecords values expected in the output, in order, before the barrier
     * @return the last task state snapshot reported to the harness' task state manager
     * @throws Exception if the harness or the checkpoint fails
     */
    private TaskStateSnapshot executeAndWaitForCheckpoint(
            long checkpointId, TaskStateSnapshot initialSnapshot, IntStream expectedRecords)
            throws Exception {
        try (StreamTaskMailboxTestHarness<Integer> testHarness =
                createTestHarness(checkpointId, initialSnapshot)) {
            final MockSourceSplit split = getAndMaybeAssignSplit(testHarness);
            addRecords(split, NUM_RECORDS);
            testHarness.processAll();

            final CheckpointOptions checkpointOptions =
                    CheckpointOptions.forCheckpointWithDefaultLocation();
            triggerCheckpointWaitForFinish(testHarness, checkpointId, checkpointOptions);

            // The expected output mixes stream records and a checkpoint barrier, so the element
            // type has to be Object; the two classes must not be cast to one another.
            final LinkedList<Object> expectedOutput = new LinkedList<>();
            // Records are expected to carry Long.MIN_VALUE as their timestamp.
            expectedRecords.forEach(
                    r -> expectedOutput.offer(new StreamRecord<>(r, Long.MIN_VALUE)));
            expectedOutput.add(
                    new CheckpointBarrier(checkpointId, checkpointId, checkpointOptions));

            Assert.assertEquals(
                    checkpointId, testHarness.taskStateManager.getReportedCheckpointId());
            TestHarnessUtil.assertOutputEquals(
                    "Output was not correct.", expectedOutput, testHarness.getOutput());

            return testHarness.taskStateManager.getLastJobManagerTaskStateSnapshot();
        }
    }

    /**
     * Triggers a checkpoint on the task, drives the mailbox until the trigger future is done,
     * notifies the task that the checkpoint completed, drives the mailbox until that future is
     * done as well, and finally blocks on the latch registered with the task state manager.
     *
     * @param testHarness harness whose task is checkpointed
     * @param checkpointId ID of the checkpoint; also passed as the second argument of the
     *     checkpoint meta data
     * @param checkpointOptions options to trigger the checkpoint with
     * @throws Exception if triggering, mailbox processing or waiting fails
     */
    private void triggerCheckpointWaitForFinish(
            StreamTaskMailboxTestHarness<Integer> testHarness,
            long checkpointId,
            CheckpointOptions checkpointOptions)
            throws Exception {
        // The latch is registered before the checkpoint is triggered.
        // NOTE(review): presumably it is released once the state report arrives — confirm.
        final OneShotLatch reportLatch = new OneShotLatch();
        testHarness.taskStateManager.setWaitForReportLatch(reportLatch);

        final CheckpointMetaData metaData = new CheckpointMetaData(checkpointId, checkpointId);
        final Future<?> triggered =
                testHarness.getStreamTask().triggerCheckpointAsync(metaData, checkpointOptions);

        // Mark the reader available before stepping the mailbox.
        getSourceReaderFromTask(testHarness).markAvailable();
        processUntil(testHarness, () -> triggered.isDone());

        final Future<?> notified =
                testHarness.getStreamTask().notifyCheckpointCompleteAsync(checkpointId);
        processUntil(testHarness, () -> notified.isDone());

        reportLatch.await();
    }

    /**
     * Runs mailbox steps of the harness' task until {@code condition} reports {@code true}.
     * At least one step is always executed, because the condition is only checked after a step.
     *
     * @param testHarness harness whose task is stepped
     * @param condition evaluated after every mailbox step; stepping stops once it yields true
     * @throws Exception if a mailbox step fails
     */
    private void processUntil(StreamTaskMailboxTestHarness testHarness, Supplier<Boolean> condition) throws Exception {
        while (true) {
            testHarness.getStreamTask().runMailboxStep();
            if (condition.get()) {
                return;
            }
        }
    }

    /**
     * Creates a harness with checkpoint ID 0 and no initial snapshot.
     *
     * @return the built harness
     * @throws Exception if the harness cannot be built
     */
    private StreamTaskMailboxTestHarness<Integer> createTestHarness() throws Exception {
        final long initialCheckpointId = 0L;
        final TaskStateSnapshot noSnapshot = null;
        return createTestHarness(initialCheckpointId, noSnapshot);
    }

    /**
     * Creates a harness around a bounded {@link MockSource}.
     *
     * @param checkpointId checkpoint ID passed on together with {@code snapshot}
     * @param snapshot initial task state, or {@code null} for none
     * @return the built harness
     * @throws Exception if the harness cannot be built
     */
    private StreamTaskMailboxTestHarness<Integer> createTestHarness(long checkpointId, TaskStateSnapshot snapshot) throws Exception {
        final MockSource boundedSource = new MockSource(Boundedness.BOUNDED, 1);
        return createTestHarness(boundedSource, checkpointId, snapshot);
    }

    /**
     * Builds a harness that runs a {@link SourceOperatorStreamTask} with a single source operator
     * created from {@code source} (no watermarks) and registered under {@link #OPERATOR_ID}.
     *
     * @param source source the operator factory is created from
     * @param checkpointId checkpoint ID handed to the builder together with {@code snapshot};
     *     ignored when {@code snapshot} is {@code null}
     * @param snapshot initial task state, or {@code null} to start without state
     * @return the built harness
     * @throws Exception if the harness cannot be built
     */
    private StreamTaskMailboxTestHarness<Integer> createTestHarness(
            MockSource source, long checkpointId, TaskStateSnapshot snapshot) throws Exception {
        // Parameterized (not raw) types keep the constructor reference and the returned harness
        // type-checked instead of relying on unchecked conversions.
        final SourceOperatorFactory<Integer> sourceOperatorFactory =
                new SourceOperatorFactory<>(source, WatermarkStrategy.noWatermarks());
        final StreamTaskMailboxTestHarnessBuilder<Integer> builder =
                new StreamTaskMailboxTestHarnessBuilder<>(
                        SourceOperatorStreamTask::new, BasicTypeInfo.INT_TYPE_INFO);
        if (snapshot != null) {
            // Only set initial state when there is some.
            builder.setTaskStateSnapshot(checkpointId, snapshot);
        }
        return builder.setupOutputForSingletonOperatorChain(sourceOperatorFactory, OPERATOR_ID)
                .build();
    }

    /**
     * Returns the first split assigned to the task's source reader, assigning a fresh one first
     * if the reader has none yet.
     *
     * <p>When no split is assigned, a new {@code MockSourceSplit(0, 0)} is sent to the task as an
     * {@link AddSplitEvent}, the mailbox is stepped until the reader's split list is no longer
     * empty, and the reader is marked available.
     *
     * @param testHarness harness whose source reader is inspected
     * @return the first assigned split
     * @throws Exception if dispatching the event or a mailbox step fails
     */
    private MockSourceSplit getAndMaybeAssignSplit(
            StreamTaskMailboxTestHarness<Integer> testHarness) throws Exception {
        // The same list reference is polled again below, so it is expected to reflect splits
        // the reader receives later.
        final List<MockSourceSplit> assignedSplits =
                getSourceReaderFromTask(testHarness).getAssignedSplits();
        if (assignedSplits.isEmpty()) {
            final MockSourceSplit split = new MockSourceSplit(0, 0);
            final AddSplitEvent<MockSourceSplit> addSplitEvent =
                    new AddSplitEvent<>(
                            Collections.singletonList(split), new MockSourceSplitSerializer());
            testHarness
                    .getStreamTask()
                    .dispatchOperatorEvent(OPERATOR_ID, new SerializedValue<>(addSplitEvent));
            // Step the mailbox until the assignment has reached the reader.
            while (assignedSplits.isEmpty()) {
                testHarness.getStreamTask().runMailboxStep();
            }
            getSourceReaderFromTask(testHarness).markAvailable();
        }
        return assignedSplits.get(0);
    }

    /**
     * Adds {@code numRecords} consecutive integers to {@code split}, starting at the value
     * {@code split.index()} returns when the method is entered.
     *
     * @param split split to add the records to
     * @param numRecords number of records to add
     */
    private void addRecords(MockSourceSplit split, int numRecords) {
        final int first = split.index();
        final int endExclusive = first + numRecords;
        for (int value = first; value < endExclusive; value++) {
            split.addRecord(value);
        }
    }

    /**
     * Extracts the source reader from the main operator of the harness' task.
     *
     * @param testHarness harness whose task's main operator is a {@link SourceOperator}
     * @return the operator's source reader, cast to {@link MockSourceReader}
     */
    private MockSourceReader getSourceReaderFromTask(StreamTaskMailboxTestHarness<Integer> testHarness) {
        final SourceOperator<?, ?> sourceOperator =
                (SourceOperator<?, ?>) testHarness.getStreamTask().mainOperator;
        return (MockSourceReader) sourceOperator.getSourceReader();
    }

    private static class TestingExternallyInducedSourceReader
    implements ExternallyInducedSourceReader<Integer, MockSourceSplit>,
    Serializable {
        private static final long CHECKPOINT_ID = 1234L;
        private final int numEventsBeforeCheckpoint;
        private final int totalNumEvents;
        private int numEmittedEvents;
        private boolean checkpointed;
        private int checkpointedAt;
        private long checkpointedId;

        TestingExternallyInducedSourceReader(int numEventsBeforeCheckpoint, int totalNumEvents) {
            this.numEventsBeforeCheckpoint = numEventsBeforeCheckpoint;
            this.totalNumEvents = totalNumEvents;
            this.numEmittedEvents = 0;
            this.checkpointed = false;
            this.checkpointedAt = -1;
        }

        public Optional<Long> shouldTriggerCheckpoint() {
            if (this.numEmittedEvents == this.numEventsBeforeCheckpoint && !this.checkpointed) {
                return Optional.of(1234L);
            }
            return Optional.empty();
        }

        public void start() {
        }

        public InputStatus pollNext(ReaderOutput<Integer> output) throws Exception {
            ++this.numEmittedEvents;
            if (this.numEmittedEvents == this.numEventsBeforeCheckpoint) {
                return InputStatus.NOTHING_AVAILABLE;
            }
            if (this.numEmittedEvents < this.totalNumEvents) {
                return InputStatus.MORE_AVAILABLE;
            }
            return InputStatus.END_OF_INPUT;
        }

        public List<MockSourceSplit> snapshotState(long checkpointId) {
            this.checkpointed = true;
            this.checkpointedAt = this.numEmittedEvents;
            this.checkpointedId = checkpointId;
            return Collections.emptyList();
        }

        public CompletableFuture<Void> isAvailable() {
            return CompletableFuture.completedFuture(null);
        }

        public void addSplits(List<MockSourceSplit> splits) {
        }

        public void notifyNoMoreSplits() {
        }

        public void close() throws Exception {
        }
    }

    private static class TestingExternallyInducedSource
    extends MockSource {
        private static final long serialVersionUID = 3078454109555893721L;
        private final TestingExternallyInducedSourceReader reader;

        private TestingExternallyInducedSource(TestingExternallyInducedSourceReader reader) {
            super(Boundedness.CONTINUOUS_UNBOUNDED, 1);
            this.reader = reader;
        }

        public SourceReader<Integer, MockSourceSplit> createReader(SourceReaderContext readerContext) {
            return this.reader;
        }
    }
}

