How can I see the DataFrame in the console (the equivalent of .show() for Structured Streaming)?


I am trying to see what is coming in as my DataFrame...

Here is the Spark code:

from pyspark.sql import SparkSession
import pyspark.sql.functions as psf
import logging
import time

spark = SparkSession \
    .builder \
    .appName("Console Example") \
    .getOrCreate()

logging.info("started to listen to the host..")

# read newline-delimited text from the TCP socket as an unbounded table
lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "127.0.0.1") \
    .option("port", 9999) \
    .load()

data = lines.selectExpr("CAST(value AS STRING)")
# the console sink prints every micro-batch to the driver's stdout
query1 = data.writeStream.format("console").start()
time.sleep(10)
query1.awaitTermination()
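A side note: the console sink truncates wide rows by default, which matters with records as long as the CSV row shown further down. A minimal sketch of the same sink with explicit options (numRows and truncate mirror the arguments of DataFrame.show()):

# variant of the console sink with explicit display options
query1 = data.writeStream \
    .format("console") \
    .option("numRows", 20) \
    .option("truncate", "false") \
    .start()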

I am getting the progress reports, but the number of input rows is clearly 0 for every trigger:

2019-08-19 23:45:45 INFO  MicroBatchExecution:54 - Streaming query made progress: {
  "id" : "a4b26eaf-1032-4083-9e42-a9f2f0426eb7",
  "runId" : "35c2b82a-191d-4998-9c98-17b24f5e3e9d",
  "name" : null,
  "timestamp" : "2019-08-20T06:45:45.458Z",
  "batchId" : 0,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "durationMs" : {
    "getOffset" : 0,
    "triggerExecution" : 0
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "TextSocketSource[host: 127.0.0.1, port: 9999]",
    "startOffset" : null,
    "endOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSinkProvider@5f3e6f3"
  }
}
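For what it is worth, the same report is available programmatically, so you do not have to scan the INFO logs for this JSON. A minimal sketch, reusing the query1 handle from the code above:

import json
import time

for _ in range(5):
    time.sleep(5)
    progress = query1.lastProgress  # dict with the same fields as the JSON above
    if progress:
        print("numInputRows:", progress["numInputRows"])
        print(json.dumps(progress, indent=2))
    print("status:", query1.status)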

My TCP server is spitting out data, and I can see it in the console as well, but I just want to be certain whether my Spark job is actually receiving it by printing it out, and that is proving hard to do.

Here is my TCP server code:

import socket
import sys
import csv
import time


port = 9999
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(('', port))
server_socket.listen(5)
connection_socket, addr = server_socket.accept()

file_path = "/Users/Downloads/youtube-new/USvideos.csv"
# count the rows in the file itself, not the characters of the path string
with open(file_path, "r") as f:
    row_count = sum(1 for row in f)

with open(file_path, "r") as f:
    reader = csv.reader(f, delimiter="\t")
    while True:
        for i, line in enumerate(reader):
            try:
                print(line)
                # terminate each record with a newline so Spark's socket
                # source can split the byte stream back into rows
                data = (line[0] + "\n").encode('utf-8')
                connection_socket.send(data)
                time.sleep(2)
                if i == row_count - 1:
                    break
            except IndexError:
                print("Index error")
                server_socket.close()

server_socket.close()

I can see the rows being printed, so I can at least say that the server has accepted a connection on localhost:9999, which is the same host & port I use in the Spark job.

Here is one row of the data:

['8mhTWqWlQzU,17.15.11,"Wearing Online Dollar Store Makeup For A Week","Safiya Nygaard",22,2017-11-11T01:19:33.000Z,"wearing online dollar store makeup for a week"|"online dollar store makeup"|"dollar store makeup"|"daiso"|"shopmissa makeup"|"shopmissa haul"|"dollar store makeup haul"|"dollar store"|"shopmissa"|"foundation"|"concealer"|"eye primer"|"eyebrow pencil"|"eyeliner"|"bronzer"|"contour"|"face powder"|"lipstick"|"$1"|"$1 makeup"|"safiya makeup"|"safiya dollar store"|"safiya nygaard"|"safiya"|"safiya and tyler",2922523,119348,1161,6736,https://i.ytimg.com/vi/8mhTWqWlQzU/default.jpg,False,False,False,"I found this online dollar store called ShopMissA that sells all their makeup products for $1 and decided I had to try it out! So I replaced my entire everyday makeup routine with $1 makeup products, including foundation, concealer, eye primer, eyebrow pencil, eyeliner, bronzer, contour, face powder, and lipstick. What do you think? Would you try this?\\n\\nThis video is NOT sponsored!\\n\\nSafiya\'s Nextbeat: https://nextbeat.co/u/safiya\\nIG: https://www.instagram.com/safiyany/\\nTwitter: https://twitter.com/safiyajn\\nFacebook: https://www.facebook.com/safnygaard/\\n\\nAssistant Editor: Claire Wiley\\n\\nMUSIC\\nMind The Gap\\nvia Audio Network\\n\\nSFX\\nvia AudioBlocks"']

That is everything inside the brackets (note that I am actually sending only the first element, line[0]).
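One way to take Spark out of the picture entirely is to point a throwaway client at the same socket and print whatever arrives. A minimal sketch (note that the server above accepts only a single connection, so run this instead of, not alongside, the Spark job):

import socket

client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(("127.0.0.1", 9999))
while True:
    chunk = client.recv(4096)          # raw bytes as sent by the server
    if not chunk:                      # empty read means the server closed
        break
    print(chunk.decode("utf-8"), end="")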

apache-spark pyspark spark-streaming pyspark-sql
1 Answer
from pyspark.sql import SparkSession
import pyspark.sql.functions as psf
import logging
import time

spark = SparkSession \
    .builder \
    .appName("Console Example") \
    .getOrCreate()

logging.info("started to listen to the host..")

lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "127.0.0.1") \
    .option("port", 9999) \
    .load()

data = lines.selectExpr("CAST(value AS STRING)")
# write each micro-batch into an in-memory table named "counting",
# which can then be queried like any other table
query1 = data.writeStream.queryName("counting").format("memory").outputMode("append").start()
for x in range(5):
    spark.sql("select * from counting").show()
    time.sleep(10)

Try this; it will show you the data just like the show() method does in Spark SQL. It will show you five sets of data, since we are looping five times.
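If you are on Spark 2.4 or later, foreachBatch is another option: it hands each micro-batch to a plain function as an ordinary DataFrame, where the usual show() applies. A minimal sketch, reusing the data frame defined above:

def show_batch(batch_df, batch_id):
    # batch_df is a regular (non-streaming) DataFrame for this micro-batch
    print("batch:", batch_id)
    batch_df.show(truncate=False)

query2 = data.writeStream.foreachBatch(show_batch).start()
query2.awaitTermination()

Unlike the memory sink, which accumulates all results on the driver, this prints each batch and keeps nothing around between triggers.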
