ZeroMQ:使用ZMQ_STREAM获取客户端消息

问题描述 投票:0回答:2

我想使用ZeroMQ的ZMQ_STREAM套接字访问从TCP对等方收到的消息。在下面的C语言示例中,msg字符串似乎为空:

/* http.c */
#include <stdio.h>
#include <string.h>
#include <zmq.h>

int main(int argc, char* argv[])
{
    void *ctx    = zmq_ctx_new();
    void *socket = zmq_socket(ctx, ZMQ_STREAM);
    int rc       = zmq_bind(socket, "tcp://127.0.0.1:8080");

    uint8_t id[256];
    size_t  id_size = 256;

    char    msg[256];
    size_t  msg_size = 256;

    char http_response[] =
        "HTTP/1.0 200 OK\r\n"
        "Content-Type: text/html\r\n"
        "\r\n"
        "<h1>Hello, World!</h1>";

    while (1)
    {
        id_size = zmq_recv(socket, id, 256, 0);

        msg_size = zmq_recv(socket, msg, sizeof(msg), 0);
        msg[msg_size] = '\0';
        printf("REQUEST: %s\n", msg);

        zmq_send(socket, id, id_size, ZMQ_SNDMORE);
        zmq_send(socket, http_response, strlen(http_response), ZMQ_SNDMORE);

        zmq_send(socket, id, id_size, ZMQ_SNDMORE);
        zmq_send(socket, 0, 0, ZMQ_SNDMORE);
    }

    zmq_close(socket);
    zmq_ctx_destroy(ctx);

    return 0;
}

编译和执行:

$ cc -Wall -I/usr/local/include/ -L/usr/local/lib/ -o http http.c -lzmq
$ ./http
REQUEST:
REQUEST:

我无法使用CZMQ API,因为我必须依靠FFI才能进入libzmq

c zeromq
2个回答
0
投票

模拟代码有一个警告-它不遵守已发布的API,因为它从不向任何同位体发送单个消息(因为曾经丢失了未标记SNDMORE的帧)+更好的.bind()到了现有的,可访问的TCP地址(不是127.0.0.1上的模拟抽象回送接口),否则将没有外部对等方能够初始化通信并将任何请求仅传递给下一个进程raw[]或无限[ C0]-循环:


0
投票

问题仅仅是对ZeroMQ消息格式的错误处理。每次客户端连接时,都会首先发送一个空消息... #define FOREVER 1 int main( int argc, char* argv[] ) { void *ctx = zmq_ctx_new (); assert ( ctx && "FAILED to instantiate a ZeroMQ Context" ); void *socket = zmq_socket ( ctx, ZMQ_STREAM ); assert ( socket && "FAILED to instantiate a STREAM Socket" ) ; int rc = zmq_bind (socket, "tcp://*:8080"); assert ( rc == 0 && "FAILED to .bind on <tcp>://address:port>" ); size_t id_size = 256; uint8_t id [id_size]; size_t raw_size = 256; uint8_t raw [raw_size]; while ( FOREVER ) { /* Get HTTP request; ID frame and then PAYLOAD frame ........................*/ id_size = zmq_recv ( socket, id, 256, 0 ); assert ( id_size > 0 && "FAILED to .recv() an ID frame" ); do { raw_size = zmq_recv (socket, raw, 256, 0 ); assert ( raw_size >= 0 && "FAILED to .recv() a PAYLOAD frame" ); } while ( raw_size == 256 ); /* consumes the whole PAYLOAD frame till the last Byte ......................*/ char http_response [] = "HTTP/1.0 200 OK\r\n" "Content-Type: text/plain\r\n" "\r\n" "Hello, World!"; //---------------------------------------------------------------------------------------------------------------------- zmq_send ( socket, id, //......................... ID to .send() a PAYLOAD towards id_size, ZMQ_SNDMORE ); //.............. flag == _SNDMORE zmq_send ( socket, http_response, //.............. PAYLOAD content loaded strlen ( http_response), 0 ); //........................ flag == 0 i.e. LAST FRAME ... == CAN .send() THE MESSAGE AS A WHOLE //---------------------------------------------------------------------------------------------------------------------- zmq_send ( socket, id, //......................... ID to .send() a PAYLOAD towards id_size, ZMQ_SNDMORE ); //.............. flag == _SNDMORE zmq_send ( socket, 0, //.......................... 0 0, //.......................... 0 == a ZeroSized _STREAM PAYLOAD 0 ); //........................ flag == 0 i.e. LAST FRAME ... ZeroSized _STREAM PAYLOAD == .close() //---------------------------------------------------------------------------------------------------------------------- } zmq_close ( socket ); zmq_ctx_destroy ( ctx ); return( 0 ); } (没有有效负载)。以下消息[id, ]的有效负载中包含HTTP GET请求字符串。

© www.soinside.com 2019 - 2024. All rights reserved.