package org.apache.flink.table.planner.plan.optimize.program;

import java.util.Collections;
import java.util.HashMap;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.table.catalog.CatalogPartitionImpl;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataLong;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.planner.factories.TestValuesCatalog;
import org.apache.flink.table.planner.utils.BatchTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgramTest.class */
/**
 * Plan tests for batch queries with the runtime filter optimizer option enabled.
 *
 * <p>Every test registers table statistics in a {@link TestValuesCatalog} and then calls
 * {@code verifyPlan} on a join query over the {@code dim} and {@code fact} tables created in
 * {@link #setup()}. The test names describe the scenario each query is meant to exercise.
 */
public class FlinkRuntimeFilterProgramTest extends TableTestBase {
    /** Row count registered for the {@code dim} table in the "suitable" scenarios (131072). */
    public static final long SUITABLE_DIM_ROW_COUNT = 128L * 1024L;

    /** Row count registered for the {@code fact} table in the "suitable" scenarios (1073741824). */
    public static final long SUITABLE_FACT_ROW_COUNT = 1024L * 1024L * 1024L;

    private final BatchTableTestUtil util = batchTestUtil(TableConfig.getDefault());
    private final TestValuesCatalog catalog =
            new TestValuesCatalog("testCatalog", "test_database", true);

    /**
     * Registers the test catalog, enables the runtime filter with a 10m max build data size, and
     * creates the bounded {@code dim} and {@code fact} source tables used by all tests.
     */
    @BeforeEach
    void setup() {
        catalog.open();
        util.getTableEnv().registerCatalog("testCatalog", catalog);
        util.getTableEnv().useCatalog("testCatalog");

        TableConfig tableConfig = util.getTableEnv().getConfig();
        tableConfig.set(OptimizerConfigOptions.TABLE_OPTIMIZER_RUNTIME_FILTER_ENABLED, true);
        tableConfig.set(
                OptimizerConfigOptions.TABLE_OPTIMIZER_RUNTIME_FILTER_MAX_BUILD_DATA_SIZE,
                MemorySize.parse("10m"));

        util.getTableEnv()
                .executeSql(
                        "create table dim (\n"
                                + "  id BIGINT,\n"
                                + "  male BOOLEAN,\n"
                                + "  amount BIGINT,\n"
                                + "  price BIGINT,\n"
                                + "  dim_date_sk BIGINT\n"
                                + ")  with (\n"
                                + " 'connector' = 'values',\n"
                                + " 'runtime-source' = 'NewSource',\n"
                                + " 'bounded' = 'true'\n"
                                + ")");
        util.getTableEnv()
                .executeSql(
                        "create table fact (\n"
                                + "  id BIGINT,\n"
                                + "  name STRING,\n"
                                + "  amount BIGINT,\n"
                                + "  price BIGINT,\n"
                                + "  fact_date_sk BIGINT\n"
                                + ") with (\n"
                                + "  'connector' = 'values',\n"
                                + "  'runtime-source' = 'NewSource',\n"
                                + "  'bounded' = 'true'\n"
                                + ")");
    }

    @Test
    void testSimpleInnerJoin() throws Exception {
        setupSuitableTableStatistics();
        util.verifyPlan("select * from fact, dim where fact.amount = dim.amount and dim.price < 500");
    }

    @Test
    void testSemiJoin() throws Exception {
        setupSuitableTableStatistics();
        disableBroadcastJoin();
        util.verifyPlan("select * from fact where fact.fact_date_sk in (select dim_date_sk from dim where dim.price < 500)");
    }

    @Test
    void testLeftOuterJoinWithLeftBuild() throws Exception {
        setupSuitableTableStatistics();
        util.verifyPlan("select * from dim left outer join fact on fact.amount = dim.amount and dim.price < 500");
    }

    @Test
    void testLeftOuterJoinWithRightBuild() throws Exception {
        setupSuitableTableStatistics();
        util.verifyPlan("select * from fact left outer join dim on fact.amount = dim.amount and dim.price < 500");
    }

    @Test
    void testFullOuterJoin() throws Exception {
        setupSuitableTableStatistics();
        util.verifyPlan("select * from fact full outer join (select * from dim where dim.price < 500) on fact_date_sk = dim_date_sk");
    }

    @Test
    void testAntiJoin() throws Exception {
        setupSuitableTableStatistics();
        util.verifyPlan("select * from fact where fact.fact_date_sk not in (select dim_date_sk from dim where dim.price < 500)");
    }

