package org.apache.flink.table.planner.runtime.utils;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.connector.sink.abilities.SupportsStaging;
import org.apache.flink.table.planner.factories.TestSupportsStagingTableFactory;
import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.types.Row;
import org.apache.flink.util.FileUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/flink/table/planner/runtime/utils/AtomicRtasITCaseBase.class */
public abstract class AtomicRtasITCaseBase {

    @RegisterExtension
    private static final MiniClusterExtension MINI_CLUSTER_EXTENSION = new MiniClusterExtension();
    protected TableEnvironment tEnv;

    protected abstract TableEnvironment getTableEnvironment();

    @BeforeEach
    void setup() {
        this.tEnv = getTableEnvironment();
        List singletonList = Collections.singletonList(Row.of(new Object[]{1, "ZM"}));
        TestCollectionTableFactory.reset();
        TestCollectionTableFactory.initData(singletonList);
        this.tEnv.executeSql("create table t1(a int, b varchar) with ('connector' = 'COLLECTION')");
    }

    @AfterEach
    void clean() {
        TestSupportsStagingTableFactory.JOB_STATUS_CHANGE_PROCESS.clear();
        TestSupportsStagingTableFactory.STAGING_PURPOSE_LIST.clear();
    }

    @Test
    void testAtomicReplaceTableAs(@TempDir Path path) throws Exception {
        commonTestForAtomicReplaceTableAs("atomic_replace_table", false, true, path.toFile());
    }

    @Test
    void testAtomicReplaceTableAsWithReplacedTableNotExists(@TempDir Path path) throws Exception {
        commonTestForAtomicReplaceTableAs("atomic_replace_table_not_exists", false, false, path.toFile());
    }

    @Test
    void testAtomicCreateOrReplaceTableAs(@TempDir Path path) throws Exception {
        commonTestForAtomicReplaceTableAs("atomic_create_or_replace_table", true, true, path.toFile());
    }

    @Test
    void testAtomicCreateOrReplaceTableAsWithReplacedTableNotExists(@TempDir Path path) throws Exception {
        commonTestForAtomicReplaceTableAs("atomic_create_or_replace_table_not_exists", true, false, path.toFile());
    }

    private void commonTestForAtomicReplaceTableAs(String str, boolean z, boolean z2, File file) throws Exception {
        if (z2) {
            this.tEnv.executeSql("create table " + str + " (a int) with ('connector' = 'PRINT')");
        }
        this.tEnv.getConfig().set(TableConfigOptions.TABLE_RTAS_CTAS_ATOMICITY_ENABLED, true);
        String absolutePath = file.getAbsolutePath();
        String str2 = getCreateOrReplaceSqlFragment(z, str) + " with ('connector' = 'test-staging', 'data-dir' = '" + absolutePath + "') as select * from t1";
        if (!z && !z2) {
            Assertions.assertThatThrownBy(() -> {
                this.tEnv.executeSql(str2);
            }).isInstanceOf(TableException.class).hasMessage("The table `default_catalog`.`default_database`.`" + str + "` to be replaced doesn't exist. You can try to use CREATE TABLE AS statement or CREATE OR REPLACE TABLE AS statement.");
            return;
        }
        this.tEnv.executeSql(str2).await();
        if (z2) {
            Assertions.assertThat(this.tEnv.listTables()).contains(new String[]{str});
        } else {
            Assertions.assertThat(this.tEnv.listTables()).doesNotContain(new String[]{str});
        }
        verifyDataFile(absolutePath, "data");
        Assertions.assertThat(TestSupportsStagingTableFactory.JOB_STATUS_CHANGE_PROCESS).hasSize(2);
        Assertions.assertThat(TestSupportsStagingTableFactory.JOB_STATUS_CHANGE_PROCESS).contains(new String[]{"begin", "commit"});
        Assertions.assertThat(TestSupportsStagingTableFactory.STAGING_PURPOSE_LIST).hasSize(1);
        if (z) {
            Assertions.assertThat(TestSupportsStagingTableFactory.STAGING_PURPOSE_LIST).contains(new SupportsStaging.StagingPurpose[]{SupportsStaging.StagingPurpose.CREATE_OR_REPLACE_TABLE_AS});
        } else {
            Assertions.assertThat(TestSupportsStagingTableFactory.STAGING_PURPOSE_LIST).contains(new SupportsStaging.StagingPurpose[]{SupportsStaging.StagingPurpose.REPLACE_TABLE_AS});
        }
    }

