How to encrypt specific column data in a parquet file using PySpark

Question

My project requires encrypting some PII columns when writing data to parquet files. An Azure Synapse PySpark notebook is being used to write the data to parquet.

I haven't found any reference for this online. Any pointers would be very helpful.

azure pyspark encryption parquet azure-synapse
1 Answer

We are using the implementations below. You can try whichever one fits your requirements.

Using data masking

from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

class ColumnMasker:
    def __init__(self, spark):
        self.spark = spark

    def mask_columns(self, df, columns_to_mask):
        """Replace every non-null value in the given columns with a fixed mask."""
        masked_df = df

        for column in columns_to_mask:
            # Keep NULLs as NULL; overwrite everything else with a constant mask.
            masked_df = masked_df.withColumn(
                column,
                expr(f"CASE WHEN {column} IS NOT NULL THEN 'xxxxxxx' ELSE NULL END")
            )

        return masked_df

Implementation

# Create a Spark session
spark = SparkSession.builder.appName("ColumnMasking").getOrCreate()

# Sample data
data = [("SK", "rt", "12345-6789"),
        ("Prakash", "Kh", None),
        ("Test", "fg", "98765-4321")]

columns = ["first_name", "last_name", "PAN"]

# Create a DataFrame
df = spark.createDataFrame(data, columns)

# Instantiate the ColumnMasker
masker = ColumnMasker(spark)

# Specify columns to mask
columns_to_mask = ["PAN"]

# Mask the specified columns
masked_df = masker.mask_columns(df, columns_to_mask)

# Show the masked DataFrame
masked_df.show()
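
Since the original question is specifically about writing the result to parquet, the last step would be to persist the masked DataFrame. A minimal sketch, where the ADLS Gen2 path below is a placeholder you would replace with your own:

# Placeholder path -- point this at your own ADLS Gen2 container/folder.
output_path = "abfss://<container>@<account>.dfs.core.windows.net/masked/customers"

# Only the masked values land in the parquet file.
masked_df.write.mode("overwrite").parquet(output_path)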

Using encryption (one-way hashing)

from pyspark.sql import functions as F
from pyspark.sql import DataFrame

class DataFrameMasker:
    def __init__(self, df: DataFrame):
        self.df = df

    def mask_columns(self, columns_to_mask: dict) -> DataFrame:
        """Apply the masking strategy named for each column ("hash" or "random")."""
        masked_df = self.df

        for column_name, masking_strategy in columns_to_mask.items():
            if masking_strategy == "hash":
                masked_df = self._mask_hash(masked_df, column_name)
            elif masking_strategy == "random":
                masked_df = self._mask_random(masked_df, column_name)
            # Add more masking strategies here if needed

        return masked_df

    def _mask_hash(self, df: DataFrame, column_name: str) -> DataFrame:
        # One-way SHA-256 hash: deterministic, but not reversible.
        return df.withColumn(column_name, F.sha2(F.col(column_name), 256))

    def _mask_random(self, df: DataFrame, column_name: str) -> DataFrame:
        # Replace the value with the first 10 characters of an MD5 of a random number.
        return df.withColumn(column_name, F.expr("substring(md5(cast(rand() as string)), 1, 10)"))

# Create a sample DataFrame
data = [("Subash", "Konar", "[email protected]"),
        ("Test", "Message", "[email protected]")]

columns = ["first_name", "last_name", "email"]
df = spark.createDataFrame(data, columns)

# Initialize the DataFrameMasker with the DataFrame
masker = DataFrameMasker(df)

# Define columns to mask and their masking strategy
columns_to_mask = {
    "first_name": "hash",
    "last_name": "random"
}

# Mask specified columns
masked_df = masker.mask_columns(columns_to_mask)

# Show the masked DataFrame
masked_df.show()
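
Note that sha2 above is a one-way hash, so the original value cannot be recovered from the parquet file. If you need reversible column-level encryption before writing, Spark 3.3+ (available as an Azure Synapse runtime) ships built-in aes_encrypt / aes_decrypt SQL functions. A minimal sketch: the hard-coded key and output path are placeholders for illustration, and in Synapse the key would normally be fetched from Azure Key Vault (e.g. via mssparkutils.credentials.getSecret):

from pyspark.sql import functions as F

# Placeholder key for illustration only -- aes_encrypt requires a
# 16-, 24-, or 32-byte key, which in real code should come from a secret store.
key = "0123456789abcdef"

# Encrypt the column with AES-GCM and base64-encode the binary result
# so it can be stored as a string column in parquet.
encrypted_df = df.withColumn(
    "email",
    F.expr(f"base64(aes_encrypt(email, '{key}', 'GCM'))")
)

encrypted_df.write.mode("overwrite").parquet("/tmp/encrypted")  # placeholder path

# Decrypt when reading back (requires the same key).
decrypted_df = encrypted_df.withColumn(
    "email",
    F.expr(f"cast(aes_decrypt(unbase64(email), '{key}', 'GCM') as string)")
)
decrypted_df.show()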
