Annotating and fetching data in the background with goroutines - semaphore acquisition error in a route handler


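I am implementing a file annotation process in a Go web application. The process reads a file, stores the IDs in a slice, splits the IDs into chunks, and then fetches the data from the database using an IN clause. The code works fine when executed standalone. However, when I run the annotation process in a goroutine inside the route handler (HandleAnnotations), acquiring the semaphore fails and this line reports the error: fmt.Println("Error acquiring semaphore:", err).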

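Code context: In my route handler (HandleAnnotations), I start the file annotation process in a goroutine so that I can return a 202 Accepted status and keep the operation running in the background. Here is the relevant part of the handler: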

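// HandleAnnotations will handle the annotation process
func (a *BatchHandler) HandleAnnotations(c echo.Context) error {
    header, err := c.FormFile("file-upload")
    if err != nil {
        return c.JSON(http.StatusBadRequest, "missing file-upload form field")
    }
    // ctx (the request context) and batchData (built from header) are set up here; omitted for brevity

    go func() {
        // I want this long running task to finish in the background
        variants, err := a.BatchRepo.AnnotateFile(ctx, batchData)
        // handling of variants and err omitted for brevity
    }()

    // I want to return immediately
    return c.JSON(http.StatusAccepted, "file uploaded successfully, the annotation process has started in the background")
}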

Here is my annotation function:

func (p *psqlBatchRepository) AnnotateFile(ctx context.Context, batchData domain.BatchInput) (res []domain.BatchVariant, err error) {
    // Open the file and ensure it's closed after the function returns
    file, err := batchData.Header.Open()
    if err != nil {
        return nil, err
    }
    defer file.Close()

    // Read the file line by line and populate variantVcfs
    variantVcfs := make([]string, 0, batchData.LineCount)
    scanner := bufio.NewScanner(file)
    for scanner.Scan() {
        line := scanner.Text()

        if len(line) == 0 {
            continue
        }

        variantVcfs = append(variantVcfs, line)
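    }
    // Surface any read error instead of silently working on a partial file
    if err := scanner.Err(); err != nil {
        return nil, err
    }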

    // Set the maximum size for each chunk
    maxChunkSize := 100

    // Create a 2D slice to store the chunks
    var chunks [][]string

    // Iterate through the variantVcfs to create chunks
    for len(variantVcfs) > 0 {
        // Determine the current chunk size (up to maxChunkSize)
        currentChunkSize := len(variantVcfs)
        if currentChunkSize > maxChunkSize {
            currentChunkSize = maxChunkSize
        }

        // Create the chunk
        chunk := variantVcfs[:currentChunkSize]
        variantVcfs = variantVcfs[currentChunkSize:]

        // Append the chunk to the chunks slice
        chunks = append(chunks, chunk)
    }

    // Set up concurrency control
    maxWorkers := 5
    sem := semaphore.NewWeighted(int64(maxWorkers))
    var wg sync.WaitGroup

    // Create an output channel to collect results
    outputChan := make(chan []domain.BatchVariant, len(chunks))

    // Iterate through the chunks and run the query concurrently
    for i, chunk := range chunks {
        // Acquire a semaphore slot
        if err := sem.Acquire(ctx, 1); err != nil {
            // Handle acquisition error
            fmt.Println("Error acquiring semaphore:", err)
            break
        }

        // Increment the WaitGroup counter
        wg.Add(1)

        // Run the query function in a goroutine
        go func(chunk []string, workerIndex int) {
            defer func() {
                // Release the semaphore slot when done
                sem.Release(1)
                // Decrement the WaitGroup counter
                wg.Done()
            }()

            // Call the query function and handle errors
            p.queryFunction(chunk, outputChan)
        }(chunk, i)
    }

    // Close the output channel when all goroutines are done
    go func() {
        wg.Wait()
        close(outputChan)
    }()

    // Collect the results from the output channel
    for result := range outputChan {
        res = append(res, result...)
    }

    return res, nil
}
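
The chunking loop in AnnotateFile can be pulled out into a small helper, which makes it easy to test on its own. A minimal sketch, assuming a hypothetical helper name chunkStrings (it is not part of the original code):

```go
package main

import "fmt"

// chunkStrings (hypothetical helper) splits items into consecutive chunks of
// at most size elements. Chunks share the backing array of items; the
// three-index slice caps each chunk's capacity so that appending to one chunk
// cannot overwrite the start of the next.
func chunkStrings(items []string, size int) [][]string {
	if size <= 0 {
		return nil
	}
	chunks := make([][]string, 0, (len(items)+size-1)/size)
	for len(items) > 0 {
		n := size
		if len(items) < n {
			n = len(items)
		}
		chunks = append(chunks, items[:n:n])
		items = items[n:]
	}
	return chunks
}

func main() {
	ids := []string{"v1", "v2", "v3", "v4", "v5"}
	for i, chunk := range chunkStrings(ids, 2) {
		fmt.Println(i, chunk)
	}
}
```

With maxChunkSize set to 100 as in the question, each chunk then maps to one IN query.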

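Problem: The code in AnnotateFile runs successfully when it is not inside another goroutine. However, running it concurrently from the HandleAnnotations handler results in an error when acquiring the semaphore.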

Error: fmt.Println("Error acquiring semaphore:", err)

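Expected behavior: My goal is to run the annotation process concurrently in the background while returning a 202 Accepted status from the route handler.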

I would appreciate insight from anyone who has experience building a similar system.

rest go concurrency semaphore long-running-processes
1 Answer

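Fixed it by no longer passing the request context into AnnotateFile. (The request context is canceled as soon as the handler returns, so any semaphore acquisition that is still waiting on it fails.)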
