/*
 * Decompiled with CFR 0.152.
 */
package com.azure.storage.queue;

import com.azure.core.annotation.ReturnType;
import com.azure.core.annotation.ServiceClient;
import com.azure.core.annotation.ServiceMethod;
import com.azure.core.http.HttpPipeline;
import com.azure.core.http.rest.PagedFlux;
import com.azure.core.http.rest.PagedResponseBase;
import com.azure.core.http.rest.Response;
import com.azure.core.http.rest.SimpleResponse;
import com.azure.core.util.BinaryData;
import com.azure.core.util.Context;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.storage.common.implementation.SasImplUtils;
import com.azure.storage.common.implementation.StorageImplUtils;
import com.azure.storage.queue.QueueClient;
import com.azure.storage.queue.QueueClientBuilder;
import com.azure.storage.queue.QueueMessageEncoding;
import com.azure.storage.queue.QueueServiceVersion;
import com.azure.storage.queue.implementation.AzureQueueStorageImpl;
import com.azure.storage.queue.implementation.models.MessageIdsUpdateHeaders;
import com.azure.storage.queue.implementation.models.MessageIdsUpdateResponse;
import com.azure.storage.queue.implementation.models.MessagesDequeueHeaders;
import com.azure.storage.queue.implementation.models.MessagesDequeueResponse;
import com.azure.storage.queue.implementation.models.MessagesPeekHeaders;
import com.azure.storage.queue.implementation.models.MessagesPeekResponse;
import com.azure.storage.queue.implementation.models.PeekedMessageItemInternal;
import com.azure.storage.queue.implementation.models.QueueMessage;
import com.azure.storage.queue.implementation.models.QueueMessageItemInternal;
import com.azure.storage.queue.implementation.models.QueuesGetAccessPolicyHeaders;
import com.azure.storage.queue.implementation.models.QueuesGetPropertiesHeaders;
import com.azure.storage.queue.implementation.models.QueuesGetPropertiesResponse;
import com.azure.storage.queue.implementation.util.QueueSasImplUtil;
import com.azure.storage.queue.models.PeekedMessageItem;
import com.azure.storage.queue.models.QueueMessageDecodingError;
import com.azure.storage.queue.models.QueueMessageItem;
import com.azure.storage.queue.models.QueueProperties;
import com.azure.storage.queue.models.QueueSignedIdentifier;
import com.azure.storage.queue.models.SendMessageResult;
import com.azure.storage.queue.models.UpdateMessageResult;
import com.azure.storage.queue.sas.QueueServiceSasSignatureValues;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

@ServiceClient(builder=QueueClientBuilder.class, isAsync=true)
public final class QueueAsyncClient {
    private final ClientLogger logger = new ClientLogger(QueueAsyncClient.class);
    private final AzureQueueStorageImpl client;
    private final String queueName;
    private final String accountName;
    private final QueueServiceVersion serviceVersion;
    private final QueueMessageEncoding messageEncoding;
    private final Function<QueueMessageDecodingError, Mono<Void>> processMessageDecodingErrorAsyncHandler;
    private final Consumer<QueueMessageDecodingError> processMessageDecodingErrorHandler;

    /**
     * Creates an async client bound to a single queue.
     *
     * @param client generated service client used to issue every request
     * @param queueName name of the queue this client targets; must not be null
     * @param accountName storage account name, exposed through {@link #getAccountName()}
     * @param serviceVersion service version, exposed through {@link #getServiceVersion()}
     * @param messageEncoding encoding applied to message bodies when sending and receiving
     * @param processMessageDecodingErrorAsyncHandler asynchronous callback invoked when a received or peeked
     *     message body cannot be decoded; may be null
     * @param processMessageDecodingErrorHandler synchronous callback invoked when a received or peeked message
     *     body cannot be decoded and no asynchronous callback is set; may be null
     * @throws NullPointerException if {@code queueName} is null
     */
    QueueAsyncClient(
        AzureQueueStorageImpl client,
        String queueName,
        String accountName,
        QueueServiceVersion serviceVersion,
        QueueMessageEncoding messageEncoding,
        Function<QueueMessageDecodingError, Mono<Void>> processMessageDecodingErrorAsyncHandler,
        Consumer<QueueMessageDecodingError> processMessageDecodingErrorHandler) {
        // Validate before any field is assigned so a failed construction leaves nothing half-initialised.
        this.queueName = Objects.requireNonNull(queueName, "'queueName' cannot be null.");
        this.client = client;
        this.accountName = accountName;
        this.serviceVersion = serviceVersion;
        this.messageEncoding = messageEncoding;
        this.processMessageDecodingErrorAsyncHandler = processMessageDecodingErrorAsyncHandler;
        this.processMessageDecodingErrorHandler = processMessageDecodingErrorHandler;
    }

    /**
     * Builds the URL of this queue by joining the service client's URL and the queue name with a slash.
     *
     * @return the queue URL
     */
    public String getQueueUrl() {
        return client.getUrl() + "/" + queueName;
    }

    /**
     * Returns the service version this client was constructed with.
     *
     * @return the configured {@link QueueServiceVersion}
     */
    public QueueServiceVersion getServiceVersion() {
        return serviceVersion;
    }

    /**
     * Returns the encoding this client applies to message bodies.
     *
     * @return the configured {@link QueueMessageEncoding}
     */
    public QueueMessageEncoding getMessageEncoding() {
        return messageEncoding;
    }

    /**
     * Returns the HTTP pipeline held by the underlying generated service client.
     *
     * @return the {@link HttpPipeline} used for requests
     */
    public HttpPipeline getHttpPipeline() {
        return client.getHttpPipeline();
    }

    /**
     * Issues the create request for this queue without metadata.
     *
     * @return a {@link Mono} holding the value of the create response; a runtime failure raised while
     *     assembling the call is reported through {@code FluxUtil.monoError} instead of being thrown
     */
    @ServiceMethod(returns=ReturnType.SINGLE)
    public Mono<Void> create() {
        try {
            Mono<Response<Void>> creation = createWithResponse(null);
            return creation.flatMap(FluxUtil::toMono);
        } catch (RuntimeException failure) {
            return FluxUtil.monoError(logger, failure);
        }
    }

