嗨,我正在尝试使用 scala 中的库
akka-stream-alpakka-amqp
在兔子消息中设置标头,但我无法使其工作并读取存储库,我看不到任何在 中添加标头的引用或方法消息。
我试图在参数中添加 headers 属性,但我无法使其工作
val exchangeWitHeaders =
RabbitIntegrationConstants.queueDeclaration.withArguments(Map("x-match" -> "all", "h1" -> "header"))
val writeSettings = AmqpWriteSettings(RabbitIntegrationConstants.connectionProvider)
.withRoutingKey(RabbitIntegrationConstants.queueName)
.withDeclaration(exchangeWitHeaders)
val amqpSinkExchange = AmqpSink.simple(writeSettings)
val textToSend = Vector("test")
Source(textToSend).map(s => ByteString(s)).runWith(amqpSinkExchange)
首先,您需要确保您没有使用
SimpleSink
,以便它可以接受 WriteMessage
而不是 ByteString
val amqpSink = AmqpSink.apply(writeSettings)
Source(textToSend)
.map { s =>
val headers = new java.util.HashMap<String, Object>()
headers.put("header1", "value1")
val basicProperties =
new BasicProperties.Builder()
.headers(headers)
.build()
WriteMessage(ByteString(s))
.withProperties(basicProperties)
}
.runWith(amqpSink)