/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.filesystem;

import java.io.IOException;
import java.io.OutputStream;
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DelegatingConfiguration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.FileSystemFormatFactory;
import org.apache.flink.table.filesystem.EmptyMetaStoreFactory;
import org.apache.flink.table.filesystem.FileSystemFactory;
import org.apache.flink.table.filesystem.FileSystemOptions;
import org.apache.flink.table.filesystem.FileSystemOutputFormat;
import org.apache.flink.table.filesystem.FileSystemTableFactory;
import org.apache.flink.table.filesystem.OutputFormatFactory;
import org.apache.flink.table.filesystem.PartitionComputer;
import org.apache.flink.table.filesystem.RowDataPartitionComputer;
import org.apache.flink.table.filesystem.TableMetaStoreFactory;
import org.apache.flink.table.filesystem.stream.StreamingFileCommitter;
import org.apache.flink.table.filesystem.stream.StreamingFileWriter;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.OverwritableTableSink;
import org.apache.flink.table.sinks.PartitionableTableSink;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.utils.PartitionPathUtils;
import org.apache.flink.util.Preconditions;

public class FileSystemTableSink
implements AppendStreamTableSink<RowData>,
PartitionableTableSink,
OverwritableTableSink {
    private final ObjectIdentifier tableIdentifier;
    private final boolean isBounded;
    private final TableSchema schema;
    private final List<String> partitionKeys;
    private final Path path;
    private final String defaultPartName;
    private final Map<String, String> properties;
    private boolean overwrite = false;
    private boolean dynamicGrouping = false;
    private LinkedHashMap<String, String> staticPartitions = new LinkedHashMap();

    public FileSystemTableSink(ObjectIdentifier tableIdentifier, boolean isBounded, TableSchema schema, Path path, List<String> partitionKeys, String defaultPartName, Map<String, String> properties) {
        this.tableIdentifier = tableIdentifier;
        this.isBounded = isBounded;
        this.schema = schema;
        this.path = path;
        this.defaultPartName = defaultPartName;
        this.partitionKeys = partitionKeys;
        this.properties = properties;
    }

    public final DataStreamSink<RowData> consumeDataStream(DataStream<RowData> dataStream) {
        RowDataPartitionComputer computer = new RowDataPartitionComputer(this.defaultPartName, this.schema.getFieldNames(), this.schema.getFieldDataTypes(), this.partitionKeys.toArray(new String[0]));
        EmptyMetaStoreFactory metaStoreFactory = new EmptyMetaStoreFactory(this.path);
        OutputFileConfig outputFileConfig = OutputFileConfig.builder().withPartPrefix("part-" + UUID.randomUUID().toString()).build();
        FileSystemFactory fsFactory = FileSystem::get;
        if (this.isBounded) {
            FileSystemOutputFormat.Builder<RowData> builder = new FileSystemOutputFormat.Builder<RowData>();
            builder.setPartitionComputer(computer);
            builder.setDynamicGrouped(this.dynamicGrouping);
            builder.setPartitionColumns(this.partitionKeys.toArray(new String[0]));
            builder.setFormatFactory(this.createOutputFormatFactory());
            builder.setMetaStoreFactory(metaStoreFactory);
            builder.setFileSystemFactory(fsFactory);
            builder.setOverwrite(this.overwrite);
            builder.setStaticPartitions(this.staticPartitions);
            builder.setTempPath(this.toStagingPath());
            builder.setOutputFileConfig(outputFileConfig);
            return dataStream.writeUsingOutputFormat(builder.build()).setParallelism(dataStream.getParallelism());
        }
        Configuration conf = new Configuration();
        this.properties.forEach((arg_0, arg_1) -> ((Configuration)conf).setString(arg_0, arg_1));
        Object writer = this.createWriter();
        TableBucketAssigner assigner = new TableBucketAssigner(computer);
        TableRollingPolicy rollingPolicy = new TableRollingPolicy(!(writer instanceof Encoder), ((MemorySize)conf.get(FileSystemOptions.SINK_ROLLING_POLICY_FILE_SIZE)).getBytes(), ((Duration)conf.get(FileSystemOptions.SINK_ROLLING_POLICY_ROLLOVER_INTERVAL)).toMillis());
        Object bucketsBuilder = writer instanceof Encoder ? ((StreamingFileSink.DefaultRowFormatBuilder)((StreamingFileSink.DefaultRowFormatBuilder)StreamingFileSink.forRowFormat((Path)this.path, (Encoder)new ProjectionEncoder((Encoder)writer, computer)).withBucketAssigner((BucketAssigner)assigner)).withOutputFileConfig(outputFileConfig)).withRollingPolicy((RollingPolicy)rollingPolicy) : ((StreamingFileSink.DefaultBulkFormatBuilder)((StreamingFileSink.DefaultBulkFormatBuilder)StreamingFileSink.forBulkFormat((Path)this.path, (BulkWriter.Factory)new ProjectionBulkFactory((BulkWriter.Factory<RowData>)((BulkWriter.Factory)writer), computer)).withBucketAssigner((BucketAssigner)assigner)).withOutputFileConfig(outputFileConfig)).withRollingPolicy((CheckpointRollingPolicy)rollingPolicy);
        return FileSystemTableSink.createStreamingSink(conf, this.path, this.partitionKeys, this.tableIdentifier, this.overwrite, dataStream, bucketsBuilder, metaStoreFactory, fsFactory, ((Duration)conf.get(FileSystemOptions.SINK_ROLLING_POLICY_CHECK_INTERVAL)).toMillis());
    }

    public static DataStreamSink<RowData> createStreamingSink(Configuration conf, Path path, List<String> partitionKeys, ObjectIdentifier tableIdentifier, boolean overwrite, DataStream<RowData> inputStream, StreamingFileSink.BucketsBuilder<RowData, String, ? extends StreamingFileSink.BucketsBuilder<RowData, ?, ?>> bucketsBuilder, TableMetaStoreFactory msFactory, FileSystemFactory fsFactory, long rollingCheckInterval) {
        SingleOutputStreamOperator writerStream;
        if (overwrite) {
            throw new IllegalStateException("Streaming mode not support overwrite.");
        }
        StreamingFileWriter fileWriter = new StreamingFileWriter(rollingCheckInterval, bucketsBuilder);
        SingleOutputStreamOperator returnStream = writerStream = inputStream.transform(StreamingFileWriter.class.getSimpleName(), TypeExtractor.createTypeInfo(StreamingFileCommitter.CommitMessage.class), (OneInputStreamOperator)fileWriter).setParallelism(inputStream.getParallelism());
        if (partitionKeys.size() > 0 && conf.contains(FileSystemOptions.SINK_PARTITION_COMMIT_POLICY_KIND)) {
            StreamingFileCommitter committer = new StreamingFileCommitter(path, tableIdentifier, partitionKeys, msFactory, fsFactory, conf);
            returnStream = writerStream.transform(StreamingFileCommitter.class.getSimpleName(), Types.VOID, (OneInputStreamOperator)committer).setParallelism(1).setMaxParallelism(1);
        }
        return returnStream.addSink((SinkFunction)new DiscardingSink()).setParallelism(1);
    }

    private Path toStagingPath() {
        Path stagingDir = new Path(this.path, ".staging_" + System.currentTimeMillis());
        try {
            FileSystem fs = stagingDir.getFileSystem();
            Preconditions.checkState((fs.exists(stagingDir) || fs.mkdirs(stagingDir) ? 1 : 0) != 0, (Object)("Failed to create staging dir " + stagingDir));
            return stagingDir;
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private OutputFormatFactory<RowData> createOutputFormatFactory() {
        Object writer = this.createWriter();
        return writer instanceof Encoder ? path -> FileSystemTableSink.createEncoderOutputFormat((Encoder<RowData>)((Encoder)writer), path) : path -> FileSystemTableSink.createBulkWriterOutputFormat((BulkWriter.Factory<RowData>)((BulkWriter.Factory)writer), path);
    }

    private Object createWriter() {
        final FileSystemFormatFactory formatFactory = FileSystemTableFactory.createFormatFactory(this.properties);
        final Configuration conf = new Configuration();
        this.properties.forEach((arg_0, arg_1) -> ((Configuration)conf).setString(arg_0, arg_1));
        FileSystemFormatFactory.WriterContext context = new FileSystemFormatFactory.WriterContext(){

            public TableSchema getSchema() {
                return FileSystemTableSink.this.schema;
            }

            public ReadableConfig getFormatOptions() {
                return new DelegatingConfiguration(conf, formatFactory.factoryIdentifier() + ".");
            }

            public List<String> getPartitionKeys() {
                return FileSystemTableSink.this.partitionKeys;
            }
        };
        Optional encoder = formatFactory.createEncoder(context);
        Optional bulk = formatFactory.createBulkWriterFactory(context);
        if (encoder.isPresent()) {
            return encoder.get();
        }
        if (bulk.isPresent()) {
            return bulk.get();
        }
        throw new TableException(formatFactory + " format should implement at least one Encoder or BulkWriter");
    }

    private static OutputFormat<RowData> createBulkWriterOutputFormat(final BulkWriter.Factory<RowData> factory, final Path path) {
        return new OutputFormat<RowData>(){
            private static final long serialVersionUID = 1L;
            private transient BulkWriter<RowData> writer;

            public void configure(Configuration parameters) {
            }

            public void open(int taskNumber, int numTasks) throws IOException {
                this.writer = factory.create(path.getFileSystem().create(path, FileSystem.WriteMode.OVERWRITE));
            }

            public void writeRecord(RowData record) throws IOException {
                this.writer.addElement((Object)record);
            }

            public void close() throws IOException {
                this.writer.flush();
                this.writer.finish();
            }
        };
    }

    private static OutputFormat<RowData> createEncoderOutputFormat(final Encoder<RowData> encoder, final Path path) {
        return new OutputFormat<RowData>(){
            private static final long serialVersionUID = 1L;
            private transient FSDataOutputStream output;

            public void configure(Configuration parameters) {
            }

            public void open(int taskNumber, int numTasks) throws IOException {
                this.output = path.getFileSystem().create(path, FileSystem.WriteMode.OVERWRITE);
            }

            public void writeRecord(RowData record) throws IOException {
                encoder.encode((Object)record, (OutputStream)this.output);
            }

            public void close() throws IOException {
                this.output.flush();
                this.output.close();
            }
        };
    }

    public FileSystemTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        return this;
    }

    public void setOverwrite(boolean overwrite) {
        this.overwrite = overwrite;
    }

    public void setStaticPartition(Map<String, String> partitions) {
        this.staticPartitions = this.toPartialLinkedPartSpec(partitions);
    }

    private LinkedHashMap<String, String> toPartialLinkedPartSpec(Map<String, String> part) {
        LinkedHashMap<String, String> partSpec = new LinkedHashMap<String, String>();
        for (String partitionKey : this.partitionKeys) {
            if (!part.containsKey(partitionKey)) continue;
            partSpec.put(partitionKey, part.get(partitionKey));
        }
        return partSpec;
    }

    public TableSchema getTableSchema() {
        return this.schema;
    }

    public DataType getConsumedDataType() {
        return (DataType)this.schema.toRowDataType().bridgedTo(RowData.class);
    }

    public boolean configurePartitionGrouping(boolean supportsGrouping) {
        this.dynamicGrouping = supportsGrouping;
        return this.dynamicGrouping;
    }

    public static class ProjectionBulkFactory
    implements BulkWriter.Factory<RowData> {
        private final BulkWriter.Factory<RowData> factory;
        private final RowDataPartitionComputer computer;

        public ProjectionBulkFactory(BulkWriter.Factory<RowData> factory, RowDataPartitionComputer computer) {
            this.factory = factory;
            this.computer = computer;
        }

        public BulkWriter<RowData> create(FSDataOutputStream out) throws IOException {
            final BulkWriter writer = this.factory.create(out);
            return new BulkWriter<RowData>(){

                public void addElement(RowData element) throws IOException {
                    writer.addElement((Object)computer.projectColumnsToWrite(element));
                }

                public void flush() throws IOException {
                    writer.flush();
                }

                public void finish() throws IOException {
                    writer.finish();
                }
            };
        }
    }

    private static class ProjectionEncoder
    implements Encoder<RowData> {
        private final Encoder<RowData> encoder;
        private final RowDataPartitionComputer computer;

        private ProjectionEncoder(Encoder<RowData> encoder, RowDataPartitionComputer computer) {
            this.encoder = encoder;
            this.computer = computer;
        }

        public void encode(RowData element, OutputStream stream) throws IOException {
            this.encoder.encode((Object)this.computer.projectColumnsToWrite(element), stream);
        }
    }

    public static class TableRollingPolicy
    extends CheckpointRollingPolicy<RowData, String> {
        private final boolean rollOnCheckpoint;
        private final long rollingFileSize;
        private final long rollingTimeInterval;

        public TableRollingPolicy(boolean rollOnCheckpoint, long rollingFileSize, long rollingTimeInterval) {
            this.rollOnCheckpoint = rollOnCheckpoint;
            Preconditions.checkArgument((rollingFileSize > 0L ? 1 : 0) != 0);
            Preconditions.checkArgument((rollingTimeInterval > 0L ? 1 : 0) != 0);
            this.rollingFileSize = rollingFileSize;
            this.rollingTimeInterval = rollingTimeInterval;
        }

        public boolean shouldRollOnCheckpoint(PartFileInfo<String> partFileState) {
            try {
                return this.rollOnCheckpoint || partFileState.getSize() > this.rollingFileSize;
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        public boolean shouldRollOnEvent(PartFileInfo<String> partFileState, RowData element) throws IOException {
            return partFileState.getSize() > this.rollingFileSize;
        }

        public boolean shouldRollOnProcessingTime(PartFileInfo<String> partFileState, long currentTime) {
            return currentTime - partFileState.getCreationTime() >= this.rollingTimeInterval;
        }
    }

    public static class TableBucketAssigner
    implements BucketAssigner<RowData, String> {
        private final PartitionComputer<RowData> computer;

        public TableBucketAssigner(PartitionComputer<RowData> computer) {
            this.computer = computer;
        }

        public String getBucketId(RowData element, BucketAssigner.Context context) {
            try {
                return PartitionPathUtils.generatePartitionPath(this.computer.generatePartValues(element));
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        public SimpleVersionedSerializer<String> getSerializer() {
            return SimpleVersionedStringSerializer.INSTANCE;
        }
    }
}

