/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.functions.sink.filesystem;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.core.fs.local.LocalRecoverableWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.Bucket;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketState;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.RowWiseBucketWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.utils.NoOpCommitter;
import org.apache.flink.streaming.api.functions.sink.filesystem.utils.NoOpRecoverable;
import org.apache.flink.streaming.api.functions.sink.filesystem.utils.NoOpRecoverableFsDataOutputStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.utils.NoOpRecoverableWriter;
import org.apache.flink.util.Preconditions;
import org.hamcrest.Description;
import org.hamcrest.TypeSafeMatcher;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class BucketTest {
    /** Class-wide temporary folder; tests that need an output directory call {@code newFolder()} on it. */
    @ClassRule
    public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();

    /** Bucket id handed to {@code Bucket.getNew} by {@code createBucket}. */
    private static final String bucketId = "testing-bucket";

    /** Rolling policy built from the builder defaults; shared by every bucket created in this class. */
    private static final RollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.builder().build();

    /** Encoder passed to every {@code RowWiseBucketWriter} created in this class. */
    private static final Encoder<String> ENCODER = new SimpleStringEncoder<String>();

    /**
     * Writes one element, takes checkpoint 0 and acknowledges that same checkpoint, then
     * verifies that the writer's cleanup counter is still zero.
     *
     * @throws IOException if the temporary folder or the bucket operations fail
     */
    @Test
    public void shouldNotCleanupResumablesThatArePartOfTheAckedCheckpoint() throws IOException {
        final Path path = new Path(TEMP_FOLDER.newFolder().toURI());
        final TestRecoverableWriter recoverableWriter = getRecoverableWriter(path);
        final Bucket<String, String> bucketUnderTest =
                createBucket(recoverableWriter, path, 0, 0, OutputFileConfig.builder().build());

        bucketUnderTest.write("test-element", 0L);

        // The state stays parameterized so that it satisfies the matcher's type bound.
        final BucketState<String> state = bucketUnderTest.onReceptionOfCheckpoint(0L);
        Assert.assertThat(state, hasActiveInProgressFile());

        bucketUnderTest.onSuccessfulCompletionOfCheckpoint(0L);
        Assert.assertThat(recoverableWriter, hasCalledDiscard(0));
    }

    /**
     * Writes one element, takes and acknowledges checkpoint 0, takes checkpoints 1 and 2,
     * acknowledges checkpoint 2 and then expects the writer's cleanup counter to be two.
     *
     * <p>NOTE(review): presumably the two cleanups correspond to the resumables recorded for
     * checkpoints 0 and 1 - confirm against {@code Bucket}.
     *
     * @throws IOException if the temporary folder or the bucket operations fail
     */
    @Test
    public void shouldCleanupOutdatedResumablesOnCheckpointAck() throws IOException {
        final Path path = new Path(TEMP_FOLDER.newFolder().toURI());
        final TestRecoverableWriter recoverableWriter = getRecoverableWriter(path);
        final Bucket<String, String> bucketUnderTest =
                createBucket(recoverableWriter, path, 0, 0, OutputFileConfig.builder().build());

        bucketUnderTest.write("test-element", 0L);

        // The state stays parameterized so that it satisfies the matcher's type bound.
        final BucketState<String> state = bucketUnderTest.onReceptionOfCheckpoint(0L);
        Assert.assertThat(state, hasActiveInProgressFile());
        bucketUnderTest.onSuccessfulCompletionOfCheckpoint(0L);

        bucketUnderTest.onReceptionOfCheckpoint(1L);
        bucketUnderTest.onReceptionOfCheckpoint(2L);
        bucketUnderTest.onSuccessfulCompletionOfCheckpoint(2L);

        Assert.assertThat(recoverableWriter, hasCalledDiscard(2));
    }

    /**
     * Takes checkpoints 0, 1 and 2 on a bucket that never received an element, acknowledges
     * checkpoint 2 and verifies that the writer's cleanup counter is still zero.
     *
     * @throws Exception if the temporary folder or the bucket operations fail
     */
    @Test
    public void shouldNotCallCleanupWithoutInProgressPartFiles() throws Exception {
        final Path path = new Path(TEMP_FOLDER.newFolder().toURI());
        final TestRecoverableWriter recoverableWriter = getRecoverableWriter(path);
        final Bucket<String, String> bucketUnderTest =
                createBucket(recoverableWriter, path, 0, 0, OutputFileConfig.builder().build());

        // Nothing was written, so the snapshot must not carry an in-progress recoverable.
        final BucketState<String> state = bucketUnderTest.onReceptionOfCheckpoint(0L);
        Assert.assertThat(state, hasNoActiveInProgressFile());

        bucketUnderTest.onReceptionOfCheckpoint(1L);
        bucketUnderTest.onReceptionOfCheckpoint(2L);
        bucketUnderTest.onSuccessfulCompletionOfCheckpoint(2L);

        Assert.assertThat(recoverableWriter, hasCalledDiscard(0));
    }

    /**
     * Restores a bucket whose state holds only an in-progress file, using a writer that reports
     * {@code supportsResume() == false}. Expects one {@code supportsResume} call, no
     * {@code recover} call, one {@code recoverForCommit} call, and a restored bucket without an
     * in-progress part.
     *
     * @throws IOException if restoring the bucket fails
     */
    @Test
    public void inProgressFileShouldBeCommittedIfWriterDoesNotSupportResume() throws IOException {
        final StubNonResumableWriter nonResumableWriter = new StubNonResumableWriter();
        final Bucket<String, String> bucket = getRestoredBucketWithOnlyInProgressPart(nonResumableWriter);

        Assert.assertThat(nonResumableWriter, hasMethodCallCountersEqualTo(1, 0, 1));
        Assert.assertThat(bucket, hasNullInProgressFile(true));
    }

    /**
     * Restores a bucket whose state holds only an in-progress file, using a writer that reports
     * {@code supportsResume() == true}. Expects one {@code supportsResume} call, one
     * {@code recover} call, no {@code recoverForCommit} call, and a restored bucket that has an
     * in-progress part.
     *
     * @throws IOException if restoring the bucket fails
     */
    @Test
    public void inProgressFileShouldBeRestoredIfWriterSupportsResume() throws IOException {
        final StubResumableWriter resumableWriter = new StubResumableWriter();
        final Bucket<String, String> bucket = getRestoredBucketWithOnlyInProgressPart(resumableWriter);

        Assert.assertThat(resumableWriter, hasMethodCallCountersEqualTo(1, 1, 0));
        Assert.assertThat(bucket, hasNullInProgressFile(false));
    }

    /**
     * Restores a bucket whose state holds ten pending parts and no in-progress file. Expects
     * one {@code recoverForCommit} call per pending part, no {@code supportsResume} or
     * {@code recover} calls, and a restored bucket without an in-progress part.
     *
     * @throws IOException if restoring the bucket fails
     */
    @Test
    public void pendingFilesShouldBeRestored() throws IOException {
        // Single source of truth for both the number of pending parts and the expected counter.
        final int expectedRecoverForCommitCounter = 10;

        final StubNonResumableWriter writer = new StubNonResumableWriter();
        final Bucket<String, String> bucket =
                getRestoredBucketWithOnlyPendingParts(writer, expectedRecoverForCommitCounter);

        Assert.assertThat(writer, hasMethodCallCountersEqualTo(0, 0, expectedRecoverForCommitCounter));
        Assert.assertThat(bucket, hasNullInProgressFile(true));
    }

    /**
     * Creates a matcher that accepts a {@code TestRecoverableWriter} whose cleanup call counter
     * equals {@code times}.
     *
     * @param times the exact number of expected cleanup calls
     * @return the matcher
     */
    private static TypeSafeMatcher<TestRecoverableWriter> hasCalledDiscard(final int times) {
        return new TypeSafeMatcher<TestRecoverableWriter>() {

            @Override
            protected boolean matchesSafely(TestRecoverableWriter writer) {
                return writer.getCleanupCallCounter() == times;
            }

            @Override
            public void describeTo(Description description) {
                // The counter is incremented in cleanupRecoverableState(), so name that method.
                description
                        .appendText("the TestRecoverableWriter to have called cleanupRecoverableState() ")
                        .appendValue(times)
                        .appendText(" times.");
            }
        };
    }

    /**
     * Creates a matcher that accepts a {@code BucketState} whose in-progress file recoverable
     * is not {@code null}.
     *
     * @return the matcher
     */
    private static TypeSafeMatcher<BucketState<String>> hasActiveInProgressFile() {
        return new TypeSafeMatcher<BucketState<String>>() {

            @Override
            protected boolean matchesSafely(BucketState<String> candidate) {
                final boolean recoverableMissing = candidate.getInProgressFileRecoverable() == null;
                return !recoverableMissing;
            }

            @Override
            public void describeTo(Description mismatchTarget) {
                mismatchTarget.appendText("a BucketState with active in-progress file.");
            }
        };
    }

    /**
     * Creates a matcher that accepts a {@code BucketState} whose in-progress file recoverable
     * is {@code null}.
     *
     * @return the matcher
     */
    private static TypeSafeMatcher<BucketState<String>> hasNoActiveInProgressFile() {
        return new TypeSafeMatcher<BucketState<String>>() {

            @Override
            protected boolean matchesSafely(BucketState<String> candidate) {
                final boolean recoverablePresent = candidate.getInProgressFileRecoverable() != null;
                return !recoverablePresent;
            }

            @Override
            public void describeTo(Description mismatchTarget) {
                mismatchTarget.appendText("a BucketState with no active in-progress file.");
            }
        };
    }

    /**
     * Creates a matcher on the nullness of a bucket's in-progress part.
     *
     * @param isNull {@code true} to require that the in-progress part is {@code null},
     *     {@code false} to require that it is not
     * @return the matcher
     */
    private static TypeSafeMatcher<Bucket<String, String>> hasNullInProgressFile(final boolean isNull) {
        return new TypeSafeMatcher<Bucket<String, String>>() {

            @Override
            protected boolean matchesSafely(Bucket<String, String> bucket) {
                final boolean partIsNull = bucket.getInProgressPart() == null;
                return partIsNull == isNull;
            }

            @Override
            public void describeTo(Description description) {
                final String suffix = isNull ? " null." : " not null.";
                description.appendText("a Bucket with its inProgressPart being ").appendText(suffix);
            }
        };
    }

    /**
     * Creates a matcher that accepts a {@code BaseStubWriter} whose three call counters equal
     * the given values exactly.
     *
     * @param supportsResumeCalls expected number of {@code supportsResume()} calls
     * @param recoverCalls expected number of {@code recover(...)} calls
     * @param recoverForCommitCalls expected number of {@code recoverForCommit(...)} calls
     * @return the matcher
     */
    private static TypeSafeMatcher<BaseStubWriter> hasMethodCallCountersEqualTo(final int supportsResumeCalls, final int recoverCalls, final int recoverForCommitCalls) {
        return new TypeSafeMatcher<BaseStubWriter>() {

            @Override
            protected boolean matchesSafely(BaseStubWriter writer) {
                return writer.getSupportsResumeCallCounter() == supportsResumeCalls
                        && writer.getRecoverCallCounter() == recoverCalls
                        && writer.getRecoverForCommitCallCounter() == recoverForCommitCalls;
            }

            @Override
            public void describeTo(Description description) {
                // The sentence ends at "times."; no unmatched quote character is appended.
                description
                        .appendText("a Writer where:")
                        .appendText(" supportsResume was called ")
                        .appendValue(supportsResumeCalls)
                        .appendText(" times,")
                        .appendText(" recover was called ")
                        .appendValue(recoverCalls)
                        .appendText(" times,")
                        .appendText(" and recoverForCommit was called ")
                        .appendValue(recoverForCommitCalls)
                        .appendText(" times.");
            }
        };
    }

    /**
     * Creates a fresh bucket via {@code Bucket.getNew}, using the class-wide bucket id, rolling
     * policy and encoder.
     *
     * @param writer recoverable writer wrapped in a {@code RowWiseBucketWriter}
     * @param bucketPath path handed to the new bucket
     * @param subtaskIdx subtask index handed to the new bucket
     * @param initialPartCounter initial part counter handed to the new bucket
     * @param outputFileConfig output file configuration handed to the new bucket
     * @return the newly created bucket
     * @throws IOException declared for callers; propagated from the underlying calls if thrown
     */
    private static Bucket<String, String> createBucket(RecoverableWriter writer, Path bucketPath, int subtaskIdx, int initialPartCounter, OutputFileConfig outputFileConfig) throws IOException {
        // A parameterized writer lets the compiler infer Bucket<String, String> without raw types.
        final BucketWriter<String, String> bucketWriter = new RowWiseBucketWriter<String, String>(writer, ENCODER);
        return Bucket.getNew(
                subtaskIdx,
                bucketId,
                bucketPath,
                initialPartCounter,
                bucketWriter,
                rollingPolicy,
                null,
                outputFileConfig);
    }

    /**
     * Restores a bucket from {@code bucketState} via {@code Bucket.restore}, using the
     * class-wide rolling policy and encoder.
     *
     * @param writer recoverable writer wrapped in a {@code RowWiseBucketWriter}
     * @param subtaskIndex subtask index handed to the restored bucket
     * @param initialPartCounter initial part counter handed to the restored bucket
     * @param bucketState state to restore from
     * @param outputFileConfig output file configuration handed to the restored bucket
     * @return the restored bucket
     * @throws Exception if restoring fails
     */
    private static Bucket<String, String> restoreBucket(RecoverableWriter writer, int subtaskIndex, long initialPartCounter, BucketState<String> bucketState, OutputFileConfig outputFileConfig) throws Exception {
        // A parameterized writer lets the compiler infer Bucket<String, String> without raw types.
        final BucketWriter<String, String> bucketWriter = new RowWiseBucketWriter<String, String>(writer, ENCODER);
        return Bucket.restore(
                subtaskIndex,
                initialPartCounter,
                bucketWriter,
                rollingPolicy,
                bucketState,
                null,
                outputFileConfig);
    }

    /**
     * Resolves the file system for {@code path} and wraps it in a {@code TestRecoverableWriter}.
     *
     * <p>Fails the calling test with an {@link AssertionError} if the file system cannot be
     * obtained or is not a {@code LocalFileSystem}.
     *
     * @param path path whose URI is used to look up the file system
     * @return a new counting writer backed by the local file system
     */
    private static TestRecoverableWriter getRecoverableWriter(Path path) {
        final FileSystem fs;
        try {
            fs = FileSystem.get(path.toUri());
        } catch (IOException e) {
            // Keep the cause attached so the test report shows why the lookup failed.
            throw new AssertionError("Could not obtain the file system for path: " + path, e);
        }

        if (!(fs instanceof LocalFileSystem)) {
            Assert.fail("Expected Local FS but got a " + fs.getClass().getName() + " for path: " + path);
        }
        return new TestRecoverableWriter((LocalFileSystem) fs);
    }

    /**
     * Restores a bucket from a state that carries an in-progress file recoverable (wrapping a
     * {@code NoOpRecoverable}) and an empty map of pending files.
     *
     * @param writer stub writer wrapped in a {@code RowWiseBucketWriter}; its call counters
     *     record what the restore does
     * @return the restored bucket
     * @throws IOException if restoring fails
     */
    private Bucket<String, String> getRestoredBucketWithOnlyInProgressPart(BaseStubWriter writer) throws IOException {
        final InProgressFileWriter.InProgressFileRecoverable inProgressRecoverable =
                new OutputStreamBasedPartFileWriter.OutputStreamBasedInProgressFileRecoverable(
                        (RecoverableWriter.ResumeRecoverable) new NoOpRecoverable());

        final BucketState<String> stateWithOnlyInProgressFile =
                new BucketState<String>(
                        "test",
                        new Path(),
                        12345L,
                        inProgressRecoverable,
                        new HashMap<Long, List<InProgressFileWriter.PendingFileRecoverable>>());

        return Bucket.restore(
                0,
                1L,
                new RowWiseBucketWriter<String, String>(writer, ENCODER),
                rollingPolicy,
                stateWithOnlyInProgressFile,
                null,
                OutputFileConfig.builder().build());
    }

    /**
     * Restores a bucket from a state that has no in-progress file recoverable ({@code null})
     * and one pending part for each of {@code numberOfPendingParts} checkpoints.
     *
     * @param writer stub writer wrapped in a {@code RowWiseBucketWriter}; its call counters
     *     record what the restore does
     * @param numberOfPendingParts number of checkpoints, each contributing one pending part
     * @return the restored bucket
     * @throws IOException if restoring fails
     */
    private Bucket<String, String> getRestoredBucketWithOnlyPendingParts(BaseStubWriter writer, int numberOfPendingParts) throws IOException {
        final Map<Long, List<InProgressFileWriter.PendingFileRecoverable>> completePartsPerCheckpoint =
                this.createPendingPartsPerCheckpoint(numberOfPendingParts);

        final BucketState<String> stateWithOnlyPendingParts =
                new BucketState<String>(
                        "test",
                        new Path(),
                        12345L,
                        null,
                        completePartsPerCheckpoint);

        return Bucket.restore(
                0,
                1L,
                new RowWiseBucketWriter<String, String>(writer, ENCODER),
                rollingPolicy,
                stateWithOnlyPendingParts,
                null,
                OutputFileConfig.builder().build());
    }

    /**
     * Builds a map with one entry per checkpoint id in {@code [0, noOfCheckpoints)}; each entry
     * holds a single pending file recoverable that wraps a {@code NoOpRecoverable}.
     *
     * @param noOfCheckpoints number of checkpoint ids to populate; zero or less yields an
     *     empty map
     * @return a mutable map from checkpoint id to its list of pending file recoverables
     */
    private Map<Long, List<InProgressFileWriter.PendingFileRecoverable>> createPendingPartsPerCheckpoint(int noOfCheckpoints) {
        final Map<Long, List<InProgressFileWriter.PendingFileRecoverable>> pendingCommittablesPerCheckpoint =
                new HashMap<Long, List<InProgressFileWriter.PendingFileRecoverable>>();

        for (int checkpointId = 0; checkpointId < noOfCheckpoints; ++checkpointId) {
            // The list is typed to the interface so that it matches the map's value type.
            final List<InProgressFileWriter.PendingFileRecoverable> pending =
                    new ArrayList<InProgressFileWriter.PendingFileRecoverable>();
            pending.add(
                    new OutputStreamBasedPartFileWriter.OutputStreamBasedPendingFileRecoverable(
                            (RecoverableWriter.CommitRecoverable) new NoOpRecoverable()));
            pendingCommittablesPerCheckpoint.put(Long.valueOf(checkpointId), pending);
        }
        return pendingCommittablesPerCheckpoint;
    }

    /**
     * Stub writer that counts how often {@code supportsResume()}, {@code recover(...)} and
     * {@code recoverForCommit(...)} are invoked and answers them with no-op objects.
     */
    private static class BaseStubWriter extends NoOpRecoverableWriter {

        /** Fixed answer returned by {@link #supportsResume()}. */
        private final boolean supportsResume;

        private int supportsResumeCallCounter;
        private int recoverCallCounter;
        private int recoverForCommitCallCounter;

        private BaseStubWriter(boolean supportsResume) {
            this.supportsResume = supportsResume;
            this.supportsResumeCallCounter = 0;
            this.recoverCallCounter = 0;
            this.recoverForCommitCallCounter = 0;
        }

        /** Counts the call and reports the answer fixed at construction time. */
        @Override
        public boolean supportsResume() {
            this.supportsResumeCallCounter += 1;
            return this.supportsResume;
        }

        /** Counts the call and hands back a no-op output stream. */
        @Override
        public RecoverableFsDataOutputStream recover(RecoverableWriter.ResumeRecoverable resumable) throws IOException {
            this.recoverCallCounter += 1;
            return new NoOpRecoverableFsDataOutputStream();
        }

        /**
         * Counts the call and hands back a no-op committer; the argument is rejected before
         * counting unless it is a {@code NoOpRecoverable}.
         */
        @Override
        public RecoverableFsDataOutputStream.Committer recoverForCommit(RecoverableWriter.CommitRecoverable committable) throws IOException {
            final boolean isNoOpRecoverable = committable instanceof NoOpRecoverable;
            Preconditions.checkArgument(isNoOpRecoverable);
            this.recoverForCommitCallCounter += 1;
            return new NoOpCommitter();
        }

        int getSupportsResumeCallCounter() {
            return this.supportsResumeCallCounter;
        }

        int getRecoverCallCounter() {
            return this.recoverCallCounter;
        }

        int getRecoverForCommitCallCounter() {
            return this.recoverForCommitCallCounter;
        }
    }

    /** {@code BaseStubWriter} whose {@code supportsResume()} answers {@code false}. */
    private static final class StubNonResumableWriter extends BaseStubWriter {

        StubNonResumableWriter() {
            super(false);
        }
    }

    /** {@code BaseStubWriter} whose {@code supportsResume()} answers {@code true}. */
    private static final class StubResumableWriter extends BaseStubWriter {

        StubResumableWriter() {
            super(true);
        }
    }

    private static class TestRecoverableWriter
    extends LocalRecoverableWriter {
        private int cleanupCallCounter = 0;

        TestRecoverableWriter(LocalFileSystem fs) {
            super(fs);
        }

        int getCleanupCallCounter() {
            return this.cleanupCallCounter;
        }

        public boolean requiresCleanupOfRecoverableState() {
            return true;
        }

        public boolean cleanupRecoverableState(RecoverableWriter.ResumeRecoverable resumable) throws IOException {
            ++this.cleanupCallCounter;
            return false;
        }

        public String toString() {
            return "TestRecoverableWriter has called discardRecoverableState() " + this.cleanupCallCounter + " times.";
        }
    }
}

