package com.couchbase.client.dcp.highlevel.internal;

import com.couchbase.client.core.event.CouchbaseEvent;
import com.couchbase.client.dcp.Client;
import com.couchbase.client.dcp.ControlEventHandler;
import com.couchbase.client.dcp.DataEventHandler;
import com.couchbase.client.dcp.SystemEventHandler;
import com.couchbase.client.dcp.events.DcpFailureEvent;
import com.couchbase.client.dcp.events.StreamEndEvent;
import com.couchbase.client.dcp.highlevel.Deletion;
import com.couchbase.client.dcp.highlevel.FailoverLog;
import com.couchbase.client.dcp.highlevel.Mutation;
import com.couchbase.client.dcp.highlevel.Rollback;
import com.couchbase.client.dcp.highlevel.SnapshotDetails;
import com.couchbase.client.dcp.highlevel.SnapshotMarker;
import com.couchbase.client.dcp.highlevel.StreamEnd;
import com.couchbase.client.dcp.highlevel.StreamFailure;
import com.couchbase.client.dcp.message.DcpFailoverLogResponse;
import com.couchbase.client.dcp.message.DcpSnapshotMarkerRequest;
import com.couchbase.client.dcp.message.MessageUtil;
import com.couchbase.client.dcp.message.RollbackMessage;
import com.couchbase.client.dcp.state.FailoverLogEntry;
import com.couchbase.client.dcp.transport.netty.ChannelFlowController;
import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLongArray;
import java.util.concurrent.atomic.AtomicReferenceArray;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/couchbase/client/dcp/highlevel/internal/EventHandlerAdapter.class */
public class EventHandlerAdapter implements ControlEventHandler, SystemEventHandler {
    private static final Logger log = LoggerFactory.getLogger(EventHandlerAdapter.class);
    private static final int MAX_PARTITIONS = 1024;
    private final Client dcpClient;
    private final EventDispatcher dispatcher;
    private final AtomicLongArray vbucketToUuid = new AtomicLongArray(MAX_PARTITIONS);
    private final AtomicReferenceArray<SnapshotMarker> vbucketToCurrentSnapshot = new AtomicReferenceArray<>(MAX_PARTITIONS);
    private final DataEventHandler dataEventHandler = (channelFlowController, byteBuf) -> {
        try {
            try {
                FlowControlReceipt forMessage = FlowControlReceipt.forMessage(channelFlowController, byteBuf);
                short vbucket = MessageUtil.getVbucket(byteBuf);
                long j = this.vbucketToUuid.get(vbucket);
                SnapshotMarker snapshotMarker = this.vbucketToCurrentSnapshot.get(vbucket);
                switch (byteBuf.getByte(1)) {
                    case DCP_MUTATION_OPCODE:
                        dispatch(new Mutation(byteBuf, forMessage, j, snapshotMarker));
                        byteBuf.release();
                        return;
                    case DCP_DELETION_OPCODE:
                        dispatch(new Deletion(byteBuf, forMessage, j, snapshotMarker, false));
                        byteBuf.release();
                        return;
                    case DCP_EXPIRATION_OPCODE:
                        dispatch(new Deletion(byteBuf, forMessage, j, snapshotMarker, true));
                        byteBuf.release();
                        return;
                    default:
                        forMessage.acknowledge();
                        log.warn("Unexpected data event type: {}", MessageUtil.getShortOpcodeName(byteBuf));
                        byteBuf.release();
                        return;
                }
            } catch (Throwable th) {
                log.error("Failed to dispatch data event", th);
                dispatchOrLogError(new StreamFailure(-1, th));
                byteBuf.release();
            }
        } catch (Throwable th2) {
            byteBuf.release();
            throw th2;
        }
    };

