Go: 用zmq4使用gob

问题描述 投票:0回答:1

我想用gob和一个... zmq4 插座 zmq4). A zmq4 socket没有io设备,这使得gob似乎无法直接读写。

我无法使用 &client ( type **zmq4.Socket ) 为型 io.Writer 以此来 gob.NewEncoder: zmq4.Socket 不执行 io.Writer (缺少写法)

zmq4 发送功能,该 SendMessage(),接受一个 interface{}所以我就用它来发送。

在服务器端 zmq4 接收函数返回的是一个 string, []byte, []string[][]byte. 我用的是 RecvMessage() 其中返回一个 []string.

写信给一个人是可以的 bytes.Buffer,发送该缓冲区,将其读取为一个 []string 然后把消息的内容部分用gob处理。虽然目前的问题在于将该 []stringbytes.Buffer 为戈壁能够 io.Read 从它。听起来很基本,但我试过很多方法,至今都没有成功。这里是目前的一个。问题显然是与gob产生的"额外数据"当数据在发送和接收前和接收后似乎是一样的时候,就像 print 报表在显示。

有没有更简单、更走心的方法?如果你有 zmq4,下面的代码是自成一体的,应该会执行。

package main

import (
    "bytes"
    "encoding/gob"
    "fmt"
    "sync"
    "time"

    zmq "github.com/pebbe/zmq4"
)

type LogEntry struct {
    ErrID  int
    Name   string
    Level  string
    LogStr string
}

// The client task
// --------------------------------------------------------------
func client_task(s string) {
    var mu sync.Mutex
    client, _ := zmq.NewSocket(zmq.DEALER)
    defer client.Close()
    client.SetIdentity(s)
    client.Connect("tcp://localhost:5570")

    go func() {

        aLogEntry := &LogEntry{
            ErrID:  1,
            Name:   "Client",
            LogStr: "Log msg sent",
        }

        for request_nbr := 1; true; request_nbr++ {

            var network bytes.Buffer
            enc := gob.NewEncoder(&network)
            err := enc.Encode(aLogEntry)
            if err != nil {
                fmt.Println("encode error:", err)
            }

            // Early decode test - this will influence subsequent gob
            // behaviour so leave commented when caring about the sent
            // data
            // dec := gob.NewDecoder(&network)
            // var aLogEntry2 *LogEntry
            // err = dec.Decode(&aLogEntry2)
            // if err != nil {
            //  fmt.Printf("client_task(DECODE ERROR) : %s\n\n", err)
            // }
            // fmt.Printf("client_task(TEST DECODE) %+v\n\n", aLogEntry)

            mu.Lock()
            // Replaced length by bytes Buffer method: 91
            fmt.Printf("client_task(len) : %d\n\n", network.Len())
            fmt.Printf("client_task(network) : %v\n\n", network)

            client.SendMessage(network, 0)
            mu.Unlock()
            time.Sleep(5 * time.Second)
        }
    }()

    // pause to allow server
    for {
        time.Sleep(100 * time.Millisecond)
    }
}

// The server task
// --------------------------------------------------------------
func server_task() {

    frontend, _ := zmq.NewSocket(zmq.ROUTER)
    defer frontend.Close()
    frontend.Bind("tcp://*:5570")

    for {
        msg, _ := frontend.RecvMessage(0)
        // Added error reporting - does not report any error
        // err does never get filled in here, never an error could get reported here
        if err != nil {
            fmt.Printf("RECV ERROR: %s", err)
        }

        // using WriteString to write the content portion of the
        // received message to the bytes.Buffer for gob to process
        var network bytes.Buffer
        dec := gob.NewDecoder(&network)
        network.WriteString(msg[1])

        // Added length of the bytes Buffer: 285
        // Before sending the bytes Buffer is: 91
        // More than just msg[1] is written into the buffer ?
        fmt.Printf("server_task(len): %d\n\n", network.Len())
        fmt.Printf("server_task(msg[1]) : %s\n\n", msg[1])

        var aLogEntry *LogEntry
        err := dec.Decode(&aLogEntry)
        if err != nil {
            fmt.Printf("server_task(DECODE ERROR) : %s\n\n", err)
        }

        fmt.Printf("server_task(aLogEntry) %+v\n\n", aLogEntry)
    }
}

