package org.apache.druid.segment.metadata;

import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Inject;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.java.util.common.Stopwatch;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.SchemaPayload;
import org.apache.druid.segment.SchemaPayloadPlus;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.metadata.SegmentSchemaManager;
import org.apache.druid.timeline.SegmentId;

@ManageLifecycle
/* loaded from: input_file:org/apache/druid/segment/metadata/SegmentSchemaBackFillQueue.class */
public class SegmentSchemaBackFillQueue {
    private static final EmittingLogger log = new EmittingLogger(SegmentSchemaBackFillQueue.class);
    private static final int MAX_BATCH_SIZE = 500;
    private final long executionPeriod;
    private final SegmentSchemaManager segmentSchemaManager;
    private final SegmentSchemaCache segmentSchemaCache;
    private final FingerprintGenerator fingerprintGenerator;
    private final ServiceEmitter emitter;
    private final CentralizedDatasourceSchemaConfig config;
    private final ScheduledExecutorService executor;
    private final BlockingDeque<SegmentSchemaManager.SegmentSchemaMetadataPlus> queue = new LinkedBlockingDeque();

    @Nullable
    private ScheduledFuture<?> scheduledFuture = null;

    @Inject
    public SegmentSchemaBackFillQueue(SegmentSchemaManager segmentSchemaManager, ScheduledExecutorFactory scheduledExecutorFactory, SegmentSchemaCache segmentSchemaCache, FingerprintGenerator fingerprintGenerator, ServiceEmitter serviceEmitter, CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig) {
        this.segmentSchemaManager = segmentSchemaManager;
        this.segmentSchemaCache = segmentSchemaCache;
        this.fingerprintGenerator = fingerprintGenerator;
        this.emitter = serviceEmitter;
        this.config = centralizedDatasourceSchemaConfig;
        this.executionPeriod = centralizedDatasourceSchemaConfig.getBackFillPeriod();
        this.executor = isEnabled() ? scheduledExecutorFactory.create(1, "SegmentSchemaBackFillQueue-%s") : null;
    }

    @LifecycleStop
    public void stop() {
        this.executor.shutdownNow();
        if (this.scheduledFuture != null) {
            this.scheduledFuture.cancel(true);
        }
    }

    public void onLeaderStart() {
        if (isEnabled()) {
            this.scheduledFuture = this.executor.scheduleAtFixedRate(this::processBatchesDueSafely, this.executionPeriod, this.executionPeriod, TimeUnit.MILLISECONDS);
        }
    }

    public void onLeaderStop() {
        if (!isEnabled() || this.scheduledFuture == null) {
            return;
        }
        this.scheduledFuture.cancel(true);
    }

    public void add(SegmentId segmentId, RowSignature rowSignature, Map<String, AggregatorFactory> map, long j) {
        SchemaPayloadPlus schemaPayloadPlus = new SchemaPayloadPlus(new SchemaPayload(rowSignature, map), Long.valueOf(j));
        this.queue.add(new SegmentSchemaManager.SegmentSchemaMetadataPlus(segmentId, this.fingerprintGenerator.generateFingerprint(schemaPayloadPlus.getSchemaPayload(), segmentId.getDataSource(), 1), schemaPayloadPlus));
    }

    public boolean isEnabled() {
        return this.config.isEnabled() && this.config.isBackFillEnabled();
    }

    private void processBatchesDueSafely() {
        try {
            processBatchesDue();
        } catch (Exception e) {
            log.error(e, "Exception backfilling segment schemas.", new Object[0]);
        }
    }

    @VisibleForTesting
    public void processBatchesDue() {
        if (this.queue.isEmpty()) {
            return;
        }
        Stopwatch createStarted = Stopwatch.createStarted();
        log.info("Backfilling segment schema. Queue size is [%s].", new Object[]{Integer.valueOf(this.queue.size())});
        int min = Math.min(MAX_BATCH_SIZE, this.queue.size());
        HashMap hashMap = new HashMap();
        for (int i = 0; i < min; i++) {
            SegmentSchemaManager.SegmentSchemaMetadataPlus poll = this.queue.poll();
            if (poll != null) {
                ((List) hashMap.computeIfAbsent(poll.getSegmentId().getDataSource(), str -> {
                    return new ArrayList();
                })).add(poll);
            }
        }
        for (Map.Entry entry : hashMap.entrySet()) {
            try {
                this.segmentSchemaManager.persistSchemaAndUpdateSegmentsTable((String) entry.getKey(), (List) entry.getValue(), 1);
                Iterator it = ((List) entry.getValue()).iterator();
                while (it.hasNext()) {
                    this.segmentSchemaCache.markMetadataQueryResultPublished(((SegmentSchemaManager.SegmentSchemaMetadataPlus) it.next()).getSegmentId());
                }
                this.emitter.emit(ServiceMetricEvent.builder().setDimension("dataSource", entry.getKey()).setMetric("metadatacache/backfill/count", Integer.valueOf(((List) entry.getValue()).size())));
            } catch (Exception e) {
                log.error(e, "Exception persisting schema and updating segments table for datasource[%s].", new Object[]{entry.getKey()});
            }
        }
        this.emitter.emit(ServiceMetricEvent.builder().setMetric("metadatacache/backfill/time", Long.valueOf(createStarted.millisElapsed())));
    }
}
