如何实现生产者为第三方的动态频道

问题描述 投票:0回答:2

我将连续数据接收到

NewMessage(Msg)
,我需要将其卸载到线程池/工作线程,并且延迟非常重要。
NewMessage(Msg)
是从第三方库继承的方法,我不能或者不想更改它的参数。

Msg
内收到的
NewMessage(Msg)
频率是可变的,可以是每秒 300
Msg
',甚至只是 1。而且,从我启动应用程序到结束应用程序,消息都是连续的。

我已经阅读了 Channel 并可以以基本格式实现它,但我需要一个更复杂的版本,但我找不到示例。

到目前为止的基本实现:

private readonly Channel<Msg> channel;
private readonly ChannelWriter<Msg> writer;
private readonly ChannelReader<Msg> reader;
private CancellationTokenSource ctSource;
private CancellationToken ct;

void CTor()
{
    channel = Channel.CreateUnbounded<Msg>();
    writer = channel.Writer;
    reader = channel.Reader;
    ctSource = new CancellationTokenSource();
}
void NewMessage(Msg newMsg)
{
    if(!ct.IsCancellationRequested)
    {       
        channel.TryWrite(newMsg);
    }
}
async void ConsumeMessages(CancellationToken ct)
{
    await foreach(Msg newMsg in channel.Reader.ReadAsync())
    {
        if(ct.IsCancellationRequested) { return; }
        MethodConsume(newMsg);
    }
}
void StartConsuming()
{
    ct = new ctSource.Token;
    ConsumeMessages(ct);
}
void StopConsuming()
{
    if(ct.Exists)            // looking for correct syntax
    { ctSource.Cancel; }
}

如何实现多个消费者以确保

Msg
消费在延迟方面是最佳的?

c# channel system.threading.channels
2个回答
0
投票

可能不是确切的解决方案,但我建议使用 .NET 中的 System.Threading.Channels 命名空间实现一个健壮且高效的解决方案,确保线程安全、FIFO 处理、消费者的动态扩展以及消费者效率的监控。

您可以创建Producter

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Channels;
using System.Collections.Generic;
using System.Diagnostics;

public class MessageProcessor
{
    private readonly Channel<Msg> channel;
    private CancellationTokenSource ctSource;
    private List<Task> consumers;
    private int maxConsumerCount = 10;
    private TimeSpan monitorInterval = TimeSpan.FromSeconds(5);
    private Stopwatch stopwatch;

    public MessageProcessor()
    {
        channel = Channel.CreateUnbounded<Msg>();
        ctSource = new CancellationTokenSource();
        consumers = new List<Task>();
        stopwatch = new Stopwatch();
    }

    public void NewMessage(Msg newMsg)
    {
        if (!ctSource.Token.IsCancellationRequested)
        {
            channel.Writer.TryWrite(newMsg);
        }
    }

    public void Start()
    {
        ctSource = new CancellationTokenSource();
        StartConsumers(1);
        StartMonitor();
    }

    public void Stop()
    {
        ctSource.Cancel();
        Task.WhenAll(consumers).Wait();
    }

    private void StartConsumers(int count)
    {
        for (int i = 0; i < count; i++)
        {
            var consumer = Task.Run(async () =>
            {
                await foreach (var msg in channel.Reader.ReadAllAsync(ctSource.Token))
                {
                    ProcessMessage(msg);
                }
            }, ctSource.Token);

            consumers.Add(consumer);
        }
    }

    private void ProcessMessage(Msg msg)
    {
        Console.Writeln(msg.toString);
    }

    private void StartMonitor()
    {
        Task.Run(async () =>
        {
            while (!ctSource.Token.IsCancellationRequested)
            {
                await Task.Delay(monitorInterval, ctSource.Token);
                AdjustConsumersBasedOnLoad();
            }
        }, ctSource.Token);
    }

    private void AdjustConsumersBasedOnLoad()
    {
        var currentConsumerCount = consumers.Count;
        if (currentConsumerCount < maxConsumerCount)
        {
            StartConsumers(1);
        }
        // add more logic to reduce consumer if load is low..
        if (currentConsumerCount > 1 && ShouldReduceConsumers())
        {
            ReduceConsumers(1);
        }

     private bool ShouldReduceConsumers()
    {
        // Check if a certain percentage of consumers are idle
        int idleCount = consumers.Values.Count(idle => idle == false);
        return idleCount > currentConsumerCount / 2; // <----- Example condition
    }

    private void ReduceConsumers(int count)
    {
        var consumersToStop = consumers.Where(kvp => !kvp.Value)
                                       .Select(kvp => kvp.Key)
                                       .Take(count)
                                       .ToList();
        foreach (var consumer in consumersToStop)
        {
            consumers.Remove(consumer);
            // Signal the consumer to stop
            ((CancellationTokenSource)consumer.AsyncState).Cancel();
        }
    }
    }
}

用途:

public class Msg
{
    public string Content { get; set; }
    public DateTime Timestamp { get; set; }

    public Msg(string content)
    {
        Content = content;
        Timestamp = DateTime.UtcNow;
    }
}

class MyClass
{
    static async Task Main(string[] args)
    {
        var messageProcessor = new MessageProcessor();
        messageProcessor.Start();
        for (int i = 0; i < 100; i++)
        {
            var messageContent = $"Message {i}";
            messageProcessor.NewMessage(new Msg(messageContent));
            await Task.Delay(10);
        }
        messageProcessor.Stop();
    }
}


0
投票

使用渠道比这容易得多。不需要字段,只有当我们想停止生产、消费

丢弃所有挂起的消息时才需要 CancellationToken

制作人所需要做的就是返回一个

ChannelReader
,仅此而已 :

ChannelReader<Msg> Produce(CancellationToken token=default)
{
    var channel=Channel.CreateUnbounded<Msg>();
    var writer=channel.Writer;

    //Emulate an actual producer
    _ = Task.Run(async ()=>{
        while(!token.IsCancellationRequested)
        {
            //Sending may be blocked if we use a Bounded channel that's full
            await writer.SendAsync( new Msg(....),token);
        }
    },token)
    //Ensure we `Complete` even in case of error, propagating the error
    .ContinueWith(t=>{
        writer.TryComplete(t.Exception);
    });
    return channel.Reader;
}

Channel
保留在
Produce
中意味着该方法可以完全控制其生命周期、完成以及可能的错误处理。这简化了多个步骤的使用或组合,变得更容易*。

消费者不需要访问通道本身,只需要访问者。

async Task Consume(ChannelReader<Msg> reader, CancellationToken token=default)
{
    await foreach(var msg in reader.ReadAllAsync(token))
    {
        //Extra check if we want to exit rearly
        if (token.IsCancellationRequested) return;

        //Process the message
        ....
    }
}
```

This allows for easy composition:

```
var reader=Produce();
await Consume(reader);
```
© www.soinside.com 2019 - 2024. All rights reserved.