/*
 * Decompiled with CFR 0.152.
 */
package org.tikv.raw;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.TiConfiguration;
import org.tikv.common.exception.GrpcException;
import org.tikv.common.exception.TiKVException;
import org.tikv.common.operation.iterator.RawScanIterator;
import org.tikv.common.region.RegionStoreClient;
import org.tikv.common.region.TiRegion;
import org.tikv.common.util.BackOffFunction;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.kvproto.Kvrpcpb;
import shade.com.google.protobuf.ByteString;

public class RawKVClient
implements AutoCloseable {
    private final RegionStoreClient.RegionStoreClientBuilder clientBuilder;
    private final TiConfiguration conf;
    private final ExecutorCompletionService<Object> completionService;
    private static final Logger logger = LoggerFactory.getLogger(RawKVClient.class);
    private static final int MAX_RETRY_LIMIT = 3;
    private static final int MAX_RAW_SCAN_LIMIT = 10240;
    private static final int RAW_BATCH_PUT_SIZE = 16384;
    private static final int RAW_BATCH_PAIR_COUNT = 512;
    private static final TiKVException ERR_RETRY_LIMIT_EXCEEDED = new GrpcException("retry is exhausted. retry exceeds 3attempts");
    private static final TiKVException ERR_MAX_SCAN_LIMIT_EXCEEDED = new TiKVException("limit should be less than MAX_RAW_SCAN_LIMIT");

    public RawKVClient(TiConfiguration conf, RegionStoreClient.RegionStoreClientBuilder clientBuilder) {
        Objects.requireNonNull(conf, "conf is null");
        Objects.requireNonNull(clientBuilder, "clientBuilder is null");
        this.conf = conf;
        this.clientBuilder = clientBuilder;
        ExecutorService executors = Executors.newFixedThreadPool(conf.getRawClientConcurrency());
        this.completionService = new ExecutorCompletionService(executors);
    }

    @Override
    public void close() {
    }

    public void put(ByteString key, ByteString value) {
        BackOffer backOffer = this.defaultBackOff();
        for (int i = 0; i < 3; ++i) {
            RegionStoreClient client = this.clientBuilder.build(key);
            try {
                client.rawPut(backOffer, key, value);
                return;
            }
            catch (TiKVException e) {
                backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
                continue;
            }
        }
        throw ERR_RETRY_LIMIT_EXCEEDED;
    }

    public void batchPut(Map<ByteString, ByteString> kvPairs) {
        this.batchPut(ConcreteBackOffer.newRawKVBackOff(), kvPairs);
    }

    private void batchPut(BackOffer backOffer, List<ByteString> keys, List<ByteString> values) {
        Map<ByteString, ByteString> keysToValues = RawKVClient.mapKeysToValues(keys, values);
        this.batchPut(backOffer, keysToValues);
    }

    private void batchPut(BackOffer backOffer, Map<ByteString, ByteString> kvPairs) {
        Map<TiRegion, List<ByteString>> groupKeys = this.groupKeysByRegion(kvPairs.keySet());
        ArrayList<Batch> batches = new ArrayList<Batch>();
        for (Map.Entry<TiRegion, List<ByteString>> entry : groupKeys.entrySet()) {
            this.appendBatches(batches, entry.getKey(), entry.getValue(), entry.getValue().stream().map(kvPairs::get).collect(Collectors.toList()), 16384);
        }
        this.sendBatchPut(backOffer, batches);
    }

    public ByteString get(ByteString key) {
        BackOffer backOffer = this.defaultBackOff();
        for (int i = 0; i < 3; ++i) {
            RegionStoreClient client = this.clientBuilder.build(key);
            try {
                return client.rawGet(this.defaultBackOff(), key);
            }
            catch (TiKVException e) {
                backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
                continue;
            }
        }
        throw ERR_RETRY_LIMIT_EXCEEDED;
    }

    public List<Kvrpcpb.KvPair> scan(ByteString startKey, ByteString endKey, int limit) {
        Iterator<Kvrpcpb.KvPair> iterator = this.rawScanIterator(this.conf, this.clientBuilder, startKey, endKey, limit);
        ArrayList<Kvrpcpb.KvPair> result = new ArrayList<Kvrpcpb.KvPair>();
        iterator.forEachRemaining(result::add);
        return result;
    }

    public List<Kvrpcpb.KvPair> scan(ByteString startKey, int limit) {
        Iterator<Kvrpcpb.KvPair> iterator = this.rawScanIterator(this.conf, this.clientBuilder, startKey, limit);
        ArrayList<Kvrpcpb.KvPair> result = new ArrayList<Kvrpcpb.KvPair>();
        iterator.forEachRemaining(result::add);
        return result;
    }

    public void delete(ByteString key) {
        BackOffer backOffer = this.defaultBackOff();
        while (true) {
            RegionStoreClient client = this.clientBuilder.build(key);
            try {
                client.rawDelete(this.defaultBackOff(), key);
                return;
            }
            catch (TiKVException e) {
                backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
                continue;
            }
            break;
        }
    }

    private void appendBatches(List<Batch> batches, TiRegion region, List<ByteString> keys, List<ByteString> values, int limit) {
        ArrayList<ByteString> tmpKeys = new ArrayList<ByteString>();
        ArrayList<ByteString> tmpValues = new ArrayList<ByteString>();
        for (int i = 0; i < keys.size(); ++i) {
            if (i >= limit) {
                batches.add(new Batch(region, tmpKeys, tmpValues));
                tmpKeys.clear();
                tmpValues.clear();
            }
            tmpKeys.add(keys.get(i));
            tmpValues.add(values.get(i));
        }
        if (!tmpKeys.isEmpty()) {
            batches.add(new Batch(region, tmpKeys, tmpValues));
        }
    }

    private Map<TiRegion, List<ByteString>> groupKeysByRegion(Set<ByteString> keys) {
        HashMap<TiRegion, List<ByteString>> groups = new HashMap<TiRegion, List<ByteString>>();
        TiRegion lastRegion = null;
        for (ByteString key : keys) {
            if (lastRegion == null || !lastRegion.contains(key)) {
                lastRegion = this.clientBuilder.getRegionManager().getRegionByKey(key);
            }
            groups.computeIfAbsent(lastRegion, k -> new ArrayList()).add(key);
        }
        return groups;
    }

    private static Map<ByteString, ByteString> mapKeysToValues(List<ByteString> keys, List<ByteString> values) {
        HashMap<ByteString, ByteString> map = new HashMap<ByteString, ByteString>();
        for (int i = 0; i < keys.size(); ++i) {
            map.put(keys.get(i), values.get(i));
        }
        return map;
    }

    private void sendBatchPut(BackOffer backOffer, List<Batch> batches) {
        for (Batch batch : batches) {
            this.completionService.submit(() -> {
                RegionStoreClient client = this.clientBuilder.build(batch.region);
                ConcreteBackOffer singleBatchBackOffer = ConcreteBackOffer.create(backOffer);
                ArrayList<Kvrpcpb.KvPair> kvPairs = new ArrayList<Kvrpcpb.KvPair>();
                for (int i = 0; i < batch.keys.size(); ++i) {
                    kvPairs.add(Kvrpcpb.KvPair.newBuilder().setKey((ByteString)batch.keys.get(i)).setValue((ByteString)batch.values.get(i)).build());
                }
                try {
                    client.rawBatchPut(singleBatchBackOffer, kvPairs);
                }
                catch (TiKVException e) {
                    singleBatchBackOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
                    logger.warn("ReSplitting ranges for BatchPutRequest");
                    this.batchPut(singleBatchBackOffer, batch.keys, batch.values);
                }
                return null;
            });
        }
        try {
            for (int i = 0; i < batches.size(); ++i) {
                this.completionService.take().get(40000L, TimeUnit.SECONDS);
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new TiKVException("Current thread interrupted.", e);
        }
        catch (TimeoutException e) {
            throw new TiKVException("TimeOut Exceeded for current operation. ", e);
        }
        catch (ExecutionException e) {
            throw new TiKVException("Execution exception met.", e);
        }
    }

    private Iterator<Kvrpcpb.KvPair> rawScanIterator(TiConfiguration conf, RegionStoreClient.RegionStoreClientBuilder builder, ByteString startKey, ByteString endKey, int limit) {
        if (limit > 10240) {
            throw ERR_MAX_SCAN_LIMIT_EXCEEDED;
        }
        return new RawScanIterator(conf, builder, startKey, endKey, limit);
    }

    private Iterator<Kvrpcpb.KvPair> rawScanIterator(TiConfiguration conf, RegionStoreClient.RegionStoreClientBuilder builder, ByteString startKey, int limit) {
        return this.rawScanIterator(conf, builder, startKey, ByteString.EMPTY, limit);
    }

    private BackOffer defaultBackOff() {
        return ConcreteBackOffer.newCustomBackOff(1000);
    }

    private static final class Batch {
        private final TiRegion region;
        private final List<ByteString> keys;
        private final List<ByteString> values;

        public Batch(TiRegion region, List<ByteString> keys, List<ByteString> values) {
            this.region = region;
            this.keys = keys;
            this.values = values;
        }
    }
}

