package org.killbill.queue;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.ArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.killbill.commons.concurrent.Executors;
import org.killbill.queue.api.PersistentQueueConfig;
import org.killbill.queue.api.QueueLifecycle;
import org.killbill.queue.dao.EventEntryModelDao;
import org.skife.jdbi.v2.exceptions.DBIException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/killbill/queue/DefaultQueueLifecycle.class */
public abstract class DefaultQueueLifecycle implements QueueLifecycle {
    /** Constant with the literal value {@code "Queue"}; not referenced by the code in this class. */
    public static final String QUEUE_NAME = "Queue";
    private static final Logger log = LoggerFactory.getLogger(DefaultQueueLifecycle.class);
    /** Divisor used by the dispatcher loop to convert a nanosecond duration into milliseconds. */
    private static final long ONE_MILLION = 1000000;
    /**
     * Upper bound, in milliseconds, for a single blocking poll of the completion queue and for
     * each individual sleep slice of the dispatcher thread.
     */
    private static final long MAX_SLEEP_TIME_MS = 100;
    /** Maximum number of completed/failed entries drained from the queue in one batch. */
    private static final int MAX_COMPLETED_ENTRIES = 15;
    /** Name supplied at construction; used as the log prefix and as part of the metric names. */
    protected final String svcQName;
    /** Mapper handed out by {@link #getObjectMapper()}. */
    protected final ObjectMapper objectMapper;
    /** Queue configuration: thread counts, table name, polling sleep time, shutdown timeout, queue mode. */
    protected final PersistentQueueConfig config;
    /** Entries enqueued via {@code dispatchCompletedOrFailedEvents}, consumed by the completion threads. */
    private final LinkedBlockingQueue<EventEntryModelDao> completedOrFailedEvents;
    /** Entries enqueued via {@code dispatchRetriedEvents}, consumed by the completion threads. */
    private final LinkedBlockingQueue<EventEntryModelDao> retriedEvents;
    /** Timer updated after every dispatch iteration. */
    private final Timer dispatchTime;
    /** Timer updated after every completion iteration that processed at least one entry. */
    private final Timer completeTime;
    /** Histogram of the number of entries reported by each dispatch iteration. */
    private final Histogram dispatchedEntries;
    /** Histogram of the number of entries handled by each non-empty completion iteration. */
    private final Histogram completeEntries;
    /** True when the configured queue mode is {@code STICKY_EVENTS}. */
    private final boolean isStickyEvent;
    /** Loop condition of the dispatcher threads; set in {@code startQueue}, cleared in {@code stopLifecycleDispatcher}. */
    private volatile boolean isDispatchingEvents;
    /** Loop condition of the completion threads; set in {@code startQueue}, cleared in {@code stopLifecycleCompletion}. */
    private volatile boolean isCompletingEvents;
    /** Pool running the dispatcher threads; created in {@code startQueue}. */
    private ExecutorService lifecycleDispatcherExecutor;
    /** Pool running the completion threads; created in {@code startQueue}. */
    private ExecutorService lifecycleCompletionExecutor;

