按组在单独的线程中运行处理

问题描述 投票:0回答:1

我正在尝试在我的Kafka用户中使用Rx。

public static event EventHandler<ConsumeResult<string, string>> GenericEvent;

然后我有以下代码

var observable = Observable.FromEventPattern<ConsumeResult<string, string>>(
     x => GenericEvent += x,
     x => GenericEvent -= x)
  .Select(x => x.EventArgs);

while (!cancellationToken.IsCancellationRequested)
{
    ConsumeResult<string, string> consumeResult = _consumer.Consume(cancellationToken);
    GenericEvent(consumeResult.Topic, consumeResult);
}

然后在我喜欢的地方使用

var subscription = observable.Subscribe(message =>
{
    Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} ** {message.Topic}/{message.Partition} @{message.Offset}: '{message.Value}'");

    //_kafkaConsumer.Commit(messages);
});

是否有可能按主题名称(consumeResult.Topic)运行分离的线程?消费者收到消息时,应按主题将其重定向到相应的线程

c# asynchronous async-await system.reactive rx.net
1个回答
0
投票

尝试一下:

Observable
    .Interval(TimeSpan.FromSeconds(0.1))
    .Take(20)
    .GroupBy(x => x % 3)
    .SelectMany(xs => Observable.Using(() => new EventLoopScheduler(), els => xs.ObserveOn(els)))
    .Subscribe(x => Console.WriteLine($"{x} {Thread.CurrentThread.ManagedThreadId}"));

这将确保在new EventLoopScheduler()调度程序中为GroupBy运算符创建的每个内部可观察对象创建一个线程。 SelectMany使组平坦化,但保持每个组的EventLoopScheduler关联。

在您的情况下,您是GroupBy consumeResult.Topic属性。

请确保您的源可观察项终止,因为线程永远存在直到它们继续存在为止。在订阅上调用Dispose()足以结束可观察到的状态。

© www.soinside.com 2019 - 2024. All rights reserved.