如何在Python和Pandas(Data Frame)中将条件SQL查询的数据插入到Hbase?

问题描述 投票:0回答:1

假设我在

table_name_a
中有一些样本数据如下:

    code     val_a  val_b    remark   date
   ------------------------------------------
1   00001    500    0.1      111      20191108
2   00001    1000   0.2      222      20191109
3   00002    200    0.1      111      20191110
4   00002    400    0.3      222      20191111
5   00001    200    0.2      333      20191112
6   00001    400    0.1      444      20191113

如果我在Hbase上查询如下(该值来自

select code, val_a from table_name_a where remark = 111
):

get 'test_01', '00001-20191108','n:111_a'

我的预期输出如下:

    code    111_a
   ---------------
1   00001    500       

我只知道如何在Python(pandas)中像这样导入所有数据

from db_conn import impala, hbasecon
import numpy as np
import pandas as pd


def main():
    conn_impa = impala().getcon()
    sql = """ SELECT * FROM table_name_a """
    df = pd.read_sql(sql=sql, con=conn_impa)
    df = df.fillna("")
    num = len(df)
    if num > 0:
        hfdtable = hbasecon(FEDMTABLE).gettable()
        with hfdtable.batch(batch_size=1000) as b:
            df.apply(lambda row: b.put(row["code"], {'table_name_a:code': str(row["code"])}), axis=1)
            df.apply(lambda row: b.put(row["val_a"], {'table_name_a:val_a': str(row["val_a"])}), axis=1)
            df.apply(lambda row: b.put(row["val_b"], {'table_name_a:val_b': str(row["val_b"])}), axis=1)
            df.apply(lambda row: b.put(row["date"], {'table_name_a:date': str(row["date"])}), axis=1)
            df.apply(lambda row: b.put(row["remark"], {'table_name_a:remark': str(row["remark"])}), axis=1)
    return True


if __name__ == '__main__':
    main()

但我不知道如何用Python将数据导入到Hbase,因为Hbase需要

111_a
,即带有'111'
val_a
remark

非常感谢您的建议。

python pandas dataframe hbase impala
1个回答
0
投票

这是我的解决方案:

from db_conn import impala, hbasecon
import numpy as np
import pandas as pd


def main():
    conn_impa = impala().getcon()
    sql = """
     SELECT code, date,
       CASE WHEN t.remark='111' THEN t.val_a ELSE 0 END 111_a,
       CASE WHEN t.remark='111' THEN t.val_b ELSE 0 END 111_b,
       CASE WHEN t.remark='222' THEN t.val_a ELSE 0 END 222_a,
       CASE WHEN t.remark='222' THEN t.val_b ELSE 0 END 222_b,
       CASE WHEN t.remark='333' THEN t.val_a ELSE 0 END 333_a,
       CASE WHEN t.remark='333' THEN t.val_b ELSE 0 END 333_b,
       CASE WHEN t.remark='444' THEN t.val_a ELSE 0 END 444_a,
       CASE WHEN t.remark='444' THEN t.val_b ELSE 0 END 444_b,
    FROM table_name_a t
    """
    df = pd.read_sql(sql=sql, con=conn_impa)
    df = df.fillna("")
    num = len(df)
    if num > 0:
        hfdtable = hbasecon(FEDMTABLE).gettable()
        with hfdtable.batch(batch_size=1000) as b:
            df.apply(lambda row: b.put(row["code"], {'table_name_a:code': str(row["code"])}), axis=1)
            df.apply(lambda row: b.put(row["111_a"], {'table_name_a:111_a': str(row["111_a"])}), axis=1)
            df.apply(lambda row: b.put(row["111_b"], {'table_name_a:111_b': str(row["111_b"])}), axis=1)
            df.apply(lambda row: b.put(row["222_a"], {'table_name_a:222_a': str(row["222_a"])}), axis=1)
            df.apply(lambda row: b.put(row["222_b"], {'table_name_a:222_b': str(row["222_b"])}), axis=1)
            df.apply(lambda row: b.put(row["333_a"], {'table_name_a:333_a': str(row["333_a"])}), axis=1)
            df.apply(lambda row: b.put(row["333_b"], {'table_name_a:333_b': str(row["333_b"])}), axis=1)
            df.apply(lambda row: b.put(row["444_a"], {'table_name_a:444_a': str(row["444_a"])}), axis=1)
            df.apply(lambda row: b.put(row["444_b"], {'table_name_a:444_b': str(row["444_b"])}), axis=1)
    return True


if __name__ == '__main__':
    main()
© www.soinside.com 2019 - 2024. All rights reserved.