我是云大表的初学者,使用云功能将数据从pub / sub写入bigtable时遇到了大问题。
[Cloud函数从pubsub获取消息,但是问题出在下一步,将其写入bigtable。
该消息是在python脚本中创建的,并发送到pub / sub。
一个消息示例:
b'{“ eda”:2.015176,“温度”:33.39,“ bvp”:-0.49,“ x_acc”:-36.0,“ y_acc”:-38.0,“ z_acc”:-128.0,“ heart_rate”:83.78 ,“ iddevice”:15.0,“时间戳”:“ 2019-12-01T20:01:36.927Z”}'
为了将其写入bigtable,我创建了一个表:
from google.cloud import bigtable
from google.cloud.bigtable import column_family
client = bigtable.Client(project="projectid", admin=True)
instance = client.instance("bigtableinstance")
table = instance.table("bigtable1")
print('Creating the {} table.'.format(table))
print('Creating columnfamily cf1 with Max Version GC rule...')
max_versions_rule = column_family.MaxVersionsGCRule(2)
column_family_id = 'cf1'
column_families = {column_family_id: max_versions_rule}
if not table.exists():
table.create(column_families=column_families)
print("Table {} is created.".format(table))
else:
print("Table {} already exists.".format(table))
这没有问题。
现在,我尝试使用main方法在云函数中使用以下python代码通过pub / sub将消息写入bigtable:
import json
import base64
import os
from google.cloud import bigtable
from google.cloud.bigtable import column_family, row_filters
project_id = os.environ.get('projetid', 'UNKNOWN')
INSTANCE = 'bigtableinstance'
TABLE = 'bigtable1'
client = bigtable.Client(project=project_id, admin=True)
instance = client.instance(INSTANCE)
colFamily = "cf1"
def writeToBigTable(table, data):
# Parameters row_key (bytes) – The key for the row being created.
# Returns A row owned by this table.
row_key = data[colFamily]['iddevice'].value.encode()
row = table.row(row_key)
for colFamily in data.keys():
for key in data[colFamily].keys():
row.set_cell(colFamily,
key,
data[colFamily][key])
table.mutate_rows([row])
return data
def selectTable():
stage = os.environ.get('stage', 'dev')
table_id = TABLE + stage
table = instance.table(table_id)
return table
def main(event, context):
data = base64.b64decode(event['data']).decode('utf-8')
print("DATA: {}".format(data))
eda, temperature, bvp, x_acc, y_acc, z_acc, heart_rate, iddevice, timestamp = data.split(',')
table = selectTable()
data = {'eda': eda,
'temperature': temperature,
'bvp': bvp,
'x_acc':x_acc,
'y_acc':y_acc,
'z_acc':z_acc,
'heart_rate':heart_rate,
'iddevice':iddevice,
'timestamp':timestamp}
writeToBigTable(table, data)
print("Data Written: {}".format(data))
我尝试了不同的版本,但找不到解决方案。
感谢您的帮助。
一切顺利
多米尼克
我认为这行是错误的:
row_key = data[colFamily]['iddevice'].value.encode()
您正在传递数据对象,但是它没有'cf1'属性。您也不必对其进行编码。试试看:
row_key = data['iddevice']
您的for循环也会有同样的问题。我想这是您想要的]]
for col in data.keys(): row.set_cell(colFamily, key, data[key])
另外,我知道您只是在玩它,但是使用设备ID作为行键的唯一值将导致效果很差。建议将行键和日期或其他属性之一(取决于查询)结合起来,然后将其用作行键。 Cloud Bigtable schema上有一个文档非常有用,codelab使用了更现实的示例数据集,并逐步介绍了如何为该示例选择架构。它使用Java,但是您仍然可以导入数据并运行自己的查询。
首先非常感谢您的帮助。