如何在 Nethereum StreamingWebSocketClient 订阅中使用异步函数

问题描述 投票:0回答:2

在订阅方法中获取日志后,我想调用异步函数,但是Subscribe函数只需要Action,所以使用async-await关键字是不可能的。

如何在此订阅中使用await关键字?

代码示例:

public static async Task GetLogsTokenTransfer_Observable_Subscription()
    {
        using(var client = new StreamingWebSocketClient("wss://mainnet.infura.io/ws"))
        { 
            var filterTransfers = Event<TransferEventDTO>.GetEventABI().CreateFilterInput();
            var subscription = new EthLogsObservableSubscription(client);

            subscription.GetSubscriptionDataResponsesAsObservable().Subscribe(log =>
            {

                var decoded = Event<TransferEventDTO>.DecodeEvent(log);
                if (decoded != null)
                {
                    MyAsyncMethodHere(); // Can not use await keyword !!!!
                }
            });

            await client.StartAsync();
            await subscription.SubscribeAsync(filterTransfers);
            await Task.Delay(TimeSpan.FromMinutes(1));
            await subscription.UnsubscribeAsync();
            await Task.Delay(TimeSpan.FromSeconds(5));
        }
    }
c# websocket async-await ethereum nethereum
2个回答
1
投票

我不确定这是最好的方法,但在我的应用程序中,我正在使用

Observable.FromAsync
。 像这样:

subscription.GetSubscriptionDataResponsesAsObservable()
   .Select(log => Observable.FromAsync(async () => {
       await ProcessEvent(log);
   }))
   .Concat()
   .Subscribe();

Concat
方法在这里非常重要,因为它确保任务执行中不会重叠


0
投票

没有什么可以阻止您在观察者中使用async操作。

        observable.Subscribe(async data =>
        {
            Console.WriteLine("Data received");

            await Task.Delay(1000);

            Console.WriteLine("Data processed");
        });
© www.soinside.com 2019 - 2024. All rights reserved.