如何在取消Go例程之前等待进程中的操作

问题描述 投票:0回答:1

我有一个消息消费者(例如:Kafka),它在带有 for-select 的 go 例程上运行,默认情况下它处理收到的消息:

type Consumer struct{}

func (c *Consumer) Start(ctx context.Context) {
    fmt.Println("Consumer is starting")
    defer func() {
        fmt.Println("Consumer is stopping")
    }()

    i := 0
    for {
        select {
        case <-ctx.Done():
            fmt.Println("Context is done")
            return
        default:
            // msg, err := kafka.FetchMessage(ctx)
            fmt.Printf("[%d] Received message\n", i)
            <-time.After(3 * time.Second)
            fmt.Printf("[%d] Stopped processing\n", i)
            i++
        }
    }
}

在我的主要功能中,我有一个监听取消信号的通道。每当收到取消信号时,我想通过调用上下文取消函数来取消 go 例程。

func main() {
    fmt.Println("Main Start")
    sigchan := make(chan os.Signal, 1)
    signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    consumer := &Consumer{}
    go func() {
        consumer.Start(ctx)
    }()

    <-sigchan
    fmt.Println("Received signal to shutdown")
    cancel()
}

系统的必备条件是:

  • 消息处理必须同步;
  • 不得丢失任何正在进行的操作(消息);

但是,当我通过单击 CTRL + C 取消程序时,如果消息正在处理,则处理未完成。如何实现仅在处理完最后一条消息后才会取消消费者的功能。

去游乐场:https://go.dev/play/p/kRUPPhwxwn7

go goroutine
1个回答
0
投票

它立即退出的原因是因为当

main()
返回时,所有的goroutines突然停止了。
consumer.Start
完成之前没有什么可等待的。

使用

signal.NotifyContext
更直接一些,但我们也可以按照您的方式完成。

// Your way, modified:
func main() {
    fmt.Println("Main Start")
    sigchan := make(chan os.Signal, 1)
    signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    go func() {
        <-sigchan
        fmt.Println("Received signal to shutdown")
        cancel()
    }()

    // Block until it exists, cancel in the background
    consumer := &Consumer{}
    consumer.Start(ctx)
}

// Allowing signal.NotifyContext to deal with it
func main() {
    fmt.Println("Main Start")
    ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
    defer cancel()

    consumer := &Consumer{}
    consumer.Start(ctx)
    fmt.Println("Shutting down")
}

另请注意,

CTRL+C
并不完全等同于
kill -INT $PID
。前者会将
SIGINT
发送到前台进程的
PGID
中的每个进程。这意味着在主进程可以正常执行之前,分叉可能会被杀死。

© www.soinside.com 2019 - 2024. All rights reserved.