我正在开发一台 graphql 服务器,并且有一个订阅 API。这是我在 gqlgen 文档中找到的起始代码:
// CurrentTime is the resolver for the currentTime field.
func (r *subscriptionResolver) CurrentTime(ctx context.Context) (<-chan *model.Time, error) {
// First you'll need to `make()` your channel. Use your type here!
ch := make(chan *model.Time)
// You can (and probably should) handle your channels in a central place outside of `schema.resolvers.go`.
// For this example we'll simply use a Goroutine with a simple loop.
go func() {
// Handle deregistration of the channel here. Note the `defer`
defer close(ch)
for {
// In our example we'll send the current time every second.
time.Sleep(1 * time.Second)
fmt.Println("Tick")
// Prepare your object.
currentTime := time.Now()
t := &model.Time{
UnixTime: int(currentTime.Unix()),
TimeStamp: currentTime.Format(time.RFC3339),
}
// The subscription may have got closed due to the client disconnecting.
// Hence we do send in a select block with a check for context cancellation.
// This avoids goroutine getting blocked forever or panicking,
select {
case <-ctx.Done(): // This runs when context gets cancelled. Subscription closes.
fmt.Println("Subscription Closed")
// Handle deregistration of the channel here. `close(ch)`
return // Remember to return to end the routine.
case ch <- t: // This is the actual send.
// Our message went through, do nothing
}
}
}()
// We return the channel and no error.
return ch, nil
}
我想知道当我收到 ctx.Done() 信号时会发生什么。客户端是否通过取消订阅或关闭订阅来发送此信号?或者它可以在一段时间后自动发生?(我的意思是设置一些空闲超时参数。) 另外,我想知道我这边(服务器端)超时可以触发 Done() 信号吗?
查看gqlgen的weksocket实现(特别是run实现),我们可以看到创建了一个
context.WithCancel
,并且在cancel
函数中调用了defer
函数。
因此,每当我们退出 run
函数时(例如,当客户端关闭连接并发送 connectionCloseMessageType
或默认情况下的意外消息时),上下文就会被取消。
cancel
会触发关闭订阅的 ctx.Done()
,因为上下文是相同的。
如果您想在一定时间后关闭连接,可以使用
context.WithDeadline
或 context.WithTimeout
将自己的超时或截止时间直接添加到解析器中。
InitFunc
为所有订阅解析器提供通用逻辑,就像我们在这个 提案中看到的那样:
server.AddTransport(&transport.Websocket{
Upgrader: websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
},
KeepAlivePingInterval: 10 * time.Second,
InitFunc: func(ctx context.Context, initPayload transport.InitPayload) (context.Context, *transport.InitPayload, error) {
payloadAuth := initPayload.Authorization()
if payloadAuth == "" {
return ctx, &initPayload, errors.New("the JWT is missing in the initialization payload")
}
jwt, err := authenticateJWT(payloadAuth)
if err != nil {
return ctx,&initPayload, err
}
// Add the JWT expiration as a deadline, and add the reason
newCtx, _ := context.WithDeadline(transport.AppendCloseReason(ctx, "authentication token has expired"), time.Unix(jwt.ExpiresAt, 0))
return newCtx, &initPayload,nil
},
})