使用 OCaml Async 进行并发写入

问题描述 投票:0回答:1

我正在从网络读取数据,并且我想在收到数据时将其写入文件。写入是并发且非顺序的(想想 P2P 文件共享)。在 C 中,我将获取文件的文件描述符(在程序运行期间),然后使用

lseek
,然后使用
write
,最终关闭
fd
。这些操作可以通过多线程设置中的互斥体进行保护(特别是 lseek 和 write 应该是原子的)。

我真的不明白如何在异步中获得这种行为。我最初的想法是拥有这样的东西。

 let write fd s pos = 
     let posl = Int64.of_int pos in
     Async_unix.Unix_syscalls.lseek fd ~mode:`Set posl
     >>| fun _ -> 
     let wr = Writer.create t.fd in
     let len = String.length s in
     Writer.write wr s ~pos:0 ~len

然后,当接收到数据时,异步调度写入。

我的解决方案不正确。一方面,这个

write
任务需要是原子的,但事实并非如此,因为两个
lseek
可以在第一个
Writer.write
之前执行。即使我可以按顺序安排
write
也无济于事,因为
Writer.write
不会返回
Deferred.t
。有什么想法吗?

顺便说一句,这是之前回答的问题的后续。

asynchronous ocaml
1个回答
2
投票

基本方法是拥有一个工作人员队列,其中每个工作人员执行原子

seek/write
1 操作。不变的是一次只有一个工人在运行。更复杂的策略将采用优先级队列,其中写入按照某些最大化吞吐量的标准进行排序,例如写入后续位置。如果您观察到大量小写入,您还可以实施复杂的缓冲策略,那么一个好主意是将它们合并成更大的块。

但是让我们从一个简单的非优先队列开始,通过

Async.Pipe.t
实现。对于位置写入,我们不能使用 Writer 接口,因为它是为缓冲顺序写入而设计的。因此,我们将使用
Unix.lseek
中的
Async_unix.Std
和 Bigstring.really_write
function. The really_write is a regular non-asynchronous function, so we need to lift it into the Async interface using the
Fd.syscall_in_thread` 函数,例如

let really_pwrite fd offset bytes = 
  Unix.lseek fd offset ~mode:`Set >>= fun (_ : int64) ->
  Fd.syscall_in_thread fd (fun desc -> 
    Bigstring.really_write desc bytes)

注意:此函数将写入系统决定的字节数,但不超过

bytes
的长度。因此,您可能有兴趣实现一个将写入所有字节的
really_pwrite
函数。

总体方案将包括一个主线程,它将拥有一个文件描述符并通过 Async.Pipe 接受来自多个客户端的写入请求。假设每个写请求都是以下类型的消息:

 type chunk = {
    offset : int;
    bytes : Bigstring.t;
 }

然后你的主线程将如下所示:

let process_requests fd = 
  Async.Pipe.iter ~f:(fun {offset; bytes} -> 
    really_pwrite fd offset bytes)

其中

really_pwrite
是一个真正写入所有字节并处理所有错误的函数。您还可以在实际执行
Async.Pipe.iter'
系统调用之前使用
pwrite
函数并预排序和合并写入。

还有一个优化说明。分配一个大字符串是一项相当昂贵的操作,因此您可以考虑预先分配一个大的大字符串并从中提供小块。这将创建有限的资源,因此您的客户端将等待其他客户端完成写入并释放其块。因此,您将拥有一个内存占用有限的受限系统。


1)理想情况下,我们应该使用

pwrite
,尽管 Janestreet 只提供
pwrite_assume_fd_is_nonblocking
函数,当对系统
pwrite
的调用完成时,它不会释放 OCaml 运行时,并且实际上会阻塞整个系统。所以我们需要使用查找和写入的组合。后者将释放 OCaml 运行时,以便程序的其余部分可以继续。 (另外,考虑到他们对非阻塞 fd 的定义,这个函数并没有多大意义,因为只有套接字和 FIFO 被认为是非阻塞的,而且据我所知,它们不支持查找操作。我将提交一个他们的错误跟踪器上的问题。

© www.soinside.com 2019 - 2024. All rights reserved.