我正在使用 C# 开发一个视频处理应用程序:它捕获帧,对帧进行对象检测(使用 YOLO),然后显示。由于处理是异步的,而 ConcurrentQueue 是 FIFO 结构,我在保持正确的帧顺序方面遇到了困难。
帧被连续捕获并添加到 ConcurrentQueue 中。每个 FrameData 对象包含一个 OpenCV Mat 帧和一个边界框列表(最初为空)。在处理任务中,我将一个帧出队,运行 YOLO 获取边界框,然后用这些边界框更新该帧。更新后的帧需要放回队列中的原始位置。
/// <summary>
/// Pairs a frame with the bounding boxes associated with it.
/// </summary>
public class FrameData
{
    /// <summary>The image stored for this entry.</summary>
    public Mat Frame { get; set; }

    /// <summary>
    /// Bounding boxes for <see cref="Frame"/>. Remains <c>null</c> when the
    /// constructor is called without a list and nothing is assigned later.
    /// </summary>
    public List<Dictionary<string, object>> BoundingBoxes { get; set; }

    /// <summary>Creates an entry for <paramref name="frame"/>.</summary>
    /// <param name="frame">Image to store in <see cref="Frame"/>.</param>
    /// <param name="boundingBoxes">Optional bounding boxes; defaults to <c>null</c>.</param>
    public FrameData(Mat frame, List<Dictionary<string, object>> boundingBoxes = null)
    {
        this.Frame = frame;
        this.BoundingBoxes = boundingBoxes;
    }
}
// Shared FIFO buffer of frames: UpdateFrame enqueues newly captured frames and dequeues the
// oldest ones; ProcessFrame dequeues a frame, attaches bounding boxes and enqueues it again.
private ConcurrentQueue<FrameData> frameBuffer = new ConcurrentQueue<FrameData>();
/// <summary>
/// Creates a new cancellation token source and starts the capture and processing loops.
/// Does nothing when <c>videoCaptures</c> is <c>null</c>.
/// </summary>
public void StartCapture()
{
    if (videoCaptures == null)
    {
        return;
    }

    cancellationTokenSource = new CancellationTokenSource();
    var loopToken = cancellationTokenSource.Token;

    // Both loops are started without being awaited; the returned tasks are not observed here.
    Task captureLoop = UpdateFrame(loopToken);
    Task detectionLoop = ProcessFrame(loopToken);

    // NOTE(review): this branch is empty, so the flags currently have no effect here.
    // Presumably ProcessFrame was meant to be started only when detection is enabled — confirm.
    if (RunODModel && ObjectDetection)
    {
    }
}
/// <summary>
/// Capture loop: reads frames from <c>videoCaptures</c>, enqueues them into <c>frameBuffer</c>,
/// displays the oldest buffered frame once the buffer is deep enough, and drops the oldest
/// frame when the buffer is full. Every frame that leaves the buffer here is disposed.
/// </summary>
/// <param name="token">Stops the loop when cancellation is requested.</param>
/// <returns>A task that completes after the loop ends and the remaining frames are disposed.</returns>
public async Task UpdateFrame(CancellationToken token)
{
    if (isCapturing && !isPaused)
    {
        return; // Video is already playing, so just return
    }

    var capture = videoCaptures;
    const int displayThreshold = 20; // Start displaying once this many frames are buffered
    const int maxBufferSize = 30;    // Drop the oldest frame once this many frames are buffered
    isCapturing = true;
    isPaused = false;

    while (isCapturing && capture.IsOpened() && !token.IsCancellationRequested)
    {
        // Check the pause flag BEFORE allocating a Mat; allocating first leaked one Mat
        // per paused iteration.
        if (isPaused)
        {
            await Task.Delay(100); // Wait a bit before checking again.
            continue;
        }

        Mat frame = new Mat();
        if (capture.Read(frame) && !frame.Empty())
        {
            frameBuffer.Enqueue(new FrameData(frame)); // Enqueue with no bounding boxes

            // Display the oldest frame once the buffer holds at least displayThreshold frames.
            if (frameBuffer.Count >= displayThreshold
                && frameBuffer.TryDequeue(out FrameData oldestFrameData))
            {
                try
                {
                    // NOTE(review): assumes this code runs on the UI thread that owns imageControl — confirm.
                    imageControl.Source = WriteableBitmapConverter.ToWriteableBitmap(oldestFrameData.Frame);
                }
                finally
                {
                    // The frame has left the queue, so nothing else can release it.
                    oldestFrameData.Frame.Dispose();
                }
            }

            // Drop (and release) the oldest frame if the buffer is full.
            if (frameBuffer.Count >= maxBufferSize
                && frameBuffer.TryDequeue(out FrameData droppedFrameData))
            {
                droppedFrameData.Frame.Dispose();
            }
        }
        else
        {
            frame.Dispose();
        }

        Console.WriteLine($"Frame buffer count: {frameBuffer.Count}");

        // Add delay if needed to control frame rate
        await Task.Delay(33); // For example, delay for ~30fps
    }

    // Cleanup: dispose of any frames left in the buffer.
    while (frameBuffer.TryDequeue(out FrameData remainingFrameData))
    {
        remainingFrameData.Frame.Dispose();
    }
}
/// <summary>
/// Detection loop: dequeues a frame from <c>frameBuffer</c>, runs <c>RunYolo</c> on it,
/// stores the returned bounding boxes on the frame data and enqueues it again.
/// </summary>
/// <param name="token">Stops the loop when cancellation is requested.</param>
/// <returns>A task that completes when the loop ends; it faults if <c>RunYolo</c> throws.</returns>
public async Task ProcessFrame(CancellationToken token)
{
    while (!token.IsCancellationRequested)
    {
        if (!frameBuffer.TryDequeue(out FrameData frameData))
        {
            await Task.Delay(10); // Nothing buffered yet; poll again shortly
            continue;
        }

        try
        {
            // Add bounding boxes to the frame data
            frameData.BoundingBoxes = await RunYolo(frameData.Frame);
        }
        catch
        {
            // The frame is no longer in the queue, so release it here before the
            // exception ends the loop; otherwise its native buffer is leaked.
            frameData.Frame.Dispose();
            throw;
        }

        // NOTE: the queue is FIFO, so this puts the processed frame at the tail,
        // behind frames captured after it — its original position is not restored.
        frameBuffer.Enqueue(frameData);
    }
}
/// <summary>
/// Encodes <paramref name="frame"/> as PNG, posts it with the detection settings as multipart
/// form data to <c>ProcUrl + "/process"</c>, and deserializes the JSON response.
/// </summary>
/// <param name="frame">Frame to send for inference.</param>
/// <returns>The deserialized result list; an empty list when the response deserializes to <c>null</c>.</returns>
/// <exception cref="HttpRequestException">The server returned a non-success status code.</exception>
private async Task<List<Dictionary<string, object>>> RunYolo(Mat frame)
{
    const string InferenceRequest = "/process";
    const int YoloInputSize = 640; // replace with actual input size

    // Form values are machine-readable, so format them culture-independently
    // (a decimal comma from the current culture would otherwise reach the server).
    var invariant = System.Globalization.CultureInfo.InvariantCulture;

    // Convert the OpenCV Mat to a byte array in PNG format
    var byteContent = frame.ToBytes(".png");

    using var content = new MultipartFormDataContent
    {
        { new ByteArrayContent(byteContent), "image_file", "frame.png" },
        { new StringContent("iris"), "model_name" },
        { new StringContent(YoloInputSize.ToString(invariant)), "img_size" },
        // NOTE(review): "runFaceRec" is driven by ObjectTracking, same as "apply_tracking" — confirm this is intended.
        { new StringContent(ObjectTracking ? "true" : "false"), "runFaceRec" },
        { new StringContent(Convert.ToString(yoloAccuracy, invariant)), "yoloAccuracy" },
        { new StringContent(Convert.ToString(oTFDAccuracy, invariant)), "oTFDAccuracy" },
        { new StringContent(Convert.ToString(oTFRAccuracy, invariant)), "oTFRAccuracy" },
        { new StringContent(ObjectTracking ? "true" : "false"), "apply_tracking" },
    };

    // Dispose the response so its content stream and connection are released promptly.
    using var response = await client.PostAsync(ProcUrl + InferenceRequest, content);
    if (!response.IsSuccessStatusCode)
    {
        throw new HttpRequestException($"Response status code does not indicate success: {response.StatusCode} ({response.ReasonPhrase}).");
    }

    var jsonResponse = await response.Content.ReadAsStringAsync();
    var resultsList = JsonConvert.DeserializeObject<List<Dictionary<string, object>>>(jsonResponse);

    // A JSON "null" or empty body deserializes to null; hand callers an empty list instead.
    return resultsList ?? new List<Dictionary<string, object>>();
}
由于 ConcurrentQueue 是 FIFO 结构,重新入队已处理的帧会将其放到队列末尾,从而打乱原始捕获顺序。
这会导致帧不按捕获顺序显示,造成视频播放画面不连贯。
使用带锁的 List<FrameData>:我尝试用 List 替换 ConcurrentQueue,并使用锁来保证线程安全。虽然这样可以直接访问最新的帧,但它增加了线程安全管理的复杂性,并且加锁可能带来性能瓶颈。
双端队列(Deque):我考虑过使用双端队列来访问队列的两端。但是,.NET 没有内置的双端队列,使用第三方库或自定义实现可能会使情况变得复杂。
带字典的原子计数器:另一种方法是使用 ConcurrentDictionary,并以每帧的原子计数器(序列号)作为键。虽然这允许乱序处理帧并按顺序显示它们,但它显著增加了帧管理和同步的复杂性。
如何有效地处理最新的帧,使用边界框更新它,然后将其放回队列中的原始位置,同时保持正确的显示顺序? C# 中是否有任何特定的数据结构或设计模式可以简化此场景?
// Buffered frames keyed by the sequence number taken from frameCounter when the frame was captured.
private ConcurrentDictionary<int, FrameData> frameBuffer = new ConcurrentDictionary<int, FrameData>();
private int frameCounter = 0; // Sequence number source; advanced with Interlocked.Increment for each captured frame
private const int maxBufferSize = 30; // The oldest frame is removed once the buffer holds more than this many frames
private const int displayThreshold = 20; // The oldest frame is displayed once the buffer holds at least this many frames
/// <summary>
/// A buffered frame together with the bounding boxes attached to it.
/// </summary>
public class FrameData
{
    /// <summary>The image held by this entry.</summary>
    public Mat Frame { get; set; }

