package org.talend.sap.impl.stream;

import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.Producer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.talend.sap.impl.model.stream.SAPHistoricStream;
import org.talend.sap.impl.model.stream.SAPStream;
import org.talend.sap.model.stream.ISAPHistoricStream;
import org.talend.sap.model.stream.ISAPStream;
import org.talend.sap.model.stream.ISAPStreamMetadata;
import org.talend.sap.stream.ISAPStreamReceiverService;

/* loaded from: input_file:org/talend/sap/impl/stream/SAPStreamReceiverService.class */
/**
 * In-memory registry of SAP data streams that are received and forwarded through a Kafka
 * {@link Producer}.
 *
 * <p>Two maps are maintained, both keyed by stream ID:
 * <ul>
 *   <li>{@code streams} holds the currently active streams; an entry is added by
 *       {@link #start(ISAPStreamMetadata)} and removed by {@link #cancel(String, String)} or
 *       {@link #complete(String)};</li>
 *   <li>{@code historicStreams} holds the bookkeeping record of every started stream; this class
 *       never removes entries from it.</li>
 * </ul>
 *
 * <p>Both maps are wrapped with {@link Collections#synchronizedMap(Map)}, so each individual map
 * operation is synchronized; sequences of several operations are not atomic as a whole.
 */
public class SAPStreamReceiverService implements ISAPStreamReceiverService {
    private static final Logger LOGGER = LoggerFactory.getLogger(SAPStreamReceiverService.class);

    /** Interval, in milliseconds, between two lookups while waiting for a stream to appear. */
    private static final long POLL_INTERVAL_MILLIS = 1000L;

    private final Producer<String, String> producer;
    private final Map<String, SAPStream> streams =
            Collections.synchronizedMap(new HashMap<String, SAPStream>());
    private final Map<String, SAPHistoricStream> historicStreams =
            Collections.synchronizedMap(new HashMap<String, SAPHistoricStream>());

    /**
     * Creates the service.
     *
     * @param producer the Kafka producer handed to every {@link SAPStream} created by
     *                 {@link #start(ISAPStreamMetadata)}
     */
    public SAPStreamReceiverService(Producer<String, String> producer) {
        this.producer = producer;
    }

    /**
     * Cancels an active stream: removes it from the active streams and marks its historic record
     * as canceled, with the current time as end time.
     *
     * @param str  the stream ID
     * @param str2 the cancel reason stored on the historic record
     * @return the updated historic record, or {@code null} (after logging a warning) when no
     *         active stream with that ID exists
     */
    public ISAPHistoricStream cancel(String str, String str2) {
        if (this.streams.remove(str) == null) {
            LOGGER.warn("{}: No active data stream found to cancel", str);
            return null;
        }
        SAPHistoricStream historic = this.historicStreams.get(str);
        historic.setEndTime(new Date());
        historic.setCanceled(true);
        historic.setCancelReason(str2);
        return historic;
    }

    /**
     * Completes an active stream: removes it from the active streams and stamps the current time
     * as end time on its historic record.
     *
     * @param str the stream ID
     * @return the updated historic record
     * @throws RuntimeException if no active stream with that ID exists
     */
    public ISAPHistoricStream complete(String str) {
        if (this.streams.remove(str) == null) {
            LOGGER.error("{}: No active data stream found to complete", str);
            throw new RuntimeException("No active data stream found");
        }
        SAPHistoricStream historic = this.historicStreams.get(str);
        historic.setEndTime(new Date());
        return historic;
    }

    /**
     * Returns the active stream with the given ID without waiting.
     *
     * @param str the stream ID
     * @return the active stream
     * @throws RuntimeException if no active stream with that ID exists
     */
    public ISAPStream getById(String str) {
        return getById(str, 0L);
    }

