package org.talend.sap.impl.service.client;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.talend.sap.IGenericDataAccess;
import org.talend.sap.impl.model.stream.SAPHistoricStream;
import org.talend.sap.impl.model.stream.dto.SAPTableDataRequestDTO;
import org.talend.sap.impl.model.stream.dto.SAPTableJoinDataDTO;
import org.talend.sap.impl.model.stream.dto.SAPTableJoinDataRequestDTO;
import org.talend.sap.model.stream.ISAPHistoricStream;
import org.talend.sap.model.stream.ISAPStreamMetadata;

/* loaded from: input_file:org/talend/sap/impl/service/client/SAPStreamServiceClient.class */
/**
 * HTTP client for the stream service REST endpoints under {@code /api/stream}.
 *
 * <p>Requests are sent as JSON via a shared {@link CloseableHttpClient}. Table (join) data requests
 * return metadata that is used to attach a Kafka consumer to the topic the data is published on;
 * that consumer is handed to a {@code SAPStreamData} instance together with this client.
 *
 * <p>The instance owns its HTTP client and must be closed when no longer needed.
 */
public class SAPStreamServiceClient implements AutoCloseable {
    private static final Logger LOGGER = LoggerFactory.getLogger(SAPStreamServiceClient.class);
    private static final String URI_METRICS = "/api/stream/historic/{id}";
    private static final String URI_STREAM = "/api/stream/{id}";
    private static final String URI_TABLE_DATA = "/api/stream/{connectionId}/table";
    private static final String URI_TABLE_JOIN_DATA = "/api/stream/{connectionId}/table-join";
    /** Default number of additional POST attempts after an HTTP 503 response. */
    private static final int RETRY_COUNT = 3;
    /** Default pause between POST attempts, in seconds. */
    private static final int RETRY_TIMEOUT = 15;
    /** Default wait limit passed on to the stream data reader; presumably milliseconds - TODO confirm. */
    private static final long MAX_TIME_TO_WAIT_FOR_MORE_DATA = 60000;
    /** The only status code {@link #read} accepts as success. */
    private static final int HTTP_OK = 200;
    /** Status code that triggers a delayed retry in {@link #executePost}. */
    private static final int HTTP_SERVICE_UNAVAILABLE = 503;
    private final String baseUri;
    private final int retryCount;
    private final int retryTimeout;
    private final long maxTimeToWaitForMoreData;
    private final Gson gson;
    private final CloseableHttpClient httpClient;

    /**
     * Creates a client with the default retry count, retry timeout and maximum wait time.
     *
     * @param str base URI that every endpoint path is appended to
     * @throws IllegalArgumentException if {@code str} is {@code null}
     */
    public SAPStreamServiceClient(String str) {
        this(str, RETRY_COUNT, RETRY_TIMEOUT, MAX_TIME_TO_WAIT_FOR_MORE_DATA);
    }

    /**
     * Creates a client with explicit retry and wait settings.
     *
     * @param str base URI that every endpoint path is appended to
     * @param i number of additional POST attempts after an HTTP 503 response
     * @param i2 pause between POST attempts, in seconds
     * @param j maximum wait time forwarded to the stream data reader
     * @throws IllegalArgumentException if {@code str} is {@code null}
     */
    public SAPStreamServiceClient(String str, int i, int i2, long j) {
        // Fail fast: every request concatenates the base URI with an endpoint path.
        if (str == null) {
            throw new IllegalArgumentException("Base URI must not be null");
        }
        this.baseUri = str;
        this.retryCount = i;
        this.retryTimeout = i2;
        this.maxTimeToWaitForMoreData = j;
        this.gson = new GsonBuilder().setDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ").create();
        this.httpClient = HttpClients.createDefault();
    }

    /**
     * Cancels a data stream by issuing a DELETE request for it.
     *
     * @param str ID of the data stream to cancel
     * @throws IllegalArgumentException if {@code str} is {@code null} or empty
     * @throws RuntimeException if the DELETE request fails with an I/O error
     */
    public void cancel(String str) {
        requireNonEmpty(str, "Data stream ID");
        LOGGER.info("Canceling data stream with ID '{}'..", str);
        executeDelete(URI_STREAM.replace("{id}", str));
        LOGGER.info("Data stream with ID '{}' canceled", str);
    }

    /**
     * Fetches the historic record of a data stream.
     *
     * @param str ID of the data stream
     * @return the deserialized historic stream; {@code null} if the service answered with an empty body
     * @throws IllegalArgumentException if {@code str} is {@code null} or empty
     * @throws RuntimeException if the request fails or the service answers with a status other than 200
     */
    public ISAPHistoricStream getHistoricById(String str) {
        requireNonEmpty(str, "Data stream ID");
        return executeGet(URI_METRICS.replace("{id}", str), SAPHistoricStream.class);
    }

