我目前正在使用 Python 实现一个玩具店服务器,使用套接字连接和自定义线程池来处理并发客户端请求。但是,我在有效管理并发请求方面面临一些挑战。
服务器组件: 服务器通过套接字侦听传入的客户端请求。 它以玩具的名称作为参数,并返回该商品(如果有库存)的美元价格。如果未找到该商品,则返回 -1,如果找到该商品但没有库存,则返回 0。
实现了自定义线程池来同时处理多个客户端连接。
客户端组件:
客户端使用套接字连接到服务器并发出随机玩具请求。
我已经实现了一个自定义线程池来管理客户端请求,但我不确定它是否能以最佳方式处理并发请求。在第一个客户端请求之后,服务器不再接收任何进一步的请求。
服务器代码:
import socket
import json
from threading import Thread, Lock
from collections import deque
class Server():
def __init__(self, host, port, num_threads):
self.items = {
"tux": {
"qty": 100,
"cost": 25.99
},
"whale": {
"qty": 100,
"cost": 19.99
}
}
self.lock = Lock()
self.request_queue = deque([])
self.thread_pool = [Thread(target=self.serve_request) for _ in range(num_threads)]
self.s = socket.socket()
self.s.bind((host, port))
print("socket binded to port", port)
self.s.listen(5)
print("socket is listening")
def get_item(self, item):
with self.lock:
if item not in self.items:
return -1
if self.items[item]['qty'] == 0:
return 0
self.items[item]['qty'] -= 1
return self.items[item]['cost']
def add_request(self, req):
data = req.recv(4096)
data = json.loads(data.decode('utf-8'))
cost = self.get_item(data['query'])
self.request_queue.append([req, cost])
def serve_request(self):
while True:
if self.request_queue:
req, cost = self.request_queue.popleft()
print("cost: ", cost)
req.send(str(cost).encode('utf-8'))
def run(self):
req, addr = self.s.accept()
for thread in self.thread_pool:
thread.start()
while True:
self.add_request(req)
print("request_queue: ", self.request_queue)
host = "127.0.0.1"
port = 12345
server = Server(host, port, 100)
server.run()
客户端代码:
import socket
import json
import random
def main():
host = "127.0.0.1"
port = 12345
s = socket.socket()
s.connect((host, port))
while True:
toys = ["tux", "whale"]
choice = random.choice(toys)
message = {"query": str(choice)}
serialzed_message = json.dumps(message)
print("requesting: ", choice)
s.send(serialzed_message.encode('utf-8'))
data = s.recv(4096)
print("Server replied: {}".format(str(data.decode('utf-8'))))
if __name__ == "__main__":
main()
让我们看看客户端。您需要为每个请求创建一个新的套接字,因为一旦服务器处理请求并返回响应,它就会“忘记”客户端的套接字。在这种情况下,应首先关闭客户端套接字。
客户端代码
import socket
import json
import random
def main():
host = "127.0.0.1"
port = 12345
# Just run 5 requests and then terminate:
for _ in range(5):
s = socket.socket()
s.connect((host, port))
with s:
toys = ["tux", "whale"]
choice = random.choice(toys)
message = {"query": choice}
serialzed_message = json.dumps(message)
print("requesting: ", choice)
s.send(serialzed_message.encode('utf-8'))
data = s.recv(4096)
print("Server replied: {}".format(str(data.decode('utf-8'))))
if __name__ == "__main__":
main()
就服务器而言,它无法正确处理请求,因为您只发出
req, addr = self.s.accept()
一次。功能 run
应该是:
def run(self):
for thread in self.thread_pool:
thread.start()
while True:
req, addr = self.s.accept()
with req: # So that the client socket is closed automatically
self.add_request(req)
print("request_queue: ", self.request_queue)
您还可以将所有请求处理服务器线程置于 CPU 限制的循环中,检查新请求。您可以/应该使用
threading.Condition
实例来等待非空队列。但更简单的是使用多线程池。请注意,我还允许通过按 Enter 键来终止服务器。
服务器代码
import socket
import json
from threading import Lock
from multiprocessing.pool import ThreadPool
class Server():
def __init__(self, host, port, num_threads):
self.items = {
"tux": {
"qty": 100,
"cost": 25.99
},
"whale": {
"qty": 100,
"cost": 19.99
}
}
self.lock = Lock()
self.thread_pool = ThreadPool(num_threads)
def get_item(self, item):
with self.lock:
if item not in self.items:
return -1
if self.items[item]['qty'] == 0:
return 0
self.items[item]['qty'] -= 1
return self.items[item]['cost']
def serve_request(self, client_socket):
data = client_socket.recv(4096)
data = json.loads(data.decode('utf-8'))
cost = self.get_item(data['query'])
print("cost: ", cost)
with client_socket:
client_socket.send(str(cost).encode('utf-8'))
def server(self):
self.s = socket.socket()
self.s.bind((host, port))
print("socket binded to port", port)
self.s.listen(5)
print("socket is listening")
with self.s as server_socket:
while True:
client_socket, _ = server_socket.accept()
self.thread_pool.apply_async(self.serve_request, args=(client_socket,))
print('request queued')
def run(self):
# Run actual server in the pool so that we can wait for Enter key:
self.thread_pool.apply_async(self.server)
input('Hit Enter to terminate the server...\n\n')
# Kill all outstanding requests
print('Terminating...')
self.thread_pool.terminate()
host = "127.0.0.1"
port = 12345
server = Server(host, port, 20)
server.run()