我已经在 Apache Beam 上工作了几天。我想快速迭代我正在工作的应用程序,并确保我正在构建的管道没有错误。在 Spark 中,我们可以使用
sc.parallelise
,当我们应用某些操作时,我们会得到可以检查的值。
类似地,当我阅读有关 Apache Beam 的内容时,我发现我们可以创建一个
PCollection
并使用以下语法来使用它
with beam.Pipeline() as pipeline:
lines = pipeline | beam.Create(["this is test", "this is another test"])
word_count = (lines
| "Word" >> beam.ParDo(lambda line: line.split(" "))
| "Pair of One" >> beam.Map(lambda w: (w, 1))
| "Group" >> beam.GroupByKey()
| "Count" >> beam.Map(lambda (w, o): (w, sum(o))))
result = pipeline.run()
我实际上想将结果打印到控制台。但我找不到任何有关它的文档。
有没有办法将结果打印到控制台而不是每次都保存到文件中?
您不需要临时列表。在 python 2.7 中,以下内容应该足够了:
def print_row(row):
print row
(pipeline
| ...
| "print" >> beam.Map(print_row)
)
result = pipeline.run()
result.wait_until_finish()
在 python 3.x 中,
print
是一个函数,因此以下内容就足够了:
(pipeline
| ...
| "print" >> beam.Map(print)
)
result = pipeline.run()
result.wait_until_finish()
在进一步探索并了解如何为我的应用程序编写测试用例之后,我找到了将结果打印到控制台的方法。请注意,我现在正在单节点机器上运行所有内容,并试图了解 apache beam 提供的功能以及如何在不影响行业最佳实践的情况下采用它。
所以,这是我的解决方案。在管道的最后阶段,我们可以引入一个映射函数,它将结果打印到控制台或将结果累积到变量中,稍后我们可以打印变量以查看值
import apache_beam as beam
# lets have a sample string
data = ["this is sample data", "this is yet another sample data"]
# create a pipeline
pipeline = beam.Pipeline()
counts = (pipeline | "create" >> beam.Create(data)
| "split" >> beam.ParDo(lambda row: row.split(" "))
| "pair" >> beam.Map(lambda w: (w, 1))
| "group" >> beam.CombinePerKey(sum))
# lets collect our result with a map transformation into output array
output = []
def collect(row):
output.append(row)
return True
counts | "print" >> beam.Map(collect)
# Run the pipeline
result = pipeline.run()
# lets wait until result a available
result.wait_until_finish()
# print the output
print output
也许记录信息而不是打印?
def _logging(elem):
logging.info(elem)
return elem
P | "logging info" >> beam.Map(_logging)
遵循 pycharm Edu 的示例
import apache_beam as beam
class LogElements(beam.PTransform):
class _LoggingFn(beam.DoFn):
def __init__(self, prefix=''):
super(LogElements._LoggingFn, self).__init__()
self.prefix = prefix
def process(self, element, **kwargs):
print self.prefix + str(element)
yield element
def __init__(self, label=None, prefix=''):
super(LogElements, self).__init__(label)
self.prefix = prefix
def expand(self, input):
input | beam.ParDo(self._LoggingFn(self.prefix))
class MultiplyByTenDoFn(beam.DoFn):
def process(self, element):
yield element * 10
p = beam.Pipeline()
(p | beam.Create([1, 2, 3, 4, 5])
| beam.ParDo(MultiplyByTenDoFn())
| LogElements())
p.run()
输出
10
20
30
40
50
Out[10]: <apache_beam.runners.portability.fn_api_runner.RunnerResult at 0x7ff41418a210>
with beam.Pipeline() as pipeline:
lines = pipeline | beam.Create(["this is test", "this is another test"])
word_count = (lines
| "Word" >> beam.ParDo(lambda line: line.split(" "))
| "Pair of One" >> beam.Map(lambda w: (w, 1))
| "Group" >> beam.GroupByKey()
| "Count" >> beam.Map(lambda o: (o[0],str(sum(o[1])))))
word_count | beam.ParDo(lambda x: print(x))
result = pipeline.run()
这就像一个魅力!
输出: ('这个','2') ('是', '2') ('测试','2') ('另一个', '1') ('这个','2') ('是', '2') ('测试','2') ('另一个', '1')
我知道这不是您所要求的,但为什么不将其存储到文本文件中呢?它总是比通过标准输出打印它更好,而且它不是易失性的