使用zmq的定时器

问题描述 投票:0回答:1

我正在做一个项目,我必须使用zmq_poll。但是我并没有完全理解它的作用。

所以我也尝试着去实现它。

zmq_pollitem_t timer_open(void){

  zmq_pollitem_t items[1];


    if( items[0].socket  == nullptr ){
         printf("error socket %s: %s\n", zmq_strerror(zmq_errno()));
         return;        
    }
else{
    items[0].socket = gsock; 
} 


items[0].fd = -1;   
items[0].events = ZMQ_POLLIN;  


 // get a timer
items[0].fd  = timerfd_create( CLOCK_REALTIME, 0 );
    if( items[0].fd  == -1 )
    {
    printf("timerfd_create() failed: errno=%d\n", errno);
            items[0].socket  = nullptr;

            return;
    }

int rc = zmq_poll(items,1,-1);

if(rc == -1){
    printf("error poll %s: %s\n", zmq_strerror(zmq_errno()));
    return;
} 
else
     return items[0];
}

我对这个话题很陌生, 我必须修改一个现有的项目, 用zmq的函数来替换. 在其他网站上我看到了一些例子,他们在一个无限循环中使用两个项目和zmq_poll函数。我看了文档,但还是不能正确理解这是如何工作的。而这是我实现的另外两个函数。我不知道这样的实现方式是否正确。

   void timer_set(zmq_pollitem_t items[] , long msec, ipc_timer_mode_t mode ) {


    struct itimerspec t;

    ...

    timerfd_settime( items[0].fd , 0, &t, NULL );

}


void timer_close(zmq_pollitem_t items[]){

if( items[0].fd != -1 )
       close(items[0].fd);

items[0].socket = nullptr; 

}

我不确定是否需要zmq_poll函数 因为我使用的是定时器

EDIT:

void some_function_timer_example() {
   // We want to wait on two timers
   zmq_pollitem_t items[2] ;

   // Setup first timer
   ipc_timer_open_(&items[0]);
   ipc_timer_set_(&items[0], 1000, IPC_TIMER_ONE_SHOT);
   // Setup second timer
   ipc_timer_open_(&items[1]);
   ipc_timer_set_(&items[1], 1000, IPC_TIMER_ONE_SHOT);

   // Now wait for the timers in a loop
   while (1) {
        //ipc_timer_set_(&items[0], 1000, IPC_TIMER_REPEAT);
        //ipc_timer_set_(&items[1], 5000, IPC_TIMER_REPEAT);

      int rc = zmq_poll (items, 2, -1);
      assert (rc >= 0); /* Returned events will be stored in items[].revents */

        if (items [0].revents & ZMQ_POLLIN) {
            //  Process task
            std::cout << "revents: 1" << std::endl;
        }
        if (items [1].revents & ZMQ_POLLIN) {
            //  Process weather update

            std::cout << "revents: 2" << std::endl;

        }
   }
}

现在它仍然非常快,而且没有等待。只有在开始的时候,它还在等待。而当timer_set在循环里面的时候,它就会正常等待,只有当等待时间相同的时候才会等待。ipc_timer_set(&items[1], 1000,...) and ipctimer_set(&items[0], 1000,...)

那我该如何改变这种情况?或者这就是正确的行为?

c++ c sockets timer zeromq
1个回答
0
投票

zmq_poll 工作原理和select一样,但是它允许一些额外的东西。例如,你可以选择普通的同步文件描述符,也可以选择特殊的异步套接字。

在你的例子中,你可以使用你已经尝试过的定时器fd,但是你需要做一些小的改变。

首先你必须考虑如何调用这些定时器。我想使用的情况是,如果你想创建多个定时器并等待它们。这将是典型的函数在yuor当前的代码,可能是使用循环的定时器(无论是使用select()或任何其他他们可能做).它将是这样的东西。

void some_function() {
   // We want to wait on two timers
   zmq_pollitem items[2];

   // Setup first timer
   ipc_timer_open(&item[0]);
   ipc_timer_set(&item[0], 1000, IPC_TIMER_ONE_REPEAT);
   // Setup second timer
   ipc_timer_open(&item[1]);
   ipc_timer_set(&item[1], 5000, IPC_TIMER_ONE_SHOT);

   // Now wait for the timers in a loop
   while (1) {
      int rc = zmq_poll (items, 2, -1);
      assert (rc >= 0); /* Returned events will be stored in items[].revents */
   }
}

现在, 你需要修正ipc_timer_open. 这将是非常简单的 - 只需创建定时器fd。

// Takes a pointer to pre-allocated zmq_pollitem_t and returns 0 for success, -1 for error
int ipc_timer_open(zmq_pollitem_t *items){
    items[0].socket = NULL; 
    items[0].events = ZMQ_POLLIN;  
    // get a timer
    items[0].fd  = timerfd_create( CLOCK_REALTIME, 0 );
    if( items[0].fd  == -1 )
    {
        printf("timerfd_create() failed: errno=%d\n", errno);
        return -1; // error
    }
    return 0;
}

编辑: 添加作为评论的回复,因为这很长:来自文档。If both socket and fd are set in a single zmq_pollitem_t, the ØMQ socket referenced by socket shall take precedence and the value of fd shall be ignored.

所以如果你要传递fd, 你必须把socket设置为NULL. 我甚至不清楚在哪里 gsock 是来自。这是在文档中吗?我找不到。

还有什么时候能脱离while(1)循环?

这是应用逻辑,你要按照你的要求来编码。zmq_poll只是每当一个定时器打到的时候,就会一直返回。在这个例子中,每隔一秒,zmq_poll都会返回,因为第一个定时器(这是一个重复)一直在触发。但是在5秒的时候,它也会因为第二个定时器(这是一个单发)而返回。它由你决定何时退出循环。你想让这个循环无限地进行下去吗?你需要检查不同的条件来退出循环吗?你想这样做100次,然后返回吗?你可以在这段代码上面写任何你想要的逻辑。

那么返回的是什么样的事件呢?

ZMQ_POLLIN,因为定时器fs表现得像可读文件描述符。

© www.soinside.com 2019 - 2024. All rights reserved.