/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.contrib.streaming.state.snapshot;

import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.RunnableFuture;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
import org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator;
import org.apache.flink.contrib.streaming.state.iterator.RocksTransformingIteratorWrapper;
import org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase;
import org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.state.AsyncSnapshotCallable;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateSnapshotTransformer;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.ResourceGuard;
import org.apache.flink.util.function.SupplierWithException;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksIterator;
import org.rocksdb.Snapshot;

/**
 * Snapshot strategy that writes the complete content of the RocksDB instance, as seen through a
 * RocksDB {@link Snapshot}, into a single checkpoint stream.
 *
 * <p>{@link #doSnapshot} only captures the meta data, a resource lease and the RocksDB snapshot;
 * the actual iteration over the data and the writing happen inside the returned
 * {@link RunnableFuture}.
 *
 * @param <K> type of the keys handled by the backend
 */
public class RocksFullSnapshotStrategy<K> extends RocksDBSnapshotStrategyBase<K> {
    /** Description handed to the base class; names this (full, not incremental) strategy. */
    private static final String DESCRIPTION = "Asynchronous full RocksDB snapshot";

    /**
     * Short value written after the last key/value pair of every key-group.
     * NOTE(review): presumably mirrors a shared constant in RocksSnapshotUtil - confirm and
     * reference that one instead if it exists.
     */
    private static final int END_OF_KEY_GROUP_MARK = 0xFFFF;

    /** Decorator applied to the checkpoint stream for the data of each key-group. */
    @Nonnull
    private final StreamCompressionDecorator keyGroupCompressionDecorator;

    public RocksFullSnapshotStrategy(@Nonnull RocksDB db, @Nonnull ResourceGuard rocksDBResourceGuard, @Nonnull TypeSerializer<K> keySerializer, @Nonnull LinkedHashMap<String, Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>> kvStateInformation, @Nonnull KeyGroupRange keyGroupRange, @Nonnegative int keyGroupPrefixBytes, @Nonnull LocalRecoveryConfig localRecoveryConfig, @Nonnull CloseableRegistry cancelStreamRegistry, @Nonnull StreamCompressionDecorator keyGroupCompressionDecorator) {
        super(DESCRIPTION, db, rocksDBResourceGuard, keySerializer, kvStateInformation, keyGroupRange, keyGroupPrefixBytes, localRecoveryConfig, cancelStreamRegistry);
        this.keyGroupCompressionDecorator = keyGroupCompressionDecorator;
    }

    /**
     * Captures everything the asynchronous part needs (meta data copies, a lease on the RocksDB
     * resource guard and a RocksDB snapshot) and wraps it into a future task.
     *
     * @param checkpointId id of the checkpoint, forwarded to the stream supplier
     * @param timestamp not used by this strategy
     * @param primaryStreamFactory factory for the primary checkpoint output stream
     * @param checkpointOptions options; the checkpoint type decides whether a local copy is written
     * @return the task that performs the snapshot when run
     * @throws Exception if the lease or the RocksDB snapshot cannot be obtained, or the task
     *     cannot be created
     */
    @Override
    @Nonnull
    public RunnableFuture<SnapshotResult<KeyedStateHandle>> doSnapshot(long checkpointId, long timestamp, @Nonnull CheckpointStreamFactory primaryStreamFactory, @Nonnull CheckpointOptions checkpointOptions) throws Exception {
        SupplierWithException<CheckpointStreamWithResultProvider, Exception> checkpointStreamSupplier =
            this.createCheckpointStreamSupplier(checkpointId, primaryStreamFactory, checkpointOptions);

        int stateCount = this.kvStateInformation.size();
        List<StateMetaInfoSnapshot> stateMetaInfoSnapshots = new ArrayList<>(stateCount);
        List<Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>> metaDataCopy = new ArrayList<>(stateCount);
        for (Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> stateInfo : this.kvStateInformation.values()) {
            stateMetaInfoSnapshots.add(stateInfo.f1.snapshot());
            metaDataCopy.add(stateInfo);
        }

        // Evaluated before any resource is acquired so that nothing can fail between acquiring
        // the resources and handing them over to the callable.
        String logPathString = primaryStreamFactory.toString();

        ResourceGuard.Lease lease = this.rocksDBResourceGuard.acquireResource();
        Snapshot snapshot;
        try {
            snapshot = this.db.getSnapshot();
        } catch (RuntimeException | Error e) {
            // Without this the lease would never be released when the snapshot cannot be taken.
            IOUtils.closeQuietly(lease);
            throw e;
        }

        // From here on the callable owns lease and snapshot and releases them in
        // cleanupProvidedResources(); no additional cleanup must happen in this method.
        SnapshotAsynchronousPartCallable asyncSnapshotCallable =
            new SnapshotAsynchronousPartCallable(checkpointStreamSupplier, lease, snapshot, stateMetaInfoSnapshots, metaDataCopy, logPathString);
        return asyncSnapshotCallable.toAsyncSnapshotFutureTask(this.cancelStreamRegistry);
    }

