package org.jetlinks.rule.engine.cluster;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.jetlinks.rule.engine.api.RuleEngine;
import org.jetlinks.rule.engine.api.model.RuleModel;
import org.jetlinks.rule.engine.api.scheduler.ScheduleJob;
import org.jetlinks.rule.engine.api.scheduler.Scheduler;
import org.jetlinks.rule.engine.api.scheduler.SchedulerSelector;
import org.jetlinks.rule.engine.api.task.Task;
import org.jetlinks.rule.engine.api.task.TaskSnapshot;
import org.jetlinks.rule.engine.api.worker.Worker;
import org.jetlinks.rule.engine.defaults.ScheduleJobCompiler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/jetlinks/rule/engine/cluster/ClusterRuleEngine.class */
/**
 * {@link RuleEngine} implementation that distributes the jobs of a rule across the
 * {@link Scheduler}s exposed by a {@link SchedulerRegistry} and records the resulting
 * tasks as {@link TaskSnapshot}s in a {@link TaskSnapshotRepository}.
 */
public class ClusterRuleEngine implements RuleEngine {
    private static final Logger log = LoggerFactory.getLogger(ClusterRuleEngine.class);

    private final SchedulerRegistry schedulerRegistry;
    private final TaskSnapshotRepository repository;
    private final SchedulerSelector schedulerSelector;

    /**
     * Creates an engine that uses {@code SchedulerSelector.selectAll} to pick schedulers.
     *
     * @param schedulerRegistry      source of the available schedulers
     * @param taskSnapshotRepository store for task snapshots
     */
    public ClusterRuleEngine(SchedulerRegistry schedulerRegistry, TaskSnapshotRepository taskSnapshotRepository) {
        this(schedulerRegistry, taskSnapshotRepository, SchedulerSelector.selectAll);
    }

    /**
     * Creates an engine with an explicit scheduler selection strategy.
     *
     * @param schedulerRegistry      source of the available schedulers
     * @param taskSnapshotRepository store for task snapshots
     * @param schedulerSelector      strategy applied to the schedulers that can run a job
     */
    public ClusterRuleEngine(SchedulerRegistry schedulerRegistry,
                             TaskSnapshotRepository taskSnapshotRepository,
                             SchedulerSelector schedulerSelector) {
        this.schedulerRegistry = schedulerRegistry;
        this.repository = taskSnapshotRepository;
        this.schedulerSelector = schedulerSelector;
    }

    /**
     * Asks every registered scheduler to shut down the given rule instance, then removes
     * the instance's task snapshots from the repository.
     *
     * @param instanceId rule instance id
     * @return a {@code Mono} completing once the schedulers and the repository are done
     */
    public Mono<Void> shutdown(String instanceId) {
        return this.schedulerRegistry
                .getSchedulers()
                .flatMap(scheduler -> scheduler.shutdown(instanceId))
                .then(this.repository.removeTaskByInstanceId(instanceId))
                .then();
    }

    /**
     * Shuts down the task described by a snapshot on the scheduler whose id matches the
     * snapshot's scheduler id, then removes the snapshot from the repository.
     */
    private Mono<Void> shutdown(TaskSnapshot taskSnapshot) {
        return this.schedulerRegistry
                .getSchedulers()
                .filter(scheduler -> Objects.equals(taskSnapshot.getSchedulerId(), scheduler.getId()))
                .flatMap(scheduler -> scheduler.shutdownTask(taskSnapshot.getId()))
                .then(this.repository.removeTaskById(taskSnapshot.getId()));
    }

