编辑:几年前我发现了另一个问题(How to populate the cache in CachedSchemaRegistryClient without making a call to register a new schema?)。它提到CachedSchemaRegistryClient需要将架构注册到实际的注册表中才能进行缓存,并且还没有解决方案。因此,将我的问题留在这里,但也希望让我也知道。
我正在研究一个程序,该程序从kafka中提取字节数组,对其进行解密(因此在kafka上是安全的),将字节转换为字符串,将json字符串转换为json对象,然后从模式中查找模式注册表(利用CachedSchemaRegistryClient),使用从注册表元数据中检索到的架构中的架构,将json字节转换为通用记录,然后将该通用记录序列化为avro字节。
运行一些测试后,似乎CachedSchemaRegistyClient是主要的性能消耗。但是据我所知,这是获取架构元数据的最佳方法。我是否实施了一些较差的操作,或者是否可以通过其他方式使其与我的用例一起使用?]
这是解密后处理所有内容的代码:
package org.apache.flink; import avro.fullNested.FinalMessage; import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.SchemaMetadata; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.util.Collector; import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; import serializers.AvroFinishedMessageSerializer; import tech.allegro.schema.json2avro.converter.JsonAvroConverter; public class JsonToAvroBytesParser implements FlatMapFunction<String, byte[]> { private transient CachedSchemaRegistryClient schemaRegistryClient; private transient AvroFinishedMessageSerializer avroFinishedMessageSerializer; private String schemaUrl; private Integer identityMaxCount; public JsonToAvroBytesParser(String passedSchemaUrl, int passedImc){ schemaUrl = passedSchemaUrl; identityMaxCount = passedImc; } private void ensureInitialized() { if (schemaUrl.equals("")) { schemaUrl = "https://myschemaurl.com/"; } if(identityMaxCount == null){ identityMaxCount = 5; } if(schemaRegistryClient == null){ schemaRegistryClient = new CachedSchemaRegistryClient(schemaUrl, identityMaxCount); } if(avroFinalMessageSerializer == null){ avroFinalMessageSerializer = new AvroFinalMessageSerializer(FinalMessage.class); } } @Override public void flatMap(String s, Collector<byte[]> collector) throws Exception { ensureInitialized(); Object obj = new JSONParser().parse(s); JSONObject jsonObject = (JSONObject) obj; try { String headers = jsonObject.get("headers").toString(); JSONObject body = (JSONObject) jsonObject.get("requestBody"); if(headers != null && body != null){ String kafkaTopicFromHeaders = "hard_coded_name-value"; //NOTE: this schema lookup has serious performance issues. SchemaMetadata schemaMetadata = schemaRegistryClient.getLatestSchemaMetadata(kafkaTopicFromHeaders); //TODO: need to implement recovery method if schema cannot be reached. JsonAvroConverter converter = new JsonAvroConverter(); GenericRecord specificRecord = converter.convertToGenericDataRecord(body.toJSONString().getBytes(), new Schema.Parser().parse(schemaMetadata.getSchema())); byte[] bytesToReturn = avroFinishedMessageSerializer.serializeWithSchemaId(schemaMetadata, specificRecord); collector.collect(bytesToReturn); } else { System.out.println("json is incorrect."); } } catch (Exception e){ System.out.println("json conversion exception caught"); } } }
感谢您提前提供帮助!
编辑:几年前我发现了另一个问题(如何在不调用注册新模式的情况下在CachedSchemaRegistryClient中填充缓存?)。它提到了...
似乎getLatestSchemaMetadata方法未使用缓存。如果您希望您的调用使用缓存来提高性能,则可以重新组织程序,以使用其他确实使用缓存的方法之一,例如按ID查找架构或按名称使用定义字符串注册架构。