Kafka exception when committing offsets using commitAsync

Problem description — votes: 0, answers: 2

My Kafka application reads real-time streaming data, processes it, and stores it into Hive. I am trying to commit the offsets using commitAsync. I get the exception shown below:

java.io.NotSerializableException: Object of org.apache.spark.streaming.kafka010.DirectKafkaInputDStream is being serialized possibly as a part of closure of an RDD operation. This is because the DStream object is being referred to from within the closure. Please rewrite the RDD operation inside this DStream to avoid this. This has been enforced to avoid bloating of Spark tasks with unnecessary objects.

Below is my code workflow:

public void method1(SparkConf conf, String app) {
    spark = SparkSession.builder().appName(conf.get("")).enableHiveSupport().getOrCreate();
    final JavaStreamingContext javaStreamContext = new JavaStreamingContext(context,
                new Duration(<spark duration>));
    JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(javaStreamContext,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String> Subscribe(<topicnames>, <kafka Params>));
    messages.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
        @Override
        public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
                OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                JavaDStream<String> records = messages.map(new Function<ConsumerRecord<String, String>, String>() {
                    @Override
                    public String call(ConsumerRecord<String, String> tuple2) throws Exception {
                        return tuple2.value();
                    }
                });

                records.foreachRDD(new VoidFunction<JavaRDD<String>>() {
                    @Override
                    public void call(JavaRDD<String> rdd) throws Exception {
                        if(!rdd.isEmpty()) {
                            methodToSaveDataInHive(rdd, <StructTypeSchema>,<OtherParams>);
                        }
                    }
                 });
                ((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges);
        }
    });
    javaStreamContext.start();
    javaStreamContext.awaitTermination();
}
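For reference, the commit pattern in the Spark Streaming + Kafka 0.10 integration guide keeps all per-batch work on the rdd argument that foreachRDD passes in and only touches the stream itself to commit. A minimal sketch of that pattern (using Java 8 lambdas for brevity, and reusing the messages stream, the methodToSaveDataInHive helper and the <...> placeholders from the code above, so it is an illustration rather than a drop-in fix):

messages.foreachRDD(rdd -> {
    // Offset ranges for this batch, captured on the driver
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

    // Transform the rdd passed into the closure instead of calling messages.map(...);
    // referencing the DStream from inside the closure is what triggers the exception above
    JavaRDD<String> values = rdd.map(record -> record.value());
    if (!values.isEmpty()) {
        methodToSaveDataInHive(values, <StructTypeSchema>, <OtherParams>);
    }

    // Commit only after the batch has been written; commitAsync itself runs on the driver
    ((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges);
});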

Appreciate any suggestions.


The code below works and commits the offsets after the data has been processed. But the problem is that it processes duplicates in the following scenario. Say the consumer job is running and the Hive table has 0 records, and the current offsets (format: fromOffset, untilOffset, difference) are 512 512 0. I then produce 1000 records; it reads 34 of them in one pass but has not committed yet, and I kill it: 512 546 34.

At this point I can see that those 34 recs have already been loaded into the Hive table.

Next, I restart the application. I see it reading those 34 records again (instead of reading only the remaining 1000 - 34 = 966 recs), even though it has already processed them and loaded them into Hive: 512 1512 1000, and a few seconds later this gets updated to 1512 1512 0. Hive now has (34 + 1000 = 1034) records.

This leaves duplicate records in the table (the extra 34). As noted in the code, I commit the offsets only after processing/loading into the Hive table.

Please advise.

public void method1(SparkConf conf, String app) {
    spark = SparkSession.builder().appName(conf.get("")).enableHiveSupport().getOrCreate();
    final JavaStreamingContext javaStreamContext = new JavaStreamingContext(context,
                new Duration(<spark duration>));
    JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(javaStreamContext,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String> Subscribe(<topicnames>, <kafka Params>));

    JavaDStream<String> records = messages.map(new Function<ConsumerRecord<String, String>, String>() {
        @Override
        public String call(ConsumerRecord<String, String> tuple2) throws Exception {
            return tuple2.value();
        }
    });

    records.foreachRDD(new VoidFunction<JavaRDD<String>>() {
        @Override
        public void call(JavaRDD<String> rdd) throws Exception {
            if (!rdd.isEmpty()) {
                methodToSaveDataInHive(rdd, <StructTypeSchema>, <OtherParams>);
            }
        }
    });

    messages.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
        @Override
        public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
            OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
            ((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges);
            for (OffsetRange offset : offsetRanges) {
                System.out.println(offset.fromOffset() + " " + offset.untilOffset() + "  " + offset.count());
            }
        }
    });

    javaStreamContext.start();
    javaStreamContext.awaitTermination();
}
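The replay described above is expected with this design: committing after processing gives at-least-once delivery, so a batch that was written to Hive but killed before its offsets were committed will be read again on restart. If those duplicates matter, one possible mitigation (not part of the original code, only a sketch) is to carry the Kafka coordinates into each row so that replays can be deduplicated downstream; the layout below is purely illustrative (Row and RowFactory are from org.apache.spark.sql):

// Sketch only: (topic, partition, offset) uniquely identifies a Kafka message,
// so storing it with each row lets a later step drop replayed copies.
JavaDStream<Row> rows = messages.map(record -> RowFactory.create(
        record.topic(),      // source topic
        record.partition(),  // partition number
        record.offset(),     // offset within the partition
        record.value()));    // the payload that is loaded today

methodToSaveDataInHive (or the target table) would then need the extra columns, and a query using row_number() over the (topic, partition, offset) key can keep a single copy per message.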

java apache-spark apache-kafka spark-streaming
2 Answers
0 votes

Try moving ((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges) out of the foreachRDD block:

public void method1(SparkConf conf, String app) {
    spark = SparkSession.builder().appName(conf.get("")).enableHiveSupport().getOrCreate();
    final JavaStreamingContext javaStreamContext = new JavaStreamingContext(context,
                new Duration(<spark duration>));
    JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(javaStreamContext,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String> Subscribe(<topicnames>, <kafka Params>));
    messages.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
        @Override
        public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
                OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                JavaDStream<String> records = messages.map(new Function<ConsumerRecord<String, String>, String>() {
                    @Override
                    public String call(ConsumerRecord<String, String> tuple2) throws Exception {
                        return tuple2.value();
                    }
                });

                records.foreachRDD(new VoidFunction<JavaRDD<String>>() {
                    @Override
                    public void call(JavaRDD<String> rdd) throws Exception {
                        if(!rdd.isEmpty()) {
                            methodToSaveDataInHive(rdd, <StructTypeSchema>,<OtherParams>);
                        }
                    }
                 });
        }
    });
     ((CanCommitOffsets)  messages.inputDStream()).commitAsync(offsetRanges);
    javaStreamContext.start();
    javaStreamContext.awaitTermination();
}

0 votes

The code below works. But I am not sure whether it commits the offsets after the processing into Hive, because the commitAsync block comes before the Hive store method call.

public void method1(SparkConf conf, String app) {
    spark = SparkSession.builder().appName(conf.get("")).enableHiveSupport().getOrCreate();
    final JavaStreamingContext javaStreamContext = new JavaStreamingContext(context,
                new Duration(<spark duration>));
    JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(javaStreamContext,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String> Subscribe(<topicnames>, <kafka Params>));

    messages.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
        @Override
        public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
            OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
            ((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges);
        }
    });

    JavaDStream<String> records = messages.map(new Function<ConsumerRecord<String, String>, String>() {
        @Override
        public String call(ConsumerRecord<String, String> tuple2) throws Exception {
            return tuple2.value();
        }
    });

    records.foreachRDD(new VoidFunction<JavaRDD<String>>() {
        @Override
        public void call(JavaRDD<String> rdd) throws Exception {
            if (!rdd.isEmpty()) {
                methodToSaveDataInHive(rdd, <StructTypeSchema>, <OtherParams>);
            }
        }
    });

    javaStreamContext.start();
    javaStreamContext.awaitTermination();
}

With this code, if I add the block below to print the offset information (right after offsetRanges is initialized), it no longer works and throws the same exception:

messages.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
    @Override
    public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
        OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

        rdd.foreachPartition(new VoidFunction<Iterator<ConsumerRecord<String, String>>>() {
            @Override
            public void call(Iterator<org.apache.kafka.clients.consumer.ConsumerRecord<String, String>> arg0) throws Exception {
                OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
                System.out.println(o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
            }
        });

        ((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges);
    }
});
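The extra failure is probably down to nested anonymous classes: the inner VoidFunction holds an implicit reference to the enclosing anonymous class instance, which captures messages, so the DStream once again ends up inside a closure that Spark tries to serialize for the executors. Printing the ranges on the driver, as the second listing above already does, avoids shipping anything to the executors; a minimal equivalent inside the same foreachRDD, right after offsetRanges is assigned:

// Runs on the driver; nothing in this loop is serialized to the executors
for (OffsetRange o : offsetRanges) {
    System.out.println(o.topic() + " " + o.partition() + " "
            + o.fromOffset() + " " + o.untilOffset());
}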

Please provide your inputs.
