/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.dispatcher;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
import org.apache.flink.runtime.client.JobInitializationException;
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
import org.apache.flink.runtime.dispatcher.DispatcherBootstrap;
import org.apache.flink.runtime.dispatcher.DispatcherBootstrapFactory;
import org.apache.flink.runtime.dispatcher.DispatcherException;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.DispatcherId;
import org.apache.flink.runtime.dispatcher.DispatcherJob;
import org.apache.flink.runtime.dispatcher.DispatcherJobResult;
import org.apache.flink.runtime.dispatcher.DispatcherServices;
import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
import org.apache.flink.runtime.dispatcher.JobManagerRunnerFactory;
import org.apache.flink.runtime.dispatcher.UnavailableDispatcherOperationException;
import org.apache.flink.runtime.entrypoint.ClusterEntryPointExceptionUtils;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmanager.JobGraphWriter;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.jobmaster.factories.DefaultJobManagerJobMetricGroupFactory;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceOverview;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStatsResponse;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.PermanentlyFencedRpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.function.FunctionUtils;
import org.apache.flink.util.function.ThrowingConsumer;

public abstract class Dispatcher
extends PermanentlyFencedRpcEndpoint<DispatcherId>
implements DispatcherGateway {
    /** Base name from which the constructor derives this endpoint's randomized RPC name. */
    public static final String DISPATCHER_NAME = "dispatcher";
    // ---- Collaborators taken from DispatcherServices in the constructor ----
    private final Configuration configuration;
    /** Used to persist submitted job graphs and to remove or release them during cleanup. */
    private final JobGraphWriter jobGraphWriter;
    /** Obtained from the HA services; consulted for duplicate detection and cleared during HA cleanup. */
    private final RunningJobsRegistry runningJobsRegistry;
    private final HighAvailabilityServices highAvailabilityServices;
    private final GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever;
    /** Built in the constructor from configuration, blob server and fatal error handler; shut down with the dispatcher services. */
    private final JobManagerSharedServices jobManagerSharedServices;
    private final HeartbeatServices heartbeatServices;
    private final BlobServer blobServer;
    /** Receives every error reported through {@code onFatalError}. */
    private final FatalErrorHandler fatalErrorHandler;
    /** Jobs currently tracked by this dispatcher, keyed by job id; filled in {@code runJob}, emptied in {@code removeJob}. */
    private final Map<JobID, DispatcherJob> runningJobs;
    /** Private copy of the recovered job graphs; started once in {@code onStart} and then cleared. */
    private final Collection<JobGraph> recoveredJobs;
    private final DispatcherBootstrapFactory dispatcherBootstrapFactory;
    /** Holds archived graphs of finished jobs; used as a fallback when a queried job is not in {@code runningJobs}. */
    private final ArchivedExecutionGraphStore archivedExecutionGraphStore;
    private final JobManagerRunnerFactory jobManagerRunnerFactory;
    private final JobManagerMetricGroup jobManagerMetricGroup;
    private final HistoryServerArchivist historyServerArchivist;
    /** Executor for job manager runner creation, submission failure handling and job data cleanup. */
    private final Executor ioExecutor;
    /** Address reported by {@code requestMetricQueryServiceAddresses}; may be {@code null}. */
    @Nullable
    private final String metricServiceQueryAddress;
    /** Per-job futures that complete once a removed job has been closed and its data cleaned up. */
    private final Map<JobID, CompletableFuture<Void>> dispatcherJobTerminationFutures;
    /** Completed with the requested application status by {@code shutDownCluster}. */
    protected final CompletableFuture<ApplicationStatus> shutDownFuture;
    /** Assigned on the last line of {@code onStart}; {@code null} before that. */
    private DispatcherBootstrap dispatcherBootstrap;

    /**
     * Creates the dispatcher endpoint under a randomized name based on {@link #DISPATCHER_NAME}
     * and wires up all collaborators from the given services.
     *
     * @param rpcService RPC service the endpoint is registered with
     * @param fencingToken fencing token of this dispatcher
     * @param recoveredJobs job graphs to start in {@code onStart}; copied into a private set
     * @param dispatcherBootstrapFactory factory for the bootstrap created in {@code onStart}; must not be null
     * @param dispatcherServices source of all other collaborators; must not be null
     * @throws Exception declared by the signature (NOTE(review): presumably raised while building
     *     the shared job manager services - confirm)
     */
    public Dispatcher(RpcService rpcService, DispatcherId fencingToken, Collection<JobGraph> recoveredJobs, DispatcherBootstrapFactory dispatcherBootstrapFactory, DispatcherServices dispatcherServices) throws Exception {
        super(rpcService, AkkaRpcServiceUtils.createRandomName(DISPATCHER_NAME), fencingToken);
        Preconditions.checkNotNull(dispatcherServices);

        configuration = dispatcherServices.getConfiguration();
        highAvailabilityServices = dispatcherServices.getHighAvailabilityServices();
        resourceManagerGatewayRetriever = dispatcherServices.getResourceManagerGatewayRetriever();
        heartbeatServices = dispatcherServices.getHeartbeatServices();
        blobServer = dispatcherServices.getBlobServer();
        fatalErrorHandler = dispatcherServices.getFatalErrorHandler();
        jobGraphWriter = dispatcherServices.getJobGraphWriter();
        jobManagerMetricGroup = dispatcherServices.getJobManagerMetricGroup();
        metricServiceQueryAddress = dispatcherServices.getMetricQueryServiceAddress();
        ioExecutor = dispatcherServices.getIoExecutor();

        // Derived services: these depend on the collaborators assigned above.
        jobManagerSharedServices = JobManagerSharedServices.fromConfiguration(configuration, blobServer, fatalErrorHandler);
        runningJobsRegistry = highAvailabilityServices.getRunningJobsRegistry();

        runningJobs = new HashMap<>(16);
        historyServerArchivist = dispatcherServices.getHistoryServerArchivist();
        archivedExecutionGraphStore = dispatcherServices.getArchivedExecutionGraphStore();
        jobManagerRunnerFactory = dispatcherServices.getJobManagerRunnerFactory();
        dispatcherJobTerminationFutures = new HashMap<>(2);
        shutDownFuture = new CompletableFuture<>();

        this.dispatcherBootstrapFactory = Preconditions.checkNotNull(dispatcherBootstrapFactory);
        this.recoveredJobs = new HashSet<>(recoveredJobs);
    }

    /**
     * Returns the future that {@code shutDownCluster} completes with the requested application status.
     *
     * @return the shut-down future of this dispatcher
     */
    public CompletableFuture<ApplicationStatus> getShutDownFuture() {
        return shutDownFuture;
    }

    /**
     * Starts the dispatcher services, then the recovered jobs, and finally creates the bootstrap.
     *
     * <p>A failure while starting the services is wrapped in a {@link DispatcherException},
     * reported as fatal and rethrown; the later steps are skipped in that case.
     *
     * @throws Exception the wrapping {@link DispatcherException} if the services could not be
     *     started, or whatever creating the bootstrap throws
     */
    @Override
    public void onStart() throws Exception {
        try {
            startDispatcherServices();
        } catch (Throwable startupFailure) {
            final String message = String.format("Could not start the Dispatcher %s", getAddress());
            final DispatcherException wrapped = new DispatcherException(message, startupFailure);
            onFatalError(wrapped);
            throw wrapped;
        }

        startRecoveredJobs();
        dispatcherBootstrap = dispatcherBootstrapFactory.create(
                getSelfGateway(DispatcherGateway.class),
                getRpcService().getScheduledExecutor(),
                this::onFatalError);
    }

    /**
     * Registers the dispatcher metrics; on failure the already created services are stopped
     * before the original exception is rethrown.
     *
     * @throws Exception the registration failure
     */
    private void startDispatcherServices() throws Exception {
        try {
            registerDispatcherMetrics(jobManagerMetricGroup);
        } catch (Exception startFailure) {
            handleStartDispatcherServicesException(startFailure);
        }
    }

    /** Runs every recovered job graph once and then forgets the recovered set. */
    private void startRecoveredJobs() {
        recoveredJobs.forEach(this::runRecoveredJob);
        recoveredJobs.clear();
    }

    /**
     * Starts a single recovered job; any failure to do so is reported as a fatal error
     * instead of being thrown.
     *
     * @param recoveredJob the job graph to run; must not be null
     */
    private void runRecoveredJob(JobGraph recoveredJob) {
        Preconditions.checkNotNull(recoveredJob);
        try {
            runJob(recoveredJob, ExecutionType.RECOVERY);
        } catch (Throwable startFailure) {
            final String message = String.format("Could not start recovered job %s.", recoveredJob.getJobID());
            onFatalError(new DispatcherException(message, startFailure));
        }
    }

    /**
     * Stops the dispatcher services after a failed start and always rethrows the start failure.
     *
     * @param e the start failure; a failure while stopping is attached to it as suppressed
     * @throws Exception always {@code e}
     */
    private void handleStartDispatcherServicesException(Exception e) throws Exception {
        try {
            stopDispatcherServices();
        } catch (Exception stopFailure) {
            // Keep the original failure primary; the cleanup problem rides along.
            e.addSuppressed(stopFailure);
        }

        throw e;
    }

    /**
     * Terminates all running jobs and, once their termination futures are done, stops the
     * bootstrap (if one was created) and the dispatcher services.
     *
     * @return future that completes after the shutdown steps have run
     */
    @Override
    public CompletableFuture<Void> onStop() {
        log.info("Stopping dispatcher {}.", getAddress());
        final CompletableFuture<Void> allJobsTerminationFuture = terminateRunningJobsAndGetTerminationFuture();
        return FutureUtils.runAfterwards(allJobsTerminationFuture, () -> {
            // The bootstrap is only assigned on the last line of onStart(); if startup failed
            // earlier it is still null, and dereferencing it would skip the service shutdown.
            if (dispatcherBootstrap != null) {
                dispatcherBootstrap.stop();
            }
            stopDispatcherServices();
            log.info("Stopped dispatcher {}.", getAddress());
        });
    }

    /**
     * Shuts down the shared job manager services and closes the metric group.
     *
     * <p>The metric group is closed even if the shutdown fails; the shutdown failure is
     * rethrown afterwards.
     *
     * @throws Exception the failure of the shared services shutdown, if any
     */
    private void stopDispatcherServices() throws Exception {
        Exception shutdownFailure = null;

        try {
            jobManagerSharedServices.shutdown();
        } catch (Exception e) {
            shutdownFailure = e;
        }

        jobManagerMetricGroup.close();

        ExceptionUtils.tryRethrowException(shutdownFailure);
    }

    /**
     * Accepts a job submission unless the job is a duplicate or has only part of its vertices
     * configured with resources.
     *
     * @param jobGraph the job to submit
     * @param timeout RPC timeout (not used by this method)
     * @return acknowledgement future; completed exceptionally for rejected jobs or when the
     *     duplicate check itself fails
     */
    @Override
    public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
        final JobID jobId = jobGraph.getJobID();
        log.info("Received JobGraph submission {} ({}).", jobId, jobGraph.getName());

        final boolean duplicate;
        try {
            duplicate = isDuplicateJob(jobId);
        } catch (FlinkException e) {
            return FutureUtils.completedExceptionally(e);
        }

        if (duplicate) {
            return FutureUtils.completedExceptionally(new DuplicateJobSubmissionException(jobId));
        }
        if (isPartialResourceConfigured(jobGraph)) {
            return FutureUtils.completedExceptionally(new JobSubmissionException(jobId, "Currently jobs is not supported if parts of the vertices have resources configured. The limitation will be removed in future versions."));
        }
        return internalSubmitJob(jobGraph);
    }

    /**
     * Tells whether a job id was already handled: either the registry marks it as done or it
     * is currently tracked as running.
     *
     * @param jobId id of the job to check
     * @return {@code true} if the job must be treated as a duplicate
     * @throws FlinkException if the scheduling status cannot be read from the registry
     */
    private boolean isDuplicateJob(JobID jobId) throws FlinkException {
        final RunningJobsRegistry.JobSchedulingStatus status;
        try {
            status = runningJobsRegistry.getJobSchedulingStatus(jobId);
        } catch (IOException e) {
            throw new FlinkException(String.format("Failed to retrieve job scheduling status for job %s.", jobId), e);
        }

        if (status == RunningJobsRegistry.JobSchedulingStatus.DONE) {
            return true;
        }
        return runningJobs.containsKey(jobId);
    }

    /**
     * Checks whether the job mixes vertices whose minimum resources are
     * {@code ResourceSpec.UNKNOWN} with vertices that have other minimum resources.
     *
     * @param jobGraph the job whose vertices are inspected
     * @return {@code true} as soon as both kinds of vertices have been seen
     */
    private boolean isPartialResourceConfigured(JobGraph jobGraph) {
        boolean sawUnknown = false;
        boolean sawConfigured = false;

        for (JobVertex vertex : jobGraph.getVertices()) {
            final boolean unknown = vertex.getMinResources() == ResourceSpec.UNKNOWN;
            sawUnknown |= unknown;
            sawConfigured |= !unknown;

            if (sawUnknown && sawConfigured) {
                return true;
            }
        }

        return false;
    }

    /**
     * Persists and runs the job after any previous job with the same id has terminated.
     *
     * <p>On failure the job's data is cleaned up (including HA data), the error is logged and
     * the returned future fails with a {@link JobSubmissionException}. The result handling runs
     * on the I/O executor.
     *
     * @param jobGraph the job to submit
     * @return future completed with an acknowledgement once the job was persisted and started
     */
    private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) {
        final JobID jobId = jobGraph.getJobID();
        log.info("Submitting job {} ({}).", jobId, jobGraph.getName());

        final CompletableFuture<Acknowledge> persistAndRunFuture =
                waitForTerminatingJob(jobId, jobGraph, this::persistAndRunJob)
                        .thenApply(ignored -> Acknowledge.get());

        return persistAndRunFuture.handleAsync(
                (acknowledge, failure) -> {
                    if (failure == null) {
                        return acknowledge;
                    }

                    cleanUpJobData(jobId, true);
                    ClusterEntryPointExceptionUtils.tryEnrichClusterEntryPointError(failure);
                    final Throwable stripped = ExceptionUtils.stripCompletionException(failure);
                    log.error("Failed to submit job {}.", jobId, stripped);
                    throw new CompletionException(new JobSubmissionException(jobId, "Failed to submit job.", stripped));
                },
                ioExecutor);
    }

    /**
     * Writes the job graph to the job graph store and then starts the job as a fresh submission.
     *
     * @param jobGraph the job to persist and run
     * @throws Exception if the job graph cannot be persisted
     */
    private void persistAndRunJob(JobGraph jobGraph) throws Exception {
        jobGraphWriter.putJobGraph(jobGraph);
        runJob(jobGraph, ExecutionType.SUBMISSION);
    }

    /**
     * Creates the job manager runner for the given job, tracks the resulting
     * {@code DispatcherJob} in {@code runningJobs} and registers a termination future that
     * covers result handling, removal and cleanup of the job.
     *
     * @param jobGraph the job to run; its id must not already be in {@code runningJobs}
     * @param executionType whether the job is a new submission or a recovered job
     */
    private void runJob(JobGraph jobGraph, ExecutionType executionType) {
        // A job id may only be tracked once at a time.
        Preconditions.checkState((!this.runningJobs.containsKey(jobGraph.getJobID()) ? 1 : 0) != 0);
        long initializationTimestamp = System.currentTimeMillis();
        CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = this.createJobManagerRunner(jobGraph, initializationTimestamp);
        DispatcherJob dispatcherJob = DispatcherJob.createFor(jobManagerRunnerFuture, jobGraph.getJobID(), jobGraph.getName(), initializationTimestamp);
        this.runningJobs.put(jobGraph.getJobID(), dispatcherJob);
        JobID jobId = jobGraph.getJobID();
        // Evaluate the job's outcome on the main thread executor and decide how much to clean up.
        CompletionStage cleanupJobStateFuture = dispatcherJob.getResultFuture().handleAsync((dispatcherJobResult, throwable) -> {
            Preconditions.checkState((this.runningJobs.get(jobId) == dispatcherJob ? 1 : 0) != 0, (Object)"The job entry in runningJobs must be bound to the lifetime of the DispatcherJob.");
            if (dispatcherJobResult != null) {
                return this.handleDispatcherJobResult(jobId, (DispatcherJobResult)dispatcherJobResult, executionType);
            }
            return this.dispatcherJobFailed(jobId, (Throwable)throwable);
        }, (Executor)this.getMainThreadExecutor());
        // Remove the job and flatten the nested future that removeJob returns.
        CompletionStage jobTerminationFuture = ((CompletableFuture)((CompletableFuture)cleanupJobStateFuture).thenApply(cleanupJobState -> this.removeJob(jobId, (CleanupJobState)((Object)cleanupJobState)))).thenCompose(Function.identity());
        FutureUtils.assertNoException(jobTerminationFuture);
        this.registerDispatcherJobTerminationFuture(jobId, (CompletableFuture<Void>)jobTerminationFuture);
    }

    /**
     * Maps a finished job's result to the amount of cleanup to perform.
     *
     * <p>An initialization failure of a recovered job is handled as a failed job; every other
     * result is handled as a job that reached a terminal state.
     *
     * @param jobId id of the finished job
     * @param dispatcherJobResult result produced by the dispatcher job
     * @param executionType how the job was started
     * @return the cleanup scope for the job
     */
    private CleanupJobState handleDispatcherJobResult(JobID jobId, DispatcherJobResult dispatcherJobResult, ExecutionType executionType) {
        final boolean failedWhileRecovering =
                dispatcherJobResult.isInitializationFailure() && executionType == ExecutionType.RECOVERY;

        if (!failedWhileRecovering) {
            return jobReachedTerminalState(dispatcherJobResult.getArchivedExecutionGraph());
        }
        return dispatcherJobFailed(jobId, dispatcherJobResult.getInitializationFailure());
    }

    /**
     * Reports the failure of a job's job master and requests local-only cleanup.
     *
     * @param jobId id of the failed job
     * @param throwable cause of the failure
     * @return always {@link CleanupJobState#LOCAL}
     */
    private CleanupJobState dispatcherJobFailed(JobID jobId, Throwable throwable) {
        jobMasterFailed(jobId, throwable);
        return CleanupJobState.LOCAL;
    }

    /**
     * Creates and starts the job manager runner for a job on the I/O executor.
     *
     * @param jobGraph the job to create the runner for
     * @param initializationTimestamp timestamp handed to the runner factory
     * @return future with the started runner; fails with a {@link JobInitializationException}
     *     (wrapped in a {@link CompletionException}) if creation or start throws
     */
    CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph, long initializationTimestamp) {
        // Resolved on the calling thread, before the work moves to the I/O executor.
        final RpcService endpointRpcService = getRpcService();

        return CompletableFuture.supplyAsync(
                () -> {
                    try {
                        final JobManagerRunner startedRunner = jobManagerRunnerFactory.createJobManagerRunner(
                                jobGraph,
                                configuration,
                                endpointRpcService,
                                highAvailabilityServices,
                                heartbeatServices,
                                jobManagerSharedServices,
                                new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
                                fatalErrorHandler,
                                initializationTimestamp);
                        startedRunner.start();
                        return startedRunner;
                    } catch (Exception cause) {
                        throw new CompletionException(new JobInitializationException(jobGraph.getJobID(), "Could not instantiate JobManager.", cause));
                    }
                },
                ioExecutor);
    }

    /**
     * Lists the ids of all jobs currently tracked as running.
     *
     * @param timeout RPC timeout (not used by this method)
     * @return completed future holding an unmodifiable snapshot of the job ids
     */
    @Override
    public CompletableFuture<Collection<JobID>> listJobs(Time timeout) {
        final Collection<JobID> snapshot = Collections.unmodifiableSet(new HashSet<>(runningJobs.keySet()));
        return CompletableFuture.completedFuture(snapshot);
    }

    /**
     * Disposes the savepoint at the given path on the shared scheduled executor.
     *
     * @param savepointPath path of the savepoint to dispose
     * @param timeout RPC timeout (not used by this method)
     * @return acknowledgement future; fails with a {@link FlinkException} (wrapped in a
     *     {@link CompletionException}) if disposal throws
     */
    @Override
    public CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath, Time timeout) {
        // Capture the caller's context class loader before switching threads.
        final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();

        return CompletableFuture.supplyAsync(
                () -> {
                    log.info("Disposing savepoint {}.", savepointPath);
                    try {
                        Checkpoints.disposeSavepoint(savepointPath, configuration, contextClassLoader, log);
                    } catch (IOException | FlinkException cause) {
                        throw new CompletionException(new FlinkException(String.format("Could not dispose savepoint %s.", savepointPath), cause));
                    }
                    return Acknowledge.get();
                },
                jobManagerSharedServices.getScheduledExecutorService());
    }

    /**
     * Cancels a running job.
     *
     * @param jobId id of the job to cancel
     * @param timeout timeout passed on to the job's cancel call
     * @return the job's cancel future, or a future failed with
     *     {@link FlinkJobNotFoundException} if the job is not tracked as running
     */
    @Override
    public CompletableFuture<Acknowledge> cancelJob(JobID jobId, Time timeout) {
        return getDispatcherJob(jobId)
                .map(runningJob -> runningJob.cancel(timeout))
                .orElseGet(
                        () -> {
                            log.debug("Dispatcher is unable to cancel job {}: not found", jobId);
                            return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
                        });
    }

    /**
     * Builds a cluster overview from the resource manager's resource overview, the statuses of
     * the running jobs and the stored overview of completed jobs.
     *
     * @param timeout timeout passed to the resource manager and job status requests
     * @return future with the combined overview
     */
    @Override
    public CompletableFuture<ClusterOverview> requestClusterOverview(Time timeout) {
        CompletableFuture taskManagerOverviewFuture = this.runResourceManagerCommand(resourceManagerGateway -> resourceManagerGateway.requestResourceOverview(timeout));
        // One status query per running job; a failed query yields an empty Optional.
        List optionalJobInformation = this.queryJobMastersForInformation(dispatcherJob -> dispatcherJob.requestJobStatus(timeout));
        FutureUtils.ConjunctFuture allOptionalJobsFuture = FutureUtils.combineAll(optionalJobInformation);
        CompletionStage allJobsFuture = allOptionalJobsFuture.thenApply(this::flattenOptionalCollection);
        // Read now, on the calling thread, not when the futures complete.
        JobsOverview completedJobsOverview = this.archivedExecutionGraphStore.getStoredJobsOverview();
        return ((CompletableFuture)allJobsFuture).thenCombine((CompletionStage)taskManagerOverviewFuture, (runningJobsStatus, resourceOverview) -> {
            JobsOverview allJobsOverview = JobsOverview.create(runningJobsStatus).combine(completedJobsOverview);
            return new ClusterOverview((ResourceOverview)resourceOverview, allJobsOverview);
        });
    }

    /**
     * Collects the details of all running jobs and appends the details of the completed jobs
     * held by the archived execution graph store.
     *
     * @param timeout timeout passed to each job details request
     * @return future with the running jobs' details followed by the completed jobs' details
     */
    @Override
    public CompletableFuture<MultipleJobsDetails> requestMultipleJobDetails(Time timeout) {
        // One details query per running job; a failed query yields an empty Optional.
        List individualOptionalJobDetails = this.queryJobMastersForInformation(dj -> dj.requestJobDetails(timeout));
        FutureUtils.ConjunctFuture optionalCombinedJobDetails = FutureUtils.combineAll(individualOptionalJobDetails);
        CompletionStage combinedJobDetails = optionalCombinedJobDetails.thenApply(this::flattenOptionalCollection);
        // Read now, on the calling thread, not when the futures complete.
        Collection<JobDetails> completedJobDetails = this.archivedExecutionGraphStore.getAvailableJobDetails();
        return ((CompletableFuture)combinedJobDetails).thenApply(runningJobDetails -> {
            ArrayList<JobDetails> allJobDetails = new ArrayList<JobDetails>(completedJobDetails.size() + runningJobDetails.size());
            allJobDetails.addAll((Collection<JobDetails>)runningJobDetails);
            allJobDetails.addAll(completedJobDetails);
            return new MultipleJobsDetails(allJobDetails);
        });
    }

    /**
     * Returns the status of a job, asking the running job first and falling back to the
     * archived job details.
     *
     * @param jobId id of the job
     * @param timeout timeout passed to the running job's status request
     * @return future with the status, or failed with {@link FlinkJobNotFoundException} if the
     *     job is neither running nor archived
     */
    @Override
    public CompletableFuture<JobStatus> requestJobStatus(JobID jobId, Time timeout) {
        return getDispatcherJob(jobId)
                .map(runningJob -> runningJob.requestJobStatus(timeout))
                .orElseGet(
                        () -> {
                            final JobDetails archivedDetails = archivedExecutionGraphStore.getAvailableJobDetails(jobId);
                            if (archivedDetails != null) {
                                return CompletableFuture.completedFuture(archivedDetails.getStatus());
                            }
                            return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
                        });
    }

    /**
     * Forwards a back pressure stats request to the job master of the given job.
     *
     * @param jobId id of the job
     * @param jobVertexId id of the vertex to get stats for
     * @return the job master's response future, or the failure from resolving its gateway
     */
    @Override
    public CompletableFuture<OperatorBackPressureStatsResponse> requestOperatorBackPressureStats(JobID jobId, JobVertexID jobVertexId) {
        return performOperationOnJobMasterGateway(
                jobId,
                jobMaster -> jobMaster.requestOperatorBackPressureStats(jobVertexId));
    }

    /**
     * Returns the execution graph of a job, asking the running job first and falling back to
     * the archived execution graph store if that request fails or the job is not running.
     *
     * @param jobId id of the job
     * @param timeout timeout passed to the running job's request
     * @return future with the archived execution graph; fails with the original error (for
     *     example {@link FlinkJobNotFoundException}) if the store has no graph for the job
     */
    @Override
    public CompletableFuture<ArchivedExecutionGraph> requestJob(JobID jobId, Time timeout) {
        Function<Throwable, ArchivedExecutionGraph> checkExecutionGraphStoreOnException = throwable -> {
            ArchivedExecutionGraph archivedExecutionGraph = archivedExecutionGraphStore.get(jobId);
            if (archivedExecutionGraph == null) {
                throw new CompletionException(ExceptionUtils.stripCompletionException(throwable));
            }
            return archivedExecutionGraph;
        };

        // orElseGet keeps the not-found exception (and its stack trace) from being built on
        // every call for a running job, matching cancelJob and requestJobStatus.
        return getDispatcherJob(jobId)
                .map(job -> job.requestJob(timeout))
                .orElseGet(() -> FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId)))
                .exceptionally(checkExecutionGraphStoreOnException);
    }

    /**
     * Returns the result of a job: derived from the running job's result future, or from the
     * archived execution graph if the job is no longer running.
     *
     * @param jobId id of the job
     * @param timeout RPC timeout (not used by this method)
     * @return future with the job result, or failed with {@link FlinkJobNotFoundException} if
     *     the job is neither running nor archived
     */
    @Override
    public CompletableFuture<JobResult> requestJobResult(JobID jobId, Time timeout) {
        final DispatcherJob runningJob = runningJobs.get(jobId);
        if (runningJob != null) {
            return runningJob.getResultFuture()
                    .thenApply(result -> JobResult.createFrom(result.getArchivedExecutionGraph()));
        }

        final ArchivedExecutionGraph archivedGraph = archivedExecutionGraphStore.get(jobId);
        if (archivedGraph != null) {
            return CompletableFuture.completedFuture(JobResult.createFrom(archivedGraph));
        }
        return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
    }

    /**
     * Reports the metric query service address known to this dispatcher.
     *
     * @param timeout RPC timeout (not used by this method)
     * @return completed future with a singleton holding the address, or an empty collection
     *     if no address is set
     */
    @Override
    public CompletableFuture<Collection<String>> requestMetricQueryServiceAddresses(Time timeout) {
        final Collection<String> addresses;
        if (metricServiceQueryAddress == null) {
            addresses = Collections.emptyList();
        } else {
            addresses = Collections.singleton(metricServiceQueryAddress);
        }
        return CompletableFuture.completedFuture(addresses);
    }

    /**
     * Asks the resource manager for the metric query service addresses of the task managers.
     *
     * @param timeout timeout passed to the resource manager request
     * @return the resource manager's response future
     */
    @Override
    public CompletableFuture<Collection<Tuple2<ResourceID, String>>> requestTaskManagerMetricQueryServiceAddresses(Time timeout) {
        return runResourceManagerCommand(
                resourceManager -> resourceManager.requestTaskManagerMetricQueryServiceAddresses(timeout));
    }

    /**
     * Reports the port of the blob server.
     *
     * @param timeout RPC timeout (not used by this method)
     * @return completed future with the port
     */
    @Override
    public CompletableFuture<Integer> getBlobServerPort(Time timeout) {
        final int port = blobServer.getPort();
        return CompletableFuture.completedFuture(port);
    }

    /**
     * Forwards a savepoint request to the job master of the given job.
     *
     * @param jobId id of the job
     * @param targetDirectory directory argument passed through to the job master
     * @param cancelJob cancel flag passed through to the job master
     * @param timeout timeout passed through to the job master
     * @return the job master's response future, or the failure from resolving its gateway
     */
    @Override
    public CompletableFuture<String> triggerSavepoint(JobID jobId, String targetDirectory, boolean cancelJob, Time timeout) {
        return performOperationOnJobMasterGateway(
                jobId,
                jobMaster -> jobMaster.triggerSavepoint(targetDirectory, cancelJob, timeout));
    }

    /**
     * Forwards a stop-with-savepoint request to the job master of the given job.
     *
     * @param jobId id of the job
     * @param targetDirectory directory argument passed through to the job master
     * @param terminate terminate flag passed through to the job master
     * @param timeout timeout passed through to the job master
     * @return the job master's response future, or the failure from resolving its gateway
     */
    @Override
    public CompletableFuture<String> stopWithSavepoint(JobID jobId, String targetDirectory, boolean terminate, Time timeout) {
        return performOperationOnJobMasterGateway(
                jobId,
                jobMaster -> jobMaster.stopWithSavepoint(targetDirectory, terminate, timeout));
    }

    /**
     * Shuts the cluster down with application status {@code SUCCEEDED}.
     *
     * @return acknowledgement future of the status-taking overload
     */
    @Override
    public CompletableFuture<Acknowledge> shutDownCluster() {
        final ApplicationStatus defaultStatus = ApplicationStatus.SUCCEEDED;
        return shutDownCluster(defaultStatus);
    }

    /**
     * Completes the shut-down future with the given status and acknowledges immediately.
     *
     * @param applicationStatus status to complete the shut-down future with
     * @return an already completed acknowledgement future
     */
    @Override
    public CompletableFuture<Acknowledge> shutDownCluster(ApplicationStatus applicationStatus) {
        shutDownFuture.complete(applicationStatus);

        final Acknowledge ack = Acknowledge.get();
        return CompletableFuture.completedFuture(ack);
    }

    /**
     * Forwards a coordination request to the job master of the given job.
     *
     * @param jobId id of the job
     * @param operatorId operator id passed through to the job master
     * @param serializedRequest the serialized request passed through to the job master
     * @param timeout timeout passed through to the job master
     * @return the job master's response future, or the failure from resolving its gateway
     */
    @Override
    public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(JobID jobId, OperatorID operatorId, SerializedValue<CoordinationRequest> serializedRequest, Time timeout) {
        return performOperationOnJobMasterGateway(
                jobId,
                jobMaster -> jobMaster.deliverCoordinationRequestToCoordinator(operatorId, serializedRequest, timeout));
    }

    /**
     * Registers a job's termination future and schedules its removal from the registry (on the
     * main thread executor) once it completes normally.
     *
     * @param jobId id of the job; must not already have a registered termination future
     * @param dispatcherJobTerminationFuture the future to register
     */
    private void registerDispatcherJobTerminationFuture(JobID jobId, CompletableFuture<Void> dispatcherJobTerminationFuture) {
        Preconditions.checkState(!dispatcherJobTerminationFutures.containsKey(jobId));
        dispatcherJobTerminationFutures.put(jobId, dispatcherJobTerminationFuture);

        dispatcherJobTerminationFuture.thenRunAsync(
                () -> {
                    final CompletableFuture<Void> registered = dispatcherJobTerminationFutures.remove(jobId);
                    // If a different future was registered for the id in the meantime, put it back.
                    final boolean replacedMeanwhile = registered != null && registered != dispatcherJobTerminationFuture;
                    if (replacedMeanwhile) {
                        dispatcherJobTerminationFutures.put(jobId, registered);
                    }
                },
                getMainThreadExecutor());
    }

    /**
     * Stops tracking a job, closes it and then cleans up its data on the I/O executor.
     *
     * @param jobId id of the job; must be present in {@code runningJobs}
     * @param cleanupJobState decides whether HA data is cleaned up as well
     * @return future that completes after the job is closed and its data cleaned up
     */
    private CompletableFuture<Void> removeJob(JobID jobId, CleanupJobState cleanupJobState) {
        final DispatcherJob removedJob = Preconditions.checkNotNull(runningJobs.remove(jobId));
        return removedJob
                .closeAsync()
                .thenRunAsync(() -> cleanUpJobData(jobId, cleanupJobState.cleanupHAData), ioExecutor);
    }

    /**
     * Best-effort cleanup of everything stored for a job; individual failures are logged as
     * warnings and do not stop the remaining steps.
     *
     * <p>With {@code cleanupHA} the job graph is removed from the store, the job is cleared from
     * the running jobs registry and, only if the graph removal succeeded, the HA services' job
     * data is cleaned. Without it the job graph is merely released. The blob server cleanup
     * always runs and is told whether the job graph was removed.
     *
     * @param jobId id of the job to clean up
     * @param cleanupHA whether HA data should be removed as well
     */
    private void cleanUpJobData(JobID jobId, boolean cleanupHA) {
        jobManagerMetricGroup.removeJob(jobId);

        boolean graphDeleted = false;
        if (!cleanupHA) {
            try {
                jobGraphWriter.releaseJobGraph(jobId);
            } catch (Exception e) {
                log.warn("Could not properly release job {} from submitted job graph store.", jobId, e);
            }
        } else {
            try {
                jobGraphWriter.removeJobGraph(jobId);
                graphDeleted = true;
            } catch (Exception e) {
                log.warn("Could not properly remove job {} from submitted job graph store.", jobId, e);
            }

            try {
                runningJobsRegistry.clearJob(jobId);
            } catch (IOException e) {
                log.warn("Could not properly remove job {} from the running jobs registry.", jobId, e);
            }

            if (graphDeleted) {
                try {
                    highAvailabilityServices.cleanupJobData(jobId);
                } catch (Exception e) {
                    log.warn("Could not properly clean data for job {} stored by ha services", jobId, e);
                }
            }
        }

        blobServer.cleanupJob(jobId, graphDeleted);
    }

    /** Requests termination of every job currently tracked as running. */
    private void terminateRunningJobs() {
        log.info("Stopping all currently running jobs of dispatcher {}.", getAddress());

        // Iterate over a copy of the key set rather than the live view of the map.
        final Collection<JobID> snapshot = new HashSet<>(runningJobs.keySet());
        snapshot.forEach(this::terminateJob);
    }

    /**
     * Triggers the asynchronous close of a running job; does nothing if the job is not tracked.
     * The job's entry in {@code runningJobs} is not removed here.
     *
     * @param jobId id of the job to terminate
     */
    private void terminateJob(JobID jobId) {
        getDispatcherJob(jobId).ifPresent(DispatcherJob::closeAsync);
    }

    /**
     * Requests termination of all running jobs and combines the registered termination futures.
     *
     * @return future returned by {@code FutureUtils.completeAll} over all registered termination futures
     */
    private CompletableFuture<Void> terminateRunningJobsAndGetTerminationFuture() {
        terminateRunningJobs();
        return FutureUtils.completeAll(dispatcherJobTerminationFutures.values());
    }

    /**
     * Hands an unrecoverable error to the configured fatal error handler.
     *
     * @param throwable the error to report
     */
    protected void onFatalError(Throwable throwable) {
        fatalErrorHandler.onFatalError(throwable);
    }

    /**
     * Archives the execution graph of a job that reached a terminal state and decides how much
     * of its data to clean up.
     *
     * @param archivedExecutionGraph graph of the finished job; its state must be terminal
     * @return {@link CleanupJobState#GLOBAL} for a globally terminal state, otherwise
     *     {@link CleanupJobState#LOCAL}
     */
    protected CleanupJobState jobReachedTerminalState(ArchivedExecutionGraph archivedExecutionGraph) {
        final JobStatus terminalState = archivedExecutionGraph.getState();
        final JobID jobId = archivedExecutionGraph.getJobID();

        Preconditions.checkArgument(terminalState.isTerminalState(), "Job %s is in state %s which is not terminal.", jobId, terminalState);
        log.info("Job {} reached terminal state {}.", jobId, terminalState);

        archiveExecutionGraph(archivedExecutionGraph);

        if (archivedExecutionGraph.getState().isGloballyTerminalState()) {
            return CleanupJobState.GLOBAL;
        }
        return CleanupJobState.LOCAL;
    }

    /**
     * Stores the graph in the archived execution graph store and sends it to the history server
     * archivist. Failures of either step are only logged.
     *
     * @param archivedExecutionGraph graph of the finished job
     */
    private void archiveExecutionGraph(ArchivedExecutionGraph archivedExecutionGraph) {
        try {
            archivedExecutionGraphStore.put(archivedExecutionGraph);
        } catch (IOException storeFailure) {
            log.info("Could not store completed job {}({}).", archivedExecutionGraph.getJobName(), archivedExecutionGraph.getJobID(), storeFailure);
        }

        historyServerArchivist
                .archiveExecutionGraph(archivedExecutionGraph)
                .whenComplete(
                        (ignored, archiveFailure) -> {
                            if (archiveFailure != null) {
                                log.info("Could not archive completed job {}({}) to the history server.", archivedExecutionGraph.getJobName(), archivedExecutionGraph.getJobID(), archiveFailure);
                            }
                        });
    }

    /**
     * Reports a failed job master as a fatal error.
     *
     * @param jobId id of the job whose job master failed
     * @param cause the failure
     */
    private void jobMasterFailed(JobID jobId, Throwable cause) {
        final String message = String.format("JobMaster for job %s failed.", jobId);
        onFatalError(new FlinkException(message, cause));
    }

    /**
     * Resolves the job master gateway of a running, initialized job.
     *
     * @param jobId id of the job
     * @return the gateway future; failed with {@link FlinkJobNotFoundException} if the job is
     *     not running, or with {@link UnavailableDispatcherOperationException} if it is not
     *     initialized yet
     */
    private CompletableFuture<JobMasterGateway> getJobMasterGateway(JobID jobId) {
        final DispatcherJob runningJob = runningJobs.get(jobId);
        if (runningJob == null) {
            return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
        }

        if (runningJob.isInitialized()) {
            return runningJob.getJobMasterGateway();
        }
        return FutureUtils.completedExceptionally(new UnavailableDispatcherOperationException("Unable to get JobMasterGateway for initializing job. The requested operation is not available while the JobManager is initializing."));
    }

    /**
     * Resolves the job master gateway of a job and applies an asynchronous operation to it.
     *
     * @param jobId id of the job
     * @param operation operation to run against the gateway
     * @param <T> result type of the operation
     * @return the operation's future, or the failure from resolving the gateway
     */
    private <T> CompletableFuture<T> performOperationOnJobMasterGateway(JobID jobId, Function<JobMasterGateway, CompletableFuture<T>> operation) {
        final CompletableFuture<JobMasterGateway> gatewayFuture = getJobMasterGateway(jobId);
        return gatewayFuture.thenCompose(operation);
    }

    /**
     * Returns the resource manager gateway future provided by the gateway retriever.
     *
     * @return future of the resource manager gateway
     */
    private CompletableFuture<ResourceManagerGateway> getResourceManagerGateway() {
        return resourceManagerGatewayRetriever.getFuture();
    }

    /**
     * Looks up a job among the jobs tracked as running.
     *
     * @param jobId id of the job
     * @return the running job, or empty if it is not tracked
     */
    private Optional<DispatcherJob> getDispatcherJob(JobID jobId) {
        final DispatcherJob tracked = runningJobs.get(jobId);
        return Optional.ofNullable(tracked);
    }

    /**
     * Runs an asynchronous command against the resource manager gateway once it is available.
     *
     * @param resourceManagerCommand command to apply to the gateway
     * @param <T> result type of the command
     * @return the command's future, or the failure from obtaining the gateway
     */
    private <T> CompletableFuture<T> runResourceManagerCommand(Function<ResourceManagerGateway, CompletableFuture<T>> resourceManagerCommand) {
        // thenCompose applies the command and flattens its future in a single step.
        return getResourceManagerGateway().thenCompose(resourceManagerCommand);
    }

    /**
     * Unwraps the non-empty optionals of a collection, keeping iteration order.
     *
     * @param optionalCollection optionals to unwrap
     * @param <T> element type
     * @return list of the values that were present
     */
    private <T> List<T> flattenOptionalCollection(Collection<Optional<T>> optionalCollection) {
        final List<T> presentValues = new ArrayList<>();
        for (Optional<T> candidate : optionalCollection) {
            if (candidate.isPresent()) {
                presentValues.add(candidate.get());
            }
        }
        return presentValues;
    }

    /**
     * Runs a query against every running job. Each query's outcome is converted to an optional,
     * so a failed query (or a null result) becomes an empty optional instead of a failed future.
     *
     * @param queryFunction query to apply to each running job
     * @param <T> result type of the query
     * @return one future per running job
     */
    @Nonnull
    private <T> List<CompletableFuture<Optional<T>>> queryJobMastersForInformation(Function<DispatcherJob, CompletableFuture<T>> queryFunction) {
        final List<CompletableFuture<Optional<T>>> answers = new ArrayList<>(runningJobs.size());

        for (DispatcherJob runningJob : runningJobs.values()) {
            final CompletableFuture<Optional<T>> answer =
                    queryFunction.apply(runningJob).handle((T value, Throwable ignoredFailure) -> Optional.ofNullable(value));
            answers.add(answer);
        }

        return answers;
    }

    /**
     * Runs an action on the main thread executor once any previous job with the same id has
     * terminated.
     *
     * @param jobId id of the job to wait for
     * @param jobGraph job graph handed to the action
     * @param action action to run after termination; may throw
     * @return future that completes after the action ran; fails with a
     *     {@code DispatcherException} (wrapped in a {@code CompletionException}) if the
     *     previous termination failed
     */
    private CompletableFuture<Void> waitForTerminatingJob(JobID jobId, JobGraph jobGraph, ThrowingConsumer<JobGraph, ?> action) {
        // Re-wrap a failed termination so the error names the job id.
        CompletionStage jobManagerTerminationFuture = this.getJobTerminationFuture(jobId).exceptionally(throwable -> {
            throw new CompletionException((Throwable)((Object)new DispatcherException(String.format("Termination of previous JobManager for job %s failed. Cannot submit job under the same job id.", jobId), (Throwable)throwable)));
        });
        return ((CompletableFuture)jobManagerTerminationFuture).thenAcceptAsync(FunctionUtils.uncheckedConsumer(ignored -> {
            // Drop the finished termination future before running the action.
            this.dispatcherJobTerminationFutures.remove(jobId);
            action.accept((Object)jobGraph);
        }), (Executor)this.getMainThreadExecutor());
    }

    /**
     * Returns the termination future of a job that is no longer running.
     *
     * @param jobId id of the job
     * @return the registered termination future, an already completed future if none is
     *     registered, or a future failed with {@link DispatcherException} if the job is still
     *     running
     */
    CompletableFuture<Void> getJobTerminationFuture(JobID jobId) {
        if (!runningJobs.containsKey(jobId)) {
            return dispatcherJobTerminationFutures.getOrDefault(jobId, CompletableFuture.completedFuture(null));
        }

        final String message = String.format("Job with job id %s is still running.", jobId);
        return FutureUtils.completedExceptionally(new DispatcherException(message));
    }

    /**
     * Registers the {@code numRunningJobs} gauge, which reports the current size of
     * {@code runningJobs}.
     *
     * @param jobManagerMetricGroup metric group to register the gauge on
     */
    private void registerDispatcherMetrics(MetricGroup jobManagerMetricGroup) {
        jobManagerMetricGroup.gauge(
                "numRunningJobs",
                () -> (long) runningJobs.size());
    }

    /**
     * Terminates the given job on the main thread executor.
     *
     * @param jobId id of the job whose job graph was removed
     * @return future that completes once the termination request has run
     */
    public CompletableFuture<Void> onRemovedJobGraph(JobID jobId) {
        final Runnable terminate = () -> terminateJob(jobId);
        return CompletableFuture.runAsync(terminate, getMainThreadExecutor());
    }

    /** Scope of the cleanup performed for a job once it is removed from the dispatcher. */
    static enum CleanupJobState {
        /** Clean up without removing HA data. */
        LOCAL(false),
        /** Clean up including HA data. */
        GLOBAL(true);

        /** Passed to {@code cleanUpJobData} as its {@code cleanupHA} argument. */
        final boolean cleanupHAData;

        private CleanupJobState(boolean cleanupHAData) {
            this.cleanupHAData = cleanupHAData;
        }
    }

    /** How a job was handed to {@code runJob}. */
    protected static enum ExecutionType {
        // Used by persistAndRunJob, i.e. for jobs arriving through submitJob.
        SUBMISSION,
        // Used by runRecoveredJob; an initialization failure of such a job is treated as a job master failure.
        RECOVERY;

    }
}

