package org.apache.flume.source.kafka;

import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;
import org.apache.flume.Context;
import org.apache.flume.conf.ConfigurationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flume/source/kafka/KafkaSourceUtil.class */
public class KafkaSourceUtil {
    private static final Logger log = LoggerFactory.getLogger(KafkaSourceUtil.class);

    public static Properties getKafkaProperties(Context context) {
        log.info("context={}", context.toString());
        Properties generateDefaultKafkaProps = generateDefaultKafkaProps();
        setKafkaProps(context, generateDefaultKafkaProps);
        addDocumentedKafkaProps(context, generateDefaultKafkaProps);
        return generateDefaultKafkaProps;
    }

    public static ConsumerConnector getConsumer(Properties properties) {
        return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
    }

    private static Properties generateDefaultKafkaProps() {
        Properties properties = new Properties();
        properties.put(KafkaSourceConstants.AUTO_COMMIT_ENABLED, KafkaSourceConstants.DEFAULT_AUTO_COMMIT);
        properties.put(KafkaSourceConstants.CONSUMER_TIMEOUT, KafkaSourceConstants.DEFAULT_CONSUMER_TIMEOUT);
        properties.put(KafkaSourceConstants.GROUP_ID, KafkaSourceConstants.DEFAULT_GROUP_ID);
        return properties;
    }

    private static void setKafkaProps(Context context, Properties properties) {
        for (Map.Entry entry : context.getSubProperties(KafkaSourceConstants.PROPERTY_PREFIX).entrySet()) {
            properties.put(entry.getKey(), entry.getValue());
            if (log.isDebugEnabled()) {
                log.debug("Reading a Kafka Producer Property: key: " + ((String) entry.getKey()) + ", value: " + ((String) entry.getValue()));
            }
        }
    }

    private static void addDocumentedKafkaProps(Context context, Properties properties) throws ConfigurationException {
        String string = context.getString(KafkaSourceConstants.ZOOKEEPER_CONNECT_FLUME);
        if (string == null) {
            throw new ConfigurationException("ZookeeperConnect must contain at least one ZooKeeper server");
        }
        properties.put(KafkaSourceConstants.ZOOKEEPER_CONNECT, string);
        String string2 = context.getString(KafkaSourceConstants.GROUP_ID_FLUME);
        if (string2 != null) {
            properties.put(KafkaSourceConstants.GROUP_ID, string2);
        }
    }
}
