/*
 * Decompiled with CFR 0.152.
 */
package tech.powerjob.server.core.scheduler;

import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.enums.SwitchableStatus;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.enums.WorkflowInstanceStatus;
import tech.powerjob.server.common.Holder;
import tech.powerjob.server.core.DispatchService;
import tech.powerjob.server.core.instance.InstanceManager;
import tech.powerjob.server.core.workflow.WorkflowInstanceManager;
import tech.powerjob.server.persistence.remote.model.InstanceInfoDO;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.persistence.remote.model.WorkflowInfoDO;
import tech.powerjob.server.persistence.remote.model.WorkflowInstanceInfoDO;
import tech.powerjob.server.persistence.remote.model.brief.BriefInstanceInfo;
import tech.powerjob.server.persistence.remote.repository.AppInfoRepository;
import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository;
import tech.powerjob.server.persistence.remote.repository.JobInfoRepository;
import tech.powerjob.server.persistence.remote.repository.WorkflowInfoRepository;
import tech.powerjob.server.persistence.remote.repository.WorkflowInstanceInfoRepository;
import tech.powerjob.server.remote.transporter.TransportService;

/**
 * Status checker for job instances and workflow instances that belong to the apps
 * assigned to the current server.
 *
 * <p>Each public {@code check*} method looks up the app ids bound to this server's address,
 * processes them in groups of {@link #MAX_BATCH_NUM_APP}, and tries to recover records that have
 * stayed in one state longer than the matching timeout: instances in {@code WAITING_DISPATCH},
 * {@code WAITING_WORKER_RECEIVE} or {@code RUNNING}, and workflow instances in {@code WAITING}.
 * Exceptions raised while processing are logged and never propagate to the caller.
 */
@Service
public class InstanceStatusCheckService {
    private static final Logger log = LoggerFactory.getLogger(InstanceStatusCheckService.class);

    /** Number of app ids handled together in one group of repository queries. */
    private static final int MAX_BATCH_NUM_APP = 10;
    /** Page size used when querying instances that look stuck. */
    private static final int MAX_BATCH_NUM_INSTANCE = 3000;
    /** Maximum number of instance ids handed to one batch redispatch call. */
    private static final int MAX_BATCH_UPDATE_NUM = 500;
    /** An instance still WAITING_DISPATCH this long after its expected trigger time is re-dispatched. */
    private static final long DISPATCH_TIMEOUT_MS = 30000L;
    /** An instance still WAITING_WORKER_RECEIVE this long after its actual trigger time is re-dispatched. */
    private static final long RECEIVE_TIMEOUT_MS = 60000L;
    /** A RUNNING instance whose record has not been modified for this long is treated as timed out. */
    private static final long RUNNING_TIMEOUT_MS = 60000L;
    /** A workflow instance still WAITING this long after its expected trigger time is restarted. */
    private static final long WORKFLOW_WAITING_TIMEOUT_MS = 60000L;
    /**
     * Not referenced inside this class.
     * NOTE(review): presumably the period (ms) at which an external scheduler invokes the checks - confirm with callers.
     */
    public static final long CHECK_INTERVAL = 10000L;

    /** Result text stored on an instance that is failed because its RUNNING record went stale. */
    private static final String REPORT_TIMEOUT_RESULT = "worker report timeout, maybe TaskTracker down";

    private final TransportService transportService;
    private final DispatchService dispatchService;
    private final InstanceManager instanceManager;
    private final WorkflowInstanceManager workflowInstanceManager;
    private final AppInfoRepository appInfoRepository;
    private final JobInfoRepository jobInfoRepository;
    private final InstanceInfoRepository instanceInfoRepository;
    private final WorkflowInfoRepository workflowInfoRepository;
    private final WorkflowInstanceInfoRepository workflowInstanceInfoRepository;

    public InstanceStatusCheckService(TransportService transportService, DispatchService dispatchService, InstanceManager instanceManager, WorkflowInstanceManager workflowInstanceManager, AppInfoRepository appInfoRepository, JobInfoRepository jobInfoRepository, InstanceInfoRepository instanceInfoRepository, WorkflowInfoRepository workflowInfoRepository, WorkflowInstanceInfoRepository workflowInstanceInfoRepository) {
        this.transportService = transportService;
        this.dispatchService = dispatchService;
        this.instanceManager = instanceManager;
        this.workflowInstanceManager = workflowInstanceManager;
        this.appInfoRepository = appInfoRepository;
        this.jobInfoRepository = jobInfoRepository;
        this.instanceInfoRepository = instanceInfoRepository;
        this.workflowInfoRepository = workflowInfoRepository;
        this.workflowInstanceInfoRepository = workflowInstanceInfoRepository;
    }

