elixir中频率计算的快速并发算法

问题描述 投票:0回答:1

我有两个大的列表,它们的项目长度不是固定的。每个列表中都有几百万个项目,我想统计这两个列表中的项目的频率。first listsecond list!

例如,由于列表的尺寸很大,我希望这个过程是并发完成的。

a = [[c, d], [a, b, e]]
b = [[a, d, c], [e, a, b], [a, d], [c, d, a]]

# expected result of calculate_frequency(a, b) is %{[c, d] => 2, [a, b, e] => 1} Or [{[c, d], 2}, {[a, b, e], 1}]

由于列表的大小,我希望这个过程是并发的,所以我写了这个函数。

  def calculate_frequency(items, data_list) do
    items
    |> Task.async_stream(
      fn item ->
        frequency =
          data_list
          |> Enum.reduce(0, fn data_row, acc ->
            if item -- data_row == [] do
              acc + 1
            else
              acc
            end
          end)

        {item, frequency}
      end,
      ordered: false
    )
    |> Enum.reduce([], fn {:ok, merged}, merged_list -> [merged | merged_list] end)
  end

但是这个算法很慢 我应该怎么做才能让它变快?

PS: 请不要考虑输入和输出的类型,执行速度很重要。

algorithm erlang elixir word-count
1个回答
0
投票

不知道这是否足够快,当然也不是并发的。它是 O(m + n) 哪儿 m 的大小。itemsn 的大小。data_list. 我找不到更快的并发方式,因为合并所有子进程的结果也需要时间。

data_list
|> Enum.reduce(%{}, fn(item, counts)-> 
  Map.update(counts, item, 1, &(&1 + 1)) 
end)
|> Map.take(items)

顺便说一下,并发做事情不一定就是并行做事情。如果你只有一个CPU核,并发其实会让事情变慢,因为一个CPU核一次只能做一件事。


0
投票

把一个列表放到一个 MapSet.

查看第二个列表,看看每个元素是否在 MapSet.

这在列表的长度上是线性的,这两个操作应该可以并行化。


0
投票

我会先对你要比较的数据进行标准化,这样一个简单的平等检查就可以判断两个项目是否如你所定义的那样 "相等"。根据你的代码,我猜测 Enum.sort/1 凑合 MapSet.new/1 或一个返回地图的函数,如果它符合你的用例,可能会比较快。

defp normalize(item) do
  Enum.sort(item)
end

def calculate_frequency(items, data_list) do
  data_list = Enum.map(data_list, &normalize/1)
  items = Enum.map(items, &normalize/1)
end

如果你要从数据列表中获取大部分频率,我会计算数据列表的所有频率。Elixir 1.10引入了 Enum.frequencies/1Enum.frequencies_by/2但如果需要的话,你可以用reduce来实现。

def calculate_frequency(items, data_list) do
  data_frequencies = Enum.frequencies_by(data_list, &normalize/1) # does map for you

  Map.new(items, &Map.get(data_frequencies, normalize(&1), 0)) # if you want result as map
end

我还没有对我的代码或你的代码做任何基准测试。如果你想做更多的异步事情,你可以把你的映射替换为 Task.async_stream/3,你可以用以下组合来代替你的频率呼叫。Stream.chunk_every/2, Task.async_stream/3 (有 Enum.frequencies/1 是函数),而 Map.merge/3.

© www.soinside.com 2019 - 2024. All rights reserved.