/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.supports.server;

import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.jetlinks.core.config.ConfigKey;
import org.jetlinks.core.device.DeviceConfigKey;
import org.jetlinks.core.device.DeviceOperationBroker;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.device.session.DeviceSessionManager;
import org.jetlinks.core.enums.ErrorCode;
import org.jetlinks.core.exception.DeviceOperationException;
import org.jetlinks.core.message.ChildDeviceMessage;
import org.jetlinks.core.message.CommonDeviceMessageReply;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DeviceMessageReply;
import org.jetlinks.core.message.DisconnectDeviceMessage;
import org.jetlinks.core.message.HeaderKey;
import org.jetlinks.core.message.Headers;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.RepayableDeviceMessage;
import org.jetlinks.core.message.codec.EncodedMessage;
import org.jetlinks.core.message.codec.MessageEncodeContext;
import org.jetlinks.core.message.codec.ToDeviceMessageContext;
import org.jetlinks.core.message.state.DeviceStateCheckMessage;
import org.jetlinks.core.server.MessageHandler;
import org.jetlinks.core.server.session.ChildrenDeviceSession;
import org.jetlinks.core.server.session.DeviceSession;
import org.jetlinks.core.server.session.LostDeviceSession;
import org.jetlinks.core.trace.DeviceTracer;
import org.jetlinks.core.trace.FluxTracer;
import org.jetlinks.core.trace.TraceHolder;
import org.jetlinks.supports.server.DecodedClientMessageHandler;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;
import reactor.util.context.ContextView;

/**
 * Consumes the messages that the {@link MessageHandler} delivers for the current server
 * (identified by {@link DeviceSessionManager#getCurrentServerId()}) and dispatches each
 * {@link DeviceMessage} to the matching device session.
 *
 * <p>Dispatch rules implemented here:
 * <ul>
 *   <li>a session exists locally: the message is encoded with the device protocol's codec
 *       and every encoded message is sent through the session;</li>
 *   <li>the session wraps a {@link ChildrenDeviceSession}, or the device has a
 *       {@link DeviceConfigKey#parentGatewayId}: the message is wrapped in a
 *       {@link ChildDeviceMessage} addressed to the parent device and dispatched again;</li>
 *   <li>the session wraps a {@link LostDeviceSession}, or the session manager reports the
 *       device alive without a local session: the message is re-sent once through the
 *       {@link DeviceOperationBroker} (when the handler is one);</li>
 *   <li>otherwise an error reply is handed to the {@link DecodedClientMessageHandler}.</li>
 * </ul>
 *
 * <p>The subscription is started from the constructor and is never disposed by this class.
 */
public class ClusterSendToDeviceMessageHandler {
    private static final Logger log = LoggerFactory.getLogger(ClusterSendToDeviceMessageHandler.class);

    /**
     * Header set on a message the first time {@link #retryResume} re-dispatches it.
     * Its presence on a later attempt stops further re-dispatching.
     */
    private static final HeaderKey<Boolean> resumeSession = HeaderKey.of("resume-session", true);

    private final DeviceSessionManager sessionManager;
    private final MessageHandler handler;
    private final DeviceRegistry registry;
    private final DecodedClientMessageHandler decodedClientMessageHandler;

    /**
     * Creates the handler and immediately subscribes to the messages for the current server.
     *
     * @param sessionManager              source of device sessions and of the current server id
     * @param handler                     source of the messages to send to devices
     * @param registry                    used to look up device operators
     * @param decodedClientMessageHandler receives the replies produced by this handler
     */
    public ClusterSendToDeviceMessageHandler(DeviceSessionManager sessionManager,
                                             MessageHandler handler,
                                             DeviceRegistry registry,
                                             DecodedClientMessageHandler decodedClientMessageHandler) {
        this.sessionManager = sessionManager;
        this.handler = handler;
        this.registry = registry;
        this.decodedClientMessageHandler = decodedClientMessageHandler;
        this.init();
    }