    /**
     * Returns the active stream with the given ID, polling for it to appear for up to roughly
     * {@code j} milliseconds.
     *
     * <p>The lookup is repeated every {@value #POLL_INTERVAL_MILLIS} ms (or sooner when less time
     * remains). If the calling thread is interrupted while waiting, its interrupt flag is
     * restored, waiting stops, and one final lookup decides the outcome.
     *
     * @param str the stream ID
     * @param j   the maximum time to wait in milliseconds; a value {@code <= 0} means a single
     *            lookup without waiting
     * @return the active stream
     * @throws RuntimeException if the stream did not appear before the timeout elapsed or the
     *                          wait was interrupted
     */
    public ISAPStream getById(String str, long j) {
        long remainingMillis = j;
        // Iterative polling: the stack depth no longer grows with the timeout.
        while (true) {
            SAPStream stream = this.streams.get(str);
            if (stream != null) {
                return stream;
            }
            if (remainingMillis <= 0) {
                LOGGER.error("{}: No active data stream found", str);
                throw new RuntimeException("No active data stream found");
            }
            try {
                TimeUnit.MILLISECONDS.sleep(Math.min(remainingMillis, POLL_INTERVAL_MILLIS));
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the caller and stop waiting. Any further sleep
                // would throw immediately, so continuing would only spin through the timeout.
                Thread.currentThread().interrupt();
                remainingMillis = 0;
                continue;
            }
            remainingMillis -= POLL_INTERVAL_MILLIS;
        }
    }

    /**
     * Returns the number of currently active streams.
     *
     * @return the size of the active stream map
     */
    public int getCount() {
        return this.streams.size();
    }

    /**
     * Looks up the historic record of a stream.
     *
     * @param str the stream ID
     * @return the historic record, or {@code null} if no stream with that ID was ever started
     */
    public ISAPHistoricStream getHistoricById(String str) {
        return this.historicStreams.get(str);
    }

    /**
     * Increases the package count on the historic record of the given stream.
     *
     * @param str the stream ID
     * @throws NullPointerException if no historic record exists for that ID
     */
    public void increasePackageCount(String str) {
        this.historicStreams.get(str).increasePackageCount();
    }

    /**
     * Increases the row count on the historic record of the given stream. Both numeric arguments
     * are forwarded unchanged to {@code SAPHistoricStream#increaseRowCount(int, int)}.
     *
     * @param str the stream ID
     * @param i   first argument forwarded to the historic record
     * @param i2  second argument forwarded to the historic record
     * @throws NullPointerException if no historic record exists for that ID
     */
    public void increaseRowCount(String str, int i, int i2) {
        this.historicStreams.get(str).increaseRowCount(i, i2);
    }

    /**
     * Sets the package count on the active stream with the given ID. Does nothing when no active
     * stream with that ID exists.
     *
     * @param str the stream ID
     * @param i   the package count to set
     */
    public void setPackageCount(String str, int i) {
        SAPStream stream = this.streams.get(str);
        if (stream != null) {
            stream.setPackageCount(i);
        }
    }

    /**
     * Starts a new stream: builds a {@link SAPStream} from the given metadata, then registers a
     * fresh historic record and the active stream under the stream ID. Existing entries with the
     * same ID are replaced.
     *
     * @param iSAPStreamMetadata the metadata describing the stream to start
     */
    public void start(ISAPStreamMetadata iSAPStreamMetadata) {
        SAPStream stream = new SAPStream(this.producer);
        stream.setId(iSAPStreamMetadata.getId());
        stream.setKafkaStartOffsets(iSAPStreamMetadata.getKafkaStartOffsets());
        stream.setKafkaTopicName(iSAPStreamMetadata.getKafkaTopicName());
        stream.setPartnerHost(iSAPStreamMetadata.getPartnerHost());
        stream.setThreadCount(iSAPStreamMetadata.getThreadCount());
        stream.setTimestamp(iSAPStreamMetadata.getTimestamp());
        stream.setType(iSAPStreamMetadata.getType());
        // Register the historic record first so that it already exists by the time the stream
        // becomes visible as active.
        this.historicStreams.put(stream.getId(), new SAPHistoricStream(iSAPStreamMetadata));
        this.streams.put(stream.getId(), stream);
        LOGGER.info("{}: Started {} data stream with ID '{}' using Kafka topic '{}'", new Object[]{stream.getPartnerHost(), stream.getType(), stream.getId(), stream.getKafkaTopicName()});
    }
}
