Apache Beam Python 在其他集合中查找值

问题描述 投票:0回答:1

我正在尝试在单独的大查询表中查找值,我需要使用表 2 中的 url 来查找表 1 中的 link_url。从表 1 中我只需要 link_id

表1

链接_id 链接网址
id1 https://link1.com
id2 https://link2.com
id3 https://link3.com

表2

网址 应用程序
https://link1.com 应用1
https://link2.com 应用2
https://link3.com 应用3
https://link4.com 应用程序4

所需输出:

链接_id 链接网址 应用程序
id1 https://link1.com 应用1
id2 https://link2.com 应用2
id3 https://link3.com 应用3
https://link4.com 应用程序4

我有什么

import logging
import apache_beam as beam

def use_side_input(main, side_input):

   return side_input[main]

def run():
    with beam.Pipeline() as p:
        links = p | 'ReadLinks' >> beam.io.ReadFromBigQuery(table="table1", gcs_location="gs://bucket/tmp/")

        linkviews = p | 'LinkViews' >> beam.io.ReadFromBigQuery(table="table2", gcs_location="gs://bucket/tmp/")

        out | "use side input" >> beam.Map(use_side_input, side_input=beam.pvalue.AsDict(links))
       

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

我知道我应该使用侧面输入,但我不知道如何继续

python google-bigquery apache-beam
1个回答
0
投票

我会将解决方案留给其他人


def add_link(main, side_input):
    url = main["url"]

    result = side_input.get(url)

    main["link_id"] = None
    if result:
        main["link_id"] = result[0]

    return main


def link_key(event):
    url = event["link_url"]

    return url, [event["link_id"]]

links_keys = links | 'AddLinkKey' >> beam.Map(
            link_key)

#

final = (out | "AddLink" >> beam.Map(add_link, side_input=beam.pvalue.AsDict(episodes_keys)))

© www.soinside.com 2019 - 2024. All rights reserved.