package org.springframework.integration.mqtt.inbound;

import java.util.Arrays;
import java.util.Date;
import java.util.concurrent.ScheduledFuture;
import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.integration.mqtt.core.ConsumerStopAction;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
import org.springframework.util.Assert;

/* loaded from: input_file:org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.class */
public class MqttPahoMessageDrivenChannelAdapter extends AbstractMqttMessageDrivenChannelAdapter implements MqttCallback, ApplicationEventPublisherAware {
    public static final long DEFAULT_COMPLETION_TIMEOUT = 30000;
    public static final long DISCONNECT_COMPLETION_TIMEOUT = 5000;
    private static final int DEFAULT_RECOVERY_INTERVAL = 10000;
    private final MqttPahoClientFactory clientFactory;
    private int recoveryInterval;
    private long completionTimeout;
    private long disconnectCompletionTimeout;
    private volatile IMqttClient client;
    private volatile ScheduledFuture<?> reconnectFuture;
    private volatile boolean connected;
    private volatile boolean cleanSession;
    private volatile ConsumerStopAction consumerStopAction;
    private ApplicationEventPublisher applicationEventPublisher;

    public MqttPahoMessageDrivenChannelAdapter(String str, String str2, MqttPahoClientFactory mqttPahoClientFactory, String... strArr) {
        super(str, str2, strArr);
        this.recoveryInterval = DEFAULT_RECOVERY_INTERVAL;
        this.completionTimeout = 30000L;
        this.disconnectCompletionTimeout = 5000L;
        this.clientFactory = mqttPahoClientFactory;
    }

    public MqttPahoMessageDrivenChannelAdapter(String str, MqttPahoClientFactory mqttPahoClientFactory, String... strArr) {
        super(null, str, strArr);
        this.recoveryInterval = DEFAULT_RECOVERY_INTERVAL;
        this.completionTimeout = 30000L;
        this.disconnectCompletionTimeout = 5000L;
        this.clientFactory = mqttPahoClientFactory;
    }

    public MqttPahoMessageDrivenChannelAdapter(String str, String str2, String... strArr) {
        this(str, str2, new DefaultMqttPahoClientFactory(), strArr);
    }

    public void setCompletionTimeout(long j) {
        this.completionTimeout = j;
    }

    public void setDisconnectCompletionTimeout(long j) {
        this.disconnectCompletionTimeout = j;
    }

    public synchronized void setRecoveryInterval(int i) {
        this.recoveryInterval = i;
    }

    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.applicationEventPublisher = applicationEventPublisher;
    }

    protected void doStart() {
        Assert.state(getTaskScheduler() != null, "A 'taskScheduler' is required");
        super.doStart();
        try {
            connectAndSubscribe();
        } catch (Exception e) {
            this.logger.error("Exception while connecting and subscribing, retrying", e);
            scheduleReconnect();
        }
    }

    protected synchronized void doStop() {
        cancelReconnect();
        super.doStop();
        if (this.client != null) {
            try {
                if (this.consumerStopAction.equals(ConsumerStopAction.UNSUBSCRIBE_ALWAYS) || (this.consumerStopAction.equals(ConsumerStopAction.UNSUBSCRIBE_CLEAN) && this.cleanSession)) {
                    this.client.unsubscribe(getTopic());
                }
            } catch (MqttException e) {
                this.logger.error("Exception while unsubscribing", e);
            }
            try {
                this.client.disconnectForcibly(this.disconnectCompletionTimeout);
            } catch (MqttException e2) {
                this.logger.error("Exception while disconnecting", e2);
            }
            this.client.setCallback((MqttCallback) null);
            try {
                this.client.close();
            } catch (MqttException e3) {
                this.logger.error("Exception while closing", e3);
            }
            this.connected = false;
            this.client = null;
        }
    }

    @Override // org.springframework.integration.mqtt.inbound.AbstractMqttMessageDrivenChannelAdapter
    public void addTopic(String str, int i) {
        this.topicLock.lock();
        try {
            try {
                super.addTopic(str, i);
                if (this.client != null && this.client.isConnected()) {
                    this.client.subscribe(str, i);
                }
            } catch (MqttException e) {
                super.removeTopic(str);
                throw new MessagingException("Failed to subscribe to topic " + str, e);
            }
        } finally {
            this.topicLock.unlock();
        }
    }

    @Override // org.springframework.integration.mqtt.inbound.AbstractMqttMessageDrivenChannelAdapter
    public void removeTopic(String... strArr) {
        this.topicLock.lock();
        try {
            try {
                if (this.client != null && this.client.isConnected()) {
                    this.client.unsubscribe(strArr);
                }
                super.removeTopic(strArr);
                this.topicLock.unlock();
            } catch (MqttException e) {
                throw new MessagingException("Failed to unsubscribe from topic " + Arrays.asList(strArr), e);
            }
        } catch (Throwable th) {
            this.topicLock.unlock();
            throw th;
        }
    }

    /* JADX WARN: Code restructure failed: missing block: B:22:0x00cc, code lost:
    
        if (r6.logger.isWarnEnabled() == false) goto L24;
     */
    /* JADX WARN: Code restructure failed: missing block: B:23:0x00cf, code lost:
    
        r6.logger.warn("Granted QOS different to Requested QOS; topics: " + java.util.Arrays.toString(r0) + " requested: " + java.util.Arrays.toString(r0) + " granted: " + java.util.Arrays.toString(r0));
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private synchronized void connectAndSubscribe() throws org.eclipse.paho.client.mqttv3.MqttException {
        /*
            Method dump skipped, instructions count: 488
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter.connectAndSubscribe():void");
    }

    private synchronized void cancelReconnect() {
        if (this.reconnectFuture != null) {
            this.reconnectFuture.cancel(false);
            this.reconnectFuture = null;
        }
    }

    private synchronized void scheduleReconnect() {
        cancelReconnect();
        try {
            this.reconnectFuture = getTaskScheduler().schedule(() -> {
                try {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Attempting reconnect");
                    }
                    synchronized (this) {
                        if (!this.connected) {
                            connectAndSubscribe();
                            this.reconnectFuture = null;
                        }
                    }
                } catch (MqttException e) {
                    this.logger.error("Exception while connecting and subscribing", e);
                    scheduleReconnect();
                }
            }, new Date(System.currentTimeMillis() + this.recoveryInterval));
        } catch (Exception e) {
            this.logger.error("Failed to schedule reconnect", e);
        }
    }

    public synchronized void connectionLost(Throwable th) {
        if (isRunning()) {
            this.logger.error("Lost connection: " + th.getMessage() + "; retrying...");
            this.connected = false;
            if (this.client != null) {
                try {
                    this.client.setCallback((MqttCallback) null);
                    this.client.close();
                } catch (MqttException e) {
                }
            }
            this.client = null;
            scheduleReconnect();
            if (this.applicationEventPublisher != null) {
                this.applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, th));
            }
        }
    }

    public void messageArrived(String str, MqttMessage mqttMessage) {
        Message<?> message = getConverter().toMessage(str, mqttMessage);
        try {
            sendMessage(message);
        } catch (RuntimeException e) {
            this.logger.error("Unhandled exception for " + message.toString(), e);
            throw e;
        }
    }

    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
    }
}
