Go 函数单独运行良好,但在 goroutines 中调用时卡住

问题描述 投票:0回答:0

这是一个将字节写入 io.Writer 的函数的代码,当它单独运行时,它可以很好地处理输入。

// WriteFrame serializes frame and hands it to w in a single Write call.
//
// The bytes are assembled in this order: frame.Destination, frame.Source,
// lenBytes (2 bytes), frame.Data, checksumBytes. The checksum is the CRC-32
// (IEEE polynomial) of everything that precedes it. How lenBytes is filled in
// and how checksumBytes is derived from checksum is elided ("...") in this
// excerpt.
//
// It returns the byte count reported by w.Write, or (0, err) when the write
// fails. When w is nil it prints a message to stdout and returns (0, nil), so
// the caller cannot tell that case apart from a successful zero-byte write.
//
// NOTE(review): w.Write is a blocking call. If w is the write end of an
// io.Pipe (the surrounding text says the test supplies one), Write does not
// return until a reader has consumed all of the data or one end of the pipe
// is closed. A goroutine calling WriteFrame on a pipe nobody is reading will
// therefore stall on that line.
func WriteFrame(w io.Writer, frame Frame) (int, error) {
    log.Printf("WriteFrame() being called, frame: %v\n", frame)
    log.Printf("w io.Writer: %v\n", w)
    // Only a nil interface value is caught here; an interface holding a typed
    // nil pointer compares non-nil and falls through to w.Write.
    if w == nil {
        fmt.Println("w is nil in WriteFrame")
        return 0, nil
    }
    // NOTE(review): presumably Destination and Source are fixed-size arrays
    // (they are sliced with [:]), in which case append has no spare capacity
    // and allocates a new backing array. If they were slices with spare
    // capacity this append could write into frame.Destination's storage;
    // confirm against the Frame definition.
    header := append(frame.Destination[:], frame.Source[:]...)

    lenBytes := make([]byte, 2)

    ...

    // Build the checksummed region: header, then length, then payload.
    dataIncrement := header
    dataIncrement = append(dataIncrement, lenBytes...)
    dataIncrement = append(dataIncrement, frame.Data...)
    checksum := crc32.ChecksumIEEE(dataIncrement)

    ...

    // The checksum trails the data it covers.
    dataIncrement = append(dataIncrement, checksumBytes...)
    log.Printf("WriteFrame, abt to write data: %v\n", dataIncrement)

    // Blocks until w accepts the bytes or reports an error.
    length, err := w.Write(dataIncrement)
    if err != nil {
        return 0, err
    }
    log.Printf("WriteFrame, finishe writing data: %v\n", dataIncrement)
    // Report the number of bytes w.Write said it wrote.
    return length, nil
}

下面这段代码是一个永久运行的 goroutine:它从 channel 中取出 frame,再调用 WriteFrame 把它写入目标端口,但在某些情况下会卡住。卡住时,我只能看到 WriteFrame 被调用,且待写入的字节数据是有效的,但是

length, err := w.Write(dataIncrement)
之后的所有代码似乎都没有执行,因为我始终看不到这一行返回。

