为什么ZeroMQ PUSH / PULL起作用,而PUB / SUB无效?

问题描述 投票:1回答:1

[环境:Jetson开发板上的NVIDIA风格的Ubuntu 18.01和TX2i处理器。 ZMQ 4.3.2,将cppzmq C ++包装器用于ZMQ。

我在Google协议缓冲区和ZeroMQ上运行了很多代码,全部都是PUSH / PULL,它工作正常,除了我遇到的不是点对点的情况,而是1:3 。正确的解决方案是执行PUB / SUB,但我无法将消息传递给订户。

我将代码简化为这个简单的示例。如果取消注释#define语句,订阅者将一无所获。注释(编译为PUSH / PULL而不是PUB / SUB),然后订阅者按预期方式获得消息。在sleep_for()时间过多的情况下,我希望订阅者在发布者执行发送之前有足够的时间进行注册。

编辑:

为什么要在订阅者上尝试/抓住?我很早就遇到了例外,并认为这是因为发布者尚未准备好。似乎不再是这种情况了,所以不是我想的那样。

// Publisher
#include "/usr/local/include/zmq.hpp"
#include "protobuf_namespace.pb.h"
#include <chrono>
#include <thread>


#define PUB_SUB

int main( void )
{
  zmq::context_t* m_pContext = new zmq::context_t( 1 );

#ifdef PUB_SUB
  zmq::socket_t*  m_pSocket  = new zmq::socket_t( *m_pContext, ZMQ_PUB );
#else
  zmq::socket_t*  m_pSocket  = new zmq::socket_t( *m_pContext, ZMQ_PUSH );
#endif

  std::this_thread::sleep_for( std::chrono::seconds( 1 ) );
  //m_pSocket->bind( "tcp://*:53001" );       // using '*' or specific IP doesn't change result
  m_pSocket->bind( "tcp://127.0.0.1:53001" );
  std::this_thread::sleep_for( std::chrono::seconds( 1 ) );

  // Send the parameters
  protobuf_namespace::Params params;
  params.set_calibrationdata( protobuf_namespace::CalDataType::CAL_REQUESTED ); // init one value to non-zero
  std::string        params_str = params.SerializeAsString();
  zmq::message_t     zmsg( params_str.size() );

  memcpy( zmsg.data(), params_str.c_str(), params_str.size() );
  m_pSocket->send( zmsg, zmq::send_flags::none );

  std::this_thread::sleep_for( std::chrono::seconds( 1 ) );
  m_pSocket->close();
  zmq_ctx_destroy( m_pContext );
}
// Subscriber - start me first!
#include "/usr/local/include/zmq.hpp"
#include "protobuf_namespace.pb.h"
#include <chrono>
#include <thread>
#include <stdio.h>

#define PUB_SUB


int main( void )
{
  zmq::context_t* m_pContext = new zmq::context_t( 1 );

#ifdef PUB_SUB
  zmq::socket_t*  m_pSocket  = new zmq::socket_t( *m_pContext, ZMQ_SUB );
  m_pSocket->connect( "tcp://127.0.0.1:53001" );

  int linger = 0;
  zmq_setsockopt( m_pSocket, ZMQ_LINGER, &linger, sizeof( linger ) );
  zmq_setsockopt( m_pSocket, ZMQ_SUBSCRIBE, "", 0 );
#else
  zmq::socket_t*  m_pSocket  = new zmq::socket_t( *m_pContext, ZMQ_PULL );
  m_pSocket->connect( "tcp://127.0.0.1:53001" );
#endif

  protobuf_namespace::Params params;
  zmq::message_t zmsg;
  bool retry = true;

  do {
    try {
      m_pSocket->recv( zmsg, zmq::recv_flags::none );
      retry = false;
      std::this_thread::sleep_for( std::chrono::seconds( 1 ) );
    } catch( ... ) { 
      printf("caught\n");
    }
    std::this_thread::sleep_for( std::chrono::seconds( 1 ) );
  } while( retry );

  std::string param_str( static_cast<char*>( zmsg.data() ), zmsg.size() );
  params.ParseFromString( param_str );

  if( params.calibrationdata() == protobuf_namespace::CalDataType::CAL_REQUESTED )
    printf( "CAL_REQUESTED\n" );
  else
    printf( "bad data\n" );


  std::this_thread::sleep_for( std::chrono::seconds( 1 ) );
  m_pSocket->close();
  zmq_ctx_destroy( m_pContext );
}
c++ protocol-buffers zeromq publish-subscribe
1个回答
0
投票

如果有人从未使用ZeroMQ,在深入探讨更多细节之前,可以先看一下[ZeroMQ Principles in less than Five Seconds]

Q

为什么

在订阅服务器上的尝试/捕获?

因为:a]

// Subscriber - start me first!并且在进行PUB传输类路径设置以接受其中的任何第一个tcp://之前,.connect()侧几乎“永远”休眠.bind(),这里先是小睡... std::this_thread::sleep_for( std::chrono::seconds( 1 ) );

b]根据定义,trym_pSocket->recv( zmsg, zmq::recv_flags::none );必须引发异常,因为到目前为止尚无tcp://传输类路径设置(因为PUB端尚未返回从睡眠中)

Q为什么ZeroMQ PUSH / PULL起作用但PUB / SUB无效?]

嗯,两者都会,如果设计正确,则尊重发布的API。

只需删除任何阻塞的sleep() -s,阻止SUB -s加入,使.connect()-能够尽快成功。另外,也许会进入.recv() -ops的非*阻塞形式(重构try/catch),这在中很常见,以更好地反映基于预防性设计的基于.poll()或基于反应性.recv(..., ZMQ_NOBLOCK )的性质事件处理。


最后,但并非最不重要:

ZeroMQ v4 +(与v2 +和pre-v3。?API相反)切换为使用PUB

端消息过滤,因此应适当考虑订阅管理(定时/错误处理/弹性)必须发生。

如有疑问,可以集成使用ZeroMQ内置的socket_monitor工具,扩展Context()实例,并跟踪/检查Context()内部的每个事件-例如,在已发布的API事件下,一直到最低的详细程度。

不要犹豫read and ask more

© www.soinside.com 2019 - 2024. All rights reserved.