从json模式构建spark模式

问题描述 投票:2回答:3

我正在尝试构建一个想要显式提供的spark模式,同时创建数据框我可以使用下面生成json模式

from pyspark.sql.types import StructType
# Save schema from the original DataFrame into json:
schema_json = df.schema.json()

这给了我

{"fields":[{"metadata":{},"name":"cloud_events_version","nullable":true,"type":"string"},{"metadata":{},"name":"data","nullable":true,"type":{"fields":[{"metadata":{},"name":"email","nullable":true,"type":"string"},{"metadata":{},"name":"member_role","nullable":true,"type":"string"},{"metadata":{},"name":"reg_source_product","nullable":true,"type":"string"},{"metadata":{},"name":"school_type","nullable":true,"type":"string"},{"metadata":{},"name":"year_in_college","nullable":true,"type":"long"}],"type":"struct"}},{"metadata":{},"name":"event_time","nullable":true,"type":"string"},{"metadata":{},"name":"event_type","nullable":true,"type":"string"},{"metadata":{},"name":"event_type_version","nullable":true,"type":"string"},{"metadata":{},"name":"event_validated_ts","nullable":true,"type":"string"},{"metadata":{},"name":"event_validation_status","nullable":true,"type":"string"},{"metadata":{},"name":"extensions","nullable":true,"type":{"fields":[{"metadata":{},"name":"client_common","nullable":true,"type":{"fields":[{"metadata":{},"name":"adobe_mcid","nullable":true,"type":"string"},{"metadata":{},"name":"adobe_sdid","nullable":true,"type":"string"},{"metadata":{},"name":"auth_state","nullable":true,"type":"string"},{"metadata":{},"name":"uuid","nullable":true,"type":"string"},{"metadata":{},"name":"client_experiments","nullable":true,"type":"string"},{"metadata":{},"name":"client_ip_address","nullable":true,"type":"string"},{"metadata":{},"name":"device_id","nullable":true,"type":"string"},{"metadata":{},"name":"page_name","nullable":true,"type":"string"},{"metadata":{},"name":"referral_url","nullable":true,"type":"string"},{"metadata":{},"name":"url","nullable":true,"type":"string"},{"metadata":{},"name":"user_agent","nullable":true,"type":"string"},{"metadata":{},"name":"uvn","nullable":true,"type":"string"}],"type":"struct"}}],"type":"struct"}},{"metadata":{},"name":"source","nullable":true,"type":"string"},{"metadata":{},"name":"validated_message","nullable":true,"type":"string"},{"metadata":{},"name":"year","nullable":true,"type":"integer"},{"metadata":{},"name":"mon","nullable":true,"type":"integer"},{"metadata":{},"name":"day","nullable":true,"type":"integer"},{"metadata":{},"name":"hour","nullable":true,"type":"integer"}],"type":"struct"}

但为此,我需要解析数据帧,这需要一些时间,我试图避免

我能做的一件事是从我们内部的目录中获取所需的模式。这给了类似的东西

[{u'Name': u'cloud_events_version', u'Type': u'string'},
 {u'Name': u'event_type', u'Type': u'string'},
 {u'Name': u'event_time', u'Type': u'string'},
 {u'Name': u'data', u'Type': u'struct<school_type:string,reg_source_product:string,member_role:string,email:string,year_in_college:int>'},
 {u'Name': u'source', u'Type': u'string'},
 {u'Name': u'extensions', u'Type': u'struct<client_common:struct<auth_state:string,client_ip_address:string,client_experiments:string,uvn:string,device_id:string,adobe_sdid:string,url:string,page_name:string,user_agent:string,uuid:string,adobe_mcid:string,referral_url:string>>'},
 {u'Name': u'event_type_version', u'Type': u'string'},
 {u'Name': u'event_validation_status', u'Type': u'string'},
 {u'Name': u'event_validated_ts', u'Type': u'string'},
 {u'Name': u'validated_message', u'Type': u'string'}]

我正在尝试编写一个递归python查询,生成上面的json。逻辑是循环遍历此dict列表,并在类型为字符串时为此字典指定名称和类型

{"metadata" : {},"name" : columnName,"nullable" : True,"type" : columnType}

但是当类型是struct时,它会创建struct的所有元素的字典列表,并将其分配给type并以递归方式执行,直到找不到任何struct。

我只能鼓起勇气

def structRecursive(columnName,columnType):
    if "struct" not in columnType:
        ColumnDict = {"metadata" : {},"name" : columnName,"nullable" : True,"type" : columnType}
    else:
        structColumnList = []
        structColumnDict = {
            'metadata': {},
            'name': columnName,
            'nullable': True,
            'type': {'fields': structColumnList, 'type': 'struct'}
        }
        if columnType.count('struct<')==1:
            structCol = columnName
            structColList = columnType.encode('utf-8').replace('struct<',
                    '').replace('>', '').split(',')
            for item in structColList:
                fieldName = item.split(':')[0]
                dataType = item.split(':')[1]
                nodeDict = {}
                nodeDict['metadata'] = {}
                nodeDict['name'] = '{}'.format(fieldName)
                nodeDict['nullable'] = True
                nodeDict['type'] = '{}'.format(dataType)
                structColumnList.append(nodeDict)
        else:
            columnName = columnType.replace('struct<','',1).replace('>','').split(':')[0]
            columnType = columnType.split("{}:".format(columnName),1)[1].replace('>','',1)
        return structColumnDict