// Forwarding goroutine: one loop that multiplexes three channels with select.
//   - frameChan: an incoming frame; handled by the broadcast/unicast branches
//     (their bodies are elided in this excerpt) and used to populate
//     sw.ForwardingTable with the frame's source.
//   - errChan:   an error; any non-nil error other than io.EOF ends the goroutine.
//   - portChan:  a port to service; the head of that port's send queue is
//     written with WriteFrame and then removed from the queue.
//
// NOTE(review): every case body runs on this single goroutine, so any
// blocking call inside a case (the receive from sendPortChan, WriteFrame,
// the send on errChan) stalls the handling of all three channels.
    go func() {
        for {
            fmt.Println("start loop in line 87")
            fmt.Printf("sw.ForwardingTable: %v\n", sw.ForwardingTable)
            fmt.Printf("sw.SendQueues: %v\n", sw.SendQueues)
            select {
            case frame := <-frameChan:
                fmt.Printf("case frame := <-frameChan, frame: %v\n", frame)
                // Blocking receive inside a select case: the loop waits here
                // until a value is available on sendPortChan.
                // NOTE(review): presumably the producer sends the source port
                // for every frame it sends on frameChan; confirm the pairing.
                srcPort := <-sendPortChan
                // The lookup is keyed by the frame's destination (not its
                // source): an unknown destination takes the broadcast branch.
                if _, ok := sw.ForwardingTable[frame.Destination]; !ok {
                    fmt.Printf("case frame := <-frameChan, frame: %v, sw.ForwardingTable[frame.Source]: %v, broadcast\n", frame, sw.ForwardingTable[frame.Source])
                    // Broadcast: visit every port of the switch.
                    for _, port := range sw.Ports {
                        log.Printf("checking port %v for broadcast\n", port)
                        // Logs whether this port differs from the ingress port
                        // and whether its send queue is below sw.SendQueueSize.
                        log.Printf("port != srcPort: %v, len(sw.SendQueues[port]) < sw.SendQueueSize: %v\n", port != srcPort, len(sw.SendQueues[port]) < sw.SendQueueSize)
                        ...
                        }
                    }
                } else {
                    fmt.Printf("case frame := <-frameChan, frame: %v, sw.ForwardingTable[frame.Source]: %v, unicast\n", frame, sw.ForwardingTable[frame.Source])
                    log.Printf(" checking dst port %v for unicast\n", sw.ForwardingTable[frame.Destination])
                    // Unicast: the destination has a forwarding-table entry.
                    // Logs whether that port's send queue is below sw.SendQueueSize.
                    log.Printf("len(sw.SendQueues[sw.ForwardingTable[frame.Destination]]) < sw.SendQueueSize: %v\n", len(sw.SendQueues[sw.ForwardingTable[frame.Destination]]) < sw.SendQueueSize)
                    ...
                }
                // Learn the source: record the ingress port for frame.Source
                // the first time that source is seen.
                if _, ok := sw.ForwardingTable[frame.Source]; !ok {
                    sw.ForwardingTable[frame.Source] = srcPort
                    log.Printf("adding src port %v to forwarding table\n", srcPort)
                }
            case err := <-errChan:
                fmt.Println("case err := <-errChan")
                // A non-nil error other than io.EOF is logged and terminates
                // this goroutine; nil and io.EOF are ignored.
                if err != nil && err != io.EOF {
                    log.Printf("error from errChan, not io.EOF: %v", err)
                    return
                }
            case port := <-portChan:
                fmt.Println("case port := <-portChan")
                fmt.Printf("case port, port: %v\n", port)
                // Service this port only if it is non-nil.
                if port != nil {
                    // Only write when the port's send queue has something in it.
                    log.Printf("port: %v, sw.SendQueues[port]: %v, len(sw.SendQueues[port]): %v\n", port, sw.SendQueues[port], len(sw.SendQueues[port]))
                    if len(sw.SendQueues[port]) != 0 {
                        fmt.Printf("case port, sw.SendQueues[port]: %v, to write frame\n", sw.SendQueues[port])
                        // Write the frame at the head of the queue.
                        // NOTE(review): WriteFrame blocks in port.Write. If port
                        // is an io.Pipe writer, this call returns only once a
                        // reader drains the other end, so a queued frame for a
                        // port that nobody is reading (e.g. a broadcast copy)
                        // would hang the whole loop here; confirm against the test.
                        size, err := WriteFrame(port, sw.SendQueues[port][0])
                        // This "finished" message is printed before err is checked.
                        fmt.Printf("case port, sw.SendQueues[port]: %v, write frame sized %v finished\n", sw.SendQueues[port], size)
                        if err != nil {
                            fmt.Printf("case port, err: %v, write frame error\n", err)
                            // NOTE(review): this goroutine is itself a receiver
                            // on errChan. If errChan is unbuffered and has no
                            // other receiver, this send blocks forever and the
                            // return below is never reached; confirm.
                            errChan <- err
                            return
                        }
                        // Drop the frame that was just written from the queue.
                        sw.SendQueues[port] = sw.SendQueues[port][1:]
                        fmt.Printf("case port, sw.SendQueues[port]: %v, write frame finished\n", sw.SendQueues[port])
                    }
                }
            default:
                // No channel is ready: back off briefly instead of spinning.
                //fmt.Println("case dft: No frame in the queue")
                time.Sleep(time.Millisecond * 1)
            }
            // Additional 1 ms pause after every iteration, whichever case ran.
            time.Sleep(time.Millisecond * 1)
        }
    }()
    // Port-feeder goroutine: repeatedly pushes every port in sw.Ports onto
    // portChan so that the forwarding goroutine keeps servicing each port.
    //
    // NOTE(review): the loop has no sleep, blocking receive or exit condition.
    // Whenever len(portChan) is 9997 or more the inner loop is skipped and the
    // outer loop spins at full speed. The 9997 threshold is unexplained here;
    // presumably portChan is buffered with a capacity slightly above it — confirm.
    go func() {
        for {
            if len(portChan) < 9997 {
                for _, port := range sw.Ports {
                    // Queue this port for servicing; blocks if portChan is full.
                    portChan <- port
                }
            }
        }
    }()

