我正在尝试在我的Kafka用户中使用Rx。
public static event EventHandler<ConsumeResult<string, string>> GenericEvent;
然后我有以下代码
var observable = Observable.FromEventPattern<ConsumeResult<string, string>>(
x => GenericEvent += x,
x => GenericEvent -= x)
.Select(x => x.EventArgs);
while (!cancellationToken.IsCancellationRequested)
{
ConsumeResult<string, string> consumeResult = _consumer.Consume(cancellationToken);
GenericEvent(consumeResult.Topic, consumeResult);
}
然后在我喜欢的地方使用
var subscription = observable.Subscribe(message =>
{
Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} ** {message.Topic}/{message.Partition} @{message.Offset}: '{message.Value}'");
//_kafkaConsumer.Commit(messages);
});
是否有可能按主题名称(consumeResult.Topic
)运行分离的线程?消费者收到消息时,应按主题将其重定向到相应的线程
尝试一下:
Observable
.Interval(TimeSpan.FromSeconds(0.1))
.Take(20)
.GroupBy(x => x % 3)
.SelectMany(xs => Observable.Using(() => new EventLoopScheduler(), els => xs.ObserveOn(els)))
.Subscribe(x => Console.WriteLine($"{x} {Thread.CurrentThread.ManagedThreadId}"));
这将确保在new EventLoopScheduler()
调度程序中为GroupBy
运算符创建的每个内部可观察对象创建一个线程。 SelectMany
使组平坦化,但保持每个组的EventLoopScheduler
关联。
在您的情况下,您是GroupBy
consumeResult.Topic
属性。
请确保您的源可观察项终止,因为线程永远存在直到它们继续存在为止。在订阅上调用Dispose()
足以结束可观察到的状态。