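I want to stream changes from SQL Server through Debezium and RabbitMQ, but I get the following error during setup. Can you help me solve this problem?
reply-code=404, reply-text=NOT_FOUND - no exchange 'XXX_IMPORT_STREAM' in vhost '/', class-id=60, method-id=40
I am following the article below by Nyior Clement: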
https://www.cloudamqp.com/blog/change-data-capture-with-rabbitmq-and-debezium-part-2.html
docker-compose
services:
  debezium:
    image: quay.io/debezium/server
    container_name: debezium
    healthcheck:
      test: curl http://debezium:8080/q/health || exit 1
      interval: 5s
      timeout: 5s
      retries: 5
    ports:
      - "8080:8080"
    volumes:
      - ./debezium_conf:/debezium/conf:readonly
conf file
# Sink connector config - RabbitMQ
debezium.sink.type=rabbitmq
debezium.sink.rabbitmq.connection.host=HOSTIP
debezium.sink.rabbitmq.connection.port=5672
debezium.sink.rabbitmq.connection.username=DEV
debezium.sink.rabbitmq.connection.password=1234
debezium.sink.rabbitmq.connection.virtual.host=/
debezium.sink.rabbitmq.ackTimeout=3000
debezium.sink.rabbitmq.autoCreateRoutingKey=true
debezium.sink.rabbitmq.routingKey=XXX_IMPORT_STREAM
# Source connector config - MSSQL
debezium.source.connector.class=io.debezium.connector.sqlserver.SqlServerConnector
debezium.source.plugin.name=pgoutput
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=Database_HostName
debezium.source.database.port=1433
debezium.source.database.user=dev
debezium.source.database.password=1234
debezium.source.database.names=DbName
debezium.source.table.include.list=dbo.table_name
debezium.source.database.allowPublicKeyRetrieval=true
debezium.source.topic.prefix=XXX_IMPORT_STREAM
debezium.source.database.encrypt=false
debezium.source.schema.whitelist=test
debezium.source.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory
# Format config
debezium.format.key=json
debezium.format.value=json
# Quarkus
quarkus.log.console.json=false
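With the settings above, the snapshot of the configured database table is created successfully. After that, however, I get the following error saying that XXX_IMPORT_STREAM was not found: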
debezium | 2024-04-19 11:06:41,148 INFO [io.deb.pip.ChangeEventSourceCoordinator] (debezium-sqlserverconnector-XXX_IMPORT_STREAM-change-event-source-coordinator) Snapshot ended with SnapshotResult [status=COMPLETED, offset=SqlServerOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.sqlserver.Source:STRUCT}, sourceInfo=SourceInfo [serverName=XXX_IMPORT_STREAM, changeLsn=NULL, commitLsn=00000c77:00007710:0006, eventSerialNo=null, snapshot=FALSE, sourceTime=2024-04-19T11:06:40.276Z], snapshotCompleted=true, eventSerialNo=1]]
debezium | 2024-04-19 11:06:41,149 WARN [io.deb.rel.RelationalDatabaseSchema] (debezium-sqlserverconnector-XXX_IMPORT_STREAM-change-event-source-coordinator) After applying the include/exclude list filters, no changes will be captured. Please check your configuration!
debezium | 2024-04-19 11:06:41,155 INFO [io.deb.pip.ChangeEventSourceCoordinator] (debezium-sqlserverconnector-XXX_IMPORT_STREAM-change-event-source-coordinator) Connected metrics set to 'true'
debezium | 2024-04-19 11:06:41,177 INFO [io.deb.pip.sig.SignalProcessor] (debezium-sqlserverconnector-XXX_IMPORT_STREAM-change-event-source-coordinator) SignalProcessor started. Scheduling it every 5000ms
debezium | 2024-04-19 11:06:41,178 INFO [io.deb.uti.Threads] (debezium-sqlserverconnector-XXX_IMPORT_STREAM-change-event-source-coordinator) Creating thread debezium-sqlserverconnector-XXX_IMPORT_STREAM-SignalProcessor
debezium | 2024-04-19 11:06:41,180 INFO [io.deb.con.sql.SqlServerChangeEventSourceCoordinator] (debezium-sqlserverconnector-XXX_IMPORT_STREAM-change-event-source-coordinator) Starting streaming
debezium | 2024-04-19 11:06:41,180 INFO [io.deb.con.sql.SqlServerChangeEventSourceCoordinator] (debezium-sqlserverconnector-XXX_IMPORT_STREAM-change-event-source-coordinator) Finished streaming
debezium | 2024-04-19 11:06:41,180 INFO [io.deb.pip.ChangeEventSourceCoordinator] (debezium-sqlserverconnector-XXX_IMPORT_STREAM-change-event-source-coordinator) Connected metrics set to 'false'
debezium | 2024-04-19 11:06:41,181 INFO [io.deb.pip.sig.SignalProcessor] (pool-7-thread-1) SignalProcessor stopped
debezium | 2024-04-19 11:06:41,182 INFO [io.deb.ser.DefaultServiceRegistry] (pool-7-thread-1) Debezium ServiceRegistry stopped.
debezium | 2024-04-19 11:06:41,184 INFO [io.deb.jdb.JdbcConnection] (pool-14-thread-1) Connection gracefully closed
debezium | 2024-04-19 11:06:41,210 INFO [org.apa.kaf.con.sto.FileOffsetBackingStore] (pool-7-thread-1) Stopped FileOffsetBackingStore
debezium | 2024-04-19 11:06:41,211 ERROR [io.deb.ser.ConnectorLifecycle] (pool-7-thread-1) Connector completed: success = 'false', message = 'Stopping connector after error in the application's handler method: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'XXX_IMPORT_STREAM' in vhost '/', class-id=60, method-id=40)', error = 'com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'XXX_IMPORT_STREAM' in vhost '/', class-id=60, method-id=40)': com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'XXX_IMPORT_STREAM' in vhost '/', class-id=60, method-id=40)
debezium | at com.rabbitmq.client.impl.ChannelN.waitForConfirms(ChannelN.java:210)
debezium | at com.rabbitmq.client.impl.ChannelN.waitForConfirmsOrDie(ChannelN.java:247)
debezium | at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.waitForConfirmsOrDie(AutorecoveringChannel.java:707)
debezium | at io.debezium.server.rabbitmq.RabbitMqStreamChangeConsumer.handleBatch(RabbitMqStreamChangeConsumer.java:164)
debezium | at io.debezium.embedded.ConvertingEngineBuilder$ConvertingChangeConsumer.handleBatch(ConvertingEngineBuilder.java:108)
debezium | at io.debezium.embedded.EmbeddedEngine.pollRecords(EmbeddedEngine.java:735)
debezium | at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:475)
debezium | at io.debezium.embedded.ConvertingEngineBuilder$1.run(ConvertingEngineBuilder.java:248)
debezium | at io.debezium.server.DebeziumServer.lambda$start$1(DebeziumServer.java:170)
debezium | at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
debezium | at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
debezium | at java.base/java.lang.Thread.run(Thread.java:829)
debezium | Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'XXX_IMPORT_STREAM' in vhost '/', class-id=60, method-id=40)
debezium | at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:517)
debezium | at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:341)
debezium | at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:182)
debezium | at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:114)
debezium | at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:739)
debezium | at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47)
debezium | at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:666)
debezium | ... 1 more
debezium |
debezium | 2024-04-19 11:06:41,231 INFO [io.deb.ser.DebeziumServer] (main) Received request to stop the engine
debezium | 2024-04-19 11:06:41,232 INFO [io.deb.emb.EmbeddedEngine] (main) Stopping the embedded engine
debezium | 2024-04-19 11:06:41,234 ERROR [io.qua.arc.imp.UncaughtExceptions] (main) Error occurred while destroying instance of CLASS bean [types=[io.debezium.server.rabbitmq.RabbitMqStreamChangeConsumer, io.debezium.engine.DebeziumEngine$ChangeConsumer<io.debezium.engine.ChangeEvent<java.lang.Object, java.lang.Object>>, java.lang.Object, io.debezium.server.BaseChangeConsumer], qualifiers=[@Default, @Any, @Named("rabbitmq")], target=io.debezium.server.rabbitmq.RabbitMqStreamChangeConsumer]: com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'XXX_IMPORT_STREAM' in vhost '/', class-id=60, method-id=40) [Error Occurred After Shutdown]
debezium | 2024-04-19 11:06:41,254 INFO [io.quarkus] (main) debezium-server-dist stopped in 0.042s
It looks like there is no exchange named XXX_IMPORT_STREAM in your RabbitMQ node.
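The class-id and method-id in the error can help confirm this reading. In AMQP 0-9-1, class 60 / method 40 is basic.publish, so the broker closed the channel because a message was published to an exchange that does not exist. A minimal sketch for decoding the reply from the log line (the helper name `decode_channel_close` and the small lookup table are illustrative, not part of Debezium or the RabbitMQ client):

```python
import re

# A few AMQP 0-9-1 (class-id, method-id) pairs; not an exhaustive table.
AMQP_METHODS = {
    (40, 10): "exchange.declare",
    (50, 10): "queue.declare",
    (50, 20): "queue.bind",
    (60, 20): "basic.consume",
    (60, 40): "basic.publish",
}

# Matches the fields inside "#method<channel.close>(...)" as printed in the log.
REPLY_PATTERN = re.compile(
    r"reply-code=(?P<code>\d+), reply-text=(?P<text>.*?), "
    r"class-id=(?P<class_id>\d+), method-id=(?P<method_id>\d+)"
)


def decode_channel_close(message: str) -> dict:
    """Extract reply code, reply text and the failing AMQP method name."""
    match = REPLY_PATTERN.search(message)
    if match is None:
        raise ValueError("no channel.close reply found in message")
    key = (int(match["class_id"]), int(match["method_id"]))
    return {
        "code": int(match["code"]),
        "text": match["text"],
        "method": AMQP_METHODS.get(key, "unknown"),
    }


error = (
    "#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - "
    "no exchange 'XXX_IMPORT_STREAM' in vhost '/', class-id=60, method-id=40)"
)
print(decode_channel_close(error))
```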
Fix: create the XXX_IMPORT_STREAM exchange (a direct exchange) in your RabbitMQ node.
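One way to create the exchange is through the RabbitMQ management HTTP API. The sketch below only builds the request. It assumes the management plugin is enabled and reachable on its default port 15672, and that the DEV user from the conf file is allowed to configure exchanges in vhost '/'. HOSTIP is the placeholder from the question, and the function name `build_declare_exchange_request` is made up for illustration.

```python
import base64
import json
import urllib.request
from urllib.parse import quote


def build_declare_exchange_request(host, vhost, exchange, username, password,
                                   exchange_type="direct", port=15672):
    """Build a PUT /api/exchanges/{vhost}/{name} request for the management API."""
    # The default vhost "/" has to be percent-encoded as %2F in the URL path.
    url = "http://{}:{}/api/exchanges/{}/{}".format(
        host, port, quote(vhost, safe=""), quote(exchange, safe="")
    )
    body = json.dumps({
        "type": exchange_type,
        "durable": True,
        "auto_delete": False,
        "internal": False,
        "arguments": {},
    }).encode("utf-8")
    token = base64.b64encode(
        "{}:{}".format(username, password).encode("utf-8")
    ).decode("ascii")
    return urllib.request.Request(
        url,
        data=body,
        method="PUT",
        headers={
            "Content-Type": "application/json",
            "Authorization": "Basic " + token,
        },
    )


request = build_declare_exchange_request(
    "HOSTIP", "/", "XXX_IMPORT_STREAM", "DEV", "1234"
)
print(request.get_method(), request.full_url)

# To actually create the exchange against a running broker (assumption:
# HOSTIP has been replaced with the real host name or IP address):
# urllib.request.urlopen(request)
```

The same exchange can also be added by hand on the Exchanges tab of the management UI. To consume the events you will probably also need a queue bound to this exchange with the routing key used by the sink.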