/*
 * Decompiled with CFR 0.152.
 */
package com.hotent.todo.mq;

import com.hotent.base.util.BeanUtils;
import com.hotent.todo.config.TodoRocketMqConfig;
import com.hotent.todo.mq.JmsListenerRegistrar;
import com.hotent.todo.mq.MessageHandlerProcessor;
import dm.jdbc.util.StringUtil;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Resource;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

/**
 * {@link JmsListenerRegistrar} implementation that manages one RocketMQ push consumer per topic.
 *
 * <p>Despite the class name, the consumers created here are RocketMQ
 * {@link DefaultMQPushConsumer} instances. Started consumers are tracked in the static
 * {@link #consumerContainers} map, keyed by topic. That map is a plain {@link HashMap} and this
 * class performs no synchronization around it.
 */
@Service
public class CustomKafkaListenerRegistrar
implements JmsListenerRegistrar {
    public static final String GROUP_ID = "todo_group";
    public static final Logger logger = LoggerFactory.getLogger(CustomKafkaListenerRegistrar.class);
    /** Started consumers, keyed by the topic they subscribe to. */
    protected static final Map<String, MQPushConsumer> consumerContainers = new HashMap<String, MQPushConsumer>();
    /** Receives every consumed message (topic key, body, topic, born timestamp). */
    @Resource
    MessageHandlerProcessor messageHandlerProcessor;
    /** Supplies the RocketMQ name server address used for every consumer. */
    @Resource
    TodoRocketMqConfig todoRocketMqConfig;
    /** Topic taken from {@code eip.topic.send_todo}; falls back to {@code q_from_bpm_todo}. */
    @Value(value="${eip.topic.send_todo:q_from_bpm_todo}")
    String sendTaskTopic;

    /**
     * Creates, starts and records a push consumer for {@code topic}.
     *
     * <p>Does nothing when a consumer is already recorded for the topic. The consumer uses the
     * group name {@code "TodoGroup_" + topic}, subscribes to all tags ({@code "*"}) and runs in
     * {@link MessageModel#CLUSTERING} mode.
     *
     * @param topic the topic to subscribe to
     * @throws MQClientException if subscribing to the topic or starting the consumer fails
     */
    @Override
    public void registerListener(final String topic) throws MQClientException {
        if (consumerContainers.containsKey(topic)) {
            return;
        }
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TodoGroup_" + topic);
        consumer.subscribe(topic, "*");
        consumer.setNamesrvAddr(this.todoRocketMqConfig.getNamesrvAddr());
        consumer.setMessageModel(MessageModel.CLUSTERING);
        // The cast selects the concurrent-listener overload of registerMessageListener.
        consumer.registerMessageListener((MessageListenerConcurrently)(msgs, context) -> this.consumeBatch(topic, msgs));
        consumer.start();
        // Only record the consumer once it has started successfully.
        consumerContainers.put(topic, consumer);
    }

    /**
     * Hands every message of a delivered batch to {@link #messageHandlerProcessor}.
     *
     * <p>A failure while handling one message is logged and does not stop the remaining messages
     * from being handled. The batch is always acknowledged with
     * {@link ConsumeConcurrentlyStatus#CONSUME_SUCCESS}, so this method never asks the broker to
     * redeliver.
     *
     * @param topic the topic the owning consumer is subscribed to
     * @param msgs the delivered batch
     * @return always {@link ConsumeConcurrentlyStatus#CONSUME_SUCCESS}
     */
    private ConsumeConcurrentlyStatus consumeBatch(String topic, List<MessageExt> msgs) {
        for (MessageExt message : msgs) {
            try {
                String topicKey = message.getProperty("topicKey");
                // Decode with an explicit charset instead of the JVM's platform default.
                String body = new String(message.getBody(), StandardCharsets.UTF_8);
                this.messageHandlerProcessor.process(topicKey, body, topic, message.getBornTimestamp());
            }
            catch (Exception e) {
                // The message id fills the "{}" placeholder; the trailing exception is logged with its stack trace.
                logger.error("\u6d88\u8d39\u6d88\u606f\u5931\u8d25=>{}", message.getMsgId(), e);
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    /**
     * Computes which topics would need a consumer added or removed so that the recorded consumers
     * match the configured topics plus {@link #sendTaskTopic}.
     *
     * <p>NOTE(review): the computed differences are only logged at debug level; no consumer is
     * started or shut down here. Confirm whether the sets were meant to be passed to
     * {@link #registerListener(String)} and {@link #destoryListener(String)}.
     */
    @Override
    public void initialRegisterAllListener() {
        // keySet() is a live view of the registry; it is only read here, never modified.
        Set<String> oldTopics = consumerContainers.keySet();
        Set<String> newTopics = this.getAllTopicKeys();
        newTopics.add(this.sendTaskTopic);
        if (oldTopics.isEmpty() && newTopics.isEmpty()) {
            return;
        }
        // Recorded topics that are no longer wanted.
        Set<String> toRemoves = new HashSet<>(oldTopics);
        toRemoves.removeAll(newTopics);
        // Wanted topics that have no recorded consumer yet.
        Set<String> toAdds = new HashSet<>(newTopics);
        toAdds.removeAll(oldTopics);
        logger.debug("Listener topic diff computed but not applied: toAdd={}, toRemove={}", toAdds, toRemoves);
    }

    /**
     * Shuts down and forgets the consumer recorded for {@code topic}.
     *
     * <p>Does nothing when no consumer is recorded for the topic.
     *
     * @param topic the topic whose consumer should be stopped
     */
    @Override
    public void destoryListener(String topic) {
        MQPushConsumer container = consumerContainers.get(topic);
        if (container == null) {
            return;
        }
        container.shutdown();
        consumerContainers.remove(topic);
    }

    /**
     * Returns the topics that should have a consumer, as a fresh mutable set.
     *
     * <p>Currently always returns an empty set, because no source of topic records is wired in.
     *
     * @return a new, empty, mutable set
     */
    private Set<String> getAllTopicKeys() {
        // TODO: load the topic records here. The previous stub iterated an always-empty list and
        // kept each record's topicName when it was non-empty and its queueStatus equalled "1".
        return new HashSet<>();
    }
}

