/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.filesystem.stream;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Set;
import java.util.TreeMap;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.filesystem.EmptyMetaStoreFactory;
import org.apache.flink.table.filesystem.FileSystemFactory;
import org.apache.flink.table.filesystem.FileSystemOptions;
import org.apache.flink.table.filesystem.MetastoreCommitPolicy;
import org.apache.flink.table.filesystem.PartitionCommitPolicy;
import org.apache.flink.table.filesystem.TableMetaStoreFactory;
import org.apache.flink.table.filesystem.stream.PartitionCommitTrigger;
import org.apache.flink.table.utils.PartitionPathUtils;

public class StreamingFileCommitter
extends AbstractStreamOperator<Void>
implements OneInputStreamOperator<CommitMessage, Void> {
    private static final long serialVersionUID = 1L;
    private final Configuration conf;
    private final Path locationPath;
    private final ObjectIdentifier tableIdentifier;
    private final List<String> partitionKeys;
    private final TableMetaStoreFactory metaStoreFactory;
    private final FileSystemFactory fsFactory;
    private transient PartitionCommitTrigger trigger;
    private transient TaskTracker taskTracker;
    private transient long currentWatermark;
    private transient List<PartitionCommitPolicy> policies;

    public StreamingFileCommitter(Path locationPath, ObjectIdentifier tableIdentifier, List<String> partitionKeys, TableMetaStoreFactory metaStoreFactory, FileSystemFactory fsFactory, Configuration conf) {
        this.locationPath = locationPath;
        this.tableIdentifier = tableIdentifier;
        this.partitionKeys = partitionKeys;
        this.metaStoreFactory = metaStoreFactory;
        this.fsFactory = fsFactory;
        this.conf = conf;
        PartitionCommitPolicy.validatePolicyChain(metaStoreFactory instanceof EmptyMetaStoreFactory, (String)conf.get(FileSystemOptions.SINK_PARTITION_COMMIT_POLICY_KIND));
    }

    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        this.currentWatermark = Long.MIN_VALUE;
        this.trigger = PartitionCommitTrigger.create(context.isRestored(), context.getOperatorStateStore(), this.conf, this.getUserCodeClassloader(), this.partitionKeys, this.getProcessingTimeService());
        this.policies = PartitionCommitPolicy.createPolicyChain(this.getUserCodeClassloader(), (String)this.conf.get(FileSystemOptions.SINK_PARTITION_COMMIT_POLICY_KIND), (String)this.conf.get(FileSystemOptions.SINK_PARTITION_COMMIT_POLICY_CLASS), (String)this.conf.get(FileSystemOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME), () -> {
            try {
                return this.fsFactory.create(this.locationPath.toUri());
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
    }

    public void processElement(StreamRecord<CommitMessage> element) throws Exception {
        boolean needCommit;
        CommitMessage message = (CommitMessage)element.getValue();
        for (String partition : message.partitions) {
            this.trigger.addPartition(partition);
        }
        if (this.taskTracker == null) {
            this.taskTracker = new TaskTracker(message.numberOfTasks);
        }
        if (needCommit = this.taskTracker.add(message.checkpointId, message.taskId)) {
            this.commitPartitions(message.checkpointId);
        }
    }

    private void commitPartitions(long checkpointId) throws Exception {
        List<String> partitions;
        List<String> list = partitions = checkpointId == Long.MAX_VALUE ? this.trigger.endInput() : this.trigger.committablePartitions(checkpointId);
        if (partitions.isEmpty()) {
            return;
        }
        try (TableMetaStoreFactory.TableMetaStore metaStore = this.metaStoreFactory.createTableMetaStore();){
            for (String partition : partitions) {
                LinkedHashMap partSpec = PartitionPathUtils.extractPartitionSpecFromPath((Path)new Path(partition));
                LOG.info("Partition {} of table {} is ready to be committed", (Object)partSpec, (Object)this.tableIdentifier);
                Path path = new Path(this.locationPath, PartitionPathUtils.generatePartitionPath((LinkedHashMap)partSpec));
                PolicyContext context = new PolicyContext(new ArrayList(partSpec.values()), path);
                for (PartitionCommitPolicy policy : this.policies) {
                    if (policy instanceof MetastoreCommitPolicy) {
                        ((MetastoreCommitPolicy)policy).setMetastore(metaStore);
                    }
                    policy.commit(context);
                }
            }
        }
    }

    public void processWatermark(Watermark mark) throws Exception {
        super.processWatermark(mark);
        this.currentWatermark = mark.getTimestamp();
    }

    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        this.trigger.snapshotState(context.getCheckpointId(), this.currentWatermark);
    }

    private class PolicyContext
    implements PartitionCommitPolicy.Context {
        private final List<String> partitionValues;
        private final Path partitionPath;

        private PolicyContext(List<String> partitionValues, Path partitionPath) {
            this.partitionValues = partitionValues;
            this.partitionPath = partitionPath;
        }

        @Override
        public String catalogName() {
            return StreamingFileCommitter.this.tableIdentifier.getCatalogName();
        }

        @Override
        public String databaseName() {
            return StreamingFileCommitter.this.tableIdentifier.getDatabaseName();
        }

        @Override
        public String tableName() {
            return StreamingFileCommitter.this.tableIdentifier.getObjectName();
        }

        @Override
        public List<String> partitionKeys() {
            return StreamingFileCommitter.this.partitionKeys;
        }

        @Override
        public List<String> partitionValues() {
            return this.partitionValues;
        }

        @Override
        public Path partitionPath() {
            return this.partitionPath;
        }
    }

    private static class TaskTracker {
        private final int numberOfTasks;
        private TreeMap<Long, Set<Integer>> notifiedTasks = new TreeMap();

        private TaskTracker(int numberOfTasks) {
            this.numberOfTasks = numberOfTasks;
        }

        private boolean add(long checkpointId, int task) {
            Set tasks = this.notifiedTasks.computeIfAbsent(checkpointId, k -> new HashSet());
            tasks.add(task);
            if (tasks.size() == this.numberOfTasks) {
                this.notifiedTasks.headMap(checkpointId, true).clear();
                return true;
            }
            return false;
        }
    }

    public static class CommitMessage
    implements Serializable {
        public long checkpointId;
        public int taskId;
        public int numberOfTasks;
        public List<String> partitions;

        public CommitMessage() {
        }

        public CommitMessage(long checkpointId, int taskId, int numberOfTasks, List<String> partitions) {
            this.checkpointId = checkpointId;
            this.taskId = taskId;
            this.numberOfTasks = numberOfTasks;
            this.partitions = partitions;
        }
    }
}

