在具有边界框更新的并发处理场景中处理帧顺序

问题描述 投票:0回答:1

简介:

我正在使用 C# 开发一个视频处理应用程序,其中捕获帧、处理对象检测(使用 YOLO),然后显示。由于异步处理的性质和 ConcurrentQueue 的 FIFO 结构,我在维护正确的帧顺序方面遇到了挑战。

场景:

连续捕获帧并将其添加到 ConcurrentQueue 中。 每个 FrameData 对象都包含一个 OpenCV Mat 框架和一个边界框列表(最初为空)。 在处理任务中,我将一个帧出队,运行 YOLO 来获取边界框,然后用这些边界框更新帧。 更新后的帧需要放回到队列中的原始位置。

代码片段:

public class FrameData
        {
            public Mat Frame { get; set; }
            public List<Dictionary<string, object>> BoundingBoxes { get; set; }

            public FrameData(Mat frame, List<Dictionary<string, object>> boundingBoxes = null)
            {
                Frame = frame;
                BoundingBoxes = boundingBoxes;
            }
        }


        private ConcurrentQueue<FrameData> frameBuffer = new ConcurrentQueue<FrameData>();

        public void StartCapture()
        {
            if (videoCaptures != null)
            {
                cancellationTokenSource = new CancellationTokenSource();
                CancellationToken token = cancellationTokenSource.Token;
                Task updateFrame = UpdateFrame(token);
                Task processFrame = ProcessFrame(token);

                if (RunODModel && ObjectDetection)
                {
                    
                }
            }

        }

        public async Task UpdateFrame(CancellationToken token)
        {
            if (isCapturing && !isPaused)
            {
                return; // Video is already playing, so just return
            }
            var capture = videoCaptures;
            const int maxBufferSize = 30;
            isCapturing = true;
            isPaused = false;

            while (isCapturing && capture.IsOpened() && !token.IsCancellationRequested)
            {
                Mat frame = new Mat();
                if (isPaused)
                {
                    await Task.Delay(100);  // Wait a bit before checking again.
                    continue;
                }

                if (capture.Read(frame) && !frame.Empty())
                {
                    frameBuffer.Enqueue(new FrameData(frame)); // Enqueue with no bounding boxes

                    // Display and dispose of the oldest frame if buffer size is equal to or exceeds 20
                    if (frameBuffer.Count >= 20)
                    {
                        if (frameBuffer.TryDequeue(out FrameData oldestFrameData))
                        {
                            var bitmapSource = WriteableBitmapConverter.ToWriteableBitmap(oldestFrameData.Frame);
                            imageControl.Source = bitmapSource; // Assuming imageControl is accessible here

                        }
                    }
                    if (frameBuffer.Count >= maxBufferSize)
                    {
                        frameBuffer.TryDequeue(out _); // Remove the oldest frame if buffer is full
                    }
                }
                else
                {
                    frame.Dispose();
                }

                Console.WriteLine($"Frame buffer count: {frameBuffer.Count}");

                // Add delay if needed to control frame rate
                await Task.Delay(33); // For example, delay for ~30fps
            }

            // Cleanup: Dispose of any frames left in the buffer
            while (!frameBuffer.IsEmpty)
            {
                if (frameBuffer.TryDequeue(out FrameData remainingFrameData))
                {
                    remainingFrameData.Frame.Dispose(); // Dispose the Mat object within the FrameData
                }
            }
        }


        public async Task ProcessFrame(CancellationToken token)
        {
            while (!token.IsCancellationRequested)
            {
                if (frameBuffer.TryDequeue(out FrameData frameData))
                {
                    List<Dictionary<string, object>> boundingBoxes = await RunYolo(frameData.Frame);
                    frameData.BoundingBoxes = boundingBoxes; // Add bounding boxes to the frame data

                    frameBuffer.Enqueue(frameData); // Enqueue frame data back to the buffer
                }
                else
                {
                    await Task.Delay(10);
                }
            }
        }


        private async Task<List<Dictionary<string, object>>> RunYolo(Mat frame)
        {
            const string InferenceRequest = "/process";
            const int YoloInputSize = 640;  // replace with actual input size

            // Convert the OpenCV Mat to a byte array in PNG format
            var byteContent = frame.ToBytes(".png");
            using var content = new MultipartFormDataContent
            {
                { new ByteArrayContent(byteContent), "image_file", "frame.png" },
                { new StringContent("iris"), "model_name" },
                { new StringContent(YoloInputSize.ToString()), "img_size" },
                { new StringContent(ObjectTracking ? "true" : "false"), "runFaceRec" },
                { new StringContent(yoloAccuracy.ToString()), "yoloAccuracy" }, // Add frame rate to the content
                { new StringContent(oTFDAccuracy.ToString()), "oTFDAccuracy" }, // Add frame rate to the content
                { new StringContent(oTFRAccuracy.ToString()), "oTFRAccuracy" }, // Add frame rate to the content
                { new StringContent(ObjectTracking ? "true" : "false"), "apply_tracking" },
            };

            var response = await client.PostAsync(ProcUrl + InferenceRequest, content);
            if (!response.IsSuccessStatusCode)
            {
                throw new HttpRequestException($"Response status code does not indicate success: {response.StatusCode} ({response.ReasonPhrase}).");
            }
            var jsonResponse = await response.Content.ReadAsStringAsync();
            var resultsList = JsonConvert.DeserializeObject<List<Dictionary<string, object>>>(jsonResponse);
            return resultsList;
        }

