如何使用io_uring同步线程?

问题描述 投票:0回答:1

我是系统编程新手,正在尝试

io_uring
。我开始设计一个网络程序,并确定了一些 CPU 密集型工作,我认为这些工作应该卸载到线程池。不过,我不确定将其与环线程同步的方法以及相关的权衡。

我想到的第一个解决方案是用

pthread_mutex
pthread_cond
锁定的结构(例如队列)。这似乎不合适,因为我怀疑
io_uring_enter
pthread_cond_wait
是否足够频繁地返回,并且在正确的条件下生活在同一个循环中。即使他们这样做了,在这个热循环中引入另一个系统调用似乎也很笨拙。

我当前的设计涉及一对在环和线程池之间共享的文件描述符:每个方向一个。由于只有一个进程,因此指针适合作为描述符上的消息传递。如果读取和写入是原子的,则描述符提供同步:仅例如池端需要

read
/
write
调用,另一端需要常规
io_uring
操作,无需锁定这些操作或指针底层的内存。

我也知道

IORING_OP_MSG_RING
,但由于 CPU 密集型工作可以完全分离,我宁愿在之后立即在(单个)环上安排任何后续 IO。

  • 这种方法有什么明显的问题吗?
  • 我认为匿名管道是最适合在这里使用的描述符类型吗?
  • PIPE_BUF
    限制是原子性的唯一条件吗?
  • 还有其他我没有考虑的方法吗?这是性能最好的吗?
c linux pthreads io-uring
1个回答
0
投票

我从 Jens Axboe 中找到了两段摘录,名为

IORING_OP_MSG_RING
,专门作为这个问题的解决方案。

来自 2022 年内核食谱演讲的幻灯片(幻灯片 31):

[IORING_OP_MSG_RING] 用于在之间传递例如工作项指针 每个线程都有自己的环

来自 liburing GitHub 页面上的(当前,唯一)wiki 文章 io_uring 和 2023 年网络

一个用例可能是处理新连接和单独的后端 处理所述连接的线程,提供了一种传递 从一个环到另一个环的连接。 io_uring_prep_msg_ring() 是一种方法 设立这样的 SQE。或者可以直接从线程中使用 处理给定的连接,将昂贵的工作转移给另一个连接 线程。

根据我的理解,这是这种模式的演示:

#define _GNU_SOURCE // for gettid()
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <liburing.h>

typedef struct io_uring        io_uring;
typedef struct io_uring_params io_uring_params;
typedef struct io_uring_sqe    io_uring_sqe;
typedef struct io_uring_cqe    io_uring_cqe;

#define MQSIZE    64 // size of main-thread (work-sending) queue
#define TQSIZE    32 // size of work-thread (work-receiving) queue
#define NJOBS     32 // number of jobs
#define NTHREADS  4  // number of threads

// represents some unit of work
typedef struct job_unit {
  int input;
  int output;
  int done;
} job_unit;

void * work ( void * arg ) {
  io_uring ring;
  io_uring_sqe * sqe;
  io_uring_cqe * cqe;
  int root_fd = *( ( int * ) arg );
  int tid = gettid();

  io_uring_params params = {};

  // since each thread has a ring, only this thread will touch this ring
  params.flags |= IORING_SETUP_SINGLE_ISSUER;

  // the work-thread should process each incoming job sequentially --
  // there's probably little use in having queued jobs available, e.g.
  // between context switches
  // I'm don't feel confident in my understanding of these switches
  // (it is noted that the computation here is negligible)
  params.flags |= IORING_SETUP_COOP_TASKRUN;
  params.flags |= IORING_SETUP_DEFER_TASKRUN;

  // "share async backend", but not SQ/CQ
  params.flags |= IORING_SETUP_ATTACH_WQ;
  params.wq_fd = root_fd;

  assert( 0 == io_uring_queue_init_params( TQSIZE, &ring, &params ) );
  
  // tell main-thread ring about this one by sending its fd
  // IOSQE_CQE_SKIP_SUCCESS helps reduce bookkeeping by eliding the completion
  assert( ( sqe = io_uring_get_sqe( &ring ) ) );
  io_uring_prep_msg_ring( sqe, root_fd, 0, ring.ring_fd, 0 );
  io_uring_sqe_set_flags( sqe, IOSQE_CQE_SKIP_SUCCESS );
  io_uring_submit( &ring );

  while ( 1 ) {
    assert( 0 == io_uring_wait_cqe( &ring, &cqe ) );

    // stop signal: terminate thread
    if ( !cqe->user_data ) {
      io_uring_cqe_seen( &ring, cqe );
      break;
    }

    // mutate the struct in some interesting way
    job_unit * job = ( job_unit * ) cqe->user_data;
    printf( "thread #%d assigned job #%d\n", tid, job->input );
    job->output = job->input * 2; // the "work"
    job->done = 1;

    // send the pointer back
    assert( ( sqe = io_uring_get_sqe( &ring ) ) );
    io_uring_prep_msg_ring( sqe, root_fd, 0, ( uint64_t ) job, 0 );
    io_uring_sqe_set_flags( sqe, IOSQE_CQE_SKIP_SUCCESS );
    io_uring_cqe_seen( &ring, cqe );
    io_uring_submit( &ring );
  }

  printf( "thread #%d exit\n", tid );
  io_uring_queue_exit( &ring );
  return NULL;
}

