package org.apache.druid.messages.server;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.druid.common.guava.FutureBox;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.messages.MessageBatch;

/**
 * In-memory {@link Outbox} that keeps one {@link OutboxQueue} per outbox key.
 *
 * <p>Queues are created lazily the first time a key is used by {@link #sendMessage} or
 * {@link #getMessages}, and are discarded by {@link #resetOutbox} or {@link #stop()}. Once
 * {@link #stop()} has been called, {@link #sendMessage} and {@link #getMessages} return
 * immediately-cancelled futures.
 *
 * <p>NOTE(review): a call that reads {@code stopped == false} just before {@link #stop()} runs can
 * still create a fresh queue through {@code computeIfAbsent} after the stop loop has passed that
 * key; such a queue would not be stopped. Confirm whether callers can hit this window.
 */
public class OutboxImpl<MessageType> implements Outbox<MessageType> {
    /** Upper bound on the number of messages placed into a single {@link MessageBatch}. */
    private static final int MAX_BATCH_SIZE = 8;

    /**
     * Sentinel that never equals a real queue epoch (real epochs are masked to be non-negative).
     * Passed to {@link #getMessages} it means "accept whatever epoch the queue has"; returned from
     * {@link #getOutboxEpoch} it means "no queue exists for this key".
     */
    private static final long NO_EPOCH = -1L;

    /** Outbox key -> queue of messages for that key. */
    private final ConcurrentHashMap<String, OutboxQueue<MessageType>> queues = new ConcurrentHashMap<>();

    /** Set once by {@link #stop()}; checked at the start of every send/get call. */
    private volatile boolean stopped;

    /**
     * Message queue for a single outbox key.
     *
     * <p>Messages stay in the queue until they are acknowledged: {@link #getMessages(long)} removes
     * messages while the queue's start watermark is below the watermark supplied by the caller, and
     * completes the future that {@link #sendMessage} returned for each removed message. Building a
     * batch only reads from the queue; it never removes anything.
     */
    public static class OutboxQueue<T> {
        /**
         * Tracks every future handed out by this queue so {@link #stop()} can close them together.
         * (Presumably {@code FutureBox.close()} cancels outstanding futures; confirm against
         * FutureBox.)
         */
        private final FutureBox pendingFutures = new FutureBox();

        /** Number of messages removed (acknowledged) from the head of the queue so far. */
        @GuardedBy("this")
        private long startWatermark = 0;

        /** Unacknowledged messages, each paired with the future completed on acknowledgement. */
        @GuardedBy("this")
        private final Deque<Pair<SettableFuture<?>, T>> queue = new ArrayDeque<>();

        /** Completed by {@link #sendMessage} to wake up a {@link #getMessages} call that found the queue empty. */
        @GuardedBy("this")
        private SettableFuture<?> messageAvailableFuture = SettableFuture.create();

        /** Random non-negative identifier chosen when this queue is created. */
        private final long epoch = ThreadLocalRandom.current().nextLong() & Long.MAX_VALUE;

        /**
         * Appends a message to the queue and signals any waiting {@link #getMessages} call.
         *
         * @param message the message to enqueue
         * @return a future, registered with this queue's {@code FutureBox}, that is completed when
         *         the message is removed from the queue by {@link #getMessages(long)}
         */
        ListenableFuture<?> sendMessage(T message) {
            final SettableFuture<?> ackFuture = SettableFuture.create();
            synchronized (this) {
                queue.add(Pair.of(ackFuture, message));
                if (!messageAvailableFuture.isDone()) {
                    messageAvailableFuture.set(null);
                }
            }
            return pendingFutures.register(ackFuture);
        }

        /**
         * Acknowledges messages below {@code newStartWatermark}, then returns the next batch.
         *
         * @param newStartWatermark watermark up to which the caller has received messages; queued
         *                          messages are removed until the queue's own watermark reaches it
         *                          or the queue is empty
         * @return an immediate future holding the next batch when messages remain; otherwise a
         *         future that yields a batch once {@link #sendMessage} signals a new message
         */
        ListenableFuture<MessageBatch<T>> getMessages(long newStartWatermark) {
            synchronized (this) {
                // Drain and acknowledge everything the caller has already seen.
                while (!queue.isEmpty() && startWatermark < newStartWatermark) {
                    final Pair<SettableFuture<?>, T> acked = queue.poll();
                    startWatermark++;
                    acked.lhs.set(null);
                }

                if (!queue.isEmpty()) {
                    return pendingFutures.register(Futures.immediateFuture(nextBatch()));
                }

                // Nothing to send yet. A completed signal future is stale, so replace it before waiting.
                if (messageAvailableFuture.isDone()) {
                    messageAvailableFuture = SettableFuture.create();
                }

                // nonCancellationPropagating: cancelling the returned future must leave the shared
                // messageAvailableFuture untouched.
                return pendingFutures.register(
                    FutureUtils.transform(
                        Futures.nonCancellationPropagating(messageAvailableFuture),
                        ignored -> {
                            synchronized (this) {
                                return nextBatch();
                            }
                        }
                    )
                );
            }
        }

