我最近创建了一个程序,使用 opencv 显示来自 2 个网络摄像机的多视频源。 但我决定为我的应用程序创建 UI,现在,我不太清楚如何使用多线程方法来实现它。

这是我用来在 TKinter GUI 中仅显示一台摄像机的代码:

import tkinter
import cv2
import PIL.Image, PIL.ImageTk
import time

class App:
    def __init__(self, window, window_title, video_source=0):
        self.window = window
        self.video_source = video_source
        # open video source (by default this will try to open the computer webcam)
        self.vid = MyVideoCapture(self.video_source)
        # Create a canvas that can fit the above video source size
        self.canvas = tkinter.Canvas(window, width = self.vid.width, height = self.vid.height)
        # Button that lets the user take a snapshot
        self.btn_snapshot=tkinter.Button(window, text="Snapshot", width=50, command=self.snapshot)
        self.btn_snapshot.pack(anchor=tkinter.CENTER, expand=True)
        # After it is called once, the update method will be automatically called every delay milliseconds
        self.delay = 15
    def snapshot(self):
        # Get a frame from the video source
        ret, frame = self.vid.get_frame()
        if ret:
            cv2.imwrite("frame-" + time.strftime("%d-%m-%Y-%H-%M-%S") + ".jpg", cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))
    def update(self):
        # Get a frame from the video source
        ret, frame = self.vid.get_frame()
        if ret:
            self.photo = PIL.ImageTk.PhotoImage(image = PIL.Image.fromarray(frame))
            self.canvas.create_image(0, 0, image = self.photo, anchor = tkinter.NW)
        self.window.after(self.delay, self.update)
