Data Analysis 数据分析是AI工程的基石,让我们从数据中发现价值

数据分析与处理

学习目标

完成本模块学习后,你将能够:

  • 熟练使用Pandas进行数据操作和分析
  • 掌握数据清洗和预处理技巧
  • 进行探索性数据分析
  • 处理真实世界的数据问题

先修知识

  • Python基础编程
  • 基本的统计学概念
  • NumPy基础操作

1. Pandas高级操作

1.1 数据操作

import pandas as pd
import numpy as np

# 高效的数据读取
df = pd.read_csv('data.csv', 
                 usecols=['user_id', 'timestamp', 'value'],
                 parse_dates=['timestamp'],
                 dtype={'user_id': 'category'})

# 链式操作
result = (df
    .groupby('user_id')
    .agg({
        'value': ['mean', 'std'],
        'timestamp': ['min', 'max']
    })
    .round(2)
    .reset_index())

# 多层索引操作
df.columns = pd.MultiIndex.from_product([['A', 'B'], ['val1', 'val2']])
df.loc[:, ('A', 'val1')]  # 访问特定列

# 高效的数据筛选
mask = (df['value'] > df['value'].mean()) & \
       (df['timestamp'].dt.year == 2024)
filtered_df = df.loc[mask]

1.2 数据转换

# 类型转换和优化
df['category'] = df['category'].astype('category')
df['timestamp'] = pd.to_datetime(df['timestamp'])

# 透视表操作
pivot_table = df.pivot_table(
    values='value',
    index='user_id',
    columns='category',
    aggfunc=['mean', 'count'],
    fill_value=0
)

# 时间序列重采样
daily_stats = df.set_index('timestamp').resample('D').agg({
    'value': ['mean', 'sum', 'count'],
    'user_id': 'nunique'
})

# 自定义转换函数
def normalize_column(series):
    """标准化数值列"""
    return (series - series.mean()) / series.std()

df['value_normalized'] = df.groupby('category')['value'].transform(normalize_column)

2. 数据清洗与预处理

2.1 缺失值处理

# 检查缺失值
missing_stats = df.isnull().sum() / len(df) * 100
print("缺失值比例(%):")
print(missing_stats[missing_stats > 0])

# 智能填充缺失值
df['value'] = df['value'].fillna(df.groupby('category')['value'].transform('mean'))

# 处理异常值
def remove_outliers(group, column, n_std=3):
    """使用Z分数法删除异常值"""
    z_scores = np.abs((group[column] - group[column].mean()) / group[column].std())
    return group[z_scores < n_std]

clean_df = df.groupby('category').apply(
    lambda x: remove_outliers(x, 'value')
).reset_index(drop=True)

2.2 数据质量提升

# 重复值处理
duplicates = df[df.duplicated(subset=['user_id', 'timestamp'], keep=False)]
df = df.drop_duplicates(subset=['user_id', 'timestamp'], keep='last')

# 数据一致性检查
def validate_data(df):
    """数据验证函数"""
    checks = {
        'missing_values': df.isnull().sum().to_dict(),
        'negative_values': (df < 0).sum().to_dict(),
        'unique_counts': df.nunique().to_dict(),
        'value_ranges': {
            col: {'min': df[col].min(), 
                  'max': df[col].max()}
            for col in df.select_dtypes('number').columns
        }
    }
    return pd.DataFrame(checks)

# 数据清理pipeline
class DataCleaner:
    def __init__(self, df):
        self.df = df.copy()
    
    def remove_duplicates(self):
        self.df = self.df.drop_duplicates()
        return self
    
    def handle_missing_values(self):
        for col in self.df.columns:
            if self.df[col].dtype in ['int64', 'float64']:
                self.df[col] = self.df[col].fillna(self.df[col].median())
            else:
                self.df[col] = self.df[col].fillna(self.df[col].mode()[0])
        return self
    
    def remove_outliers(self, columns, n_std=3):
        for col in columns:
            self.df = self.df[np.abs(self.df[col] - self.df[col].mean()) <= 
                            (n_std * self.df[col].std())]
        return self
    
    def get_clean_data(self):
        return self.df

# 使用pipeline
cleaner = DataCleaner(df)
clean_df = (cleaner
    .remove_duplicates()
    .handle_missing_values()
    .remove_outliers(['value'])
    .get_clean_data())

3. 探索性数据分析

3.1 统计描述

# 基本统计量
summary = df.describe(include='all').round(2)

# 相关性分析
correlation = df.corr()
correlation_matrix = df.corr().round(2)

