我正在运行一个 Golang TCP 服务器,它接收连接,生成一个 goroutine 来处理该连接,然后生成一个新的 goroutine 来从连接中读取数据并将数据写入通道。还有一个简单的函数可以从通道读取数据。
这个想法是,如果客户端关闭连接,ReadFromConn goroutine 将在读取时收到错误,然后返回。延迟代码将写入 done 通道并关闭 done 和 queue 通道。如果 done 通道有数据,消费者代码将停止处理并返回当时的任何结果。
另一种情况是消费者可以判断自己有足够的数据来做出决定并返回。这将导致执行返回到 Handle 函数。当该函数返回时,Handle函数上的延迟将关闭连接,导致ReadFromConn goroutine在从连接读取时收到错误并关闭所有待处理的通道。
这效果很好,但我正在做一些负载测试,并注意到测试完成后 Golang 服务器的内存使用量并没有像负载停止时应用程序的所有其他部分那样减少。我拿了一些 ID 并检查了日志。我看到有时(并非总是)显示 ERROR READING FROM CONN 消息,但没有 READ_FROM_CONN DEFER 日志,所以我认为延迟从未被调用。结果,ProcessQueue 挂起从通道的读取,因为它从未关闭,并且来自 Handle 函数的 CLOSING... 日志也丢失了。
至少,这就是我认为正在发生的事情,并且我相信这就是为什么负载测试结束时内存消耗永远不会下降的原因,因为代码仍在运行一些从通道读取的 goroutine,这些通道应该由 ReadFromConn 上的延迟代码关闭。这种行为是不可预测的;并非所有连接都会发生这种情况,所以我不知道可能出了什么问题。
这是我的 Golang 服务器的简化版本:
package main
import (
"os"
"net"
"fmt"
"io"
)
type CustomStruct struct {
Type string
Stop bool
}
func main() {
// Creates server
server, err := net.Listen("tcp", "0.0.0.0:80")
if err != nil {
fmt.Println("failed to bind listener to socket", err)
}
defer server.Close()
fmt.Println("Listening new connections V2")
// Starts reading from the server
for {
conn, err := server.Accept()
if err != nil {
fmt.Println("failed to accept new connection:", err)
continue
}
go Handle(conn)
}
}
func Handle(conn net.Conn) {
defer conn.Close()
id := "some uuid for each conn"
// Creates channels
queue := make(chan []byte, 512)
done := make(chan bool)
// Starts reading from the server
go ReadFromConn(id, conn, queue, done)
result := ProcessQueue(id, queue, done)
fmt.Println(id, "CLOSING...")
// Do stuffs with result...
fmt.Println(id, result)
}
func ReadFromConn(
id string,
conn io.Reader,
queue chan []byte,
done chan bool,
) {
defer func() {
done <- true
close(queue)
close(done)
fmt.Println(id, "READ_FROM_CONN DEFER")
}()
tmp := make([]byte, 256)
for {
_, err := conn.Read(tmp)
if err != nil {
fmt.Println(id, "ERROR READING FROM CONN " + err.Error())
return
}
if (tmp[0] == 0x00) {
return
}
queue <- tmp
}
}
func ProcessQueue(
id string,
queue chan []byte,
done chan bool,
) CustomStruct {
defer fmt.Println(id, "GET_TRANSCRIPTION_RESULT ENDED")
fmt.Println(id, "GET_TRANSCRIPTION_RESULT STARTED")
result := CustomStruct{
Type: "transcription",
Stop: false,
}
for {
select {
case <-done:
fmt.Println(id, "DONE DETECTED")
return result
default:
fmt.Println(id, "DEFAULT")
payload, open := <-queue
if open == false {
fmt.Println(id, "QUEUE IS CLOSED")
return result
} else {
fmt.Println(id, "QUEUE IS OPEN")
}
// ... Do stuffs with payload, if certain condition is met, of the result of processing payload, return
if (payload[0] == 0x01) {
return result
}
}
}
return result
}
不完全确定问题是什么,但是使用上下文对象可以更干净地实现控制读者的生命周期。上下文避免了必须密切管理通道对象,并提供了一种干净的方法来报告 Goroutine 的错误(如果需要)使用
context.Cause(ctx)
。
示例设置可能如下所示:
ctx, cancel := context.WithCancelCause(context.Background())
queue := make(chan []byte, 512)
go ReadFromConn(id, conn, queue, ctx, cancel)
result := ProcessQueue(id, queue, ctx, cancel)
延迟调用可能会在上下文原因中放置错误,这会导致
ctx.Done
方法现在返回具有存在值的通道。这种方法也是可重复的,这意味着使用简单通道方法时,多个 goroutine 都可以接收一个值,而不是仅接收一个值。
发送到队列时,您也可以选择
ctx.Done
,以防止永远等待不再读取的队列。同样,您也可以这样做从队列中读取。
func ReadFromConn(...) {
defer cancel(fmt.Errorf("reader defer"))
...
for {
...
select {
case queue <- tmp:
case <-ctx.Done():
return
}
}
}
func ProcessQueue(...) {
defer cancel(fmt.Errorf("queue defer"))
...
for {
select {
case <-ctx.Done():
...
case payload := <-queue:
...
}
}