PySpark - unable to decode JSON objects in Spark Streaming when reading a text file containing multiple JSON records

Problem description · votes: 0 · answers: 1

I am trying to read a text file containing multiple JSON records, in a format similar to the following -

{"availableDocks": 32, "totalDocks": 39, "city": "", "altitude": "", "stAddress2": "", "longitude": -73.99392888, "lastCommunicationTime": "2017-04-24 03:14:09 PM", "postalCode": "", "statusValue": "In Service", "testStation": false, "stAddress1": "W 52 St & 11 Ave", "stationName": "W 52 St & 11 Ave", "landMark": "", "latitude": 40.76727216, "statusKey": 1, "availableBikes": 2, "id": 72, "location": ""}
{"availableDocks": 3, "totalDocks": 33, "city": "", "altitude": "", "stAddress2": "", "longitude": -74.00666661, "lastCommunicationTime": "2017-04-24 03:12:49 PM", "postalCode": "", "statusValue": "In Service", "testStation": false, "stAddress1": "Franklin St & W Broadway", "stationName": "Franklin St & W Broadway", "landMark": "", "latitude": 40.71911552, "statusKey": 1, "availableBikes": 30, "id": 79, "location": ""}

The code I am using is below:

from pyspark.sql import SQLContext, Row
from pyspark.streaming import StreamingContext
import json
ssc = StreamingContext(sc, 60)
streams=ssc.textFileStream('hdfs:///test_data')

parsed = streams.map(lambda v: json.loads(v))  
def getSqlContextInstance(sparkContext):
    if ('sqlContextSingletonInstance' not in globals()):
        globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
    return globals()['sqlContextSingletonInstance']
def process(time, rdd):
    print("========= %s =========" % str(time))
    try:
        # Get the singleton instance of SparkSession
        sqlContext = getSqlContextInstance(rdd.context)

        # Convert RDD[String] to RDD[Row] to DataFrame
        # using Spark 1.3, hence jsonRDD, which I believe is similar to read.json?
        df = sqlContext.jsonRDD(rdd)

        # Display the parsed records
        df.show()
    except Exception as e:
        print "in exception"
        print str(e)
        pass

parsed.foreachRDD(process)
ssc.start()  
ssc.awaitTermination() 

However, when I run this code I am unable to decode the JSON objects. I ran both records through JSON lint and confirmed they are well-formed.
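To narrow down what fails to decode, one option is to guard the per-line parse before it reaches the DataFrame step. This is a minimal sketch, not from the original post: the safe_parse helper, and the assumption that blank or malformed lines are what trip json.loads, are illustrative only.

def safe_parse(line):
    # Hypothetical helper: skip empty lines and report the offending text
    # instead of failing the whole batch with "No JSON object could be decoded".
    line = line.strip()
    if not line:
        return None
    try:
        return json.loads(line)
    except ValueError as e:
        print("could not decode %r: %s" % (line, e))
        return None

parsed = streams.map(safe_parse).filter(lambda d: d is not None)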

python apache-spark pyspark
1 Answer

0 votes

JSON data:

{"timestamp": "1571053218000","t1": "55.23","t2": "10","t3": "ON"}

{"timestamp": "1571053278000","t1": "63.23","t2": "11","t3": "OFF"}

{"timestamp": "1571053338000","t1": "73.23","t2": "12","t3": "ON"}

{"timestamp": "1571053398000","t1": "83.23","t2": "13","t3": "ON"}

PySpark code to read the above JSON data:

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.sql.types import IntegerType, LongType, DecimalType,StructType, StructField, StringType
from pyspark.sql import Row
from pyspark.sql.functions import col
import pyspark.sql.functions as F
from pyspark.sql import Window

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
ssc = StreamingContext(sc, 5)

stream_data = ssc.textFileStream("/filepath/")

def readMyStream(rdd):
  if not rdd.isEmpty():
    df = spark.read.json(rdd)
    print('Started the Process')
    print('Selection of Columns')
    df = df.select('t1','t2','t3','timestamp').where(col("timestamp").isNotNull())
    df.show()


stream_data.foreachRDD( lambda rdd: readMyStream(rdd) )
ssc.start()
ssc.awaitTermination()
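As a quick sanity check (not part of the original answer), the same read path can be exercised on a static RDD built from the sample records, assuming the sc and spark objects created above:

# Build a plain RDD of JSON strings and read it the same way readMyStream does;
# in this version of PySpark, spark.read.json also accepts an RDD of JSON strings.
sample = [
    '{"timestamp": "1571053218000","t1": "55.23","t2": "10","t3": "ON"}',
    '{"timestamp": "1571053278000","t1": "63.23","t2": "11","t3": "OFF"}',
]
static_rdd = sc.parallelize(sample)
spark.read.json(static_rdd).show()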