package org.apache.flink.table.planner.plan.stream.sql;

import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.util.Collection;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo;
import org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.table.api.package$;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.planner.expressions.utils.TestNonDeterministicUdaf;
import org.apache.flink.table.planner.expressions.utils.TestNonDeterministicUdf;
import org.apache.flink.table.planner.expressions.utils.TestNonDeterministicUdtf;
import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions;
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sinks.UpsertStreamTableSink;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowableAssert;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import scala.Array$;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Symbol;
import scala.Tuple3;
import scala.Tuple4;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayOps;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.SymbolLiteral;

/* compiled from: NonDeterministicDagTest.scala */
@ExtendWith({ParameterizedTestExtension.class})
@ScalaSignature(bytes = "\u0006\u0001\u0011}b\u0001B\u0001\u0003\u0001M\u0011qCT8o\t\u0016$XM]7j]&\u001cH/[2EC\u001e$Vm\u001d;\u000b\u0005\r!\u0011aA:rY*\u0011QAB\u0001\u0007gR\u0014X-Y7\u000b\u0005\u001dA\u0011\u0001\u00029mC:T!!\u0003\u0006\u0002\u000fAd\u0017M\u001c8fe*\u00111\u0002D\u0001\u0006i\u0006\u0014G.\u001a\u0006\u0003\u001b9\tQA\u001a7j].T!a\u0004\t\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005\t\u0012aA8sO\u000e\u00011C\u0001\u0001\u0015!\t)\u0002$D\u0001\u0017\u0015\t9\u0002\"A\u0003vi&d7/\u0003\u0002\u001a-\tiA+\u00192mKR+7\u000f\u001e\"bg\u0016D\u0001b\u0007\u0001\u0003\u0002\u0003\u0006I\u0001H\u0001\u001f]>tG)\u001a;fe6Lg.[:uS\u000e,\u0006\u000fZ1uKN#(/\u0019;fOf\u0004\"!H\u001a\u000f\u0005y\u0001dBA\u0010.\u001d\t\u00013F\u0004\u0002\"U9\u0011!%\u000b\b\u0003G!r!\u0001J\u0014\u000e\u0003\u0015R!A\n\n\u0002\rq\u0012xn\u001c;?\u0013\u0005\t\u0012BA\b\u0011\u0013\tia\"\u0003\u0002\f\u0019%\u0011AFC\u0001\u0004CBL\u0017B\u0001\u00180\u0003\u0019\u0019wN\u001c4jO*\u0011AFC\u0005\u0003cI\nac\u00149uS6L'0\u001a:D_:4\u0017nZ(qi&|gn\u001d\u0006\u0003]=J!\u0001N\u001b\u0003=9{g\u000eR3uKJl\u0017N\\5ti&\u001cW\u000b\u001d3bi\u0016\u001cFO]1uK\u001eL(BA\u00193\u0011\u00159\u0004\u0001\"\u00019\u0003\u0019a\u0014N\\5u}Q\u0011\u0011h\u000f\t\u0003u\u0001i\u0011A\u0001\u0005\u00067Y\u0002\r\u0001\b\u0005\b{\u0001\u0011\r\u0011\"\u0003?\u0003\u0011)H/\u001b7\u0016\u0003}\u0002\"!\u0006!\n\u0005\u00053\"aE*ue\u0016\fW\u000eV1cY\u0016$Vm\u001d;Vi&d\u0007BB\"\u0001A\u0003%q(A\u0003vi&d\u0007\u0005C\u0004F\u0001\t\u0007I\u0011\u0002$\u0002\u0015Q\u0014\u0018PU3t_24X-F\u0001H!\tA5*D\u0001J\u0015\u0005Q\u0015!B:dC2\f\u0017B\u0001'J\u0005\u001d\u0011un\u001c7fC:DaA\u0014\u0001!\u0002\u00139\u0015a\u0003;ssJ+7o\u001c7wK\u0002BQ\u0001\u0015\u0001\u0005\u0002E\u000baAY3g_J,G#\u0001*\u0011\u0005!\u001b\u0016B\u0001+J\u0005\u0011)f.\u001b;)\u0005=3\u0006CA,^\u001b\u0005A&B\u0001\u0017Z\u0015\tQ6,A\u0004kkBLG/\u001a:\u000b\u0005q\u0003\u0012!\u00026v]&$\u0018B\u00010Y\u0005
)\u0011UMZ8sK\u0016\u000b7\r\u001b\u0005\u0006A\u0002!\t!U\u0001\u001ai\u0016\u001cHo\u00113d/&$\b.T3uCNKgn[,ji\"\u00046\u000e\u000b\u0002`EB\u0011qkY\u0005\u0003Ib\u0013A\u0002V3tiR+W\u000e\u001d7bi\u0016DQA\u001a\u0001\u0005\u0002E\u000bq\u0006^3ti:{g\u000eR3uKJl\u0017N\\5ti&\u001c\u0007K]8kK\u000e$\u0018n\u001c8XSRD7+\u001b8l/&$\bn\\;u!.D#!\u001a2\t\u000b%\u0004A\u0011A)\u00029Q,7\u000f^\"eG^KG\u000f['fi\u0006\u001c\u0016N\\6XSRDw.\u001e;QW\"\u0012\u0001N\u0019\u0005\u0006Y\u0002!\t!U\u0001 i\u0016\u001cHo\u00113d/&$\b.T3uC2+w-Y2z'&t7nV5uQB[\u0007FA6c\u0011\u0015y\u0007\u0001\"\u0001R\u0003\t\"Xm\u001d;DI\u000e<\u0016\u000e\u001e5NKR\fG*Z4bGf\u001c\u0016N\\6XSRDw.\u001e;QW\"\u0012aN\u0019\u0005\u0006e\u0002!\t!U\u0001#i\u0016\u001cHo\u00113d/&$\b.T3uCNKgn[,ji\"\u001cu.\u001c9pg&$X\rU6)\u0005E\u0014\u0007\"B;\u0001\t\u0003\t\u0016\u0001\u000b;fgR\u001cEmY,ji\"lU\r^1SK:\fW.Z*j].<\u0016\u000e\u001e5D_6\u0004xn]5uKB[\u0007F\u0001;c\u0011\u0015A\b\u0001\"\u0001R\u0003\u0019\"Xm\u001d;T_V\u00148-Z,ji\"\u001cu.\u001c9vi\u0016$7i\u001c7v[:\u001c\u0016N\\6XSRD\u0007k\u001b\u0015\u0003o\nDQa\u001f\u0001\u0005\u0002E\u000bQ\u0005^3tiN{WO]2f/&$\bnQ8naV$X\rZ\"pYVlg.T;mi&\u001c\u0016N\\6)\u0005i\u0014\u0007\"\u0002@\u0001\t\u0003\t\u0016A\f;fgR\u001cEmY\"peJ,G.\u0019;f\u001d>tG)\u001a;fe6Lg.[:uS\u000e4UO\\2TS:\\w+\u001b;i!.C#! 
2\t\r\u0005\r\u0001\u0001\"\u0001R\u0003A\"Xm\u001d;DI\u000e\u001cuN\u001d:fY\u0006$XMT8o\t\u0016$XM]7j]&\u001cH/[2Gk:\u001cgj\u001c'fMR|U\u000f\u001e9vi\"\u001a\u0011\u0011\u00012\t\r\u0005%\u0001\u0001\"\u0001R\u0003E\"Xm\u001d;DI\u000e\u001cuN\u001d:fY\u0006$XMT8o\t\u0016$XM]7j]&\u001cH/[2Gk:\u001cgj\u001c*jO\"$x*\u001e;qkRD3!a\u0002c\u0011\u0019\ty\u0001\u0001C\u0001#\u0006YC/Z:u\u0007\u0012\u001c7i\u001c:sK2\fG/Z(o\u001d>tG)\u001a;fe6Lg.[:uS\u000e\u001cuN\u001c3ji&|g\u000eK\u0002\u0002\u000e\tDa!!\u0006\u0001\t\u0003\t\u0016A\t;fgR\u001cEmY,ji\"lU\r^1D_J\u0014X\r\\1uKNKgn[,ji\"\u00046\u000eK\u0002\u0002\u0014\tDa!a\u0007\u0001\t\u0003\t\u0016!\u000b;fgR\u001cEmY,ji\"tuN\u001c#fi\u0016\u0014X.\u001b8jgRL7MR;oGNKgn[,ji\"\u00046\u000eK\u0002\u0002\u001a\tDa!!\t\u0001\t\u0003\t\u0016\u0001\f;fgR\u001cEmY,ji\"tuN\u001c#fi\u0016\u0014X.\u001b8jgRL7MR;oGNKgn[,ji\"|W\u000f\u001e)lQ\r\tyB\u0019\u0005\u0007\u0003O\u0001A\u0011A)\u0002CQ,7\u000f^\"eG^KG\u000f\u001b(p]\u0012+G/\u001a:nS:L7\u000f^5d\r&dG/\u001a:)\u0007\u0005\u0015\"\r\u0003\u0004\u0002.\u0001!\t!U\u0001\u001fi\u0016\u001cHo\u00113d\u0015>Lg\u000eR5n/&$\b\u000eU6TS:\\w+\u001b;i!.D3!a\u000bc\u0011\u0019\t\u0019\u0004\u0001C\u0001#\u0006\tC/Z:u\u0007\u0012\u001c'j\\5o\t&lw+\u001b;i_V$\bk[*j].<\u0016\u000e\u001e5QW\"\u001a\u0011\u0011\u00072\t\r\u0005e\u0002\u0001\"\u0001R\u0003\t\"Xm\u001d;DI\u000edUM\u001a;K_&tG)[7XSRD\u0007k[*j].<\u0016\u000e\u001e5QW\"\u001a\u0011q\u00072\t\r\u0005}\u0002\u0001\"\u0001R\u0003\u0005\"Xm\u001d;DI\u000eTu.\u001b8ES6<\u0016\u000e\u001e5QWNKgn[,ji\"|W\u000f\u001e)lQ\r\tiD\u0019\u0005\u0007\u0003\u000b\u0002A\u0011A)\u0002IQ,7\u000f^\"eG*{\u0017N\u001c#j[^KG\u000f[8viB[7+\u001b8l/&$\bn\\;u!.D3!a\u0011c\u0011\u0019\tY\u0005\u0001C\u0001#\u0006)C/Z:u\u0007\u0012\u001c'j\\5o\t&lw+\u001b;i!.|e\u000e\\=TS:\\w+\u001b;i_V$\bk\u001b\u0015\u0004\u0003\u0013\u0012\u0007BBA)\u0001\u0011\u0005\u0011+\u0001\u0015uKN$8\tZ2MK\u001a$(j\\5o\t&lw+\u001b;i_V$\bk[*j].<\u0016\u000e\u001e5pkR\u00046\u
000eK\u0002\u0002P\tDa!a\u0016\u0001\t\u0003\t\u0016a\u000b;fgR\u001cEm\u0019&pS:$\u0015.\\,ji\"\u00046nT;uaV$hj\u001c)l'&t7nV5uQ>,H\u000fU6)\u0007\u0005U#\r\u0003\u0004\u0002^\u0001!\t!U\u00016i\u0016\u001cHo\u00113d\u0015>Lg\u000eR5n/&$\b\u000eU6O_:$U\r^3s[&t\u0017n\u001d;jG\u001a+hnY*j].<\u0016\u000e\u001e5pkR\u00046\u000eK\u0002\u0002\\\tDa!a\u0019\u0001\t\u0003\t\u0016A\r;fgR\u001cEm\u0019&pS:$\u0015.\\,ji\"\u00046NT8o\t\u0016$XM]7j]&\u001cH/[2M_\u000e\fGnQ8oI&$\u0018n\u001c8)\u0007\u0005\u0005$\r\u0003\u0004\u0002j\u0001!\t!U\u00014i\u0016\u001cHo\u00113d\u0015>Lg\u000eR5n/&$\b\u000eU6O_:$U\r^3s[&t\u0017n\u001d;jG2{7-\u00197D_:$\u0017\u000e^5p]JB3!a\u001ac\u0011\u0019\ty\u0007\u0001C\u0001#\u0006\u0001D/Z:u\u0007\u0012\u001c'j\\5o\t&lgj\u001c8EKR,'/\\5oSN$\u0018n\u0019*f[\u0006Lg.\u001b8h\u0007>tG-\u001b;j_:D3!!\u001cc\u0011\u0019\t)\b\u0001C\u0001#\u0006yC/Z:u\u0007\u0012\u001cG*\u001a4u\u0015>Lg\u000eR5n/&$\bNT8o\t\u0016$XM]7j]&\u001cH/[2Qe\u00164\u0015\u000e\u001c;fe\"\u001a\u00111\u000f2\t\r\u0005m\u0004\u0001\"\u0001R\u00031\"Xm\u001d;He>,\bOQ=O_:$U\r^3s[&t\u0017n\u001d;jG\u001a+hnY,ji\"\u001cEmY*pkJ\u001cW\rK\u0002\u0002z\tDa!!!\u0001\t\u0003\t\u0016a\u000b;fgR<%o\\;q\u0005ftuN\u001c#fi\u0016\u0014X.\u001b8jgRL7-\u00163g/&$\bn\u00113d'>,(oY3)\u0007\u0005}$\r\u0003\u0004\u0002\b\u0002!\t!U\u0001.i\u0016\u001cHOT3ti\u0016$\u0017iZ4XSRDgj\u001c8EKR,'/\\5oSN$\u0018nY$s_V\u0004\u0018N\\4LKf\u001c\bfAACE\"1\u0011Q\u0012\u0001\u0005\u0002E\u000b!\u0006^3ti\u001e\u0013x.\u001e9BO\u001etuN\u001c#fi\u0016\u0014X.\u001b8jgRL7MR;oG>s7k\\;sG\u0016\u00046\u000eK\u0002\u0002\f\nDa!a%\u0001\t\u0003\t\u0016!\n;fgR\fumZ,ji\"tuN\u001c#fi\u0016\u0014X.\u001b8jgRL7MR5mi\u0016\u0014\u0018I]4tQ\r\t\tJ\u0019\u0005\u0007\u00033\u0003A\u0011A)\u0002aQ,7\u000f^!hO^KG\u000f\u001b(p]\u0012+G/\u001a:nS:L7\u000f^5d\r&dG/\u001a:Be\u001e\u001cxJ\\\"eGN{WO]2fQ\r\t9J\u0019\u0005\u0007\u0003?\u0003A\u0011A)\u0002{Q,7\u000f^!hO^KG\u000f\u001b(p]\u0012+G/\u001a:nS:L7\u000f^5d\r&dG/\u001a:Be\
u001e\u001cxJ\\\"eGN{WO]2f'&t7nV5uQ>,H\u000fU6)\u0007\u0005u%\r\u0003\u0004\u0002&\u0002!\t!U\u00010i\u0016\u001cHOT8o\t\u0016$XM]7j]&\u001cH/[2BO\u001e|e.\u00119qK:$7k\\;sG\u0016\u001c\u0016N\\6XSRD\u0007k\u001b\u0015\u0004\u0003G\u0013\u0007BBAV\u0001\u0011\u0005\u0011+\u0001\u001auKN$hj\u001c8EKR,'/\\5oSN$\u0018nY!hO>s\u0017\t\u001d9f]\u0012\u001cv.\u001e:dKNKgn[,ji\"|W\u000f\u001e)lQ\r\tIK\u0019\u0005\u0007\u0003c\u0003A\u0011A)\u0002kQ,7\u000f^$m_\n\fGNT8o\t\u0016$XM]7j]&\u001cH/[2BO\u001e|e.\u00119qK:$7k\\;sG\u0016\u001c\u0016N\\6XSRD\u0007k\u001b\u0015\u0004\u0003_\u0013\u0007BBA\\\u0001\u0011\u0005\u0011+\u0001\u001duKN$x\t\\8cC2tuN\u001c#fi\u0016\u0014X.\u001b8jgRL7-Q4h\u001f:\f\u0005\u000f]3oIN{WO]2f'&t7nV5uQ>,H\u000fU6)\u0007\u0005U&\r\u0003\u0004\u0002>\u0002!\t!U\u0001\u001bi\u0016\u001cH/\u00169tKJ$8k\\;sG\u0016\u001c\u0016N\\6XSRD\u0007k\u001b\u0015\u0004\u0003w\u0013\u0007BBAb\u0001\u0011\u0005\u0011+A\u000fuKN$X\u000b]:feR\u001cv.\u001e:dKNKgn[,ji\"|W\u000f\u001e)lQ\r\t\tM\u0019\u0005\u0007\u0003\u0013\u0004A\u0011A)\u0002_Q,7\u000f^'vYRLwJ^3s/&$\bNT8o\t\u0016$XM]7j]&\u001cH/[2VI\u000647+\u001b8l/&$\b\u000eU6)\u0007\u0005\u001d'\r\u0003\u0004\u0002P\u0002!\t!U\u0001.i\u0016\u001cHo\u0014<fe^KG\u000f\u001b(p]\u0012+G/\u001a:nS:L7\u000f^5d+\u0012\fgmU5oW^KG\u000f[8viB[\u0007fAAgE\"1\u0011Q\u001b\u0001\u0005\u0002E\u000bA\u0007^3ti6+H\u000e^5Pm\u0016\u0014x+\u001b;i\u001d>tG)\u001a;fe6Lg.[:uS\u000e\fum\u001a$jYR,'oU5oW^KG\u000f\u001b)lQ\r\t\u0019N\u0019\u0005\u0007\u00037\u0004A\u0011A)\u0002wQ,7\u000f^!qa\u0016tGMU1oW>sW*\u001e7uS>3XM],ji\"tuN\u001c#fi\u0016\u0014X.\u001b8jgRL7-\u00163bMNKgn[,ji\"\u00046\u000eK\u0002\u0002Z\nDa!!9\u0001\t\u0003\t\u0016A\u0010;fgR\f\u0005\u000f]3oIJ\u000bgn[(o\u001bVdG/[(wKJ<\u0016\u000e\u001e5O_:$U\r^3s[&t\u0017n\u001d;jGV#\u0017MZ*j].<\u0016\u000e\u001e5pkR\u00046\u000eK\u0002\u0002`\nDa!a:\u0001\t\u0003\t\u0016a\n;fgR,\u0006\u000fZ1uKJ\u000bgn[(viB,HOU8x\u001dVl'-\u001a:TS:\\w+\u001b;i!.D3!!:c\u0011\u0019\ti\u000f\u
0001C\u0001#\u0006AC/Z:u%\u0016$(/Y2u%\u0006t7nT;uaV$(k\\<Ok6\u0014WM]*j].<\u0016\u000e\u001e5QW\"\u001a\u00111\u001e2\t\r\u0005M\b\u0001\"\u0001R\u0003q!Xm\u001d;V]&|gnU5oW^KG\u000f[\"p[B|7/\u001b;f!.D3!!=c\u0011\u0019\tI\u0010\u0001C\u0001#\u0006yB/Z:u+:LwN\\!mYNKgn[,ji\"\u001cu.\u001c9pg&$X\rU6)\u0007\u0005](\r\u0003\u0004\u0002��\u0002!\t!U\u0001\u001ai\u0016\u001cH/\u00168j_:\fE\u000e\\*j].<\u0016\u000e\u001e5pkR\u00046\u000eK\u0002\u0002~\nDaA!\u0002\u0001\t\u0003\t\u0016\u0001\u000b;fgR\u001cEm\u0019&pS:<\u0016\u000e\u001e5O_:$U\r^3s[&t\u0017n\u001d;jG\u000e{g\u000eZ5uS>t\u0007f\u0001B\u0002E\"1!1\u0002\u0001\u0005\u0002E\u000bQ\u0005^3tiB\u0013xn\u0019;j[\u0016Le\u000e^3sm\u0006d'j\\5o'&t7nV5uQ>,H\u000fU6)\u0007\t%!\r\u0003\u0004\u0003\u0012\u0001!\t!U\u0001-i\u0016\u001cHo\u00113d!J|7\r^5nK&sG/\u001a:wC2Tu.\u001b8P]B[7+\u001b8l/&$\bn\\;u!.D3Aa\u0004c\u0011\u0019\u00119\u0002\u0001C\u0001#\u0006yC/Z:u\u0007\u0012\u001c\u0007K]8di&lW-\u00138uKJ4\u0018\r\u001c&pS:|eNT8o!.\u001c\u0016N\\6XSRDw.\u001e;QW\"\u001a!Q\u00032\t\r\tu\u0001\u0001\"\u0001R\u0003\u001d\"Xm\u001d;DI\u000e\u0014vn\u001e;j[\u0016Le\u000e^3sm\u0006d'j\\5o'&t7nV5uQ>,H\u000fU6)\u0007\tm!\r\u0003\u0004\u0003$\u0001!\t!U\u0001%i\u0016\u001cHo\u00113d%><H/[7f\u0013:$XM\u001d<bY*{\u0017N\\*j].<\u0016\u000e\u001e5QW\"\u001a!\u0011\u00052\t\r\t%\u0002\u0001\"\u0001R\u0003U!Xm\u001d;K_&t7*Z=D_:$\u0018-\u001b8t+.D3Aa\nc\u0011\u0019\u0011y\u0003\u0001C\u0001#\u00061B/Z:u\u0015>Lg\u000eS1t\u0005>$\bnU5eKN,6\u000eK\u0002\u0003.\tDaA!\u000e\u0001\t\u0003\t\u0016a\t;fgRTu.\u001b8ICN\u0014u\u000e\u001e5TS\u0012,7/V6TS:\\w+\u001b;i_V$\bk\u001b\u0015\u0004\u0005g\u0011\u0007B\u0002B\u001e\u0001\u0011\u0005\u0011+A\fuKN$(j\\5o\u0011\u0006\u001c8+\u001b8hY\u0016\u001c\u0016\u000eZ3VW\"\u001a!\u0011\b2\t\r\t\u0005\u0003\u0001\"\u0001R\u0003e!Xm\u001d;TK6L'j\\5o\u0017\u0016L8i\u001c8uC&t7/V6)\u0007\t}\"\r\u0003\u0004\u0003H\u0001!\t!U\u0001\u001ai\u0016\u001cH/\u00118uS*{\u0017N\\&fs\u000e{g\u000e^1j]N,6\u000eK\u
0002\u0003F\tDaA!\u0014\u0001\t\u0003\t\u0016\u0001\u000f;fgR\u001cV-\\5K_&tw+\u001b;i\u001d>tG)\u001a;fe6Lg.[:uS\u000e\u001cuN\u001c3ji&|gnU5oO2,7+\u001b3f\u0011\u0006\u001cXk\u001b\u0015\u0004\u0005\u0017\u0012\u0007B\u0002B*\u0001\u0011\u0005\u0011+A\u0018uKN$8\tZ2K_&tw+\u001b;i\u001d>tG)\u001a;fe6Lg.[:uS\u000e|U\u000f\u001e9viNKgn[,ji\"\u00046\u000eK\u0002\u0003R\tDaA!\u0017\u0001\t\u0003\t\u0016\u0001\f;fgR\u0004&o\\2uS6,G)\u001a3va>s7\tZ2XSRDW*\u001a;bI\u0006$\u0018mU5oW^KG\u000f\u001b)lQ\r\u00119F\u0019\u0005\u0007\u0005?\u0002A\u0011A)\u0002_Q,7\u000f\u001e)s_\u000e$\u0018.\\3EK\u0012,\bo\u00148DI\u000e<\u0016\u000e\u001e5NKR\fG-\u0019;b'&t7nV5uQ>,H\u000fU6)\u0007\tu#\r\u0003\u0004\u0003f\u0001!\t!U\u0001,i\u0016\u001cHOU8xi&lW\rR3ekB|en\u00113d/&$\b.T3uC\u0012\fG/Y*j].<\u0016\u000e\u001e5QW\"\u001a!1\r2\t\r\t-\u0004\u0001\"\u0001R\u0003\u0001\"Xm\u001d;XS:$wn\u001e#fIV\u0004xJ\\\"eG^KG\u000f['fi\u0006$\u0017\r^1)\u0007\t%$\r\u0003\u0004\u0003r\u0001!\t!U\u0001\u001ei\u0016\u001cHOT3ti\u0016$7k\\;sG\u0016<\u0016\u000e\u001e5Nk2$\u0018nU5oW\"\u001a!q\u000e2\t\r\t]\u0004\u0001\"\u0001R\u0003e!Xm\u001d;Nk2$\u0018nU5oW>s'j\\5oK\u00124\u0016.Z<)\u0007\tU$\r\u0003\u0004\u0003~\u0001!\t!U\u0001\u001di\u0016\u001cH/T1uG\"\u0014VmY8h]&TXmU5oW^KG\u000f\u001b)lQ\r\u0011YH\u0019\u0005\u0007\u0005\u0007\u0003A\u0011A)\u0002}Q,7\u000f^'bi\u000eD'+Z2pO:L'0Z,ji\"tuN\u001c#fi\u0016\u0014X.\u001b8jgRL7mQ8oI&$\u0018n\u001c8P]\u000e#7mU5oW^KG\u000f\u001b)lQ\r\u0011\tI\u0019\u0005\u0007\u0005\u0013\u0003A\u0011A)\u0002[Q,7\u000f^'bi\u000eD'+Z2pO:L'0Z(o\u0007\u0012\u001cw+\u001b;i\u001b\u0016$\u0018\rR1uCNKgn[,ji\"\u00046\u000eK\u0002\u0003\b\n4aAa$\u0001\u0005\tE%!\u0005+fgRLgnZ+qg\u0016\u0014HoU5oWN1!Q\u0012BJ\u0005G\u0003BA!&\u0003 
6\u0011!q\u0013\u0006\u0005\u00053\u0013Y*\u0001\u0003mC:<'B\u0001BO\u0003\u0011Q\u0017M^1\n\t\t\u0005&q\u0013\u0002\u0007\u001f\nTWm\u0019;\u0011\r\t\u0015&1\u0016BX\u001b\t\u00119KC\u0002\u0003**\tQa]5oWNLAA!,\u0003(\n)R\u000b]:feR\u001cFO]3b[R\u000b'\r\\3TS:\\\u0007\u0003\u0002BY\u0005ok!Aa-\u000b\u0007\tU&\"\u0001\u0003eCR\f\u0017\u0002\u0002B]\u0005g\u0013qAU8x\t\u0006$\u0018\rC\u0006\u0003>\n5%Q1A\u0005\u0002\t}\u0016\u0001B6fsN,\"A!1\u0011\u000b!\u0013\u0019Ma2\n\u0007\t\u0015\u0017JA\u0003BeJ\f\u0017\u0010\u0005\u0003\u0003J\nEg\u0002\u0002Bf\u0005\u001b\u0004\"\u0001J%\n\u0007\t=\u0017*\u0001\u0004Qe\u0016$WMZ\u0005\u0005\u0005'\u0014)N\u0001\u0004TiJLgn\u001a\u0006\u0004\u0005\u001fL\u0005b\u0003Bm\u0005\u001b\u0013\t\u0011)A\u0005\u0005\u0003\fQa[3zg\u0002B1B!8\u0003\u000e\n\u0015\r\u0011\"\u0001\u0003@\u0006Qa-[3mI:\u000bW.Z:\t\u0017\t\u0005(Q\u0012B\u0001B\u0003%!\u0011Y\u0001\fM&,G\u000e\u001a(b[\u0016\u001c\b\u0005C\u0006\u0003f\n5%Q1A\u0005\u0002\t\u001d\u0018A\u00034jK2$G+\u001f9fgV\u0011!\u0011\u001e\t\u0006\u0011\n\r'1\u001e\t\u0005\u0005[\u0014\u00190\u0004\u0002\u0003p*\u0019!\u0011\u001f\u0006\u0002\u000bQL\b/Z:\n\t\tU(q\u001e\u0002\t\t\u0006$\u0018\rV=qK\"Y!\u0011 BG\u0005\u0003\u0005\u000b\u0011\u0002Bu\u0003-1\u0017.\u001a7e)f\u0004Xm\u001d\u0011\t\u000f]\u0012i\t\"\u0001\u0003~RA!q`B\u0002\u0007\u000b\u00199\u0001\u0005\u0003\u0004\u0002\t5U\"\u0001\u0001\t\u0011\tu&1 
a\u0001\u0005\u0003D\u0001B!8\u0003|\u0002\u0007!\u0011\u0019\u0005\t\u0005K\u0014Y\u00101\u0001\u0003j\"Q11\u0002BG\u0001\u0004%\ta!\u0004\u0002\u0019\u0015D\b/Z2uK\u0012\\U-_:\u0016\u0005\r=\u0001#\u0002%\u0004\u0012\t\u0005\u0017bAB\n\u0013\n1q\n\u001d;j_:D!ba\u0006\u0003\u000e\u0002\u0007I\u0011AB\r\u0003A)\u0007\u0010]3di\u0016$7*Z=t?\u0012*\u0017\u000fF\u0002S\u00077A!b!\b\u0004\u0016\u0005\u0005\t\u0019AB\b\u0003\rAH%\r\u0005\n\u0007C\u0011i\t)Q\u0005\u0007\u001f\tQ\"\u001a=qK\u000e$X\rZ&fsN\u0004\u0003BCB\u0013\u0005\u001b\u0003\r\u0011\"\u0001\u0004(\u0005!R\r\u001f9fGR,G-S:BaB,g\u000eZ(oYf,\"a!\u000b\u0011\t!\u001b\tb\u0012\u0005\u000b\u0007[\u0011i\t1A\u0005\u0002\r=\u0012\u0001G3ya\u0016\u001cG/\u001a3Jg\u0006\u0003\b/\u001a8e\u001f:d\u0017p\u0018\u0013fcR\u0019!k!\r\t\u0015\ru11FA\u0001\u0002\u0004\u0019I\u0003C\u0005\u00046\t5\u0005\u0015)\u0003\u0004*\u0005)R\r\u001f9fGR,G-S:BaB,g\u000eZ(oYf\u0004\u0003\u0002CB\u001d\u0005\u001b#\tea\u000f\u0002\u001d\u001d,G\u000fV1cY\u0016\u001c6\r[3nCR\u00111Q\b\t\u0005\u0007\u007f\u0019\t%D\u00010\u0013\r\u0019\u0019e\f\u0002\f)\u0006\u0014G.Z*dQ\u0016l\u0017\r\u0003\u0005\u0004H\t5E\u0011IB%\u00031\u0019X\r^&fs\u001aKW\r\u001c3t)\r\u001161\n\u0005\t\u0005{\u001b)\u00051\u0001\u0003B\"A1q\nBG\t\u0003\u001a\t&A\btKRL5/\u00119qK:$wJ\u001c7z)\r\u001161\u000b\u0005\t\u0007+\u001ai\u00051\u0001\u0004X\u0005a\u0011n]!qa\u0016tGm\u00148msB!1\u0011LB2\u001d\u0011\u0019Yfa\u0018\u000f\u0007\u0001\u001ai&\u0003\u0002\n\u0015%\u00191\u0011\r\u0005\u0002\u000fA\f7m[1hK&!1QMB4\u0005!Q%i\\8mK\u0006t'bAB1\u0011!A11\u000eBG\t\u0003\u001ai'A\u0007hKR\u0014VmY8sIRK\b/\u001a\u000b\u0003\u0007_\u0002ba!\u001d\u0004~\t=VBAB:\u0015\u0011\u0019)ha\u001e\u0002\u0011QL\b/Z5oM>TAa!\u001f\u0004|\u000511m\\7n_:T!\u0001\f\u0007\n\t\r}41\u000f\u0002\u0010)f\u0004X-\u00138g_Jl\u0017\r^5p]\"A11\u0011BG\t\u0003\u001a))A\td_:\u001cX/\\3ECR\f7\u000b\u001e:fC6$Baa\"\u00042B\"1\u0011RBP!\u0019\u0019Yia&\u0004\u001c6\u00111Q\u0012\u0006\u0005\u0007\u0
01f\u001b\t*\u0001\u0006eCR\f7\u000f\u001e:fC6T1\u0001LBJ\u0015\r\u0019)\nD\u0001\ngR\u0014X-Y7j]\u001eLAa!'\u0004\u000e\nqA)\u0019;b'R\u0014X-Y7TS:\\\u0007\u0003BBO\u0007?c\u0001\u0001\u0002\u0007\u0004\"\u000e\u0005\u0015\u0011!A\u0001\u0006\u0003\u0019\u0019KA\u0002`IE\nBa!*\u0004,B\u0019\u0001ja*\n\u0007\r%\u0016JA\u0004O_RD\u0017N\\4\u0011\u0007!\u001bi+C\u0002\u00040&\u00131!\u00118z\u0011!\u0019\u0019l!!A\u0002\rU\u0016A\u00033bi\u0006\u001cFO]3b[B111RB\\\u0007wKAa!/\u0004\u000e\nQA)\u0019;b'R\u0014X-Y7\u0011\u0011\ru6QYB,\u0005_k!aa0\u000b\t\r\u000571Y\u0001\u0006iV\u0004H.\u001a\u0006\u0005\u0005;\u001bY(\u0003\u0003\u0004H\u000e}&A\u0002+va2,'\u0007\u0003\u0005\u0004L\n5E\u0011IBg\u0003%\u0019wN\u001c4jOV\u0014X\r\u0006\u0004\u0003��\u000e=71\u001b\u0005\t\u0007#\u001cI\r1\u0001\u0003B\u00061aMT1nKND\u0001b!6\u0004J\u0002\u00071q[\u0001\u0007MRK\b/Z:\u0011\u000b!\u0013\u0019m!71\t\rm7q\u001c\t\u0007\u0007c\u001aih!8\u0011\t\ru5q\u001c\u0003\r\u0007C\u001c\u0019.!A\u0001\u0002\u000b\u000511\u0015\u0002\u0004?\u0012\u0012\u0004f\u0002\u0001\u0004f\u000eE81\u001f\t\u0005\u0007O\u001ci/\u0004\u0002\u0004j*\u001911\u001e-\u0002\u0013\u0015DH/\u001a8tS>t\u0017\u0002BBx\u0007S\u0014!\"\u0012=uK:$w+\u001b;i\u0003\u00151\u0018\r\\;fY\t\u0019)p\t\u0002\u0004xB!1\u0011 
C\u0005\u001b\t\u0019YP\u0003\u0003\u0004~\u000e}\u0018!\u00049be\u0006lW\r^3sSj,GM\u0003\u0003\u0005\u0002\u0011\r\u0011AC3yi\u0016t7/[8og*\u0019A\f\"\u0002\u000b\u0007\u0011\u001dA\"A\u0005uKN$X\u000f^5mg&!A1BB~\u0005i\u0001\u0016M]1nKR,'/\u001b>fIR+7\u000f^#yi\u0016t7/[8o\u000f\u001d!yA\u0001E\u0001\t#\tqCT8o\t\u0016$XM]7j]&\u001cH/[2EC\u001e$Vm\u001d;\u0011\u0007i\"\u0019B\u0002\u0004\u0002\u0005!\u0005AQC\n\u0005\t'!9\u0002E\u0002I\t3I1\u0001b\u0007J\u0005\u0019\te.\u001f*fM\"9q\u0007b\u0005\u0005\u0002\u0011}AC\u0001C\t\u0011!!\u0019\u0003b\u0005\u0005\u0002\u0011\u0015\u0012A\u00039be\u0006lW\r^3sgR\u0011Aq\u0005\t\u0006\tS!i\u0003H\u0007\u0003\tWQ1!\u0010BN\u0013\u0011!y\u0003b\u000b\u0003\u0015\r{G\u000e\\3di&|g\u000e\u000b\u0005\u0005\"\u0011MB\u0011\bC\u001e!\u0011\u0019I\u0010\"\u000e\n\t\u0011]21 \u0002\u000b!\u0006\u0014\u0018-\\3uKJ\u001c\u0018\u0001\u00028b[\u0016\f#\u0001\"\u0010\u0002E9|g\u000eR3uKJl\u0017N\\5ti&\u001cW\u000b\u001d3bi\u0016\u001cFO]1uK\u001eLXh\u001f\u0019~\u0001")
/* loaded from: input_file:org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.class */
public class NonDeterministicDagTest extends TableTestBase {
    // Strategy this test instance runs with; before() writes it into the table config under
    // OptimizerConfigOptions.TABLE_OPTIMIZER_NONDETERMINISTIC_UPDATE_STRATEGY.
    private final OptimizerConfigOptions.NonDeterministicUpdateStrategy nonDeterministicUpdateStrategy;
    // Stream table test utility, created with streamTestUtil's default argument; read through util().
    private final StreamTableTestUtil util = streamTestUtil(streamTestUtil$default$1());
    // Read through tryResolve(). Its assignment is not part of this excerpt.
    // NOTE(review): presumably derived from nonDeterministicUpdateStrategy — confirm in the constructor.
    private final boolean tryResolve;

