Spaces:
Running
Running
| import argparse | |
| import json | |
| import os | |
| from pyspark.sql import SparkSession | |
| from anonyspark.masking import ( | |
| mask_email_udf, mask_name_udf, mask_date_udf, | |
| mask_ssn_udf, mask_itin_udf, mask_phone_udf | |
| ) | |
| def apply_masking(df, schema): | |
| """ | |
| Apply masking UDFs based on schema definitions. | |
| """ | |
| for column, dtype in schema.items(): | |
| if dtype == "email": | |
| df = df.withColumn(f"masked_{column}", mask_email_udf(df[column])) | |
| elif dtype == "name": | |
| df = df.withColumn(f"masked_{column}", mask_name_udf(df[column])) | |
| elif dtype == "dob": | |
| df = df.withColumn(f"masked_{column}", mask_date_udf(df[column])) | |
| elif dtype == "ssn": | |
| df = df.withColumn(f"masked_{column}", mask_ssn_udf(df[column])) | |
| elif dtype == "itin": | |
| df = df.withColumn(f"masked_{column}", mask_itin_udf(df[column])) | |
| elif dtype == "phone": | |
| df = df.withColumn(f"masked_{column}", mask_phone_udf(df[column])) | |
| return df | |
| def main(): | |
| parser = argparse.ArgumentParser(description="AnonySpark CLI for masking sensitive data.") | |
| parser.add_argument('--input', type=str, required=True, help='Path to input CSV file') | |
| parser.add_argument('--output', type=str, required=True, help='Directory to save masked output') | |
| parser.add_argument('--schema', type=str, required=True, help='Path to masking schema JSON file') | |
| args = parser.parse_args() | |
| # Create output directory if it doesn't exist | |
| os.makedirs(args.output, exist_ok=True) | |
| # Start Spark | |
| spark = SparkSession.builder.master("local[*]").appName("AnonysparkCLI").getOrCreate() | |
| # Load data and schema | |
| df = spark.read.csv(args.input, header=True) | |
| with open(args.schema, 'r') as f: | |
| schema = json.load(f) | |
| # Apply masking | |
| masked_df = apply_masking(df, schema) | |
| # Save to output directory | |
| masked_df.write.mode("overwrite").csv(args.output, header=True) | |
| print(f"Masked file written to: {args.output}") | |
| if __name__ == "__main__": | |
| main() | |