自定义线程池和请求队列

问题描述 投票:0回答:1

我目前正在使用 Python 实现一个玩具店服务器,使用套接字连接和自定义线程池来处理并发客户端请求。但是,我在有效管理并发请求方面面临一些挑战。

服务器组件: 服务器通过套接字侦听传入的客户端请求。 它以玩具的名称作为参数,并返回该商品(如果有库存)的美元价格。如果未找到该商品,则返回 -1,如果找到该商品但没有库存,则返回 0。

实现了自定义线程池来同时处理多个客户端连接。

客户端组件:

客户端使用套接字连接到服务器并发出随机玩具请求。

我已经实现了一个自定义线程池来管理客户端请求,但我不确定它是否能以最佳方式处理并发请求。在第一个客户端请求之后,服务器不再接收任何进一步的请求。

服务器代码:

import socket
import json
from threading import Thread, Lock
from collections import deque

class Server():
    def __init__(self, host, port, num_threads):
        self.items = {
        "tux": {
            "qty": 100,
            "cost": 25.99
        },
        "whale": {
            "qty": 100,
            "cost": 19.99
        }
    }
        self.lock = Lock()
        self.request_queue = deque([])
        self.thread_pool = [Thread(target=self.serve_request) for _ in range(num_threads)]
        self.s = socket.socket()
        self.s.bind((host, port))
        print("socket binded to port", port)
        self.s.listen(5)
        print("socket is listening")

    def get_item(self, item):
        with self.lock:
            if item not in self.items:
                return -1
            if self.items[item]['qty'] == 0:
                return 0
            self.items[item]['qty'] -= 1
            return self.items[item]['cost']

    def add_request(self, req):
        data = req.recv(4096)
        data = json.loads(data.decode('utf-8'))
        cost = self.get_item(data['query'])
        self.request_queue.append([req, cost])

    def serve_request(self):
        while True:
            if self.request_queue:
                req, cost = self.request_queue.popleft()
                print("cost: ", cost)
                req.send(str(cost).encode('utf-8'))

    def run(self):
        req, addr = self.s.accept()
        for thread in self.thread_pool:
            thread.start()
        while True:
            self.add_request(req)
            print("request_queue: ", self.request_queue)

host = "127.0.0.1"
port = 12345
server = Server(host, port, 100)
server.run()

客户端代码:

import socket
import json
import random
def main():
    host = "127.0.0.1"
    port = 12345
    s = socket.socket()
    s.connect((host, port))
    while True:
        toys = ["tux", "whale"]
        choice = random.choice(toys)
        message = {"query": str(choice)}
        serialzed_message = json.dumps(message)
        print("requesting: ", choice)
        s.send(serialzed_message.encode('utf-8'))
        data = s.recv(4096)
        print("Server replied: {}".format(str(data.decode('utf-8'))))

if __name__ == "__main__":
    main()
python multithreading sockets client-server python-multithreading
1个回答
0
投票

让我们看看客户端。您需要为每个请求创建一个新的套接字,因为一旦服务器处理请求并返回响应,它就会“忘记”客户端的套接字。在这种情况下,应首先关闭客户端套接字。

客户端代码

import socket
import json
import random

def main():
    host = "127.0.0.1"
    port = 12345
    # Just run 5 requests and then terminate:
    for _ in range(5):
        s = socket.socket()
        s.connect((host, port))
        with s:
            toys = ["tux", "whale"]
            choice = random.choice(toys)
            message = {"query": choice}
            serialzed_message = json.dumps(message)
            print("requesting: ", choice)
            s.send(serialzed_message.encode('utf-8'))
            data = s.recv(4096)
            print("Server replied: {}".format(str(data.decode('utf-8'))))

if __name__ == "__main__":
    main()

就服务器而言,它无法正确处理请求,因为您只发出

req, addr = self.s.accept()
一次。功能
run
应该是:

    def run(self):
        for thread in self.thread_pool:
            thread.start()
        while True:
            req, addr = self.s.accept()
            with req:  # So that the client socket is closed automatically
                self.add_request(req)
                print("request_queue: ", self.request_queue)

您还可以将所有请求处理服务器线程置于 CPU 限制的循环中,检查新请求。您可以/应该使用

threading.Condition
实例来等待非空队列。但更简单的是使用多线程池。请注意,我还允许通过按 Enter 键来终止服务器。

服务器代码

import socket
import json
from threading import Lock
from multiprocessing.pool import ThreadPool

class Server():
    def __init__(self, host, port, num_threads):
        self.items = {
        "tux": {
            "qty": 100,
            "cost": 25.99
        },
        "whale": {
            "qty": 100,
            "cost": 19.99
        }
    }
        self.lock = Lock()
        self.thread_pool = ThreadPool(num_threads)

    def get_item(self, item):
        with self.lock:
            if item not in self.items:
                return -1
            if self.items[item]['qty'] == 0:
                return 0
            self.items[item]['qty'] -= 1
            return self.items[item]['cost']

    def serve_request(self, client_socket):
        data = client_socket.recv(4096)
        data = json.loads(data.decode('utf-8'))
        cost = self.get_item(data['query'])
        print("cost: ", cost)
        with client_socket:
            client_socket.send(str(cost).encode('utf-8'))

    def server(self):
        self.s = socket.socket()
        self.s.bind((host, port))
        print("socket binded to port", port)
        self.s.listen(5)
        print("socket is listening")

        with self.s as server_socket:
            while True:
                client_socket, _ = server_socket.accept()
                self.thread_pool.apply_async(self.serve_request, args=(client_socket,))
                print('request queued')

    def run(self):
        # Run actual server in the pool so that we can wait for Enter key:
        self.thread_pool.apply_async(self.server)
        input('Hit Enter to terminate the server...\n\n')
        # Kill all outstanding requests
        print('Terminating...')
        self.thread_pool.terminate()


host = "127.0.0.1"
port = 12345
server = Server(host, port, 20)
server.run()
© www.soinside.com 2019 - 2024. All rights reserved.