    /**
     * Restarts workflow instances that are still WAITING after
     * {@link #WORKFLOW_WAITING_TIMEOUT_MS} past their expected trigger time.
     * Failures are logged, not thrown.
     */
    public void checkWorkflowInstance() {
        Stopwatch stopwatch = Stopwatch.createStarted();
        List<Long> allAppIds = loadAppIdsToCheck();
        if (allAppIds.isEmpty()) {
            return;
        }
        try {
            checkWorkflowInstance(allAppIds);
        } catch (Exception e) {
            log.error("[InstanceStatusChecker] WorkflowInstance status check failed.", e);
        }
        log.info("[InstanceStatusChecker] WorkflowInstance status check using {}.", stopwatch.stop());
    }

    /**
     * Re-dispatches instances that are still WAITING_DISPATCH after
     * {@link #DISPATCH_TIMEOUT_MS} past their expected trigger time.
     * Failures are logged, not thrown.
     */
    public void checkWaitingDispatchInstance() {
        Stopwatch stopwatch = Stopwatch.createStarted();
        List<Long> allAppIds = loadAppIdsToCheck();
        if (allAppIds.isEmpty()) {
            return;
        }
        try {
            Lists.partition(allAppIds, MAX_BATCH_NUM_APP).forEach(this::handleWaitingDispatchInstance);
        } catch (Exception e) {
            log.error("[InstanceStatusChecker] WaitingDispatchInstance status check failed.", e);
        }
        log.info("[InstanceStatusChecker] WaitingDispatchInstance status check using {}.", stopwatch.stop());
    }

    /**
     * Re-dispatches instances that are still WAITING_WORKER_RECEIVE after
     * {@link #RECEIVE_TIMEOUT_MS} past their actual trigger time.
     * Failures are logged, not thrown.
     */
    public void checkWaitingWorkerReceiveInstance() {
        Stopwatch stopwatch = Stopwatch.createStarted();
        List<Long> allAppIds = loadAppIdsToCheck();
        if (allAppIds.isEmpty()) {
            return;
        }
        try {
            Lists.partition(allAppIds, MAX_BATCH_NUM_APP).forEach(this::handleWaitingWorkerReceiveInstance);
        } catch (Exception e) {
            log.error("[InstanceStatusChecker] WaitingWorkerReceiveInstance status check failed.", e);
        }
        log.info("[InstanceStatusChecker] WaitingWorkerReceiveInstance status check using {}.", stopwatch.stop());
    }

    /**
     * Retries or fails RUNNING instances whose record has not been modified for
     * {@link #RUNNING_TIMEOUT_MS}. Failures are logged, not thrown.
     */
    public void checkRunningInstance() {
        Stopwatch stopwatch = Stopwatch.createStarted();
        List<Long> allAppIds = loadAppIdsToCheck();
        if (allAppIds.isEmpty()) {
            return;
        }
        try {
            Lists.partition(allAppIds, MAX_BATCH_NUM_APP).forEach(this::handleRunningInstance);
        } catch (Exception e) {
            log.error("[InstanceStatusChecker] RunningInstance status check failed.", e);
        }
        log.info("[InstanceStatusChecker] RunningInstance status check using {}.", stopwatch.stop());
    }

    /**
     * Looks up the ids of the apps bound to this server's default-protocol address.
     *
     * @return the app ids; an empty list (after logging) when the repository returns null or nothing
     */
    private List<Long> loadAppIdsToCheck() {
        List<Long> appIds = appInfoRepository.listAppIdByCurrentServer(transportService.defaultProtocol().getAddress());
        if (CollectionUtils.isEmpty(appIds)) {
            log.info("[InstanceStatusChecker] current server has no app's job to check");
            return new ArrayList<>();
        }
        return appIds;
    }

