监控与扩展
1. 监控策略
关键指标
- 请求延迟
- 吞吐量
- 错误率
- 资源使用率
- API限额使用情况
- 缓存命中率
Prometheus + Grafana配置
# prometheus.yml
global:
  # Default scrape interval applied to every job below.
  scrape_interval: 15s

scrape_configs:
  # Scrape the LLM application at localhost:8000.
  - job_name: 'llm_app'
    static_configs:
      - targets: ['localhost:8000']
监控指标定义示例
from prometheus_client import Counter, Histogram, Gauge
# Request counter, labelled by endpoint and status.
REQUEST_COUNTER = Counter(
    name='llm_requests_total',
    documentation='Total number of LLM API requests',
    labelnames=['endpoint', 'status'],
)

# Response-time histogram, labelled by endpoint.
RESPONSE_TIME = Histogram(
    name='llm_response_time',
    documentation='Response time in seconds',
    labelnames=['endpoint'],
    buckets=(0.1, 0.5, 1.0, 2.0, 5.0),
)

# Gauge tracking the number of currently active requests.
ACTIVE_REQUESTS = Gauge(
    name='llm_active_requests',
    documentation='Number of active requests',
)
2. 性能分析
性能分析工具
import cProfile
import pstats
def profile_request(func):
    """Decorate ``func`` so that every call is profiled with cProfile.

    After each call (successful or not) the collected statistics are
    printed to stdout, sorted by cumulative time.

    Both plain functions and ``async def`` coroutine functions are
    supported.  For a coroutine function the returned wrapper is itself a
    coroutine function and the profiler stays enabled while the coroutine
    is awaited; a synchronous wrapper would only measure the creation of
    the coroutine object, not its execution.

    Note: while an async call is suspended the profiler keeps recording,
    so the report can include unrelated work run by the event loop.  The
    decorator is meant for diagnosing one request at a time.

    Args:
        func: The function or coroutine function to profile.

    Returns:
        A wrapper with the same call signature and metadata as ``func``.
    """
    # Local imports keep this snippet self-contained.
    import functools
    import inspect

    def _report(profiler):
        # Print the collected stats ordered by cumulative time.
        stats = pstats.Stats(profiler)
        stats.sort_stats('cumulative')
        stats.print_stats()

    if inspect.iscoroutinefunction(func):
        @functools.wraps(func)
        async def async_wrapper(*args, **kwargs):
            profiler = cProfile.Profile()
            profiler.enable()
            try:
                return await func(*args, **kwargs)
            finally:
                profiler.disable()
                _report(profiler)
        return async_wrapper

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        profiler = cProfile.Profile()
        try:
            return profiler.runcall(func, *args, **kwargs)
        finally:
            _report(profiler)
    return wrapper
@profile_request
async def process_llm_request(prompt: str):
    """Placeholder for the profiled request handler; it does nothing yet."""
    # Request handling goes here.
    return None
性能日志
import time
from contextlib import contextmanager
@contextmanager
def timing_log(name: str):
    """Log how long the enclosed ``with`` block took to run.

    The duration is logged at INFO level through the module's ``logger``
    even when the block raises, because the measurement happens in a
    ``finally`` clause.

    Args:
        name: Label used at the start of the log message.
    """
    # perf_counter() is monotonic, so the measured duration is not
    # affected by system clock adjustments the way time.time() is.
    start = time.perf_counter()
    try:
        yield
    finally:
        duration = time.perf_counter() - start
        logger.info(f"{name} took {duration:.2f} seconds")
async def handle_request(prompt: str):
    """Send ``prompt`` to the LLM client and log how long the call took.

    Returns whatever ``llm_client.generate`` returns.
    """
    with timing_log("LLM API call"):
        return await llm_client.generate(prompt)
3. 扩展模式
水平扩展
# docker-compose.yml
version: '3'

services:
  app:
    build: .
    deploy:
      # Three replicas of the app, each limited to 0.5 CPU and 512M memory.
      replicas: 3
      resources:
        limits:
          cpus: '0.5'
          memory: 512M
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - REDIS_URL=redis://redis:6379

  redis:
    image: redis:alpine
    volumes:
      # Mount the named volume declared below at /data.
      - redis_data:/data

volumes:
  redis_data:
队列处理
from celery import Celery
from typing import Optional
app = Celery('llm_tasks', broker='redis://localhost:6379/0')