    /**
     * Compiles the rule model into jobs, reconciles them with the snapshots already stored
     * for the instance, starts jobs via {@link #doStart(Collection)} and saves fresh
     * snapshots of all resulting tasks.
     *
     * <p>For each stored snapshot:
     * <ul>
     *   <li>if a compiled job exists for the same node with an equal executor, the live
     *       task is looked up, given the new job and reloaded;</li>
     *   <li>otherwise the old task is shut down and its snapshot removed.</li>
     * </ul>
     * If any step fails, every task collected so far is shut down and the original error
     * is propagated.
     *
     * @param instanceId rule instance id
     * @param model      rule model to compile
     * @return the reloaded and newly started tasks, emitted after their snapshots are saved
     */
    public Flux<Task> startRule(String instanceId, RuleModel model) {
        log.debug("starting rule {}\n{}", instanceId, model.toString());

        Map<String, ScheduleJob> compiledJobs = new ScheduleJobCompiler(instanceId, model)
                .compile()
                .stream()
                .collect(Collectors.toMap(ScheduleJob::getNodeId, Function.identity()));

        // Tasks collected so far; used to roll back (shut down) on error.
        // NOTE(review): appended to from reactive callbacks; if those can run on several
        // threads at once a thread-safe list would be needed - confirm.
        ArrayList<Task> startedTasks = new ArrayList<>(compiledJobs.size());

        // NOTE(review): this map starts as a full copy of the compiled jobs and nothing is
        // ever removed from it, so jobs whose task was reloaded below are also handed to
        // doStart(). Confirm whether reloaded jobs were meant to be excluded.
        Map<String, ScheduleJob> jobsToStart = new ConcurrentHashMap<>(compiledJobs);

        Flux<Task> reloadedTasks = this.repository
                .findByInstanceId(instanceId)
                .flatMap(snapshot -> {
                    ScheduleJob oldJob = snapshot.getJob();
                    ScheduleJob newJob = compiledJobs.get(oldJob.getNodeId());

                    if (newJob != null && Objects.equals(newJob.getExecutor(), oldJob.getExecutor())) {
                        // Same node, same executor: update the running task in place.
                        return getTaskBySnapshot(snapshot)
                                .flatMap(task -> {
                                    startedTasks.add(task);
                                    return task.setJob(newJob).then(task.reload()).thenReturn(task);
                                });
                    }

                    if (newJob == null) {
                        log.debug("shutdown removed job:{}", oldJob.getNodeId());
                    } else {
                        // Executor changed: the old task is dropped and the job rescheduled.
                        jobsToStart.put(newJob.getNodeId(), newJob);
                        log.debug("change job [{}] executor:{} -> {}",
                                oldJob.getNodeId(), oldJob.getExecutor(), newJob.getExecutor());
                    }
                    return shutdown(snapshot).then(Mono.<Task>empty());
                });

        // Deferred so the job map is read only after the reconciliation above completed.
        Flux<Task> newTasks = Flux
                .defer(() -> doStart(jobsToStart.values()))
                .doOnNext(startedTasks::add);

        return reloadedTasks
                .concatWith(newTasks)
                .collectList()
                .flatMapMany(tasks -> {
                    Flux<Task> all = Flux.fromIterable(tasks);
                    return this.repository
                            .saveTaskSnapshots(all.flatMap(Task::dump))
                            .thenMany(all);
                })
                .onErrorResume(error -> Flux
                        .fromIterable(startedTasks)
                        .flatMap(Task::shutdown)
                        .then(Mono.<Task>error(error)));
    }

    /**
     * Schedules every job, and only once all of them have been scheduled starts the
     * resulting tasks.
     *
     * @param collection jobs to schedule
     * @return the started tasks
     */
    protected Flux<Task> doStart(Collection<ScheduleJob> collection) {
        return Flux.defer(() -> Flux
                .fromIterable(collection)
                .flatMap(this::scheduleTask)
                // collectList acts as a barrier: no task starts before all are scheduled.
                .collectList()
                .flatMapIterable(Function.identity())
                .flatMap(task -> task.start().thenReturn(task)));
    }

    /** Queries every scheduler for the task with the snapshot's id. */
    private Flux<Task> getTaskBySnapshot(TaskSnapshot taskSnapshot) {
        return this.schedulerRegistry
                .getSchedulers()
                .flatMap(scheduler -> scheduler.getTask(taskSnapshot.getId()));
    }

    /**
     * Returns the schedulers that report they can schedule the job, narrowed by the
     * configured {@link SchedulerSelector}.
     */
    private Flux<Scheduler> selectScheduler(ScheduleJob scheduleJob) {
        Flux<Scheduler> capable = this.schedulerRegistry
                .getSchedulers()
                .filterWhen(scheduler -> scheduler.canSchedule(scheduleJob));
        return this.schedulerSelector.select(capable, scheduleJob);
    }

    /**
     * Schedules the job on every selected scheduler; fails with
     * {@link UnsupportedOperationException} when no scheduler is selected.
     */
    private Flux<Task> scheduleTask(ScheduleJob scheduleJob) {
        return selectScheduler(scheduleJob)
                .switchIfEmpty(Mono.error(() ->
                        new UnsupportedOperationException("no scheduler for " + scheduleJob.getExecutor())))
                .flatMap(scheduler -> scheduler.schedule(scheduleJob));
    }

    /**
     * Collects the scheduling tasks of a rule instance from every scheduler.
     *
     * @param instanceId rule instance id
     * @return tasks reported by the schedulers for that instance
     */
    public Flux<Task> getTasks(String instanceId) {
        return this.schedulerRegistry
                .getSchedulers()
                .flatMap(scheduler -> scheduler.getSchedulingTask(instanceId));
    }

    /**
     * Collects the workers of every registered scheduler.
     *
     * @return workers reported by the schedulers
     */
    public Flux<Worker> getWorkers() {
        return this.schedulerRegistry
                .getSchedulers()
                .flatMap(Scheduler::getWorkers);
    }
}