    /**
     * Issues the create request for this queue, using the subscriber context supplied by
     * {@code FluxUtil.withContext}.
     *
     * @param metadata metadata forwarded with the create request; may be null
     * @return a {@link Mono} emitting the service response; a runtime failure raised while assembling the call
     *     is reported through {@code FluxUtil.monoError} instead of being thrown
     */
    @ServiceMethod(returns=ReturnType.SINGLE)
    public Mono<Response<Void>> createWithResponse(Map<String, String> metadata) {
        try {
            return FluxUtil.withContext(ctx -> createWithResponse(metadata, ctx));
        } catch (RuntimeException failure) {
            return FluxUtil.monoError(logger, failure);
        }
    }

    /**
     * Sends the create request through the generated queues client.
     *
     * @param metadata metadata forwarded with the request; may be null
     * @param context call context; {@code null} is replaced by {@link Context#NONE}
     * @return a {@link Mono} emitting the service response re-wrapped with a null value
     */
    Mono<Response<Void>> createWithResponse(Map<String, String> metadata, Context context) {
        Context baseContext = context != null ? context : Context.NONE;
        // The context is tagged with the "az.namespace" entry before being handed to the generated client.
        Context taggedContext = baseContext.addData("az.namespace", "Microsoft.Storage");
        return client.getQueues()
            .createWithResponseAsync(queueName, null, metadata, null, taggedContext)
            .map(response -> new SimpleResponse<>(response, null));
    }

    /**
     * Issues the delete request for this queue.
     *
     * @return a {@link Mono} holding the value of the delete response; a runtime failure raised while
     *     assembling the call is reported through {@code FluxUtil.monoError} instead of being thrown
     */
    @ServiceMethod(returns=ReturnType.SINGLE)
    public Mono<Void> delete() {
        try {
            Mono<Response<Void>> deletion = deleteWithResponse();
            return deletion.flatMap(FluxUtil::toMono);
        } catch (RuntimeException failure) {
            return FluxUtil.monoError(logger, failure);
        }
    }

    /**
     * Issues the delete request for this queue, using the subscriber context supplied by
     * {@code FluxUtil.withContext}.
     *
     * @return a {@link Mono} emitting the service response; a runtime failure raised while assembling the call
     *     is reported through {@code FluxUtil.monoError} instead of being thrown
     */
    @ServiceMethod(returns=ReturnType.SINGLE)
    public Mono<Response<Void>> deleteWithResponse() {
        try {
            return FluxUtil.withContext(ctx -> deleteWithResponse(ctx));
        } catch (RuntimeException failure) {
            return FluxUtil.monoError(logger, failure);
        }
    }

    /**
     * Sends the delete request through the generated queues client.
     *
     * @param context call context; {@code null} is replaced by {@link Context#NONE}
     * @return a {@link Mono} emitting the service response re-wrapped with a null value
     */
    Mono<Response<Void>> deleteWithResponse(Context context) {
        Context baseContext = context != null ? context : Context.NONE;
        Context taggedContext = baseContext.addData("az.namespace", "Microsoft.Storage");
        return client.getQueues()
            .deleteWithResponseAsync(queueName, null, null, taggedContext)
            .map(response -> new SimpleResponse<>(response, null));
    }

    /**
     * Retrieves the properties of this queue.
     *
     * @return a {@link Mono} emitting the {@link QueueProperties} carried by the service response; a runtime
     *     failure raised while assembling the call is reported through {@code FluxUtil.monoError}
     */
    @ServiceMethod(returns=ReturnType.SINGLE)
    public Mono<QueueProperties> getProperties() {
        try {
            Mono<Response<QueueProperties>> lookup = getPropertiesWithResponse();
            return lookup.flatMap(FluxUtil::toMono);
        } catch (RuntimeException failure) {
            return FluxUtil.monoError(logger, failure);
        }
    }

    /**
     * Retrieves the properties of this queue, using the subscriber context supplied by
     * {@code FluxUtil.withContext}.
     *
     * @return a {@link Mono} emitting the response that carries the {@link QueueProperties}; a runtime failure
     *     raised while assembling the call is reported through {@code FluxUtil.monoError}
     */
    @ServiceMethod(returns=ReturnType.SINGLE)
    public Mono<Response<QueueProperties>> getPropertiesWithResponse() {
        try {
            return FluxUtil.withContext(ctx -> getPropertiesWithResponse(ctx));
        } catch (RuntimeException failure) {
            return FluxUtil.monoError(logger, failure);
        }
    }

    /**
     * Sends the get-properties request through the generated queues client and converts the result with
     * {@code getQueuePropertiesResponse}.
     *
     * @param context call context; {@code null} is replaced by {@link Context#NONE}
     * @return a {@link Mono} emitting the response that carries the {@link QueueProperties}
     */
    Mono<Response<QueueProperties>> getPropertiesWithResponse(Context context) {
        Context baseContext = context != null ? context : Context.NONE;
        Context taggedContext = baseContext.addData("az.namespace", "Microsoft.Storage");
        return client.getQueues()
            .getPropertiesWithResponseAsync(queueName, null, null, taggedContext)
            .map(response -> getQueuePropertiesResponse(response));
    }

    /**
     * Issues the set-metadata request for this queue.
     *
     * @param metadata metadata forwarded with the request; may be null
     * @return a {@link Mono} holding the value of the service response; a runtime failure raised while
     *     assembling the call is reported through {@code FluxUtil.monoError} instead of being thrown
     */
    @ServiceMethod(returns=ReturnType.SINGLE)
    public Mono<Void> setMetadata(Map<String, String> metadata) {
        try {
            Mono<Response<Void>> update = setMetadataWithResponse(metadata);
            return update.flatMap(FluxUtil::toMono);
        } catch (RuntimeException failure) {
            return FluxUtil.monoError(logger, failure);
        }
    }

    /**
     * Issues the set-metadata request for this queue, using the subscriber context supplied by
     * {@code FluxUtil.withContext}.
     *
     * @param metadata metadata forwarded with the request; may be null
     * @return a {@link Mono} emitting the service response; a runtime failure raised while assembling the call
     *     is reported through {@code FluxUtil.monoError} instead of being thrown
     */
    @ServiceMethod(returns=ReturnType.SINGLE)
    public Mono<Response<Void>> setMetadataWithResponse(Map<String, String> metadata) {
        try {
            return FluxUtil.withContext(ctx -> setMetadataWithResponse(metadata, ctx));
        } catch (RuntimeException failure) {
            return FluxUtil.monoError(logger, failure);
        }
    }

