Nginx 无法与 Socket.io 作为 LoadBalancer 正常工作

问题描述 投票:0回答:1

我有一个使用 Apache-Kafka 的 Nodejs 工作 socket.io 应用程序。现在我想扩展我的 socket.io,为此我创建了我的 socketservice.js 的多个实例。我创建了三个文件 server1.js server2.js server3.js 它们都有相同的代码和不同的端口。现在我想使用 Nginx 作为 LoadBalancer,这样当用户连接时,它首先连接到 server1,然后如果其他用户到来,它会连接到 server2,对于 server3 也是如此。

这是我的 nginx.conf

events {}
http {
    upstream socket_services {
        server localhost:4001;
        server localhost:4002;
        server localhost:4003;
    }

    upstream fetch_service {
        server localhost:5000;
    }

    server {
        listen 8080;
        server_name localhost;

        location /api/chat/ {
            proxy_pass http://fetch_service;
        }

        location / {
            proxy_pass http://socket_services;
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection "upgrade";
        }
    }
}


和我的socketservice.js

const express = require('express');
const cors = require('cors');
const { kafka, producer, getConsumer } = require('./controllers/healsynckafka');
const createSocketServer = require('./controllers/socketserver');
const GroupChatMessage = require("./dbSchema/GroupChatMessage");
const OneOnOneChatMessage = require("./dbSchema/OneOnOneChatMessage");
const RoomChatMessage = require("./dbSchema/RoomMessages");
const { Kafka } = require('kafkajs');
const fs = require('fs');

const app = express();

app.use(cors({
    origin: '*'
}));

const { server, io } = createSocketServer(app);

const runConsumer = async () => {
const consumer = await getConsumer('healsync');
await consumer.connect();
await consumer.subscribe({ topic: 'Messages' });

await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
        const { key, value } = message;
        const parsedMessage = JSON.parse(value.toString());
        if (key) {
            const timestamp = parseInt(key.toString());
            if(parsedMessage.room){
                const roomMessage = new RoomChatMessage({
                    room: parsedMessage.room,
                    sender: parsedMessage.sender,
                    content: parsedMessage.content,
                    createdAt: new Date(timestamp),
                });
                try {
                    io.to(room).emit("roommessage", roomMessage);
                } catch (error) {
                    console.error('Error saving room message:', error.message);
                }
            }
            else if (parsedMessage.receiverId) {
                const oneOnOneMessage = new OneOnOneChatMessage({
                    senderId: parsedMessage.senderId,
                    receiverId: parsedMessage.receiverId,
                    content: parsedMessage.content,
                    createdAt: new Date(timestamp),
                });
                try {
                    const receiverSocket = users[parsedMessage.receiverId];
                    if (receiverSocket) {
                        receiverSocket.emit('privateMessage', oneOnOneMessage);
                    } else {
                        console.log('Receiver is not online or invalid.');
                    }
                } catch (error) {
                    console.error('Error saving one-on-one message:', error.message);
                }
            } else {
                const groupChatMessage = new GroupChatMessage({
                    senderId: parsedMessage.senderId,
                    content: parsedMessage.content,
                    createdAt: new Date(timestamp),
                });
                try {
                   
                    io.emit('message', groupChatMessage);
                } catch (error) {
                    console.error('Error saving group chat message:', error.message);
                }
            }
        } else {
            console.log('Message key not found.');
        }
    },
});
};

const runProducer = async () => {
  await producer.connect();
};

const sendMessage = async (key, message) => {
try {
    await producer.send({
        topic: 'Messages',
        messages: [{ key: key.toString(), value: JSON.stringify(message) }],
    });
} catch (error) {
    console.error('Error sending message:', error.message);
}
};

runConsumer().catch(console.error);
runProducer().catch(console.error);



