我正在尝试从本地文本文件流。
conf = SparkConf().setMaster("spark://antonis-dell:7077").setAppName("Kafka_Spark")
sc = SparkContext(conf=conf) # .getOrCreate()
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, 2)
lines = ssc.textFileStream("file:///home/antonis/repos/GLASSEAS/Anomaly_Detector/dataset/")
lines.pprint()
ssc.start()
ssc.awaitTermination()
从之前在stackoverflow上的回复来看,似乎没有任何工作。
我已经尝试了一个空的 dataset
文件夹,然后传递txt文件,我尝试了用 file:/
但什么都没用。
谁能帮我解决这个问题?
你需要用ssc.start和ssc. awaitTermination调用来完成你的代码示例。
...
lines = ssc.textFileStream("/home/antonis/repos/GLASSEAS/Anomaly_Detector/dataset/")
.pprint()
ssc.start()
ssc.awaitTermination()
这个 火花医生 是一个很好的开始(在他们的例子中,他们使用socketTextStream,但其他一切都适用于你的情况)。
你能不能用spark session代替spark context,像这样尝试一下?
sparkSession = SparkSession.builder().config(conf).getOrCreate()
lines = sparkSession.readStream.textFile("file:///home/antonis/repos/GLASSEAS/Anomaly_Detector/dataset/")