package org.jetlinks.rule.engine.cluster.scheduler;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.jctools.maps.NonBlockingHashMap;
import org.jetlinks.core.rpc.RpcManager;
import org.jetlinks.core.rpc.ServiceEvent;
import org.jetlinks.core.utils.Reactors;
import org.jetlinks.rule.engine.api.scheduler.Scheduler;
import org.jetlinks.rule.engine.cluster.SchedulerRegistry;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

/* loaded from: input_file:org/jetlinks/rule/engine/cluster/scheduler/ClusterRpcSchedulerRegistry.class */
/**
 * {@link SchedulerRegistry} that keeps track of schedulers registered in this
 * process ("locals") and of schedulers exposed by other nodes through the
 * {@link RpcManager} ("remotes").
 *
 * <p>Local schedulers are published to the {@link RpcManager} as
 * {@link SchedulerRpcService} instances when they are registered. Remote
 * schedulers are wrapped in {@link ClusterRemoteScheduler} and stored by
 * service id; they are added and removed in reaction to {@link ServiceEvent}s.
 */
public class ClusterRpcSchedulerRegistry implements SchedulerRegistry {
    private static final Logger log = LoggerFactory.getLogger(ClusterRpcSchedulerRegistry.class);

    private final RpcManager rpcManager;
    /** Schedulers registered through {@link #register(Scheduler)}, keyed by scheduler id. */
    private final Map<String, Scheduler> locals = new NonBlockingHashMap<>();
    /** Schedulers discovered through the {@link RpcManager}, keyed by RPC service id. */
    private final Map<String, Scheduler> remotes = new NonBlockingHashMap<>();
    private final Sinks.Many<Scheduler> joinListener = Reactors.createMany();
    private final Sinks.Many<Scheduler> leaveListener = Reactors.createMany();

    /**
     * Creates the registry and immediately starts tracking remote schedulers.
     *
     * @param rpcManager RPC manager used to discover remote schedulers and to
     *                   publish local ones
     */
    public ClusterRpcSchedulerRegistry(RpcManager rpcManager) {
        this.rpcManager = rpcManager;
        init();
    }

    /**
     * Loads the scheduler services currently known to the {@link RpcManager}
     * and subscribes to subsequent service events.
     *
     * <p>Schedulers loaded by the initial lookup are stored without emitting a
     * join notification.
     */
    void init() {
        this.rpcManager
            .getServices(SchedulerRpcService.class)
            .subscribe(
                rpcService -> this.remotes.put(
                    rpcService.id(),
                    new ClusterRemoteScheduler(rpcService.id(), (SchedulerRpcService) rpcService.service())),
                // Report the failure instead of leaving it to Reactor's dropped-error hook.
                err -> log.warn("failed to load remote scheduler services", err));

        this.rpcManager
            .listen(SchedulerRpcService.class)
            .flatMap(serviceEvent -> Mono
                // defer: a synchronous exception thrown by handleEvent must be handled by the
                // per-event onErrorResume below rather than terminate the whole event stream.
                .defer(() -> handleEvent(serviceEvent))
                .onErrorResume(err -> {
                    log.warn("failed to handle scheduler service event, service [{}] on node [{}]",
                             serviceEvent.getServiceId(),
                             serviceEvent.getServerNodeId(),
                             err);
                    return Mono.empty();
                }))
            .subscribe(
                ignored -> {
                },
                err -> log.error("scheduler service event listener terminated", err));
    }

