在订阅方法中获取日志后,我想调用异步函数,但是Subscribe函数只需要Action
如何在此订阅中使用await关键字?
代码示例:
public static async Task GetLogsTokenTransfer_Observable_Subscription()
{
using(var client = new StreamingWebSocketClient("wss://mainnet.infura.io/ws"))
{
var filterTransfers = Event<TransferEventDTO>.GetEventABI().CreateFilterInput();
var subscription = new EthLogsObservableSubscription(client);
subscription.GetSubscriptionDataResponsesAsObservable().Subscribe(log =>
{
var decoded = Event<TransferEventDTO>.DecodeEvent(log);
if (decoded != null)
{
MyAsyncMethodHere(); // Can not use await keyword !!!!
}
});
await client.StartAsync();
await subscription.SubscribeAsync(filterTransfers);
await Task.Delay(TimeSpan.FromMinutes(1));
await subscription.UnsubscribeAsync();
await Task.Delay(TimeSpan.FromSeconds(5));
}
}
我不确定这是最好的方法,但在我的应用程序中,我正在使用
Observable.FromAsync
。
像这样:
subscription.GetSubscriptionDataResponsesAsObservable()
.Select(log => Observable.FromAsync(async () => {
await ProcessEvent(log);
}))
.Concat()
.Subscribe();
Concat
方法在这里非常重要,因为它确保任务执行中不会重叠
没有什么可以阻止您在观察者中使用async操作。
observable.Subscribe(async data =>
{
Console.WriteLine("Data received");
await Task.Delay(1000);
Console.WriteLine("Data processed");
});