"""
高价值客户与产品分析脚本
根据分析方案进行客户和产品的价值分析
Author: Multi-Agent Workflow Framework
Date: 2025
"""
import pandas as pd
import numpy as np
import os
import sys
from math import ceil
from datetime import datetime
from pathlib import Path
# 获取脚本所在目录的父目录(项目根目录)
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
PROJECT_ROOT = os.path.dirname(SCRIPT_DIR)
# 配置路径
INPUT_FILE = os.path.join(PROJECT_ROOT, 'data', '脱敏数据_增强版.xlsx')
OUTPUT_FILE = os.path.join(PROJECT_ROOT, '分析结果', '高价值客户产品分析结果.xlsx')
def validate_file_exists(file_path: str, file_type: str = "文件") -> None:
"""
验证文件是否存在
Args:
file_path: 文件路径
file_type: 文件类型描述
Raises:
FileNotFoundError: 文件不存在时抛出
"""
if not os.path.exists(file_path):
raise FileNotFoundError(f"错误:找不到{file_type} {file_path}")
def validate_dataframe(df: pd.DataFrame, required_columns: list) -> None:
"""
验证DataFrame是否包含必需的列
Args:
df: 要验证的DataFrame
required_columns: 必需的列名列表
Raises:
ValueError: 缺少必需列时抛出
"""
missing_columns = [col for col in required_columns if col not in df.columns]
if missing_columns:
raise ValueError(f"错误:数据缺少必需的列: {', '.join(missing_columns)}")
def min_max_normalize(series: pd.Series) -> pd.Series:
"""
Min-Max标准化,将数据映射到[0,1]范围
Args:
series: 要标准化的Series
Returns:
标准化后的Series
"""
min_val = series.min()
max_val = series.max()
if max_val == min_val:
return pd.Series([0.5] * len(series), index=series.index)
return (series - min_val) / (max_val - min_val)
def calculate_test_days(row: pd.Series) -> float:
"""
计算测试时间(申请时间到测试返回时间的天数)
Args:
row: 包含申请时间和测试返回时间的行数据
Returns:
测试天数,如果数据无效则返回NaN
"""
if pd.isna(row.get('申请时间')) or pd.isna(row.get('测试返回时间')):
return np.nan
try:
return (row['测试返回时间'] - row['申请时间']).days
except (TypeError, AttributeError) as e:
print(f"警告:计算测试天数时出错: {e}")
return np.nan
def analyze_customers(df: pd.DataFrame) -> pd.DataFrame:
"""
客户维度分析
Args:
df: 原始数据DataFrame
Returns:
客户统计DataFrame
"""
print("正在进行客户维度分析...")
# 验证必需列
required_columns = ['客户简称', '是否调用', '测试阶段_已接入', '测试阶段_不接入',
'测试阶段_可接入', '样本量', '申请时间', '测试返回时间']
validate_dataframe(df, required_columns)
# 先计算测试天数
df['测试天数'] = df.apply(calculate_test_days, axis=1)
# 按客户简称分组聚合
try:
customer_stats = df.groupby('客户简称').agg({
'客户简称': 'first', # 保留客户简称
'是否调用': lambda x: (x == '是').sum(), # 调用记录数
'测试阶段_已接入': lambda x: (x == '是').sum(), # 已接入数
'测试阶段_不接入': lambda x: (x == '是').sum(), # 不接入数
'测试阶段_可接入': lambda x: (x == '是').sum(), # 可接入数
'样本量': 'mean', # 平均样本量
'测试天数': 'mean' # 平均测试时间
}).reset_index(drop=True)
except Exception as e:
raise RuntimeError(f"客户维度分析失败: {e}")
# 重命名列
customer_stats.columns = ['客户简称', '调用记录数', '已接入数', '不接入数', '可接入数', '平均样本量', '平均测试时间']
# 计算总记录数
customer_stats['总记录数'] = df.groupby('客户简称').size().values
# 计算可接入但未调用数
customer_stats['可接入但未调用数'] = df.groupby('客户简称').apply(
lambda x: ((x['测试阶段_可接入'] == '是') & (x['是否调用'] == '否')).sum()
).values
# 计算比率(避免除零错误)
customer_stats['调用率'] = customer_stats['调用记录数'] / customer_stats['总记录数'].replace(0, np.nan)
customer_stats['签约率'] = customer_stats['已接入数'] / customer_stats['总记录数'].replace(0, np.nan)
customer_stats['不接入率'] = customer_stats['不接入数'] / customer_stats['总记录数'].replace(0, np.nan)
# 标准化调用记录数
customer_stats['标准化调用记录数'] = min_max_normalize(customer_stats['调用记录数'])
# 计算综合评分
w1, w2, w3 = 0.50, 0.30, 0.20
customer_stats['综合评分'] = (
w1 * customer_stats['调用率'].fillna(0) +
w2 * customer_stats['标准化调用记录数'] +
w3 * customer_stats['签约率'].fillna(0)
)
# 按综合评分降序排列,评分相同时按客户简称字母顺序排序
customer_stats = customer_stats.sort_values(
by=['综合评分', '客户简称'],
ascending=[False, True]
).reset_index(drop=True)
# 添加排名(从1开始,连续排名)
customer_stats['排名'] = range(1, len(customer_stats) + 1)
return customer_stats
def analyze_products(df: pd.DataFrame) -> pd.DataFrame:
"""
产品维度分析
Args:
df: 原始数据DataFrame
Returns:
产品统计DataFrame
"""
print("正在进行产品维度分析...")
# 验证必需列
required_columns = ['子产品名称', '是否调用', '测试阶段_已接入', '测试阶段_不接入',
'测试阶段_测试中', '样本量']
validate_dataframe(df, required_columns)
# 按子产品名称分组聚合
try:
product_stats = df.groupby('子产品名称').agg({
'子产品名称': 'first', # 保留子产品名称
'是否调用': lambda x: (x == '是').sum(), # 调用数
'测试阶段_已接入': lambda x: (x == '是').sum(), # 已接入数
'测试阶段_不接入': lambda x: (x == '是').sum(), # 不接入数
'测试阶段_测试中': lambda x: (x == '是').sum(), # 测试中数
'样本量': 'mean', # 平均样本量
'测试天数': 'mean' # 平均测试时间
}).reset_index(drop=True)
except Exception as e:
raise RuntimeError(f"产品维度分析失败: {e}")
# 重命名列
product_stats.columns = ['子产品名称', '调用数', '已接入数', '不接入数', '测试中数', '平均样本量', '平均测试时间']
# 计算总记录数
product_stats['总记录数'] = df.groupby('子产品名称').size().values
# 计算比率(避免除零错误)
product_stats['调用转化率'] = product_stats['调用数'] / product_stats['总记录数'].replace(0, np.nan)
product_stats['签约转化率'] = product_stats['已接入数'] / product_stats['总记录数'].replace(0, np.nan)
product_stats['不接入率'] = product_stats['不接入数'] / product_stats['总记录数'].replace(0, np.nan)
product_stats['测试中占比'] = product_stats['测试中数'] / product_stats['总记录数'].replace(0, np.nan)
# 标准化总申请数
product_stats['标准化总申请数'] = min_max_normalize(product_stats['总记录数'])
# 计算综合评分
w1, w2, w3, w4 = 0.50, 0.25, 0.15, 0.10
product_stats['综合评分'] = (
w1 * product_stats['调用转化率'].fillna(0) +
w2 * product_stats['签约转化率'].fillna(0) +
w3 * product_stats['标准化总申请数'] -
w4 * product_stats['不接入率'].fillna(0)
)
# 按综合评分降序排列,评分相同时按子产品名称字母顺序排序
product_stats = product_stats.sort_values(
by=['综合评分', '子产品名称'],
ascending=[False, True]
).reset_index(drop=True)
# 添加排名(从1开始,连续排名)
product_stats['排名'] = range(1, len(product_stats) + 1)
return product_stats
def classify_high_low_value(stats_df: pd.DataFrame, name_col: str,
top_pct: float = 0.2, bottom_pct: float = 0.2) -> tuple:
"""
分类高价值和低价值
Args:
stats_df: 统计数据DataFrame
name_col: 名称列名
top_pct: 前百分比(高价值)
bottom_pct: 后百分比(低价值)
Returns:
(高价值DataFrame, 低价值DataFrame) 元组
"""
if stats_df.empty:
return pd.DataFrame(), pd.DataFrame()
total = len(stats_df)
top_count = max(1, ceil(total * top_pct))
bottom_count = max(1, ceil(total * bottom_pct))
high_value = stats_df.head(top_count).copy()
low_value = stats_df.tail(bottom_count).copy()
return high_value, low_value
def analyze_feature_differences(df: pd.DataFrame, high_value_names: list,
low_value_names: list, name_col: str,
categorical_features: list, numerical_features: list) -> pd.DataFrame:
"""
分析特征差异
Args:
df: 原始数据DataFrame
high_value_names: 高价值名称列表
low_value_names: 低价值名称列表
name_col: 名称列名
categorical_features: 分类型特征列表
numerical_features: 数值型特征列表
Returns:
特征差异统计DataFrame
"""
# 筛选高价值和低价值组的数据
high_value_data = df[df[name_col].isin(high_value_names)]
low_value_data = df[df[name_col].isin(low_value_names)]
results = []
# 分类型特征:计算各特征值的占比
for feature in categorical_features:
if feature not in df.columns:
continue
# 高价值组特征值分布
high_counts = high_value_data[feature].value_counts(dropna=False)
high_total = len(high_value_data)
high_pct = high_counts / high_total if high_total > 0 else pd.Series()
# 低价值组特征值分布
low_counts = low_value_data[feature].value_counts(dropna=False)
low_total = len(low_value_data)
low_pct = low_counts / low_total if low_total > 0 else pd.Series()
# 获取所有特征值(包括缺失值)
all_values = set(high_counts.index.tolist() + low_counts.index.tolist())
for value in all_values:
# 处理缺失值
is_na = pd.isna(value)
if is_na:
# 尝试不同的NaN表示方式
high_val = 0
low_val = 0
for na_val in [pd.NA, np.nan, None]:
if na_val in high_pct.index:
high_val = high_pct[na_val]
break
for na_val in [pd.NA, np.nan, None]:
if na_val in low_pct.index:
low_val = low_pct[na_val]
break
display_value = '-'
else:
high_val = high_pct.get(value, 0)
low_val = low_pct.get(value, 0)
display_value = str(value)
# 格式化显示(所有特征值都显示,包括0)
high_display = f"{high_val:.4f}"
low_display = f"{low_val:.4f}"
diff = high_val - low_val
results.append({
'特征维度': feature,
'特征值': display_value,
'高价值组占比': high_display,
'低价值组占比': low_display,
'差异': f"{diff:.4f}"
})
# 数值型特征:计算均值
for feature in numerical_features:
if feature not in df.columns:
continue
high_mean = high_value_data[feature].mean()
low_mean = low_value_data[feature].mean()
high_display = f"{high_mean:.4f}" if not pd.isna(high_mean) else "-"
low_display = f"{low_mean:.4f}" if not pd.isna(low_mean) else "-"
diff = (high_mean - low_mean) if (not pd.isna(high_mean) and not pd.isna(low_mean)) else np.nan
diff_display = f"{diff:.4f}" if not pd.isna(diff) else "-"
results.append({
'特征维度': feature,
'特征值': '-',
'高价值组占比': high_display,
'低价值组占比': low_display,
'差异': diff_display
})
return pd.DataFrame(results)
def process_data() -> dict:
"""
主处理流程
Returns:
包含分析结果的字典
Raises:
FileNotFoundError: 输入文件不存在
ValueError: 数据格式错误
RuntimeError: 处理过程出错
"""
# 验证输入文件
validate_file_exists(INPUT_FILE, "输入数据文件")
# 读取数据
print(f"正在读取数据: {INPUT_FILE}")
try:
df = pd.read_excel(INPUT_FILE)
except Exception as e:
raise RuntimeError(f"读取Excel文件失败: {e}")
if df.empty:
raise ValueError("错误:输入数据文件为空")
print(f"原始数据: {len(df)}行, {len(df.columns)}列")
# 数据预处理:转换日期类型
print("正在预处理数据...")
date_columns = ['申请时间', '测试返回时间']
for col in date_columns:
if col in df.columns:
df[col] = pd.to_datetime(df[col], errors='coerce')
# 客户维度分析
customer_stats = analyze_customers(df)
# 产品维度分析
product_stats = analyze_products(df)
# 分类高价值和低价值
print("正在分类高价值和低价值...")
high_value_customers, low_value_customers = classify_high_low_value(
customer_stats, '客户简称'
)
high_value_products, low_value_products = classify_high_low_value(
product_stats, '子产品名称'
)
# 特征差异分析
print("正在进行特征差异分析...")
# 客户特征差异
customer_categorical = ['客户类型', '部门', '销售', '反馈结果', '接入意向']
customer_numerical = ['样本量', '测试天数']
customer_feature_diff = analyze_feature_differences(
df,
high_value_customers['客户简称'].tolist(),
low_value_customers['客户简称'].tolist(),
'客户简称',
customer_categorical,
customer_numerical
)
# 产品特征差异
product_categorical = ['客户类型', '部门', '反馈结果']
product_numerical = ['样本量']
product_feature_diff = analyze_feature_differences(
df,
high_value_products['子产品名称'].tolist(),
low_value_products['子产品名称'].tolist(),
'子产品名称',
product_categorical,
product_numerical
)
# 准备输出数据
print("正在准备输出数据...")
# Sheet1: 高价值客户列表
high_customer_cols = ['客户简称', '总记录数', '调用率', '调用记录数', '签约率',
'不接入率', '可接入但未调用数', '平均样本量', '平均测试时间',
'综合评分', '排名']
high_customers_output = high_value_customers[high_customer_cols].copy()
# Sheet2: 低价值客户列表
low_customers_output = low_value_customers[high_customer_cols].copy()
# Sheet3: 高价值产品列表
high_product_cols = ['子产品名称', '总记录数', '调用转化率', '签约转化率', '不接入率',
'测试中占比', '平均样本量', '平均测试时间', '综合评分', '排名']
high_products_output = high_value_products[high_product_cols].copy()
# Sheet4: 低价值产品列表
low_products_output = low_value_products[high_product_cols].copy()
# Sheet5: 客户特征差异统计
customer_feature_output = customer_feature_diff.copy()
# Sheet6: 产品特征差异统计
product_feature_output = product_feature_diff.copy()
# Sheet7: 客户全部排名
all_customer_cols = ['排名', '客户简称', '总记录数', '调用率', '调用记录数',
'签约率', '不接入率', '可接入但未调用数', '平均样本量',
'平均测试时间', '综合评分']
all_customers_output = customer_stats[all_customer_cols].copy()
# Sheet8: 产品全部排名
all_product_cols = ['排名', '子产品名称', '总记录数', '调用转化率', '签约转化率',
'不接入率', '测试中占比', '平均样本量', '平均测试时间', '综合评分']
all_products_output = product_stats[all_product_cols].copy()
# 保存到Excel
print(f"正在保存结果到: {OUTPUT_FILE}")
try:
# 确保输出目录存在
output_dir = os.path.dirname(OUTPUT_FILE)
os.makedirs(output_dir, exist_ok=True)
with pd.ExcelWriter(OUTPUT_FILE, engine='openpyxl') as writer:
high_customers_output.to_excel(writer, sheet_name='高价值客户列表', index=False)
low_customers_output.to_excel(writer, sheet_name='低价值客户列表', index=False)
high_products_output.to_excel(writer, sheet_name='高价值产品列表', index=False)
low_products_output.to_excel(writer, sheet_name='低价值产品列表', index=False)
customer_feature_output.to_excel(writer, sheet_name='客户特征差异统计', index=False)
product_feature_output.to_excel(writer, sheet_name='产品特征差异统计', index=False)
all_customers_output.to_excel(writer, sheet_name='客户全部排名', index=False)
all_products_output.to_excel(writer, sheet_name='产品全部排名', index=False)
except Exception as e:
raise RuntimeError(f"保存Excel文件失败: {e}")
print("✓ 分析完成!")
print(f" - 高价值客户: {len(high_customers_output)}个")
print(f" - 低价值客户: {len(low_customers_output)}个")
print(f" - 高价值产品: {len(high_products_output)}个")
print(f" - 低价值产品: {len(low_products_output)}个")
return {
'customer_stats': customer_stats,
'product_stats': product_stats,
'high_customers': high_customers_output,
'low_customers': low_customers_output,
'high_products': high_products_output,
'low_products': low_products_output
}
if __name__ == '__main__':
try:
process_data()
sys.exit(0)
except FileNotFoundError as e:
print(f"文件错误: {e}", file=sys.stderr)
sys.exit(1)
except ValueError as e:
print(f"数据错误: {e}", file=sys.stderr)
sys.exit(1)
except RuntimeError as e:
print(f"处理错误: {e}", file=sys.stderr)
import traceback
traceback.print_exc()
sys.exit(1)
except Exception as e:
print(f"未知错误: {e}", file=sys.stderr)
import traceback
traceback.print_exc()
sys.exit(1)