/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.filesystem.stream;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.table.filesystem.FileSystemOptions;
import org.apache.flink.table.filesystem.stream.PartitionCommitTrigger;
import org.apache.flink.util.StringUtils;

/**
 * Partition commit trigger driven by processing time.
 *
 * <p>Each partition is remembered together with the processing time at which it was first
 * added. A partition becomes committable once the current processing time is strictly later
 * than that registration time plus the configured commit delay; a delay of zero makes every
 * pending partition committable immediately. The pending partitions are written to operator
 * list state on every snapshot and read back when the trigger is created from restored state.
 */
public class ProcTimeCommitTigger
implements PartitionCommitTrigger {
    /** Descriptor of the list state holding snapshots of partition name -> registration time. */
    private static final ListStateDescriptor<Map<String, Long>> PENDING_PARTITIONS_STATE_DESC =
            new ListStateDescriptor<>(
                    "pending-partitions-with-time",
                    new MapSerializer<>(StringSerializer.INSTANCE, LongSerializer.INSTANCE));

    private final ListState<Map<String, Long>> pendingPartitionsState;
    /** Partitions not yet handed out for commit, keyed by name, valued by registration time (ms). */
    private final Map<String, Long> pendingPartitions;
    /** Commit delay in milliseconds; {@code 0} means "commit as soon as possible". */
    private final long commitDelay;
    private final ProcessingTimeService procTimeService;

    /**
     * Creates the trigger and, if requested, reloads the pending partitions from state.
     *
     * @param isRestored whether previously snapshotted pending partitions should be reloaded
     * @param stateStore store from which the pending-partitions list state is obtained
     * @param conf configuration providing {@link FileSystemOptions#SINK_PARTITION_COMMIT_DELAY}
     * @param procTimeService source of the current processing time
     * @throws Exception if the list state cannot be obtained or read
     */
    public ProcTimeCommitTigger(boolean isRestored, OperatorStateStore stateStore, Configuration conf, ProcessingTimeService procTimeService) throws Exception {
        this.pendingPartitionsState = stateStore.getListState(PENDING_PARTITIONS_STATE_DESC);
        this.pendingPartitions = new HashMap<>();
        if (isRestored) {
            // The restored list may hold zero, one or several snapshot maps. Calling
            // iterator().next() blindly would throw on an empty list and silently drop every
            // map after the first, so merge all of them instead.
            Iterable<Map<String, Long>> restoredSnapshots = this.pendingPartitionsState.get();
            if (restoredSnapshots != null) {
                for (Map<String, Long> snapshot : restoredSnapshots) {
                    for (Map.Entry<String, Long> entry : snapshot.entrySet()) {
                        // Keep the earliest registration time when a partition appears in
                        // more than one snapshot, so its commit is never pushed back.
                        this.pendingPartitions.merge(entry.getKey(), entry.getValue(), Math::min);
                    }
                }
            }
        }
        this.procTimeService = procTimeService;
        Duration delay = conf.get(FileSystemOptions.SINK_PARTITION_COMMIT_DELAY);
        this.commitDelay = delay.toMillis();
    }

    /**
     * Registers a partition as pending, stamped with the current processing time.
     *
     * <p>Null or whitespace-only names are ignored. A partition that is already pending keeps
     * its original registration time.
     *
     * @param partition name of the partition to track
     */
    @Override
    public void addPartition(String partition) {
        if (!StringUtils.isNullOrWhitespaceOnly(partition)) {
            this.pendingPartitions.putIfAbsent(partition, this.procTimeService.getCurrentProcessingTime());
        }
    }

    /**
     * Removes and returns every pending partition whose commit delay has elapsed.
     *
     * @param checkpointId id of the checkpoint; not used by this trigger
     * @return the partitions that are now committable; they are no longer pending afterwards
     */
    @Override
    public List<String> committablePartitions(long checkpointId) {
        List<String> needCommit = new ArrayList<>();
        long currentProcTime = this.procTimeService.getCurrentProcessingTime();
        Iterator<Map.Entry<String, Long>> iter = this.pendingPartitions.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry<String, Long> entry = iter.next();
            long creationTime = entry.getValue();
            // Compare the elapsed time against the delay rather than computing
            // creationTime + commitDelay, which overflows for very large delays and would
            // then make the partition committable immediately.
            boolean delayElapsed =
                    this.commitDelay == 0L || currentProcTime - creationTime > this.commitDelay;
            if (delayElapsed) {
                needCommit.add(entry.getKey());
                iter.remove();
            }
        }
        return needCommit;
    }

    /**
     * Replaces the content of the list state with a copy of the currently pending partitions.
     *
     * @param checkpointId id of the checkpoint; not used by this trigger
     * @param watermark current watermark; not used by this trigger
     * @throws Exception if the list state cannot be updated
     */
    @Override
    public void snapshotState(long checkpointId, long watermark) throws Exception {
        this.pendingPartitionsState.clear();
        // Store a copy so later changes to the live map do not affect the stored element.
        this.pendingPartitionsState.add(new HashMap<>(this.pendingPartitions));
    }

    /**
     * Returns all still-pending partitions regardless of the commit delay and forgets them.
     *
     * @return names of every partition that was pending when this method was called
     */
    @Override
    public List<String> endInput() {
        List<String> partitions = new ArrayList<>(this.pendingPartitions.keySet());
        this.pendingPartitions.clear();
        return partitions;
    }
}

