Using ReadFromKafka from Python SDK 2.49.0 (which expands to the Java harness) raises an encoding error when with_metadata=True is set:
java.lang.IllegalArgumentException: Unable to encode element 'org.apache.beam.sdk.io.kafka.KafkaIO$ByteArrayKafkaRecord@6e23419d' with coder 'SchemaCoder<Schema: Fields:
Field{name=topic, description=, type=STRING NOT NULL, options={{}}}
Field{name=partition, description=, type=INT32 NOT NULL, options={{}}}
Field{name=offset, description=, type=INT64 NOT NULL, options={{}}}
Field{name=timestamp, description=, type=INT64 NOT NULL, options={{}}}
Field{name=key, description=, type=BYTES, options={{}}}
Field{name=value, description=, type=BYTES, options={{}}}
Field{name=headers, description=, type=ARRAY<ROW<key STRING NOT NULL, value BYTES NOT NULL> NOT NULL>, options={{}}}
Field{name=timestampTypeId, description=, type=INT32 NOT NULL, options={{}}}
Field{name=timestampTypeName, description=, type=STRING NOT NULL, options={{}}}
Encoding positions:
{headers=6, timestampTypeName=8, partition=1, offset=2, topic=0, value=5, key=4, timestamp=3, timestampTypeId=7}
With with_metadata=False, the pipeline runs correctly against both Azure Event Hubs and a Confluent cluster.
I am using the default coder as described in the documentation: https://beam.apache.org/releases/pydoc/current/apache_beam.io.kafka.html#apache_beam.io.kafka.ReadFromKafka
Pipeline snippet:
with beam.Pipeline(options=pipeline_options) as p:
    reviews = (
        p
        | ReadFromKafka(
            consumer_config={
                "bootstrap.servers": stream_bootstrap_servers,
                "group.id": stream_group_id,
                "security.protocol": "SASL_SSL",
                "sasl.mechanism": "PLAIN",
                "sasl.jaas.config": stream_sasl_jaas_config,
            },
            max_num_records=stream_max_num_records,
            start_read_time=stream_start_read_time,
            topics=[stream_topic],
            with_metadata=True,
        )
    )
    reviews | "Logging messages" >> beam.Map(logging.info)
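For comparison, the variant that runs cleanly differs only in with_metadata=False. Without metadata, ReadFromKafka emits plain (key, value) tuples of bytes, so a small decode step makes the records loggable. This is a minimal sketch; `decode_record` is a hypothetical helper, not part of the Beam API:

```python
def decode_record(record):
    """Decode a (key, value) bytes tuple into printable strings.

    With with_metadata=False, ReadFromKafka yields 2-tuples of raw
    bytes; the key may be None if the producer did not set one.
    """
    key, value = record
    return (
        key.decode("utf-8") if key is not None else None,
        value.decode("utf-8"),
    )

# In the pipeline above, this would replace the with_metadata=True read:
#   p | ReadFromKafka(consumer_config=..., topics=[stream_topic],
#                     with_metadata=False)
#     | beam.Map(decode_record)
#     | "Logging messages" >> beam.Map(logging.info)
```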