    /**
     * Repeatedly pages through overdue WAITING_DISPATCH instances of the given apps and dispatches
     * them again, until no overdue instance is returned or every app in the group has been dropped
     * as overloaded.
     *
     * @param appIds one group of app ids (a partition produced by the public check method)
     */
    private void handleWaitingDispatchInstance(List<Long> appIds) {
        // Work on a copy: overloaded apps are removed below, and the argument is a
        // Lists.partition view whose backing list must not be modified.
        List<Long> partAppIds = Lists.newArrayList(appIds);
        long threshold = System.currentTimeMillis() - DISPATCH_TIMEOUT_MS;
        List<InstanceInfoDO> waitingDispatchInstances = findOverdueWaitingDispatch(partAppIds, threshold);
        while (!waitingDispatchInstances.isEmpty()) {
            List<Long> overloadAppIdList = new ArrayList<>();
            long startTime = System.currentTimeMillis();
            // Group per app so that an overload signal only stops work for that app.
            Map<Long, List<InstanceInfoDO>> instancesByApp = waitingDispatchInstances.stream()
                    .collect(Collectors.groupingBy(InstanceInfoDO::getAppId));
            for (Map.Entry<Long, List<InstanceInfoDO>> entry : instancesByApp.entrySet()) {
                Long currentAppId = entry.getKey();
                List<InstanceInfoDO> currentAppInstances = entry.getValue();
                Set<Long> jobIds = currentAppInstances.stream().map(InstanceInfoDO::getJobId).collect(Collectors.toSet());
                Map<Long, JobInfoDO> jobInfoMap = jobInfoRepository.findByIdIn(jobIds).stream()
                        .collect(Collectors.toMap(JobInfoDO::getId, jobInfo -> jobInfo));
                log.warn("[InstanceStatusChecker] find some instance in app({}) which is not triggered as expected: {}", currentAppId, currentAppInstances.stream().map(InstanceInfoDO::getInstanceId).collect(Collectors.toList()));
                // Shared with dispatch(); read back afterwards to detect an overloaded app.
                Holder<Boolean> overloadFlag = new Holder<>(false);
                currentAppInstances.parallelStream()
                        .forEach(instance -> dispatchWaitingInstance(instance, jobInfoMap, overloadFlag));
                // Refresh the cut-off so the next page query uses the current time.
                threshold = System.currentTimeMillis() - DISPATCH_TIMEOUT_MS;
                if (overloadFlag.get()) {
                    overloadAppIdList.add(currentAppId);
                }
            }
            log.info("[InstanceStatusChecker] process {} task,use {} ms", waitingDispatchInstances.size(), System.currentTimeMillis() - startTime);
            if (!overloadAppIdList.isEmpty()) {
                log.warn("[InstanceStatusChecker] app[{}] is overload, so skip check waiting dispatch instance", overloadAppIdList);
                partAppIds.removeAll(overloadAppIdList);
            }
            if (partAppIds.isEmpty()) {
                break;
            }
            waitingDispatchInstances = findOverdueWaitingDispatch(partAppIds, threshold);
        }
    }

    /** Fetches the first page of WAITING_DISPATCH instances whose expected trigger time is before {@code threshold}. */
    private List<InstanceInfoDO> findOverdueWaitingDispatch(List<Long> appIds, long threshold) {
        return instanceInfoRepository.findAllByAppIdInAndStatusAndExpectedTriggerTimeLessThan(
                appIds, InstanceStatus.WAITING_DISPATCH.getV(), threshold, PageRequest.of(0, MAX_BATCH_NUM_INSTANCE));
    }

    /**
     * Dispatches one overdue instance, or marks it failed when its job no longer exists.
     * Does nothing once {@code overloadFlag} holds {@code true}.
     */
    private void dispatchWaitingInstance(InstanceInfoDO instance, Map<Long, JobInfoDO> jobInfoMap, Holder<Boolean> overloadFlag) {
        if (overloadFlag.get()) {
            return;
        }
        JobInfoDO jobInfo = jobInfoMap.get(instance.getJobId());
        if (jobInfo != null) {
            dispatchService.dispatch(jobInfo, instance.getInstanceId(), Optional.of(instance), Optional.of(overloadFlag));
            return;
        }
        log.warn("[InstanceStatusChecker] can't find job by jobId[{}], so redispatch failed, failed instance: {}", instance.getJobId(), instance);
        // Reload the record before marking it failed.
        instanceInfoRepository.findById(instance.getId())
                .ifPresent(instanceInfoDO -> updateFailedInstance(instanceInfoDO, "can't find job info"));
    }

