我对Spark完全陌生,我正在编写管道以执行一些转换为审核列表的操作。
我的数据示例:
{
"id": 932522712299,
"ticket_id": 12,
"created_at": "2020-02-14T19:05:16Z",
"author_id": 392401450482,
"events": ["{\"id\": 932522713292, \"type\": \"VoiceComment\", \"public\": false, \"data\": {\"from\": \"11987654321\", \"to\": \"+1987644\"}"],
}
我的模式基本上是:
root
|-- id: long (nullable = true)
|-- ticket_id: long (nullable = true)
|-- created_at: string (nullable = true)
|-- author_id: long (nullable = true)
|-- events: array (nullable = true)
| |-- element: string (containsNull = true)
我的转换有几个步骤:
按类型划分事件:注释,标签,更改或更新;
对于找到的每个事件,我必须从根目录添加ticket_id,author_id和created_at;
每种事件类型必须有一个输出。
[基本上,事件数组中的每个对象都是字符串JSON,因为每种类型都有不同的结构-它们之间唯一的公共属性是type
。
通过使用以下代码将数据框转换为字典,我达到了完成一些可怕工作的目标:
audits = list(map(lambda row: row.asDict(), df.collect()))`
comments = []
for audit in audits:
base_info = {'ticket_id': audit['ticket_id'], 'created_at': audit['created_at'], 'author_id': audit['author_id']}
audit['events'] = [json.loads(x) for x in audit['events']]
audit_comments = [
{**x, **base_info}
for x in audit['events']
if x['type'] == "Comment" or x['type'] == "VoiceComment"
]
comments.extend(audit_comments)
也许这个问题听起来有些la脚或懒惰,但我真的很喜欢简单的事情,例如:
感谢您的任何帮助。
您的代码会将完整的事件数据加载到已提交作业的主节点上。处理数据的一种新方法是希望您创建地图简化作业。为此有多种API-它们为作业创建DAG计划,并且该计划仅在调用诸如head or show
之类的特定功能时才会显示。这样的作业将被分发到群集中的所有计算机。
[使用数据框api时,可以使用pyspark.sql.functions
完成日志
与spark.sql dataframe api具有相同的转换
import pyspark.sql.functions as F
df = df.withColumn('event', F.explode(df.events)).drop(df.events)
df = df.withColumn('event', F.from_json(df.event, 'STRUCT <id: INT, type: STRING, public: Boolean, data: STRUCT<from: STRING, to: STRING'))
events = df.filter(df.event.type == 'Comment' || df.event.type == 'VoiceComment')
events.head(100)
当无法使用sql表达式处理数据时,您可以实现普通的用户定义函数-UDF或Pandas UDF