package org.talend.sap.impl.stream;

import com.sap.conn.jco.AbapClassException;
import com.sap.conn.jco.AbapException;
import com.sap.conn.jco.JCoFunction;
import com.sap.conn.jco.JCoTable;
import com.sap.conn.jco.server.JCoServerContext;
import com.sap.conn.jco.server.JCoServerFunctionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.talend.sap.contract.stream.CMT_TLND_STREAM;
import org.talend.sap.impl.SAPUtil;
import org.talend.sap.model.stream.ISAPHistoricStream;
import org.talend.sap.model.stream.ISAPStream;
import org.talend.sap.stream.ISAPStreamReceiver;
import org.talend.sap.stream.ISAPStreamReceiverService;

/* loaded from: input_file:org/talend/sap/impl/stream/SAPStreamReceiver.class */
public class SAPStreamReceiver implements ISAPStreamReceiver, JCoServerFunctionHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(SAPStreamReceiver.class);
    protected final ISAPStreamReceiverService streamReceiverService;
    protected final long timeout;

    public SAPStreamReceiver(ISAPStreamReceiverService iSAPStreamReceiverService, long j) {
        this.streamReceiverService = iSAPStreamReceiverService;
        this.timeout = j;
    }

    protected JCoTable getData(JCoFunction jCoFunction) {
        return jCoFunction.getImportParameterList().getTable("IT_DATA");
    }

    protected String getId(JCoFunction jCoFunction) {
        return jCoFunction.getImportParameterList().getString(CMT_TLND_STREAM.PARAM_ID);
    }

    public String getName() {
        return CMT_TLND_STREAM.NAME;
    }

    protected int getPackageNumber(JCoFunction jCoFunction) {
        return jCoFunction.getImportParameterList().getInt(CMT_TLND_STREAM.PARAM_PACKAGE_NUMBER);
    }

    protected JCoTable getReturn(JCoFunction jCoFunction) {
        return jCoFunction.getExportParameterList().getTable(CMT_TLND_STREAM.PARAM_RETURN);
    }

    public void handleRequest(JCoServerContext jCoServerContext, JCoFunction jCoFunction) throws AbapException, AbapClassException {
        String partnerHost = jCoServerContext.getConnectionAttributes().getPartnerHost();
        try {
            handleRequest(partnerHost, jCoFunction);
        } catch (Exception e) {
            String format = String.format("Error occurred while handling request: %s", e.getMessage());
            String[] strArr = new String[1];
            strArr[0] = e.getMessage() != null ? e.getMessage() : e.getClass().getName();
            reportError(partnerHost, jCoFunction, 103, strArr);
            this.streamReceiverService.cancel(getId(jCoFunction), format);
        }
    }

    protected void handleRequest(String str, JCoFunction jCoFunction) {
        String id = getId(jCoFunction);
        ISAPHistoricStream historicById = this.streamReceiverService.getHistoricById(id);
        if (historicById != null && historicById.isCanceled()) {
            LOGGER.info("{}: {}: Stream canceled, aborting SAP background job", str, id);
            reportError(str, jCoFunction, 101, "user");
            return;
        }
        ISAPStream byId = this.streamReceiverService.getById(id, this.timeout);
        int packageNumber = getPackageNumber(jCoFunction);
        LOGGER.info("{}: {}: Receiving package {}", new Object[]{str, id, Integer.valueOf(packageNumber)});
        int size = (packageNumber - 1) % byId.getKafkaStartOffsets().size();
        int i = 0;
        JCoTable data = getData(jCoFunction);
        for (int i2 = 0; i2 < data.getNumRows(); i2++) {
            try {
                data.setRow(i2);
                byId.send(size, data.getString(0));
                i++;
            } finally {
                data.clear();
                byId.flush();
            }
        }
        this.streamReceiverService.increasePackageCount(id);
        this.streamReceiverService.increaseRowCount(id, size, i);
        if (hasNoMoreData(jCoFunction)) {
            this.streamReceiverService.setPackageCount(id, packageNumber);
        }
        if (byId.getPackageCount() == historicById.getPackageCount()) {
            ISAPHistoricStream complete = this.streamReceiverService.complete(id);
            LOGGER.info("{}: {}: Stream completed in {}ms - package count: {}, row count: {}", new Object[]{str, id, Long.valueOf(complete.getDuration()), Integer.valueOf(complete.getPackageCount()), Long.valueOf(complete.getRowCount())});
            LOGGER.info("{}: {}: Stream completed in {}ms - package count: {}", new Object[]{str, id, Long.valueOf(complete.getDuration()), Integer.valueOf(complete.getPackageCount())});
            for (int i3 = 0; i3 < byId.getKafkaStartOffsets().size(); i3++) {
                LOGGER.info("{}: {}: Partition {} - row count: {}", new Object[]{str, id, Integer.valueOf(i3), Long.valueOf(complete.getRowCount(i3))});
            }
        }
    }

    protected boolean hasNoMoreData(JCoFunction jCoFunction) {
        return SAPUtil.isTrue(jCoFunction.getImportParameterList().getString(CMT_TLND_STREAM.PARAM_NO_MORE_DATA));
    }

    private void reportError(String str, JCoFunction jCoFunction, int i, String... strArr) {
        JCoTable jCoTable = getReturn(jCoFunction);
        jCoTable.appendRow();
        jCoTable.setValue("TYPE", "E");
        jCoTable.setValue("ID", "ZTALEND_STREAM");
        jCoTable.setValue("NUMBER", i);
        String id = getId(jCoFunction);
        for (int i2 = 0; i2 < strArr.length; i2++) {
            String str2 = strArr[i2];
            if (str2.length() > 50) {
                str2 = str2.substring(0, 50);
            }
            LOGGER.error("{}, {}: Reporting error to SAP: {}", new Object[]{str, id, strArr[i2]});
            jCoTable.setValue(String.format("MESSAGE_V%d", Integer.valueOf(i2 + 1)), str2);
        }
    }
}