    /* compiled from: NonDeterministicDagTest.scala */
    /* loaded from: input_file:org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest$TestingUpsertSink.class */
    public final class TestingUpsertSink implements UpsertStreamTableSink<RowData> {
        private final String[] keys;
        private final String[] fieldNames;
        private final DataType[] fieldTypes;
        private Option<String[]> expectedKeys;
        private Option<Object> expectedIsAppendOnly;
        private final /* synthetic */ NonDeterministicDagTest $outer;

        public TypeInformation<Tuple2<Boolean, RowData>> getOutputType() {
            return super.getOutputType();
        }

        public String[] keys() {
            return this.keys;
        }

        public String[] fieldNames() {
            return this.fieldNames;
        }

        public DataType[] fieldTypes() {
            return this.fieldTypes;
        }

        public Option<String[]> expectedKeys() {
            return this.expectedKeys;
        }

        public void expectedKeys_$eq(Option<String[]> option) {
            this.expectedKeys = option;
        }

        public Option<Object> expectedIsAppendOnly() {
            return this.expectedIsAppendOnly;
        }

        public void expectedIsAppendOnly_$eq(Option<Object> option) {
            this.expectedIsAppendOnly = option;
        }

        public TableSchema getTableSchema() {
            TableSchema.Builder builder = TableSchema.builder();
            Predef$.MODULE$.assert(fieldNames().length == fieldTypes().length);
            builder.fields(fieldNames(), fieldTypes());
            if (keys() == null || !new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(keys())).nonEmpty()) {
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
            } else {
                builder.primaryKey(keys());
            }
            return builder.build();
        }