    /**
     * Requests table data for a connection and opens a Kafka consumer on the resulting stream.
     *
     * @param str ID of the connection, substituted into the endpoint path
     * @param sAPTableDataRequestDTO request payload, serialized to JSON
     * @return accessor reading the streamed data
     * @throws IllegalArgumentException if the request is {@code null} or the connection ID is {@code null} or empty
     * @throws RuntimeException if the POST request fails or is answered with a status other than 200
     */
    public IGenericDataAccess requestTableData(String str, SAPTableDataRequestDTO sAPTableDataRequestDTO) throws ClientProtocolException, IOException {
        if (sAPTableDataRequestDTO == null) {
            throw new IllegalArgumentException("Table data request must not be null");
        }
        requireNonEmpty(str, "Connection ID");
        String path = URI_TABLE_DATA.replace("{connectionId}", str);
        LOGGER.info("Requesting table data..");
        return openStream(executePost(path, sAPTableDataRequestDTO, SAPTableJoinDataDTO.class, this.retryCount));
    }

    /**
     * Requests table join data for a connection and opens a Kafka consumer on the resulting stream.
     *
     * @param str ID of the connection, substituted into the endpoint path
     * @param sAPTableJoinDataRequestDTO request payload, serialized to JSON
     * @return accessor reading the streamed data
     * @throws IllegalArgumentException if the request is {@code null} or the connection ID is {@code null} or empty
     * @throws RuntimeException if the POST request fails or is answered with a status other than 200
     */
    public IGenericDataAccess requestTableJoinData(String str, SAPTableJoinDataRequestDTO sAPTableJoinDataRequestDTO) throws ClientProtocolException, IOException {
        if (sAPTableJoinDataRequestDTO == null) {
            throw new IllegalArgumentException("Table join data request must not be null");
        }
        requireNonEmpty(str, "Connection ID");
        String path = URI_TABLE_JOIN_DATA.replace("{connectionId}", str);
        LOGGER.info("Requesting table join data..");
        return openStream(executePost(path, sAPTableJoinDataRequestDTO, SAPTableJoinDataDTO.class, this.retryCount));
    }

    /**
     * Assigns the consumer to the stream's topic partitions and positions each one at its start offset.
     *
     * <p>Partition {@code n} is paired with the {@code n}-th entry of the metadata's start offsets,
     * so the number of offsets determines the number of partitions that are assigned.
     *
     * @param consumer Kafka consumer to configure
     * @param iSAPStreamMetadata metadata providing the topic name and the start offsets
     */
    protected void assignAndSeek(Consumer<String, String> consumer, ISAPStreamMetadata iSAPStreamMetadata) {
        String topic = iSAPStreamMetadata.getKafkaTopicName();
        int partitionCount = iSAPStreamMetadata.getKafkaStartOffsets().size();
        List<TopicPartition> partitions = new ArrayList<>(partitionCount);
        for (int partition = 0; partition < partitionCount; partition++) {
            partitions.add(new TopicPartition(topic, partition));
        }
        LOGGER.info("Consuming data of topic '{}' from offsets {}", topic, iSAPStreamMetadata.getKafkaStartOffsets());
        consumer.assign(partitions);
        for (int partition = 0; partition < partitionCount; partition++) {
            consumer.seek(partitions.get(partition), ((Long) iSAPStreamMetadata.getKafkaStartOffsets().get(partition)).longValue());
        }
    }

    /**
     * Sends a DELETE request to the given path below the base URI.
     *
     * <p>The response is closed without inspecting its status code or body.
     *
     * @param str endpoint path appended to the base URI
     * @throws RuntimeException if executing the request or closing the response fails with an I/O error
     */
    protected void executeDelete(String str) {
        try (CloseableHttpResponse response = this.httpClient.execute(new HttpDelete(this.baseUri + str))) {
            // Nothing to read; try-with-resources releases the response.
        } catch (IOException e) {
            LOGGER.error("DELETE request on URI '{}{}' failed: {}", this.baseUri, str, e.getMessage());
            throw new RuntimeException("DELETE request failed", e);
        }
    }

    /**
     * Sends a GET request to the given path below the base URI and deserializes the JSON response.
     *
     * @param str endpoint path appended to the base URI
     * @param cls type to deserialize the response body into
     * @param <T> response type
     * @return the deserialized response
     * @throws RuntimeException if the request fails with an I/O error or the status code is not 200
     */
    protected <T> T executeGet(String str, Class<T> cls) {
        HttpGet request = new HttpGet(this.baseUri + str);
        request.setHeader("Content-Type", "application/json");
        try (CloseableHttpResponse response = this.httpClient.execute(request)) {
            return read(response, cls);
        } catch (IOException e) {
            LOGGER.error("GET request on URI '{}{}' failed: {}", this.baseUri, str, e.getMessage());
            throw new RuntimeException("GET request failed", e);
        }
    }

