golang的通道问题(缓冲与非缓冲)。

问题描述 投票:0回答:2

我正在做一个个人项目,它将在Raspberry Pi上运行,并连接一些传感器。

它由2个程序组成。

  1. 一个服务器,每隔X秒从传感器上读取数据
  2. 一个客户端,可以将数据保存在sqlite数据库中,并可以发送一些命令。

服务器可以。

  • 从传感器中读取数据,并将数据写入套接字,这样客户端就可以将数据保存在数据库中了。
  • 监听套接字,这样当客户端发送一些命令时,它就可以执行并将响应发送回客户端。

从传感器读取数据的函数和处理socket连接的函数是在不同的goroutine中执行的,所以,为了在从传感器读取数据时向socket发送数据,我在主函数中创建了一个[]字节通道,并将其传递给goroutine。

当数据被收集从传感器(如果有一个客户端连接)被写入通道,所以另一个函数写到套接字和客户端接收它。

我的问题就出在这里:如果我连续进行多次写入,只有第一个数据到达客户端,而其他数据没有。但是如果我在通道上写的函数中加了一点time.sleep,所有的数据都会正确地到达客户端。

总之,这就是这个小程序的简化版。

package main

import (
    "net"
    "os"
    "sync"
    "time"
)

const socketName string = "./test_socket"

// create to the socket and launch the accept client routine
func launchServerUDS(ch chan []byte) {
    if err := os.RemoveAll(socketName); err != nil {
        return
    }
    l, err := net.Listen("unix", socketName)
    if err != nil {
        return
    }
    go acceptConnectionRoutine(l, ch)
}

// accept incoming connection on the socket and
// 1) launch the routine to handle commands from the client
// 2) launch the routine to send data when the server reads from the sensors
func acceptConnectionRoutine(l net.Listener, ch chan []byte) {
    defer l.Close()
    for {
        conn, err := l.Accept()
        if err != nil {
            return
        }
        go commandsHandlerRoutine(conn, ch)
        go autoSendRoutine(conn, ch)

    }
}

// routine that sends data to the client
func autoSendRoutine(c net.Conn, ch chan []byte) {
    for {
        data := <-ch
        if string(data) == "exit" {
            return
        }
        c.Write(data)
    }
}

// handle client connection and calls functions to execute commands
func commandsHandlerRoutine(c net.Conn, ch chan []byte) {
    for {
        buf := make([]byte, 1024)
        n, err := c.Read(buf)
        if err != nil {
            ch <- []byte("exit")
            break
        }
        // now, for sake of simplicity , only echo commands back to the client
        _, err = c.Write(buf[:n])
        if err != nil {
            ch <- []byte("exit")
            break
        }
    }
}

// write on the channel to the autosend routine so the data are written on the socket
func sendDataToClient(data []byte, ch chan []byte) {
    select {
    case ch <- data:
        // if i put a little sleep here, no problems
        // i i remove the sleep, only data1 is sent to the client
        // time.Sleep(1 * time.Millisecond)
    default:
    }
}

func dummyReadDataRoutine(ch chan []byte) {
    for {
        // read data from the sensors every 5 seconds
        time.Sleep(5 * time.Second)
        // read first data and send it
        sendDataToClient([]byte("dummy data1\n"), ch)
        // read second data and send it
        sendDataToClient([]byte("dummy data2\n"), ch)
        // read third data and send it
        sendDataToClient([]byte("dummy data3\n"), ch)
    }
}

func main() {
    ch := make(chan []byte)
    wg := sync.WaitGroup{}
    wg.Add(2)
    go dummyReadDataRoutine(ch)
    go launchServerUDS(ch)
    wg.Wait()
}

我想我错过了一些东西,我不想使用睡眠来进行写入,因为我不认为这是正确的方法。是否有一些错误,或者有更好的方法来做到这一点?唯一要像我一样保留的是,socket处理和读取数据函数必须在不同的goroutine中执行。

sockets go concurrency channels writing
2个回答
1
投票

主要的问题是在功能。

