package org.jetlinks.rule.engine.cluster.scheduler;

import io.scalecube.cluster.ClusterMessageHandler;
import io.scalecube.cluster.Member;
import io.scalecube.cluster.membership.MembershipEvent;
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.services.ServiceCall;
import io.scalecube.services.ServiceInfo;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.jctools.maps.NonBlockingHashMap;
import org.jetlinks.core.utils.Reactors;
import org.jetlinks.rule.engine.api.scheduler.Scheduler;
import org.jetlinks.rule.engine.cluster.SchedulerRegistry;
import org.jetlinks.supports.scalecube.EmptyServiceMethodRegistry;
import org.jetlinks.supports.scalecube.ExtendedCluster;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

/* loaded from: input_file:org/jetlinks/rule/engine/cluster/scheduler/ClusterSchedulerRegistry.class */
/**
 * {@link SchedulerRegistry} backed by an {@link ExtendedCluster}.
 *
 * <p>The registry exposes exactly one local {@link Scheduler} and keeps a map of
 * {@link ClusterRemoteScheduler} proxies, keyed by the id of the cluster member
 * that announced them. Discovery uses two cluster message qualifiers:
 * <ul>
 *   <li>{@code rule/scheduler/pull} - announces the local scheduler id to a member;
 *       the receiver answers with a {@code rule/scheduler/reg} message;</li>
 *   <li>{@code rule/scheduler/reg} - announces the local scheduler id to a member
 *       without requesting an answer.</li>
 * </ul>
 * Both messages carry the sender's member id in the {@code from} header and the
 * sender's scheduler id as payload.
 */
public class ClusterSchedulerRegistry implements SchedulerRegistry {
    private static final Logger log = LoggerFactory.getLogger(ClusterSchedulerRegistry.class);
    private static final String REG_QUALIFIER = "rule/scheduler/reg";
    private static final String PULL_QUALIFIER = "rule/scheduler/pull";
    private static final String REG_FROM_HEADER = "from";
    private static final String SERVICE_SCHEDULER_ID_TAG = "schedulerId";
    private final ExtendedCluster cluster;
    private final ServiceCall serviceCall;
    private final Scheduler localScheduler;
    /** Remote scheduler proxies keyed by the announcing cluster member id. */
    private final Map<String, ClusterRemoteScheduler> remotes = new NonBlockingHashMap<>();
    private final Sinks.Many<Scheduler> schedulerJoin = Sinks.many().multicast().directBestEffort();
    private final Sinks.Many<Scheduler> schedulerLeave = Sinks.many().multicast().directBestEffort();

    /**
     * Creates the registry and immediately runs {@link #init()}.
     *
     * @param extendedCluster cluster used to exchange discovery messages
     * @param serviceCall     used to build RPC proxies for remote schedulers
     * @param scheduler       the scheduler hosted by this node
     */
    public ClusterSchedulerRegistry(ExtendedCluster extendedCluster, ServiceCall serviceCall, Scheduler scheduler) {
        this.cluster = extendedCluster;
        this.serviceCall = serviceCall;
        this.localScheduler = scheduler;
        init();
    }

    /**
     * Installs the cluster message/membership handler and then sends a pull
     * message to every other member currently known to the cluster.
     *
     * <p>Note that this blocks the calling thread ({@code blockLast()}) until all
     * initial pull messages have been sent or have failed.
     */
    void init() {
        this.cluster.handler(extendedCluster -> new ClusterMessageHandler() {
            @Override
            public void onGossip(Message message) {
                handleClusterMessage(message);
            }

            @Override
            public void onMessage(Message message) {
                handleClusterMessage(message);
            }

            @Override
            public void onMembershipEvent(MembershipEvent membershipEvent) {
                if (membershipEvent.isAdded() || membershipEvent.isUpdated()) {
                    pullRemote(membershipEvent.member()).subscribe();
                }
                if (membershipEvent.isRemoved() || membershipEvent.isLeaving()) {
                    ClusterRemoteScheduler removed = remotes.remove(membershipEvent.member().id());
                    if (removed != null) {
                        schedulerLeave.emitNext(removed, Reactors.emitFailureHandler());
                    }
                }
            }
        });
        Flux.fromIterable(this.cluster.otherMembers()).flatMap(this::pullRemote).blockLast();
    }

    /**
     * Sends a {@code rule/scheduler/pull} message carrying the local scheduler id
     * to the given member. Send errors are logged and the returned {@link Mono}
     * completes empty.
     *
     * @param member target cluster member
     * @return completion signal of the send
     */
    public Mono<Void> pullRemote(Member member) {
        return sendAnnouncement(member, PULL_QUALIFIER);
    }

    /**
     * Sends a {@code rule/scheduler/reg} message carrying the local scheduler id
     * to the given member. Send errors are logged and the returned {@link Mono}
     * completes empty, so a bare {@code subscribe()} never sees an unhandled error.
     *
     * @param member target cluster member
     * @return completion signal of the send
     */
    private Mono<Void> pushRemote(Member member) {
        return sendAnnouncement(member, REG_QUALIFIER);
    }

    /** Builds and sends an announcement with the given qualifier; errors are logged and swallowed. */
    private Mono<Void> sendAnnouncement(Member member, String qualifier) {
        Message announcement = Message
            .withQualifier(qualifier)
            .header(REG_FROM_HEADER, this.cluster.member().id())
            .data(this.localScheduler.getId())
            .build();
        return this.cluster
            .send(member, announcement)
            .onErrorResume(th -> {
                log.error(th.getMessage(), th);
                return Mono.empty();
            });
    }

    /**
     * Handles a cluster message or gossip. Messages whose qualifier is neither
     * {@code rule/scheduler/reg} nor {@code rule/scheduler/pull} are ignored.
     *
     * <p>For a matching message the announced scheduler is registered under the
     * sender's member id and a join event is emitted, unless the announced id
     * equals the local scheduler id or the sender is already registered (in both
     * of those cases processing stops and no reply is sent). After a successful
     * pass, a pull message is answered with a reg message to the sender, if the
     * sender is still a known cluster member.
     *
     * @param message incoming cluster message
     */
    public void handleClusterMessage(Message message) {
        String qualifier = message.qualifier();
        boolean pull = Objects.equals(PULL_QUALIFIER, qualifier);
        if (!pull && !Objects.equals(REG_QUALIFIER, qualifier)) {
            return;
        }
        String from = message.header(REG_FROM_HEADER);
        String schedulerId = message.data();
        // A missing header/payload cannot be registered (the map rejects null keys).
        if (from == null || schedulerId == null) {
            log.warn("ignore scheduler message [{}] without sender or scheduler id", qualifier);
            return;
        }
        if (this.localScheduler.getId().equals(schedulerId)) {
            log.warn("register same local scheduler [{}] from {}", schedulerId, this.cluster.member(from).orElse(null));
            return;
        }
        if (this.remotes.containsKey(from)) {
            return;
        }
        log.debug("register new scheduler {} from {}", schedulerId, this.cluster.member(from).orElse(null));
        ClusterRemoteScheduler remote = createRemoteScheduler(schedulerId);
        // Emit the join event only for the registration that actually inserted the entry.
        if (this.remotes.putIfAbsent(from, remote) == null) {
            this.schedulerJoin.emitNext(remote, Reactors.emitFailureHandler());
        }
        if (pull) {
            this.cluster.member(from).ifPresent(member -> pushRemote(member).subscribe());
        }
    }

    /**
     * Creates the proxy for a remote scheduler. RPC calls are routed to the first
     * service reference whose {@code schedulerId} tag equals the given id.
     */
    private ClusterRemoteScheduler createRemoteScheduler(String schedulerId) {
        SchedulerRpcService rpcService = this.serviceCall
            .router((serviceRegistry, serviceMessage) -> serviceRegistry
                .lookupService(serviceMessage)
                .stream()
                .filter(reference -> Objects.equals(schedulerId, reference.tags().get(SERVICE_SCHEDULER_ID_TAG)))
                .findFirst())
            .methodRegistry(EmptyServiceMethodRegistry.INSTANCE)
            .api(SchedulerRpcService.class);
        return new ClusterRemoteScheduler(schedulerId, rpcService);
    }

    /**
     * @return a single-element list containing the local scheduler
     */
    @Override // org.jetlinks.rule.engine.cluster.SchedulerRegistry
    public List<Scheduler> getLocalSchedulers() {
        return Collections.singletonList(this.localScheduler);
    }

    /**
     * @return the local scheduler followed by all currently registered remote schedulers
     */
    @Override // org.jetlinks.rule.engine.cluster.SchedulerRegistry
    public Flux<Scheduler> getSchedulers() {
        return Flux.concat(Flux.fromIterable(getLocalSchedulers()), Flux.fromIterable(this.remotes.values()));
    }

    /**
     * @return stream of remote schedulers as they are registered, with
     *         {@code onBackpressureBuffer()} applied per subscriber
     */
    @Override // org.jetlinks.rule.engine.cluster.SchedulerRegistry
    public Flux<Scheduler> handleSchedulerJoin() {
        return this.schedulerJoin.asFlux().onBackpressureBuffer();
    }

    /**
     * @return stream of remote schedulers removed after their member was removed
     *         or is leaving, with {@code onBackpressureBuffer()} applied per subscriber
     */
    @Override // org.jetlinks.rule.engine.cluster.SchedulerRegistry
    public Flux<Scheduler> handleSchedulerLeave() {
        return this.schedulerLeave.asFlux().onBackpressureBuffer();
    }

    /**
     * Not supported: the local scheduler is fixed at construction time.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // org.jetlinks.rule.engine.cluster.SchedulerRegistry
    public void register(Scheduler scheduler) {
        throw new UnsupportedOperationException();
    }

    /**
     * Builds the {@link ServiceInfo} that exposes the given scheduler over RPC.
     * The service is tagged with {@code schedulerId}, which is the tag the remote
     * proxies created by this registry route on.
     *
     * @param scheduler scheduler to expose
     * @return service description wrapping a {@code SchedulerRpcServiceImpl}
     */
    public static ServiceInfo createService(Scheduler scheduler) {
        return ServiceInfo.fromServiceInstance(new SchedulerRpcServiceImpl(scheduler)).tag(SERVICE_SCHEDULER_ID_TAG, scheduler.getId()).build();
    }
}