    private EventHandlerAdapter(Client client, EventDispatcher eventDispatcher) {
        this.dcpClient = (Client) Objects.requireNonNull(client);
        this.dispatcher = (EventDispatcher) Objects.requireNonNull(eventDispatcher);
        client.controlEventHandler(this);
        client.systemEventHandler(this);
        client.dataEventHandler(this.dataEventHandler);
    }

    public static EventHandlerAdapter register(Client client, EventDispatcher eventDispatcher) {
        return new EventHandlerAdapter(client, eventDispatcher);
    }

    private void dispatch(DatabaseChangeEvent databaseChangeEvent) {
        this.dispatcher.dispatch(databaseChangeEvent);
    }

    private void dispatchOrLogError(StreamFailure streamFailure) {
        try {
            dispatch(streamFailure);
        } catch (Throwable th) {
            log.error("Error occurred during stream failure event dispatch.", th);
        }
    }

    @Override // com.couchbase.client.dcp.SystemEventHandler
    public void onEvent(CouchbaseEvent couchbaseEvent) {
        try {
            if (couchbaseEvent instanceof StreamEndEvent) {
                StreamEndEvent streamEndEvent = (StreamEndEvent) couchbaseEvent;
                dispatch(new StreamEnd(streamEndEvent.partition(), streamEndEvent.reason()));
            } else if (couchbaseEvent instanceof DcpFailureEvent) {
                DcpFailureEvent dcpFailureEvent = (DcpFailureEvent) couchbaseEvent;
                dispatch(new StreamFailure(dcpFailureEvent.partition().orElse(-1), dcpFailureEvent.error()));
            } else {
                log.debug("Ignoring unrecognized system event: {}", couchbaseEvent.toMap());
            }
        } catch (Throwable th) {
            log.error("Failed to dispatch system event", th);
            dispatchOrLogError(new StreamFailure(-1, th));
        }
    }

    @Override // com.couchbase.client.dcp.ControlEventHandler
    public void onEvent(ChannelFlowController channelFlowController, ByteBuf byteBuf) {
        try {
            try {
                channelFlowController.ack(byteBuf);
                switch (byteBuf.getByte(1)) {
                    case 1:
                        short vbucket = RollbackMessage.vbucket(byteBuf);
                        dispatch(new Rollback(this.dcpClient, vbucket, RollbackMessage.seqno(byteBuf), th -> {
                            dispatch(new StreamFailure(vbucket, th));
                        }));
                        byteBuf.release();
                        return;
                    case DCP_FAILOVER_LOG_OPCODE:
                        short vbucket2 = DcpFailoverLogResponse.vbucket(byteBuf);
                        List<FailoverLogEntry> entries = DcpFailoverLogResponse.entries(byteBuf);
                        this.vbucketToUuid.set(vbucket2, entries.get(0).getUuid());
                        dispatch(new FailoverLog(vbucket2, entries));
                        byteBuf.release();
                        return;
                    case DCP_SNAPSHOT_MARKER_OPCODE:
                        SnapshotMarker snapshotMarker = new SnapshotMarker(DcpSnapshotMarkerRequest.startSeqno(byteBuf), DcpSnapshotMarkerRequest.endSeqno(byteBuf));
                        int flags = DcpSnapshotMarkerRequest.flags(byteBuf);
                        short vbucket3 = MessageUtil.getVbucket(byteBuf);
                        this.vbucketToCurrentSnapshot.set(vbucket3, snapshotMarker);
                        dispatch(new SnapshotDetails(vbucket3, flags, snapshotMarker));
                        byteBuf.release();
                        return;
                    default:
                        log.warn("Unexpected control event type: {}", MessageUtil.getShortOpcodeName(MessageUtil.getOpcode(byteBuf)));
                        byteBuf.release();
                        return;
                }
            } catch (Throwable th2) {
                log.error("Failed to dispatch control event", th2);
                dispatchOrLogError(new StreamFailure(-1, th2));
                byteBuf.release();
            }
        } catch (Throwable th3) {
            byteBuf.release();
            throw th3;
        }
    }
}
