C# .NET Core 数据流模式单元测试不等待操作完成

问题描述 投票:0回答:1

有一个 .NET Core Worker Service,它具有用于执行并行过程的数据流模式。它有 TransformBlock 和 ActionBlock。

在运行 Worker 进程时,TransformBlock 和 ActionBlock 都会执行并返回结果,但 XUnit Test 运行只执行 TransformBlock 而不会等待 ActionBlock。

我尝试为 ActionBlock 添加等待,但在该语句之后并没有继续。

请指教

public class ServiceDataflow(IConfiguration configuration,
          ILogger<WorkerServiceDataflow> logger,
          IEService eService,
          IOptions<ENSettings> enSettings) : IServiceDataflow
{
    private enSettings _enSettings { get; } = enSettings.Value;

    public async Task<DataTable> StartProcessing(IEnumerable<ENQueue> items)
    {
        DataTable Log = new DataTable();
        Log.Columns.Add("Colum1", typeof(string));
        Log.Columns.Add("Colum2", typeof(Decimal));

        // Define dataflow blocks
        var processingBlock = new TransformBlock<ENQueue, IResult>(async email =>
        {
            // Perform asynchronous processing on the item
            try
            {
                var udTemplate = ReplacePlaceholder(email);
                string name = string.Format("{0} {1}", email.Last_Name, email.First_Name);
                await emailService.SendEmail(email.Address, email.Subject, udTemplate, name, _enSettings.IsSandBox);
                return new Result { ProcessedData = email, Status = "Success", IsProcessed = true };
            }
            catch (Exception ex)
            {
                var exceptionMsg = string.Format("ServiceParallel: Error in TransformBlock: {0} - {1} - {2}", DateTime.Now, email.Address, ex.Message);
                logger.LogError(ex, exceptionMsg);
                return new Result { ProcessedData = email, Status = "Error" , IsProcessed = false};
            }
        });

        var consumerBlock = new ActionBlock<IResult>(result =>
        {
            // Process the result (e.g., store in database, log)
            if (result.IsProcessed)
            {
                // Access the result if the task returned a value
                DataRow newRow = Log.NewRow();
                newRow["Column1"] = result.ProcessedData?.Address;
                newRow["Column2"] = result.ProcessedData?.TemplateId;
                 
                Log.Rows.Add(newRow);
                logger.LogInformation("Email Send at: {0} Recipient: {1} status{2} ", DateTimeOffset.Now, result.ProcessedData?.Address, result.Status);                   
            }
            else
            {
                logger.LogInformation("Error processing email at: {0} Recipient: {1}", DateTimeOffset.Now, result.ProcessedData?.Address);
            }
        });

        // Link the blocks
        processingBlock.LinkTo(consumerBlock);

        // Post items for processing
        foreach (var item in items)
        {
            await processingBlock.SendAsync(item);
        }

        // Signal completion of adding items
        processingBlock.Complete();

        // Wait for processing to finish using Task.WaitAll
        var completionTask = Task.WhenAll(processingBlock.Completion);

        await completionTask;
        return Log;
    }
}

单元测试

public class ServiceDataflowTest : ServiceDataflowFixture
{
    [Fact]
    public async Task StartProcessing_ProcessesItemsSuccessfully_ReturnsDataTable()
    {
        try
        {
            // Mock dependencies
            _mockOptions.Setup(m => m.Value).Returns(new EmailNotificationSettings { IsSandBox = true, 
                UnSubscribeURL= "https://serverurl/Unsubscribe" ,
                ProcessUser = "Test1",
                url = "https://serverurl"
            });

            // Setup for LogInformation with any message and arguments
            _mockLogger.Setup(m => m.Log(
              It.IsAny<LogLevel>(),
              It.IsAny<EventId>(),
              It.IsAny<object>(), // Capture any object passed as state
              It.IsAny<Exception>(),
              It.IsAny<Func<object, Exception, string>>() // Don't care about formatter
            )).Verifiable();

            _mockEService.Setup(m => m.SendEmail(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<bool>()))
.Returns(Task.FromResult(new SendGrid.Response(HttpStatusCode.OK, null, null))); // Assuming this constructor exists

            // Sample data
            var sampleItem = new ENQueue
            {
                Email_Address = "[email protected]",
                Subject = "qwqwqw",
            };
            var items = new List<ENQueue>() { sampleItem };

            // Create the Dataflow instance
            var dataflow = new ServiceDataflow(_mockConfiguration.Object, _mockLogger.Object,
                                        _mockEService.Object, _mockOptions.Object);

            // Call the method under test
            var dataTable = await dataflow.StartProcessing(items);

            // Assertions
            Assert.NotNull(dataTable);
            Assert.Single(dataTable.Rows); // Expect one row for the processed item
            /*                Assert.Equal("[email protected]", dataTable.Rows[0]["Column1"]);
                            Assert.Equal(123, dataTable.Rows[0]["Column2"]);
                            Assert.Equal(123, dataTable.Rows[0]["Column3"]);*/

            // Verify eService.Send was called with expected arguments
            _mockEService.Verify(m => m.SendEmail(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<bool>()), Times.Once);
            //_mockLogger.Verify(m => m.LogInformation(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>()), Times.AtLeastOnce);

            _mockLogger.Verify();
        }
        catch (Exception ex) { }
    }
}

上面的单元测试在下面一行

var dataTable = await dataflow.StartProcessing(items);

它返回到

ActionBlock
执行之前,但是如果我运行辅助服务,它就会返回。

请指教。

c# unit-testing .net-core xunit.net tpl-dataflow
1个回答
1
投票

将完成从

processingBlock
传播到
consumerBlock
。然后就只等
consumerBlock
完成即可。

processingBlock.LinkTo(
     consumerBlock, 
     new DataflowLinkOptions() { PropagateCompletion = true }
);
foreach (var item in items)
{
    await processingBlock.SendAsync(item);
}
processingBlock.Complete();
await consumerBlock.Completion;

您还提到“并行过程”,但电子邮件发送不是并行执行的。为此,您需要在

MaxDegreeOfParallelism
上设置
ExecutionDataflowBlockOptions
。您不妨也将
EnsureOrdered
设置为
false

var processingBlock = new TransformBlock<ENQueue, IResult>(async email =>
{
    // Perform asynchronous processing on the item
}, new ExecutionDataflowBlockOptions
{
    EnsureOrdered = false,
    MaxDegreeOfParallelism = 10,
});
© www.soinside.com 2019 - 2024. All rights reserved.