/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.recovery;

import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.HeartbeatManagerOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.blocklist.BlocklistUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.entrypoint.SessionClusterEntrypoint;
import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.io.network.partition.ResourceManagerPartitionTrackerImpl;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
import org.apache.flink.runtime.resourcemanager.ArbitraryWorkerResourceSpecFactory;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
import org.apache.flink.runtime.resourcemanager.WorkerResourceSpecFactory;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.security.token.DelegationTokenManager;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.test.recovery.utils.TaskExecutorProcessEntryPoint;
import org.apache.flink.test.util.TestProcessBuilder;
import org.apache.flink.util.ConfigurationException;
import org.apache.flink.util.TestLoggerExtension;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ExtendWith(value={TestLoggerExtension.class})
public class TaskManagerDisconnectOnShutdownITCase {
    private static final Logger LOG = LoggerFactory.getLogger(TaskManagerDisconnectOnShutdownITCase.class);

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    @Test
    public void testTaskManagerProcessFailure() {
        Configuration config = new Configuration();
        config.set(JobManagerOptions.ADDRESS, (Object)"localhost");
        config.set(JobManagerOptions.PORT, (Object)0);
        config.set(RestOptions.BIND_PORT, (Object)"0");
        config.set(HeartbeatManagerOptions.HEARTBEAT_RPC_FAILURE_THRESHOLD, (Object)-1);
        config.set(TaskManagerOptions.NUM_TASK_SLOTS, (Object)2);
        config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, (Object)MemorySize.parse((String)"4m"));
        config.set(TaskManagerOptions.NETWORK_MEMORY_MIN, (Object)MemorySize.parse((String)"3200k"));
        config.set(TaskManagerOptions.NETWORK_MEMORY_MAX, (Object)MemorySize.parse((String)"3200k"));
        config.set(TaskManagerOptions.TASK_HEAP_MEMORY, (Object)MemorySize.parse((String)"128m"));
        config.set(TaskManagerOptions.CPU_CORES, (Object)1.0);
        config.set(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, (Object)"full");
        config.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, (Object)Duration.ofSeconds(30L));
        String javaCommand = CommonTestUtils.getJavaCommandPath();
        if (javaCommand == null) {
            org.junit.jupiter.api.Assertions.fail((String)"cannot find java executable");
        }
        final TaskManagerConnectionTracker tracker = new TaskManagerConnectionTracker();
        TestProcessBuilder.TestProcess taskManagerProcess = null;
        try {
            try (SessionClusterEntrypoint clusterEntrypoint = new SessionClusterEntrypoint(config){

                protected DefaultDispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) {
                    return DefaultDispatcherResourceManagerComponentFactory.createSessionComponentFactory((ResourceManagerFactory)new TestingStandaloneResourceManagerFactory(tracker));
                }
            };){
                clusterEntrypoint.startCluster();
                Configuration taskManagerConfig = new Configuration(config);
                taskManagerConfig.set(JobManagerOptions.PORT, (Object)clusterEntrypoint.getRpcPort());
                TestProcessBuilder taskManagerProcessBuilder = new TestProcessBuilder(TaskExecutorProcessEntryPoint.class.getName());
                taskManagerProcessBuilder.addConfigAsMainClassArgs(taskManagerConfig);
                taskManagerProcess = taskManagerProcessBuilder.start();
                tracker.waitForTaskManagerConnected();
                taskManagerProcess.destroy();
                tracker.waitForTaskManagerDisconnected();
                Assertions.assertThat((int)tracker.getNumberOfConnectedTaskManager()).isEqualTo(1);
            }
            if (taskManagerProcess == null || !taskManagerProcess.getProcess().isAlive()) return;
        }
        catch (Throwable t) {
            try {
                TaskManagerDisconnectOnShutdownITCase.printProcessLog(taskManagerProcess);
                org.junit.jupiter.api.Assertions.fail((String)t.getMessage());
                return;
            }
            catch (Throwable throwable) {
                throw throwable;
            }
            finally {
                if (taskManagerProcess != null && taskManagerProcess.getProcess().isAlive()) {
                    LOG.error("TaskManager did not shutdown in time.");
                    TaskManagerDisconnectOnShutdownITCase.printProcessLog(taskManagerProcess);
                    taskManagerProcess.getProcess().destroyForcibly();
                }
            }
        }
        LOG.error("TaskManager did not shutdown in time.");
        TaskManagerDisconnectOnShutdownITCase.printProcessLog(taskManagerProcess);
        taskManagerProcess.getProcess().destroyForcibly();
        return;
    }

    protected static void printProcessLog(TestProcessBuilder.TestProcess process) {
        if (process == null) {
            System.out.println("-----------------------------------------");
            System.out.println(" TaskManager WAS NOT STARTED.");
            System.out.println("-----------------------------------------");
        } else {
            System.out.println("-----------------------------------------");
            System.out.println(" BEGIN SPAWNED PROCESS LOG FOR TaskManager");
            System.out.println("-----------------------------------------");
            System.out.println(process.getErrorOutput().toString());
            System.out.println("-----------------------------------------");
            System.out.println("\t\tEND SPAWNED PROCESS LOG");
            System.out.println("-----------------------------------------");
        }
    }

    private static class TaskManagerConnectionTracker {
        private final CompletableFuture<Void> taskManagerConnectedFuture = new CompletableFuture();
        private final CompletableFuture<Void> taskManagerDisconnectedFuture = new CompletableFuture();
        private final AtomicInteger numberOfConnectedTaskManager = new AtomicInteger();

        private TaskManagerConnectionTracker() {
        }

        public void connectTaskManager() {
            this.numberOfConnectedTaskManager.incrementAndGet();
            this.taskManagerConnectedFuture.complete(null);
        }

        public void disconnectTaskManager() {
            this.taskManagerDisconnectedFuture.complete(null);
        }

        public void waitForTaskManagerConnected() throws Exception {
            this.taskManagerConnectedFuture.get();
        }

        public void waitForTaskManagerDisconnected() throws Exception {
            this.taskManagerConnectedFuture.get();
        }

        public int getNumberOfConnectedTaskManager() {
            return this.numberOfConnectedTaskManager.get();
        }
    }

    private static class TestingStandaloneResourceManagerFactory
    extends ResourceManagerFactory<ResourceID> {
        TaskManagerConnectionTracker tracker;

        public TestingStandaloneResourceManagerFactory(TaskManagerConnectionTracker tracker) {
            this.tracker = tracker;
        }

        protected ResourceManager<ResourceID> createResourceManager(Configuration configuration, ResourceID resourceId, RpcService rpcService, UUID leaderSessionId, HeartbeatServices heartbeatServices, DelegationTokenManager delegationTokenManager, FatalErrorHandler fatalErrorHandler, ClusterInformation clusterInformation, @Nullable String webInterfaceUrl, ResourceManagerMetricGroup resourceManagerMetricGroup, ResourceManagerRuntimeServices resourceManagerRuntimeServices, Executor ioExecutor) {
            Time standaloneClusterStartupPeriodTime = ConfigurationUtils.getStandaloneClusterStartupPeriodTime((Configuration)configuration);
            return new StandaloneResourceManager(rpcService, leaderSessionId, resourceId, heartbeatServices, delegationTokenManager, resourceManagerRuntimeServices.getSlotManager(), ResourceManagerPartitionTrackerImpl::new, BlocklistUtils.loadBlocklistHandlerFactory((Configuration)configuration), resourceManagerRuntimeServices.getJobLeaderIdService(), clusterInformation, fatalErrorHandler, resourceManagerMetricGroup, standaloneClusterStartupPeriodTime, Time.fromDuration((Duration)((Duration)configuration.get(RpcOptions.ASK_TIMEOUT_DURATION))), ioExecutor){

                public void disconnectTaskManager(ResourceID resourceId, Exception cause) {
                    tracker.disconnectTaskManager();
                    super.disconnectTaskManager(resourceId, cause);
                }

                public CompletableFuture<Acknowledge> sendSlotReport(ResourceID taskManagerResourceId, InstanceID taskManagerRegistrationId, SlotReport slotReport, Time timeout) {
                    CompletableFuture result = super.sendSlotReport(taskManagerResourceId, taskManagerRegistrationId, slotReport, timeout);
                    tracker.connectTaskManager();
                    return result;
                }
            };
        }

        protected ResourceManagerRuntimeServicesConfiguration createResourceManagerRuntimeServicesConfiguration(Configuration configuration) throws ConfigurationException {
            return ResourceManagerRuntimeServicesConfiguration.fromConfiguration((Configuration)StandaloneResourceManagerFactory.getConfigurationWithoutResourceLimitationIfSet((Configuration)configuration), (WorkerResourceSpecFactory)ArbitraryWorkerResourceSpecFactory.INSTANCE);
        }
    }
}