@app.task(bind=True, max_retries=3)
def process_llm_request(self, prompt: str) -> Optional[str]:
    """Generate a completion for ``prompt`` as a Celery task, with retries.

    Any exception from ``llm_client.generate`` schedules a retry with an
    exponentially growing delay (``2 ** retries`` seconds).  The task is
    declared with ``max_retries=3``.

    Args:
        prompt: The prompt text passed to ``llm_client.generate``.

    Returns:
        Whatever ``llm_client.generate`` returns on success.
    """
    try:
        return llm_client.generate(prompt)
    except Exception as exc:
        # Task.retry() signals the retry by raising; re-raising its result
        # makes that control flow explicit and removes the implicit
        # "fall through and return None" path.
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)
4. 负载测试
Locust测试脚本
from locust import HttpUser, task, between
class LLMUser(HttpUser):
    """Simulated user that posts a fixed prompt to ``/generate``."""

    # Wait time between tasks, configured as between(1, 5).
    wait_time = between(1, 5)

    @task
    def generate_text(self):
        """POST one generation request using the test API key."""
        payload = {"prompt": "测试提示词"}
        auth_headers = {"X-API-Key": "test-key"}
        self.client.post("/generate", json=payload, headers=auth_headers)
压力测试配置
from locust import HttpUser, task, between
from typing import Dict, Any
class LoadTest(HttpUser):
    """Weighted load test that posts prompts of three lengths to ``/generate``.

    The task weights are 3 (short), 2 (medium) and 1 (long).
    """

    wait_time = between(1, 3)

    def on_start(self):
        """Prepare the request headers and the prompt fixtures for this user."""
        self.headers = {"X-API-Key": "test-key"}
        self.test_prompts = [
            "简单提示词",
            "中等长度的提示词示例",
            "较长的提示词带有上下文信息的示例"
        ]

    @task(3)
    def test_short_prompt(self):
        """Send the short prompt (weight 3)."""
        self._send_request(self.test_prompts[0])

    @task(2)
    def test_medium_prompt(self):
        """Send the medium-length prompt (weight 2)."""
        self._send_request(self.test_prompts[1])

    @task(1)
    def test_long_prompt(self):
        """Send the long prompt (weight 1)."""
        self._send_request(self.test_prompts[2])

    def _send_request(self, prompt: str) -> Any:
        """POST ``prompt`` to ``/generate`` and return the client's response.

        The return value is whatever ``self.client.post`` returns (an HTTP
        response object), not a parsed dict, so it is annotated as ``Any``.
        """
        return self.client.post(
            "/generate",
            json={"prompt": prompt},
            headers=self.headers
        )
5. 故障恢复
熔断器模式
from circuitbreaker import circuit
@circuit(failure_threshold=5, recovery_timeout=60)
def call_llm_api(prompt: str) -> str:
    """Call the LLM client behind a circuit breaker.

    The breaker is configured with ``failure_threshold=5`` and
    ``recovery_timeout=60``.  Failures are logged and then re-raised
    unchanged so the decorator sees them.
    """
    try:
        result = llm_client.generate(prompt)
    except Exception as err:
        logger.error(f"LLM API call failed: {err}")
        raise
    else:
        return result
降级策略
from typing import Optional
class LLMService:
    """LLM front end that degrades to canned replies instead of raising.

    ``generate`` relies on an awaitable ``self._call_api(prompt)`` hook that
    is not defined in this class; it is expected to be supplied elsewhere
    (for example by a subclass).
    """

    def __init__(self):
        # Canned replies returned when the real API call cannot complete.
        self.fallback_responses = {
            "error": "服务暂时不可用,请稍后重试",
            "timeout": "请求处理时间过长,请简化您的提示词"
        }

    async def generate(
        self,
        prompt: str,
        timeout: float = 10.0
    ) -> str:
        """Return the API result for ``prompt``, or a fallback message.

        Args:
            prompt: Prompt text forwarded to ``self._call_api``.
            timeout: Maximum number of seconds to wait for the API call.

        Returns:
            The API result on success, the ``"timeout"`` fallback when the
            call exceeds ``timeout``, or the ``"error"`` fallback when the
            call raises any other exception.
        """
        # Imported locally so the method does not depend on module-level
        # imports that this snippet does not show.
        import asyncio
        import logging

        try:
            return await asyncio.wait_for(
                self._call_api(prompt),
                timeout=timeout
            )
        except asyncio.TimeoutError:
            return self.fallback_responses["timeout"]
        except Exception:
            # Still degrade gracefully, but leave a trace of the failure
            # instead of discarding it silently.
            logging.getLogger(__name__).exception(
                "LLM API call failed; returning fallback response"
            )
            return self.fallback_responses["error"]
6. 最佳实践清单
监控设置
- 设置基础监控指标
- 配置告警规则
- 实现健康检查
- 设置日志聚合
性能优化
- 实现缓存策略
- 优化响应时间
- 减少资源消耗
- 实现请求限流
扩展配置
- 配置自动扩展
- 实现负载均衡
- 设置资源限制
- 优化数据存储
故障处理
- 实现熔断机制
- 设置重试策略
- 准备降级方案
- 制定恢复流程