How do I create a custom transformer with PySpark?


I am trying to build a custom transformer and use it in a PySpark pipeline, but I don't know how to implement it.

The transformer should estimate probabilities. This is the code I currently use, without a transformer:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Aggregate the pass and fail counts for each softbin and time window
softbins_df = (
    counts_df
    .groupBy('softbin_first_test', 'time_window')
    .agg(F.sum('PASS').alias('total_pass'), F.sum('FAIL').alias('total_fail'))
) 
# Calculate the prior pass probability for each softbin
softbins_total_counts_df = softbins_df.withColumn('total', F.col('total_pass') + F.col('total_fail'))
prior_pass_prob = F.col('total_pass') / F.col('total')
prior_fail_prob = F.col('total_fail') / F.col('total')
# Calculate probabilities using Bayesian estimation with empirical prior
rho = 0.3
prior_window = Window.partitionBy('softbin_first_test').orderBy(F.col('time_window')).rowsBetween(Window.unboundedPreceding, Window.currentRow)
prior_pass = F.mean(prior_pass_prob).over(prior_window)
prior_fail = F.mean(prior_fail_prob).over(prior_window)
alpha = prior_pass * (1 - rho) / rho
beta = prior_fail * (1 - rho) / rho
pass_prob = alpha / (alpha + beta)
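
A side note on the last three lines, worked out under the assumption that every row has total > 0, so that the per-row pass and fail probabilities sum to 1. Writing the two running means as \bar{p}_pass and \bar{p}_fail (notation introduced here, not in the original code), the common factor (1 - rho) / rho cancels:

```latex
\begin{aligned}
\alpha &= \bar{p}_{\text{pass}}\,\frac{1-\rho}{\rho}, \qquad
\beta = \bar{p}_{\text{fail}}\,\frac{1-\rho}{\rho}, \\
\frac{\alpha}{\alpha+\beta}
&= \frac{\bar{p}_{\text{pass}}}{\bar{p}_{\text{pass}}+\bar{p}_{\text{fail}}}
= \bar{p}_{\text{pass}}
\qquad\text{since } \bar{p}_{\text{pass}}+\bar{p}_{\text{fail}}=1 .
\end{aligned}
```

So pass_prob as written equals prior_pass for any rho strictly between 0 and 1. If the intent is a posterior mean that blends the prior with the observed counts, a common Beta-Binomial form would be (alpha + total_pass) / (alpha + beta + total); that is a guess about the intent, not something stated in the question.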

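I want the transformer in order to avoid data leakage during train_test_split. Right now I compute the probabilities over the whole dataset, so after splitting, the training set contains information from the future. That is why I want to wrap this logic in a transformer and include it in a pipeline.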
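
The leakage concern can be illustrated without Spark. The following is a minimal plain-Python sketch, not the asker's pipeline: the rows are made up, time_window is assumed to be an orderable integer, and the helper names split_by_time, fit_pass_rates and apply_pass_rates are hypothetical. Statistics are learned from the training rows only and then attached to the later test rows.

```python
from collections import defaultdict

# Hypothetical rows: (softbin, time_window, pass_count, fail_count)
rows = [
    (1, 1, 2, 4),
    (1, 2, 3, 6),
    (1, 3, 4, 3),
    (2, 1, 1, 2),
    (2, 2, 2, 1),
    (2, 3, 2, 2),
]


def split_by_time(rows, cutoff):
    """Rows with time_window <= cutoff go to train, later rows go to test."""
    train = [r for r in rows if r[1] <= cutoff]
    test = [r for r in rows if r[1] > cutoff]
    return train, test


def fit_pass_rates(train_rows):
    """Learn one pass rate per softbin from the training rows only."""
    passes = defaultdict(int)
    totals = defaultdict(int)
    for softbin, _, n_pass, n_fail in train_rows:
        passes[softbin] += n_pass
        totals[softbin] += n_pass + n_fail
    return {s: passes[s] / totals[s] for s in totals if totals[s] > 0}


def apply_pass_rates(rows, rates):
    """Attach the learned rate to each row; unseen softbins get None."""
    return [r + (rates.get(r[0]),) for r in rows]


train, test = split_by_time(rows, cutoff=2)
rates = fit_pass_rates(train)          # uses train only, never test
test_scored = apply_pass_rates(test, rates)
print(rates)
print(test_scored)
```

In Spark ML terms, this fit/apply separation corresponds to an Estimator whose fit returns a Model, rather than to a stateless Transformer.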

python machine-learning pyspark huggingface-transformers apache-spark-mllib
1 answer

Creating a custom transformer is a good way to deal with data leakage.

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol

# Initialize SparkSession
spark = SparkSession.builder.appName("ProbabilityEstimator").getOrCreate()

# Sample data
data = [
    (1, "A", 2, 4),
    (1, "A", 3, 6),
    (1, "B", 4, 3),
    (1, "B", 5, 7),
    (2, "A", 1, 2),
    (2, "A", 2, 1),
    (2, "B", 2, 2),
    (2, "B", 3, 3)
]

columns = ["softbin_first_test", "time_window", "PASS", "FAIL"]

# Create DataFrame
df = spark.createDataFrame(data, columns)

# Custom Transformer
class ProbabilityEstimator(Transformer, HasInputCol, HasOutputCol):
    """Adds a running pass probability per softbin, ordered by time window."""

    def __init__(self, inputCol=None, outputCol=None):
        super(ProbabilityEstimator, self).__init__()
        self.setParams(inputCol=inputCol, outputCol=outputCol)

    def setParams(self, inputCol=None, outputCol=None):
        # Set only the params that were actually passed, so None never
        # overwrites the defaults provided by HasInputCol / HasOutputCol
        params = {"inputCol": inputCol, "outputCol": outputCol}
        return self._set(**{k: v for k, v in params.items() if v is not None})

    def _transform(self, df):
        # Cumulative window: each row sees only itself and earlier rows of its
        # softbin. Rows sharing a time_window have no guaranteed relative order.
        softbins_window = (
            Window.partitionBy('softbin_first_test')
            .orderBy('time_window')
            .rowsBetween(Window.unboundedPreceding, Window.currentRow)
        )
        total = F.sum(F.col('PASS') + F.col('FAIL')).over(softbins_window)
        prior_pass_prob = F.sum('PASS').over(softbins_window) / total
        prior_fail_prob = F.sum('FAIL').over(softbins_window) / total

        rho = 0.3
        alpha = prior_pass_prob * (1 - rho) / rho
        beta = prior_fail_prob * (1 - rho) / rho
        pass_prob = alpha / (alpha + beta)

        # inputCol is accepted as a param but not used by this transform
        return df.withColumn(self.getOutputCol(), pass_prob)

# Initialize the custom transformer
probability_estimator = ProbabilityEstimator(inputCol='input_col', outputCol='output_col')

# Define pipeline
pipeline = Pipeline(stages=[probability_estimator])

# Fit the pipeline to transform the DataFrame
model = pipeline.fit(df)
df_transformed = model.transform(df)

# Show the results
df_transformed.show()

Output:

+------------------+-----------+----+----+-------------------+
|softbin_first_test|time_window|PASS|FAIL|         output_col|
+------------------+-----------+----+----+-------------------+
|                 1|          A|   2|   4| 0.3333333333333333|
|                 1|          A|   3|   6| 0.3333333333333333|
|                 1|          B|   4|   3|0.40909090909090906|
|                 1|          B|   5|   7|  0.411764705882353|
|                 2|          A|   1|   2| 0.3333333333333333|
|                 2|          A|   2|   1|                0.5|
|                 2|          B|   2|   2|                0.5|
|                 2|          B|   3|   3|                0.5|
+------------------+-----------+----+----+-------------------+
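
The output_col values for softbin 1 can be checked by hand. Because alpha and beta share the factor (1 - rho) / rho, their ratio reduces to running PASS divided by running PASS + FAIL. The sketch below recomputes that in plain Python, assuming the rows are processed in the order shown in the table; the variable names are illustrative only.

```python
from itertools import accumulate

# (PASS, FAIL) for softbin_first_test == 1, in the row order of the table
rows = [(2, 4), (3, 6), (4, 3), (5, 7)]

# Running sums over "all rows up to and including the current one"
running_pass = list(accumulate(p for p, _ in rows))
running_total = list(accumulate(p + f for p, f in rows))

# Running pass probability per row
running_prob = [p / t for p, t in zip(running_pass, running_total)]
print(running_prob)
```

The four values agree with the table up to floating-point rounding: 2/6, 5/15, 9/22 and 14/34.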

Adapt this to your data as needed. It is only meant to demonstrate how a custom transformer works.