    /**
     * Applies a single service event to the remote scheduler table.
     *
     * <ul>
     *   <li>{@code added}: resolves the RPC service, stores a new
     *       {@link ClusterRemoteScheduler} under the service id and, when the id
     *       was not present before and the join sink has subscribers, emits the
     *       scheduler to the join sink.</li>
     *   <li>{@code removed}: removes the scheduler stored under the service id
     *       and, when one was present and the leave sink has subscribers, emits
     *       it to the leave sink.</li>
     *   <li>any other type: ignored.</li>
     * </ul>
     *
     * @param serviceEvent the event to apply
     * @return a {@link Mono} completing once the event has been processed
     */
    private Mono<Void> handleEvent(ServiceEvent serviceEvent) {
        if (serviceEvent.getType() == ServiceEvent.Type.added) {
            return this.rpcManager
                .getService(serviceEvent.getServerNodeId(), serviceEvent.getServiceId(), SchedulerRpcService.class)
                .map(schedulerRpcService -> new ClusterRemoteScheduler(serviceEvent.getServiceId(), schedulerRpcService))
                .doOnNext(clusterRemoteScheduler -> {
                    Scheduler previous = this.remotes.put(serviceEvent.getServiceId(), clusterRemoteScheduler);
                    // Only a service id that was not known before counts as a join.
                    if (previous == null && this.joinListener.currentSubscriberCount() > 0) {
                        this.joinListener.emitNext(clusterRemoteScheduler, Reactors.emitFailureHandler());
                    }
                })
                .then();
        }
        if (serviceEvent.getType() == ServiceEvent.Type.removed) {
            Scheduler removed = this.remotes.remove(serviceEvent.getServiceId());
            if (removed != null && this.leaveListener.currentSubscriberCount() > 0) {
                this.leaveListener.emitNext(removed, Reactors.emitFailureHandler());
            }
        }
        return Mono.empty();
    }

    /**
     * Returns the schedulers registered in this process.
     *
     * @return a new list holding a copy of the currently registered local schedulers
     */
    @Override // org.jetlinks.rule.engine.cluster.SchedulerRegistry
    public List<Scheduler> getLocalSchedulers() {
        return new ArrayList<>(this.locals.values());
    }

    /**
     * Returns local schedulers followed by remote schedulers.
     *
     * <p>The local part is a copy taken when this method is called; the remote
     * part is backed by the remote table's value view.
     *
     * @return a {@link Flux} of all known schedulers, locals first
     */
    @Override // org.jetlinks.rule.engine.cluster.SchedulerRegistry
    public Flux<Scheduler> getSchedulers() {
        return Flux.concat(
            Flux.fromIterable(getLocalSchedulers()),
            Flux.fromIterable(this.remotes.values()));
    }

    /**
     * Returns the stream of remote schedulers added by service events.
     *
     * <p>A scheduler is only emitted when its service id was not already
     * present and the sink had at least one subscriber at that moment.
     *
     * @return a {@link Flux} view of the join sink
     */
    @Override // org.jetlinks.rule.engine.cluster.SchedulerRegistry
    public Flux<Scheduler> handleSchedulerJoin() {
        return this.joinListener.asFlux();
    }

    /**
     * Returns the stream of remote schedulers removed by service events.
     *
     * <p>A scheduler is only emitted when the sink had at least one subscriber
     * at the moment of removal.
     *
     * @return a {@link Flux} view of the leave sink
     */
    @Override // org.jetlinks.rule.engine.cluster.SchedulerRegistry
    public Flux<Scheduler> handleSchedulerLeave() {
        return this.leaveListener.asFlux();
    }

    /**
     * Registers a local scheduler and publishes it through the {@link RpcManager}
     * as a {@link SchedulerRpcServiceImpl} under the scheduler's id.
     *
     * @param scheduler the scheduler to register
     * @throws IllegalStateException if a scheduler with the same id is already registered
     */
    @Override // org.jetlinks.rule.engine.cluster.SchedulerRegistry
    public void register(Scheduler scheduler) {
        String id = scheduler.getId();
        // Single putIfAbsent instead of containsKey + put, so two concurrent
        // registrations of the same id cannot both pass the duplicate check.
        if (this.locals.putIfAbsent(id, scheduler) != null) {
            throw new IllegalStateException("scheduler " + id + " already registered");
        }
        this.rpcManager.registerService(id, new SchedulerRpcServiceImpl(scheduler));
    }
}
