我正在做一个项目来研究更多经纪人,我正在使用 .NET 6.0 和服务总线来做到这一点。
我的问题是:我需要发布一条消息,并在一段时间后消费同一条消息(我将时间放在环境变量中来控制它),我该怎么做?
我知道,这样做不是一个好的做法,“空队列是一个健康的队列”,但这是一个个人项目。
x时间后消费服务总线消息
我同意评论中的讨论,你也可以使用下面的代码来做同样的事情:
using System;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
class Program
{
const string rithcon = "Endpoint=sb://siln-servicebus.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=WFsJyG1O3qA=";
const string rithqueuename = "rithwik";
static IQueueClient rithqc;
static async Task Main(string[] args)
{
rithqc = new QueueClient(rithcon, rithqueuename);
await RithSendMessageAsync("Hello Rithwik Bojja!");
await Task.Delay(TimeSpan.FromMinutes(1));
await RithReceiveMessageAsync();
}
static async Task RithSendMessageAsync(string rithmessageBody)
{
try
{
var message = new Message(Encoding.UTF8.GetBytes(rithmessageBody))
{
ScheduledEnqueueTimeUtc = DateTime.UtcNow.AddMinutes(1)
};
await rithqc.SendAsync(message);
Console.WriteLine($"Hey Man the Message sent: {rithmessageBody}");
}
finally
{
}
}
static async Task RithReceiveMessageAsync()
{
try
{
rithqc.RegisterMessageHandler(ProcessMessagesAsync, new MessageHandlerOptions(ExceptionReceivedHandler)
{
MaxConcurrentCalls = 1,
AutoComplete = false
});
await Task.Delay(TimeSpan.FromMinutes(5));
}
finally
{
await rithqc.CloseAsync();
}
}
static async Task ProcessMessagesAsync(Message rithmessage, CancellationToken token)
{
Console.WriteLine($"Received message: {Encoding.UTF8.GetString(rithmessage.Body)}");
await rithqc.CompleteAsync(rithmessage.SystemProperties.LockToken);
}
static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}.");
return Task.CompletedTask;
}
}
在我的代码中,我保留了 1 分钟的时间来消费消息。
Output:
发送消息后:
接收并消费消息后: