我正在测试 Fugue 库,以比较其与纯 PySpark 相比的优势,为此我希望能够使用 Fugue 严格测试不同的操作。
虽然我已经可以使用 Fugue 来执行应用 Pandas 函数的转换,但我无法直接使用 Fugue 从 databricks 笔记本加载 databricks 表。我怎样才能做到呢?
澄清:我可以使用 PySpark 加载该表,没有任何问题。我也尝试按照文档(https://fugue-tutorials.readthedocs.io/tutorials/beginner/io.html)并尝试使用:
import fugue.api as fa
df = fa.load(f'{db_name_model_data}.{table_name_model_data}', engine=spark)
Output:
NotImplementedError: .my_table_name is not supported
我也尝试过:
from fugue import FugueWorkflow, Schema, FugueSQLWorkflow
# Define a Fugue workflow
with FugueWorkflow() as dag:
# Load a table from a CSV file (example source, replace with your data source)
df = dag.load(f'{db_name_model_data}.{table_name_model_data}')
# Show the loaded DataFrame
df.show()
dag.run()
我希望直接使用 Fugue 来将任何类型的 DataFrame 加载到“df”中
我已尝试以下步骤:
第1步:安装Fugue
第 2 步: Databricks 连接 在这一步中,我们需要卸载PySpark以避免与Databricks Connect发生冲突。
pip install databricks-connect
注意:删除pyspark
pip uninstall pyspark
配置 Databricks Connect 连接到集群后。
import pandas as pd
from fugue import transform
from fugue_spark import SparkExecutionEngine
data = pd.DataFrame({'numbers':[1,2,3,4], 'words':['hello','world','apple','banana']})
#schema: *, reversed:str
def reverse_word(df: pd.DataFrame) -> pd.DataFrame:
df['reversed'] = df['words'].apply(lambda x: x[::-1])
return df
spark_df = transform(data, reverse_word, engine=SparkExecutionEngine())
spark_df.show()
第3步:在集群上使用Fugue-sql
from fugue_notebook import setup
setup()
%%fsql spark
SELECT *
FROM data
TRANSFORM USING reverse_word
PRINT
结果:
附加配置:
from pyspark.sql import SparkSession
from fugue_spark import SparkExecutionEngine
spark_session = (SparkSession
.builder
.config("spark.executor.cores",4)
.config("fugue.dummy","dummy")
.getOrCreate())
engine = SparkExecutionEngine(spark_session, {"additional_conf":"abc"})