Beam JdbcIO.readAll doesn't seem to return any results

Question (votes: 0, answers: 1)

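I'm trying to build an event pipeline with Apache Beam. I want to read streaming data from GCP Pub/Sub, use the ID in each event to read the related metadata from MySQL, then join the two streams and write the result to my ClickHouse database.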

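However, JdbcIO.readAll() doesn't seem to return its stream. As you can see in ClickhousePipeline, I try to merge the two PCollections by applying CoGroupByKey.create(), but userMetaData comes out empty, and the ParDo chained after UserMetadataEnricher() is never executed either.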
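For reference, this plain-Kotlin sketch (a simulation, not Beam code) models what CoGroupByKey produces within a single window: one result per key seen on either side, carrying one list per tag, where a tag with no elements for that key yields an empty list rather than no result. The names `coGroup` and `JoinResult` are hypothetical. Under this model, the output at the end of the question, with `Event::` lines but no `Metadata::` lines, is what you would expect if no elements from `userMetaData` ever reached the join.

```kotlin
// Plain-Kotlin simulation of CoGroupByKey semantics within a single window.
// Not Beam code: JoinResult and coGroup are hypothetical names.
data class JoinResult<L, R>(val left: List<L>, val right: List<R>)

fun <K, L, R> coGroup(
    left: List<Pair<K, L>>,
    right: List<Pair<K, R>>
): Map<K, JoinResult<L, R>> {
    // Collect the values of each side per key.
    val leftByKey: Map<K, List<L>> = left.groupBy({ it.first }, { it.second })
    val rightByKey: Map<K, List<R>> = right.groupBy({ it.first }, { it.second })
    // Every key seen on either side yields exactly one result; a side with
    // no elements for that key contributes an empty list, not a missing entry.
    return (leftByKey.keys + rightByKey.keys).associateWith { key ->
        JoinResult(leftByKey[key].orEmpty(), rightByKey[key].orEmpty())
    }
}
```

For example, `coGroup(listOf("test-01" to "event"), emptyList<Pair<String, String>>())` returns one entry for `test-01` whose `right` list is empty, which matches the shape of the question's output.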

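In the withRowMapper of UserMetadataEnricher, I added a println() to check whether it runs. It does: it prints the results from the database. However, it doesn't pass the data on to the next step of the pipeline.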

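I suspect the problem is related to windowing, because I checked that it works when I test without windowing. But PubsubIO produces an unbounded PCollection, so I have to apply windowing in order to use JdbcIO.readAll(), right? I have no idea how to fix this. I hope to get an answer soon!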

MainPipeline

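import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.coders.SerializableCoder
import org.apache.beam.sdk.extensions.jackson.ParseJsons
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.sdk.values.KV
import org.apache.beam.sdk.values.TypeDescriptors

object MainPipeline {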
  @JvmStatic
  fun run(options: MainPipelineOptions) {
    val p = Pipeline.create(options)

    val events = p
      .apply(
        "Read DetailViewEvent PubSub",
        PubsubIO.readStrings().fromSubscription(options.inputSubscription)
      )
      .apply(
        "Extract messages",
        ParseJsons.of(FoodDetailViewEvent::class.java)
          .exceptionsInto(
            TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings())
          )
          .exceptionsVia { KV.of(it.element(), it.exception().javaClass.canonicalName) }
      )

    val validEvents =
      events.output().setCoder(SerializableCoder.of(FoodDetailViewEvent::class.java))
    val invalidEvents = events.failures()

    invalidEvents.apply(FailurePipeline(options))
    validEvents.apply(ClickhousePipeline(options))

    p.run().waitUntilFinish()
  }

  @JvmStatic
  fun main(args: Array<String>) {
    val options = PipelineOptionsFactory
      .fromArgs(*args)
      .withValidation()
      .`as`(MainPipelineOptions::class.java)

    run(options)
  }
}

ClickhousePipeline

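import org.apache.beam.sdk.io.clickhouse.ClickHouseIO
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.MapElements
import org.apache.beam.sdk.transforms.PTransform
import org.apache.beam.sdk.transforms.ParDo
import org.apache.beam.sdk.transforms.SimpleFunction
import org.apache.beam.sdk.transforms.join.CoGbkResult
import org.apache.beam.sdk.transforms.join.CoGroupByKey
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime
import org.apache.beam.sdk.transforms.windowing.GlobalWindows
import org.apache.beam.sdk.transforms.windowing.Repeatedly
import org.apache.beam.sdk.transforms.windowing.Window
import org.apache.beam.sdk.values.KV
import org.apache.beam.sdk.values.PCollection
import org.apache.beam.sdk.values.PDone
import org.apache.beam.sdk.values.Row
import org.apache.beam.sdk.values.TupleTag
import org.joda.time.Duration

class ClickhousePipeline(private val options: MainPipelineOptions) :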
  PTransform<PCollection<DetailViewEvent>, PDone>() {

  override fun expand(events: PCollection<DetailViewEvent>): PDone {
    val windowedEvents = events
      .apply(
        "Window", Window
          .into<DetailViewEvent>(GlobalWindows())
          .triggering(
            Repeatedly
              .forever(
                AfterProcessingTime
                  .pastFirstElementInPane()
                  .plusDelayOf(Duration.standardSeconds(5))
              )
          )
          .withAllowedLateness(Duration.ZERO).discardingFiredPanes()
      )

    val userIdDetailViewEvents = windowedEvents
      .apply(
        MapElements.via(object :
          SimpleFunction<DetailViewEvent, KV<String, DetailViewEvent>>() {
          override fun apply(input: DetailViewEvent): KV<String, DetailViewEvent> {
            return KV.of(input.userInfo.userId, input)
          }
        })
      )

    val userMetaData = userIdDetailViewEvents
      .apply(
        MapElements.via(object :
          SimpleFunction<KV<String, DetailViewEvent>, String>() {
          override fun apply(input: KV<String, DetailViewEvent>): String {
            return input.key!!
          }
        })
      )
      .apply(
        UserMetadataEnricher(options)
      )
      .apply(
        ParDo.of(
          object : DoFn<UserMetadata, KV<String, UserMetadata>>() {
            @ProcessElement
            fun processElement(
              @Element data: UserMetadata,
              out: OutputReceiver<KV<String, UserMetadata>>
            ) {
              println("User:: ${data}") // Not printed!!
              out.output(KV.of(data.userId, data))
            }
          })
      )

    val sourceTag = object : TupleTag<DetailViewEvent>() {}
    val userMetadataTag = object : TupleTag<UserMetadata>() {}

    val joinedPipeline: PCollection<KV<String, CoGbkResult>> =
      KeyedPCollectionTuple.of(sourceTag, userIdDetailViewEvents)
        .and(userMetadataTag, userMetaData)
        .apply(CoGroupByKey.create())

    val enrichedData = joinedPipeline.apply(
      ParDo.of(object : DoFn<KV<String, CoGbkResult>, ClickHouseModel>() {
        @ProcessElement
        fun processElement(
          @Element data: KV<String, CoGbkResult>,
          out: OutputReceiver<ClickHouseModel>
        ) {

          val name = data.key
          val source = data.value.getAll(sourceTag)
          val userMetadataSource = data.value.getAll(userMetadataTag)

          println("==========================")
          for (metadata in userMetadataSource.iterator()) {
            println("Metadata:: $metadata") // This is always empty
          }

          for (event in source.iterator()) {
            println("Event:: $event")
          }
          println("==========================")

          val sourceEvent = source.iterator().next()
          if (userMetadataSource.iterator().hasNext()) {
            val userMetadataEvent = userMetadataSource.iterator().next()
            out.output(
              ClickHouseModel(
                eventType = sourceEvent.eventType,
                userMetadata = userMetadataEvent
              )
            )

          }
        }
      })
    )

    val clickhouseData = enrichedData.apply(
      ParDo.of(object : DoFn<ClickHouseModel, Row>() {
        @ProcessElement
        fun processElement(context: ProcessContext) {
          val data = context.element()
          context.output(
            data.toSchema()
          )
        }
      })
    )

    return clickhouseData
      .setRowSchema(ClickHouseModel.schemaType())
      .apply(
        ClickHouseIO.write(
          "jdbc:clickhouse://127.0.0.1:8123/test?password=example",
          "clickhouse_test"
        )
      )
  }
}

UserMetadataEnricher

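import java.sql.PreparedStatement
import org.apache.beam.sdk.coders.SerializableCoder
import org.apache.beam.sdk.io.jdbc.JdbcIO
import org.apache.beam.sdk.transforms.PTransform
import org.apache.beam.sdk.values.PCollection

class UserMetadataEnricher(private val options: MainPipelineOptions) :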
  PTransform<PCollection<String>, PCollection<UserMetadata>>() {

  @Throws(Exception::class)
  override fun expand(events: PCollection<String>): PCollection<UserMetadata> {
    return events
      .apply(
        JdbcIO.readAll<String, UserMetadata>()
          .withDataSourceConfiguration(
            JdbcIO.DataSourceConfiguration.create(
              "com.mysql.cj.jdbc.Driver", "jdbc:mysql://localhost:3306/beam-test"
            )
              .withUsername("root")
              .withPassword("example")
          )
          .withQuery("select id,name,gender from user where id = ?")
          .withParameterSetter { id: String, preparedStatement: PreparedStatement ->
            preparedStatement.setString(1, id)
          }
          .withCoder(
            SerializableCoder.of(
              UserMetadata::class.java
            )
          )
          .withRowMapper {
            println("RowMapper:: ${it.getString(1)}") // printed!!
            UserMetadata(
              it.getString(1),
              it.getString(2),
              it.getString(3)
            )
          }
      )
  }
}


Output

RowMapper:: test-02
RowMapper:: test-01
==========================
Event:: DetailViewEvent(...)
==========================
==========================
Event:: DetailViewEvent(...)
==========================

kotlin google-cloud-dataflow apache-beam dataflow
1 Answer
0 votes

GlobalWindow never closes, so it is not a suitable option for an unbounded data source like Pub/Sub.
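A simplified model of why the global window is a problem for unbounded input: with the default trigger, a grouping such as CoGroupByKey emits a window's result only once the watermark passes the end of that window, and the watermark of a source like Pub/Sub never reaches the global window's end. This plain-Kotlin sketch is not Beam code; `GLOBAL_WINDOW_END`, `fixedWindowEnd` and `defaultTriggerFired` are hypothetical names, and `Long.MAX_VALUE` merely stands in for the global window's end.

```kotlin
// Simplified model of Beam's default trigger (not Beam code): a window's
// grouped result is emitted only once the watermark passes the window's end.
// GLOBAL_WINDOW_END is a hypothetical stand-in for the global window's end,
// which the watermark of an unbounded source such as Pub/Sub never reaches.
val GLOBAL_WINDOW_END: Long = Long.MAX_VALUE

// End (exclusive) of the fixed window of size sizeMs that contains timestampMs.
fun fixedWindowEnd(timestampMs: Long, sizeMs: Long): Long =
    timestampMs - Math.floorMod(timestampMs, sizeMs) + sizeMs

// True when a default-triggered window would already have fired.
fun defaultTriggerFired(windowEnd: Long, watermarkMs: Long): Boolean =
    watermarkMs >= windowEnd
```

The question's code sidesteps the default trigger with `Repeatedly.forever(AfterProcessingTime...)`, which fires panes early based on processing time even though the global window itself never closes.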

I suggest using FixedWindows.of(<some time range>) instead. You can read more about windowing at https://beam.apache.org/documentation/programming-guide/#windowing
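To illustrate the suggestion, here is a plain-Kotlin simulation (not Beam code) of co-grouping two keyed streams after assigning them to fixed windows, roughly as `Window.into(FixedWindows.of(size))` followed by CoGroupByKey would: elements are grouped per (key, window start), so an event and its metadata are joined only when their timestamps fall in the same window. `Timestamped`, `windowStart` and `windowedJoin` are hypothetical names chosen for this sketch.

```kotlin
// Plain-Kotlin simulation (not Beam code) of joining two keyed streams after
// fixed windowing: elements are co-grouped per (key, window start).
// Timestamped, windowStart and windowedJoin are hypothetical names.
data class Timestamped<K, V>(val key: K, val value: V, val timestampMs: Long)

// Start of the fixed window of size sizeMs that contains timestampMs.
fun windowStart(timestampMs: Long, sizeMs: Long): Long =
    timestampMs - Math.floorMod(timestampMs, sizeMs)

// Maps (key, windowStart) -> (left values, right values) for every
// (key, window) pair seen on either side; a missing side is an empty list.
fun <K, L, R> windowedJoin(
    left: List<Timestamped<K, L>>,
    right: List<Timestamped<K, R>>,
    sizeMs: Long
): Map<Pair<K, Long>, Pair<List<L>, List<R>>> {
    val l = left.groupBy({ it.key to windowStart(it.timestampMs, sizeMs) }, { it.value })
    val r = right.groupBy({ it.key to windowStart(it.timestampMs, sizeMs) }, { it.value })
    return (l.keys + r.keys).associateWith { k -> l[k].orEmpty() to r[k].orEmpty() }
}
```

In the Beam pipeline itself, the analogous change would be replacing the `GlobalWindows()` window in `ClickhousePipeline` with something like `Window.into<DetailViewEvent>(FixedWindows.of(Duration.standardSeconds(10)))`; the 10-second size is an arbitrary example value.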
© www.soinside.com 2019 - 2024. All rights reserved.