/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.odps.tunnel.io;

import com.aliyun.odps.commons.transport.Connection;
import com.aliyun.odps.commons.transport.Response;
import com.aliyun.odps.data.ArrowRecordWriter;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TunnelException;
import com.aliyun.odps.tunnel.io.ArrowHttpOutputStream;
import com.aliyun.odps.tunnel.io.CompressOption;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.WritableByteChannel;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.ipc.WriteChannel;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.xerial.snappy.SnappyFramedOutputStream;

public class ArrowTunnelRecordWriter
implements ArrowRecordWriter {
    private TableTunnel.UploadSession tableSession;
    private ArrowHttpOutputStream outputStream;
    private Connection connection;
    private boolean isClosed;
    private CompressOption compress;

    public ArrowTunnelRecordWriter(TableTunnel.UploadSession tableSession, Connection connection, CompressOption option) {
        this.tableSession = tableSession;
        this.connection = connection;
        this.isClosed = false;
        this.compress = option;
    }

    @Override
    public void write(VectorSchemaRoot root) throws IOException {
        if (this.isClosed) {
            throw new IOException("Arrow writer is closed");
        }
        if (this.outputStream == null) {
            OutputStream wr = this.connection.getOutputStream();
            if (this.compress != null && !this.compress.algorithm.equals((Object)CompressOption.CompressAlgorithm.ODPS_RAW)) {
                if (this.compress.algorithm.equals((Object)CompressOption.CompressAlgorithm.ODPS_ZLIB)) {
                    Deflater def = new Deflater();
                    def.setLevel(this.compress.level);
                    def.setStrategy(this.compress.strategy);
                    wr = new DeflaterOutputStream(wr, def);
                } else if (this.compress.algorithm.equals((Object)CompressOption.CompressAlgorithm.ODPS_SNAPPY)) {
                    wr = new SnappyFramedOutputStream(wr);
                } else {
                    throw new IOException("invalid compression option.");
                }
            }
            this.outputStream = new ArrowHttpOutputStream(wr);
        }
        if (root.getRowCount() == 0) {
            return;
        }
        WriteChannel writeChannel = new WriteChannel((WritableByteChannel)this.outputStream);
        VectorUnloader loader = new VectorUnloader(root);
        try (ArrowRecordBatch recordBatch = loader.getRecordBatch();){
            MessageSerializer.serialize((WriteChannel)writeChannel, (ArrowRecordBatch)recordBatch);
        }
        catch (IOException e) {
            Response response = this.connection.getResponse();
            if (response != null && !response.isOK()) {
                TunnelException exception = new TunnelException(response.getHeader("x-odps-request-id"), this.connection.getInputStream(), response.getStatus());
                throw new IOException(exception.getMessage(), exception);
            }
            throw new IOException("ArrowHttpOutputStream Serialize Exception", e);
        }
    }

    @Override
    public void close() throws IOException {
        if (!this.isClosed) {
            try {
                Response response;
                if (this.outputStream != null) {
                    this.outputStream.close();
                }
                if (!(response = this.connection.getResponse()).isOK()) {
                    TunnelException exception = new TunnelException(response.getHeader("x-odps-request-id"), this.connection.getInputStream(), response.getStatus());
                    throw new IOException(exception.getMessage(), exception);
                }
            }
            finally {
                this.connection.disconnect();
                this.isClosed = true;
            }
        }
    }
}

