/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.filesystem.stream.compact;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemKind;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.filesystem.stream.PartitionCommitInfo;
import org.apache.flink.table.filesystem.stream.compact.CompactContext;
import org.apache.flink.table.filesystem.stream.compact.CompactMessages;
import org.apache.flink.table.filesystem.stream.compact.CompactReader;
import org.apache.flink.table.filesystem.stream.compact.CompactWriter;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SupplierWithException;

/**
 * Stream operator that compacts files according to the messages it receives.
 *
 * <p>For every {@link CompactMessages.CompactionUnit} for which
 * {@code isTaskMessage(parallelism, subtaskIndex)} returns {@code true}, the unit's files are
 * compacted into a single target file whose name is derived from the first input file
 * ({@link #UNCOMPACTED_PREFIX} stripped, {@link #COMPACTED_PREFIX} prepended). The input files
 * are remembered as "expired" and are deleted once a checkpoint that covers them is reported as
 * complete (or when the input ends).
 *
 * <p>For every {@link CompactMessages.EndCompaction} a {@link PartitionCommitInfo} listing the
 * partitions touched since the previous one is emitted downstream.
 *
 * @param <T> record type read by the {@link CompactReader} and written by the
 *     {@link CompactWriter}
 */
public class CompactOperator<T> extends AbstractStreamOperator<PartitionCommitInfo>
        implements OneInputStreamOperator<CompactMessages.CoordinatorOutput, PartitionCommitInfo>,
                BoundedOneInput {

    private static final long serialVersionUID = 1L;

    /** Name prefix carried by files that have not been compacted yet. */
    public static final String UNCOMPACTED_PREFIX = ".uncompacted-";

    /** Name prefix given to the file produced by a compaction. */
    public static final String COMPACTED_PREFIX = "compacted-";

    private final SupplierWithException<FileSystem, IOException> fsFactory;
    private final CompactReader.Factory<T> readerFactory;
    private final CompactWriter.Factory<T> writerFactory;

    private transient FileSystem fileSystem;

    /** Checkpointed copy of {@link #expiredFiles}. */
    private transient ListState<Map<Long, List<Path>>> expiredFilesState;

    /** Checkpoint id -> input files that became obsolete before that checkpoint was taken. */
    private transient TreeMap<Long, List<Path>> expiredFiles;

    /** Input files compacted since the last snapshot. */
    private transient List<Path> currentExpiredFiles;

    /** Partitions compacted since the last {@link PartitionCommitInfo} was emitted. */
    private transient Set<String> partitions;

    /**
     * Creates the operator.
     *
     * @param fsFactory supplies the file system used for all file operations
     * @param readerFactory creates readers for the files to compact
     * @param writerFactory creates the writer for the compacted target file
     */
    public CompactOperator(
            SupplierWithException<FileSystem, IOException> fsFactory,
            CompactReader.Factory<T> readerFactory,
            CompactWriter.Factory<T> writerFactory) {
        this.fsFactory = fsFactory;
        this.readerFactory = readerFactory;
        this.writerFactory = writerFactory;
    }

    /**
     * Creates the transient working structures and, when restoring, reloads the expired files
     * recorded in the "expired-files" operator list state.
     *
     * @param context state initialization context
     * @throws Exception if the file system cannot be created or the state cannot be read
     */
    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        this.partitions = new HashSet<>();
        this.fileSystem = this.fsFactory.get();

        ListStateDescriptor<Map<Long, List<Path>>> metaDescriptor =
                new ListStateDescriptor<>(
                        "expired-files",
                        new MapSerializer<>(
                                LongSerializer.INSTANCE,
                                new ListSerializer<>(
                                        new KryoSerializer<>(
                                                Path.class, this.getExecutionConfig()))));
        this.expiredFilesState = context.getOperatorStateStore().getListState(metaDescriptor);
        this.expiredFiles = new TreeMap<>();
        this.currentExpiredFiles = new ArrayList<>();

        if (context.isRestored()) {
            // The restored list may hold zero or several maps (presumably after a change of
            // parallelism), so merge everything instead of blindly taking the first element.
            for (Map<Long, List<Path>> restored : this.expiredFilesState.get()) {
                for (Map.Entry<Long, List<Path>> entry : restored.entrySet()) {
                    this.expiredFiles
                            .computeIfAbsent(entry.getKey(), ignored -> new ArrayList<>())
                            .addAll(entry.getValue());
                }
            }
        }
    }

    /**
     * Handles one coordinator message: compacts a {@link CompactMessages.CompactionUnit} that
     * belongs to this subtask, or emits the commit info on {@link CompactMessages.EndCompaction}.
     * Other message types are ignored.
     *
     * @param element the incoming record
     * @throws Exception if the compaction fails
     */
    @Override
    public void processElement(StreamRecord<CompactMessages.CoordinatorOutput> element)
            throws Exception {
        CompactMessages.CoordinatorOutput value = element.getValue();
        if (value instanceof CompactMessages.CompactionUnit) {
            CompactMessages.CompactionUnit unit = (CompactMessages.CompactionUnit) value;
            if (unit.isTaskMessage(
                    this.getRuntimeContext().getNumberOfParallelSubtasks(),
                    this.getRuntimeContext().getIndexOfThisSubtask())) {
                String partition = unit.getPartition();
                List<Path> paths = unit.getPaths();
                this.doCompact(partition, paths);
                // Only after a successful compaction are the inputs scheduled for deletion.
                this.partitions.add(partition);
                this.currentExpiredFiles.addAll(paths);
            }
        } else if (value instanceof CompactMessages.EndCompaction) {
            this.endCompaction(((CompactMessages.EndCompaction) value).getCheckpointId());
        }
    }

    /**
     * Emits a {@link PartitionCommitInfo} with the partitions collected so far and resets the
     * collection.
     *
     * @param checkpoint checkpoint id to report in the commit info
     */
    private void endCompaction(long checkpoint) {
        List<String> compactedPartitions = new ArrayList<>(this.partitions);
        this.output.collect(
                new StreamRecord<>(
                        new PartitionCommitInfo(
                                checkpoint,
                                this.getRuntimeContext().getIndexOfThisSubtask(),
                                this.getRuntimeContext().getNumberOfParallelSubtasks(),
                                compactedPartitions)));
        this.partitions.clear();
    }

    /**
     * Records the files compacted since the last snapshot under the id of this checkpoint.
     *
     * @param context snapshot context providing the checkpoint id
     * @throws Exception if the state cannot be updated
     */
    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        this.snapshotState(context.getCheckpointId());
    }

    /**
     * Moves {@link #currentExpiredFiles} into {@link #expiredFiles} under {@code checkpointId} and
     * rewrites the checkpointed state with the full map.
     */
    private void snapshotState(long checkpointId) throws Exception {
        this.expiredFilesState.clear();
        this.expiredFiles.put(checkpointId, new ArrayList<>(this.currentExpiredFiles));
        this.expiredFilesState.add(this.expiredFiles);
        this.currentExpiredFiles.clear();
    }

    /**
     * Deletes the files recorded for all checkpoints up to and including {@code checkpointId}.
     *
     * @param checkpointId id of the completed checkpoint
     * @throws Exception if a file cannot be deleted
     */
    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        super.notifyCheckpointComplete(checkpointId);
        this.clearExpiredFiles(checkpointId);
    }

    /**
     * Called when the input ends: emits a final commit info and deletes every expired file, using
     * {@link Long#MAX_VALUE} as the checkpoint id for all three steps.
     *
     * @throws Exception if emitting, snapshotting or deleting fails
     */
    @Override
    public void endInput() throws Exception {
        this.endCompaction(Long.MAX_VALUE);
        this.snapshotState(Long.MAX_VALUE);
        this.clearExpiredFiles(Long.MAX_VALUE);
    }

    /**
     * Deletes the files registered for checkpoints {@code <= checkpointId} and removes those
     * entries from {@link #expiredFiles}.
     */
    private void clearExpiredFiles(long checkpointId) throws IOException {
        // headMap is a live view: clearing it removes the entries from expiredFiles as well.
        NavigableMap<Long, List<Path>> outOfDateMetas =
                this.expiredFiles.headMap(checkpointId, true);
        for (List<Path> paths : outOfDateMetas.values()) {
            for (Path meta : paths) {
                this.fileSystem.delete(meta, true);
            }
        }
        outOfDateMetas.clear();
    }

    /**
     * Compacts {@code paths} into one target file. Does nothing for an empty list or when the
     * target file already exists.
     *
     * @param partition partition the files belong to
     * @param paths uncompacted input files
     * @throws IOException if an input file is missing or the compaction fails
     */
    private void doCompact(String partition, List<Path> paths) throws IOException {
        if (paths.isEmpty()) {
            return;
        }

        Path target = CompactOperator.createCompactedFile(paths);
        if (this.fileSystem.exists(target)) {
            // Target already present, so this unit is treated as already compacted.
            return;
        }

        this.checkExist(paths);

        long startMillis = System.currentTimeMillis();
        if (paths.size() == 1) {
            // A single file needs no merging; move it into place.
            this.doAtomicRename(paths.get(0), target);
        } else {
            this.doMultiFilesCompact(partition, paths, target);
        }
        double costSeconds = (double) (System.currentTimeMillis() - startMillis) / 1000.0;
        LOG.info(
                "Compaction time cost is '{}S', target file is '{}', input files are '{}'",
                costSeconds,
                target,
                paths);
    }

    /**
     * Makes {@code dst} appear with the content of {@code src}: a rename when the file system
     * kind is {@link FileSystemKind#FILE_SYSTEM}, otherwise a byte copy through a
     * {@link RecoverableWriter} that is committed at the end.
     *
     * @throws IOException if the rename reports failure or the copy fails
     */
    private void doAtomicRename(Path src, Path dst) throws IOException {
        if (this.fileSystem.getKind() == FileSystemKind.FILE_SYSTEM) {
            // A silently failed rename would leave no target while the source is later deleted
            // as expired, so surface the failure.
            if (!this.fileSystem.rename(src, dst)) {
                throw new IOException("Failed to rename '" + src + "' to '" + dst + "'");
            }
        } else {
            RecoverableWriter writer = this.fileSystem.createRecoverableWriter();
            RecoverableFsDataOutputStream out = writer.open(dst);
            try (FSDataInputStream in = this.fileSystem.open(src)) {
                IOUtils.copyBytes(in, out, false);
            } catch (Throwable t) {
                // Close without committing; keep the original failure as the primary one.
                try {
                    out.close();
                } catch (Throwable closeFailure) {
                    t.addSuppressed(closeFailure);
                }
                throw t;
            }
            out.closeForCommit().commit();
        }
    }

    /**
     * Reads every record of {@code files}, in order, and writes it to a single writer for
     * {@code dst}; the writer is committed only after all files were read successfully.
     *
     * @throws IOException if reading, writing or committing fails
     */
    private void doMultiFilesCompact(String partition, List<Path> files, Path dst)
            throws IOException {
        Configuration config =
                this.getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration();
        CompactWriter<T> writer =
                this.writerFactory.create(
                        CompactContext.create(config, this.fileSystem, partition, dst));

        for (Path path : files) {
            // try-with-resources closes the reader and lets any failure propagate, so a partial
            // result is never committed.
            try (CompactReader<T> reader =
                    this.readerFactory.create(
                            CompactContext.create(config, this.fileSystem, partition, path))) {
                T record;
                while ((record = reader.read()) != null) {
                    writer.write(record);
                }
            }
        }

        writer.commit();
    }

    /**
     * Verifies that every candidate file exists.
     *
     * @throws IOException for the first file that does not exist
     */
    private void checkExist(List<Path> candidates) throws IOException {
        for (Path path : candidates) {
            if (!this.fileSystem.exists(path)) {
                throw new IOException("Compaction file not exist: " + path);
            }
        }
    }

    /** Derives the compacted target path from the first uncompacted input file. */
    private static Path createCompactedFile(List<Path> uncompactedFiles) {
        Path path = CompactOperator.convertFromUncompacted(uncompactedFiles.get(0));
        return new Path(path.getParent(), COMPACTED_PREFIX + path.getName());
    }

    /**
     * Prepends {@link #UNCOMPACTED_PREFIX} to the given name.
     *
     * @param path name to mark as uncompacted
     * @return the prefixed name
     */
    public static String convertToUncompacted(String path) {
        return UNCOMPACTED_PREFIX + path;
    }

    /**
     * Strips {@link #UNCOMPACTED_PREFIX} from the file name of {@code path}, keeping the parent.
     *
     * @param path path whose file name starts with {@link #UNCOMPACTED_PREFIX}
     * @return the same path without the prefix in its file name
     * @throws IllegalArgumentException if the file name does not start with the prefix
     */
    public static Path convertFromUncompacted(Path path) {
        Preconditions.checkArgument(
                path.getName().startsWith(UNCOMPACTED_PREFIX),
                "This should be uncompacted file: " + path);
        return new Path(path.getParent(), path.getName().substring(UNCOMPACTED_PREFIX.length()));
    }
}