MainStructList = []
MainStructDict = {'fields': MainStructList, 'type': 'struct'}
for item in ListOfDict :
    columnName = item['Name'].encode('utf-8')
    columnType = item['Type'].encode('utf-8')
    MainStructList.append(structRecursive(columnName,columnType))

当然,这并没有给出理想的结果。很想在这里得到一些建议。

python apache-spark pyspark
3个回答
1
投票

如果我的问题是正确的,您希望解析列的列表并将其转换为描述具有复杂类型的模式的字典。困难的部分是解析表示复杂类型的字符串。首先,我们需要一个从列定义中提取结构条目的方法:

def extract_struct(text):
    stop = 7
    flag = 1
    for c in text[7:]:
        stop += 1
        if c == "<":
            flag += 1
        if c == ">":
            flag -= 1
        if flag == 0:
            return text[:stop], text[stop:]

这将返回提取的结构和列定义中的剩余文本。例如

extract_struct("struct<a:int,b:double>,c:string")

将返回

("struct<a:int,d:double>", "c:string").

其次,我们需要遍历每个列类型并获取struct条目的定义:

def parse(s, node):
    while s != '':
        # Strip column name
        col_name = s.partition(':')[0]
        s = s.partition(':')[2]

        # If column type is struct, parse it as well
        if s.startswith('struct'):
            col_type, s = extract_struct(s)
            node[col_name] = {}
            parse(col_type[7:-1], node[col_name])
        else:
            # Just add column definition
            col_type = s.partition(',')[0]
            node[col_name] = {
                "metadata": {},
                "name": col_name,
                "nullable": True,
                "type": col_type
            }

        # Go to next entry
        s = s.partition(',')[2]

如果列类型很简单,上面的方法只是向模式树中的当前节点添加一个新列,否则它将提取名称和结构,并递归地遍历struct的子条目。现在我们只需要遍历每一列并解析它们。所以在用一种方法包装上面之后:

def build(columns):
    def extract_struct(text):
        stop = 7
        flag = 1
        for c in text[7:]:
            stop += 1
            if c == '<':
                flag += 1
            if c == '>':
                flag -= 1
            if flag == 0:
                return text[:stop], text[stop:]

    def parse(s, node):
        while s != '':
            # Strip column name
            col_name = s.partition(':')[0]
            s = s.partition(':')[2]

            # If column type is struct, parse it as well
            if s.startswith('struct'):
                col_type, s = extract_struct(s)
                node[col_name] = {}
                parse(col_type[7:-1], node[col_name])
            else:
                # Just add column definition
                col_type = s.partition(',')[0]
                node[col_name] = {
                    "metadata": {},
                    "name": col_name,
                    "nullable": True,
                    "type": col_type
                }

            # Go to next entry
            s = s.partition(',')[2]

    schema = {}
    for column in columns:
        parse("{}:{}".format(column['Name'], column['Type']), schema)
    return schema

现在,如果您在示例列表上运行它,您将获得以下字典(很容易转换为列列表,但顺序并不重要):