    /**
     * Sends the set-metadata request through the generated queues client.
     *
     * @param metadata metadata forwarded with the request; may be null
     * @param context call context; {@code null} is replaced by {@link Context#NONE}
     * @return a {@link Mono} emitting the service response re-wrapped with a null value
     */
    Mono<Response<Void>> setMetadataWithResponse(Map<String, String> metadata, Context context) {
        Context baseContext = context != null ? context : Context.NONE;
        Context taggedContext = baseContext.addData("az.namespace", "Microsoft.Storage");
        return client.getQueues()
            .setMetadataWithResponseAsync(queueName, null, metadata, null, taggedContext)
            .map(response -> new SimpleResponse<>(response, null));
    }

    /**
     * Retrieves the signed identifiers (stored access policies) of this queue as a {@link PagedFlux}.
     *
     * @return a {@link PagedFlux} over the {@link QueueSignedIdentifier}s returned by the service; a runtime
     *     failure raised while assembling the call is reported through {@code FluxUtil.pagedFluxError}
     */
    @ServiceMethod(returns=ReturnType.COLLECTION)
    public PagedFlux<QueueSignedIdentifier> getAccessPolicy() {
        try {
            // The marker argument is ignored: every invocation issues the same request with Context.NONE.
            // The fifth PagedResponseBase argument is null (presumably the continuation token, i.e. no next
            // page is advertised -- confirm against the azure-core constructor).
            // NOTE(review): the raw Function type and the (Mono) cast below look like decompiler output.
            Function retriever = marker -> this.client.getQueues().getAccessPolicyWithResponseAsync(this.queueName, null, null, Context.NONE).map(response -> new PagedResponseBase(response.getRequest(), response.getStatusCode(), response.getHeaders(), response.getValue(), null, (QueuesGetAccessPolicyHeaders)response.getDeserializedHeaders()));
            // The same retriever serves as both the first-page supplier and the next-page function.
            return new PagedFlux<QueueSignedIdentifier>(() -> (Mono)retriever.apply(null), retriever);
        }
        catch (RuntimeException ex) {
            return FluxUtil.pagedFluxError(this.logger, ex);
        }
    }

    /**
     * Issues the set-access-policy request for this queue.
     *
     * @param permissions signed identifiers to store; may be null, in which case an empty list is sent
     * @return a {@link Mono} holding the value of the service response; a runtime failure raised while
     *     assembling the call is reported through {@code FluxUtil.monoError} instead of being thrown
     */
    @ServiceMethod(returns=ReturnType.SINGLE)
    public Mono<Void> setAccessPolicy(Iterable<QueueSignedIdentifier> permissions) {
        try {
            Mono<Response<Void>> update = setAccessPolicyWithResponse(permissions);
            return update.flatMap(FluxUtil::toMono);
        } catch (RuntimeException failure) {
            return FluxUtil.monoError(logger, failure);
        }
    }

    /**
     * Issues the set-access-policy request for this queue, using the subscriber context supplied by
     * {@code FluxUtil.withContext}.
     *
     * @param permissions signed identifiers to store; may be null, in which case an empty list is sent
     * @return a {@link Mono} emitting the service response; a runtime failure raised while assembling the call
     *     is reported through {@code FluxUtil.monoError} instead of being thrown
     */
    @ServiceMethod(returns=ReturnType.SINGLE)
    public Mono<Response<Void>> setAccessPolicyWithResponse(Iterable<QueueSignedIdentifier> permissions) {
        try {
            return FluxUtil.withContext(ctx -> setAccessPolicyWithResponse(permissions, ctx));
        } catch (RuntimeException failure) {
            return FluxUtil.monoError(logger, failure);
        }
    }

    /**
     * Truncates the start and expiry times of every supplied access policy to whole seconds, then sends the
     * identifiers through the generated queues client.
     *
     * <p>The truncation is written back into the caller's {@link QueueSignedIdentifier} objects.</p>
     *
     * @param permissions signed identifiers to store; may be null, in which case an empty list is sent
     * @param context call context; {@code null} is replaced by {@link Context#NONE}
     * @return a {@link Mono} emitting the service response re-wrapped with a null value
     */
    Mono<Response<Void>> setAccessPolicyWithResponse(Iterable<QueueSignedIdentifier> permissions, Context context) {
        Context baseContext = context != null ? context : Context.NONE;

        // First pass: normalise the timestamps in place.
        if (permissions != null) {
            for (QueueSignedIdentifier identifier : permissions) {
                if (identifier.getAccessPolicy() == null) {
                    continue;
                }
                if (identifier.getAccessPolicy().getStartsOn() != null) {
                    identifier.getAccessPolicy().setStartsOn(
                        identifier.getAccessPolicy().getStartsOn().truncatedTo(ChronoUnit.SECONDS));
                }
                if (identifier.getAccessPolicy().getExpiresOn() != null) {
                    identifier.getAccessPolicy().setExpiresOn(
                        identifier.getAccessPolicy().getExpiresOn().truncatedTo(ChronoUnit.SECONDS));
                }
            }
        }

        // Second pass: copy the identifiers into a list, treating a null iterable as empty.
        List<QueueSignedIdentifier> identifierList = StreamSupport
            .stream(permissions == null
                ? Spliterators.<QueueSignedIdentifier>emptySpliterator()
                : permissions.spliterator(), false)
            .collect(Collectors.toList());

        Context taggedContext = baseContext.addData("az.namespace", "Microsoft.Storage");
        return client.getQueues()
            .setAccessPolicyWithResponseAsync(queueName, null, null, identifierList, taggedContext)
            .map(response -> new SimpleResponse<>(response, null));
    }

    /**
     * Issues the clear-messages request for this queue.
     *
     * @return a {@link Mono} holding the value of the service response; a runtime failure raised while
     *     assembling the call is reported through {@code FluxUtil.monoError} instead of being thrown
     */
    @ServiceMethod(returns=ReturnType.SINGLE)
    public Mono<Void> clearMessages() {
        try {
            Mono<Response<Void>> clearing = clearMessagesWithResponse();
            return clearing.flatMap(FluxUtil::toMono);
        } catch (RuntimeException failure) {
            return FluxUtil.monoError(logger, failure);
        }
    }

