迭代项目列表时如何使并发调用超时

问题描述 投票:0回答:1

我正在研究一个问题,我需要处理列表中的一堆项目。现在我只需要在配置的时间内可以处理的项目的结果,其他结果需要被丢弃。我编写了这个程序,它的一个非常简单的版本可以工作,但是扩展逻辑,我陷入困境,因为总是执行 switch 情况“全部完成”,或者执行“超时”但仍然给出所有结果。

package main

import (
    "fmt"
    "sync"
    "time"
)

func doSomething(x int) int {
    time.Sleep(time.Second * 2)
    return x
}

func main() {
    timeoutCfg := 5

    done := make(chan struct{}, 1)
    somethings := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
    results := make(chan int, 10) // buffered channel to hold partial results
    var wg sync.WaitGroup

    // Launch goroutines to process tasks
    for _, key := range somethings {
        wg.Add(1)
        go func(k int) {
            defer wg.Done()
            results <- doSomething(k)
        }(key)
    }

    // Start the timer
    timer := time.After(time.Second * time.Duration(timeoutCfg))

    // Wait for all goroutines to finish or for timeout
    go func() {
        wg.Wait()
        close(done)
        close(results)
    }()

    select {
    case <-timer:
        fmt.Println("timeout")
        for x := range results {
            fmt.Println(x)
        }
    case <-done:
        fmt.Println("done")
        for x := range results {
            fmt.Println(x)
        }
    }
}
go concurrency
1个回答
0
投票

好吧,先写同步代码

package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

func doSomething(ctx context.Context, x int) (int, error) {
    timer := time.NewTimer(700 * time.Millisecond)
    defer timer.Stop()

    select {
    case <-ctx.Done(): // canceled
        return 0, ctx.Err()

    case <-timer.C:
    }

    return x, nil
}

func process(ctx context.Context, timeout time.Duration, somethings []int) ([]int, error) {
    ctx, cancel := context.WithTimeout(ctx, timeout)
    defer cancel()

    var results []int
    var err error
    for _, key := range somethings {
        var result int
        result, err = doSomething(ctx, key)
        if err != nil {
            break
        }
        results = append(results, result)
    }

    if err != nil && !errors.Is(err, context.DeadlineExceeded) {
        return nil, err
    }

    return results, nil
}

func main() {
    ctx := context.Background()
    timeoutCfg := 5 * time.Second
    somethings := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}

    results, err := process(ctx, timeoutCfg, somethings)

    if err != nil {
        fmt.Println(err)
    } else {
        for _, x := range results {
            fmt.Println(x)
        }
    }
}

Go Playground 上尝试一下。这里我使用

context.Context
来取消并在第一个错误时中断循环。

当第一个错误不是

context.DeadlineExceeded
时返回错误,否则返回(部分)结果。当您需要更多信息(结果是否完整)时,需要额外的布尔值。

太好了,到目前为止,让我们使

process
并发:

func process(ctx context.Context, timeout time.Duration, somethings []int) ([]int, error) {
    ctx, cancel := context.WithTimeout(ctx, timeout)
    defer cancel()

    eg, ctx := errgroup.WithContext(ctx)

    var results resultCollector
    for _, key := range somethings {
        eg.Go(func() error {
            result, err := doSomething(ctx, key)
            if err == nil {
                results.append(result)
            }
            return err
        })
    }

    err := eg.Wait()
    if err != nil && !errors.Is(err, context.DeadlineExceeded) {
        return nil, err
    }

    return results.get(), nil
}

我们在这里使用

golang.org/x/sync/errgroup
,我们需要一个线程安全的
append
,所以让我们写
resultCollector

type resultCollector struct {
    results []int
    mu      sync.Mutex
}

func (r *resultCollector) append(x int) {
    r.mu.Lock()
    defer r.mu.Unlock()
    r.results = append(r.results, x)
}

func (r *resultCollector) get() []int {
    r.mu.Lock()
    defer r.mu.Unlock()
    results := r.results
    r.results = nil
    return results
}

Go Playground 上尝试一下。诀窍是首选同步函数使用

context.Context
进行取消

© www.soinside.com 2019 - 2024. All rights reserved.