Our scenario is inserting roughly 4 million rows into BigQuery at a time.
Following the Google docs, we batch-load data with the Storage Write API: https://cloud.google.com/bigquery/docs/write-api-batch
We use the BigQuery Storage Write API in pending mode to insert the data into BigQuery, but it is very slow when there are a lot of rows.
The packages we use are:
protobuf==3.20.0
google-cloud-bigquery==3.18.0
google-cloud-storage==2.14.0
google-cloud-container==2.12.0
google-cloud-bigquery-storage==2.14.0
Does anyone know how to improve this? We also found that proto_rows.serialized_rows.append becomes very slow as more data is appended, which is why we chose a batch size of 300.
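As a first check, here is a minimal diagnostic sketch. It rests on our assumption that the per-append cost comes from protobuf's pure-Python backend rather than from BigQuery itself; api_implementation is an internal protobuf module, and the placeholder bytes are only illustrative:

from google.cloud.bigquery_storage_v1 import types
from google.protobuf.internal import api_implementation

# 'python' means the slow pure-Python backend is active;
# 'cpp' or 'upb' are the fast native backends.
print(api_implementation.Type())

# One extend() call instead of N append() calls avoids per-call overhead.
serialized_rows = [b"..."]  # placeholder: list of serialized protobuf rows
proto_rows = types.ProtoRows()
proto_rows.serialized_rows.extend(serialized_rows)

If this prints 'python', a protobuf build with the native extension (newer 4.x releases default to the upb backend) may speed up serialization considerably.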
For example, inserting 1,000,000 rows takes 115 seconds. Sample code is below:
import time
from logging import Logger

import pandas as pd
import google.auth
from google.auth import impersonated_credentials
from google.cloud.bigquery import Client
from google.cloud.bigquery_storage_v1 import types, writer, BigQueryWriteClient
from google.protobuf import descriptor_pb2

import sample_demo_pb2
from configs import logger


def timing():
    from functools import wraps

    def decorator(func):
        @wraps(func)
        def func_wrapper(*args, **kwargs):
            time1 = time.time()
            ret = func(*args, **kwargs)
            time2 = time.time()
            print(
                "%s function complete in %.6f secs"
                % (func.__name__, (time2 - time1))
            )
            return ret

        return func_wrapper

    return decorator


class ServiceBase:
    """A base class for setting up service account credentials and project_id."""

    def __init__(self, lifetime: int = 3600) -> None:
        """Set up credentials and project_id from the host environment.

        Args:
            lifetime (int, optional): The lifetime of a service credential. Defaults to 3600.

        Raises:
            LookupError: project_id cannot be found in the host environment.
        """
        super().__init__()
        source_credentials, project_id = google.auth.default()
        if project_id is None:
            if source_credentials.quota_project_id is not None:
                project_id = source_credentials.quota_project_id
            else:
                raise LookupError(
                    "Required project_id can not be found. Please try to set the GOOGLE_CLOUD_PROJECT environment variable."
                )
        service_account = (
            f"shared-service-account@{project_id}.iam.gserviceaccount.com"
        )
        target_scopes = [
            "https://www.googleapis.com/auth/cloud-platform",
            "https://www.googleapis.com/auth/userinfo.email",
        ]
        self.credentials = impersonated_credentials.Credentials(
            source_credentials=source_credentials,
            target_principal=service_account,
            target_scopes=target_scopes,
            lifetime=lifetime,
        )
        self.project_id = project_id


class BigQueryService(ServiceBase):
    def __init__(self, logger: Logger) -> None:
        super().__init__()
        self.client = Client(
            project=self.project_id, credentials=self.credentials
        )
        self.writer_client = BigQueryWriteClient(credentials=self.credentials)
        self._logger = logger

    @timing()
    def to_bqtable(
        self,
        data: pd.DataFrame,
        full_table_name: str,
    ) -> None:
        project_id, dataset_id, table_id = full_table_name.split(".")
        bigquery_table = self.writer_client.table_path(
            project_id, dataset_id, table_id
        )
        # Create a write stream in PENDING mode: rows are buffered until commit.
        write_stream = types.WriteStream()
        write_stream.type_ = types.WriteStream.Type.PENDING
        write_stream = self.writer_client.create_write_stream(
            parent=bigquery_table, write_stream=write_stream
        )
        stream_name = write_stream.name
        # The first request on the stream must carry the writer schema.
        request_template = types.AppendRowsRequest()
        request_template.write_stream = stream_name
        proto_schema = types.ProtoSchema()
        proto_descriptor = descriptor_pb2.DescriptorProto()
        sample_demo_pb2.demo_data.DESCRIPTOR.CopyToProto(proto_descriptor)
        proto_schema.proto_descriptor = proto_descriptor
        proto_data = types.AppendRowsRequest.ProtoData()
        proto_data.writer_schema = proto_schema
        request_template.proto_rows = proto_data
        append_rows_stream = writer.AppendRowsStream(
            self.writer_client, request_template
        )
        init_convert_time = time.time()
        list_data = data.to_dict("records")
        print(f"convert list_data using {time.time() - init_convert_time} secs")
        batch_size = 1000
        n = len(list_data)
        futures = []
        for batch_idx in range(0, n, batch_size):
            print(f"{n} {batch_idx} to {min(batch_idx + batch_size, n)}")
            request = self.serialized_data(
                list_data[batch_idx : min(batch_idx + batch_size, n)],
                batch_idx,
            )
            futures.append(append_rows_stream.send(request))
        # Wait for all in-flight appends before finalizing the stream.
        for f in futures:
            f.result()
        append_rows_stream.close()
        self.writer_client.finalize_write_stream(name=write_stream.name)
        # Commit the pending stream so the rows become visible in the table.
        batch_commit_write_streams_request = (
            types.BatchCommitWriteStreamsRequest()
        )
        batch_commit_write_streams_request.parent = bigquery_table
        batch_commit_write_streams_request.write_streams = [write_stream.name]
        self.writer_client.batch_commit_write_streams(
            batch_commit_write_streams_request
        )
        print(f"Writes to stream: '{write_stream.name}' have been committed.")

    @timing()
    def serialized_data(self, data: list, offset: int):
        proto_rows = types.ProtoRows()
        serialized_rows = [self.create_serialized_data(row) for row in data]
        for ele in serialized_rows:
            # This line becomes very slow when appending a lot of data,
            # e.g. more than 1k rows.
            proto_rows.serialized_rows.append(ele)
        request = types.AppendRowsRequest()
        request.offset = offset
        proto_data = types.AppendRowsRequest.ProtoData()
        proto_data.rows = proto_rows
        request.proto_rows = proto_data
        return request

    def create_serialized_data(self, rows_to_insert: dict):
        # Note: sample_demo_pb2 is the file compiled by the protocol buffer compiler.
        return sample_demo_pb2.demo_data(**rows_to_insert).SerializeToString()

    def __del__(self) -> None:
        self.client.close()


rows = 1000000
bq_client = BigQueryService(logger)
list_data = [{"Index": str(i), "Name": f"name_{i}"} for i in range(rows)]
data = pd.DataFrame(list_data)
bq_client.to_bqtable(
    data,
    "XXX.default_dataset.demo_data",
)
print("=============================")
sample_demo_pb2.py is as follows:
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: sample_demo.proto

import sys
_b = sys.version_info[0] < 3 and (lambda x: x) or (lambda x: x.encode('latin1'))
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)

_sym_db = _symbol_database.Default()


DESCRIPTOR = _descriptor.FileDescriptor(
  name='sample_demo.proto',
  package='',
  syntax='proto2',
  serialized_options=None,
  serialized_pb=_b('\n\x11sample_demo.proto\"(\n\tdemo_data\x12\r\n\x05Index\x18\x01 \x01(\t\x12\x0c\n\x04Name\x18\x02 \x01(\t')
)


_DEMO_DATA = _descriptor.Descriptor(
  name='demo_data',
  full_name='demo_data',
  filename=None,
  file=DESCRIPTOR,
  containing_type=None,
  fields=[
    _descriptor.FieldDescriptor(
      name='Index', full_name='demo_data.Index', index=0,
      number=1, type=9, cpp_type=9, label=1,
      has_default_value=False, default_value=_b("").decode('utf-8'),
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      serialized_options=None, file=DESCRIPTOR),
    _descriptor.FieldDescriptor(
      name='Name', full_name='demo_data.Name', index=1,
      number=2, type=9, cpp_type=9, label=1,
      has_default_value=False, default_value=_b("").decode('utf-8'),
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      serialized_options=None, file=DESCRIPTOR),
  ],
  extensions=[
  ],
  nested_types=[],
  enum_types=[
  ],
  serialized_options=None,
  is_extendable=False,
  syntax='proto2',
  extension_ranges=[],
  oneofs=[
  ],
  serialized_start=21,
  serialized_end=61,
)

DESCRIPTOR.message_types_by_name['demo_data'] = _DEMO_DATA
_sym_db.RegisterFileDescriptor(DESCRIPTOR)

demo_data = _reflection.GeneratedProtocolMessageType('demo_data', (_message.Message,), dict(
  DESCRIPTOR = _DEMO_DATA,
  __module__ = 'sample_demo_pb2'
  # @@protoc_insertion_point(class_scope:demo_data)
))
_sym_db.RegisterMessage(demo_data)

# @@protoc_insertion_point(module_scope)
In your case, the Write API is not the right choice: it is designed for streaming.
If you want to insert data in batch, use a load job instead: it is faster and cheaper. And since you told me the data comes from Cloud Storage, I think that is the best design for your 4M rows.
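For reference, here is a minimal load-job sketch of what I mean; the bucket and file names are placeholders, and the table ID is reused from your question:

from google.cloud import bigquery

client = bigquery.Client()
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,  # skip the CSV header row
    autodetect=True,      # or pass an explicit schema instead
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
)
load_job = client.load_table_from_uri(
    "gs://your-bucket/demo_data.csv",  # placeholder URI
    "XXX.default_dataset.demo_data",
    job_config=job_config,
)
load_job.result()  # block until the load job completes
print(f"Loaded {client.get_table('XXX.default_dataset.demo_data').num_rows} rows.")

This moves the work to BigQuery's side: there is no client-side protobuf serialization at all, and batch loads using the shared slot pool are not billed like streaming inserts.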
If you need to transform the data into a BigQuery-compatible format (CSV, Avro, ORC, ...), do that preprocessing and create a new file on Cloud Storage, then load that file.
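For example, a minimal preprocessing sketch with pandas and google-cloud-storage (which you already have installed); the bucket name is a placeholder:

import pandas as pd
from google.cloud import storage

# Reuse the same demo data shape as in the question.
df = pd.DataFrame([{"Index": str(i), "Name": f"name_{i}"} for i in range(1000)])
df.to_csv("/tmp/demo_data.csv", index=False)

# Upload the CSV to Cloud Storage so a load job can pick it up.
bucket = storage.Client().bucket("your-bucket")  # placeholder bucket
bucket.blob("demo_data.csv").upload_from_filename("/tmp/demo_data.csv")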
You can use an orchestration service, Cloud Workflows or Cloud Composer (managed Airflow), to run these steps in sequence.
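A sketch of what the Composer (Airflow 2.4+) variant could look like, assuming the Google provider package is installed; the DAG id and bucket are placeholders:

from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,
)

with DAG("load_demo_data", start_date=datetime(2024, 1, 1), schedule=None) as dag:
    # Single step: load the preprocessed CSV from GCS into BigQuery.
    load_csv = GCSToBigQueryOperator(
        task_id="load_csv",
        bucket="your-bucket",  # placeholder bucket
        source_objects=["demo_data.csv"],
        destination_project_dataset_table="XXX.default_dataset.demo_data",
        source_format="CSV",
        skip_leading_rows=1,
        autodetect=True,
        write_disposition="WRITE_APPEND",
    )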