    /**
     * Issues the clear-messages request for this queue, using the subscriber context supplied by
     * {@code FluxUtil.withContext}.
     *
     * @return a {@link Mono} emitting the service response; a runtime failure raised while assembling the call
     *     is reported through {@code FluxUtil.monoError} instead of being thrown
     */
    @ServiceMethod(returns=ReturnType.SINGLE)
    public Mono<Response<Void>> clearMessagesWithResponse() {
        try {
            return FluxUtil.withContext(ctx -> clearMessagesWithResponse(ctx));
        } catch (RuntimeException failure) {
            return FluxUtil.monoError(logger, failure);
        }
    }

    /**
     * Sends the clear request through the generated messages client.
     *
     * @param context call context; {@code null} is replaced by {@link Context#NONE}
     * @return a {@link Mono} emitting the service response re-wrapped with a null value
     */
    Mono<Response<Void>> clearMessagesWithResponse(Context context) {
        Context baseContext = context != null ? context : Context.NONE;
        Context taggedContext = baseContext.addData("az.namespace", "Microsoft.Storage");
        return client.getMessages()
            .clearWithResponseAsync(queueName, null, null, taggedContext)
            .map(response -> new SimpleResponse<>(response, null));
    }

    /**
     * Sends a text message to this queue with no visibility timeout or time-to-live specified.
     *
     * @param messageText text of the message
     * @return a {@link Mono} emitting the {@link SendMessageResult} carried by the service response; a runtime
     *     failure raised while assembling the call is reported through {@code FluxUtil.monoError}
     */
    @ServiceMethod(returns=ReturnType.SINGLE)
    public Mono<SendMessageResult> sendMessage(String messageText) {
        try {
            Mono<Response<SendMessageResult>> sending = sendMessageWithResponse(messageText, null, null);
            return sending.flatMap(FluxUtil::toMono);
        } catch (RuntimeException failure) {
            return FluxUtil.monoError(logger, failure);
        }
    }

    /**
     * Sends a binary message to this queue with no visibility timeout or time-to-live specified.
     *
     * <p>Like the other convenience methods of this client, a runtime failure raised while assembling the
     * call is reported through {@code FluxUtil.monoError} rather than being thrown to the caller.</p>
     *
     * @param message body of the message
     * @return a {@link Mono} emitting the {@link SendMessageResult} carried by the service response
     */
    @ServiceMethod(returns=ReturnType.SINGLE)
    public Mono<SendMessageResult> sendMessage(BinaryData message) {
        try {
            return this.sendMessageWithResponse(message, null, null).flatMap(FluxUtil::toMono);
        }
        catch (RuntimeException ex) {
            // Keep the error channel consistent with sendMessage(String): surface failures via the Mono.
            return FluxUtil.monoError(this.logger, ex);
        }
    }

    /**
     * Sends a text message to this queue, using the subscriber context supplied by
     * {@code FluxUtil.withContext}.
     *
     * @param messageText text of the message; converted with {@code BinaryData.fromString} when the call is
     *     subscribed to
     * @param visibilityTimeout optional visibility timeout, sent in whole seconds; may be null
     * @param timeToLive optional time-to-live, sent in whole seconds; may be null
     * @return a {@link Mono} emitting the response that carries the {@link SendMessageResult}; a runtime
     *     failure raised while assembling the call is reported through {@code FluxUtil.monoError}
     */
    @ServiceMethod(returns=ReturnType.SINGLE)
    public Mono<Response<SendMessageResult>> sendMessageWithResponse(String messageText, Duration visibilityTimeout, Duration timeToLive) {
        try {
            // The String-to-BinaryData conversion stays inside the lambda so it runs lazily, per subscription.
            return FluxUtil.withContext(
                ctx -> sendMessageWithResponse(BinaryData.fromString(messageText), visibilityTimeout, timeToLive, ctx));
        } catch (RuntimeException failure) {
            return FluxUtil.monoError(logger, failure);
        }
    }

    /**
     * Sends a binary message to this queue, using the subscriber context supplied by
     * {@code FluxUtil.withContext}.
     *
     * @param message body of the message
     * @param visibilityTimeout optional visibility timeout, sent in whole seconds; may be null
     * @param timeToLive optional time-to-live, sent in whole seconds; may be null
     * @return a {@link Mono} emitting the response that carries the {@link SendMessageResult}; a runtime
     *     failure raised while assembling the call is reported through {@code FluxUtil.monoError}
     */
    @ServiceMethod(returns=ReturnType.SINGLE)
    public Mono<Response<SendMessageResult>> sendMessageWithResponse(BinaryData message, Duration visibilityTimeout, Duration timeToLive) {
        try {
            return FluxUtil.withContext(
                ctx -> sendMessageWithResponse(message, visibilityTimeout, timeToLive, ctx));
        } catch (RuntimeException failure) {
            return FluxUtil.monoError(logger, failure);
        }
    }

    /**
     * Encodes the message body with the configured encoding and enqueues it through the generated messages
     * client.
     *
     * @param message body of the message; must not be null
     * @param visibilityTimeout optional visibility timeout; its whole-second count is narrowed to {@code int}
     * @param timeToLive optional time-to-live; its whole-second count is narrowed to {@code int}
     * @param context call context; {@code null} is replaced by {@link Context#NONE}
     * @return a {@link Mono} emitting a response whose value is the first element of the enqueue result list
     */
    Mono<Response<SendMessageResult>> sendMessageWithResponse(BinaryData message, Duration visibilityTimeout, Duration timeToLive, Context context) {
        final Integer visibilitySeconds = visibilityTimeout != null
            ? Integer.valueOf((int) visibilityTimeout.getSeconds())
            : null;
        final Integer timeToLiveSeconds = timeToLive != null
            ? Integer.valueOf((int) timeToLive.getSeconds())
            : null;
        final Context taggedContext = (context != null ? context : Context.NONE)
            .addData("az.namespace", "Microsoft.Storage");

        return encodeMessage(message).flatMap(encodedText -> {
            QueueMessage payload = new QueueMessage().setMessageText(encodedText);
            return client.getMessages()
                .enqueueWithResponseAsync(queueName, payload, visibilitySeconds, timeToLiveSeconds, null, null, taggedContext)
                .map(response -> new SimpleResponse<>(response, response.getValue().get(0)));
        });
    }

