/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.contrib.streaming.state;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.Spliterators;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.RunnableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.contrib.streaming.state.AbstractRocksDBState;
import org.apache.flink.contrib.streaming.state.RocksDBAggregatingState;
import org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet;
import org.apache.flink.contrib.streaming.state.RocksDBFoldingState;
import org.apache.flink.contrib.streaming.state.RocksDBIncrementalCheckpointUtils;
import org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils;
import org.apache.flink.contrib.streaming.state.RocksDBListState;
import org.apache.flink.contrib.streaming.state.RocksDBMapState;
import org.apache.flink.contrib.streaming.state.RocksDBNativeMetricMonitor;
import org.apache.flink.contrib.streaming.state.RocksDBNativeMetricOptions;
import org.apache.flink.contrib.streaming.state.RocksDBReducingState;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBValueState;
import org.apache.flink.contrib.streaming.state.RocksDBWriteBatchWrapper;
import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
import org.apache.flink.contrib.streaming.state.TreeOrderedSetCache;
import org.apache.flink.contrib.streaming.state.iterator.RocksStateKeysIterator;
import org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase;
import org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy;
import org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy;
import org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.DirectoryStateHandle;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle;
import org.apache.flink.runtime.state.KeyExtractorFunction;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.PriorityComparable;
import org.apache.flink.runtime.state.PriorityComparator;
import org.apache.flink.runtime.state.PriorityQueueSetFactory;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StateSnapshotTransformer;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ResourceGuard;
import org.apache.flink.util.StateMigrationException;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.Snapshot;
import org.rocksdb.WriteOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RocksDBKeyedStateBackend<K>
extends AbstractKeyedStateBackend<K> {
    /** Class logger. */
    private static final Logger LOG = LoggerFactory.getLogger(RocksDBKeyedStateBackend.class);
    /** Merge operator name that the constructor sets on the column family options. */
    public static final String MERGE_OPERATOR_NAME = "stringappendtest";
    /** Maps each supported state descriptor class to the factory that creates the matching RocksDB-backed state. */
    private static final Map<Class<? extends StateDescriptor>, StateFactory> STATE_FACTORIES = Stream.of(Tuple2.of(ValueStateDescriptor.class, RocksDBValueState::create), Tuple2.of(ListStateDescriptor.class, RocksDBListState::create), Tuple2.of(MapStateDescriptor.class, RocksDBMapState::create), Tuple2.of(AggregatingStateDescriptor.class, RocksDBAggregatingState::create), Tuple2.of(ReducingStateDescriptor.class, RocksDBReducingState::create), Tuple2.of(FoldingStateDescriptor.class, RocksDBFoldingState::create)).collect(Collectors.toMap(t -> (Class)t.f0, t -> (StateFactory)t.f1));
    /** Identifier handed to the constructor; must not be null. */
    private final String operatorIdentifier;
    /** Column family options used for the default column family and for every column family created by this backend. */
    private final ColumnFamilyOptions columnOptions;
    /** Options used when opening the RocksDB instance. */
    private final DBOptions dbOptions;
    /** Base directory of this backend instance; deleted again by {@code cleanInstanceBasePath()}. */
    private final File instanceBasePath;
    /** The {@code db} sub-directory of {@link #instanceBasePath}; the path the database is opened at. */
    private final File instanceRocksDBPath;
    /** Guard that is closed first in {@code dispose()} and shared with the snapshot strategies. */
    private final ResourceGuard rocksDBResourceGuard;
    /** The RocksDB instance; assigned in {@code createDB()} and reset to {@code null} in {@code dispose()}. */
    protected RocksDB db;
    /** Handle of the default column family (the first handle returned when opening the database). */
    private ColumnFamilyHandle defaultColumnFamily;
    /** Write options shared by all writes; created with the write-ahead log disabled. */
    private final WriteOptions writeOptions;
    /** Registered states by name: the column family handle together with the state's meta information. */
    private final LinkedHashMap<String, Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>> kvStateInformation;
    /** Number of bytes of the key-group prefix, computed from the number of key groups. */
    private final int keyGroupPrefixBytes;
    /** Whether checkpoints use the incremental snapshot strategy. */
    private final boolean enableIncrementalCheckpointing;
    /** Local recovery configuration passed on to the snapshot strategies. */
    private final LocalRecoveryConfig localRecoveryConfig;
    /** Strategy used for checkpoints; the same object as the savepoint strategy when incremental checkpointing is off. */
    private RocksDBSnapshotStrategyBase<K> checkpointSnapshotStrategy;
    /** Strategy used for savepoints (always the full snapshot strategy). */
    private RocksDBSnapshotStrategyBase<K> savepointSnapshotStrategy;
    /** Factory for priority queue state; heap-based or RocksDB-based depending on the configured type. */
    private final PriorityQueueSetFactory priorityQueueFactory;
    /** Batched writer created together with the database; flushed before every snapshot. */
    private RocksDBWriteBatchWrapper writeBatchWrapper;
    /** Native metric options; a monitor is only created when they are enabled. */
    private final RocksDBNativeMetricOptions metricOptions;
    /** Metric group handed to the native metric monitor. */
    private final MetricGroup metricGroup;
    /** Native metric monitor; stays {@code null} unless metrics are enabled when the database is opened. */
    private RocksDBNativeMetricMonitor nativeMetricMonitor;

    /**
     * Creates the backend and prepares its working directory. The RocksDB instance itself is not
     * opened here; that happens during {@code restore}.
     *
     * <p>If the {@code db} sub-directory of {@code instanceBasePath} already exists, the whole
     * instance base directory is deleted before continuing.
     *
     * @throws IOException if the instance base path exists but is not a directory, or cannot be created
     * @throws IllegalArgumentException if {@code priorityQueueStateType} is not a known type
     */
    public RocksDBKeyedStateBackend(String operatorIdentifier, ClassLoader userCodeClassLoader, File instanceBasePath, DBOptions dbOptions, ColumnFamilyOptions columnFamilyOptions, TaskKvStateRegistry kvStateRegistry, TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, ExecutionConfig executionConfig, boolean enableIncrementalCheckpointing, LocalRecoveryConfig localRecoveryConfig, RocksDBStateBackend.PriorityQueueStateType priorityQueueStateType, TtlTimeProvider ttlTimeProvider, RocksDBNativeMetricOptions metricOptions, MetricGroup metricGroup) throws IOException {
        super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange, executionConfig, ttlTimeProvider);

        this.operatorIdentifier = Preconditions.checkNotNull(operatorIdentifier);
        this.enableIncrementalCheckpointing = enableIncrementalCheckpointing;
        this.rocksDBResourceGuard = new ResourceGuard();

        // All column families of this backend share the same options, including the merge operator.
        this.columnOptions = Preconditions.checkNotNull(columnFamilyOptions).setMergeOperatorName(MERGE_OPERATOR_NAME);
        this.dbOptions = Preconditions.checkNotNull(dbOptions);

        this.instanceBasePath = Preconditions.checkNotNull(instanceBasePath);
        this.instanceRocksDBPath = new File(instanceBasePath, "db");
        RocksDBKeyedStateBackend.checkAndCreateDirectory(instanceBasePath);
        if (this.instanceRocksDBPath.exists()) {
            // A database directory is already present at this path: wipe the instance directory.
            this.cleanInstanceBasePath();
        }

        this.localRecoveryConfig = Preconditions.checkNotNull(localRecoveryConfig);
        this.keyGroupPrefixBytes = RocksDBKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(this.getNumberOfKeyGroups());
        this.kvStateInformation = new LinkedHashMap<>();
        this.writeOptions = new WriteOptions().setDisableWAL(true);
        this.metricOptions = metricOptions;
        this.metricGroup = metricGroup;

        switch (priorityQueueStateType) {
            case HEAP:
                this.priorityQueueFactory = new HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128);
                break;
            case ROCKSDB:
                this.priorityQueueFactory = new RocksDBPriorityQueueSetFactory();
                break;
            default:
                throw new IllegalArgumentException("Unknown priority queue state type: " + priorityQueueStateType);
        }
    }

    /**
     * Ensures that {@code directory} exists and is a directory, creating it (and any missing
     * parents) when it does not exist yet.
     *
     * @param directory the directory to verify or create
     * @throws IOException if the path exists but is not a directory, or if it could not be created
     */
    private static void checkAndCreateDirectory(File directory) throws IOException {
        if (!directory.exists()) {
            if (directory.mkdirs()) {
                return;
            }
            throw new IOException(String.format("Could not create RocksDB data directory at %s.", directory));
        }
        if (directory.isDirectory()) {
            return;
        }
        throw new IOException("Not a directory: " + directory);
    }

    /**
     * Returns a stream over the keys stored for the given state and namespace.
     *
     * <p>The returned stream is backed by a native RocksDB iterator. The iterator is released by
     * the stream's close handler, so callers must close the stream when they are done with it.
     *
     * @param state name of the registered state
     * @param namespace namespace whose keys should be returned
     * @return a stream of keys; empty if no key/value state is registered under {@code state}
     * @throws FlinkRuntimeException if the namespace cannot be serialized
     */
    public <N> Stream<K> getKeys(String state, N namespace) {
        byte[] nameSpaceBytes;
        Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> columnInfo = this.kvStateInformation.get(state);
        if (columnInfo == null || !(columnInfo.f1 instanceof RegisteredKeyValueStateBackendMetaInfo)) {
            return Stream.empty();
        }
        RegisteredKeyValueStateBackendMetaInfo registeredKeyValueStateBackendMetaInfo = (RegisteredKeyValueStateBackendMetaInfo)columnInfo.f1;
        TypeSerializer namespaceSerializer = registeredKeyValueStateBackendMetaInfo.getNamespaceSerializer();
        DataOutputSerializer namespaceOutputView = new DataOutputSerializer(8);
        boolean ambiguousKeyPossible = RocksDBKeySerializationUtils.isAmbiguousKeyPossible(this.keySerializer, namespaceSerializer);
        try {
            RocksDBKeySerializationUtils.writeNameSpace(namespace, namespaceSerializer, namespaceOutputView, ambiguousKeyPossible);
            nameSpaceBytes = namespaceOutputView.getCopyOfBuffer();
        }
        catch (IOException ex) {
            throw new FlinkRuntimeException("Failed to get keys from RocksDB state backend.", (Throwable)ex);
        }
        RocksIteratorWrapper iterator = RocksDBKeyedStateBackend.getRocksIterator(this.db, (ColumnFamilyHandle)columnInfo.f0);
        try {
            iterator.seekToFirst();
            RocksStateKeysIterator<K> iteratorWrapper = new RocksStateKeysIterator<>(iterator, state, this.keySerializer, this.keyGroupPrefixBytes, ambiguousKeyPossible, nameSpaceBytes);
            Stream<K> targetStream = StreamSupport.stream(Spliterators.spliteratorUnknownSize(iteratorWrapper, java.util.Spliterator.ORDERED), false);
            // From here on the stream owns the iterator and releases it in its close handler.
            return targetStream.onClose(iteratorWrapper::close);
        }
        catch (RuntimeException | Error failure) {
            // The iterator was never handed to a stream, so nobody else can release it.
            try {
                iterator.close();
            }
            catch (Throwable closeFailure) {
                failure.addSuppressed(closeFailure);
            }
            throw failure;
        }
    }

    /**
     * Looks up the column family handle registered for a state.
     *
     * @param state name of the state
     * @return the handle, or {@code null} if no state is registered under that name
     */
    @VisibleForTesting
    public ColumnFamilyHandle getColumnFamilyHandle(String state) {
        final Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> registered = this.kvStateInformation.get(state);
        if (registered == null) {
            return null;
        }
        return registered.f0;
    }

    /**
     * Records a newly registered column family and, when a native metric monitor exists,
     * registers the column family with that monitor as well.
     *
     * @param columnFamilyName name of the state / column family
     * @param registeredColumn column family handle together with the state's meta information
     */
    private void registerKvStateInformation(String columnFamilyName, Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> registeredColumn) {
        this.kvStateInformation.put(columnFamilyName, registeredColumn);

        final RocksDBNativeMetricMonitor monitor = this.nativeMetricMonitor;
        if (monitor == null) {
            return;
        }
        monitor.registerColumnFamily(columnFamilyName, registeredColumn.f0);
    }

    /**
     * Releases the resources held by this backend: the superclass state, the resource guard and,
     * if a database was opened, all native RocksDB objects and the instance base directory.
     *
     * <p>NOTE(review): when {@code db} is {@code null} (no database was opened), the option objects
     * are not closed and the instance base directory is not deleted by this method.
     */
    public void dispose() {
        super.dispose();
        // Closed before any native object is released.
        // NOTE(review): presumably this waits for clients still using the DB - confirm against ResourceGuard.
        this.rocksDBResourceGuard.close();
        if (this.db != null) {
            IOUtils.closeQuietly((AutoCloseable)this.writeBatchWrapper);
            if (this.nativeMetricMonitor != null) {
                this.nativeMetricMonitor.close();
            }
            // Column family handles are closed before the database itself: first the default one ...
            IOUtils.closeQuietly((AutoCloseable)this.defaultColumnFamily);
            // ... then every handle registered by this backend ...
            for (Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> columnMetaData : this.kvStateInformation.values()) {
                IOUtils.closeQuietly((AutoCloseable)((AutoCloseable)columnMetaData.f0));
            }
            // ... and finally the database.
            IOUtils.closeQuietly((AutoCloseable)this.db);
            // Drop the reference so a second dispose() call skips this branch.
            this.db = null;
            IOUtils.closeQuietly((AutoCloseable)this.columnOptions);
            IOUtils.closeQuietly((AutoCloseable)this.dbOptions);
            IOUtils.closeQuietly((AutoCloseable)this.writeOptions);
            this.kvStateInformation.clear();
            this.cleanInstanceBasePath();
        }
    }

    /**
     * Creates a priority queue for the given state by delegating to the configured priority
     * queue factory.
     *
     * @param stateName name of the priority queue state
     * @param byteOrderedElementSerializer serializer for the queue elements
     * @return the priority queue created by the factory
     */
    @Nonnull
    public <T extends HeapPriorityQueueElement & PriorityComparable> KeyGroupedInternalPriorityQueue<T> create(@Nonnull String stateName, @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
        final PriorityQueueSetFactory factory = this.priorityQueueFactory;
        return factory.create(stateName, byteOrderedElementSerializer);
    }

    /**
     * Deletes the instance base directory. This is best effort: a failure is logged as a warning
     * and not propagated.
     */
    private void cleanInstanceBasePath() {
        final File basePath = this.instanceBasePath;
        LOG.info("Deleting existing instance base directory {}.", basePath);
        try {
            FileUtils.deleteDirectory(basePath);
        } catch (IOException deletionFailure) {
            LOG.warn("Could not delete instance base path for RocksDB: " + basePath, deletionFailure);
        }
    }

    /**
     * @return the number of bytes of the key-group prefix, as computed in the constructor
     */
    public int getKeyGroupPrefixBytes() {
        return keyGroupPrefixBytes;
    }

    /**
     * @return the priority queue factory selected in the constructor
     */
    @VisibleForTesting
    PriorityQueueSetFactory getPriorityQueueFactory() {
        return priorityQueueFactory;
    }

    /**
     * @return the write options shared by this backend (created with the write-ahead log disabled)
     */
    public WriteOptions getWriteOptions() {
        return writeOptions;
    }

    /**
     * Triggers a snapshot of this backend. Pending batched writes are flushed first, then the
     * snapshot is delegated to the savepoint strategy for savepoints and to the checkpoint
     * strategy for everything else.
     *
     * @param checkpointId id of the checkpoint
     * @param timestamp timestamp of the checkpoint
     * @param streamFactory factory for the streams the snapshot is written to
     * @param checkpointOptions options of the checkpoint; the checkpoint type selects the strategy
     * @return the future returned by the chosen snapshot strategy
     * @throws Exception if flushing or the chosen strategy fails
     */
    @Nonnull
    public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(long checkpointId, long timestamp, @Nonnull CheckpointStreamFactory streamFactory, @Nonnull CheckpointOptions checkpointOptions) throws Exception {
        final long syncStartMillis = System.currentTimeMillis();

        // Make sure everything that is still buffered in the write batch reaches the database.
        this.writeBatchWrapper.flush();

        final RocksDBSnapshotStrategyBase<K> strategy;
        if (CheckpointType.SAVEPOINT == checkpointOptions.getCheckpointType()) {
            strategy = this.savepointSnapshotStrategy;
        } else {
            strategy = this.checkpointSnapshotStrategy;
        }

        final RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotFuture = strategy.snapshot(checkpointId, timestamp, streamFactory, checkpointOptions);
        strategy.logSyncCompleted(streamFactory, syncStartMillis);
        return snapshotFuture;
    }

    /**
     * Initializes the backend, either with a fresh database or from the given state handles, and
     * then sets up the snapshot strategies.
     *
     * <p>The type of the first handle decides between incremental and full restore. On any
     * failure the backend is disposed before the exception is rethrown.
     *
     * @param restoreState handles to restore from; {@code null} or empty creates an empty database
     * @throws Exception if creating or restoring the database fails
     */
    public void restore(Collection<KeyedStateHandle> restoreState) throws Exception {
        LOG.info("Initializing RocksDB keyed state backend.");
        if (LOG.isDebugEnabled()) {
            LOG.debug("Restoring snapshot from state handles: {}.", restoreState);
        }

        this.kvStateInformation.clear();

        try {
            RocksDBIncrementalRestoreOperation incrementalRestore = null;
            final boolean nothingToRestore = restoreState == null || restoreState.isEmpty();
            if (nothingToRestore) {
                this.createDB();
            } else {
                final KeyedStateHandle head = restoreState.iterator().next();
                final boolean incremental = head instanceof IncrementalKeyedStateHandle || head instanceof IncrementalLocalKeyedStateHandle;
                if (!incremental) {
                    RocksDBFullRestoreOperation fullRestore = new RocksDBFullRestoreOperation(this);
                    fullRestore.doRestore(restoreState);
                } else {
                    incrementalRestore = new RocksDBIncrementalRestoreOperation(this);
                    incrementalRestore.restore(restoreState);
                }
            }
            this.initializeSnapshotStrategy(incrementalRestore);
        } catch (Exception failure) {
            // Do not leave a half-initialized backend behind.
            this.dispose();
            throw failure;
        }
    }

    /**
     * Creates the snapshot strategies. Savepoints always use the full snapshot strategy.
     * Checkpoints use the incremental strategy when incremental checkpointing is enabled and
     * otherwise share the savepoint strategy instance.
     *
     * @param incrementalRestoreOperation the incremental restore that initialized this backend, or
     *     {@code null} if the backend was not restored incrementally; when present, its backend
     *     UID, SST file map and last completed checkpoint id seed the incremental strategy
     * @throws NullPointerException if the restore operation reports no backend UID or SST files
     * @throws IllegalStateException if the restored last completed checkpoint id is negative
     */
    @VisibleForTesting
    void initializeSnapshotStrategy(@Nullable RocksDBIncrementalRestoreOperation<K> incrementalRestoreOperation) {
        this.savepointSnapshotStrategy = new RocksFullSnapshotStrategy<>(this.db, this.rocksDBResourceGuard, this.keySerializer, this.kvStateInformation, this.keyGroupRange, this.keyGroupPrefixBytes, this.localRecoveryConfig, this.cancelStreamRegistry, this.keyGroupCompressionDecorator);
        if (this.enableIncrementalCheckpointing) {
            final long lastCompletedCheckpointId;
            // Declared as SortedMap: the restored value is only known to be a SortedMap, not a TreeMap.
            final SortedMap<Long, Set<StateHandleID>> materializedSstFiles;
            final UUID backendUID;
            if (incrementalRestoreOperation == null) {
                // Fresh backend: new identity, no materialized files, no completed checkpoint yet.
                backendUID = UUID.randomUUID();
                materializedSstFiles = new TreeMap<>();
                lastCompletedCheckpointId = -1L;
            } else {
                backendUID = Preconditions.checkNotNull(incrementalRestoreOperation.getRestoredBackendUID());
                materializedSstFiles = Preconditions.checkNotNull(incrementalRestoreOperation.getRestoredSstFiles());
                lastCompletedCheckpointId = incrementalRestoreOperation.getLastCompletedCheckpointId();
                Preconditions.checkState(lastCompletedCheckpointId >= 0L);
            }
            this.checkpointSnapshotStrategy = new RocksIncrementalSnapshotStrategy<>(this.db, this.rocksDBResourceGuard, this.keySerializer, this.kvStateInformation, this.keyGroupRange, this.keyGroupPrefixBytes, this.localRecoveryConfig, this.cancelStreamRegistry, this.instanceBasePath, backendUID, materializedSstFiles, lastCompletedCheckpointId);
        } else {
            this.checkpointSnapshotStrategy = this.savepointSnapshotStrategy;
        }
    }

    /**
     * Forwards the completion of a checkpoint to the snapshot strategies.
     *
     * <p>When incremental checkpointing is disabled, both strategy fields refer to the same
     * object; it is notified only once in that case.
     *
     * @param completedCheckpointId id of the completed checkpoint
     * @throws Exception if a strategy fails to process the notification
     */
    public void notifyCheckpointComplete(long completedCheckpointId) throws Exception {
        final RocksDBSnapshotStrategyBase<K> checkpointStrategy = this.checkpointSnapshotStrategy;
        final RocksDBSnapshotStrategyBase<K> savepointStrategy = this.savepointSnapshotStrategy;
        if (checkpointStrategy != null) {
            checkpointStrategy.notifyCheckpointComplete(completedCheckpointId);
        }
        // Identity check on purpose: skip the second call only if it would hit the very same object.
        if (savepointStrategy != null && savepointStrategy != checkpointStrategy) {
            savepointStrategy.notifyCheckpointComplete(completedCheckpointId);
        }
    }

    /**
     * Opens a fresh database that contains only the default column family and creates the write
     * batch wrapper on top of it.
     *
     * @throws IOException if the database cannot be opened
     */
    private void createDB() throws IOException {
        final List<ColumnFamilyHandle> openedHandles = new ArrayList<>(1);
        final String dbPath = this.instanceRocksDBPath.getAbsolutePath();
        this.db = this.openDB(dbPath, Collections.emptyList(), openedHandles);
        this.writeBatchWrapper = new RocksDBWriteBatchWrapper(this.db, this.writeOptions);
        // No state column families were requested, so the first handle is the default column family.
        this.defaultColumnFamily = openedHandles.get(0);
    }

    /**
     * Opens a RocksDB instance at {@code path} with the default column family plus the given
     * state column families, and creates the native metric monitor when metrics are enabled.
     *
     * <p>If anything fails after the database was opened, the opened handles and the database are
     * closed again before the failure is propagated; the caller never sees a reference it would
     * have to clean up.
     *
     * @param path directory of the database
     * @param stateColumnFamilyDescriptors descriptors of the state column families to open
     * @param stateColumnFamilyHandles output list that receives the opened handles; the default
     *     column family's handle comes first
     * @return the opened database
     * @throws IOException if RocksDB fails to open the database
     * @throws IllegalStateException if not every requested column family handle was created
     */
    private RocksDB openDB(String path, List<ColumnFamilyDescriptor> stateColumnFamilyDescriptors, List<ColumnFamilyHandle> stateColumnFamilyHandles) throws IOException {
        RocksDB dbRef;
        ArrayList<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<ColumnFamilyDescriptor>(1 + stateColumnFamilyDescriptors.size());
        // The default column family is always opened and always comes first.
        columnFamilyDescriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, this.columnOptions));
        columnFamilyDescriptors.addAll(stateColumnFamilyDescriptors);
        try {
            dbRef = RocksDB.open(Preconditions.checkNotNull(this.dbOptions), Preconditions.checkNotNull(path), columnFamilyDescriptors, stateColumnFamilyHandles);
        }
        catch (RocksDBException e) {
            throw new IOException("Error while opening RocksDB instance.", e);
        }
        try {
            Preconditions.checkState(1 + stateColumnFamilyDescriptors.size() == stateColumnFamilyHandles.size(), (Object)"Not all requested column family handles have been created");
            if (this.metricOptions.isEnabled()) {
                this.nativeMetricMonitor = new RocksDBNativeMetricMonitor(dbRef, this.metricOptions, this.metricGroup);
            }
        }
        catch (RuntimeException | Error failure) {
            // The database is not yet reachable through this.db, so dispose() could not release it.
            // Handles are closed before the database.
            for (ColumnFamilyHandle handle : stateColumnFamilyHandles) {
                try {
                    handle.close();
                }
                catch (Throwable closeFailure) {
                    failure.addSuppressed(closeFailure);
                }
            }
            try {
                dbRef.close();
            }
            catch (Throwable closeFailure) {
                failure.addSuppressed(closeFailure);
            }
            throw failure;
        }
        return dbRef;
    }

    /**
     * Registers a key/value state. A state that is not known yet gets a new column family and
     * fresh meta information. For a state that is already registered, the existing meta
     * information is updated for the new descriptor and serializers.
     *
     * @param stateDesc descriptor of the state
     * @param namespaceSerializer serializer of the state's namespace
     * @param snapshotTransformer optional transformer applied when snapshotting the state
     * @return the column family handle together with the up-to-date meta information
     * @throws Exception if updating the meta information of an existing state fails
     */
    private <N, S extends State, SV> Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> tryRegisterKvStateInformation(StateDescriptor<S, SV> stateDesc, TypeSerializer<N> namespaceSerializer, @Nullable StateSnapshotTransformer<SV> snapshotTransformer) throws Exception {
        final Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> existing = this.kvStateInformation.get(stateDesc.getName());
        final TypeSerializer<SV> stateSerializer = stateDesc.getSerializer();

        if (existing == null) {
            // Unknown state: create meta information and a column family of its own.
            final RegisteredKeyValueStateBackendMetaInfo<N, SV> freshMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<N, SV>(stateDesc.getType(), stateDesc.getName(), namespaceSerializer, stateSerializer, snapshotTransformer);
            final ColumnFamilyHandle freshColumnFamily = this.createColumnFamily(stateDesc.getName());
            this.registerKvStateInformation(stateDesc.getName(), Tuple2.of(freshColumnFamily, freshMetaInfo));
            return Tuple2.of(freshColumnFamily, freshMetaInfo);
        }

        // Known state: bring the registered meta information up to date.
        @SuppressWarnings("unchecked")
        final RegisteredKeyValueStateBackendMetaInfo<N, SV> previousMetaInfo = (RegisteredKeyValueStateBackendMetaInfo<N, SV>) existing.f1;
        final RegisteredKeyValueStateBackendMetaInfo<N, SV> updatedMetaInfo = this.updateRestoredStateMetaInfo(Tuple2.of(existing.f0, previousMetaInfo), stateDesc, namespaceSerializer, stateSerializer, snapshotTransformer);
        existing.f1 = updatedMetaInfo;
        return Tuple2.of(existing.f0, updatedMetaInfo);
    }

    /**
     * Updates the meta information of an already registered state with a new snapshot
     * transformer, namespace serializer and state serializer, migrating the stored values when
     * the new state serializer requires it.
     *
     * @param oldStateInfo column family handle and currently registered meta information
     * @param stateDesc descriptor the state is being accessed with
     * @param namespaceSerializer new namespace serializer; must be compatible as is
     * @param stateSerializer new state serializer
     * @param snapshotTransformer optional new snapshot transformer
     * @return the updated meta information (the same object that was registered before)
     * @throws StateMigrationException if the namespace serializer is not compatible as is, or the
     *     state serializer is incompatible
     * @throws Exception if checking the meta information or migrating the values fails
     */
    private <N, S extends State, SV> RegisteredKeyValueStateBackendMetaInfo<N, SV> updateRestoredStateMetaInfo(Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> oldStateInfo, StateDescriptor<S, SV> stateDesc, TypeSerializer<N> namespaceSerializer, TypeSerializer<SV> stateSerializer, @Nullable StateSnapshotTransformer<SV> snapshotTransformer) throws Exception {
        final RegisteredKeyValueStateBackendMetaInfo<N, SV> metaInfo = oldStateInfo.f1;
        metaInfo.updateSnapshotTransformer(snapshotTransformer);

        final TypeSerializerSchemaCompatibility<?> namespaceCompatibility = metaInfo.updateNamespaceSerializer(namespaceSerializer);
        if (!namespaceCompatibility.isCompatibleAsIs()) {
            throw new StateMigrationException("The new namespace serializer must be compatible.");
        }

        metaInfo.checkStateMetaInfo(stateDesc);

        final TypeSerializerSchemaCompatibility<?> stateCompatibility = metaInfo.updateStateSerializer(stateSerializer);
        final boolean needsMigration = stateCompatibility.isCompatibleAfterMigration();
        if (!needsMigration && stateCompatibility.isIncompatible()) {
            throw new StateMigrationException("The new state serializer cannot be incompatible.");
        }
        if (needsMigration) {
            this.migrateStateValues(stateDesc, oldStateInfo);
        }
        return metaInfo;
    }

    /**
     * Rewrites every value stored in the state's column family from the previous state serializer's
     * format to the format of the current state serializer. Keys are left unchanged.
     *
     * <p>Map state is rejected up front. For all other state types a state object is created
     * through the matching factory and used to convert each serialized value.
     *
     * <p>Decompiler note (CFR): "Removed try catching itself - possible behaviour change." The
     * try/finally structure below was reconstructed by the decompiler and may differ from the
     * original source.
     *
     * @param stateDesc descriptor of the state whose values are migrated
     * @param stateMetaInfo column family handle and meta information holding the previous and the
     *     current state serializer
     * @throws StateMigrationException if the state is a map state
     * @throws FlinkRuntimeException if the descriptor type is not supported or the created state is
     *     not an {@code AbstractRocksDBState}
     * @throws Exception if creating the state, converting a value or writing it back fails
     */
    private <N, S extends State, SV> void migrateStateValues(StateDescriptor<S, SV> stateDesc, Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> stateMetaInfo) throws Exception {
        if (stateDesc.getType() == StateDescriptor.Type.MAP) {
            throw new StateMigrationException("The new serializer for a MapState requires state migration in order for the job to proceed. However, migration for MapState currently isn't supported.");
        }
        LOG.info("Performing state migration for state {} because the state serializer's schema, i.e. serialization format, has changed.", stateDesc);
        StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass());
        if (stateFactory == null) {
            String message = String.format("State %s is not supported by %s", stateDesc.getClass(), ((Object)((Object)this)).getClass());
            throw new FlinkRuntimeException(message);
        }
        // The state object is only needed for its migrateSerializedValue implementation.
        Object state = stateFactory.createState(stateDesc, stateMetaInfo, this);
        if (!(state instanceof AbstractRocksDBState)) {
            throw new FlinkRuntimeException("State should be an AbstractRocksDBState but is " + state);
        }
        AbstractRocksDBState rocksDBState = (AbstractRocksDBState)state;
        // NOTE(review): the snapshot is taken and released here but is not handed to the iterator
        // in the visible code - confirm whether the iterator is meant to read from it.
        Snapshot rocksDBSnapshot = this.db.getSnapshot();
        try (RocksIteratorWrapper iterator = RocksDBKeyedStateBackend.getRocksIterator(this.db, (ColumnFamilyHandle)stateMetaInfo.f0);
             RocksDBWriteBatchWrapper batchWriter = new RocksDBWriteBatchWrapper(this.db, this.getWriteOptions());){
            iterator.seekToFirst();
            // Both buffers are reused for every entry.
            DataInputDeserializer serializedValueInput = new DataInputDeserializer();
            DataOutputSerializer migratedSerializedValueOutput = new DataOutputSerializer(512);
            while (iterator.isValid()) {
                serializedValueInput.setBuffer(iterator.value());
                rocksDBState.migrateSerializedValue(serializedValueInput, migratedSerializedValueOutput, ((RegisteredKeyValueStateBackendMetaInfo)stateMetaInfo.f1).getPreviousStateSerializer(), ((RegisteredKeyValueStateBackendMetaInfo)stateMetaInfo.f1).getStateSerializer());
                // Write the converted value back under the same key.
                batchWriter.put((ColumnFamilyHandle)stateMetaInfo.f0, iterator.key(), migratedSerializedValueOutput.getCopyOfBuffer());
                migratedSerializedValueOutput.clear();
                iterator.next();
            }
        }
        finally {
            this.db.releaseSnapshot(rocksDBSnapshot);
            rocksDBSnapshot.close();
        }
    }

    /**
     * Creates a column family for the given state name.
     *
     * @param stateName name of the state; must not encode to the name of the default column family
     * @return handle of the newly created column family
     * @throws IllegalStateException if the name collides with the default column family
     * @throws FlinkRuntimeException if RocksDB fails to create the column family
     */
    private ColumnFamilyHandle createColumnFamily(String stateName) {
        final byte[] encodedName = stateName.getBytes(ConfigConstants.DEFAULT_CHARSET);
        final boolean collidesWithDefault = Arrays.equals(RocksDB.DEFAULT_COLUMN_FAMILY, encodedName);
        Preconditions.checkState(!collidesWithDefault, (Object)"The chosen state name 'default' collides with the name of the default column family!");

        final ColumnFamilyDescriptor descriptor = new ColumnFamilyDescriptor(encodedName, this.columnOptions);
        final ColumnFamilyHandle handle;
        try {
            handle = this.db.createColumnFamily(descriptor);
        } catch (RocksDBException e) {
            throw new FlinkRuntimeException("Error creating ColumnFamilyHandle.", (Throwable)e);
        }
        return handle;
    }

    /**
     * Creates the RocksDB-backed state object for a state descriptor, registering the state with
     * this backend on the way.
     *
     * @param namespaceSerializer serializer of the state's namespace
     * @param stateDesc descriptor of the state to create
     * @param snapshotTransformFactory factory for the optional snapshot transformer
     * @return the created state
     * @throws FlinkRuntimeException if no factory exists for the descriptor's class
     * @throws Exception if registering or creating the state fails
     */
    @Nonnull
    public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(@Nonnull TypeSerializer<N> namespaceSerializer, @Nonnull StateDescriptor<S, SV> stateDesc, @Nonnull StateSnapshotTransformer.StateSnapshotTransformFactory<SEV> snapshotTransformFactory) throws Exception {
        final StateFactory factory = STATE_FACTORIES.get(stateDesc.getClass());
        if (factory != null) {
            final StateSnapshotTransformer<SV> transformer = this.getStateSnapshotTransformer(stateDesc, snapshotTransformFactory);
            final Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> registration = this.tryRegisterKvStateInformation(stateDesc, namespaceSerializer, transformer);
            return factory.createState(stateDesc, registration, this);
        }
        throw new FlinkRuntimeException(String.format("State %s is not supported by %s", stateDesc.getClass(), ((Object)this).getClass()));
    }

    /**
     * Picks the snapshot transformer that matches the kind of state: list state wraps a
     * transformer for deserialized elements, map state wraps a transformer for serialized values,
     * and every other state uses the transformer for serialized values directly.
     *
     * @param stateDesc descriptor of the state
     * @param snapshotTransformFactory factory the transformer is obtained from
     * @return the transformer, or {@code null} if the factory provides none
     */
    @SuppressWarnings("unchecked")
    private <SV, SEV> StateSnapshotTransformer<SV> getStateSnapshotTransformer(StateDescriptor<?, SV> stateDesc, StateSnapshotTransformer.StateSnapshotTransformFactory<SEV> snapshotTransformFactory) {
        if (stateDesc instanceof ListStateDescriptor) {
            final Optional<StateSnapshotTransformer<SEV>> elementTransformer = snapshotTransformFactory.createForDeserializedState();
            return elementTransformer.map(est -> this.createRocksDBListStateTransformer(stateDesc, est)).orElse(null);
        }

        final boolean isMapState = stateDesc instanceof MapStateDescriptor;
        final Optional<StateSnapshotTransformer<byte[]>> serializedTransformer = snapshotTransformFactory.createForSerializedState();
        if (isMapState) {
            return (StateSnapshotTransformer<SV>) serializedTransformer.map(RocksDBMapState.StateSnapshotTransformerWrapper::new).orElse(null);
        }
        return (StateSnapshotTransformer<SV>) serializedTransformer.orElse(null);
    }

    /**
     * Wraps an element transformer so that it can be applied to RocksDB list state.
     *
     * @param stateDesc descriptor of the state; must be a {@code ListStateDescriptor}
     * @param elementTransformer transformer for the individual list elements
     * @return the list state transformer wrapping {@code elementTransformer}
     */
    @SuppressWarnings("unchecked")
    private <SV, SEV> StateSnapshotTransformer<SV> createRocksDBListStateTransformer(StateDescriptor<?, SV> stateDesc, StateSnapshotTransformer<SEV> elementTransformer) {
        final RocksDBListState.StateSnapshotTransformerWrapper<SEV> listTransformer = new RocksDBListState.StateSnapshotTransformerWrapper<SEV>(elementTransformer, ((ListStateDescriptor<SEV>) stateDesc).getElementSerializer());
        return (StateSnapshotTransformer<SV>) listTransformer;
    }

    /**
     * @return the base directory of this backend instance
     */
    public File getInstanceBasePath() {
        return instanceBasePath;
    }

    /**
     * @return always {@code true}
     */
    public boolean supportsAsynchronousSnapshots() {
        final boolean asynchronousSnapshotsSupported = true;
        return asynchronousSnapshotsSupported;
    }

    /**
     * Counts the entries of all registered column families by iterating over each of them.
     *
     * <p>Every iterator is closed again, and a failure while iterating is propagated to the
     * caller instead of being dropped.
     *
     * @return the total number of entries across all registered column families
     */
    @VisibleForTesting
    public int numKeyValueStateEntries() {
        int count = 0;
        for (Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> column : this.kvStateInformation.values()) {
            // try-with-resources releases the native iterator on both the normal and the failure path.
            try (RocksIteratorWrapper rocksIterator = RocksDBKeyedStateBackend.getRocksIterator(this.db, (ColumnFamilyHandle)column.f0)) {
                rocksIterator.seekToFirst();
                while (rocksIterator.isValid()) {
                    ++count;
                    rocksIterator.next();
                }
            }
        }
        return count;
    }

    /**
     * Creates a new iterator on the given database (no column family is specified) and wraps it
     * in a {@code RocksIteratorWrapper}.
     *
     * @param db the database to iterate over
     * @return the wrapped iterator
     */
    public static RocksIteratorWrapper getRocksIterator(RocksDB db) {
        return new RocksIteratorWrapper(db.newIterator());
    }

    /**
     * Creates a new iterator over the given column family and wraps it in a
     * {@code RocksIteratorWrapper}.
     *
     * @param db the database to iterate over
     * @param columnFamilyHandle handle of the column family to iterate over
     * @return the wrapped iterator
     */
    public static RocksIteratorWrapper getRocksIterator(RocksDB db, ColumnFamilyHandle columnFamilyHandle) {
        return new RocksIteratorWrapper(db.newIterator(columnFamilyHandle));
    }

    /**
     * Looks up, or creates and registers, the column family and meta info for a priority queue state.
     *
     * <p>If nothing is registered under {@code stateName}, a column family is created and registered
     * together with fresh meta info. If an entry exists and its previous element serializer is not
     * the very same instance as {@code byteOrderedElementSerializer}, the serializer is updated on
     * the existing meta info and, unless the result is incompatible, the entry's meta info is
     * replaced by a new one built from the given serializer.
     *
     * @param stateName name under which the priority queue state is registered
     * @param byteOrderedElementSerializer serializer for the queue elements
     * @return the registered (column family, meta info) tuple for {@code stateName}
     * @throws FlinkRuntimeException wrapping a {@code StateMigrationException} if the serializer update reports incompatibility
     */
    @Nonnull
    private <T> Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> tryRegisterPriorityQueueMetaInfo(@Nonnull String stateName, @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
        Tuple2 registeredEntry = this.kvStateInformation.get(stateName);
        if (registeredEntry == null) {
            // Nothing known under this name yet: create the column family and register it.
            ColumnFamilyHandle newColumnFamily = this.createColumnFamily(stateName);
            RegisteredPriorityQueueStateBackendMetaInfo newMetaInfo = new RegisteredPriorityQueueStateBackendMetaInfo(stateName, byteOrderedElementSerializer);
            Tuple2 createdEntry = new Tuple2((Object)newColumnFamily, (Object)newMetaInfo);
            this.registerKvStateInformation(stateName, (Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>)createdEntry);
            return createdEntry;
        }

        RegisteredPriorityQueueStateBackendMetaInfo knownMetaInfo = (RegisteredPriorityQueueStateBackendMetaInfo)registeredEntry.f1;
        if (knownMetaInfo.getPreviousElementSerializer() == byteOrderedElementSerializer) {
            // Same serializer instance as before: the existing entry is used as-is.
            return registeredEntry;
        }

        TypeSerializerSchemaCompatibility compatibility = knownMetaInfo.updateElementSerializer(byteOrderedElementSerializer);
        if (compatibility.isIncompatible()) {
            throw new FlinkRuntimeException((Throwable)new StateMigrationException("The new priority queue serializer must not be incompatible."));
        }
        registeredEntry.f1 = new RegisteredPriorityQueueStateBackendMetaInfo(stateName, byteOrderedElementSerializer);
        return registeredEntry;
    }

    /**
     * Reports whether timers need the legacy synchronous snapshot path.
     *
     * @return {@code true} exactly when {@code priorityQueueFactory} is a {@code HeapPriorityQueueSetFactory}
     */
    public boolean requiresLegacySynchronousTimerSnapshots() {
        return this.priorityQueueFactory instanceof HeapPriorityQueueSetFactory;
    }

    /**
     * Factory that creates key-group partitioned priority queues whose per-key-group partitions are
     * {@link RocksDBCachingPriorityQueueSet}s operating on the enclosing backend's RocksDB instance.
     *
     * <p>All partitions created by one factory share the same output and input views.
     */
    class RocksDBPriorityQueueSetFactory
    implements PriorityQueueSetFactory {
        /** Size passed to the {@code TreeOrderedSetCache} of every partition. */
        private static final int DEFAULT_CACHES_SIZE = 128;
        /** Output view shared by all partitions created through this factory. */
        @Nonnull
        private final DataOutputSerializer sharedElementOutView;
        /** Input view shared by all partitions created through this factory. */
        @Nonnull
        private final DataInputDeserializer sharedElementInView;

        RocksDBPriorityQueueSetFactory() {
            this.sharedElementOutView = new DataOutputSerializer(128);
            this.sharedElementInView = new DataInputDeserializer();
        }

        /**
         * Registers (or looks up) the priority queue state and returns a queue partitioned by key group.
         *
         * @param stateName name of the priority queue state
         * @param byteOrderedElementSerializer serializer for the queue elements
         * @return a key-group partitioned priority queue for the backend's key group range
         */
        @Nonnull
        public <T extends HeapPriorityQueueElement & PriorityComparable> KeyGroupedInternalPriorityQueue<T> create(@Nonnull String stateName, final @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
            final ColumnFamilyHandle queueColumnFamily = (ColumnFamilyHandle)RocksDBKeyedStateBackend.this.tryRegisterPriorityQueueMetaInfo(stateName, byteOrderedElementSerializer).f0;

            // One RocksDB-backed set with its own ordered cache is created per key group.
            KeyGroupPartitionedPriorityQueue.PartitionQueueSetFactory<T, RocksDBCachingPriorityQueueSet<T>> partitionFactory = new KeyGroupPartitionedPriorityQueue.PartitionQueueSetFactory<T, RocksDBCachingPriorityQueueSet<T>>(){

                @Nonnull
                public RocksDBCachingPriorityQueueSet<T> create(int keyGroupId, int numKeyGroups, @Nonnull KeyExtractorFunction<T> keyExtractor, @Nonnull PriorityComparator<T> elementPriorityComparator) {
                    TreeOrderedSetCache partitionCache = new TreeOrderedSetCache(DEFAULT_CACHES_SIZE);
                    return new RocksDBCachingPriorityQueueSet(keyGroupId, RocksDBKeyedStateBackend.this.keyGroupPrefixBytes, RocksDBKeyedStateBackend.this.db, queueColumnFamily, byteOrderedElementSerializer, RocksDBPriorityQueueSetFactory.this.sharedElementOutView, RocksDBPriorityQueueSetFactory.this.sharedElementInView, RocksDBKeyedStateBackend.this.writeBatchWrapper, partitionCache);
                }
            };

            return new KeyGroupPartitionedPriorityQueue(KeyExtractorFunction.forKeyedObjects(), PriorityComparator.forPriorityComparableObjects(), partitionFactory, RocksDBKeyedStateBackend.this.keyGroupRange, RocksDBKeyedStateBackend.this.numberOfKeyGroups);
        }
    }

    private static class RocksDBIncrementalRestoreOperation<T> {
        private final RocksDBKeyedStateBackend<T> stateBackend;
        private final SortedMap<Long, Set<StateHandleID>> restoredSstFiles;
        private UUID restoredBackendUID;
        private long lastCompletedCheckpointId;

        /**
         * Creates a restore operation for the given backend with an empty, sorted map of restored sst files.
         *
         * @param stateBackend the backend that this operation restores into
         */
        private RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<T> stateBackend) {
            this.stateBackend = stateBackend;
            this.restoredSstFiles = new TreeMap<Long, Set<StateHandleID>>();
        }

        /**
         * Returns the internal map from checkpoint id to the shared state handle ids recorded during restore.
         * The map itself is returned, not a copy.
         */
        SortedMap<Long, Set<StateHandleID>> getRestoredSstFiles() {
            return this.restoredSstFiles;
        }

        /**
         * Returns the backend UID set by the restore; {@code null} if no restore path has assigned it yet.
         */
        UUID getRestoredBackendUID() {
            return this.restoredBackendUID;
        }

        /**
         * Returns the checkpoint id recorded by {@code restoreLocalStateIntoFullInstance}; it keeps the
         * field's default value when that method has not run.
         */
        long getLastCompletedCheckpointId() {
            return this.lastCompletedCheckpointId;
        }

        /**
         * Restores the backend from the given state handles, choosing between the rescaling and the
         * non-rescaling path.
         *
         * <p>The rescaling path is taken when more than one handle is given, or when the single
         * handle's key group range differs from the backend's range. An empty collection is a no-op.
         *
         * @param restoreStateHandles handles to restore from
         * @throws Exception if the selected restore path fails
         */
        void restore(Collection<KeyedStateHandle> restoreStateHandles) throws Exception {
            if (restoreStateHandles.isEmpty()) {
                return;
            }
            KeyedStateHandle firstHandle = restoreStateHandles.iterator().next();
            if (restoreStateHandles.size() > 1 || !Objects.equals(firstHandle.getKeyGroupRange(), ((RocksDBKeyedStateBackend)this.stateBackend).keyGroupRange)) {
                this.restoreWithRescaling(restoreStateHandles);
            } else {
                this.restoreWithoutRescaling(firstHandle);
            }
        }

        /**
         * Restores the backend from a single state handle whose key group range matches the backend.
         *
         * <p>An {@code IncrementalKeyedStateHandle} is first transferred into a temporary directory
         * below the instance base path and then treated as local state; an
         * {@code IncrementalLocalKeyedStateHandle} is used directly. The temporary directory is
         * deleted afterwards if it exists, whether or not the restore succeeded.
         *
         * @param rawStateHandle the handle to restore from
         * @throws IllegalStateException if the handle is of neither supported type
         * @throws Exception if transferring, reading meta data or opening the database fails
         */
        void restoreWithoutRescaling(KeyedStateHandle rawStateHandle) throws Exception {
            Path scratchDirectory = new Path(((RocksDBKeyedStateBackend)this.stateBackend).instanceBasePath.getAbsolutePath(), UUID.randomUUID().toString());
            try {
                IncrementalLocalKeyedStateHandle localHandle;
                List<StateMetaInfoSnapshot> metaInfoSnapshots;
                List<ColumnFamilyDescriptor> descriptors;
                if (rawStateHandle instanceof IncrementalKeyedStateHandle) {
                    // Incremental state handle: transfer its files first, then wrap them as a local handle.
                    IncrementalKeyedStateHandle remoteHandle = (IncrementalKeyedStateHandle)rawStateHandle;
                    this.transferAllStateDataToDirectory(remoteHandle, scratchDirectory);
                    metaInfoSnapshots = this.readMetaData(remoteHandle.getMetaStateHandle());
                    descriptors = this.createAndRegisterColumnFamilyDescriptors(metaInfoSnapshots);
                    localHandle = new IncrementalLocalKeyedStateHandle(remoteHandle.getBackendIdentifier(), remoteHandle.getCheckpointId(), new DirectoryStateHandle(scratchDirectory), remoteHandle.getKeyGroupRange(), remoteHandle.getMetaStateHandle(), remoteHandle.getSharedState().keySet());
                } else if (rawStateHandle instanceof IncrementalLocalKeyedStateHandle) {
                    // Local handle: only the meta data has to be read.
                    localHandle = (IncrementalLocalKeyedStateHandle)rawStateHandle;
                    metaInfoSnapshots = this.readMetaData(localHandle.getMetaDataState());
                    descriptors = this.createAndRegisterColumnFamilyDescriptors(metaInfoSnapshots);
                } else {
                    throw new IllegalStateException("Unexpected state handle type, expected " + IncrementalKeyedStateHandle.class + " or " + IncrementalLocalKeyedStateHandle.class + ", but found " + rawStateHandle.getClass());
                }
                this.restoreLocalStateIntoFullInstance(localHandle, descriptors, metaInfoSnapshots);
            }
            finally {
                FileSystem scratchFileSystem = scratchDirectory.getFileSystem();
                if (scratchFileSystem.exists(scratchDirectory)) {
                    scratchFileSystem.delete(scratchDirectory, true);
                }
            }
        }

        /**
         * Restores the backend from several state handles and/or a changed key group range.
         *
         * <p>The target database is initialised first via {@code initTargetDB}. Every handle that
         * remains in {@code restoreStateHandles} afterwards is then opened as a temporary RocksDB
         * instance, and all of its entries from the backend's start key group up to (excluding)
         * end key group + 1 are copied into the target database through a write batch.
         *
         * <p>Each temporary instance lives in a fresh directory below the instance base path and is
         * deleted once its handle has been processed. Failures while copying or closing are
         * propagated to the caller.
         *
         * @param restoreStateHandles handles to restore from; {@code initTargetDB} may remove one element
         * @throws IllegalStateException if a handle is not an {@code IncrementalKeyedStateHandle}
         * @throws Exception if initialising the target database or copying data fails
         */
        void restoreWithRescaling(Collection<KeyedStateHandle> restoreStateHandles) throws Exception {
            this.restoredBackendUID = UUID.randomUUID();
            this.initTargetDB(restoreStateHandles, ((RocksDBKeyedStateBackend)this.stateBackend).keyGroupRange);

            // Serialized key-group prefixes bounding the range [start, end + 1) owned by this backend.
            byte[] startKeyGroupPrefixBytes = new byte[((RocksDBKeyedStateBackend)this.stateBackend).keyGroupPrefixBytes];
            RocksDBKeySerializationUtils.serializeKeyGroup(this.stateBackend.getKeyGroupRange().getStartKeyGroup(), startKeyGroupPrefixBytes);
            byte[] stopKeyGroupPrefixBytes = new byte[((RocksDBKeyedStateBackend)this.stateBackend).keyGroupPrefixBytes];
            RocksDBKeySerializationUtils.serializeKeyGroup(this.stateBackend.getKeyGroupRange().getEndKeyGroup() + 1, stopKeyGroupPrefixBytes);

            for (KeyedStateHandle rawStateHandle : restoreStateHandles) {
                if (!(rawStateHandle instanceof IncrementalKeyedStateHandle)) {
                    throw new IllegalStateException("Unexpected state handle type, expected " + IncrementalKeyedStateHandle.class + ", but found " + rawStateHandle.getClass());
                }
                // Parent/child constructor keeps the temporary directory inside the instance base path,
                // consistent with restoreWithoutRescaling.
                Path temporaryRestoreInstancePath = new Path(((RocksDBKeyedStateBackend)this.stateBackend).instanceBasePath.getAbsolutePath(), UUID.randomUUID().toString());
                // Resources close in reverse order: the write batch first, then the temporary database.
                try (RestoredDBInstance tmpRestoreDBInfo = this.restoreDBInstanceFromStateHandle((IncrementalKeyedStateHandle)rawStateHandle, temporaryRestoreInstancePath);
                     RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(this.stateBackend.db)) {
                    List<ColumnFamilyDescriptor> tmpColumnFamilyDescriptors = tmpRestoreDBInfo.columnFamilyDescriptors;
                    List<ColumnFamilyHandle> tmpColumnFamilyHandles = tmpRestoreDBInfo.columnFamilyHandles;
                    for (int i = 0; i < tmpColumnFamilyDescriptors.size(); ++i) {
                        ColumnFamilyHandle tmpColumnFamilyHandle = tmpColumnFamilyHandles.get(i);
                        ColumnFamilyDescriptor tmpColumnFamilyDescriptor = tmpColumnFamilyDescriptors.get(i);
                        ColumnFamilyHandle targetColumnFamilyHandle = this.getOrRegisterColumnFamilyHandle(tmpColumnFamilyDescriptor, null, tmpRestoreDBInfo.stateMetaInfoSnapshots.get(i));
                        try (RocksIteratorWrapper iterator = RocksDBKeyedStateBackend.getRocksIterator(tmpRestoreDBInfo.db, tmpColumnFamilyHandle)) {
                            iterator.seek(startKeyGroupPrefixBytes);
                            while (iterator.isValid() && RocksDBIncrementalCheckpointUtils.beforeThePrefixBytes(iterator.key(), stopKeyGroupPrefixBytes)) {
                                writeBatchWrapper.put(targetColumnFamilyHandle, iterator.key(), iterator.value());
                                iterator.next();
                            }
                        }
                    }
                }
                finally {
                    FileSystem restoreFileSystem = temporaryRestoreInstancePath.getFileSystem();
                    if (restoreFileSystem.exists(temporaryRestoreInstancePath)) {
                        restoreFileSystem.delete(temporaryRestoreInstancePath, true);
                    }
                }
            }
        }

        /**
         * Transfers the state of the given handle into a directory and opens it as a RocksDB instance.
         *
         * @param restoreStateHandle handle whose shared and private files are transferred
         * @param temporaryRestoreInstancePath directory that receives the files and is opened as a database
         * @return the opened database together with its column family handles, descriptors and meta info snapshots
         * @throws Exception if transferring the data, reading the meta data or opening the database fails
         */
        private RestoredDBInstance restoreDBInstanceFromStateHandle(IncrementalKeyedStateHandle restoreStateHandle, Path temporaryRestoreInstancePath) throws Exception {
            this.transferAllStateDataToDirectory(restoreStateHandle, temporaryRestoreInstancePath);

            List<StateMetaInfoSnapshot> metaInfoSnapshots = this.readMetaData(restoreStateHandle.getMetaStateHandle());
            List<ColumnFamilyDescriptor> descriptors = this.createAndRegisterColumnFamilyDescriptors(metaInfoSnapshots);

            // Sized for one handle per restored state plus one extra slot.
            List<ColumnFamilyHandle> handles = new ArrayList<ColumnFamilyHandle>(metaInfoSnapshots.size() + 1);
            RocksDB openedDb = ((RocksDBKeyedStateBackend)this.stateBackend).openDB(temporaryRestoreInstancePath.getPath(), descriptors, handles);
            return new RestoredDBInstance(openedDb, handles, descriptors, metaInfoSnapshots);
        }

        /**
         * Returns the column family handle registered for the snapshot's state name, registering one
         * first if the backend has no entry for that name.
         *
         * @param columnFamilyDescriptor descriptor used to create a column family when no handle is supplied
         * @param columnFamilyHandle handle to register for a new entry; when {@code null}, a new column
         *        family is created in the backend's database instead
         * @param stateMetaInfoSnapshot snapshot providing the state name and the meta info to register
         * @return the handle registered under the state name
         * @throws RocksDBException if creating the column family fails
         */
        private ColumnFamilyHandle getOrRegisterColumnFamilyHandle(ColumnFamilyDescriptor columnFamilyDescriptor, ColumnFamilyHandle columnFamilyHandle, StateMetaInfoSnapshot stateMetaInfoSnapshot) throws RocksDBException {
            String stateName = stateMetaInfoSnapshot.getName();
            Tuple2 knownEntry = (Tuple2)((RocksDBKeyedStateBackend)this.stateBackend).kvStateInformation.get(stateName);
            if (knownEntry != null) {
                return (ColumnFamilyHandle)knownEntry.f0;
            }

            RegisteredStateMetaInfoBase restoredMetaInfo = RegisteredStateMetaInfoBase.fromMetaInfoSnapshot((StateMetaInfoSnapshot)stateMetaInfoSnapshot);
            ColumnFamilyHandle handleToRegister = columnFamilyHandle;
            if (handleToRegister == null) {
                handleToRegister = this.stateBackend.db.createColumnFamily(columnFamilyDescriptor);
            }
            Tuple2 newEntry = new Tuple2((Object)handleToRegister, (Object)restoredMetaInfo);
            ((RocksDBKeyedStateBackend)this.stateBackend).registerKvStateInformation(stateName, (Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>)newEntry);
            return (ColumnFamilyHandle)newEntry.f0;
        }

        /**
         * Initialises the backend's database for a rescaling restore.
         *
         * <p>If {@code chooseTheBestStateHandleForInitial} selects a handle, that handle is removed
         * from {@code restoreStateHandles}, restored directly into the instance RocksDB path, clipped
         * to {@code targetKeyGroupRange} and installed as the backend's database; its column families
         * are registered with the backend. If that fails, the restored instance is closed, the
         * instance path is deleted if it exists, and the exception is rethrown. If no handle is
         * selected, a database with no additional column families is opened at the instance path.
         *
         * @param restoreStateHandles candidate handles; the chosen initial handle is removed from this collection
         * @param targetKeyGroupRange key group range the target database is responsible for
         * @throws Exception if restoring, clipping or opening the database fails
         */
        private void initTargetDB(Collection<KeyedStateHandle> restoreStateHandles, KeyGroupRange targetKeyGroupRange) throws Exception {
            IncrementalKeyedStateHandle initialHandle = (IncrementalKeyedStateHandle)RocksDBIncrementalCheckpointUtils.chooseTheBestStateHandleForInitial(restoreStateHandles, targetKeyGroupRange);

            if (initialHandle == null) {
                // No handle to start from: open a database that has only its first (default) column family.
                List<ColumnFamilyHandle> defaultOnlyHandles = new ArrayList<ColumnFamilyHandle>(1);
                this.stateBackend.db = ((RocksDBKeyedStateBackend)this.stateBackend).openDB(((RocksDBKeyedStateBackend)this.stateBackend).instanceRocksDBPath.getAbsolutePath(), Collections.emptyList(), defaultOnlyHandles);
                ((RocksDBKeyedStateBackend)this.stateBackend).defaultColumnFamily = defaultOnlyHandles.get(0);
                ((RocksDBKeyedStateBackend)this.stateBackend).writeBatchWrapper = new RocksDBWriteBatchWrapper(this.stateBackend.db, ((RocksDBKeyedStateBackend)this.stateBackend).writeOptions);
                return;
            }

            restoreStateHandles.remove(initialHandle);
            Path instancePath = new Path(((RocksDBKeyedStateBackend)this.stateBackend).instanceRocksDBPath.getAbsolutePath());
            RestoredDBInstance restoredInstance = null;
            try {
                restoredInstance = this.restoreDBInstanceFromStateHandle(initialHandle, instancePath);
                RocksDBIncrementalCheckpointUtils.clipDBWithKeyGroupRange(restoredInstance.db, restoredInstance.columnFamilyHandles, targetKeyGroupRange, initialHandle.getKeyGroupRange(), ((RocksDBKeyedStateBackend)this.stateBackend).keyGroupPrefixBytes);

                // Install the restored instance as the backend's working database.
                this.stateBackend.db = restoredInstance.db;
                ((RocksDBKeyedStateBackend)this.stateBackend).defaultColumnFamily = restoredInstance.defaultColumnFamilyHandle;
                ((RocksDBKeyedStateBackend)this.stateBackend).writeBatchWrapper = new RocksDBWriteBatchWrapper(this.stateBackend.db, ((RocksDBKeyedStateBackend)this.stateBackend).writeOptions);

                int restoredStateCount = restoredInstance.stateMetaInfoSnapshots.size();
                for (int index = 0; index < restoredStateCount; ++index) {
                    this.getOrRegisterColumnFamilyHandle((ColumnFamilyDescriptor)restoredInstance.columnFamilyDescriptors.get(index), (ColumnFamilyHandle)restoredInstance.columnFamilyHandles.get(index), (StateMetaInfoSnapshot)restoredInstance.stateMetaInfoSnapshots.get(index));
                }
            }
            catch (Exception e) {
                if (restoredInstance != null) {
                    restoredInstance.close();
                }
                FileSystem instanceFileSystem = instancePath.getFileSystem();
                if (instanceFileSystem.exists(instancePath)) {
                    instanceFileSystem.delete(instancePath, true);
                }
                throw e;
            }
        }

        /**
         * Creates one column family descriptor per meta info snapshot, named after the snapshot's state
         * name (encoded with {@code ConfigConstants.DEFAULT_CHARSET}) and using the backend's column options.
         *
         * <p>Despite the method name, nothing is registered here; the descriptors are only created.
         *
         * @param stateMetaInfoSnapshots snapshots to create descriptors for
         * @return a new list with the descriptors in the same order as the snapshots
         */
        private List<ColumnFamilyDescriptor> createAndRegisterColumnFamilyDescriptors(List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) {
            return stateMetaInfoSnapshots.stream()
                .map(snapshot -> new ColumnFamilyDescriptor(snapshot.getName().getBytes(ConfigConstants.DEFAULT_CHARSET), ((RocksDBKeyedStateBackend)this.stateBackend).columnOptions))
                .collect(Collectors.toCollection(ArrayList::new));
        }

        /**
         * Restores a local incremental state handle into the backend's RocksDB instance directory and
         * opens the database from it.
         *
         * <p>Steps, in order: adopt the handle's backend UID, create the instance directory, link/copy
         * the handle's files into it, open the database, install the default column family and write
         * batch wrapper, register every restored state, and record the handle's checkpoint id and
         * shared state handle ids.
         *
         * @param restoreStateHandle local handle pointing at the directory to restore from
         * @param columnFamilyDescriptors descriptors of the column families to open
         * @param stateMetaInfoSnapshots meta info snapshots, index-aligned with the descriptors
         * @throws IOException if the instance directory could not be created by {@code mkdirs()}
         * @throws Exception if copying the files or opening the database fails
         */
        private void restoreLocalStateIntoFullInstance(IncrementalLocalKeyedStateHandle restoreStateHandle, List<ColumnFamilyDescriptor> columnFamilyDescriptors, List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) throws Exception {
            this.restoredBackendUID = restoreStateHandle.getBackendIdentifier();
            LOG.debug("Restoring keyed backend uid in operator {} from incremental snapshot to {}.", (Object)((RocksDBKeyedStateBackend)this.stateBackend).operatorIdentifier, (Object)this.restoredBackendUID);

            boolean directoryCreated = ((RocksDBKeyedStateBackend)this.stateBackend).instanceRocksDBPath.mkdirs();
            if (!directoryCreated) {
                throw new IOException("Could not create RocksDB data directory.");
            }
            this.restoreInstanceDirectoryFromPath(restoreStateHandle.getDirectoryStateHandle().getDirectory());

            List<ColumnFamilyHandle> openedHandles = new ArrayList<ColumnFamilyHandle>(1 + columnFamilyDescriptors.size());
            this.stateBackend.db = ((RocksDBKeyedStateBackend)this.stateBackend).openDB(((RocksDBKeyedStateBackend)this.stateBackend).instanceRocksDBPath.getAbsolutePath(), columnFamilyDescriptors, openedHandles);
            // The first handle is taken as the default column family; the remaining ones line up with the descriptors.
            ((RocksDBKeyedStateBackend)this.stateBackend).defaultColumnFamily = openedHandles.remove(0);
            ((RocksDBKeyedStateBackend)this.stateBackend).writeBatchWrapper = new RocksDBWriteBatchWrapper(this.stateBackend.db, ((RocksDBKeyedStateBackend)this.stateBackend).writeOptions);

            for (int index = 0; index < columnFamilyDescriptors.size(); ++index) {
                StateMetaInfoSnapshot snapshot = stateMetaInfoSnapshots.get(index);
                ColumnFamilyHandle handle = openedHandles.get(index);
                RegisteredStateMetaInfoBase metaInfo = RegisteredStateMetaInfoBase.fromMetaInfoSnapshot((StateMetaInfoSnapshot)snapshot);
                Tuple2 entry = new Tuple2((Object)handle, (Object)metaInfo);
                ((RocksDBKeyedStateBackend)this.stateBackend).registerKvStateInformation(snapshot.getName(), (Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>)entry);
            }

            long restoredCheckpointId = restoreStateHandle.getCheckpointId();
            this.restoredSstFiles.put(restoredCheckpointId, restoreStateHandle.getSharedStateHandleIDs());
            this.lastCompletedCheckpointId = restoreStateHandle.getCheckpointId();
        }

        /**
         * Populates the backend's RocksDB instance directory from the files found in {@code source}.
         *
         * <p>Files whose name ends with {@code .sst} are hard-linked into the instance directory; all
         * other files are copied, replacing an existing target.
         *
         * @param source directory to restore from
         * @throws IOException if the directory cannot be listed, or linking/copying a file fails
         */
        private void restoreInstanceDirectoryFromPath(Path source) throws IOException {
            FileStatus[] sourceEntries = source.getFileSystem().listStatus(source);
            if (sourceEntries == null) {
                throw new IOException("Cannot list file statues. Directory " + source + " does not exist.");
            }
            for (FileStatus entry : sourceEntries) {
                String fileName = entry.getPath().getName();
                java.nio.file.Path from = new File(source.getPath(), fileName).toPath();
                java.nio.file.Path to = new File(((RocksDBKeyedStateBackend)this.stateBackend).instanceRocksDBPath.getPath(), fileName).toPath();
                if (fileName.endsWith(".sst")) {
                    Files.createLink(to, from);
                } else {
                    Files.copy(from, to, StandardCopyOption.REPLACE_EXISTING);
                }
            }
        }

        /**
         * Reads the keyed backend meta data from the given handle and returns its state meta info snapshots.
         *
         * <p>The input stream is registered with the backend's {@code cancelStreamRegistry} while it is
         * read, and is closed here only if this method successfully unregisters it again.
         *
         * @param metaStateHandle handle to the serialized meta data
         * @return the state meta info snapshots read from the handle
         * @throws StateMigrationException if the restored key serializer snapshot is not compatible as-is with the backend's key serializer
         * @throws Exception if opening or reading the stream fails
         */
        private List<StateMetaInfoSnapshot> readMetaData(StreamStateHandle metaStateHandle) throws Exception {
            FSDataInputStream metaIn = null;
            try {
                metaIn = metaStateHandle.openInputStream();
                ((RocksDBKeyedStateBackend)this.stateBackend).cancelStreamRegistry.registerCloseable((Closeable)metaIn);

                KeyedBackendSerializationProxy proxy = new KeyedBackendSerializationProxy(((RocksDBKeyedStateBackend)this.stateBackend).userCodeClassLoader);
                proxy.read((DataInputView)new DataInputViewStreamWrapper((InputStream)metaIn));

                boolean keySerializerCompatible = proxy.getKeySerializerConfigSnapshot().resolveSchemaCompatibility(((RocksDBKeyedStateBackend)this.stateBackend).keySerializer).isCompatibleAsIs();
                if (!keySerializerCompatible) {
                    throw new StateMigrationException("The new key serializer must be compatible.");
                }
                return proxy.getStateMetaInfoSnapshots();
            }
            finally {
                if (((RocksDBKeyedStateBackend)this.stateBackend).cancelStreamRegistry.unregisterCloseable((Closeable)metaIn)) {
                    metaIn.close();
                }
            }
        }

        /**
         * Copies all files referenced by the handle into {@code dest}: first its shared state, then
         * its private state.
         *
         * @param restoreStateHandle handle whose files are transferred
         * @param dest directory that receives the files
         * @throws IOException if copying any file fails
         */
        private void transferAllStateDataToDirectory(IncrementalKeyedStateHandle restoreStateHandle, Path dest) throws IOException {
            Map<StateHandleID, StreamStateHandle> sharedState = restoreStateHandle.getSharedState();
            Map<StateHandleID, StreamStateHandle> privateState = restoreStateHandle.getPrivateState();
            this.transferAllDataFromStateHandles(sharedState, dest);
            this.transferAllDataFromStateHandles(privateState, dest);
        }

        /**
         * Copies every handle of the map into {@code restoreInstancePath}, using the string form of
         * the handle id as the file name.
         *
         * @param stateHandleMap handles to copy, keyed by their id
         * @param restoreInstancePath target directory
         * @throws IOException if copying any handle fails
         */
        private void transferAllDataFromStateHandles(Map<StateHandleID, StreamStateHandle> stateHandleMap, Path restoreInstancePath) throws IOException {
            for (Map.Entry<StateHandleID, StreamStateHandle> handleEntry : stateHandleMap.entrySet()) {
                Path targetFile = new Path(restoreInstancePath, handleEntry.getKey().toString());
                this.copyStateDataHandleData(targetFile, handleEntry.getValue());
            }
        }

        /**
         * Copies the content of a state handle into a file, overwriting the file if it exists.
         *
         * <p>Both streams are registered with the backend's {@code cancelStreamRegistry} while the
         * copy runs. On exit each stream is closed only if this method successfully unregisters it.
         * The output stream is released in a nested {@code finally}, so it is still unregistered and
         * closed when closing the input stream fails.
         *
         * @param restoreFilePath file to write to
         * @param remoteFileHandle handle to read from
         * @throws IOException if opening, registering, copying or closing a stream fails
         */
        private void copyStateDataHandleData(Path restoreFilePath, StreamStateHandle remoteFileHandle) throws IOException {
            FileSystem restoreFileSystem = restoreFilePath.getFileSystem();
            FSDataInputStream inputStream = null;
            FSDataOutputStream outputStream = null;
            try {
                int numBytes;
                inputStream = remoteFileHandle.openInputStream();
                ((RocksDBKeyedStateBackend)this.stateBackend).cancelStreamRegistry.registerCloseable((Closeable)inputStream);
                outputStream = restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE);
                ((RocksDBKeyedStateBackend)this.stateBackend).cancelStreamRegistry.registerCloseable((Closeable)outputStream);
                byte[] buffer = new byte[8192];
                while ((numBytes = inputStream.read(buffer)) != -1) {
                    outputStream.write(buffer, 0, numBytes);
                }
            }
            finally {
                try {
                    if (((RocksDBKeyedStateBackend)this.stateBackend).cancelStreamRegistry.unregisterCloseable((Closeable)inputStream)) {
                        inputStream.close();
                    }
                }
                finally {
                    // Runs even if closing the input stream threw, so the output stream is not leaked.
                    if (((RocksDBKeyedStateBackend)this.stateBackend).cancelStreamRegistry.unregisterCloseable((Closeable)outputStream)) {
                        outputStream.close();
                    }
                }
            }
        }

        /**
         * Bundles a RocksDB instance opened during restore with its column family handles,
         * descriptors and state meta info snapshots, so that they can be closed together.
         */
        private class RestoredDBInstance
        implements AutoCloseable {
            /** The opened database. */
            @Nonnull
            private final RocksDB db;
            /** Handle that was at index 0 of the handle list passed to the constructor. */
            @Nonnull
            private final ColumnFamilyHandle defaultColumnFamilyHandle;
            /** The remaining handles, after the first one has been removed. */
            @Nonnull
            private final List<ColumnFamilyHandle> columnFamilyHandles;
            @Nonnull
            private final List<ColumnFamilyDescriptor> columnFamilyDescriptors;
            @Nonnull
            private final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;

            /**
             * Stores the given objects and splits the first handle off as the default column family handle.
             *
             * <p>Note that this removes element 0 from the passed-in {@code columnFamilyHandles} list
             * itself, so the caller's list is modified.
             */
            private RestoredDBInstance(@Nonnull RocksDB db, @Nonnull List<ColumnFamilyHandle> columnFamilyHandles, @Nonnull List<ColumnFamilyDescriptor> columnFamilyDescriptors, List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) {
                this.db = db;
                this.columnFamilyHandles = columnFamilyHandles;
                this.defaultColumnFamilyHandle = this.columnFamilyHandles.remove(0);
                this.columnFamilyDescriptors = columnFamilyDescriptors;
                this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
            }

            /**
             * Quietly closes the default column family handle, then all other handles, and finally the database.
             */
            @Override
            public void close() {
                IOUtils.closeQuietly((AutoCloseable)this.defaultColumnFamilyHandle);
                for (ColumnFamilyHandle columnFamilyHandle : this.columnFamilyHandles) {
                    IOUtils.closeQuietly((AutoCloseable)columnFamilyHandle);
                }
                IOUtils.closeQuietly((AutoCloseable)this.db);
            }
        }
    }

    private static final class RocksDBFullRestoreOperation<K> {
        private final RocksDBKeyedStateBackend<K> rocksDBKeyedStateBackend;
        private KeyGroupsStateHandle currentKeyGroupsStateHandle;
        private FSDataInputStream currentStateHandleInStream;
        private DataInputView currentStateHandleInView;
        private List<ColumnFamilyHandle> currentStateHandleKVStateColumnFamilies;
        private StreamCompressionDecorator keygroupStreamCompressionDecorator;

        /**
         * Creates a restore operation for the given backend.
         *
         * @param rocksDBKeyedStateBackend the backend to restore into; passed through {@code Preconditions.checkNotNull}
         */
        public RocksDBFullRestoreOperation(RocksDBKeyedStateBackend<K> rocksDBKeyedStateBackend) {
            this.rocksDBKeyedStateBackend = (RocksDBKeyedStateBackend)((Object)Preconditions.checkNotNull(rocksDBKeyedStateBackend));
        }

        /**
         * Creates the backend's database and restores every non-null state handle into it, one after another.
         *
         * @param keyedStateHandles handles to restore; {@code null} elements are skipped
         * @throws IllegalStateException if a handle is not a {@code KeyGroupsStateHandle}
         * @throws IOException if reading a handle fails
         * @throws StateMigrationException if the restored key serializer is not compatible
         * @throws RocksDBException if a RocksDB operation fails
         */
        public void doRestore(Collection<KeyedStateHandle> keyedStateHandles) throws IOException, StateMigrationException, RocksDBException {
            ((RocksDBKeyedStateBackend)this.rocksDBKeyedStateBackend).createDB();
            for (KeyedStateHandle handle : keyedStateHandles) {
                if (handle != null) {
                    if (!(handle instanceof KeyGroupsStateHandle)) {
                        throw new IllegalStateException("Unexpected state handle type, expected: " + KeyGroupsStateHandle.class + ", but found: " + handle.getClass());
                    }
                    this.currentKeyGroupsStateHandle = (KeyGroupsStateHandle)handle;
                    this.restoreKeyGroupsInStateHandle();
                }
            }
        }

        /**
         * Restores the meta data and then the key-value data of {@code currentKeyGroupsStateHandle}.
         *
         * <p>The handle's input stream is registered with the backend's {@code cancelStreamRegistry}
         * for the duration of the restore. It is closed quietly afterwards, but only if this method
         * successfully unregisters it.
         *
         * @throws IOException if opening or reading the stream fails
         * @throws StateMigrationException if the restored key serializer is not compatible
         * @throws RocksDBException if a RocksDB operation fails
         */
        private void restoreKeyGroupsInStateHandle() throws IOException, StateMigrationException, RocksDBException {
            try {
                this.currentStateHandleInStream = this.currentKeyGroupsStateHandle.openInputStream();
                ((RocksDBKeyedStateBackend)this.rocksDBKeyedStateBackend).cancelStreamRegistry.registerCloseable((Closeable)this.currentStateHandleInStream);
                this.currentStateHandleInView = new DataInputViewStreamWrapper((InputStream)this.currentStateHandleInStream);
                // Meta data must be read first: it sets the compression decorator and column families used for the data.
                this.restoreKVStateMetaData();
                this.restoreKVStateData();
            }
            finally {
                if (((RocksDBKeyedStateBackend)this.rocksDBKeyedStateBackend).cancelStreamRegistry.unregisterCloseable((Closeable)this.currentStateHandleInStream)) {
                    IOUtils.closeQuietly((AutoCloseable)this.currentStateHandleInStream);
                }
            }
        }

        /**
         * Reads the meta data section of the current state handle.
         *
         * <p>This verifies that the restored key serializer snapshot is compatible as-is with the
         * backend's key serializer, selects the key-group compression decorator, and builds the list
         * of column family handles for the current handle in the order of the restored meta infos.
         * A column family is created for any state that the backend does not know yet.
         *
         * @throws IOException if reading the meta data fails
         * @throws StateMigrationException if the key serializer is not compatible as-is
         * @throws RocksDBException if creating a column family fails
         */
        private void restoreKVStateMetaData() throws IOException, StateMigrationException, RocksDBException {
            KeyedBackendSerializationProxy proxy = new KeyedBackendSerializationProxy(((RocksDBKeyedStateBackend)this.rocksDBKeyedStateBackend).userCodeClassLoader);
            proxy.read(this.currentStateHandleInView);

            boolean keySerializerCompatible = proxy.getKeySerializerConfigSnapshot().resolveSchemaCompatibility(((RocksDBKeyedStateBackend)this.rocksDBKeyedStateBackend).keySerializer).isCompatibleAsIs();
            if (!keySerializerCompatible) {
                throw new StateMigrationException("The new key serializer must be compatible.");
            }

            if (proxy.isUsingKeyGroupCompression()) {
                this.keygroupStreamCompressionDecorator = SnappyStreamCompressionDecorator.INSTANCE;
            } else {
                this.keygroupStreamCompressionDecorator = UncompressedStreamCompressionDecorator.INSTANCE;
            }

            List<StateMetaInfoSnapshot> restoredSnapshots = proxy.getStateMetaInfoSnapshots();
            this.currentStateHandleKVStateColumnFamilies = new ArrayList<ColumnFamilyHandle>(restoredSnapshots.size());
            for (StateMetaInfoSnapshot snapshot : restoredSnapshots) {
                Tuple2 knownColumn = (Tuple2)((RocksDBKeyedStateBackend)this.rocksDBKeyedStateBackend).kvStateInformation.get(snapshot.getName());
                if (knownColumn == null) {
                    // State not seen before: create its column family and remember it.
                    // NOTE(review): this writes to kvStateInformation directly, while the incremental
                    // restore goes through registerKvStateInformation — confirm the difference is intended.
                    byte[] encodedName = snapshot.getName().getBytes(ConfigConstants.DEFAULT_CHARSET);
                    ColumnFamilyDescriptor descriptor = new ColumnFamilyDescriptor(encodedName, ((RocksDBKeyedStateBackend)this.rocksDBKeyedStateBackend).columnOptions);
                    ColumnFamilyHandle createdFamily = this.rocksDBKeyedStateBackend.db.createColumnFamily(descriptor);
                    RegisteredStateMetaInfoBase metaInfo = RegisteredStateMetaInfoBase.fromMetaInfoSnapshot((StateMetaInfoSnapshot)snapshot);
                    knownColumn = new Tuple2((Object)createdFamily, (Object)metaInfo);
                    ((RocksDBKeyedStateBackend)this.rocksDBKeyedStateBackend).kvStateInformation.put(snapshot.getName(), knownColumn);
                }
                this.currentStateHandleKVStateColumnFamilies.add((ColumnFamilyHandle)knownColumn.f0);
            }
        }

        /**
         * Restores the key/value data of every key group referenced by
         * {@code currentKeyGroupsStateHandle} into the backend's RocksDB instance.
         *
         * <p>For each key group with a non-zero offset, the state handle stream is positioned at
         * that offset and wrapped with the configured compression decorator. The stream is then
         * consumed as: a state id (short), followed by serialized key and value byte arrays. When a
         * key carries the "meta data follows" flag, the flag is cleared, the pair is written, and
         * another short is read; that short is either the next state id or the end-of-key-group
         * mark ({@code 0xFFFF}). All pairs are written through a single write batch wrapper.
         *
         * <p>The per-key-group stream is managed with try-with-resources so that a failure while
         * reading or writing a key group propagates to the caller; a close failure is attached as
         * a suppressed exception.
         *
         * @throws IOException if seeking, reading or closing a stream fails
         * @throws RocksDBException if writing to RocksDB fails
         * @throws IllegalStateException if a key group is outside of the backend's key group range
         */
        private void restoreKVStateData() throws IOException, RocksDBException {
            // Value read in place of a state id once a key group has no further entries.
            final int endOfKeyGroupMark = 0xFFFF;

            try (RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(this.rocksDBKeyedStateBackend.db)) {
                for (Tuple2 keyGroupOffset : this.currentKeyGroupsStateHandle.getGroupRangeOffsets()) {
                    int keyGroup = (Integer)keyGroupOffset.f0;
                    Preconditions.checkState(this.rocksDBKeyedStateBackend.getKeyGroupRange().contains(keyGroup), (Object)"The key group must belong to the backend");

                    long offset = (Long)keyGroupOffset.f1;
                    if (0L == offset) {
                        // Offset 0 is skipped: nothing is read for this key group.
                        continue;
                    }

                    this.currentStateHandleInStream.seek(offset);
                    try (InputStream compressedKgIn = this.keygroupStreamCompressionDecorator.decorateWithCompression((InputStream)this.currentStateHandleInStream)) {
                        DataInputViewStreamWrapper compressedKgInputView = new DataInputViewStreamWrapper(compressedKgIn);

                        // State ids are read as unsigned shorts, consistently with the reads inside the loop.
                        int kvStateId = 0xFFFF & compressedKgInputView.readShort();
                        ColumnFamilyHandle handle = this.currentStateHandleKVStateColumnFamilies.get(kvStateId);

                        boolean keyGroupHasMoreKeys = true;
                        while (keyGroupHasMoreKeys) {
                            byte[] key = BytePrimitiveArraySerializer.INSTANCE.deserialize((DataInputView)compressedKgInputView);
                            byte[] value = BytePrimitiveArraySerializer.INSTANCE.deserialize((DataInputView)compressedKgInputView);

                            if (RocksSnapshotUtil.hasMetaDataFollowsFlag(key)) {
                                // Last pair for the current state: strip the flag before writing the key.
                                RocksSnapshotUtil.clearMetaDataFollowsFlag(key);
                                writeBatchWrapper.put(handle, key, value);

                                kvStateId = 0xFFFF & compressedKgInputView.readShort();
                                if (endOfKeyGroupMark == kvStateId) {
                                    keyGroupHasMoreKeys = false;
                                } else {
                                    handle = this.currentStateHandleKVStateColumnFamilies.get(kvStateId);
                                }
                            } else {
                                writeBatchWrapper.put(handle, key, value);
                            }
                        }
                    }
                }
            }
        }
    }

    private static interface StateFactory {
        public <K, N, SV, S extends State, IS extends S> IS createState(StateDescriptor<S, SV> var1, Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> var2, RocksDBKeyedStateBackend<K> var3) throws Exception;
    }
}

