/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.runtime.stream.sql;

import java.util.Collection;
import java.util.Optional;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.bridge.scala.package$;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.runtime.stream.sql.AsyncLookupJoinITCase$;
import org.apache.flink.table.planner.runtime.utils.InMemoryLookupableTableSource$;
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase;
import org.apache.flink.table.planner.runtime.utils.TestingAppendSink;
import org.apache.flink.table.planner.runtime.utils.TestingRetractSink;
import org.apache.flink.table.planner.runtime.utils.UserDefinedFunctionTestUtils;
import org.apache.flink.table.planner.runtime.utils.UserDefinedFunctionTestUtils$TestAddWithOpen$;
import org.apache.flink.table.planner.runtime.utils.UserDefinedFunctionTestUtils$TestExceptionThrown$;
import org.apache.flink.table.planner.runtime.utils.UserDefinedFunctionTestUtils$TestMod$;
import org.apache.flink.table.planner.runtime.utils.UserDefinedFunctionTestUtils$TestWrapperUdf$;
import org.apache.flink.types.Row;
import org.apache.flink.util.ExceptionUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import scala.Predef$;
import scala.StringContext;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.StringOps;
import scala.math.Ordering;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

@RunWith(value=Parameterized.class)
@ScalaSignature(bytes="\u0006\u0001\u0005\u001dg\u0001B\u0001\u0003\u0001M\u0011Q#Q:z]\u000edun\\6va*{\u0017N\\%U\u0007\u0006\u001cXM\u0003\u0002\u0004\t\u0005\u00191/\u001d7\u000b\u0005\u00151\u0011AB:ue\u0016\fWN\u0003\u0002\b\u0011\u00059!/\u001e8uS6,'BA\u0005\u000b\u0003\u001d\u0001H.\u00198oKJT!a\u0003\u0007\u0002\u000bQ\f'\r\\3\u000b\u00055q\u0011!\u00024mS:\\'BA\b\u0011\u0003\u0019\t\u0007/Y2iK*\t\u0011#A\u0002pe\u001e\u001c\u0001a\u0005\u0002\u0001)A\u0011Q\u0003G\u0007\u0002-)\u0011qCB\u0001\u0006kRLGn]\u0005\u00033Y\u0011!d\u0015;sK\u0006l\u0017N\\4XSRD7\u000b^1uKR+7\u000f\u001e\"bg\u0016D\u0001b\u0007\u0001\u0003\u0002\u0003\u0006I\u0001H\u0001\u0012Y\u0016<\u0017mY=UC\ndWmU8ve\u000e,\u0007CA\u000f!\u001b\u0005q\"\"A\u0010\u0002\u000bM\u001c\u0017\r\\1\n\u0005\u0005r\"a\u0002\"p_2,\u0017M\u001c\u0005\tG\u0001\u0011\t\u0011)A\u0005I\u00059!-Y2lK:$\u0007CA\u0013:\u001d\t1sG\u0004\u0002(m9\u0011\u0001&\u000e\b\u0003SQr!AK\u001a\u000f\u0005-\u0012dB\u0001\u00172\u001d\ti\u0003'D\u0001/\u0015\ty##\u0001\u0004=e>|GOP\u0005\u0002#%\u0011q\u0002E\u0005\u0003\u001b9I!a\u0003\u0007\n\u0005%Q\u0011BA\u0004\t\u0013\t9b!\u0003\u00029-\u0005Q2\u000b\u001e:fC6LgnZ,ji\"\u001cF/\u0019;f)\u0016\u001cHOQ1tK&\u0011!h\u000f\u0002\u0011'R\fG/\u001a\"bG.,g\u000eZ'pI\u0016T!\u0001\u000f\f\t\u000bu\u0002A\u0011\u0001 
\u0002\rqJg.\u001b;?)\ry\u0014I\u0011\t\u0003\u0001\u0002i\u0011A\u0001\u0005\u00067q\u0002\r\u0001\b\u0005\u0006Gq\u0002\r\u0001\n\u0005\b\t\u0002\u0011\r\u0011\"\u0001F\u0003\u0011!\u0017\r^1\u0016\u0003\u0019\u00032a\u0012'O\u001b\u0005A%BA%K\u0003%IW.\\;uC\ndWM\u0003\u0002L=\u0005Q1m\u001c7mK\u000e$\u0018n\u001c8\n\u00055C%\u0001\u0002'jgR\u0004\"a\u0014*\u000e\u0003AS!!\u0015\u0007\u0002\u000bQL\b/Z:\n\u0005M\u0003&a\u0001*po\"1Q\u000b\u0001Q\u0001\n\u0019\u000bQ\u0001Z1uC\u0002Bqa\u0016\u0001C\u0002\u0013\u0005Q)\u0001\u0005vg\u0016\u0014H)\u0019;b\u0011\u0019I\u0006\u0001)A\u0005\r\u0006IQo]3s\t\u0006$\u0018\r\t\u0005\u00067\u0002!\t\u0005X\u0001\u0007E\u00164wN]3\u0015\u0003u\u0003\"!\b0\n\u0005}s\"\u0001B+oSRD#AW1\u0011\u0005\t,W\"A2\u000b\u0005\u0011\u0004\u0012!\u00026v]&$\u0018B\u00014d\u0005\u0019\u0011UMZ8sK\")\u0001\u000e\u0001C!9\u0006)\u0011M\u001a;fe\"\u0012qM\u001b\t\u0003E.L!\u0001\\2\u0003\u000b\u00053G/\u001a:\t\u000b9\u0004A\u0011B8\u0002#\r\u0014X-\u0019;f\u0019>|7.\u001e9UC\ndW\rF\u0002^afDQ!]7A\u0002I\f\u0011\u0002^1cY\u0016t\u0015-\\3\u0011\u0005M4hBA\u000fu\u0013\t)h$\u0001\u0004Qe\u0016$WMZ\u0005\u0003ob\u0014aa\u0015;sS:<'BA;\u001f\u0011\u0015!U\u000e1\u0001{!\u0011Y\u0018\u0011\u0001(\u000f\u0005qthBA\u0017~\u0013\u0005y\u0012BA@\u001f\u0003\u001d\u0001\u0018mY6bO\u0016L1!TA\u0002\u0015\tyh\u0004C\u0004\u0002\b\u0001!I!!\u0003\u0002\u001f\r\u0014X-\u0019;f'\u000e\fg\u000eV1cY\u0016$R!XA\u0006\u0003\u001bAa!]A\u0003\u0001\u0004\u0011\bB\u0002#\u0002\u0006\u0001\u0007!\u0010\u0003\u0004\u0002\u0012\u0001!\t\u0001X\u0001+i\u0016\u001cH/Q:z]\u000eTu.\u001b8UK6\u0004xN]1m)\u0006\u0014G.Z(o\u001bVdG/[&fs\u001aKW\r\u001c3tQ\u0011\ty!!\u0006\u0011\u0007\t\f9\"C\u0002\u0002\u001a\r\u0014A\u0001V3ti\"1\u0011Q\u0004\u0001\u0005\u0002q\u000b!\u0004^3ti\u0006\u001b\u0018P\\2K_&tG+Z7q_J\fG\u000eV1cY\u0016DC!a\u0007\u0002\u0016!1\u00111\u0005\u0001\u0005\u0002q\u000ba\u0005^3ti\u0006\u001b\u0018P\\2K_&tG+Z7q_J\fG\u000eV1cY\u0016<\u0016\u000e\u001e5Qk
NDGi\\<oQ\u0011\t\t#!\u0006\t\r\u0005%\u0002\u0001\"\u0001]\u00031\"Xm\u001d;Bgft7MS8j]R+W\u000e]8sC2$\u0016M\u00197f/&$\bNT8o\u000bF,\u0018\r\u001c$jYR,'\u000f\u000b\u0003\u0002(\u0005U\u0001BBA\u0018\u0001\u0011\u0005A,\u0001\u0019uKN$\u0018i]=oG2+g\r\u001e&pS:$V-\u001c9pe\u0006dG+\u00192mK^KG\u000f\u001b'pG\u0006d\u0007K]3eS\u000e\fG/\u001a\u0015\u0005\u0003[\t)\u0002\u0003\u0004\u00026\u0001!\t\u0001X\u0001(i\u0016\u001cH/Q:z]\u000eTu.\u001b8UK6\u0004xN]1m)\u0006\u0014G.Z(o\u001bVdG/\u001b$jK2$7\u000f\u000b\u0003\u00024\u0005U\u0001BBA\u001e\u0001\u0011\u0005A,\u0001\u0018uKN$\u0018i]=oG*{\u0017N\u001c+f[B|'/\u00197UC\ndWm\u00148Nk2$\u0018NR5fY\u0012\u001cx+\u001b;i+\u00124\u0007\u0006BA\u001d\u0003+Aa!!\u0011\u0001\t\u0003a\u0016a\n;fgR\f5/\u001f8d\u0015>Lg\u000eV3na>\u0014\u0018\r\u001c+bE2,w+\u001b;i+\u00124g)\u001b7uKJDC!a\u0010\u0002\u0016!1\u0011q\t\u0001\u0005\u0002q\u000bA\u0005^3ti\u0006;w-\u00118e\u0003NLhn\u0019'fMRTu.\u001b8UK6\u0004xN]1m)\u0006\u0014G.\u001a\u0015\u0005\u0003\u000b\n)\u0002\u0003\u0004\u0002N\u0001!\t\u0001X\u0001\u001fi\u0016\u001cH/Q:z]\u000edUM\u001a;K_&tG+Z7q_J\fG\u000eV1cY\u0016DC!a\u0013\u0002\u0016!1\u00111\u000b\u0001\u0005\u0002q\u000bQ\u0006^3ti\u0016C8-\u001a9uS>tG\u000b\u001b:po:4%o\\7Bgft7MS8j]R+W\u000e]8sC2$\u0016M\u00197fQ\u0011\t\t&!\u0006)\u000f\u0001\tI&!\u001a\u0002hA!\u00111LA1\u001b\t\tiFC\u0002\u0002`\r\faA];o]\u0016\u0014\u0018\u0002BA2\u0003;\u0012qAU;o/&$\b.A\u0003wC2,Xm\t\u0002\u0002jA!\u00111NA9\u001b\t\tiGC\u0002\u0002p\r\fqA];o]\u0016\u00148/\u0003\u0003\u0002t\u00055$!\u0004)be\u0006lW\r^3sSj,GmB\u0004\u0002x\tA\t!!\u001f\u0002+\u0005\u001b\u0018P\\2M_>\\W\u000f\u001d&pS:LEkQ1tKB\u0019\u0001)a\u001f\u0007\r\u0005\u0011\u0001\u0012AA?'\u0011\tY(a \u0011\u0007u\t\t)C\u0002\u0002\u0004z\u0011a!\u00118z%\u00164\u0007bB\u001f\u0002|\u0011\u0005\u0011q\u0011\u000b\u0003\u0003sB\u0001\"a#\u0002|\u0011\u0005\u0011QR\u0001\u000ba\u0006\u0014\u0018-\\3uKJ\u001cHCAAH!\u0019\t\t*a'\u0002 
6\u0011\u00111\u0013\u0006\u0005\u0003+\u000b9*\u0001\u0003vi&d'BAAM\u0003\u0011Q\u0017M^1\n\t\u0005u\u00151\u0013\u0002\u000b\u0007>dG.Z2uS>t\u0007#B\u000f\u0002\"\u0006\u0015\u0016bAAR=\t)\u0011I\u001d:bsB!\u0011qUAW\u001b\t\tIK\u0003\u0003\u0002,\u0006]\u0015\u0001\u00027b]\u001eLA!a,\u0002*\n1qJ\u00196fGRD\u0003\"!#\u00024\u0006\u0005\u00171\u0019\t\u0005\u0003k\u000bYL\u0004\u0003\u0002l\u0005]\u0016\u0002BA]\u0003[\nQ\u0002U1sC6,G/\u001a:ju\u0016$\u0017\u0002BA_\u0003\u007f\u0013!\u0002U1sC6,G/\u001a:t\u0015\u0011\tI,!\u001c\u0002\t9\fW.Z\u0011\u0003\u0003\u000b\fq\u0005T3hC\u000eLH+\u00192mKN{WO]2f{m\u0004T\u0010\f\u0011Ti\u0006$XMQ1dW\u0016tG-P>2{\u0002")
public class AsyncLookupJoinITCase
extends StreamingWithStateTestBase {
    private final boolean legacyTableSource;
    private final List<Row> data;
    private final List<Row> userData;

    /**
     * Supplies the argument arrays for the parameterized runner by delegating to the
     * companion object {@code AsyncLookupJoinITCase$}.
     *
     * @return whatever {@code AsyncLookupJoinITCase$.MODULE$.parameters()} returns
     */
    @Parameterized.Parameters(name="LegacyTableSource={0}, StateBackend={1}")
    public static Collection<Object[]> parameters() {
        Collection<Object[]> combinations = AsyncLookupJoinITCase$.MODULE$.parameters();
        return combinations;
    }

    /**
     * Accessor for the {@code data} field.
     *
     * @return the rows that {@code before()} passes to {@code createScanTable} for the {@code src} table
     */
    public List<Row> data() {
        return data;
    }

    /**
     * Accessor for the {@code userData} field.
     *
     * @return the rows that {@code before()} passes to {@code createLookupTable} for {@code user_table}
     */
    public List<Row> userData() {
        return userData;
    }

    /**
     * Per-test setup: runs the base-class setup, disables object reuse on the execution
     * config, then registers the scan table {@code src} and the lookup table {@code user_table}.
     */
    @Override
    @Before
    public void before() {
        super.before();
        this.env().getConfig().disableObjectReuse();

        List<Row> scanRows = this.data();
        this.createScanTable("src", scanRows);

        List<Row> lookupRows = this.userData();
        this.createLookupTable("user_table", lookupRows);
    }

    /**
     * Per-test teardown: runs the base-class teardown and then asserts that the resource
     * counter belonging to the lookup source used by this run reads zero.
     */
    @Override
    @After
    public void after() {
        super.after();
        // Only the counter that matches the active lookup-source flavour is read.
        long remaining = this.legacyTableSource
                ? (long)InMemoryLookupableTableSource$.MODULE$.RESOURCE_COUNTER().get()
                : (long)TestValuesTableFactory.RESOURCE_COUNTER.get();
        Assert.assertEquals((long)0L, (long)remaining);
    }

    /**
     * Registers the lookup table used on the right-hand side of the joins.
     *
     * <p>When {@code legacyTableSource} is false the table is created through a
     * {@code CREATE TABLE} statement using the {@code values} connector with
     * {@code 'async' = 'true'}; otherwise it is registered through
     * {@code InMemoryLookupableTableSource$.createTemporaryTable}.
     *
     * @param tableName name under which the table is registered
     * @param data rows backing the table (columns {@code age}, {@code id}, {@code name})
     */
    private void createLookupTable(String tableName, List<Row> data) {
        if (!this.legacyTableSource) {
            String dataId = TestValuesTableFactory.registerData(data);
            // Literal pieces and arguments of the original Scala s"..." interpolation.
            String[] ddlParts = new String[]{"\n           |CREATE TABLE ", " (\n           |  `age` INT,\n           |  `id` BIGINT,\n           |  `name` STRING\n           |) WITH (\n           |  'connector' = 'values',\n           |  'data-id' = '", "',\n           |  'async' = 'true'\n           |)\n           |"};
            Object[] ddlArgs = new Object[]{tableName, dataId};
            String interpolated = new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])ddlParts)).s((Seq)Predef$.MODULE$.genericWrapArray((Object)ddlArgs));
            String ddl = new StringOps(Predef$.MODULE$.augmentString(interpolated)).stripMargin();
            this.tEnv().executeSql(ddl);
            return;
        }

        TableSchema userSchema = TableSchema.builder()
                .field("age", Types.INT())
                .field("id", Types.LONG())
                .field("name", Types.STRING())
                .build();
        // NOTE(review): the literal `true` presumably selects the async lookup variant -- confirm
        // against InMemoryLookupableTableSource.createTemporaryTable.
        InMemoryLookupableTableSource$.MODULE$.createTemporaryTable((TableEnvironment)this.tEnv(), true, data, userSchema, tableName, InMemoryLookupableTableSource$.MODULE$.createTemporaryTable$default$6());
    }

    /**
     * Registers the scan (left-hand) table through a {@code CREATE TABLE} statement that uses
     * the {@code values} connector and declares a computed {@code proctime} column.
     *
     * @param tableName name under which the table is registered
     * @param data rows backing the table (columns {@code id}, {@code len}, {@code content})
     */
    private void createScanTable(String tableName, List<Row> data) {
        String dataId = TestValuesTableFactory.registerData(data);
        // Literal pieces and arguments of the original Scala s"..." interpolation.
        String[] ddlParts = new String[]{"\n         |CREATE TABLE ", " (\n         |  `id` BIGINT,\n         |  `len` INT,\n         |  `content` STRING,\n         |  `proctime` AS PROCTIME()\n         |) WITH (\n         |  'connector' = 'values',\n         |  'data-id' = '", "'\n         |)\n         |"};
        Object[] ddlArgs = new Object[]{tableName, dataId};
        String interpolated = new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])ddlParts)).s((Seq)Predef$.MODULE$.genericWrapArray((Object)ddlArgs));
        String ddl = new StringOps(Predef$.MODULE$.augmentString(interpolated)).stripMargin();
        this.tEnv().executeSql(ddl);
    }

    /**
     * Inner lookup join where the left side is a sub-query over {@code src} and the join
     * condition matches on both {@code content = name} and {@code id = id}.
     */
    @Test
    public void testAsyncJoinTemporalTableOnMultiKeyFields() {
        String query = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT t1.id, t1.len, D.name\n        |FROM (select content, id, len, proctime FROM src AS T) t1\n        |JOIN user_table for system_time as of t1.proctime AS D\n        |ON t1.content = D.name AND t1.id = D.id\n      ")).stripMargin();
        TestingAppendSink resultSink = new TestingAppendSink();
        Table joined = this.tEnv().sqlQuery(query);
        package$.MODULE$.tableConversions(joined).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink((SinkFunction)resultSink);
        this.env().execute();

        String[] expectedRows = new String[]{"1,12,Julian", "3,15,Fabian"};
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows));
        // Both sides are sorted so the comparison does not depend on emission order.
        Ordering byString = (Ordering)Ordering.String$.MODULE$;
        Object wanted = expected.sorted(byString);
        Object actual = resultSink.getAppendResults().sorted(byString);
        Assert.assertEquals(wanted, actual);
    }

    /**
     * Inner lookup join of {@code src} with {@code user_table} on {@code id}; only the
     * {@code src} rows with a matching {@code id} are expected in the output.
     */
    @Test
    public void testAsyncJoinTemporalTable() {
        String query = "SELECT T.id, T.len, T.content, D.name FROM src AS T JOIN user_table for system_time as of T.proctime AS D ON T.id = D.id";
        TestingAppendSink resultSink = new TestingAppendSink();
        Table joined = this.tEnv().sqlQuery(query);
        package$.MODULE$.tableConversions(joined).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink((SinkFunction)resultSink);
        this.env().execute();

        String[] expectedRows = new String[]{"1,12,Julian,Julian", "2,15,Hello,Jark", "3,15,Fabian,Fabian"};
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows));
        // Both sides are sorted so the comparison does not depend on emission order.
        Ordering byString = (Ordering)Ordering.String$.MODULE$;
        Object wanted = expected.sorted(byString);
        Object actual = resultSink.getAppendResults().sorted(byString);
        Assert.assertEquals(wanted, actual);
    }

    /**
     * Inner lookup join on {@code id} with an extra join-condition predicate on the lookup
     * side ({@code D.age > 20}).
     */
    @Test
    public void testAsyncJoinTemporalTableWithPushDown() {
        String query = "SELECT T.id, T.len, T.content, D.name FROM src AS T JOIN user_table for system_time as of T.proctime AS D ON T.id = D.id AND D.age > 20";
        TestingAppendSink resultSink = new TestingAppendSink();
        Table joined = this.tEnv().sqlQuery(query);
        package$.MODULE$.tableConversions(joined).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink((SinkFunction)resultSink);
        this.env().execute();

        String[] expectedRows = new String[]{"2,15,Hello,Jark", "3,15,Fabian,Fabian"};
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows));
        // Both sides are sorted so the comparison does not depend on emission order.
        Ordering byString = (Ordering)Ordering.String$.MODULE$;
        Object wanted = expected.sorted(byString);
        Object actual = resultSink.getAppendResults().sorted(byString);
        Assert.assertEquals(wanted, actual);
    }

    /**
     * Inner lookup join on {@code id} followed by a non-equality filter that compares a
     * column from each side ({@code T.len <= D.age}).
     */
    @Test
    public void testAsyncJoinTemporalTableWithNonEqualFilter() {
        String query = "SELECT T.id, T.len, T.content, D.name, D.age FROM src AS T JOIN user_table for system_time as of T.proctime AS D ON T.id = D.id WHERE T.len <= D.age";
        TestingAppendSink resultSink = new TestingAppendSink();
        Table joined = this.tEnv().sqlQuery(query);
        package$.MODULE$.tableConversions(joined).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink((SinkFunction)resultSink);
        this.env().execute();

        String[] expectedRows = new String[]{"2,15,Hello,Jark,22", "3,15,Fabian,Fabian,33"};
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows));
        // Both sides are sorted so the comparison does not depend on emission order.
        Ordering byString = (Ordering)Ordering.String$.MODULE$;
        Object wanted = expected.sorted(byString);
        Object actual = resultSink.getAppendResults().sorted(byString);
        Assert.assertEquals(wanted, actual);
    }

    /**
     * Left lookup join whose ON clause mixes the key equality with predicates on either side,
     * plus a WHERE filter on the left table; unmatched left rows are expected with
     * {@code null} lookup columns.
     */
    @Test
    public void testAsyncLeftJoinTemporalTableWithLocalPredicate() {
        String query = "SELECT T.id, T.len, T.content, D.name, D.age FROM src AS T LEFT JOIN user_table for system_time as of T.proctime AS D ON T.id = D.id AND T.len > 1 AND D.age > 20 AND D.name = 'Fabian' WHERE T.id > 1";
        TestingAppendSink resultSink = new TestingAppendSink();
        Table joined = this.tEnv().sqlQuery(query);
        package$.MODULE$.tableConversions(joined).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink((SinkFunction)resultSink);
        this.env().execute();

        String[] expectedRows = new String[]{"2,15,Hello,null,null", "3,15,Fabian,Fabian,33", "8,11,Hello world,null,null", "9,12,Hello world!,null,null"};
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows));
        // Both sides are sorted so the comparison does not depend on emission order.
        Ordering byString = (Ordering)Ordering.String$.MODULE$;
        Object wanted = expected.sorted(byString);
        Object actual = resultSink.getAppendResults().sorted(byString);
        Assert.assertEquals(wanted, actual);
    }

    /**
     * Inner lookup join keyed on two columns at once: {@code T.id = D.id} and
     * {@code T.content = D.name}.
     */
    @Test
    public void testAsyncJoinTemporalTableOnMultiFields() {
        String query = "SELECT T.id, T.len, D.name FROM src AS T JOIN user_table for system_time as of T.proctime AS D ON T.id = D.id AND T.content = D.name";
        TestingAppendSink resultSink = new TestingAppendSink();
        Table joined = this.tEnv().sqlQuery(query);
        package$.MODULE$.tableConversions(joined).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink((SinkFunction)resultSink);
        this.env().execute();

        String[] expectedRows = new String[]{"1,12,Julian", "3,15,Fabian"};
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows));
        // Both sides are sorted so the comparison does not depend on emission order.
        Ordering byString = (Ordering)Ordering.String$.MODULE$;
        Object wanted = expected.sorted(byString);
        Object actual = resultSink.getAppendResults().sorted(byString);
        Assert.assertEquals(wanted, actual);
    }

    /**
     * Two-key inner lookup join where one key is computed by the registered scalar function
     * {@code mod1} and the projected lookup column is wrapped by {@code wrapper1}.
     */
    @Test
    public void testAsyncJoinTemporalTableOnMultiFieldsWithUdf() {
        this.tEnv().registerFunction("mod1", (ScalarFunction)UserDefinedFunctionTestUtils$TestMod$.MODULE$);
        this.tEnv().registerFunction("wrapper1", (ScalarFunction)UserDefinedFunctionTestUtils$TestWrapperUdf$.MODULE$);

        String query = "SELECT T.id, T.len, wrapper1(D.name) as name FROM src AS T JOIN user_table for system_time as of T.proctime AS D ON mod1(T.id, 4) = D.id AND T.content = D.name";
        TestingAppendSink resultSink = new TestingAppendSink();
        Table joined = this.tEnv().sqlQuery(query);
        package$.MODULE$.tableConversions(joined).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink((SinkFunction)resultSink);
        this.env().execute();

        String[] expectedRows = new String[]{"1,12,Julian", "3,15,Fabian"};
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows));
        // Both sides are sorted so the comparison does not depend on emission order.
        Ordering byString = (Ordering)Ordering.String$.MODULE$;
        Object wanted = expected.sorted(byString);
        Object actual = resultSink.getAppendResults().sorted(byString);
        Assert.assertEquals(wanted, actual);
    }

    /**
     * Inner lookup join on {@code id} filtered by the registered scalar function {@code add}
     * applied to columns of both sides. After the job finishes the test also asserts that
     * {@code TestAddWithOpen}'s {@code aliveCounter} reads zero.
     */
    @Test
    public void testAsyncJoinTemporalTableWithUdfFilter() {
        this.tEnv().registerFunction("add", (ScalarFunction)new UserDefinedFunctionTestUtils.TestAddWithOpen());

        String query = "SELECT T.id, T.len, T.content, D.name FROM src AS T JOIN user_table for system_time as of T.proctime AS D ON T.id = D.id WHERE add(T.id, D.id) > 3 AND add(T.id, 2) > 3 AND add (D.id, 2) > 3";
        TestingAppendSink resultSink = new TestingAppendSink();
        Table joined = this.tEnv().sqlQuery(query);
        package$.MODULE$.tableConversions(joined).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink((SinkFunction)resultSink);
        this.env().execute();

        String[] expectedRows = new String[]{"2,15,Hello,Jark", "3,15,Fabian,Fabian"};
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows));
        // Both sides are sorted so the comparison does not depend on emission order.
        Ordering byString = (Ordering)Ordering.String$.MODULE$;
        Object wanted = expected.sorted(byString);
        Object actual = resultSink.getAppendResults().sorted(byString);
        Assert.assertEquals(wanted, actual);

        long aliveFunctions = (long)UserDefinedFunctionTestUtils$TestAddWithOpen$.MODULE$.aliveCounter().get();
        Assert.assertEquals((long)0L, (long)aliveFunctions);
    }

    /**
     * Aggregates {@code src} ({@code max(id)} grouped by {@code len}), registers the result as
     * {@code t1}, then left lookup-joins {@code t1} with {@code user_table}. The output is
     * consumed as a retract stream by a sink running with parallelism 1.
     */
    @Test
    public void testAggAndAsyncLeftJoinTemporalTable() {
        String aggQuery = "SELECT max(id) as id, PROCTIME() as proctime FROM src AS T group by len";
        Table aggregated = this.tEnv().sqlQuery(aggQuery);
        this.tEnv().registerTable("t1", aggregated);

        String joinQuery = "SELECT t1.id, D.name, D.age FROM t1 LEFT JOIN user_table for system_time as of t1.proctime AS D ON t1.id = D.id";
        TestingRetractSink resultSink = new TestingRetractSink();
        Table joined = this.tEnv().sqlQuery(joinQuery);
        package$.MODULE$.tableConversions(joined).toRetractStream(TypeExtractor.createTypeInfo(Row.class)).addSink((SinkFunction)resultSink).setParallelism(1);
        this.env().execute();

        String[] expectedRows = new String[]{"3,Fabian,33", "8,null,null", "9,null,null"};
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows));
        // Both sides are sorted so the comparison does not depend on emission order.
        Ordering byString = (Ordering)Ordering.String$.MODULE$;
        Object wanted = expected.sorted(byString);
        Object actual = resultSink.getRetractResults().sorted(byString);
        Assert.assertEquals(wanted, actual);
    }

    /**
     * Left lookup join of {@code src} with {@code user_table} on {@code id}; left rows
     * without a match are expected with {@code null} lookup columns.
     */
    @Test
    public void testAsyncLeftJoinTemporalTable() {
        String query = "SELECT T.id, T.len, D.name, D.age FROM src AS T LEFT JOIN user_table for system_time as of T.proctime AS D ON T.id = D.id";
        TestingAppendSink resultSink = new TestingAppendSink();
        Table joined = this.tEnv().sqlQuery(query);
        package$.MODULE$.tableConversions(joined).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink((SinkFunction)resultSink);
        this.env().execute();

        String[] expectedRows = new String[]{"1,12,Julian,11", "2,15,Jark,22", "3,15,Fabian,33", "8,11,null,null", "9,12,null,null"};
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows));
        // Both sides are sorted so the comparison does not depend on emission order.
        Ordering byString = (Ordering)Ordering.String$.MODULE$;
        Object wanted = expected.sorted(byString);
        Object actual = resultSink.getAppendResults().sorted(byString);
        Assert.assertEquals(wanted, actual);
    }

    /**
     * Runs a left lookup join whose WHERE clause calls the registered function
     * {@code errorFunc} and expects job execution to fail with a
     * {@link NumberFormatException} somewhere in the cause chain whose message contains
     * {@code "Cannot parse"}. The test fails if execution completes without throwing.
     */
    @Test
    public void testExceptionThrownFromAsyncJoinTemporalTable() {
        this.tEnv().registerFunction("errorFunc", (ScalarFunction)UserDefinedFunctionTestUtils$TestExceptionThrown$.MODULE$);

        String query = "SELECT T.id, T.len, D.name, D.age FROM src AS T LEFT JOIN user_table for system_time as of T.proctime AS D ON T.id = D.id where errorFunc(D.name) > cast(1000 as decimal(10,4))";
        TestingAppendSink resultSink = new TestingAppendSink();
        Table joined = this.tEnv().sqlQuery(query);
        package$.MODULE$.tableConversions(joined).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink((SinkFunction)resultSink);

        // Capture whatever execution throws; the verification happens outside the try block.
        Throwable failure = null;
        try {
            this.env().execute();
        }
        catch (Throwable thrown) {
            failure = thrown;
        }

        if (failure == null) {
            Assert.fail((String)"NumberFormatException is expected here!");
        } else {
            Optional<NumberFormatException> cause = ExceptionUtils.findThrowable((Throwable)failure, NumberFormatException.class);
            Assert.assertTrue((boolean)cause.isPresent());
            Assert.assertTrue((boolean)cause.get().getMessage().contains("Cannot parse"));
        }
    }

    /**
     * Creates one parameterized instance of the test and builds its fixture rows.
     *
     * <p>{@code data} holds the scan-side rows as (id: long, len: int, content: String);
     * {@code userData} holds the lookup-side rows as (age: int, id: long, name: String).
     *
     * @param legacyTableSource selects the registration path taken by {@code createLookupTable}
     *     and the resource counter checked by {@code after()}
     * @param backend state backend mode forwarded to the {@code StreamingWithStateTestBase}
     *     constructor
     */
    public AsyncLookupJoinITCase(boolean legacyTableSource, StreamingWithStateTestBase.StateBackendMode backend) {
        // The decompiler emitted the field assignment ahead of the superclass constructor call,
        // which Java versions without flexible constructor bodies reject. The private field is
        // only read by createLookupTable() and after(), so calling super first is equivalent here.
        super(backend);
        this.legacyTableSource = legacyTableSource;

        // Scan-side rows: (id, len, content).
        this.data = List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Row[]{
                this.rowOf((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)1L), BoxesRunTime.boxToInteger((int)12), "Julian"})),
                this.rowOf((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)2L), BoxesRunTime.boxToInteger((int)15), "Hello"})),
                this.rowOf((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)3L), BoxesRunTime.boxToInteger((int)15), "Fabian"})),
                this.rowOf((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)8L), BoxesRunTime.boxToInteger((int)11), "Hello world"})),
                this.rowOf((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)9L), BoxesRunTime.boxToInteger((int)12), "Hello world!"}))}));

        // Lookup-side rows: (age, id, name).
        this.userData = List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Row[]{
                this.rowOf((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)11), BoxesRunTime.boxToLong((long)1L), "Julian"})),
                this.rowOf((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)22), BoxesRunTime.boxToLong((long)2L), "Jark"})),
                this.rowOf((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)33), BoxesRunTime.boxToLong((long)3L), "Fabian"}))}));
    }
}

