如何在 Flask-SocketIO 中从线程向某个特定的套接字发送(emit)消息?

问题描述 投票:1回答:1

我想隔离每个套接字连接,并且只向该特定连接发送消息。当我不使用线程运行时,一切正常,每个套接字都只收到属于自己的消息;但是当我使用线程时,消息就变成了广播,所有客户端都会收到所有消息。

这是我的客户端代码:

// Open the Socket.IO connection to the server.
var socket = io.connect('https://my_server_link');

	// Log once the connection has been established.
	socket.on('connect', () => {
		console.log('Connection Success');
	});

	// Receives the session id from the server, stores it, and asks the
	// server to verify this client for that session.
	socket.on('connection_status', function (payload) {
		console.log('Connection Status');
		console.log(payload);
		session_id = payload.session_id;
		socket.emit('verifyClient', {'session_id': session_id, 'api_id': api_id});
	});

	// After a successful verification, request the user setup for this
	// session; an unsuccessful status is only logged.
	socket.on('verification_status', function (result) {
		console.log('Verification Status');
		console.log(result);
		if (!result.status) {
			return;
		}
		console.log('Verified');
		socket.emit('setupUser', {'session_id': session_id, 'user_id': user_id});
	});

	// When the server confirms the user setup, start sending one frame to
	// the server every five seconds.
	socket.on('user_setup_status', (data)=>{
		console.log('User Setup Status');
		console.log(data);
		if (data['status']){
			console.log('User Setup Successful, Starting monitoring...')
			// This event can arrive more than once (e.g. when the handshake
			// chain runs again after a reconnect). Stop any timer started
			// earlier so frames are not sent by several timers at once.
			if (typeof running_ops !== 'undefined'){
				clearInterval(running_ops);
			}
			running_ops = setInterval(()=>{
				op_interval(session_id)
			}, 5000)
		}
	})

	// Draw the current video frame, encode the canvas as a JPEG, and send
	// the result to the server under the given session id.
	function op_interval(session_id){
		ctx2.drawImage(video, 0, 0, video.width, video.height);
		// Keep the intermediate values local instead of leaking an implicit
		// global named `frame`.
		const data_url = canva.toDataURL('image/jpeg');
		const frame = to_blob(data_url);
		socket.emit('monitor', {
			'frame': frame,
			'session_id': session_id
		});
	}

	// Alerts sent by the server arrive here; for now they are only logged
	// on the client side.
	socket.on('alert', function (alert_data) {
		console.log(alert_data);
		// TODO : Handle Alerts Here
	});

这是我的服务器端代码:

app = Flask(__name__)

# Apply CORS to the app for every resource pattern and every origin.
CORS(
	app,
	resources={"*": {"origins": "*"}},
)

# eventlet monkey-patching is left disabled; the Socket.IO server below is
# configured for the 'threading' async mode instead.
# eventlet.monkey_patch()

socketio = SocketIO(
	app,
	async_mode='threading',
	cors_allowed_origins="*",
)

# Set up the socket connection to the client and register its session id
@socketio.on('connect')
def handle_connection():
	"""Register a newly connected client and send it its session id.

	The connection's own Socket.IO id (``request.sid``) is used as the
	session id. Flask-SocketIO places every client in a personal room named
	after its sid, so ``room=session_id`` addresses exactly this client
	without a separate ``join_room`` call on a randomly generated room name.
	"""
	# Imported locally so the handler does not depend on a module-level import.
	from flask import request

	print('Connection Successfull')
	session_id = request.sid
	active_socket_sessions[session_id] = {'session_id': session_id}
	emit('connection_status', {'status': 'success', 'session_id': session_id}, room=session_id)


@socketio.on('verifyClient')
def verification(data):
	"""Verify the client for a session and emit ``verification_status``.

	Reads ``session_id`` and ``api_id`` from ``data``. For a known session the
	outcome of ``verify_client`` is emitted to the session's room and, on
	success, the client id is stored in the session entry. For an unknown
	session a failed status is emitted without an explicit room.
	"""
	session_id = data['session_id']
	client_id = data['api_id']

	# Guard clause: nothing to verify against for an unknown session.
	if session_id not in active_socket_sessions:
		emit('verification_status', {'status': False, 'message': "Session not found.", 'session_id': session_id})
		return

	client, message = verify_client(database, client_id, active_sessions)
	# A non-empty client record means the verification succeeded.
	verified = len(client.keys()) > 0
	if verified:
		active_socket_sessions[session_id]['client_id'] = client['client_id']

	emit('verification_status', {'status': verified, 'message': message, 'session_id': session_id}, room=session_id)