    /** Nothing to do for this strategy when a checkpoint completes. */
    public void notifyCheckpointComplete(long checkpointId) {
    }

    /**
     * Chooses how the checkpoint stream is created: a duplicating stream (primary plus local
     * state directory) when local recovery is enabled and the checkpoint is not a savepoint,
     * otherwise a simple stream on the primary factory only.
     */
    private SupplierWithException<CheckpointStreamWithResultProvider, Exception> createCheckpointStreamSupplier(long checkpointId, CheckpointStreamFactory primaryStreamFactory, CheckpointOptions checkpointOptions) {
        boolean writeLocalCopy = this.localRecoveryConfig.isLocalRecoveryEnabled()
            && CheckpointType.SAVEPOINT != checkpointOptions.getCheckpointType();
        if (writeLocalCopy) {
            return () -> CheckpointStreamWithResultProvider.createDuplicatingStream(
                checkpointId,
                CheckpointedStateScope.EXCLUSIVE,
                primaryStreamFactory,
                this.localRecoveryConfig.getLocalStateDirectoryProvider());
        }
        return () -> CheckpointStreamWithResultProvider.createSimpleStream(
            CheckpointedStateScope.EXCLUSIVE,
            primaryStreamFactory);
    }

    /**
     * Opens an iterator on the given column family. If the meta info is a key/value state meta
     * info that provides a snapshot transformer, the iterator is wrapped so that the transformer
     * is applied; otherwise a plain wrapper is returned.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private static RocksIteratorWrapper getRocksIterator(RocksDB db, ColumnFamilyHandle columnFamilyHandle, RegisteredStateMetaInfoBase metaInfo, ReadOptions readOptions) {
        StateSnapshotTransformer stateSnapshotTransformer = null;
        if (metaInfo instanceof RegisteredKeyValueStateBackendMetaInfo) {
            stateSnapshotTransformer = ((RegisteredKeyValueStateBackendMetaInfo) metaInfo).getSnapshotTransformer();
        }
        RocksIterator rocksIterator = db.newIterator(columnFamilyHandle, readOptions);
        if (stateSnapshotTransformer == null) {
            return new RocksIteratorWrapper(rocksIterator);
        }
        return new RocksTransformingIteratorWrapper(rocksIterator, (StateSnapshotTransformer<byte[]>) stateSnapshotTransformer);
    }

    /**
     * The asynchronous part of the snapshot: opens the checkpoint stream, writes meta data and
     * all key/value pairs grouped by key-group, and finally releases the provided resources.
     */
    @VisibleForTesting
    private class SnapshotAsynchronousPartCallable extends AsyncSnapshotCallable<SnapshotResult<KeyedStateHandle>> {
        /** Creates the checkpoint stream lazily, when the callable actually runs. */
        @Nonnull
        private final SupplierWithException<CheckpointStreamWithResultProvider, Exception> checkpointStreamSupplier;
        /** Lease on the RocksDB resource guard; closed in {@link #cleanupProvidedResources()}. */
        @Nonnull
        private final ResourceGuard.Lease dbLease;
        /** RocksDB snapshot all iterators read from; released in {@link #cleanupProvidedResources()}. */
        @Nonnull
        private final Snapshot snapshot;
        @Nonnull
        private final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
        @Nonnull
        private final List<Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>> metaDataCopy;
        /** String passed to the completion log call. */
        @Nonnull
        private final String logPathString;

        SnapshotAsynchronousPartCallable(@Nonnull SupplierWithException<CheckpointStreamWithResultProvider, Exception> checkpointStreamSupplier, @Nonnull ResourceGuard.Lease dbLease, @Nonnull Snapshot snapshot, @Nonnull List<StateMetaInfoSnapshot> stateMetaInfoSnapshots, @Nonnull List<Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>> metaDataCopy, String logPathString) {
            this.checkpointStreamSupplier = checkpointStreamSupplier;
            this.dbLease = dbLease;
            this.snapshot = snapshot;
            this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
            this.metaDataCopy = metaDataCopy;
            this.logPathString = logPathString;
        }

