我正在尝试读取一个文本文件，其中每一行都是一个独立的 JSON 对象，格式类似于下面的示例：
{"availableDocks": 32, "totalDocks": 39, "city": "", "altitude": "", "stAddress2": "", "longitude": -73.99392888, "lastCommunicationTime": "2017-04-24 03:14:09 PM", "postalCode": "", "statusValue": "In Service", "testStation": false, "stAddress1": "W 52 St & 11 Ave", "stationName": "W 52 St & 11 Ave", "landMark": "", "latitude": 40.76727216, "statusKey": 1, "availableBikes": 2, "id": 72, "location": ""}
{"availableDocks": 3, "totalDocks": 33, "city": "", "altitude": "", "stAddress2": "", "longitude": -74.00666661, "lastCommunicationTime": "2017-04-24 03:12:49 PM", "postalCode": "", "statusValue": "In Service", "testStation": false, "stAddress1": "Franklin St & W Broadway", "stationName": "Franklin St & W Broadway", "landMark": "", "latitude": 40.71911552, "statusKey": 1, "availableBikes": 30, "id": 79, "location": ""}
我使用的代码如下：
from pyspark.sql import SQLContext, Row
from pyspark.streaming import StreamingContext
import json
# 60-second batch interval; `sc` is an existing SparkContext (e.g. from the shell)
ssc = StreamingContext(sc, 60)
streams = ssc.textFileStream('hdfs:///test_data')
# Each line of the incoming text files is one standalone JSON document,
# so json.loads can be mapped over the stream directly.
parsed = streams.map(json.loads)
def getSqlContextInstance(sparkContext):
    """Lazily create and return a process-wide singleton SQLContext.

    Caching the instance in globals() keeps one SQLContext alive across
    repeated foreachRDD invocations instead of building a new one per batch.

    NOTE(review): the original paste had lost its indentation, which is a
    SyntaxError in Python; the logic itself is unchanged.
    """
    if 'sqlContextSingletonInstance' not in globals():
        globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
    return globals()['sqlContextSingletonInstance']
def process(time, rdd):
    """Parse one micro-batch of JSON strings into a DataFrame and show it.

    Parameters
    ----------
    time : batch timestamp supplied by foreachRDD
    rdd : RDD[str] with one JSON document per element
    """
    print("========= %s =========" % str(time))
    try:
        # Reuse the singleton SQLContext rather than creating one per batch
        sqlContext = getSqlContextInstance(rdd.context)
        # Spark 1.3: jsonRDD parses an RDD of JSON strings into a DataFrame
        # (the equivalent of read.json in later versions)
        df = sqlContext.jsonRDD(rdd)
        # df.show() prints the frame itself and returns None -- wrapping it
        # in print (as the original did) emits a spurious "None" line.
        df.show()
    except Exception as e:
        # Broad catch keeps the stream alive on a malformed batch, but we
        # report the error instead of silently swallowing it. Also switched
        # the Python 2 print statements to print() calls, consistent with
        # the rest of the file.
        print("in exception")
        print(str(e))
# Register process() as the handler invoked once per micro-batch
parsed.foreachRDD(process)
ssc.start()
# Block the driver until the streaming job is stopped externally
ssc.awaitTermination()
但是，运行此代码时，我无法解码这些 JSON 对象。我已经用 JSON lint 检查过，确认这两条 JSON 的格式都是正确的。
JSON 数据：
{"timestamp": "1571053218000","t1": "55.23","t2": "10","t3": "ON"}
{"timestamp": "1571053278000","t1": "63.23","t2": "11","t3": "OFF"}
{"timestamp": "1571053338000","t1": "73.23","t2": "12","t3": "ON"}
{"timestamp": "1571053398000","t1": "83.23","t2": "13","t3": "ON"}
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.sql.types import IntegerType, LongType, DecimalType,StructType, StructField, StringType
from pyspark.sql import Row
from pyspark.sql.functions import col
import pyspark.sql.functions as F
from pyspark.sql import Window
# Reuse the active SparkContext if one already exists, then wrap it in a SparkSession
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
# 5-second micro-batch interval
ssc = StreamingContext(sc, 5)
# Watch this directory for newly created text files; each new file becomes a batch
stream_data = ssc.textFileStream("/filepath/")
def readMyStream(rdd):
    """Parse a micro-batch RDD of JSON lines into a DataFrame and display it.

    Empty batches (no new files appeared in the watched directory during
    the interval) are skipped with a guard clause.

    NOTE(review): the original paste had lost its indentation, which is a
    SyntaxError in Python; the logic itself is unchanged.
    """
    if rdd.isEmpty():
        return
    # Each element of the RDD is one JSON document
    df = spark.read.json(rdd)
    print('Started the Process')
    print('Selection of Columns')
    # Rows that failed to parse come back with a null timestamp; filter them out
    df = df.select('t1', 't2', 't3', 'timestamp').where(col("timestamp").isNotNull())
    df.show()
# Pass the handler directly -- wrapping it in `lambda rdd: readMyStream(rdd)`
# adds nothing.
stream_data.foreachRDD(readMyStream)
ssc.start()
# BUG FIX: the original called ssc.stop() immediately after start(), which
# tears the streaming context down before any micro-batch can be processed
# (a likely cause of "nothing gets decoded"). Block here instead and stop
# the context externally, or use ssc.awaitTerminationOrTimeout(seconds).
ssc.awaitTermination()