如何立即停止具有 while 循环的线程(python)?

问题描述 投票:0回答:1

我在一个小项目中使用了一个简单的面部识别程序,我试图在紧急情况下制作一个停止按钮。我已经尝试了从 stop_event 到 Queue 的所有方法,但由于某种原因,task_function 无法停止,除非它可以循环进行下一次迭代。 任何帮助都会很棒,并对看起来很糟糕的代码表示歉意

import cv2
from simple_facerec import SimpleFacerec
import time
import threading
import keyboard
import serial.tools.list_ports
import sys
import openpyxl
import queue
from datetime import datetime
from openpyxl.styles import NamedStyle
import os

sfr = SimpleFacerec()
sfr.load_encoding_images("images/")

cap = cv2.VideoCapture(0)

start_time = None
end_time = None
time_passed = None
check = True
on_check = False
turn_off_no = 0
stop_flag = False

def return_time(start_time):
    current_datetime = datetime.fromtimestamp(start_time)
    formatted_time = current_datetime.strftime("%H:%M:%S")
    date = current_datetime.strftime("%A")
    return formatted_time, date

def camera():
    while True:
        ret, frame = cap.read()
        count = 0
        if ret:
            if count % 5 == 0:
                #print(frame)
                face_locations, face_names = sfr.detect_known_faces(frame)
                for face_loc, name in zip(face_locations, face_names):
                   # y1, x2, y2, x1 = face_loc[0],face_loc[1],face_loc[2],face_loc[3]
                    if name == "Calvin":
                        yield name
                            # Start the stopwatch when "Calvin" is enteredưq
                          # Small delay to avoid immediate stop
                        #cv2.putText(frame, name, (x1, y1-10), cv2.FONT_HERSHEY_COMPLEX, 1, (0,0,200), 2)
                        #cv2.rectangle(frame, (x1, y1), (x2, y2), (0,0,200), 4)
                            #if 'Calvin' in name:
                                #start_event.set()
                    else:
                        yield None       
                        #print(face_loc)
                #cv2.imshow("Frame", frame)
        else:
            print("ERRORRRR")
            cap.release()
            cv2.destroyAllWindows()
        count += 1


def arduino_input(input):
    if input == "OFF":
        #time.sleep(0.05)
        serialInst.write(input.encode('utf-8'))
        print("STOP")
        time.sleep(1)
    elif input == "ON":
        serialInst.write(input.encode('utf-8'))
        print("ON")
        time.sleep(1)
    elif input == "FLICKER":
        serialInst.write(input.encode('utf-8'))
        print("FLICKER")
        time.sleep(1)


def stopwatch(start_event, stop_event):
    global check
    global on_check
    global turn_off_no 
    global stop_flag
    while True:
        if stop_flag:
            print("STOPWATCH STOP")
            return
        else:
            start_event.wait()
            #start_event.clear()
            #print("BRUHH")
            time.sleep(0.1)

            if start_time != None:
                time_has_passed_since_Calvin = time.time() - start_time
                if time_has_passed_since_Calvin > 90:
                    time.sleep(0.05)
                    if check:
                        arduino_input("OFF")
                        time_off,donotneed = return_time(time.time())
                        sheet.cell(row = max_row + turn_off_no + 1, column = 3, value = time_off)
                        #total_time = sheet.cell(row = max_row + turn_off_no + 1, column = 2) - time_off
                        #sheet.cell(row = max_row + turn_off_no + 1, column = 4, value = total_time)
                        workbook.save('light data.xlsx')
                        turn_off_no +=1
                        check = False
                        on_check = False
                elif time_has_passed_since_Calvin <= 90 and time_has_passed_since_Calvin > 75:
                    time.sleep(0.05)
                    if check:
                        arduino_input("FLICKER")
                        #time.sleep(0.05)
                    #print("HELLO????")
            
            time.sleep(0.1)
def writing_data(start):
    final_time, weekday = return_time(start)
    sheet.cell(row = max_row + turn_off_no + 1, column = 1, value = max_row + turn_off_no -1)
    sheet.cell(row = max_row + turn_off_no + 1, column = 2, value = final_time)
    sheet.cell(row = max_row + turn_off_no + 1, column = 4, value = weekday)
    workbook.save('light data.xlsx')

def task(start_event, task_queue):
    calvin_count = 1
    global check
    global stop_flag
    global on_check
    global start_time
    global end_time
    global time_passed
    global turn_off_no 
    decision_generator = camera()
    print("Camera has started")
    while True:

        if stop_flag:
            return
        else:

                decision = next(decision_generator)
                #q
                #print(decision)
                if "Calvin" in str(decision):
                    if calvin_count == 1:
                        start_time = time.time()
                        print("Stopwatch has started")
                        if on_check == False:
                            arduino_input("ON")
                            writing_data(start_time)
                            on_check = True
                        print(f"{on_check} calvin1")
                        check =  True
                        start_event.set()
                    else:
                        #print("IT IS STUPID")
                        #reset_event.set()
                        end_time = time.time()
                        time_passed = round(end_time - start_time,1)
                        start_time = end_time
                        print(f"It has been {time_passed}s since the last Calvin")
                        check =  True
                        if on_check == False:
                            arduino_input("ON")
                            writing_data(start_time)
                            on_check = True
                        print(on_check)
                        start_event.set()
                    #print("Yes")
                    calvin_count += 1 
                else:

                    print("No")
                    #arduino_input("Strange Person")  # Adjust sleep time as needed
                time.sleep(0.1)
        
def close_threads():
    global stop_flag
    stop_flag = True

if __name__ == "__main__":
    workbook = openpyxl.load_workbook('light data.xlsx')
    sheet = workbook['Sheet1']  
    max_row = sheet.max_row
    print(max_row)
    #sheet.cell(row = max_row + 1, column = 1, value = max_row + 1)
    
    ports = serial.tools.list_ports.comports()
    serialInst = serial.Serial()

    portsList = []

    for port in ports:
        portsList.append(str(port))
        print(str(port))

    #val = input("Select port: COM")
    for x in range(0,len(portsList)):
        if portsList[x].startswith("COM3"):
            portvar = "COM3"
            print(f"Arduino was detected. Automatically select {portvar}")

    serialInst.baudrate = 9600
    serialInst.port = portvar
    serialInst.open()
      
    print("It is getting ready")
    time.sleep(5)
    arduino_input("OFF")
    start_event = threading.Event()
    stop_event = threading.Event()
    stop_flag = False
    task_queue = queue.Queue()
    task_queue.put("ON from quee")
    stopwatch_thread = threading.Thread(target=stopwatch, args=(start_event, stop_event))
    camera_thread = threading.Thread(target = task, args=(start_event, task_queue))
    stopwatch_thread.start()
    camera_thread.start()

    while True:
        if keyboard.is_pressed('q'):
            arduino_input("OFF")
            close_threads()
            print("Exiting loop...")
            sys.exit(0)
            break
            


python multithreading python-multithreading
1个回答
0
投票

如果此代码运行在传统类型的 python 之外,您可以通过强制出现这样的错误来简单地终止任何程序。

int(input("Press any key to terminate..."))

如果该程序在终端中运行,它应该可以工作。

© www.soinside.com 2019 - 2024. All rights reserved.