class MyVideoCapture:
    def __init__(self, video_source=0):
        # Open the video source
        self.vid = cv2.VideoCapture(video_source)
        if not self.vid.isOpened():
            raise ValueError("Unable to open video source", video_source)
        # Get video source width and height
        self.width = self.vid.get(cv2.CAP_PROP_FRAME_WIDTH)
        self.height = self.vid.get(cv2.CAP_PROP_FRAME_HEIGHT)
    def get_frame(self):
        if self.vid.isOpened():
            ret, frame = self.vid.read()
            if ret:
                # Return a boolean success flag and the current frame converted to BGR
                return (ret, cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
                return (ret, None)
            return (ret, None)
    # Release the video source when the object is destroyed
    def __del__(self):
        if self.vid.isOpened():
 # Create a window and pass it to the Application object
App(tkinter.Tk(), "Tkinter and OpenCV")


from threading import Thread
import cv2
import time

class VideoWriterWidget(object):
    def __init__(self, video_file_name, src=0):
        # Create a VideoCapture object
        self.frame_name = str(src)
        self.video_file = video_file_name
        self.video_file_name = video_file_name + '.avi'
        self.capture = cv2.VideoCapture(src)

        # Default resolutions of the frame are obtained (system dependent)
        self.frame_width = int(self.capture.get(3))
        self.frame_height = int(self.capture.get(4))

        # Set up codec and output video settings
        self.codec = cv2.VideoWriter_fourcc('M','J','P','G')
        self.output_video = cv2.VideoWriter(self.video_file_name, self.codec, 30, (self.frame_width, self.frame_height))

        # Start the thread to read frames from the video stream
        self.thread = Thread(target=self.update, args=())
        self.thread.daemon = True

        # Start another thread to show/save frames
        print('initialized {}'.format(self.video_file))

    def update(self):
        # Read the next frame from the stream in a different thread
        while True:
            if self.capture.isOpened():
                (self.status, self.frame) = self.capture.read()

    def show_frame(self):
        # Display frames in main program
        if self.status:
            cv2.imshow(self.frame_name, self.frame)

        # Press Q on keyboard to stop recording
        key = cv2.waitKey(1)
        if key == ord('q'):

    def save_frame(self):
        # Save obtained frame into video output file

    def start_recording(self):
        # Create another thread to show/save frames
        def start_recording_thread():
            while True:
                except AttributeError:
        self.recording_thread = Thread(target=start_recording_thread, args=())
        self.recording_thread.daemon = True

if __name__ == '__main__':
    src1 = 'Your link1'
    video_writer_widget1 = VideoWriterWidget('Camera 1', src1)
    src2 = 'Your link2'
    video_writer_widget2 = VideoWriterWidget('Camera 2', src2)
    src3 = 'Your link3'
    video_writer_widget3 = VideoWriterWidget('Camera 3', src3)

    # Since each video player is in its own thread, we need to keep the main thread alive.
    # Keep spinning using time.sleep() so the background threads keep running
    # Threads are set to daemon=True so they will automatically die 
    # when the main thread dies
    while True:

有人可以帮助我如何使用带有线程的 tkinter 在我的新应用程序中使用我以前的代码(显示多摄像头)吗?

(像许多其他 GUI 一样)不喜欢在线程中使用小部件,所以首先我会尝试在没有线程的情况下在主进程中运行所有组件。


的类来创建小部件 我可以在不同的流中多次使用它。因为我只有一台相机(并且系统不能多次使用同一台相机),所以我找到了一些外部流/文件来测试它。因为流发送非常大的图像,所以我将大小更改为
400, 300


import tkinter
import cv2
import PIL.Image, PIL.ImageTk
import time

# widgets with canvas and camera

class tkCamera(tkinter.Frame):

    def __init__(self, window, video_source=0):
        self.window = window
        self.video_source = video_source
        self.vid = MyVideoCapture(self.video_source)

        self.canvas = tkinter.Canvas(window, width=self.vid.width, height=self.vid.height)
        # Button that lets the user take a snapshot
        self.btn_snapshot = tkinter.Button(window, text="Snapshot", width=50, command=self.snapshot)
        self.btn_snapshot.pack(anchor=tkinter.CENTER, expand=True)
        # After it is called once, the update method will be automatically called every delay milliseconds
        self.delay = 15

    def snapshot(self):
        # Get a frame from the video source
        ret, frame = self.vid.get_frame()
        if ret:
            cv2.imwrite("frame-" + time.strftime("%d-%m-%Y-%H-%M-%S") + ".jpg", cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))
    def update_widget(self):
        # Get a frame from the video source
        ret, frame = self.vid.get_frame()
        if ret:
            self.image = PIL.Image.fromarray(frame)
            self.photo = PIL.ImageTk.PhotoImage(image=self.image)
            self.canvas.create_image(0, 0, image = self.photo, anchor = tkinter.NW)
        self.window.after(self.delay, self.update_widget)

class App:

    def __init__(self, window, window_title, video_source1=0, video_source2=0):
        self.window = window

        # open video source (by default this will try to open the computer webcam)
        self.vid1 = tkCamera(window, video_source1)
        self.vid2 = tkCamera(window, video_source2)
        # Create a canvas that can fit the above video source size
