/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.ConsumerInterceptors;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZeroQueueConsumerImpl<T>
extends ConsumerImpl<T> {
    private static final Logger log = LoggerFactory.getLogger(ZeroQueueConsumerImpl.class);
    private final Lock zeroQueueLock = new ReentrantLock();
    private volatile boolean waitingOnReceiveForZeroQueueSize = false;

    public ZeroQueueConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf, ExecutorService listenerExecutor, int partitionIndex, boolean hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture, ConsumerImpl.SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
        this(client, topic, conf, listenerExecutor, partitionIndex, hasParentConsumer, subscribeFuture, subscriptionMode, startMessageId, schema, interceptors, Backoff.DEFAULT_INTERVAL_IN_NANOSECONDS, Backoff.MAX_BACKOFF_INTERVAL_NANOSECONDS);
    }

    public ZeroQueueConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf, ExecutorService listenerExecutor, int partitionIndex, boolean hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture, ConsumerImpl.SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, ConsumerInterceptors<T> interceptors, long backoffIntervalNanos, long maxBackoffIntervalNanos) {
        super(client, topic, conf, listenerExecutor, partitionIndex, hasParentConsumer, subscribeFuture, subscriptionMode, startMessageId, schema, interceptors);
    }

    @Override
    protected Message<T> internalReceive() throws PulsarClientException {
        this.zeroQueueLock.lock();
        try {
            Message<T> message = this.fetchSingleMessageFromBroker();
            return message;
        }
        finally {
            this.zeroQueueLock.unlock();
        }
    }

    @Override
    protected CompletableFuture<Message<T>> internalReceiveAsync() {
        CompletableFuture future = super.internalReceiveAsync();
        if (!future.isDone()) {
            this.sendFlowPermitsToBroker(this.cnx(), 1);
        }
        return future;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Message<T> fetchSingleMessageFromBroker() throws PulsarClientException {
        if (this.incomingMessages.size() > 0) {
            log.error("The incoming message queue should never be greater than 0 when Queue size is 0");
            this.incomingMessages.clear();
        }
        try {
            ClientCnx msgCnx;
            Message message;
            this.waitingOnReceiveForZeroQueueSize = true;
            ZeroQueueConsumerImpl zeroQueueConsumerImpl = this;
            synchronized (zeroQueueConsumerImpl) {
                if (this.isConnected()) {
                    this.sendFlowPermitsToBroker(this.cnx(), 1);
                }
            }
            while (true) {
                message = (Message)this.incomingMessages.take();
                this.lastDequeuedMessage = message.getMessageId();
                msgCnx = ((MessageImpl)message).getCnx();
                ZeroQueueConsumerImpl zeroQueueConsumerImpl2 = this;
                synchronized (zeroQueueConsumerImpl2) {
                    if (msgCnx == this.cnx()) {
                        this.waitingOnReceiveForZeroQueueSize = false;
                        break;
                    }
                }
            }
            this.stats.updateNumMsgsReceived(message);
            msgCnx = message;
            return msgCnx;
        }
        catch (InterruptedException e) {
            this.stats.incrementNumReceiveFailed();
            throw PulsarClientException.unwrap((Throwable)e);
        }
        finally {
            this.waitingOnReceiveForZeroQueueSize = false;
            this.incomingMessages.clear();
        }
    }

    @Override
    protected void consumerIsReconnectedToBroker(ClientCnx cnx, int currentQueueSize) {
        super.consumerIsReconnectedToBroker(cnx, currentQueueSize);
        if (this.waitingOnReceiveForZeroQueueSize || currentQueueSize > 0 || this.listener != null) {
            this.sendFlowPermitsToBroker(cnx, 1);
        }
    }

    @Override
    protected boolean canEnqueueMessage(Message<T> message) {
        if (this.listener != null) {
            this.triggerZeroQueueSizeListener(message);
            return false;
        }
        return true;
    }

    private void triggerZeroQueueSizeListener(Message<T> message) {
        Preconditions.checkNotNull(this.listener, "listener can't be null");
        Preconditions.checkNotNull(message, "unqueued message can't be null");
        this.listenerExecutor.execute(() -> {
            this.stats.updateNumMsgsReceived(message);
            try {
                if (log.isDebugEnabled()) {
                    log.debug("[{}][{}] Calling message listener for unqueued message {}", new Object[]{this.topic, this.subscription, message.getMessageId()});
                }
                this.listener.received((Consumer)this, message);
            }
            catch (Throwable t) {
                log.error("[{}][{}] Message listener error in processing unqueued message: {}", new Object[]{this.topic, this.subscription, message.getMessageId(), t});
            }
            this.increaseAvailablePermits(this.cnx());
        });
    }

    @Override
    protected void triggerListener(int numMessages) {
    }

    @Override
    void receiveIndividualMessagesFromBatch(PulsarApi.MessageMetadata msgMetadata, int redeliveryCount, ByteBuf uncompressedPayload, PulsarApi.MessageIdData messageId, ClientCnx cnx) {
        log.warn("Closing consumer [{}]-[{}] due to unsupported received batch-message with zero receiver queue size", (Object)this.subscription, (Object)this.consumerName);
        this.closeAsync().handle((ok, e) -> {
            this.notifyPendingReceivedCallback(null, (Exception)new PulsarClientException.InvalidMessageException(String.format("Unsupported Batch message with 0 size receiver queue for [%s]-[%s] ", this.subscription, this.consumerName)));
            return null;
        });
    }
}

