Apache Beam Min,Max和Average

问题描述 投票:3回答:1

从这个link,Guillem Xercavins为计算最小值和最大值编写了一个自定义类。

class MinMaxFn(beam.CombineFn):
  # initialize min and max values (I assumed int type)
  def create_accumulator(self):
    return (sys.maxint, 0)

  # update if current value is a new min or max
  def add_input(self, min_max, input):
    (current_min, current_max) = min_max
    return min(current_min, input), max(current_max, input)

  def merge_accumulators(self, accumulators):
    return accumulators

  def extract_output(self, min_max):
    return min_max

我还需要计算平均值,我发现示例代码如下:

class MeanCombineFn(beam.CombineFn):
  def create_accumulator(self):
    """Create a "local" accumulator to track sum and count."""
    return (0, 0)

  def add_input(self, (sum_, count), input):
    """Process the incoming value."""
    return sum_ + input, count + 1

  def merge_accumulators(self, accumulators):
    """Merge several accumulators into a single one."""
    sums, counts = zip(*accumulators)
    return sum(sums), sum(counts)

  def extract_output(self, (sum_, count)):
    """Compute the mean average."""
    if count == 0:
      return float('NaN')
    return sum_ / float(count)

任何想法如何将平均方法合并到MinMax中所以我只能有一个能够同时计算最小值,最大值和平均值的类,并产生一组键和值 - 3个值的数组?

python apache-beam
1个回答
4
投票

这是组合类解决方案,增加了中位数

import numpy as np

class MinMaxMeanFn(beam.CombineFn):

    def create_accumulator(self):
        # sum, min, max, count, median
        return (0.0, 999999999.0, 0.0, 0, [])

    def add_input(self, cur_data, input):
        (cur_sum, cur_min, cur_max, count, cur_median) = cur_data
        if type(input) == list:
            cur_count = len(input)
            sum_input = sum(input)
            min_input = min(input)
            max_input = max(input)
        else:
            sum_input = input
            cur_count = 1
        return cur_sum + sum_input, min(min_input, cur_min), max(max_input, cur_max), count + cur_count, cur_median + input

    def merge_accumulators(self, accumulators):
        sums, mins, maxs, counts, medians = zip(*accumulators)
        return sum(sums), min(mins), max(maxs), sum(counts), medians

    def extract_output(self, cur_data):
        (sum, min, max, count, medians) = cur_data
        avg = sum / count if count else float('NaN')
        med = np.median(medians)
        return  {
            "max": max,
            "min": min,
            "avg": avg,
            "count": count,
            "median": med
        }

用法示例:

( input |'Format Price' >> beam.ParDo(FormatPriceDoFn())
                        |'Group Price by ID' >> beam.GroupByKey()
                        |'Compute price statistic for each ID' >> beam.CombinePerKey(MinMaxMeanFn()))

*我没有测试CombinePerKey是否在没有GroupByKey的情况下工作,请随意测试它。

© www.soinside.com 2019 - 2024. All rights reserved.