class MyVideoCapture:
    def __init__(self, video_source=0):
        # Open the video source
        self.vid = cv2.VideoCapture(video_source)
        if not self.vid.isOpened():
            raise ValueError("Unable to open video source", video_source)
        # Get video source width and height
        self.width = self.vid.get(cv2.CAP_PROP_FRAME_WIDTH)
        self.height = self.vid.get(cv2.CAP_PROP_FRAME_HEIGHT)
        self.width = 400
        self.height = 300
    def get_frame(self):
        if self.vid.isOpened():
            ret, frame = self.vid.read()
            if ret:
                frame = cv2.resize(frame, (400, 300))
                # Return a boolean success flag and the current frame converted to BGR
                return (ret, cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
                return (ret, None)
            return (ret, None)
    # Release the video source when the object is destroyed
    def __del__(self):
        if self.vid.isOpened():
# Create a window and pass it to the Application object
App(tkinter.Tk(), "Tkinter and OpenCV", 0, 'https://imageserver.webcamera.pl/rec/krupowki-srodek/latest.mp4')

如果您打算处理框架 - 即。检测运动或面部 - 然后来自



中查看类似的想法 使用 Python 和 OpenCV 提高网络摄像头 FPS


 from imutils.video import WebcamVideoStream


版本仍然没有线程,但带有源列表,因此它可以显示许多摄像机。但对于超过 2 个源,显示时会出现问题 - 所以这需要使用





 - 而且我可以在流停止时拍摄快照。 
 仅停止替换画布上的图像,但不停止从线程中的流中读取帧 - 因此其他函数仍然可以处理或记录流。

import tkinter import cv2 import PIL.Image, PIL.ImageTk import time class MyVideoCapture: def __init__(self, video_source=0, width=None, height=None): # Open the video source self.vid = cv2.VideoCapture(video_source) if not self.vid.isOpened(): raise ValueError("Unable to open video source", video_source) self.width = width self.height = height # Get video source width and height if not self.width: self.width = int(self.vid.get(cv2.CAP_PROP_FRAME_WIDTH)) # convert float to int if not self.height: self.height = int(self.vid.get(cv2.CAP_PROP_FRAME_HEIGHT)) # convert float to int self.ret = False self.frame = None def process(self): ret = False frame = None if self.vid.isOpened(): ret, frame = self.vid.read() if ret: frame = cv2.resize(frame, (self.width, self.height)) frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) self.ret = ret self.frame = frame def get_frame(self): self.process() # later run in thread return self.ret, self.frame # Release the video source when the object is destroyed def __del__(self): if self.vid.isOpened(): self.vid.release() class tkCamera(tkinter.Frame): def __init__(self, window, video_source=0, width=None, height=None): super().__init__(window) self.window = window #self.window.title(window_title) self.video_source = video_source self.vid = MyVideoCapture(self.video_source, width, height) self.canvas = tkinter.Canvas(window, width=self.vid.width, height=self.vid.height) self.canvas.pack() # Button that lets the user take a snapshot self.btn_snapshot = tkinter.Button(window, text="Snapshot", width=50, command=self.snapshot) self.btn_snapshot.pack(anchor='center', expand=True) # After it is called once, the update method will be automatically called every delay milliseconds self.delay = 15 self.update_widget() def snapshot(self): # Get a frame from the video source ret, frame = self.vid.get_frame() if ret: cv2.imwrite("frame-" + time.strftime("%d-%m-%Y-%H-%M-%S") + ".jpg", cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)) def update_widget(self): # Get a frame from the video source ret, frame = self.vid.get_frame() if ret: self.image = PIL.Image.fromarray(frame) self.photo = PIL.ImageTk.PhotoImage(image=self.image) self.canvas.create_image(0, 0, image = self.photo, anchor = tkinter.NW) self.window.after(self.delay, self.update_widget) class App: def __init__(self, window, window_title, video_sources): self.window = window self.window.title(window_title) self.vids = [] for source in video_sources: vid = tkCamera(window, source, 400, 300) vid.pack() self.vids.append(vid) # Create a canvas that can fit the above video source size self.window.mainloop() if __name__ == '__main__': sources = [ 0, #'https://imageserver.webcamera.pl/rec/krupowki-srodek/latest.mp4', #'https://imageserver.webcamera.pl/rec/skolnity/latest.mp4', 'https://imageserver.webcamera.pl/rec/krakow4/latest.mp4', ] # Create a window and pass it to the Application object App(tkinter.Tk(), "Tkinter and OpenCV", sources)





我使用的源只有 24 秒,所以几秒钟后它们就会停止。

import tkinter import cv2 import PIL.Image, PIL.ImageTk import time import threading class MyVideoCapture: def __init__(self, video_source=0, width=None, height=None, fps=None): self.video_source = video_source self.width = width self.height = height self.fps = fps # Open the video source self.vid = cv2.VideoCapture(video_source) if not self.vid.isOpened(): raise ValueError("[MyVideoCapture] Unable to open video source", video_source) # Get video source width and height if not self.width: self.width = int(self.vid.get(cv2.CAP_PROP_FRAME_WIDTH)) # convert float to int if not self.height: self.height = int(self.vid.get(cv2.CAP_PROP_FRAME_HEIGHT)) # convert float to int if not self.fps: self.fps = int(self.vid.get(cv2.CAP_PROP_FPS)) # convert float to int # default value at start self.ret = False self.frame = None # start thread self.running = True self.thread = threading.Thread(target=self.process) self.thread.start() def process(self): while self.running: ret, frame = self.vid.read() if ret: # process image frame = cv2.resize(frame, (self.width, self.height)) frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) else: print('[MyVideoCapture] stream end:', self.video_source) # TODO: reopen stream self.running = False break # assign new frame self.ret = ret self.frame = frame # sleep for next frame time.sleep(1/self.fps) def get_frame(self): return self.ret, self.frame # Release the video source when the object is destroyed def __del__(self): # stop thread if self.running: self.running = False self.thread.join() # relase stream if self.vid.isOpened(): self.vid.release() class tkCamera(tkinter.Frame): def __init__(self, window, text="", video_source=0, width=None, height=None): super().__init__(window) self.window = window #self.window.title(window_title) self.video_source = video_source self.vid = MyVideoCapture(self.video_source, width, height) self.label = tkinter.Label(self, text=text) self.label.pack() self.canvas = tkinter.Canvas(self, width=self.vid.width, height=self.vid.height) self.canvas.pack() # Button that lets the user take a snapshot self.btn_snapshot = tkinter.Button(self, text="Start", command=self.start) self.btn_snapshot.pack(anchor='center', side='left') self.btn_snapshot = tkinter.Button(self, text="Stop", command=self.stop) self.btn_snapshot.pack(anchor='center', side='left') # Button that lets the user take a snapshot self.btn_snapshot = tkinter.Button(self, text="Snapshot", command=self.snapshot) self.btn_snapshot.pack(anchor='center', side='left') # After it is called once, the update method will be automatically called every delay milliseconds # calculate delay using `FPS` self.delay = int(1000/self.vid.fps) print('[tkCamera] source:', self.video_source) print('[tkCamera] fps:', self.vid.fps, 'delay:', self.delay) self.image = None self.running = True self.update_frame() def start(self): if not self.running: self.running = True self.update_frame() def stop(self): if self.running: self.running = False def snapshot(self): # Get a frame from the video source #ret, frame = self.vid.get_frame() #if ret: # cv2.imwrite(time.strftime("frame-%d-%m-%Y-%H-%M-%S.jpg"), cv2.cvtColor(self.frame, cv2.COLOR_RGB2BGR)) # Save current frame in widget - not get new one from camera - so it can save correct image when it stoped if self.image: self.image.save(time.strftime("frame-%d-%m-%Y-%H-%M-%S.jpg")) def update_frame(self): # widgets in tkinter already have method `update()` so I have to use different name - # Get a frame from the video source ret, frame = self.vid.get_frame() if ret: self.image = PIL.Image.fromarray(frame) self.photo = PIL.ImageTk.PhotoImage(image=self.image) self.canvas.create_image(0, 0, image=self.photo, anchor='nw') if self.running: self.window.after(self.delay, self.update_frame) class App: def __init__(self, window, window_title, video_sources): self.window = window self.window.title(window_title) self.vids = [] columns = 2 for number, source in enumerate(video_sources): text, stream = source vid = tkCamera(self.window, text, stream, 400, 300) x = number % columns y = number // columns vid.grid(row=y, column=x) self.vids.append(vid) self.window.protocol("WM_DELETE_WINDOW", self.on_closing) self.window.mainloop() def on_closing(self, event=None): print('[App] stoping threads') for source in self.vids: source.vid.running = False print('[App] exit') self.window.destroy() if __name__ == '__main__': sources = [ ('me', 0), ('Zakopane, Poland', 'https://imageserver.webcamera.pl/rec/krupowki-srodek/latest.mp4'), ('Kraków, Poland', 'https://imageserver.webcamera.pl/rec/krakow4/latest.mp4'), ('Warszawa, Poland', 'https://imageserver.webcamera.pl/rec/warszawa/latest.mp4'), #('Baltic See, Poland', 'https://imageserver.webcamera.pl/rec/chlopy/latest.mp4'), #('Mountains, Poland', 'https://imageserver.webcamera.pl/rec/skolnity/latest.mp4'), ] # Create a window and pass it to the Application object App(tkinter.Tk(), "Tkinter and OpenCV", sources)




 需要具有 BGR 颜色的帧才能正确保存它,因此我必须在帧转换为 RGB 之前保存它。



cv2 array
 - 所以现在它会转换为 

import tkinter import cv2 import PIL.Image, PIL.ImageTk import time import threading class MyVideoCapture: def __init__(self, video_source=0, width=None, height=None, fps=None): self.video_source = video_source self.width = width self.height = height self.fps = fps # Open the video source self.vid = cv2.VideoCapture(video_source) if not self.vid.isOpened(): raise ValueError("[MyVideoCapture] Unable to open video source", video_source) # Get video source width and height if not self.width: self.width = int(self.vid.get(cv2.CAP_PROP_FRAME_WIDTH)) # convert float to int if not self.height: self.height = int(self.vid.get(cv2.CAP_PROP_FRAME_HEIGHT)) # convert float to int if not self.fps: self.fps = int(self.vid.get(cv2.CAP_PROP_FPS)) # convert float to int # default value at start self.ret = False self.frame = None self.convert_color = cv2.COLOR_BGR2RGB #self.convert_color = cv2.COLOR_BGR2GRAY self.convert_pillow = True # default values for recording self.recording = False self.recording_filename = 'output.mp4' self.recording_writer = None # start thread self.running = True self.thread = threading.Thread(target=self.process) self.thread.start() def start_recording(self, filename=None): if self.recording: print('[MyVideoCapture] already recording:', self.recording_filename) else: # VideoWriter constructors #.mp4 = codec id 2 if filename: self.recording_filename = filename else: self.recording_filename = time.strftime("%Y.%m.%d %H.%M.%S", time.localtime()) + ".avi" #fourcc = cv2.VideoWriter_fourcc(*'I420') # .avi #fourcc = cv2.VideoWriter_fourcc(*'MP4V') # .avi fourcc = cv2.VideoWriter_fourcc(*'MP42') # .avi #fourcc = cv2.VideoWriter_fourcc(*'AVC1') # error libx264 #fourcc = cv2.VideoWriter_fourcc(*'H264') # error libx264 #fourcc = cv2.VideoWriter_fourcc(*'WRAW') # error --- no information --- #fourcc = cv2.VideoWriter_fourcc(*'MPEG') # .avi 30fps #fourcc = cv2.VideoWriter_fourcc(*'MJPG') # .avi #fourcc = cv2.VideoWriter_fourcc(*'XVID') # .avi #fourcc = cv2.VideoWriter_fourcc(*'H265') # error self.recording_writer = cv2.VideoWriter(self.recording_filename, fourcc, self.fps, (self.width, self.height)) self.recording = True print('[MyVideoCapture] started recording:', self.recording_filename) def stop_recording(self): if not self.recording: print('[MyVideoCapture] not recording') else: self.recording = False self.recording_writer.release() print('[MyVideoCapture] stop recording:', self.recording_filename) def record(self, frame): # write frame to file if self.recording_writer and self.recording_writer.isOpened(): self.recording_writer.write(frame) def process(self): while self.running: ret, frame = self.vid.read() if ret: # process image frame = cv2.resize(frame, (self.width, self.height)) # it has to record before converting colors if self.recording: self.record(frame) if self.convert_pillow: frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) frame = PIL.Image.fromarray(frame) else: print('[MyVideoCapture] stream end:', self.video_source) # TODO: reopen stream self.running = False if self.recording: self.stop_recording() break # assign new frame self.ret = ret self.frame = frame # sleep for next frame time.sleep(1/self.fps) def get_frame(self): return self.ret, self.frame # Release the video source when the object is destroyed def __del__(self): # stop thread if self.running: self.running = False self.thread.join() # relase stream if self.vid.isOpened(): self.vid.release() class tkCamera(tkinter.Frame): def __init__(self, window, text="", video_source=0, width=None, height=None): super().__init__(window) self.window = window #self.window.title(window_title) self.video_source = video_source self.vid = MyVideoCapture(self.video_source, width, height) self.label = tkinter.Label(self, text=text) self.label.pack() self.canvas = tkinter.Canvas(self, width=self.vid.width, height=self.vid.height) self.canvas.pack() # Button that lets the user take a snapshot self.btn_snapshot = tkinter.Button(self, text="Start", command=self.start) self.btn_snapshot.pack(anchor='center', side='left') self.btn_snapshot = tkinter.Button(self, text="Stop", command=self.stop) self.btn_snapshot.pack(anchor='center', side='left') # Button that lets the user take a snapshot self.btn_snapshot = tkinter.Button(self, text="Snapshot", command=self.snapshot) self.btn_snapshot.pack(anchor='center', side='left') # After it is called once, the update method will be automatically called every delay milliseconds # calculate delay using `FPS` self.delay = int(1000/self.vid.fps) print('[tkCamera] source:', self.video_source) print('[tkCamera] fps:', self.vid.fps, 'delay:', self.delay) self.image = None self.running = True self.update_frame() def start(self): #if not self.running: # self.running = True # self.update_frame() self.vid.start_recording() def stop(self): #if self.running: # self.running = False self.vid.stop_recording() def snapshot(self): # Get a frame from the video source #ret, frame = self.vid.get_frame() #if ret: # cv2.imwrite(time.strftime("frame-%d-%m-%Y-%H-%M-%S.jpg"), cv2.cvtColor(self.frame, cv2.COLOR_RGB2BGR)) # Save current frame in widget - not get new one from camera - so it can save correct image when it stoped if self.image: self.image.save(time.strftime("frame-%d-%m-%Y-%H-%M-%S.jpg")) def update_frame(self): # widgets in tkinter already have method `update()` so I have to use different name - # Get a frame from the video source ret, frame = self.vid.get_frame() if ret: #self.image = PIL.Image.fromarray(frame) self.image = frame self.photo = PIL.ImageTk.PhotoImage(image=self.image) self.canvas.create_image(0, 0, image=self.photo, anchor='nw') if self.running: self.window.after(self.delay, self.update_frame) class App: def __init__(self, window, window_title, video_sources): self.window = window self.window.title(window_title) self.vids = [] columns = 2 for number, source in enumerate(video_sources): text, stream = source vid = tkCamera(self.window, text, stream, 400, 300) x = number % columns y = number // columns vid.grid(row=y, column=x) self.vids.append(vid) self.window.protocol("WM_DELETE_WINDOW", self.on_closing) self.window.mainloop() def on_closing(self, event=None): print('[App] stoping threads') for source in self.vids: source.vid.running = False print('[App] exit') self.window.destroy() if __name__ == '__main__': sources = [ ('me', 0), ('Zakopane, Poland', 'https://imageserver.webcamera.pl/rec/krupowki-srodek/latest.mp4'), ('Kraków, Poland', 'https://imageserver.webcamera.pl/rec/krakow4/latest.mp4'), ('Warszawa, Poland', 'https://imageserver.webcamera.pl/rec/warszawa/latest.mp4'), #('Baltic See, Poland', 'https://imageserver.webcamera.pl/rec/chlopy/latest.mp4'), #('Mountains, Poland', 'https://imageserver.webcamera.pl/rec/skolnity/latest.mp4'), ] # Create a window and pass it to the Application object App(tkinter.Tk(), "Tkinter and OpenCV", sources)


我创建了可以选择源的版本 - 因此它可以显示录制的视频。


我无法在此处放置代码,因为答案的字符数限制为 30000 个字符。

我把它放在 GitHub 上:


