我正在做一个 PoC 来验证我是否可以使用 Citrus 框架进行自动化测试,这需要检查 Kafka 主题中是否已发布一些数据。
详情如下:
这是要验证的消息示例:
{
"key": "The_Message_Key",
"value": {
"eventId": "The_Event_Id",
"serviceName": "fileService",
"serviceId": "fileServiceId",
"status": "succeeded",
... other fields
}
}
这是我初始化 KafkaEndpoint 的方式:
KafkaEndpointBuilder()
.server("boostrap.server-1,boostrap.server-2,boostrap.server-3")
.topic("events.topic.with.avro.values")
.consumerProperties(consumerPropertiesMap)
.valueDeserializer(KafkaAvroDeserializer::class.java)
.build()
这就是我执行验证的方式
testCaseRunner.then(receive()
.endpoint(eventsTopicEndpoint)
.message()
.type(MessageType.JSON)
.validate(jsonPath()
.expression("$.eventId", contains(file.name))))
我尝试将反序列化器更改为默认的 StringSerializer 但出现类似的错误。我想使用 Citrus 提供的 json 验证器来对 Message 值的特定字段执行验证。
我实施了 OneCricketeer 建议的解决方案:
以下是代码片段:
Kafka端点设置:
KafkaEndpointBuilder()
.server("boostrap.server-1,boostrap.server-2,boostrap.server-3")
.topic("events.topic.with.avro.values")
.consumerProperties(consumerPropertiesMap)
.valueDeserializer(AvroToStringDeserializer::class.java)
.build()
AvroToString反序列化器:
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.common.header.Headers
class AvroToStringDeserializer : KafkaAvroDeserializer() {
override fun deserialize(topic: String, data: ByteArray): String {
val record = super.deserialize(topic, data) as GenericRecord
return record.toString()
}
override fun deserialize(topic: String, headers: Headers, data: ByteArray): String {
val record = super.deserialize(topic, headers, data) as GenericRecord
return record.toString()
}
override fun deserialize(topic: String, data: ByteArray, schema: Schema): String {
val record = super.deserialize(topic, data, schema) as GenericRecord
return record.toString()
}
override fun deserialize(topic: String, headers: Headers, data: ByteArray, schema: Schema): String {
val record = super.deserialize(topic, headers, data, schema) as GenericRecord
return record.toString()
}
}