I'm using Apache Beam on Google Dataflow and calling a function named sentiment from a lambda, but I get an error saying the function name is not defined.
    output_tweets = (lines
                     | 'decode' >> beam.Map(lambda x: x.decode('utf-8'))
                     | 'assign window key' >> beam.WindowInto(window.FixedWindows(10))
                     | 'batch into n batches' >> BatchElements(min_batch_size=49, max_batch_size=50)
                     | 'sentiment analysis' >> beam.FlatMap(lambda x: sentiment(x))
                     )
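This is my Apache Beam pipeline. The last step, which calls the function, is the one that raises the error.
The function's code is below (I don't think it matters):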
    def sentiment(messages):
        # BatchElements yields lists; wrap a single message so both cases work.
        if not isinstance(messages, list):
            messages = [messages]
        instances = list(map(lambda message: json.loads(message), messages))
        lservice = discovery.build('language', 'v1beta1', developerKey=APIKEY)
        # Iterate over the parsed messages (assumes each one has a 'text' field);
        # the original indexed the list itself with ['text'], which cannot work.
        for instance in instances:
            response = lservice.documents().analyzeSentiment(
                body={
                    'document': {
                        'type': 'PLAIN_TEXT',
                        'content': instance['text']
                    }
                }
            ).execute()
            instance['polarity'] = response['documentSentiment']['polarity']
            instance['magnitude'] = response['documentSentiment']['magnitude']
        return instances
I get the following traceback:
File "stream.py", line 97, in <lambda>
NameError: name 'sentiment' is not defined [while running 'generatedPtransform-441']
Any ideas?
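This can happen for a couple of reasons:
1. Is sentiment defined in the same Python file as the Beam pipeline?
2. Is sentiment defined before it is called in the Beam pipeline?

I ran the quick test below, and it works fine when both conditions are met.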
    def testing(messages):
        return messages.lower()

    windowed_lower_word_counts = (windowed_words
                                  | beam.Map(lambda word: testing(word))
                                  | "count" >> beam.combiners.Count.PerElement())
    ib.show(windowed_lower_word_counts, include_window_info=True)
0 b'have' 3 2020-04-19 06:04:39.999999+0000 2020-04-19 06:04:30.000000+0000 (10s) Pane 0
1 b'ransom' 1 2020-04-19 06:04:39.999999+0000 2020-04-19 06:04:30.000000+0000 (10s) Pane 0
2 b'let' 1 2020-04-19 06:04:39.999999+0000 2020-04-19 06:04:30.000000+0000 (10s) Pane 0
3 b'me' 1 2020-04-19 06:04:39.999999+0000 2020-04-19 06:04:30.000000+0000 (10s) Pane 0
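To see why both conditions matter, here is a minimal plain-Python sketch (no Beam involved) of how a lambda resolves a name like sentiment. The lambda does not capture the function itself; it looks the name up in its module's globals each time it is called. The namespace dict and the stand-in sentiment function below are hypothetical and only illustrate the lookup. Whether this is what actually happens on your Dataflow workers is an assumption on my part.

```python
# Build the lambda inside an empty namespace, standing in for an environment
# where `sentiment` has not been defined (hypothetical setup for illustration).
namespace = {}
exec("fn = lambda x: sentiment(x)", namespace)
fn = namespace["fn"]

# Calling the lambda now fails: the name is looked up at call time.
try:
    fn("hello")
    error = None
except NameError as exc:
    error = str(exc)
print(error)

# Once the name exists in the lambda's globals, the same lambda works.
def sentiment(message):  # stand-in, not the real sentiment function
    return [message.upper()]

namespace["sentiment"] = sentiment
result = fn("hello")
print(result)
```

If the function is defined in the same file and before the pipeline but the NameError still shows up only on Dataflow, it may be worth checking Beam's save_main_session pipeline option, which the Beam documentation suggests for NameErrors on remote workers.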
HTH