    /**
     * Subscribes to the handler's message stream, processing up to 10240 messages concurrently.
     * A failure while handling one message is logged and dropped so the stream keeps running.
     */
    private void init() {
        this.handler
            .handleSendToDeviceMessage(this.sessionManager.getCurrentServerId())
            .flatMap(msg -> Mono
                // defer so that an exception thrown synchronously by handleMessage is also
                // routed to onErrorResume instead of terminating the whole subscription
                .defer(() -> this.handleMessage(msg))
                .onErrorResume(err -> {
                    log.error("handle send to device message error {}", msg, err);
                    return Mono.empty();
                }), 10240)
            .subscribe();
    }

    /**
     * Builds the reply for a message: the message's own reply type when it is a
     * {@link RepayableDeviceMessage}, otherwise a {@link CommonDeviceMessageReply}
     * carrying the same device id and message id.
     *
     * @param message the message being answered; callers in this class always pass a {@link DeviceMessage}
     * @return a new, not yet populated reply
     */
    private DeviceMessageReply createReply(Message message) {
        if (message instanceof RepayableDeviceMessage) {
            return ((RepayableDeviceMessage) message).newReply();
        }
        return new CommonDeviceMessageReply()
            .deviceId(((DeviceMessage) message).getDeviceId())
            .messageId(message.getMessageId());
    }

    /**
     * Dispatches one message. Messages that are not {@link DeviceMessage}s, or that have no
     * device id, are ignored.
     *
     * @param msg the message received from the handler
     * @return completes when dispatching (including any reply) has finished
     */
    private Mono<Void> handleMessage(Message msg) {
        if (!(msg instanceof DeviceMessage)) {
            return Mono.empty();
        }
        DeviceMessage message = (DeviceMessage) msg;
        if (message.getDeviceId() == null) {
            log.warn("deviceId is null :{}", message);
            return Mono.empty();
        }
        return this.sessionManager
            .getSession(message.getDeviceId())
            // a session is available from the session manager
            .map(session -> this.sendTo(session, message))
            // no session: resolve the device through the registry instead
            .defaultIfEmpty(Mono.defer(() -> this.sendToUnknownSession(message)))
            .flatMap(Function.identity())
            // populate the reactor context from the message headers for tracing
            .contextWrite(TraceHolder.readToContext(Context.empty(), message.getHeaders()));
    }

    /**
     * Sends a message through an existing session.
     *
     * @param session the session found for the message's device
     * @param message the message to send
     * @return completes after the encoded messages were sent and any required reply was emitted
     */
    private Mono<Void> sendTo(DeviceSession session, DeviceMessage message) {
        // child device session: route through the parent device
        if (session.isWrapFrom(ChildrenDeviceSession.class)) {
            return this.sendToParentSession(
                session.getOperator(),
                session.unwrap(ChildrenDeviceSession.class).getParentDevice(),
                message);
        }
        DeviceOperator device = session.getOperator();
        // lost session: try to re-dispatch instead of sending
        if (session.isWrapFrom(LostDeviceSession.class)) {
            return this.retryResume(device, message);
        }
        CodecContext context = new CodecContext(device, message, DeviceSession.trace(session));
        return device
            .getProtocol()
            .flatMap(protocol -> protocol.getMessageCodec(context.session.getTransport()))
            .flatMapMany(codec -> codec.encode(context))
            .as(FluxTracer.create(
                DeviceTracer.SpanName.encode(device.getDeviceId()),
                (span, msg) -> span.setAttribute(DeviceTracer.SpanKey.message, msg.toString())))
            // send every encoded message through the session
            .map(msg -> context.session.send(msg).then())
            // the codec produced nothing: treat the message as (possibly) unsupported
            .defaultIfEmpty(Mono.defer(() -> this.handleUnsupportedMessage(context)))
            .flatMap(Function.identity())
            .onErrorResume(err -> {
                // DeviceOperationException is not logged here; it is only reported through the reply
                if (!(err instanceof DeviceOperationException)) {
                    log.error("handle send to device message error {}", context.message, err);
                }
                if (!context.alreadyReply) {
                    return this.doReply(context, this.createReply(context.message).error(err));
                }
                return Mono.empty();
            })
            .then(Mono.defer(() -> this.handleMessageSent(context)));
    }

