I'm using a Dataflow sliding window to read from Pub/Sub and apply some transforms before creating entities and writing them to Firestore Native. I found that Beam does not support a native Firestore I/O library, so I looked at some custom [python code](https://www.the-swamp.info/blog/uploading-data-firestore-using-dataflow/) for writing to Firestore. These are the main steps:
1- Create the Pub/Sub sliding window:
```python
with beam.Pipeline(options=options) as p:
    # Read data from Pub/Sub
    input_data = (
        p
        | "Read from Pub/Sub"
        >> beam.io.ReadFromPubSub(subscription=known_args.input_subscription)
        | "Decode JSON" >> beam.Map(lambda x: json.loads(x))
    )

    windowed_data = (
        input_data
        | "Apply Windowing"
        >> beam.WindowInto(
            beam.window.SlidingWindows(
                size=60 * 60 * 5, period=60 * 15
            )  # sliding window of 5 hours, triggers every 15 min
        )
    )
```
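As a sanity check on these window parameters (a worked computation, not part of the original snippet): with `SlidingWindows`, every element is assigned to `size / period` overlapping windows, which multiplies the volume flowing into every downstream step accordingly:

```python
size = 60 * 60 * 5  # window size: 5 hours, in seconds
period = 60 * 15    # window period: 15 minutes, in seconds

# Each incoming Pub/Sub message lands in size / period overlapping windows
windows_per_element = size // period
# -> 20: every message is duplicated into 20 windows downstream
```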
2- Create the entity:
```python
def create_content(element, window=DoFn.WindowParam):
    logging.info(f" element is : {element}")
    product_id = element[0]
    window_start = window.start.to_utc_datetime()
    window_end = window.end.to_utc_datetime()
    # set expiry date of five hours
    expiry_timestamp = datetime.utcnow() + timedelta(hours=5)
    # create entity
    entity = {
        "expiry_timestamp": expiry_timestamp,
        "product_id": product_id,
        "length_list": len(element[1]),
        "window_start": window_start,
        "window_end": window_end,
    }
    return entity
```
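Since `create_content` indexes `element[0]` and `element[1]`, it assumes a `(key, iterable)` pair — the shape a grouping step such as `GroupByKey` (not shown in the snippets above) would produce. A minimal sketch of that assumed input shape, with hypothetical data:

```python
# Hypothetical element shape after a GroupByKey (the grouping step itself
# is not shown in the question):
element = ("product-123", [{"qty": 1}, {"qty": 2}, {"qty": 5}])

product_id = element[0]        # the grouping key
length_list = len(element[1])  # number of events for that key in the window
```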
3- The class that writes to Firestore:
```python
class FirestoreUpdateDoFn(beam.DoFn):
    MAX_DOCUMENTS = 200

    def __init__(self, project, collection):
        self._project = project
        self._collection = collection

    def start_bundle(self):
        from google.cloud import firestore

        self._mutations = []
        self.db = firestore.Client(project=self._project)
        self.batch = self.db.batch()

    def finish_bundle(self):
        if self._mutations:
            self._flush_batch()
            logging.info("committing batch of elements at finish_bundle")
        logging.info("closing db")
        self.db.close()

    def process(self, element, *args, **kwargs):
        self._mutations.append(element)
        logging.info(f"read element is {element}")
        if len(self._mutations) > self.MAX_DOCUMENTS:
            self._flush_batch()

    def _flush_batch(self):
        logging.info(f"length batch is {len(self._mutations)}")
        for mutation in self._mutations:
            logging.info(f"mutation is {mutation}")
            product_id = mutation["product_id"]
            key = hashlib.sha1(
                json.dumps(product_id, sort_keys=True).encode("utf-8")
            ).hexdigest()
            ref = self.db.collection(self._collection).document(key)
            self.batch.set(ref, mutation)
        logging.info("about to commit batch")
        r = self.batch.commit()
        logging.info("committing batch of elements")
        logging.debug(r)
        self._mutations = []
```
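The flush logic above can be exercised outside Beam. Here is a minimal sketch (plain Python, no Firestore client) of the document-key derivation and of splitting a buffer into commit-sized chunks; a single Firestore batched write has historically been capped at 500 operations, so a cap of 200 leaves headroom:

```python
import hashlib
import json

MAX_DOCUMENTS = 200  # well under Firestore's per-batch write limit

def doc_key(product_id):
    # Same derivation as in _flush_batch: a stable SHA-1 hex digest of the
    # JSON-encoded product id, usable as a Firestore document id
    return hashlib.sha1(
        json.dumps(product_id, sort_keys=True).encode("utf-8")
    ).hexdigest()

def chunks(mutations, size=MAX_DOCUMENTS):
    # Split a buffer of mutations into commit-sized batches
    return [mutations[i:i + size] for i in range(0, len(mutations), size)]

batches = chunks([{"product_id": i} for i in range(450)])
# -> 3 batches of sizes 200, 200 and 50
```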
4- Run it all together:
```python
windowed_data | "Create entities" >> beam.Map(
    create_content
) | "Write to Firestore" >> beam.ParDo(
    FirestoreUpdateDoFn(project_id, collection_id)
)
```
I'm using bundles in `FirestoreUpdateDoFn` to batch-insert into Firestore, hoping to improve performance by reducing the round trips between Dataflow and Firestore. However, I've noticed that writing to Firestore is still very slow. Moreover, a few minutes after the job starts, the Dataflow job gets stuck and data stops being written to the database. On top of that, the Pub/Sub backlog grows indefinitely, which means the messages are not being acked (if I'm not mistaken). I've also noticed that the bundles are very small, which mostly results in tiny batches (most of size 1, see screenshot).

At this point I'm stuck, because the job is unreliable (it becomes super slow and stops writing), and I'm not sure whether there is a way to stream events into Firestore with Beam Python at all. Any hints?
Your general idea is correct; however, its implementation within the Beam framework is wrong. In particular, your `_mutations` does not work the way you expect. It does not append up to 200 messages, but only groups the messages within one PCollection/element inside the `process` method. My guess is that each PCollection contains only one item, which is why you see a batch length of 1.
To collect messages across different elements, you need to use Beam state (i.e. a BagState). The documentation on this is quite solid, so allow me to copy and slightly adapt the example pseudocode from the programming guide:
```python
class BufferDoFn(DoFn):
    BUFFER = BagStateSpec('buffer', EventCoder())  # you need to use a fitting Coder for your case

    def process(self,
                element,
                buffer=DoFn.StateParam(BUFFER)):
        key, value = element
        buffer.add(value)  # equivalent to your _mutations.append(element)
        # instead of reading the whole list (as below) you can also introduce
        # a second state (ReadModifyWriteStateSpec) for counting how many
        # messages are in the bag state -> could be more performant
        if len(list(buffer.read())) > self.MAX_DOCUMENTS:
            self._flush_batch()  # <- read and clear the buffer in there
```
Keep in mind that a stateful `DoFn` needs key-value pairs in order to work. Every distinct key gets its own state, independent of all the others. In your case you want to combine all incoming messages, so you need to assign the same (dummy) key to every incoming element, e.g. something like this:
```python
_ = (
    p
    | "Add dummy key" >> beam.Map(lambda x: ('dummy_key', x))  # the key does not matter, but there has to be one if you want to use a state
    | beam.ParDo(BufferDoFn())
)
```
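One caveat with the buffering approach: if fewer than `MAX_DOCUMENTS` messages ever arrive for the key, the bag would never flush. The usual companion is a processing-time timer plus a counter state. The following is pseudocode in the same spirit as the guide's example (the names `COUNT`, `FLUSH_TIMER` and `EventCoder` are mine, and the 60-second flush interval is an arbitrary choice):

```python
import time

from apache_beam.transforms.userstate import (
    BagStateSpec, CombiningValueStateSpec, TimerSpec, on_timer)
from apache_beam.transforms.timeutil import TimeDomain

class BufferDoFn(beam.DoFn):
    BUFFER = BagStateSpec('buffer', EventCoder())
    COUNT = CombiningValueStateSpec('count', sum)  # cheap counter instead of reading the whole bag
    FLUSH_TIMER = TimerSpec('flush', TimeDomain.REAL_TIME)  # processing-time timer
    MAX_DOCUMENTS = 200

    def process(self, element,
                buffer=beam.DoFn.StateParam(BUFFER),
                count=beam.DoFn.StateParam(COUNT),
                timer=beam.DoFn.TimerParam(FLUSH_TIMER)):
        key, value = element
        buffer.add(value)
        count.add(1)
        timer.set(time.time() + 60)  # make sure small remainders flush eventually
        if count.read() >= self.MAX_DOCUMENTS:
            self._flush(buffer, count)

    @on_timer(FLUSH_TIMER)
    def flush_on_timer(self,
                       buffer=beam.DoFn.StateParam(BUFFER),
                       count=beam.DoFn.StateParam(COUNT)):
        self._flush(buffer, count)

    def _flush(self, buffer, count):
        batch = list(buffer.read())  # commit these to Firestore
        buffer.clear()
        count.clear()
```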
By the way, you should move the initialization of the Firestore client into `setup()` and `teardown()` instead of `start_bundle`/`finish_bundle`. Everything in Beam is processed in bundles, so those two are called frequently, which is probably slowing you down as well.
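To illustrate why that matters, here is a minimal stand-in (plain Python, no Beam dependency; only the hook names match `beam.DoFn`'s lifecycle) counting how often each hook fires when one worker instance processes many small bundles:

```python
class LifecycleDoFn:
    # Stand-in with beam.DoFn's lifecycle hook names; in a real pipeline
    # this class would subclass beam.DoFn.
    def __init__(self):
        self.clients_created = 0
        self.bundles_started = 0

    def setup(self):
        # Called once per DoFn instance on a worker: the right place for
        # expensive work such as constructing the Firestore client.
        self.clients_created += 1

    def start_bundle(self):
        # Called once per bundle; in streaming, bundles are tiny and
        # frequent, so avoid client construction here.
        self.bundles_started += 1

    def teardown(self):
        # Called when the instance is retired: close the client here.
        pass

# Simulate one worker instance processing 1000 streaming bundles:
fn = LifecycleDoFn()
fn.setup()
for _ in range(1000):
    fn.start_bundle()
fn.teardown()
# With the client in setup(), it is built once instead of 1000 times.
```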