package org.apache.flume.source.kafka;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.conf.ConfigurationException;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.instrumentation.kafka.KafkaSourceCounter;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flume/source/kafka/KafkaSource.class */
/**
 * Pollable Flume source that reads messages from a single Kafka topic through a
 * {@link ConsumerConnector} message stream and hands them to the channel
 * processor in batches.
 *
 * <p>A batch is closed when either the configured maximum number of events has
 * been collected or the configured batch duration has elapsed, whichever comes
 * first. When Kafka auto-commit is disabled, offsets are committed explicitly
 * after each batch has been passed to the channel processor.
 */
public class KafkaSource extends AbstractSource implements Configurable, PollableSource {
    private static final Logger log = LoggerFactory.getLogger(KafkaSource.class);

    /** Nanoseconds per millisecond; used to convert timer readings for the counter. */
    private static final long NANOS_PER_MILLI = 1000000L;

    private ConsumerConnector consumer;
    private ConsumerIterator<byte[], byte[]> it;
    private String topic;
    private int batchUpperLimit;
    private int timeUpperLimit;
    private int consumerTimeout;
    private boolean kafkaAutoCommitEnabled;
    private Context context;
    private Properties kafkaProps;

    /**
     * Events collected for the batch in progress. The list is only cleared after
     * a successful hand-off, so events left over from a failed delivery are
     * offered again on the next {@link #process()} call.
     */
    private final List<Event> eventList = new ArrayList<Event>();
    private KafkaSourceCounter counter;

    /**
     * Collects up to {@code batchUpperLimit} messages (or as many as arrive within
     * {@code timeUpperLimit} milliseconds), delivers them to the channel processor
     * and, if auto-commit is off, commits the consumer offsets.
     *
     * @return {@code READY} if the last poll of the iterator reported a message;
     *         {@code BACKOFF} if it did not, or if any exception was raised
     * @throws EventDeliveryException declared by the interface; this implementation
     *         logs failures and returns {@code BACKOFF} instead of throwing
     */
    public PollableSource.Status process() throws EventDeliveryException {
        long batchStartTime = System.currentTimeMillis();
        long batchEndTime = System.currentTimeMillis() + this.timeUpperLimit;
        try {
            boolean iterStatus = false;
            long fetchStartNanos = System.nanoTime();
            while (this.eventList.size() < this.batchUpperLimit
                    && System.currentTimeMillis() < batchEndTime) {
                iterStatus = hasNext();
                if (iterStatus) {
                    MessageAndMetadata<byte[], byte[]> messageAndMetadata = this.it.next();
                    byte[] kafkaMessage = messageAndMetadata.message();
                    byte[] kafkaKey = messageAndMetadata.key();

                    HashMap<String, String> headers = new HashMap<String, String>();
                    headers.put(KafkaSourceConstants.TIMESTAMP,
                            String.valueOf(System.currentTimeMillis()));
                    headers.put(KafkaSourceConstants.TOPIC, this.topic);
                    if (kafkaKey != null) {
                        // NOTE(review): decodes with the platform default charset - confirm
                        // whether an explicit charset (e.g. UTF-8) is expected for keys.
                        headers.put(KafkaSourceConstants.KEY, new String(kafkaKey));
                    }
                    if (log.isDebugEnabled()) {
                        log.debug("Message: {}", new String(kafkaMessage));
                    }
                    this.eventList.add(EventBuilder.withBody(kafkaMessage, headers));
                }
                if (log.isDebugEnabled()) {
                    log.debug("Waited: {} ", System.currentTimeMillis() - batchStartTime);
                    log.debug("Event #: {}", this.eventList.size());
                }
            }
            this.counter.addToKafkaEventGetTimer(
                    (System.nanoTime() - fetchStartNanos) / NANOS_PER_MILLI);
            this.counter.addToEventReceivedCount(this.eventList.size());

            if (!this.eventList.isEmpty()) {
                getChannelProcessor().processEventBatch(this.eventList);
                // Remember the batch size before clearing so the metric and the
                // debug message both report the number of events actually written.
                int deliveredCount = this.eventList.size();
                this.counter.addToEventAcceptedCount(deliveredCount);
                this.eventList.clear();
                if (log.isDebugEnabled()) {
                    log.debug("Wrote {} events to channel", deliveredCount);
                }
                if (!this.kafkaAutoCommitEnabled) {
                    // Commit only after the batch reached the channel.
                    long commitStartNanos = System.nanoTime();
                    this.consumer.commitOffsets();
                    this.counter.addToKafkaCommitTimer(
                            (System.nanoTime() - commitStartNanos) / NANOS_PER_MILLI);
                }
            }

            if (iterStatus) {
                return PollableSource.Status.READY;
            }
            // The empty-poll metric must not depend on the configured log level.
            this.counter.incrementKafkaEmptyCount();
            if (log.isDebugEnabled()) {
                log.debug("Returning with backoff. No more data to read");
            }
            return PollableSource.Status.BACKOFF;
        } catch (Exception e) {
            // The throwable is passed as the trailing argument so the logger prints
            // its stack trace; no placeholder is needed in the message.
            log.error("KafkaSource EXCEPTION", e);
            return PollableSource.Status.BACKOFF;
        }
    }