    /**
     * Sends a JSON POST request to the given path below the base URI and deserializes the response.
     *
     * <p>On an HTTP 503 response the request is repeated after {@code retryTimeout} seconds for as
     * long as retries remain; once they are used up the 503 response is processed like any other
     * non-200 response.
     *
     * @param str endpoint path appended to the base URI
     * @param obj payload, serialized to JSON with the client's Gson instance
     * @param cls type to deserialize the response body into
     * @param i number of retries still allowed after this attempt
     * @param <T> response type
     * @return the deserialized response
     * @throws RuntimeException if the request fails with an I/O error, the status code is not 200,
     *         or the thread is interrupted while waiting for the next attempt
     */
    protected <T> T executePost(String str, Object obj, Class<T> cls, int i) {
        String json = this.gson.toJson(obj);
        HttpPost request = new HttpPost(this.baseUri + str);
        request.setHeader("Content-Type", "application/json");
        request.setEntity(new StringEntity(json, "UTF-8"));
        try (CloseableHttpResponse response = this.httpClient.execute(request)) {
            boolean serverBusy = response.getStatusLine().getStatusCode() == HTTP_SERVICE_UNAVAILABLE;
            if (!serverBusy || i < 1) {
                return read(response, cls);
            }
        } catch (IOException e) {
            LOGGER.error("POST request on URI '{}{}' failed: {}", this.baseUri, str, e.getMessage());
            throw new RuntimeException("POST request failed", e);
        }
        // Only reached for a 503 with retries left; the response has already been closed.
        try {
            LOGGER.info("RFC server currently busy - next request attempt in {}s", this.retryTimeout);
            TimeUnit.SECONDS.sleep(this.retryTimeout);
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers further up can still observe it.
            Thread.currentThread().interrupt();
            LOGGER.error("POST request retry failed: {}", e.getMessage());
            throw new RuntimeException("POST request retry failed", e);
        }
        return executePost(str, obj, cls, i - 1);
    }

    /**
     * Reads the complete response body as UTF-8 and deserializes it from JSON.
     *
     * <p>A response without an entity is treated as an empty body.
     *
     * @param closeableHttpResponse response to read; it is not closed by this method
     * @param cls type to deserialize the body into
     * @param <T> response type
     * @return the deserialized body; Gson yields {@code null} for an empty document
     * @throws IOException if reading the body fails
     * @throws RuntimeException if the status code is not 200 or the body is not valid JSON for {@code cls}
     */
    protected <T> T read(CloseableHttpResponse closeableHttpResponse, Class<T> cls) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        if (closeableHttpResponse.getEntity() != null) {
            try (InputStream content = closeableHttpResponse.getEntity().getContent()) {
                byte[] chunk = new byte[1024];
                int length;
                while ((length = content.read(chunk)) != -1) {
                    buffer.write(chunk, 0, length);
                }
            }
        }
        String body = buffer.toString("UTF-8");
        int statusCode = closeableHttpResponse.getStatusLine().getStatusCode();
        if (statusCode != HTTP_OK) {
            LOGGER.error("Request failed with status code {}: {}", statusCode, body);
            throw new RuntimeException(String.format("Request failed with status code %d: %s", statusCode, body));
        }
        try {
            return this.gson.fromJson(body, cls);
        } catch (RuntimeException e) {
            // Gson reports malformed JSON with unchecked exceptions, so an IOException handler would never log it.
            LOGGER.error("JSON response could not be deserialized: {}", e.getMessage());
            throw e;
        }
    }

    /** Closes the underlying HTTP client; a failure to do so is logged and not propagated. */
    @Override // java.lang.AutoCloseable
    public void close() {
        try {
            this.httpClient.close();
        } catch (IOException e) {
            LOGGER.error("HTTP client could not be closed correctly", e);
        }
    }

    /** Creates a Kafka consumer from the stream's properties, positions it and wraps it in a data accessor. */
    private IGenericDataAccess openStream(SAPTableJoinDataDTO stream) throws IOException {
        if (stream == null) {
            throw new IllegalStateException("Stream service returned an empty response");
        }
        LOGGER.info("Consuming data stream with ID '{}'", stream.getId());
        Properties properties = new Properties();
        for (Map.Entry<String, String> entry : stream.getKafkaProperties().entrySet()) {
            properties.setProperty(entry.getKey(), entry.getValue());
        }
        Consumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        boolean handedOver = false;
        try {
            assignAndSeek(kafkaConsumer, stream);
            IGenericDataAccess data = new SAPStreamData(this, stream.getFields(), kafkaConsumer, stream.getId(), this.maxTimeToWaitForMoreData);
            handedOver = true;
            return data;
        } finally {
            // Until the accessor exists nobody else holds the consumer, so release it on failure.
            if (!handedOver) {
                try {
                    kafkaConsumer.close();
                } catch (RuntimeException closeFailure) {
                    LOGGER.warn("Kafka consumer could not be closed correctly", closeFailure);
                }
            }
        }
    }

    /** Rejects {@code null} and empty values with an {@link IllegalArgumentException} naming the argument. */
    private static void requireNonEmpty(String value, String description) {
        if (value == null) {
            throw new IllegalArgumentException(description + " must not be null");
        }
        if (value.isEmpty()) {
            throw new IllegalArgumentException(description + " must not be empty");
        }
    }
}
