我有一个使用 Apache-Kafka 的 Nodejs 工作 socket.io 应用程序。现在我想扩展我的 socket.io,为此我创建了我的 socketservice.js 的多个实例。我创建了三个文件 server1.js server2.js server3.js 它们都有相同的代码和不同的端口。现在我想使用 Nginx 作为 LoadBalancer,这样当用户连接时,它首先连接到 server1,然后如果其他用户到来,它会连接到 server2,对于 server3 也是如此。
这是我的 nginx.conf
events {}
http {
upstream socket_services {
server localhost:4001;
server localhost:4002;
server localhost:4003;
}
upstream fetch_service {
server localhost:5000;
}
server {
listen 8080;
server_name localhost;
location /api/chat/ {
proxy_pass http://fetch_service;
}
location / {
proxy_pass http://socket_services;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
}
}
}
和我的socketservice.js
const express = require('express');
const cors = require('cors');
const { kafka, producer, getConsumer } = require('./controllers/healsynckafka');
const createSocketServer = require('./controllers/socketserver');
const GroupChatMessage = require("./dbSchema/GroupChatMessage");
const OneOnOneChatMessage = require("./dbSchema/OneOnOneChatMessage");
const RoomChatMessage = require("./dbSchema/RoomMessages");
const { Kafka } = require('kafkajs');
const fs = require('fs');
const app = express();
app.use(cors({
origin: '*'
}));
const { server, io } = createSocketServer(app);
const runConsumer = async () => {
const consumer = await getConsumer('healsync');
await consumer.connect();
await consumer.subscribe({ topic: 'Messages' });
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const { key, value } = message;
const parsedMessage = JSON.parse(value.toString());
if (key) {
const timestamp = parseInt(key.toString());
if(parsedMessage.room){
const roomMessage = new RoomChatMessage({
room: parsedMessage.room,
sender: parsedMessage.sender,
content: parsedMessage.content,
createdAt: new Date(timestamp),
});
try {
io.to(room).emit("roommessage", roomMessage);
} catch (error) {
console.error('Error saving room message:', error.message);
}
}
else if (parsedMessage.receiverId) {
const oneOnOneMessage = new OneOnOneChatMessage({
senderId: parsedMessage.senderId,
receiverId: parsedMessage.receiverId,
content: parsedMessage.content,
createdAt: new Date(timestamp),
});
try {
const receiverSocket = users[parsedMessage.receiverId];
if (receiverSocket) {
receiverSocket.emit('privateMessage', oneOnOneMessage);
} else {
console.log('Receiver is not online or invalid.');
}
} catch (error) {
console.error('Error saving one-on-one message:', error.message);
}
} else {
const groupChatMessage = new GroupChatMessage({
senderId: parsedMessage.senderId,
content: parsedMessage.content,
createdAt: new Date(timestamp),
});
try {
io.emit('message', groupChatMessage);
} catch (error) {
console.error('Error saving group chat message:', error.message);
}
}
} else {
console.log('Message key not found.');
}
},
});
};
const runProducer = async () => {
await producer.connect();
};
const sendMessage = async (key, message) => {
try {
await producer.send({
topic: 'Messages',
messages: [{ key: key.toString(), value: JSON.stringify(message) }],
});
} catch (error) {
console.error('Error sending message:', error.message);
}
};
runConsumer().catch(console.error);
runProducer().catch(console.error);
const users = {};
io.on('connection', (socket) => {
console.log('New user connected ->', socket.id, ' on PORT --> username', PORT);
socket.on('setUsername', (username) => {
users[username] = socket;
console.log(`Username set for ${socket.id}: ${username}`);
});
socket.on("fetchMessages", async (room) => {
try {
const messages = await RoomChatMessage.find({ room }).sort({ createdAt: 1 }).exec();
socket.emit("fetchedMessages", messages);
} catch (err) {
console.error("Error fetching messages:", err);
}
});
socket.on('message', async (data) => {
const timestamp = new Date().getTime();
const message = new GroupChatMessage({
senderId: data.senderId,
content: data.content,
});
await sendMessage(timestamp, message);
});
socket.on("joinRoom", async ({ room, sender }) => {
socket.join(room);
});
socket.on("roommessage", async (data) => {
const { room, sender, content } = data;
console.log("server roommessage event on - ",room,sender,content)
const timestamp = new Date().getTime();
const newMessage = new RoomChatMessage({
room: room,
sender: sender,
content: content
})
await sendMessage(timestamp , newMessage)
console.log("sended to roomsg event");
});
socket.on("leaveRoom", (roomName) => {
socket.leave(roomName);
});
socket.on('privateMessage', async (data) => {
const timestamp = new Date().getTime();
const { username, content, senderId } = data;
const message = new OneOnOneChatMessage({
senderId: senderId,
receiverId: username,
content: content
});
await sendMessage(timestamp,message);
});
socket.on('disconnect', () => {
console.log('User disconnected');
delete users[socket.id];
});
});
const PORT = process.env.PORT || 4001;
server.listen(PORT, () => {
console.log(`Server running on port ${PORT}`);
});
我的 client.jsx 是
useEffect(() => {
if(localStorage.getItem('username') === null){
window.location.href = "/";
}
const newSocket = io(api);
setSocket(newSocket);
const username = localStorage.getItem('username');
setMyusername(username);
newSocket.on('message', handleReceivedMessage);
newSocket.on('privateMessage', handleReceivedMessagePrivate);
newSocket.on('connect', () => {
setUserId(newSocket.id);
newSocket.emit('setUsername', username);
});
if(!initialDataFetched) {
fetchChatData(username);
setinitialDataFetched(true);
}
}, [api]);
app.js
function App() {
const api = "http://localhost:8080";
const [store, setStore] = useState();
const [showSearchResult, setShowSearchResult] = useState(false);
const updateStore = (data) => {
setStore(data);
};
return (
<div className='mainApp'>
<ChatContext.Provider value={{ api, store, updateStore, showSearchResult, setShowSearchResult }}>
<Router>
<Routes>
<Route path="/" element={<Login />} />
<Route path="/mainpanel" element={<MainPanel />} />
<Route path="/rooms/" element={<Room />} />
</Routes>
</Router>
</ChatContext.Provider>
</div>
);
}
现在的问题是当我运行所有套接字服务器时。 它们都与未知的连接并连续随机地打印socket.id(这不应该发生)并且不会发生通信。
打印了这么多,但实际上只有两个用户 user1 和 user2。
server1
New user connected -> TgTAgdIkxgvbQS15AAAE on PORT --> username 4001
New user connected -> Kt3Z_EXytt6sUDH5AAAK on PORT --> username 4001
New user connected -> oK2zync3xT1b4FBUAAAP on PORT --> username 4001
New user connected -> 07gSFZLnADv__ZisAAAS on PORT --> username 4001
New user connected -> tbvoim0zUfHXnEVKAAAT on PORT --> username 4001
server2
New user connected -> RawBlM_7Y1JMhp_bAADn on PORT --> 4002
New user connected -> ZRF6Nr-qClCD3K9dAADq on PORT --> 4002
User disconnected
User disconnected
User disconnected
User disconnected
New user connected -> A_tb4NRJGXzvoXwfAADv on PORT --> 4002
User disconnected
New user connected -> Ycrh4CVvLYfgt96vAADx on PORT --> 4002
User disconnected
User disconnected
New user connected -> NOJoJDFoF8Ucz5OIAAD2 on PORT --> 4002
User disconnected
New user connected -> 3S55_55A_gWWP9NOAAD5 on PORT --> 4002
User disconnected
New user connected -> J6efH-fepC_4aPomAAD9 on PORT --> 4002
server3
New user connected -> wImmJwBfE8wo8MhYAAEG on PORT --> 4003
New user connected -> P0vuOGdXyKOtmzqhAAEJ on PORT --> 4003
User disconnected
New user connected -> njDaGQHzJywH1upJAAEL on PORT --> 4003
New user connected -> X1Fgn8AqWmmf_o1uAAEO on PORT --> 4003
New user connected -> XTi4s6MJytczkl1UAAEQ on PORT --> 4003
New user connected -> vgu57U3p41NLQY9_AAES on PORT --> 4003
New user connected -> zJPY00O_apWAyal2AAEU on PORT --> 4003
New user connected -> dUJhUM8kjpzdwJeXAAEX on PORT --> 4003
User disconnected
New user connected -> h9DkdxDig5XvpbSQAAEa on PORT --> 4003
User disconnected
New user connected -> uLeIIK-TlvW3MQGMAAEc on PORT --> 4003
New user connected -> 5S-6dWw39qloC1IDAAEe on PORT --> 4003
User disconnected
User disconnected
New user connected -> kdxEWfpoFSlIfcKZAAEg on PORT --> 4003
User disconnected
New user connected -> CWPD1w0ZseFZT7X3AAEj on PORT --> 4003
User disconnected
User disconnected
New user connected -> 4FoufpwSEXq3eWdAAAEm on PORT --> 4003
User disconnected
New user connected -> r9HGMlQ4yGcRF8zoAAEo on PORT --> 4003
New user connected -> wgHM9oW9izwaeqr4AAEr on PORT --> 4003
User disconnected
User disconnected
New user connected -> AonCTiG1uPu9gEWHAAEw on PORT --> 4003
User disconnected
User disconnected
User disconnected
New user connected -> 3TuAwdtPWmTMx4trAAEy on PORT --> 4003
New user connected -> Z3kcBVKFRF6EmJhUAAE0 on PORT --> 4003
New user connected -> _USV1n1W9aCOoEBQAAE2 on PORT --> 4003
New user connected -> tuxnwDYVKQE5-b2TAAE5 on PORT --> 4003
User disconnected
User disconnected
New user connected -> UELLulq0iWlVHURKAAE9 on PORT --> 4003
User disconnected
User disconnected
New user connected -> tb3FGcqJGj2Eh51gAAFA on PORT --> 4003
User disconnected
User disconnected
User disconnected
New user connected -> xhf5_9zZqpXrOH6MAAFC on PORT --> 4003
User disconnected
User disconnected
User disconnected
New user connected -> n8X4SpZ0Dt7QXM94AAFH on PORT --> 4003
User disconnected
User disconnected
User disconnected
User disconnected
User disconnected
User disconnected
User disconnected
User disconnected
New user connected -> FaRf7nRUlyR-dM6kAAFa on PORT --> 4003
如果我们谈论没有 nginx 时实际且正确的控制台是什么样子
New user connected -> 07gSFZLnADv__ZisAAAS on PORT --> user2 4001
New user connected -> tbvoim0zUfHXnEVKAAAT on PORT --> user1 4001
Username set for tbvoim0zUfHXnEVKAAAT: user1
Username set for 07gSFZLnADv__ZisAAAS: user2
当我不使用 Nginx 时,它可以工作。 我希望当用户连接时将用户发送到 server1:4001,当另一个用户连接时将他发送到 server2:4002 等等,以循环方式。 我考虑了 Socket.io not work with nginx 的解决方案,但它对我不起作用。但我认为问题大致相似。
一旦在客户端和特定
socket.io
服务器实例之间建立连接,它需要在整个会话期间保持与同一实例的连接。
因此,您需要在 Nginx 上配置粘性会话才能使其正常工作。
您可以通过使用 IP 哈希来实现这一点:
http {
upstream socket_services {
ip_hash;
server localhost:4001;
server localhost:4002;
server localhost:4003;
}
}