{
  "cloud_events_version": {
    "nullable": true, 
    "type": "string", 
    "name": "cloud_events_version", 
    "metadata": {}
  }, 
  "event_type": {
    "nullable": true, 
    "type": "string", 
    "name": "event_type", 
    "metadata": {}
  }, 
  "event_time": {
    "nullable": true, 
    "type": "string", 
    "name": "event_time", 
    "metadata": {}
  }, 
  "event_validated_ts": {
    "nullable": true, 
    "type": "string", 
    "name": "event_validated_ts", 
    "metadata": {}
  }, 
  "event_type_version": {
    "nullable": true, 
    "type": "string", 
    "name": "event_type_version", 
    "metadata": {}
  }, 
  "source": {
    "nullable": true, 
    "type": "string", 
    "name": "source", 
    "metadata": {}
  }, 
  "extensions": {
    "client_common": {
      "adobe_sdid": {
        "nullable": true, 
        "type": "string", 
        "name": "adobe_sdid", 
        "metadata": {}
      }, 
      "auth_state": {
        "nullable": true, 
        "type": "string", 
        "name": "auth_state", 
        "metadata": {}
      }, 
      "client_ip_address": {
        "nullable": true, 
        "type": "string", 
        "name": "client_ip_address", 
        "metadata": {}
      }, 
      "url": {
        "nullable": true, 
        "type": "string", 
        "name": "url", 
        "metadata": {}
      }, 
      "client_experiments": {
        "nullable": true, 
        "type": "string", 
        "name": "client_experiments", 
        "metadata": {}
      }, 
      "referral_url": {
        "nullable": true, 
        "type": "string", 
        "name": "referral_url", 
        "metadata": {}
      }, 
      "page_name": {
        "nullable": true, 
        "type": "string", 
        "name": "page_name", 
        "metadata": {}
      }, 
      "user_agent": {
        "nullable": true, 
        "type": "string", 
        "name": "user_agent", 
        "metadata": {}
      }, 
      "uvn": {
        "nullable": true, 
        "type": "string", 
        "name": "uvn", 
        "metadata": {}
      }, 
      "chegg_uuid": {
        "nullable": true, 
        "type": "string", 
        "name": "chegg_uuid", 
        "metadata": {}
      }, 
      "adobe_mcid": {
        "nullable": true, 
        "type": "string", 
        "name": "adobe_mcid", 
        "metadata": {}
      }, 
      "device_id": {
        "nullable": true, 
        "type": "string", 
        "name": "device_id", 
        "metadata": {}
      }
    }
  }, 
  "validated_message": {
    "nullable": true, 
    "type": "string", 
    "name": "validated_message", 
    "metadata": {}
  }, 
  "event_validation_status": {
    "nullable": true, 
    "type": "string", 
    "name": "event_validation_status", 
    "metadata": {}
  }, 
  "data": {
    "school_type": {
      "nullable": true, 
      "type": "string", 
      "name": "school_type", 
      "metadata": {}
    }, 
    "reg_source_product": {
      "nullable": true, 
      "type": "string", 
      "name": "reg_source_product", 
      "metadata": {}
    }, 
    "member_role": {
      "nullable": true, 
      "type": "string", 
      "name": "member_role", 
      "metadata": {}
    }, 
    "email": {
      "nullable": true, 
      "type": "string", 
      "name": "email", 
      "metadata": {}
    }, 
    "year_in_college": {
      "nullable": true, 
      "type": "int", 
      "name": "year_in_college", 
      "metadata": {}
    }
  }
}

最后,请注意,这仅适用于简单类型和struct(不适用于arraymap类型)。但是,扩展到其他复杂类型也相当容易。


0
投票

为什么不使用标准的json-schema https://json-schema.org/understanding-json-schema/about.html并使用这个json-shema转换为spark schema https://github.com/zalando-incubator/spark-json-schema#quickstart


-1
投票

终于能够解决这个问题

def struct_definition(column_name, column_type):
    column_dict = {"metadata": {}, "name": column_name, "nullable": True, "type": column_type}
    return column_dict


def convert_to_json_array(struct_def):
    striped = struct_def.lstrip('struct')
    striped = striped.lstrip('<')
    striped = striped.rstrip('>')
    main_struct_list = []
    if striped.__contains__('struct'):
        name = striped.split(':')[0]
        json = {'Name': name, 'Type': striped.lstrip(name + ':') + '>'}
        main_struct_list.append(json)
    else:
        for i in striped.split(','):
            key_value = i.split(':')
            normalized_json = {'Name': key_value[0], 'Type': key_value[1]}
            main_struct_list.append(normalized_json)
    return main_struct_list


def to_json(input_list):
    main_struct_list = []
    for x in input_list:
        column_name = x['Name']
        column_type = x['Type']
        if column_type.startswith('struct'):
            main_struct_list.append(
                struct_definition(column_name,
                                  {'fields': to_json(convert_to_json_array(column_type)), 'type': 'struct'}))
        else:
            main_struct_list.append(struct_definition(column_name, column_type))
    return main_struct_list


if __name__ == '__main__':
    sample_list = [{u'Name': u'cloud_events_version', u'Type': u'string'},
                   {u'Name': u'event_type', u'Type': u'string'},
                   {u'Name': u'event_time', u'Type': u'string'},
                   {u'Name': u'data',
                    u'Type': u'struct<school_type:string,reg_source_product:string,member_role:string,email:string,year_in_college:int>'},
                   {u'Name': u'source', u'Type': u'string'},
                   {u'Name': u'extensions',
                    u'Type': u'struct<client_common:struct<auth_state:string,client_ip_address:string,client_experiments:string,uvn:string,device_id:string,adobe_sdid:string,url:string,page_name:string,user_agent:string,uuid:string,adobe_mcid:string,referral_url:string>>'},
                   {u'Name': u'event_type_version', u'Type': u'string'},
                   {u'Name': u'event_validation_status', u'Type': u'string'},
                   {u'Name': u'event_validated_ts', u'Type': u'string'},
                   {u'Name': u'validated_message', u'Type': u'string'}]
    main_struct_dict = {'fields': to_json(sample_list), 'type': 'struct'}
    print(main_struct_dict)
© www.soinside.com 2019 - 2024. All rights reserved.