挑战:

由于

ConcurrentQueue
是一个
FIFO
结构,重新排队已处理的帧会将其放置在队列末尾,从而打乱原始捕获顺序。 这会导致帧不按照捕获顺序显示的问题,从而导致视频播放中的视觉不一致。

尝试:

使用

List<FrameData> with Locks
:我尝试用List替换ConcurrentQueue并使用锁来保证线程安全。虽然这允许直接访问最新的帧,但它引入了管理线程安全和 p
otential performance bottlenecks due to locking
的复杂性。

Double-Ended Queue (Deque)
:我考虑过使用双端队列来访问队列的两端。但是,.NET 不提供内置双端队列,使用第三方库或自定义实现可能会使情况变得复杂。

带有字典的原子计数器:另一种方法是使用由每个帧的原子计数器(序列号)键控的 ConcurrentDictionary。虽然这允许无序处理帧并按顺序显示它们,但它显着增加了帧管理和同步的复杂性。

问题:

如何有效地处理最新的帧,使用边界框更新它,然后将其放回队列中的原始位置,同时保持正确的显示顺序? C# 中是否有任何特定的数据结构或设计模式可以简化此场景?

使用原子计数器和字典重新尝试逻辑(根据 Shingo):

private ConcurrentDictionary<int, FrameData> frameBuffer = new ConcurrentDictionary<int, FrameData>();
private int frameCounter = 0;  // Atomic counter for frames
private const int maxBufferSize = 30;  // Define the maximum buffer size
private const int displayThreshold = 20;  // Threshold for displaying the oldest frame


        public class FrameData
        {
            public Mat Frame { get; set; }
            public List<Dictionary<string, object>> BoundingBoxes { get; set; }

            public FrameData(Mat frame, List<Dictionary<string, object>> boundingBoxes = null)
            {
                Frame = frame;
                BoundingBoxes = boundingBoxes;
            }
        }


       
        public void StartCapture()
        {
            if (videoCaptures != null)
            {
                cancellationTokenSource = new CancellationTokenSource();
                CancellationToken token = cancellationTokenSource.Token;
                Task updateFrame = UpdateFrame(token);
                Task processFrame = ProcessFrame(token);

                if (RunODModel && ObjectDetection)
                {
                    
                }
            }

        }

        public async Task UpdateFrame(CancellationToken token)
        {
            if (isCapturing && !isPaused)
            {
                return; // Video is already playing, so just return
            }

            var capture = videoCaptures;
            isCapturing = true;
            isPaused = false;

            while (isCapturing && capture.IsOpened() && !token.IsCancellationRequested)
            {
                Mat frame = new Mat();
                if (isPaused)
                {
                    await Task.Delay(100);  // Wait a bit before checking again.
                    continue;
                }

                if (capture.Read(frame) && !frame.Empty())
                {
                    int currentFrameNumber = Interlocked.Increment(ref frameCounter);
                    frameBuffer[currentFrameNumber] = new FrameData(frame); // Add frame to the buffer

                    // Display and dispose of the oldest frame if buffer size reaches the display threshold
                    if (frameBuffer.Count >= displayThreshold)
                    {
                        int oldestFrameKey = frameBuffer.Keys.Min(); // Get the oldest frame's key
                        if (frameBuffer.TryRemove(oldestFrameKey, out FrameData oldestFrameData))
                        {
                            // Display the frame
                            var bitmapSource = WriteableBitmapConverter.ToWriteableBitmap(oldestFrameData.Frame);
                            imageControl.Source = bitmapSource; // Assuming imageControl is accessible here

                            // Dispose of the frame
                            oldestFrameData.Frame.Dispose();
                        }
                    }

                    // Remove the oldest frame if buffer size exceeds the maximum limit
                    if (frameBuffer.Count > maxBufferSize)
                    {
                        int oldestFrameKey = frameBuffer.Keys.Min(); // Get the oldest frame's key
                        frameBuffer.TryRemove(oldestFrameKey, out _);
                    }
                    Debug.WriteLine(frameCounter);
                }
                else
                {
                    frame.Dispose();
                }

                Console.WriteLine($"Frame buffer count: {frameBuffer.Count}");

                // Add delay if needed to control frame rate
                await Task.Delay(33); // For example, delay for ~30fps
            }

            // Cleanup: Dispose of any frames left in the buffer
            foreach (var frameData in frameBuffer.Values)
            {
                frameData.Frame.Dispose();
            }
            frameBuffer.Clear();
        }


        public async Task ProcessFrame(CancellationToken token)
        {
            while (!token.IsCancellationRequested)
            {
                int latestFrameKey = frameBuffer.Keys.Max(); // Get the newest frame's key
                if (frameBuffer.TryRemove(latestFrameKey, out FrameData frameData))
                {
                    List<Dictionary<string, object>> boundingBoxes = await RunYolo(frameData.Frame);
                    frameData.BoundingBoxes = boundingBoxes;

                    frameBuffer.TryAdd(latestFrameKey, frameData); // Update the frame back at the same position
                }
                else
                {
                    await Task.Delay(10); // Delay if no frame is available or processing failed
                }
            }
        }
c# multithreading async-await concurrency queue
1个回答
0
投票

我的建议是使用DataFlow。这允许您设置一个处理管道,您可以在其中配置并行性。

一个非常简单的管道可能看起来像这样:

processBlock = new TransformBlock<FrameData , FrameData >(
    DoProcessing,
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = maxThreads,
        BoundedCapacity = maxThreads*2,
    });
outputBlock = new ActionBlock<FrameData >(
    HandleResult, new ExecutionDataflowBlockOptions()
    {
        // this needs to run on the UI thread to get the correct synchronization context
        TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext()
    });

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
processBlock.LinkTo(outputBlock, linkOptions);

...

public FrameData DoProcessing(FrameData input){
   ...
}
public void HandleResult(FrameData input){
   // On UI thread
   ...
}

这将并行处理frameData,同时按原始顺序序列化结果以在UI线程上更新。只需执行

processBlock.Post(...)
即可添加框架。这是使用“BoundedCapacity”设置的,如果处理无法跟上,则会丢弃帧,这在您的特定场景中可能需要也可能不需要。

© www.soinside.com 2019 - 2024. All rights reserved.