/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.rules.physical.stream;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.table.planner.plan.optimize.RelNodeBlockPlanBuilder$;
import org.apache.flink.table.planner.utils.AggregatePhaseStrategy;
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.junit.Before;
import org.junit.Test;
import scala.Predef$;
import scala.collection.Seq;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;

@ScalaSignature(bytes="\u0006\u0001Q3A!\u0001\u0002\u0001+\tQ2\t[1oO\u0016dwnZ'pI\u0016LeNZ3sK:\u001cW\rV3ti*\u00111\u0001B\u0001\u0007gR\u0014X-Y7\u000b\u0005\u00151\u0011\u0001\u00039isNL7-\u00197\u000b\u0005\u001dA\u0011!\u0002:vY\u0016\u001c(BA\u0005\u000b\u0003\u0011\u0001H.\u00198\u000b\u0005-a\u0011a\u00029mC:tWM\u001d\u0006\u0003\u001b9\tQ\u0001^1cY\u0016T!a\u0004\t\u0002\u000b\u0019d\u0017N\\6\u000b\u0005E\u0011\u0012AB1qC\u000eDWMC\u0001\u0014\u0003\ry'oZ\u0002\u0001'\t\u0001a\u0003\u0005\u0002\u001855\t\u0001D\u0003\u0002\u001a\u0015\u0005)Q\u000f^5mg&\u00111\u0004\u0007\u0002\u000e)\u0006\u0014G.\u001a+fgR\u0014\u0015m]3\t\u000bu\u0001A\u0011\u0001\u0010\u0002\rqJg.\u001b;?)\u0005y\u0002C\u0001\u0011\u0001\u001b\u0005\u0011\u0001b\u0002\u0012\u0001\u0005\u0004%IaI\u0001\u0005kRLG.F\u0001%!\t9R%\u0003\u0002'1\t\u00192\u000b\u001e:fC6$\u0016M\u00197f)\u0016\u001cH/\u0016;jY\"1\u0001\u0006\u0001Q\u0001\n\u0011\nQ!\u001e;jY\u0002BQA\u000b\u0001\u0005\u0002-\naAY3g_J,G#\u0001\u0017\u0011\u00055\u0002T\"\u0001\u0018\u000b\u0003=\nQa]2bY\u0006L!!\r\u0018\u0003\tUs\u0017\u000e\u001e\u0015\u0003SM\u0002\"\u0001N\u001c\u000e\u0003UR!A\u000e\n\u0002\u000b),h.\u001b;\n\u0005a*$A\u0002\"fM>\u0014X\rC\u0003;\u0001\u0011\u00051&\u0001\u0006uKN$8+\u001a7fGRD#!\u000f\u001f\u0011\u0005Qj\u0014B\u0001 6\u0005\u0011!Vm\u001d;\t\u000b\u0001\u0003A\u0011A\u0016\u0002'Q,7\u000f^(oK2+g/\u001a7He>,\bOQ=)\u0005}b\u0004\"B\"\u0001\t\u0003Y\u0013!\t;fgR$vo\u001c'fm\u0016dwI]8va\nKHj\\2bY\u001ecwNY1m\u001f\u001a4\u0007F\u0001\"=\u0011\u00151\u0005\u0001\"\u0001,\u0003\u0001\"Xm\u001d;Uo>dUM^3m\u000fJ|W\u000f\u001d\"z\u0019>\u001c\u0017\r\\$m_\n\fGn\u00148)\u0005\u0015c\u0004\"B%\u0001\t\u0003Y\u0013a\t;fgR$V-\u001c9pe\u0006d'j\\5o/&$\b\u000eR3ekBd\u0017nY1uKZKWm\u001e\u0015\u0003\u0011rBQ\u0001\u0014\u0001\u0005\u0002-\nQ\u0004^3tiR+W\u000e]8sC2Tu.\u001b8XSRD7\t[1oO\u0016dwn\u001a\u0015\u0003\u0017rBQa\u0014\u0001\u0005\u0002-\nA\u0003^3ti\u001e\u0013x.\u001e9Cs^KG\u000f[+oS>t\u0007F\u0001(=\u0011\u0015\u0011\u0006\u0001\"\u0001,\u0003%\"Xm\u001d;Qe>\u0004\u0018mZ1uKV\u0003H-\u0019;f\u0017&tG-Q7p]\u001e\u0014V\r\u001c(pI\u0016\u0014En\\2lg\"\u0012\u0011\u000b\u0010")
public class ChangelogModeInferenceTest
extends TableTestBase {
    private final StreamTableTestUtil util = this.streamTestUtil(this.streamTestUtil$default$1());

    private StreamTableTestUtil util() {
        return this.util;
    }

    @Before
    public void before() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE TABLE MyTable (\n        | word STRING,\n        | number INT\n        |) WITH (\n        | 'connector' = 'COLLECTION',\n        | 'is-bounded' = 'false'\n        |)\n      ")).stripMargin());
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE TABLE Orders (\n        | amount INT,\n        | currency STRING,\n        | rowtime TIMESTAMP(3),\n        | proctime AS PROCTIME(),\n        | WATERMARK FOR rowtime AS rowtime\n        |) WITH (\n        | 'connector' = 'COLLECTION',\n        | 'is-bounded' = 'false'\n        |)\n      ")).stripMargin());
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE TABLE ratesHistory (\n        | currency STRING,\n        | rate INT,\n        | rowtime TIMESTAMP(3),\n        | WATERMARK FOR rowtime AS rowtime\n        |) WITH (\n        |  'connector' = 'COLLECTION',\n        |  'is-bounded' = 'false',\n        |  'changelog-mode' = 'I'\n        |)\n      ")).stripMargin());
        this.util().addTable(" CREATE VIEW DeduplicatedView AS SELECT currency, rate, rowtime FROM   (SELECT *,           ROW_NUMBER() OVER (PARTITION BY currency ORDER BY rowtime DESC) AS rowNum    FROM ratesHistory  ) T   WHERE rowNum = 1");
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE TABLE ratesChangelogStream (\n        | currency STRING,\n        | rate INT,\n        | rowtime TIMESTAMP(3),\n        | WATERMARK FOR rowtime as rowtime,\n        | PRIMARY KEY(currency) NOT ENFORCED\n        |) WITH (\n        |  'connector' = 'values',\n        |  'changelog-mode' = 'I,UA,UB,D'\n        |)\n      ")).stripMargin());
    }

    @Test
    public void testSelect() {
        this.util().verifyRelPlan("SELECT word, number FROM MyTable", (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testOneLevelGroupBy() {
        this.util().verifyRelPlan("SELECT COUNT(number) FROM MyTable GROUP BY word", (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testTwoLevelGroupByLocalGlobalOff() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT cnt, COUNT(cnt) AS frequency FROM (\n        |  SELECT word, COUNT(number) as cnt FROM MyTable GROUP BY word\n        |) GROUP BY cnt\n      ")).stripMargin();
        this.util().verifyRelPlan(sql, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testTwoLevelGroupByLocalGlobalOn() {
        this.util().enableMiniBatch();
        this.util().tableEnv().getConfig().setIdleStateRetentionTime(Time.hours((long)1L), Time.hours((long)2L));
        this.util().tableEnv().getConfig().getConfiguration().setString(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, AggregatePhaseStrategy.TWO_PHASE.toString());
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT cnt, COUNT(cnt) AS frequency FROM (\n        |  SELECT word, COUNT(number) as cnt FROM MyTable GROUP BY word\n        |) GROUP BY cnt\n      ")).stripMargin();
        this.util().verifyRelPlan(sql, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testTemporalJoinWithDeduplicateView() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT * FROM Orders AS o\n        | JOIN DeduplicatedView FOR SYSTEM_TIME AS OF o.rowtime AS r\n        | ON o.currency = r.currency\n      ")).stripMargin();
        this.util().verifyRelPlan(sql, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testTemporalJoinWithChangelog() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT * FROM Orders AS o\n        | JOIN ratesChangelogStream FOR SYSTEM_TIME AS OF o.rowtime AS r\n        | ON o.currency = r.currency\n      ")).stripMargin();
        this.util().verifyRelPlan(sql, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testGroupByWithUnion() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE TABLE MyTable2 (\n        | word STRING,\n        | cnt INT\n        |) WITH (\n        | 'connector' = 'COLLECTION',\n        | 'is-bounded' = 'false'\n        |)\n      ")).stripMargin());
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT cnt, COUNT(cnt) AS frequency FROM (\n        |   SELECT word, COUNT(number) AS cnt FROM MyTable GROUP BY word\n        |   UNION ALL\n        |   SELECT word, cnt FROM MyTable2\n        |) GROUP BY cnt\n      ")).stripMargin();
        this.util().verifyRelPlan(sql, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testPropagateUpdateKindAmongRelNodeBlocks() {
        this.util().tableEnv().getConfig().getConfiguration().setBoolean(RelNodeBlockPlanBuilder$.MODULE$.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED(), true);
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n        |create table sink1 (\n        |  a INT,\n        |  b VARCHAR\n        |) with (\n        |  'connector' = 'values',\n        |  'sink-insert-only' = 'false'\n        |)\n        |")).stripMargin());
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n        |create table sink2 (\n        |  a INT,\n        |  b VARCHAR,\n        |  primary key (b) not enforced\n        |) with (\n        |  'connector' = 'values',\n        |  'sink-insert-only' = 'false'\n        |)\n        |")).stripMargin());
        this.util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE VIEW v1 AS\n        |SELECT\n        |  SUM(number) AS number, word\n        |FROM MyTable\n        |GROUP BY word\n        |")).stripMargin());
        this.util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE VIEW v2 AS\n        |SELECT number + 1 AS number, word FROM v1\n        |UNION ALL\n        |SELECT number - 1 AS number, word FROM v1\n        |")).stripMargin());
        StatementSet statementSet = this.util().tableEnv().createStatementSet();
        statementSet.addInsertSql(new StringOps(Predef$.MODULE$.augmentString("\n        |INSERT INTO sink1 SELECT number, word FROM v2 WHERE word > 'a'\n        |")).stripMargin());
        statementSet.addInsertSql(new StringOps(Predef$.MODULE$.augmentString("\n        |INSERT INTO sink1 SELECT number, word FROM v2 WHERE word < 'a'\n        |")).stripMargin());
        statementSet.addInsertSql(new StringOps(Predef$.MODULE$.augmentString("\n        |INSERT INTO sink2 SELECT * FROM v1\n        |")).stripMargin());
        this.util().verifyRelPlan(statementSet, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }
}

