在Parquet文件上创建具有JSON数据的Hive表

问题描述 投票:1回答:1

我正在努力实现的目标

  1. 从源大JSON文件(employee-sample.json)获取数据
  2. 一个简单的spark应用程序,将其作为文本文件读取并存储在镶木地板(simple-loader.java)中。我不知道JSON文件中有什么,所以我不能放任何模式,所以我想要读取架构,而不是写入架构。一个镶木地板文件,其中一列名为“value”,其中包含已创建的JSON字符串
  3. 在镶木地板文件上创建一个HIVE外部表,当我“从表中选择*”时,我看到一列出现了JSON数据。

我真正需要的是创建一个HIVE表,它可以读取“值”列中的JSON数据并应用模式和发出列,这样我就可以根据需要在我的RAW数据上创建各种表。

我已经在JSON文件上创建了hive表,并提取了列,但是这个来自镶木地板和应用JSON模式的提取列正在欺骗我

employee-sample.json

{"name":"Dave", "age" : 30 , "DOB":"1987-01-01"}
{"name":"Steve", "age" : 31 , "DOB":"1986-01-01"}
{"name":"Kumar", "age" : 32 , "DOB":"1985-01-01"}

简单的Spark代码将JSON转换为镶木地板

simple-loader.java

public static void main(String[] args) {
    SparkSession sparkSession = SparkSession.builder()
            .appName(JsonToParquet.class.getName())
            .master("local[*]").getOrCreate();
    Dataset<String> eventsDataSet = sparkSession.read().textFile("D:\\dev\\employee-sample.json");
    eventsDataSet.createOrReplaceTempView("rawView");
    sparkSession.sqlContext().sql("select string(value) as value from rawView")
            .write()
            .parquet("D:\\dev\\" + UUID.randomUUID().toString());
    sparkSession.close();
}

镶木地板文件上的蜂巢表

CREATE EXTERNAL TABLE EVENTS_RAW (
VALUE STRING)
STORED AS PARQUET
LOCATION 'hdfs://XXXXXX:8020/employee/data_raw';

我尝试通过设置JSON serde,但只有当数据存储在JSON foram中时才有效,ROW FORMAT SERDE'com.proofpoint.hive.serde.JsonSerde'

预期格式

CREATE EXTERNAL TABLE EVENTS_DATA (
    NAME STRING,
    AGE STRING,
    DOB STRING)
??????????????????????????????
hadoop apache-spark hive hiveql parquet
1个回答
0
投票

创建配置单元外部表示例:

 public static final String CREATE_EXTERNAL = "CREATE EXTERNAL TABLE %s" +
        " (%s) " +
        " PARTITIONED BY(%s) " +
        " STORED AS %s" +
        " LOCATION '%s'";
/**
 * Will create an external table and recover the partitions
 */
public void createExternalTable(SparkSession sparkSession, StructType schema, String tableName, SparkFormat format, List<StructField> partitions, String tablePath){
    String createQuery = createTableString(schema, tableName, format, partitions, tablePath);
    logger.info("Going to create External table with the following query:\n " + createQuery);
    sparkSession.sql(createQuery);
    logger.debug("Finish to create External table with the following query:\n " + createQuery);
    recoverPartitions(sparkSession, tableName);
}

public String createTableString(StructType schema, String tableName, SparkFormat format, List<StructField> partitions, String tablePath){
    Set<String> partitionNames = partitions.stream().map(struct -> struct.name()).collect(Collectors.toSet());
    String columns = Arrays.stream(schema.fields())
            //Filter the partitions
            .filter(field -> !partitionNames.contains(field.name()))
            //
            .map(HiveTableHelper::fieldToStringBuilder)
            .collect(Collectors.joining(", "));

    String partitionsString = partitions.stream().map(HiveTableHelper::fieldToStringBuilder).collect(Collectors.joining(", "));

    return String.format(CREATE_EXTERNAL, tableName, columns, partitionsString, format.name(), tablePath);
}

/**
 *
 * @param sparkSession
 * @param table
 */
public void recoverPartitions(SparkSession sparkSession, String table){
    String query = "ALTER TABLE " + table + " RECOVER PARTITIONS";
    logger.debug("Start: " + query);
    sparkSession.sql(query);
    sparkSession.catalog().refreshTable(table);
    logger.debug("Finish: " + query);
}

  public static StringBuilder fieldToStringBuilder(StructField field){
    StringBuilder sb = new StringBuilder();
    sb.append(field.name()).append( " ").append(field.dataType().simpleString());
    return sb;
}
© www.soinside.com 2019 - 2024. All rights reserved.