如何正确优化Spark和Milvus来处理大数据?

问题描述 投票:0回答:1

我有一个包含 2 列的 Spark 数据框:id 和向量。

向量列是一个包含 20,000 个元素长的浮点数的列表。

Dataframe 本身有 2,500,000 行长。

我使用 Spark-Milvus 连接器来插入数据,因为我尝试了各种方法格式化小数据并尝试将其插入到 Milvus 集合中,但无济于事。

当我在 Milvus 中创建一个集合并尝试从 Spark DataFrame 中插入一批 200,0000 行时,需要超过 10 分钟,有时还会崩溃。

加载 Milvus 200,000 条记录集合需要超过 1 小时,并且永远不会结束。

插入批次后,需要近 10 分钟才能将索引分配给向量列。

我想知道处理大批量数据时是否有关于如何优化插入和索引的处理时间的通用实践。

我应该使用 Spark 和 Milvus 的什么设置才能获得最佳性能?

在插入 Milvus 集合之前如何正确转换数据?数据应该以 numpy 数组还是任何其他格式呈现?

从我的 Spark 数据框中收集的随机行如下所示:[1005, [0.01, ..., 0.78],其中 1005 是一个 id,浮点数列表是一个 20,000 长的向量。

这是我的火花设置:

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("collab_filter_test_on_local") \
    .config("spark.driver.extraClassPath", '/data/notebook_files/clickhouse-native-jdbc-shaded-2.6.5.jar') \
    .config("spark.jars", "/data/notebook_files/spark-milvus-1.0.0-SNAPSHOT.jar") \
    .config("spark.driver.memory", "16g") \
    .getOrCreate()

这是我的 Milvus 设置:

connections.connect(alias="default", host="localhost", port=19530)

fields = [
    FieldSchema(name='id', dtype=DataType.INT64, is_primary=True, auto_id=False),
    FieldSchema(name='vec', dtype=DataType.FLOAT_VECTOR, dim=dim_size)
]

schema = CollectionSchema(fields, 'data')

data = Collection('data', schema)
python apache-spark pyspark bigdata milvus
1个回答
0
投票

我是Spark-milvus连接器的开发者。很高兴听到您正在使用它。请在github上创建issue,以便我们可以更及时地回复您。

对于你的问题:

1、20000 个暗向量相当大,显然在所有步骤中都会占用更多资源。请考虑一下是否有必要以及是否可以减少dim

2、10分钟插入200,0000行~约3k/s。其实还不错。崩溃时请将错误信息上传到github。

© www.soinside.com 2019 - 2024. All rights reserved.