Socket.io 断开连接与 Redis

问题描述 投票:3回答:3

我正在尝试让我的 socket.io 应用程序具备可扩展性:把原先保存所有数据的普通数据结构,换成 Redis 并配合集群(cluster)使用。但是我遇到了一个问题:在当前的实现中,我把套接字对象连同其他属性一起存放在这个数据结构里(data[socket.id].socket = socket),因为在我的应用程序中,有时需要执行 data[someId].socket.disconnect() 来手动断开某个套接字。

我知道不能把对象直接存入 Redis,所以我尝试了 JSON.stringify(socket),但由于 socket 含有循环引用而失败。有没有只凭 id 就能断开套接字的其他方法?这样我就可以只存储 id,例如 data[socket.id].id = socket.id,然后用类似 data[someId].id.disconnect() 的方式来调用。也就是说,我在寻找一种无需访问实际套接字对象即可断开套接字的方法(我确实可以访问 io 对象)。

谢谢大家的帮助。

node.js redis socket.io scalability
3个回答
2
投票

这个功能似乎已经实现了,只是没有任何文档记载…… io.sockets.clients(someId) 可以获取到套接字对象,而不管它是在哪个实例上调用的。因此唯一需要做的就是调用 io.sockets.clients(someId).disconnect(),无论客户端连接在哪个实例上,它都会被真正断开。实际上我根本不需要把套接字存放在自己的数组里。


0
投票

我做类似的事情:

// Registry of live sockets keyed by socket id, so that a connection can be
// looked up and closed later when only its id is known.
const connTable = {};

/**
 * Tracks a newly connected socket and wires up its teardown handlers.
 *
 * @param {object} socket - socket.io socket; must expose `id`, `on` and `disconnect`.
 */
function onConnection(socket) {
  connTable[socket.id] = socket;

  // Client-requested close. socket.disconnect() fires this socket's
  // 'disconnect' event, whose handler below already runs onDisconnect;
  // calling onDisconnect here as well would run the cleanup twice.
  socket.on("close", function() {
    socket.disconnect();
  });

  socket.on('disconnect', function() {
    onDisconnect(socket.id);
  });
}

/**
 * Force-closes the connection registered in connTable under the given id.
 *
 * @param {string} socket_id - key into connTable.
 * @returns {boolean} true when a socket was found and disconnect() was
 *   called on it; false when no socket is registered under that id.
 */
function manuallyDisconnect(socket_id) {
  // Own-property check so inherited keys such as 'toString' are not mistaken
  // for registered sockets.
  if (!Object.prototype.hasOwnProperty.call(connTable, socket_id)) {
    // Unknown or already-removed id: nothing to close (this used to throw a
    // TypeError on `undefined.disconnect`).
    return false;
  }
  connTable[socket_id].disconnect();
  return true;
}

/**
 * Cleanup hook run after a socket has disconnected.
 *
 * @param {string} socket_id - id of the socket that went away (callers
 *   already pass it; the original signature silently ignored it).
 */
function onDisconnect(socket_id) {
  // Drop the registry entry so connTable does not keep closed sockets
  // around forever. Deleting a missing key is a harmless no-op.
  delete connTable[socket_id];
  //closing process, or something....
}

0
投票

在 Node.js 中使用 socket.io-redis,前面放置 nginx 作为反向代理,并用 PM2 管理进程

NGINX.conf

    # HTTPS virtual host that reverse-proxies every request to the Node.js app.
    # NOTE(review): no ssl_certificate / ssl_certificate_key directives appear
    # in this snippet; presumably they are configured elsewhere — confirm.
    server {
        server_name                         www.yoursite.io;
        # TLS + HTTP/2 on port 443, for both IPv4 and IPv6.
        listen                              443 ssl http2;
        listen                              [::]:443 ssl http2;

        location / {
            # Pass the client address and the original Host header upstream
            # (the websocket code reads the x-real-ip header).
            proxy_set_header                X-Real-IP $remote_addr;
            proxy_set_header                X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header                Host $http_host;
            proxy_set_header                X-NginX-Proxy false;
            proxy_redirect                  off;
            # HTTP/1.1 plus the Upgrade/Connection headers are what let a
            # WebSocket handshake pass through the proxy.
            proxy_http_version              1.1;
            proxy_set_header                Upgrade $http_upgrade;
            proxy_set_header                Connection "upgrade";
            # Upstream on the IPv6 loopback, port 4000 — presumably the
            # app_port the PM2 instances listen on; confirm.
            proxy_pass                      http://[::1]:4000;
        }
   }

PM2运行集群模式,四个实例...

pm2 start app.js -i 4

app.js

// ---- Bootstrap: environment, Express app, view engine, body parsing and
// ---- the shared Redis client.
console.clear()
require('dotenv').config()

