/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.supports.scalecube;

import io.scalecube.cluster.ClusterConfig;
import io.scalecube.cluster.ClusterImpl;
import io.scalecube.cluster.ClusterMessageHandler;
import io.scalecube.cluster.Member;
import io.scalecube.cluster.membership.MembershipEvent;
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.net.Address;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import javax.annotation.Nonnull;
import org.apache.commons.collections.CollectionUtils;
import org.jetlinks.core.trace.TraceHolder;
import org.jetlinks.core.utils.Reactors;
import org.jetlinks.supports.scalecube.ExtendedCluster;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.util.context.Context;
import reactor.util.context.ContextView;

/**
 * {@link ExtendedCluster} implementation that wraps a scalecube {@link ClusterImpl}.
 *
 * <p>On top of plain delegation to the wrapped cluster, this class:
 * <ul>
 *   <li>fans incoming messages, gossips and membership events out to reactive sinks and to
 *       every registered {@link ClusterMessageHandler};</li>
 *   <li>keeps a bounded cache of the messages received shortly after {@link #start()} and
 *       replays it to handlers that are registered later;</li>
 *   <li>maintains a "feature" registry: each node gossips the list of features it supports
 *       and every node records which member ids/aliases announced which feature;</li>
 *   <li>copies the current trace context into outgoing message headers when tracing is
 *       enabled, and restores it for listeners of incoming messages.</li>
 * </ul>
 */
public class ExtendedClusterImpl implements ExtendedCluster {
    private static final Logger log = LoggerFactory.getLogger(ExtendedClusterImpl.class);

    /** Qualifier of the internal gossip that announces a node's features. */
    private static final String FEATURE_QUALIFIER = "_c_fts_q";
    /** Header carrying the member id of the node that announced its features. */
    private static final String FEATURE_FROM = "_c_fts_f";

    /** How long after {@link #start()} incoming messages are still cached for late handlers. */
    private static final Duration MESSAGE_CACHE_WINDOW = Duration.ofSeconds(30L);
    /** Upper bound of cached messages. */
    private static final int MESSAGE_CACHE_LIMIT = 2048;
    /** Delay before the first periodic feature broadcast. */
    private static final Duration FEATURE_BROADCAST_DELAY = Duration.ofSeconds(10L);
    /** Period between two feature broadcasts. */
    private static final Duration FEATURE_BROADCAST_PERIOD = Duration.ofSeconds(30L);

    private final ClusterImpl real;
    private final Sinks.Many<Message> messageSink = Sinks.many().multicast().directBestEffort();
    private final Sinks.Many<Message> gossipSink = Sinks.many().multicast().directBestEffort();
    private final Sinks.Many<MembershipEvent> membershipEvents = Sinks.many().multicast().directBestEffort();
    private final List<ClusterMessageHandler> handlers = new CopyOnWriteArrayList<>();
    private volatile boolean started;
    /** Metadata updates requested before {@link #start()}; executed once the cluster is up. */
    private final List<Mono<Void>> startThen = new CopyOnWriteArrayList<>();
    private final List<Message> messageCache = new CopyOnWriteArrayList<>();
    /** Written by {@link #start()}, read from the message callback, hence volatile. */
    private volatile long cacheEndWithTime;
    private final List<String> localFeatures = new CopyOnWriteArrayList<>();
    /** feature id -> ids and aliases of the members that announced it. */
    private final Map<String, Set<String>> featureMembers = new ConcurrentHashMap<>();
    private final Disposable.Composite disposable = Disposables.composite();

    /**
     * Creates an extended cluster on top of a new {@link ClusterImpl} built from {@code config}.
     *
     * @param config the scalecube cluster configuration
     */
    public ExtendedClusterImpl(ClusterConfig config) {
        this(new ClusterImpl(config));
    }

    /**
     * Wraps the given cluster, installing the internal dispatcher as its message handler.
     *
     * @param impl the scalecube cluster to wrap
     */
    public ExtendedClusterImpl(ClusterImpl impl) {
        this.real = impl.handler(cluster -> new ClusterMessageHandlerDispatcher());
    }

    /** Returns the member's alias when it has one, its id otherwise (used for logging only). */
    private static String displayName(Member member) {
        return member.alias() == null ? member.id() : member.alias();
    }

    /**
     * Records that {@code member} supports every feature in {@code features}. Both the
     * member id and, when present, its alias are registered.
     */
    private void addFeature(Member member, Collection<String> features) {
        if (CollectionUtils.isEmpty(features)) {
            return;
        }
        log.debug("register cluster [{}] feature:{}", displayName(member), features);
        for (String feature : features) {
            Set<String> members = this.featureMembers
                    .computeIfAbsent(feature, k -> ConcurrentHashMap.<String>newKeySet());
            members.add(member.id());
            if (StringUtils.hasText(member.alias())) {
                members.add(member.alias());
            }
        }
    }

