高性能计算
课程介绍
本模块聚焦AI开发中的高性能计算技术,教你如何优化代码性能,充分利用硬件资源。通过实际案例,掌握从CPU到GPU的各种加速技术。
学习目标
完成本模块学习后,你将能够:
- 使用NumPy进行高效计算
- 实现并行和分布式计算
- 使用GPU加速深度学习
- 优化大规模数据处理性能
1. NumPy优化技巧
1.1 向量化运算
# ⚡️ 实战案例:图像处理优化
import numpy as np
import time
def process_image_loop(image):
"""使用循环处理图像(慢)"""
height, width = image.shape
result = np.zeros_like(image)
for i in range(1, height-1):
for j in range(1, width-1):
# 3x3卷积核
result[i, j] = np.sum(image[i-1:i+2, j-1:j+2]) / 9
return result
def process_image_vectorized(image):
"""使用向量化处理图像(快)"""
# 使用卷积操作
kernel = np.ones((3, 3)) / 9
return np.correlate(image, kernel, mode='same')
# 性能对比
image = np.random.rand(1000, 1000)
start = time.time()
result_loop = process_image_loop(image)
print(f"Loop time: {time.time() - start:.2f}s")
start = time.time()
result_vectorized = process_image_vectorized(image)
print(f"Vectorized time: {time.time() - start:.2f}s")
1.2 内存优化
# 💾 实战案例:内存优化
def optimize_array_memory():
"""内存使用优化技巧"""
# 1. 使用适当的数据类型
x = np.zeros(1000000, dtype=np.float32) # 而不是float64
# 2. 使用视图而不是复制
y = x.view() # 而不是x.copy()
# 3. 使用内存映射
data = np.memmap('large_array.npy', dtype=np.float32, mode='w+', shape=(1000000,))
# 4. 使用生成器处理大数据
def process_chunks(array, chunk_size=1000):
for i in range(0, len(array), chunk_size):
yield array[i:i + chunk_size]
2. 并行计算基础
2.1 多进程处理
# 🔄 实战案例:并行数据处理
from multiprocessing import Pool
import numpy as np
def process_chunk(chunk):
"""处理数据块的函数"""
# 示例:计算每个数字的平方根和平方的和
return np.sum(np.sqrt(chunk) + np.square(chunk))
def parallel_processing(data, num_processes=4):
"""并行处理大数据集"""
# 将数据分割成块
chunks = np.array_split(data, num_processes)
# 创建进程池
with Pool(num_processes) as pool:
# 并行处理每个块
results = pool.map(process_chunk, chunks)
# 合并结果
return sum(results)
# 使用示例
data = np.random.rand(1000000)
result = parallel_processing(data)
2.2 异步处理
# ⚡️ 实战案例:异步数据加载
import asyncio
import aiohttp
import aiofiles
async def download_file(url, filename):
"""异步下载文件"""
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
if response.status == 200:
async with aiofiles.open(filename, mode='wb') as f:
await f.write(await response.read())
return filename
return None
async def process_file(filename):
"""异步处理文件"""
async with aiofiles.open(filename, mode='r') as f:
content = await f.read()
# 处理文件内容
return len(content)
async def main():
"""主异步函数"""
# 下载任务
download_tasks = [
download_file(url, f"file_{i}.txt")
for i, url in enumerate(urls)
]
# 并发下载
files = await asyncio.gather(*download_tasks)
# 处理任务
process_tasks = [
process_file(f) for f in files if f is not None
]
# 并发处理
results = await asyncio.gather(*process_tasks)
return results
3. GPU加速入门
3.1 CUDA基础
# 🚀 实战案例:GPU加速计算
import torch
def cuda_acceleration_demo():
"""GPU加速示例"""
# 检查GPU是否可用
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")
# 创建大矩阵
size = 10000
a = torch.randn(size, size)
b = torch.randn(size, size)
# CPU计算
start = time.time()
c_cpu = torch.mm(a, b)
cpu_time = time.time() - start
print(f"CPU time: {cpu_time:.2f}s")
# GPU计算
if device.type == 'cuda':
a = a.to(device)
b = b.to(device)
start = time.time()
c_gpu = torch.mm(a, b)
gpu_time = time.time() - start
print(f"GPU time: {gpu_time:.2f}s")
print(f"Speedup: {cpu_time/gpu_time:.1f}x")
3.2 CuPy使用
# 📊 实战案例:CuPy数据处理
import cupy as cp
def cupy_processing_demo():
"""CuPy处理示例"""
# 创建GPU数组
x_gpu = cp.random.random((1000, 1000))
y_gpu = cp.random.random((1000, 1000))
# GPU上进行计算
start = time.time()
# 矩阵乘法
z_gpu = cp.dot(x_gpu, y_gpu)
# FFT变换
fft_gpu = cp.fft.fft2(z_gpu)
# 卷积操作
kernel = cp.random.random((3, 3))
conv_gpu = cp.correlate2d(z_gpu, kernel, mode='same')
gpu_time = time.time() - start
print(f"GPU processing time: {gpu_time:.2f}s")
# 将结果转回CPU
z_cpu = cp.asnumpy(z_gpu)
return z_cpu
实战项目:图像处理加速器
项目描述
实现一个高性能的图像处理系统,结合CPU和GPU加速技术。
项目代码框架
class ImageProcessor:
def __init__(self, use_gpu=True):
self.device = 'gpu' if use_gpu and torch.cuda.is_available() else 'cpu'
self.model = self.load_model()
def load_model(self):
"""加载预训练模型"""
model = torch.hub.load('pytorch/vision:v0.10.0',
'resnet18', pretrained=True)
if self.device == 'gpu':
model = model.cuda()
return model
def process_batch(self, images):
"""批量处理图像"""
# 转换为张量
batch = torch.stack([self.preprocess(img) for img in images])
if self.device == 'gpu':
batch = batch.cuda()
# 使用模型处理
with torch.no_grad():
output = self.model(batch)
# 后处理
results = self.postprocess(output)
return results
def parallel_process(self, image_list, batch_size=32, num_workers=4):
"""并行处理大量图像"""
# 创建数据加载器
dataset = ImageDataset(image_list)
loader = DataLoader(dataset, batch_size=batch_size,
num_workers=num_workers)
results = []
for batch in loader:
batch_results = self.process_batch(batch)
results.extend(batch_results)
return results
练习与作业
- 实现矩阵运算的GPU加速
- 创建并行数据处理pipeline
- 优化大规模计算性能
扩展阅读
小测验
- 向量化计算的优势是什么?
- 如何选择合适的并行策略?
- GPU加速的适用场景有哪些?
下一步学习
- 分布式计算
- 深度学习优化
- 模型部署加速
常见问题解答
Q: CPU和GPU如何选择? A: 根据任务特点选择:CPU适合逻辑复杂的串行任务,GPU适合大规模并行的数值计算。
Q: 如何避免内存溢出? A: 使用生成器、内存映射、分批处理等技术,避免一次加载过多数据。