想不出怎么用python写两个老师的线程队列

问题描述 投票:0回答:1

我被分配了一个任务:创建一个项目“Laboratory work”,模拟学生进行实验室作业。 在课程开始时,假设学生收到实验室作业。每个 学生做一会儿作业,然后把作业交给老师。工作的交付需要 一些时间(老师不能同时检查 2 篇论文),之后学生要么 在交货时收到标记,或完成实验室。为 20 名学生测试程序 和 2 名教师。 事件:学生要求提交作业、接受作业审核、提交作业、退回 进行修订。 我能够做到,但该计划只适用于一位老师

import threading
import queue
import time
import random
from queue import Queue


class Student:
    def __init__(self, id) -> None:
        self.MIN_TEST_EXECUTION_TIME = 1
        self.MAX_TEST_EXECUTION_TIME = 5
        self._student_id = id
        self.test_execution_time = random.randint(
            self.MIN_TEST_EXECUTION_TIME, self.MAX_TEST_EXECUTION_TIME)
        self._test_result = False

    def get_id(self):
        return self._student_id

    def set_result(self, result: bool):
        if result == False:
            print(f"student {self.get_id()} went to retake")
            self.test_execution_time = random.randint(
                self.MIN_TEST_EXECUTION_TIME, self.MAX_TEST_EXECUTION_TIME)
            return
        self._test_result = result

    def take_test(self):
        print(f"student {self.get_id()} started doing the test")
        time.sleep(self.test_execution_time)

    def get_result(self):
        return self._test_result


class Students:
    def __init__(self):
        self._students = []

    def _append_student(self, student_id):
        self._students.append(Student(student_id))

    def create_group(self, students_amount):
        for _ in range(students_amount):
            self._append_student(f'#{_}')

    def get_group(self):
        return self._students


class Teacher:
    def __init__(self, name) -> None:
        self.name = name
        self.MIN_TEST_CHECKING_TIME = 1
        self.MAX_TEST_CHECKING_TIME = 5
        self.test_check_time = random.randint(
            self.MIN_TEST_CHECKING_TIME, self.MAX_TEST_CHECKING_TIME)

    def check_test(self, student: Student):
        print(
            f"teacher {self.name} started checking the student's work {student.get_id()}")
        time.sleep(self.test_check_time)
        print(f"teacher {self.name} checked the work {student.get_id()}")
        result = random.choice([True, False])
        student.set_result(result)


class Exam:
    def __init__(self, first_teacher: Teacher, second_teacher: Teacher, tested: Students) -> None:
        self.first_teacher = first_teacher
        self.second_teacher = second_teacher
        self.tested = tested.get_group()

    def start_test_process(self, student, teacher, lock):
        student.take_test()
        with lock:
            teacher.check_test(student)
        if student.get_result():
            print(f"student {student.get_id()} passed the test")
            self.tested.remove(student)
            return
        threading.Thread(target=self.start_test_process, args=(
            student, self.first_teacher, lock)).start()

    def run_test(self):
        lock = threading.Lock()
        threads = []
        for student in self.tested:
            t = threading.Thread(target=self.start_test_process, args=(
                student, self.first_teacher, lock))
            threads.append(t)
            t.start()
        for thread in threads:
            thread.join()


studs = Students()
studs.create_group(20)

tich1 = Teacher("ONE")
tich2 = Teacher("TWO")

exam = Exam(tich1, tich2, studs)
exam.run_test()

学生应同时开始考试,教师应并行检查考试

据此,我不太明白如何为两个老师排队......逻辑应该是这样的:学生向第一个免费的老师考试,如果两个都有空,那么给第一个,如果两个很忙,然后他等待其中一个被释放。我尝试使用 queue.Queue() 实现类似的东西,但什么也没有出来。

python queue locking mutex python-multithreading
1个回答
0
投票

原来解决方案很简单,我想了一晚上,但是当我开始冷静地解决问题时,我能够!

def queue(self, student, first_teacher, second_teacher, lock1, lock2):
    student.take_test()
    while True:
        if not lock1.locked():
            with lock1:
                first_teacher.check_test(student)
            if student.get_result():
                print(f"student {student.get_id()} passed the test - work checked by {first_teacher.name}")
                # self.tested.remove(student)
                return
            threading.Thread(target=self.queue, args=(
                student, self.first_teacher, self.second_teacher, lock1, lock2))
        if not lock2.locked():
            with lock2:
                second_teacher.check_test(student)
            if student.get_result():
                print(f"student {student.get_id()} passed the test - work checked by {second_teacher.name}")
                # self.tested.remove(student)
                return
            threading.Thread(target=self.queue, args=(
                student, self.first_teacher, self.second_teacher, lock1, lock2))

def run_test(self):
    lock1 = threading.Lock()
    lock2 = threading.Lock()
    threads = []
    for student in self.tested:
        t = threading.Thread(target=self.queue, args=(
            student, self.first_teacher, self.second_teacher, lock1, lock2))
        threads.append(t)
        t.start()
    for thread in threads:
        thread.join()
© www.soinside.com 2019 - 2024. All rights reserved.