    /** Removes the member's id and alias from every feature it was registered for. */
    private void removeFeature(Member member) {
        for (Set<String> value : this.featureMembers.values()) {
            if (value.remove(member.id())) {
                log.debug("remove cluster [{}] features", displayName(member));
            }
            if (StringUtils.hasText(member.alias())) {
                value.remove(member.alias());
            }
        }
    }

    /** Invokes {@code consumer} with {@code e} on every registered handler, in order. */
    private <T> void doHandler(T e, BiConsumer<ClusterMessageHandler, T> consumer) {
        for (ClusterMessageHandler handler : this.handlers) {
            consumer.accept(handler, e);
        }
    }

    /**
     * Registers the handler produced by {@code handlerFunction} and replays the cached
     * messages to it.
     *
     * @param handlerFunction factory receiving this cluster
     * @return this instance
     */
    @Override
    public ExtendedClusterImpl handler(Function<ExtendedCluster, ClusterMessageHandler> handlerFunction) {
        return this.handler(handlerFunction.apply(this));
    }

    /**
     * Registers {@code handler} and replays the cached messages to it.
     *
     * @param handler the handler to register
     * @return this instance
     */
    @Override
    public ExtendedClusterImpl handler(ClusterMessageHandler handler) {
        this.handlers.add(handler);
        this.writeCacheMessage(handler);
        return this;
    }

    /** Replays every cached message to the given handler. */
    private void writeCacheMessage(ClusterMessageHandler handler) {
        for (Message message : this.messageCache) {
            handler.onMessage(message);
        }
    }

    /**
     * Starts the wrapped cluster, then broadcasts the local features, runs the metadata
     * updates that were requested before start, and finally schedules the periodic feature
     * broadcast.
     *
     * <p>The started flag and the message-cache deadline are set when this method is called,
     * not when the returned {@link Mono} is subscribed.
     *
     * @return a Mono emitting this cluster once the whole start sequence has completed
     */
    public Mono<ExtendedCluster> start() {
        this.started = true;
        this.cacheEndWithTime = System.currentTimeMillis() + MESSAGE_CACHE_WINDOW.toMillis();
        Mono<Void> pendingUpdates = Flux
                .fromIterable(this.startThen)
                .flatMap(Function.identity())
                .then(Mono.fromRunnable(this.startThen::clear));
        return this.real
                .start()
                .then(Mono.defer(this::broadcastFeature))
                .then(pendingUpdates)
                .then(Mono.fromRunnable(this::startBroadcastFeature))
                .thenReturn(this);
    }

    /**
     * Blocking variant of {@link #start()}.
     *
     * @return this cluster
     */
    public ExtendedCluster startAwait() {
        this.start().block();
        return this;
    }

    /** Gossips the list of locally registered features, tagged with the local member id. */
    private Mono<Void> broadcastFeature() {
        Message announcement = Message
                .builder()
                .qualifier(FEATURE_QUALIFIER)
                .header(FEATURE_FROM, this.member().id())
                .data(this.localFeatures)
                .build();
        return this.spreadGossip(announcement).then();
    }

    /**
     * Best-effort variant of {@link #broadcastFeature()}: any failure (including one thrown
     * while building the message) is logged and swallowed so that callers' pipelines keep
     * running.
     */
    private Mono<Void> broadcastFeatureQuietly() {
        return Mono
                .defer(this::broadcastFeature)
                .onErrorResume(err -> {
                    log.warn("broadcast cluster features failed", err);
                    return Mono.empty();
                });
    }

    /** Registers the local features for the local member and schedules periodic re-broadcast. */
    private void startBroadcastFeature() {
        this.addFeature(this.member(), this.localFeatures);
        this.disposable.add(
                Flux.interval(FEATURE_BROADCAST_DELAY, FEATURE_BROADCAST_PERIOD)
                    .flatMap(l -> this.broadcastFeatureQuietly())
                    .subscribe());
    }

    /**
     * @return a stream of the membership events received by this node
     */
    @Override
    public Flux<MembershipEvent> listenMembership() {
        return this.membershipEvents.asFlux();
    }

    /**
     * Subscribes {@code handler} to incoming messages whose qualifier equals {@code qualifier}.
     *
     * @param qualifier the qualifier to match
     * @param handler   callback invoked with the message and this cluster
     * @return a handle that cancels the subscription when disposed
     */
    @Override
    public Disposable listenMessage(@Nonnull String qualifier, BiFunction<Message, ExtendedCluster, Mono<Void>> handler) {
        return this.listen(this.messageSink, qualifier, handler);
    }