    /**
     * Runs after the send pipeline completed. For messages flagged {@link Headers#async} that
     * have not been answered yet, emits a successful reply with the
     * {@link ErrorCode#REQUEST_HANDLING} code and text.
     *
     * @param context the context of the message that was sent
     * @return completes when the optional reply has been handled
     */
    private Mono<Void> handleMessageSent(CodecContext context) {
        if (context.alreadyReply) {
            return Mono.empty();
        }
        if (context.message.getHeader(Headers.async).orElse(false)) {
            return this
                .doReply(context, this.createReply(context.message)
                                      .message(ErrorCode.REQUEST_HANDLING.getText())
                                      .code(ErrorCode.REQUEST_HANDLING.name())
                                      .success())
                .then();
        }
        return Mono.empty();
    }

    /**
     * Handles a message for which the codec produced no encoded output.
     *
     * <ul>
     *   <li>{@link DisconnectDeviceMessage}: removes the device's session and replies success;</li>
     *   <li>{@link ChildDeviceMessage} wrapping a {@link DisconnectDeviceMessage}: removes the
     *       child's session and replies success;</li>
     *   <li>{@link ChildDeviceMessage} wrapping a {@link DeviceStateCheckMessage}: replies success;</li>
     *   <li>anything else: replies {@link ErrorCode#UNSUPPORTED_MESSAGE}.</li>
     * </ul>
     *
     * Nothing is done when a reply was already produced for this context.
     *
     * @param context the context of the message that could not be encoded
     * @return completes when the reply has been handled
     */
    private Mono<Void> handleUnsupportedMessage(CodecContext context) {
        if (context.alreadyReply) {
            return Mono.empty();
        }
        if (context.message instanceof DisconnectDeviceMessage) {
            return this.sessionManager
                .remove(context.device.getDeviceId(), false)
                .then(this.doReply(context, this.createReply(context.message).success()));
        }
        if (context.message instanceof ChildDeviceMessage) {
            ChildDeviceMessage child = (ChildDeviceMessage) context.message;
            Message childMsg = child.getChildDeviceMessage();
            if (childMsg instanceof DisconnectDeviceMessage) {
                return this.sessionManager
                    .remove(((DisconnectDeviceMessage) childMsg).getDeviceId(), false)
                    .then(this.doReply(context, this.createReply(context.message).success()));
            }
            if (childMsg instanceof DeviceStateCheckMessage) {
                return this.doReply(context, this.createReply(context.message).success());
            }
        }
        return this.doReply(context, this.createReply(context.message).error(ErrorCode.UNSUPPORTED_MESSAGE));
    }

    /**
     * Handles a message whose device has no session in the session manager. If the device has
     * a parent gateway configured and that parent is registered, the message is routed through
     * the parent; otherwise {@link #sendToNoSession} decides what to reply.
     *
     * <p>When the registry emits no device for the id, the returned Mono completes without a reply.
     *
     * @param message the message to dispatch
     * @return completes when dispatching has finished
     */
    private Mono<Void> sendToUnknownSession(DeviceMessage message) {
        return this.registry
            .getDevice(message.getDeviceId())
            .flatMap(device -> device
                .getSelfConfig(DeviceConfigKey.parentGatewayId)
                .flatMap(this.registry::getDevice)
                .map(parentDevice -> this.sendToParentSession(device, parentDevice, message))
                .defaultIfEmpty(Mono.defer(() -> this.sendToNoSession(device, message))))
            .flatMap(Function.identity());
    }