        /** Closes the {@code FutureBox} that tracks the futures handed out by this queue. */
        void stop() {
            pendingFutures.close();
        }

        /**
         * Builds a batch from the first {@code MAX_BATCH_SIZE} queued messages without removing them.
         *
         * @return a batch carrying those messages, this queue's epoch and the current start watermark
         */
        @GuardedBy("this")
        private MessageBatch<T> nextBatch() {
            final ArrayList<T> batch = new ArrayList<>(Math.min(queue.size(), MAX_BATCH_SIZE));
            for (Pair<SettableFuture<?>, T> entry : queue) {
                if (batch.size() >= MAX_BATCH_SIZE) {
                    break;
                }
                batch.add(entry.rhs);
            }
            return new MessageBatch<>(batch, epoch, startWatermark);
        }
    }

    /** Marks this outbox as stopped, then stops and removes every queue. */
    @LifecycleStop
    public void stop() {
        stopped = true;
        final Iterator<OutboxQueue<MessageType>> it = queues.values().iterator();
        while (it.hasNext()) {
            it.next().stop();
            it.remove();
        }
    }

    /**
     * Enqueues a message on the queue for {@code outboxKey}, creating the queue if needed.
     *
     * @param outboxKey key identifying the target queue
     * @param message   message to enqueue
     * @return the queue's acknowledgement future, or an immediately-cancelled future if stopped
     */
    @Override // org.apache.druid.messages.server.Outbox
    public ListenableFuture<?> sendMessage(String outboxKey, MessageType message) {
        if (stopped) {
            return Futures.immediateCancelledFuture();
        }
        return queues.computeIfAbsent(outboxKey, key -> new OutboxQueue<>()).sendMessage(message);
    }

    /**
     * Fetches the next batch for {@code outboxKey}, creating the queue if needed.
     *
     * @param outboxKey      key identifying the queue
     * @param epoch          epoch the caller believes the queue has, or {@code -1} to accept any
     * @param startWatermark watermark forwarded to {@link OutboxQueue#getMessages(long)}
     * @return an immediately-cancelled future if stopped; the queue's batch future when the epoch
     *         matches or is {@code -1}; otherwise an immediate empty batch carrying the queue's
     *         actual epoch and a start watermark of 0
     */
    @Override // org.apache.druid.messages.server.Outbox
    public ListenableFuture<MessageBatch<MessageType>> getMessages(String outboxKey, long epoch, long startWatermark) {
        if (stopped) {
            return Futures.immediateCancelledFuture();
        }

        final OutboxQueue<MessageType> queue = queues.computeIfAbsent(outboxKey, key -> new OutboxQueue<>());
        if (epoch == queue.epoch || epoch == NO_EPOCH) {
            return queue.getMessages(startWatermark);
        }

        // Epoch mismatch: report the queue's real epoch with no messages.
        return Futures.immediateFuture(new MessageBatch<>(Collections.emptyList(), queue.epoch, 0L));
    }

    /**
     * Removes the queue for {@code outboxKey}, if one exists, and stops it.
     *
     * @param outboxKey key identifying the queue to discard
     */
    @Override // org.apache.druid.messages.server.Outbox
    public void resetOutbox(String outboxKey) {
        final OutboxQueue<MessageType> removed = queues.remove(outboxKey);
        if (removed != null) {
            removed.stop();
        }
    }

    /**
     * Returns the epoch of the queue for {@code outboxKey}.
     *
     * @param outboxKey key identifying the queue
     * @return the queue's epoch, or {@code -1} when no queue exists for the key
     */
    @VisibleForTesting
    long getOutboxEpoch(String outboxKey) {
        final OutboxQueue<MessageType> queue = queues.get(outboxKey);
        return queue != null ? queue.epoch : NO_EPOCH;
    }
}
