/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.streaming.runtime;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.metrics.testutils.MetricAssertions;
import org.apache.flink.runtime.testutils.InMemoryReporter;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.testutils.junit.SharedObjects;
import org.apache.flink.testutils.junit.SharedReference;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Rule;
import org.junit.Test;

public class SinkV2MetricsITCase
extends TestLogger {
    private static final String TEST_SINK_NAME = "MetricTestSink";
    private static final String DEFAULT_WRITER_NAME = "Writer";
    private static final String DEFAULT_COMMITTER_NAME = "Committer";
    private static final int DEFAULT_PARALLELISM = 4;
    @Rule
    public final SharedObjects sharedObjects = SharedObjects.create();
    private final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics();
    @Rule
    public final MiniClusterWithClientResource miniClusterResource = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(1).setNumberSlotsPerTaskManager(4).setConfiguration(this.reporter.addToConfiguration(new Configuration())).build());

    @Test
    public void testMetrics() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        int numSplits = Math.max(1, env.getParallelism() - 2);
        int numRecordsPerSplit = 10;
        SharedReference beforeBarrier = this.sharedObjects.add((Object)new CyclicBarrier(numSplits + 1));
        SharedReference afterBarrier = this.sharedObjects.add((Object)new CyclicBarrier(numSplits + 1));
        int stopAtRecord1 = 4;
        int stopAtRecord2 = numRecordsPerSplit - 1;
        env.fromSequence(0L, (long)(numSplits - 1)).flatMap((FlatMapFunction & Serializable)(split, collector) -> LongStream.range(0L, numRecordsPerSplit).forEach(arg_0 -> ((Collector)collector).collect(arg_0))).returns((TypeInformation)BasicTypeInfo.LONG_TYPE_INFO).map((MapFunction & Serializable)i -> {
            if (i % (long)numRecordsPerSplit == (long)stopAtRecord1 || i % (long)numRecordsPerSplit == (long)stopAtRecord2) {
                ((CyclicBarrier)beforeBarrier.get()).await();
                ((CyclicBarrier)afterBarrier.get()).await();
            }
            return i;
        }).sinkTo((Sink)TestSinkV2.newBuilder().setWriter((TestSinkV2.DefaultSinkWriter)new MetricWriter()).build()).name(TEST_SINK_NAME);
        JobClient jobClient = env.executeAsync();
        JobID jobId = jobClient.getJobID();
        ((CyclicBarrier)beforeBarrier.get()).await();
        this.assertSinkMetrics(jobId, stopAtRecord1, numSplits);
        ((CyclicBarrier)afterBarrier.get()).await();
        ((CyclicBarrier)beforeBarrier.get()).await();
        this.assertSinkMetrics(jobId, stopAtRecord2, numSplits);
        ((CyclicBarrier)afterBarrier.get()).await();
        jobClient.getJobExecutionResult().get();
    }

    @Test
    public void testCommitterMetrics() throws Exception {
        int numCommittables = 7;
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        SharedReference beforeLatch = this.sharedObjects.add((Object)new CountDownLatch(7));
        SharedReference afterLatch = this.sharedObjects.add((Object)new CountDownLatch(1));
        env.fromSequence(0L, 6L).returns((TypeInformation)BasicTypeInfo.LONG_TYPE_INFO).sinkTo((Sink)TestSinkV2.newBuilder().setCommitter((TestSinkV2.DefaultCommitter)new MetricCommitter((SharedReference<CountDownLatch>)beforeLatch, (SharedReference<CountDownLatch>)afterLatch)).setCommittableSerializer((SimpleVersionedSerializer)TestSinkV2.StringSerializer.INSTANCE).build()).name(TEST_SINK_NAME);
        JobClient jobClient = env.executeAsync();
        JobID jobId = jobClient.getJobID();
        ((CountDownLatch)beforeLatch.get()).await();
        this.assertSinkCommitterMetrics(jobId, (Map<String, Long>)ImmutableMap.of((Object)"alreadyCommittedCommittables", (Object)0L, (Object)"failedCommittables", (Object)0L, (Object)"retriedCommittables", (Object)7L, (Object)"successfulCommittables", (Object)0L, (Object)"totalCommittables", (Object)7L, (Object)"pendingCommittables", (Object)7L));
        ((CountDownLatch)afterLatch.get()).countDown();
        jobClient.getJobExecutionResult().get();
        this.assertSinkCommitterMetrics(jobId, (Map<String, Long>)ImmutableMap.of((Object)"alreadyCommittedCommittables", (Object)1L, (Object)"failedCommittables", (Object)2L, (Object)"retriedCommittables", (Object)10L, (Object)"successfulCommittables", (Object)4L, (Object)"totalCommittables", (Object)7L, (Object)"pendingCommittables", (Object)0L));
    }

    private void assertSinkMetrics(JobID jobId, long processedRecordsPerSubtask, int numSplits) {
        List groups = this.reporter.findOperatorMetricGroups(jobId, "MetricTestSink: Writer");
        int subtaskWithMetrics = 0;
        for (OperatorMetricGroup group : groups) {
            Map metrics = this.reporter.getMetricsByGroup((MetricGroup)group);
            if (group.getIOMetricGroup() == null || group.getIOMetricGroup().getNumRecordsOutCounter() == null || group.getIOMetricGroup().getNumRecordsOutCounter().getCount() == 0L) continue;
            ++subtaskWithMetrics;
            MetricAssertions.assertThatCounter((Metric)((Metric)metrics.get("numRecordsOut"))).isEqualTo((Object)processedRecordsPerSubtask);
            MetricAssertions.assertThatCounter((Metric)((Metric)metrics.get("numBytesOut"))).isEqualTo((Object)(processedRecordsPerSubtask * 10L));
            MetricAssertions.assertThatCounter((Metric)((Metric)metrics.get("numRecordsOutErrors"))).isEqualTo((Object)((processedRecordsPerSubtask + 1L) / 2L));
            MetricAssertions.assertThatCounter((Metric)((Metric)metrics.get("numRecordsSend"))).isEqualTo((Object)processedRecordsPerSubtask);
            MetricAssertions.assertThatCounter((Metric)((Metric)metrics.get("numBytesSend"))).isEqualTo((Object)(processedRecordsPerSubtask * 10L));
            MetricAssertions.assertThatCounter((Metric)((Metric)metrics.get("numRecordsSendErrors"))).isEqualTo((Object)((processedRecordsPerSubtask + 1L) / 2L));
            MetricAssertions.assertThatGauge((Metric)((Metric)metrics.get("currentSendTime"))).isEqualTo((Object)((processedRecordsPerSubtask - 1L) * 100L));
        }
        MatcherAssert.assertThat((Object)subtaskWithMetrics, (Matcher)CoreMatchers.equalTo((Object)numSplits));
    }

    private void assertSinkCommitterMetrics(JobID jobId, Map<String, Long> expected) {
        List groups = this.reporter.findOperatorMetricGroups(jobId, "MetricTestSink: Committer");
        HashMap<String, Long> aggregated = new HashMap<String, Long>(6);
        for (OperatorMetricGroup group : groups) {
            Map metrics = this.reporter.getMetricsByGroup((MetricGroup)group);
            for (String metricName : Arrays.asList("successfulCommittables", "alreadyCommittedCommittables", "retriedCommittables", "failedCommittables", "totalCommittables")) {
                Counter counter = (Counter)metrics.get(metricName);
                if (counter == null) continue;
                aggregated.merge(metricName, counter.getCount(), Long::sum);
            }
            Gauge pendingMetrics = (Gauge)metrics.get("pendingCommittables");
            if (pendingMetrics == null || pendingMetrics.getValue() == null) continue;
            aggregated.merge("pendingCommittables", ((Integer)pendingMetrics.getValue()).longValue(), Long::sum);
        }
        expected.entrySet().forEach(e -> MatcherAssert.assertThat((Object)aggregated, (Matcher)Matchers.hasEntry(e.getKey(), e.getValue())));
    }

    private static class MetricCommitter
    extends TestSinkV2.DefaultCommitter {
        private int counter = 0;
        private SharedReference<CountDownLatch> beforeLatch;
        private SharedReference<CountDownLatch> afterLatch;

        MetricCommitter(SharedReference<CountDownLatch> beforeLatch, SharedReference<CountDownLatch> afterLatch) {
            this.beforeLatch = beforeLatch;
            this.afterLatch = afterLatch;
            this.counter = 0;
        }

        public void commit(Collection<Committer.CommitRequest<String>> committables) {
            if (this.counter == 0) {
                System.err.println("Committables arrived " + Thread.currentThread().getName() + " " + committables.stream().map(c -> (String)c.getCommittable()).collect(Collectors.toList()));
                committables.forEach(c -> c.retryLater());
            } else {
                if (this.counter == 1) {
                    try {
                        committables.forEach(any -> ((CountDownLatch)this.beforeLatch.get()).countDown());
                        ((CountDownLatch)this.afterLatch.get()).await();
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(e);
                    }
                }
                committables.forEach(c -> {
                    switch (((String)c.getCommittable()).charAt(1)) {
                        case '0': {
                            c.signalAlreadyCommitted();
                            break;
                        }
                        case '1': 
                        case '2': {
                            c.signalFailedWithKnownReason((Throwable)new RuntimeException());
                            break;
                        }
                        case '3': {
                            if (this.counter != 1) break;
                            c.retryLater();
                            break;
                        }
                        case '4': 
                        case '5': {
                            c.updateAndRetryLater((Object)("Retry-" + (String)c.getCommittable()));
                        }
                    }
                });
            }
            ++this.counter;
        }
    }

    private static class MetricWriter
    extends TestSinkV2.DefaultSinkWriter<Long> {
        static final long BASE_SEND_TIME = 100L;
        static final long RECORD_SIZE_IN_BYTES = 10L;
        private SinkWriterMetricGroup metricGroup;
        private long sendTime;

        private MetricWriter() {
        }

        public void init(Sink.InitContext context) {
            this.metricGroup = context.metricGroup();
            this.metricGroup.setCurrentSendTimeGauge(() -> this.sendTime);
        }

        public void write(Long element, SinkWriter.Context context) {
            super.write((Object)element, context);
            this.sendTime = element * 100L;
            this.metricGroup.getIOMetricGroup().getNumRecordsOutCounter().inc();
            if (element % 2L == 0L) {
                this.metricGroup.getNumRecordsOutErrorsCounter().inc();
            }
            this.metricGroup.getIOMetricGroup().getNumBytesOutCounter().inc(10L);
        }
    }
}

