/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.classloading;

import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.changelog.fs.FsStateChangelogOptions;
import org.apache.flink.client.program.MiniClusterClient;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.configuration.StateChangelogOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.client.JobCancellationException;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.testutils.MiniClusterResource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.test.util.TestEnvironment;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.SerializedThrowable;
import org.apache.flink.util.TestLogger;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;

public class ClassLoaderITCase
extends TestLogger {
    /** Logger used for progress messages in the savepoint test. */
    private static final Logger LOG = LoggerFactory.getLogger(ClassLoaderITCase.class);
    // The jar locations below are relative paths, so they resolve against the JVM working directory.
    /** Jar used by the two batch custom-input-split tests (job-jar and classpath variants). */
    private static final String INPUT_SPLITS_PROG_JAR_FILE = "target/customsplit-test-jar.jar";
    /** Jar used by the streaming custom-input-split test. */
    private static final String STREAMING_INPUT_SPLITS_PROG_JAR_FILE = "target/streaming-customsplit-test-jar.jar";
    /** Jar used by the streaming class-loader test. */
    private static final String STREAMING_PROG_JAR_FILE = "target/streamingclassloader-test-jar.jar";
    /** Jar used by the checkpointed streaming class-loader test. */
    private static final String STREAMING_CHECKPOINTED_PROG_JAR_FILE = "target/streaming-checkpointed-classloader-test-jar.jar";
    /** Jar used by the KMeans test. */
    private static final String KMEANS_JAR_PATH = "target/kmeans-test-jar.jar";
    /** Jar used by the user-code-type test. */
    private static final String USERCODETYPE_JAR_PATH = "target/usercodetype-test-jar.jar";
    /** Jar used by the dispose-savepoint test. */
    private static final String CUSTOM_KV_STATE_JAR_PATH = "target/custom_kv_state-test-jar.jar";
    /** Jar used by the checkpointing custom key/value state test. */
    private static final String CHECKPOINTING_CUSTOM_KV_STATE_JAR_PATH = "target/checkpointing_custom_kv_state-test-jar.jar";
    /** Jar used by the child-first and parent-first class-loading policy tests. */
    private static final String CLASSLOADING_POLICY_JAR_PATH = "target/classloading_policy-test-jar.jar";
    /** Class-scoped temporary folder; supplies the cluster's state directories and per-test directories. */
    @ClassRule
    public static final TemporaryFolder FOLDER = new TemporaryFolder();
    /** Cluster resource shared by all tests; created in {@code setUp} and released in {@code tearDownClass}. */
    private static MiniClusterResource miniClusterResource = null;
    // Equals the 2 task managers x 2 slots configured in setUp. The visible call sites pass the
    // literal 4 rather than this constant (NOTE(review): presumably inlined by the decompiler).
    private static final int parallelism = 4;

    /**
     * Builds the cluster configuration and starts the {@link MiniClusterResource} shared by all
     * tests of this class.
     *
     * <p>Three fresh directories are taken from {@link #FOLDER}: one each for checkpoints,
     * savepoints and the changelog base path. The cluster is sized at 2 task managers with 2 slots
     * each.
     *
     * @throws Exception if a temporary directory cannot be created or the cluster fails to start
     */
    @BeforeClass
    public static void setUp() throws Exception {
        // Created in the same order as the options that consume them are set below.
        final String checkpointsUri = FOLDER.newFolder().getAbsoluteFile().toURI().toString();
        final String savepointsUri = FOLDER.newFolder().getAbsoluteFile().toURI().toString();
        final MemorySize managedMemory = MemorySize.parse("80m");
        final String changelogBasePath = FOLDER.newFolder().getAbsolutePath();

        final Configuration clusterConfig = new Configuration();
        clusterConfig.set(StateBackendOptions.STATE_BACKEND, "filesystem");
        clusterConfig.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointsUri);
        clusterConfig.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointsUri);
        clusterConfig.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, managedMemory);
        clusterConfig.set(StateChangelogOptions.STATE_CHANGE_LOG_STORAGE, "filesystem");
        clusterConfig.set(FsStateChangelogOptions.BASE_PATH, changelogBasePath);
        // NOTE(review): presumably forces RPC arguments through serialization so class-loading
        // problems surface even inside a single JVM - confirm against RpcOptions.
        clusterConfig.set(RpcOptions.FORCE_RPC_INVOCATION_SERIALIZATION, true);

        final MiniClusterResourceConfiguration resourceConfig =
                new MiniClusterResourceConfiguration.Builder()
                        .setNumberTaskManagers(2)
                        .setNumberSlotsPerTaskManager(2)
                        .setConfiguration(clusterConfig)
                        .build();

        miniClusterResource = new MiniClusterResource(resourceConfig);
        miniClusterResource.before();
    }

    /**
     * Releases the shared cluster resource, if {@code setUp} managed to create one.
     */
    @AfterClass
    public static void tearDownClass() {
        if (miniClusterResource == null) {
            // Nothing was created, so there is nothing to release.
            return;
        }
        miniClusterResource.after();
    }

    /**
     * Clears the context environments after each test so that a context installed by one test
     * (streaming or batch) does not remain set for the next.
     */
    @After
    public void tearDown() {
        // Both are always unset because a test installs only one of them and this hook cannot
        // tell which.
        TestStreamEnvironment.unsetAsContext();
        TestEnvironment.unsetAsContext();
    }

    /**
     * Runs the custom-input-split batch program with its jar passed to the context environment as
     * a job jar (no extra classpath entries).
     *
     * @throws ProgramInvocationException propagated from building or invoking the packaged program
     */
    @Test
    public void testCustomSplitJobWithCustomClassLoaderJar() throws ProgramInvocationException {
        final File programJar = new File(INPUT_SPLITS_PROG_JAR_FILE);
        final PackagedProgram program = PackagedProgram.newBuilder().setJarFile(programJar).build();

        TestEnvironment.setAsContext(
                miniClusterResource.getMiniCluster(),
                4,
                Collections.singleton(new Path(INPUT_SPLITS_PROG_JAR_FILE)),
                Collections.emptyList());

        program.invokeInteractiveModeForExecution();
    }

    /**
     * Runs the streaming custom-input-split program with its jar passed to the streaming context
     * environment as a job jar.
     *
     * @throws ProgramInvocationException propagated from building or invoking the packaged program
     */
    @Test
    public void testStreamingCustomSplitJobWithCustomClassLoader() throws ProgramInvocationException {
        final File programJar = new File(STREAMING_INPUT_SPLITS_PROG_JAR_FILE);
        final PackagedProgram program = PackagedProgram.newBuilder().setJarFile(programJar).build();

        TestStreamEnvironment.setAsContext(
                miniClusterResource.getMiniCluster(),
                4,
                Collections.singleton(new Path(STREAMING_INPUT_SPLITS_PROG_JAR_FILE)),
                Collections.emptyList());

        program.invokeInteractiveModeForExecution();
    }

    /**
     * Runs the custom-input-split batch program with its jar supplied to the context environment
     * as a classpath URL instead of as a job jar.
     *
     * @throws IOException if the jar location cannot be converted to a URL
     * @throws ProgramInvocationException propagated from building or invoking the packaged program
     */
    @Test
    public void testCustomSplitJobWithCustomClassLoaderPath() throws IOException, ProgramInvocationException {
        // The URL is resolved before the program is built, as in the original ordering.
        final URL jarUrl = new File(INPUT_SPLITS_PROG_JAR_FILE).toURI().toURL();

        final PackagedProgram program =
                PackagedProgram.newBuilder()
                        .setJarFile(new File(INPUT_SPLITS_PROG_JAR_FILE))
                        .build();

        // No job jars here; the jar is only reachable through the classpath entry.
        TestEnvironment.setAsContext(
                miniClusterResource.getMiniCluster(),
                4,
                Collections.emptyList(),
                Collections.singleton(jarUrl));

        program.invokeInteractiveModeForExecution();
    }

    /**
     * Runs the streaming class-loader program with its jar passed to the streaming context
     * environment as a job jar.
     *
     * @throws ProgramInvocationException propagated from building or invoking the packaged program
     */
    @Test
    public void testStreamingClassloaderJobWithCustomClassLoader() throws ProgramInvocationException {
        final File programJar = new File(STREAMING_PROG_JAR_FILE);
        final PackagedProgram program = PackagedProgram.newBuilder().setJarFile(programJar).build();

        TestStreamEnvironment.setAsContext(
                miniClusterResource.getMiniCluster(),
                4,
                Collections.singleton(new Path(STREAMING_PROG_JAR_FILE)),
                Collections.emptyList());

        program.invokeInteractiveModeForExecution();
    }

    /**
     * Runs the checkpointed streaming program and expects it to fail with an exception whose
     * class exists only in the program jar.
     *
     * <p>The test first asserts that the program's {@code SuccessException} cannot be loaded via
     * {@link Class#forName(String)} from here, then asserts that invoking the program throws
     * something accepted by {@code FlinkAssertions.anyCauseMatches} for
     * {@link SerializedThrowable} and that class name.
     *
     * @throws ProgramInvocationException propagated from building the packaged program
     */
    @Test
    public void testCheckpointedStreamingClassloaderJobWithCustomClassLoader() throws ProgramInvocationException {
        final String successExceptionName =
                "org.apache.flink.test.classloading.jar.CheckpointedStreamingProgram$SuccessException";

        final PackagedProgram program =
                PackagedProgram.newBuilder()
                        .setJarFile(new File(STREAMING_CHECKPOINTED_PROG_JAR_FILE))
                        .build();

        TestStreamEnvironment.setAsContext(
                miniClusterResource.getMiniCluster(),
                4,
                Collections.singleton(new Path(STREAMING_CHECKPOINTED_PROG_JAR_FILE)),
                Collections.emptyList());

        // Sanity check: the exception type must not be loadable from the test's own class loader.
        Assertions.assertThatThrownBy(() -> Class.forName(successExceptionName))
                .isInstanceOf(ClassNotFoundException.class);

        Assertions.assertThatThrownBy(program::invokeInteractiveModeForExecution)
                .satisfies(FlinkAssertions.anyCauseMatches(SerializedThrowable.class, successExceptionName));
    }

    @Test
    public void testKMeansJobWithCustomClassLoader() throws ProgramInvocationException {
        PackagedProgram kMeansProg = PackagedProgram.newBuilder().setJarFile(new File(KMEANS_JAR_PATH)).setArguments(new String[]{"0|50.90|16.20|72.08|\n1|73.65|61.76|62.89|\n2|61.73|49.95|92.74|\n3|1.60|70.11|16.32|\n4|2.43|19.81|89.56|\n5|67.99|9.00|14.48|\n6|87.80|84.49|55.83|\n7|90.26|42.99|53.29|\n8|51.36|6.16|9.35|\n9|12.43|9.52|12.54|\n10|80.01|8.78|29.74|\n11|92.76|2.93|80.07|\n12|46.32|100.00|22.98|\n13|34.11|45.61|58.60|\n14|68.82|16.36|96.60|\n15|81.47|76.45|28.40|\n16|65.55|40.21|43.43|\n17|84.22|88.56|13.31|\n18|36.99|68.36|57.12|\n19|28.87|37.69|91.04|\n20|31.56|13.22|86.00|\n21|18.49|34.45|54.52|\n22|13.33|94.02|92.07|\n23|91.19|81.62|55.06|\n24|85.78|39.02|25.58|\n25|94.41|47.07|78.23|\n26|90.62|10.43|80.20|\n27|31.52|85.81|39.79|\n28|24.65|77.98|26.35|\n29|69.34|75.79|63.96|\n30|22.56|78.61|66.66|\n31|91.74|83.82|73.92|\n32|76.64|89.53|44.66|\n33|36.02|73.01|92.32|\n34|87.86|18.94|10.74|\n35|91.94|34.61|5.20|\n36|12.52|47.01|95.29|\n37|44.01|26.19|78.50|\n38|26.20|73.36|10.08|\n39|15.21|17.37|54.33|\n40|27.96|94.81|44.41|\n41|26.44|44.81|70.88|\n42|53.29|26.69|2.40|\n43|23.94|11.50|1.71|\n44|19.00|25.48|50.80|\n45|82.26|1.88|58.08|\n46|47.56|82.54|82.73|\n47|51.54|35.10|32.95|\n48|86.71|55.51|19.08|\n49|54.16|23.68|32.41|\n50|71.81|32.83|46.66|\n51|20.70|14.19|64.96|\n52|57.17|88.56|55.23|\n53|91.39|49.38|70.55|\n54|47.90|62.07|76.03|\n55|55.70|37.77|30.15|\n56|87.87|74.62|25.95|\n57|95.70|45.04|15.27|\n58|41.61|89.37|24.45|\n59|82.19|20.84|11.13|\n60|49.88|2.62|18.62|\n61|16.42|53.30|74.13|\n62|38.37|72.62|35.16|\n63|43.26|49.59|92.56|\n64|28.96|2.36|78.49|\n65|88.41|91.43|92.55|\n66|98.61|79.58|33.03|\n67|4.94|18.65|30.78|\n68|75.89|79.30|63.90|\n69|93.18|76.26|9.50|\n70|73.43|70.50|76.49|\n71|78.64|90.87|34.49|\n72|58.47|63.07|8.82|\n73|69.74|54.36|64.43|\n74|38.47|36.60|33.39|\n75|51.07|14.75|2.54|\n76|24.18|16.85|15.00|\n77|7.56|50.72|93.45|\n78|64.28|97.01|57.31|\n79|85.30|24.13|76.57|\n80|72.78|30.78|13.11|\n81|18.42|17.45|32.20|\n82|87.44|74
.98|87.90|\n83|38.30|17.77|37.33|\n84|63.62|7.90|34.23|\n85|8.84|67.87|30.65|\n86|76.12|51.83|80.12|\n87|32.30|74.79|4.39|\n88|41.73|45.34|18.66|\n89|58.13|18.43|83.38|\n90|98.10|33.46|83.07|\n91|17.76|4.10|88.51|\n92|60.58|18.15|59.96|\n93|50.11|33.25|85.64|\n94|97.74|60.93|38.97|\n95|76.31|52.50|95.43|\n96|7.71|85.85|36.26|\n97|9.32|72.21|42.17|\n98|71.29|51.88|57.62|\n99|31.39|7.27|88.74|", "0|1.96|65.04|20.82|\n1|53.99|84.23|81.59|\n2|97.28|74.50|40.32|\n3|63.57|24.53|87.07|\n4|28.10|43.27|86.53|\n5|99.51|62.70|64.48|\n6|30.31|30.36|80.46|", "25"}).build();
        TestEnvironment.setAsContext((MiniCluster)miniClusterResource.getMiniCluster(), (int)4, Collections.singleton(new Path(KMEANS_JAR_PATH)), Collections.emptyList());
        kMeansProg.invokeInteractiveModeForExecution();
    }

    /**
     * Runs the user-code-type batch program with its jar passed to the context environment as a
     * job jar.
     *
     * @throws ProgramInvocationException propagated from building or invoking the packaged program
     */
    @Test
    public void testUserCodeTypeJobWithCustomClassLoader() throws ProgramInvocationException {
        final File programJar = new File(USERCODETYPE_JAR_PATH);
        final PackagedProgram program = PackagedProgram.newBuilder().setJarFile(programJar).build();

        TestEnvironment.setAsContext(
                miniClusterResource.getMiniCluster(),
                4,
                Collections.singleton(new Path(USERCODETYPE_JAR_PATH)),
                Collections.emptyList());

        program.invokeInteractiveModeForExecution();
    }

    /**
     * Runs the checkpointing custom key/value state program and expects it to terminate with a
     * {@link ProgramInvocationException} whose cause chain contains a {@link SuccessException}.
     *
     * <p>The program is given the URIs of two fresh temporary directories (checkpoint and output)
     * as its arguments. The test fails if the invocation returns normally.
     *
     * @throws IOException if a temporary directory cannot be created
     * @throws ProgramInvocationException propagated from building the packaged program
     */
    @Test
    public void testCheckpointingCustomKvStateJobWithCustomClassLoader() throws IOException, ProgramInvocationException {
        final File checkpointDir = FOLDER.newFolder();
        final File outputDir = FOLDER.newFolder();
        final String[] programArgs = {checkpointDir.toURI().toString(), outputDir.toURI().toString()};

        final PackagedProgram program =
                PackagedProgram.newBuilder()
                        .setJarFile(new File(CHECKPOINTING_CUSTOM_KV_STATE_JAR_PATH))
                        .setArguments(programArgs)
                        .build();

        TestStreamEnvironment.setAsContext(
                miniClusterResource.getMiniCluster(),
                4,
                Collections.singleton(new Path(CHECKPOINTING_CUSTOM_KV_STATE_JAR_PATH)),
                Collections.emptyList());

        try {
            program.invokeInteractiveModeForExecution();
        } catch (ProgramInvocationException expected) {
            final boolean carriesSuccess =
                    ExceptionUtils.findThrowable(expected, SuccessException.class).isPresent();
            Assert.assertTrue(carriesSuccess);
            return;
        }
        // Only reached when the invocation completed without throwing.
        Assert.fail("exception should happen");
    }

    /**
     * Starts the custom key/value state program on a background thread, takes one savepoint of
     * the running job, disposes that savepoint and finally cancels the job.
     *
     * <p>All waiting is bounded by a single 100 second deadline. The steps are:
     *
     * <ol>
     *   <li>poll the cluster until a job reports {@link JobStatus#RUNNING};
     *   <li>trigger a savepoint, retrying up to 20 times with a 500 ms pause after each failure;
     *   <li>dispose the savepoint and cancel the job;
     *   <li>wait for the invoking thread to finish.
     * </ol>
     *
     * @throws Exception if any cluster call fails or times out, or the thread is interrupted
     */
    @Test
    public void testDisposeSavepointWithCustomKvState() throws Exception {
        final MiniClusterClient clusterClient =
                new MiniClusterClient(new Configuration(), miniClusterResource.getMiniCluster());
        final Deadline deadline = new FiniteDuration(100L, TimeUnit.SECONDS).fromNow();

        final File checkpointDir = FOLDER.newFolder();
        final File outputDir = FOLDER.newFolder();

        // NOTE(review): presumably parallelism, checkpoint dir, checkpoint interval (ms),
        // output dir and a boolean flag - confirm against the packaged program.
        final PackagedProgram program =
                PackagedProgram.newBuilder()
                        .setJarFile(new File(CUSTOM_KV_STATE_JAR_PATH))
                        .setArguments(
                                new String[] {
                                    String.valueOf(4),
                                    checkpointDir.toURI().toString(),
                                    "5000",
                                    outputDir.toURI().toString(),
                                    "false"
                                })
                        .build();

        TestStreamEnvironment.setAsContext(
                miniClusterResource.getMiniCluster(),
                4,
                Collections.singleton(new Path(CUSTOM_KV_STATE_JAR_PATH)),
                Collections.emptyList());

        // The invocation blocks until the job ends, so it runs on its own thread.
        final Thread invokeThread =
                new Thread(
                        () -> {
                            try {
                                program.invokeInteractiveModeForExecution();
                            } catch (ProgramInvocationException ex) {
                                // Cancellation is the expected way for this job to end; anything
                                // else is reported (instanceof is false for a null cause).
                                if (!(ex.getCause() instanceof JobCancellationException)) {
                                    LOG.error("Program invocation failed unexpectedly", ex);
                                }
                            }
                        });

        LOG.info("Starting program invoke thread");
        invokeThread.start();

        JobID jobId = null;
        LOG.info("Waiting for job status running.");
        while (jobId == null && deadline.hasTimeLeft()) {
            final Collection<JobStatusMessage> jobs =
                    clusterClient.listJobs().get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
            for (JobStatusMessage job : jobs) {
                if (job.getJobState() == JobStatus.RUNNING) {
                    jobId = job.getJobId();
                    LOG.info("Job running. ID: {}", jobId);
                    break;
                }
            }
            if (jobId == null) {
                Thread.sleep(100L);
            }
        }
        // Fail with a clear message instead of passing a null job id to the cluster client.
        Assert.assertNotNull("Job did not reach RUNNING state before the deadline", jobId);

        String savepointPath = null;
        for (int attempt = 1; attempt <= 20; attempt++) {
            LOG.info("Triggering savepoint ({}/20).", attempt);
            try {
                savepointPath =
                        clusterClient
                                .triggerSavepoint(jobId, null, SavepointFormatType.CANONICAL)
                                .get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
                // One savepoint is enough. Looping on would create further savepoints that are
                // never disposed.
                break;
            } catch (Exception cause) {
                LOG.info("Failed to trigger savepoint. Retrying...", cause);
                Thread.sleep(500L);
            }
        }
        Assert.assertNotNull("Failed to trigger savepoint", savepointPath);

        clusterClient.disposeSavepoint(savepointPath).get();
        clusterClient.cancel(jobId).get();

        // Thread.join(0) waits forever and a negative timeout throws, so keep the wait positive
        // even when the deadline has already passed.
        invokeThread.join(Math.max(1L, deadline.timeLeft().toMillis()));
        Assert.assertFalse("Program invoke thread still running", invokeThread.isAlive());
    }

    /*
     * CFR decompiler warning: a self-catching try region was removed while reconstructing the
     * method below, so its behaviour may differ from the original source.
     */
    /**
     * Runs the class-loading policy program with {@code classloader.resolve-order} set to
     * {@code child-first}.
     *
     * <p>An empty file named {@code test-resource} is created in a fresh temporary directory
     * {@code child0}, and that directory is added to the program's user classpath. The program is
     * invoked with the resource name and the directory name as arguments while the program's
     * user-code class loader is installed as the thread context class loader. The previous context
     * class loader is always restored afterwards.
     *
     * @throws IOException if the temporary directory or resource file cannot be created
     * @throws ProgramInvocationException propagated from building or invoking the packaged program
     */
    @Test
    public void testProgramWithChildFirstClassLoader() throws IOException, ProgramInvocationException {
        final String resourceDirName = "child0";
        final String resourceName = "test-resource";

        final File resourceDir = FOLDER.newFolder(resourceDirName);
        final File resourceFile = new File(resourceDir, resourceName);
        Assert.assertTrue(resourceFile.createNewFile());

        TestStreamEnvironment.setAsContext(
                miniClusterResource.getMiniCluster(),
                4,
                Collections.singleton(new Path(CLASSLOADING_POLICY_JAR_PATH)),
                Collections.emptyList());

        final Configuration resolveOrderConf = new Configuration();
        resolveOrderConf.setString("classloader.resolve-order", "child-first");

        final PackagedProgram program =
                PackagedProgram.newBuilder()
                        .setJarFile(new File(CLASSLOADING_POLICY_JAR_PATH))
                        .setUserClassPaths(Collections.singletonList(resourceDir.toURI().toURL()))
                        .setConfiguration(resolveOrderConf)
                        .setArguments(new String[] {resourceName, resourceDirName})
                        .build();

        final Thread current = Thread.currentThread();
        final ClassLoader previousLoader = current.getContextClassLoader();
        current.setContextClassLoader(program.getUserCodeClassLoader());
        try {
            program.invokeInteractiveModeForExecution();
        } finally {
            // Restore the loader regardless of the outcome so later tests are unaffected.
            current.setContextClassLoader(previousLoader);
        }
    }

    /*
     * CFR decompiler warning: a self-catching try region was removed while reconstructing the
     * method below, so its behaviour may differ from the original source.
     */
    /**
     * Runs the class-loading policy program with {@code classloader.resolve-order} set to
     * {@code parent-first}.
     *
     * <p>An empty file named {@code test-resource} is created in a fresh temporary directory
     * {@code child1}, and that directory is added to the program's user classpath. The program is
     * invoked with the resource name and the literal {@code "test-classes"} as arguments while
     * the program's user-code class loader is installed as the thread context class loader. The
     * previous context class loader is always restored afterwards.
     *
     * @throws IOException if the temporary directory or resource file cannot be created
     * @throws ProgramInvocationException propagated from building or invoking the packaged program
     */
    @Test
    public void testProgramWithParentFirstClassLoader() throws IOException, ProgramInvocationException {
        final String resourceDirName = "child1";
        final String resourceName = "test-resource";

        final File resourceDir = FOLDER.newFolder(resourceDirName);
        final File resourceFile = new File(resourceDir, resourceName);
        Assert.assertTrue(resourceFile.createNewFile());

        TestStreamEnvironment.setAsContext(
                miniClusterResource.getMiniCluster(),
                4,
                Collections.singleton(new Path(CLASSLOADING_POLICY_JAR_PATH)),
                Collections.emptyList());

        final Configuration resolveOrderConf = new Configuration();
        resolveOrderConf.setString("classloader.resolve-order", "parent-first");

        final PackagedProgram program =
                PackagedProgram.newBuilder()
                        .setJarFile(new File(CLASSLOADING_POLICY_JAR_PATH))
                        .setUserClassPaths(Collections.singletonList(resourceDir.toURI().toURL()))
                        .setConfiguration(resolveOrderConf)
                        .setArguments(new String[] {resourceName, "test-classes"})
                        .build();

        final Thread current = Thread.currentThread();
        final ClassLoader previousLoader = current.getContextClassLoader();
        current.setContextClassLoader(program.getUserCodeClassLoader());
        try {
            program.invokeInteractiveModeForExecution();
        } finally {
            // Restore the loader regardless of the outcome so later tests are unaffected.
            current.setContextClassLoader(previousLoader);
        }
    }
}

