/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.supports.device.session;

import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.annotation.Nonnull;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.session.DeviceSessionEvent;
import org.jetlinks.core.device.session.DeviceSessionInfo;
import org.jetlinks.core.device.session.DeviceSessionManager;
import org.jetlinks.core.server.session.ChildrenDeviceSession;
import org.jetlinks.core.server.session.DeviceSession;
import org.jetlinks.core.utils.Reactors;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.context.ContextView;

public abstract class AbstractDeviceSessionManager
implements DeviceSessionManager {
    private static final Logger log = LoggerFactory.getLogger(AbstractDeviceSessionManager.class);
    // Atomic updater for the closeWip counter declared below.
    private static final AtomicLongFieldUpdater<AbstractDeviceSessionManager> CLOSE_WIP = AtomicLongFieldUpdater.newUpdater(AbstractDeviceSessionManager.class, "closeWip");
    // Session references held by this instance, keyed by device id.
    protected final Map<String, DeviceSessionRef> localSessions = new ConcurrentHashMap<String, DeviceSessionRef>(2048);
    // Handlers registered through listenEvent(...) and invoked by fireEvent(...).
    private final List<Function<DeviceSessionEvent, Mono<Void>>> sessionEventHandlers = new CopyOnWriteArrayList<Function<DeviceSessionEvent, Mono<Void>>>();
    // Collects everything started by init(); disposed as a whole by shutdown().
    protected final Disposable.Composite disposable = Disposables.composite();
    // Timeout applied to a session loader inside DeviceSessionRef.update(Mono).
    private Duration sessionLoadTimeout = Duration.ofSeconds(5L);
    // Period of the Flux.interval created in init() that triggers executeInterval().
    private Duration sessionCheckInterval = Duration.ofSeconds(30L);
    // flatMap concurrency used by checkSession(); overridable via the system property named here.
    private int sessionCheckConcurrency = Integer.getInteger("jetlinks.session.check.concurrency", Runtime.getRuntime().availableProcessors() * 64);
    // Threshold compared against closeWip in closeSession(); overridable via the system property named here.
    private int sessionCloseConcurrency = Integer.getInteger("jetlinks.session.close.concurrency", 3000);
    // Receives sessions whose close was deferred by closeSession(); drained by the pipeline set up in init().
    protected Sinks.Many<DeviceSession> closeSink = Reactors.createMany();
    // Incremented in closeSession() and decremented in closeSession0(); only touched through CLOSE_WIP.
    private volatile long closeWip = 0L;

    /**
     * Returns the id of the server this manager runs on. Within this class the value is
     * passed to {@code DeviceOperator.online(...)} and compared with the device's
     * connection server id.
     */
    public abstract String getCurrentServerId();

    /**
     * Invoked by {@code closeSession0} after a session has been closed. A {@code true}
     * result is treated there as "the session still exists" and suppresses the offline call.
     * NOTE(review): the exact semantics are defined by the implementation - confirm.
     *
     * @param var1 the session that was just closed
     */
    protected abstract Mono<Boolean> initSessionConnection(DeviceSession var1);

    /**
     * Invoked by {@link #remove(String, boolean)} when {@code onlyLocal} is false; the
     * emitted count is added to the local removal count.
     *
     * @param var1 the device id
     */
    protected abstract Mono<Long> removeRemoteSession(String var1);

    /**
     * Invoked by {@link #totalSessions(boolean)} when {@code onlyLocal} is false; the
     * emitted value is added to the local session count.
     */
    protected abstract Mono<Long> getRemoteTotalSessions();

    /**
     * Used by {@link #isAlive(String, boolean)} and by {@code doRegister} to learn whether
     * a session for the device exists elsewhere (presumably on other cluster nodes - confirm).
     *
     * @param var1 the device id
     */
    protected abstract Mono<Boolean> remoteSessionIsAlive(String var1);

    /**
     * Used by {@link #checkAlive(String, boolean)} when no local session answered.
     * NOTE(review): how this differs from {@link #remoteSessionIsAlive(String)} is not
     * visible in this class - confirm against the implementations.
     *
     * @param var1 the device id
     */
    protected abstract Mono<Boolean> checkRemoteSessionIsAlive(String var1);

    /**
     * Supplies session info that is not held locally. Called with {@code null} by
     * {@link #getSessionInfo()} and with a specific server id by
     * {@link #getSessionInfo(String)}.
     *
     * @param var1 the server id, or {@code null}
     */
    protected abstract Flux<DeviceSessionInfo> remoteSessions(String var1);

    /**
     * Starts the background work of this manager and registers it with {@link #disposable}:
     * <ul>
     *   <li>a dedicated single-thread scheduler that runs {@link #executeInterval()} every
     *       {@code sessionCheckInterval} (ticks are dropped while a check is still running);</li>
     *   <li>a consumer of {@link #closeSink} that closes deferred sessions in batches of up to
     *       1000 sessions or one second, whichever comes first.</li>
     * </ul>
     */
    public void init() {
        Scheduler scheduler = Schedulers.newSingle("device-session-checker");
        disposable.add(scheduler);
        disposable.add(Flux
            .interval(sessionCheckInterval, scheduler)
            .onBackpressureDrop()
            .concatMap(time -> executeInterval())
            .subscribe());
        disposable.add(closeSink
            .asFlux()
            .bufferTimeout(1000, Duration.ofSeconds(1L))
            .onBackpressureBuffer()
            .concatMap(batch -> Flux
                .fromIterable(batch)
                .filter(session -> {
                    if (localSessions.containsKey(session.getDeviceId())) {
                        // The device has a local session again, so the deferred close is skipped.
                        // closeSession() already counted this session in closeWip and
                        // closeSession0() will never run for it, so release the slot here;
                        // otherwise the counter only ever grows.
                        CLOSE_WIP.decrementAndGet(this);
                        return false;
                    }
                    return true;
                })
                .flatMap(this::closeSessionSafe)
                .then(), 0)
            .subscribe());
    }

    /**
     * Runs one periodic session check. Any error is logged and suppressed so that the
     * interval pipeline created in {@link #init()} keeps running.
     *
     * @return a Mono that completes when the check has finished, never with an error
     */
    protected Mono<Void> executeInterval() {
        return checkSession()
            .onErrorResume(err -> {
                // Previously swallowed silently; keep the best-effort behaviour but leave a trace.
                log.warn("check device session error", err);
                return Mono.empty();
            });
    }

    /**
     * Disposes everything that was registered with {@link #disposable}.
     */
    public void shutdown() {
        disposable.dispose();
    }

    /**
     * Looks up the local session of a device, removing it when it turns out to be dead.
     * Shorthand for {@code getSession(deviceId, true)}.
     *
     * @param deviceId the device id
     * @return the session, or empty when none is held locally
     */
    public Mono<DeviceSession> getSession(String deviceId) {
        return getSession(deviceId, true);
    }

    /**
     * Looks up the local session of a device.
     *
     * @param deviceId               the device id; {@code null} or empty yields an empty Mono
     * @param unregisterWhenNotAlive when {@code true} the session is passed through the alive
     *                               check, which removes it locally if it is no longer alive
     * @return the session, or empty when none is held locally (or it failed the alive check)
     */
    public Mono<DeviceSession> getSession(String deviceId, boolean unregisterWhenNotAlive) {
        if (deviceId == null || deviceId.isEmpty()) {
            return Mono.empty();
        }
        DeviceSessionRef holder = localSessions.get(deviceId);
        if (holder == null) {
            return Mono.empty();
        }
        Mono<DeviceSession> session = holder.ref();
        return unregisterWhenNotAlive
            ? session.filterWhen(this::checkSessionAlive)
            : session;
    }

    /**
     * Streams every session currently referenced in {@link #localSessions}.
     *
     * @return the locally held sessions
     */
    public Flux<DeviceSession> getSessions() {
        return Flux
            .fromIterable(localSessions.values())
            .flatMap(holder -> holder.ref());
    }

    /**
     * Alive check by device id: resolves the already loaded local session, if any, and
     * delegates to the session-based overload.
     *
     * @param id the device id
     * @return {@code false} when no loaded local session exists, otherwise the check result
     */
    private Mono<Boolean> checkSessionAlive(String id) {
        DeviceSessionRef holder = localSessions.get(id);
        DeviceSession current = holder == null ? null : holder.loaded;
        return current == null
            ? Reactors.ALWAYS_FALSE
            : checkSessionAlive(current);
    }

    /**
     * Asks the session whether it is alive (an empty answer counts as alive) and removes
     * it from the local sessions when it is not.
     *
     * @param session the session to check, may be {@code null}
     * @return {@code true} when alive, {@code false} when {@code null} or removed
     */
    private Mono<Boolean> checkSessionAlive(DeviceSession session) {
        if (session == null) {
            return Reactors.ALWAYS_FALSE;
        }
        return session
            .isAliveAsync()
            .defaultIfEmpty(true)
            .flatMap(alive -> alive
                ? Reactors.ALWAYS_TRUE
                : removeLocalSession(session).thenReturn(false));
    }

    /**
     * Removes the session of a device.
     *
     * @param deviceId  the device id
     * @param onlyLocal when {@code true} only the local session is removed; otherwise the
     *                  remote removal runs as well and both counts are summed
     * @return the number of removed sessions
     */
    public final Mono<Long> remove(String deviceId, boolean onlyLocal) {
        Mono<Long> local = removeLocalSession(deviceId);
        if (onlyLocal) {
            return local;
        }
        Mono<Long> remote = removeRemoteSession(deviceId);
        return Flux.merge(local, remote).reduce(Math::addExact);
    }

    /**
     * Tells whether a session exists for the device.
     *
     * @param deviceId  the device id
     * @param onlyLocal when {@code true} only the local session is consulted; otherwise
     *                  {@link #remoteSessionIsAlive(String)} is asked if no local one exists
     * @return whether a session was found
     */
    public final Mono<Boolean> isAlive(String deviceId, boolean onlyLocal) {
        Mono<Boolean> localAlive = getSession(deviceId).hasElement();
        if (onlyLocal) {
            return localAlive;
        }
        return localAlive.flatMap(alive -> alive
            ? Reactors.ALWAYS_TRUE
            : remoteSessionIsAlive(deviceId));
    }

    /**
     * Checks the device's session via {@link #checkLocalAlive(String)} and, unless
     * restricted to the local node, falls back to {@link #checkRemoteSessionIsAlive(String)}.
     *
     * @param deviceId  the device id
     * @param onlyLocal when {@code true} the remote fallback is skipped
     * @return whether an alive session was found
     */
    public Mono<Boolean> checkAlive(String deviceId, boolean onlyLocal) {
        Mono<Boolean> localAlive = checkLocalAlive(deviceId);
        if (onlyLocal) {
            return localAlive;
        }
        return localAlive.flatMap(alive -> alive
            ? Reactors.ALWAYS_TRUE
            : checkRemoteSessionIsAlive(deviceId));
    }

    /**
     * Checks the local session of a device and, when it has an operator, re-publishes its
     * connection info through {@link #syncConnectionInfo(DeviceOperator, DeviceSession)}.
     *
     * @param deviceId the device id
     * @return {@code true} after a successful sync; {@code false} when there is no local
     *         session or the session has no operator
     */
    protected final Mono<Boolean> checkLocalAlive(String deviceId) {
        return getSession(deviceId)
            .flatMap(session -> {
                DeviceOperator operator = session.getOperator();
                if (operator == null) {
                    return Reactors.ALWAYS_FALSE;
                }
                return syncConnectionInfo(operator, session);
            })
            .defaultIfEmpty(false);
    }

    /**
     * Marks the device online on the current server using the session's client address
     * (empty string when the session has none) and {@code -1} as the online time argument.
     *
     * @param device  the operator of the device
     * @param session the session providing the client address
     * @return {@code true} once the online call has completed
     */
    protected final Mono<Boolean> syncConnectionInfo(DeviceOperator device, DeviceSession session) {
        String serverId = getCurrentServerId();
        String address = session
            .getClientAddress()
            .map(String::valueOf)
            .orElse("");
        return device
            .online(serverId, address, -1L)
            .thenReturn(true);
    }

    /**
     * Counts the sessions known to this manager.
     *
     * @param onlyLocal when {@code true} only the size of {@link #localSessions} is reported;
     *                  otherwise {@link #getRemoteTotalSessions()} is added to it
     * @return the session count
     */
    public final Mono<Long> totalSessions(boolean onlyLocal) {
        // Map.size() is an int: widen it explicitly so the Mono really carries a Long.
        // Boxing it as-is would emit an Integer through a Mono<Long>.
        Mono<Long> total = Mono.just((long) localSessions.size());
        if (onlyLocal) {
            return total;
        }
        return Mono.zip(total, getRemoteTotalSessions(), Math::addExact);
    }

    /**
     * Streams session info for the local sessions followed by whatever
     * {@code remoteSessions(null)} supplies.
     *
     * @return local session info, then remote session info
     */
    public final Flux<DeviceSessionInfo> getSessionInfo() {
        Flux<DeviceSessionInfo> local = getLocalSessionInfo();
        Flux<DeviceSessionInfo> remote = remoteSessions(null);
        return Flux.concat(local, remote);
    }

    /**
     * Streams session info for one server: the local sessions when {@code serverId} equals
     * the current server id, otherwise the result of {@link #remoteSessions(String)}.
     *
     * @param serverId the server to query
     * @return the session info of that server
     */
    public final Flux<DeviceSessionInfo> getSessionInfo(String serverId) {
        boolean isCurrentServer = getCurrentServerId().equals(serverId);
        return isCurrentServer
            ? getLocalSessionInfo()
            : remoteSessions(serverId);
    }

    /**
     * Streams session info for every local reference whose session has already been loaded;
     * references that are still loading are skipped.
     *
     * @return info for the loaded local sessions
     */
    public final Flux<DeviceSessionInfo> getLocalSessionInfo() {
        return Flux
            .fromIterable(localSessions.values())
            .mapNotNull(holder -> holder.loaded)
            .map(loadedSession -> DeviceSessionInfo.of(getCurrentServerId(), loadedSession));
    }

    public Mono<DeviceSession> compute(@Nonnull String deviceId, Mono<DeviceSession> creator, Function<DeviceSession, Mono<DeviceSession>> updater) {
        DeviceSessionRef ref = this.localSessions.compute(deviceId, (? super K _id, ? super V old) -> {
            if (old == null) {
                if (creator == null) {
                    return null;
                }
                return new DeviceSessionRef((String)_id, this, creator);
            }
            if (updater == null) {
                return old;
            }
            old.update(s -> s.flatMap(updater));
            return old;
        });
        return ref == null ? Mono.empty() : ref.ref();
    }

    public final Mono<DeviceSession> compute(@Nonnull String deviceId, @Nonnull Function<Mono<DeviceSession>, Mono<DeviceSession>> computer) {
        return this.localSessions.compute(deviceId, (? super K _id, ? super V old) -> {
            if (old != null) {
                old.update(computer);
                return old;
            }
            return new DeviceSessionRef((String)_id, this, (Mono<DeviceSession>)((Mono)computer.apply(Mono.empty())));
        }).ref();
    }

    /**
     * Handles a freshly computed session. When it replaces a different session and has an
     * operator, the old session is closed and the device is marked online on this server
     * before {@link #handleSessionCompute(DeviceSession, DeviceSession)} is applied.
     *
     * @param old        the previously loaded session, may be {@code null}
     * @param newSession the session that was just computed
     * @return the session produced by {@code handleSessionCompute}
     */
    private Mono<DeviceSession> handleSessionCompute0(DeviceSession old, DeviceSession newSession) {
        boolean replaced = old != null
            && old.isChanged(newSession)
            && newSession.getOperator() != null;
        if (!replaced) {
            return handleSessionCompute(old, newSession);
        }
        log.info("device [{}] session [{}] changed to [{}]", old.getDeviceId(), old, newSession);
        old.close();
        return newSession
            .getOperator()
            .online(getCurrentServerId(),
                    newSession.getClientAddress().map(InetSocketAddress::toString).orElse(null),
                    -1L)
            .then(handleSessionCompute(old, newSession));
    }

    /**
     * Extension point applied to every computed session. This base implementation ignores
     * {@code old} and emits {@code newSession} unchanged.
     *
     * @param old        the previously loaded session, may be {@code null}
     * @param newSession the session that was just computed
     * @return the session to keep
     */
    protected Mono<DeviceSession> handleSessionCompute(DeviceSession old, DeviceSession newSession) {
        return Mono.just(newSession);
    }

    /**
     * Closes a session like {@code closeSession0}, but logs and suppresses any error so
     * the returned Mono always completes normally.
     *
     * @param session the session to close
     * @return a Mono that completes when the close has finished
     */
    protected final Mono<Void> closeSessionSafe(DeviceSession session) {
        Mono<Void> closing = closeSession0(session);
        return closing.onErrorResume(err -> {
            log.warn("close session [{}] error", session.getDeviceId(), err);
            return Mono.empty();
        });
    }

    /**
     * Closes a session and performs the follow-up work: when the session has an operator,
     * {@link #initSessionConnection(DeviceSession)} decides whether the session still
     * exists. If it does (or a local reference for the device is present), only an
     * {@code unregister} event flagged {@code true} is fired; otherwise the device is
     * marked offline first and the event is flagged {@code false}.
     * <p>
     * Releases one slot of the {@code closeWip} counter exactly once per call.
     *
     * @param session the session to close
     * @return a Mono that completes when the follow-up work has finished
     */
    private Mono<Void> closeSession0(DeviceSession session) {
        try {
            session.close();
        } catch (Throwable err) {
            // Closing stays best-effort, but a failure is no longer hidden completely.
            log.warn("close session [{}] error", session.getDeviceId(), err);
        }
        if (session.getOperator() == null) {
            CLOSE_WIP.decrementAndGet(this);
            return Mono.empty();
        }
        return initSessionConnection(session)
            .flatMap(alive -> {
                boolean sessionExists = alive || localSessions.containsKey(session.getDeviceId());
                if (sessionExists) {
                    log.info("device [{}] session [{}] closed,but session still exists!", session.getDeviceId(), session);
                    return fireEvent(DeviceSessionEvent.of(DeviceSessionEvent.Type.unregister, session, true));
                }
                log.info("device [{}] session [{}] closed", session.getDeviceId(), session);
                return session
                    .getOperator()
                    .offline()
                    .then(fireEvent(DeviceSessionEvent.of(DeviceSessionEvent.Type.unregister, session, false)));
            })
            // doFinally also runs on cancellation, so an aborted close still releases its slot.
            .doFinally(signal -> CLOSE_WIP.decrementAndGet(this));
    }

    /**
     * Closes a session, throttled by {@code sessionCloseConcurrency}: once more closes are
     * in flight than allowed, the session is handed to {@link #closeSink} for deferred
     * processing. If the sink does not accept it, the session is closed right away.
     *
     * @param session the session to close
     * @return empty when the close was deferred, otherwise the close operation itself
     */
    protected final Mono<Void> closeSession(DeviceSession session) {
        long inFlight = CLOSE_WIP.incrementAndGet(this);
        boolean deferred = inFlight > (long) sessionCloseConcurrency
            && closeSink.tryEmitNext(session).isSuccess();
        return deferred ? Mono.empty() : closeSession0(session);
    }

    /**
     * Asks the local reference registered for the session's device to close this exact
     * session; the reference decides whether it still holds it.
     *
     * @param session the session to remove
     * @return the number of removed sessions, zero when the device has no local reference
     */
    private Mono<Long> removeLocalSession(DeviceSession session) {
        DeviceSessionRef holder = localSessions.get(session.getDeviceId());
        return holder == null
            ? Reactors.ALWAYS_ZERO_LONG
            : holder.close(session);
    }

    /**
     * Removes the local reference of a device from {@link #localSessions} and closes it.
     *
     * @param deviceId the device id
     * @return the number of removed sessions, zero when nothing was registered
     */
    protected final Mono<Long> removeLocalSession(String deviceId) {
        DeviceSessionRef removed = localSessions.remove(deviceId);
        return removed == null
            ? Reactors.ALWAYS_ZERO_LONG
            : removed.close();
    }

    /**
     * Registers a newly loaded session: marks the device online on this server and fires a
     * {@code register} event. When {@link #remoteSessionIsAlive(String)} reports an existing
     * session, {@code -1} is passed as online time instead of the session's connect time
     * and the event carries that flag.
     *
     * @param session the new session
     * @return the session once registered, or empty when it has no operator
     */
    private Mono<DeviceSession> doRegister(DeviceSession session) {
        if (session.getOperator() == null) {
            return Mono.empty();
        }
        return remoteSessionIsAlive(session.getDeviceId())
            .flatMap(alive -> {
                DeviceOperator operator = session.getOperator();
                String serverId = getCurrentServerId();
                String address = session.getClientAddress().map(InetSocketAddress::toString).orElse(null);
                long onlineTime = alive ? -1L : session.connectTime();
                return operator
                    .online(serverId, address, onlineTime)
                    .then(fireEvent(DeviceSessionEvent.of(DeviceSessionEvent.Type.register, session, alive)));
            })
            .thenReturn(session);
    }

    /**
     * Delivers an event to every registered handler. A handler that throws or emits an
     * error is logged and skipped; it does not affect the other handlers.
     *
     * @param event the event to deliver
     * @return a Mono that completes when all handlers have finished
     */
    protected Mono<Void> fireEvent(DeviceSessionEvent event) {
        if (sessionEventHandlers.isEmpty()) {
            return Mono.empty();
        }
        return Flux
            .fromIterable(sessionEventHandlers)
            .flatMap(handler -> Mono
                .defer(() -> handler.apply(event))
                .onErrorResume(err -> {
                    log.error("fire session event error {}", event, err);
                    return Mono.empty();
                }))
            .then();
    }

    /**
     * Re-publishes the connection info of a device whose session is already loaded locally.
     *
     * @param deviceId the device id
     * @return {@code true} once the device has been marked online on this server; empty
     *         when there is no loaded local session or the session has no operator
     */
    protected Mono<Boolean> doInit(String deviceId) {
        DeviceSessionRef ref = localSessions.get(deviceId);
        if (ref == null) {
            return Mono.empty();
        }
        // Read the volatile field once so operator and address come from the same session.
        DeviceSession session = ref.loaded;
        if (session == null) {
            return Mono.empty();
        }
        DeviceOperator device = session.getOperator();
        if (device == null) {
            return Mono.empty();
        }
        // Same online call as before, now shared with checkLocalAlive.
        return syncConnectionInfo(device, session);
    }

    /**
     * Removes the local reference of a device, cancels a load that may still be running
     * and closes the loaded session. When the device is still recorded as connected to this
     * server it is marked offline; an {@code unregister} event is fired either way, flagged
     * {@code true} when the device is connected to a different server.
     *
     * @param deviceId the device id
     * @return {@code 1} when a loaded session was removed, otherwise {@code 0}
     */
    protected Mono<Long> removeFromCluster(String deviceId) {
        DeviceSessionRef ref = localSessions.remove(deviceId);
        if (ref == null) {
            return Reactors.ALWAYS_ZERO_LONG;
        }
        // The subscription handle only exists once the reference has been subscribed,
        // so it may legitimately be null here.
        Disposable loading = ref.disposable;
        if (loading != null && !loading.isDisposed()) {
            loading.dispose();
        }
        // Single volatile read: the session that is checked is the one that gets closed.
        DeviceSession session = ref.loaded;
        if (session == null) {
            return Reactors.ALWAYS_ZERO_LONG;
        }
        session.close();
        DeviceOperator operator = session.getOperator();
        if (operator == null) {
            return Reactors.ALWAYS_ONE_LONG;
        }
        return operator
            .getConnectionServerId()
            .map(getCurrentServerId()::equals)
            .defaultIfEmpty(false)
            .flatMap(sameServer -> {
                Mono<Void> before = sameServer ? operator.offline().then() : Mono.<Void>empty();
                return before.then(fireEvent(DeviceSessionEvent.of(DeviceSessionEvent.Type.unregister, session, !sameServer)));
            })
            .thenReturn(1L);
    }

    /**
     * Registers a handler for session events.
     *
     * @param handler the handler invoked by {@link #fireEvent(DeviceSessionEvent)}
     * @return a handle that unregisters the handler when disposed
     */
    public Disposable listenEvent(Function<DeviceSessionEvent, Mono<Void>> handler) {
        sessionEventHandlers.add(handler);
        return () -> {
            sessionEventHandlers.remove(handler);
        };
    }

    /**
     * Runs the alive check for every loaded local session, at most
     * {@code sessionCheckConcurrency} at a time. A failing check is logged and skipped.
     *
     * @return a Mono that completes when all checks have finished
     */
    protected Mono<Void> checkSession() {
        return Flux
            .fromIterable(localSessions.values())
            .filter(holder -> holder.loaded != null)
            .flatMap(
                holder -> checkSessionAlive(holder.loaded)
                    .onErrorResume(err -> {
                        log.warn("check session alive error", err);
                        return Mono.empty();
                    }),
                sessionCheckConcurrency)
            .then();
    }

    /**
     * Sets the timeout applied when a session loader is (re)built in
     * {@code DeviceSessionRef.update(Mono)}.
     */
    public void setSessionLoadTimeout(Duration sessionLoadTimeout) {
        this.sessionLoadTimeout = sessionLoadTimeout;
    }

    /** Returns the session load timeout. */
    public Duration getSessionLoadTimeout() {
        return this.sessionLoadTimeout;
    }

    /** Returns the period of the periodic session check. */
    public Duration getSessionCheckInterval() {
        return this.sessionCheckInterval;
    }

    /**
     * Sets the period of the periodic session check. The value is read when
     * {@link #init()} builds the interval, so it has to be set before that call.
     */
    public void setSessionCheckInterval(Duration sessionCheckInterval) {
        this.sessionCheckInterval = sessionCheckInterval;
    }

    /** Sets the flatMap concurrency used by {@link #checkSession()}. */
    public void setSessionCheckConcurrency(int sessionCheckConcurrency) {
        this.sessionCheckConcurrency = sessionCheckConcurrency;
    }

    /** Sets the in-flight close threshold above which {@code closeSession} defers to the close sink. */
    public void setSessionCloseConcurrency(int sessionCloseConcurrency) {
        this.sessionCloseConcurrency = sessionCloseConcurrency;
    }

    /**
     * Holder for the session of one device. The session is produced by a {@code loader}
     * Mono that is subscribed at most once per update (see {@link #tryLoad(ContextView)});
     * the outcome is published to callers of {@link #ref()} through the {@code await} sink.
     */
    protected static class DeviceSessionRef {
        // Atomic access to the loader/await fields declared below.
        private static final AtomicReferenceFieldUpdater<DeviceSessionRef, Mono> LOADER = AtomicReferenceFieldUpdater.newUpdater(DeviceSessionRef.class, Mono.class, "loader");
        private static final AtomicReferenceFieldUpdater<DeviceSessionRef, Sinks.One> AWAIT = AtomicReferenceFieldUpdater.newUpdater(DeviceSessionRef.class, Sinks.One.class, "await");
        private final AbstractDeviceSessionManager manager;
        // Sink that delivers the result of the current load to everyone waiting in ref().
        private volatile Sinks.One<DeviceSession> await;
        public final String deviceId;
        // Most recently loaded session; null until a load has produced one.
        public volatile DeviceSession loaded;
        // Pending load pipeline; taken (set to null) by tryLoad when it is subscribed.
        protected volatile Mono<DeviceSession> loader;
        // Subscription of the load started by tryLoad; null until the first subscription.
        private volatile Disposable disposable;
        // Device ids of child sessions attached to this one; created lazily by children().
        private volatile Set<String> children;

        /**
         * Returns the set of child device ids, creating it on first use. Creation is
         * guarded by synchronizing on this reference so that only one set is ever installed.
         */
        public Set<String> children() {
            if (this.children != null) {
                return this.children;
            }
            DeviceSessionRef deviceSessionRef = this;
            synchronized (deviceSessionRef) {
                if (this.children != null) {
                    return this.children;
                }
                this.children = ConcurrentHashMap.newKeySet();
                return this.children;
            }
        }

        /** Removes a child device id; does nothing when no child set has been created. */
        public void removeChild(String id) {
            if (this.children != null) {
                this.children.remove(id);
            }
        }

        /** Creates a reference whose session is produced lazily by {@code ref}. */
        public DeviceSessionRef(String deviceId, AbstractDeviceSessionManager manager, Mono<DeviceSession> ref) {
            this.deviceId = deviceId;
            this.manager = manager;
            this.update(ref);
        }

        /** Creates a reference for an already available session; the await sink is completed immediately. */
        public DeviceSessionRef(String deviceId, AbstractDeviceSessionManager manager, DeviceSession ref) {
            this.deviceId = deviceId;
            this.manager = manager;
            this.loaded = ref;
            this.await = Sinks.one();
            this.await.tryEmitValue((Object)ref);
        }

        /**
         * Replaces the loader with the result of applying {@code updater} to a Mono that
         * supplies the currently loaded session.
         */
        public void update(Function<Mono<DeviceSession>, Mono<DeviceSession>> updater) {
            this.update(updater.apply((Mono<DeviceSession>)Mono.fromSupplier(() -> this.loaded)));
        }

        /**
         * Installs a new loader: cancels a load that is still running, swaps in a fresh
         * await sink (completing the previous one empty) and wraps {@code ref} with the
         * load handling, the manager's load timeout and the empty/error callbacks.
         */
        public void update(Mono<DeviceSession> ref) {
            Sinks.One old;
            if (this.disposable != null && !this.disposable.isDisposed()) {
                this.disposable.dispose();
            }
            if ((old = AWAIT.getAndSet(this, Sinks.one())) != null) {
                old.tryEmitEmpty();
            }
            this.loader = ref.flatMap(this::handleLoaded).timeout(this.manager.sessionLoadTimeout, Mono.error(() -> new TimeoutException("device [" + this.deviceId + "] session load timeout"))).switchIfEmpty(Mono.fromRunnable(this::loadEmpty)).doOnError(this::loadError).doOnNext(this::afterLoaded);
        }

        /**
         * Callback for a child session whose parent changed: moves this device id from the
         * old parent's child set to the new parent's, for whichever parents are held locally.
         */
        private void handleParentChanged(DeviceSession from, DeviceSession to) {
            DeviceSessionRef fromRef = this.manager.localSessions.get(from.getDeviceId());
            DeviceSessionRef toRef = this.manager.localSessions.get(to.getDeviceId());
            if (null != fromRef) {
                fromRef.removeChild(this.deviceId);
            }
            if (null != toRef) {
                toRef.children().add(this.deviceId);
            }
        }

        /**
         * Handles a session produced by the loader: stores it, publishes it to the await
         * sink, links it to its parent (for child sessions) and then runs the manager's
         * registration (first load only) and compute handling.
         */
        private Mono<DeviceSession> handleLoaded(DeviceSession session) {
            DeviceSession old = this.loaded;
            this.loaded = session;
            // Waiters are released here, before the registration below has been subscribed.
            this.await().tryEmitValue((Object)session);
            this.handleParent(parent -> parent.children().add(session.getDeviceId()));
            if (session.isWrapFrom(ChildrenDeviceSession.class)) {
                ((ChildrenDeviceSession)session.unwrap(ChildrenDeviceSession.class)).doOnParentChanged(this::handleParentChanged);
            }
            if (old == null) {
                return this.manager.doRegister(session).then(this.manager.handleSessionCompute0(null, session));
            }
            return this.manager.handleSessionCompute0(old, session);
        }

        /**
         * Final step of a load: if the compute handling returned a session different from
         * the one stored by handleLoaded, the stored one is closed and replaced.
         */
        private void afterLoaded(DeviceSession session) {
            if (!session.equals(this.loaded)) {
                this.loaded.close();
            }
            this.loaded = session;
        }

        /**
         * Invokes {@code parent} with the locally held reference of this session's parent,
         * if the loaded session is a child session and its parent is held locally.
         * Requires {@code loaded} to be non-null.
         */
        protected void handleParent(Consumer<DeviceSessionRef> parent) {
            DeviceSessionRef ref;
            if (this.loaded.isWrapFrom(ChildrenDeviceSession.class) && null != (ref = this.manager.localSessions.get(((ChildrenDeviceSession)this.loaded.unwrap(ChildrenDeviceSession.class)).getParent().getDeviceId()))) {
                parent.accept(ref);
            }
        }

        /** Runs the manager's alive check for every recorded child device id. */
        protected Mono<Void> checkChildren() {
            if (this.children != null) {
                return Flux.fromIterable(this.children).flatMap(x$0 -> this.manager.checkSessionAlive(x$0)).then();
            }
            return Mono.empty();
        }

        /**
         * Closes this reference only if it still holds exactly {@code session} and is still
         * the entry registered for the device; otherwise nothing is removed.
         */
        private Mono<Long> close(DeviceSession session) {
            if (this.loaded == session && this.manager.localSessions.remove(this.deviceId, this)) {
                if (this.disposable != null && !this.disposable.isDisposed()) {
                    this.disposable.dispose();
                }
                return this.doClose(this.loaded);
            }
            return Reactors.ALWAYS_ZERO_LONG;
        }

        /**
         * Cancels a running load and closes the loaded session, if any. The caller is
         * responsible for having removed this reference from the manager's map.
         */
        private Mono<Long> close() {
            DeviceSession loaded;
            if (this.disposable != null && !this.disposable.isDisposed()) {
                this.disposable.dispose();
            }
            if ((loaded = this.loaded) != null) {
                return this.doClose(loaded);
            }
            return Reactors.ALWAYS_ZERO_LONG;
        }

        /** Detaches the session from its parent, closes it via the manager and then re-checks the children. */
        private Mono<Long> doClose(DeviceSession session) {
            this.handleParent(ref -> ref.removeChild(session.getDeviceId()));
            return this.manager.closeSession(session).then(this.checkChildren()).then(Reactors.ALWAYS_ONE_LONG);
        }

        /** Load failed: closes a previously loaded session, fails the waiters and unregisters this reference. */
        private void loadError(Throwable err) {
            if (this.loaded != null) {
                this.loaded.close();
            }
            this.await().tryEmitError(err);
            this.manager.localSessions.remove(this.deviceId, this);
        }

        /** Load produced nothing: closes a previously loaded session, completes the waiters empty and unregisters this reference. */
        private void loadEmpty() {
            if (this.loaded != null) {
                this.loaded.close();
            }
            this.await().tryEmitEmpty();
            this.manager.localSessions.remove(this.deviceId, this);
        }

        /**
         * Subscribes the pending loader, if there is one. The loader is taken atomically, so
         * concurrent callers start it only once; the caller's context is propagated to it.
         */
        private void tryLoad(ContextView contextView) {
            Mono loader = LOADER.getAndSet(this, null);
            if (loader != null) {
                this.disposable = loader.contextWrite(contextView).subscribe();
            }
        }

        /**
         * Returns a Mono of the session: on subscription it triggers the pending load (if
         * any) and then waits on the current await sink.
         */
        public Mono<DeviceSession> ref() {
            return Mono.deferContextual(ctx -> {
                this.tryLoad((ContextView)ctx);
                return this.await().asMono();
            });
        }

        /** Returns the current await sink. */
        private Sinks.One<DeviceSession> await() {
            return AWAIT.get(this);
        }
    }
}

