使用 BigQuery Storage API(测试版)启动并读取多个流

问题描述 投票:0回答:2

BigQuery Storage API (https://googleapis.github.io/google-cloud-python/latest/bigquery_storage/gapic/v1beta1/api.html) 对于从 BigQuery 表中读取数据非常有用,速度几乎比之前快 10 倍标准 BigQuery API。为了使其速度更快,它支持多个读取流,每个读取流从相关表中读取一组动态分配的行。

我的问题是这样的:虽然你可能会请求多个流,但是请求后分配的流不在你的控制范围内。因此,我无法启动超过 1 个流。

我正在读取的数据由 3 列和 600 万行组成,如下所示。我将创建的流总数打印到控制台。

from google.cloud import bigquery_storage_v1beta1

project_id = 'myproject'
client = bigquery_storage_v1beta1.BigQueryStorageClient()

table_ref = bigquery_storage_v1beta1.types.TableReference()
table_ref.project_id = "bigquery-public-data"
table_ref.dataset_id = "usa_names"
table_ref.table_id = "usa_1910_current"

read_options = bigquery_storage_v1beta1.types.TableReadOptions()
read_options.selected_fields.append("year")
read_options.selected_fields.append("name")
read_options.selected_fields.append("number")

# I request 3 streams to be created!
requested_streams = 3  

parent = "projects/{}".format(project_id)
session = client.create_read_session(
    table_ref, parent, table_modifiers=modifiers, read_options=read_options, 
    requested_streams=requested_streams
)  

response = client.batch_create_read_session_streams(session, requested_streams)

# I see only 1 stream being created.
print("Streams created: " + str(len(session.streams)))
print("Stream names array: " + str(session.streams))


reader = client.read_rows(
    bigquery_storage_v1beta1.types.StreamPosition(stream=session.streams[0])
)

rows = reader.rows(session)

names = set()

import time
start = time.time()
#---------------------------------------------------
i=0
for row in rows:
    i += 1
    names.add(row["name"])
    if i > 6000000:
        break
#---------------------------------------------------    
end = time.time()
print(end - start)
print("Got {} unique names and {} total rows.".format(len(names), i))

我有几个问题:

1) 我是否只看到 1 个流,因为多流实现尚未完成(API 处于 Beta 版本)?

2) 我是否只看到 1 个流,因为对于流分配算法来说数据相对“小”? 6m 行已经相当大了。

3)如果我开始看到创建的多个流,API 文档没有描述如何并行读取这些流。关于如何做到这一点有什么想法吗?

google-bigquery storage
2个回答
3
投票

问题是您正在读取的表只有一个可用的输入文件。虽然它有 600 万行,但数据具有高度可压缩性,因此数据只有一个后备列式文件。目前,存储 API 不会比这更精细地分割数据。

如果您检查从此表中进行 SELECT 的查询计划,您会看到相同的情况(只有一个输入)。


0
投票

BigQuery 只是非常喜欢拥有一个流。看看自 V1Beta1 以来编写的代码中的注释,然后我想知道是否它的多个流还没有实现,不管文档怎么说?

我尝试强迫它

    requested_session = types.ReadSession()
    requested_session.table = session_table
    requested_session.data_format = types.DataFormat.ARROW

    parent = "projects/{}".format(project_id)
    session = bq_storage_client.create_read_session( 
        parent=parent,
        read_session=requested_session,
        max_stream_count = 1
    )
    
    request = SplitReadStreamRequest()
    request.name = session.streams[0].name
    request.fraction = 0.5
    x = bq_storage_client.split_read_stream(request)
    
    stream0 = x.primary_stream
    stream1 = x.remainder_stream

    reader0 = bq_storage_client.read_rows(stream0.name)
    reader1 = bq_storage_client.read_rows(stream1.name)

    df0 =  reader0.to_dataframe(session) 
    df1 =  reader1.to_dataframe(session) 
    
    print(len(df0.index), len(df1.index))

但运气不佳。所有工作均由第一个(“主”)流完成,没有任何行分配给辅助流。即使我在主节点之前启动辅助节点,那么它仍然是主节点完成工作。

我还尝试了 AVRO 格式,因为这可能更以行为中心,但没有。

© www.soinside.com 2019 - 2024. All rights reserved.