func sendDataToClient(data []byte, ch chan []byte) {
    select {
    case ch <- data:
        // if I put a little sleep here, no problems
        // if I remove the sleep, only data1 is sent to the client
        // time.Sleep(1 * time.Millisecond)
    default:
}

如果通道... ch 在函数被调用的时候还没有准备好,那么 default 将采取的情况和 data 将永远不会被发送。在这种情况下,你应该取消这个函数,直接发送到通道上。

缓冲通道与当前的问题是正交的,应该与缓冲IO的原因类似,即为不能立即进展的写入提供一个 "缓冲区"。如果代码在没有缓冲区的情况下无法进展,增加一个缓冲区只会延迟可能的死锁。

你也不需要 exit 这里的哨兵值,因为你可以在通道上划定范围,并在完成后关闭它。然而这仍然会忽略写入错误,但这又需要重新设计。

for data := range ch {
    c.Write(data)
}

你也应该小心地在通道上传递切片,因为很容易失去跟踪哪个逻辑进程拥有所有权并要修改后备数组。我无法从给出的信息中判断通过通道传递读+写数据是否改善了架构,但这不是你在大多数go网络代码中能找到的模式。


0
投票

JimB给出了很好的解释,所以我认为他的答案是比较好的。

我在这个答案中加入了我的部分解决方案。

我是觉得我的代码很清晰,很简化,但正如Jim所说,我可以做得更简单,更清晰。我把我以前的代码贴出来,是为了让大家更好地理解你如何能贴出更简单的代码,而不是像我这样做得一塌糊涂。

正如chmike所说,我的问题并不像我想的那样与socket有关,而只是与通道有关。在无缓冲通道上写是问题之一。在把无缓冲通道改成有缓冲的通道后,问题就解决了。总之,这段代码不是 "好代码",可以按照JimB在回答中写的原则进行改进。

所以这里是新的代码。

package main

import (
    "net"
    "os"
    "sync"
    "time"
)

const socketName string = "./test_socket"

// create the socket and accept clients connections
func launchServerUDS(ch chan []byte, wg *sync.WaitGroup) {
    defer wg.Done()
    if err := os.RemoveAll(socketName); err != nil {
        return
    }
    l, err := net.Listen("unix", socketName)
    if err != nil {
        return
    }
    defer l.Close()
    for {
        conn, err := l.Accept()
        if err != nil {
            return
        }
        // this goroutine are launched when a client is connected
        // routine that listen and echo commands
        go commandsHandlerRoutine(conn, ch)
        // routine to send data read from the sensors to the client
        go autoSendRoutine(conn, ch)
    }
}

// routine that sends data to the client
func autoSendRoutine(c net.Conn, ch chan []byte) {
    for {
        data := <-ch
        if string(data) == "exit" {
            return
        }
        c.Write(data)
    }
}

// handle commands received from the client
func commandsHandlerRoutine(c net.Conn, ch chan []byte) {
    for {
        buf := make([]byte, 1024)
        n, err := c.Read(buf)
        if err != nil {
            // if i can't read send an exit command to autoSendRoutine and exit
            ch <- []byte("exit")
            break
        }
        // now, for sake of simplicity , only echo commands back to the client
        _, err = c.Write(buf[:n])
        if err != nil {
            // if i can't write back send an exit command to autoSendRoutine and exit
            ch <- []byte("exit")
            break
        }
    }
}

// this goroutine reads from the sensors and write to the channel , so data are sent
// to the client if a client is connected
func dummyReadDataRoutine(ch chan []byte, wg *sync.WaitGroup) {
    x := 0
    for x < 100 {
        // read data from the sensors every 5 seconds
        time.Sleep(1 * time.Second)
        // read first data and send it
        ch <- []byte("data1\n")
        // read second data and send it
        ch <- []byte("data2\n")
        // read third data and send it
        ch <- []byte("data3\n")
        x++
    }
    wg.Done()
}


func main() {
    // create a BUFFERED CHANNEL
    ch := make(chan []byte, 1)
    wg := sync.WaitGroup{}
    wg.Add(2)
    // launch the goruotines that handle the socket connections
    // and read data from the sensors
    go dummyReadDataRoutine(ch, &wg)
    go launchServerUDS(ch, &wg)
    wg.Wait()
}

© www.soinside.com 2019 - 2024. All rights reserved.