    @Test
    void testNestedLoopJoin() throws Exception {
        // dim is registered with a single row.
        setupTableRowCount("dim", 1L);
        setupTableRowCount("fact", SUITABLE_FACT_ROW_COUNT);
        util.verifyPlan("select * from fact, dim where fact.amount = dim.amount and dim.price < 500");
    }

    @Test
    void testProbeSideIsTooSmall() throws Exception {
        setupTableRowCount("dim", SUITABLE_DIM_ROW_COUNT);
        // 134217728 rows: one eighth of SUITABLE_FACT_ROW_COUNT.
        setupTableRowCount("fact", 128L * 1024L * 1024L);
        util.verifyPlan("select * from fact, dim where fact.amount = dim.amount and dim.price < 500");
    }

    @Test
    void testBuildSideIsTooLarge() throws Exception {
        // 1048576 rows: eight times SUITABLE_DIM_ROW_COUNT.
        setupTableRowCount("dim", 1024L * 1024L);
        setupTableRowCount("fact", SUITABLE_FACT_ROW_COUNT);
        util.verifyPlan("select * from fact, dim where fact.amount = dim.amount and dim.price < 500");
    }

    @Test
    void testFilterRatioIsTooSmall() throws Exception {
        setupSuitableTableStatistics();
        // The join key has 768 distinct values on dim versus 1024 on fact.
        setupTableColumnNdv("dim", "amount", 768L);
        setupTableColumnNdv("fact", "amount", 1024L);
        util.verifyPlan("select * from fact, dim where fact.amount = dim.amount and dim.price < 500");
    }

    @Test
    void testBuildSideIsJoinWithoutExchange() throws Exception {
        setupSuitableTableStatistics();
        createThreeColumnTable("fact2", SUITABLE_FACT_ROW_COUNT);
        util.verifyPlan("select * from dim, fact, fact2 where fact.amount = fact2.amount and fact.amount = dim.amount and dim.price < 500");
    }

    @Test
    void testBuildSideIsJoinWithTwoAggInputs() throws Exception {
        setupSuitableTableStatistics();
        createThreeColumnTable("fact2", SUITABLE_FACT_ROW_COUNT);
        disableBroadcastJoin();
        useOnePhaseAggregation();
        util.verifyPlan("select * from fact join (select * from (select dim_date_sk, sum(dim.price) from dim group by dim_date_sk) agg1 join (select dim_date_sk, sum(dim.amount) from dim group by dim_date_sk) agg2 on agg1.dim_date_sk = agg2.dim_date_sk) as dimSide on fact.fact_date_sk = dimSide.dim_date_sk");
    }

    @Test
    void testBuildSideIsLeftJoinWithoutExchange() throws Exception {
        setupSuitableTableStatistics();
        // This fact2 variant carries an additional fact_date_sk column.
        util.getTableEnv()
                .executeSql(
                        "create table fact2 (\n"
                                + "  id BIGINT,\n"
                                + "  amount BIGINT,\n"
                                + "  price BIGINT,\n"
                                + "  fact_date_sk BIGINT\n"
                                + ") with (\n"
                                + " 'connector' = 'values',\n"
                                + "  'runtime-source' = 'NewSource',\n"
                                + " 'bounded' = 'true'\n"
                                + ")");
        setupTableRowCount("fact2", SUITABLE_FACT_ROW_COUNT);
        util.verifyPlan("select * from fact2 join (select * from fact left join dim on dim.amount = fact.amount and dim.price < 500) as dimSide on fact2.amount = dimSide.amount");
    }

    @Test
    void testBuildSideIsAggWithoutExchange() throws Exception {
        disableBroadcastJoin();
        useOnePhaseAggregation();
        setupSuitableTableStatistics();
        util.verifyPlan("select * from fact join (select dim_date_sk, sum(dim.price) from dim where  dim.price < 500 group by dim_date_sk) dimSide on fact.fact_date_sk = dimSide.dim_date_sk");
    }

    @Test
    void testBuildSideIsCalcWithoutExchange() throws Exception {
        disableBroadcastJoin();
        useOnePhaseAggregation();
        setupSuitableTableStatistics();
        util.verifyPlan("select * from fact join (select dim_date_sk, sum(dim.price) + 1 as sum_price from dim where  dim.price < 500 group by dim_date_sk) dimSide on fact.fact_date_sk = dimSide.dim_date_sk");
    }

    @Test
    void testCannotInjectMoreThanOneRuntimeFilterInSamePlace() throws Exception {
        setupSuitableTableStatistics();
        createThreeColumnTable("dim2", SUITABLE_DIM_ROW_COUNT);
        util.verifyPlan("select * from fact, dim, dim2 where fact.amount = dim.amount and fact.amount = dim2.amount and dim.price < 500");
    }

