Debezium Server to Pub/Sub: delete operations break the application

Problem description

I want to send notifications about changes in my source databases (Postgres and SQL Server) to Pub/Sub, so that the data can later be stored in BigQuery. I decided to take a look at Debezium Server, deployed it as a Docker container for testing purposes, and created one SQL Server database and one Postgres database.

After some debugging I received the first create and update notifications in Pub/Sub. So far so good.

When I try to delete a record in the database, the server crashes without sending a message to Pub/Sub. In the logs I can see something like this (the relevant part is the INVALID_ARGUMENT error from the Pub/Sub sink at the end: "One or more messages in the publish request is empty"):

{"timestamp":"2023-03-30T17:29:13.722Z","sequence":230,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.embedded.EmbeddedEngine","level":"INFO","message":"Stopping the task and engine","threadName":"pool-7-thread-1","threadId":18,"mdc":{},"ndc":"","hostName":"034bf7902d51","processName":"io.debezium.server.Main","processId":1}
{"timestamp":"2023-03-30T17:29:13.723Z","sequence":231,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.connector.common.BaseSourceTask","level":"INFO","message":"Stopping down connector","threadName":"pool-7-thread-1","threadId":18,"mdc":{},"ndc":"","hostName":"034bf7902d51","processName":"io.debezium.server.Main","processId":1}
{"timestamp":"2023-03-30T17:29:14.178Z","sequence":232,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.jdbc.JdbcConnection","level":"INFO","message":"Connection gracefully closed","threadName":"pool-15-thread-1","threadId":59,"mdc":{"dbz.taskId":"0","dbz.databaseName":"postgres","dbz.connectorName":"streamio23","dbz.connectorType":"Postgres","dbz.connectorContext":"streaming"},"ndc":"","hostName":"034bf7902d51","processName":"io.debezium.server.Main","processId":1}
{"timestamp":"2023-03-30T17:29:14.181Z","sequence":233,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.jdbc.JdbcConnection","level":"INFO","message":"Connection gracefully closed","threadName":"pool-16-thread-1","threadId":60,"mdc":{"dbz.taskId":"0","dbz.databaseName":"postgres","dbz.connectorName":"streamio23","dbz.connectorType":"Postgres","dbz.connectorContext":"streaming"},"ndc":"","hostName":"034bf7902d51","processName":"io.debezium.server.Main","processId":1}
{"timestamp":"2023-03-30T17:29:14.182Z","sequence":234,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.pipeline.ChangeEventSourceCoordinator","level":"INFO","message":"Finished streaming","threadName":"debezium-postgresconnector-streamio23-change-event-source-coordinator","threadId":31,"mdc":{"dbz.taskId":"0","dbz.databaseName":"postgres","dbz.connectorName":"streamio23","dbz.connectorType":"Postgres","dbz.connectorContext":"streaming"},"ndc":"","hostName":"034bf7902d51","processName":"io.debezium.server.Main","processId":1}
{"timestamp":"2023-03-30T17:29:14.183Z","sequence":235,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.pipeline.ChangeEventSourceCoordinator","level":"INFO","message":"Connected metrics set to 'false'","threadName":"debezium-postgresconnector-streamio23-change-event-source-coordinator","threadId":31,"mdc":{"dbz.taskId":"0","dbz.databaseName":"postgres","dbz.connectorName":"streamio23","dbz.connectorType":"Postgres","dbz.connectorContext":"streaming"},"ndc":"","hostName":"034bf7902d51","processName":"io.debezium.server.Main","processId":1}
{"timestamp":"2023-03-30T17:29:14.199Z","sequence":236,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"org.apache.kafka.connect.storage.FileOffsetBackingStore","level":"INFO","message":"Stopped FileOffsetBackingStore","threadName":"pool-7-thread-1","threadId":18,"mdc":{},"ndc":"","hostName":"034bf7902d51","processName":"io.debezium.server.Main","processId":1}
{"timestamp":"2023-03-30T17:29:14.201Z","sequence":237,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.server.ConnectorLifecycle","level":"ERROR","message":"Connector completed: success = 'false', message = 'Stopping connector after error in the application's handler method: java.util.concurrent.ExecutionException: com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: One or more messages in the publish request is empty. Each message must contain either non-empty data, or at least one attribute.', error = 'io.debezium.DebeziumException: java.util.concurrent.ExecutionException: com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: One or more messages in the publish request is empty. Each message must contain either non-empty data, or at least one attribute.'","threadName":"pool-7-thread-1","threadId":18,"mdc":{},"ndc":"","hostName":"034bf7902d51","processName":"io.debezium.server.Main","processId":1,"exception":{"refId":1,"exceptionType":"io.debezium.DebeziumException","message":"java.util.concurrent.ExecutionException: com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: One or more messages in the publish request is empty. Each message must contain either non-empty data, or at least one attribute.","frames":[{"class":"io.debezium.server.pubsub.PubSubChangeConsumer","method":"handleBatch","line":257},{"class":"io.debezium.embedded.ConvertingEngineBuilder","method":"lambda$notifying$2","line":101},{"class":"io.debezium.embedded.EmbeddedEngine","method":"run","line":913},{"class":"io.debezium.embedded.ConvertingEngineBuilder$2","method":"run","line":229},{"class":"io.debezium.server.DebeziumServer","method":"lambda$start$1","line":170},{"class":"java.util.concurrent.ThreadPoolExecutor","method":"runWorker","line":1128},{"class":"java.util.concurrent.ThreadPoolExecutor$Worker","method":"run","line":628},{"class":"java.lang.Thread","method":"run","line":829}],"causedBy":{"exception":{"refId":2,"exceptionType":"java.util.concurrent.ExecutionException","message":"com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: One or more messages in the publish request is empty. Each message must contain either non-empty data, or at least one attribute."

This is my application.properties file. I guess the only non-basic part is the topic routing, because I don't want to create a separate Pub/Sub topic for every table.

debezium.sink.pravega.scope=empty
debezium.sink.type=pubsub
debezium.sink.pubsub.project.id=XXXXX
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.offset.storage.file.filename=/tmp/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=XXXXXXXX
debezium.source.database.port=5432
debezium.source.database.user=postgres
debezium.source.database.password=postgres
debezium.source.database.dbname=postgres
debezium.source.database.server.name=tutorial
debezium.source.topic.prefix=streamio23
debezium.source.schema.include.list=inventory
debezium.snapshot.new.tables=parallel
debezium.source.plugin.name=pgoutput
debezium.transforms=Reroute
debezium.transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
debezium.transforms.Reroute.topic.regex=(.*)inventory(.*)
debezium.transforms.Reroute.topic.replacement=stream.stream.stream.inventory.orders
debezium.source.value.converter=org.apache.kafka.connect.json.JsonConverter
debezium.source.value.converter.schemas.enable=true
debezium.source.database.history.file.filename=/tmp/FileDatabaseHistory.dat
debezium.source.database.history=io.debezium.relational.history.FileDatabaseHistory
pk.mode=record_key
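
As a side note on why a single routed topic is enough: a sketch of the sink section, assuming Debezium Server's default Pub/Sub behaviour of publishing each change event to the Pub/Sub topic named after the record's (routed) topic, without creating topics automatically:

# Sink section. Assumption: each change event is published to the Pub/Sub topic
# whose name equals the record's (routed) topic, so after the Reroute transform
# only the topic "stream.stream.stream.inventory.orders" has to exist in the project.
debezium.sink.type=pubsub
debezium.sink.pubsub.project.id=XXXXX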

Maybe someone knows this problem and has a solution? The error is the same for Postgres and MSSQL, so I assume it is a problem with the sink?

I have already tried several configurations in the application.properties file and checked the offsets, but the problem is always the same.

Thanks in advance!

sql-server postgresql google-cloud-platform publish-subscribe debezium
1 Answer

I had the same problem and solved it by setting:

debezium.source.tombstones.on.delete=false

here you can find an explanation of tombstone events. In short, Debezium emits a tombstone (a record with a null value) after each delete event so that Kafka topics with log compaction can actually remove the deleted key. For Pub/Sub this is not desirable, since an empty message cannot be published.
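
For context, a minimal sketch of how the fix fits into the question's application.properties. Only the source lines relevant to delete handling are shown; tombstones.on.delete is the standard Debezium connector option, prefixed with debezium.source. as Debezium Server requires:

debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
# Keep emitting the delete change event (op = "d"), but suppress the follow-up
# tombstone record. The tombstone has a null value, and it is this null-valued
# record that the Pub/Sub sink turns into an empty publish request.
debezium.source.tombstones.on.delete=false

With this set, deletes arrive in Pub/Sub as ordinary change events; you only give up the Kafka log-compaction semantics that tombstones exist for, which a Pub/Sub pipeline does not use.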
