package org.apache.druid.server.coordinator.duty;

import com.google.common.base.Predicate;
import com.google.common.collect.Sets;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;
import org.apache.druid.collections.CircularList;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.SegmentsMetadataManager;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.config.KillUnusedSegmentsConfig;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.server.coordinator.stats.RowKey;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.utils.CollectionUtils;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;

/* loaded from: input_file:org/apache/druid/server/coordinator/duty/KillUnusedSegments.class */
public class KillUnusedSegments implements CoordinatorDuty {
    public static final String KILL_TASK_TYPE = "kill";
    public static final String TASK_ID_PREFIX = "coordinator-issued";
    private static final Predicate<TaskStatusPlus> IS_AUTO_KILL_TASK = taskStatusPlus -> {
        return null != taskStatusPlus && "kill".equals(taskStatusPlus.getType()) && taskStatusPlus.getId().startsWith(TASK_ID_PREFIX);
    };
    private static final Logger log = new Logger(KillUnusedSegments.class);
    private final Duration period;
    private final Duration durationToRetain;
    private final boolean ignoreDurationToRetain;
    private final int maxSegmentsToKill;
    private final Duration bufferPeriod;
    private final Map<String, DateTime> datasourceToLastKillIntervalEnd;
    private DateTime lastKillTime;
    private final SegmentsMetadataManager segmentsMetadataManager;
    private final OverlordClient overlordClient;
    private String prevDatasourceKilled;
    private CircularList<String> datasourceCircularKillList;

    public KillUnusedSegments(SegmentsMetadataManager segmentsMetadataManager, OverlordClient overlordClient, KillUnusedSegmentsConfig killUnusedSegmentsConfig) {
        this.period = killUnusedSegmentsConfig.getCleanupPeriod();
        this.maxSegmentsToKill = killUnusedSegmentsConfig.getMaxSegments();
        this.ignoreDurationToRetain = killUnusedSegmentsConfig.isIgnoreDurationToRetain();
        this.durationToRetain = killUnusedSegmentsConfig.getDurationToRetain();
        if (this.ignoreDurationToRetain) {
            log.info("druid.coordinator.kill.durationToRetain[%s] will be ignored when discovering segments to kill because druid.coordinator.kill.ignoreDurationToRetain is set to true.", new Object[]{this.durationToRetain});
        }
        this.bufferPeriod = killUnusedSegmentsConfig.getBufferPeriod();
        Logger logger = log;
        Object[] objArr = new Object[4];
        objArr[0] = this.period;
        objArr[1] = this.ignoreDurationToRetain ? "IGNORING" : this.durationToRetain;
        objArr[2] = this.bufferPeriod;
        objArr[3] = Integer.valueOf(this.maxSegmentsToKill);
        logger.info("Kill task scheduling enabled with period[%s], durationToRetain[%s], bufferPeriod[%s], maxSegmentsToKill[%s]", objArr);
        this.segmentsMetadataManager = segmentsMetadataManager;
        this.overlordClient = overlordClient;
        this.datasourceToLastKillIntervalEnd = new ConcurrentHashMap();
    }

