如何使我的代码异步(Elixir)

问题描述 投票:0回答:1

我有一个“statistic.txt”文件,它可以包含从 0 到 819.2 百万行的数字 [-40 亿; +40 亿]。我需要计算 5 行的组数,其中第 3 个数字比其他数字大。我编写了代码,它可以工作,但需要很长时间才能执行。我怎样才能使我的代码异步以使其运行得更快?

这是我做的代码

defmodule Statistics do
  def count_groups(filename) do
    File.stream!(filename)
    |> Stream.map(&String.trim/1)
    |> Stream.map(&String.to_integer/1)
    |> Stream.chunk_every(5, 1, :discard)
    |> Stream.filter(fn [a, b, c, d, e] -> c > Enum.max([a, b, d, e]) end)
    |> Enum.count()
  end
end

我尝试使用 ParallelStream、Task 和 Flow,但无法应用任何一个。

更新 这是我使用 Flow 制作的代码

def count_groups_flow(filename) do
    File.stream!(filename)
    |> Stream.map(&String.trim/1)
    |> Stream.map(&String.to_integer/1)
    |> Stream.chunk_every(5, 1, :discard)
    |> Flow.from_enumerable(stages: 4)
    |> Flow.partition(stages: 4)
    |> Flow.filter(fn [a, b, c, d, e] -> c > Enum.max([a, b, d, e]) end)
    |> Enum.count()
end

更新

输入数据示例(“statistic.txt”文件的前 10 行)

168648312
503340495
-283728446
-762780208
1250431140
-225340028
-72728416
-804793229
-1014960356
-1256160640
-1120932173
asynchronous elixir
1个回答
0
投票

这里的罪魁祸首是

Stream.chunk_every(5, 1, :discard)
,在
之后应用
Flow并没有显着改善,因为它计算的表达式几乎是即时的。

也就是说,目标是首先避免将迭代次数乘以 5。为此,我们需要明智地分块。我们知道,总数很大,所以把它分成更大的块,然后用

Flow
分割每个块的计算是有意义的。我们将使用
Stream.chunk_every(input, n, n-4, :discard)
,其中
n
足够大以确保我们不会丢失五重奏,并且需要
n-4
以便每个五重奏完全进入其中一个块。此外,当数据已经被同时处理时,也应该分阶段进行修剪和转换为整数。

到目前为止,还不错。

filename
|> File.stream!()
# chunk almost without overhead
|> Stream.chunk_every(10_000, 10_000 - 4, :discard)
# now we have big chunks and can use `Flow`
|> Flow.from_enumerable()
|> Flow.partition()
|> Flow.map(fn big_chunk ->
  big_chunk
  |> Enum.reduce({0, {nil, nil, nil, nil}}, process_chunk_with_trim)
  |> elem(0)
end)
|> Enum.sum()

唯一剩下的就是实现

process_chunk/2
函数来计算块的中间结果。我们会手动完成,因为与标准核心库实现相比,它会更快。我们需要在累加器中保留之前的四个元素来进行比较。

process_chunk = fn
  e, {0, {nil, nil, nil, nil}} -> {0, {e, nil, nil, nil}}
  e, {0, {e1, nil, nil, nil}} -> {0, {e, e1, nil, nil}}
  e, {0, {e1, e2, nil, nil}} -> {0, {e, e1, e2, nil}}
  e, {0, {e1, e2, e3, nil}} -> {0, {e, e1, e2, e3}}
  e, {acc, {e1, e2, e3, e4}}
       when e2 > e and e2 > e1 and e2 > e3 and e2 > e4 ->
    {acc + 1, {e, e1, e2, e3}}
  e, {acc, {e1, e2, e3, _}} -> {acc, {e, e1, e2, e3}}
end 

process_chunk_with_trim = fn e, acc ->
  process_chunk.(e |> String.trim() |> String.to_integer(), acc)
end

请注意,以上不会计算尾随的四重奏和三重奏,但由于您在分块方法中使用了

:discard
,所以应该没问题。

我没有测试上面的代码,但它应该让您正确地了解如何完成任务(尽管我希望它开箱即用。)

© www.soinside.com 2019 - 2024. All rights reserved.