        public void setKeyFields(String[] strArr) {
            if (expectedKeys().isDefined() && strArr == null) {
                throw new AssertionError("Provided key fields should not be null.");
            }
            if (expectedKeys().isEmpty()) {
            }
        }

        public void setIsAppendOnly(Boolean bool) {
            if (expectedIsAppendOnly().isEmpty()) {
            }
        }

        public TypeInformation<RowData> getRecordType() {
            return InternalTypeInfo.ofFields((LogicalType[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(fieldTypes())).map(dataType -> {
                return dataType.getLogicalType();
            }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(LogicalType.class))), fieldNames());
        }

        public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, RowData>> dataStream) {
            throw Predef$.MODULE$.$qmark$qmark$qmark();
        }

        public TestingUpsertSink configure(String[] strArr, TypeInformation<?>[] typeInformationArr) {
            return new TestingUpsertSink(this.$outer, keys(), strArr, (DataType[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(typeInformationArr)).map(typeInformation -> {
                return TypeConversions.fromLegacyInfoToDataType(typeInformation);
            }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(DataType.class))));
        }

        /* renamed from: configure, reason: collision with other method in class */
        public /* bridge */ /* synthetic */ TableSink m1222configure(String[] strArr, TypeInformation[] typeInformationArr) {
            return configure(strArr, (TypeInformation<?>[]) typeInformationArr);
        }

        public TestingUpsertSink(NonDeterministicDagTest nonDeterministicDagTest, String[] strArr, String[] strArr2, DataType[] dataTypeArr) {
            this.keys = strArr;
            this.fieldNames = strArr2;
            this.fieldTypes = dataTypeArr;
            if (nonDeterministicDagTest == null) {
                throw null;
            }
            this.$outer = nonDeterministicDagTest;
            this.expectedKeys = None$.MODULE$;
            this.expectedIsAppendOnly = None$.MODULE$;
        }
    }

    /**
     * Supplies the values for the parameterized test extension by delegating to the companion
     * object of this class.
     *
     * @return the update strategies provided by the companion object
     */
    @Parameters(name = "nonDeterministicUpdateStrategy={0}")
    public static Collection<OptimizerConfigOptions.NonDeterministicUpdateStrategy> parameters() {
        final Collection<OptimizerConfigOptions.NonDeterministicUpdateStrategy> strategies =
                NonDeterministicDagTest$.MODULE$.parameters();
        return strategies;
    }

    /** Accessor for the stream table test utility held by this test instance. */
    private StreamTableTestUtil util() {
        return util;
    }

    /** Accessor for the {@code tryResolve} flag of this test instance. */
    private boolean tryResolve() {
        return tryResolve;
    }

    @BeforeEach
    public void before() {
        util().tableConfig().getConfiguration().set(OptimizerConfigOptions.TABLE_OPTIMIZER_NONDETERMINISTIC_UPDATE_STRATEGY, this.nonDeterministicUpdateStrategy);
        util().tableEnv().getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, BoxesRunTime.boxToInteger(4));
        final NonDeterministicDagTest nonDeterministicDagTest = null;
        util().addTableSource("T", (Seq<Expression>) Predef$.MODULE$.wrapRefArray(new Expression[]{package$.MODULE$.symbol2FieldExpression((Symbol) SymbolLiteral.bootstrap(MethodHandles.lookup(), "apply", MethodType.methodType(Symbol.class), "a").dynamicInvoker().invoke() /* invoke-custom */), package$.MODULE$.symbol2FieldExpression((Symbol) SymbolLiteral.bootstrap(MethodHandles.lookup(), "apply", MethodType.methodType(Symbol.class), "b").dynamicInvoker().invoke() /* invoke-custom */), package$.MODULE$.symbol2FieldExpression((Symbol) SymbolLiteral.bootstrap(MethodHandles.lookup(), "apply", MethodType.methodType(Symbol.class), "c").dynamicInvoker().invoke() /* invoke-custom */), package$.MODULE$.symbol2FieldExpression((Symbol) SymbolLiteral.bootstrap(MethodHandles.lookup(), "apply", MethodType.methodType(Symbol.class), "d").dynamicInvoker().invoke() /* invoke-custom */)}), (TypeInformation) new CaseClassTypeInfo<Tuple4<Object, Object, String, Object>>(nonDeterministicDagTest) { // from class: org.apache.flink.table.planner.plan.stream.sql.NonDeterministicDagTest$$anon$3
            public /* synthetic */ TypeInformation[] protected$types(NonDeterministicDagTest$$anon$3 nonDeterministicDagTest$$anon$3) {
                return nonDeterministicDagTest$$anon$3.types;
            }

            public TypeSerializer<Tuple4<Object, Object, String, Object>> createSerializer(SerializerConfig serializerConfig) {
                final TypeSerializer[] typeSerializerArr = new TypeSerializer[getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), getArity()).foreach$mVc$sp(i -> {
                    typeSerializerArr[i] = this.protected$types(this)[i].createSerializer(serializerConfig);
                });
                new ScalaCaseClassSerializer<Tuple4<Object, Object, String, Object>>(this, typeSerializerArr) { // from class: org.apache.flink.table.planner.plan.stream.sql.NonDeterministicDagTest$$anon$3$$anon$1
                    /* renamed from: createInstance, reason: merged with bridge method [inline-methods] and merged with bridge method [inline-methods] */
                    public Tuple4<Object, Object, String, Object> m1219createInstance(Object[] objArr) {
                        return new Tuple4<>(BoxesRunTime.boxToInteger(BoxesRunTime.unboxToInt(objArr[0])), BoxesRunTime.boxToLong(BoxesRunTime.unboxToLong(objArr[1])), (String) objArr[2], BoxesRunTime.boxToBoolean(BoxesRunTime.unboxToBoolean(objArr[3])));
                    }

                    {
                        Class typeClass = this.getTypeClass();
                    }
                };
                return new ScalaCaseClassSerializer(getTypeClass(), typeSerializerArr);
            }

            public TypeSerializer<Tuple4<Object, Object, String, Object>> createSerializer(ExecutionConfig executionConfig) {
                return createSerializer(executionConfig.getSerializerConfig());
            }

            {
                super(Tuple4.class, (TypeInformation[]) new $colon.colon(BasicTypeInfo.getInfoFor(Integer.TYPE), new $colon.colon(BasicTypeInfo.getInfoFor(Long.TYPE), new $colon.colon(BasicTypeInfo.getInfoFor(String.class), new $colon.colon(BasicTypeInfo.getInfoFor(Boolean.TYPE), Nil$.MODULE$)))).toArray((ClassTag) Predef$.MODULE$.implicitly(ClassTag$.MODULE$.apply(TypeInformation.class))), new $colon.colon(BasicTypeInfo.getInfoFor(Integer.TYPE), new $colon.colon(BasicTypeInfo.getInfoFor(Long.TYPE), new $colon.colon(BasicTypeInfo.getInfoFor(String.class), new $colon.colon(BasicTypeInfo.getInfoFor(Boolean.TYPE), Nil$.MODULE$)))), Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"_1", "_2", "_3", "_4"})));
            }
        });
        final NonDeterministicDagTest nonDeterministicDagTest2 = null;
        util().addDataStream("T1", Predef$.MODULE$.wrapRefArray(new Expression[]{package$.MODULE$.symbol2FieldExpression((Symbol) SymbolLiteral.bootstrap(MethodHandles.lookup(), "apply", MethodType.methodType(Symbol.class), "a").dynamicInvoker().invoke() /* invoke-custom */), package$.MODULE$.symbol2FieldExpression((Symbol) SymbolLiteral.bootstrap(MethodHandles.lookup(), "apply", MethodType.methodType(Symbol.class), "b").dynamicInvoker().invoke() /* invoke-custom */), package$.MODULE$.symbol2FieldExpression((Symbol) SymbolLiteral.bootstrap(MethodHandles.lookup(), "apply", MethodType.methodType(Symbol.class), "c").dynamicInvoker().invoke() /* invoke-custom */), (Expression) package$.MODULE$.UnresolvedFieldExpression((Symbol) SymbolLiteral.bootstrap(MethodHandles.lookup(), "apply", MethodType.methodType(Symbol.class), "proctime").dynamicInvoker().invoke() /* invoke-custom */).proctime(), (Expression) package$.MODULE$.UnresolvedFieldExpression((Symbol) SymbolLiteral.bootstrap(MethodHandles.lookup(), "apply", MethodType.methodType(Symbol.class), "rowtime").dynamicInvoker().invoke() /* invoke-custom */).rowtime()}), new CaseClassTypeInfo<Tuple3<Object, String, Object>>(nonDeterministicDagTest2) { // from class: org.apache.flink.table.planner.plan.stream.sql.NonDeterministicDagTest$$anon$4
            public /* synthetic */ TypeInformation[] protected$types(NonDeterministicDagTest$$anon$4 nonDeterministicDagTest$$anon$4) {
                return nonDeterministicDagTest$$anon$4.types;
            }

            public TypeSerializer<Tuple3<Object, String, Object>> createSerializer(SerializerConfig serializerConfig) {
                final TypeSerializer[] typeSerializerArr = new TypeSerializer[getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), getArity()).foreach$mVc$sp(i -> {
                    typeSerializerArr[i] = this.protected$types(this)[i].createSerializer(serializerConfig);
                });
                new ScalaCaseClassSerializer<Tuple3<Object, String, Object>>(this, typeSerializerArr) { // from class: org.apache.flink.table.planner.plan.stream.sql.NonDeterministicDagTest$$anon$4$$anon$2
                    /* renamed from: createInstance, reason: merged with bridge method [inline-methods] and merged with bridge method [inline-methods] */
                    public Tuple3<Object, String, Object> m1221createInstance(Object[] objArr) {
                        return new Tuple3<>(BoxesRunTime.boxToInteger(BoxesRunTime.unboxToInt(objArr[0])), (String) objArr[1], BoxesRunTime.boxToLong(BoxesRunTime.unboxToLong(objArr[2])));
                    }

                    {
                        Class typeClass = this.getTypeClass();
                    }
                };
                return new ScalaCaseClassSerializer(getTypeClass(), typeSerializerArr);
            }

            public TypeSerializer<Tuple3<Object, String, Object>> createSerializer(ExecutionConfig executionConfig) {
                return createSerializer(executionConfig.getSerializerConfig());
            }

            {
                super(Tuple3.class, (TypeInformation[]) new $colon.colon(BasicTypeInfo.getInfoFor(Integer.TYPE), new $colon.colon(BasicTypeInfo.getInfoFor(String.class), new $colon.colon(BasicTypeInfo.getInfoFor(Long.TYPE), Nil$.MODULE$))).toArray((ClassTag) Predef$.MODULE$.implicitly(ClassTag$.MODULE$.apply(TypeInformation.class))), new $colon.colon(BasicTypeInfo.getInfoFor(Integer.TYPE), new $colon.colon(BasicTypeInfo.getInfoFor(String.class), new $colon.colon(BasicTypeInfo.getInfoFor(Long.TYPE), Nil$.MODULE$))), Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"_1", "_2", "_3"})));
            }
        });
        util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                               |create temporary table src (\n                               | a int,\n                               | b bigint,\n                               | c string,\n                               | d bigint\n                               |) with (\n                               | 'connector' = 'values',\n                               | 'changelog-mode' = 'I'\n                               |)")).stripMargin());
        util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                               |create temporary table cdc (\n                               | a int,\n                               | b bigint,\n                               | c string,\n                               | d bigint,\n                               | primary key (a) not enforced\n                               |) with (\n                               | 'connector' = 'values',\n                               | 'changelog-mode' = 'I,UA,UB,D'\n                               |)")).stripMargin());
        util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                               |create temporary table upsert_src (\n                               | a int,\n                               | b bigint,\n                               | c string,\n                               | d boolean,\n                               | primary key (a) not enforced\n                               |) with (\n                               | 'connector' = 'values',\n                               | 'changelog-mode' = 'I,UA,D'\n                               |)")).stripMargin());
        util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                               |create temporary table cdc_with_computed_col (\n                               |  a int,\n                               |  b bigint,\n                               |  c string,\n                               |  d int,\n                               |  `day` as DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd'),\n                               |  primary key(a, c) not enforced\n                               |) with (\n                               | 'connector' = 'values',\n                               | 'changelog-mode' = 'I,UA,UB,D'\n                               |)")).stripMargin());
        util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n        |create temporary table cdc_with_meta (\n        | a int,\n        | b bigint,\n        | c string,\n        | d boolean,\n        | metadata_1 int metadata,\n        | metadata_2 string metadata,\n        | metadata_3 bigint metadata,\n        | primary key (a) not enforced\n        |) with (\n        | 'connector' = 'values',\n        | 'changelog-mode' = 'I,UA,UB,D',\n        | 'readable-metadata' = 'metadata_1:INT, metadata_2:STRING, metadata_3:BIGINT'\n        |)")).stripMargin());
        util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                               |create temporary table cdc_with_watermark (\n                               | a int,\n                               | b bigint,\n                               | c string,\n                               | d boolean,\n                               | op_ts timestamp_ltz(3),\n                               | primary key (a) not enforced,\n                               | watermark for op_ts as op_ts - interval '5' second\n                               |) with (\n                               | 'connector' = 'values',\n                               | 'changelog-mode' = 'I,UA,UB,D',\n                               | 'readable-metadata' = 'op_ts:timestamp_ltz(3)'\n                               |)")).stripMargin());
        util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                               |create temporary table cdc_with_meta_and_wm (\n                               | a int,\n                               | b bigint,\n                               | c string,\n                               | d boolean,\n                               | op_ts timestamp_ltz(3) metadata,\n                               | primary key (a) not enforced,\n                               | watermark for op_ts as op_ts - interval '5' second\n                               |) with (\n                               | 'connector' = 'values',\n                               | 'changelog-mode' = 'I,UA,UB,D',\n                               | 'readable-metadata' = 'op_ts:timestamp_ltz(3)'\n                               |)")).stripMargin());
        util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                               |create temporary table sink_with_composite_pk (\n                               | a int,\n                               | b bigint,\n                               | c string,\n                               | d bigint,\n                               | primary key (a,d) not enforced\n                               |) with (\n                               | 'connector' = 'values',\n                               | 'sink-insert-only' = 'false'\n                               |)")).stripMargin());
        util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                               |create temporary table sink_with_pk (\n                               | a int,\n                               | b bigint,\n                               | c string,\n                               | primary key (a) not enforced\n                               |) with (\n                               | 'connector' = 'values',\n                               | 'sink-insert-only' = 'false'\n                               |)")).stripMargin());
        util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                               |create temporary table sink_without_pk (\n                               | a int,\n                               | b bigint,\n                               | c string\n                               |) with (\n                               | 'connector' = 'values',\n                               | 'sink-insert-only' = 'false'\n                               |)")).stripMargin());
        util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                               |create temporary table dim_with_pk (\n                               | a int,\n                               | b bigint,\n                               | c string,\n                               | primary key (a) not enforced\n                               |) with (\n                               | 'connector' = 'values'\n                               |)")).stripMargin());
        util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                               |create temporary table dim_without_pk (\n                               | a int,\n                               | b bigint,\n                               | c string\n                               |) with (\n                               | 'connector' = 'values'\n                               |)")).stripMargin());
        util().tableEnv().createTemporaryFunction("ndFunc", new TestNonDeterministicUdf());
        util().tableEnv().createTemporaryFunction("ndTableFunc", new TestNonDeterministicUdtf());
        util().tableEnv().createTemporaryFunction("ndAggFunc", new TestNonDeterministicUdaf());
        util().tableEnv().createTemporaryFunction("str_split", new JavaUserDefinedTableFunctions.StringSplit());
    }

    /**
     * Builds an insert of {@code a, metadata_3, c} from {@code cdc_with_meta} into
     * {@code sink_with_pk} and passes it to {@code verifyExecPlanInsert}.
     */
    @TestTemplate
    public void testCdcWithMetaSinkWithPk() {
        final String insertSql = new StringOps(Predef$.MODULE$.augmentString("\n                                 |insert into sink_with_pk\n                                 |select a, metadata_3, c\n                                 |from cdc_with_meta\n                                 |")).stripMargin();
        util().verifyExecPlanInsert(insertSql);
    }

    /**
     * Inserts a projection over a deduplicated {@code cdc} sub-query (whose column {@code d} is
     * produced by {@code ndFunc}) into {@code sink_without_pk}.
     *
     * <p>When {@code tryResolve()} is true, a {@link TableException} naming column {@code d} is
     * expected; otherwise planning must complete without any exception.
     */
    @TestTemplate
    public void testNonDeterministicProjectionWithSinkWithoutPk() {
        ThrowableAssert.ThrowingCallable planInsert = () -> {
            this.util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n          |insert into sink_without_pk\n          |select\n          |  a,\n          |  if(a > 100, b+d, b) as b,\n          |  case when d > 100 then json_value(c, '$.count')\n          |  else cast(b as string) || '#' end as c\n          |from (\n          |select a, b, c, d from (\n          |  select *, row_number() over(partition by a order by d desc) as rn\n          |  from (\n          |    select a, d as b, c, ndFunc(b) as d from cdc\n          |  ) tmp\n          |) tmp where rn = 1) tmp\n          |")).stripMargin());
        };
        if (tryResolve()) {
            // Check the type of the thrown exception itself; an `instanceof` on the assertion
            // object never verified anything.
            Assertions.assertThatThrownBy(planInsert)
                    .hasMessageContaining("The column(s): d(generated by non-deterministic function: ndFunc ) can not satisfy the determinism")
                    .isInstanceOf(TableException.class);
        } else {
            Assertions.assertThatCode(planInsert).doesNotThrowAnyException();
        }
    }

    /**
     * Inserts {@code a, metadata_3, c} from {@code cdc_with_meta} into {@code sink_without_pk}.
     *
     * <p>When {@code tryResolve()} is true, a {@link TableException} mentioning metadata column
     * {@code metadata_3} is expected; otherwise planning must complete without any exception.
     */
    @TestTemplate
    public void testCdcWithMetaSinkWithoutPk() {
        ThrowableAssert.ThrowingCallable planInsert = () -> {
            this.util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n                                                                        |insert into sink_without_pk\n                                                                        |select a, metadata_3, c\n                                                                        |from cdc_with_meta\n                                                                        |")).stripMargin());
        };
        if (tryResolve()) {
            // Check the type of the thrown exception itself; an `instanceof` on the assertion
            // object never verified anything.
            Assertions.assertThatThrownBy(planInsert)
                    .hasMessageContaining("metadata column(s): 'metadata_3' in cdc source may cause wrong result or error")
                    .isInstanceOf(TableException.class);
        } else {
            Assertions.assertThatCode(planInsert).doesNotThrowAnyException();
        }
    }

    /**
     * Registers a {@code TestingUpsertSink} keyed on {@code a} under the name
     * {@code legacy_upsert_sink}, then passes an insert of {@code a, metadata_3, c} from
     * {@code cdc_with_meta} to {@code verifyExecPlanInsert}.
     */
    @TestTemplate
    public void testCdcWithMetaLegacySinkWithPk() {
        final String[] keyFields = new String[]{"a"};
        final String[] fieldNames = new String[]{"a", "b", "c"};
        final DataType[] fieldTypes = new DataType[]{(DataType) DataTypes.INT().notNull(), DataTypes.BIGINT(), DataTypes.VARCHAR(100)};
        util().tableEnv().registerTableSinkInternal("legacy_upsert_sink", new TestingUpsertSink(this, keyFields, fieldNames, fieldTypes));

        final String insertSql = new StringOps(Predef$.MODULE$.augmentString("\n                                 |insert into legacy_upsert_sink\n                                 |select a, metadata_3, c\n                                 |from cdc_with_meta\n                                 |")).stripMargin();
        util().verifyExecPlanInsert(insertSql);
    }

    /**
     * Registers a retract table sink named {@code legacy_retract_sink} and inserts
     * {@code a, metadata_3, c} from {@code cdc_with_meta} into it.
     *
     * <p>When {@code tryResolve()} is true, a {@link TableException} mentioning metadata column
     * {@code metadata_3} is expected; otherwise planning must complete without any exception.
     */
    @TestTemplate
    public void testCdcWithMetaLegacySinkWithoutPk() {
        util().tableEnv().registerTableSinkInternal("legacy_retract_sink", util().createRetractTableSink(new String[]{"a", "b", "c"}, new LogicalType[]{new IntType(), new BigIntType(), VarCharType.STRING_TYPE}));
        ThrowableAssert.ThrowingCallable planInsert = () -> {
            this.util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n                                   |insert into legacy_retract_sink\n                                   |select a, metadata_3, c\n                                   |from cdc_with_meta\n                                   |")).stripMargin());
        };
        if (tryResolve()) {
            // Check the type of the thrown exception itself; an `instanceof` on the assertion
            // object never verified anything.
            Assertions.assertThatThrownBy(planInsert)
                    .hasMessageContaining("metadata column(s): 'metadata_3' in cdc source may cause wrong result or error")
                    .isInstanceOf(TableException.class);
        } else {
            Assertions.assertThatCode(planInsert).doesNotThrowAnyException();
        }
    }

    /**
     * Inserts {@code a, b, c, metadata_3} from {@code cdc_with_meta} into
     * {@code sink_with_composite_pk}.
     *
     * <p>When {@code tryResolve()} is true, a {@link TableException} mentioning metadata column
     * {@code metadata_3} is expected; otherwise planning must complete without any exception.
     */
    @TestTemplate
    public void testCdcWithMetaSinkWithCompositePk() {
        ThrowableAssert.ThrowingCallable planInsert = () -> {
            this.util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n                                   |insert into sink_with_composite_pk\n                                   |select a, b, c, metadata_3\n                                   |from cdc_with_meta\n                                   |")).stripMargin());
        };
        if (tryResolve()) {
            // Check the type of the thrown exception itself; an `instanceof` on the assertion
            // object never verified anything.
            Assertions.assertThatThrownBy(planInsert)
                    .hasMessageContaining("metadata column(s): 'metadata_3' in cdc source may cause wrong result or error")
                    .isInstanceOf(TableException.class);
        } else {
            Assertions.assertThatCode(planInsert).doesNotThrowAnyException();
        }
    }

    /**
     * Creates {@code cdc_with_meta_rename}, whose computed column {@code e} aliases the metadata
     * column {@code metadata_3}, and inserts {@code a, b, c, e} into
     * {@code sink_with_composite_pk}.
     *
     * <p>When {@code tryResolve()} is true, a {@link TableException} mentioning metadata column
     * {@code metadata_3} is expected; otherwise planning must complete without any exception.
     */
    @TestTemplate
    public void testCdcWithMetaRenameSinkWithCompositePk() {
        util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                               |create temporary table cdc_with_meta_rename (\n                               | a int,\n                               | b bigint,\n                               | c string,\n                               | d boolean,\n                               | metadata_3 bigint metadata,\n                               | e as metadata_3,\n                               | primary key (a) not enforced\n                               |) with (\n                               | 'connector' = 'values',\n                               | 'changelog-mode' = 'I,UA,UB,D',\n                               | 'readable-metadata' = 'metadata_3:BIGINT'\n                               |)")).stripMargin());
        ThrowableAssert.ThrowingCallable planInsert = () -> {
            this.util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n                                   |insert into sink_with_composite_pk\n                                   |select a, b, c, e from cdc_with_meta_rename\n                                   |")).stripMargin());
        };
        if (tryResolve()) {
            // Check the type of the thrown exception itself; an `instanceof` on the assertion
            // object never verified anything.
            Assertions.assertThatThrownBy(planInsert)
                    .hasMessageContaining("metadata column(s): 'metadata_3' in cdc source may cause wrong result or error")
                    .isInstanceOf(TableException.class);
        } else {
            Assertions.assertThatCode(planInsert).doesNotThrowAnyException();
        }
    }

    /**
     * Inserts {@code a, b, `day`} from {@code cdc_with_computed_col} (filtered by
     * {@code b > 100}) into {@code sink_with_pk}.
     *
     * <p>When {@code tryResolve()} is true, a {@link TableException} naming column {@code day}
     * and {@code CURRENT_TIMESTAMP} is expected; otherwise planning must complete without any
     * exception.
     */
    @TestTemplate
    public void testSourceWithComputedColumnSinkWithPk() {
        ThrowableAssert.ThrowingCallable planInsert = () -> {
            this.util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n                                                                        |insert into sink_with_pk\n                                                                        |select a, b, `day`\n                                                                        |from cdc_with_computed_col\n                                                                        |where b > 100\n                                                                        |")).stripMargin());
        };
        if (tryResolve()) {
            // Check the type of the thrown exception itself; an `instanceof` on the assertion
            // object never verified anything.
            Assertions.assertThatThrownBy(planInsert)
                    .hasMessageContaining("column(s): day(generated by non-deterministic function: CURRENT_TIMESTAMP ) can not satisfy the determinism")
                    .isInstanceOf(TableException.class);
        } else {
            Assertions.assertThatCode(planInsert).doesNotThrowAnyException();
        }
    }

    /**
     * Builds a statement set with two inserts reading {@code cdc_with_computed_col}: an
     * aggregation into {@code sink_without_pk} and a filtered projection into
     * {@code sink_with_pk}, then passes the set to {@code verifyExecPlan}.
     *
     * <p>When {@code tryResolve()} is true, a {@link TableException} naming column {@code day}
     * and {@code CURRENT_TIMESTAMP} is expected; otherwise planning must complete without any
     * exception.
     */
    @TestTemplate
    public void testSourceWithComputedColumnMultiSink() {
        StatementSet statements = util().tableEnv().createStatementSet();
        statements.addInsertSql(new StringOps(Predef$.MODULE$.augmentString("\n                            |insert into sink_without_pk\n                            |select a, sum(b), `day`\n                            |from cdc_with_computed_col\n                            |group by a, `day`\n                            |")).stripMargin());
        statements.addInsertSql(new StringOps(Predef$.MODULE$.augmentString("\n                            |insert into sink_with_pk\n                            |select a, b, `day`\n                            |from cdc_with_computed_col\n                            |where b > 100\n                            |")).stripMargin());
        ThrowableAssert.ThrowingCallable planStatements = () -> {
            this.util().verifyExecPlan(statements);
        };
        if (tryResolve()) {
            // Check the type of the thrown exception itself; an `instanceof` on the assertion
            // object never verified anything.
            Assertions.assertThatThrownBy(planStatements)
                    .hasMessageContaining("column(s): day(generated by non-deterministic function: CURRENT_TIMESTAMP ) can not satisfy the determinism")
                    .isInstanceOf(TableException.class);
        } else {
            Assertions.assertThatCode(planStatements).doesNotThrowAnyException();
        }
    }

    /**
     * Inserts {@code t1.a, t1.b, a1} from {@code cdc} correlated with
     * {@code ndTableFunc(a)} into {@code sink_with_pk}.
     *
     * <p>When {@code tryResolve()} is true, a {@link TableException} naming {@code EXPR$0} and
     * {@code ndTableFunc} is expected; otherwise planning must complete without any exception.
     */
    @TestTemplate
    public void testCdcCorrelateNonDeterministicFuncSinkWithPK() {
        ThrowableAssert.ThrowingCallable planInsert = () -> {
            this.util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n                                   |insert into sink_with_pk\n                                   |select\n                                   |  t1.a, t1.b, a1\n                                   |from cdc t1, lateral table(ndTableFunc(a)) as T(a1)\n                                   |")).stripMargin());
        };
        if (tryResolve()) {
            // Check the type of the thrown exception itself; an `instanceof` on the assertion
            // object never verified anything.
            Assertions.assertThatThrownBy(planInsert)
                    .hasMessageContaining("column(s): EXPR$0(generated by non-deterministic function: ndTableFunc ) can not satisfy the determinism")
                    .isInstanceOf(TableException.class);
        } else {
            Assertions.assertThatCode(planInsert).doesNotThrowAnyException();
        }
    }

    /**
     * Inserts only the table-function output ({@code cast(a1 as integer)}) of {@code cdc}
     * correlated with {@code ndTableFunc(a)} into column {@code a} of {@code sink_with_pk}.
     *
     * <p>When {@code tryResolve()} is true, a {@link TableException} naming {@code EXPR$0} and
     * {@code ndTableFunc} is expected; otherwise planning must complete without any exception.
     */
    @TestTemplate
    public void testCdcCorrelateNonDeterministicFuncNoLeftOutput() {
        ThrowableAssert.ThrowingCallable planInsert = () -> {
            this.util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n                                   |insert into sink_with_pk(a)\n                                   |select\n                                   |  cast(a1 as integer) a\n                                   |from cdc t1, lateral table(ndTableFunc(a)) as T(a1)\n                                   |")).stripMargin());
        };
        if (tryResolve()) {
            // Check the type of the thrown exception itself; an `instanceof` on the assertion
            // object never verified anything.
            Assertions.assertThatThrownBy(planInsert)
                    .hasMessageContaining("column(s): EXPR$0(generated by non-deterministic function: ndTableFunc ) can not satisfy the determinism")
                    .isInstanceOf(TableException.class);
        } else {
            Assertions.assertThatCode(planInsert).doesNotThrowAnyException();
        }
    }

    /**
     * Passes to {@code verifyExecPlanInsert} an insert into {@code sink_with_pk} that joins
     * {@code cdc} with {@code ndTableFunc(a)} but selects only the {@code cdc} columns
     * {@code a, b, c}.
     */
    @TestTemplate
    public void testCdcCorrelateNonDeterministicFuncNoRightOutput() {
        final String insertSql = new StringOps(Predef$.MODULE$.augmentString("\n                                 |insert into sink_with_pk\n                                 |select a, b, c\n                                 |from cdc t1 join lateral table(ndTableFunc(a)) as T(a1) on true\n                                 |")).stripMargin();
        util().verifyExecPlanInsert(insertSql);
    }

    /**
     * Plans an insert whose lateral join on {@code str_split(c)} uses the predicate
     * {@code ndFunc(b) > 100} and expects a {@link TableException} whose message contains
     * "unexpected correlate variable $cor0 in the plan". Unlike its siblings, this check does not
     * depend on {@code tryResolve()}.
     */
    @TestTemplate
    public void testCdcCorrelateOnNonDeterministicCondition() {
        ThrowableAssert.ThrowingCallable planInsert = () -> {
            this.util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n             |insert into sink_with_pk\n             |select a, b, c\n             |from cdc t1 join lateral table(str_split(c)) as T(c1)\n             | -- the join predicate can only be empty or literal true for now\n             |  on ndFunc(b) > 100\n             |")).stripMargin());
        };
        // Check the type of the thrown exception itself; an `instanceof` on the assertion
        // object never verified anything.
        Assertions.assertThatThrownBy(planInsert)
                .hasMessageContaining("unexpected correlate variable $cor0 in the plan")
                .isInstanceOf(TableException.class);
    }

    /**
     * Inserts {@code t1.a, t1.metadata_1, T.c1} from {@code cdc_with_meta} correlated with
     * {@code str_split(c)} into {@code sink_with_pk}.
     *
     * <p>When {@code tryResolve()} is true, a {@link TableException} mentioning metadata column
     * {@code metadata_1} is expected; otherwise planning must complete without any exception.
     */
    @TestTemplate
    public void testCdcWithMetaCorrelateSinkWithPk() {
        ThrowableAssert.ThrowingCallable planInsert = () -> {
            this.util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n                                   |insert into sink_with_pk\n                                   |select t1.a, t1.metadata_1, T.c1\n                                   |from cdc_with_meta t1, lateral table(str_split(c)) as T(c1)\n                                   |")).stripMargin());
        };
        if (tryResolve()) {
            // Check the type of the thrown exception itself; an `instanceof` on the assertion
            // object never verified anything.
            Assertions.assertThatThrownBy(planInsert)
                    .hasMessageContaining("metadata column(s): 'metadata_1' in cdc source may cause wrong result or error on downstream operators")
                    .isInstanceOf(TableException.class);
        } else {
            Assertions.assertThatCode(planInsert).doesNotThrowAnyException();
        }
    }

    /**
     * Passes to {@code verifyExecPlanInsert} an insert of {@code a, ndFunc(b), c} from
     * {@code cdc} into {@code sink_with_pk}.
     */
    @TestTemplate
    public void testCdcWithNonDeterministicFuncSinkWithPk() {
        final String insertSql = new StringOps(Predef$.MODULE$.augmentString("\n                                 |insert into sink_with_pk\n                                 |select a, ndFunc(b), c\n                                 |from cdc\n                                 |")).stripMargin();
        util().verifyExecPlanInsert(insertSql);
    }

    /**
     * Inserts {@code a, ndFunc(b), c} from {@code cdc} into {@code sink_without_pk}.
     *
     * <p>When {@code tryResolve()} is true, a {@link TableException} naming {@code EXPR$1} and
     * {@code ndFunc} is expected; otherwise planning must complete without any exception.
     */
    @TestTemplate
    public void testCdcWithNonDeterministicFuncSinkWithoutPk() {
        ThrowableAssert.ThrowingCallable planInsert = () -> {
            this.util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n                                                                        |insert into sink_without_pk\n                                                                        |select a, ndFunc(b), c\n                                                                        |from cdc\n                                                                        |")).stripMargin());
        };
        if (tryResolve()) {
            // Check the type of the thrown exception itself; an `instanceof` on the assertion
            // object never verified anything.
            Assertions.assertThatThrownBy(planInsert)
                    .hasMessageContaining("column(s): EXPR$1(generated by non-deterministic function: ndFunc ) can not satisfy the determinism")
                    .isInstanceOf(TableException.class);
        } else {
            Assertions.assertThatCode(planInsert).doesNotThrowAnyException();
        }
    }

    /**
     * Passes to {@code verifyExecPlanInsert} an insert from {@code cdc} into
     * {@code sink_with_pk} whose filter compares {@code t1.b} with
     * {@code UNIX_TIMESTAMP() - 300}.
     */
    @TestTemplate
    public void testCdcWithNonDeterministicFilter() {
        final String insertSql = new StringOps(Predef$.MODULE$.augmentString("\n                                 |insert into sink_with_pk\n                                 |select t1.a, t1.b, t1.c\n                                 |from cdc t1\n                                 |where t1.b > UNIX_TIMESTAMP() - 300\n                                 |")).stripMargin();
        util().verifyExecPlanInsert(insertSql);
    }

    /**
     * Passes to {@code verifyExecPlanInsert} an insert into {@code sink_with_pk} that joins
     * {@code cdc} with {@code dim_with_pk} via {@code for system_time as of} on {@code a} and
     * selects {@code t1.a, t1.b, t2.c}.
     */
    @TestTemplate
    public void testCdcJoinDimWithPkSinkWithPk() {
        final String insertSql = new StringOps(Predef$.MODULE$.augmentString("\n                                 |insert into sink_with_pk\n                                 |select t1.a, t1.b, t2.c\n                                 |from (\n                                 |  select *, proctime() proctime from cdc\n                                 |) t1 join dim_with_pk for system_time as of t1.proctime as t2\n                                 |on t1.a = t2.a\n                                 |")).stripMargin();
        util().verifyExecPlanInsert(insertSql);
    }

    /**
     * Passes to {@code verifyExecPlanInsert} an insert into {@code sink_with_pk} that joins
     * {@code cdc} with {@code dim_without_pk} via {@code for system_time as of} on {@code a}
     * and selects {@code t1.a, t1.b, t2.c}.
     */
    @TestTemplate
    public void testCdcJoinDimWithoutPkSinkWithPk() {
        final String insertSql = new StringOps(Predef$.MODULE$.augmentString("\n                                 |insert into sink_with_pk\n                                 |select t1.a, t1.b, t2.c\n                                 |from (\n                                 |  select *, proctime() proctime from cdc\n                                 |) t1 join dim_without_pk for system_time as of t1.proctime as t2\n                                 |on t1.a = t2.a\n                                 |")).stripMargin();
        util().verifyExecPlanInsert(insertSql);
    }

    /**
     * Passes to {@code verifyExecPlanInsert} an insert into {@code sink_with_pk} that left-joins
     * {@code cdc} with {@code dim_with_pk} via {@code for system_time as of} on {@code a} and
     * selects {@code t1.a, t1.b, t2.c}.
     */
    @TestTemplate
    public void testCdcLeftJoinDimWithPkSinkWithPk() {
        final String insertSql = new StringOps(Predef$.MODULE$.augmentString("\n                                 |insert into sink_with_pk\n                                 |select t1.a, t1.b, t2.c\n                                 |from (\n                                 |  select *, proctime() proctime from cdc\n                                 |) t1 left join dim_with_pk for system_time as of t1.proctime as t2\n                                 |on t1.a = t2.a\n                                 |")).stripMargin();
        util().verifyExecPlanInsert(insertSql);
    }

    /**
     * Passes to {@code verifyExecPlanInsert} an insert into {@code sink_without_pk} that joins
     * {@code cdc} with {@code dim_with_pk} via {@code for system_time as of} on {@code a} and
     * selects {@code t1.a, t1.b, t2.c}.
     */
    @TestTemplate
    public void testCdcJoinDimWithPkSinkWithoutPk() {
        final String insertSql = new StringOps(Predef$.MODULE$.augmentString("\n                                 |insert into sink_without_pk\n                                 |select t1.a, t1.b, t2.c\n                                 |from (\n                                 |  select *, proctime() proctime from cdc\n                                 |) t1 join dim_with_pk for system_time as of t1.proctime as t2\n                                 |on t1.a = t2.a\n                                 |")).stripMargin();
        util().verifyExecPlanInsert(insertSql);
    }

    /**
     * Passes to {@code verifyExecPlanInsert} an insert into {@code sink_without_pk} that joins
     * {@code cdc} with {@code dim_without_pk} via {@code for system_time as of} on {@code a}
     * and selects {@code t1.a, t1.b, t2.c}.
     */
    @TestTemplate
    public void testCdcJoinDimWithoutPkSinkWithoutPk() {
        final String insertSql = new StringOps(Predef$.MODULE$.augmentString("\n                                 |insert into sink_without_pk\n                                 |select t1.a, t1.b, t2.c\n                                 |from (\n                                 |  select *, proctime() proctime from cdc\n                                 |) t1 join dim_without_pk for system_time as of t1.proctime as t2\n                                 |on t1.a = t2.a\n                                 |")).stripMargin();
        util().verifyExecPlanInsert(insertSql);
    }

    /**
     * Passes to {@code verifyExecPlanInsert} an insert into {@code sink_without_pk} that joins
     * {@code cdc} with {@code dim_with_pk} via {@code for system_time as of} on {@code a} but
     * selects only left-side columns {@code t1.a, t1.b, t1.c}.
     */
    @TestTemplate
    public void testCdcJoinDimWithPkOnlySinkWithoutPk() {
        final String insertSql = new StringOps(Predef$.MODULE$.augmentString("\n                                 |insert into sink_without_pk\n                                 |select t1.a, t1.b, t1.c\n                                 |from (\n                                 |  select *, proctime() proctime from cdc\n                                 |) t1 join dim_with_pk for system_time as of t1.proctime as t2\n                                 |on t1.a = t2.a\n                                 |")).stripMargin();
        util().verifyExecPlanInsert(insertSql);
    }

    /**
     * Passes to {@code verifyExecPlanInsert} an insert into {@code sink_without_pk} that
     * left-joins {@code cdc} with {@code dim_without_pk} via {@code for system_time as of} on
     * {@code a} and selects {@code t1.a, t1.b, t2.c}.
     */
    @TestTemplate
    public void testCdcLeftJoinDimWithoutPkSinkWithoutPk() {
        final String insertSql = new StringOps(Predef$.MODULE$.augmentString("\n         |insert into sink_without_pk\n         |select t1.a, t1.b, t2.c\n         |from (\n         |  select *, proctime() proctime from cdc\n         |) t1 left join dim_without_pk for system_time as of t1.proctime as t2\n         |on t1.a = t2.a\n         |")).stripMargin();
        util().verifyExecPlanInsert(insertSql);
    }

    /**
     * Passes to {@code verifyExecPlanInsert} an insert into {@code sink_without_pk} that joins
     * {@code cdc} with {@code dim_with_pk} via {@code for system_time as of} on {@code a} and
     * selects {@code t1.a, t2.b, t1.c}.
     */
    @TestTemplate
    public void testCdcJoinDimWithPkOutputNoPkSinkWithoutPk() {
        final String insertSql = new StringOps(Predef$.MODULE$.augmentString("\n                                 |insert into sink_without_pk\n                                 |select t1.a, t2.b, t1.c\n                                 |from (\n                                 |  select *, proctime() proctime from cdc\n                                 |) t1 join dim_with_pk for system_time as of t1.proctime as t2\n                                 |on t1.a = t2.a\n                                 |")).stripMargin();
        util().verifyExecPlanInsert(insertSql);
    }

    /**
     * Inserts {@code ndFunc(t2.a) a, t1.b, t1.c} from {@code cdc} joined with
     * {@code dim_with_pk} (via {@code for system_time as of}) into {@code sink_without_pk}.
     *
     * <p>When {@code tryResolve()} is true, a {@link TableException} naming column {@code a} and
     * {@code ndFunc} is expected; otherwise planning must complete without any exception.
     */
    @TestTemplate
    public void testCdcJoinDimWithPkNonDeterministicFuncSinkWithoutPk() {
        ThrowableAssert.ThrowingCallable planInsert = () -> {
            this.util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n                                   |insert into sink_without_pk\n                                   |select ndFunc(t2.a) a, t1.b, t1.c\n                                   |from (\n                                   |  select *, proctime() proctime from cdc\n                                   |) t1 join dim_with_pk for system_time as of t1.proctime as t2\n                                   |on t1.a = t2.a\n                                   |")).stripMargin());
        };
        if (tryResolve()) {
            // Check the type of the thrown exception itself; an `instanceof` on the assertion
            // object never verified anything.
            Assertions.assertThatThrownBy(planInsert)
                    .hasMessageContaining("column(s): a(generated by non-deterministic function: ndFunc ) can not satisfy the determinism")
                    .isInstanceOf(TableException.class);
        } else {
            Assertions.assertThatCode(planInsert).doesNotThrowAnyException();
        }
    }

    /**
     * Inserts into {@code sink_without_pk} from {@code cdc} joined with {@code dim_with_pk}
     * (via {@code for system_time as of}), where the join condition additionally requires
     * {@code ndFunc(t2.b) > 100}.
     *
     * <p>When {@code tryResolve()} is true, a {@link TableException} reporting {@code ndFunc} in
     * the condition is expected; otherwise planning must complete without any exception.
     */
    @TestTemplate
    public void testCdcJoinDimWithPkNonDeterministicLocalCondition() {
        ThrowableAssert.ThrowingCallable planInsert = () -> {
            this.util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n                                   |insert into sink_without_pk\n                                   |select t1.a, t1.b, t1.c\n                                   |from (\n                                   |  select *, proctime() proctime from cdc\n                                   |) t1 join dim_with_pk for system_time as of t1.proctime as t2\n                                   |on t1.a = t2.a and ndFunc(t2.b) > 100\n                                   |")).stripMargin());
        };
        if (tryResolve()) {
            // Check the type of the thrown exception itself; an `instanceof` on the assertion
            // object never verified anything.
            Assertions.assertThatThrownBy(planInsert)
                    .hasMessageContaining("exists non deterministic function: 'ndFunc' in condition: '>(ndFunc($1), 100)' which may cause wrong result")
                    .isInstanceOf(TableException.class);
        } else {
            Assertions.assertThatCode(planInsert).doesNotThrowAnyException();
        }
    }

    /**
     * Inserts into {@code sink_with_pk} from {@code cdc} joined with {@code dim_with_pk}
     * (via {@code for system_time as of}), where the join condition additionally requires
     * {@code t2.b > UNIX_TIMESTAMP() - 300}.
     *
     * <p>When {@code tryResolve()} is true, a {@link TableException} reporting
     * {@code UNIX_TIMESTAMP} in the condition is expected; otherwise planning must complete
     * without any exception.
     */
    @TestTemplate
    public void testCdcJoinDimWithPkNonDeterministicLocalCondition2() {
        ThrowableAssert.ThrowingCallable planInsert = () -> {
            this.util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n                                   |insert into sink_with_pk\n                                   |select t1.a, t2.b as version, t2.c\n                                   |from (\n                                   |  select *, proctime() proctime from cdc\n                                   |) t1 join dim_with_pk for system_time as of t1.proctime as t2\n                                   |on t1.a = t2.a\n                                   |  -- check dim table data's freshness\n                                   |  and t2.b > UNIX_TIMESTAMP() - 300\n                                   |")).stripMargin());
        };
        if (tryResolve()) {
            // Check the type of the thrown exception itself; an `instanceof` on the assertion
            // object never verified anything.
            Assertions.assertThatThrownBy(planInsert)
                    .hasMessageContaining("exists non deterministic function: 'UNIX_TIMESTAMP' in condition: '>($1, -(UNIX_TIMESTAMP(), 300))' which may cause wrong result")
                    .isInstanceOf(TableException.class);
        } else {
            Assertions.assertThatCode(planInsert).doesNotThrowAnyException();
        }
    }

    /**
     * Plans an insert into {@code sink_with_pk} that joins {@code cdc} with
     * {@code dim_with_pk} ({@code FOR SYSTEM_TIME AS OF} a proctime column) where the join
     * condition compares {@code t1.b} against {@code ndFunc(t2.b)}.
     *
     * <p>When {@code tryResolve()} returns true, planning must fail with a
     * {@link TableException} naming {@code ndFunc} in the condition; otherwise planning
     * must complete without throwing.
     */
    @TestTemplate
    public void testCdcJoinDimNonDeterministicRemainingCondition() {
        final String insertSql = new StringOps(Predef$.MODULE$.augmentString("\n                                   |insert into sink_with_pk\n                                   |select t1.a, t2.b, t2.c\n                                   |from (\n                                   |  select *, proctime() proctime from cdc\n                                   |) t1 join dim_with_pk for system_time as of t1.proctime as t2\n                                   |on t1.a = t2.a\n                                   |  -- non deterministic function in remaining condition\n                                   |  and t1.b > ndFunc(t2.b)\n                                   |")).stripMargin();
        final ThrowableAssert.ThrowingCallable planInsert = () -> util().verifyExecPlanInsert(insertSql);
        if (!tryResolve()) {
            Assertions.assertThatCode(planInsert).doesNotThrowAnyException();
            return;
        }
        // AssertJ's isInstanceOf checks the thrown exception; a Java `instanceof` on the
        // assertion object would verify nothing.
        Assertions.assertThatThrownBy(planInsert)
                .isInstanceOf(TableException.class)
                .hasMessageContaining("exists non deterministic function: 'ndFunc' in condition: '>($1, ndFunc($3))' which may cause wrong result");
    }

    /**
     * Plans an insert into {@code sink_with_pk} that left-joins {@code cdc} with
     * {@code dim_with_pk} ({@code FOR SYSTEM_TIME AS OF} a proctime column) and adds the
     * condition {@code t1.b > UNIX_TIMESTAMP() - 300} on the left input.
     *
     * <p>When {@code tryResolve()} returns true, planning must fail with a
     * {@link TableException} naming {@code UNIX_TIMESTAMP} in the condition; otherwise
     * planning must complete without throwing.
     */
    @TestTemplate
    public void testCdcLeftJoinDimWithNonDeterministicPreFilter() {
        final String insertSql = new StringOps(Predef$.MODULE$.augmentString("\n           |insert into sink_with_pk\n           |select t1.a, t2.b as version, t2.c\n           |from (\n           |  select *, proctime() proctime from cdc\n           |) t1 left join dim_with_pk for system_time as of t1.proctime as t2\n           |on t1.a = t2.a\n           |  and t1.b > UNIX_TIMESTAMP() - 300\n           |")).stripMargin();
        final ThrowableAssert.ThrowingCallable planInsert = () -> util().verifyExecPlanInsert(insertSql);
        if (!tryResolve()) {
            Assertions.assertThatCode(planInsert).doesNotThrowAnyException();
            return;
        }
        // AssertJ's isInstanceOf checks the thrown exception; a Java `instanceof` on the
        // assertion object would verify nothing.
        Assertions.assertThatThrownBy(planInsert)
                .isInstanceOf(TableException.class)
                .hasMessageContaining("exists non deterministic function: 'UNIX_TIMESTAMP' in condition: '>($1, -(UNIX_TIMESTAMP(), 300))' which may cause wrong result");
    }

    /**
     * Plans an insert into {@code sink_with_pk} that aggregates {@code cdc} grouped by
     * {@code a} and a {@code day} column derived from
     * {@code DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd')}.
     *
     * <p>When {@code tryResolve()} returns true, planning must fail with a
     * {@link TableException} reporting the non-deterministic {@code day} column; otherwise
     * planning must complete without throwing.
     */
    @TestTemplate
    public void testGroupByNonDeterministicFuncWithCdcSource() {
        final String insertSql = new StringOps(Predef$.MODULE$.augmentString("\n           |insert into sink_with_pk\n           |select\n           |  a, count(*) cnt, `day`\n           |from (\n           |  select *, DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd') `day` from cdc\n           |) t\n           |group by `day`, a\n           |")).stripMargin();
        final ThrowableAssert.ThrowingCallable planInsert = () -> util().verifyExecPlanInsert(insertSql);
        if (!tryResolve()) {
            Assertions.assertThatCode(planInsert).doesNotThrowAnyException();
            return;
        }
        // AssertJ's isInstanceOf checks the thrown exception; a Java `instanceof` on the
        // assertion object would verify nothing.
        Assertions.assertThatThrownBy(planInsert)
                .isInstanceOf(TableException.class)
                .hasMessageContaining("column(s): day(generated by non-deterministic function: CURRENT_TIMESTAMP ) can not satisfy the determinism");
    }

    /**
     * Plans an insert into {@code sink_with_pk} that aggregates {@code cdc} grouped by
     * {@code ndFunc(a)} and {@code c}.
     *
     * <p>When {@code tryResolve()} returns true, planning must fail with a
     * {@link TableException} reporting the column produced by {@code ndFunc}; otherwise
     * planning must complete without throwing.
     */
    @TestTemplate
    public void testGroupByNonDeterministicUdfWithCdcSource() {
        final String insertSql = new StringOps(Predef$.MODULE$.augmentString("\n                                                                        |insert into sink_with_pk\n                                                                        |select\n                                                                        |  ndFunc(a), count(*) cnt, c\n                                                                        |from cdc\n                                                                        |group by ndFunc(a), c\n                                                                        |")).stripMargin();
        final ThrowableAssert.ThrowingCallable planInsert = () -> util().verifyExecPlanInsert(insertSql);
        if (!tryResolve()) {
            Assertions.assertThatCode(planInsert).doesNotThrowAnyException();
            return;
        }
        // AssertJ's isInstanceOf checks the thrown exception; a Java `instanceof` on the
        // assertion object would verify nothing.
        Assertions.assertThatThrownBy(planInsert)
                .isInstanceOf(TableException.class)
                .hasMessageContaining("column(s): EXPR$0(generated by non-deterministic function: ndFunc ) can not satisfy the determinism");
    }

    /**
     * Plans an insert into {@code sink_with_pk} that keeps the {@code rn = 1} row per
     * {@code a} from {@code src} (via {@code row_number()}) and then aggregates grouped by
     * {@code a} and {@code DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd')}.
     *
     * <p>When {@code tryResolve()} returns true, planning must fail with a
     * {@link TableException} reporting the non-deterministic {@code day} column; otherwise
     * planning must complete without throwing.
     */
    @TestTemplate
    public void testNestedAggWithNonDeterministicGroupingKeys() {
        final String insertSql = new StringOps(Predef$.MODULE$.augmentString("\n           |insert into sink_with_pk\n           |select\n           |  a, sum(b) qmt, DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd') `day`\n           |from (\n           |  select *, row_number() over (partition by a order by PROCTIME() desc) rn from src\n           |) t\n           |where rn = 1\n           |group by a, DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd')\n           |")).stripMargin();
        final ThrowableAssert.ThrowingCallable planInsert = () -> util().verifyExecPlanInsert(insertSql);
        if (!tryResolve()) {
            Assertions.assertThatCode(planInsert).doesNotThrowAnyException();
            return;
        }
        // AssertJ's isInstanceOf checks the thrown exception; a Java `instanceof` on the
        // assertion object would verify nothing.
        Assertions.assertThatThrownBy(planInsert)
                .isInstanceOf(TableException.class)
                .hasMessageContaining("column(s): day(generated by non-deterministic function: CURRENT_TIMESTAMP ) can not satisfy the determinism");
    }

    /**
     * Plans a query (no sink) that aggregates {@code cdc} grouped by a {@code day} column
     * built from {@code concat(cast(a as varchar), DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd'))}.
     *
     * <p>When {@code tryResolve()} returns true, planning must fail with a
     * {@link TableException} reporting the non-deterministic {@code day} column; otherwise
     * planning must complete without throwing.
     */
    @TestTemplate
    public void testGroupAggNonDeterministicFuncOnSourcePk() {
        final String querySql = new StringOps(Predef$.MODULE$.augmentString("\n           |select\n           |  `day`, count(*) cnt, sum(b) qmt\n           |from (\n           |  select *, concat(cast(a as varchar), DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd')) `day` from cdc\n           |) t\n           |group by `day`\n           |")).stripMargin();
        final ThrowableAssert.ThrowingCallable planQuery = () -> util().verifyExecPlan(querySql);
        if (!tryResolve()) {
            Assertions.assertThatCode(planQuery).doesNotThrowAnyException();
            return;
        }
        // AssertJ's isInstanceOf checks the thrown exception; a Java `instanceof` on the
        // assertion object would verify nothing.
        Assertions.assertThatThrownBy(planQuery)
                .isInstanceOf(TableException.class)
                .hasMessageContaining("column(s): day(generated by non-deterministic function: CURRENT_TIMESTAMP ) can not satisfy the determinism");
    }

    /**
     * Verifies the exec plan of an insert into {@code sink_with_pk} that aggregates table
     * {@code T} by {@code a}, with a {@code count(distinct c)} whose filter uses
     * {@code UNIX_TIMESTAMP()}.
     */
    @TestTemplate
    public void testAggWithNonDeterministicFilterArgs() {
        final String insertSql = new StringOps(Predef$.MODULE$.augmentString("\n         |insert into sink_with_pk\n         |select\n         |  a\n         |  ,count(*) cnt\n         |  ,cast(count(distinct c) filter (where b > UNIX_TIMESTAMP() - 180) as varchar) valid_uv\n         |from T\n         |group by a\n         |")).stripMargin();
        util().verifyExecPlanInsert(insertSql);
    }

    /**
     * Plans an insert into {@code sink_with_pk} that aggregates {@code cdc} by {@code a},
     * with a {@code count(distinct c)} whose filter uses {@code UNIX_TIMESTAMP()}.
     *
     * <p>When {@code tryResolve()} returns true, planning must fail with a
     * {@link TableException} reporting the column generated by {@code UNIX_TIMESTAMP};
     * otherwise planning must complete without throwing.
     */
    @TestTemplate
    public void testAggWithNonDeterministicFilterArgsOnCdcSource() {
        final String insertSql = new StringOps(Predef$.MODULE$.augmentString("\n           |insert into sink_with_pk\n           |select\n           |  a\n           |  ,count(*) cnt\n           |  ,cast(count(distinct c) filter (where b > UNIX_TIMESTAMP() - 180) as varchar) valid_uv\n           |from cdc\n           |group by a\n           |")).stripMargin();
        final ThrowableAssert.ThrowingCallable planInsert = () -> util().verifyExecPlanInsert(insertSql);
        if (!tryResolve()) {
            Assertions.assertThatCode(planInsert).doesNotThrowAnyException();
            return;
        }
        // AssertJ's isInstanceOf checks the thrown exception; a Java `instanceof` on the
        // assertion object would verify nothing.
        Assertions.assertThatThrownBy(planInsert)
                .isInstanceOf(TableException.class)
                .hasMessageContaining("column(s): $f2(generated by non-deterministic function: UNIX_TIMESTAMP ) can not satisfy the determinism");
    }

    /**
     * Plans an insert into {@code sink_without_pk} that aggregates {@code cdc} by
     * {@code a}, with a {@code count(distinct c)} whose filter uses {@code UNIX_TIMESTAMP()}.
     *
     * <p>When {@code tryResolve()} returns true, planning must fail with a
     * {@link TableException} reporting the column generated by {@code UNIX_TIMESTAMP};
     * otherwise planning must complete without throwing.
     */
    @TestTemplate
    public void testAggWithNonDeterministicFilterArgsOnCdcSourceSinkWithoutPk() {
        final String insertSql = new StringOps(Predef$.MODULE$.augmentString("\n           |insert into sink_without_pk\n           |select\n           |  a\n           |  ,count(*) cnt\n           |  ,cast(count(distinct c) filter (where b > UNIX_TIMESTAMP() - 180) as varchar) valid_uv\n           |from cdc\n           |group by a\n           |")).stripMargin();
        final ThrowableAssert.ThrowingCallable planInsert = () -> util().verifyExecPlanInsert(insertSql);
        if (!tryResolve()) {
            Assertions.assertThatCode(planInsert).doesNotThrowAnyException();
            return;
        }
        // AssertJ's isInstanceOf checks the thrown exception; a Java `instanceof` on the
        // assertion object would verify nothing.
        Assertions.assertThatThrownBy(planInsert)
                .isInstanceOf(TableException.class)
                .hasMessageContaining("column(s): $f2(generated by non-deterministic function: UNIX_TIMESTAMP ) can not satisfy the determinism");
    }

    /**
     * Verifies the exec plan of an insert into {@code sink_with_pk} that aggregates table
     * {@code T} by {@code a} using {@code ndAggFunc(b)} and {@code max(c)}.
     */
    @TestTemplate
    public void testNonDeterministicAggOnAppendSourceSinkWithPk() {
        final String insertSql = new StringOps(Predef$.MODULE$.augmentString("\n                                 |insert into sink_with_pk\n                                 |select\n                                 |  a\n                                 |  ,ndAggFunc(b) ndCnt\n                                 |  ,max(c) mc\n                                 |from T\n                                 |group by a\n                                 |")).stripMargin();
        util().verifyExecPlanInsert(insertSql);
    }

    /**
     * Plans an insert into {@code sink_without_pk} that aggregates table {@code T} by
     * {@code a} using {@code ndAggFunc(b)} and {@code max(c)}.
     *
     * <p>When {@code tryResolve()} returns true, planning must fail with a
     * {@link TableException} reporting the {@code ndCnt} column; otherwise planning must
     * complete without throwing.
     */
    @TestTemplate
    public void testNonDeterministicAggOnAppendSourceSinkWithoutPk() {
        final String insertSql = new StringOps(Predef$.MODULE$.augmentString("\n                                                                        |insert into sink_without_pk\n                                                                        |select\n                                                                        |  a\n                                                                        |  ,ndAggFunc(b) ndCnt\n                                                                        |  ,max(c) mc\n                                                                        |from T\n                                                                        |group by a\n                                                                        |")).stripMargin();
        final ThrowableAssert.ThrowingCallable planInsert = () -> util().verifyExecPlanInsert(insertSql);
        if (!tryResolve()) {
            Assertions.assertThatCode(planInsert).doesNotThrowAnyException();
            return;
        }
        // AssertJ's isInstanceOf checks the thrown exception; a Java `instanceof` on the
        // assertion object would verify nothing.
        Assertions.assertThatThrownBy(planInsert)
                .isInstanceOf(TableException.class)
                .hasMessageContaining("column(s): ndCnt(generated by non-deterministic function: ndAggFunc ) can not satisfy the determinism");
    }

    /**
     * Verifies the exec plan of an insert into {@code sink_with_pk} that aggregates table
     * {@code T} without a {@code GROUP BY}, using {@code max(a)}, {@code ndAggFunc(b)} and
     * {@code max(c)}.
     */
    @TestTemplate
    public void testGlobalNonDeterministicAggOnAppendSourceSinkWithPk() {
        final String insertSql = new StringOps(Predef$.MODULE$.augmentString("\n                                 |insert into sink_with_pk\n                                 |select\n                                 |  max(a)\n                                 |  ,ndAggFunc(b) ndCnt\n                                 |  ,max(c) mc\n                                 |from T\n                                 |")).stripMargin();
        util().verifyExecPlanInsert(insertSql);
    }

    /**
     * Plans an insert into {@code sink_without_pk} that aggregates table {@code T} without
     * a {@code GROUP BY}, using {@code max(a)}, {@code ndAggFunc(b)} and {@code max(c)}.
     *
     * <p>When {@code tryResolve()} returns true, planning must fail with a
     * {@link TableException} reporting the {@code ndCnt} column; otherwise planning must
     * complete without throwing.
     */
    @TestTemplate
    public void testGlobalNonDeterministicAggOnAppendSourceSinkWithoutPk() {
        final String insertSql = new StringOps(Predef$.MODULE$.augmentString("\n                                                                        |insert into sink_without_pk\n                                                                        |select\n                                                                        |  max(a)\n                                                                        |  ,ndAggFunc(b) ndCnt\n                                                                        |  ,max(c) mc\n                                                                        |from T\n                                                                        |")).stripMargin();
        final ThrowableAssert.ThrowingCallable planInsert = () -> util().verifyExecPlanInsert(insertSql);
        if (!tryResolve()) {
            Assertions.assertThatCode(planInsert).doesNotThrowAnyException();
            return;
        }
        // AssertJ's isInstanceOf checks the thrown exception; a Java `instanceof` on the
        // assertion object would verify nothing.
        Assertions.assertThatThrownBy(planInsert)
                .isInstanceOf(TableException.class)
                .hasMessageContaining("column(s): ndCnt(generated by non-deterministic function: ndAggFunc ) can not satisfy the determinism");
    }

    /**
     * Verifies the exec plan of a plain projection from {@code upsert_src} inserted into
     * {@code sink_with_pk}.
     */
    @TestTemplate
    public void testUpsertSourceSinkWithPk() {
        final String insertSql = new StringOps(Predef$.MODULE$.augmentString("\n                                 |insert into sink_with_pk\n                                 |select a, b, c\n                                 |from upsert_src\n                                 |")).stripMargin();
        util().verifyExecPlanInsert(insertSql);
    }

    /**
     * Verifies the exec plan of a plain projection from {@code upsert_src} inserted into
     * {@code sink_without_pk}.
     */
    @TestTemplate
    public void testUpsertSourceSinkWithoutPk() {
        final String insertSql = new StringOps(Predef$.MODULE$.augmentString("\n                                 |insert into sink_without_pk\n                                 |select a, b, c\n                                 |from upsert_src\n                                 |")).stripMargin();
        util().verifyExecPlanInsert(insertSql);
    }

    /**
     * Verifies the exec plan of an insert into {@code sink_with_composite_pk} that selects
     * two OVER aggregates from {@code T1}: a {@code COUNT(distinct b)} and an
     * {@code ndAggFunc(a)}, both partitioned by {@code a} and ordered by {@code proctime}.
     */
    @TestTemplate
    public void testMultiOverWithNonDeterministicUdafSinkWithPk() {
        final String insertSql = new StringOps(Predef$.MODULE$.augmentString("\n        |insert into sink_with_composite_pk\n        |SELECT\n        |  a\n        |  ,COUNT(distinct b)  OVER (PARTITION BY a ORDER BY proctime\n        |    ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) uv\n        |  ,b\n        |  ,ndAggFunc(a) OVER (PARTITION BY a ORDER BY proctime\n        |    ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) nd\n        |FROM T1\n      ")).stripMargin();
        util().verifyExecPlanInsert(insertSql);
    }

    /**
     * Verifies the exec plan of an insert into {@code sink_without_pk} that selects an
     * {@code ndAggFunc(a)} OVER aggregate (partitioned by {@code a}, ordered by
     * {@code proctime}) from {@code T1}.
     */
    @TestTemplate
    public void testOverWithNonDeterministicUdafSinkWithoutPk() {
        final String insertSql = new StringOps(Predef$.MODULE$.augmentString("\n        |insert into sink_without_pk\n        |SELECT\n        |  a\n        |  ,ndAggFunc(a) OVER (PARTITION BY a ORDER BY proctime\n        |    ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)\n        |  ,b\n        |FROM T1\n      ")).stripMargin();
        util().verifyExecPlanInsert(insertSql);
    }

    /**
     * Plans an insert into {@code sink_with_composite_pk} whose select list combines a
     * {@code FILTER} clause with an OVER window on {@code SUM(a)}.
     *
     * <p>Planning is always expected to fail with a {@link ValidationException} whose
     * message contains "OVER must be applied to aggregate function".
     */
    @TestTemplate
    public void testMultiOverWithNonDeterministicAggFilterSinkWithPk() {
        final String insertSql = new StringOps(Predef$.MODULE$.augmentString("\n            |insert into sink_with_composite_pk\n            |SELECT\n            |  a\n            |  ,COUNT(distinct b) OVER (PARTITION BY a ORDER BY proctime\n            |    ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) uv\n            |  ,b\n            |  ,SUM(a) filter (where b > UNIX_TIMESTAMP() - 180) OVER (PARTITION BY a ORDER BY proctime\n            |    ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) nd\n            |FROM T1\n      ")).stripMargin();
        // AssertJ's isInstanceOf checks the thrown exception; a Java `instanceof` on the
        // assertion object would verify nothing.
        Assertions.assertThatThrownBy(() -> util().verifyExecPlanInsert(insertSql))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining("OVER must be applied to aggregate function");
    }

    /**
     * Verifies the exec plan of an insert into {@code sink_with_composite_pk} that keeps
     * the {@code rn = 1} row per {@code a} (ordered by {@code uv desc}) on top of two OVER
     * aggregates over {@code T1}, one of which is {@code ndAggFunc(a)}.
     */
    @TestTemplate
    public void testAppendRankOnMultiOverWithNonDeterministicUdafSinkWithPk() {
        final String insertSql = new StringOps(Predef$.MODULE$.augmentString("\n        |insert into sink_with_composite_pk\n        |select a, uv, b, nd from (\n        | select\n        |  a, uv, b, nd,\n        |  row_number() over (partition by a order by uv desc) rn\n        | from (\n        |  SELECT\n        |    a\n        |    ,COUNT(distinct b)  OVER (PARTITION BY a ORDER BY proctime\n        |      ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) uv\n        |    ,b\n        |    ,ndAggFunc(a) OVER (PARTITION BY a ORDER BY proctime\n        |      ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) nd\n        |  FROM T1\n        |  )\n        |) where rn = 1\n      ")).stripMargin();
        util().verifyExecPlanInsert(insertSql);
    }

    /**
     * Verifies the exec plan of an insert into {@code sink_without_pk} that keeps the
     * {@code rn = 1} row per {@code a} (ordered by {@code uv desc}) on top of two OVER
     * aggregates over {@code T1}, one of which is {@code ndAggFunc(a)}.
     */
    @TestTemplate
    public void testAppendRankOnMultiOverWithNonDeterministicUdafSinkWithoutPk() {
        final String insertSql = new StringOps(Predef$.MODULE$.augmentString("\n        |insert into sink_without_pk\n        |select a, nd, b from (\n        | select\n        |  a, uv, b, nd,\n        |  row_number() over (partition by a order by uv desc) rn\n        | from (\n        |  SELECT\n        |    a\n        |    ,COUNT(distinct b)  OVER (PARTITION BY a ORDER BY proctime\n        |      ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) uv\n        |    ,b\n        |    ,ndAggFunc(a) OVER (PARTITION BY a ORDER BY proctime\n        |      ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) nd\n        |  FROM T1\n        |  )\n        |) where rn = 1\n      ")).stripMargin();
        util().verifyExecPlanInsert(insertSql);
    }

    /**
     * Registers temporary view {@code v1} (an aggregate over {@code src} whose {@code cnt}
     * is {@code sum(b) filter (where b > 0)}) and verifies the exec plan of an insert into
     * {@code sink_with_composite_pk} that selects the top 100 rows per {@code a} by
     * {@code cnt desc}, including the row number.
     */
    @TestTemplate
    public void testUpdateRankOutputRowNumberSinkWithPk() {
        final String createViewSql = new StringOps(Predef$.MODULE$.augmentString("\n                                | create temporary view v1 as\n                                |  select a, max(c) c, sum(b) filter (where b > 0) cnt\n                                |  from src\n                                |  group by a\n                                | ")).stripMargin();
        final String insertSql = new StringOps(Predef$.MODULE$.augmentString("\n         |insert into sink_with_composite_pk\n         |select a, cnt, c, rn from (\n         | select\n         |  a, cnt, c, row_number() over (partition by a order by cnt desc) rn\n         | from v1\n         | ) t where t.rn <= 100\n         |")).stripMargin();
        // The view must exist before the insert that reads from it is planned.
        util().tableEnv().executeSql(createViewSql);
        util().verifyExecPlanInsert(insertSql);
    }

    /**
     * Registers temporary view {@code v1} (an aggregate over {@code src} whose {@code cnt}
     * is {@code sum(b)}) and verifies the exec plan of an insert into
     * {@code sink_with_composite_pk} that selects the top 100 rows per {@code a} by
     * {@code cnt desc}, including the row number.
     */
    @TestTemplate
    public void testRetractRankOutputRowNumberSinkWithPk() {
        final String createViewSql = new StringOps(Predef$.MODULE$.augmentString("\n                                | create temporary view v1 as\n                                |  select a, max(c) c, sum(b) cnt\n                                |  from src\n                                |  group by a\n                                | ")).stripMargin();
        final String insertSql = new StringOps(Predef$.MODULE$.augmentString("\n         |insert into sink_with_composite_pk\n         |select a, cnt, c, rn from (\n         | select\n         |  a, cnt, c, row_number() over (partition by a order by cnt desc) rn\n         | from v1\n         | ) t where t.rn <= 100\n         |")).stripMargin();
        // The view must exist before the insert that reads from it is planned.
        util().tableEnv().executeSql(createViewSql);
        util().verifyExecPlanInsert(insertSql);
    }

    /**
     * Plans an insert into {@code sink_with_composite_pk} fed by a {@code UNION} of
     * {@code src} and {@code cdc_with_meta}, where the latter contributes the
     * {@code metadata_3} column.
     *
     * <p>When {@code tryResolve()} returns true, planning must fail with a
     * {@link TableException} reporting the metadata column; otherwise planning must
     * complete without throwing.
     */
    @TestTemplate
    public void testUnionSinkWithCompositePk() {
        final String insertSql = new StringOps(Predef$.MODULE$.augmentString("\n                                   |insert into sink_with_composite_pk\n                                   |select a, b, c, d\n                                   |from src\n                                   |union\n                                   |select a, b, c, metadata_3\n                                   |from cdc_with_meta\n                                   |")).stripMargin();
        final ThrowableAssert.ThrowingCallable planInsert = () -> util().verifyExecPlanInsert(insertSql);
        if (!tryResolve()) {
            Assertions.assertThatCode(planInsert).doesNotThrowAnyException();
            return;
        }
        // AssertJ's isInstanceOf checks the thrown exception; a Java `instanceof` on the
        // assertion object would verify nothing.
        Assertions.assertThatThrownBy(planInsert)
                .isInstanceOf(TableException.class)
                .hasMessageContaining("metadata column(s): 'metadata_3' in cdc source may cause wrong result or error");
    }

    /**
     * Plans an insert into {@code sink_with_composite_pk} fed by a {@code UNION ALL} of
     * {@code src} and {@code cdc_with_meta}, where the latter contributes the
     * {@code metadata_3} column.
     *
     * <p>When {@code tryResolve()} returns true, planning must fail with a
     * {@link TableException} reporting the metadata column; otherwise planning must
     * complete without throwing.
     */
    @TestTemplate
    public void testUnionAllSinkWithCompositePk() {
        final String insertSql = new StringOps(Predef$.MODULE$.augmentString("\n                                   |insert into sink_with_composite_pk\n                                   |select a, b, c, d\n                                   |from src\n                                   |union all\n                                   |select a, b, c, metadata_3\n                                   |from cdc_with_meta\n                                   |")).stripMargin();
        final ThrowableAssert.ThrowingCallable planInsert = () -> util().verifyExecPlanInsert(insertSql);
        if (!tryResolve()) {
            Assertions.assertThatCode(planInsert).doesNotThrowAnyException();
            return;
        }
        // AssertJ's isInstanceOf checks the thrown exception; a Java `instanceof` on the
        // assertion object would verify nothing.
        Assertions.assertThatThrownBy(planInsert)
                .isInstanceOf(TableException.class)
                .hasMessageContaining("metadata column(s): 'metadata_3' in cdc source may cause wrong result or error");
    }

    /**
     * Plans an insert into {@code sink_without_pk} fed by a {@code UNION ALL} of
     * {@code src} and {@code cdc_with_meta}, where the latter contributes the
     * {@code metadata_3} column.
     *
     * <p>When {@code tryResolve()} returns true, planning must fail with a
     * {@link TableException} reporting the metadata column; otherwise planning must
     * complete without throwing.
     */
    @TestTemplate
    public void testUnionAllSinkWithoutPk() {
        final String insertSql = new StringOps(Predef$.MODULE$.augmentString("\n                                                                        |insert into sink_without_pk\n                                                                        |select a, b, c\n                                                                        |from src\n                                                                        |union all\n                                                                        |select a, metadata_3, c\n                                                                        |from cdc_with_meta\n                                                                        |")).stripMargin();
        final ThrowableAssert.ThrowingCallable planInsert = () -> util().verifyExecPlanInsert(insertSql);
        if (!tryResolve()) {
            Assertions.assertThatCode(planInsert).doesNotThrowAnyException();
            return;
        }
        // AssertJ's isInstanceOf checks the thrown exception; a Java `instanceof` on the
        // assertion object would verify nothing.
        Assertions.assertThatThrownBy(planInsert)
                .isInstanceOf(TableException.class)
                .hasMessageContaining("metadata column(s): 'metadata_3' in cdc source may cause wrong result or error");
    }

    /**
     * Plans an insert into {@code sink_without_pk} from a self-join of {@code cdc} whose
     * join condition is {@code ndFunc(t1.b) = ndFunc(t2.b)}.
     *
     * <p>When {@code tryResolve()} returns true, planning must fail with a
     * {@link TableException} reporting the column generated by {@code ndFunc}; otherwise
     * planning must complete without throwing.
     */
    @TestTemplate
    public void testCdcJoinWithNonDeterministicCondition() {
        final String insertSql = new StringOps(Predef$.MODULE$.augmentString("\n                                   |insert into sink_without_pk\n                                   |select\n                                   |  t1.a\n                                   |  ,t2.b\n                                   |  ,t1.c\n                                   |from cdc t1 join cdc t2\n                                   |  on ndFunc(t1.b) = ndFunc(t2.b)\n                                   |")).stripMargin();
        final ThrowableAssert.ThrowingCallable planInsert = () -> util().verifyExecPlanInsert(insertSql);
        if (!tryResolve()) {
            Assertions.assertThatCode(planInsert).doesNotThrowAnyException();
            return;
        }
        // AssertJ's isInstanceOf checks the thrown exception; a Java `instanceof` on the
        // assertion object would verify nothing.
        Assertions.assertThatThrownBy(planInsert)
                .isInstanceOf(TableException.class)
                .hasMessageContaining("column(s): $f4(generated by non-deterministic function: ndFunc ) can not satisfy the determinism");
    }

    /**
     * Verifies the exec plan of an insert into {@code sink_without_pk} from a self-join of
     * {@code T1} on {@code a} with a 5-second bound between the two {@code proctime}
     * columns.
     */
    @TestTemplate
    public void testProctimeIntervalJoinSinkWithoutPk() {
        final String insertSql = new StringOps(Predef$.MODULE$.augmentString("\n                                |insert into sink_without_pk\n                                |SELECT t2.a, t2.c, t1.b FROM T1 t1 JOIN T1 t2 ON\n                                |  t1.a = t2.a AND t1.proctime > t2.proctime - INTERVAL '5' SECOND\n      ")).stripMargin();
        util().verifyExecPlanInsert(insertSql);
    }

    /**
     * Verifies the exec plan of an insert into {@code sink_without_pk} from a self-join of
     * {@code cdc} (each side extended with {@code proctime()}) on {@code a} with a 5-second
     * bound between the two {@code proctime} columns.
     */
    @TestTemplate
    public void testCdcProctimeIntervalJoinOnPkSinkWithoutPk() {
        final String insertSql = new StringOps(Predef$.MODULE$.augmentString("\n                                |insert into sink_without_pk\n                                |SELECT t2.a, t2.b, t1.c FROM (\n                                | select *, proctime() proctime from cdc) t1 JOIN\n                                | (select *, proctime() proctime from cdc) t2 ON\n                                |  t1.a = t2.a AND t1.proctime > t2.proctime - INTERVAL '5' SECOND\n      ")).stripMargin();
        util().verifyExecPlanInsert(insertSql);
    }

    /**
     * Plans an insert into {@code sink_without_pk} from a self-join of {@code cdc} (each
     * side extended with {@code proctime()}) on {@code b} with a 5-second bound between the
     * two {@code proctime} columns.
     *
     * <p>When {@code tryResolve()} returns true, planning must fail with a
     * {@link TableException} mentioning the determinism requirement; otherwise planning
     * must complete without throwing.
     */
    @TestTemplate
    public void testCdcProctimeIntervalJoinOnNonPkSinkWithoutPk() {
        final String insertSql = new StringOps(Predef$.MODULE$.augmentString("\n                                  |insert into sink_without_pk\n                                  |SELECT t2.a, t2.b, t1.c FROM (\n                                  | select *, proctime() proctime from cdc) t1 JOIN\n                                  | (select *, proctime() proctime from cdc) t2 ON\n                                  |  t1.b = t2.b AND t1.proctime > t2.proctime - INTERVAL '5' SECOND\n      ")).stripMargin();
        final ThrowableAssert.ThrowingCallable planInsert = () -> util().verifyExecPlanInsert(insertSql);
        if (!tryResolve()) {
            Assertions.assertThatCode(planInsert).doesNotThrowAnyException();
            return;
        }
        // AssertJ's isInstanceOf checks the thrown exception; a Java `instanceof` on the
        // assertion object would verify nothing.
        Assertions.assertThatThrownBy(planInsert)
                .isInstanceOf(TableException.class)
                .hasMessageContaining("can not satisfy the determinism requirement");
    }

    /**
     * Verifies the exec plan of an insert into {@code sink_without_pk} from a self-join of
     * {@code cdc_with_watermark} on {@code a} with a 5-second bound between the two
     * {@code op_ts} columns.
     */
    @TestTemplate
    public void testCdcRowtimeIntervalJoinSinkWithoutPk() {
        final String insertSql = new StringOps(Predef$.MODULE$.augmentString("\n        |insert into sink_without_pk\n        |SELECT t2.a, t1.b, t2.c FROM cdc_with_watermark t1 JOIN cdc_with_watermark t2 ON\n        |  t1.a = t2.a AND t1.op_ts > t2.op_ts - INTERVAL '5' SECOND\n      ")).stripMargin();
        util().verifyExecPlanInsert(insertSql);
    }

    /**
     * Verifies the exec plan of an insert into {@code sink_with_pk} from a self-join of
     * {@code cdc_with_watermark} on {@code a} with a 5-second bound between the two
     * {@code op_ts} columns.
     */
    @TestTemplate
    public void testCdcRowtimeIntervalJoinSinkWithPk() {
        final String insertSql = new StringOps(Predef$.MODULE$.augmentString("\n        |insert into sink_with_pk\n        |SELECT t2.a, t1.b, t2.c FROM cdc_with_watermark t1 JOIN cdc_with_watermark t2 ON\n        |  t1.a = t2.a AND t1.op_ts > t2.op_ts - INTERVAL '5' SECOND\n      ")).stripMargin();
        util().verifyExecPlanInsert(insertSql);
    }

    /**
     * Verifies the exec plan of a query (no sink) that self-joins {@code cdc} on {@code a},
     * where the right side derives {@code c-day} from
     * {@code DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd')}.
     */
    @TestTemplate
    public void testJoinKeyContainsUk() {
        final String querySql = new StringOps(Predef$.MODULE$.augmentString("\n         |select t1.a, t2.`c-day`, t2.b, t2.d\n         |from (\n         |  select a, b, c, d\n         |  from cdc\n         | ) t1\n         |join (\n         |  select a, b, CONCAT(c, DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd')) as `c-day`, d\n         |  from cdc\n         |) t2\n         |  on t1.a = t2.a\n         |")).stripMargin();
        util().verifyExecPlan(querySql);
    }

    /**
     * Verifies the exec plan of a query (no sink) that self-joins {@code cdc} on {@code b}
     * and selects {@code a} from both sides, where the right side derives {@code c-day}
     * from {@code DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd')}.
     */
    @TestTemplate
    public void testJoinHasBothSidesUk() {
        final String querySql = new StringOps(Predef$.MODULE$.augmentString("\n         |select t1.a, t2.a, t2.`c-day`, t2.b, t2.d\n         |from (\n         |  select a, b, c, d\n         |  from cdc\n         | ) t1\n         |join (\n         |  select a, b, CONCAT(c, DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd')) as `c-day`, d\n         |  from cdc\n         |) t2\n         |  on t1.b = t2.b\n         |")).stripMargin();
        util().verifyExecPlan(querySql);
    }

    /**
     * Plans an insert into {@code sink_without_pk} from a self-join of {@code cdc} on
     * {@code b} that selects {@code a} from both sides plus a {@code c-day} column derived
     * from {@code DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd')}.
     *
     * <p>When {@code tryResolve()} returns true, planning must fail with a
     * {@link TableException} reporting the non-deterministic {@code c-day} column;
     * otherwise planning must complete without throwing.
     */
    @TestTemplate
    public void testJoinHasBothSidesUkSinkWithoutPk() {
        // NOTE(review): the statement targets sink_without_pk so that the SQL matches the
        // "SinkWithoutPk" scenario in the test name; confirm the stored expected plan for
        // this test uses the same sink.
        final String insertSql = new StringOps(Predef$.MODULE$.augmentString("\n           |insert into sink_without_pk\n           |select t1.a, t2.a, t2.`c-day`\n           |from (\n           |  select a, b, c, d\n           |  from cdc\n           | ) t1\n           |join (\n           |  select a, b, CONCAT(c, DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd')) as `c-day`, d\n           |  from cdc\n           |) t2\n           |  on t1.b = t2.b\n           |")).stripMargin();
        final ThrowableAssert.ThrowingCallable planInsert = () -> util().verifyExecPlanInsert(insertSql);
        if (!tryResolve()) {
            Assertions.assertThatCode(planInsert).doesNotThrowAnyException();
            return;
        }
        // AssertJ's isInstanceOf checks the thrown exception; a Java `instanceof` on the
        // assertion object would verify nothing.
        Assertions.assertThatThrownBy(planInsert)
                .isInstanceOf(TableException.class)
                .hasMessageContaining("column(s): c-day(generated by non-deterministic function: CURRENT_TIMESTAMP ) can not satisfy the determinism");
    }

    /**
     * Plans a query (no sink) that self-joins {@code cdc} on {@code b} and selects
     * {@code a} only from the left side, plus a {@code c-day} column derived from
     * {@code DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd')} on the right side.
     *
     * <p>When {@code tryResolve()} returns true, planning must fail with a
     * {@link TableException} reporting the non-deterministic {@code c-day} column;
     * otherwise planning must complete without throwing.
     */
    @TestTemplate
    public void testJoinHasSingleSideUk() {
        final String querySql = new StringOps(Predef$.MODULE$.augmentString("\n           |select t1.a, t2.`c-day`, t2.b, t2.d\n           |from (\n           |  select a, b, c, d\n           |  from cdc\n           | ) t1\n           |join (\n           |  select a, b, CONCAT(c, DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd')) as `c-day`, d\n           |  from cdc\n           |) t2\n           |  on t1.b = t2.b\n           |")).stripMargin();
        final ThrowableAssert.ThrowingCallable planQuery = () -> util().verifyExecPlan(querySql);
        if (!tryResolve()) {
            Assertions.assertThatCode(planQuery).doesNotThrowAnyException();
            return;
        }
        // AssertJ's isInstanceOf checks the thrown exception; a Java `instanceof` on the
        // assertion object would verify nothing.
        Assertions.assertThatThrownBy(planQuery)
                .isInstanceOf(TableException.class)
                .hasMessageContaining("column(s): c-day(generated by non-deterministic function: CURRENT_TIMESTAMP ) can not satisfy the determinism");
    }

    /**
     * Verifies the exec plan of a query (no sink) that filters {@code cdc} rows (extended
     * with a {@code c-day} column based on {@code CURRENT_TIMESTAMP}) by
     * {@code t1.a IN (select a from cdc where b > 100)}.
     */
    @TestTemplate
    public void testSemiJoinKeyContainsUk() {
        final String querySql = new StringOps(Predef$.MODULE$.augmentString("\n         |select t1.a, t1.`c-day`, t1.b, t1.d\n         |from (\n         |  select a, b, CONCAT(c, DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd')) as `c-day`, d\n         |  from cdc\n         | ) t1\n         |where t1.a in (\n         |  select a from cdc where b > 100\n         |)\n         |")).stripMargin();
        util().verifyExecPlan(querySql);
    }

    /**
     * Verifies the exec plan of a query (no sink) that filters {@code cdc} rows (extended
     * with a {@code c-day} column based on {@code CURRENT_TIMESTAMP}) by
     * {@code t1.a NOT IN (select a from cdc where b > 100)}.
     */
    @TestTemplate
    public void testAntiJoinKeyContainsUk() {
        final String querySql = new StringOps(Predef$.MODULE$.augmentString("\n         |select t1.a, t1.`c-day`, t1.b, t1.d\n         |from (\n         |  select a, b, CONCAT(c, DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd')) as `c-day`, d\n         |  from cdc\n         | ) t1\n         |where t1.a not in (\n         |  select a from cdc where b > 100\n         |)\n         |")).stripMargin();
        util().verifyExecPlan(querySql);
    }

    /**
     * Plans a query (no sink) that filters {@code cdc} by {@code t1.c IN (...)}, where the
     * subquery's {@code c} is derived from
     * {@code DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd')}.
     *
     * <p>When {@code tryResolve()} returns true, planning must fail with a
     * {@link TableException} reporting the non-deterministic {@code c} column; otherwise
     * planning must complete without throwing.
     */
    @TestTemplate
    public void testSemiJoinWithNonDeterministicConditionSingleSideHasUk() {
        final String querySql = new StringOps(Predef$.MODULE$.augmentString("\n           |select t1.a, t1.b, t1.c, t1.d\n           |from (\n           |  select a, b, c, d\n           |  from cdc\n           | ) t1\n           |where t1.c in (\n           |  select CONCAT(c, DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd')) c from cdc where b > 100\n           |)\n           |")).stripMargin();
        final ThrowableAssert.ThrowingCallable planQuery = () -> util().verifyExecPlan(querySql);
        if (!tryResolve()) {
            Assertions.assertThatCode(planQuery).doesNotThrowAnyException();
            return;
        }
        // AssertJ's isInstanceOf checks the thrown exception; a Java `instanceof` on the
        // assertion object would verify nothing.
        Assertions.assertThatThrownBy(planQuery)
                .isInstanceOf(TableException.class)
                .hasMessageContaining("column(s): c(generated by non-deterministic function: CURRENT_TIMESTAMP ) can not satisfy the determinism");
    }

    /**
     * Creates temporary tables {@code t_order}, {@code t_logistics} and {@code t_join_sink},
     * then plans an insert into {@code t_join_sink} that left-joins the two sources on
     * {@code order_id} and writes {@code now()} into the sink's {@code logistics_time} column.
     *
     * <p>When {@code tryResolve()} returns true, planning must fail with a
     * {@link TableException} reporting the non-deterministic {@code logistics_time}
     * column; otherwise planning must complete without throwing.
     */
    @TestTemplate
    public void testCdcJoinWithNonDeterministicOutputSinkWithPk() {
        final String createOrderTable = new StringOps(Predef$.MODULE$.augmentString("\n                                |CREATE TEMPORARY TABLE t_order (\n                                | order_id INT,\n                                | order_name STRING,\n                                | product_id INT,\n                                | user_id INT,\n                                | PRIMARY KEY(order_id) NOT ENFORCED\n                                |) WITH (\n                                | 'connector' = 'values',\n                                | 'changelog-mode' = 'I,UA,UB,D'\n                                |)")).stripMargin();
        final String createLogisticsTable = new StringOps(Predef$.MODULE$.augmentString("\n                                |CREATE TEMPORARY TABLE t_logistics (\n                                | logistics_id INT,\n                                | logistics_target STRING,\n                                | logistics_source STRING,\n                                | logistics_time TIMESTAMP(0),\n                                | order_id INT,\n                                | PRIMARY KEY(logistics_id) NOT ENFORCED\n                                |) WITH (\n                                |  'connector' = 'values',\n                                | 'changelog-mode' = 'I,UA,UB,D'\n                                |)")).stripMargin();
        final String createJoinSink = new StringOps(Predef$.MODULE$.augmentString("\n                                |CREATE TEMPORARY TABLE t_join_sink (\n                                | order_id INT,\n                                | order_name STRING,\n                                | logistics_id INT,\n                                | logistics_target STRING,\n                                | logistics_source STRING,\n                                | logistics_time timestamp,\n                                | PRIMARY KEY(order_id) NOT ENFORCED\n                                |) WITH (\n                                | 'connector' = 'values',\n                                | 'sink-insert-only' = 'false'\n                                |)")).stripMargin();
        final String insertSql = new StringOps(Predef$.MODULE$.augmentString("\n           |INSERT INTO t_join_sink\n           |SELECT ord.order_id,\n           |ord.order_name,\n           |logistics.logistics_id,\n           |logistics.logistics_target,\n           |logistics.logistics_source,\n           |now()\n           |FROM t_order AS ord\n           |LEFT JOIN t_logistics AS logistics ON ord.order_id=logistics.order_id\n           |")).stripMargin();

        // Register the sources and the sink before planning the insert that uses them.
        util().tableEnv().executeSql(createOrderTable);
        util().tableEnv().executeSql(createLogisticsTable);
        util().tableEnv().executeSql(createJoinSink);

        final ThrowableAssert.ThrowingCallable planInsert = () -> util().verifyExecPlanInsert(insertSql);
        if (!tryResolve()) {
            Assertions.assertThatCode(planInsert).doesNotThrowAnyException();
            return;
        }
        // AssertJ's isInstanceOf checks the thrown exception; a Java `instanceof` on the
        // assertion object would verify nothing.
        Assertions.assertThatThrownBy(planInsert)
                .isInstanceOf(TableException.class)
                .hasMessageContaining("The column(s): logistics_time(generated by non-deterministic function: NOW ) can not satisfy the determinism requirement");
    }

    /**
     * Plans a keep-first-row query ({@code ROW_NUMBER()} partitioned by {@code a}, ordered by
     * {@code PROCTIME()}) over {@code cdc_with_meta}, inserting into {@code sink_with_pk}.
     *
     * <p>Planning must fail with a {@link TableException} stating that
     * {@code StreamPhysicalDeduplicate} doesn't support consuming update and delete changes.
     */
    @TestTemplate
    public void testProctimeDedupOnCdcWithMetadataSinkWithPk() {
        // Use AssertJ's isInstanceOf so the thrown exception's type is actually checked; the
        // previous "instanceof" tested the assertion object and its result was discarded.
        Assertions.assertThatThrownBy(() -> {
            this.util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n            |insert into sink_with_pk\n            |SELECT a, metadata_3, c\n            |FROM (\n            |  SELECT *,\n            |    ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as rowNum\n            |  FROM cdc_with_meta\n            |)\n            |WHERE rowNum = 1\n      ")).stripMargin());
        })
                .hasMessageContaining("StreamPhysicalDeduplicate doesn't support consuming update and delete changes")
                .isInstanceOf(TableException.class);
    }

    /**
     * Plans a keep-first-row query ({@code ROW_NUMBER()} partitioned by {@code a}, ordered by
     * {@code PROCTIME()}) over {@code cdc_with_meta}, inserting into {@code sink_without_pk}.
     *
     * <p>Planning must fail with a {@link TableException} stating that
     * {@code StreamPhysicalDeduplicate} doesn't support consuming update and delete changes.
     */
    @TestTemplate
    public void testProctimeDedupOnCdcWithMetadataSinkWithoutPk() {
        // Use AssertJ's isInstanceOf so the thrown exception's type is actually checked; the
        // previous "instanceof" tested the assertion object and its result was discarded.
        Assertions.assertThatThrownBy(() -> {
            this.util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n            |insert into sink_without_pk\n            |SELECT a, metadata_3, c\n            |FROM (\n            |  SELECT *,\n            |    ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as rowNum\n            |  FROM cdc_with_meta\n            |)\n            |WHERE rowNum = 1\n      ")).stripMargin());
        })
                .hasMessageContaining("StreamPhysicalDeduplicate doesn't support consuming update and delete changes")
                .isInstanceOf(TableException.class);
    }

    /**
     * Plans a keep-first-row query ({@code ROW_NUMBER()} partitioned by {@code a}, ordered by
     * {@code op_ts}) over {@code cdc_with_meta_and_wm}, inserting into {@code sink_with_pk}.
     *
     * <p>Planning must fail with a {@link TableException} stating that
     * {@code StreamPhysicalDeduplicate} doesn't support consuming update and delete changes.
     */
    @TestTemplate
    public void testRowtimeDedupOnCdcWithMetadataSinkWithPk() {
        // Use AssertJ's isInstanceOf so the thrown exception's type is actually checked; the
        // previous "instanceof" tested the assertion object and its result was discarded.
        Assertions.assertThatThrownBy(() -> {
            this.util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n            |insert into sink_with_pk\n            |SELECT a, b, c\n            |FROM (\n            |  SELECT *,\n            |    ROW_NUMBER() OVER (PARTITION BY a ORDER BY op_ts ASC) as rowNum\n            |  FROM cdc_with_meta_and_wm\n            |)\n            |WHERE rowNum = 1\n      ")).stripMargin());
        })
                .hasMessageContaining("StreamPhysicalDeduplicate doesn't support consuming update and delete changes")
                .isInstanceOf(TableException.class);
    }

    /**
     * Creates {@code sink1} (primary key {@code (a, ts)}) and plans a window deduplication
     * ({@code ROW_NUMBER()} partitioned by {@code a, window_start, window_end}, ordered by
     * {@code op_ts DESC}) over a one-minute tumbling window on {@code cdc_with_meta_and_wm}.
     *
     * <p>Planning must fail with a {@link TableException} stating that
     * {@code StreamPhysicalWindowDeduplicate} doesn't support consuming update and delete changes.
     */
    @TestTemplate
    public void testWindowDedupOnCdcWithMetadata() {
        util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                               |create temporary table sink1 (\n                               | a int,\n                               | b bigint,\n                               | c string,\n                               | ts timestamp(3),\n                               | primary key (a,ts) not enforced\n                               |) with (\n                               | 'connector' = 'values',\n                               | 'sink-insert-only' = 'false'\n                               |)")).stripMargin());
        // Use AssertJ's isInstanceOf so the thrown exception's type is actually checked; the
        // previous "instanceof" tested the assertion object and its result was discarded.
        Assertions.assertThatThrownBy(() -> {
            this.util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n            |INSERT INTO sink1\n            |SELECT a, b, c, window_start\n            |FROM (\n            |SELECT *,\n            |   ROW_NUMBER() OVER(PARTITION BY a, window_start, window_end\n            |   ORDER BY op_ts DESC) as rownum\n            |FROM TABLE(TUMBLE(TABLE cdc_with_meta_and_wm, DESCRIPTOR(op_ts), INTERVAL '1' MINUTE))\n            |)\n            |WHERE rownum <= 1")).stripMargin());
        })
                .hasMessageContaining("StreamPhysicalWindowDeduplicate doesn't support consuming update and delete changes")
                .isInstanceOf(TableException.class);
    }

    /**
     * Builds a changelog source with nested row columns and metadata ({@code nested_src}), a view
     * {@code v1} projecting nested fields, and two sinks, then plans a statement set with two
     * inserts reading from {@code v1}. The first insert groups by a {@code day} column derived
     * from {@code CURRENT_TIMESTAMP}.
     *
     * <p>When {@code tryResolve()} is true, planning must fail with a {@link TableException} whose
     * message names column {@code day} and {@code CURRENT_TIMESTAMP}; otherwise planning must
     * complete without throwing.
     */
    @TestTemplate
    public void testNestedSourceWithMultiSink() {
        util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n         |CREATE TABLE nested_src (\n         |  id int,\n         |  deepNested row<nested1 row<name string, `value` int>,\n         |    nested2 row<num int, flag boolean>>,\n         |  name string,\n         |  metadata_1 int metadata,\n         |  metadata_2 string metadata,\n         |  primary key(id, name) not enforced\n         |) WITH (\n         |  'connector' = 'values',\n         |  'nested-projection-supported' = 'true',\n         |  'changelog-mode' = 'I,UA,UB,D',\n         |  'readable-metadata' = 'metadata_1:INT, metadata_2:STRING, metadata_3:BIGINT'\n         |)\n         |")).stripMargin());
        util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n        |create view v1 as\n        |SELECT id,\n        |       deepNested.nested2.num AS a,\n        |       deepNested.nested1.name AS name,\n        |       deepNested.nested1.`value` + deepNested.nested2.num + metadata_1 as b\n        |FROM nested_src\n        |")).stripMargin());
        util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                               |create temporary table sink1 (\n                               |  a int,\n                               |  b string,\n                               |  d bigint\n                               |) with (\n                               | 'connector' = 'values',\n                               | 'sink-insert-only' = 'false'\n                               |)")).stripMargin());
        util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                               |create temporary table sink2 (\n                               |  a int,\n                               |  b string,\n                               |  d bigint\n                               |) with (\n                               | 'connector' = 'values',\n                               | 'sink-insert-only' = 'false'\n                               |)")).stripMargin());
        StatementSet createStatementSet = util().tableEnv().createStatementSet();
        createStatementSet.addInsertSql(new StringOps(Predef$.MODULE$.augmentString("\n         |insert into sink1\n         |select a, `day`, sum(b)\n         |from (select a, b, DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd') as `day` from v1) t\n         |group by a, `day`\n         |")).stripMargin());
        createStatementSet.addInsertSql(new StringOps(Predef$.MODULE$.augmentString("\n                            |insert into sink2\n                            |select a, name, b\n                            |from v1\n                            |where b > 100\n                            |")).stripMargin());
        ThrowableAssert.ThrowingCallable throwingCallable = () -> {
            this.util().verifyExecPlan(createStatementSet);
        };
        if (tryResolve()) {
            // Check the type of the thrown exception itself. The previous
            // "<assertion> instanceof TableException" tested the AssertJ assertion object,
            // so the exception type was never verified.
            Assertions.assertThatThrownBy(throwingCallable)
                    .hasMessageContaining("column(s): day(generated by non-deterministic function: CURRENT_TIMESTAMP ) can not satisfy the determinism")
                    .isInstanceOf(TableException.class);
        } else {
            Assertions.assertThatCode(throwingCallable).doesNotThrowAnyException();
        }
    }

    /**
     * Builds two changelog sources ({@code src1}, {@code src2}), two sinks, and a view {@code v1}
     * joining projections of both sources that each derive a {@code day} column from
     * {@code CURRENT_TIMESTAMP}; then plans a statement set with two inserts reading from
     * {@code v1}.
     *
     * <p>When {@code tryResolve()} is true, planning must fail with a {@link TableException} whose
     * message names column {@code day} and {@code CURRENT_TIMESTAMP}; otherwise planning must
     * complete without throwing.
     */
    @TestTemplate
    public void testMultiSinkOnJoinedView() {
        util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                               |create temporary table src1 (\n                               |  a int,\n                               |  b bigint,\n                               |  c string,\n                               |  d int,\n                               |  primary key(a, c) not enforced\n                               |) with (\n                               | 'connector' = 'values',\n                               | 'changelog-mode' = 'I,UA,UB,D'\n                               |)")).stripMargin());
        util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                               |create temporary table src2 (\n                               |  a int,\n                               |  b bigint,\n                               |  c string,\n                               |  d int,\n                               |  primary key(a, c) not enforced\n                               |) with (\n                               | 'connector' = 'values',\n                               | 'changelog-mode' = 'I,UA,UB,D'\n                               |)")).stripMargin());
        util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                               |create temporary table sink1 (\n                               |  a int,\n                               |  b string,\n                               |  c bigint,\n                               |  d bigint\n                               |) with (\n                               | 'connector' = 'values',\n                               | 'sink-insert-only' = 'false'\n                               |)")).stripMargin());
        util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                               |create temporary table sink2 (\n                               |  a int,\n                               |  b string,\n                               |  c bigint,\n                               |  d string\n                               |) with (\n                               | 'connector' = 'values',\n                               | 'sink-insert-only' = 'false'\n                               |)")).stripMargin());
        util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n         |create temporary view v1 as\n         |select\n         |  t1.a as a, t1.`day` as `day`, t2.b as b, t2.c as c\n         |from (\n         |  select a, b, DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd') as `day`\n         |  from src1\n         | ) t1\n         |join (\n         |  select b, CONCAT(c, DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd')) as `day`, c, d\n         |  from src2\n         |) t2\n         | on t1.a = t2.d\n         |")).stripMargin());
        StatementSet createStatementSet = util().tableEnv().createStatementSet();
        createStatementSet.addInsertSql(new StringOps(Predef$.MODULE$.augmentString("\n                            |insert into sink1\n                            |select a, `day`, sum(b), count(distinct c)\n                            |from v1\n                            |group by a, `day`\n                            |")).stripMargin());
        createStatementSet.addInsertSql(new StringOps(Predef$.MODULE$.augmentString("\n                            |insert into sink2\n                            |select a, `day`, b, c\n                            |from v1\n                            |where b > 100\n                            |")).stripMargin());
        ThrowableAssert.ThrowingCallable throwingCallable = () -> {
            this.util().verifyExecPlan(createStatementSet);
        };
        if (tryResolve()) {
            // Check the type of the thrown exception itself. The previous
            // "<assertion> instanceof TableException" tested the AssertJ assertion object,
            // so the exception type was never verified.
            Assertions.assertThatThrownBy(throwingCallable)
                    .hasMessageContaining("column(s): day(generated by non-deterministic function: CURRENT_TIMESTAMP ) can not satisfy the determinism")
                    .isInstanceOf(TableException.class);
        } else {
            Assertions.assertThatCode(throwingCallable).doesNotThrowAnyException();
        }
    }

    /**
     * Creates view {@code v1} over {@code src} with an added {@code PROCTIME()} column, then
     * verifies the execution plan of a {@code MATCH_RECOGNIZE} query on that view inserting into
     * {@code sink_with_pk}. No failure is expected here.
     */
    @TestTemplate
    public void testMatchRecognizeSinkWithPk() {
        // View exposing a processing-time attribute used as the MATCH_RECOGNIZE ordering column.
        final String createViewSql = new StringOps(Predef$.MODULE$.augmentString("\n                                |create temporary view v1 as\n                                |select *, PROCTIME() as proctime from src\n                                |")).stripMargin();
        // Insert whose plan is verified.
        final String insertSql = new StringOps(Predef$.MODULE$.augmentString("\n        |insert into sink_with_pk\n        |SELECT T1.a, T1.b, cast(T1.matchProctime as varchar)\n        |FROM v1\n        |MATCH_RECOGNIZE (\n        |PARTITION BY c\n        |ORDER BY proctime\n        |MEASURES\n        |  A.a as a,\n        |  A.b as b,\n        |  MATCH_PROCTIME() as matchProctime\n        |ONE ROW PER MATCH\n        |PATTERN (A)\n        |DEFINE\n        |  A AS A.a > 1\n        |) AS T1\n        |")).stripMargin();

        util().tableEnv().executeSql(createViewSql);
        util().verifyExecPlanInsert(insertSql);
    }

    /**
     * Plans a {@code MATCH_RECOGNIZE} query over {@code cdc_with_meta_and_wm} whose {@code DEFINE}
     * condition compares {@code op_ts} with {@code CURRENT_TIMESTAMP}, inserting into
     * {@code sink_with_pk}.
     *
     * <p>Planning must fail with a {@link TableException} stating that Match Recognize doesn't
     * support consuming update and delete changes.
     */
    @TestTemplate
    public void testMatchRecognizeWithNonDeterministicConditionOnCdcSinkWithPk() {
        // Use AssertJ's isInstanceOf so the thrown exception's type is actually checked; the
        // previous "instanceof" tested the assertion object and its result was discarded.
        Assertions.assertThatThrownBy(() -> {
            this.util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n            |insert into sink_with_pk\n            |SELECT T.a, T.b, cast(T.matchRowtime as varchar)\n            |FROM cdc_with_meta_and_wm\n            |MATCH_RECOGNIZE (\n            |PARTITION BY c\n            |ORDER BY op_ts\n            |MEASURES\n            |  A.a as a,\n            |  A.b as b,\n            |  MATCH_ROWTIME(op_ts) as matchRowtime\n            |ONE ROW PER MATCH\n            |PATTERN (A)\n            |DEFINE\n            |  A AS A.op_ts >= CURRENT_TIMESTAMP\n            |) AS T\n      ")).stripMargin());
        })
                .hasMessageContaining("Match Recognize doesn't support consuming update and delete changes")
                .isInstanceOf(TableException.class);
    }

    /**
     * Plans a {@code MATCH_RECOGNIZE} query over {@code cdc_with_meta_and_wm} that measures
     * {@code op_ts} and {@code MATCH_ROWTIME(op_ts)}, inserting into {@code sink_with_pk}.
     *
     * <p>Planning must fail with a {@link TableException} stating that Match Recognize doesn't
     * support consuming update and delete changes.
     */
    @TestTemplate
    public void testMatchRecognizeOnCdcWithMetaDataSinkWithPk() {
        // Use AssertJ's isInstanceOf so the thrown exception's type is actually checked; the
        // previous "instanceof" tested the assertion object and its result was discarded.
        Assertions.assertThatThrownBy(() -> {
            this.util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n            |insert into sink_with_pk\n            |SELECT T.a, T.b, cast(T.ts as varchar)\n            |FROM cdc_with_meta_and_wm\n            |MATCH_RECOGNIZE (\n            |PARTITION BY c\n            |ORDER BY op_ts\n            |MEASURES\n            |  A.a as a,\n            |  A.b as b,\n            |  A.op_ts as ts,\n            |  MATCH_ROWTIME(op_ts) as matchRowtime\n            |ONE ROW PER MATCH\n            |PATTERN (A)\n            |DEFINE\n            |  A AS A.a > 0\n            |) AS T\n      ")).stripMargin());
        })
                .hasMessageContaining("Match Recognize doesn't support consuming update and delete changes")
                .isInstanceOf(TableException.class);
    }

    /**
     * Creates a test instance for the given strategy.
     *
     * <p>Stores the strategy and records in {@code tryResolve} whether it is
     * {@code NonDeterministicUpdateStrategy.TRY_RESOLVE}.
     *
     * @param nonDeterministicUpdateStrategy strategy under test; a {@code null} value yields
     *     {@code tryResolve == false}
     */
    public NonDeterministicDagTest(OptimizerConfigOptions.NonDeterministicUpdateStrategy nonDeterministicUpdateStrategy) {
        this.nonDeterministicUpdateStrategy = nonDeterministicUpdateStrategy;
        // Identity comparison is null-safe and, for enum constants, equivalent to equals().
        this.tryResolve =
                nonDeterministicUpdateStrategy == OptimizerConfigOptions.NonDeterministicUpdateStrategy.TRY_RESOLVE;
    }
}