    /**
     * Subscribes {@code handler} to incoming gossips whose qualifier equals {@code qualifier}.
     *
     * @param qualifier the qualifier to match
     * @param handler   callback invoked with the gossip and this cluster
     * @return a handle that cancels the subscription when disposed
     */
    @Override
    public Disposable listenGossip(@Nonnull String qualifier, BiFunction<Message, ExtendedCluster, Mono<Void>> handler) {
        return this.listen(this.gossipSink, qualifier, handler);
    }

    /**
     * Shared implementation of {@link #listenMessage} and {@link #listenGossip}. The handler
     * runs with a context populated from the message headers via
     * {@code TraceHolder.readToContext}. Handler failures are logged and do not cancel the
     * subscription.
     */
    private Disposable listen(Sinks.Many<Message> sink, @Nonnull String qualifier, BiFunction<Message, ExtendedCluster, Mono<Void>> handler) {
        return sink
                .asFlux()
                .filter(msg -> Objects.equals(qualifier, msg.qualifier()))
                .flatMap(msg -> Mono
                        // defer: an exception thrown by the handler itself becomes an error of
                        // the inner Mono and is handled below instead of killing the listener
                        .defer(() -> handler.apply(msg, this))
                        .contextWrite(TraceHolder.readToContext(Context.empty(), msg.headers()))
                        .onErrorResume(err -> {
                            log.error(err.getMessage(), err);
                            return Mono.empty();
                        }))
                .subscribe();
    }

    public Address address() {
        return this.real.address();
    }

    /**
     * Sends {@code message} to {@code member}; when tracing is enabled the trace context is
     * first written into the message headers.
     */
    public Mono<Void> send(Member member, Message message) {
        if (TraceHolder.isEnabled()) {
            return TraceHolder
                    .writeContextTo(Message.with(message), Message.Builder::header)
                    .flatMap(msg -> this.real.send(member, msg.build()));
        }
        return this.real.send(member, message);
    }

    /**
     * Sends {@code message} to {@code address}; when tracing is enabled the trace context is
     * first written into the message headers.
     */
    public Mono<Void> send(Address address, Message message) {
        if (TraceHolder.isEnabled()) {
            return TraceHolder
                    .writeContextTo(Message.with(message), Message.Builder::header)
                    .flatMap(msg -> this.real.send(address, msg.build()));
        }
        return this.real.send(address, message);
    }

    /**
     * Performs a request/response exchange with {@code address}; when tracing is enabled the
     * trace context is first written into the request headers.
     */
    public Mono<Message> requestResponse(Address address, Message request) {
        if (TraceHolder.isEnabled()) {
            return TraceHolder
                    .writeContextTo(Message.with(request), Message.Builder::header)
                    // send the enriched copy, not the original request, so headers are kept
                    .flatMap(msg -> this.real.requestResponse(address, msg.build()));
        }
        return this.real.requestResponse(address, request);
    }

    /**
     * Performs a request/response exchange with {@code member}; when tracing is enabled the
     * trace context is first written into the request headers.
     */
    public Mono<Message> requestResponse(Member member, Message request) {
        if (TraceHolder.isEnabled()) {
            return TraceHolder
                    .writeContextTo(Message.with(request), Message.Builder::header)
                    // send the enriched copy, not the original request, so headers are kept
                    .flatMap(msg -> this.real.requestResponse(member, msg.build()));
        }
        return this.real.requestResponse(member, request);
    }

    /**
     * Spreads {@code message} as a gossip; when tracing is enabled the trace context is first
     * written into the message headers.
     */
    public Mono<String> spreadGossip(Message message) {
        if (TraceHolder.isEnabled()) {
            return TraceHolder
                    .writeContextTo(Message.with(message), Message.Builder::header)
                    // spread the enriched copy, not the original message, so headers are kept
                    .flatMap(msg -> this.real.spreadGossip(msg.build()));
        }
        return this.real.spreadGossip(message);
    }

    public <T> Optional<T> metadata() {
        return this.real.metadata();
    }

    public <T> Optional<T> metadata(Member member) {
        return this.real.metadata(member);
    }

    public Member member() {
        return this.real.member();
    }

    public Optional<Member> member(String id) {
        return this.real.member(id);
    }

    public Optional<Member> member(Address address) {
        return this.real.member(address);
    }

    public Collection<Member> members() {
        return this.real.members();
    }

    public Collection<Member> otherMembers() {
        return this.real.otherMembers();
    }

