我正在尝试使用Apache Beam建立事件的管道。我想做的是使用事件中的ID从GCP PubSub读取流数据并从MySQL读取相关元数据,然后将这两个流合并并写入我的Clickhouse数据库。
但是JdbcIO.readall()似乎没有返回其流。正如您在ClickhousePipeline
上看到的那样,在应用CoGroupByKey.create()
之后,我试图合并两个PCollection
,但是userMetaData
变成空的,并且ParDo
之后也没有执行过链接的UserMetadataEnricher()
。
在withRowMapper
的UserMetadataEnricher
中,我添加了println()
以检查它是否正在运行,并且它可以正常工作并打印数据库中的结果,但是,它不会将数据返回到下一个管道。
我想这个问题与Windowing
有关,我在没有窗口测试的情况下检查了它是否正常工作。但是,PubSubIO是Unbounded PCollection
,所以我必须应用窗口才能使用JDBCIO.readall()
对吗?我不知道要解决这个问题。我希望尽快得到答案!
MainPipeline
object MainPipeline {
@JvmStatic
fun run(options: MainPipelineOptions) {
val p = Pipeline.create(options)
val events = p
.apply(
"Read DetailViewEvent PubSub",
PubsubIO.readStrings().fromSubscription(options.inputSubscription)
)
.apply(
"Extract messages",
ParseJsons.of(FoodDetailViewEvent::class.java)
.exceptionsInto(
TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings())
)
.exceptionsVia { KV.of(it.element(), it.exception().javaClass.canonicalName) }
)
val validEvents =
events.output().setCoder(SerializableCoder.of(FoodDetailViewEvent::class.java))
val invalidEvents = events.failures()
invalidEvents.apply(FailurePipeline(options))
validEvents.apply(ClickhousePipeline(options))
p.run().waitUntilFinish()
}
@JvmStatic
fun main(args: Array<String>) {
val options = PipelineOptionsFactory
.fromArgs(*args)
.withValidation()
.`as`(MainPipelineOptions::class.java)
run(options)
}
}
ClickhousePipeline
class ClickhousePipeline(private val options: MainPipelineOptions) :
PTransform<PCollection<DetailViewEvent>, PDone>() {
override fun expand(events: PCollection<DetailViewEvent>): PDone {
val windowedEvents = events
.apply(
"Window", Window
.into<DetailViewEvent>(GlobalWindows())
.triggering(
Repeatedly
.forever(
AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(5))
)
)
.withAllowedLateness(Duration.ZERO).discardingFiredPanes()
)
val userIdDetailViewEvents = windowedEvents
.apply(
MapElements.via(object :
SimpleFunction<DetailViewEvent, KV<String, DetailViewEvent>>() {
override fun apply(input: DetailViewEvent): KV<String, DetailViewEvent> {
return KV.of(input.userInfo.userId, input)
}
})
)
val userMetaData = userIdDetailViewEvents
.apply(
MapElements.via(object :
SimpleFunction<KV<String, DetailViewEvent>, String>() {
override fun apply(input: KV<String, DetailViewEvent>): String {
return input.key!!
}
})
)
.apply(
UserMetadataEnricher(options)
)
.apply(
ParDo.of(
object : DoFn<UserMetadata, KV<String, UserMetadata>>() {
@ProcessElement
fun processElement(
@Element data: UserMetadata,
out: OutputReceiver<KV<String, UserMetadata>>
) {
println("User:: ${data}") // Not printed!!
out.output(KV.of(data.userId, data))
}
})
)
val sourceTag = object : TupleTag<DetailViewEvent>() {}
val userMetadataTag = object : TupleTag<UserMetadata>() {}
val joinedPipeline: PCollection<KV<String, CoGbkResult>> =
KeyedPCollectionTuple.of(sourceTag, userIdDetailViewEvents)
.and(userMetadataTag, userMetaData)
.apply(CoGroupByKey.create())
val enrichedData = joinedPipeline.apply(
ParDo.of(object : DoFn<KV<String, CoGbkResult>, ClickHouseModel>() {
@ProcessElement
fun processElement(
@Element data: KV<String, CoGbkResult>,
out: OutputReceiver<ClickHouseModel>
) {
val name = data.key
val source = data.value.getAll(sourceTag)
val userMetadataSource = data.value.getAll(userMetadataTag)
println("==========================")
for (metadata in userMetadataSource.iterator()) {
println("Metadata:: $metadata") // This is always empty
}
for (event in source.iterator()) {
println("Event:: $event")
}
println("==========================")
val sourceEvent = source.iterator().next()
if (userMetadataSource.iterator().hasNext()) {
val userMetadataEvent = userMetadataSource.iterator().next()
out.output(
ClickHouseModel(
eventType = sourceEvent.eventType,
userMetadata = userMetadataEvent
)
)
}
}
})
)
val clickhouseData = enrichedData.apply(
ParDo.of(object : DoFn<ClickHouseModel, Row>() {
@ProcessElement
fun processElement(context: ProcessContext) {
val data = context.element()
context.output(
data.toSchema()
)
}
})
)
return clickhouseData
.setRowSchema(ClickHouseModel.schemaType())
.apply(
ClickHouseIO.write(
"jdbc:clickhouse://127.0.0.1:8123/test?password=example",
"clickhouse_test"
)
)
}
}
UserMetadataEnricher
class UserMetadataEnricher(private val options: MainPipelineOptions) :
PTransform<PCollection<String>, PCollection<UserMetadata>>() {
@Throws(Exception::class)
override fun expand(events: PCollection<String>): PCollection<UserMetadata> {
return events
.apply(
JdbcIO.readAll<String, UserMetadata>()
.withDataSourceConfiguration(
JdbcIO.DataSourceConfiguration.create(
"com.mysql.cj.jdbc.Driver", "jdbc:mysql://localhost:3306/beam-test"
)
.withUsername("root")
.withPassword("example")
)
.withQuery("select id,name,gender from user where id = ?")
.withParameterSetter { id: String, preparedStatement: PreparedStatement ->
preparedStatement.setString(1, id)
}
.withCoder(
SerializableCoder.of(
UserMetadata::class.java
)
)
.withRowMapper
{
println("RowMapper:: ${it.getString(1)}") // printed!!
UserMetadata(
it.getString(1),
it.getString(2),
it.getString(3)
)
}
)
}
}
输出
RowMapper:: test-02
RowMapper:: test-01
==========================
Event:: DetailViewEvent(...)
==========================
==========================
Event:: DetailViewEvent(...)
==========================
[GlobalWindow
从不关闭,因此对于像pubsub这样的无限制数据源来说,这不是一个合适的选项。
我建议改用FixedWindow(<Some time range>)
。您可以在https://beam.apache.org/documentation/programming-guide/#windowing