    @Test
    void testPushDownProbeSideWithCalc() throws Exception {
        setupSuitableTableStatistics();
        util.verifyPlan("select * from dim, fact where dim.amount = fact.amount and dim.price < 500 and fact.price > 600");
    }

    @Test
    void testCannotPushDownProbeSideWithCalc() throws Exception {
        setupSuitableTableStatistics();
        util.verifyPlan("select * from dim inner join (select fact_date_sk, RAND(10) as random from fact) as factSide on dim.amount = factSide.random and dim.price < 500");
    }

    @Test
    void testPushDownProbeSideToAllInputsOfJoin() throws Exception {
        setupSuitableTableStatistics();
        createThreeColumnTable("fact2", SUITABLE_FACT_ROW_COUNT);
        util.verifyPlan("select * from fact, fact2, dim where fact.amount = fact2.amount and fact.amount = dim.amount and dim.price < 500");
    }

    @Test
    void testPushDownProbeSideToOneInputOfJoin() throws Exception {
        setupSuitableTableStatistics();
        createThreeColumnTable("fact2", SUITABLE_FACT_ROW_COUNT);
        util.verifyPlan("select * from fact, fact2, dim where fact.price = fact2.price and fact.amount = dim.amount and dim.price < 500");
    }

    @Test
    void testCannotPushDownProbeSideWithJoin() throws Exception {
        setupSuitableTableStatistics();
        // This fact2 variant uses column names suffixed with "2", distinct from those of fact.
        util.getTableEnv()
                .executeSql(
                        "create table fact2 (\n"
                                + "  id2 BIGINT,\n"
                                + "  amount2 BIGINT,\n"
                                + "  price2 BIGINT\n"
                                + ") with (\n"
                                + " 'connector' = 'values',\n"
                                + "  'runtime-source' = 'NewSource',\n"
                                + " 'bounded' = 'true'\n"
                                + ")");
        setupTableRowCount("fact2", SUITABLE_FACT_ROW_COUNT);
        util.verifyPlan("select * from (select * from fact inner join fact2 on fact.id = fact2.id2) as factSide inner join dim on factSide.amount = dim.amount and factSide.price2 = dim.price and dim.price < 500");
    }

    @Test
    void testPushDownProbeSideWithAgg() throws Exception {
        setupTableRowCount("dim", SUITABLE_DIM_ROW_COUNT);
        // 1099511627776 rows: 1024 times SUITABLE_FACT_ROW_COUNT.
        setupTableRowCount("fact", 1024L * 1024L * 1024L * 1024L);
        useOnePhaseAggregation();
        util.verifyPlan("select * from dim join (select id, fact_date_sk, sum(fact.price) from fact group by (id, fact_date_sk)) factSide on dim.dim_date_sk = factSide.fact_date_sk and dim.price < 500");
    }

    @Test
    void testCannotPushDownProbeSideWithAgg() throws Exception {
        setupTableRowCount("dim", SUITABLE_DIM_ROW_COUNT);
        // 1099511627776 rows: 1024 times SUITABLE_FACT_ROW_COUNT.
        setupTableRowCount("fact", 1024L * 1024L * 1024L * 1024L);
        useOnePhaseAggregation();
        util.verifyPlan("select * from dim join (select id, fact_date_sk, sum(fact.price) as sum_price from fact group by (id, fact_date_sk)) factSide on dim.price = factSide.sum_price and dim.price < 500");
    }

    @Test
    void testPushDownProbeSideWithUnion() throws Exception {
        setupSuitableTableStatistics();
        util.verifyPlan("Select * from (select id, fact_date_sk, amount as amount1 from fact where price < 500 union all select id, fact_date_sk, amount from fact where price > 600) fact2, dim where fact2.fact_date_sk = dim.dim_date_sk and dim.price < 500");
    }

    @Test
    void testDoesNotApplyRuntimeFilterAndDPPOnSameKey() throws Exception {
        setupTableRowCount("dim", SUITABLE_DIM_ROW_COUNT);
        createPartitionedFactTable(SUITABLE_FACT_ROW_COUNT);
        util.verifyPlan("select * from dim, fact_part where fact_part.fact_date_sk = dim.dim_date_sk and dim.price < 500");
    }

    @Test
    void testProbeSideIsTableSourceWithoutExchange() throws Exception {
        setupSuitableTableStatistics();
        util.verifyPlan("select * from fact, dim where fact.amount = dim.amount and dim.price = 500");
    }