    /**
     * Reads batch size, batch duration, topic and the Kafka consumer properties
     * from the supplied context, and creates the metrics counter on first use.
     *
     * @param context source configuration
     * @throws ConfigurationException if no topic is configured
     */
    public void configure(Context context) {
        this.context = context;
        this.batchUpperLimit = context.getInteger(KafkaSourceConstants.BATCH_SIZE, 1000).intValue();
        this.timeUpperLimit = context.getInteger(KafkaSourceConstants.BATCH_DURATION_MS, 1000).intValue();
        this.topic = context.getString(KafkaSourceConstants.TOPIC);
        if (this.topic == null) {
            throw new ConfigurationException("Kafka topic must be specified.");
        }
        this.kafkaProps = KafkaSourceUtil.getKafkaProperties(context);
        // NOTE(review): presumably KafkaSourceUtil always supplies these two
        // properties; parseInt would fail on a missing timeout - confirm.
        this.consumerTimeout = Integer.parseInt(
                this.kafkaProps.getProperty(KafkaSourceConstants.CONSUMER_TIMEOUT));
        this.kafkaAutoCommitEnabled = Boolean.parseBoolean(
                this.kafkaProps.getProperty(KafkaSourceConstants.AUTO_COMMIT_ENABLED));
        if (this.counter == null) {
            this.counter = new KafkaSourceCounter(getName());
        }
    }

    /**
     * Creates the Kafka consumer, opens one message stream for the configured
     * topic and starts the metrics counter.
     *
     * @throws FlumeException if the consumer cannot be created or the message
     *         iterator cannot be obtained; the message identifies which step failed
     */
    public synchronized void start() {
        log.info("Starting {}...", this);

        // The two steps are guarded separately so that each failure surfaces with
        // its own message instead of being re-wrapped by an enclosing catch.
        try {
            this.consumer = KafkaSourceUtil.getConsumer(this.kafkaProps);
        } catch (Exception e) {
            throw new FlumeException("Unable to create consumer. Check whether the ZooKeeper server is up and that the Flume agent can connect to it.", e);
        }

        // Exactly one stream is requested for the single configured topic.
        HashMap<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(this.topic, 1);
        try {
            List<KafkaStream<byte[], byte[]>> topicStreams =
                    this.consumer.createMessageStreams(topicCountMap).get(this.topic);
            this.it = topicStreams.get(0).iterator();
        } catch (Exception e) {
            throw new FlumeException("Unable to get message iterator from Kafka", e);
        }

        log.info("Kafka source {} started.", getName());
        this.counter.start();
        super.start();
    }

    /**
     * Shuts down the Kafka consumer (if one was created), stops the metrics
     * counter and marks the source as stopped.
     */
    public synchronized void stop() {
        if (this.consumer != null) {
            this.consumer.shutdown();
        }
        this.counter.stop();
        log.info("Kafka Source {} stopped. Metrics: {}", getName(), this.counter);
        super.stop();
    }

    /**
     * Polls the Kafka iterator for a message.
     *
     * @return {@code false} if the iterator raised a
     *         {@link ConsumerTimeoutException}; {@code true} otherwise
     */
    boolean hasNext() {
        try {
            // NOTE(review): the boolean returned by the iterator is not consulted;
            // only a timeout yields false. Confirm whether a false result (for
            // instance after the consumer is shut down) needs separate handling.
            this.it.hasNext();
            return true;
        } catch (ConsumerTimeoutException e) {
            return false;
        }
    }
}
