avro 相关问题

Apache Avro是一个主要用于Apache Hadoop的数据序列化框架。

我如何使用Python远程读取AVRO服务器

我想用python读取AVRO文件, 当我在本地计算机上执行此操作时,效果很好(FastAvro): 使用 open('/home/user/file.avro', 'rb') 作为 fo: avro_reader = 阅读器(fo) 记录在avro_reader中...

回答 1 投票 0

使用 pyspark 读取 avro 文件时提供架构

我正在尝试使用 pyspark 读取 avro 文件。我想在读取文件时提供我自己的架构。下面是示例代码。 json_schema = """ { "type": "记录...

回答 1 投票 0

如何强制maven在Kafka模式注册表中注册所有需要的avros模式?

我有 **Batch ** 模式,其中包含其他模式列表。这些模式在其他模式文件中声明,因此当我将 Batch 发布到 Kafka Schema 注册表时,仅注册 Batch 模式 inori...

回答 1 投票 0

如何将数据集<Row>转换为列表<GenericRecord>

想知道如何将Dataset转换为List。 我正在谈论: org.apache.avro.generic.GenericRecord org.apache.spark.sql.数据集 org.apache.spark.sql.Row 达...

回答 1 投票 0

在 Avro 架构中定义可空列表时遇到问题 - Pub/Sub Google Cloud

我在使用 Avro 架构验证和消息测试时遇到了一个令人费解的问题,特别是在 Google Cloud Pub/Sub 架构中。在我的架构中,我定义了自定义对象列表

回答 1 投票 0

Avro 解码给出 java.io.EOFException

我使用 Apache avro 架构和 Kafka 0.0.8V。我在生产者/消费者端使用相同的模式。架构中没有任何更改。但当我尝试消费

回答 2 投票 0

使用 avro 作为带有 kafka 模式注册表的关键主题

我和我的团队最近遇到了用于主题键的 Avro 架构问题。我们更改了对键的评论,这完全破坏了我们的 Kafka Streams 连接,也破坏了我们主题的压缩......

回答 1 投票 0

从 Java 类访问 avro 模式中的自定义属性

我有一些数据遵循下面给出的 avro 模式。我通过使用 avro-tools 实用程序编译该模式来生成 Java 类。然而,Java 类仅包含标准 getter ...

回答 1 投票 0

AVRO序列化异常UTF8\String

我在不同的服务上有一个kafka生产者和消费者,消费者代码已推出并且工作正常,然后今天我推出了生产者端的更改并得到了序列化异常他...

回答 1 投票 0

avro 动态嵌套映射 - 类似于 jsonschema 对象

在 jsonschema 中,您可以像这样定义对象的通用属性: { "description": "记录 HTTP 交换的原始请求和响应以用于调试目的。", &q...

回答 1 投票 0

如何使用 GenericRecord 在 Avro 中填充嵌套的嵌套记录

假设我有以下模式: { "姓名" : "个人资料", “类型”:“记录”, “字段”:[ {“名字”:“名字”,“...

回答 1 投票 0

如何在kafka生产者和消费者中重试短暂的Avro序列化/反序列化问题?

由于基础设施问题,我偶尔会看到从 avro 架构注册表获取架构超时。 我没有完整的堆栈跟踪,但典型的错误消息是: org.apache.kafka.common.

回答 2 投票 0

查找 AVRO 文件并将内容传递到 ADF 中的 Web 活动

我正在努力将 .AVRO 文件的特定内容正确传递到 Azure 数据工厂管道中的 Web 活动。 我有一个查找活动,查找 .AVRO 文件,然后存储输出。

回答 1 投票 0

Python 读取嵌入到 PCAP 中的 AVRO

我有一个 PCAP 文件,其中包含 AVRO 编码数据作为 TCP 数据包中的有效负载。出于测试目的,我已使用 xxd -r -p test.hex test.bin 将上述有效负载转换为二进制文件。 (迟到了……

回答 1 投票 0

无法反序列化 Avro 记录:获取 ArrayIndexOutOfBoundsException

我正在尝试使用 Pyflink 从 Kafka 读取 Avro 格式 我的程序是这样的: 从 pyflink.datastream 导入 StreamExecutionEnvironment 从 pyflink.datastream.connectors.kafka 导入

回答 1 投票 0

无法从avro主题生成依赖的java类

在我简单的 Maven 应用程序中,我有 3 个 avro 文件: 报告详细信息.avsc { “类型”:“记录”, "name": "报告详细信息", "命名空间": "com.vl...

回答 1 投票 0

Debezium 连接:java.lang.NoClassDefFoundError:com/google/common/base/Ticker

问题描述 我在 Debezium 和 Confluence Schema Registry 上使用 Avro 序列化时遇到了问题。我遵循 Debezium 的官方文档。正如文档中提到的,来自

回答 1 投票 0

使用 Avro Schema 中的数组生成 Kafka 消息

我使用记录数组创建了 avro 模式 [ { “类型”:“记录”, “名称”:“BbbAvro”, “字段”:[ { ...

回答 1 投票 0

无法使用 Avro(自定义 Serdes)序列化 Spring Cloud Streams

我正在开发一个流处理应用程序,它连接两个流并将新记录输出到不同的主题。下面是我的配置 @豆 公共双功能 我正在开发一个流处理应用程序,它连接两个流并将新记录输出到不同的主题。以下是我的配置 @Bean public BiFunction<KStream<String, TestRecord>, KStream<String, TestRecord>, KStream<String, ModifiedTestPayload>> process() { return (walletCreated, placementCreated) -> placementCreated .selectKey((key, val) -> val.getClientId()) .join(walletCreated.selectKey((s, testPayload) -> testPayload.getClientId()), (ts, ts1) -> new ModifiedTestPayload(ts.getClientId()), JoinWindows.ofTimeDifferenceWithNoGrace(Duration.of(2, ChronoUnit.MINUTES)), StreamJoined.with(Serdes.String(), CustomSerdes.TestRecord(), CustomSerdes.TestRecord())); } 两个主题都将输入流以产生相同结构的数据。以下是我定义的自定义 Serdes。 public class CustomSerdes { public static Serde<TestRecord> TestRecord() { return new TestRecordSerde(); } public static class TestRecordSerde extends Serdes.WrapperSerde<TestRecord> { public TestRecordSerde() { super(new TestRecordSerializer(), new TestRecordDeserializer()); } } public static class TestRecordSerializer implements Serializer<TestRecord> { private final KafkaAvroSerializer inner; public TestRecordSerializer() { this.inner = new KafkaAvroSerializer(); } @Override public void configure(Map<String, ?> configs, boolean isKey) { inner.configure(configs, isKey); } @Override public byte[] serialize(String topic, TestRecord data) { return inner.serialize(topic, data); } @Override public void close() { inner.close(); } } public static class TestRecordDeserializer implements Deserializer<TestRecord> { private final KafkaAvroDeserializer inner; public TestRecordDeserializer() { this.inner = new KafkaAvroDeserializer(); } @Override public void configure(Map<String, ?> configs, boolean isKey) { inner.configure(configs, isKey); } @Override public TestRecord deserialize(String topic, byte[] data) { return (TestRecord) inner.deserialize(topic, data); } @Override public void close() { inner.close(); } } public static Serde<ModifiedTestPayload> ModifiedTestPayload() { return new ModifiedTestPayloadSerde(); } public static class ModifiedTestPayloadSerde extends Serdes.WrapperSerde<ModifiedTestPayload> { public ModifiedTestPayloadSerde() { super(new ModifiedTestPayloadSerializer(), new ModifiedTestPayloadDeserializer()); } } public static class ModifiedTestPayloadSerializer implements Serializer<ModifiedTestPayload> { private final KafkaAvroSerializer inner; public ModifiedTestPayloadSerializer() { this.inner = new KafkaAvroSerializer(); } @Override public void configure(Map<String, ?> configs, boolean isKey) { inner.configure(configs, isKey); } @Override public byte[] serialize(String topic, ModifiedTestPayload data) { return inner.serialize(topic, data); } @Override public void close() { inner.close(); } } public static class ModifiedTestPayloadDeserializer implements Deserializer<ModifiedTestPayload> { private final KafkaAvroDeserializer inner; public ModifiedTestPayloadDeserializer() { this.inner = new KafkaAvroDeserializer(); } @Override public void configure(Map<String, ?> configs, boolean isKey) { inner.configure(configs, isKey); } @Override public ModifiedTestPayload deserialize(String topic, byte[] data) { return (ModifiedTestPayload) inner.deserialize(topic, data); } @Override public void close() { inner.close(); } } } 和我的 application.yml 下面 spring: kafka: streams: application-id: test-app cloud: function: definition: process stream: kafka: streams: bindings: process-in-0: consumer: key-serde: org.apache.kafka.common.serialization.Serdes$StringSerde value-serde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde process-in-1: consumer: key-serde: org.apache.kafka.common.serialization.Serdes$StringSerde value-serde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde process-out-0: producer: key-serde: org.apache.kafka.common.serialization.Serdes$StringSerde value-serde: ai.wownettests.kf.kafkastreamtestone.CustomSerdes$ModifiedTestPayloadSerde configuration: schema.registry.url: http://localhost:8081 binder: brokers: localhost:9092 configuration: schema.registry.url: http://localhost:8081 specific.avro.reader: true default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde bindings: process-in-0: destination: testkf.placements.created process-in-1: destination: testkf.wallet.created process-out-0: destination: testkf.client.profile_completed logging: level: root: info 当我启动我的应用程序时,我收到如下错误(我有一些关于该应用程序尝试处理的 kafka 主题的数据) org.apache.kafka.streams.errors.StreamsException: Unable to serialize record. ProducerRecord(topic=[test-app-KSTREAM-KEY-SELECT-0000000003-repartition], partition=[null], timestamp=[1706526131230] at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:231) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:175) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:215) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:154) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:48) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:152) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:810) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:810) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:741) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:97) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:78) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1765) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:807) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579) ~[kafka-streams-3.6.1.jar:na] Caused by: org.apache.kafka.common.errors.InvalidConfigurationException: You must configure() before serialize() or use serializer constructor with SchemaRegistryClient 通过创建自定义 serde 的 bean 修复了这个问题 @Bean public Serde<TestRecord> testRecordSerde() { Map<String, ?> config = Collections.singletonMap("schema.registry.url", "http://localhost:8081"); var r = CustomSerdes.TestRecord(); r.configure(config, false); return r; }

回答 1 投票 0

无法在我的 Maven 构建中获取 org.apache.flink.formats 包

我一直在尝试通过maven构建我的apache flink项目,但由于某种原因我遇到了编译错误。值得注意的是“org.apache.flink.formats 包不存在”...

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.