/*
 * Decompiled with CFR 0.152.
 */
package com.hotent.sse.server.service.impl;

import com.hotent.base.exception.BaseException;
import com.hotent.base.id.IdGenerator;
import com.hotent.base.util.JsonUtil;
import com.hotent.base.util.TenantUtil;
import com.hotent.sse.server.service.SseService;
import com.hotent.sse.server.vo.MessageVO;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import reactor.core.publisher.Flux;

@Service
public class SseServiceImpl
implements SseService {
    private static final Logger logger = LoggerFactory.getLogger(SseServiceImpl.class);
    private static final ConcurrentHashMap<String, SseEmitter> SSE_EMITTER_MAP = new ConcurrentHashMap();
    @Resource
    private IdGenerator idGenerator;

    private String generateUid(String account) {
        return String.format("%s_%s_%s", account, TenantUtil.getCurrentTenantId(), this.idGenerator.getSuid());
    }

    private String generateUidPrev(String account) {
        return account + "_" + TenantUtil.getCurrentTenantId();
    }

    @Override
    public SseEmitter crateSse(String account) {
        String uid = this.generateUid(account);
        SseEmitter sseEmitter = new SseEmitter(Long.valueOf(0L));
        sseEmitter.onCompletion(() -> {
            logger.info("[{}]\u7ed3\u675f\u94fe\u63a5", (Object)uid);
            SSE_EMITTER_MAP.remove(uid);
        });
        sseEmitter.onTimeout(() -> logger.info("[{}]\u94fe\u63a5\u8d85\u65f6", (Object)uid));
        sseEmitter.onError(throwable -> {
            try {
                logger.info("[{}]\u94fe\u63a5\u5f02\u5e38\uff0c{}", (Object)uid, (Object)throwable.toString());
                sseEmitter.send(SseEmitter.event().id(uid).name("\u53d1\u751f\u5f02\u5e38").data((Object)"\u53d1\u751f\u5f02\u5e38\u8bf7\u91cd\u8bd5").reconnectTime(3000L));
                SSE_EMITTER_MAP.put(uid, sseEmitter);
            }
            catch (IOException e) {
                logger.error("SSE\u5f02\u5e38", (Throwable)e);
            }
        });
        try {
            sseEmitter.send(SseEmitter.event().reconnectTime(5000L));
            SSE_EMITTER_MAP.put(uid, sseEmitter);
        }
        catch (IOException e) {
            logger.error("SSE\u8fd4\u56de\u4fe1\u606f\u5f02\u5e38", (Throwable)e);
        }
        SseEmitter.SseEventBuilder heartbeat = SseEmitter.event().data((Object)"heartbeat");
        Flux.interval((Duration)Duration.ofSeconds(178L)).doOnNext(tick -> {
            try {
                sseEmitter.send(heartbeat);
            }
            catch (Exception e) {
                SSE_EMITTER_MAP.remove(uid);
            }
        }).subscribe();
        return sseEmitter;
    }

    @Override
    public List<String> sendMessage(MessageVO messageVO) {
        String message = JsonUtil.toJsonString((Object)messageVO);
        String key = this.generateUidPrev(messageVO.getAccount());
        ArrayList<String> uidList = new ArrayList<String>();
        boolean isExists = false;
        for (Map.Entry<String, SseEmitter> entry : SSE_EMITTER_MAP.entrySet()) {
            if (!entry.getKey().startsWith(key)) continue;
            isExists = true;
            String uid = entry.getKey();
            SseEmitter emitter = entry.getValue();
            if (!this.sendOne(emitter, uid, message)) continue;
            uidList.add(uid);
        }
        if (!isExists) {
            throw new BaseException(key + " \u5bf9\u5e94\u7684\u8fde\u63a5\u4e0d\u5b58\u5728\uff01");
        }
        return uidList;
    }

    private boolean sendOne(SseEmitter emitter, String uid, String message) {
        String messageId = this.idGenerator.getSuid();
        try {
            emitter.send(SseEmitter.event().id(messageId).reconnectTime(60000L).data((Object)message));
            logger.info("\u7528\u6237{},\u6d88\u606fID\uff1a{}\uff0c\u63a8\u9001\u6210\u529f\uff1a{}", new Object[]{uid, messageId, message});
            return true;
        }
        catch (IOException e) {
            SSE_EMITTER_MAP.remove(uid);
            logger.info("\u7528\u6237{},\u6d88\u606fID\uff1a{}\uff0c\u6d88\u606f\u63a8\u9001\u5931\u8d25\uff1a{}", new Object[]{uid, messageId, message});
            emitter.complete();
            return false;
        }
    }

    @Override
    public void closeSse(String uid) {
        if (SSE_EMITTER_MAP.containsKey(uid)) {
            SseEmitter sseEmitter = SSE_EMITTER_MAP.get(uid);
            sseEmitter.complete();
            SSE_EMITTER_MAP.remove(uid);
        }
    }

    @Override
    public List<String> getSseEmitterKeys() {
        if (((ConcurrentHashMap.CollectionView)((Object)SSE_EMITTER_MAP.keySet())).size() <= 0) {
            return new ArrayList<String>();
        }
        return new ArrayList<String>(SSE_EMITTER_MAP.keySet());
    }
}

