/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.rule.engine.defaults;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;
import org.jetlinks.rule.engine.api.scheduler.ScheduleJob;
import org.jetlinks.rule.engine.api.scheduler.Scheduler;
import org.jetlinks.rule.engine.api.task.Task;
import org.jetlinks.rule.engine.api.worker.Worker;
import org.jetlinks.rule.engine.api.worker.WorkerSelector;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class LocalScheduler
implements Scheduler {
    private final String id;
    private WorkerSelector workerSelector = defaultSelector;
    static final WorkerSelector defaultSelector = (workers1, rule) -> workers1.take(1L);
    private final Map<String, Worker> workers = new ConcurrentHashMap<String, Worker>();
    private final Map<String, Map<String, List<Task>>> executors = new ConcurrentHashMap<String, Map<String, List<Task>>>();
    private final Map<String, Task> tasks = new ConcurrentHashMap<String, Task>();

    public LocalScheduler(String id) {
        this.id = id;
    }

    @Override
    public Flux<Worker> getWorkers() {
        return Flux.fromIterable(this.workers.values());
    }

    @Override
    public Mono<Worker> getWorker(String workerId) {
        return Mono.justOrEmpty((Object)this.workers.get(workerId));
    }

    @Override
    public Mono<Boolean> canSchedule(ScheduleJob job) {
        return this.findWorker(job.getExecutor(), job).hasElements();
    }

    protected Flux<Worker> findWorker(String executor, ScheduleJob schedulingRule) {
        return this.workerSelector.select((Flux<Worker>)Flux.fromIterable(this.workers.values()).filterWhen(exe -> exe.getSupportExecutors().map(list -> list.contains(executor)).defaultIfEmpty((Object)false)), schedulingRule);
    }

    @Override
    public Flux<Task> schedule(ScheduleJob job) {
        return Flux.fromIterable(this.getExecutor(job.getInstanceId(), job.getNodeId())).flatMap(task -> {
            this.removeTask((Task)task);
            return task.shutdown();
        }).thenMany(this.createExecutor(job));
    }

    @Override
    public Mono<Void> shutdown(String instanceId) {
        return this.getSchedulingTask(instanceId).doOnNext(task -> this.tasks.remove(task.getId())).concatMapDelayError(Task::shutdown).doAfterTerminate(() -> this.clearExecutor(instanceId)).then();
    }

    @Override
    public Mono<Void> shutdownTask(String taskId) {
        Task task = this.removeTask(taskId);
        if (null != task) {
            return task.shutdown().then();
        }
        return Mono.empty();
    }

    private Flux<Task> createExecutor(ScheduleJob job) {
        return this.findWorker(job.getExecutor(), job).switchIfEmpty((Publisher)Mono.error(() -> new UnsupportedOperationException("unsupported executor:" + job.getExecutor()))).flatMap(worker -> worker.createTask(this.id, job)).doOnNext(this::addTask);
    }

    private void addTask(Task task) {
        this.tasks.put(task.getId(), task);
        this.getExecutor(task.getJob().getInstanceId(), task.getJob().getNodeId()).add(task);
    }

    private Task removeTask(String taskId) {
        Task task = this.tasks.get(taskId);
        if (task != null) {
            this.removeTask(task);
        }
        return task;
    }

    private void removeTask(Task task) {
        this.tasks.remove(task.getId());
        this.getExecutor(task.getJob().getInstanceId(), task.getJob().getNodeId()).remove(task);
    }

    @Override
    public Flux<Task> getSchedulingTask(String instanceId) {
        return Flux.fromIterable(this.getExecutor(instanceId).values()).flatMapIterable(Function.identity());
    }

    @Override
    public Mono<Task> getTask(String taskId) {
        return Mono.justOrEmpty((Object)this.tasks.get(taskId));
    }

    @Override
    public Flux<Task> getSchedulingTasks() {
        return Flux.fromIterable(this.executors.values()).flatMapIterable(Map::values).flatMapIterable(Function.identity());
    }

    @Override
    public Mono<Long> totalTask() {
        return this.getSchedulingTasks().count();
    }

    private List<Task> getExecutor(String instanceId, String nodeId) {
        return this.getExecutor(instanceId).computeIfAbsent(nodeId, ignore -> new CopyOnWriteArrayList());
    }

    private void clearExecutor(String instanceId) {
        this.executors.remove(instanceId);
    }

    private Map<String, List<Task>> getExecutor(String instanceId) {
        return this.executors.computeIfAbsent(instanceId, ignore -> new ConcurrentHashMap());
    }

    public void addWorker(Worker worker) {
        this.workers.put(worker.getId(), worker);
    }

    @Override
    public String getId() {
        return this.id;
    }

    public void setWorkerSelector(WorkerSelector workerSelector) {
        this.workerSelector = workerSelector;
    }
}

