我有一个消息消费者(例如:Kafka),它在带有 for-select 的 go 例程上运行,默认情况下它处理收到的消息:
type Consumer struct{}
func (c *Consumer) Start(ctx context.Context) {
fmt.Println("Consumer is starting")
defer func() {
fmt.Println("Consumer is stopping")
}()
i := 0
for {
select {
case <-ctx.Done():
fmt.Println("Context is done")
return
default:
// msg, err := kafka.FetchMessage(ctx)
fmt.Printf("[%d] Received message\n", i)
<-time.After(3 * time.Second)
fmt.Printf("[%d] Stopped processing\n", i)
i++
}
}
}
在我的主要功能中,我有一个监听取消信号的通道。每当收到取消信号时,我想通过调用上下文取消函数来取消 go 例程。
func main() {
fmt.Println("Main Start")
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
consumer := &Consumer{}
go func() {
consumer.Start(ctx)
}()
<-sigchan
fmt.Println("Received signal to shutdown")
cancel()
}
系统的必备条件是:
但是,当我通过单击 CTRL + C 取消程序时,如果消息正在处理,则处理未完成。如何实现仅在处理完最后一条消息后才会取消消费者的功能。
它立即退出的原因是因为当
main()
返回时,所有的goroutines突然停止了。 consumer.Start
完成之前没有什么可等待的。
signal.NotifyContext
更直接一些,但我们也可以按照您的方式完成。
// Your way, modified:
func main() {
fmt.Println("Main Start")
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
<-sigchan
fmt.Println("Received signal to shutdown")
cancel()
}()
// Block until it exists, cancel in the background
consumer := &Consumer{}
consumer.Start(ctx)
}
// Allowing signal.NotifyContext to deal with it
func main() {
fmt.Println("Main Start")
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer cancel()
consumer := &Consumer{}
consumer.Start(ctx)
fmt.Println("Shutting down")
}
另请注意,
CTRL+C
并不完全等同于 kill -INT $PID
。前者会将 SIGINT
发送到前台进程的 PGID
中的每个进程。这意味着在主进程可以正常执行之前,分叉可能会被杀死。