如何将 Pandas DataFrame 插入 Cassandra?

问题描述 投票:0回答:3

我有一个数据框如下:

df

date        time       open   high   low   last
01-01-2017  11:00:00   37      45     36    42
01-01-2017  11:23:00   36      43     33    38
01-01-2017  12:00:00   45      55     35    43

....

我想把它写进cassandra。这是用Python处理数据后的批量上传。

cassandra 的架构如下:

CREATE TABLE ks.table1(date text, time text, open float, high float, low 
                       float, last float, PRIMARY KEY(date, time))

要将单行插入 cassandra,我们可以在 python 中使用 cassandra-driver,但我找不到有关上传整个数据帧的任何详细信息。

from cassandra.cluster import Cluster

session.execute(
    """
    INSERT INTO ks.table1 (date,time,open,high,low,last)
    VALUES (01-01-2017, 11:00:00, 37, 45, 36, 42)
    """)

P.S:类似的问题之前已被问过,但没有回答我的问题。

python pandas cassandra
3个回答
5
投票

即使我面临这个问题,但我发现即使将数百万行(确切地说是 1900 万行)上传到 Cassandra 中也不需要太多时间。

针对您的问题,您可以使用cassandra Bulk LOADER 完成你的工作。

编辑1:

您可以使用准备好的语句来帮助将数据上传到 cassandra 表中,同时迭代 dataFrame。

    from cassandra.cluster import Cluster
    cluster = Cluster(ip_address)
    session = cluster.connect(keyspace_name)
    query = "INSERT INTO data(date,time,open,high,low,last) VALUES (?,?,?,?,?,?)"
    prepared = session.prepare(query)

“?”用于输入变量

    for item in dataFrame:
        session.execute(prepared, (item.date_value,item.time_value,item.open_value,item.high_value,item.low_value,item.last_value))

    for item in dataFrame:
        session.execute(prepared, (item[0],item[1],item[2],item[3],item[4],item[5]))

我的意思是使用for循环提取数据并使用session.execute()上传。

有关准备好的声明的更多信息

希望这有帮助..


0
投票

不错的选择是使用批次。首先,您可以将 df 拆分为偶数分区(感谢Python/Pandas - 将 pandas DataFrame 分区为 10 个不相交、大小相等的子集),然后将每个分区作为批处理放入 Cassandra 中。批量大小受 Cassandra (cassandra.yaml) 设置限制:

batch_size_fail_threshold_in_kb: 50

Pandas df批量插入代码:

    from cassandra.cluster import Cluster
    from cassandra import ConsistencyLevel
    from cassandra.query import BatchStatement

    CASSANDRA_PARTITION_NUM = 1500

    def write_to_cassandra(df):
        cassandra_cluster = Cluster('ip')
        session = cassandra_cluster.connect('keyspace')
        prepared_query = session.prepare('INSERT INTO users(id, name) VALUES (?,?)')
        for partition in split_to_partitions(df, CASSANDRA_PARTITION_NUM):
            batch = BatchStatement(consistency_level=ConsistencyLevel.QUORUM)
            for index, item in partition.iterrows():
                batch.add(prepared_query, (item.id, item.name))
            session.execute(batch)

    def split_to_partitions(self, df, partition_number):
        permuted_indices = np.random.permutation(len(df))
        partitions = []
        for i in range(partition_number):
            partitions.append(df.iloc[permuted_indices[i::partition_number]])
        return partitions

更新: 仅当批次位于同一分区内时才执行此操作。


0
投票

DataFrame 似乎不再按第一个答案中的预期工作。以下是我如何访问 df 中的数据(下面的示例使用 cassandra db):

        columns = list(df.columns.values)

        query = "INSERT INTO {} ({}) VALUES({})".format(table, ','.join(columns), ','.join([val.replace(val, "?") for val in columns]))

        preparedquery = self.session.prepare(query)

        for row in df.loc:

            values = [row[col] for col in columns]

            self.session.execute(preparedquery, values)

最后一行有一个“不在范围内”错误,我认为这是 pandas 的错误。

© www.soinside.com 2019 - 2024. All rights reserved.