    /**
     * Converts a message body to the text sent to the service, according to the client's configured encoding.
     *
     * @param message body to encode; must not be null
     * @return a {@link Mono} emitting the body's string form for {@code NONE}, its Base64 text for
     *     {@code BASE64}, or an {@link IllegalArgumentException} error for any other encoding
     * @throws NullPointerException if {@code message} is null
     */
    private Mono<String> encodeMessage(BinaryData message) {
        Objects.requireNonNull(message, "'message' cannot be null.");
        switch (messageEncoding) {
            case BASE64: {
                byte[] rawBytes = message.toBytes();
                return Mono.just(Base64.getEncoder().encodeToString(rawBytes));
            }
            case NONE: {
                return Mono.just(message.toString());
            }
            default: {
                return FluxUtil.monoError(logger,
                    new IllegalArgumentException("Unsupported message encoding=" + messageEncoding));
            }
        }
    }

    /**
     * Requests at most one message from this queue.
     *
     * @return a {@link Mono} emitting the received {@link QueueMessageItem}, or completing empty when none is
     *     returned; a runtime failure raised while assembling the call is reported through
     *     {@code FluxUtil.monoError}
     */
    @ServiceMethod(returns=ReturnType.SINGLE)
    public Mono<QueueMessageItem> receiveMessage() {
        try {
            PagedFlux<QueueMessageItem> received = receiveMessagesWithOptionalTimeout(1, null, null, Context.NONE);
            return received.singleOrEmpty();
        } catch (RuntimeException failure) {
            return FluxUtil.monoError(logger, failure);
        }
    }

    /**
     * Requests up to {@code maxMessages} messages from this queue without specifying a visibility timeout.
     *
     * @param maxMessages maximum number of messages to request; may be null
     * @return a {@link PagedFlux} over the received messages; a runtime failure raised while assembling the
     *     call is reported through {@code FluxUtil.pagedFluxError}
     */
    @ServiceMethod(returns=ReturnType.COLLECTION)
    public PagedFlux<QueueMessageItem> receiveMessages(Integer maxMessages) {
        try {
            PagedFlux<QueueMessageItem> received =
                receiveMessagesWithOptionalTimeout(maxMessages, null, null, Context.NONE);
            return received;
        } catch (RuntimeException failure) {
            return FluxUtil.pagedFluxError(logger, failure);
        }
    }

    /**
     * Requests up to {@code maxMessages} messages from this queue with the given visibility timeout.
     *
     * @param maxMessages maximum number of messages to request; may be null
     * @param visibilityTimeout optional visibility timeout, sent in whole seconds; may be null
     * @return a {@link PagedFlux} over the received messages; a runtime failure raised while assembling the
     *     call is reported through {@code FluxUtil.pagedFluxError}
     */
    @ServiceMethod(returns=ReturnType.COLLECTION)
    public PagedFlux<QueueMessageItem> receiveMessages(Integer maxMessages, Duration visibilityTimeout) {
        try {
            PagedFlux<QueueMessageItem> received =
                receiveMessagesWithOptionalTimeout(maxMessages, visibilityTimeout, null, Context.NONE);
            return received;
        } catch (RuntimeException failure) {
            return FluxUtil.pagedFluxError(logger, failure);
        }
    }

    /**
     * Builds a {@link PagedFlux} that dequeues messages through the generated messages client and converts
     * each response with {@code transformMessagesDequeueResponse}.
     *
     * @param maxMessages maximum number of messages to request; passed through unchanged, may be null
     * @param visibilityTimeout optional visibility timeout; its whole-second count is narrowed to {@code int}
     * @param timeout optional timeout handed to {@code StorageImplUtils.applyOptionalTimeout}
     *     (presumably applied only when non-null -- confirm against that helper)
     * @param context call context, passed to the generated client as-is (no null replacement here)
     * @return a {@link PagedFlux} over the received messages
     */
    PagedFlux<QueueMessageItem> receiveMessagesWithOptionalTimeout(Integer maxMessages, Duration visibilityTimeout, Duration timeout, Context context) {
        Integer visibilityTimeoutInSeconds = visibilityTimeout == null ? null : Integer.valueOf((int)visibilityTimeout.getSeconds());
        // The marker argument is ignored: every invocation issues the same dequeue request.
        // NOTE(review): the raw Function type and the (Mono) cast below look like decompiler output.
        Function retriever = marker -> StorageImplUtils.applyOptionalTimeout(this.client.getMessages().dequeueWithResponseAsync(this.queueName, maxMessages, visibilityTimeoutInSeconds, null, null, context), timeout).flatMap(this::transformMessagesDequeueResponse);
        // The same retriever serves as both the first-page supplier and the next-page function.
        return new PagedFlux<QueueMessageItem>(() -> (Mono)retriever.apply(null), retriever);
    }

    /**
     * Converts a dequeue response into a single page of {@link QueueMessageItem}s, decoding each message body
     * with the client's configured encoding.
     *
     * <p>When decoding a message raises {@link IllegalArgumentException}, the message is rebuilt with
     * {@code QueueMessageEncoding.NONE} and handed to the asynchronous decoding-error handler if one is set,
     * otherwise to the synchronous handler if one is set. In both cases the message is left out of the
     * resulting page. With no handler configured the error is propagated.</p>
     *
     * @param response dequeue response from the generated client
     * @return a {@link Mono} emitting the page built from the successfully decoded messages
     */
    private Mono<PagedResponseBase<MessagesDequeueHeaders, QueueMessageItem>> transformMessagesDequeueResponse(MessagesDequeueResponse response) {
        List queueMessageInternalItems = response.getValue();
        // A null value list is treated as an empty page.
        if (queueMessageInternalItems == null) {
            queueMessageInternalItems = Collections.emptyList();
        }
        return Flux.fromIterable(queueMessageInternalItems).flatMapSequential(queueMessageItemInternal -> this.transformQueueMessageItemInternal((QueueMessageItemInternal)queueMessageItemInternal, this.messageEncoding).onErrorResume(IllegalArgumentException.class, e -> {
            // Asynchronous handler takes precedence over the synchronous one.
            if (this.processMessageDecodingErrorAsyncHandler != null) {
                // then(Mono.empty()) discards the handler's completion so no item is emitted for this message.
                return this.transformQueueMessageItemInternal((QueueMessageItemInternal)queueMessageItemInternal, QueueMessageEncoding.NONE).flatMap(messageItem -> this.processMessageDecodingErrorAsyncHandler.apply(new QueueMessageDecodingError(this, new QueueClient(this), (QueueMessageItem)messageItem, null, (Exception)e))).then(Mono.empty());
            }
            if (this.processMessageDecodingErrorHandler != null) {
                return this.transformQueueMessageItemInternal((QueueMessageItemInternal)queueMessageItemInternal, QueueMessageEncoding.NONE).flatMap(messageItem -> {
                    try {
                        this.processMessageDecodingErrorHandler.accept(new QueueMessageDecodingError(this, new QueueClient(this), (QueueMessageItem)messageItem, null, (Exception)e));
                        return Mono.empty();
                    }
                    catch (RuntimeException re) {
                        // A failure inside the user's handler becomes an error of the page Mono.
                        return FluxUtil.monoError(this.logger, re);
                    }
                // The synchronous handler is subscribed on the bounded-elastic scheduler.
                }).subscribeOn(Schedulers.boundedElastic());
            }
            // No handler configured: surface the decoding failure.
            return FluxUtil.monoError(this.logger, e);
        })).collectList().map(queueMessageItems -> new PagedResponseBase(response.getRequest(), response.getStatusCode(), response.getHeaders(), queueMessageItems, null, (MessagesDequeueHeaders)response.getDeserializedHeaders()));
    }