    @Override // org.apache.druid.server.coordinator.duty.CoordinatorDuty
    public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams druidCoordinatorRuntimeParams) {
        if (canDutyRun()) {
            return runInternal(druidCoordinatorRuntimeParams);
        }
        log.debug("Skipping KillUnusedSegments until period[%s] has elapsed after lastKillTime[%s].", new Object[]{this.period, this.lastKillTime});
        return druidCoordinatorRuntimeParams;
    }

    private DruidCoordinatorRuntimeParams runInternal(DruidCoordinatorRuntimeParams druidCoordinatorRuntimeParams) {
        CoordinatorDynamicConfig coordinatorDynamicConfig = druidCoordinatorRuntimeParams.getCoordinatorDynamicConfig();
        CoordinatorRunStats coordinatorStats = druidCoordinatorRuntimeParams.getCoordinatorStats();
        int availableKillTaskSlots = getAvailableKillTaskSlots(coordinatorDynamicConfig, coordinatorStats);
        if (availableKillTaskSlots <= 0) {
            log.debug("Skipping KillUnusedSegments because there are no available kill task slots.", new Object[0]);
            return druidCoordinatorRuntimeParams;
        }
        Set<String> retrieveAllDataSourceNames = CollectionUtils.isNullOrEmpty(coordinatorDynamicConfig.getSpecificDataSourcesToKillUnusedSegmentsIn()) ? this.segmentsMetadataManager.retrieveAllDataSourceNames() : coordinatorDynamicConfig.getSpecificDataSourcesToKillUnusedSegmentsIn();
        if (this.datasourceCircularKillList == null || !this.datasourceCircularKillList.equalsSet(retrieveAllDataSourceNames)) {
            this.datasourceCircularKillList = new CircularList<>(retrieveAllDataSourceNames, Comparator.naturalOrder());
        }
        this.lastKillTime = DateTimes.nowUtc();
        killUnusedSegments(retrieveAllDataSourceNames, availableKillTaskSlots, coordinatorStats);
        this.datasourceToLastKillIntervalEnd.keySet().retainAll(retrieveAllDataSourceNames);
        return druidCoordinatorRuntimeParams;
    }

    private void killUnusedSegments(Set<String> set, int i, CoordinatorRunStats coordinatorRunStats) {
        if (CollectionUtils.isNullOrEmpty(set)) {
            log.debug("Skipping KillUnusedSegments because there are no datasources to kill.", new Object[0]);
            coordinatorRunStats.add(Stats.Kill.SUBMITTED_TASKS, 0L);
            return;
        }
        HashSet hashSet = new HashSet(set);
        int i2 = 0;
        Iterator it = this.datasourceCircularKillList.iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            String str = (String) it.next();
            if (!str.equals(this.prevDatasourceKilled) || hashSet.size() <= 1) {
                this.prevDatasourceKilled = str;
                hashSet.remove(str);
                DateTime minus = DateTimes.nowUtc().minus(this.bufferPeriod);
                Interval findIntervalForKill = findIntervalForKill(str, minus, coordinatorRunStats);
                if (findIntervalForKill == null) {
                    this.datasourceToLastKillIntervalEnd.remove(str);
                    if (hashSet.isEmpty()) {
                        break;
                    }
                } else {
                    try {
                        FutureUtils.getUnchecked(this.overlordClient.runKillTask(TASK_ID_PREFIX, str, findIntervalForKill, null, Integer.valueOf(this.maxSegmentsToKill), minus), true);
                        i2++;
                        this.datasourceToLastKillIntervalEnd.put(str, findIntervalForKill.getEnd());
                        if (hashSet.isEmpty() || i2 >= i) {
                            break;
                        }
                    } catch (Exception e) {
                        log.error(e, "Failed to submit kill task for dataSource[%s] in interval[%s]", new Object[]{str, findIntervalForKill});
                        if (Thread.currentThread().isInterrupted()) {
                            log.warn("Skipping kill task scheduling because thread is interrupted.", new Object[0]);
                            break;
                        }
                    }
                }
            }
        }
        log.info("Submitted [%d] kill tasks for [%d] datasources: [%s]. Remaining [%d] datasources to kill: [%s].", new Object[]{Integer.valueOf(i2), Integer.valueOf(set.size() - hashSet.size()), Sets.difference(set, hashSet), Integer.valueOf(hashSet.size()), hashSet});
        coordinatorRunStats.add(Stats.Kill.SUBMITTED_TASKS, i2);
    }

    @Nullable
    private Interval findIntervalForKill(String str, DateTime dateTime, CoordinatorRunStats coordinatorRunStats) {
        List<Interval> unusedSegmentIntervals = this.segmentsMetadataManager.getUnusedSegmentIntervals(str, this.datasourceToLastKillIntervalEnd.get(str), this.ignoreDurationToRetain ? DateTimes.COMPARE_DATE_AS_STRING_MAX : DateTimes.nowUtc().minus(this.durationToRetain), this.maxSegmentsToKill, dateTime);
        coordinatorRunStats.add(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, RowKey.of(Dimension.DATASOURCE, str), unusedSegmentIntervals.size());
        if (CollectionUtils.isNullOrEmpty(unusedSegmentIntervals)) {
            return null;
        }
        return unusedSegmentIntervals.size() == 1 ? unusedSegmentIntervals.get(0) : JodaUtils.umbrellaInterval(unusedSegmentIntervals);
    }

    private boolean canDutyRun() {
        return this.lastKillTime == null || !DateTimes.nowUtc().isBefore(this.lastKillTime.plus(this.period));
    }

    private int getAvailableKillTaskSlots(CoordinatorDynamicConfig coordinatorDynamicConfig, CoordinatorRunStats coordinatorRunStats) {
        int min = Math.min((int) (CoordinatorDutyUtils.getTotalWorkerCapacity(this.overlordClient) * Math.min(coordinatorDynamicConfig.getKillTaskSlotRatio(), 1.0d)), coordinatorDynamicConfig.getMaxKillTaskSlots());
        int max = Math.max(0, min - CoordinatorDutyUtils.getStatusOfActiveTasks(this.overlordClient, IS_AUTO_KILL_TASK).size());
        coordinatorRunStats.add(Stats.Kill.AVAILABLE_SLOTS, max);
        coordinatorRunStats.add(Stats.Kill.MAX_SLOTS, min);
        return max;
    }
}
