我正在尝试使用spark streaming以程序的形式实现下面的kafka-console-consumer命令(运行良好并输出预期的json数据)功能。
kafka-console-consumer.sh --zookeeper host.xxxx.com:2181,host.xxxx.com:2181 --topic mytopic --formatter CustomAvroMessageFormatter --property "formatter-schema-file= schema.txt" > /var/tmp/myfile.json&
我能够以编程方式使用spark流式传输来自上述主题的消息,因为下面的scala代码效果很好:
object ConsumeTest {
def main(args: Array[String]) {
val sc = new SparkContext("local[*]", "ConsumeKafkaMsg")
sc.setLogLevel("ERROR")
val ssc = new StreamingContext(sc, Seconds(1))
//To read from server
val kafkaParams = Map("metadata.broker.list" -> "brokername:9092")
val topics = List("mytopic").toSet
val lines = KafkaUtils.createDirectStream[
String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2)
lines.print()
ssc.start()
ssc.awaitTermination()
}
}
但是上面的程序读取二进制格式的消息类似于下面的内容:
��Cߣ�ߕ'윺~�_,��M˶/��Ѯ!�Vcusomtername client
2X3XXXXXX-sasadsad-4673-212c-dsdsadsad
value
,"question"logName
successstԇ���V
对上面的命令使用自定义avro格式化程序使用avro架构将二进制格式转换为json格式。我无法找到如何在我的上述程序中使用命令等效的avro消息格式化程序,这是很重要的。
下面是可能的avro架构(schema.txt)供参考(实际上是v复杂的可用处理):
{
"type" : "record",
"namespace" : "mynamespace",
"name" : "myname",
"fields" : [{
"name":"field1",
"type":{
"type":"record",
"name":"Eventfield1",
"fields":[{.....}]
}]
]
}
请帮助实现相同的。
你有两个选择(两者都需要相当强烈的编码,这是好的,不是吗?))。
StringDecoder
。foreach
运算符对其进行转换,或使用map
转换将转换应用为管道的一部分。您也可以考虑使用spark-avro库。