    /**
     * Handles a device that has neither a session nor a usable parent gateway. If the session
     * manager still reports the device alive, a re-dispatch is attempted; otherwise an error
     * reply with header {@code reason=session_not_exists} is emitted, using
     * {@link ErrorCode#CONNECTION_LOST} when the message's resume-session header value is
     * {@code true} and {@link ErrorCode#CLIENT_OFFLINE} otherwise.
     *
     * @param device  the operator of the target device
     * @param message the message that could not be delivered
     * @return completes when the re-dispatch or reply has finished
     */
    private Mono<Void> sendToNoSession(DeviceOperator device, DeviceMessage message) {
        log.warn("device session state failed,try resume. {}", message);
        return this.sessionManager
            .checkAlive(message.getDeviceId(), false)
            .flatMap(exists -> {
                if (exists) {
                    return this.retryResume(device, message);
                }
                boolean resume = message.getHeader(resumeSession).orElse(false);
                return this.doReply(device, this.createReply(message)
                                                .addHeader("reason", "session_not_exists")
                                                .error(resume ? ErrorCode.CONNECTION_LOST : ErrorCode.CLIENT_OFFLINE));
            });
    }

    /**
     * Re-sends the message once to the server recorded in the device's
     * {@link DeviceConfigKey#connectionServerId}, provided the handler is a
     * {@link DeviceOperationBroker}. Replies {@link ErrorCode#CONNECTION_LOST} when the message
     * was already re-sent before, when the handler is not a broker, or when the broker reports
     * a result that is not greater than zero.
     *
     * @param device  the operator of the target device
     * @param message the message to re-send; the resume-session header is added to it
     * @return completes when the re-send or reply has finished
     */
    private Mono<Void> retryResume(DeviceOperator device, DeviceMessage message) {
        // already re-dispatched once: stop here to avoid bouncing the message forever
        if (message.getHeader(resumeSession).isPresent()) {
            return this.doReply(device, this.createReply(message).error(ErrorCode.CONNECTION_LOST));
        }
        message.addHeader(resumeSession, true);
        if (this.handler instanceof DeviceOperationBroker) {
            DeviceOperationBroker broker = (DeviceOperationBroker) this.handler;
            // NOTE(review): if connectionServerId is not configured, this chain completes
            // empty and no reply is produced — confirm that this is intended.
            return device
                .getSelfConfig(DeviceConfigKey.connectionServerId)
                .flatMap(serverId -> broker.send(serverId, Mono.just(message)))
                .flatMap(i -> {
                    if (i > 0) {
                        return Mono.empty();
                    }
                    return this.doReply(device, this.createReply(message).error(ErrorCode.CONNECTION_LOST));
                });
        }
        return this.doReply(device, this.createReply(message).error(ErrorCode.CONNECTION_LOST));
    }

    /**
     * Wraps the message in a {@link ChildDeviceMessage} addressed to the parent device (same
     * message id, functional headers copied) and dispatches the wrapper through
     * {@link #handleMessage}.
     *
     * @param device  the child device the message is meant for
     * @param parent  the parent device that receives the wrapper
     * @param message the original message
     * @return completes when the wrapper has been dispatched
     */
    private Mono<Void> sendToParentSession(DeviceOperator device, DeviceOperator parent, DeviceMessage message) {
        ChildDeviceMessage child = new ChildDeviceMessage();
        child.setDeviceId(parent.getDeviceId());
        child.setChildDeviceId(device.getDeviceId());
        child.setChildDeviceMessage(message);
        child.setMessageId(message.getMessageId());
        Headers.copyFunctionalHeader(message, child);
        return this.handleMessage(child);
    }

    /**
     * Hands a reply to the {@link DecodedClientMessageHandler}.
     *
     * @param device  the device the reply belongs to; may be {@code null}
     * @param message the reply
     * @return completes when the reply has been handled
     */
    Mono<Void> doReply(DeviceOperator device, DeviceMessage message) {
        return this.decodedClientMessageHandler.handleMessage(device, message).then();
    }

