Streaming from Pub/Sub and writing to Firestore (Native) with Beam Dataflow (Python)

Problem description · votes: 0 · answers: 1

I am using Dataflow with a sliding window to read from Pub/Sub and apply some transforms before creating entities and writing them to Firestore Native. I found that Beam does not support a native Firestore I/O connector, so I looked at some custom [Python code](https://www.the-swamp.info/blog/uploading-data-firestore-using-dataflow/) that writes to Firestore. These are the main steps:

1 - Create the Pub/Sub sliding window:

with beam.Pipeline(options=options) as p:
    # Read data from Pub/Sub
    input_data = (
        p
        | "Read from Pub/Sub"
        >> beam.io.ReadFromPubSub(subscription=known_args.input_subscription)
        | "Decode JSON" >> beam.Map(lambda x: json.loads(x))

    )
    windowed_data = (
        input_data
        | "Apply Windowing"
        >> beam.WindowInto(
            beam.window.SlidingWindows(
                size=60 * 60 * 5, period=60 * 15
            )  # sliding window of 5 hours, triggers every 15 min
        )
    )

2 - Create the entity:

def create_content(element, window=DoFn.WindowParam):

    logging.info(f"element is: {element}")
    product_id = element[0]
    window_start = window.start.to_utc_datetime()
    window_end = window.end.to_utc_datetime()
    # set an expiry date of five hours
    expiry_timestamp = datetime.utcnow() + timedelta(hours=5)

    # create entity
    entity = {
        "expiry_timestamp": expiry_timestamp,
        "product_id": product_id,
        "length_list": len(element[1]),
        "window_start": window_start,
        "window_end": window_end,
    }
    return entity

3 - The DoFn that writes to Firestore:

class FirestoreUpdateDoFn(beam.DoFn):
    MAX_DOCUMENTS = 200

    def __init__(self, project, collection):
        self._project = project
        self._collection = collection

    def start_bundle(self):
        from google.cloud import firestore

        self._mutations = []
        self.db = firestore.Client(project=self._project)
        self.batch = self.db.batch()

    def finish_bundle(self):
        if self._mutations:
            self._flush_batch()
            logging.info("committing batch of elements at finish_bundle")
        logging.info("closing db")
        self.db.close()

    def process(self, element, *args, **kwargs):
        self._mutations.append(element)
        logging.info(f"read element is {element}")
        if len(self._mutations) > self.MAX_DOCUMENTS:
            self._flush_batch()

    def _flush_batch(self):
        logging.info(f"length batch is {len(self._mutations)}")
        for mutation in self._mutations:
            logging.info(f"mutation is {mutation}")
            product_id = mutation["product_id"]
            key = hashlib.sha1(
                json.dumps(product_id, sort_keys=True).encode("utf-8")
            ).hexdigest()
            ref = self.db.collection(self._collection).document(key)
            self.batch.set(ref, mutation)
        logging.info("about to commit batch")
        r = self.batch.commit()
        logging.info("committing batch of elements")
        logging.debug(r)
        self._mutations = []

4 - Running it all together:

windowed_data | "Create entities" >> beam.Map(
        create_content
    ) | "Write to Firestore" >> beam.ParDo(
        FirestoreUpdateDoFn(project_id, collection_id)
    )

I use bundles in FirestoreUpdateDoFn to batch the inserts into Firestore, hoping to improve performance by reducing the round trips between Dataflow and Firestore. However, I noticed that writing to Firestore is still very slow. In addition, a few minutes after startup the Dataflow job gets stuck and data stops being written to the database. The Pub/Sub backlog also grows indefinitely, which means the messages are not being acknowledged (if I am not mistaken). I also noticed that the bundles are very small, which mostly results in tiny batches (most of size 1, see screenshot).

At the moment I am stuck, because the job is not reliable (it becomes extremely slow and stops writing), and I am not sure whether there is a way to stream events into Firestore with Beam Python at all. Any hints?

python google-cloud-firestore streaming google-cloud-dataflow apache-beam
1 Answer

0 votes

Your general idea is correct; however, its implementation within the Beam framework is wrong.

In particular, your _mutations does not work the way you expect. It does not accumulate up to 200 messages; it only groups the messages seen within a single PCollection element inside the process method. My guess is that each one contains only a single item, which is why you see batches of length 1.

To collect messages across different elements, you need to use Beam state (i.e. a BagState). The documentation on this is quite solid, so allow me to copy and slightly adapt the example pseudocode from the programming guide:

class BufferDoFn(DoFn):
  BUFFER = BagStateSpec('buffer', EventCoder())  # you need to use a fitting Coder for your case
  MAX_DOCUMENTS = 200

  def process(self,
              element,
              buffer=DoFn.StateParam(BUFFER)):
    key, value = element

    buffer.add(value)  # equivalent to your _mutations.append(element)
    # instead of reading the whole list (as below) you can also introduce a second
    # state (ReadModifyWriteStateSpec) for counting how many messages are in the
    # bag state -> could be more performant
    if len(list(buffer.read())) > self.MAX_DOCUMENTS:
       self._flush_batch()  # <- read (and clear) the buffer in there
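To illustrate the counting idea from the comment above, here is a minimal, self-contained sketch. The coder choices (PickleCoder, VarIntCoder), the class name and yielding the batch downstream are my assumptions, not part of the original answer:

from apache_beam import DoFn
from apache_beam.coders import PickleCoder, VarIntCoder
from apache_beam.transforms.userstate import BagStateSpec, ReadModifyWriteStateSpec


class CountingBufferDoFn(DoFn):
    MAX_DOCUMENTS = 200
    BUFFER = BagStateSpec('buffer', PickleCoder())             # assumed coder, adapt to your elements
    COUNT = ReadModifyWriteStateSpec('count', VarIntCoder())   # per-key element counter

    def process(self,
                element,
                buffer=DoFn.StateParam(BUFFER),
                count=DoFn.StateParam(COUNT)):
        key, value = element
        buffer.add(value)

        # keep a cheap counter so we do not have to read the whole bag on every element
        current = (count.read() or 0) + 1
        count.write(current)

        if current >= self.MAX_DOCUMENTS:
            batch = list(buffer.read())   # only materialize the bag when flushing
            buffer.clear()
            count.clear()
            yield batch                   # hand the full batch downstream, e.g. to a Firestore writer

The counter is what the comment's performance remark refers to: the (potentially large) bag is only read once per flush instead of on every incoming element.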

Keep in mind that a stateful DoFn needs key-value pairs to work. Each distinct key gets its own state, independent of all the others. In your case you want to merge all incoming messages, so you need to assign the same (dummy) key to every incoming element, e.g. something like this:

_ = (
        p
        | "Add dummy key" >> beam.Map(lambda x: ('dummy_key', x)) # the key does not matter, but there has to be one if you want to use a state
        | beam.ParDo(BufferDoFn())
    )

By the way, you should move the initialization of the Firestore client into setup()/teardown() instead of start_bundle()/finish_bundle(). Everything in Beam is processed in bundles, so those methods are called very frequently, which might also be slowing you down.
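A minimal sketch of that suggestion, reusing the project/collection parameters and key hashing from your FirestoreUpdateDoFn; the class name and the assumption that each incoming element is a list of mutations (as yielded by the stateful buffering DoFn) are illustrative, not the answer's exact code:

import hashlib
import json

import apache_beam as beam


class FirestoreWriteDoFn(beam.DoFn):
    def __init__(self, project, collection):
        self._project = project
        self._collection = collection

    def setup(self):
        # called once per DoFn instance on a worker, not once per bundle
        from google.cloud import firestore
        self._db = firestore.Client(project=self._project)

    def teardown(self):
        # best-effort cleanup when the worker tears the DoFn down
        self._db.close()

    def process(self, batch):
        # `batch` is a list of mutation dicts buffered upstream
        fs_batch = self._db.batch()
        for mutation in batch:
            key = hashlib.sha1(
                json.dumps(mutation["product_id"], sort_keys=True).encode("utf-8")
            ).hexdigest()
            fs_batch.set(self._db.collection(self._collection).document(key), mutation)
        fs_batch.commit()

Keep the flush threshold below Firestore's limit of 500 writes per batch; your MAX_DOCUMENTS of 200 is fine in that respect.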
