从fifo读取时重叠输出:如何修复/避免这种情况?

问题描述 投票:3回答:2

我正在尝试聚合来自2个文件的数据,因此我决定通过单独的编写器进程将数据发送到命名的fifo,并启动单独的读取器进程来读取和处理聚合数据。所有的读/写都是在ramdisk(/ dev / shm)上进行的,它大约100 GB左右。

这个工作文件和我确保写入fifo的每条数据行都小于512字节,因此管道可以保留其原子行为。

但是在尝试多次运行之后,我开始观察到读取器进程正在接收重叠输出,当我尝试从每个进程管道输出超过1000万行时,就会发生这种情况。我的每条数据线都以新行终止。

我在“+ <fifo”模式下打开fifo读取和“>> fifo”写入。这里不使用系统调用,只需使用常规打开来获取文件句柄并尝试逐行处理数据。

我该如何开始研究这个问题。有任何想法吗?

非常感谢。

更新于2019年/ APR / 29:

请注意,我的循环现在使用系统调用。以前我没有使用它们但最终决定使用它们。

同样的事情也可以通过让2个进程写入单个文件来实现,但是需要注意,因为这只适用于POSIX兼容的文件系统或者如果没有那个 - 可以保留日志文件(多个进程将在RAMDISK中执行写入,因为它也可以工作。 NFS驱动器超出了范围,因为它不符合POSIX,并且此技术不适用于它。

因此,如果我们谈论FIFO与文本文件 - 读取/写入文件的多个进程比读取/写入FIFO的多个进程更快。

仅为即将到来的读者,这是我的作家和读者流程代码。如何设计代码以合并这些子例程取决于您。有很多方法可以做到这一点。

希望它有用。

作家过程

  write_log => sub {
    my ($filehandle, $log_message) = @_;
    select $filehandle ; $|++;
    syswrite ($filehandle, $log_message, length($log_message))
      or die "write_log: syswrite fail!\n";
  },

读者过程:

  read_log => sub
  {
    # In my endless reading loop,
    # if I detect keyword END 2 times (as 
    # i have 2 processes), I exit the reading loop
    # and do further operations.
    #
    my ($end_check_value) = @_;

    sysopen (FH,$logfile, O_CREAT|O_RDONLY)
      or die "($$) read_log: Failed to sysopen\n";

    my ($h, $end) = (undef,0);

    select FH ; $|++ ;

    print STDOUT get_ts().'|'."($$) read_log: now tailing logfile with check count $end_check_value\n";

    for (;;)
    {
      while (my $line = <FH>)
      {
        chomp $line;
        $end++ if $line =~ m/END/g;
        last if $end == $end_check_value;
        my $key = (split(/\s/,$line))[0];
        $h->{$key}++;
      }

      sleep(1) ; seek (FH,0,1);

      # break out of for loop if we
      # have collected the 'END' tags
      # from all worker processes
      if ($end == $end_check_value)
      {
        print STDOUT get_ts().'|'."($$) read_log: breaking for loop ",
                     "with end_check: $end_check_value\n";
        last;
      }
    } close (FH);
  },

绩效统计:

以下是写入RAMDISK上单个文件的多个进程的性能统计信息。平均而言,写入150,000,000行(150 mn)需要大约10分钟加上减去20秒,然后读入哈希值。

test string is 238 bytes long
20190429-12:34:50.637|(11139) PARENT: each child will write (75000000) to (/dev/shm/multi_proc_test_logfile.log)
20190429-12:34:54.399|(11139) trunc_log_file: truncated (/dev/shm/multi_proc_test_logfile.log)
20190429-12:34:54.399|(11149) process no. (2) launched!
20190429-12:34:54.399|(11150) process no. (1) launched!
20190429-12:34:55.400|(11139) read_log: now tailing logfile with check count 2
20190429-12:44:21.565|(11150) process exiting with status code 0
20190429-12:44:34.164|(11149) process exiting with status code 0
20190429-12:45:03.956|(11139) read_log: breaking for loop with end_check: 2
20190429-12:45:03.957|(11139) read_log: Collected counts:
(11139) (11149):75000000
(11139) (11150):75000000
---------------
(11139) Finished!

real    **10m13.466s**
user    9m31.627s
sys     0m39.650s

这是FIFO的性能统计数据,其中多个进程分别向FIFO写入25,000,000行,读取器进程将它们读回哈希。平均花了大约25-30分钟。它比写入文件的进程慢。

test string is 141 bytes long
20190426-10:25:13.455|28342|2-test-fifo.pl: Starting..
20190426-10:25:13.456|28345|CHILD starting (read_and_hash)
20190426-10:25:13.456|28345|READ_AND_HASH now hashing files
20190426-10:25:14.458|28346|CHILD starting (s1_data_gather)
20190426-10:25:14.458|28346|Working on sit1 data..
20190426-10:25:14.458|28347|CHILD starting (s2_data_gather)
20190426-10:25:14.458|28347|Working on sit2 data..
20190426-10:48:48.454|28346|Finished working on S1 data..
20190426-10:48:48.457|28342|Reaped 28346
20190426-10:48:48.462|28345|read LAST line from S2 data
20190426-10:48:52.657|28347|Finished working on s2 data..
20190426-10:48:52.660|28342|Reaped 28347
20190426-10:48:52.669|28345|read LAST line from S2 data
20190426-10:48:53.130|28345|READ_AND_HASH finished hashing files
(read_n_hash): finished hashing. keys count
        s1 = 25000000
        s2 = 25000000
20190426-10:48:53.130|28345|starting comparison. doing source to target
20190426-10:49:49.566|28345|finished comparing source to target. now comparing target to source
20190426-10:50:45.578|28345|comparing target to source ends. finished
20190426-10:51:57.220|28342|Reaped 28345
20190426-10:51:57.220|28342|2-test-fifo.pl: Ending..
perl pipe fifo rhel5 overlapped-io
2个回答
1
投票

您可能必须为您正在编写的文件打开autoflush。如果您使用open()函数而不是通过OO接口(如IO :: File)打开文件,那么在您成功打开文件后(如$ fifo,比如说),您需要这样的代码。

select $fifo;
$| = 1;

请注意,select()选择输出文件句柄用于打印等未指定特定文件句柄的输出文件句柄。如果你想恢复到目标STDOUT,那么select STDOUT在上面之后,或者,是迂腐:

my $oldfh = select $fifo;
$| = 1;
select $oldfh;

我不认为文件模式('+ <'等)与它有任何关系,因为像“clobbering”和“append”这样的概念不适用于FIFO。你可能也会用简单的“>”和“<”做同样的事情。


0
投票

您在这里看到的可能是并发的简单产品。您假设读者及时将数据从FIFO中拉出。如果两位作家都有机会在读者再次阅读之前写下几条记录怎么办?如果FIFO通过写入部分地达到容量怎么办?编写器将阻止部分写入,然后读者将有机会清空队列,但不能保证编写部分行的编写器将是下一个写入的编写器。这将导致交错的行。

如果我关于autoflush的答案无法解决您的问题,您可能必须考虑以这种方式交错写入的可能性。

正如上面的评论中所提到的,使用数据报套接字(SOCK_DGRAM)而不是FIFO可能会更好。这样,每条消息都是一个没有交错机会的原子单元。

© www.soinside.com 2019 - 2024. All rights reserved.