我目前正在使用 Python 开发 Socket.IO 实现,其中为我的服务器使用了 socketio 和 eventlet,为我的客户端使用了 socketio。尽管客户端和服务器之间的连接成功,但我面临着一个问题,即客户端控制台中未接收/显示从服务器发送的数据。
这是我的服务器(server.py)的简化版本:
import socketio
import eventlet
from threading import Thread
import redis
import time
sio = socketio.Server()
app = socketio.WSGIApp(sio)
redis_conn = None
def initialize_redis_conn():
global redis_conn
if redis_conn is None:
redis_conn = redis.StrictRedis(host='localhost', port=6379, decode_responses=True)
print("redis connection init")
def insert_update_sid(sid, unique_id):
"""
Inserts or updates the sid and uniqueId of the client into the Redis cache.
"""
initialize_redis_conn()
key = f"DATA_KEY_{unique_id}"
expiry_in_seconds = 1800
redis_conn.setex(key, expiry_in_seconds, sid)
print("Data inserted into Redis successfully")
def get_sid_by_unique_id(unique_id):
"""
Returns the sessionId (sid) of a client using the uniqueId.
"""
initialize_redis_conn()
print("fetching sid")
key = f"DATA_KEY_{unique_id}"
sid = redis_conn.get(key)
if sid:
print(f"Got sid {sid} using uid {unique_id}")
return sid
else:
return None
def listen_sqs_queue():
"""
this func is just to mimic the sqs, when sqs receives the message it should send that to client
"""
data = {
"communicationStatus": "OKAY",
"status": "Complete"
}
unique_id = "123789"
if unique_id:
print("inside sqs impl is : ", unique_id)
sid = get_sid_by_unique_id(unique_id=unique_id)
if sid:
print("calling send response to client")
send_response_to_client(sid, data)
print(
f"Response has been sent to the client with sessionId: {sid} and uniqueId : {unique_id}")
else:
print(f"Client not found for the given uniqueId: {unique_id}")
def send_response_to_client(sid, resp):
"""
this func sends the response to the client
"""
sio.emit('response_from_server', f'response : {resp}')
print("data sent to client ")
@sio.event
def connect(sid, environ):
print(f"client {sid} connected")
print("sid type is: ", type(sid))
token = environ.get('HTTP_TOKEN')
sio.emit("acknowledge", "ack recevied from server", room=sid)
if token:
auth_status_code = 200
if auth_status_code == 200:
unique_id = "123789"
# insert the sid and uniqueId into redis cache
insert_update_sid(sid, unique_id)
@sio.event
def disconnect(sid):
"""
this is a disconnect event which disconnects the client from server
"""
print(f"client {sid} disconnected")
def start_server():
initialize_redis_conn()
# Start the server in a separate thread
# eventlet.wsgi.server(eventlet.listen(('0.0.0.0', 5000)), app)
sqs_listen_thread = Thread(target=listen_sqs_queue)
sqs_listen_thread.start()
server_thread = Thread(target=eventlet.wsgi.server(eventlet.listen(('localhost', 5000)), app))
server_thread.start()
sqs_listen_thread.join()
server_thread.join()
if __name__ == '__main__':
start_server()
还有我的客户(client.py):
import requests
import socketio
PORT = "5000"
IP = "localhost"
sio = socketio.Client()
@sio.event
def connect():
print("connection established to server connect event")
@sio.event
def disconnect():
print("client disconnected from server")
@sio.event
def server_response(response):
print("response from server recived")
print(f"response received from server: {response}")
@sio.event
def acknowledge(ack):
print(ack)
if __name__ == '__main__':
data = {}
stage = "dev"
token = get_token(stage)
token = "Radom_token"
if token:
print("Token has been generated.")
url = f"http://{IP}:{PORT}"
# sio.connect(url, transports=['websocket', 'polling'], headers={'token': token})
sio.connect(url=url)
sio.wait()
else:
print("Token has not been generated ")
我已经验证服务端和客户端都连接成功,但是客户端没有触发response_from_server事件。我怀疑我发出事件的方式或线程设置可能存在问题。
任何有关如何排查和解决此问题的见解或建议将不胜感激。谢谢!
您正在将线程与 eventlet 混合。 Eventlet 是一个单线程异步平台,您必须留在主线程中才能工作。
一个相对简单的解决方案是对 Python 标准库进行猴子补丁,以便许多线程/网络调用被 greenlet 友好的等效项替换。您不仅需要它用于后台线程,还需要使 Redis 在 eventlet 下工作。