How do I produce a tombstone Avro record in Kafka?

Question (votes: 1, answers: 2)

My sink connector properties:

{
  "name": "jdbc-oracle",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "orders",
    "connection.url": "jdbc:oracle:thin:@10.1.2.3:1071/orac",
    "connection.user": "ersin",
    "connection.password": "ersin!",
    "auto.create": "true",
    "delete.enabled": "true",
    "pk.mode": "record_key",
    "pk.fields": "id",
    "insert.mode": "upsert",
    "plugin.path": "/home/ersin/confluent-5.4.1/share/java/",
    "name": "jdbc-oracle"
  },
  "tasks": [
    {
      "connector": "jdbc-oracle",
      "task": 0
    }
  ],
  "type": "sink"
}

My connect-avro-distributed.properties:

bootstrap.servers=10.0.0.0:9092

group.id=connect-cluster

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://10.0.0.0:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://10.0.0.0:8081

config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses

config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

I send the data like this:

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['10.0.0.0:9092'],
)
message=producer.send('orders', key=b'{"id":1}', value=None)

But the connector fails with a serialization error.
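A likely cause, sketched here rather than confirmed from the connector logs: with `AvroConverter` configured for keys, the converter expects each key in the Confluent wire format (one magic byte `0`, a 4-byte big-endian schema ID, then the Avro binary payload), whereas `b'{"id":1}'` is plain JSON text. The helper `parse_confluent_header` and the schema ID `42` below are hypothetical and only illustrate the framing check.

```python
import struct

MAGIC_BYTE = 0  # first byte of every Confluent-framed Avro message


def parse_confluent_header(payload):
    """Return the schema ID if payload carries Confluent framing, else None."""
    if payload is None or len(payload) < 5:
        return None
    # ">bI" = big-endian: 1 signed byte (magic) + 4-byte unsigned int (schema ID)
    magic, schema_id = struct.unpack(">bI", payload[:5])
    if magic != MAGIC_BYTE:
        return None
    return schema_id


# The key from the question: starts with '{' (0x7b), so it is not Avro-framed.
print(parse_confluent_header(b'{"id":1}'))  # None

# A framed key: magic byte, hypothetical schema ID 42, then Avro int 1 (zigzag 0x02).
framed_key = struct.pack(">bI", MAGIC_BYTE, 42) + b"\x02"
print(parse_confluent_header(framed_key))  # 42
```

A null value needs no framing at all, so only the key has to be serialized this way for a tombstone.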

python apache-kafka kafka-producer-api confluent kafka-python
2 answers
1
vote

I assume you want to produce Avro messages, so you need to serialize them properly. My example uses the confluent-kafka-python library. If you don't have it installed yet, you can install it with

pip install "confluent-kafka[avro]"

Here is an example AvroProducer that sends an Avro message with a null value:

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer


value_schema_str = """
{
   "type": "record",
   "name": "myrecord",
   "fields": [
      {"name": "id", "type": ["null", "int"], "default": null},
      {"name": "product", "type": ["null", "string"], "default": null},
      {"name": "quantity", "type": ["null", "int"], "default": null},
      {"name": "price", "type": ["null", "int"], "default": null}
   ]
}
"""

key_schema_str = """
{
   "type": "record",
   "name": "key_schema",
   "fields": [
      {"name": "id", "type": "int"}
   ]
}
"""


def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))


if __name__ == '__main__':
    value_schema = avro.loads(value_schema_str)
    key_schema = avro.loads(key_schema_str)
    #value = {"name": "Value"}
    key = {"id": 1}

    avroProducer = AvroProducer({
        'bootstrap.servers': '10.0.0.0:9092',
        'on_delivery': delivery_report,
        'schema.registry.url': 'http://10.0.0.0:8081'
    }, default_key_schema=key_schema, default_value_schema=value_schema)

    # No value is passed, so the record goes out with a null value (a tombstone).
    avroProducer.produce(topic='orders', key=key)
    avroProducer.flush()

0
votes

You need to allow this in the Avro schema: an Avro field can be set to null only if null is added as one of the field's possible types.

See the example from the Avro documentation:

{
  "type": "record",
  "name": "yourRecord",
  "fields" : [
    {"name": "empId", "type": "long"},  // mandatory field
    {"name": "empName", "type": ["null", "string"]}  // optional field
  ]
}

Here empName is declared with the type null or string, which allows the empName field to be set to null.
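As a small illustration of the nullable-union idea, the sketch below lists which fields of a record schema accept null. It uses only the standard `json` module; the helper name `nullable_fields` is hypothetical, and the `//` comments from the schema above are dropped because `json.loads` rejects them. Note that a nullable field is not the same thing as a tombstone: a tombstone is a record whose entire value is null, which is what a sink with `delete.enabled` set to true treats as a delete.

```python
import json

schema_str = """
{
  "type": "record",
  "name": "yourRecord",
  "fields": [
    {"name": "empId", "type": "long"},
    {"name": "empName", "type": ["null", "string"]}
  ]
}
"""


def nullable_fields(schema_json):
    """Return the names of fields whose type is a union that includes "null"."""
    schema = json.loads(schema_json)
    return [
        field["name"]
        for field in schema["fields"]
        if isinstance(field["type"], list) and "null" in field["type"]
    ]


print(nullable_fields(schema_str))  # ['empName']
```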