    /**
     * Repeatedly pages through overdue WAITING_WORKER_RECEIVE instances of the given apps and hands
     * their ids to the dispatch service in batches of {@link #MAX_BATCH_UPDATE_NUM}, until a query
     * returns nothing.
     *
     * @param partAppIds one group of app ids
     */
    private void handleWaitingWorkerReceiveInstance(List<Long> partAppIds) {
        long threshold = System.currentTimeMillis() - RECEIVE_TIMEOUT_MS;
        List<BriefInstanceInfo> waitingWorkerReceiveInstances = findOverdueWaitingWorkerReceive(partAppIds, threshold);
        while (!waitingWorkerReceiveInstances.isEmpty()) {
            log.warn("[InstanceStatusChecker] find some instance didn't receive any reply from worker, try to redispatch: {}", waitingWorkerReceiveInstances.stream().map(BriefInstanceInfo::getInstanceId).collect(Collectors.toList()));
            for (List<BriefInstanceInfo> partition : Lists.partition(waitingWorkerReceiveInstances, MAX_BATCH_UPDATE_NUM)) {
                dispatchService.redispatchBatchAsyncLockFree(
                        partition.stream().map(BriefInstanceInfo::getInstanceId).collect(Collectors.toList()),
                        InstanceStatus.WAITING_WORKER_RECEIVE.getV());
            }
            threshold = System.currentTimeMillis() - RECEIVE_TIMEOUT_MS;
            waitingWorkerReceiveInstances = findOverdueWaitingWorkerReceive(partAppIds, threshold);
        }
    }

    /** Fetches the first page of WAITING_WORKER_RECEIVE instances whose actual trigger time is before {@code threshold}. */
    private List<BriefInstanceInfo> findOverdueWaitingWorkerReceive(List<Long> appIds, long threshold) {
        return instanceInfoRepository.selectBriefInfoByAppIdInAndStatusAndActualTriggerTimeLessThan(
                appIds, InstanceStatus.WAITING_WORKER_RECEIVE.getV(), threshold, PageRequest.of(0, MAX_BATCH_NUM_INSTANCE));
    }

    /**
     * Repeatedly pages through RUNNING instances of the given apps whose record has not been
     * modified since the timeout cut-off and either re-dispatches or fails each one, until a query
     * returns nothing.
     *
     * @param partAppIds one group of app ids
     */
    private void handleRunningInstance(List<Long> partAppIds) {
        long threshold = System.currentTimeMillis() - RUNNING_TIMEOUT_MS;
        List<BriefInstanceInfo> failedInstances = findStaleRunning(partAppIds, threshold);
        while (!failedInstances.isEmpty()) {
            Set<Long> jobIds = failedInstances.stream().map(BriefInstanceInfo::getJobId).collect(Collectors.toSet());
            Map<Long, JobInfoDO> jobInfoMap = jobInfoRepository.findByIdIn(jobIds).stream()
                    .collect(Collectors.toMap(JobInfoDO::getId, jobInfo -> jobInfo));
            log.warn("[InstanceStatusCheckService] find some instances have not received status report for a long time : {}", failedInstances.stream().map(BriefInstanceInfo::getInstanceId).collect(Collectors.toList()));
            failedInstances.forEach(instance -> handleStaleRunningInstance(instance, jobInfoMap.get(instance.getJobId())));
            threshold = System.currentTimeMillis() - RUNNING_TIMEOUT_MS;
            failedInstances = findStaleRunning(partAppIds, threshold);
        }
    }

    /** Fetches the first page of RUNNING instances last modified before {@code threshold}. */
    private List<BriefInstanceInfo> findStaleRunning(List<Long> appIds, long threshold) {
        return instanceInfoRepository.selectBriefInfoByAppIdInAndStatusAndGmtModifiedBefore(
                appIds, InstanceStatus.RUNNING.getV(), new Date(threshold), PageRequest.of(0, MAX_BATCH_NUM_INSTANCE));
    }

