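I receive continuous data in NewMessage(Msg), which I need to offload to a thread pool/worker thread, and latency is very important. NewMessage(Msg) is a method inherited from a third-party library, and I can't or don't want to change its parameters.
The rate at which Msg arrives in NewMessage(Msg) is variable: it can be 300 Msg per second, or just 1. Also, the messages are continuous from the moment I start the application until I end it.
I have read about Channel and can implement it in a basic form, but I need a more sophisticated version and can't find an example.
Basic implementation so far: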
private readonly Channel<Msg> channel;
private readonly ChannelWriter<Msg> writer;
private readonly ChannelReader<Msg> reader;
private CancellationTokenSource ctSource;
private CancellationToken ct;

void CTor()
{
    channel = Channel.CreateUnbounded<Msg>();
    writer = channel.Writer;
    reader = channel.Reader;
    ctSource = new CancellationTokenSource();
}

void NewMessage(Msg newMsg)
{
    if (!ct.IsCancellationRequested)
    {
        writer.TryWrite(newMsg);
    }
}

async void ConsumeMessages(CancellationToken ct)
{
    await foreach (Msg newMsg in reader.ReadAllAsync())
    {
        if (ct.IsCancellationRequested) { return; }
        MethodConsume(newMsg);
    }
}

void StartConsuming()
{
    ct = ctSource.Token;
    ConsumeMessages(ct);
}

void StopConsuming()
{
    if (ct.Exists) // looking for correct syntax
    { ctSource.Cancel(); }
}
How do I implement multiple consumers so that Msg consumption is optimal in terms of latency?
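This may not be the exact solution, but I would suggest using the System.Threading.Channels namespace in .NET to build a robust and efficient solution with thread safety, FIFO dequeuing, dynamic scaling of consumers, and monitoring of consumer efficiency. Note that with more than one consumer, messages are taken from the channel in FIFO order but may finish processing out of order.
You can create a producer: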
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public class MessageProcessor
{
    // Tracks one consumer: its task, its own cancellation source, and whether it is busy.
    private sealed class Consumer
    {
        public Task Task;
        public CancellationTokenSource Cts;
        public volatile bool Busy;
    }

    private readonly Channel<Msg> channel;
    private readonly List<Consumer> consumers;
    private readonly object sync = new object(); // guards the consumers list
    private readonly int maxConsumerCount = 10;
    private readonly TimeSpan monitorInterval = TimeSpan.FromSeconds(5);
    private CancellationTokenSource ctSource;
    private Task monitor;

    public MessageProcessor()
    {
        channel = Channel.CreateUnbounded<Msg>();
        ctSource = new CancellationTokenSource();
        consumers = new List<Consumer>();
    }

    public void NewMessage(Msg newMsg)
    {
        if (!ctSource.Token.IsCancellationRequested)
        {
            channel.Writer.TryWrite(newMsg);
        }
    }

    public void Start()
    {
        ctSource = new CancellationTokenSource();
        StartConsumers(1);
        StartMonitor();
    }

    public void Stop()
    {
        // After cancellation, messages still waiting in the channel may be left unprocessed.
        ctSource.Cancel();
        monitor?.Wait();
        Task[] running;
        lock (sync)
        {
            running = consumers.Select(c => c.Task).ToArray();
            consumers.Clear();
        }
        Task.WaitAll(running);
    }

    private void StartConsumers(int count)
    {
        lock (sync)
        {
            for (int i = 0; i < count; i++)
            {
                // A linked source lets us stop this consumer alone, or all of them via ctSource.
                var consumer = new Consumer
                {
                    Cts = CancellationTokenSource.CreateLinkedTokenSource(ctSource.Token)
                };
                consumer.Task = Task.Run(() => ConsumeAsync(consumer));
                consumers.Add(consumer);
            }
        }
    }

    private async Task ConsumeAsync(Consumer consumer)
    {
        try
        {
            await foreach (var msg in channel.Reader.ReadAllAsync(consumer.Cts.Token))
            {
                consumer.Busy = true;
                ProcessMessage(msg);
                consumer.Busy = false;
            }
        }
        catch (OperationCanceledException)
        {
            // Expected when this consumer or the whole processor is stopped.
        }
    }

    private void ProcessMessage(Msg msg)
    {
        Console.WriteLine(msg.ToString());
    }

    private void StartMonitor()
    {
        var token = ctSource.Token;
        monitor = Task.Run(async () =>
        {
            try
            {
                while (!token.IsCancellationRequested)
                {
                    await Task.Delay(monitorInterval, token);
                    AdjustConsumersBasedOnLoad();
                }
            }
            catch (OperationCanceledException)
            {
                // Expected on Stop().
            }
        });
    }

    private void AdjustConsumersBasedOnLoad()
    {
        lock (sync)
        {
            // Example condition: add a consumer while messages are queued up.
            bool hasBacklog = channel.Reader.CanCount && channel.Reader.Count > 0;
            if (hasBacklog && consumers.Count < maxConsumerCount)
            {
                StartConsumers(1);
            }
            else if (consumers.Count > 1 && ShouldReduceConsumers())
            {
                // Remove a consumer if load is low.
                ReduceConsumers(1);
            }
        }
    }

    // Callers must hold the sync lock.
    private bool ShouldReduceConsumers()
    {
        // Check if a certain percentage of consumers are idle
        int idleCount = consumers.Count(c => !c.Busy);
        return idleCount > consumers.Count / 2; // <----- Example condition
    }

    // Callers must hold the sync lock.
    private void ReduceConsumers(int count)
    {
        var consumersToStop = consumers.Where(c => !c.Busy)
                                       .Take(count)
                                       .ToList();
        foreach (var consumer in consumersToStop)
        {
            consumers.Remove(consumer);
            // Signal the consumer to stop
            consumer.Cts.Cancel();
        }
    }
}
Usage:
public class Msg
{
    public string Content { get; set; }
    public DateTime Timestamp { get; set; }

    public Msg(string content)
    {
        Content = content;
        Timestamp = DateTime.UtcNow;
    }
}

class MyClass
{
    static async Task Main(string[] args)
    {
        var messageProcessor = new MessageProcessor();
        messageProcessor.Start();
        for (int i = 0; i < 100; i++)
        {
            var messageContent = $"Message {i}";
            messageProcessor.NewMessage(new Msg(messageContent));
            await Task.Delay(10);
        }
        messageProcessor.Stop();
    }
}
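Using channels is a lot easier than this. No fields are needed, and a CancellationToken is only needed if we want to stop producing and consuming and discard all pending messages.
All the producer needs to do is return a ChannelReader, nothing more: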
ChannelReader<Msg> Produce(CancellationToken token = default)
{
    var channel = Channel.CreateUnbounded<Msg>();
    var writer = channel.Writer;
    // Emulate an actual producer
    _ = Task.Run(async () =>
        {
            while (!token.IsCancellationRequested)
            {
                // Writing may wait if we use a bounded channel that's full
                await writer.WriteAsync(new Msg(....), token);
            }
        }, token)
        // Ensure we `Complete` even in case of error, propagating the error
        .ContinueWith(t =>
        {
            writer.TryComplete(t.Exception);
        });
    return channel.Reader;
}
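Keeping the Channel inside Produce means the method has full control over its lifetime, completion, and possible error handling. This makes it easier to use, or to compose multiple steps*.
The consumer doesn't need access to the channel itself, only to the reader.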
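In the question the producer is a callback, NewMessage(Msg), rather than a loop. The same idea can be sketched for that case, assuming a hypothetical wrapper class named MessageSource and a stand-in Msg type (neither comes from the original code): the wrapper owns the channel, the callback only calls TryWrite, and consumers see nothing but the reader.

```csharp
using System;
using System.Threading.Channels;

// Stand-in for the third-party message type (hypothetical).
public sealed class Msg
{
    public string Content { get; }
    public Msg(string content) { Content = content; }
}

// Hypothetical wrapper: owns the channel and exposes only the reader.
public sealed class MessageSource
{
    private readonly Channel<Msg> channel = Channel.CreateUnbounded<Msg>();

    // Consumers get the reader and nothing else.
    public ChannelReader<Msg> Reader => channel.Reader;

    // Called for each incoming message; TryWrite does not wait.
    public void NewMessage(Msg msg) => channel.Writer.TryWrite(msg);

    // Call once no more messages will arrive, so consumers can drain and exit.
    public void Complete(Exception error = null) => channel.Writer.TryComplete(error);
}
```

TryWrite on an unbounded channel returns immediately, so the library's calling thread is not held up; it returns false only after the writer has been completed.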
async Task Consume(ChannelReader<Msg> reader, CancellationToken token = default)
{
    await foreach (var msg in reader.ReadAllAsync(token))
    {
        // Extra check if we want to exit early
        if (token.IsCancellationRequested) return;
        // Process the message
        ....
    }
}
This allows for easy composition:
```
var reader=Produce();
await Consume(reader);
```
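For the multiple-consumer case asked about in the question, one possible approach is to start several workers that all read from the same ChannelReader; the channel hands each message to exactly one of them. The following is a minimal sketch, not a definitive implementation: the names MultiConsumerDemo, ConsumeAll, RunAsync, and consumerCount are hypothetical, and int stands in for Msg so the example is self-contained.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class MultiConsumerDemo
{
    // Starts consumerCount workers that all read from the same reader.
    // Each message is delivered to exactly one worker.
    public static Task ConsumeAll<T>(ChannelReader<T> reader, int consumerCount,
                                     Action<T> process, CancellationToken token = default)
    {
        var workers = Enumerable.Range(0, consumerCount)
            .Select(_ => Task.Run(async () =>
            {
                await foreach (var msg in reader.ReadAllAsync(token))
                {
                    process(msg);
                }
            }, token))
            .ToArray(); // materialize so every worker starts right away
        return Task.WhenAll(workers);
    }

    // Writes 100 items, completes the writer, and returns how many were processed.
    public static async Task<int> RunAsync()
    {
        var channel = Channel.CreateUnbounded<int>();
        var seen = new ConcurrentBag<int>();
        var consuming = ConsumeAll(channel.Reader, 4, seen.Add);
        for (int i = 0; i < 100; i++)
        {
            channel.Writer.TryWrite(i);
        }
        channel.Writer.Complete(); // lets every ReadAllAsync loop finish
        await consuming;
        return seen.Count;
    }
}
```

Because every worker pulls from the same reader, an idle worker picks up the next message as soon as it arrives, which helps latency when processing time varies. The trade-off is that messages may finish processing out of order.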