        /**
         * Writes the snapshot into a freshly created checkpoint stream and converts the stream
         * result into a keyed state handle result.
         *
         * @throws IOException if the stream was already unregistered (and thus closed) when the
         *     writing finished
         */
        protected SnapshotResult<KeyedStateHandle> callInternal() throws Exception {
            KeyGroupRangeOffsets keyGroupRangeOffsets = new KeyGroupRangeOffsets(RocksFullSnapshotStrategy.this.keyGroupRange);
            CheckpointStreamWithResultProvider streamProvider = this.checkpointStreamSupplier.get();

            this.registerCloseableForCancellation(streamProvider);
            this.writeSnapshotToOutputStream(streamProvider, keyGroupRangeOffsets);

            if (!this.unregisterCloseableFromCancellation(streamProvider)) {
                throw new IOException("Stream is already unregistered/closed.");
            }
            return CheckpointStreamWithResultProvider.toKeyedStateHandleSnapshotResult(
                streamProvider.closeAndFinalizeCheckpointStreamResult(),
                keyGroupRangeOffsets);
        }

        /**
         * Releases the RocksDB snapshot and the lease. The handle and the lease are closed even
         * if releasing the snapshot on the database throws.
         */
        protected void cleanupProvidedResources() {
            try {
                RocksFullSnapshotStrategy.this.db.releaseSnapshot(this.snapshot);
            } finally {
                IOUtils.closeQuietly(this.snapshot);
                IOUtils.closeQuietly(this.dbLease);
            }
        }

        protected void logAsyncSnapshotComplete(long startTime) {
            RocksFullSnapshotStrategy.this.logAsyncCompleted(this.logPathString, startTime);
        }

        /**
         * Writes meta data followed by all key/value data to the stream of the given provider.
         * All iterators created along the way and the read options are closed afterwards,
         * whether or not writing succeeded.
         */
        private void writeSnapshotToOutputStream(@Nonnull CheckpointStreamWithResultProvider checkpointStreamWithResultProvider, @Nonnull KeyGroupRangeOffsets keyGroupRangeOffsets) throws IOException, InterruptedException {
            List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators = new ArrayList<>(this.metaDataCopy.size());
            DataOutputView outputView = new DataOutputViewStreamWrapper(checkpointStreamWithResultProvider.getCheckpointOutputStream());
            ReadOptions readOptions = new ReadOptions();
            try {
                readOptions.setSnapshot(this.snapshot);
                this.writeKVStateMetaData(kvStateIterators, readOptions, outputView);
                this.writeKVStateData(kvStateIterators, checkpointStreamWithResultProvider, keyGroupRangeOffsets);
            } finally {
                // Iterators first, then the read options they were created with.
                for (Tuple2<RocksIteratorWrapper, Integer> iteratorAndId : kvStateIterators) {
                    IOUtils.closeQuietly(iteratorAndId.f0);
                }
                IOUtils.closeQuietly(readOptions);
            }
        }

        /**
         * Creates one iterator per registered state (paired with a running id, starting at 0) and
         * writes the serialization proxy holding the key serializer and state meta info snapshots.
         *
         * <p>Iterators are appended to {@code kvStateIterators} one by one, so the caller can
         * close those already created if a later step fails.
         */
        private void writeKVStateMetaData(List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators, ReadOptions readOptions, DataOutputView outputView) throws IOException {
            int kvStateId = 0;
            for (Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> columnAndMeta : this.metaDataCopy) {
                RocksIteratorWrapper iterator = RocksFullSnapshotStrategy.getRocksIterator(
                    RocksFullSnapshotStrategy.this.db, columnAndMeta.f0, columnAndMeta.f1, readOptions);
                kvStateIterators.add(Tuple2.of(iterator, kvStateId));
                ++kvStateId;
            }

            // The flag is true whenever the configured decorator is not the uncompressed one.
            boolean usesCompression = !Objects.equals(
                UncompressedStreamCompressionDecorator.INSTANCE,
                RocksFullSnapshotStrategy.this.keyGroupCompressionDecorator);
            KeyedBackendSerializationProxy<K> serializationProxy = new KeyedBackendSerializationProxy<>(
                RocksFullSnapshotStrategy.this.keySerializer, this.stateMetaInfoSnapshots, usesCompression);
            serializationProxy.write(outputView);
        }

        /**
         * Merges all state iterators and writes their key/value pairs key-group by key-group.
         *
         * <p>Each pair is written one step late (as "previous" pair), because the key of the last
         * pair before a change of state or key-group gets the "meta data follows" flag set before
         * it is written. A key-group starts with the state id, and ends with
         * {@link #END_OF_KEY_GROUP_MARK}; the stream offset of every key-group start is recorded
         * in {@code keyGroupRangeOffsets}.
         */
        private void writeKVStateData(List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators, CheckpointStreamWithResultProvider checkpointStreamWithResultProvider, KeyGroupRangeOffsets keyGroupRangeOffsets) throws IOException, InterruptedException {
            byte[] previousKey = null;
            byte[] previousValue = null;
            DataOutputViewStreamWrapper kgOutView = null;
            OutputStream kgOutStream = null;
            CheckpointStreamFactory.CheckpointStateOutputStream checkpointOutputStream = checkpointStreamWithResultProvider.getCheckpointOutputStream();
            try {
                try (RocksStatesPerKeyGroupMergeIterator mergeIterator = new RocksStatesPerKeyGroupMergeIterator(kvStateIterators, RocksFullSnapshotStrategy.this.keyGroupPrefixBytes)) {
                    // Prologue: open the first key-group and remember the first pair.
                    if (mergeIterator.isValid()) {
                        keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), checkpointOutputStream.getPos());
                        kgOutStream = RocksFullSnapshotStrategy.this.keyGroupCompressionDecorator.decorateWithCompression(checkpointOutputStream);
                        kgOutView = new DataOutputViewStreamWrapper(kgOutStream);
                        kgOutView.writeShort(mergeIterator.kvStateId());
                        previousKey = mergeIterator.key();
                        previousValue = mergeIterator.value();
                        mergeIterator.next();
                    }

                    // Main loop: write the remembered pair, then remember the current one.
                    while (mergeIterator.isValid()) {
                        assert (!RocksSnapshotUtil.hasMetaDataFollowsFlag(previousKey));
                        if (mergeIterator.isNewKeyGroup() || mergeIterator.isNewKeyValueState()) {
                            // Interruption is only checked at state / key-group boundaries.
                            this.checkInterrupted();
                            RocksSnapshotUtil.setMetaDataFollowsFlagInKey(previousKey);
                        }
                        this.writeKeyValuePair(previousKey, previousValue, kgOutView);

                        if (mergeIterator.isNewKeyGroup()) {
                            // Finish the current key-group and start the next one at the
                            // current position of the underlying stream.
                            kgOutView.writeShort(END_OF_KEY_GROUP_MARK);
                            kgOutStream.close();
                            keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), checkpointOutputStream.getPos());
                            kgOutStream = RocksFullSnapshotStrategy.this.keyGroupCompressionDecorator.decorateWithCompression(checkpointOutputStream);
                            kgOutView = new DataOutputViewStreamWrapper(kgOutStream);
                            kgOutView.writeShort(mergeIterator.kvStateId());
                        } else if (mergeIterator.isNewKeyValueState()) {
                            kgOutView.writeShort(mergeIterator.kvStateId());
                        }

                        previousKey = mergeIterator.key();
                        previousValue = mergeIterator.value();
                        mergeIterator.next();
                    }
                }

                // Epilogue: write the last remembered pair and terminate the last key-group.
                if (previousKey != null) {
                    assert (!RocksSnapshotUtil.hasMetaDataFollowsFlag(previousKey));
                    RocksSnapshotUtil.setMetaDataFollowsFlagInKey(previousKey);
                    this.writeKeyValuePair(previousKey, previousValue, kgOutView);
                    kgOutView.writeShort(END_OF_KEY_GROUP_MARK);
                    kgOutStream.close();
                    // Already closed; keeps the finally block from closing it a second time.
                    kgOutStream = null;
                }
            } finally {
                // Only reached with a non-null stream when writing failed part-way.
                IOUtils.closeQuietly(kgOutStream);
            }
        }

        /** Serializes key and value, in that order, as byte arrays to {@code out}. */
        private void writeKeyValuePair(byte[] key, byte[] value, DataOutputView out) throws IOException {
            BytePrimitiveArraySerializer.INSTANCE.serialize(key, out);
            BytePrimitiveArraySerializer.INSTANCE.serialize(value, out);
        }

        /**
         * Throws if the current thread has its interrupt flag set. The flag is only read, not
         * cleared.
         */
        private void checkInterrupted() throws InterruptedException {
            if (Thread.currentThread().isInterrupted()) {
                throw new InterruptedException("RocksDB snapshot interrupted.");
            }
        }
    }
}