    /**
     * Copies the fields of an internal dequeued message into a public {@link QueueMessageItem} and attaches
     * the decoded body.
     *
     * @param queueMessageItemInternal message as returned by the generated client
     * @param messageEncoding encoding used to decode the message text
     * @return a {@link Mono} emitting the populated item; the item is emitted without a body when the message
     *     text is null, and decoding failures are propagated as errors
     */
    private Mono<QueueMessageItem> transformQueueMessageItemInternal(QueueMessageItemInternal queueMessageItemInternal, QueueMessageEncoding messageEncoding) {
        QueueMessageItem item = new QueueMessageItem();
        item.setMessageId(queueMessageItemInternal.getMessageId());
        item.setDequeueCount(queueMessageItemInternal.getDequeueCount());
        item.setExpirationTime(queueMessageItemInternal.getExpirationTime());
        item.setInsertionTime(queueMessageItemInternal.getInsertionTime());
        item.setPopReceipt(queueMessageItemInternal.getPopReceipt());
        item.setTimeNextVisible(queueMessageItemInternal.getTimeNextVisible());

        Mono<BinaryData> decodedBody = decodeMessageBody(queueMessageItemInternal.getMessageText(), messageEncoding);
        // An empty decode result (null text) still yields the item, just without a body.
        return decodedBody
            .map(body -> item.setBody(body))
            .switchIfEmpty(Mono.just(item));
    }

    /**
     * Decodes message text received from the service into a {@link BinaryData} body.
     *
     * @param messageText text of the message; may be null
     * @param messageEncoding encoding to apply
     * @return an empty {@link Mono} when {@code messageText} is null; otherwise a {@link Mono} emitting the
     *     text as-is for {@code NONE} or its Base64-decoded bytes for {@code BASE64}. Invalid Base64 input and
     *     unsupported encodings are reported as {@link IllegalArgumentException} errors.
     */
    private Mono<BinaryData> decodeMessageBody(String messageText, QueueMessageEncoding messageEncoding) {
        if (messageText == null) {
            return Mono.empty();
        }
        switch (messageEncoding) {
            case BASE64: {
                try {
                    byte[] decodedBytes = Base64.getDecoder().decode(messageText);
                    return Mono.just(BinaryData.fromBytes(decodedBytes));
                } catch (IllegalArgumentException malformed) {
                    return FluxUtil.monoError(logger, malformed);
                }
            }
            case NONE: {
                return Mono.just(BinaryData.fromString(messageText));
            }
            default: {
                return FluxUtil.monoError(logger,
                    new IllegalArgumentException("Unsupported message encoding=" + messageEncoding));
            }
        }
    }

    /**
     * Peeks this queue without specifying a maximum message count.
     *
     * @return a {@link Mono} emitting the peeked {@link PeekedMessageItem}, or completing empty when none is
     *     returned; a runtime failure raised while assembling the call is reported through
     *     {@code FluxUtil.monoError}
     */
    @ServiceMethod(returns=ReturnType.SINGLE)
    public Mono<PeekedMessageItem> peekMessage() {
        try {
            PagedFlux<PeekedMessageItem> peeked = peekMessages(null);
            return peeked.singleOrEmpty();
        } catch (RuntimeException failure) {
            return FluxUtil.monoError(logger, failure);
        }
    }

    /**
     * Peeks up to {@code maxMessages} messages from this queue.
     *
     * @param maxMessages maximum number of messages to request; may be null
     * @return a {@link PagedFlux} over the peeked messages; a runtime failure raised while assembling the call
     *     is reported through {@code FluxUtil.pagedFluxError}
     */
    @ServiceMethod(returns=ReturnType.COLLECTION)
    public PagedFlux<PeekedMessageItem> peekMessages(Integer maxMessages) {
        try {
            PagedFlux<PeekedMessageItem> peeked = peekMessagesWithOptionalTimeout(maxMessages, null, Context.NONE);
            return peeked;
        } catch (RuntimeException failure) {
            return FluxUtil.pagedFluxError(logger, failure);
        }
    }

    /**
     * Builds a {@link PagedFlux} that peeks messages through the generated messages client and converts each
     * response with {@code transformMessagesPeekResponse}.
     *
     * @param maxMessages maximum number of messages to request; passed through unchanged, may be null
     * @param timeout optional timeout handed to {@code StorageImplUtils.applyOptionalTimeout}
     *     (presumably applied only when non-null -- confirm against that helper)
     * @param context call context, passed to the generated client as-is (no null replacement here)
     * @return a {@link PagedFlux} over the peeked messages
     */
    PagedFlux<PeekedMessageItem> peekMessagesWithOptionalTimeout(Integer maxMessages, Duration timeout, Context context) {
        // The marker argument is ignored: every invocation issues the same peek request.
        // NOTE(review): the raw Function type and the (Mono) cast below look like decompiler output.
        Function retriever = marker -> StorageImplUtils.applyOptionalTimeout(this.client.getMessages().peekWithResponseAsync(this.queueName, maxMessages, null, null, context), timeout).flatMap(this::transformMessagesPeekResponse);
        // The same retriever serves as both the first-page supplier and the next-page function.
        return new PagedFlux<PeekedMessageItem>(() -> (Mono)retriever.apply(null), retriever);
    }

