package net.greghaines.jesque.admin;

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import net.greghaines.jesque.Config;
import net.greghaines.jesque.Job;
import net.greghaines.jesque.admin.commands.PauseCommand;
import net.greghaines.jesque.admin.commands.ShutdownCommand;
import net.greghaines.jesque.json.ObjectMapperFactory;
import net.greghaines.jesque.utils.ConcurrentHashSet;
import net.greghaines.jesque.utils.ConcurrentSet;
import net.greghaines.jesque.utils.JedisUtils;
import net.greghaines.jesque.utils.JesqueUtils;
import net.greghaines.jesque.utils.ResqueConstants;
import net.greghaines.jesque.worker.DefaultExceptionHandler;
import net.greghaines.jesque.worker.ExceptionHandler;
import net.greghaines.jesque.worker.JobExecutor;
import net.greghaines.jesque.worker.JobFactory;
import net.greghaines.jesque.worker.MapBasedJobFactory;
import net.greghaines.jesque.worker.RecoveryStrategy;
import net.greghaines.jesque.worker.Worker;
import net.greghaines.jesque.worker.WorkerAware;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

/* loaded from: input_file:net/greghaines/jesque/admin/AdminImpl.class */
public class AdminImpl implements Admin {
    private static final Logger LOG = LoggerFactory.getLogger(AdminImpl.class);
    private static final long RECONNECT_SLEEP_TIME = 5000;
    private static final int RECONNECT_ATTEMPTS = 120;
    protected final Jedis jedis;
    protected final String namespace;
    private final JobFactory jobFactory;
    private final ConcurrentSet<String> channels;
    protected final PubSubListener jedisPubSub;
    protected final AtomicReference<Worker> workerRef;
    protected final AtomicReference<JobExecutor.State> state;
    private final AtomicBoolean processingJob;
    private final AtomicReference<Thread> threadRef;
    private final AtomicReference<ExceptionHandler> exceptionHandlerRef;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:net/greghaines/jesque/admin/AdminImpl$PubSubListener.class */
    public class PubSubListener extends JedisPubSub {
        protected PubSubListener() {
        }

        public void onMessage(String str, String str2) {
            try {
                if (str2 != null) {
                    try {
                        AdminImpl.this.processingJob.set(true);
                        Job job = (Job) ObjectMapperFactory.get().readValue(str2, Job.class);
                        AdminImpl.this.execute(job, str, AdminImpl.this.jobFactory.materializeJob(job));
                        AdminImpl.this.processingJob.set(false);
                    } catch (Exception e) {
                        AdminImpl.this.recoverFromException(str, e);
                        AdminImpl.this.processingJob.set(false);
                    }
                }
            } catch (Throwable th) {
                AdminImpl.this.processingJob.set(false);
                throw th;
            }
        }

        public void onPMessage(String str, String str2, String str3) {
        }

        public void onSubscribe(String str, int i) {
        }

        public void onUnsubscribe(String str, int i) {
        }

        public void onPUnsubscribe(String str, int i) {
        }

        public void onPSubscribe(String str, int i) {
        }
    }

    public AdminImpl(Config config) {
        this(config, JesqueUtils.set(ResqueConstants.ADMIN_CHANNEL), new MapBasedJobFactory(JesqueUtils.map(JesqueUtils.entry("PauseCommand", PauseCommand.class), JesqueUtils.entry("ShutdownCommand", ShutdownCommand.class))));
    }

    public AdminImpl(Config config, Set<String> set, JobFactory jobFactory) {
        this(config, set, jobFactory, new Jedis(config.getHost(), config.getPort(), config.getTimeout()));
    }

    public AdminImpl(Config config, Set<String> set, JobFactory jobFactory, Jedis jedis) {
        this.channels = new ConcurrentHashSet();
        this.jedisPubSub = new PubSubListener();
        this.workerRef = new AtomicReference<>(null);
        this.state = new AtomicReference<>(JobExecutor.State.NEW);
        this.processingJob = new AtomicBoolean(false);
        this.threadRef = new AtomicReference<>(null);
        this.exceptionHandlerRef = new AtomicReference<>(new DefaultExceptionHandler());
        if (config == null) {
            throw new IllegalArgumentException("config must not be null");
        }
        if (jobFactory == null) {
            throw new IllegalArgumentException("jobFactory must not be null");
        }
        if (jedis == null) {
            throw new IllegalArgumentException("jedis must not be null");
        }
        this.namespace = config.getNamespace();
        this.jedis = jedis;
        if (config.getPassword() != null) {
            this.jedis.auth(config.getPassword());
        }
        this.jedis.select(config.getDatabase());
        setChannels(set);
        this.jobFactory = jobFactory;
    }

