package org.apache.druid.messages.server;

import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.messages.MessageBatch;

/* loaded from: input_file:org/apache/druid/messages/server/MessageRelayResource.class */
/**
 * HTTP resource that hands batches of messages from an {@link Outbox} to a polling client.
 *
 * <p>Each request is served through the Servlet async API: the resource asks the outbox for a
 * future batch, parks the request, and writes the response once the future resolves, fails, or
 * the async timeout fires. Exactly one of those three outcomes is allowed to write the response.
 *
 * @param <MessageType> type of the messages carried inside each {@link MessageBatch}
 */
public class MessageRelayResource<MessageType> {
    private static final Logger log = new Logger(MessageRelayResource.class);

    /** Timeout handed to {@link AsyncContext#setTimeout(long)}, in milliseconds. */
    private static final long GET_MESSAGES_TIMEOUT = 30000;

    private final Outbox<MessageType> outbox;

    /** Mapper used to serialize batches; responses are labelled {@code application/x-jackson-smile}. */
    private final ObjectMapper smileMapper;

    /** Fully parameterized {@code MessageBatch<MessageType>} type used to pick the serializer. */
    private final JavaType batchType;

    /**
     * Creates a resource backed by the given outbox.
     *
     * @param outbox       source of message batches
     * @param objectMapper mapper used to write batches to the response body
     * @param cls          runtime class of {@code MessageType}, used to build the batch type
     */
    public MessageRelayResource(Outbox<MessageType> outbox, ObjectMapper objectMapper, Class<MessageType> cls) {
        this.outbox = outbox;
        this.smileMapper = objectMapper;
        this.batchType = objectMapper.getTypeFactory().constructParametricType(MessageBatch.class, cls);
    }

    /**
     * Long-poll endpoint returning the next batch of messages for a client.
     *
     * <p>Response codes written through the async context:
     * <ul>
     *   <li>400 if the client host is missing/empty or either query parameter is absent;</li>
     *   <li>200 with the serialized batch when the outbox future succeeds;</li>
     *   <li>204 when the async timeout fires first (the outbox future is then cancelled);</li>
     *   <li>500 when the outbox future fails.</li>
     * </ul>
     *
     * @param str                client host taken from the {@code clientHost} path segment
     * @param l                  value of the {@code epoch} query parameter
     * @param l2                 value of the {@code watermark} query parameter
     * @param httpServletRequest request that is switched into async mode
     * @return always {@code null}; the response is produced through the {@link AsyncContext}
     * @throws IOException if sending the 400 error response fails
     */
    @GET
    @Path("/outbox/{clientHost}/messages")
    public Void httpGetMessagesFromOutbox(@PathParam("clientHost") String str, @QueryParam("epoch") Long l, @QueryParam("watermark") Long l2, @Context HttpServletRequest httpServletRequest) throws IOException {
        if (l == null || l2 == null || str == null || str.isEmpty()) {
            AsyncContext badRequestContext = httpServletRequest.startAsync();
            // getResponse() is declared as ServletResponse; sendError lives on HttpServletResponse.
            HttpServletResponse badRequestResponse = (HttpServletResponse) badRequestContext.getResponse();
            badRequestResponse.sendError(HttpServletResponse.SC_BAD_REQUEST);
            badRequestContext.complete();
            return null;
        }

        final long epoch = l;
        final long watermark = l2;

        // Flipped by whichever outcome (success, failure, timeout) gets to write the response first.
        final AtomicBoolean responseResolved = new AtomicBoolean();
        final ListenableFuture<MessageBatch<MessageType>> messages = this.outbox.getMessages(str, epoch, watermark);
        final AsyncContext asyncContext = httpServletRequest.startAsync();
        asyncContext.setTimeout(GET_MESSAGES_TIMEOUT);
        asyncContext.addListener(new AsyncListener() {
            @Override
            public void onComplete(AsyncEvent asyncEvent) {
            }

            @Override
            public void onTimeout(AsyncEvent asyncEvent) {
                if (responseResolved.compareAndSet(false, true)) {
                    // No batch arrived in time: answer "no content" and stop waiting on the outbox.
                    ((HttpServletResponse) asyncContext.getResponse()).setStatus(HttpServletResponse.SC_NO_CONTENT);
                    asyncEvent.getAsyncContext().complete();
                    messages.cancel(true);
                }
            }

            @Override
            public void onError(AsyncEvent asyncEvent) {
            }

            @Override
            public void onStartAsync(AsyncEvent asyncEvent) {
            }
        });

        // Captured up front so the callbacks can log them without touching the request object.
        final String remoteAddr = httpServletRequest.getRemoteAddr();
        final String requestURI = httpServletRequest.getRequestURI();

        Futures.addCallback(messages, new FutureCallback<MessageBatch<MessageType>>() {
            @Override
            public void onSuccess(MessageBatch<MessageType> messageBatch) {
                if (!responseResolved.compareAndSet(false, true)) {
                    return;
                }
                log.debug("Sending message batch: %s", messageBatch);
                try {
                    HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse();
                    response.setStatus(HttpServletResponse.SC_OK);
                    response.setContentType("application/x-jackson-smile");
                    smileMapper.writerFor(batchType).writeValue(response.getOutputStream(), messageBatch);
                    response.getOutputStream().close();
                } catch (Exception e) {
                    log.noStackTrace().warn(e, "Could not respond to request from[%s] to[%s]", remoteAddr, requestURI);
                } finally {
                    // Always release the request: the guard flag is already set, so the timeout
                    // listener will not complete it for us if the write above failed.
                    completeQuietly(asyncContext, remoteAddr, requestURI);
                }
            }

            @Override
            public void onFailure(Throwable th) {
                if (!responseResolved.compareAndSet(false, true)) {
                    return;
                }
                try {
                    ((HttpServletResponse) asyncContext.getResponse()).sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
                } catch (Exception e) {
                    th.addSuppressed(e);
                }
                // Completed separately so that a failing sendError cannot leave the request open.
                try {
                    asyncContext.complete();
                } catch (Exception e) {
                    th.addSuppressed(e);
                }
                log.noStackTrace().warn(th, "Request failed from[%s] to[%s]", remoteAddr, requestURI);
            }
        }, Execs.directExecutor());
        return null;
    }

    /** Completes the async context, logging instead of propagating any failure. */
    private static void completeQuietly(AsyncContext asyncContext, String remoteAddr, String requestURI) {
        try {
            asyncContext.complete();
        } catch (Exception e) {
            log.noStackTrace().warn(e, "Could not complete request from[%s] to[%s]", remoteAddr, requestURI);
        }
    }
}