    /// <summary>
    /// Bounding boxes for <see cref="Frame"/>; <c>null</c> unless supplied to the
    /// constructor or assigned afterwards.
    /// </summary>
    public List<Dictionary<string, object>> BoundingBoxes { get; set; }

    /// <summary>Wraps <paramref name="frame"/> in a new entry.</summary>
    /// <param name="frame">Image to store in <see cref="Frame"/>.</param>
    /// <param name="boundingBoxes">Optional bounding boxes; defaults to <c>null</c>.</param>
    public FrameData(Mat frame, List<Dictionary<string, object>> boundingBoxes = null)
    {
        this.Frame = frame;
        this.BoundingBoxes = boundingBoxes;
    }
}
/// <summary>
/// Creates a new cancellation token source and launches the capture and processing loops.
/// Returns immediately when <c>videoCaptures</c> is <c>null</c>.
/// </summary>
public void StartCapture()
{
    if (videoCaptures == null)
    {
        return;
    }

    cancellationTokenSource = new CancellationTokenSource();
    var loopToken = cancellationTokenSource.Token;

    // Neither loop is awaited; the returned tasks are not observed here.
    Task captureLoop = UpdateFrame(loopToken);
    Task detectionLoop = ProcessFrame(loopToken);

    // NOTE(review): empty branch — the flags are evaluated but nothing depends on them.
    // Presumably ProcessFrame should only start when detection is enabled — confirm.
    if (RunODModel && ObjectDetection)
    {
    }
}
/// <summary>
/// Capture loop: reads frames from <c>videoCaptures</c>, stores each one in <c>frameBuffer</c>
/// under the next sequence number, displays and releases the oldest frame once
/// <c>displayThreshold</c> frames are buffered, and removes and releases the oldest frame
/// when the buffer grows beyond <c>maxBufferSize</c>.
/// </summary>
/// <param name="token">Stops the loop when cancellation is requested.</param>
/// <returns>A task that completes after the loop ends and the remaining frames are disposed.</returns>
public async Task UpdateFrame(CancellationToken token)
{
    if (isCapturing && !isPaused)
    {
        return; // Video is already playing, so just return
    }

    var capture = videoCaptures;
    isCapturing = true;
    isPaused = false;

    while (isCapturing && capture.IsOpened() && !token.IsCancellationRequested)
    {
        // Check the pause flag BEFORE allocating a Mat; allocating first leaked one Mat
        // per paused iteration.
        if (isPaused)
        {
            await Task.Delay(100); // Wait a bit before checking again.
            continue;
        }

        Mat frame = new Mat();
        if (capture.Read(frame) && !frame.Empty())
        {
            int currentFrameNumber = Interlocked.Increment(ref frameCounter);
            frameBuffer[currentFrameNumber] = new FrameData(frame); // Add frame to the buffer

            // Display and dispose of the oldest frame if buffer size reaches the display threshold
            if (frameBuffer.Count >= displayThreshold)
            {
                int oldestFrameKey = frameBuffer.Keys.Min(); // Smallest key = earliest captured frame
                if (frameBuffer.TryRemove(oldestFrameKey, out FrameData oldestFrameData))
                {
                    try
                    {
                        // NOTE(review): assumes this code runs on the UI thread that owns imageControl — confirm.
                        imageControl.Source = WriteableBitmapConverter.ToWriteableBitmap(oldestFrameData.Frame);
                    }
                    finally
                    {
                        oldestFrameData.Frame.Dispose();
                    }
                }
            }

            // Remove the oldest frame if buffer size exceeds the maximum limit,
            // and release it — it is unreachable once removed from the dictionary.
            if (frameBuffer.Count > maxBufferSize)
            {
                int overflowFrameKey = frameBuffer.Keys.Min();
                if (frameBuffer.TryRemove(overflowFrameKey, out FrameData droppedFrameData))
                {
                    droppedFrameData.Frame.Dispose();
                }
            }

            Debug.WriteLine(frameCounter);
        }
        else
        {
            frame.Dispose();
        }

        Console.WriteLine($"Frame buffer count: {frameBuffer.Count}");

        // Add delay if needed to control frame rate
        await Task.Delay(33); // For example, delay for ~30fps
    }

    // Cleanup: Dispose of any frames left in the buffer
    foreach (var frameData in frameBuffer.Values)
    {
        frameData.Frame.Dispose();
    }
    frameBuffer.Clear();
}
/// <summary>
/// Detection loop: takes the frame with the highest key out of <c>frameBuffer</c>, runs
/// <c>RunYolo</c> on it, stores the returned bounding boxes and puts the frame back under
/// the same key.
/// </summary>
/// <param name="token">Stops the loop when cancellation is requested.</param>
/// <returns>A task that completes when the loop ends; it faults if <c>RunYolo</c> throws.</returns>
public async Task ProcessFrame(CancellationToken token)
{
    while (!token.IsCancellationRequested)
    {
        // Copy the keys once so the emptiness check and Max() see the same data.
        // Max() on an empty sequence throws InvalidOperationException, which previously
        // ended this loop whenever the buffer was empty.
        int[] bufferedKeys = frameBuffer.Keys.ToArray();
        if (bufferedKeys.Length == 0)
        {
            await Task.Delay(10); // Delay if no frame is available
            continue;
        }

        int latestFrameKey = bufferedKeys.Max(); // Get the newest frame's key
        if (!frameBuffer.TryRemove(latestFrameKey, out FrameData frameData))
        {
            await Task.Delay(10); // The frame was removed elsewhere in the meantime
            continue;
        }

        try
        {
            frameData.BoundingBoxes = await RunYolo(frameData.Frame);
        }
        catch
        {
            // The frame is no longer in the buffer, so release it here before the
            // exception ends the loop; otherwise its native buffer is leaked.
            frameData.Frame.Dispose();
            throw;
        }

        frameBuffer.TryAdd(latestFrameKey, frameData); // Put the frame back under its original key
    }
}
我的建议是使用 TPL Dataflow(System.Threading.Tasks.Dataflow)。它允许您搭建一条处理管道,并在其中配置并行度。
一个非常简单的管道可能看起来像这样:
// Processing stage: runs DoProcessing on each posted FrameData.
processBlock = new TransformBlock<FrameData , FrameData >(
DoProcessing,
new ExecutionDataflowBlockOptions
{
// Upper bound on how many frames are processed at the same time.
MaxDegreeOfParallelism = maxThreads,
// Upper bound on how many frames the block holds; once full, further posted frames are not accepted.
BoundedCapacity = maxThreads*2,
});
// Output stage: receives the processed frames and hands each one to HandleResult.
outputBlock = new ActionBlock<FrameData >(
HandleResult, new ExecutionDataflowBlockOptions()
{
// this needs to run on the UI thread to get the correct synchronization context
TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext()
});
// Connect the two stages; completion of processBlock is propagated to outputBlock.
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
processBlock.LinkTo(outputBlock, linkOptions);
...
// Per-frame work (placeholder body); invoked by processBlock.
public FrameData DoProcessing(FrameData input){
...
}
// Consumes a processed frame (placeholder body); invoked by outputBlock.
public void HandleResult(FrameData input){
// On UI thread
...
}
这将并行处理 FrameData,同时按原始顺序输出结果,以便在 UI 线程上依次更新。只需调用 processBlock.Post(...) 即可添加帧。由于设置了 BoundedCapacity,当处理速度跟不上时,新帧会被丢弃,这在您的特定场景中可能是、也可能不是您想要的行为。