Manually creating a pyspark dataframe

Problem description · votes: 0 · answers: 7

I am trying to manually create a pyspark dataframe from some data:

row_in = [(1566429545575348), (40.353977), (-111.701859)]
rdd = sc.parallelize(row_in)
schema = StructType(
    [
        StructField("time_epocs", DecimalType(), True),
        StructField("lat", DecimalType(), True),
        StructField("long", DecimalType(), True),
    ]
)
df_in_test = spark.createDataFrame(rdd, schema)

This errors out when I try to display the dataframe, so I am not sure how to do this.

Also, the Spark documentation seems a bit convoluted to me, and I got similar errors when I tried to follow those instructions.

Does anyone know how to do this?

pyspark
7 Answers
120 votes

Simple dataframe creation:

df = spark.createDataFrame(
    [
        (1, "foo"),  # create your data here, be consistent in the types.
        (2, "bar"),
    ],
    ["id", "label"]  # add your column names here
)

df.printSchema()
root
 |-- id: long (nullable = true)
 |-- label: string (nullable = true)

df.show()
+---+-----+                                                                     
| id|label|
+---+-----+
|  1|  foo|
|  2|  bar|
+---+-----+

According to the official documentation:

  • When schema is a list of column names, the type of each column will be inferred from the data. (example above ↑)
  • When schema is pyspark.sql.types.DataType or a datatype string, it must match the real data. (examples below ↓)
# Example with a datatype string
df = spark.createDataFrame(
    [
        (1, "foo"),  # Add your data here
        (2, "bar"),
    ],  
    "id int, label string",  # add column names and types here
)

# Example with pyspark.sql.types
from pyspark.sql import types as T
df = spark.createDataFrame(
    [
        (1, "foo"),  # Add your data here
        (2, "bar"),
    ],
    T.StructType(  # Define the whole schema within a StructType
        [
            T.StructField("id", T.IntegerType(), True),
            T.StructField("label", T.StringType(), True),
        ]
    ),
)


df.printSchema()
root
 |-- id: integer (nullable = true)  # type is forced to Int
 |-- label: string (nullable = true)

Additionally, you can create a dataframe from a Pandas dataframe; the schema will be inferred from the Pandas dataframe's types:

import pandas as pd
import numpy as np


pdf = pd.DataFrame(
    {
        "col1": [np.random.randint(10) for x in range(10)],
        "col2": [np.random.randint(100) for x in range(10)],
    }
)


df = spark.createDataFrame(pdf)

df.show()
+----+----+
|col1|col2|
+----+----+
|   6|   4|
|   1|  39|
|   7|   4|
|   7|  95|
|   6|   3|
|   7|  28|
|   2|  26|
|   0|   4|
|   4|  32|
+----+----+
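
Finally, applied to the question's code: row_in as written holds three one-element rows rather than one three-field row (parentheses around a single value don't make a tuple), so it can't match the three-column schema, and DecimalType() defaults to precision 10, which is too small for the epoch value. A minimal sketch of a working version (swapping in LongType/DoubleType here is an assumption; adjust to taste):

from pyspark.sql.types import StructType, StructField, LongType, DoubleType

# One row with three fields, instead of three single-value rows
row_in = [(1566429545575348, 40.353977, -111.701859)]

schema = StructType(
    [
        StructField("time_epocs", LongType(), True),
        StructField("lat", DoubleType(), True),
        StructField("long", DoubleType(), True),
    ]
)

# A list of tuples can be passed directly; no parallelize needed
df_in_test = spark.createDataFrame(row_in, schema)

df_in_test.show()
+----------------+---------+-----------+
|      time_epocs|      lat|       long|
+----------------+---------+-----------+
|1566429545575348|40.353977|-111.701859|
+----------------+---------+-----------+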

7 votes

To elaborate on / build off of @Steven's answer:

from pyspark.sql.types import StructType, StructField, FloatType, StringType

field = [
    StructField("MULTIPLIER", FloatType(), True),
    StructField("DESCRIPTION", StringType(), True),
]
schema = StructType(field)
multiplier_df = sqlContext.createDataFrame(sc.emptyRDD(), schema)

This creates a blank dataframe.

We can now simply add a row to it:

l = [(2.3, "this is a sample description")]
rdd = sc.parallelize(l)
multiplier_df_temp = spark.createDataFrame(rdd, schema)
multiplier_df = multiplier_df.union(multiplier_df_temp)
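
If everything worked, the combined frame should look roughly like this:

multiplier_df.show()
+----------+--------------------+
|MULTIPLIER|         DESCRIPTION|
+----------+--------------------+
|       2.3|this is a sample ...|
+----------+--------------------+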

7 votes

This answer demonstrates how to create a PySpark DataFrame with createDataFrame, create_df, and toDF.

df = spark.createDataFrame([("joe", 34), ("luisa", 22)], ["first_name", "age"])

df.show()
+----------+---+
|first_name|age|
+----------+---+
|       joe| 34|
|     luisa| 22|
+----------+---+

You can also pass createDataFrame an RDD and a schema to construct DataFrames with more precision:

from pyspark.sql import Row
from pyspark.sql.types import *

rdd = spark.sparkContext.parallelize([
    Row(name='Allie', age=2),
    Row(name='Sara', age=33),
    Row(name='Grace', age=31)])

schema = StructType([
   StructField("name", StringType(), True),
   StructField("age", IntegerType(), False)])

df = spark.createDataFrame(rdd, schema)

df.show()
+-----+---+
| name|age|
+-----+---+
|Allie|  2|
| Sara| 33|
|Grace| 31|
+-----+---+
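
The explicit schema pins down both the types and the nullability, which printSchema() confirms:

df.printSchema()
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = false)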
create_df from my Quinn project allows for the best of both worlds - it's concise and fully descriptive:

from pyspark.sql.types import *
from quinn.extensions import *

df = spark.create_df(
    [("jose", "a"), ("li", "b"), ("sam", "c")],
    [("name", StringType(), True), ("blah", StringType(), True)]
)

df.show()
+----+----+
|name|blah|
+----+----+
|jose|   a|
|  li|   b|
| sam|   c|
+----+----+

toDF doesn't offer any advantages over the other approaches:

from pyspark.sql import Row

rdd = spark.sparkContext.parallelize([
    Row(name='Allie', age=2),
    Row(name='Sara', age=33),
    Row(name='Grace', age=31)])
df = rdd.toDF()
df.show()
+-----+---+
| name|age|
+-----+---+
|Allie|  2|
| Sara| 33|
|Grace| 31|
+-----+---+

3 votes

With formatting

from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, StringType

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [
        (1, "foo"),
        (2, "bar"),
    ],
    StructType(
        [
            StructField("id", IntegerType(), False),
            StructField("txt", StringType(), False),
        ]
    ),
)
print(df.dtypes)
df.show()
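
For reference, this should print roughly:

[('id', 'int'), ('txt', 'string')]
+---+---+
| id|txt|
+---+---+
|  1|foo|
|  2|bar|
+---+---+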

1 vote

Extending @Steven's answer:

data = [(i, 'foo') for i in range(1000)] # random data

columns = ['id', 'txt']    # add your column labels here

df = spark.createDataFrame(data, columns)

Note: when schema is a list of column names, the type of each column will be inferred from the data.

If you want to define the schema explicitly, do the following:

from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([StructField("id", IntegerType(), True), StructField("txt", StringType(), True)])
df1 = spark.createDataFrame(data, schema)

Output:

>>> df1
DataFrame[id: int, txt: string]
>>> df
DataFrame[id: bigint, txt: string]

0 votes

For beginners, a complete example of importing data from a file:

from pyspark.sql import SparkSession
from pyspark.sql.types import (
    ShortType,
    StringType,
    StructType,
    StructField,
    TimestampType,
)

import os

here = os.path.abspath(os.path.dirname(__file__))


spark = SparkSession.builder.getOrCreate()
schema = StructType(
    [
        StructField("id", ShortType(), nullable=False),
        StructField("string", StringType(), nullable=False),
        StructField("datetime", TimestampType(), nullable=False),
    ]
)

# read file or construct rows manually
df = spark.read.csv(os.path.join(here, "data.csv"), schema=schema, header=True)
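
A hypothetical data.csv matching this schema might look like the following (the contents are an assumption for illustration; depending on your Spark version you may need to pass an explicit timestampFormat to spark.read.csv):

id,string,datetime
1,foo,2024-01-01T00:00:00
2,bar,2024-01-02T12:30:00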

0 votes

Similar to the other answers:

from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

df = spark.createDataFrame(
    data=[
        Row(id=1, label="foo"),
        Row(id=2, label="bar")
    ],
    schema=StructType([
        StructField(name="id", dataType=IntegerType(), nullable=True),
        StructField(name="label", dataType=StringType(), nullable=True)
    ])
)
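
For reference, df.show() then gives the same frame as in the first answer:

df.show()
+---+-----+
| id|label|
+---+-----+
|  1|  foo|
|  2|  bar|
+---+-----+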