    /* loaded from: input_file:org/killbill/queue/DefaultQueueLifecycle$CompletionRunnable.class */
    private final class CompletionRunnable implements Runnable {
        private CompletionRunnable() {
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                try {
                    DefaultQueueLifecycle.log.info("{}: Completion thread {} [{}] starting ", new Object[]{DefaultQueueLifecycle.this.svcQName, Thread.currentThread().getName(), Long.valueOf(Thread.currentThread().getId())});
                    while (DefaultQueueLifecycle.this.isCompletingEvents) {
                        DefaultQueueLifecycle.this.withHandlingRuntimeException(new RunnableRawCallback() { // from class: org.killbill.queue.DefaultQueueLifecycle.CompletionRunnable.1
                            @Override // org.killbill.queue.DefaultQueueLifecycle.RunnableRawCallback
                            public void callback() throws InterruptedException {
                                long nanoTime = System.nanoTime();
                                long j = 0;
                                ArrayList arrayList = new ArrayList(DefaultQueueLifecycle.MAX_COMPLETED_ENTRIES);
                                DefaultQueueLifecycle.this.completedOrFailedEvents.drainTo(arrayList, DefaultQueueLifecycle.MAX_COMPLETED_ENTRIES);
                                if (arrayList.isEmpty()) {
                                    long nanoTime2 = System.nanoTime();
                                    EventEntryModelDao eventEntryModelDao = (EventEntryModelDao) DefaultQueueLifecycle.this.completedOrFailedEvents.poll(DefaultQueueLifecycle.MAX_SLEEP_TIME_MS, TimeUnit.MILLISECONDS);
                                    j = System.nanoTime() - nanoTime2;
                                    if (eventEntryModelDao != null) {
                                        arrayList.add(eventEntryModelDao);
                                    }
                                }
                                if (!arrayList.isEmpty()) {
                                    DefaultQueueLifecycle.this.doProcessCompletedEvents(arrayList);
                                }
                                int size = arrayList.size() + CompletionRunnable.this.drainRetriedEvents();
                                if (size > 0) {
                                    DefaultQueueLifecycle.this.completeEntries.update(size);
                                    DefaultQueueLifecycle.this.completeTime.update((System.nanoTime() - nanoTime) - j, TimeUnit.NANOSECONDS);
                                }
                            }
                        });
                    }
                    DefaultQueueLifecycle.log.info("{}: Completion thread {} [{}] has exited", new Object[]{DefaultQueueLifecycle.this.svcQName, Thread.currentThread().getName(), Long.valueOf(Thread.currentThread().getId())});
                } catch (Error e) {
                    DefaultQueueLifecycle.log.error("{}: Completion thread {} [{}] got an exception, exiting...", new Object[]{DefaultQueueLifecycle.this.svcQName, Thread.currentThread().getName(), Long.valueOf(Thread.currentThread().getId()), e});
                    DefaultQueueLifecycle.log.info("{}: Completion thread {} [{}] has exited", new Object[]{DefaultQueueLifecycle.this.svcQName, Thread.currentThread().getName(), Long.valueOf(Thread.currentThread().getId())});
                } catch (InterruptedException e2) {
                    DefaultQueueLifecycle.log.info("{}: Completion thread {} [{}] got interrupted, exiting... ", new Object[]{DefaultQueueLifecycle.this.svcQName, Thread.currentThread().getName(), Long.valueOf(Thread.currentThread().getId())});
                    DefaultQueueLifecycle.log.info("{}: Completion thread {} [{}] has exited", new Object[]{DefaultQueueLifecycle.this.svcQName, Thread.currentThread().getName(), Long.valueOf(Thread.currentThread().getId())});
                }
            } catch (Throwable th) {
                DefaultQueueLifecycle.log.info("{}: Completion thread {} [{}] has exited", new Object[]{DefaultQueueLifecycle.this.svcQName, Thread.currentThread().getName(), Long.valueOf(Thread.currentThread().getId())});
                throw th;
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public int drainRetriedEvents() {
            int size = DefaultQueueLifecycle.this.retriedEvents.size();
            if (size > 0) {
                ArrayList arrayList = new ArrayList(size);
                DefaultQueueLifecycle.this.retriedEvents.drainTo(arrayList, size);
                DefaultQueueLifecycle.this.doProcessRetriedEvents(arrayList);
            }
            return size;
        }
    }

    /* loaded from: input_file:org/killbill/queue/DefaultQueueLifecycle$DispatchResultMetrics.class */
    /**
     * Immutable result of one dispatch iteration: how many entries were handled and the time,
     * in nanoseconds, reported for it.
     */
    public static class DispatchResultMetrics {
        private final int nbEntries;
        private final long timeNanoSec;

        /**
         * @param i number of entries reported for the dispatch iteration
         * @param j time reported for the dispatch iteration, in nanoseconds
         */
        public DispatchResultMetrics(int i, long j) {
            nbEntries = i;
            timeNanoSec = j;
        }

        /** @return the time, in nanoseconds, given at construction */
        public long getTimeNanoSec() {
            return timeNanoSec;
        }

        /** @return the number of entries given at construction */
        public int getNbEntries() {
            return nbEntries;
        }
    }

