我正在尝试在单独的大查询表中查找值,我需要使用表 2 中的 url 来查找表 1 中的 link_url。从表 1 中我只需要 link_id
表1
链接_id | 链接网址 |
---|---|
id1 | https://link1.com |
id2 | https://link2.com |
id3 | https://link3.com |
表2
网址 | 应用程序 |
---|---|
https://link1.com | 应用1 |
https://link2.com | 应用2 |
https://link3.com | 应用3 |
https://link4.com | 应用程序4 |
所需输出:
链接_id | 链接网址 | 应用程序 |
---|---|---|
id1 | https://link1.com | 应用1 |
id2 | https://link2.com | 应用2 |
id3 | https://link3.com | 应用3 |
空 | https://link4.com | 应用程序4 |
我有什么
import logging
import apache_beam as beam
def use_side_input(main, side_input):
return side_input[main]
def run():
with beam.Pipeline() as p:
links = p | 'ReadLinks' >> beam.io.ReadFromBigQuery(table="table1", gcs_location="gs://bucket/tmp/")
linkviews = p | 'LinkViews' >> beam.io.ReadFromBigQuery(table="table2", gcs_location="gs://bucket/tmp/")
out | "use side input" >> beam.Map(use_side_input, side_input=beam.pvalue.AsDict(links))
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
我知道我应该使用侧面输入,但我不知道如何继续
我会将解决方案留给其他人
def add_link(main, side_input):
url = main["url"]
result = side_input.get(url)
main["link_id"] = None
if result:
main["link_id"] = result[0]
return main
def link_key(event):
url = event["link_url"]
return url, [event["link_id"]]
links_keys = links | 'AddLinkKey' >> beam.Map(
link_key)
#
final = (out | "AddLink" >> beam.Map(add_link, side_input=beam.pvalue.AsDict(episodes_keys)))