package org.apache.druid.discovery;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.util.Enumeration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.druid.client.InputStreamHolder;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.http.client.response.ClientResponse;
import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.server.QueryResource;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.handler.codec.http.HttpChunk;
import org.jboss.netty.handler.codec.http.HttpResponse;

/* loaded from: input_file:org/apache/druid/discovery/DataServerResponseHandler.class */
/**
 * {@link HttpResponseHandler} that turns the chunks of a data server HTTP response into a single
 * {@link InputStream}.
 *
 * <p>Chunks handed to {@link #handleResponse} and {@link #handleChunk} are wrapped in
 * {@link InputStreamHolder}s and placed on an internal queue. The stream returned to the caller is a
 * {@link SequenceInputStream} that pulls holders off that queue on demand. When the query context
 * supplies a positive "max queued bytes" value, the handler asks the HTTP client to stop reading once
 * that many bytes are queued and resumes it through the {@link HttpResponseHandler.TrafficCop} as the
 * consumer drains the queue.
 *
 * <p>A deadline is derived from the query context at construction time; every handler callback and
 * every consumer-side queue access checks it and fails with {@link QueryTimeoutException} once it has
 * passed.
 */
public class DataServerResponseHandler implements HttpResponseHandler<InputStream, InputStream> {
    private static final Logger log = new Logger(DataServerResponseHandler.class);

    /** Deadline used when the query context reports that no timeout applies. */
    private static final long NO_DEADLINE = Long.MAX_VALUE;

    private final Query<?> query;
    private final ResponseContext responseContext;
    private final ObjectMapper objectMapper;
    /** Queue size (in bytes) at which reading is paused; values {@code <= 0} disable backpressure. */
    private final long maxQueuedBytes;
    final boolean usingBackpressure;
    /** Wall-clock millis after which the query is considered timed out, or {@link #NO_DEADLINE}. */
    private final long failTime;
    private final AtomicLong totalByteCount = new AtomicLong(0);
    private final AtomicReference<HttpResponseHandler.TrafficCop> trafficCopRef = new AtomicReference<>();
    private final AtomicLong queuedByteCount = new AtomicLong(0);
    /** Also used as the monitor guarding the "done and queue empty" check. */
    private final AtomicBoolean done = new AtomicBoolean(false);
    private final BlockingQueue<InputStreamHolder> queue = new LinkedBlockingQueue<>();
    /** Failure message; non-null once reading the response has failed. */
    private final AtomicReference<String> fail = new AtomicReference<>();

    /**
     * Creates a handler for one query.
     *
     * @param query           query whose id is used in logs/errors and whose context supplies the
     *                        backpressure limit and timeout
     * @param responseContext context into which the response-context header of the reply is merged
     * @param objectMapper    mapper used to deserialize the response-context header
     */
    public DataServerResponseHandler(Query<?> query, ResponseContext responseContext, ObjectMapper objectMapper) {
        this.query = query;
        this.responseContext = responseContext;
        this.objectMapper = objectMapper;
        QueryContext context = query.context();
        this.maxQueuedBytes = context.getMaxQueuedBytes(0L);
        this.usingBackpressure = this.maxQueuedBytes > 0;
        if (context.hasTimeout()) {
            long now = System.currentTimeMillis();
            long timeout = context.getTimeout();
            // Saturate instead of overflowing into the past for very large timeouts.
            this.failTime = timeout > NO_DEADLINE - now ? NO_DEADLINE : now + timeout;
        } else {
            // A deadline of 0 would make every timeout check fail immediately; "no timeout" must
            // map to a deadline that is never reached.
            this.failTime = NO_DEADLINE;
        }
    }

