例程等待通道的响应并继续

问题描述 投票:0回答:3

我正在学习go并发性,我想实现一个简单的示例,该示例从矩阵中获取行并将值的数组(切片)添加到每一行。

由于我使用的是通道,因此我尝试等待每一行从goroutine中获取相应的结果。但是,这并不比仅同步执行更好。如何使每一行等待各自的结果,并允许其他行同时计算其结果?

https://play.golang.org/p/uCOGwOBeIQL


package main

import "fmt"


/*
Array:
0 1 2 3 4 5 6 7 8 9

+

Matrix:
1 0 0 0 0 0 0 0 0 0
0 1 0 0 0 0 0 0 0 0
0 0 1 0 0 0 0 0 0 0
0 0 0 1 0 0 0 0 0 0
0 0 0 0 1 0 0 0 0 0
0 0 0 0 0 1 0 0 0 0
0 0 0 0 0 0 1 0 0 0
0 0 0 0 0 0 0 1 0 0
0 0 0 0 0 0 0 0 1 0
0 0 0 0 0 0 0 0 0 1

-> 
Expected result:
1 1 2 3 4 5 6 7 8 9
0 2 2 3 4 5 6 7 8 9
0 1 3 3 4 5 6 7 8 9
0 1 2 4 4 5 6 7 8 9
0 1 2 3 5 5 6 7 8 9
0 1 2 3 4 6 6 7 8 9
0 1 2 3 4 5 7 7 8 9
0 1 2 3 4 5 6 8 8 9
0 1 2 3 4 5 6 7 9 9
0 1 2 3 4 5 6 7 8 10
*/
func main() {
    numbers := []int {0,1,2,3,4,5,6,7,8,9}

    matrix := [][]int{
        {1,0,0,0,0,0,0,0,0,0},
        {0,1,0,0,0,0,0,0,0,0},
        {0,0,1,0,0,0,0,0,0,0},
        {0,0,0,1,0,0,0,0,0,0},
        {0,0,0,0,1,0,0,0,0,0},
        {0,0,0,0,0,1,0,0,0,0},
        {0,0,0,0,0,0,1,0,0,0},
        {0,0,0,0,0,0,0,1,0,0},
        {0,0,0,0,0,0,0,0,1,0},
        {0,0,0,0,0,0,0,0,0,1},
    }

    rmatrix := make([][]int, 10)

    for i, row := range matrix {
        cResult := make(chan []int)
        go func(row []int, numbers []int, c chan <- []int) {
            c <- addRow(row,numbers)
        }(row,numbers,cResult)

        //this read from the channel will block until the goroutine sends its result over the channel
        rmatrix[i] = <- cResult
    }
    fmt.Println(rmatrix)
}

func addRow(row []int, numbers []int) []int{
    result := make([]int, len(row))
    for i,e := range row {
        result[i] = e + numbers[i];
    }
    return result
}
go concurrency goroutine
3个回答
0
投票

此示例生成的goroutine数量较少,并且无论哪个goroutine首先完成处理,它都保证正确的顺序。

package main

import (
    "fmt"
    "sync"
)

type rowRes struct {
    index  int
    result *[]int
}

func addRow(index int, row []int, numbers []int) rowRes {
    result := make([]int, len(row))
    for i, e := range row {
        result[i] = e + numbers[i]
    }
    return rowRes{
        index:  index,
        result: &result,
    }
}

