/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.supports.device.session;

import io.scalecube.services.annotations.ServiceMethod;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import org.jctools.maps.NonBlockingHashMap;
import org.jetlinks.core.device.session.DeviceSessionInfo;
import org.jetlinks.core.rpc.RpcManager;
import org.jetlinks.core.rpc.ServiceEvent;
import org.jetlinks.core.server.session.DeviceSession;
import org.jetlinks.core.utils.Reactors;
import org.jetlinks.supports.device.session.AbstractDeviceSessionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ClusterDeviceSessionManager
extends AbstractDeviceSessionManager {
    private static final Logger log = LoggerFactory.getLogger(ClusterDeviceSessionManager.class);
    private final RpcManager rpcManager;
    private final Map<String, Service> services = new NonBlockingHashMap();

    public ClusterDeviceSessionManager(RpcManager rpcManager) {
        this.rpcManager = rpcManager;
    }

    @Override
    public void init() {
        super.init();
        this.rpcManager.registerService((Object)new ServiceImpl(() -> this));
        this.rpcManager.getServices(Service.class).subscribe(service -> this.addService(service.serverNodeId(), (Service)service.service()));
        this.rpcManager.listen(Service.class).subscribe(e -> {
            if (e.getType() == ServiceEvent.Type.removed) {
                this.services.remove(e.getServerNodeId());
            } else if (e.getType() == ServiceEvent.Type.added) {
                this.rpcManager.getService(e.getServerNodeId(), Service.class).subscribe(service -> this.addService(e.getServerNodeId(), (Service)service));
            }
        });
    }

    private void addService(String serverId, Service rpc) {
        this.services.put(serverId, new ErrorHandleService(serverId, rpc));
    }

    @Override
    public final String getCurrentServerId() {
        return this.rpcManager.currentServerId();
    }

    @Override
    protected final Mono<Boolean> initSessionConnection(DeviceSession session) {
        if (this.services.size() == 0) {
            return Reactors.ALWAYS_FALSE;
        }
        return this.getServices().concatMap(service -> service.init(session.getDeviceId())).takeUntil(Boolean::booleanValue).any(Boolean::booleanValue);
    }

    @Override
    protected final Mono<Long> removeRemoteSession(String deviceId) {
        if (this.services.size() == 0) {
            return Reactors.ALWAYS_ZERO_LONG;
        }
        return this.getServices().flatMap(service -> service.remove(deviceId)).reduce(Math::addExact);
    }

    @Override
    protected final Mono<Long> getRemoteTotalSessions() {
        if (this.services.size() == 0) {
            return Reactors.ALWAYS_ZERO_LONG;
        }
        return this.getServices().flatMap(Service::total).reduce(Math::addExact);
    }

    @Override
    protected final Mono<Boolean> remoteSessionIsAlive(String deviceId) {
        if (this.services.size() == 0) {
            return Reactors.ALWAYS_FALSE;
        }
        return this.getServices().flatMap(service -> service.isAlive(deviceId)).any(Boolean::booleanValue).defaultIfEmpty((Object)false);
    }

    @Override
    protected Mono<Boolean> checkRemoteSessionIsAlive(String deviceId) {
        if (this.services.size() == 0) {
            return Reactors.ALWAYS_FALSE;
        }
        return this.getServices().flatMap(service -> service.checkAlive(deviceId)).any(Boolean::booleanValue).defaultIfEmpty((Object)false);
    }

    @Override
    protected Flux<DeviceSessionInfo> remoteSessions(String serverId) {
        if (StringUtils.isEmpty((Object)serverId)) {
            return this.getServices().flatMap(Service::sessions);
        }
        Service service = this.services.get(serverId);
        return service == null ? Flux.empty() : service.sessions();
    }

    private Flux<Service> getServices() {
        return Flux.fromIterable(this.services.values());
    }

    static class ErrorHandleService
    implements Service {
        private final String id;
        private final Service service;

        private void handleError(Throwable error) {
            log.warn("cluster[{}] session manager is failed", (Object)this.id, (Object)error);
        }

        @Override
        public Mono<Boolean> isAlive(String deviceId) {
            return this.service.isAlive(deviceId).onErrorResume(err -> {
                this.handleError((Throwable)err);
                return Reactors.ALWAYS_FALSE;
            });
        }

        @Override
        public Mono<Boolean> checkAlive(String deviceId) {
            return this.service.checkAlive(deviceId).onErrorResume(err -> {
                this.handleError((Throwable)err);
                return Reactors.ALWAYS_FALSE;
            });
        }

        @Override
        public Mono<Long> total() {
            return this.service.total().onErrorResume(err -> {
                this.handleError((Throwable)err);
                return Reactors.ALWAYS_ZERO_LONG;
            });
        }

        @Override
        public Mono<Boolean> init(String deviceId) {
            return this.service.init(deviceId).onErrorResume(err -> {
                this.handleError((Throwable)err);
                return Reactors.ALWAYS_FALSE;
            });
        }

        @Override
        public Mono<Long> remove(String deviceId) {
            return this.service.remove(deviceId).onErrorResume(err -> {
                this.handleError((Throwable)err);
                return Reactors.ALWAYS_ZERO_LONG;
            });
        }

        @Override
        public Flux<DeviceSessionInfo> sessions() {
            return this.service.sessions().onErrorResume(err -> {
                this.handleError((Throwable)err);
                return Mono.empty();
            });
        }

        public ErrorHandleService(String id, Service service) {
            this.id = id;
            this.service = service;
        }
    }

    public static class ServiceImpl
    implements Service {
        private final Supplier<AbstractDeviceSessionManager> managerSupplier;

        private <T, Arg0> T doWith(Arg0 arg0, BiFunction<AbstractDeviceSessionManager, Arg0, T> arg, T defaultValue) {
            AbstractDeviceSessionManager manager = this.managerSupplier.get();
            if (manager == null) {
                return defaultValue;
            }
            return arg.apply(manager, arg0);
        }

        @Override
        public Mono<Boolean> checkAlive(String deviceId) {
            return this.doWith(deviceId, (manager, id) -> manager.checkLocalAlive(deviceId), Reactors.ALWAYS_FALSE);
        }

        @Override
        public Mono<Boolean> isAlive(String deviceId) {
            return this.doWith(deviceId, (manager, id) -> {
                AbstractDeviceSessionManager.DeviceSessionRef ref = manager.localSessions.get(deviceId);
                if (ref == null) {
                    return Reactors.ALWAYS_FALSE;
                }
                if (ref.loaded == null) {
                    return Reactors.ALWAYS_TRUE;
                }
                return ref.loaded.isAliveAsync();
            }, Reactors.ALWAYS_FALSE);
        }

        @Override
        public Mono<Long> total() {
            return this.doWith(null, (manager, nil) -> manager.totalSessions(true), Reactors.ALWAYS_ZERO_LONG);
        }

        @Override
        public Mono<Boolean> init(String deviceId) {
            return this.doWith(deviceId, AbstractDeviceSessionManager::doInit, Reactors.ALWAYS_FALSE);
        }

        @Override
        public Mono<Long> remove(String deviceId) {
            return this.doWith(deviceId, AbstractDeviceSessionManager::removeFromCluster, Reactors.ALWAYS_ZERO_LONG);
        }

        @Override
        public Flux<DeviceSessionInfo> sessions() {
            return this.doWith(null, (manager, ignore) -> manager.getLocalSessionInfo(), Flux.empty());
        }

        public ServiceImpl(Supplier<AbstractDeviceSessionManager> managerSupplier) {
            this.managerSupplier = managerSupplier;
        }
    }

    @io.scalecube.services.annotations.Service
    public static interface Service {
        @ServiceMethod
        public Mono<Boolean> isAlive(String var1);

        @ServiceMethod
        public Mono<Boolean> checkAlive(String var1);

        @ServiceMethod
        public Mono<Long> total();

        @ServiceMethod
        public Mono<Boolean> init(String var1);

        @ServiceMethod
        public Mono<Long> remove(String var1);

        @ServiceMethod
        public Flux<DeviceSessionInfo> sessions();
    }
}

