我正在用 Golang 编写一个聊天应用程序。我有下面的示例代码需要优化。
type Hub struct {
Clients map[int64]*Client
Broadcast chan *model.ChannelMessage
SingleBroadcast chan *model.ChannelMessage //buffered with len 10
}
func (h *Hub) SingleChatRun() {
//handle message transfer between users
for {
m := <-h.SingleBroadcast
saveMessageToDB()
writeMessageToClientWebsocket()
writeMessageToSenderWebsocket()
}
}
func (c *Client) ReadMessage(){
//read message from client websocket
for {
clientMessage := c.Conn.ReadJSON(&clientMessage)
//one to one message
hub.SingleBroadcast <- clientMessage
}
}
问题是,“SingleBroadcast”通道在每个消息传输中都是阻塞且缓慢的,这很糟糕。
删除“SingleBroadcast”的用法并处理“ReadMessage”内的“3个功能” 1.1.此选项会使 websocket 的读取速度变慢,但仅限于非常活跃的用户。也使用更多的资源 1.2.通过为每条消息使用 goroutine 来处理“3 个函数”,使其速度更快。这更快,但是有很多 goroutine
使用 goroutine 处理“SingleBroadcast”,如下所示:
func (h *Hub) SingleChatRun() {
//handle message transfer between users
for {
m := <-h.SingleBroadcast
go func(){
saveMessageToDB()
writeMessageToClientWebsocket()
writeMessageToSenderWebsocket()
}
}
}
2.1。这比方法 1 慢,但创建的 goroutine 更少
使用消息队列/代理
使用 goroutine 池。我不知道
我是 Golang 新手,这些事情很令人困惑。
如果没有 MRE
,很难提出整体解决方案因此,虽然通道在您需要时非常有用,但在学习语言时它们经常被过度使用。这可以在不使用任何内容的情况下完成——尽管我建议使用一个缓冲对数据库的写入。由于数据库通常是瓶颈,因此您不希望写入量线性增长。
const (
avgClients = 1024
dbBufSize = 64
flushInterval = time.Second * 5
)
type Hub struct {
clients map[int64]*Client
rows chan *model.Row // Or whatever your type is
}
func NewHub() *Hub {
rows := make(chan *model.Row, dbBufSize)
go flusher(rows)
return &Hub{
clients: make(map[int64]*Client, avgClients),
rows: make(chan *model.Row, dbBufSize),
}
}
func (h *Hub) Send(src *Client, msg *model.ChannelMessage) {
h.rows <- makeRow(msg)
// Logic
}
type Client {
hub *Hub
conn *websocket.Conn // Or whatever the type is
}
func (c *Client) Loop(){
var msg *model.ChannelMessage
for {
msg = c.conn.ReadJSON(&msg)
// Run this with 'go c.hub.Send(c, msg) if you need
// to process messages before previous ones are finished
c.hub.Send(c, msg)
}
}
func flusher(rows <-chan *model.Row) {
buf := make([]*model.Row, 0, dbBufSize)
ticker := time.NewTicker(flushInterval)
flush := func() {
writeDB(buf)
buf = buf[:0]
}
for {
select {
case row := <-rows:
buf = append(buf, row)
if len(buf) >= dbBufSize {
flush()
ticker.Reset(flushInterval)
}
case <-ticker.C:
if len(buf) > 0 {
flush()
}
}
}
}