    /**
     * Emits a reply for the message held by {@code context}, at most once per context.
     * Nothing is emitted when the message carries {@link Headers#sendAndForget} or a reply was
     * already produced. The context is marked as replied when this method is called, not when
     * the returned Mono is subscribed.
     *
     * @param context the codec context of the message being answered; when {@code null} the
     *                reply is handled without a device
     * @param message the reply
     * @return completes when the reply has been handled
     */
    Mono<Void> doReply(CodecContext context, DeviceMessage message) {
        if (context == null) {
            // the cast selects the DeviceOperator overload
            return this.doReply((DeviceOperator) null, message);
        }
        if (context.message.getHeader(Headers.sendAndForget).orElse(false) || context.alreadyReply) {
            return Mono.empty();
        }
        context.alreadyReply = true;
        return this
            .doReply(context.device, message)
            .contextWrite(TraceHolder.readToContext(Context.empty(), context.message.getHeaders()));
    }

    /**
     * Encode context passed to the protocol codec for a single outgoing message. It exposes the
     * device, message and session, and records whether a reply has already been produced so the
     * enclosing handler does not reply twice.
     */
    class CodecContext
    implements ToDeviceMessageContext {
        private final DeviceOperator device;
        private final DeviceMessage message;
        private final DeviceSession session;
        /** Set once a reply was produced, either by the codec via {@link #reply} or by the handler. */
        private volatile boolean alreadyReply = false;

        CodecContext(DeviceOperator device, DeviceMessage message, DeviceSession session) {
            this.device = device;
            this.message = message;
            this.session = session;
        }

        @Nullable
        public DeviceOperator getDevice() {
            return this.device;
        }

        /** Looks the device up in the enclosing handler's registry. */
        public Mono<DeviceOperator> getDevice(String deviceId) {
            return ClusterSendToDeviceMessageHandler.this.registry.getDevice(deviceId);
        }

        /** Delegates to the interface's default implementation. */
        @Override
        public Map<String, Object> getConfiguration() {
            // a default method of a superinterface must be invoked as Interface.super.method()
            return ToDeviceMessageContext.super.getConfiguration();
        }

        /** Delegates to the interface's default implementation. */
        @Override
        public Optional<Object> getConfig(String key) {
            return ToDeviceMessageContext.super.getConfig(key);
        }

        @Nonnull
        public Message getMessage() {
            return this.message;
        }

        /**
         * Marks this context as replied and forwards every reply message to the
         * {@link DecodedClientMessageHandler}.
         */
        @Nonnull
        public Mono<Void> reply(@Nonnull Publisher<? extends DeviceMessage> replyMessage) {
            this.alreadyReply = true;
            return Flux
                .from(replyMessage)
                .flatMap(msg -> ClusterSendToDeviceMessageHandler.this.decodedClientMessageHandler
                    .handleMessage(this.device, msg))
                .then();
        }

        /** Sends an encoded message through this context's session. */
        public Mono<Boolean> sendToDevice(@Nonnull EncodedMessage message) {
            return this.session.send(message);
        }

        /** Removes this device's session from the session manager. */
        public Mono<Void> disconnect() {
            return ClusterSendToDeviceMessageHandler.this.sessionManager
                .remove(this.device.getDeviceId(), true)
                .then();
        }

        @Nonnull
        public DeviceSession getSession() {
            return this.session;
        }

        /** Looks up another device's session in the session manager. */
        public Mono<DeviceSession> getSession(String deviceId) {
            return ClusterSendToDeviceMessageHandler.this.sessionManager.getSession(deviceId);
        }

        /** Asks the session manager whether the given device is alive. */
        public Mono<Boolean> sessionIsAlive(String deviceId) {
            return ClusterSendToDeviceMessageHandler.this.sessionManager.isAlive(deviceId);
        }
    }
}

