/*
 * Decompiled with CFR 0.152.
 */
package com.databricks.jdbc.api.impl.arrow.incubator;

import com.databricks.internal.apache.hc.core5.concurrent.FutureCallback;
import com.databricks.internal.apache.hc.core5.http.nio.AsyncRequestProducer;
import com.databricks.internal.apache.hc.core5.http.nio.support.AsyncRequestBuilder;
import com.databricks.internal.sdk.service.sql.BaseChunkInfo;
import com.databricks.jdbc.api.impl.arrow.AbstractArrowResultChunk;
import com.databricks.jdbc.api.impl.arrow.ChunkStatus;
import com.databricks.jdbc.api.impl.arrow.incubator.DownloadPhase;
import com.databricks.jdbc.api.impl.arrow.incubator.RetryConfig;
import com.databricks.jdbc.api.impl.arrow.incubator.StreamingResponseConsumer;
import com.databricks.jdbc.common.CompressionCodec;
import com.databricks.jdbc.common.DatabricksJdbcUrlParams;
import com.databricks.jdbc.common.util.DatabricksThriftUtil;
import com.databricks.jdbc.common.util.DecompressionUtil;
import com.databricks.jdbc.dbclient.IDatabricksHttpClient;
import com.databricks.jdbc.dbclient.impl.common.StatementId;
import com.databricks.jdbc.exception.DatabricksParsingException;
import com.databricks.jdbc.exception.DatabricksSQLException;
import com.databricks.jdbc.log.JdbcLogger;
import com.databricks.jdbc.log.JdbcLoggerFactory;
import com.databricks.jdbc.model.client.thrift.generated.TSparkArrowResultLink;
import com.databricks.jdbc.model.core.ExternalLink;
import com.databricks.jdbc.model.telemetry.enums.DatabricksDriverErrorCode;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.time.Instant;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;

