使用 Rabbit MQ 和 MS SQL SERVER 设置 Debezium

问题描述 投票:0回答:1

我想通过 Debezium 和 Rabbit MQ 从 SQL Server 传输更改,但是在设置时出现以下错误。您能帮我解决这个问题吗?

回复代码=404,回复文本=NOT_FOUND - 没有交换“XXX_IMPORT_STREAM” 在虚拟主机'/'中,class-id=60,method-id=40

我正在关注以下文章NYIOR CLement

https://www.cloudamqp.com/blog/change-data-capture-with-rabbitmq-and-debezium-part-2.html

docker-compose

services:
  debezium:
    image: quay.io/debezium/server
    container_name: debezium
    healthcheck:
      test: curl http://debezium:8080/q/health || exit 1
      interval: 5s
      timeout: 5s
      retries: 5
    ports:
      - "8080:8080"
    volumes:
      - ./debezium_conf:/debezium/conf:readonly

conf 文件

# Sink connector config - RabbitMQ
debezium.sink.type=rabbitmq
debezium.sink.rabbitmq.connection.host=HOSTIP
debezium.sink.rabbitmq.connection.port=5672
debezium.sink.rabbitmq.connection.username=DEV
debezium.sink.rabbitmq.connection.password=1234
debezium.sink.rabbitmq.connection.virtual.host=/
debezium.sink.rabbitmq.ackTimeout=3000
debezium.sink.rabbitmq.autoCreateRoutingKey=true
debezium.sink.rabbitmq.routingKey=XXX_IMPORT_STREAM

# Source connector config - MSSQL
debezium.source.connector.class=io.debezium.connector.sqlserver.SqlServerConnector
debezium.source.plugin.name=pgoutput
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=Database_HostName
debezium.source.database.port=1433
debezium.source.database.user=dev
debezium.source.database.password=1234
debezium.source.database.names=DbName
debezium.source.table.include.list=dbo.table_name
debezium.source.database.allowPublicKeyRetrieval=true
debezium.source.topic.prefix=XXX_IMPORT_STREAM
debezium.source.database.encrypt=false
debezium.source.schema.whitelist=test
debezium.source.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory

# Format config
debezium.format.key=json
debezium.format.value=json

# Quarkus
quarkus.log.console.json=false

执行上述设置后,就成功创建了定义的数据库表快照。但之后我收到以下错误,提示未找到 XXX_IMPORT_STREAM

