在 Citrus Kafka Endpoint 中接收消息时将 AVRO 负载转换为 JSON

问题描述 投票:0回答:1

我正在做一个 PoC 来验证我是否可以使用 Citrus 框架进行自动化测试,这需要检查 Kafka 主题中是否已发布一些数据。

  • 对于演示,我使用 Kotlin,同时我也使用 Quarkus,因为我在演示的其他测试中需要该框架的一些功能。
  • 测试范围用于验证与外部kafka主题的集成。
  • Kafka 主题以 AVRO 作为值类型,但其内容结构是 JSON。
  • 我已经初始化了 KafkaEndpoint,指定 KafkaAvroDeserializer 作为值反序列化器
  • 我还在消费者属性中提供了架构注册表 URL 和凭据。
  • 我尝试使用 jsonPath 验证来执行基于内容的验证,但出现异常,提示“无法找到正确的消息验证器”。
  • 我想知道是否有办法将消息“转换”或“转换”为 JSON 以便能够执行验证?

详情如下:

这是要验证的消息示例:

{
  "key": "The_Message_Key",
  "value": {
    "eventId": "The_Event_Id",
    "serviceName": "fileService",
    "serviceId": "fileServiceId",
    "status": "succeeded",
    ... other fields
  }
}

这是我初始化 KafkaEndpoint 的方式:

KafkaEndpointBuilder()
    .server("boostrap.server-1,boostrap.server-2,boostrap.server-3")
    .topic("events.topic.with.avro.values")
    .consumerProperties(consumerPropertiesMap)
    .valueDeserializer(KafkaAvroDeserializer::class.java)
    .build()

这就是我执行验证的方式

testCaseRunner.then(receive()
    .endpoint(eventsTopicEndpoint)
    .message()
    .type(MessageType.JSON)
    .validate(jsonPath()
        .expression("$.eventId", contains(file.name))))

这是错误的屏幕截图: TestCaseFailedException: Failed to find proper message validator for message

我尝试将反序列化器更改为默认的 StringSerializer 但出现类似的错误。我想使用 Citrus 提供的 json 验证器来对 Message 值的特定字段执行验证。

kotlin testing apache-kafka quarkus citrus-framework
1个回答
0
投票

我实施了 OneCricketeer 建议的解决方案:

  1. 创建了一个扩展 KafkaAvroDeserializer 的新序列化器。需要 io.confluence 的 citrus-validation-json 依赖项。
  2. 配置 KafkaEndpoint 以使用我的序列化器
  3. 此外,还需要包含 citrus-kafka 和 citrus-validation-json 模块作为依赖项。此外,还需要 citrus-validation-hamcrest 才能使用 hamcrest 作为 json 响应验证的一部分。

以下是代码片段:

Kafka端点设置:

KafkaEndpointBuilder()
    .server("boostrap.server-1,boostrap.server-2,boostrap.server-3")
    .topic("events.topic.with.avro.values")
    .consumerProperties(consumerPropertiesMap)
    .valueDeserializer(AvroToStringDeserializer::class.java)
    .build()

AvroToString反序列化器:

import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.common.header.Headers

class AvroToStringDeserializer : KafkaAvroDeserializer() {

    override fun deserialize(topic: String, data: ByteArray): String {
        val record = super.deserialize(topic, data) as GenericRecord

        return record.toString()
    }

    override fun deserialize(topic: String, headers: Headers, data: ByteArray): String {
        val record = super.deserialize(topic, headers, data) as GenericRecord

        return record.toString()
    }

    override fun deserialize(topic: String, data: ByteArray, schema: Schema): String {
        val record = super.deserialize(topic, data, schema) as GenericRecord

        return record.toString()
    }

    override fun deserialize(topic: String, headers: Headers, data: ByteArray, schema: Schema): String {
        val record = super.deserialize(topic, headers, data, schema) as GenericRecord

        return record.toString()
    }

}
© www.soinside.com 2019 - 2024. All rights reserved.