我正在尝试做的事情:
我有一个模式注册表,我希望在保存之前检查我的消息。 它可以与设置为 JSON_SR 的输出记录值格式一起使用
我的问题是从 SQS 拉取时的消息格式。我只能将字符串作为消息正文发送到 SQS,但我需要 JSON。目前,body 字段中的所有消息都只是 JSON 消息的字符串表示形式。
我发送的数据,序列化为JSON字符串,因为SQS无法接受messageBody中的JSON
{
"username": "name",
"property": "test prop",
"created_utc_date": "2024-04-18T08:00:00Z"
}
从 SQS 中提取值时填充了很多字段,但我们可以使用 SMT 来提取我的消息所在的“正文”。提取的消息只是一个 JSON 字符串
"{\"username\":\"name\",\"property\":\"test prop\",\"created_utc_date\":\"2024-04-18T08:00:00Z\"}"
由于出现错误,因此无法保存。
“连接器无法向架构注册表注册新架构,因为它与同一主题的现有架构不兼容。”
我查看了很多可能的解决方案、SMT、转换器,但我没有设法让任何东西发挥作用,有没有一种方法可以在某个时候以某种方式反序列化该数据,以便连接器可以摄取它?
我唯一发现的是这个https://www.confluence.io/hub/redhatinsights/expandjsonsmt/但是该插件无法添加到Confluence Cloud。
所以我没能做到这一点。然而,我也做了类似的事情。 由于我找不到使用连接器以正确的格式将所需的消息发送到 Kafka 中正确主题的方法,因此我放弃了它并将其替换为 AWS Lambda 函数。 AWS Lambda 现在由 SQS 触发。使用 lambda,我可以像将消息发送到 Kafka 之前一样格式化、序列化和更改消息。
如果其他人想尝试使用这种解决方案,这里是我的 Lambda 代码
const { Kafka } = require('kafkajs');
const { SchemaRegistry } = require('@kafkajs/confluent-schema-registry');
exports.handler = async function (event, context) {
const eventRecords = event.Records;
await Promise.all(eventRecords.map(async (record) => {
const message = JSON.parse(record.body);
const topic = message.topic;
await publishToKafka(message, topic);
}));
}
publishToKafka = async function (message, kafkaTopic) {
const kafkaBrokers = [process.env.KAFKA_DEV_CLUSTER];
const schemaRegistryUrl = process.env.SCHEMA_REGISTRY_URL;
const registry = new SchemaRegistry({
host: schemaRegistryUrl,
auth: {
username: process.env.SCHEMA_REGISTRY_DEV_API_KEY,
password: process.env.SCHEMA_REGISTRY_DEV_API_SECRET,
},
})
// Just the way we name schemas
const schemaName = kafkaTopic + '-value';
const ssl = true;
const kafka = new Kafka({
brokers: kafkaBrokers,
ssl,
sasl: {
mechanism: 'plain',
username: process.env.KAFKA_DEV_API_KEY,
password: process.env.KAFKA_DEV_API_SECRET,
},
securityProtocol: 'SASL_SSL'
});
const producer = kafka.producer();
await producer.connect();
try {
const id = await registry.getLatestSchemaId(schemaName);
const encodedPayload = await registry.encode(id, message)
await producer.send({
topic: kafkaTopic,
messages: [{ value: encodedPayload }]
});
console.log('Message sent to Kafka');
return 'Message sent to Kafka';
} catch (error) {
console.error('Error sending message to Kafka:', error);
return 'Error sending message to Kafka';
} finally {
await producer.disconnect();
}
};
所以最终我的工作流程非常简单,将消息发送到 SQS,然后 lambda 函数处理这些消息并将它们发送到 kafka。