    /**
     * Updates the local metadata. Before {@link #start()} has been called the update is
     * queued and executed as part of the start sequence; in that case an empty Mono is
     * returned immediately.
     *
     * @param metadata the new metadata
     * @return completion signal of the update (never {@code null})
     */
    public <T> Mono<Void> updateMetadata(T metadata) {
        if (!this.started) {
            this.startThen.add(this.real.updateMetadata(metadata));
            return Mono.empty();
        }
        return this.real.updateMetadata(metadata);
    }

    /** Stops the periodic feature broadcast and shuts the wrapped cluster down. */
    public void shutdown() {
        this.disposable.dispose();
        this.real.shutdown();
    }

    public Mono<Void> onShutdown() {
        return this.real.onShutdown();
    }

    public boolean isShutdown() {
        return this.real.isShutdown();
    }

    /**
     * Adds the given features to the list announced by this node.
     *
     * @param feature feature ids; {@code null} or empty is a no-op
     */
    @Override
    public void registerFeatures(Collection<String> feature) {
        if (CollectionUtils.isEmpty(feature)) {
            return;
        }
        this.localFeatures.addAll(feature);
    }

    /**
     * Returns the currently known members (other members first, the local member last)
     * whose id is registered for {@code feature}.
     *
     * @param feature the feature id
     * @return the supporting members, empty when none is known
     */
    @Override
    public List<Member> featureMembers(String feature) {
        Set<String> members = this.featureMembers.get(feature);
        if (CollectionUtils.isEmpty(members)) {
            return Collections.emptyList();
        }
        Collection<Member> other = this.otherMembers();
        List<Member> supports = new ArrayList<>(other.size() + 1);
        for (Member member : other) {
            if (members.contains(member.id())) {
                supports.add(member);
            }
        }
        Member local = this.member();
        if (members.contains(local.id())) {
            supports.add(local);
        }
        return supports;
    }

    /**
     * Tells whether {@code member} (an id or alias) is registered for {@code featureId}.
     *
     * @param member    member id or alias
     * @param featureId the feature id
     * @return {@code true} when the member announced the feature
     */
    @Override
    public boolean supportFeature(String member, String featureId) {
        Set<String> members = this.featureMembers.get(featureId);
        if (CollectionUtils.isEmpty(members)) {
            return false;
        }
        return members.contains(member);
    }

    /**
     * Handler installed on the wrapped cluster: forwards every callback to the matching sink
     * and to the registered handlers, and maintains the feature registry.
     */
    class ClusterMessageHandlerDispatcher implements ClusterMessageHandler {
        ClusterMessageHandlerDispatcher() {
        }

        /** Caches the message while the start-up cache window is open, then dispatches it. */
        @Override
        public void onMessage(Message message) {
            boolean cacheOpen = System.currentTimeMillis() <= ExtendedClusterImpl.this.cacheEndWithTime;
            if (cacheOpen && ExtendedClusterImpl.this.messageCache.size() < MESSAGE_CACHE_LIMIT) {
                ExtendedClusterImpl.this.messageCache.add(message);
            }
            ExtendedClusterImpl.this.messageSink.emitNext(message, Reactors.emitFailureHandler());
            ExtendedClusterImpl.this.doHandler(message, ClusterMessageHandler::onMessage);
        }

        /**
         * Feature announcements are consumed internally and update the feature registry;
         * every other gossip is published to gossip listeners and handlers.
         */
        @Override
        public void onGossip(Message gossip) {
            if (FEATURE_QUALIFIER.equals(gossip.qualifier())) {
                String from = gossip.header(FEATURE_FROM);
                if (from != null) {
                    ExtendedClusterImpl.this
                            .member(from)
                            .ifPresent(mem -> ExtendedClusterImpl.this.addFeature(mem, gossip.data()));
                }
                return;
            }
            // gossips go to the gossip sink so that listenGossip subscribers receive them
            ExtendedClusterImpl.this.gossipSink.emitNext(gossip, Reactors.emitFailureHandler());
            ExtendedClusterImpl.this.doHandler(gossip, ClusterMessageHandler::onGossip);
        }

        /**
         * Publishes the event, drops the features of members that left or were removed, and
         * re-announces the local features whenever a member is added.
         */
        @Override
        public void onMembershipEvent(MembershipEvent event) {
            ExtendedClusterImpl.this.membershipEvents.emitNext(event, Reactors.emitFailureHandler());
            ExtendedClusterImpl.this.doHandler(event, ClusterMessageHandler::onMembershipEvent);
            if (event.isRemoved() || event.isLeaving()) {
                ExtendedClusterImpl.this.removeFeature(event.member());
            }
            if (event.isAdded()) {
                ExtendedClusterImpl.this.broadcastFeatureQuietly().subscribe();
            }
        }
    }
}

