cowinr
8/10/2019 - 2:45 PM

Service Bus

// create client
IQueueClient queueClient = new QueueClient("Endpoint=sb://sb-doubledecker-ric-eun.servicebus.windows.net/;SharedAcce...", "myQueue");

// send a message to a queue
var message = new Message(Encoding.UTF8.GetBytes("Message content"));
\!h await queueClient.SendAsync(message);

// receive messages from queue
// Configure the message handler options in terms of exception handling, number of concurrent messages to deliver, etc.
var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
{
    // Maximum number of concurrent calls
    MaxConcurrentCalls = 1,
    // Automatically complete the messages after returning from user callback.
    // False indicates the complete operation is handled by the user callback.
    AutoComplete = false
};

// Register the function that processes messages.
\!h queueClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);

// message handler
static async Task ProcessMessagesAsync(Message message, CancellationToken token)
{
    // sequence
    message.SystemProperties.SequenceNumber;
    
    // body
    Encoding.UTF8.GetString(message.Body);

    // Complete the message so that it is not received again.
    // This can be done only if the queue Client is created in ReceiveMode.PeekLock mode (which is the default).
    \!h await queueClient.CompleteAsync(message.SystemProperties.LockToken);
}

// handle exceptions
static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
    // encountered exception
    exceptionReceivedEventArgs.Exception;
    
    return Task.CompletedTask;
}

// close client
await queueClient.CloseAsync();
ITopicClient topicClient = new TopicClient(ServiceBusConnectionString, TopicName);

// send
var message = new Message(Encoding.UTF8.GetBytes(messageBody));
await topicClient.SendAsync(message);

// subscribe
ISubscriptionClient subscriptionClient = new SubscriptionClient(ServiceBusConnectionString, TopicName, SubscriptionName);
var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
{
    MaxConcurrentCalls = 1,
    AutoComplete = false
};

// Register the function that processes messages.
subscriptionClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);

static async Task ProcessMessagesAsync(Message message, CancellationToken token)
{
    // Process the message...

    // Complete the message so that it is not received again.
    await subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
}