Saving data from Spark Structured Streaming to PostgreSQL (Python)


I am having trouble carrying out the following task:

I consume random messages (carrying temperature readings) from Kafka through Spark Structured Streaming. Below is a screenshot of the Spark DataFrame:

Data consumed from Kafka topic

My question is how to iterate over each row of the DataFrame and pass its values to PostgreSQL (using Python code).

Please find below the code snippet I have so far.

import psycopg2
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

spark = SparkSession.builder.appName("spark_streaming_sql").getOrCreate()
spark.sparkContext.setLogLevel("WARN")
spark.streams.active

data = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka_bitnami:39092") \
    .option("subscribe", "temperature_telemetries") \
    .option("startingOffsets", "latest") \
    .load()

data.printSchema()

schema = StructType([
    StructField("id", DoubleType()),
    StructField("timestamp_value", DoubleType()), 
    StructField("device_id", DoubleType()), 
    StructField("temperature_value", DoubleType()),
    StructField("comment", StringType())])

telemetry_dataframe = data.selectExpr("CAST(value AS STRING)") \
    .select(from_json(col("value").cast("string"), schema).alias("tmp")) \
    .select("tmp.*")

# (1) Approach 1:
telemetry_dataframe.createOrReplaceTempView("updates")

basic_selection = spark.sql("select * from updates where temperature_value>-0.2")

#--------------------------------------------------------------------------------------

# (2) Approach 2:

# basic_selection = telemetry_dataframe.select("*").where("temperature_value>-0.2")

# -------------------------------------------------------------------------------------

def sendPartition(iter):

    conn = psycopg2.connect(database='temp_messages', user='<custom_username>', password='<custom_password>', host='postgresql', port='5432')

    cur = conn.cursor()

    cur.execute("""CREATE TABLE IF NOT EXISTS temperatures_sql (device_id integer, temperature_value numeric, comment varchar(500), timestamp_value numeric);""")

    fields = ['device_id','temperature_value','comment','timestamp_value']

    for item in iter:

       my_data = [item[field] for field in fields]

       cur.execute("INSERT INTO temperatures_sql (device_id, temperature_value, comment, timestamp_value) VALUES (%s, %s, %s, %s)", tuple(my_data)) #tuple(my_data) #temperatures_four

    conn.commit()

    conn.close()

#########################################
#               QUERIES                 #
#########################################

query_1 = telemetry_dataframe \
    .writeStream \
    .foreach(sendPartition) \
    .outputMode("append") \
    .format("console") \
    .start()

query_1.awaitTermination()

Note 1: Everything runs in Docker, which is why I connect to PostgreSQL with a plain Python function (psycopg2) instead of the JDBC driver.

Note 2: I know I have to use foreach() or foreachBatch(). However, I do not know how to iterate over each row of the Spark DataFrame shown in the screenshot above.

Thank you for your help.


python postgresql apache-spark spark-structured-streaming
1 Answer

You can iterate over each row; the rest of the logic stays the same.
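For completeness, here is a minimal sketch of that idea using foreachBatch, assuming the same connection parameters and the telemetry_dataframe from the question (the CREATE TABLE step is omitted and the table is assumed to exist):

import psycopg2

def send_to_postgres(batch_df, batch_id):
    # foreachBatch hands over each micro-batch as a regular DataFrame,
    # so the row-by-row insert logic from the question can be reused.
    def send_partition(rows):
        # One connection per partition, not per row.
        conn = psycopg2.connect(database='temp_messages', user='<custom_username>',
                                password='<custom_password>', host='postgresql', port='5432')
        cur = conn.cursor()
        fields = ['device_id', 'temperature_value', 'comment', 'timestamp_value']
        for row in rows:
            cur.execute(
                "INSERT INTO temperatures_sql (device_id, temperature_value, comment, timestamp_value) VALUES (%s, %s, %s, %s)",
                tuple(row[field] for field in fields))
        conn.commit()
        conn.close()

    batch_df.foreachPartition(send_partition)

query = telemetry_dataframe \
    .writeStream \
    .outputMode("append") \
    .foreachBatch(send_to_postgres) \
    .start()

query.awaitTermination()

If you prefer .foreach(), note that a plain Python function passed to it is called once per Row, not once per partition, so it would open a database connection for every single row; with foreachBatch (or an object exposing open/process/close methods for foreach) you can share one connection per partition instead.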
