Perl Mojolicious:使用 async/await 和 all_settled 限制并发

问题描述 投票:0回答:1

在 Mojolicious 完整应用程序中,我有以下模式。

  1. 同时运行一些遗留请求(例如同时插入
    db1
    db2
    );和
  2. 按顺序运行一些遗留请求(例如,对于每个数据库,首先插入
    table1
    ,然后插入
    table2
    ,等等)。

注意:对于下面的示例,我使用了

httpbin.org
Docker 镜像:

docker run --rm --name httpbin -p 8000:80 kennethreitz/httpbin

如何使用

Mojo::Promise->map
来限制此模式的
concurrency
concurrency
可以应用于我在
db1
db2
中的所有操作吗?在我下面的示例中,是否可以(比如说)在任何给定时刻将
http://localhost:8000
的点击次数限制为 3?

use Mojolicious::Lite -signatures, -async_await;
use Mojo::Util qw(dumper);

helper db1_LocationA_p => async sub ($self, $request)
{
    return Mojo::Promise->new(sub($resolve, $reject ) {
        Mojo::IOLoop->subprocess(
            sub {
                my $tx = $self->ua->post('http://localhost:8000/delay/2' => json => $request);
                my $res = $tx->result;
                die $res->message if $res->is_error;
                $res->json;
            },
            sub {
                my ( $subprocess, $err, @res ) = @_;
                $reject->( $err ) if $err;
                $resolve->( @res );
            }
            );
                              });
};

helper db1_LocationB_p => async sub ($self, $request) {
    return $self->db1_LocationA_p("LocationB $request"); # For brevity
};

helper db2_LocationA_p => async sub ($self, $request)
{
    return Mojo::Promise->new(sub($resolve, $reject ) {
        Mojo::IOLoop->subprocess(
            sub {
                my $tx = $self->ua->post('http://localhost:8000/delay/5' => json => $request);
                my $res = $tx->result;
                die $res->message if $res->is_error;
                $res->json;
            },
            sub {
                my ( $subprocess, $err, @res ) = @_;
                $reject->( $err ) if $err;
                $resolve->( @res );
            }
            );
                              });
};

helper db2_LocationB_p => async sub ($self, $request) {
    return $self->db2_LocationA_p("LocationB $request"); # For brevity
};

helper add_db1 => async sub($self, $table1, $table2, $table3) {
    # run sequentially. table1 first, then table2, then table3
    my @table1 = await Mojo::Promise->all( map { $self->db1_LocationA_p($_),  $self->db1_LocationB_p($_) } @$table1 );
    my @table2 = await Mojo::Promise->all( map { $self->db1_LocationA_p($_),  $self->db1_LocationB_p($_) } @$table2 );
    my @table3 = await Mojo::Promise->all( map { $self->db1_LocationA_p($_),  $self->db1_LocationB_p($_) } @$table3 );
    return (@table1, @table2, @table3);
};

helper add_db2 => async sub ($self, $table1, $table2) {
    # run sequentially. table1 first, then table2
    my @table1 = await Mojo::Promise->all( map { $self->db2_LocationA_p($_),  $self->db2_LocationB_p($_) } @$table1 );
    my @table2 = await Mojo::Promise->all( map { $self->db2_LocationA_p($_),  $self->db2_LocationB_p($_) } @$table2 );
    return (@table1, @table2);
};

any '/' => async sub ($self) {
    my $param = $self->param('param');

    my ($db1_table1, $db1_table2, $db1_table3, $db2_table1, $db2_table2);
    push @$db1_table1, qq(ADD DB1 TABLE1 : ID=FOO${param};);
    push @$db1_table1, qq(ADD DB1 TABLE1 : ID=BAR${param};);
    push @$db1_table1, qq(ADD DB1 TABLE1 : ID=BAZ${param};);
    push @$db1_table2, qq(ADD DB1 TABLL2 : ID=ABC, IDs = FOO${param}, BAR${param}, BAZ${param};);
    push @$db1_table2, qq(ADD DB1 TABLL2 : ID=XYZ, IDs = FOO${param}, BAR${param}, BAZ${param};);
    push @$db1_table3, qq(ADD DB1 TABLE3 : ID=ZZZ ,IDs = ABC, XYZ;);

    push @$db2_table1, qq(ADD DB2 TABLE1 : ID=FOO${param};);
    push @$db2_table1, qq(ADD DB2 TABLE1 : ID=BAR${param};);
    push @$db2_table1, qq(ADD DB2 TABLE1 : ID=BAZ${param};);
    push @$db2_table1, qq(ADD DB2 TABLE1 : ID=QUX${param};);
    push @$db2_table2, qq(ADD DB2 TABLE2 : ID=FOO, IDs = FOO${param}, BAR${param}, BAZ${param}, QUX${param};);
    push @$db2_table2, qq(ADD DB2 TABLE2 : ID=BAR, IDs = FOO${param}, BAR${param}, BAZ${param}, QUX${param};);
    
    $self->render_later();
    my @results = eval {
        await Mojo::Promise->all(
            # run concurrently. db1 & db2 can run in parallel at the same time.
            $self->add_db1($db1_table1, $db1_table2, $db1_table3),
            $self->add_db2($db2_table1, $db2_table2),
            )};
    
    if (my $err = $@) {
        warn "Something went wrong: " . dumper($err);
        $self->render(json => $err, status=>502 );
    } else {
        say STDERR dumper(@results);
        $self->render(json => {db1=>$results[0], db2=>$results[1]});
    }
};

app->start;

在上面的例子中:

  • add_db1()
    会立即向
    ADD DB1 TABLE1
    提出 3 个关于
    db1_LocationA
    的请求,并向
    db1_LocationB
    提出另外 3 个请求;和
  • add_db2()
    会立即向
    ADD DB2 TABLE1
    提出 3 个关于
    db2_LocationA
    的请求,并向
    db2_LocationB
    提出另外 3 个请求。

因此总共向 add_db1()

add_db2()
发出了
12 个请求
。我的问题是,是否可以将其限制为 3 个总共(作为示例)

perl async-await promise concurrency mojolicious
1个回答
1
投票

我有一些使用 Mojo::Base 发出 http 请求的经验,我不太明白你所做的代码:(,但这就是我发出 http 请求的示例:

#!/usr/bin/perl

use Mojo::Base qw(-strict -signatures -async_await);
use Mojo::Promise;
use Mojo::UserAgent;

my $ua = Mojo::UserAgent->new;
my @urls = map{"http://ffuf.me/cd/pipes/user?id=$_"} 300..1000;

async sub get_pp($url) {
    my $tx = await $ua->get_p($url);

    my $body = $tx->res->body;
    say $tx->req->url;
   
    if ($body!~/Not Found/i) {
       say $tx->req->url . " " . $body;
       exit;
    }
}

async sub main(@urls) {
    await Mojo::Promise->map({concurrency=>20}, sub {
                        get_pp($_) }, @urls);
}

await main(@urls);

-- id 参数从 300 到 1000 进行模糊测试,同时使用 20 个 http 请求

我的 github 存储库中有更多这样的示例: https://github.com/spoNge369/perl_scrap

在修改您提供的任何代码之前,您可以测试这些属性:

  • connect_timeout()
  • inactivity_timeout()
  • 最大连接数()
  • 请求超时()

适用于 $ua(user-agent) 示例: 总时间限制为 5 秒,其中 3 秒可能用于连接 $ua->max_redirects(0)->connect_timeout(3)->request_timeout(5);

https://docs.mojolicious.org/Mojo/UserAgent#request_timeout

© www.soinside.com 2019 - 2024. All rights reserved.