如何修复pyspark到elasticsearch腐败记录问题

问题描述 投票:0回答:1

因此,我正在尝试处理来自kafka流的Spark中的数据,然后将其发送到弹性搜索,以便我可以在Kibana中将其可视化。但是,当我在Kibana中看到数据时,数据显示为损坏的记录而不是它自己的字段。

以下是处理数据并将数据发送到elasticsearch的代码。进入kafka的数据只是来自twitter的文本数据,我正在将几个函数应用到它们上面。

def process(time, rdd):
  print("========= %s =========" % str(time))
  try:
    sqlContext = getSqlContextInstance(rdd.context)
    df = sqlContext.read.json(rdd)
    results = df.toJSON().map(lambda j: json.loads(j)).collect()
    send_elastic(results,"index1","document")
  except:
    pass

def main():
  createIndex("index1")
  sc = SparkContext(appName="PythonStreaming", master="local[2]")
  sqlContext = SQLContext(sc)
  ssc = StreamingContext(sc, 10)
  kafkaStream = KafkaUtils.createStream(ssc, 'localhost:2181', 'spark-streaming', {'twitter':1})
  tweets = kafkaStream.map(lambda x: json.loads(x[1])).map(lambda x: json.loads(x))
  sentiments = tweets.map(lambda x: {'tweet': x['text'],'candidate': get_candidate(x['text']),'sentiment':sentiment(x['text'])})
  sentiments.foreachRDD(process)
  ssc.start()
  ssc.awaitTermination()

这就是数据在Kibana中出现的情况。你可以看到它显示为一个腐败的记录,而不是我的意图,它是一个文本,情感和候选领域。我很感激能得到的任何帮助,谢谢。

output in kibana

python apache-spark elasticsearch kibana
1个回答
0
投票

默认情况下,此行df = sqlContext.read.json(rdd)期望在单行中具有单个JSON对象。

检查此链接为reference

您的json文件每行必须有一个文档。对于例如如下:

{ "tweet": "RT @humanidee: @john_arcadian @wikileaks @marcorubio Tweet between Bernie and Hilary", "candidate": "Bernie", "sentiment": "negative"}
{ "tweet": "RT @lissbrantley: Outside the Bernie rally in #Concord and @MSNBC is out here asking everyoneif they believe in capitalism and if not @Ber..", "candidate": "bernie", "sentiment": "neutral" }

现在,如果要处理多行,则需要添加以下代码

df = spark.read.option("multiline", "true").json("multi.json")
mdf.show(false)

作为补充说明,请确保JSON对象的格式正确。希望这可以帮助!

© www.soinside.com 2019 - 2024. All rights reserved.