    /**
     * Handles the response head: merges the response-context header (if present), queues the initial
     * content and returns the stream the caller will read from.
     *
     * @return a finished response whose object is the combined stream; its continue-reading flag is
     *         the result of queueing the initial content. If the response-context header cannot be
     *         deserialized, the returned stream throws that {@link IOException} on read.
     * @throws QueryTimeoutException if the deadline has already passed
     */
    public ClientResponse<InputStream> handleResponse(HttpResponse httpResponse, HttpResponseHandler.TrafficCop trafficCop) {
        this.trafficCopRef.set(trafficCop);
        checkQueryTimeout();
        log.debug("Received response status[%s] for queryId[%s]", httpResponse.getStatus(), this.query.getId());

        final boolean continueReading;
        try {
            String serializedContext = httpResponse.headers().get(QueryResource.HEADER_RESPONSE_CONTEXT);
            if (serializedContext != null) {
                this.responseContext.merge(ResponseContext.deserialize(serializedContext, this.objectMapper));
            }
            ChannelBuffer initialContent = httpResponse.getContent();
            // Measure before queueing: once queued, a consumer may start draining the buffer.
            int initialBytes = initialContent.readableBytes();
            // The initial content MUST be queued before the SequenceInputStream below is created:
            // its constructor eagerly fetches the first element and would otherwise block this
            // thread on an empty queue until the deadline.
            continueReading = enqueue(initialContent, 0L);
            this.totalByteCount.addAndGet(initialBytes);
        } catch (final IOException e) {
            return ClientResponse.finished(new InputStream() {
                @Override
                public int read() throws IOException {
                    throw e;
                }
            });
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }

        InputStream combined = new SequenceInputStream(new Enumeration<InputStream>() {
            @Override
            public boolean hasMoreElements() {
                throwIfFailed();
                checkQueryTimeout();
                // More elements exist until done() has run AND everything queued was consumed.
                synchronized (DataServerResponseHandler.this.done) {
                    return !DataServerResponseHandler.this.done.get() || !DataServerResponseHandler.this.queue.isEmpty();
                }
            }

            @Override
            public InputStream nextElement() {
                throwIfFailed();
                try {
                    return dequeue();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
        });
        return ClientResponse.finished(combined, continueReading);
    }

    /**
     * Queues one response chunk.
     *
     * @param j chunk number, forwarded to the holder so reading can later be resumed from it
     * @return a finished response carrying the same stream object; its continue-reading flag is
     *         {@code true} for empty chunks, otherwise the result of queueing the chunk
     * @throws QueryTimeoutException if the deadline has passed
     */
    public ClientResponse<InputStream> handleChunk(ClientResponse<InputStream> clientResponse, HttpChunk httpChunk, long j) {
        checkQueryTimeout();
        ChannelBuffer content = httpChunk.getContent();
        int chunkBytes = content.readableBytes();
        boolean continueReading = true;
        if (chunkBytes > 0) {
            try {
                continueReading = enqueue(content, j);
                this.totalByteCount.addAndGet(chunkBytes);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
        return ClientResponse.finished(clientResponse.getObj(), continueReading);
    }

    /**
     * Marks the response as complete. An empty holder is queued so that a consumer blocked in
     * {@link #dequeue()} wakes up, and the done flag is set even if queueing fails.
     */
    public ClientResponse<InputStream> done(ClientResponse<InputStream> clientResponse) {
        log.debug("Finished reading response for queryId[%s]. Read total[%d]", this.query.getId(), this.totalByteCount.get());
        synchronized (this.done) {
            try {
                try {
                    this.queue.put(InputStreamHolder.fromChannelBuffer(ChannelBuffers.EMPTY_BUFFER, Long.MAX_VALUE));
                } finally {
                    this.done.set(true);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
        return ClientResponse.finished(clientResponse.getObj());
    }

    /** Records the failure so that the consumer of the stream sees it on its next access. */
    public void exceptionCaught(ClientResponse<InputStream> clientResponse, Throwable th) {
        setupResponseReadFailure(StringUtils.format("Query[%s] failed with exception msg [%s]", this.query.getId(), th.getMessage()), th);
    }

    /**
     * Wraps the buffer in a holder, accounts for its length and appends it to the queue.
     *
     * @return {@code true} if reading should continue: always when backpressure is off, otherwise
     *         only while the queued byte count is below {@link #maxQueuedBytes}
     */
    private boolean enqueue(ChannelBuffer channelBuffer, long j) throws InterruptedException {
        InputStreamHolder holder = InputStreamHolder.fromChannelBuffer(channelBuffer, j);
        long queuedAfterAdd = this.queuedByteCount.addAndGet(holder.getLength());
        this.queue.put(holder);
        return !this.usingBackpressure || queuedAfterAdd < this.maxQueuedBytes;
    }

    /**
     * Takes the next holder off the queue, waiting at most until the query deadline, and resumes
     * reading through the traffic cop when backpressure is on and the queue has dropped below the
     * limit.
     *
     * <p>Visibility is kept as found in this file; the method is only used by the enumeration
     * created in {@link #handleResponse}.
     *
     * @throws QueryTimeoutException if the deadline has passed or passes while waiting
     */
    public InputStream dequeue() throws InterruptedException {
        InputStreamHolder holder = this.queue.poll(checkQueryTimeout(), TimeUnit.MILLISECONDS);
        if (holder == null) {
            throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query[%s] timed out.", this.query.getId()));
        }
        long queuedAfterRemove = this.queuedByteCount.addAndGet(-holder.getLength());
        if (this.usingBackpressure && queuedAfterRemove < this.maxQueuedBytes) {
            Preconditions.checkNotNull(this.trafficCopRef.get(), "No TrafficCop, how can this be?").resume(holder.getChunkNum());
        }
        return holder.getStream();
    }

    /**
     * Returns the milliseconds left until the query deadline.
     *
     * <p>Visibility is kept as found in this file.
     *
     * @return remaining time, always {@code > 0}
     * @throws QueryTimeoutException if the deadline has passed; the failure is also recorded so the
     *                               stream consumer observes it
     */
    public long checkQueryTimeout() {
        long remainingMillis = this.failTime - System.currentTimeMillis();
        if (remainingMillis > 0) {
            return remainingMillis;
        }
        String message = StringUtils.format("Query[%s] timed out.", this.query.getId());
        setupResponseReadFailure(message, null);
        throw new QueryTimeoutException(message);
    }

    /** Throws if a failure has been recorded via {@link #setupResponseReadFailure}. */
    private void throwIfFailed() {
        String failure = this.fail.get();
        if (failure != null) {
            // The message may embed arbitrary exception text, so it must not be used as the
            // format string itself.
            throw new RE("%s", failure);
        }
    }

    /**
     * Records a failure message, discards everything queued and queues a single stream whose
     * {@code read()} throws an {@link IOException} carrying the message (and cause, if any).
     */
    private void setupResponseReadFailure(final String str, final Throwable th) {
        this.fail.set(str);
        this.queue.clear();
        this.queue.offer(InputStreamHolder.fromStream(new InputStream() {
            @Override
            public int read() throws IOException {
                if (th != null) {
                    throw new IOException(str, th);
                }
                throw new IOException(str);
            }
        }, -1L, 0L));
    }
}