我试过给

io.Writer
添加缓冲并用
bufio
转换它,但它没有帮助。

此外,还有一个测试在运行:它向端口写入帧,这些数据被读取后推送到 channel。随后 goroutine 从 channel 中取出数据,将其广播(broadcast)或单播到目标端口。最后,测试会读取目标端口输出的数据。

我期待在 goroutine 中调用

WriteFrame
时,它不会卡住。

补充:测试会创建

io.Pipe
,将
io.Writer
提供给
WriteFrame
,并在转发后使用
io.Reader
从目标端口读取。

// pipedPort is an in-memory, two-directional port built from two io.Pipe
// pairs. The embedded Reader and WriteCloser form one side of the port;
// Input and Output are the opposite ends of the same two pipes.
type pipedPort struct {
    // Embedded side: reading yields the bytes written to Input, and writing
    // makes bytes available on Output.
    io.Reader
    io.WriteCloser

    // Opposite side: Input feeds the embedded Reader, Output drains the
    // embedded WriteCloser.
    Input  io.WriteCloser
    Output io.ReadCloser
}

// newPipedPort creates the two pipes and wires their four ends into a
// pipedPort: Input -> Reader and WriteCloser -> Output.
func newPipedPort() pipedPort {
    var port pipedPort

    // First pipe: data written to Input is read through the embedded Reader.
    port.Reader, port.Input = io.Pipe()
    // Second pipe: data written through the embedded WriteCloser is read
    // from Output.
    port.Output, port.WriteCloser = io.Pipe()

    return port
}

// Excerpt from the test function.
// Forwarding a frame from ports[1] to ports[0] is done in three steps: write
// the frame to ports[1].Input, wait for the switch goroutine to receive and
// forward it, then read it back from ports[0].Output.
//
// NOTE(review): the pipes behind these ports are synchronous (io.Pipe has no
// internal buffer). Nothing reads ports[0].Output during the 5 s sleep, so a
// Write by the switch to ports[0] would stay blocked until expectFrame starts
// reading; a Write to any port the test never reads would never return.
// Confirm which ports the switch actually writes to for this frame.
                log.Printf("testing, writeFrame(t, ports[1].Input: %v, frameBA4: %v)\n", ports[1].Input, frameBA4)
                writeFrame(t, ports[1].Input, frameBA4)
                // The log line below mentions expectSize, but the statement
                // that follows it is a fixed 5-second sleep.
                log.Println("testing, expectSize(t, sw.RunSize(), 2)")
                time.Sleep(5 * time.Second)

                log.Println("testing, expectFrame(t, ports[0].Output, &frameBA4)")
                expectFrame(t, ports[0].Output, &frameBA4)
go channel goroutine ethernet producer-consumer
© www.soinside.com 2019 - 2024. All rights reserved.