    @Test
    void testAtomicReplaceTableAsWithException(@TempDir Path path) {
        commonTestForAtomicReplaceTableAsWithException("atomic_replace_table_fail", false, path.toFile());
    }

    @Test
    void testAtomicCreateOrReplaceTableAsWithException(@TempDir Path path) {
        commonTestForAtomicReplaceTableAsWithException("atomic_create_or_replace_table_fail", true, path.toFile());
    }

    private void commonTestForAtomicReplaceTableAsWithException(String str, boolean z, File file) {
        this.tEnv.executeSql("create table " + str + " (a int) with ('connector' = 'PRINT')");
        this.tEnv.getConfig().set(TableConfigOptions.TABLE_RTAS_CTAS_ATOMICITY_ENABLED, true);
        String absolutePath = file.getAbsolutePath();
        String createOrReplaceSqlFragment = getCreateOrReplaceSqlFragment(z, str);
        Assertions.assertThatCode(() -> {
            this.tEnv.executeSql(createOrReplaceSqlFragment + " with ('connector' = 'test-staging', 'data-dir' = '" + absolutePath + "', 'sink-fail' = 'true') as select * from t1").await();
        }).hasRootCauseMessage("Test StagedTable abort method.");
        Assertions.assertThat(TestSupportsStagingTableFactory.JOB_STATUS_CHANGE_PROCESS).hasSize(2);
        Assertions.assertThat(TestSupportsStagingTableFactory.JOB_STATUS_CHANGE_PROCESS).contains(new String[]{"begin", "abort"});
    }

    @Test
    void testWithoutAtomicReplaceTableAs(@TempDir Path path) throws Exception {
        commonTestForWithoutAtomicReplaceTableAs("non_atomic_replace_table", false, path.toFile());
    }

    @Test
    void testWithoutAtomicCreateOrReplaceTableAs(@TempDir Path path) throws Exception {
        commonTestForWithoutAtomicReplaceTableAs("non_atomic_create_or_replace_table", true, path.toFile());
    }

    private void commonTestForWithoutAtomicReplaceTableAs(String str, boolean z, File file) throws Exception {
        this.tEnv.executeSql("create table " + str + " (a int) with ('connector' = 'PRINT')");
        this.tEnv.getConfig().set(TableConfigOptions.TABLE_RTAS_CTAS_ATOMICITY_ENABLED, false);
        String absolutePath = file.getAbsolutePath();
        this.tEnv.executeSql(getCreateOrReplaceSqlFragment(z, str) + " with ('connector' = 'test-staging', 'data-dir' = '" + absolutePath + "') as select * from t1").await();
        Assertions.assertThat(this.tEnv.listTables()).contains(new String[]{str});
        verifyDataFile(absolutePath, "_data");
        Assertions.assertThat(TestSupportsStagingTableFactory.JOB_STATUS_CHANGE_PROCESS).hasSize(0);
        Assertions.assertThat(TestSupportsStagingTableFactory.STAGING_PURPOSE_LIST).hasSize(0);
    }

    private void verifyDataFile(String str, String str2) throws IOException {
        File file = new File(str, str2);
        Assertions.assertThat(file).exists();
        Assertions.assertThat(file).isFile();
        Assertions.assertThat(FileUtils.readFileUtf8(file)).isEqualTo("1,ZM");
    }

    private String getCreateOrReplaceSqlFragment(boolean z, String str) {
        return z ? " create or replace table " + str : " replace table " + str;
    }
}
