有一个 .NET Core Worker Service,它具有用于执行并行过程的数据流模式。它有 TransformBlock 和 ActionBlock。
在运行 Worker 进程时,TransformBlock 和 ActionBlock 都会执行并返回结果,但 XUnit Test 运行只执行 TransformBlock 而不会等待 ActionBlock。
我尝试为 ActionBlock 添加等待,但在该语句之后并没有继续。
请指教
public class ServiceDataflow(IConfiguration configuration,
ILogger<WorkerServiceDataflow> logger,
IEService eService,
IOptions<ENSettings> enSettings) : IServiceDataflow
{
private enSettings _enSettings { get; } = enSettings.Value;
public async Task<DataTable> StartProcessing(IEnumerable<ENQueue> items)
{
DataTable Log = new DataTable();
Log.Columns.Add("Colum1", typeof(string));
Log.Columns.Add("Colum2", typeof(Decimal));
// Define dataflow blocks
var processingBlock = new TransformBlock<ENQueue, IResult>(async email =>
{
// Perform asynchronous processing on the item
try
{
var udTemplate = ReplacePlaceholder(email);
string name = string.Format("{0} {1}", email.Last_Name, email.First_Name);
await emailService.SendEmail(email.Address, email.Subject, udTemplate, name, _enSettings.IsSandBox);
return new Result { ProcessedData = email, Status = "Success", IsProcessed = true };
}
catch (Exception ex)
{
var exceptionMsg = string.Format("ServiceParallel: Error in TransformBlock: {0} - {1} - {2}", DateTime.Now, email.Address, ex.Message);
logger.LogError(ex, exceptionMsg);
return new Result { ProcessedData = email, Status = "Error" , IsProcessed = false};
}
});
var consumerBlock = new ActionBlock<IResult>(result =>
{
// Process the result (e.g., store in database, log)
if (result.IsProcessed)
{
// Access the result if the task returned a value
DataRow newRow = Log.NewRow();
newRow["Column1"] = result.ProcessedData?.Address;
newRow["Column2"] = result.ProcessedData?.TemplateId;
Log.Rows.Add(newRow);
logger.LogInformation("Email Send at: {0} Recipient: {1} status{2} ", DateTimeOffset.Now, result.ProcessedData?.Address, result.Status);
}
else
{
logger.LogInformation("Error processing email at: {0} Recipient: {1}", DateTimeOffset.Now, result.ProcessedData?.Address);
}
});
// Link the blocks
processingBlock.LinkTo(consumerBlock);
// Post items for processing
foreach (var item in items)
{
await processingBlock.SendAsync(item);
}
// Signal completion of adding items
processingBlock.Complete();
// Wait for processing to finish using Task.WaitAll
var completionTask = Task.WhenAll(processingBlock.Completion);
await completionTask;
return Log;
}
}
单元测试
public class ServiceDataflowTest : ServiceDataflowFixture
{
[Fact]
public async Task StartProcessing_ProcessesItemsSuccessfully_ReturnsDataTable()
{
try
{
// Mock dependencies
_mockOptions.Setup(m => m.Value).Returns(new EmailNotificationSettings { IsSandBox = true,
UnSubscribeURL= "https://serverurl/Unsubscribe" ,
ProcessUser = "Test1",
url = "https://serverurl"
});
// Setup for LogInformation with any message and arguments
_mockLogger.Setup(m => m.Log(
It.IsAny<LogLevel>(),
It.IsAny<EventId>(),
It.IsAny<object>(), // Capture any object passed as state
It.IsAny<Exception>(),
It.IsAny<Func<object, Exception, string>>() // Don't care about formatter
)).Verifiable();
_mockEService.Setup(m => m.SendEmail(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<bool>()))
.Returns(Task.FromResult(new SendGrid.Response(HttpStatusCode.OK, null, null))); // Assuming this constructor exists
// Sample data
var sampleItem = new ENQueue
{
Email_Address = "[email protected]",
Subject = "qwqwqw",
};
var items = new List<ENQueue>() { sampleItem };
// Create the Dataflow instance
var dataflow = new ServiceDataflow(_mockConfiguration.Object, _mockLogger.Object,
_mockEService.Object, _mockOptions.Object);
// Call the method under test
var dataTable = await dataflow.StartProcessing(items);
// Assertions
Assert.NotNull(dataTable);
Assert.Single(dataTable.Rows); // Expect one row for the processed item
/* Assert.Equal("[email protected]", dataTable.Rows[0]["Column1"]);
Assert.Equal(123, dataTable.Rows[0]["Column2"]);
Assert.Equal(123, dataTable.Rows[0]["Column3"]);*/
// Verify eService.Send was called with expected arguments
_mockEService.Verify(m => m.SendEmail(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<bool>()), Times.Once);
//_mockLogger.Verify(m => m.LogInformation(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>()), Times.AtLeastOnce);
_mockLogger.Verify();
}
catch (Exception ex) { }
}
}
上面的单元测试在下面一行
var dataTable = await dataflow.StartProcessing(items);
它返回到
ActionBlock
执行之前,但是如果我运行辅助服务,它就会返回。
请指教。
将完成从
processingBlock
传播到 consumerBlock
。然后就只等consumerBlock
完成即可。
processingBlock.LinkTo(
consumerBlock,
new DataflowLinkOptions() { PropagateCompletion = true }
);
foreach (var item in items)
{
await processingBlock.SendAsync(item);
}
processingBlock.Complete();
await consumerBlock.Completion;
您还提到“并行过程”,但电子邮件发送不是并行执行的。为此,您需要在
MaxDegreeOfParallelism
上设置 ExecutionDataflowBlockOptions
。您不妨也将 EnsureOrdered
设置为 false
。
var processingBlock = new TransformBlock<ENQueue, IResult>(async email =>
{
// Perform asynchronous processing on the item
}, new ExecutionDataflowBlockOptions
{
EnsureOrdered = false,
MaxDegreeOfParallelism = 10,
});