const express = require('express')
const app = express()
const Redis = require('ioredis')

// Request logging is opt-in through the `debug` environment flag.
if (process.env.debug === 'true') {
  app.use(require('morgan')(':method :url :status :res[content-length] - :response-time ms'))
}

// Environment-derived settings stored on app.locals.
// sock_transports is a '|'-separated list in the environment.
const envLocals = {
  sock_transports: process.env.sock_transports.split('|'),
  sock_timeout: process.env.sock_timeout,
  title: process.env.title,
  meta_desc: process.env.meta_desc,
  app_url: `https://${process.env.app_subdomain}.${process.env.app_domain}`
}
app.locals = Object.assign(app.locals, envLocals)

app.set('functions', require('./lib/functions')(app))
app.set('view engine', 'hbs')
app.set('view cache', false)

// Handlebars view engine, built by the project's ./lib/hbs wrapper.
const buildHbs = require('./lib/hbs')
const hbsAdapter = buildHbs(require('express-handlebars'))
app.engine('hbs', hbsAdapter.engine)

// Body parsers: JSON first, then URL-encoded forms (extended syntax).
const parseJson = express.json({ type: ['json'] })
const parseForm = express.urlencoded({ extended: true })
app.use(parseJson, parseForm)

// Shared Redis client (unix socket, db 1). It is published on the app as
// 'redis' only once the connection reports ready.
const redis = new Redis({
  path: process.env.redis_socket,
  db: 1,
  enableReadyCheck: true
})
console.time('Redis')
redis.on('ready', function onRedisReady() {
  console.timeEnd('Redis')
  app.set('redis', redis)
})
// Any Redis error is logged and then shuts the process down via exitHandler.
// NOTE(review): the 'colors' setting is not set anywhere in the visible
// code — presumably ./lib/functions registers it; confirm.
redis.on('error', function onRedisError(err) {
  console.log(`Redis: ${app.get('colors').redBright(err)}`)
  exitHandler()
})
/**
 * Builds the Redis-backed express-session middleware, stores it (and its
 * options) on the app, installs it together with the project middleware,
 * then continues startup with loadControllers().
 */
function loadSessionMiddleware() {
  const session = require('express-session')
  const RedisSession = require('connect-redis')(session)

  // Separate Redis connection (db 5) used only as the session store.
  const client = new Redis({
    path: process.env.redis_socket,
    db: 5
  })

  const cookie = {
    path: '/',
    domain: '.' + process.env.app_domain,
    secure: true,
    sameSite: true,
    httpOnly: true,
    expires: false,
    // session_exp_mins is in minutes; maxAge is in milliseconds.
    maxAge: 60000 * process.env.session_exp_mins,
  }
  const sessionOptions = {
    secret: process.env.session_secret,
    name: process.env.session_name,
    store: new RedisSession({ client }),
    rolling: true,
    saveUninitialized: true,
    unset: 'destroy',
    resave: true,
    proxy: true,
    logErrors: process.env.debug === 'true',
    cookie
  }

  // Both the options and the built middleware are kept on the app; the
  // websocket layer reuses the 'session' middleware for socket handshakes.
  app.set('session_vars', sessionOptions)
  app.set('session', session(app.get('session_vars')))

  // NOTE(review): calling the useragent export with `true` presumably turns
  // on its parser-update mode — confirm against the useragent docs.
  const ua = require('useragent')
  ua(true)
  app.set('useragent', ua)

  const sessionMiddleware = app.get('session')
  const projectMiddleware = require('./middleware')(app)
  app.use(sessionMiddleware, projectMiddleware)

  loadControllers()
}
function loadControllers() {
  require('fs').readdirSync('./controllers').filter(file => {
    return file.slice(-3) === '.js'
  }).forEach(file => {
    require('./controllers/' + file)(app)
  })
  app.get('*', (req, res) => {
    app.get('functions').show404(req, res)
  })
  initServer()
}
function initServer() {
  console.time('Server')
  const server = require('http').createServer(app)
  server.on('error', err => {
    console.err('express err: ' + err)
    app.get('functions').stringify(err)
  })
  server.listen(process.env.app_port)
  app.set('server', server)
  require('./websocket').listen(app, websocket => {
    console.timeEnd('Server')
    app.set('websocket', websocket)
    // www-data
    process.setuid(process.env.app_uid)
  })
}
console.time('Database')
require('./model').load(app, db => {
  console.timeEnd('Database')
  app.set('model', db)
  loadSessionMiddleware()
})
/**
 * Shuts down whichever long-lived resources have been registered on the
 * app — the HTTP server, the Redis client and the mail transport — and
 * then ends the process with exit code 0.
 */