    /**
     * Converts a peek response into a single page of {@link PeekedMessageItem}s, decoding each message body
     * with the client's configured encoding.
     *
     * <p>When decoding a message raises {@link IllegalArgumentException}, the message is rebuilt with
     * {@code QueueMessageEncoding.NONE} and handed to the asynchronous decoding-error handler if one is set,
     * otherwise to the synchronous handler if one is set. In both cases the message is left out of the
     * resulting page. With no handler configured the error is propagated.</p>
     *
     * @param response peek response from the generated client
     * @return a {@link Mono} emitting the page built from the successfully decoded messages
     */
    private Mono<PagedResponseBase<MessagesPeekHeaders, PeekedMessageItem>> transformMessagesPeekResponse(MessagesPeekResponse response) {
        List peekedMessageInternalItems = response.getValue();
        // A null value list is treated as an empty page.
        if (peekedMessageInternalItems == null) {
            peekedMessageInternalItems = Collections.emptyList();
        }
        return Flux.fromIterable(peekedMessageInternalItems).flatMapSequential(peekedMessageItemInternal -> this.transformPeekedMessageItemInternal((PeekedMessageItemInternal)peekedMessageItemInternal, this.messageEncoding).onErrorResume(IllegalArgumentException.class, e -> {
            // Asynchronous handler takes precedence over the synchronous one.
            if (this.processMessageDecodingErrorAsyncHandler != null) {
                // then(Mono.empty()) discards the handler's completion so no item is emitted for this message.
                return this.transformPeekedMessageItemInternal((PeekedMessageItemInternal)peekedMessageItemInternal, QueueMessageEncoding.NONE).flatMap(messageItem -> this.processMessageDecodingErrorAsyncHandler.apply(new QueueMessageDecodingError(this, new QueueClient(this), null, (PeekedMessageItem)messageItem, (Exception)e))).then(Mono.empty());
            }
            if (this.processMessageDecodingErrorHandler != null) {
                return this.transformPeekedMessageItemInternal((PeekedMessageItemInternal)peekedMessageItemInternal, QueueMessageEncoding.NONE).flatMap(messageItem -> {
                    try {
                        this.processMessageDecodingErrorHandler.accept(new QueueMessageDecodingError(this, new QueueClient(this), null, (PeekedMessageItem)messageItem, (Exception)e));
                        return Mono.empty();
                    }
                    catch (RuntimeException re) {
                        // A failure inside the user's handler becomes an error of the page Mono.
                        return FluxUtil.monoError(this.logger, re);
                    }
                // The synchronous handler is subscribed on the bounded-elastic scheduler.
                }).subscribeOn(Schedulers.boundedElastic());
            }
            // No handler configured: surface the decoding failure.
            return FluxUtil.monoError(this.logger, e);
        })).collectList().map(peekedMessageItems -> new PagedResponseBase(response.getRequest(), response.getStatusCode(), response.getHeaders(), peekedMessageItems, null, (MessagesPeekHeaders)response.getDeserializedHeaders()));
    }

    /**
     * Copies the fields of an internal peeked message into a public {@link PeekedMessageItem} and attaches
     * the decoded body.
     *
     * @param peekedMessageItemInternal message as returned by the generated client
     * @param messageEncoding encoding used to decode the message text
     * @return a {@link Mono} emitting the populated item; the item is emitted without a body when the message
     *     text is null, and decoding failures are propagated as errors
     */
    private Mono<PeekedMessageItem> transformPeekedMessageItemInternal(PeekedMessageItemInternal peekedMessageItemInternal, QueueMessageEncoding messageEncoding) {
        PeekedMessageItem item = new PeekedMessageItem();
        item.setMessageId(peekedMessageItemInternal.getMessageId());
        item.setDequeueCount(peekedMessageItemInternal.getDequeueCount());
        item.setExpirationTime(peekedMessageItemInternal.getExpirationTime());
        item.setInsertionTime(peekedMessageItemInternal.getInsertionTime());

        Mono<BinaryData> decodedBody = decodeMessageBody(peekedMessageItemInternal.getMessageText(), messageEncoding);
        // An empty decode result (null text) still yields the item, just without a body.
        return decodedBody
            .map(body -> item.setBody(body))
            .switchIfEmpty(Mono.just(item));
    }

    /**
     * Updates a message in this queue.
     *
     * @param messageId id of the message to update
     * @param popReceipt pop receipt identifying the message
     * @param messageText replacement text; when null no message body is sent with the update
     * @param visibilityTimeout new visibility timeout; null is treated as {@link Duration#ZERO}
     * @return a {@link Mono} emitting the {@link UpdateMessageResult} carried by the service response; a
     *     runtime failure raised while assembling the call is reported through {@code FluxUtil.monoError}
     */
    @ServiceMethod(returns=ReturnType.SINGLE)
    public Mono<UpdateMessageResult> updateMessage(String messageId, String popReceipt, String messageText, Duration visibilityTimeout) {
        try {
            Mono<Response<UpdateMessageResult>> update =
                updateMessageWithResponse(messageId, popReceipt, messageText, visibilityTimeout);
            return update.flatMap(FluxUtil::toMono);
        } catch (RuntimeException failure) {
            return FluxUtil.monoError(logger, failure);
        }
    }

    /**
     * Updates a message in this queue, using the subscriber context supplied by
     * {@code FluxUtil.withContext}.
     *
     * @param messageId id of the message to update
     * @param popReceipt pop receipt identifying the message
     * @param messageText replacement text; when null no message body is sent with the update
     * @param visibilityTimeout new visibility timeout; null is treated as {@link Duration#ZERO}
     * @return a {@link Mono} emitting the response that carries the {@link UpdateMessageResult}; a runtime
     *     failure raised while assembling the call is reported through {@code FluxUtil.monoError}
     */
    @ServiceMethod(returns=ReturnType.SINGLE)
    public Mono<Response<UpdateMessageResult>> updateMessageWithResponse(String messageId, String popReceipt, String messageText, Duration visibilityTimeout) {
        try {
            return FluxUtil.withContext(
                ctx -> updateMessageWithResponse(messageId, popReceipt, messageText, visibilityTimeout, ctx));
        } catch (RuntimeException failure) {
            return FluxUtil.monoError(logger, failure);
        }
    }

