/*
 * Decompiled with CFR 0.152.
 */
package io.delta.dynamodbcommitcoordinator;

import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.model.AttributeAction;
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.AttributeValueUpdate;
import com.amazonaws.services.dynamodbv2.model.ConditionalCheckFailedException;
import com.amazonaws.services.dynamodbv2.model.DescribeTableResult;
import com.amazonaws.services.dynamodbv2.model.ExpectedAttributeValue;
import com.amazonaws.services.dynamodbv2.model.GetItemRequest;
import com.amazonaws.services.dynamodbv2.model.GetItemResult;
import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
import com.amazonaws.services.dynamodbv2.model.KeyType;
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
import com.amazonaws.services.dynamodbv2.model.PutItemRequest;
import com.amazonaws.services.dynamodbv2.model.ResourceInUseException;
import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;
import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType;
import com.amazonaws.services.dynamodbv2.model.TableDescription;
import com.amazonaws.services.dynamodbv2.model.UpdateItemRequest;
import io.delta.storage.CloseableIterator;
import io.delta.storage.LogStore;
import io.delta.storage.commit.Commit;
import io.delta.storage.commit.CommitCoordinatorClient;
import io.delta.storage.commit.CommitFailedException;
import io.delta.storage.commit.CommitResponse;
import io.delta.storage.commit.CoordinatedCommitsUtils;
import io.delta.storage.commit.GetCommitsResponse;
import io.delta.storage.commit.TableDescriptor;
import io.delta.storage.commit.TableIdentifier;
import io.delta.storage.commit.UpdatedActions;
import io.delta.storage.commit.actions.AbstractMetadata;
import io.delta.storage.commit.actions.AbstractProtocol;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link CommitCoordinatorClient} that keeps the commit state of each registered table in one
 * item of a DynamoDB table.
 *
 * <p>As written by {@link #registerTable}, an item is keyed by {@code tableId} and carries the
 * attributes {@code tableVersion}, {@code hasAcceptedCommits}, {@code path}, {@code commits},
 * {@code acceptingCommits} and {@code schemaVersion}. Commits are appended to the
 * {@code commits} list through conditional updates and removed from it again once their files
 * have been copied to the backfilled location (see {@link #backfillToVersion}).
 *
 * <p>The constructor polls DynamoDB (and may create the backing table), so constructing an
 * instance performs network I/O.
 */
public class DynamoDBCommitCoordinatorClient
implements CommitCoordinatorClient {
    private static final Logger LOG = LoggerFactory.getLogger(DynamoDBCommitCoordinatorClient.class);

    // Attribute names of the per-table item stored in DynamoDB.
    private static final String ATTR_TABLE_VERSION = "tableVersion";
    private static final String ATTR_TABLE_TIMESTAMP = "tableTimestamp";
    private static final String ATTR_ACCEPTING_COMMITS = "acceptingCommits";
    private static final String ATTR_HAS_ACCEPTED_COMMITS = "hasAcceptedCommits";
    private static final String ATTR_PATH = "path";
    private static final String ATTR_SCHEMA_VERSION = "schemaVersion";
    private static final String ATTR_COMMITS = "commits";

    // Attribute names of a single element of the "commits" list.
    private static final String COMMIT_VERSION = "version";
    private static final String COMMIT_TIMESTAMP = "timestamp";
    private static final String COMMIT_FILE_NAME = "fsName";
    private static final String COMMIT_FILE_LENGTH = "fsLength";
    private static final String COMMIT_FILE_TIMESTAMP = "fsTimestamp";

    // Provisioned throughput used by the four-argument constructor.
    private static final long DEFAULT_READ_CAPACITY_UNITS = 5L;
    private static final long DEFAULT_WRITE_CAPACITY_UNITS = 5L;

    // Polling limits used while waiting for the DynamoDB table to become ACTIVE.
    private static final int MAX_TABLE_STATUS_CHECKS = 20;
    private static final long TABLE_STATUS_POLL_INTERVAL_MS = 1000L;

    /** Name of the DynamoDB table that stores one item per registered table. */
    final String coordinatedCommitsTableName;
    /** DynamoDB client used for every request issued by this class. */
    final AmazonDynamoDB client;
    /** Endpoint string; used in log/error messages and in {@link #semanticEquals}. */
    final String endpoint;
    /** Write capacity units requested when this client creates the DynamoDB table. */
    final long writeCapacityUnits;
    /** Read capacity units requested when this client creates the DynamoDB table. */
    final long readCapacityUnits;
    /**
     * Backfill cadence: a backfill is attempted after a commit when this value is {@code <= 1}
     * or when the committed version is a multiple of it.
     */
    public final long backfillBatchSize;
    /** When true, the stored {@code path} attribute is not checked against the caller's path. */
    final boolean skipPathCheck;
    /** Key of the table id, both in the table configuration map and in the DynamoDB item. */
    static final String TABLE_CONF_TABLE_ID_KEY = "tableId";
    /** Schema version written to, and expected in, every item handled by this client. */
    final int CLIENT_VERSION = 1;

    /**
     * Creates a client with 5 read / 5 write capacity units and path checking enabled.
     *
     * @param coordinatedCommitsTableName name of the backing DynamoDB table
     * @param endpoint endpoint string used for logging and {@link #semanticEquals}
     * @param client DynamoDB client to issue requests with
     * @param backfillBatchSize backfill cadence, see {@link #backfillBatchSize}
     * @throws IOException if waiting for the DynamoDB table is interrupted
     */
    public DynamoDBCommitCoordinatorClient(String coordinatedCommitsTableName, String endpoint, AmazonDynamoDB client, long backfillBatchSize) throws IOException {
        this(coordinatedCommitsTableName, endpoint, client, backfillBatchSize, DEFAULT_READ_CAPACITY_UNITS, DEFAULT_WRITE_CAPACITY_UNITS, false);
    }

    /**
     * Creates a client and makes a best-effort attempt to ensure the backing DynamoDB table
     * exists and is ACTIVE.
     *
     * @param coordinatedCommitsTableName name of the backing DynamoDB table
     * @param endpoint endpoint string used for logging and {@link #semanticEquals}
     * @param client DynamoDB client to issue requests with
     * @param backfillBatchSize backfill cadence, see {@link #backfillBatchSize}
     * @param readCapacityUnits read capacity requested if the table has to be created
     * @param writeCapacityUnits write capacity requested if the table has to be created
     * @param skipPathCheck whether to skip the stored-path precondition on updates
     * @throws IOException if waiting for the DynamoDB table is interrupted
     */
    public DynamoDBCommitCoordinatorClient(String coordinatedCommitsTableName, String endpoint, AmazonDynamoDB client, long backfillBatchSize, long readCapacityUnits, long writeCapacityUnits, boolean skipPathCheck) throws IOException {
        this.coordinatedCommitsTableName = coordinatedCommitsTableName;
        this.endpoint = endpoint;
        this.client = client;
        this.backfillBatchSize = backfillBatchSize;
        this.readCapacityUnits = readCapacityUnits;
        this.writeCapacityUnits = writeCapacityUnits;
        this.skipPathCheck = skipPathCheck;
        this.tryEnsureTableExists();
    }

    /** Wraps a long as a DynamoDB number attribute. */
    private static AttributeValue numberValue(long value) {
        return new AttributeValue().withN(Long.toString(value));
    }

    /** Builds an equality precondition on the given attribute value. */
    private static ExpectedAttributeValue expecting(AttributeValue value) {
        return new ExpectedAttributeValue().withValue(value);
    }

    /** Builds a PUT (overwrite) update carrying the given attribute value. */
    private static AttributeValueUpdate putting(AttributeValue value) {
        return new AttributeValueUpdate().withValue(value).withAction(AttributeAction.PUT);
    }

    /**
     * Extracts the table id from a table configuration map.
     *
     * @throws RuntimeException if the map has no {@code tableId} key
     */
    private String getTableId(Map<String, String> tableConf) {
        if (!tableConf.containsKey(TABLE_CONF_TABLE_ID_KEY)) {
            throw new RuntimeException("tableId not found");
        }
        return tableConf.get(TABLE_CONF_TABLE_ID_KEY);
    }

    /**
     * Fetches the requested attributes of the item belonging to the table identified by
     * {@code tableConf}.
     */
    private GetItemResult getEntryFromCommitCoordinator(Map<String, String> tableConf, String ... attributesToGet) {
        // NOTE(review): the request does not ask for a consistent read, so the SDK default
        // applies — confirm that this is acceptable for callers that act on the result.
        GetItemRequest request = new GetItemRequest()
            .withTableName(this.coordinatedCommitsTableName)
            .addKeyEntry(TABLE_CONF_TABLE_ID_KEY, new AttributeValue().withS(this.getTableId(tableConf)))
            .withAttributesToGet(attributesToGet);
        return this.client.getItem(request);
    }

    /**
     * Records {@code commitVersion} in the coordinator with a conditional update.
     *
     * <p>The update only applies when the stored {@code tableVersion} equals
     * {@code commitVersion - 1}, {@code acceptingCommits} is true, {@code schemaVersion} equals
     * {@link #CLIENT_VERSION} and (unless {@link #skipPathCheck}) the stored {@code path}
     * equals the parent of {@code logPath}.
     *
     * @param logPath log path of the table; its parent is compared with the stored path
     * @param tableConf table configuration containing the {@code tableId}
     * @param commitVersion version being committed
     * @param commitFile status of the already written commit file
     * @param commitTimestamp timestamp stored with the commit and as {@code tableTimestamp}
     * @param isCCtoFSConversion when true, {@code acceptingCommits} is set to false as part of
     *        this commit
     * @return response wrapping the recorded commit
     * @throws CommitFailedException if the conditional update is rejected
     */
    protected CommitResponse commitToCoordinator(Path logPath, Map<String, String> tableConf, long commitVersion, FileStatus commitFile, long commitTimestamp, boolean isCCtoFSConversion) throws CommitFailedException {
        Map<String, ExpectedAttributeValue> preconditions = new HashMap<>();
        preconditions.put(ATTR_TABLE_VERSION, expecting(numberValue(commitVersion - 1L)));
        preconditions.put(ATTR_ACCEPTING_COMMITS, expecting(new AttributeValue().withBOOL(Boolean.TRUE)));
        if (!this.skipPathCheck) {
            preconditions.put(ATTR_PATH, expecting(new AttributeValue().withS(logPath.getParent().toString())));
        }
        preconditions.put(ATTR_SCHEMA_VERSION, expecting(numberValue(CLIENT_VERSION)));

        Map<String, AttributeValue> commitEntry = new HashMap<>();
        commitEntry.put(COMMIT_VERSION, numberValue(commitVersion));
        commitEntry.put(COMMIT_TIMESTAMP, numberValue(commitTimestamp));
        commitEntry.put(COMMIT_FILE_NAME, new AttributeValue().withS(commitFile.getPath().getName()));
        commitEntry.put(COMMIT_FILE_LENGTH, numberValue(commitFile.getLen()));
        commitEntry.put(COMMIT_FILE_TIMESTAMP, numberValue(commitFile.getModificationTime()));

        UpdateItemRequest request = new UpdateItemRequest()
            .withTableName(this.coordinatedCommitsTableName)
            .addKeyEntry(TABLE_CONF_TABLE_ID_KEY, new AttributeValue().withS(this.getTableId(tableConf)))
            .addAttributeUpdatesEntry(ATTR_TABLE_VERSION, putting(numberValue(commitVersion)))
            .addAttributeUpdatesEntry(ATTR_HAS_ACCEPTED_COMMITS, putting(new AttributeValue().withBOOL(Boolean.TRUE)))
            .addAttributeUpdatesEntry(ATTR_TABLE_TIMESTAMP, putting(numberValue(commitTimestamp)))
            // ADD on a list attribute appends the new commit entry to the existing list.
            .addAttributeUpdatesEntry(ATTR_COMMITS, new AttributeValueUpdate()
                .withAction(AttributeAction.ADD)
                .withValue(new AttributeValue().withL(new AttributeValue[]{new AttributeValue().withM(commitEntry)})))
            .withExpected(preconditions);
        if (isCCtoFSConversion) {
            request = request.addAttributeUpdatesEntry(ATTR_ACCEPTING_COMMITS, putting(new AttributeValue().withBOOL(Boolean.FALSE)));
        }

        try {
            this.client.updateItem(request);
        }
        catch (ConditionalCheckFailedException rejection) {
            throw this.explainRejectedCommit(logPath, tableConf, commitVersion, rejection);
        }
        return new CommitResponse(new Commit(commitVersion, commitFile, commitTimestamp));
    }

    /**
     * Re-reads the coordinator entry after a rejected conditional update and builds the
     * exception describing which precondition failed (the rejection itself does not say).
     *
     * <p>Checks run in this order: schema version, stored path (unless
     * {@link #skipPathCheck}), {@code acceptingCommits}, table version. A version mismatch is
     * reported as a retryable conflict only when the stored version is ahead of the expected
     * one.
     *
     * @return the exception to throw; never null
     */
    private CommitFailedException explainRejectedCommit(Path logPath, Map<String, String> tableConf, long commitVersion, ConditionalCheckFailedException rejection) {
        GetItemResult latestEntry = this.getEntryFromCommitCoordinator(tableConf, ATTR_TABLE_VERSION, ATTR_ACCEPTING_COMMITS, ATTR_PATH, ATTR_SCHEMA_VERSION);
        Map<String, AttributeValue> item = latestEntry.getItem();
        if (item == null) {
            // No item exists for this table id: fail with a descriptive, non-retryable error
            // rather than a NullPointerException on the attribute lookups below.
            return new CommitFailedException(false, false, "No commit coordinator entry was found for table id " + this.getTableId(tableConf) + " in DynamoDB table " + this.coordinatedCommitsTableName + ".", (Throwable)rejection);
        }

        int schemaVersion = Integer.parseInt(item.get(ATTR_SCHEMA_VERSION).getN());
        if (schemaVersion != CLIENT_VERSION) {
            return new CommitFailedException(false, false, "The schema version of the commit coordinator does not match the currentDynamoDBCommitCoordinatorClient version. The data schema version is  " + schemaVersion + " while the client version is " + CLIENT_VERSION + ". Make sure that the correct client is being used to access this table.");
        }
        long latestTableVersion = Long.parseLong(item.get(ATTR_TABLE_VERSION).getN());
        if (!this.skipPathCheck) {
            String registeredPath = item.get(ATTR_PATH).getS();
            if (!registeredPath.equals(logPath.getParent().toString())) {
                return new CommitFailedException(false, false, "This commit was attempted from path " + logPath.getParent() + " while the table is registered at " + registeredPath + ".");
            }
        }
        if (!item.get(ATTR_ACCEPTING_COMMITS).getBOOL().booleanValue()) {
            return new CommitFailedException(false, false, "The commit coordinator is not accepting any new commits for this table.");
        }
        if (latestTableVersion != commitVersion - 1L) {
            // Only a coordinator that has moved past the expected version is a retryable conflict.
            boolean retryable = latestTableVersion > commitVersion - 1L;
            return new CommitFailedException(retryable, retryable, "Commit version " + commitVersion + " is not valid. Expected version: " + (latestTableVersion + 1L) + ".");
        }
        // Every precondition holds on the re-read entry although the update was rejected, so
        // the commit was NOT recorded. Reporting success here would lose the commit; ask the
        // caller to retry instead.
        return new CommitFailedException(true, false, "Commit version " + commitVersion + " was rejected by the commit coordinator, but the entry read afterwards satisfies all commit preconditions. The commit was not recorded and can be retried.", (Throwable)rejection);
    }

    /**
     * Writes the actions to an unbackfilled commit file, records the commit in the
     * coordinator and, when due, backfills.
     *
     * <p>A backfill is attempted when {@link #backfillBatchSize} is {@code <= 1}, when
     * {@code commitVersion} is a multiple of it, or when the commit is a coordinated-commits
     * to filesystem conversion.
     *
     * @throws CommitFailedException if {@code commitVersion} is 0, if the coordinator rejects
     *         the commit, or if an {@link IOException} occurs (wrapped, non-retryable)
     */
    public CommitResponse commit(LogStore logStore, Configuration hadoopConf, TableDescriptor tableDesc, long commitVersion, Iterator<String> actions, UpdatedActions updatedActions) throws CommitFailedException {
        Path logPath = tableDesc.getLogPath();
        if (commitVersion == 0L) {
            throw new CommitFailedException(false, false, "Commit version 0 must go via filesystem.");
        }
        try {
            FileStatus commitFile = CoordinatedCommitsUtils.writeUnbackfilledCommitFile(logStore, hadoopConf, logPath.toString(), commitVersion, actions, UUID.randomUUID().toString());
            long commitTimestamp = updatedActions.getCommitInfo().getCommitTimestamp();
            boolean isCCtoFSConversion = CoordinatedCommitsUtils.isCoordinatedCommitsToFSConversion(commitVersion, updatedActions);
            LOG.info("Committing version {} with UUID delta file {} to DynamoDB.", (Object)commitVersion, (Object)commitFile.getPath());
            CommitResponse response = this.commitToCoordinator(logPath, tableDesc.getTableConf(), commitVersion, commitFile, commitTimestamp, isCCtoFSConversion);
            LOG.info("Commit {} was successful.", (Object)commitVersion);
            // Short-circuit on purpose: the modulo must not be evaluated when the batch size is
            // 0, otherwise an ArithmeticException would escape after the commit was recorded.
            boolean shouldBackfill = this.backfillBatchSize <= 1L
                || commitVersion % this.backfillBatchSize == 0L
                || isCCtoFSConversion;
            if (shouldBackfill) {
                this.backfillToVersion(logStore, hadoopConf, tableDesc, commitVersion, null);
            }
            return response;
        }
        catch (IOException ioFailure) {
            throw new CommitFailedException(false, false, ioFailure.getMessage(), (Throwable)ioFailure);
        }
    }

    /**
     * Reads the coordinator entry and converts its {@code commits} list into {@link Commit}s,
     * keeping only versions within the optional inclusive bounds.
     *
     * @param logPath log path used to resolve the commit directory of the commit files
     * @param tableConf table configuration containing the {@code tableId}
     * @param startVersion inclusive lower bound, or null for none
     * @param endVersion inclusive upper bound, or null for none
     * @throws IllegalStateException if no item exists for the table id
     */
    private GetCommitsResultInternal getCommitsImpl(Path logPath, Map<String, String> tableConf, Long startVersion, Long endVersion) throws IOException {
        GetItemResult entry = this.getEntryFromCommitCoordinator(tableConf, ATTR_COMMITS, ATTR_TABLE_VERSION, ATTR_HAS_ACCEPTED_COMMITS);
        Map<String, AttributeValue> item = entry.getItem();
        if (item == null) {
            throw new IllegalStateException("No commit coordinator entry was found for table id " + this.getTableId(tableConf) + " in DynamoDB table " + this.coordinatedCommitsTableName + ".");
        }
        long latestTableVersion = Long.parseLong(item.get(ATTR_TABLE_VERSION).getN());
        Path commitDir = CoordinatedCommitsUtils.commitDirPath(logPath);

        ArrayList<Commit> commits = new ArrayList<>();
        for (AttributeValue storedCommit : item.get(ATTR_COMMITS).getL()) {
            Map<String, AttributeValue> fields = storedCommit.getM();
            long version = Long.parseLong(fields.get(COMMIT_VERSION).getN());
            boolean beforeStart = startVersion != null && version < startVersion;
            boolean afterEnd = endVersion != null && endVersion < version;
            if (beforeStart || afterEnd) {
                continue;
            }
            Path filePath = new Path(commitDir, fields.get(COMMIT_FILE_NAME).getS());
            long fileLength = Long.parseLong(fields.get(COMMIT_FILE_LENGTH).getN());
            long fileModificationTime = Long.parseLong(fields.get(COMMIT_FILE_TIMESTAMP).getN());
            // Only length, modification time and path are stored; the rest is filled with
            // placeholders (not a directory, replication 0, block size 0).
            FileStatus fileStatus = new FileStatus(fileLength, false, 0, 0L, fileModificationTime, filePath);
            long commitTimestamp = Long.parseLong(fields.get(COMMIT_TIMESTAMP).getN());
            commits.add(new Commit(version, fileStatus, commitTimestamp));
        }
        GetCommitsResponse response = new GetCommitsResponse(new ArrayList<>(commits), latestTableVersion);
        return new GetCommitsResultInternal(response, item.get(ATTR_HAS_ACCEPTED_COMMITS).getBOOL());
    }

    /**
     * Returns the commits tracked by the coordinator within the optional inclusive bounds.
     *
     * <p>The latest table version is reported as {@code -1} while the coordinator entry has
     * {@code hasAcceptedCommits} set to false.
     *
     * @throws UncheckedIOException if reading the entry fails with an {@link IOException}
     */
    public GetCommitsResponse getCommits(TableDescriptor tableDesc, Long startVersion, Long endVersion) {
        try {
            GetCommitsResultInternal result = this.getCommitsImpl(tableDesc.getLogPath(), tableDesc.getTableConf(), startVersion, endVersion);
            long latestTableVersion = result.hasAcceptedCommits
                ? result.response.getLatestTableVersion()
                : -1L;
            return new GetCommitsResponse(result.response.getCommits(), latestTableVersion);
        }
        catch (IOException ioFailure) {
            throw new UncheckedIOException(ioFailure);
        }
    }

    /** Writes {@code actions} to the backfilled delta file path of {@code version}. */
    private void writeActionsToBackfilledFile(LogStore logStore, Path logPath, long version, Iterator<String> actions, Configuration hadoopConf, boolean shouldOverwrite) throws IOException {
        Path target = CoordinatedCommitsUtils.getBackfilledDeltaFilePath(logPath, version);
        logStore.write(target, actions, Boolean.valueOf(shouldOverwrite), hadoopConf);
    }

    /**
     * Verifies that the backfilled delta file of {@code version} exists; a null version is
     * accepted without any check.
     *
     * @throws IllegalArgumentException if the file does not exist
     * @throws UncheckedIOException if the filesystem lookup fails
     */
    private void validateBackfilledFileExists(Path logPath, Configuration hadoopConf, Long version) {
        if (version == null) {
            return;
        }
        try {
            Path backfilledFile = CoordinatedCommitsUtils.getBackfilledDeltaFilePath(logPath, version);
            FileSystem fileSystem = logPath.getFileSystem(hadoopConf);
            if (!fileSystem.exists(backfilledFile)) {
                throw new IllegalArgumentException("Expected backfilled file at " + backfilledFile + " does not exist.");
            }
        }
        catch (IOException ioFailure) {
            throw new UncheckedIOException(ioFailure);
        }
    }

    /**
     * Copies every commit still tracked by the coordinator (starting at
     * {@code lastKnownBackfilledVersion} when it is non-null) to its backfilled location and
     * then clears the coordinator's {@code commits} list.
     *
     * <p>{@code version} is only validated against the latest table version; all returned
     * commits are backfilled regardless of it. Clearing the list is conditional on the table
     * version being unchanged since it was read; a rejected clear is logged and otherwise
     * ignored.
     *
     * @throws IllegalArgumentException if {@code version} is greater than the latest table
     *         version, or if the file of {@code lastKnownBackfilledVersion} does not exist
     * @throws UncheckedIOException if reading the coordinator entry fails
     * @throws IOException if reading or writing a commit file fails
     */
    public void backfillToVersion(LogStore logStore, Configuration hadoopConf, TableDescriptor tableDesc, long version, Long lastKnownBackfilledVersion) throws IOException {
        LOG.info("Backfilling all unbackfilled commits.");
        Path logPath = tableDesc.getLogPath();
        GetCommitsResponse unbackfilled;
        try {
            unbackfilled = this.getCommitsImpl(logPath, tableDesc.getTableConf(), lastKnownBackfilledVersion, null).response;
        }
        catch (IOException ioFailure) {
            throw new UncheckedIOException(ioFailure);
        }
        this.validateBackfilledFileExists(logPath, hadoopConf, lastKnownBackfilledVersion);
        long latestTableVersion = unbackfilled.getLatestTableVersion();
        if (version > latestTableVersion) {
            throw new IllegalArgumentException("The requested backfill version " + version + " is greater than the latest version " + latestTableVersion + " for the table.");
        }

        boolean shouldOverwrite = !logStore.isPartialWriteVisible(logPath, hadoopConf);
        for (Commit commit : unbackfilled.getCommits()) {
            try (CloseableIterator<String> actions = logStore.read(commit.getFileStatus().getPath(), hadoopConf)) {
                this.writeActionsToBackfilledFile(logStore, logPath, commit.getVersion(), actions, hadoopConf, shouldOverwrite);
            }
        }

        Map<String, ExpectedAttributeValue> preconditions = new HashMap<>();
        preconditions.put(ATTR_TABLE_VERSION, expecting(numberValue(latestTableVersion)));
        // Honour skipPathCheck exactly like commitToCoordinator does; otherwise a client that
        // commits with the path check disabled could never clear the commits list.
        if (!this.skipPathCheck) {
            preconditions.put(ATTR_PATH, expecting(new AttributeValue().withS(logPath.getParent().toString())));
        }
        preconditions.put(ATTR_SCHEMA_VERSION, expecting(numberValue(CLIENT_VERSION)));

        UpdateItemRequest clearCommits = new UpdateItemRequest()
            .withTableName(this.coordinatedCommitsTableName)
            .addKeyEntry(TABLE_CONF_TABLE_ID_KEY, new AttributeValue().withS(this.getTableId(tableDesc.getTableConf())))
            .addAttributeUpdatesEntry(ATTR_COMMITS, new AttributeValueUpdate()
                .withAction(AttributeAction.PUT)
                .withValue(new AttributeValue().withL(new AttributeValue[0])))
            .withExpected(preconditions);
        try {
            this.client.updateItem(clearCommits);
        }
        catch (ConditionalCheckFailedException rejection) {
            LOG.warn("Backfill succeeded but the update to the commit coordinator failed. This is probably due to a concurrent update to the commit coordinator. This is not a critical error and  should rectify itself.");
        }
    }

    /**
     * Registers a table by inserting a new item with a freshly generated table id.
     *
     * <p>The item starts with {@code tableVersion = currentVersion + 1}, an empty
     * {@code commits} list, {@code acceptingCommits = true} and
     * {@code hasAcceptedCommits = false}. The insert is conditional on no item with the same
     * table id existing. Only {@code logPath} and {@code currentVersion} are used by this
     * implementation.
     *
     * @return a single-entry map holding the generated id under {@code tableId}
     */
    public Map<String, String> registerTable(Path logPath, Optional<TableIdentifier> tableIdentifier, long currentVersion, AbstractMetadata currentMetadata, AbstractProtocol currentProtocol) {
        String tableId = UUID.randomUUID().toString();
        long attemptVersion = currentVersion + 1L;

        Map<String, AttributeValue> item = new HashMap<>();
        item.put(TABLE_CONF_TABLE_ID_KEY, new AttributeValue().withS(tableId));
        item.put(ATTR_TABLE_VERSION, numberValue(attemptVersion));
        item.put(ATTR_HAS_ACCEPTED_COMMITS, new AttributeValue().withBOOL(Boolean.FALSE));
        item.put(ATTR_PATH, new AttributeValue().withS(logPath.getParent().toString()));
        item.put(ATTR_COMMITS, new AttributeValue().withL(new AttributeValue[0]));
        item.put(ATTR_ACCEPTING_COMMITS, new AttributeValue().withBOOL(Boolean.TRUE));
        item.put(ATTR_SCHEMA_VERSION, numberValue(CLIENT_VERSION));

        PutItemRequest request = new PutItemRequest()
            .withTableName(this.coordinatedCommitsTableName)
            .withItem(item)
            .withConditionExpression(String.format("attribute_not_exists(%s)", TABLE_CONF_TABLE_ID_KEY));
        this.client.putItem(request);

        Map<String, String> tableConf = new HashMap<>();
        tableConf.put(TABLE_CONF_TABLE_ID_KEY, tableId);
        return tableConf;
    }

    /**
     * Polls the DynamoDB table status up to {@value #MAX_TABLE_STATUS_CHECKS} times, creating
     * the table if it does not exist, until the table is ACTIVE.
     *
     * <p>This is best-effort: if the table is still being created after the last check the
     * method logs a warning and returns normally.
     *
     * @throws InterruptedIOException if the thread is interrupted while waiting; the
     *         thread's interrupt flag is restored before throwing
     * @throws RuntimeException if the table is in a state other than ACTIVE or CREATING
     */
    private void tryEnsureTableExists() throws IOException {
        boolean createdByThisClient = false;
        for (int check = 0; check < MAX_TABLE_STATUS_CHECKS; ++check) {
            // Stays "CREATING" when the table was just requested or does not exist yet.
            String status = "CREATING";
            try {
                DescribeTableResult description = this.client.describeTable(this.coordinatedCommitsTableName);
                TableDescription table = description.getTable();
                status = table.getTableStatus();
            }
            catch (ResourceNotFoundException notFound) {
                LOG.info("DynamoDB table `{}` for endpoint `{}` does not exist. Creating it now with provisioned throughput of {} RCUs and {} WCUs.", new Object[]{this.coordinatedCommitsTableName, this.endpoint, this.readCapacityUnits, this.writeCapacityUnits});
                try {
                    this.client.createTable(Collections.singletonList(new AttributeDefinition(TABLE_CONF_TABLE_ID_KEY, ScalarAttributeType.S)), this.coordinatedCommitsTableName, Collections.singletonList(new KeySchemaElement(TABLE_CONF_TABLE_ID_KEY, KeyType.HASH)), new ProvisionedThroughput(Long.valueOf(this.readCapacityUnits), Long.valueOf(this.writeCapacityUnits)));
                    createdByThisClient = true;
                }
                catch (ResourceInUseException ignored) {
                    // Deliberately ignored: presumably the table was created concurrently by
                    // someone else; the next status check picks it up.
                }
            }

            // Constant-first comparison so that a null status reaches the error branch below
            // instead of failing with a NullPointerException.
            if ("ACTIVE".equals(status)) {
                if (createdByThisClient) {
                    LOG.info("Successfully created DynamoDB table `{}`", (Object)this.coordinatedCommitsTableName);
                } else {
                    LOG.info("Table `{}` already exists", (Object)this.coordinatedCommitsTableName);
                }
                return;
            }
            if (!"CREATING".equals(status)) {
                LOG.error("table `{}` status: {}", (Object)this.coordinatedCommitsTableName, (Object)status);
                throw new RuntimeException("DynamoDBCommitCoordinatorCliet: Unable to create table with name " + this.coordinatedCommitsTableName + " for endpoint " + this.endpoint + ". Ensure that the credentials provided have the necessary permissions to create tables in DynamoDB. If the table already exists, ensure that the table is in the ACTIVE state.");
            }

            LOG.info("Waiting for `{}` table creation", (Object)this.coordinatedCommitsTableName);
            try {
                Thread.sleep(TABLE_STATUS_POLL_INTERVAL_MS);
            }
            catch (InterruptedException interrupted) {
                // Restore the interrupt flag so callers further up can still observe it.
                Thread.currentThread().interrupt();
                InterruptedIOException wrapped = new InterruptedIOException(interrupted.getMessage());
                wrapped.initCause(interrupted);
                throw wrapped;
            }
        }
        LOG.warn("DynamoDB table `{}` did not become ACTIVE after {} status checks; continuing without confirmation.", (Object)this.coordinatedCommitsTableName, (Object)MAX_TABLE_STATUS_CHECKS);
    }

    /**
     * Returns true when {@code other} is a {@code DynamoDBCommitCoordinatorClient} with the
     * same DynamoDB table name and the same endpoint.
     */
    public boolean semanticEquals(CommitCoordinatorClient other) {
        if (!(other instanceof DynamoDBCommitCoordinatorClient)) {
            return false;
        }
        DynamoDBCommitCoordinatorClient that = (DynamoDBCommitCoordinatorClient)other;
        return this.coordinatedCommitsTableName.equals(that.coordinatedCommitsTableName)
            && this.endpoint.equals(that.endpoint);
    }

    /**
     * Pairs a {@link GetCommitsResponse} with the {@code hasAcceptedCommits} flag read from
     * the same coordinator entry.
     */
    private static class GetCommitsResultInternal {
        final GetCommitsResponse response;
        final boolean hasAcceptedCommits;

        GetCommitsResultInternal(GetCommitsResponse response, boolean hasAcceptedCommits) {
            this.response = response;
            this.hasAcceptedCommits = hasAcceptedCommits;
        }
    }
}

