"""
反脱敏脚本
对分析结果中的markdown文档和excel表格进行反脱敏映射
Author: Multi-Agent Workflow Framework
Date: 2025
"""
import pandas as pd
import json
import os
import re
import sys
from pathlib import Path
# 获取脚本所在目录的父目录(项目根目录)
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
PROJECT_ROOT = os.path.dirname(SCRIPT_DIR)
# 配置路径
MAPPING_FILE = os.path.join(PROJECT_ROOT, 'data', '映射记录.json')
INPUT_MARKDOWN = os.path.join(PROJECT_ROOT, '分析结果', '高价值客户产品分析报告.md')
INPUT_EXCEL = os.path.join(PROJECT_ROOT, '分析结果', '高价值客户产品分析结果.xlsx')
OUTPUT_DIR = os.path.join(PROJECT_ROOT, '分析结果', '真实结论')
def validate_file_exists(file_path: str, file_type: str = "文件") -> None:
"""
验证文件是否存在
Args:
file_path: 文件路径
file_type: 文件类型描述
Raises:
FileNotFoundError: 文件不存在时抛出
"""
if not os.path.exists(file_path):
raise FileNotFoundError(f"错误:找不到{file_type} {file_path}")
def load_mapping() -> dict:
"""
加载映射关系
Returns:
反向映射字典(从脱敏值到真实值)
Raises:
FileNotFoundError: 映射文件不存在
json.JSONDecodeError: JSON解析错误
"""
validate_file_exists(MAPPING_FILE, "映射文件")
print(f"正在加载映射关系: {MAPPING_FILE}")
try:
with open(MAPPING_FILE, 'r', encoding='utf-8') as f:
mapping_data = json.load(f)
except json.JSONDecodeError as e:
raise ValueError(f"映射文件JSON格式错误: {e}")
except Exception as e:
raise RuntimeError(f"读取映射文件失败: {e}")
# 构建反向映射字典(从脱敏值到真实值)
reverse_mappings = {}
mapping_tables = mapping_data.get('mapping_tables', {})
if not mapping_tables:
raise ValueError("映射文件中没有找到 mapping_tables 字段")
for field, mapping_dict in mapping_tables.items():
if not isinstance(mapping_dict, dict):
print(f"警告:字段 {field} 的映射不是字典格式,跳过")
continue
reverse_mappings[field] = {v: k for k, v in mapping_dict.items()}
print(f"已加载 {len(mapping_tables)} 个字段的映射关系")
return reverse_mappings
def desensitize_text(text: str, reverse_mappings: dict) -> str:
"""
对文本进行反脱敏处理
Args:
text: 要处理的文本
reverse_mappings: 反向映射字典
Returns:
反脱敏后的文本
"""
result = text
# 按字段优先级处理(先处理更具体的字段,避免误替换)
# 按字段名长度降序排列,先处理长字段名
sorted_fields = sorted(reverse_mappings.keys(), key=len, reverse=True)
for field in sorted_fields:
mapping = reverse_mappings[field]
for masked_value, real_value in mapping.items():
# 使用正则表达式进行精确匹配替换
# 匹配完整的词(前后是边界或标点)
try:
pattern = r'\b' + re.escape(str(masked_value)) + r'\b'
result = re.sub(pattern, str(real_value), result)
except re.error as e:
print(f"警告:字段 {field} 的值 {masked_value} 正则表达式错误: {e}")
continue
return result
def desensitize_markdown(input_file: str, output_file: str, reverse_mappings: dict) -> None:
"""
对markdown文件进行反脱敏
Args:
input_file: 输入Markdown文件路径
output_file: 输出Markdown文件路径
reverse_mappings: 反向映射字典
Raises:
FileNotFoundError: 输入文件不存在
IOError: 文件读写错误
"""
validate_file_exists(input_file, "Markdown文件")
print(f"正在处理Markdown文件: {input_file}")
try:
with open(input_file, 'r', encoding='utf-8') as f:
content = f.read()
except Exception as e:
raise IOError(f"读取Markdown文件失败: {e}")
if not content:
raise ValueError(f"Markdown文件为空: {input_file}")
# 进行反脱敏
desensitized_content = desensitize_text(content, reverse_mappings)
# 保存结果
try:
os.makedirs(os.path.dirname(output_file), exist_ok=True)
with open(output_file, 'w', encoding='utf-8') as f:
f.write(desensitized_content)
except Exception as e:
raise IOError(f"保存Markdown文件失败: {e}")
print(f"已保存反脱敏后的Markdown: {output_file}")
def desensitize_dataframe(df: pd.DataFrame, reverse_mappings: dict) -> pd.DataFrame:
"""
对DataFrame进行反脱敏处理
Args:
df: 要处理的DataFrame
reverse_mappings: 反向映射字典
Returns:
反脱敏后的DataFrame
"""
df_result = df.copy()
# 定义需要反脱敏的字段映射(列名 -> 映射字段名)
field_column_mapping = {
'客户简称': '客户简称',
'客户类型': '客户类型',
'销售': '销售',
'部门': '部门名称',
'部门名称': '部门名称',
'子产品名称': '子产品名称',
'售前主产品名称': '售前主产品名称',
'系统主产品名称': '系统主产品名称'
}
# 遍历所有列,查找需要反脱敏的列
for col in df_result.columns:
# 检查列名是否在映射关系中
if col in field_column_mapping:
field_name = field_column_mapping[col]
if field_name in reverse_mappings:
mapping = reverse_mappings[field_name]
# 对该列进行反脱敏
if df_result[col].dtype == 'object': # 字符串类型
df_result[col] = df_result[col].apply(
lambda x: mapping.get(str(x), x) if pd.notna(x) and str(x) in mapping else x
)
else:
# 尝试模糊匹配(列名包含字段名)
for field_name, mapping in reverse_mappings.items():
if field_name in col:
# 对该列进行反脱敏
if df_result[col].dtype == 'object': # 字符串类型
df_result[col] = df_result[col].apply(
lambda x: mapping.get(str(x), x) if pd.notna(x) and str(x) in mapping else x
)
break
return df_result
def desensitize_excel(input_file: str, output_file: str, reverse_mappings: dict) -> None:
"""
对Excel文件进行反脱敏处理
Args:
input_file: 输入Excel文件路径
output_file: 输出Excel文件路径
reverse_mappings: 反向映射字典
Raises:
FileNotFoundError: 输入文件不存在
RuntimeError: Excel处理错误
"""
validate_file_exists(input_file, "Excel文件")
print(f"正在处理Excel文件: {input_file}")
try:
# 读取所有sheet
excel_file = pd.ExcelFile(input_file)
sheet_names = excel_file.sheet_names
except Exception as e:
raise RuntimeError(f"读取Excel文件失败: {e}")
if not sheet_names:
raise ValueError(f"Excel文件中没有Sheet: {input_file}")
print(f"发现 {len(sheet_names)} 个Sheet: {', '.join(sheet_names)}")
# 创建输出目录
try:
os.makedirs(os.path.dirname(output_file), exist_ok=True)
except Exception as e:
raise IOError(f"创建输出目录失败: {e}")
# 使用ExcelWriter写入所有sheet
try:
with pd.ExcelWriter(output_file, engine='openpyxl') as writer:
for sheet_name in sheet_names:
print(f" 正在处理Sheet: {sheet_name}")
try:
df = pd.read_excel(input_file, sheet_name=sheet_name)
# 进行反脱敏
df_desensitized = desensitize_dataframe(df, reverse_mappings)
# 写入新的Excel文件
df_desensitized.to_excel(writer, sheet_name=sheet_name, index=False)
except Exception as e:
print(f"警告:处理Sheet {sheet_name} 时出错: {e}")
# 继续处理其他Sheet
continue
except Exception as e:
raise RuntimeError(f"保存Excel文件失败: {e}")
print(f"已保存反脱敏后的Excel: {output_file}")
def process_all() -> None:
"""
处理所有文件
Raises:
各种异常会在函数内部处理并打印错误信息
"""
print("=" * 60)
print("开始反脱敏处理")
print("=" * 60)
try:
# 加载映射关系
reverse_mappings = load_mapping()
if not reverse_mappings:
raise ValueError("映射关系为空,无法进行反脱敏处理")
# 处理Markdown文件
output_markdown = os.path.join(OUTPUT_DIR, '高价值客户产品分析报告.md')
desensitize_markdown(INPUT_MARKDOWN, output_markdown, reverse_mappings)
# 处理Excel文件
output_excel = os.path.join(OUTPUT_DIR, '高价值客户产品分析结果.xlsx')
desensitize_excel(INPUT_EXCEL, output_excel, reverse_mappings)
print("=" * 60)
print("反脱敏处理完成!")
print(f"结果已保存到: {OUTPUT_DIR}")
print("=" * 60)
except Exception as e:
print("=" * 60)
print(f"反脱敏处理失败: {e}")
print("=" * 60)
raise
if __name__ == '__main__':
try:
process_all()
sys.exit(0)
except FileNotFoundError as e:
print(f"文件错误: {e}", file=sys.stderr)
sys.exit(1)
except ValueError as e:
print(f"数据错误: {e}", file=sys.stderr)
sys.exit(1)
except RuntimeError as e:
print(f"处理错误: {e}", file=sys.stderr)
import traceback
traceback.print_exc()
sys.exit(1)
except Exception as e:
print(f"未知错误: {e}", file=sys.stderr)
import traceback
traceback.print_exc()
sys.exit(1)