    /**
     * Sends the update request through the generated message-ids client and converts the result with
     * {@code getUpdatedMessageResponse}.
     *
     * @param messageId id of the message to update
     * @param popReceipt pop receipt identifying the message
     * @param messageText replacement text; when null no message body is sent with the update
     * @param visibilityTimeout new visibility timeout; null is treated as {@link Duration#ZERO}, and the
     *     whole-second count is narrowed to {@code int}
     * @param context call context; {@code null} is replaced by {@link Context#NONE}
     * @return a {@link Mono} emitting the response that carries the {@link UpdateMessageResult}
     */
    Mono<Response<UpdateMessageResult>> updateMessageWithResponse(String messageId, String popReceipt, String messageText, Duration visibilityTimeout, Context context) {
        QueueMessage replacementBody = null;
        if (messageText != null) {
            replacementBody = new QueueMessage().setMessageText(messageText);
        }
        Context taggedContext = (context != null ? context : Context.NONE)
            .addData("az.namespace", "Microsoft.Storage");
        Duration effectiveTimeout = visibilityTimeout != null ? visibilityTimeout : Duration.ZERO;
        int visibilitySeconds = (int) effectiveTimeout.getSeconds();

        return client.getMessageIds()
            .updateWithResponseAsync(queueName, messageId, popReceipt, visibilitySeconds, null, null, replacementBody, taggedContext)
            .map(response -> getUpdatedMessageResponse(response));
    }

    /**
     * Deletes a message from this queue.
     *
     * @param messageId id of the message to delete
     * @param popReceipt pop receipt identifying the message
     * @return a {@link Mono} holding the value of the service response; a runtime failure raised while
     *     assembling the call is reported through {@code FluxUtil.monoError} instead of being thrown
     */
    @ServiceMethod(returns=ReturnType.SINGLE)
    public Mono<Void> deleteMessage(String messageId, String popReceipt) {
        try {
            Mono<Response<Void>> deletion = deleteMessageWithResponse(messageId, popReceipt);
            return deletion.flatMap(FluxUtil::toMono);
        } catch (RuntimeException failure) {
            return FluxUtil.monoError(logger, failure);
        }
    }

    /**
     * Deletes a message from this queue, using the subscriber context supplied by
     * {@code FluxUtil.withContext}.
     *
     * @param messageId id of the message to delete
     * @param popReceipt pop receipt identifying the message
     * @return a {@link Mono} emitting the service response; a runtime failure raised while assembling the call
     *     is reported through {@code FluxUtil.monoError} instead of being thrown
     */
    @ServiceMethod(returns=ReturnType.SINGLE)
    public Mono<Response<Void>> deleteMessageWithResponse(String messageId, String popReceipt) {
        try {
            return FluxUtil.withContext(ctx -> deleteMessageWithResponse(messageId, popReceipt, ctx));
        } catch (RuntimeException failure) {
            return FluxUtil.monoError(logger, failure);
        }
    }

    /**
     * Sends the delete request through the generated message-ids client.
     *
     * @param messageId id of the message to delete
     * @param popReceipt pop receipt identifying the message
     * @param context call context; {@code null} is replaced by {@link Context#NONE}
     * @return a {@link Mono} emitting the service response re-wrapped with a null value
     */
    Mono<Response<Void>> deleteMessageWithResponse(String messageId, String popReceipt, Context context) {
        Context baseContext = context != null ? context : Context.NONE;
        Context taggedContext = baseContext.addData("az.namespace", "Microsoft.Storage");
        return client.getMessageIds()
            .deleteWithResponseAsync(queueName, messageId, popReceipt, null, null, taggedContext)
            .map(response -> new SimpleResponse<>(response, null));
    }

    /**
     * Returns the name of the queue this client targets.
     *
     * @return the queue name
     */
    public String getQueueName() {
        return queueName;
    }

    /**
     * Returns the storage account name this client was constructed with.
     *
     * @return the account name
     */
    public String getAccountName() {
        return accountName;
    }

    /**
     * Generates a service SAS for this queue using {@link Context#NONE}.
     *
     * @param queueServiceSasSignatureValues values describing the SAS to generate
     * @return the generated SAS string
     */
    public String generateSas(QueueServiceSasSignatureValues queueServiceSasSignatureValues) {
        Context defaultContext = Context.NONE;
        return generateSas(queueServiceSasSignatureValues, defaultContext);
    }

    /**
     * Generates a service SAS for this queue, signed with the shared key credential extracted from this
     * client's HTTP pipeline.
     *
     * @param queueServiceSasSignatureValues values describing the SAS to generate
     * @param context context forwarded to the SAS generator
     * @return the generated SAS string
     */
    public String generateSas(QueueServiceSasSignatureValues queueServiceSasSignatureValues, Context context) {
        QueueSasImplUtil sasGenerator = new QueueSasImplUtil(queueServiceSasSignatureValues, getQueueName());
        return sasGenerator.generateSas(SasImplUtils.extractSharedKeyCredential(getHttpPipeline()), context);
    }

    /**
     * Builds the public properties response from the deserialized headers of a get-properties response.
     *
     * @param response get-properties response from the generated client
     * @return a response carrying {@link QueueProperties} built from the metadata and approximate message
     *     count headers
     */
    private Response<QueueProperties> getQueuePropertiesResponse(QueuesGetPropertiesResponse response) {
        QueuesGetPropertiesHeaders headers = (QueuesGetPropertiesHeaders) response.getDeserializedHeaders();
        return new SimpleResponse<QueueProperties>(
            response,
            new QueueProperties(headers.getXMsMeta(), headers.getXMsApproximateMessagesCount()));
    }

    /**
     * Builds the public update result from the deserialized headers of an update-message response.
     *
     * @param response update response from the generated client
     * @return a response carrying an {@link UpdateMessageResult} built from the pop receipt and
     *     time-next-visible headers
     */
    private Response<UpdateMessageResult> getUpdatedMessageResponse(MessageIdsUpdateResponse response) {
        MessageIdsUpdateHeaders updateHeaders = (MessageIdsUpdateHeaders) response.getDeserializedHeaders();
        return new SimpleResponse<UpdateMessageResult>(
            response,
            new UpdateMessageResult(updateHeaders.getXMsPopreceipt(), updateHeaders.getXMsTimeNextVisible()));
    }
}