    /**
     * Decides what to do with one stale RUNNING instance.
     *
     * <p>The instance is failed when its job is missing, the job is not ENABLE, or the job's time
     * expression type is one of {@code TimeExpressionType.FREQUENT_TYPES}. Otherwise it is
     * re-dispatched while its running times are below the job's instance retry number, and failed
     * once that limit is reached.
     *
     * @param instance the stale instance
     * @param jobInfo  the job the instance belongs to, or {@code null} when it was not found
     */
    private void handleStaleRunningInstance(BriefInstanceInfo instance, JobInfoDO jobInfo) {
        if (jobInfo == null) {
            failReportTimeoutInstance(instance);
            return;
        }
        TimeExpressionType timeExpressionType = TimeExpressionType.of(jobInfo.getTimeExpressionType());
        SwitchableStatus switchableStatus = SwitchableStatus.of(jobInfo.getStatus());
        if (switchableStatus != SwitchableStatus.ENABLE || TimeExpressionType.FREQUENT_TYPES.contains(timeExpressionType.getV())) {
            failReportTimeoutInstance(instance);
            return;
        }
        if (instance.getRunningTimes() < jobInfo.getInstanceRetryNum()) {
            dispatchService.redispatchAsync(instance.getInstanceId(), InstanceStatus.RUNNING.getV());
        } else {
            failReportTimeoutInstance(instance);
        }
    }

    /** Reloads the full record of a stale instance and, if it still exists, marks it failed with the report-timeout result. */
    private void failReportTimeoutInstance(BriefInstanceInfo instance) {
        instanceInfoRepository.findById(instance.getId())
                .ifPresent(instanceInfoDO -> updateFailedInstance(instanceInfoDO, REPORT_TIMEOUT_RESULT));
    }

    /**
     * For each group of app ids, restarts every workflow instance that is still WAITING and whose
     * expected trigger time is before the cut-off computed once at method entry. Workflow instances
     * whose workflow definition cannot be found are skipped.
     *
     * @param allAppIds ids of all apps handled by this server
     */
    private void checkWorkflowInstance(List<Long> allAppIds) {
        long threshold = System.currentTimeMillis() - WORKFLOW_WAITING_TIMEOUT_MS;
        for (List<Long> partAppIds : Lists.partition(allAppIds, MAX_BATCH_NUM_APP)) {
            List<WorkflowInstanceInfoDO> waitingWfInstanceList = workflowInstanceInfoRepository
                    .findByAppIdInAndStatusAndExpectedTriggerTimeLessThan(partAppIds, WorkflowInstanceStatus.WAITING.getV(), threshold);
            if (CollectionUtils.isEmpty(waitingWfInstanceList)) {
                continue;
            }
            log.warn("[WorkflowInstanceChecker] wfInstance({}) is not started as expected, oms try to restart these workflowInstance.", waitingWfInstanceList.stream().map(WorkflowInstanceInfoDO::getWfInstanceId).collect(Collectors.toList()));
            for (WorkflowInstanceInfoDO wfInstance : waitingWfInstanceList) {
                Optional<WorkflowInfoDO> workflowOpt = workflowInfoRepository.findById(wfInstance.getWorkflowId());
                workflowOpt.ifPresent(workflowInfo -> {
                    workflowInstanceManager.start(workflowInfo, wfInstance.getWfInstanceId());
                    log.info("[Workflow-{}|{}] restart workflowInstance successfully~", workflowInfo.getId(), wfInstance.getWfInstanceId());
                });
            }
        }
    }

    /**
     * Marks the instance FAILED with the given result, stamps finished and modified times with the
     * current time, persists it, then notifies the instance manager that it has finished.
     *
     * @param instance the instance record to fail
     * @param result   the failure reason stored on the record and passed to the instance manager
     */
    private void updateFailedInstance(InstanceInfoDO instance, String result) {
        log.warn("[InstanceStatusChecker] instance[{}] failed due to {}, instanceInfo: {}", instance.getInstanceId(), result, instance);
        instance.setStatus(InstanceStatus.FAILED.getV());
        instance.setFinishedTime(System.currentTimeMillis());
        instance.setGmtModified(new Date());
        instance.setResult(result);
        instanceInfoRepository.saveAndFlush(instance);
        instanceManager.processFinishedInstance(instance.getInstanceId(), instance.getWfInstanceId(), InstanceStatus.FAILED, result);
    }
}

