给定messageId的流数据中的缓冲消息

问题描述 投票:0回答:1

使用案例:我有消息有messageId,多个消息可以有相同的消息ID,这些消息存在于由messageId分区的流管道(如kafka)中,所以我确保所有具有相同messageId的消息都将进入同一个分区。

所以我需要编写一个应该缓冲消息一段时间(比如说1分钟)的作业,然后将具有相同messageId的所有消息组合到单个大消息中。

我认为可以使用spark Datasets和spark sql(或其他东西?)来完成。但我找不到任何关于如何为给定的消息ID存储消息一段时间的示例/文档,然后对这些消息进行聚合。

apache-kafka streaming spark-streaming buffering apache-samza
1个回答
0
投票

我想你要找的是Spark Streaming。 Spark有一个Kafka Connector,可以链接到Spark Streaming Context。

这是一个非常基本的示例,它将在1分钟的间隔内为给定主题集中的所有消息创建RDD,然后通过消息ID字段对它们进行分组(当然,您的值序列化器必须公开这样的getMessageId方法)。

SparkConf conf = new SparkConf().setAppName(appName);
JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.minutes(1));

Map<String, Object> params = new HashMap<String, Object>() {{
    put("bootstrap.servers", kafkaServers);
    put("key.deserializer", kafkaKeyDeserializer);
    put("value.deserializer", kafkaValueDeserializer);
}};

List<String> topics = new ArrayList<String>() {{
    // Add Topics
}};

JavaInputDStream<ConsumerRecord<String, String>> stream =
    KafkaUtils.createDirectStream(ssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(topics, params)
    );

stream.foreachRDD(rdd -> rdd.groupBy(record -> record.value().getMessageId()));

ssc.start();
ssc.awaitTermination(); 

还有其他几种方法可以在流API中对消息进行分组。查看文档以获取更多示例。

© www.soinside.com 2019 - 2024. All rights reserved.