这是一个将字节写入 io.Writer 的函数的代码,当它单独运行时,它可以很好地处理输入。
// WriteFrame serializes frame and hands it to w in a single Write call.
//
// The buffer is assembled as: destination address, source address, lenBytes,
// frame.Data, and finally checksumBytes. The CRC-32 (IEEE) checksum is
// computed over everything that precedes it in the buffer. How lenBytes and
// checksumBytes are filled in is elided in this excerpt.
//
// It returns the byte count reported by w.Write and a nil error on success,
// or 0 and the error from w.Write on failure. If w is nil it prints a message
// and returns 0 with a nil error.
func WriteFrame(w io.Writer, frame Frame) (int, error) {
log.Printf("WriteFrame() being called, frame: %v\n", frame)
log.Printf("w io.Writer: %v\n", w)
// Only a nil interface value is caught here; an interface that wraps a nil
// pointer compares as non-nil and would pass this check.
if w == nil {
fmt.Println("w is nil in WriteFrame")
// NOTE(review): a nil writer is reported as success (nil error) — confirm
// that callers really want this instead of an error.
return 0, nil
}
// Header = destination address followed by source address.
header := append(frame.Destination[:], frame.Source[:]...)
lenBytes := make([]byte, 2)
...
// Build the payload that the checksum covers: header + length + data.
dataIncrement := header
dataIncrement = append(dataIncrement, lenBytes...)
dataIncrement = append(dataIncrement, frame.Data...)
checksum := crc32.ChecksumIEEE(dataIncrement)
...
dataIncrement = append(dataIncrement, checksumBytes...)
log.Printf("WriteFrame, abt to write data: %v\n", dataIncrement)
// NOTE(review): Write is synchronous. If w is the write end of an io.Pipe,
// this call does not return until a reader has consumed the data, so the
// caller stalls here while nobody is reading — confirm what w is in practice.
length, err := w.Write(dataIncrement)
if err != nil {
// Any partial byte count reported by Write is discarded on error.
return 0, err
}
log.Printf("WriteFrame, finishe writing data: %v\n", dataIncrement)
// Return the length of the frame
return length, nil
}
下面是一个永久运行的 goroutine:它从 channel 中取出 frame,并调用 WriteFrame 将其写入目标端口,但在特定情况下会卡住。发生这种情况时,我能看到 WriteFrame 被调用,而且待写入的字节数据是有效的,但是
length, err := w.Write(dataIncrement)
之后的所有内容似乎都没有执行,因为我看不到这一行的任何返回。
// Switch forwarding loop. Each iteration polls frameChan, errChan and portChan
// with a non-blocking select (there is a default case) and then sleeps 1ms.
//   - frameChan: if the frame's destination is not in sw.ForwardingTable the
//     frame takes the broadcast path (iterating sw.Ports), otherwise the
//     unicast path; the queueing code of both paths is elided in this excerpt.
//     Afterwards the source address is added to sw.ForwardingTable if absent.
//   - errChan: an error other than nil or io.EOF is logged and ends the goroutine.
//   - portChan: if the port's send queue is non-empty, its first frame is
//     written with WriteFrame and then removed from the queue.
// define a goroutine to handle the frame
go func() {
for {
fmt.Println("start loop in line 87")
fmt.Printf("sw.ForwardingTable: %v\n", sw.ForwardingTable)
fmt.Printf("sw.SendQueues: %v\n", sw.SendQueues)
select {
case frame := <-frameChan:
fmt.Printf("case frame := <-frameChan, frame: %v\n", frame)
// If the frame is not nil, handle it
// Check if the frame source in the forward table
// NOTE(review): this is a plain blocking receive inside the select case.
// Until a value arrives on sendPortChan the loop cannot service errChan
// or portChan — confirm that every frame is always paired with a port.
srcPort := <-sendPortChan
if _, ok := sw.ForwardingTable[frame.Destination]; !ok {
// NOTE(review): the lookup above tests frame.Destination, but this message
// prints the table entry for frame.Source.
fmt.Printf("case frame := <-frameChan, frame: %v, sw.ForwardingTable[frame.Source]: %v, broadcast\n", frame, sw.ForwardingTable[frame.Source])
// If it is broadcast, send it to all the ports
for _, port := range sw.Ports {
log.Printf("checking port %v for broadcast\n", port)
// Check if the port is the same as the port of the frame
log.Printf("port != srcPort: %v, len(sw.SendQueues[port]) < sw.SendQueueSize: %v\n", port != srcPort, len(sw.SendQueues[port]) < sw.SendQueueSize)
...
}
}
} else {
fmt.Printf("case frame := <-frameChan, frame: %v, sw.ForwardingTable[frame.Source]: %v, unicast\n", frame, sw.ForwardingTable[frame.Source])
log.Printf(" checking dst port %v for unicast\n", sw.ForwardingTable[frame.Destination])
// If it is not broadcast, check if the MAC address is in the forwarding table
log.Printf("len(sw.SendQueues[sw.ForwardingTable[frame.Destination]]) < sw.SendQueueSize: %v\n", len(sw.SendQueues[sw.ForwardingTable[frame.Destination]]) < sw.SendQueueSize)
...
}
// Learn the source address: remember which port this sender was seen on.
if _, ok := sw.ForwardingTable[frame.Source]; !ok {
sw.ForwardingTable[frame.Source] = srcPort
log.Printf("adding src port %v to forwarding table\n", srcPort)
}
case err := <-errChan:
fmt.Println("case err := <-errChan")
// If the error is not nil, return the error
if err != nil && err != io.EOF {
log.Printf("error from errChan, not io.EOF: %v", err)
return
}
case port := <-portChan:
fmt.Println("case port := <-portChan")
fmt.Printf("case port, port: %v\n", port)
// If the port is not nil, send the frame to the port
if port != nil {
// Check if the queue is not empty
log.Printf("port: %v, sw.SendQueues[port]: %v, len(sw.SendQueues[port]): %v\n", port, sw.SendQueues[port], len(sw.SendQueues[port]))
if len(sw.SendQueues[port]) != 0 {
fmt.Printf("case port, sw.SendQueues[port]: %v, to write frame\n", sw.SendQueues[port])
// If it is not empty, send the frame to the port
// NOTE(review): WriteFrame writes synchronously on this goroutine. If port
// is backed by io.Pipe, the call blocks until something reads the other
// end, and the whole loop (frames, errors, other ports) stalls meanwhile.
size, err := WriteFrame(port, sw.SendQueues[port][0])
fmt.Printf("case port, sw.SendQueues[port]: %v, write frame sized %v finished\n", sw.SendQueues[port], size)
if err != nil {
fmt.Printf("case port, err: %v, write frame error\n", err)
// NOTE(review): this goroutine is also the receiver on errChan in the select
// above; if errChan is unbuffered and has no other receiver, this send
// blocks forever and the return below is never reached — confirm.
errChan <- err
return
}
// Delete the frame from the queue
sw.SendQueues[port] = sw.SendQueues[port][1:]
fmt.Printf("case port, sw.SendQueues[port]: %v, write frame finished\n", sw.SendQueues[port])
}
}
default:
// If there is no frame in the queue, do nothing
//fmt.Println("case dft: No frame in the queue")
// Nothing was ready: back off for 1ms (in addition to the per-iteration
// sleep below).
time.Sleep(time.Millisecond * 1)
}
// Per-iteration pause; applies after every case, not just default.
time.Sleep(time.Millisecond * 1)
}
}()
// Producer goroutine: repeatedly offers every port in sw.Ports on portChan so
// that the receiver of portChan can service each port in turn.
//
// When portChan already holds 9997 or more entries, or there are no ports to
// offer, the goroutine sleeps briefly instead of spinning in a tight loop.
// Without the back-off the outer loop would busy-wait and burn a CPU core
// while it has nothing to send.
go func() {
	for {
		if len(portChan) >= 9997 || len(sw.Ports) == 0 {
			// Nothing to do right now; yield instead of busy-waiting.
			time.Sleep(time.Millisecond)
			continue
		}
		for _, port := range sw.Ports {
			// Send the port to the port channel
			portChan <- port
		}
	}
}()
我试过用 bufio 包装 io.Writer 来给它添加缓冲,但没有帮助。
此外,还有一个测试在运行:它向端口写入帧,帧被读取后推送到 channel。随后 goroutine 从 channel 中取出帧,并将其广播或单播到目标端口。最后,测试从目标端口读取数据。
我期望在 goroutine 中调用 WriteFrame 时,它不会卡住。
补充:下面是测试创建 io.Pipe 的方式——它把 io.Writer 提供给 WriteFrame,并在转发之后使用 io.Reader 从目标端口读取。
// pipedPort is an in-memory port built from two io.Pipe pairs, one per
// direction.
type pipedPort struct {
	// Reader is the read end of the inbound pipe; it yields what was written
	// to Input.
	io.Reader
	// WriteCloser is the write end of the outbound pipe; what is written here
	// becomes readable from Output.
	io.WriteCloser
	// Input is the write end of the inbound pipe.
	Input io.WriteCloser
	// Output is the read end of the outbound pipe.
	Output io.ReadCloser
}

