/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.contrib.streaming.state;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.contrib.streaming.state.AbstractRocksDBState;
import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.StateSnapshotTransformer;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StateMigrationException;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDBException;

/**
 * {@link ListState} implementation that keeps its contents in RocksDB.
 *
 * <p>A list is stored as a single RocksDB value: the serialized elements laid out
 * back to back with one {@link #DELIMITER} byte between consecutive elements.
 *
 * @param <K> type of the key
 * @param <N> type of the namespace
 * @param <V> type of the list elements
 */
class RocksDBListState<K, N, V>
extends AbstractRocksDBState<K, N, List<V>, ListState<V>>
implements InternalListState<K, N, V> {
    /** Serializer for a single list element. */
    private final TypeSerializer<V> elementSerializer;

    /** Byte written between two consecutive serialized elements (ASCII comma, 44). */
    private static final byte DELIMITER = ',';

    /**
     * Creates a new list state bound to the given column family.
     *
     * @param columnFamily column family that holds this state's entries
     * @param namespaceSerializer serializer for the namespace
     * @param valueSerializer serializer for the whole list
     * @param defaultValue default value handed to the base class
     * @param elementSerializer serializer for one list element
     * @param backend backend that owns the RocksDB instance
     */
    private RocksDBListState(
            ColumnFamilyHandle columnFamily,
            TypeSerializer<N> namespaceSerializer,
            TypeSerializer<List<V>> valueSerializer,
            List<V> defaultValue,
            TypeSerializer<V> elementSerializer,
            RocksDBKeyedStateBackend<K> backend) {
        super(columnFamily, namespaceSerializer, valueSerializer, defaultValue, backend);
        this.elementSerializer = elementSerializer;
    }

    /** Returns the key serializer of the owning backend. */
    public TypeSerializer<K> getKeySerializer() {
        return backend.getKeySerializer();
    }

    /** Returns the namespace serializer. */
    public TypeSerializer<N> getNamespaceSerializer() {
        return namespaceSerializer;
    }

    /** Returns the serializer for the whole list value. */
    public TypeSerializer<List<V>> getValueSerializer() {
        return valueSerializer;
    }

    /**
     * Returns the stored elements for the current key and namespace.
     *
     * @return the elements, or {@code null} if RocksDB holds no entry
     */
    public Iterable<V> get() {
        return getInternal();
    }

    /**
     * Reads and deserializes the list stored under the current key and namespace.
     *
     * @return the deserialized list, or {@code null} if RocksDB holds no entry
     * @throws FlinkRuntimeException if key serialization or the RocksDB read fails
     */
    public List<V> getInternal() {
        try {
            writeCurrentKeyWithGroupAndNamespace();
            byte[] key = dataOutputView.getCopyOfBuffer();
            byte[] valueBytes = backend.db.get(columnFamily, key);
            return deserializeList(valueBytes);
        }
        catch (IOException | RocksDBException e) {
            throw new FlinkRuntimeException("Error while retrieving data from RocksDB", e);
        }
    }

    /**
     * Decodes a stored value into a list of elements.
     *
     * @param valueBytes raw bytes read from RocksDB, may be {@code null}
     * @return the decoded elements, or {@code null} if {@code valueBytes} is {@code null}
     */
    private List<V> deserializeList(byte[] valueBytes) {
        if (valueBytes == null) {
            return null;
        }
        dataInputView.setBuffer(valueBytes);
        List<V> result = new ArrayList<>();
        V next;
        while ((next = deserializeNextElement(dataInputView, elementSerializer)) != null) {
            result.add(next);
        }
        return result;
    }

    /**
     * Reads the next element from {@code in} and skips the single byte that follows it
     * (the delimiter) when more input remains.
     *
     * @param in input positioned at the start of an element or at the end of the data
     * @param elementSerializer serializer used to decode the element
     * @return the decoded element, or {@code null} if no input is left
     * @throws FlinkRuntimeException if decoding fails
     */
    private static <V> V deserializeNextElement(DataInputDeserializer in, TypeSerializer<V> elementSerializer) {
        try {
            if (in.available() > 0) {
                V element = elementSerializer.deserialize(in);
                if (in.available() > 0) {
                    // Skip the delimiter separating this element from the next one.
                    in.readByte();
                }
                return element;
            }
        }
        catch (IOException e) {
            // Keep the cause so the underlying decoding problem stays diagnosable.
            throw new FlinkRuntimeException("Unexpected list element deserialization failure", e);
        }
        return null;
    }

    /**
     * Appends one element to the list of the current key and namespace.
     *
     * @param value element to append, must not be {@code null}
     * @throws FlinkRuntimeException if serialization or the RocksDB merge fails
     */
    public void add(V value) {
        Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
        try {
            writeCurrentKeyWithGroupAndNamespace();
            byte[] key = dataOutputView.getCopyOfBuffer();
            // Reuse the same buffer for the value now that the key bytes have been copied out.
            dataOutputView.clear();
            elementSerializer.serialize(value, dataOutputView);
            // No delimiter is written here; presumably the merge operator configured on the
            // column family joins operands with DELIMITER - confirm against the backend setup.
            backend.db.merge(columnFamily, writeOptions, key, dataOutputView.getCopyOfBuffer());
        }
        catch (Exception e) {
            throw new FlinkRuntimeException("Error while adding data to RocksDB", e);
        }
    }

    /**
     * Moves the contents of every source namespace into the target namespace for the
     * current key. {@code null} sources are skipped.
     *
     * @param target namespace that receives the merged elements
     * @param sources namespaces to drain; a {@code null} or empty collection is a no-op
     * @throws FlinkRuntimeException if any serialization or RocksDB operation fails
     */
    public void mergeNamespaces(N target, Collection<N> sources) {
        if (sources == null || sources.isEmpty()) {
            return;
        }
        K key = backend.getCurrentKey();
        int keyGroup = backend.getCurrentKeyGroupIndex();
        try {
            writeKeyWithGroupAndNamespace(keyGroup, key, target, dataOutputView);
            byte[] targetKey = dataOutputView.getCopyOfBuffer();
            for (N source : sources) {
                if (source == null) {
                    continue;
                }
                writeKeyWithGroupAndNamespace(keyGroup, key, source, dataOutputView);
                byte[] sourceKey = dataOutputView.getCopyOfBuffer();
                byte[] valueBytes = backend.db.get(columnFamily, sourceKey);
                // Delete before merging: if a source equals the target, the value read above
                // is written back by the merge instead of being wiped by a later delete.
                backend.db.delete(columnFamily, writeOptions, sourceKey);
                if (valueBytes != null) {
                    backend.db.merge(columnFamily, writeOptions, targetKey, valueBytes);
                }
            }
        }
        catch (Exception e) {
            throw new FlinkRuntimeException("Error while merging state in RocksDB", e);
        }
    }

    /**
     * Replaces the stored list with the given values.
     *
     * @param valueToStore new contents, must not be {@code null}
     */
    public void update(List<V> valueToStore) {
        updateInternal(valueToStore);
    }

    /**
     * Clears the current entry and, if {@code values} is non-empty, stores it as the new list.
     *
     * @param values new contents, must not be {@code null} and must not contain {@code null}
     * @throws FlinkRuntimeException if serialization or the RocksDB write fails
     */
    public void updateInternal(List<V> values) {
        Preconditions.checkNotNull(values, "List of values to add cannot be null.");
        clear();
        if (!values.isEmpty()) {
            try {
                writeCurrentKeyWithGroupAndNamespace();
                byte[] key = dataOutputView.getCopyOfBuffer();
                byte[] premerge = getPreMergedValue(values, elementSerializer, dataOutputView);
                backend.db.put(columnFamily, writeOptions, key, premerge);
            }
            catch (IOException | RocksDBException e) {
                throw new FlinkRuntimeException("Error while updating data to RocksDB", e);
            }
        }
    }

    /**
     * Appends all given values to the list of the current key and namespace.
     *
     * @param values elements to append, must not be {@code null} and must not contain
     *     {@code null}; an empty list is a no-op
     * @throws FlinkRuntimeException if serialization or the RocksDB merge fails
     */
    public void addAll(List<V> values) {
        Preconditions.checkNotNull(values, "List of values to add cannot be null.");
        if (!values.isEmpty()) {
            try {
                writeCurrentKeyWithGroupAndNamespace();
                byte[] key = dataOutputView.getCopyOfBuffer();
                byte[] premerge = getPreMergedValue(values, elementSerializer, dataOutputView);
                backend.db.merge(columnFamily, writeOptions, key, premerge);
            }
            catch (IOException | RocksDBException e) {
                throw new FlinkRuntimeException("Error while updating data to RocksDB", e);
            }
        }
    }

    /**
     * Re-encodes a stored list element by element from the prior element serializer to the
     * new one, keeping the delimiter-separated layout.
     *
     * @param serializedOldValueInput input holding the value in the old format
     * @param serializedMigratedValueOutput output receiving the value in the new format
     * @param priorSerializer previous list serializer, must be a {@link ListSerializer}
     * @param newSerializer new list serializer, must be a {@link ListSerializer}
     * @throws StateMigrationException if reading or writing any element fails
     */
    @Override
    public void migrateSerializedValue(
            DataInputDeserializer serializedOldValueInput,
            DataOutputSerializer serializedMigratedValueOutput,
            TypeSerializer<List<V>> priorSerializer,
            TypeSerializer<List<V>> newSerializer) throws StateMigrationException {
        Preconditions.checkArgument(priorSerializer instanceof ListSerializer);
        Preconditions.checkArgument(newSerializer instanceof ListSerializer);
        TypeSerializer<V> priorElementSerializer = ((ListSerializer<V>) priorSerializer).getElementSerializer();
        TypeSerializer<V> newElementSerializer = ((ListSerializer<V>) newSerializer).getElementSerializer();
        try {
            while (serializedOldValueInput.available() > 0) {
                V element = deserializeNextElement(serializedOldValueInput, priorElementSerializer);
                newElementSerializer.serialize(element, serializedMigratedValueOutput);
                // Only separate elements; no trailing delimiter after the last one.
                if (serializedOldValueInput.available() > 0) {
                    serializedMigratedValueOutput.write(DELIMITER);
                }
            }
        }
        catch (Exception e) {
            throw new StateMigrationException("Error while trying to migrate RocksDB list state.", e);
        }
    }

    /**
     * Serializes all values into one byte array with a {@link #DELIMITER} between elements.
     * The given stream is cleared first, so any bytes it held are discarded.
     *
     * @param values elements to serialize, none of which may be {@code null}
     * @param elementSerializer serializer for one element
     * @param keySerializationStream reusable output buffer
     * @return a copy of the serialized bytes
     * @throws IOException if an element cannot be serialized
     */
    private static <V> byte[] getPreMergedValue(
            List<V> values,
            TypeSerializer<V> elementSerializer,
            DataOutputSerializer keySerializationStream) throws IOException {
        keySerializationStream.clear();
        boolean first = true;
        for (V value : values) {
            Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
            if (first) {
                first = false;
            } else {
                keySerializationStream.write(DELIMITER);
            }
            elementSerializer.serialize(value, keySerializationStream);
        }
        return keySerializationStream.getCopyOfBuffer();
    }

    /**
     * Factory used by the backend to create a list state from a registered column family.
     *
     * @param stateDesc descriptor of the state; cast to {@link ListStateDescriptor} here
     * @param registerResult column family handle and registered meta information
     * @param backend backend that owns the RocksDB instance
     * @return the new state, cast to the caller's expected internal state type
     */
    @SuppressWarnings("unchecked")
    static <E, K, N, SV, S extends State, IS extends S> IS create(
            StateDescriptor<S, SV> stateDesc,
            Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> registerResult,
            RocksDBKeyedStateBackend<K> backend) {
        // The casts are unchecked: the caller is trusted to pass a list state descriptor whose
        // value type SV is List<E>.
        return (IS) new RocksDBListState<K, N, E>(
                registerResult.f0,
                registerResult.f1.getNamespaceSerializer(),
                (TypeSerializer<List<E>>) registerResult.f1.getStateSerializer(),
                (List<E>) stateDesc.getDefaultValue(),
                ((ListStateDescriptor<E>) stateDesc).getElementSerializer(),
                backend);
    }

    /**
     * Adapts an element-level {@link StateSnapshotTransformer} so it can be applied to the
     * serialized (byte array) form of a whole list.
     *
     * @param <T> type of the list elements
     */
    static class StateSnapshotTransformerWrapper<T>
    implements StateSnapshotTransformer<byte[]> {
        /** Transformer applied to every deserialized element. */
        private final StateSnapshotTransformer<T> elementTransformer;
        /** Serializer for one list element. */
        private final TypeSerializer<T> elementSerializer;
        /** Reusable buffer for re-serializing the surviving elements. */
        private final DataOutputSerializer out = new DataOutputSerializer(128);
        /** Strategy taken from the element transformer, or TRANSFORM_ALL if it declares none. */
        private final StateSnapshotTransformer.CollectionStateSnapshotTransformer.TransformStrategy transformStrategy;

        /**
         * @param elementTransformer transformer applied to each element
         * @param elementSerializer serializer for one element
         */
        StateSnapshotTransformerWrapper(StateSnapshotTransformer<T> elementTransformer, TypeSerializer<T> elementSerializer) {
            this.elementTransformer = elementTransformer;
            this.elementSerializer = elementSerializer;
            if (elementTransformer instanceof StateSnapshotTransformer.CollectionStateSnapshotTransformer) {
                this.transformStrategy =
                        ((StateSnapshotTransformer.CollectionStateSnapshotTransformer<?>) elementTransformer).getFilterStrategy();
            } else {
                this.transformStrategy =
                        StateSnapshotTransformer.CollectionStateSnapshotTransformer.TransformStrategy.TRANSFORM_ALL;
            }
        }

        /**
         * Applies the element transformer to every element of a serialized list.
         *
         * @param value serialized list, may be {@code null}
         * @return the serialized surviving elements; {@code null} if {@code value} is
         *     {@code null} or no element survives. With STOP_ON_FIRST_INCLUDED, the original
         *     bytes from the first surviving element to the end are returned unchanged.
         * @throws FlinkRuntimeException if an element cannot be read or re-serialized
         */
        @Override
        @Nullable
        public byte[] filterOrTransform(@Nullable byte[] value) {
            if (value == null) {
                return null;
            }
            List<T> result = new ArrayList<>();
            DataInputDeserializer in = new DataInputDeserializer(value);
            // Offset in 'value' at which the element currently being inspected starts.
            int prevPosition = 0;
            T next;
            while ((next = RocksDBListState.deserializeNextElement(in, elementSerializer)) != null) {
                T transformedElement = elementTransformer.filterOrTransform(next);
                if (transformedElement != null) {
                    if (transformStrategy == StateSnapshotTransformer.CollectionStateSnapshotTransformer.TransformStrategy.STOP_ON_FIRST_INCLUDED) {
                        // Everything from this element on is kept as is: slice the original bytes.
                        return Arrays.copyOfRange(value, prevPosition, value.length);
                    }
                    result.add(transformedElement);
                }
                prevPosition = in.getPosition();
            }
            try {
                return result.isEmpty() ? null : RocksDBListState.getPreMergedValue(result, elementSerializer, out);
            }
            catch (IOException e) {
                throw new FlinkRuntimeException("Failed to serialize transformed list", e);
            }
        }
    }
}