    @Override // java.lang.Runnable
    public void run() {
        if (!this.state.compareAndSet(JobExecutor.State.NEW, JobExecutor.State.RUNNING)) {
            if (!JobExecutor.State.RUNNING.equals(this.state.get())) {
                throw new IllegalStateException("This AdminImpl is shutdown");
            }
            throw new IllegalStateException("This AdminImpl is already running");
        }
        try {
            LOG.debug("AdminImpl starting up");
            this.threadRef.set(Thread.currentThread());
            while (!isShutdown()) {
                this.jedis.subscribe(this.jedisPubSub, createFullChannels());
            }
            LOG.debug("AdminImpl shutting down");
            this.jedis.quit();
            this.threadRef.set(null);
        } catch (Throwable th) {
            LOG.debug("AdminImpl shutting down");
            this.jedis.quit();
            this.threadRef.set(null);
            throw th;
        }
    }

    @Override // net.greghaines.jesque.admin.Admin
    public Set<String> getChannels() {
        return Collections.unmodifiableSet(this.channels);
    }

    @Override // net.greghaines.jesque.admin.Admin
    public void setChannels(Set<String> set) {
        checkChannels(set);
        this.channels.clear();
        this.channels.addAll(set);
        if (this.jedisPubSub.isSubscribed()) {
            this.jedisPubSub.unsubscribe();
        }
    }

    @Override // net.greghaines.jesque.admin.Admin
    public Worker getWorker() {
        return this.workerRef.get();
    }

    @Override // net.greghaines.jesque.admin.Admin
    public void setWorker(Worker worker) {
        this.workerRef.set(worker);
    }

    @Override // net.greghaines.jesque.worker.JobExecutor
    public void end(boolean z) {
        Thread thread;
        this.state.set(JobExecutor.State.SHUTDOWN);
        this.jedisPubSub.unsubscribe();
        if (!z || (thread = this.threadRef.get()) == null) {
            return;
        }
        thread.interrupt();
    }

    @Override // net.greghaines.jesque.worker.JobExecutor
    public boolean isShutdown() {
        return JobExecutor.State.SHUTDOWN.equals(this.state.get());
    }

    @Override // net.greghaines.jesque.worker.JobExecutor
    public boolean isProcessingJob() {
        return this.processingJob.get();
    }

    @Override // net.greghaines.jesque.worker.JobExecutor
    public void join(long j) throws InterruptedException {
        Thread thread = this.threadRef.get();
        if (thread == null || !thread.isAlive()) {
            return;
        }
        thread.join(j);
    }

    @Override // net.greghaines.jesque.worker.JobExecutor
    public JobFactory getJobFactory() {
        return this.jobFactory;
    }

    @Override // net.greghaines.jesque.worker.JobExecutor
    public ExceptionHandler getExceptionHandler() {
        return this.exceptionHandlerRef.get();
    }

    @Override // net.greghaines.jesque.worker.JobExecutor
    public void setExceptionHandler(ExceptionHandler exceptionHandler) {
        if (exceptionHandler == null) {
            throw new IllegalArgumentException("exceptionHandler must not be null");
        }
        this.exceptionHandlerRef.set(exceptionHandler);
    }

    protected Object execute(Job job, String str, Object obj) throws Exception {
        Object obj2;
        if (obj instanceof WorkerAware) {
            ((WorkerAware) obj).setWorker(this.workerRef.get());
        }
        if (obj instanceof Callable) {
            obj2 = ((Callable) obj).call();
        } else {
            if (!(obj instanceof Runnable)) {
                throw new ClassCastException("instance must be a Runnable or a Callable: " + obj.getClass().getName() + " - " + obj);
            }
            ((Runnable) obj).run();
            obj2 = null;
        }
        return obj2;
    }

    protected int getReconnectAttempts() {
        return RECONNECT_ATTEMPTS;
    }

    protected void recoverFromException(String str, Exception exc) {
        RecoveryStrategy onException = this.exceptionHandlerRef.get().onException(this, exc, str);
        switch (onException) {
            case RECONNECT:
                LOG.info("Reconnecting to Redis in response to exception", exc);
                int reconnectAttempts = getReconnectAttempts();
                if (JedisUtils.reconnect(this.jedis, reconnectAttempts, RECONNECT_SLEEP_TIME)) {
                    LOG.info("Reconnected to Redis");
                    return;
                } else {
                    LOG.warn("Terminating in response to exception after " + reconnectAttempts + " to reconnect", exc);
                    end(false);
                    return;
                }
            case TERMINATE:
                LOG.warn("Terminating in response to exception", exc);
                end(false);
                return;
            case PROCEED:
                return;
            default:
                LOG.error("Unknown RecoveryStrategy: " + onException + " while attempting to recover from the following exception; Admin proceeding...", exc);
                return;
        }
    }

    protected static void checkChannels(Iterable<String> iterable) {
        if (iterable == null) {
            throw new IllegalArgumentException("channels must not be null");
        }
        for (String str : iterable) {
            if (str == null || "".equals(str)) {
                throw new IllegalArgumentException("channels' members must not be null: " + iterable);
            }
        }
    }

    private String[] createFullChannels() {
        String[] strArr = (String[]) this.channels.toArray(new String[this.channels.size()]);
        int i = 0;
        for (String str : strArr) {
            int i2 = i;
            i++;
            strArr[i2] = JesqueUtils.createKey(this.namespace, ResqueConstants.CHANNEL, str);
        }
        return strArr;
    }
}
