从 Apache Beam 管道收集输出并将其显示到控制台

问题描述 投票:0回答:6

我已经在 Apache Beam 上工作了几天。我想快速迭代我正在工作的应用程序,并确保我正在构建的管道没有错误。在 Spark 中,我们可以使用

sc.parallelise
,当我们应用某些操作时,我们会得到可以检查的值。

类似地,当我阅读有关 Apache Beam 的内容时,我发现我们可以创建一个

PCollection
并使用以下语法来使用它

with beam.Pipeline() as pipeline:
    lines = pipeline | beam.Create(["this is test", "this is another test"])
    word_count = (lines 
                  | "Word" >> beam.ParDo(lambda line: line.split(" "))
                  | "Pair of One" >> beam.Map(lambda w: (w, 1))
                  | "Group" >> beam.GroupByKey()
                  | "Count" >> beam.Map(lambda (w, o): (w, sum(o))))
    result = pipeline.run()

我实际上想将结果打印到控制台。但我找不到任何有关它的文档。

有没有办法将结果打印到控制台而不是每次都保存到文件中?

apache-beam
6个回答
14
投票

您不需要临时列表。在 python 2.7 中,以下内容应该足够了:

def print_row(row):
    print row

(pipeline 
    | ...
    | "print" >> beam.Map(print_row)
)

result = pipeline.run()
result.wait_until_finish()

在 python 3.x 中,

print
是一个函数,因此以下内容就足够了:

(pipeline 
    | ...
    | "print" >> beam.Map(print)
)

result = pipeline.run()
result.wait_until_finish()

9
投票

在进一步探索并了解如何为我的应用程序编写测试用例之后,我找到了将结果打印到控制台的方法。请注意,我现在正在单节点机器上运行所有内容,并试图了解 apache beam 提供的功能以及如何在不影响行业最佳实践的情况下采用它。

所以,这是我的解决方案。在管道的最后阶段,我们可以引入一个映射函数,它将结果打印到控制台或将结果累积到变量中,稍后我们可以打印变量以查看值

import apache_beam as beam

# lets have a sample string
data = ["this is sample data", "this is yet another sample data"]

# create a pipeline
pipeline = beam.Pipeline()
counts = (pipeline | "create" >> beam.Create(data)
    | "split" >> beam.ParDo(lambda row: row.split(" "))
    | "pair" >> beam.Map(lambda w: (w, 1))
    | "group" >> beam.CombinePerKey(sum))

# lets collect our result with a map transformation into output array
output = []
def collect(row):
    output.append(row)
    return True

counts | "print" >> beam.Map(collect)

# Run the pipeline
result = pipeline.run()

# lets wait until result a available
result.wait_until_finish()

# print the output
print output

4
投票

也许记录信息而不是打印?

def _logging(elem):
    logging.info(elem)
    return elem

P | "logging info" >> beam.Map(_logging)

0
投票

遵循 pycharm Edu 的示例

import apache_beam as beam

class LogElements(beam.PTransform):
    class _LoggingFn(beam.DoFn):

        def __init__(self, prefix=''):
            super(LogElements._LoggingFn, self).__init__()
            self.prefix = prefix

        def process(self, element, **kwargs):
            print self.prefix + str(element)
            yield element

    def __init__(self, label=None, prefix=''):
        super(LogElements, self).__init__(label)
        self.prefix = prefix

    def expand(self, input):
        input | beam.ParDo(self._LoggingFn(self.prefix))

class MultiplyByTenDoFn(beam.DoFn):

    def process(self, element):
        yield element * 10

p = beam.Pipeline()

(p | beam.Create([1, 2, 3, 4, 5])
   | beam.ParDo(MultiplyByTenDoFn())
   | LogElements())

p.run()

输出

10
20
30
40
50
Out[10]: <apache_beam.runners.portability.fn_api_runner.RunnerResult at 0x7ff41418a210>

0
投票
with beam.Pipeline() as pipeline:
lines = pipeline | beam.Create(["this is test", "this is another test"])
word_count = (lines 
              | "Word" >> beam.ParDo(lambda line: line.split(" "))
              | "Pair of One" >> beam.Map(lambda w: (w, 1))
              | "Group" >> beam.GroupByKey()
              | "Count" >> beam.Map(lambda o: (o[0],str(sum(o[1])))))
word_count | beam.ParDo(lambda x: print(x))
result = pipeline.run()

这就像一个魅力!

输出: ('这个','2') ('是', '2') ('测试','2') ('另一个', '1') ('这个','2') ('是', '2') ('测试','2') ('另一个', '1')


-2
投票

我知道这不是您所要求的,但为什么不将其存储到文本文件中呢?它总是比通过标准输出打印它更好,而且它不是易失性的

© www.soinside.com 2019 - 2024. All rights reserved.