我正在使用 Dataflow 流管道,并且遇到了一个问题,即在全局窗口中使用侧面输入会锁定使用 30 秒固定窗口的主分支。首先提供侧面输入的“区域处理”之前的所有阶段都运行顺利。一旦我们使用侧面输入进入舞台,它就会停止或急剧减慢。
预期行为: 我的区域处理阶段应在前一阶段完成后立即运行。
实际行为: 即使没有进行任何耗时的处理,该阶段也会在输出之前等待一段时间。 查看“区域处理”阶段的数据流输入/输出集合图,我们发现通常在提供输入后近 1 分钟处理输出。我也可以在阶段日志中看到这一点。
侧输入按预期每 30 秒获取一次,并且只有 1-2kb。
关于输入数据,我尝试过发送准时数据和延迟数据,两者都有相同的问题。这仍处于早期测试阶段,因此输入量非常低。我一次只通过 Postman 发送几条发布-订阅消息。
管道配置:
这是我的管道代码的简化版本。
class AddTimestampToMessage(DoFn):
def process(self, element):
timestamp_str = element['timestamp']
dt = datetime.fromisoformat(timestamp_str.rstrip('Z')).replace(tzinfo=pytz.UTC)
unix_timestamp = dt.timestamp()
yield TimestampedValue(element, unix_timestamp)
class FetchSideInputDataFromFirebase(DoFn):
def __init__(self, databaseUrl, eventId):
self.databaseUrl = databaseUrl
self.eventId = eventId
def start_bundle(self):
"""
Initialize Firebase Admin SDK
"""
# Initialization code for Firebase
if not firebase_admin._apps:
firebase_admin.initialize_app(None, options={
'databaseURL': self.databaseUrl
})
def process(self, element):
if (not self.eventId):
logging.error('No event id provided to fetch side input data. Skipping...')
return
ref = db.reference('/events/' + self.eventId + '/side_input_data')
data = ref.get()
for key, value in data.items():
yield (key, value)
class ProcessMessage(DoFn):
def process(self, messageGroup, side_input=None, window=DoFn.WindowParam, timestamp=DoFn.TimestampParam):
# * Takes the message group and outputs 3 records, using side input data
record1 = {}
record2 = {}
record3 = {}
for record in [record1, record2, record3]:
yield record
def run(input_subscription, output_path, window_size=30.0, pipeline_args=None, project=None):
pipeline_options = PipelineOptions(
pipeline_args, streaming=True, save_main_session=True
)
with Pipeline(options=pipeline_options) as pipeline:
sideInputPColl = (
pipeline
| 'Periodic Impulse' >> PeriodicImpulse(fire_interval=window_size, apply_windowing=False)
| 'Fetch Firebase Data' >> ParDo(FetchSideInputDataFromFirebase(...))
| "Global Window" >> WindowInto(
GlobalWindows(),
trigger=Repeatedly(AfterProcessingTime(1)),
accumulation_mode=AccumulationMode.DISCARDING,
)
)
(
pipeline
| "Read from Pub/Sub" >> io.ReadFromPubSub(subscription=input_subscription)
| 'Add timestamp based on message event time' >> ParDo(AddTimestampToMessage())
| "Window into Fixed Windows" >> WindowInto(
FixedWindows(window_size),
accumulation_mode=AccumulationMode.ACCUMULATING,
allowed_lateness=432000 # allow 5 days of late data
)
| "Key by tag_id" >> WithKeys(lambda message: message['tag_id'])
| "Group by tag_id" >> GroupByKey()
# ------ here is where my issue is ------
| "Zone Processing" >> ParDo(ProcessMessage(), pvalue.AsDict(sideInputPColl))
| "Write to Bigquery" >> io.WriteToBigQuery(
'my_table',
create_disposition=io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=io.BigQueryDisposition.WRITE_APPEND
)
)
我尝试过的事情:
AsSingleton
和 AsDict
选项side_input_update = pvalue.AsSingleton(sideInputPColl)
side_input_update = (
pipeline
| 'Periodic Impulse' >> PeriodicImpulse(fire_interval=window_size, apply_windowing=True)
| 'Fetch Firebase Data' >> ParDo(FetchSideInputDataFromFirebase(...))
)
"Global Window" >> WindowInto(
GlobalWindows(),
trigger=Repeatedly(AfterProcessingTime(5)), # 1, 5, 30
accumulation_mode=AccumulationMode.DISCARDING,
)
侧面输入全局窗口存在一个错误https://github.com/apache/beam/issues/28776,这可能与您的问题相关,也可能无关。但仔细一看,和上面提到的问题很相似。
我会尝试提供的解决方法是将
num_workers
管道选项设置为大于 1。
您的数据流管道可扩展到多少工作人员?