仅在关闭应用程序后才能在套接字中发送DStream的rdds

问题描述 投票:0回答:1

我正在尝试Spark Streaming设置,其中应用程序接收文本(行),对其进行计数,然后将结果发送回服务器。

这就是我现在正在尝试的:

sc = SparkContext('local[4]', 'StreamTest')
sc.setLogLevel('WARN')
ssc = StreamingContext(sc, 2)

lines = ssc.socketTextStream('localhost', 9999)
words = lines.flatMap(lambda line: line.split())

pairs = words.map(lambda word: (word, 1))
word_counts = pairs.reduceByKey(lambda x, y: x + y)

def save_rdd(time, rdd):
    if rdd.isEmpty():
        return

    print('1')
    con = socket.socket()
    con.connect(('localhost', 9999))
    con.send('test')
    con.close()
    print('2')
    # rdd.saveAsTextFile('ws' + time.strftime('%s'))

word_counts.foreachRDD(save_rdd)

ssc.start()
ssc.awaitTermination()

我使用nc -lk 9999启动一个服务器,我发送一个随机的句子,期待一个'test'字符串。

问题是,我认为它确实有效('1''2'被打印)但我无法在服务器上看到'test'。奇怪的是,当我结束应用程序时,我可以在服务器上看到它。

为什么会这样?甚至可以在套接字中发送DStream结果吗?我正在尝试做一些实时的情绪分析,我想向我的服务器报告。

sockets apache-spark pyspark spark-streaming
1个回答
0
投票

我认为你必须在发送数据后刷新数据,下面是一个执行它的java代码

 Socket s=new Socket("localhost",6666);  
 DataOutputStream dout=new DataOutputStream(s.getOutputStream());  
 dout.writeUTF("Hello Server");  
 dout.flush();  
 dout.close();  
 s.close();  
© www.soinside.com 2019 - 2024. All rights reserved.