function exitHandler() {
  // [app setting name, method that closes it], in shutdown order.
  const closers = [
    ['server', 'close'],
    ['redis', 'quit'],
    ['mail', 'close']
  ]
  for (const [setting, method] of closers) {
    if (app.get(setting)) {
      app.get(setting)[method]()
    }
  }
  process.exit(0)
}
// Run the shutdown routine on Ctrl-C and on the two user-defined signals.
// process.on() takes one event name per call: the original single string
// 'SIGINT SIGUSR1 SIGUSR2' named an event that is never emitted, so none of
// the signals was actually handled.
// NOTE(review): Node.js reserves SIGUSR1 for starting the debugger; a
// listener can be installed but may interfere with it.
for (const signal of ['SIGINT', 'SIGUSR1', 'SIGUSR2']) {
  process.on(signal, () => {
    exitHandler()
  })
}
// Put stdin into flowing mode, which also keeps the process alive.
process.stdin.resume()

websocket.js

var exports = {}
exports.listen = (app, cb) => {
  const websocket = require('socket.io')(app.get('server'), {
    transports: process.env.transports
  }),
  req = {}
  websocket.setMaxListeners(0)
  websocket.adapter(require('socket.io-redis')({
    path: process.env.redis_socket,
    key: 'socket_io',
    db: 2,
    enableOfflineQueue: true
  }))
  websocket.use((socket, next) => {
    app.get('session')(socket.request, socket.request.res || {}, next)
  })
  websocket.isAccountLocked = cb => {
    if(!req.session.user_id) {
      cb(false)
      return
    }
    if(isNaN(req.session.user_id)) {
      cb(false)
      return
    }
    app.get('model').users.get(req.session.user_id, user_rec => {
      if(!user_rec) {
        cb(false)
        return
      }
      if(user_rec.account_locked === 'yes') {
        websocket.showClient(client => {
          app.get('model').users.logout(req.session, () => {
            console.log(client + ' ' + app.get('colors').redBright('Account Locked'))
            cb(true)
          })
        })
        return
      }
      cb(false)
    })
  }
  websocket.showClient = cb => {
    var outp = []
    if(!req.session.user_id && !req.session.full_name)
      outp.push(req.session.ip)
    if(req.session.user_id) {
      outp.push('# ' + req.session.user_id)
      if(req.session.full_name)
        outp.push(' - ' + req.session.full_name)
    }
    cb(app.get('colors').black.bgWhite(outp.join('')))
  }
  websocket.on('connection', socket => {
    if(!socket.request.session)
      return
    req.session = socket.request.session
    socket.use((packet, next) => {
        websocket.isAccountLocked(locked => {
            if(locked)
                return
            var save_sess = false
            if(typeof(socket.handshake.headers['x-real-ip']) !== 'undefined') {
          if(socket.handshake.headers['x-real-ip'] !== req.session.ip) {
            req.session.ip = socket.handshake.headers['x-real-ip']
            save_sess = true
          }
        }
        var ua = app.get('useragent').parse(socket.handshake.headers['user-agent']).toString()
        if(ua !== req.session.useragent) {
          req.session.useragent = ua
          save_sess = true
        }
            websocket.of('/').adapter.remoteJoin(socket.id, req.session.id, err => {
                delete socket.rooms[socket.id]
                if(!save_sess) {
            next()
            return
          }
                req.session.save(() => {
                  next()
                })
            })
      })
    })
    socket.on('disconnecting', () => {
      websocket.of('/').adapter.remoteDisconnect(req.session.id, true, err => {
      })
    })
    socket.on('auth', sess_vars => {
      function setSess() {
        if(sess_vars.path)
          req.session.path = sess_vars.path
        if(sess_vars.search_query)
          req.session.search_query = sess_vars.search_query
        if(sess_vars.search_query_long)
          req.session.search_query_long = sess_vars.search_query_long
        if(sess_vars.dispensary_id)
          req.session.dispensary_id = sess_vars.dispensary_id
        if(sess_vars.city)
          req.session.city = sess_vars.city
        if(sess_vars.state)
          req.session.state = sess_vars.state
        if(sess_vars.zip)
          req.session.zip = sess_vars.zip
        if(sess_vars.country)
          req.session.country = sess_vars.country
        if(sess_vars.hash)
          req.session.hash = sess_vars.hash
        req.session.save(() => {
          websocket.to(req.session.id).emit('auth', sess)
          app.get('functions').showVisitor({
            session: sess
          }, {
            statusCode: 200
          })
        })
      }
      setSess()
    })
    socket.on('logout', () => {
      var sess_ip = req.session.ip,
        sess_id = req.session.id,
        sess_email = req.session.email
      app.get('model').users.logout(req.session, () => {
        websocket.showClient(client => {
          console.log(client + ' - Logged Out')
        })
      })
    })
  })
  cb(websocket)
}
module.exports = exports
© www.soinside.com 2019 - 2024. All rights reserved.