/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.heap;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import org.apache.commons.io.IOUtils;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.runtime.state.KeyExtractorFunction;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.PriorityComparable;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
import org.apache.flink.runtime.state.RestoreOperation;
import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
import org.apache.flink.runtime.state.StateSerializerProvider;
import org.apache.flink.runtime.state.StateSnapshotKeyGroupReader;
import org.apache.flink.runtime.state.StateSnapshotRestore;
import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper;
import org.apache.flink.runtime.state.heap.HeapSnapshotStrategy;
import org.apache.flink.runtime.state.heap.InternalKeyContext;
import org.apache.flink.runtime.state.heap.StateTable;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StateMigrationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HeapRestoreOperation<K>
implements RestoreOperation<Void> {
    private static final Logger LOG = LoggerFactory.getLogger(HeapRestoreOperation.class);
    private final Collection<KeyedStateHandle> restoreStateHandles;
    private final StateSerializerProvider<K> keySerializerProvider;
    private final ClassLoader userCodeClassLoader;
    private final Map<String, StateTable<K, ?, ?>> registeredKVStates;
    private final Map<String, HeapPriorityQueueSnapshotRestoreWrapper> registeredPQStates;
    private final CloseableRegistry cancelStreamRegistry;
    private final HeapPriorityQueueSetFactory priorityQueueSetFactory;
    @Nonnull
    private final KeyGroupRange keyGroupRange;
    @Nonnegative
    private final int numberOfKeyGroups;
    private final HeapSnapshotStrategy<K> snapshotStrategy;
    private final InternalKeyContext<K> keyContext;

    HeapRestoreOperation(@Nonnull Collection<KeyedStateHandle> restoreStateHandles, StateSerializerProvider<K> keySerializerProvider, ClassLoader userCodeClassLoader, Map<String, StateTable<K, ?, ?>> registeredKVStates, Map<String, HeapPriorityQueueSnapshotRestoreWrapper> registeredPQStates, CloseableRegistry cancelStreamRegistry, HeapPriorityQueueSetFactory priorityQueueSetFactory, @Nonnull KeyGroupRange keyGroupRange, int numberOfKeyGroups, HeapSnapshotStrategy<K> snapshotStrategy, InternalKeyContext<K> keyContext) {
        this.restoreStateHandles = restoreStateHandles;
        this.keySerializerProvider = keySerializerProvider;
        this.userCodeClassLoader = userCodeClassLoader;
        this.registeredKVStates = registeredKVStates;
        this.registeredPQStates = registeredPQStates;
        this.cancelStreamRegistry = cancelStreamRegistry;
        this.priorityQueueSetFactory = priorityQueueSetFactory;
        this.keyGroupRange = keyGroupRange;
        this.numberOfKeyGroups = numberOfKeyGroups;
        this.snapshotStrategy = snapshotStrategy;
        this.keyContext = keyContext;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Void restore() throws Exception {
        this.registeredKVStates.clear();
        this.registeredPQStates.clear();
        boolean keySerializerRestored = false;
        for (KeyedStateHandle keyedStateHandle : this.restoreStateHandles) {
            if (keyedStateHandle == null) continue;
            if (!(keyedStateHandle instanceof KeyGroupsStateHandle)) {
                throw StateUtil.unexpectedStateHandleException(KeyGroupsStateHandle.class, keyedStateHandle.getClass());
            }
            LOG.info("Starting to restore from state handle: {}.", (Object)keyedStateHandle);
            KeyGroupsStateHandle keyGroupsStateHandle = (KeyGroupsStateHandle)keyedStateHandle;
            FSDataInputStream fsDataInputStream = keyGroupsStateHandle.openInputStream();
            this.cancelStreamRegistry.registerCloseable((Closeable)fsDataInputStream);
            try {
                DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper((InputStream)fsDataInputStream);
                KeyedBackendSerializationProxy serializationProxy = new KeyedBackendSerializationProxy(this.userCodeClassLoader);
                serializationProxy.read((DataInputView)inView);
                if (!keySerializerRestored) {
                    TypeSerializer<K> currentSerializer = this.keySerializerProvider.currentSchemaSerializer();
                    TypeSerializerSchemaCompatibility<K> keySerializerSchemaCompat = this.keySerializerProvider.setPreviousSerializerSnapshotForRestoredState(serializationProxy.getKeySerializerSnapshot());
                    if (keySerializerSchemaCompat.isCompatibleAfterMigration() || keySerializerSchemaCompat.isIncompatible()) {
                        throw new StateMigrationException("The new key serializer (" + currentSerializer + ") must be compatible with the previous key serializer (" + this.keySerializerProvider.previousSchemaSerializer() + ").");
                    }
                    keySerializerRestored = true;
                }
                List<StateMetaInfoSnapshot> restoredMetaInfos = serializationProxy.getStateMetaInfoSnapshots();
                HashMap<Integer, StateMetaInfoSnapshot> kvStatesById = new HashMap<Integer, StateMetaInfoSnapshot>();
                this.createOrCheckStateForMetaInfo(restoredMetaInfos, kvStatesById);
                this.readStateHandleStateData(fsDataInputStream, inView, keyGroupsStateHandle.getGroupRangeOffsets(), kvStatesById, restoredMetaInfos.size(), serializationProxy.getReadVersion(), serializationProxy.isUsingKeyGroupCompression());
                LOG.info("Finished restoring from state handle: {}.", (Object)keyedStateHandle);
            }
            finally {
                if (!this.cancelStreamRegistry.unregisterCloseable((Closeable)fsDataInputStream)) continue;
                IOUtils.closeQuietly((InputStream)fsDataInputStream);
            }
        }
        return null;
    }

    private void createOrCheckStateForMetaInfo(List<StateMetaInfoSnapshot> restoredMetaInfo, Map<Integer, StateMetaInfoSnapshot> kvStatesById) {
        for (StateMetaInfoSnapshot metaInfoSnapshot : restoredMetaInfo) {
            switch (metaInfoSnapshot.getBackendStateType()) {
                case KEY_VALUE: {
                    StateSnapshotRestore registeredState = this.registeredKVStates.get(metaInfoSnapshot.getName());
                    if (registeredState != null) break;
                    RegisteredKeyValueStateBackendMetaInfo registeredKeyedBackendStateMetaInfo = new RegisteredKeyValueStateBackendMetaInfo(metaInfoSnapshot);
                    this.registeredKVStates.put(metaInfoSnapshot.getName(), this.snapshotStrategy.newStateTable(this.keyContext, registeredKeyedBackendStateMetaInfo, this.keySerializerProvider.currentSchemaSerializer()));
                    break;
                }
                case PRIORITY_QUEUE: {
                    StateSnapshotRestore registeredState = this.registeredPQStates.get(metaInfoSnapshot.getName());
                    if (registeredState != null) break;
                    this.createInternal(new RegisteredPriorityQueueStateBackendMetaInfo(metaInfoSnapshot));
                    break;
                }
                default: {
                    throw new IllegalStateException("Unexpected state type: " + (Object)((Object)metaInfoSnapshot.getBackendStateType()) + ".");
                }
            }
            kvStatesById.put(kvStatesById.size(), metaInfoSnapshot);
        }
    }

    private <T extends HeapPriorityQueueElement & PriorityComparable> void createInternal(RegisteredPriorityQueueStateBackendMetaInfo<T> metaInfo) {
        String stateName = metaInfo.getName();
        KeyGroupedInternalPriorityQueue priorityQueue = this.priorityQueueSetFactory.create(stateName, (TypeSerializer)metaInfo.getElementSerializer());
        HeapPriorityQueueSnapshotRestoreWrapper<T> wrapper = new HeapPriorityQueueSnapshotRestoreWrapper<T>(priorityQueue, metaInfo, KeyExtractorFunction.forKeyedObjects(), this.keyGroupRange, this.numberOfKeyGroups);
        this.registeredPQStates.put(stateName, wrapper);
    }

    private void readStateHandleStateData(FSDataInputStream fsDataInputStream, DataInputViewStreamWrapper inView, KeyGroupRangeOffsets keyGroupOffsets, Map<Integer, StateMetaInfoSnapshot> kvStatesById, int numStates, int readVersion, boolean isCompressed) throws IOException {
        StreamCompressionDecorator streamCompressionDecorator = isCompressed ? SnappyStreamCompressionDecorator.INSTANCE : UncompressedStreamCompressionDecorator.INSTANCE;
        for (Tuple2<Integer, Long> groupOffset : keyGroupOffsets) {
            int keyGroupIndex = (Integer)groupOffset.f0;
            long offset = (Long)groupOffset.f1;
            Preconditions.checkState((boolean)this.keyGroupRange.contains(keyGroupIndex), (Object)"The key group must belong to the backend.");
            fsDataInputStream.seek(offset);
            int writtenKeyGroupIndex = inView.readInt();
            Preconditions.checkState((writtenKeyGroupIndex == keyGroupIndex ? 1 : 0) != 0, (Object)"Unexpected key-group in restore.");
            InputStream kgCompressionInStream = streamCompressionDecorator.decorateWithCompression((InputStream)fsDataInputStream);
            Throwable throwable = null;
            try {
                this.readKeyGroupStateData(kgCompressionInStream, kvStatesById, keyGroupIndex, numStates, readVersion);
            }
            catch (Throwable throwable2) {
                throwable = throwable2;
                throw throwable2;
            }
            finally {
                if (kgCompressionInStream == null) continue;
                if (throwable != null) {
                    try {
                        kgCompressionInStream.close();
                    }
                    catch (Throwable throwable3) {
                        throwable.addSuppressed(throwable3);
                    }
                    continue;
                }
                kgCompressionInStream.close();
            }
        }
    }

    private void readKeyGroupStateData(InputStream inputStream, Map<Integer, StateMetaInfoSnapshot> kvStatesById, int keyGroupIndex, int numStates, int readVersion) throws IOException {
        DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(inputStream);
        for (int i = 0; i < numStates; ++i) {
            StateSnapshotRestore registeredState;
            short kvStateId = inView.readShort();
            StateMetaInfoSnapshot stateMetaInfoSnapshot = kvStatesById.get(kvStateId);
            switch (stateMetaInfoSnapshot.getBackendStateType()) {
                case KEY_VALUE: {
                    registeredState = this.registeredKVStates.get(stateMetaInfoSnapshot.getName());
                    break;
                }
                case PRIORITY_QUEUE: {
                    registeredState = this.registeredPQStates.get(stateMetaInfoSnapshot.getName());
                    break;
                }
                default: {
                    throw new IllegalStateException("Unexpected state type: " + (Object)((Object)stateMetaInfoSnapshot.getBackendStateType()) + ".");
                }
            }
            StateSnapshotKeyGroupReader keyGroupReader = registeredState.keyGroupReader(readVersion);
            keyGroupReader.readMappingsInKeyGroup((DataInputView)inView, keyGroupIndex);
        }
    }
}