# 分组统计
group_stats = df.groupby('category').agg({
    'value': ['count', 'mean', 'std', 'min', 'max'],
    'user_id': 'nunique'
}).round(2)

# 时间模式分析
time_patterns = df.set_index('timestamp').groupby([
    df['timestamp'].dt.hour,
    df['timestamp'].dt.dayofweek
])['value'].mean().unstack()

3.2 数据可视化

import seaborn as sns
import matplotlib.pyplot as plt

# 设置可视化风格
plt.style.use('seaborn')
sns.set_palette("husl")

# 分布可视化
def plot_distribution(df, column):
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))
    
    # 直方图
    sns.histplot(data=df, x=column, kde=True, ax=ax1)
    ax1.set_title(f'{column}分布')
    
    # 箱线图
    sns.boxplot(data=df, y=column, ax=ax2)
    ax2.set_title(f'{column}箱线图')
    
    plt.tight_layout()
    return fig

# 相关性热力图
def plot_correlation_matrix(df, figsize=(10, 8)):
    plt.figure(figsize=figsize)
    sns.heatmap(df.corr(), 
                annot=True, 
                cmap='coolwarm', 
                center=0,
                fmt='.2f')
    plt.title('特征相关性矩阵')
    
# 时间序列趋势
def plot_time_trends(df, value_col, date_col='timestamp'):
    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(15, 10))
    
    # 日趋势
    daily = df.set_index(date_col).resample('D')[value_col].mean()
    daily.plot(ax=ax1, title='日均值趋势')
    
    # 月趋势
    monthly = df.set_index(date_col).resample('M')[value_col].mean()
    monthly.plot(ax=ax2, title='月均值趋势')
    
    plt.tight_layout()
    return fig

4. 实战案例:用户行为分析

4.1 数据准备

# 加载数据
df = pd.read_csv('user_behavior.csv')

# 数据清理
df['timestamp'] = pd.to_datetime(df['timestamp'])
df['user_id'] = df['user_id'].astype('category')
df['event_type'] = df['event_type'].astype('category')

# 特征工程
df['hour'] = df['timestamp'].dt.hour
df['day_of_week'] = df['timestamp'].dt.dayofweek

4.2 用户行为分析

# 用户活跃度分析
user_activity = df.groupby('user_id').agg({
    'timestamp': ['count', 'min', 'max'],
    'event_type': lambda x: x.nunique()
}).round(2)

# 事件频率分析
event_frequency = df.groupby(['hour', 'day_of_week'])['event_type'].count().unstack()

# 用户留存分析
def calculate_retention(df, time_col='timestamp', user_col='user_id'):
    """计算用户留存率"""
    # 获取用户首次活跃时间
    first_activity = df.groupby(user_col)[time_col].min().reset_index()
    first_activity['cohort'] = first_activity[time_col].dt.to_period('M')
    
    # 计算每个用户在每个月的活跃情况
    df['activity_month'] = df[time_col].dt.to_period('M')
    monthly_activity = df.groupby([user_col, 'activity_month']).size().reset_index()
    
    # 合并首次活跃信息
    retention_data = pd.merge(monthly_activity, first_activity, on=user_col)
    retention_data['months_since_first'] = (
        retention_data['activity_month'] - retention_data['cohort']
    ).apply(lambda x: x.n)
    
    # 计算留存率
    cohort_sizes = retention_data.groupby('cohort').size()
    retention_rates = retention_data.pivot_table(
        index='cohort',
        columns='months_since_first',
        values=user_col,
        aggfunc='nunique'
    ).divide(cohort_sizes, axis=0) * 100
    
    return retention_rates.round(2)

# 计算并可视化留存率
retention_rates = calculate_retention(df)
plt.figure(figsize=(12, 8))
sns.heatmap(retention_rates, 
            annot=True, 
            fmt='.1f', 
            cmap='YlOrRd')
plt.title('用户留存率热力图 (%)')

常见问题解答

Q: 如何处理大规模数据集? A: 使用分块读取(chunk)、内存优化(如类型转换)和并行处理等技术。可以考虑使用dask或vaex等专门处理大数据的库。

Q: 如何选择合适的数据清洗策略? A: 根据数据特点和业务需求选择。对于关键特征,可能需要更保守的方法(如手动检查);对于次要特征,可以使用自动化方法(如统计填充)。

Q: 探索性数据分析的关键步骤是什么? A: 首先了解数据基本情况(大小、类型、缺失值等),然后进行统计描述,最后通过可视化深入分析数据特征和关系。要特别关注异常值和特殊模式。