const users = {};
io.on('connection', (socket) => {
    console.log('New user connected ->', socket.id, ' on PORT --> username', PORT);
    socket.on('setUsername', (username) => {
        users[username] = socket;
        console.log(`Username set for ${socket.id}: ${username}`);
    });

    socket.on("fetchMessages", async (room) => {
        try {
          const messages = await RoomChatMessage.find({ room }).sort({ createdAt: 1 }).exec();
          socket.emit("fetchedMessages", messages);
        } catch (err) {
          console.error("Error fetching messages:", err);
        }
    });

    socket.on('message', async (data) => {
      const timestamp = new Date().getTime();
        const message = new GroupChatMessage({
            senderId: data.senderId,
            content: data.content,
        });
        await sendMessage(timestamp, message);
    });

    socket.on("joinRoom", async ({ room, sender }) => {
        socket.join(room);
    });

    socket.on("roommessage", async (data) => {
        const { room, sender, content } = data;
        console.log("server roommessage event on - ",room,sender,content)
        const timestamp = new Date().getTime();


        const newMessage = new RoomChatMessage({
            room: room,
            sender: sender,
            content: content
        })

        await sendMessage(timestamp , newMessage)

        
        console.log("sended to roomsg event");
    });
    socket.on("leaveRoom", (roomName) => {
    socket.leave(roomName);
    });
    socket.on('privateMessage', async (data) => {
      const timestamp = new Date().getTime();
        const { username, content, senderId } = data;
        const message = new OneOnOneChatMessage({
            senderId: senderId,
            receiverId: username,
            content: content
        });
        await sendMessage(timestamp,message);
        
    });

    socket.on('disconnect', () => {
        console.log('User disconnected');
        delete users[socket.id];
    });
});

const PORT = process.env.PORT || 4001;
server.listen(PORT, () => {
    console.log(`Server running on port ${PORT}`);
});

我的 client.jsx 是

useEffect(() => {

    if(localStorage.getItem('username') === null){

      window.location.href = "/";
    } 
    const newSocket = io(api);
    setSocket(newSocket);
    const username = localStorage.getItem('username');
    setMyusername(username);

    newSocket.on('message', handleReceivedMessage);
    newSocket.on('privateMessage', handleReceivedMessagePrivate);
    newSocket.on('connect', () => {
      setUserId(newSocket.id);
      newSocket.emit('setUsername', username);
    });
    if(!initialDataFetched) {
      fetchChatData(username);
      setinitialDataFetched(true);
    }
    
  }, [api]);

app.js

function App() {
  const api = "http://localhost:8080";
  const [store, setStore] = useState();
  const [showSearchResult, setShowSearchResult] = useState(false);

  const updateStore = (data) => {
    setStore(data);
  };

  return (
    <div className='mainApp'>
      <ChatContext.Provider value={{ api, store, updateStore, showSearchResult, setShowSearchResult }}>
        <Router>
          <Routes>
            <Route path="/" element={<Login />} /> 
            <Route path="/mainpanel" element={<MainPanel />} />
            <Route path="/rooms/" element={<Room />} />

          </Routes>
        </Router>
      </ChatContext.Provider>
    </div>
  );
}

现在的问题是当我运行所有套接字服务器时。 它们都与未知的连接并连续随机地打印socket.id(这不应该发生)并且不会发生通信。

打印了这么多,但实际上只有两个用户 user1 和 user2。

server1
New user connected -> TgTAgdIkxgvbQS15AAAE  on PORT --> username 4001
New user connected -> Kt3Z_EXytt6sUDH5AAAK  on PORT --> username 4001
New user connected -> oK2zync3xT1b4FBUAAAP  on PORT --> username 4001
New user connected -> 07gSFZLnADv__ZisAAAS  on PORT --> username 4001
New user connected -> tbvoim0zUfHXnEVKAAAT  on PORT --> username 4001

server2
New user connected -> RawBlM_7Y1JMhp_bAADn  on PORT --> 4002
New user connected -> ZRF6Nr-qClCD3K9dAADq  on PORT --> 4002
User disconnected
User disconnected
User disconnected
User disconnected
New user connected -> A_tb4NRJGXzvoXwfAADv  on PORT --> 4002
User disconnected
New user connected -> Ycrh4CVvLYfgt96vAADx  on PORT --> 4002
User disconnected
User disconnected
New user connected -> NOJoJDFoF8Ucz5OIAAD2  on PORT --> 4002
User disconnected
New user connected -> 3S55_55A_gWWP9NOAAD5  on PORT --> 4002
User disconnected
New user connected -> J6efH-fepC_4aPomAAD9  on PORT --> 4002