public class ArrowResultChunkV2
extends AbstractArrowResultChunk {
    private static final JdbcLogger LOGGER = JdbcLoggerFactory.getLogger(ArrowResultChunkV2.class);
    private static final int N_THREADS_PROCESSING = 150;
    private static final ScheduledExecutorService retryScheduler = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory(){
        private final AtomicInteger threadNumber = new AtomicInteger(1);

        @Override
        public Thread newThread(@Nonnull Runnable r) {
            Thread thread = new Thread(r, "Arrow-Retry-Scheduler-" + this.threadNumber.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        }
    });
    private static final ExecutorService arrowDataProcessingExecutor = Executors.newFixedThreadPool(150, new ThreadFactory(){
        private final AtomicInteger threadNumber = new AtomicInteger(1);

        @Override
        public Thread newThread(@Nonnull Runnable r) {
            Thread thread = new Thread(r, "Arrow-Processing-Thread-" + this.threadNumber.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        }
    });
    protected volatile long downloadStartTime;
    protected volatile long downloadEndTime;
    protected volatile long bytesDownloaded;
    protected byte[] downloadedBytes;

    private ArrowResultChunkV2(Builder builder) {
        super(builder.numRows, builder.rowOffset, builder.chunkIndex, builder.statementId, builder.status, builder.chunkLink, builder.expiryTime, builder.chunkReadyTimeoutSeconds);
    }

    public static Builder builder() {
        return new Builder();
    }

    @Override
    protected void downloadData(IDatabricksHttpClient httpClient, CompressionCodec compressionCodec, double speedThreshold) {
        RetryConfig retryConfig = new RetryConfig.Builder().maxAttempts(3).baseDelayMs(1000L).maxDelayMs(5000L).build();
        this.retryDownload(httpClient, compressionCodec, retryConfig, 1);
    }

    @Override
    protected void handleFailure(Exception exception, ChunkStatus failedStatus) {
        this.errorMessage = String.format("Data parsing failed for chunk index [%d] and statement [%s]. Exception [%s]", this.chunkIndex, this.statementId, exception);
        LOGGER.error(this.errorMessage);
        this.setStatus(failedStatus);
        this.chunkReadyFuture.completeExceptionally(new DatabricksParsingException(this.errorMessage, (Throwable)exception, DatabricksDriverErrorCode.CHUNK_DOWNLOAD_ERROR));
    }

    private void retryDownload(IDatabricksHttpClient httpClient, CompressionCodec compressionCodec, RetryConfig retryConfig, int currentAttempt) {
        try {
            StreamingResponseConsumer consumer = new StreamingResponseConsumer(this);
            AsyncRequestBuilder requestBuilder = AsyncRequestBuilder.get(this.chunkLink.getExternalLink());
            if (this.chunkLink.getHttpHeaders() != null) {
                this.chunkLink.getHttpHeaders().forEach(requestBuilder::addHeader);
            }
            AsyncRequestProducer requestProducer = requestBuilder.build();
            httpClient.executeAsync(requestProducer, consumer, new ChunkDownloadCallback(httpClient, compressionCodec, retryConfig, currentAttempt));
        }
        catch (Exception e) {
            this.handleRetryableError(httpClient, compressionCodec, retryConfig, currentAttempt, e, DownloadPhase.DOWNLOAD_SETUP);
        }
    }

    private void processArrowData(CompressionCodec compressionCodec, String context) {
        try (ByteArrayInputStream compressedStream = new ByteArrayInputStream(this.downloadedBytes);
             InputStream uncompressedStream = DecompressionUtil.decompress(compressedStream, compressionCodec, context);){
            this.initializeData(uncompressedStream);
            this.downloadedBytes = null;
            this.chunkReadyFuture.complete(null);
        }
        catch (DatabricksSQLException | IOException e) {
            this.handleFailure(e, ChunkStatus.PROCESSING_FAILED);
        }
    }

    private void handleRetryableError(IDatabricksHttpClient httpClient, CompressionCodec compressionCodec, RetryConfig retryConfig, int currentAttempt, Exception e, DownloadPhase phase) {
        this.setStatus(ChunkStatus.DOWNLOAD_FAILED);
        LOGGER.info("Retrying, current attempt: " + currentAttempt + " for chunk " + this.chunkIndex + " for download phase " + phase.getDescription() + " with error: " + String.valueOf(e));
        if (currentAttempt < retryConfig.maxAttempts) {
            long delayMs = this.calculateBackoffDelay(currentAttempt, retryConfig);
            LOGGER.warn("Retryable error during %s for chunk %s (attempt %s/%s), retrying in %s ms. Error: %s", phase.getDescription(), this.chunkIndex, currentAttempt, retryConfig.maxAttempts, delayMs, e.getMessage());
            this.setStatus(ChunkStatus.DOWNLOAD_RETRY);
            retryScheduler.schedule(() -> this.retryDownload(httpClient, compressionCodec, retryConfig, currentAttempt + 1), delayMs, TimeUnit.MILLISECONDS);
        } else {
            this.handleFailure(e, ChunkStatus.DOWNLOAD_FAILED);
        }
    }

    private long calculateBackoffDelay(int attempt, RetryConfig retryConfig) {
        long delay = Math.min(retryConfig.maxDelayMs, retryConfig.baseDelayMs * (long)Math.pow(2.0, attempt - 1));
        return delay + ThreadLocalRandom.current().nextLong(100L);
    }

    private class ChunkDownloadCallback
    implements FutureCallback<byte[]> {
        private final IDatabricksHttpClient httpClient;
        private final CompressionCodec compressionCodec;
        private final RetryConfig retryConfig;
        private final int currentAttempt;

        public ChunkDownloadCallback(IDatabricksHttpClient httpClient, CompressionCodec compressionCodec, RetryConfig retryConfig, int currentAttempt) {
            this.httpClient = httpClient;
            this.compressionCodec = compressionCodec;
            this.retryConfig = retryConfig;
            this.currentAttempt = currentAttempt;
        }

        @Override
        public void completed(byte[] result) {
            ArrowResultChunkV2.this.downloadedBytes = result;
            ArrowResultChunkV2.this.setStatus(ChunkStatus.DOWNLOAD_SUCCEEDED);
            String context = String.format("Data decompression for chunk index [%d] and statement [%s]", ArrowResultChunkV2.this.getChunkIndex(), ArrowResultChunkV2.this.statementId);
            arrowDataProcessingExecutor.submit(() -> ArrowResultChunkV2.this.processArrowData(this.compressionCodec, context));
        }

        @Override
        public void failed(Exception e) {
            ArrowResultChunkV2.this.handleRetryableError(this.httpClient, this.compressionCodec, this.retryConfig, this.currentAttempt, e, DownloadPhase.DATA_DOWNLOAD);
        }

        @Override
        public void cancelled() {
            ArrowResultChunkV2.this.setStatus(ChunkStatus.CANCELLED);
            ArrowResultChunkV2.this.chunkReadyFuture.cancel(true);
        }
    }

    public static class Builder {
        private long chunkIndex;
        private long numRows;
        private long rowOffset;
        private ExternalLink chunkLink;
        private StatementId statementId;
        private Instant expiryTime;
        private ChunkStatus status;
        private int chunkReadyTimeoutSeconds = Integer.parseInt(DatabricksJdbcUrlParams.CHUNK_READY_TIMEOUT_SECONDS.getDefaultValue());

        public Builder withStatementId(StatementId statementId) {
            this.statementId = statementId;
            return this;
        }

        public Builder withChunkInfo(BaseChunkInfo baseChunkInfo) {
            this.chunkIndex = baseChunkInfo.getChunkIndex();
            this.numRows = baseChunkInfo.getRowCount();
            this.rowOffset = baseChunkInfo.getRowOffset();
            this.status = ChunkStatus.PENDING;
            return this;
        }

        public Builder withThriftChunkInfo(long chunkIndex, TSparkArrowResultLink chunkInfo) {
            this.chunkIndex = chunkIndex;
            this.numRows = chunkInfo.getRowCount();
            this.rowOffset = chunkInfo.getStartRowOffset();
            this.expiryTime = Instant.ofEpochMilli(chunkInfo.getExpiryTime());
            this.status = ChunkStatus.URL_FETCHED;
            this.chunkLink = DatabricksThriftUtil.createExternalLink(chunkInfo, chunkIndex);
            return this;
        }

        public Builder withChunkReadyTimeoutSeconds(int chunkReadyTimeoutSeconds) {
            this.chunkReadyTimeoutSeconds = chunkReadyTimeoutSeconds;
            return this;
        }

        public ArrowResultChunkV2 build() {
            return new ArrowResultChunkV2(this);
        }
    }
}