int main () {
  io_uring       ring;
  io_uring_sqe * sqe = NULL;
  io_uring_cqe * cqe = NULL;
  io_uring_params params = {};
  pthread_t      threads[ NTHREADS ] = {};
  job_unit       jobs[ NJOBS ] = {};        // would usually be from heap
  int            work_fds[ NTHREADS ] = {}; // work-thread ring FDs
  int            work_fd_num = 0;           // dual iter/counter for work_fds
  int i;                                    // loop var

  params.flags |= IORING_SETUP_SINGLE_ISSUER;

  // see similar lines above in work()
  params.flags |= IORING_SETUP_COOP_TASKRUN;
  params.flags |= IORING_SETUP_DEFER_TASKRUN;

  // we don't test for message failure, but I think the style of this example
  // lends itself to the way libuv handles things, including callbacks that
  // can manage their own means of failure
  params.flags |= IORING_SETUP_SUBMIT_ALL;

  assert( 0 == io_uring_queue_init_params( MQSIZE, &ring, &params ) );

  // start threads, passing in our ring's fd
  for ( i = 0 ; i < NTHREADS ; i++ )
    assert( 0 == pthread_create( &threads[ i ], NULL, work, &ring.ring_fd ) );

  // fill out the table of work-thread ring FDs
  for ( i = 0 ; i < NTHREADS ; i++ ) {
    assert( 0 == io_uring_wait_cqe( &ring, &cqe ) );
    work_fds[ work_fd_num++ ] = cqe->user_data;
    io_uring_cqe_seen( &ring, cqe );
  }

  // dispatch jobs
  for ( i = 0 ; i < NJOBS ; i++ ) {
    work_fd_num = ( work_fd_num + 1 ) % NTHREADS;
    jobs[ i ].input = i;
    jobs[ i ].done = 0;
    assert( ( sqe = io_uring_get_sqe( &ring ) ) );
    io_uring_prep_msg_ring( sqe, work_fds[ work_fd_num ], 0,
      ( uint64_t ) &jobs[ i ], 0 );
    io_uring_sqe_set_flags( sqe, IOSQE_CQE_SKIP_SUCCESS );
  }
  io_uring_submit( &ring );

  // collect results
  for ( i = 0; i < NJOBS ; i++ ) {
    assert( 0 == io_uring_wait_cqe( &ring, &cqe ) );
    job_unit * job = ( job_unit * ) cqe->user_data;
    printf( "job %d done\n", job->input );
    io_uring_cqe_seen( &ring, cqe );
  }

  // broadcast shutdown to threads
  for ( i = 0 ; i < NTHREADS ; i++ ) {
    work_fd_num = ( work_fd_num + 1 ) % NTHREADS;
    assert( ( sqe = io_uring_get_sqe( &ring ) ) );
    io_uring_prep_msg_ring( sqe, work_fds[ work_fd_num ], 0, 0, 0 );
    io_uring_sqe_set_flags( sqe, IOSQE_CQE_SKIP_SUCCESS );
  }
  io_uring_submit( &ring );

  // join and report results
  for ( i = 0 ; i < NTHREADS ; i++ )
    assert( 0 == pthread_join( threads[ i ], NULL ) );
  for ( i = 0 ; i < NJOBS ; i++ )
    printf( "%-2d ", jobs[ i ].output );
  printf( "\n" );

  io_uring_queue_exit( &ring );
  return 0;
}

一些有趣的点:

  • 经过反思,管道指针的想法对于
    io_uring
    来说是一种特别糟糕的想法,因为写入的值需要一直存在到完成为止。使用上面的示例作为模型,为了将指针发送到堆栈上的数组,需要分配一个单独的指针数组。本质上是一个管理队列写入的队列。
  • IORING_OP_MSG_RING 生成两个事件:发送给接收者的预期消息,以及发送给发送者的完成消息。该示例使用 IOSQE_CQE_SKIP_SUCCESS 吞掉它们。
  • 每个线程使用一个环表示
    IORING_SETUP_SINGLE_ISSUER
    IORING_SETUP_ATTACH_WQ
    是明显的优化。
    IORING_SETUP_COOP_TASKRUN
    提出了一个令人信服的案例,但我仍然不确定
    IORING_SETUP_DEFER_TASKRUN
    在这里是否有益。
  • ThreadSanitizer 不喜欢这样。它在结构上的每次访问上引用了数据竞争。也许我做错了什么,但我想知道使用这种模式的程序是否永远无法使用该工具。

正如我上面所指出的,我在这里有点超出了我的深度,所以我只是前瞻性地写这个答案,并提供我迄今为止的进展。如果有人知道更多,我很乐意接受另一个答案或修改这个答案。

© www.soinside.com 2019 - 2024. All rights reserved.