使用光纤和通道在 Go 中实现 SSE

问题描述 投票:0回答:1

我正在尝试在 Go 中实现服务器发送事件。我正在使用 Fiber 框架,并发现一个示例实现工作得很好。

但它不使用通道,所以我无法随时发送事件/消息,它只是按照指定的时间间隔不断发送它们。

然后我发现了另一个使用通道的例子。但它不使用 Fiber,所以我尝试混合这两个示例。现在,我的代码如下所示:

// ... imports

var sseChan chan string
var wg sync.WaitGroup

// main...

func sseHandler(c *fiber.Ctx) error {
    c.Set("Content-Type", "text/event-stream")
    c.Set("Cache-Control", "no-cache")
    c.Set("Connection", "keep-alive")
    c.Set("Transfer-Encoding", "chunked")

    sseChan = make(chan string)

    // wg.Add(1)
    defer func() {
        // wg.Done()
        close(sseChan)
        sseChan = nil
        fmt.Println("defer func called")
    }()

    fmt.Println("client connected")

    c.Context().SetBodyStreamWriter(fasthttp.StreamWriter(func(w *bufio.Writer) {
        for {
            fmt.Println("inside streamwrite loop")
            select {
            case message := <-sseChan:
                fmt.Println("got a msg on sseChan")
                fmt.Fprintf(w, "data: %s\n\n", message)

                err := w.Flush()
                if err != nil {
                    // Refreshing page in web browser will establish a new
                    // SSE connection, but only (the last) one is alive, so
                    // dead connections must be closed here.
                    fmt.Printf("Error while flushing: %v. Closing http connection.\n", err)

                    break
                }
            case <-c.Context().Done():
                fmt.Printf("Client closed connection")
                return
            }
        }
    }))

    // wg.Wait()

    fmt.Println("returnin")
    return nil
}

func fireEvent(c *fiber.Ctx) error {
    if sseChan != nil {
        fmt.Println("sendin msg")
        msg := time.Now().Format("15:04:05")
        sseChan <- msg
    }

    return c.SendString("event fired")
}

当我尝试从浏览器建立连接(使用

EventSource
API)时,后端会抛出此错误:

client connected
returnin
defer func called
inside streamwrite loop
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x560 pc=0x6f157a]

goroutine 4 [running]:
github.com/valyala/fasthttp.(*RequestCtx).Done(...)
        /home/user/go/pkg/mod/github.com/valyala/[email protected]/server.go:2739
main.sseHandler.func2(0xc00013ebe0?)
        /home/user/Projects/scaling-sse/api-go/main.go:71 +0x9a
github.com/valyala/fasthttp.NewStreamReader.func1()

我可以看到 defer 函数在 StreamWriter 中的循环之前被调用。这导致

sseChan
变为
nil
。所以我添加了
wg
语句(在
sseHandler
中注释),这可以防止错误 但我在浏览器中没有得到任何输出。此外,关闭选项卡时,不会调用 defer 函数。

go server-sent-events channel go-fiber
1个回答
0
投票

恐慌似乎是由这一行产生的:

case <-c.Context().Done():

根据https://github.com/go Fiber/ Fiber/issues/429#issuecomment-1500921469以及我在代码中看到的内容,当客户端关闭连接时,

Done()
通道不会'没有得到任何价值。
在客户端连接关闭时,
SetBodyStreamWriter
将返回,下次
w.Flush()
将生成错误(尝试发送某些内容,但连接已关闭)。

因此,删除该案例将解决恐慌(我不太确定为什么会恐慌,只是将通道

c.Context().Done()
分配给另一个变量似乎不会触发恐慌)。

在代码中,您还希望避免在

sseChan
中实例化
sseHandler
通道。另外,立即关闭它也无法对其进行写入,
sseHandler
会立即返回,正如您从日志中看到的那样。

我假设您想要将事件广播到所有连接的客户端,在这种情况下,您可能想要全局实例化

sseChan = make(chan string)
,或者以可在写入或读取的位置访问的方式实例化。

我正在开发一个存储库,其中包含将 SSE 与 Fiber 结合使用的示例 https://github.com/emanuelef/sse-go- Fiber

© www.soinside.com 2019 - 2024. All rights reserved.