// newPipedPort creates both pipes and wires their four ends into a pipedPort.
func newPipedPort() pipedPort {
	var port pipedPort
	// Inbound direction: Input -> Reader.
	port.Reader, port.Input = io.Pipe()
	// Outbound direction: WriteCloser -> Output.
	port.Output, port.WriteCloser = io.Pipe()
	return port
}
// In test function:
// To forward a frame from ports[1] to ports[0], first write data to ports[1], wait for the goroutine to receive data and forward to ports[0], finally read from ports[0]
// Step 1: hand frameBA4 to the writeFrame helper with ports[1].Input as the target.
log.Printf("testing, writeFrame(t, ports[1].Input: %v, frameBA4: %v)\n", ports[1].Input, frameBA4)
writeFrame(t, ports[1].Input, frameBA4)
// Step 2: fixed 5-second wait. The log text mentions expectSize, but no
// expectSize call appears in this excerpt — only the sleep.
log.Println("testing, expectSize(t, sw.RunSize(), 2)")
time.Sleep(5 * time.Second)
// Step 3: call the expectFrame helper on ports[0].Output with the frame
// that was sent. Nothing reads ports[0].Output before this point in the excerpt.
log.Println("testing, expectFrame(t, ports[0].Output, &frameBA4)")
expectFrame(t, ports[0].Output, &frameBA4)