    /* loaded from: input_file:org/killbill/queue/DefaultQueueLifecycle$DispatcherRunnable.class */
    private final class DispatcherRunnable implements Runnable {
        private DispatcherRunnable() {
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                try {
                    DefaultQueueLifecycle.log.info("{}: Dispatching thread {} [{}] starting ", new Object[]{DefaultQueueLifecycle.this.svcQName, Thread.currentThread().getName(), Long.valueOf(Thread.currentThread().getId())});
                    while (DefaultQueueLifecycle.this.isDispatchingEvents) {
                        DefaultQueueLifecycle.this.withHandlingRuntimeException(new RunnableRawCallback() { // from class: org.killbill.queue.DefaultQueueLifecycle.DispatcherRunnable.1
                            @Override // org.killbill.queue.DefaultQueueLifecycle.RunnableRawCallback
                            public void callback() throws InterruptedException {
                                long nanoTime = System.nanoTime();
                                DispatcherRunnable.this.dispatchEvents();
                                DispatcherRunnable.this.sleepSporadically((System.nanoTime() - nanoTime) / DefaultQueueLifecycle.ONE_MILLION);
                            }
                        });
                    }
                    DefaultQueueLifecycle.log.info("{}: Dispatching thread {} [{}] has exited", new Object[]{DefaultQueueLifecycle.this.svcQName, Thread.currentThread().getName(), Long.valueOf(Thread.currentThread().getId())});
                } catch (Error e) {
                    DefaultQueueLifecycle.log.error("{}: Dispatching thread {} [{}] got an exception, exiting... ", new Object[]{DefaultQueueLifecycle.this.svcQName, Thread.currentThread().getName(), Long.valueOf(Thread.currentThread().getId()), e});
                    DefaultQueueLifecycle.log.info("{}: Dispatching thread {} [{}] has exited", new Object[]{DefaultQueueLifecycle.this.svcQName, Thread.currentThread().getName(), Long.valueOf(Thread.currentThread().getId())});
                } catch (InterruptedException e2) {
                    DefaultQueueLifecycle.log.info("{}: Dispatching thread {} [{}] got interrupted, exiting... ", new Object[]{DefaultQueueLifecycle.this.svcQName, Thread.currentThread().getName(), Long.valueOf(Thread.currentThread().getId())});
                    DefaultQueueLifecycle.log.info("{}: Dispatching thread {} [{}] has exited", new Object[]{DefaultQueueLifecycle.this.svcQName, Thread.currentThread().getName(), Long.valueOf(Thread.currentThread().getId())});
                }
            } catch (Throwable th) {
                DefaultQueueLifecycle.log.info("{}: Dispatching thread {} [{}] has exited", new Object[]{DefaultQueueLifecycle.this.svcQName, Thread.currentThread().getName(), Long.valueOf(Thread.currentThread().getId())});
                throw th;
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void dispatchEvents() {
            long nanoTime = System.nanoTime();
            DispatchResultMetrics doDispatchEvents = DefaultQueueLifecycle.this.doDispatchEvents();
            DefaultQueueLifecycle.this.dispatchedEntries.update(doDispatchEvents.getNbEntries());
            if (DefaultQueueLifecycle.this.isStickyEvent) {
                DefaultQueueLifecycle.this.dispatchTime.update(doDispatchEvents.getTimeNanoSec(), TimeUnit.NANOSECONDS);
            } else {
                DefaultQueueLifecycle.this.dispatchTime.update(System.nanoTime() - nanoTime, TimeUnit.NANOSECONDS);
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void sleepSporadically(long j) throws InterruptedException {
            if (DefaultQueueLifecycle.this.isStickyEvent) {
                return;
            }
            long pollingSleepTimeMs = DefaultQueueLifecycle.this.config.getPollingSleepTimeMs();
            long j2 = j;
            while (true) {
                long j3 = pollingSleepTimeMs - j2;
                if (j3 <= 0) {
                    return;
                }
                long j4 = j3 > DefaultQueueLifecycle.MAX_SLEEP_TIME_MS ? DefaultQueueLifecycle.MAX_SLEEP_TIME_MS : j3;
                Thread.sleep(j4);
                pollingSleepTimeMs = j3;
                j2 = j4;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/killbill/queue/DefaultQueueLifecycle$RunnableRawCallback.class */
    /**
     * One iteration of a worker loop, executed through {@code withHandlingRuntimeException}.
     * Unlike {@link Runnable}, the body may propagate {@link InterruptedException}.
     */
    public interface RunnableRawCallback {
        /**
         * Executes the iteration.
         *
         * @throws InterruptedException if the calling thread is interrupted while blocked
         */
        void callback() throws InterruptedException;
    }

    /**
     * Creates the lifecycle using the object mapper returned by {@code QueueObjectMapper.get()}.
     *
     * @param str                   queue name, used as log prefix and in metric names
     * @param persistentQueueConfig queue configuration
     * @param metricRegistry        registry in which the timers, histograms and gauge are created
     */
    public DefaultQueueLifecycle(String str, PersistentQueueConfig persistentQueueConfig, MetricRegistry metricRegistry) {
        this(str, persistentQueueConfig, metricRegistry, QueueObjectMapper.get());
    }

    private DefaultQueueLifecycle(String str, PersistentQueueConfig persistentQueueConfig, MetricRegistry metricRegistry, ObjectMapper objectMapper) {
        this.svcQName = str;
        this.config = persistentQueueConfig;
        this.isDispatchingEvents = false;
        this.isCompletingEvents = false;
        this.objectMapper = objectMapper;
        this.completedOrFailedEvents = new LinkedBlockingQueue<>();
        this.retriedEvents = new LinkedBlockingQueue<>();
        this.isStickyEvent = persistentQueueConfig.getPersistentQueueMode() == PersistentQueueConfig.PersistentQueueMode.STICKY_EVENTS;
        this.dispatchTime = metricRegistry.timer(MetricRegistry.name(DefaultQueueLifecycle.class, new String[]{str, "dispatchTime"}));
        this.completeTime = metricRegistry.timer(MetricRegistry.name(DefaultQueueLifecycle.class, new String[]{str, "completeTime"}));
        this.dispatchedEntries = metricRegistry.histogram(MetricRegistry.name(DefaultQueueLifecycle.class, new String[]{str, "dispatchedEntries"}));
        this.completeEntries = metricRegistry.histogram(MetricRegistry.name(DefaultQueueLifecycle.class, new String[]{str, "completeEntries"}));
        metricRegistry.register(MetricRegistry.name(DefaultQueueLifecycle.class, new String[]{str, "completedOrFailedEvents", "size"}), new Gauge<Integer>() { // from class: org.killbill.queue.DefaultQueueLifecycle.1
            /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
            public Integer m19getValue() {
                return Integer.valueOf(DefaultQueueLifecycle.this.completedOrFailedEvents.size());
            }
        });
    }

    /**
     * Creates the dispatcher and completion thread pools, raises the running flags and submits
     * one worker per configured thread. Completion workers are started before dispatcher workers.
     *
     * @return always {@code true}
     */
    @Override
    public boolean startQueue() {
        final int dispatcherPoolSize = config.geNbLifecycleDispatchThreads();
        final String dispatcherThreadName = config.getTableName() + "-lifecycle-dispatcher-th";
        lifecycleDispatcherExecutor = Executors.newFixedThreadPool(dispatcherPoolSize, dispatcherThreadName);

        final int completionPoolSize = config.geNbLifecycleCompleteThreads();
        final String completionThreadName = config.getTableName() + "-lifecycle-completion-th";
        lifecycleCompletionExecutor = Executors.newFixedThreadPool(completionPoolSize, completionThreadName);

        log.info("{}: Starting...", svcQName);

        // The flag must be raised before the workers run, otherwise their loop exits immediately.
        isCompletingEvents = true;
        for (int worker = 0; worker < config.geNbLifecycleCompleteThreads(); worker++) {
            lifecycleCompletionExecutor.execute(new CompletionRunnable());
        }

        isDispatchingEvents = true;
        for (int worker = 0; worker < config.geNbLifecycleDispatchThreads(); worker++) {
            lifecycleDispatcherExecutor.execute(new DispatcherRunnable());
        }
        return true;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Asks the dispatcher threads to stop and waits, for at most the configured shutdown timeout,
     * for the dispatcher pool to terminate.
     *
     * @return {@code true} if the pool terminated within the timeout; {@code false} if the timeout
     *         elapsed or the calling thread was interrupted while waiting (in which case its
     *         interrupt status is restored)
     */
    public boolean stopLifecycleDispatcher() {
        this.isDispatchingEvents = false;
        this.lifecycleDispatcherExecutor.shutdown();
        try {
            return this.lifecycleDispatcherExecutor.awaitTermination(this.config.getShutdownTimeout().getPeriod(), this.config.getShutdownTimeout().getUnit());
        } catch (final InterruptedException e) {
            log.info("{}: Lifecycle dispatcher stop sequence has been interrupted", this.svcQName);
            // Do not swallow the interruption: let the caller observe it.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Asks the completion threads to stop and waits, for at most the configured shutdown timeout,
     * for the completion pool to terminate. Whatever the outcome, a warning is logged if entries
     * are still sitting in the completed/failed or retried queues.
     *
     * @return {@code true} if the pool terminated within the timeout; {@code false} if the timeout
     *         elapsed or the calling thread was interrupted while waiting (in which case its
     *         interrupt status is restored)
     */
    public boolean stopLifecycleCompletion() {
        this.isCompletingEvents = false;
        this.lifecycleCompletionExecutor.shutdown();
        try {
            return this.lifecycleCompletionExecutor.awaitTermination(this.config.getShutdownTimeout().getPeriod(), this.config.getShutdownTimeout().getUnit());
        } catch (final InterruptedException e) {
            log.info("{}: Lifecycle completion stop sequence has been interrupted", this.svcQName);
            // Do not swallow the interruption: let the caller observe it.
            Thread.currentThread().interrupt();
            return false;
        } finally {
            // Runs on every exit path, including unexpected runtime failures.
            final int pendingCompleted = this.completedOrFailedEvents.size();
            final int pendingRetried = this.retriedEvents.size();
            if (pendingCompleted > 0 || pendingRetried > 0) {
                log.warn("{}: Stopped queue with {} event/notifications non completed", this.svcQName, pendingCompleted + pendingRetried);
            }
        }
    }

    /**
     * Enqueues an entry on the in-memory completed/failed queue, from which the completion
     * threads later pass it to {@link #doProcessCompletedEvents(Iterable)}.
     *
     * @param m   the entry to enqueue
     * @param <M> concrete entry type
     */
    public <M extends EventEntryModelDao> void dispatchCompletedOrFailedEvents(M m) {
        this.completedOrFailedEvents.add(m);
    }

    /**
     * Enqueues an entry on the in-memory retried queue, from which the completion threads later
     * pass it to {@link #doProcessRetriedEvents(Iterable)}.
     *
     * @param m   the entry to enqueue
     * @param <M> concrete entry type
     */
    public <M extends EventEntryModelDao> void dispatchRetriedEvents(M m) {
        this.retriedEvents.add(m);
    }

    /**
     * Performs one dispatch iteration; called in a loop by each dispatcher thread. A
     * {@link RuntimeException} thrown from here is logged by the loop and does not stop it.
     *
     * @return the number of entries handled (fed to the {@code dispatchedEntries} histogram) and
     *         the time taken (fed to the {@code dispatchTime} timer when the queue is in sticky mode)
     */
    public abstract DispatchResultMetrics doDispatchEvents();

    /**
     * Handles a non-empty batch of entries taken from the completed/failed queue by a completion
     * thread. A {@link RuntimeException} thrown from here is logged by the loop and does not stop it.
     *
     * @param iterable the entries taken from the queue, at most {@code MAX_COMPLETED_ENTRIES} per call
     */
    public abstract void doProcessCompletedEvents(Iterable<? extends EventEntryModelDao> iterable);

    /**
     * Handles a batch of entries taken from the retried queue by a completion thread. A
     * {@link RuntimeException} thrown from here is logged by the loop and does not stop it.
     *
     * @param iterable the entries drained from the retried queue
     */
    public abstract void doProcessRetriedEvents(Iterable<? extends EventEntryModelDao> iterable);

    /**
     * @return the object mapper this lifecycle was constructed with
     */
    public ObjectMapper getObjectMapper() {
        return this.objectMapper;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Runs the callback and logs, at WARN level, any {@link RuntimeException} it throws instead of
     * letting it propagate, so that a failing iteration does not terminate the calling worker loop.
     * {@link InterruptedException} and {@link Error} are not intercepted.
     *
     * @param runnableRawCallback the iteration to execute
     * @throws InterruptedException if the callback is interrupted
     */
    public void withHandlingRuntimeException(RunnableRawCallback runnableRawCallback) throws InterruptedException {
        try {
            runnableRawCallback.callback();
        } catch (RuntimeException failure) {
            // Database failures get their own message; everything else is reported generically.
            final String message;
            if (failure instanceof DBIException) {
                message = "{}: Thread {} got DBIException exception: ";
            } else {
                message = "{}: Thread {} got Runtime exception: ";
            }
            log.warn(message, this.svcQName, Thread.currentThread().getName(), failure);
        }
    }
}
