/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.accumulators;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.DoubleCounter;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

public class AccumulatorErrorITCase
extends TestLogger {
    private static final String FAULTY_CLONE_ACCUMULATOR = "faulty-clone";
    private static final String FAULTY_MERGE_ACCUMULATOR = "faulty-merge";
    private static final String INCOMPATIBLE_ACCUMULATORS_NAME = "incompatible-accumulators";
    @ClassRule
    public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(2).setNumberSlotsPerTaskManager(3).build());

    public static Configuration getConfiguration() {
        Configuration config = new Configuration();
        config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, (Object)MemorySize.parse((String)"12m"));
        return config;
    }

    @Test
    public void testFaultyAccumulator() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.generateSequence(0L, 10000L).map((MapFunction)new FaultyAccumulatorUsingMapper()).output((OutputFormat)new DiscardingOutputFormat());
        AccumulatorErrorITCase.assertAccumulatorsShouldFail(env.execute());
    }

    @Test
    public void testInvalidTypeAccumulator() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.generateSequence(0L, 10000L).map((MapFunction)new IncompatibleAccumulatorTypesMapper()).map((MapFunction)new IncompatibleAccumulatorTypesMapper2()).output((OutputFormat)new DiscardingOutputFormat());
        try {
            env.execute();
            Assert.fail((String)"Should have failed.");
        }
        catch (JobExecutionException e) {
            Assert.assertTrue((boolean)ExceptionUtils.findThrowable((Throwable)e, UnsupportedOperationException.class).isPresent());
        }
    }

    @Test
    public void testFaultyMergeAccumulator() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.generateSequence(0L, 10000L).map((MapFunction)new FaultyMergeAccumulatorUsingMapper()).output((OutputFormat)new DiscardingOutputFormat());
        AccumulatorErrorITCase.assertAccumulatorsShouldFail(env.execute());
    }

    private static void assertAccumulatorsShouldFail(JobExecutionResult result) {
        try {
            result.getAllAccumulatorResults();
            Assert.fail((String)"Should have failed");
        }
        catch (Exception ex) {
            Assert.assertTrue((boolean)ExceptionUtils.findThrowable((Throwable)ex, CustomException.class).isPresent());
        }
    }

    private static class CustomException
    extends RuntimeException {
        private static final long serialVersionUID = 42L;

        private CustomException() {
        }
    }

    private static class FaultyMergeAccumulator
    extends LongCounter {
        private static final long serialVersionUID = 42L;

        private FaultyMergeAccumulator() {
        }

        public void merge(Accumulator<Long, Long> other) {
            throw new CustomException();
        }

        public LongCounter clone() {
            return new FaultyMergeAccumulator();
        }
    }

    private static class FaultyMergeAccumulatorUsingMapper
    extends RichMapFunction<Long, Long> {
        private static final long serialVersionUID = 42L;

        private FaultyMergeAccumulatorUsingMapper() {
        }

        public void open(OpenContext openContext) throws Exception {
            this.getRuntimeContext().addAccumulator(AccumulatorErrorITCase.FAULTY_MERGE_ACCUMULATOR, (Accumulator)new FaultyMergeAccumulator());
        }

        public Long map(Long value) throws Exception {
            return -1L;
        }
    }

    private static class IncompatibleAccumulatorTypesMapper2
    extends RichMapFunction<Long, Long> {
        private static final long serialVersionUID = 42L;

        private IncompatibleAccumulatorTypesMapper2() {
        }

        public void open(OpenContext openContext) throws Exception {
            this.getRuntimeContext().addAccumulator(AccumulatorErrorITCase.INCOMPATIBLE_ACCUMULATORS_NAME, (Accumulator)new DoubleCounter());
        }

        public Long map(Long value) throws Exception {
            return -1L;
        }
    }

    private static class IncompatibleAccumulatorTypesMapper
    extends RichMapFunction<Long, Long> {
        private static final long serialVersionUID = 42L;

        private IncompatibleAccumulatorTypesMapper() {
        }

        public void open(OpenContext openContext) throws Exception {
            this.getRuntimeContext().addAccumulator(AccumulatorErrorITCase.INCOMPATIBLE_ACCUMULATORS_NAME, (Accumulator)new LongCounter());
        }

        public Long map(Long value) throws Exception {
            return -1L;
        }
    }

    private static class FaultyCloneAccumulator
    extends LongCounter {
        private static final long serialVersionUID = 42L;

        private FaultyCloneAccumulator() {
        }

        public LongCounter clone() {
            throw new CustomException();
        }
    }

    private static class FaultyAccumulatorUsingMapper
    extends RichMapFunction<Long, Long> {
        private static final long serialVersionUID = 42L;

        private FaultyAccumulatorUsingMapper() {
        }

        public void open(OpenContext openContext) throws Exception {
            this.getRuntimeContext().addAccumulator(AccumulatorErrorITCase.FAULTY_CLONE_ACCUMULATOR, (Accumulator)new FaultyCloneAccumulator());
        }

        public Long map(Long value) throws Exception {
            return -1L;
        }
    }
}

