因此,我正在尝试处理来自kafka流的Spark中的数据,然后将其发送到弹性搜索,以便我可以在Kibana中将其可视化。但是,当我在Kibana中看到数据时,数据显示为损坏的记录而不是它自己的字段。
以下是处理数据并将数据发送到elasticsearch的代码。进入kafka的数据只是来自twitter的文本数据,我正在将几个函数应用到它们上面。
def process(time, rdd):
print("========= %s =========" % str(time))
try:
sqlContext = getSqlContextInstance(rdd.context)
df = sqlContext.read.json(rdd)
results = df.toJSON().map(lambda j: json.loads(j)).collect()
send_elastic(results,"index1","document")
except:
pass
def main():
createIndex("index1")
sc = SparkContext(appName="PythonStreaming", master="local[2]")
sqlContext = SQLContext(sc)
ssc = StreamingContext(sc, 10)
kafkaStream = KafkaUtils.createStream(ssc, 'localhost:2181', 'spark-streaming', {'twitter':1})
tweets = kafkaStream.map(lambda x: json.loads(x[1])).map(lambda x: json.loads(x))
sentiments = tweets.map(lambda x: {'tweet': x['text'],'candidate': get_candidate(x['text']),'sentiment':sentiment(x['text'])})
sentiments.foreachRDD(process)
ssc.start()
ssc.awaitTermination()
这就是数据在Kibana中出现的情况。你可以看到它显示为一个腐败的记录,而不是我的意图,它是一个文本,情感和候选领域。我很感激能得到的任何帮助,谢谢。
默认情况下,此行df = sqlContext.read.json(rdd)
期望在单行中具有单个JSON对象。
检查此链接为reference
您的json文件每行必须有一个文档。对于例如如下:
{ "tweet": "RT @humanidee: @john_arcadian @wikileaks @marcorubio Tweet between Bernie and Hilary", "candidate": "Bernie", "sentiment": "negative"}
{ "tweet": "RT @lissbrantley: Outside the Bernie rally in #Concord and @MSNBC is out here asking everyoneif they believe in capitalism and if not @Ber..", "candidate": "bernie", "sentiment": "neutral" }
现在,如果要处理多行,则需要添加以下代码
df = spark.read.option("multiline", "true").json("multi.json")
mdf.show(false)
作为补充说明,请确保JSON对象的格式正确。希望这可以帮助!