/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.metrics.lib.impl.hive;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.metrics.lib.ActiveReservoirReporter;
import org.apache.kylin.metrics.lib.Record;
import org.apache.kylin.metrics.lib.impl.TimePropertyEnum;
import org.apache.kylin.metrics.lib.impl.hive.HiveProducerRecord;
import org.apache.kylin.metrics.lib.impl.hive.HiveReservoirReporter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HiveProducer {
    private static final Logger logger = LoggerFactory.getLogger(HiveProducer.class);
    private static final int CACHE_MAX_SIZE = 10;
    private final HiveConf hiveConf;
    private final FileSystem hdfs;
    private final LoadingCache<Pair<String, String>, Pair<String, List<FieldSchema>>> tableFieldSchemaCache;
    private final String CONTENT_FILE_NAME;

    public HiveProducer(Properties props) throws Exception {
        this(props, new HiveConf());
    }

    HiveProducer(Properties props, HiveConf hiveConfig) throws Exception {
        String hostName;
        this.hiveConf = hiveConfig;
        for (Map.Entry<Object, Object> e : props.entrySet()) {
            this.hiveConf.set(e.getKey().toString(), e.getValue().toString());
        }
        this.hdfs = FileSystem.get((Configuration)this.hiveConf);
        this.tableFieldSchemaCache = CacheBuilder.newBuilder().removalListener((RemovalListener)new RemovalListener<Pair<String, String>, Pair<String, List<FieldSchema>>>(){

            public void onRemoval(RemovalNotification<Pair<String, String>, Pair<String, List<FieldSchema>>> notification) {
                logger.info("Field schema with table " + ActiveReservoirReporter.getTableName((Pair)((Pair)notification.getKey())) + " is removed due to " + notification.getCause());
            }
        }).maximumSize(10L).build((CacheLoader)new CacheLoader<Pair<String, String>, Pair<String, List<FieldSchema>>>(){

            public Pair<String, List<FieldSchema>> load(Pair<String, String> tableName) throws Exception {
                HiveMetaStoreClient metaStoreClient = new HiveMetaStoreClient(HiveProducer.this.hiveConf);
                String tableLocation = metaStoreClient.getTable((String)tableName.getFirst(), (String)tableName.getSecond()).getSd().getLocation();
                List fields = metaStoreClient.getFields((String)tableName.getFirst(), (String)tableName.getSecond());
                metaStoreClient.close();
                return new Pair((Object)tableLocation, (Object)fields);
            }
        });
        try {
            hostName = InetAddress.getLocalHost().getHostName();
        }
        catch (UnknownHostException e) {
            hostName = "UNKNOWN";
        }
        this.CONTENT_FILE_NAME = hostName + "-part-0000";
    }

    public void close() {
        this.tableFieldSchemaCache.cleanUp();
    }

    public void send(Record record) throws Exception {
        HiveProducerRecord hiveRecord = this.convertTo(record);
        this.write(hiveRecord.key(), Lists.newArrayList((Object[])new HiveProducerRecord[]{hiveRecord}));
    }

    public void send(List<Record> recordList) throws Exception {
        HashMap recordMap = Maps.newHashMap();
        for (Record record : recordList) {
            HiveProducerRecord hiveRecord = this.convertTo(record);
            if (recordMap.get(hiveRecord.key()) == null) {
                recordMap.put(hiveRecord.key(), Lists.newLinkedList());
            }
            ((List)recordMap.get(hiveRecord.key())).add(hiveRecord);
        }
        for (Map.Entry entry : recordMap.entrySet()) {
            this.write((HiveProducerRecord.RecordKey)entry.getKey(), (Iterable)entry.getValue());
        }
    }

    private void write(HiveProducerRecord.RecordKey recordKey, Iterable<HiveProducerRecord> recordItr) throws Exception {
        Path partitionContentPath;
        String tableLocation = (String)((Pair)this.tableFieldSchemaCache.get((Object)new Pair((Object)recordKey.database(), (Object)recordKey.table()))).getFirst();
        StringBuilder sb = new StringBuilder();
        sb.append(tableLocation);
        for (Map.Entry<String, String> e : recordKey.partition().entrySet()) {
            sb.append("/");
            sb.append(e.getKey().toLowerCase(Locale.ROOT));
            sb.append("=");
            sb.append(e.getValue());
        }
        Path partitionPath = new Path(sb.toString());
        if (!this.hdfs.exists(partitionPath)) {
            StringBuilder hql = new StringBuilder();
            hql.append("ALTER TABLE ");
            hql.append(recordKey.database() + "." + recordKey.table());
            hql.append(" ADD IF NOT EXISTS PARTITION (");
            boolean ifFirst = true;
            for (Map.Entry entry : recordKey.partition().entrySet()) {
                if (ifFirst) {
                    ifFirst = false;
                } else {
                    hql.append(",");
                }
                hql.append(((String)entry.getKey()).toLowerCase(Locale.ROOT));
                hql.append("='" + (String)entry.getValue() + "'");
            }
            hql.append(")");
            Driver driver = new Driver(this.hiveConf);
            SessionState.start((SessionState)new CliSessionState(this.hiveConf));
            driver.run(hql.toString());
            driver.close();
        }
        if (!this.hdfs.exists(partitionContentPath = new Path(partitionPath, this.CONTENT_FILE_NAME))) {
            int nRetry = 0;
            while (!this.hdfs.createNewFile(partitionContentPath) && nRetry++ < 5 && !this.hdfs.exists(partitionContentPath)) {
                Thread.sleep(500L * (long)nRetry);
            }
            if (!this.hdfs.exists(partitionContentPath)) {
                throw new RuntimeException("Fail to create HDFS file: " + partitionContentPath + " after " + nRetry + " retries");
            }
        }
        try {
            FSDataOutputStream fout = this.hdfs.append(partitionContentPath);
            Object object = null;
            try {
                for (HiveProducerRecord elem : recordItr) {
                    fout.writeBytes(elem.valueToString() + "\n");
                }
            }
            catch (Throwable throwable) {
                object = throwable;
                throw throwable;
            }
            finally {
                if (fout != null) {
                    if (object != null) {
                        try {
                            fout.close();
                        }
                        catch (Throwable throwable) {
                            ((Throwable)object).addSuppressed(throwable);
                        }
                    } else {
                        fout.close();
                    }
                }
            }
        }
        catch (IOException e) {
            System.out.println("Fails to write metrics to file " + partitionContentPath.toString() + " due to " + e);
            logger.error("Fails to write metrics to file " + partitionContentPath.toString() + " due to " + e);
        }
    }

    private HiveProducerRecord convertTo(Record record) throws Exception {
        Map rawValue = record.getValueRaw();
        HashMap partitionKVs = Maps.newHashMapWithExpectedSize((int)1);
        partitionKVs.put(TimePropertyEnum.DAY_DATE.toString(), rawValue.get(TimePropertyEnum.DAY_DATE.toString()).toString());
        return this.parseToHiveProducerRecord(HiveReservoirReporter.getTableFromSubject(record.getType()), partitionKVs, rawValue);
    }

    public HiveProducerRecord parseToHiveProducerRecord(String tableName, Map<String, String> partitionKVs, Map<String, Object> rawValue) throws Exception {
        Pair tableNameSplits = ActiveReservoirReporter.getTableNameSplits((String)tableName);
        List fields = (List)((Pair)this.tableFieldSchemaCache.get((Object)tableNameSplits)).getSecond();
        ArrayList columnValues = Lists.newArrayListWithExpectedSize((int)fields.size());
        for (FieldSchema fieldSchema : fields) {
            columnValues.add(rawValue.get(fieldSchema.getName().toUpperCase(Locale.ROOT)));
        }
        return new HiveProducerRecord((String)tableNameSplits.getFirst(), (String)tableNameSplits.getSecond(), partitionKVs, columnValues);
    }
}