func main() {
    defer fmt.Println("main() done")

    go client_task("1")
    go server_task()

    //  Run for 5 seconds then quit
    time.Sleep(5 * time.Second)
}

打印语句显示,在客户端。

client_task(network) : {[62 255 129 3 1 1 8 76 111 103 69 110 116 114 121 1 255 130 0 1 4 1 5 69 114 114 73 68 1 4 0 1 4 78 97 109 101 1 12 0 1 5 76 101 118 101 108 1 12 0 1 6 76 111 103 83 116 114 1 12 0 0 0 27 255 130 1 2 1 6 67 108 105 101 110 116 2 12 76 111 103 32 109 115 103 32 115 101 110 116 0] 0 0}

在服务器端

server_task(msg[1]) : {[62 255 129 3 1 1 8 76 111 103 69 110 116 114 121 1 255 130 0 1 4 1 5 69 114 114 73 68 1 4 0 1 4 78 97 109 101 1 12 0 1 5 76 101 118 101 108 1 12 0 1 6 76 111 103 83 116 114 1 12 0 0 0 27 255 130 1 2 1 6 67 108 105 101 110 116 2 12 76 111 103 32 109 115 103 32 115 101 110 116 0] 0 0}

似乎也差不多

结果是

server_task(DECODE ERROR) : extra data in buffer
server_task(aLogEntry) <nil>
go zeromq
1个回答
0
投票

欢迎来到 "零禅艺术"。


如果你从来没有使用过 ZeroMQ在这里,我们可以先看看 "ZeroMQ 原则 不到 五秒" 刨根问底


OBSERVATION :

ZeroMQ native API定义了这个属性。

当接收消息时,一个 ZMQ_ROUTER 插座应 预留信息 包含发端对等体到消息的路由ID 然后再传给应用程序.


解决。

要不就可以开始使用正确的-enough PUSH/PULL 可扩展的形式原型,而不是(对你的用例来说是过度主义的)。DEALER/ROUTER或可能依赖于您的假设。ROUTER-节点永远不会 .RecvMessage( 0 ) 与其他内部多部分结构,但只有一个,匹配的模板是[ <routing_id> | <[network]-payload> [ | ... ] ] 这不可能是有力的保证吧?

虽然不确定,但怎么 鹅卵石's zmq4 ZeroMQ的go-wrapper试图实现或不实现所有的native-API功能,或处理潜在的差异。(自动的,没有任何用户级的应用干预?) 读取所有的多部分组件& 处理ot-once和或NULL结尾的字符串的处理,这些处理可能会导致内部冲突。.Decode()-方法。

最后,如果你的代码依赖于 msg[1] 如果我没有忽略一些低级别的黑客,我没有意识到,我没有看到明确的处理案例,当发起方( DEALER )并没有向消费者一方传递任何这样的新信息(the ROUTER ),但 .RecvMessage( 0 )-方法填写 msg 并进行(空 msg )的方向。.Decode()-方法,由于显而易见的原因,它必须在空的或不完整的情况下失败。msg,一定不能吗?

我一定会从 PUSH/PULL 替换,它将不会在交付侧注入预先添加的、现在是多帧的成分,与 routing_id 及相关风险。

非阻塞模式的回报来自 .RecvMessage()-方法在填充时仍会发生碰撞 msg 如果没有待处理的消息在里面等待,则会有空数据。PULL-RxQueue-buffers,这仍应恐慌的。.Decode()-方法。

如果 .RecvMessage( 0 )-方法调用实际上表现出阻塞模式的接收,如果ZeroMQ要完全排除在错误的根源分析之外,就应该在错误状态的检测和处理上更加谨慎。更多的自我防御 .setsockopt()-设置( ZMQ_LINGER 和许多其他)的所有部署的ZeroMQ资源也将提高健壮性&容易出错的水平,即对于崩溃的应用程序可能会造成任何伤害的情况下,在生产。

你可以尝试重现这个错误。这里 不幸的是,IDE 遗漏了 ZeroMQ 的部分。

Golang.org 场地 也没有

go: finding module for package github.com/pebbe/zmq4
go: downloading github.com/pebbe/zmq4 v1.2.0
go: found github.com/pebbe/zmq4 in github.com/pebbe/zmq4 v1.2.0
# pkg-config --cflags  -- libzmq
pkg-config: exec: "pkg-config": executable file not found in $PATH

Go build failed.
© www.soinside.com 2019 - 2024. All rights reserved.