server3
New user connected -> wImmJwBfE8wo8MhYAAEG  on PORT --> 4003
New user connected -> P0vuOGdXyKOtmzqhAAEJ  on PORT --> 4003
User disconnected
New user connected -> njDaGQHzJywH1upJAAEL  on PORT --> 4003
New user connected -> X1Fgn8AqWmmf_o1uAAEO  on PORT --> 4003
New user connected -> XTi4s6MJytczkl1UAAEQ  on PORT --> 4003
New user connected -> vgu57U3p41NLQY9_AAES  on PORT --> 4003
New user connected -> zJPY00O_apWAyal2AAEU  on PORT --> 4003
New user connected -> dUJhUM8kjpzdwJeXAAEX  on PORT --> 4003
User disconnected
New user connected -> h9DkdxDig5XvpbSQAAEa  on PORT --> 4003
User disconnected
New user connected -> uLeIIK-TlvW3MQGMAAEc  on PORT --> 4003
New user connected -> 5S-6dWw39qloC1IDAAEe  on PORT --> 4003
User disconnected
User disconnected
New user connected -> kdxEWfpoFSlIfcKZAAEg  on PORT --> 4003
User disconnected
New user connected -> CWPD1w0ZseFZT7X3AAEj  on PORT --> 4003
User disconnected
User disconnected
New user connected -> 4FoufpwSEXq3eWdAAAEm  on PORT --> 4003
User disconnected
New user connected -> r9HGMlQ4yGcRF8zoAAEo  on PORT --> 4003
New user connected -> wgHM9oW9izwaeqr4AAEr  on PORT --> 4003
User disconnected
User disconnected
New user connected -> AonCTiG1uPu9gEWHAAEw  on PORT --> 4003
User disconnected
User disconnected
User disconnected
New user connected -> 3TuAwdtPWmTMx4trAAEy  on PORT --> 4003
New user connected -> Z3kcBVKFRF6EmJhUAAE0  on PORT --> 4003
New user connected -> _USV1n1W9aCOoEBQAAE2  on PORT --> 4003
New user connected -> tuxnwDYVKQE5-b2TAAE5  on PORT --> 4003
User disconnected
User disconnected
New user connected -> UELLulq0iWlVHURKAAE9  on PORT --> 4003
User disconnected
User disconnected
New user connected -> tb3FGcqJGj2Eh51gAAFA  on PORT --> 4003
User disconnected
User disconnected
User disconnected
New user connected -> xhf5_9zZqpXrOH6MAAFC  on PORT --> 4003
User disconnected
User disconnected
User disconnected
New user connected -> n8X4SpZ0Dt7QXM94AAFH  on PORT --> 4003
User disconnected
User disconnected
User disconnected
User disconnected
User disconnected
User disconnected
User disconnected
User disconnected
New user connected -> FaRf7nRUlyR-dM6kAAFa  on PORT --> 4003

如果我们谈论没有 nginx 时实际且正确的控制台是什么样子

New user connected -> 07gSFZLnADv__ZisAAAS  on PORT --> user2 4001
New user connected -> tbvoim0zUfHXnEVKAAAT  on PORT --> user1 4001
Username set for tbvoim0zUfHXnEVKAAAT: user1
Username set for 07gSFZLnADv__ZisAAAS: user2

当我不使用 Nginx 时,它可以工作。 我希望当用户连接时将用户发送到 server1:4001,当另一个用户连接时将他发送到 server2:4002 等等,以循环方式。 我考虑了 Socket.io not work with nginx 的解决方案,但它对我不起作用。但我认为问题大致相似。

node.js nginx socket.io load-balancing nginx-reverse-proxy
1个回答
0
投票

一旦在客户端和特定

socket.io
服务器实例之间建立连接,它需要在整个会话期间保持与同一实例的连接。

因此,您需要在 Nginx 上配置粘性会话才能使其正常工作。

您可以通过使用 IP 哈希来实现这一点:

http {
    upstream socket_services {
        ip_hash;
        server localhost:4001;
        server localhost:4002;
        server localhost:4003;
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.