我正在学习go并发性,我想实现一个简单的示例,该示例从矩阵中获取行并将值的数组(切片)添加到每一行。
由于我使用的是通道,因此我尝试等待每一行从goroutine中获取相应的结果。但是,这并不比仅同步执行更好。如何使每一行等待各自的结果,并允许其他行同时计算其结果?
https://play.golang.org/p/uCOGwOBeIQL
package main
import "fmt"
/*
Array:
0 1 2 3 4 5 6 7 8 9
+
Matrix:
1 0 0 0 0 0 0 0 0 0
0 1 0 0 0 0 0 0 0 0
0 0 1 0 0 0 0 0 0 0
0 0 0 1 0 0 0 0 0 0
0 0 0 0 1 0 0 0 0 0
0 0 0 0 0 1 0 0 0 0
0 0 0 0 0 0 1 0 0 0
0 0 0 0 0 0 0 1 0 0
0 0 0 0 0 0 0 0 1 0
0 0 0 0 0 0 0 0 0 1
->
Expected result:
1 1 2 3 4 5 6 7 8 9
0 2 2 3 4 5 6 7 8 9
0 1 3 3 4 5 6 7 8 9
0 1 2 4 4 5 6 7 8 9
0 1 2 3 5 5 6 7 8 9
0 1 2 3 4 6 6 7 8 9
0 1 2 3 4 5 7 7 8 9
0 1 2 3 4 5 6 8 8 9
0 1 2 3 4 5 6 7 9 9
0 1 2 3 4 5 6 7 8 10
*/
func main() {
numbers := []int {0,1,2,3,4,5,6,7,8,9}
matrix := [][]int{
{1,0,0,0,0,0,0,0,0,0},
{0,1,0,0,0,0,0,0,0,0},
{0,0,1,0,0,0,0,0,0,0},
{0,0,0,1,0,0,0,0,0,0},
{0,0,0,0,1,0,0,0,0,0},
{0,0,0,0,0,1,0,0,0,0},
{0,0,0,0,0,0,1,0,0,0},
{0,0,0,0,0,0,0,1,0,0},
{0,0,0,0,0,0,0,0,1,0},
{0,0,0,0,0,0,0,0,0,1},
}
rmatrix := make([][]int, 10)
for i, row := range matrix {
cResult := make(chan []int)
go func(row []int, numbers []int, c chan <- []int) {
c <- addRow(row,numbers)
}(row,numbers,cResult)
//this read from the channel will block until the goroutine sends its result over the channel
rmatrix[i] = <- cResult
}
fmt.Println(rmatrix)
}
func addRow(row []int, numbers []int) []int{
result := make([]int, len(row))
for i,e := range row {
result[i] = e + numbers[i];
}
return result
}
此示例生成的goroutine数量较少,并且无论哪个goroutine首先完成处理,它都保证正确的顺序。
package main
import (
"fmt"
"sync"
)
type rowRes struct {
index int
result *[]int
}
func addRow(index int, row []int, numbers []int) rowRes {
result := make([]int, len(row))
for i, e := range row {
result[i] = e + numbers[i]
}
return rowRes{
index: index,
result: &result,
}
}
func main() {
numbers := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
matrix := [][]int{
{1, 0, 0, 0, 0, 0, 0, 0, 0, 0},
{0, 1, 0, 0, 0, 0, 0, 0, 0, 0},
{0, 0, 1, 0, 0, 0, 0, 0, 0, 0},
{0, 0, 0, 1, 0, 0, 0, 0, 0, 0},
{0, 0, 0, 0, 1, 0, 0, 0, 0, 0},
{0, 0, 0, 0, 0, 1, 0, 0, 0, 0},
{0, 0, 0, 0, 0, 0, 1, 0, 0, 0},
{0, 0, 0, 0, 0, 0, 0, 1, 0, 0},
{0, 0, 0, 0, 0, 0, 0, 0, 1, 0},
{0, 0, 0, 0, 0, 0, 0, 0, 0, 1},
}
rmatrix := make([][]int, 10)
// Buffered channel
rowChan := make(chan rowRes, 10)
wg := sync.WaitGroup{}
// Reciever goroutine
go recv(rowChan, rmatrix)
for i := range matrix {
wg.Add(1)
go func(index int, row []int, w *sync.WaitGroup) {
rowChan <- addRow(index, row, numbers)
w.Done()
}(i, matrix[i], &wg)
}
wg.Wait()
close(rowChan)
fmt.Println(rmatrix)
}
func recv(res chan rowRes, rmatrix [][]int) {
for {
select {
case k, ok := <-res:
if !ok {
return
}
rmatrix[k.index] = *k.result
}
}
}
我需要使用sync.WaitGroup
并直接分配调用结果(以确保它们返回其索引行)。谢谢@Peter
package main
import (
"fmt"
"sync"
)
func main() {
numbers := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
matrix := [][]int{
{1, 0, 0, 0, 0, 0, 0, 0, 0, 0},
{0, 1, 0, 0, 0, 0, 0, 0, 0, 0},
{0, 0, 1, 0, 0, 0, 0, 0, 0, 0},
{0, 0, 0, 1, 0, 0, 0, 0, 0, 0},
{0, 0, 0, 0, 1, 0, 0, 0, 0, 0},
{0, 0, 0, 0, 0, 1, 0, 0, 0, 0},
{0, 0, 0, 0, 0, 0, 1, 0, 0, 0},
{0, 0, 0, 0, 0, 0, 0, 1, 0, 0},
{0, 0, 0, 0, 0, 0, 0, 0, 1, 0},
{0, 0, 0, 0, 0, 0, 0, 0, 0, 1},
}
rmatrix := make([][]int, 10)
var waitGroup sync.WaitGroup
for i, row := range matrix {
waitGroup.Add(1)
go func(i int, row []int) {
rmatrix[i] = addRow(row, numbers)
waitGroup.Done()
}(i, row)
}
waitGroup.Wait()
fmt.Println(rmatrix)
}
func addRow(row []int, numbers []int) []int {
result := make([]int, len(row))
for i, e := range row {
result[i] = e + numbers[i]
}
return result
}
管道方法
taskChannel := make(chan string,1000); // Set up the task queue
wg := sync.WaitGroup
// Task release
wg.add(1)
go func(&wg,taskChannel) {
defer wg.Down()
for i in "task list" {
taskChannel <- "Stuff the characters you want to deal with here"
}
// After the task is sent and closed
close(taskChannel)
}(wg *sync.WaitGroup,taskChannel chan string)
// Task execution
go func(&wg,taskChannel,1000) {
defer wg.Down()
limit := make(chan bool,limitNumber); // Limit the number of concurrent
tg := sync.WaitGroup
loop:
for {
select {
case task,over := <-taskChannel:
if !over { // If there are no more tasks, quit
tg.Wait() // Wait for all tasks to end
break loop
}
tg.Add(1)
limit<-true
go func(&tg,limitm) {
defer func() {
<-limit
tg.Down()
}
// Business processing logic, processing tasks
}(tg *sync.WaitGroup,limit chan bool,task string)
}
}
}(wg *sync.WaitGroup,taskChannel chan string,limitNumber int)
wg.Wait()
希望对您有帮助