深度学习框架实践

学习目标

完成本节学习后,你将能够:

  • 使用PyTorch构建神经网络
  • 实现数据加载和预处理
  • 掌握模型训练和评估方法
  • 进行模型部署和优化

1. PyTorch基础

1.1 张量操作

PyTorch的核心数据结构是张量(Tensor):

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

def tensor_basics():
    """PyTorch张量基础操作"""
    # 创建张量
    x = torch.tensor([[1, 2], [3, 4]], dtype=torch.float32)
    y = torch.randn(2, 2)  # 随机正态分布
    
    # 基本运算
    print("加法:", x + y)
    print("矩阵乘法:", torch.mm(x, y))
    print("元素乘法:", x * y)
    
    # 维度操作
    print("维度:", x.shape)
    print("转置:", x.t())
    
    # 设备迁移
    if torch.cuda.is_available():
        x_gpu = x.cuda()
        print("GPU张量:", x_gpu.device)
    
    # 梯度计算
    x.requires_grad_(True)  # 启用梯度计算
    z = torch.sum(x ** 2)
    z.backward()  # 反向传播
    print("梯度:", x.grad)

1.2 数据加载

使用Dataset和DataLoader进行高效的数据加载:

class CustomDataset(Dataset):
    """自定义数据集"""
    def __init__(self, X, y, transform=None):
        self.X = torch.FloatTensor(X)
        self.y = torch.FloatTensor(y)
        self.transform = transform
    
    def __len__(self):
        return len(self.X)
    
    def __getitem__(self, idx):
        x = self.X[idx]
        y = self.y[idx]
        
        if self.transform:
            x = self.transform(x)
        
        return x, y

def create_data_loaders(X_train, y_train, X_val, y_val, batch_size=32):
    """创建数据加载器"""
    # 创建数据集
    train_dataset = CustomDataset(X_train, y_train)
    val_dataset = CustomDataset(X_val, y_val)
    
    # 创建数据加载器
    train_loader = DataLoader(
        train_dataset,
        batch_size=batch_size,
        shuffle=True,
        num_workers=4
    )
    
    val_loader = DataLoader(
        val_dataset,
        batch_size=batch_size,
        shuffle=False,
        num_workers=4
    )
    
    return train_loader, val_loader

2. 模型构建

2.1 神经网络模块

使用PyTorch的nn.Module构建神经网络:

class SimpleNet(nn.Module):
    """简单神经网络"""
    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.network = nn.Sequential(
            nn.Linear(input_size, hidden_size),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(hidden_size, output_size)
        )
    
    def forward(self, x):
        return self.network(x)

class ConvNet(nn.Module):
    """卷积神经网络"""
    def __init__(self, num_classes):
        super().__init__()
        self.features = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        
        self.classifier = nn.Sequential(
            nn.Linear(128 * 8 * 8, 512),
            nn.ReLU(),
            nn.Dropout(),
            nn.Linear(512, num_classes)
        )
    
    def forward(self, x):
        x = self.features(x)
        x = x.view(x.size(0), -1)
        x = self.classifier(x)
        return x

3. 模型训练

3.1 训练循环

实现完整的训练循环:

class Trainer:
    """模型训练器"""
    def __init__(self, model, criterion, optimizer, device):
        self.model = model
        self.criterion = criterion
        self.optimizer = optimizer
        self.device = device
        self.history = {'train_loss': [], 'val_loss': [], 'val_acc': []}
    
    def train_epoch(self, train_loader):
        """训练一个epoch"""
        self.model.train()
        total_loss = 0
        
        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.to(self.device), target.to(self.device)
            
            # 前向传播
            self.optimizer.zero_grad()
            output = self.model(data)
            loss = self.criterion(output, target)
            
            # 反向传播
            loss.backward()
            self.optimizer.step()
            
            total_loss += loss.item()
            
            # 打印进度
            if batch_idx % 100 == 0:
                print(f'Train Batch: {batch_idx}/{len(train_loader)} '
                      f'Loss: {loss.item():.6f}')
        
        return total_loss / len(train_loader)
    
    def validate(self, val_loader):
        """验证模型"""
        self.model.eval()
        val_loss = 0
        correct = 0
        
        with torch.no_grad():
            for data, target in val_loader:
                data, target = data.to(self.device), target.to(self.device)
                output = self.model(data)
                val_loss += self.criterion(output, target).item()
                
                # 计算准确率
                pred = output.argmax(dim=1, keepdim=True)
                correct += pred.eq(target.view_as(pred)).sum().item()
        
        val_loss /= len(val_loader)
        accuracy = 100. * correct / len(val_loader.dataset)
        
        return val_loss, accuracy
    
    def train(self, train_loader, val_loader, epochs, 
             scheduler=None, early_stopping=None):
        """完整训练流程"""
        best_val_loss = float('inf')
        
        for epoch in range(epochs):
            print(f'\nEpoch: {epoch+1}/{epochs}')
            
            # 训练和验证
            train_loss = self.train_epoch(train_loader)
            val_loss, val_acc = self.validate(val_loader)
            
            # 更新学习率
            if scheduler is not None:
                scheduler.step(val_loss)
            
            # 更新历史记录
            self.history['train_loss'].append(train_loss)
            self.history['val_loss'].append(val_loss)
            self.history['val_acc'].append(val_acc)
            
            # 保存最佳模型
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                torch.save({
                    'epoch': epoch,
                    'model_state_dict': self.model.state_dict(),
                    'optimizer_state_dict': self.optimizer.state_dict(),
                    'val_loss': val_loss,
                }, 'best_model.pth')
            
            # 早停
            if early_stopping is not None:
                if early_stopping(val_loss):
                    print("Early stopping triggered")
                    break
            
            print(f'Train Loss: {train_loss:.4f}')
            print(f'Val Loss: {val_loss:.4f}')
            print(f'Val Accuracy: {val_acc:.2f}%')
    
    def plot_history(self):
        """绘制训练历史"""
        plt.figure(figsize=(12, 4))
        
        # 损失曲线
        plt.subplot(1, 2, 1)
        plt.plot(self.history['train_loss'], label='train')
        plt.plot(self.history['val_loss'], label='val')
        plt.title('Loss History')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.legend()
        
        # 准确率曲线
        plt.subplot(1, 2, 2)
        plt.plot(self.history['val_acc'])
        plt.title('Validation Accuracy')
        plt.xlabel('Epoch')
        plt.ylabel('Accuracy (%)')
        
        return plt.gcf()

4. 模型优化

4.1 学习率调度

实现学习率调整策略:

class LRScheduler:
    """学习率调度器"""
    @staticmethod
    def create_scheduler(optimizer, scheduler_type='step', **kwargs):
        """创建学习率调度器"""
        if scheduler_type == 'step':
            return optim.lr_scheduler.StepLR(
                optimizer,
                step_size=kwargs.get('step_size', 30),
                gamma=kwargs.get('gamma', 0.1)
            )
        elif scheduler_type == 'cosine':
            return optim.lr_scheduler.CosineAnnealingLR(
                optimizer,
                T_max=kwargs.get('T_max', 100)
            )
        elif scheduler_type == 'reduce_on_plateau':
            return optim.lr_scheduler.ReduceLROnPlateau(
                optimizer,
                mode='min',
                factor=kwargs.get('factor', 0.1),
                patience=kwargs.get('patience', 10)
            )
        else:
            raise ValueError(f"Unknown scheduler type: {scheduler_type}")

4.2 早停机制

实现早停策略:

class EarlyStopping:
    """早停机制"""
    def __init__(self, patience=7, min_delta=0):
        self.patience = patience
        self.min_delta = min_delta
        self.counter = 0
        self.best_loss = None
        self.early_stop = False
    
    def __call__(self, val_loss):
        if self.best_loss is None:
            self.best_loss = val_loss
        elif val_loss > self.best_loss - self.min_delta:
            self.counter += 1
            if self.counter >= self.patience:
                return True
        else:
            self.best_loss = val_loss
            self.counter = 0
        return False

5. 模型部署

5.1 模型保存和加载

def save_model(model, optimizer, epoch, filename):
    """保存模型"""
    torch.save({
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
    }, filename)

def load_model(model, optimizer, filename):
    """加载模型"""
    checkpoint = torch.load(filename)
    model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    return checkpoint['epoch']

5.2 模型推理

def inference(model, data_loader, device):
    """模型推理"""
    model.eval()
    predictions = []
    
    with torch.no_grad():
        for data, _ in data_loader:
            data = data.to(device)
            output = model(data)
            pred = output.argmax(dim=1)
            predictions.extend(pred.cpu().numpy())
    
    return predictions

练习与作业

  1. 实现不同的优化器(Adam、RMSprop)
  2. 添加模型可视化功能
  3. 实现模型量化和压缩

扩展阅读

小测验

  1. PyTorch中的autograd机制是如何工作的?
  2. DataLoader的主要参数有哪些?
  3. 如何处理GPU内存不足的问题?

下一步学习