优化golang聊天应用中的消息广播

问题描述 投票:0回答:1

我正在用 Golang 编写一个聊天应用程序。我有下面的示例代码需要优化。

type Hub struct {
  Clients         map[int64]*Client
  Broadcast       chan *model.ChannelMessage 
  SingleBroadcast chan *model.ChannelMessage //buffered with len 10
}

func (h *Hub) SingleChatRun() {
  //handle message transfer between users
  for {
    m := <-h.SingleBroadcast
    saveMessageToDB()
    writeMessageToClientWebsocket()
    writeMessageToSenderWebsocket()  
  }
}

func (c *Client) ReadMessage(){
  //read message from client websocket
  for {
    clientMessage := c.Conn.ReadJSON(&clientMessage)
    //one to one message 
    hub.SingleBroadcast <- clientMessage
  }
}

问题是,“SingleBroadcast”通道在每个消息传输中都是阻塞且缓慢的,这很糟糕。

  1. 删除“SingleBroadcast”的用法并处理“ReadMessage”内的“3个功能” 1.1.此选项会使 websocket 的读取速度变慢,但仅限于非常活跃的用户。也使用更多的资源 1.2.通过为每条消息使用 goroutine 来处理“3 个函数”,使其速度更快。这更快,但是有很多 goroutine

  2. 使用 goroutine 处理“SingleBroadcast”,如下所示:

func (h *Hub) SingleChatRun() {
  //handle message transfer between users
  for {
    m := <-h.SingleBroadcast
    go func(){
       saveMessageToDB()
       writeMessageToClientWebsocket()
       writeMessageToSenderWebsocket()
    }
  }
}

2.1。这比方法 1 慢,但创建的 goroutine 更少

  1. 使用消息队列/代理

  2. 使用 goroutine 池。我不知道

我是 Golang 新手,这些事情很令人困惑。

go chat channel goroutine
1个回答
0
投票

如果没有 MRE

,很难提出整体解决方案

因此,虽然通道在您需要时非常有用,但在学习语言时它们经常被过度使用。这可以在不使用任何内容的情况下完成——尽管我建议使用一个缓冲对数据库的写入。由于数据库通常是瓶颈,因此您不希望写入量线性增长。

const (
  avgClients    = 1024
  dbBufSize     = 64
  flushInterval = time.Second * 5
)

type Hub struct {
  clients         map[int64]*Client
  rows            chan *model.Row  // Or whatever your type is
}

func NewHub() *Hub {
  rows := make(chan *model.Row, dbBufSize)
  go flusher(rows)

  return &Hub{
    clients: make(map[int64]*Client, avgClients),
    rows: make(chan *model.Row, dbBufSize),
  }
}

func (h *Hub) Send(src *Client, msg *model.ChannelMessage) {
  h.rows <- makeRow(msg)
  // Logic
}


type Client {
  hub  *Hub
  conn *websocket.Conn // Or whatever the type is
}

func (c *Client) Loop(){
  var msg *model.ChannelMessage
  for {
    msg = c.conn.ReadJSON(&msg)

    // Run this with 'go c.hub.Send(c, msg) if you need
    // to process messages before previous ones are finished
    c.hub.Send(c, msg)
  }
}

func flusher(rows <-chan *model.Row) {
  buf := make([]*model.Row, 0, dbBufSize)
  ticker := time.NewTicker(flushInterval)

  flush := func() {
    writeDB(buf)
    buf = buf[:0]
  }

  for {
    select {
    case row := <-rows:
      buf = append(buf, row)
      if len(buf) >= dbBufSize {
        flush()
        ticker.Reset(flushInterval)
      }
    case <-ticker.C:
      if len(buf) > 0 {
        flush()
      }
    }
  }
}

© www.soinside.com 2019 - 2024. All rights reserved.