/*
 * Decompiled with CFR 0.152.
 */
package com.github.xingshuangs.iot.protocol.rtp.service;

import com.github.xingshuangs.iot.exceptions.SocketRuntimeException;
import com.github.xingshuangs.iot.net.client.UdpClientBasic;
import com.github.xingshuangs.iot.protocol.rtcp.service.RtcpUdpClient;
import com.github.xingshuangs.iot.protocol.rtp.model.RtpPackage;
import com.github.xingshuangs.iot.protocol.rtp.service.IPayloadParser;
import com.github.xingshuangs.iot.protocol.rtsp.service.IRtspDataStream;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * RTP receiver/sender on top of the inherited UDP client, used as an RTSP data stream.
 *
 * <p>{@link #triggerReceive()} starts a receive loop on a dedicated single-thread executor.
 * For every datagram returned by the inherited {@code read()} the loop:
 * <ol>
 *   <li>hands the raw bytes to the optional communication callback,</li>
 *   <li>decodes them with {@link RtpPackage#fromBytes(byte[])},</li>
 *   <li>forwards the package to the optional {@link RtcpUdpClient},</li>
 *   <li>forwards the package to the {@link IPayloadParser}, when one was supplied.</li>
 * </ol>
 * The loop ends when {@link #close()} is called or when a {@link SocketRuntimeException}
 * is raised while receiving.
 */
public class RtpUdpClient
extends UdpClientBasic
implements IRtspDataStream {
    private static final Logger log = LoggerFactory.getLogger(RtpUdpClient.class);

    /**
     * Stop flag for the receive loop. It is written by {@link #close()} on the caller's thread
     * and polled by the executor thread, hence {@code volatile}.
     */
    private volatile boolean terminal = false;

    /** Optional observer that receives the raw bytes of every sent and received datagram. */
    private Consumer<byte[]> commCallback;

    /** Consumer of decoded RTP packages; {@code null} when built through {@link #RtpUdpClient(String, int)}. */
    private final IPayloadParser iPayloadParser;

    /** Future of the receive loop; {@code null} until {@link #triggerReceive()} has been called. */
    private CompletableFuture<Void> future;

    /** Optional RTCP client that is notified of every decoded RTP package. */
    private RtcpUdpClient rtcpUdpClient;

    /** Single worker thread on which the receive loop runs. */
    private final ExecutorService executorService;

    /**
     * Creates a client using the superclass's default construction.
     *
     * @param iPayloadParser parser that receives every decoded RTP package
     */
    public RtpUdpClient(IPayloadParser iPayloadParser) {
        this.iPayloadParser = iPayloadParser;
        this.executorService = Executors.newSingleThreadExecutor();
    }

    /**
     * Creates a client for the given address without a payload parser. Received packages are
     * still decoded and passed to the callback / RTCP client, but no payload parsing happens.
     *
     * @param ip   address passed to the superclass constructor
     * @param port port passed to the superclass constructor
     */
    public RtpUdpClient(String ip, int port) {
        super(ip, port);
        this.iPayloadParser = null;
        this.executorService = Executors.newSingleThreadExecutor();
    }

    /**
     * Sets the observer for raw datagram bytes.
     *
     * @param commCallback callback invoked with the raw bytes, or {@code null} to disable it
     */
    public void setCommCallback(Consumer<byte[]> commCallback) {
        this.commCallback = commCallback;
    }

    /**
     * Sets the RTCP client that should be informed about received RTP packages.
     *
     * @param rtcpUdpClient RTCP client, or {@code null} to disable the notification
     */
    public void setRtcpUdpClient(RtcpUdpClient rtcpUdpClient) {
        this.rtcpUdpClient = rtcpUdpClient;
    }

    /**
     * @return the future of the receive loop, or {@code null} if {@link #triggerReceive()}
     *         has not been called yet
     */
    @Override
    public CompletableFuture<Void> getFuture() {
        return this.future;
    }

    /**
     * Stops the receive loop and releases the executor and the inherited client resources.
     */
    @Override
    public void close() {
        // Raise the flag before tearing anything down so that any exception the teardown
        // provokes in the receive loop is recognised as an intentional stop.
        this.terminal = true;
        this.executorService.shutdown();
        super.close();
    }

    /**
     * Starts the asynchronous receive loop on this client's executor and records its future.
     */
    @Override
    public void triggerReceive() {
        this.future = CompletableFuture.runAsync(this::waitForReceiveData, this.executorService);
    }

    /**
     * Reports the bytes to the communication callback (if any) and writes them out through
     * the inherited {@code write}.
     *
     * @param data raw bytes to send
     */
    @Override
    public void sendData(byte[] data) {
        // Read the field once: a concurrent setCommCallback(null) must not cause an NPE
        // between the null check and the call.
        final Consumer<byte[]> callback = this.commCallback;
        if (callback != null) {
            callback.accept(data);
        }
        this.write(data);
    }

    /**
     * Receive loop body. Runs until the stop flag is set or a {@link SocketRuntimeException}
     * occurs; any other exception is logged and the loop continues with the next datagram.
     */
    private void waitForReceiveData() {
        log.debug("[RTSP + UDP] RTP enable asynchronous data receiving thread, remote IP[/{}:{}]", this.serverAddress.getAddress().getHostAddress(), this.serverAddress.getPort());
        if (this.iPayloadParser == null) {
            // Warn once instead of failing with a NullPointerException on every packet.
            log.warn("no payload parser configured, received rtp packages will not be parsed");
        }
        while (!this.terminal) {
            try {
                byte[] data = this.read();
                final Consumer<byte[]> callback = this.commCallback;
                if (callback != null) {
                    callback.accept(data);
                }
                RtpPackage rtp = RtpPackage.fromBytes(data);
                if (data.length > rtp.byteArrayLength()) {
                    log.warn("rtp data has unprocessed parts, number of unprocessed bytes [{}]", data.length - rtp.byteArrayLength());
                }
                final RtcpUdpClient rtcp = this.rtcpUdpClient;
                if (rtcp != null) {
                    rtcp.processRtpPackage(rtp);
                }
                if (this.iPayloadParser != null) {
                    this.iPayloadParser.processPackage(rtp);
                }
            }
            catch (SocketRuntimeException e) {
                // A socket-level failure is treated as fatal for this stream: stop receiving.
                log.error(e.getMessage());
                this.terminal = true;
                break;
            }
            catch (Exception e) {
                // Failures on a single packet are not fatal; stay quiet when we are closing anyway.
                if (!this.terminal) {
                    log.error(e.getMessage(), e);
                }
            }
        }
        log.debug("[RTSP + UDP] RTP disables asynchronous data receiving thread, remote IP[/{}:{}]", this.serverAddress.getAddress().getHostAddress(), this.serverAddress.getPort());
    }
}

