通过云函数将数据从pubsub写入bigtable

问题描述 投票:0回答:2

我是云大表的初学者,使用云功能将数据从pub / sub写入bigtable时遇到了大问题。

[Cloud函数从pubsub获取消息,但是问题出在下一步,将其写入bigtable。

该消息是在python脚本中创建的,并发送到pub / sub。

一个消息示例:

b'{“ eda”:2.015176,“温度”:33.39,“ bvp”:-0.49,“ x_acc”:-36.0,“ y_acc”:-38.0,“ z_acc”:-128.0,“ heart_rate”:83.78 ,“ iddevice”:15.0,“时间戳”:“ 2019-12-01T20:01:36.927Z”}'

为了将其写入bigtable,我创建了一个表:

 from google.cloud import bigtable 
 from google.cloud.bigtable import column_family

 client = bigtable.Client(project="projectid", admin=True) 
 instance = client.instance("bigtableinstance")
 table = instance.table("bigtable1")
 print('Creating the {} table.'.format(table)) 
 print('Creating columnfamily cf1 with Max Version GC rule...')
 max_versions_rule = column_family.MaxVersionsGCRule(2)
 column_family_id = 'cf1'
 column_families = {column_family_id: max_versions_rule}
 if not table.exists():
     table.create(column_families=column_families)
     print("Table {} is created.".format(table)) 
 else:
     print("Table {} already exists.".format(table))

这没有问题。

现在,我尝试使用main方法在云函数中使用以下python代码通过pub / sub将消息写入bigtable:

import json
import base64
import os
from google.cloud import bigtable
from google.cloud.bigtable import column_family, row_filters


project_id = os.environ.get('projetid', 'UNKNOWN')
INSTANCE = 'bigtableinstance'
TABLE = 'bigtable1'

client = bigtable.Client(project=project_id, admin=True)
instance = client.instance(INSTANCE)

colFamily = "cf1"
def writeToBigTable(table, data):
#    Parameters row_key (bytes) – The key for the row being created.
#    Returns A row owned by this table.
        row_key = data[colFamily]['iddevice'].value.encode()
        row = table.row(row_key)
        for colFamily in data.keys():
            for key in data[colFamily].keys():
                row.set_cell(colFamily,
                                        key,
                                        data[colFamily][key])
        table.mutate_rows([row])
        return data

def selectTable():
    stage = os.environ.get('stage', 'dev')
    table_id = TABLE + stage
    table = instance.table(table_id)
    return table


def main(event, context):
    data = base64.b64decode(event['data']).decode('utf-8')
    print("DATA: {}".format(data))
    eda, temperature, bvp, x_acc, y_acc, z_acc, heart_rate, iddevice, timestamp = data.split(',')

    table = selectTable()

    data = {'eda': eda,
         'temperature': temperature,
         'bvp': bvp,
         'x_acc':x_acc,
         'y_acc':y_acc,
         'z_acc':z_acc,
         'heart_rate':heart_rate,
         'iddevice':iddevice,
         'timestamp':timestamp}
    writeToBigTable(table, data)
    print("Data Written: {}".format(data))

我尝试了不同的版本,但找不到解决方案。

感谢您的帮助。

一切顺利

多米尼克

python google-cloud-functions google-cloud-bigtable
2个回答
0
投票

我认为这行是错误的:

    row_key = data[colFamily]['iddevice'].value.encode()

您正在传递数据对象,但是它没有'cf1'属性。您也不必对其进行编码。试试看:

    row_key = data['iddevice']

您的for循环也会有同样的问题。我想这是您想要的]]

    for col in data.keys():
        row.set_cell(colFamily, key, data[key])

另外,我知道您只是在玩它,但是使用设备ID作为行键的唯一值将导致效果很差。建议将行键和日期或其他属性之一(取决于查询)结合起来,然后将其用作行键。 Cloud Bigtable schema上有一个文档非常有用,codelab使用了更现实的示例数据集,并逐步介绍了如何为该示例选择架构。它使用Java,但是您仍然可以导入数据并运行自己的查询。


0
投票

首先非常感谢您的帮助。

© www.soinside.com 2019 - 2024. All rights reserved.