debezium  | 2024-04-19 11:06:41,148 INFO  [io.deb.pip.ChangeEventSourceCoordinator] (debezium-sqlserverconnector-XXX_IMPORT_STREAM-change-event-source-coordinator) Snapshot ended with SnapshotResult [status=COMPLETED, offset=SqlServerOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.sqlserver.Source:STRUCT}, sourceInfo=SourceInfo [serverName=XXX_IMPORT_STREAM, changeLsn=NULL, commitLsn=00000c77:00007710:0006, eventSerialNo=null, snapshot=FALSE, sourceTime=2024-04-19T11:06:40.276Z], snapshotCompleted=true, eventSerialNo=1]]
debezium  | 2024-04-19 11:06:41,149 WARN  [io.deb.rel.RelationalDatabaseSchema] (debezium-sqlserverconnector-XXX_IMPORT_STREAM-change-event-source-coordinator) After applying the include/exclude list filters, no changes will be captured. Please check your configuration!
debezium  | 2024-04-19 11:06:41,155 INFO  [io.deb.pip.ChangeEventSourceCoordinator] (debezium-sqlserverconnector-XXX_IMPORT_STREAM-change-event-source-coordinator) Connected metrics set to 'true'
debezium  | 2024-04-19 11:06:41,177 INFO  [io.deb.pip.sig.SignalProcessor] (debezium-sqlserverconnector-XXX_IMPORT_STREAM-change-event-source-coordinator) SignalProcessor started. Scheduling it every 5000ms
debezium  | 2024-04-19 11:06:41,178 INFO  [io.deb.uti.Threads] (debezium-sqlserverconnector-XXX_IMPORT_STREAM-change-event-source-coordinator) Creating thread debezium-sqlserverconnector-XXX_IMPORT_STREAM-SignalProcessor
debezium  | 2024-04-19 11:06:41,180 INFO  [io.deb.con.sql.SqlServerChangeEventSourceCoordinator] (debezium-sqlserverconnector-XXX_IMPORT_STREAM-change-event-source-coordinator) Starting streaming
debezium  | 2024-04-19 11:06:41,180 INFO  [io.deb.con.sql.SqlServerChangeEventSourceCoordinator] (debezium-sqlserverconnector-XXX_IMPORT_STREAM-change-event-source-coordinator) Finished streaming
debezium  | 2024-04-19 11:06:41,180 INFO  [io.deb.pip.ChangeEventSourceCoordinator] (debezium-sqlserverconnector-XXX_IMPORT_STREAM-change-event-source-coordinator) Connected metrics set to 'false'
debezium  | 2024-04-19 11:06:41,181 INFO  [io.deb.pip.sig.SignalProcessor] (pool-7-thread-1) SignalProcessor stopped
debezium  | 2024-04-19 11:06:41,182 INFO  [io.deb.ser.DefaultServiceRegistry] (pool-7-thread-1) Debezium ServiceRegistry stopped.
debezium  | 2024-04-19 11:06:41,184 INFO  [io.deb.jdb.JdbcConnection] (pool-14-thread-1) Connection gracefully closed
debezium  | 2024-04-19 11:06:41,210 INFO  [org.apa.kaf.con.sto.FileOffsetBackingStore] (pool-7-thread-1) Stopped FileOffsetBackingStore
debezium  | 2024-04-19 11:06:41,211 ERROR [io.deb.ser.ConnectorLifecycle] (pool-7-thread-1) Connector completed: success = 'false', message = 'Stopping connector after error in the application's handler method: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'XXX_IMPORT_STREAM' in vhost '/', class-id=60, method-id=40)', error = 'com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'XXX_IMPORT_STREAM' in vhost '/', class-id=60, method-id=40)': com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'XXX_IMPORT_STREAM' in vhost '/', class-id=60, method-id=40)
debezium  |     at com.rabbitmq.client.impl.ChannelN.waitForConfirms(ChannelN.java:210)
debezium  |     at com.rabbitmq.client.impl.ChannelN.waitForConfirmsOrDie(ChannelN.java:247)
debezium  |     at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.waitForConfirmsOrDie(AutorecoveringChannel.java:707)
debezium  |     at io.debezium.server.rabbitmq.RabbitMqStreamChangeConsumer.handleBatch(RabbitMqStreamChangeConsumer.java:164)
debezium  |     at io.debezium.embedded.ConvertingEngineBuilder$ConvertingChangeConsumer.handleBatch(ConvertingEngineBuilder.java:108)
debezium  |     at io.debezium.embedded.EmbeddedEngine.pollRecords(EmbeddedEngine.java:735)
debezium  |     at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:475)
debezium  |     at io.debezium.embedded.ConvertingEngineBuilder$1.run(ConvertingEngineBuilder.java:248)
debezium  |     at io.debezium.server.DebeziumServer.lambda$start$1(DebeziumServer.java:170)
debezium  |     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
debezium  |     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
debezium  |     at java.base/java.lang.Thread.run(Thread.java:829)
debezium  | Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'XXX_IMPORT_STREAM' in vhost '/', class-id=60, method-id=40)
debezium  |     at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:517)
debezium  |     at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:341)
debezium  |     at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:182)
debezium  |     at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:114)
debezium  |     at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:739)
debezium  |     at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47)
debezium  |     at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:666)
debezium  |     ... 1 more
debezium  |
debezium  | 2024-04-19 11:06:41,231 INFO  [io.deb.ser.DebeziumServer] (main) Received request to stop the engine
debezium  | 2024-04-19 11:06:41,232 INFO  [io.deb.emb.EmbeddedEngine] (main) Stopping the embedded engine
debezium  | 2024-04-19 11:06:41,234 ERROR [io.qua.arc.imp.UncaughtExceptions] (main) Error occurred while destroying instance of CLASS bean [types=[io.debezium.server.rabbitmq.RabbitMqStreamChangeConsumer, io.debezium.engine.DebeziumEngine$ChangeConsumer<io.debezium.engine.ChangeEvent<java.lang.Object, java.lang.Object>>, java.lang.Object, io.debezium.server.BaseChangeConsumer], qualifiers=[@Default, @Any, @Named("rabbitmq")], target=io.debezium.server.rabbitmq.RabbitMqStreamChangeConsumer]: com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'XXX_IMPORT_STREAM' in vhost '/', class-id=60, method-id=40) [Error Occurred After Shutdown]
debezium  | 2024-04-19 11:06:41,254 INFO  [io.quarkus] (main) debezium-server-dist stopped in 0.042s
docker-compose rabbitmq queue debezium
1个回答
0
投票

您的 RabbitMQ 节点中似乎没有名为

XXX_IMPORT_STREAM
的交换。

修复:

  • 创建交易所,
    XXX_IMPORT_STREAM
    (直接交易所) 在您的 RabbitMQ 节点中
  • 使用与交换器匹配的绑定密钥将交换器绑定到队列 Debezium 配置文件中定义的路由密钥。
© www.soinside.com 2019 - 2024. All rights reserved.