func main() {
    numbers := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}

    matrix := [][]int{
        {1, 0, 0, 0, 0, 0, 0, 0, 0, 0},
        {0, 1, 0, 0, 0, 0, 0, 0, 0, 0},
        {0, 0, 1, 0, 0, 0, 0, 0, 0, 0},
        {0, 0, 0, 1, 0, 0, 0, 0, 0, 0},
        {0, 0, 0, 0, 1, 0, 0, 0, 0, 0},
        {0, 0, 0, 0, 0, 1, 0, 0, 0, 0},
        {0, 0, 0, 0, 0, 0, 1, 0, 0, 0},
        {0, 0, 0, 0, 0, 0, 0, 1, 0, 0},
        {0, 0, 0, 0, 0, 0, 0, 0, 1, 0},
        {0, 0, 0, 0, 0, 0, 0, 0, 0, 1},
    }
    rmatrix := make([][]int, 10)

    // Buffered channel
    rowChan := make(chan rowRes, 10)

    wg := sync.WaitGroup{}

    // Reciever goroutine
    go recv(rowChan, rmatrix)

    for i := range matrix {
        wg.Add(1)
        go func(index int, row []int, w *sync.WaitGroup) {
            rowChan <- addRow(index, row, numbers)
            w.Done()
        }(i, matrix[i], &wg)
    }
    wg.Wait()
    close(rowChan)
    fmt.Println(rmatrix)
}

func recv(res chan rowRes, rmatrix [][]int) {
    for {
        select {
        case k, ok := <-res:
            if !ok {
                return
            }
            rmatrix[k.index] = *k.result
        }
    }
}

0
投票

我需要使用sync.WaitGroup并直接分配调用结果(以确保它们返回其索引行)。谢谢@Peter

package main

import (
    "fmt"
    "sync"
)

func main() {
    numbers := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}

    matrix := [][]int{
        {1, 0, 0, 0, 0, 0, 0, 0, 0, 0},
        {0, 1, 0, 0, 0, 0, 0, 0, 0, 0},
        {0, 0, 1, 0, 0, 0, 0, 0, 0, 0},
        {0, 0, 0, 1, 0, 0, 0, 0, 0, 0},
        {0, 0, 0, 0, 1, 0, 0, 0, 0, 0},
        {0, 0, 0, 0, 0, 1, 0, 0, 0, 0},
        {0, 0, 0, 0, 0, 0, 1, 0, 0, 0},
        {0, 0, 0, 0, 0, 0, 0, 1, 0, 0},
        {0, 0, 0, 0, 0, 0, 0, 0, 1, 0},
        {0, 0, 0, 0, 0, 0, 0, 0, 0, 1},
    }

    rmatrix := make([][]int, 10)
    var waitGroup sync.WaitGroup

    for i, row := range matrix {
        waitGroup.Add(1)
        go func(i int, row []int) {
            rmatrix[i] = addRow(row, numbers)
            waitGroup.Done()
        }(i, row)
    }
    waitGroup.Wait()
    fmt.Println(rmatrix)
}

func addRow(row []int, numbers []int) []int {
    result := make([]int, len(row))
    for i, e := range row {
        result[i] = e + numbers[i]
    }
    return result
}


-1
投票

管道方法


taskChannel := make(chan string,1000); // Set up the task queue
wg := sync.WaitGroup

// Task release
wg.add(1)
go func(&wg,taskChannel) {
      defer wg.Down()
      for i in "task list" {
        taskChannel <- "Stuff the characters you want to deal with here"
      }

    // After the task is sent and closed
    close(taskChannel)
}(wg *sync.WaitGroup,taskChannel chan string)

// Task execution
go func(&wg,taskChannel,1000) {
    defer wg.Down()
    limit := make(chan bool,limitNumber); // Limit the number of concurrent
    tg := sync.WaitGroup
    loop:
    for {
      select {
      case task,over := <-taskChannel:
            if !over {  // If there are no more tasks, quit
                tg.Wait()  // Wait for all tasks to end
                break loop
            }

            tg.Add(1)
            limit<-true
            go func(&tg,limitm) {
                defer func() {
                    <-limit
                    tg.Down()
                }
                // Business processing logic, processing tasks
            }(tg *sync.WaitGroup,limit chan bool,task string)
      }
    }
}(wg *sync.WaitGroup,taskChannel chan string,limitNumber int)

wg.Wait()

希望对您有帮助

© www.soinside.com 2019 - 2024. All rights reserved.