    /**
     * Creates the partitioned {@code fact_part} table, adds one partition to the catalog and
     * registers statistics with the given row count on that partition.
     *
     * @param rowCount row count stored in the partition statistics
     */
    private void createPartitionedFactTable(long rowCount) throws Exception {
        util.getTableEnv()
                .executeSql(
                        "CREATE TABLE fact_part (\n"
                                + "  id BIGINT,\n"
                                + "  name STRING,\n"
                                + "  amount BIGINT,\n"
                                + "  price BIGINT,\n"
                                + "  fact_date_sk BIGINT\n"
                                + ") PARTITIONED BY (fact_date_sk)\n"
                                + "WITH (\n"
                                + " 'connector' = 'values',\n"
                                + " 'runtime-source' = 'NewSource',\n"
                                + " 'partition-list' = 'fact_date_sk:1990;fact_date_sk:1991;fact_date_sk:1992',\n"
                                + " 'dynamic-filtering-fields' = 'fact_date_sk;amount',\n"
                                + " 'bounded' = 'true'\n"
                                + ")");

        ObjectPath factPartPath = pathInCurrentDatabase("fact_part");
        CatalogPartitionSpec partitionSpec =
                new CatalogPartitionSpec(Collections.singletonMap("fact_date_sk", "666"));
        catalog.createPartition(
                factPartPath, partitionSpec, new CatalogPartitionImpl(new HashMap<>(), ""), true);
        catalog.alterPartitionStatistics(
                factPartPath,
                partitionSpec,
                new CatalogTableStatistics(rowCount, 10, 1000L, 2000L),
                true);
    }

    /**
     * Creates a bounded values table with the columns {@code id}, {@code amount} and
     * {@code price} (all BIGINT) and registers the given row count for it.
     *
     * @param tableName name of the table to create in the current database
     * @param rowCount row count to register in the table statistics
     */
    private void createThreeColumnTable(String tableName, long rowCount) throws Exception {
        util.getTableEnv()
                .executeSql(
                        "create table "
                                + tableName
                                + " (\n"
                                + "  id BIGINT,\n"
                                + "  amount BIGINT,\n"
                                + "  price BIGINT\n"
                                + ") with (\n"
                                + " 'connector' = 'values',\n"
                                + "  'runtime-source' = 'NewSource',\n"
                                + " 'bounded' = 'true'\n"
                                + ")");
        setupTableRowCount(tableName, rowCount);
    }

    /** Sets the broadcast join threshold option to {@code -1}. */
    private void disableBroadcastJoin() {
        util.getTableEnv()
                .getConfig()
                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, -1L);
    }

    /** Sets the aggregate phase strategy option to {@code ONE_PHASE}. */
    private void useOnePhaseAggregation() {
        util.getTableEnv()
                .getConfig()
                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "ONE_PHASE");
    }

    /** Registers the default row counts for the {@code dim} and {@code fact} tables. */
    private void setupSuitableTableStatistics() throws Exception {
        setupTableRowCount("dim", SUITABLE_DIM_ROW_COUNT);
        setupTableRowCount("fact", SUITABLE_FACT_ROW_COUNT);
    }

    /**
     * Registers table statistics carrying the given row count; the other statistic values are
     * fixed at 1.
     *
     * @param tableName table in the current database whose statistics are altered
     * @param rowCount row count to register
     */
    private void setupTableRowCount(String tableName, long rowCount) throws Exception {
        catalog.alterTableStatistics(
                pathInCurrentDatabase(tableName),
                new CatalogTableStatistics(rowCount, 1, 1L, 1L),
                false);
    }

    /**
     * Registers long-typed column statistics for one column; only the {@code ndv} argument
     * varies between callers, the remaining values are fixed.
     *
     * @param tableName table in the current database whose column statistics are altered
     * @param columnName column the statistics are registered for
     * @param ndv value passed as the third statistics argument (number of distinct values,
     *     going by this helper's name)
     */
    private void setupTableColumnNdv(String tableName, String columnName, long ndv)
            throws Exception {
        catalog.alterTableColumnStatistics(
                pathInCurrentDatabase(tableName),
                new CatalogColumnStatistics(
                        Collections.singletonMap(
                                columnName,
                                new CatalogColumnStatisticsDataLong(-123L, 763322L, ndv, 79L))),
                false);
    }

    /** Builds the catalog path of {@code tableName} inside the session's current database. */
    private ObjectPath pathInCurrentDatabase(String tableName) {
        return new ObjectPath(util.getTableEnv().getCurrentDatabase(), tableName);
    }
}