@socketio.on('setupUser')
def user_setup(data):
	"""Attach a user to a verified session and emit ``user_setup_status``.

	Reads ``session_id`` and ``user_id`` from ``data``. For an unknown session
	a failed status is emitted without an explicit room. For a known session
	the user is looked up with ``get_user``; when found, the session entry
	(including the user) is stored via ``rediss.set_dict`` and the result is
	emitted to the session's room. A session without a stored ``client_id``
	is answered with a failed status instead of raising ``KeyError``.
	"""
	session_id = data['session_id']
	user_id = data['user_id']

	if session_id not in active_socket_sessions:
		emit('user_setup_status', {'status': False, 'message': 'Session not found.', 'session_id': session_id})
		return

	found = False
	# 'client_id' is only stored after a successful 'verifyClient', and the
	# entry is emptied after a successful setup, so it can be missing here.
	client_id = active_socket_sessions[session_id].get('client_id')
	if client_id is None:
		message = 'Client not verified.'
	else:
		user, message = get_user(database, {'client_id': client_id}, user_id)
		if user:
			found = True
			user['person_encoding'] = str(user['person_encoding'])
			active_socket_sessions[session_id]['user'] = user
			rediss.set_dict(session_id, active_socket_sessions[session_id])
			# The session data has been handed to rediss; keep only the key
			# in the in-memory table.
			active_socket_sessions[session_id] = {}

	emit('user_setup_status', {'status': found, 'message': message, 'session_id': session_id}, room=session_id)


@socketio.on('disconnect')
def handle_disconnect():
	"""Emit a ``connection_ended`` notice when a client disconnects."""
	# TODO: clear the socket's data from Redis and close the socket connection.
	farewell = {'data': 'Successfully Disconnected, Socket Is Closed.'}
	emit('connection_ended', farewell)


@socketio.on('monitor')
def monitor(data):
	"""Hand an incoming frame to ``monitoring_process`` as a background task."""
	task_args = (data['frame'], data['session_id'])
	socketio.start_background_task(monitoring_process, *task_args)


def monitoring_process(frame, session_id):
	"""Run the model on one frame and emit an ``alert`` when it is not recognized.

	Does nothing when ``session_id`` is not in ``active_socket_sessions``.
	Otherwise the session is loaded with ``rediss.get_dict``, the frame is
	decoded and passed to ``model.model.predict`` together with the user's
	stored encoding. When the prediction is not marked ``recognized``, an
	alert is logged with ``log_alert`` and the prediction is emitted to the
	session's room.
	"""
	if session_id not in active_socket_sessions:
		return

	session = rediss.get_dict(session_id)
	client_id = session['client_id']
	user = session['user']
	# NOTE(review): eval() executes whatever string is stored in
	# 'person_encoding'. Replace it with a safe parser once the stored
	# format is confirmed.
	user_encoding = np.array(eval(user['person_encoding']))
	person_id = user['person_id']

	image = cv2.imdecode(np.frombuffer(frame, np.uint8), -1)
	preds = model.model.predict(image, user_encoding)
	if preds['recognized']:
		return

	# Copy the alert template before filling it in: this function is started
	# as a background task per frame, and writing into the shared
	# vars.BRISO_ALERTS entry would let concurrent tasks overwrite each
	# other's timestamp / log_id / person_id.
	alert = dict(vars.BRISO_ALERTS[preds['message']])
	alert['timestamp'] = str(time.time())
	alert['log_id'] = ''.join(random.choice(string.ascii_lowercase) for _ in range(10))
	alert['person_id'] = person_id
	# No browser information is available here, so empty headers are logged.
	log_alert(database, client_id, alert, {})
	preds['session_id'] = session_id
	socketio.emit('alert', preds, room=session_id)


if __name__ == "__main__":
	# Start the Socket.IO server on the configured host and port, with the
	# debugger and the reloader turned off.
	socketio.run(
		app,
		host=vars.HOST_NAME,
		port=vars.SERVER_PORT,
		debug=False,
		use_reloader=False,
	)
python flask socket.io flask-socketio
1个回答
0
投票
好吧,正如 Miguel Grinberg 本人在我于 GitHub 上向他提问时所建议的那样,我唯一需要做的改动是:把我自己的 session_id 设置为 request.sid,而不是调用 join_room 方法。

更新后的(工作)代码如下:

© www.soinside.com 2019 - 2024. All rights reserved.