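I am implementing a file annotation process in a Go web application. The process reads a file, stores the IDs in a slice, splits the IDs into chunks, and then fetches data from the database using an IN clause. The code works perfectly when run on its own. However, when I run the annotation process in a goroutine inside my route handler (HandleAnnotations), I get an error when acquiring the semaphore: fmt.Println("Error acquiring semaphore:", err).
Code context: In my route handler (HandleAnnotations), I start the file annotation process in a goroutine so that I can return a 202 Accepted status and keep the operation running in the background. Here is the relevant part of the handler: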
// HandleAnnotations will handle the annotation process
func (a *BatchHandler) HandleAnnotations(c echo.Context) error {
	header, err := c.FormFile("file-upload")
	// (error handling and the construction of ctx and batchData are omitted here)
	go func() {
		// I want this long running task to finish in the background
		variants, err := a.BatchRepo.AnnotateFile(ctx, batchData)
	}()
	// I want to return immediately
	return c.JSON(http.StatusAccepted, "file uploaded successfully, the annotation process has started in the background")
}
Here is my annotation function:
func (p *psqlBatchRepository) AnnotateFile(ctx context.Context, batchData domain.BatchInput) (res []domain.BatchVariant, err error) {
	// Open the file and ensure it's closed after the function returns
	file, err := batchData.Header.Open()
	if err != nil {
		return nil, err
	}
	defer file.Close()

	// Read the file line by line and populate variantVcfs
	variantVcfs := make([]string, 0, batchData.LineCount)
	scanner := bufio.NewScanner(file)
	for scanner.Scan() {
		line := scanner.Text()
		if len(line) == 0 {
			continue
		}
		variantVcfs = append(variantVcfs, line)
	}

	// Set the maximum size for each chunk
	maxChunkSize := 100
	// Create a 2D slice to store the chunks
	var chunks [][]string
	// Iterate through the variantVcfs to create chunks
	for len(variantVcfs) > 0 {
		// Determine the current chunk size (up to maxChunkSize)
		currentChunkSize := len(variantVcfs)
		if currentChunkSize > maxChunkSize {
			currentChunkSize = maxChunkSize
		}
		// Create the chunk
		chunk := variantVcfs[:currentChunkSize]
		variantVcfs = variantVcfs[currentChunkSize:]
		// Append the chunk to the chunks slice
		chunks = append(chunks, chunk)
	}

	// Set up concurrency control
	maxWorkers := 5
	sem := semaphore.NewWeighted(int64(maxWorkers))
	var wg sync.WaitGroup
	// Create an output channel to collect results
	outputChan := make(chan []domain.BatchVariant, len(chunks))

	// Iterate through the chunks and run the query concurrently
	for i, chunk := range chunks {
		// Acquire a semaphore slot
		if err := sem.Acquire(ctx, 1); err != nil {
			// Handle acquisition error
			fmt.Println("Error acquiring semaphore:", err)
			break
		}
		// Increment the WaitGroup counter
		wg.Add(1)
		// Run the query function in a goroutine
		go func(chunk []string, workerIndex int) {
			defer func() {
				// Release the semaphore slot when done
				sem.Release(1)
				// Decrement the WaitGroup counter
				wg.Done()
			}()
			// Call the query function and handle errors
			p.queryFunction(chunk, outputChan)
		}(chunk, i)
	}

	// Close the output channel when all goroutines are done
	go func() {
		wg.Wait()
		close(outputChan)
	}()

	// Collect the results from the output channel
	for result := range outputChan {
		res = append(res, result...)
	}
	return res, nil
}
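Problem: The code in AnnotateFile runs successfully when it is not called from another goroutine. However, running it in the background from the HandleAnnotations handler produces an error when acquiring the semaphore.
Error: fmt.Println("Error acquiring semaphore:", err)
Expected behavior: My goal is to run the annotation process in the background while returning a 202 Accepted status from the route handler.
I would appreciate insights from anyone who has experience building a similar system.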
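I fixed it by no longer passing the request context to AnnotateFile. The request context is canceled as soon as the handler returns, so sem.Acquire(ctx, 1) in the background goroutine fails with a context error.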