/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.supports.cluster;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalCause;
import io.scalecube.cluster.ClusterMessageHandler;
import io.scalecube.cluster.Member;
import java.time.Duration;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import org.jetlinks.core.device.DeviceStateInfo;
import org.jetlinks.core.device.session.DeviceSessionManager;
import org.jetlinks.core.enums.ErrorCode;
import org.jetlinks.core.message.BroadcastMessage;
import org.jetlinks.core.message.CommonDeviceMessageReply;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DeviceMessageReply;
import org.jetlinks.core.message.Headers;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.RepayableDeviceMessage;
import org.jetlinks.core.trace.TraceHolder;
import org.jetlinks.core.utils.Reactors;
import org.jetlinks.supports.cluster.AbstractDeviceOperationBroker;
import org.jetlinks.supports.scalecube.ExtendedCluster;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.util.context.Context;
import reactor.util.context.ContextView;

@Deprecated
public class ClusterDeviceOperationBroker
extends AbstractDeviceOperationBroker {
    private static final Logger log = LoggerFactory.getLogger(ClusterDeviceOperationBroker.class);
    private static final String QUALIFIER_REPLY = "cdob_r";
    private static final String QUALIFIER_SEND = "cdob_s";
    final ExtendedCluster cluster;
    final DeviceSessionManager sessionManager;
    private final Map<String, RepayableDeviceMessage<?>> awaits = CacheBuilder.newBuilder().expireAfterWrite(Duration.ofMinutes(5L)).removalListener(notify -> {
        if (notify.getCause() == RemovalCause.EXPIRED) {
            try {
                log.debug("discard await reply message[{}] message,{}", notify.getKey(), notify.getValue());
            }
            catch (Throwable throwable) {
                // empty catch block
            }
        }
    }).build().asMap();
    private final Sinks.Many<Message> sendToDevice = Sinks.many().multicast().onBackpressureBuffer(Integer.MAX_VALUE, false);

    public ClusterDeviceOperationBroker(ExtendedCluster cluster, DeviceSessionManager sessionManager) {
        this.cluster = cluster;
        this.sessionManager = sessionManager;
        cluster.handler(ignore -> new ClusterHandler());
    }

    private String currentServerId() {
        return this.sessionManager.getCurrentServerId();
    }

    @Override
    public Flux<DeviceStateInfo> getDeviceState(String deviceGatewayServerId, Collection<String> deviceIdList) {
        return Flux.fromIterable(deviceIdList).flatMap(id -> this.sessionManager.checkAlive(id, false).map(alive -> new DeviceStateInfo(id, alive != false ? (byte)1 : (byte)-1)));
    }

    @Override
    public Flux<DeviceMessageReply> handleReply(String deviceId, String messageId, Duration timeout) {
        return super.handleReply(deviceId, messageId, timeout).doOnCancel(() -> this.awaits.remove(this.getAwaitReplyKey(deviceId, messageId)));
    }

    @Override
    public Mono<Integer> send(String deviceGatewayServerId, Publisher<? extends Message> message) {
        if (this.currentServerId().equals(deviceGatewayServerId)) {
            return Flux.from(message).flatMap(this::handleSendToDevice).then(Reactors.ALWAYS_ONE);
        }
        Member member = this.getMember(deviceGatewayServerId);
        if (null == member) {
            return Reactors.ALWAYS_ZERO;
        }
        return Flux.from(message).flatMap(msg -> {
            msg.addHeader(Headers.sendFrom, (Object)this.sessionManager.getCurrentServerId());
            if (msg instanceof RepayableDeviceMessage) {
                String key = this.getAwaitReplyKey((DeviceMessage)((RepayableDeviceMessage)msg));
                this.awaits.put(key, (RepayableDeviceMessage)msg);
            }
            return this.cluster.send(member, io.scalecube.cluster.transport.api.Message.builder().qualifier(QUALIFIER_SEND).data(msg).build());
        }).then(Reactors.ALWAYS_ONE);
    }

    @Override
    public Mono<Integer> send(Publisher<? extends BroadcastMessage> message) {
        return Reactors.ALWAYS_ZERO;
    }

    @Override
    public Flux<Message> handleSendToDeviceMessage(String serverId) {
        return this.sendToDevice.asFlux();
    }

    private Mono<Void> handleSendToDevice(Message message) {
        if (message instanceof RepayableDeviceMessage) {
            RepayableDeviceMessage msg = (RepayableDeviceMessage)message;
            this.awaits.put(this.getAwaitReplyKey((DeviceMessage)msg), msg);
        }
        if (this.sendToDevice.currentSubscriberCount() == 0) {
            log.warn("no handler for message {}", (Object)message);
            return this.doReply(this.createReply(message).error(ErrorCode.SYSTEM_ERROR));
        }
        try {
            this.sendToDevice.emitNext((Object)message, Reactors.emitFailureHandler());
        }
        catch (Throwable err) {
            return this.doReply(this.createReply(message).error(err));
        }
        return Mono.empty();
    }

    private DeviceMessageReply createReply(Message message) {
        if (message instanceof RepayableDeviceMessage) {
            return ((RepayableDeviceMessage)message).newReply();
        }
        return new CommonDeviceMessageReply().messageId(message.getMessageId());
    }

    @Override
    protected Mono<Void> doReply(DeviceMessageReply reply) {
        RepayableDeviceMessage<?> msg = this.awaits.remove(this.getAwaitReplyKey((DeviceMessage)reply));
        Member member = null;
        if (null != msg) {
            member = msg.getHeader(Headers.sendFrom).map(this::getMember).orElse(null);
        }
        Function<Member, Mono> handler = _member -> this.cluster.send((Member)_member, io.scalecube.cluster.transport.api.Message.builder().qualifier(QUALIFIER_REPLY).data((Object)reply).build());
        if (null != member) {
            return handler.apply(member);
        }
        return Flux.fromIterable((Iterable)this.cluster.otherMembers()).flatMap(handler).then();
    }

    @Override
    public Disposable handleGetDeviceState(String serverId, Function<Publisher<String>, Flux<DeviceStateInfo>> stateMapper) {
        return Disposables.disposed();
    }

    public Member getMember(String id) {
        for (Member member : this.cluster.otherMembers()) {
            if (!Objects.equals(member.id(), id) && !Objects.equals(member.alias(), id)) continue;
            return member;
        }
        return null;
    }

    class ClusterHandler
    implements ClusterMessageHandler {
        ClusterHandler() {
        }

        public void onGossip(io.scalecube.cluster.transport.api.Message gossip) {
            this.onMessage(gossip);
        }

        public void onMessage(io.scalecube.cluster.transport.api.Message message) {
            if (ClusterDeviceOperationBroker.QUALIFIER_SEND.equals(message.qualifier())) {
                Message msg = (Message)message.data();
                TraceHolder.copyContext((Map)message.headers(), (Object)msg, Message::addHeader);
                ClusterDeviceOperationBroker.this.handleSendToDevice((Message)message.data()).contextWrite((ContextView)TraceHolder.readToContext((ContextView)Context.empty(), (Map)message.headers())).subscribe();
            } else if (ClusterDeviceOperationBroker.QUALIFIER_REPLY.equals(message.qualifier())) {
                Message msg = (Message)message.data();
                TraceHolder.copyContext((Map)message.headers(), (Object)msg, Message::addHeader);
                ClusterDeviceOperationBroker.this.handleReply((DeviceMessageReply)message.data());
            }
        }
    }
}

