工具集成:扩展智能体能力
工具集成概述 🛠️
工具集成是扩展智能体能力的关键方法。通过集成外部工具和API,智能体可以执行更多复杂的任务,如文件操作、网络请求、数据处理等。
核心概念
- 工具抽象:统一的工具调用接口
- 能力扩展:集成外部功能
- 错误处理:异常管理机制
- 性能优化:高效调用策略
工具框架 🏗️
1. 基础架构
from typing import List, Dict, Any, Optional
from abc import ABC, abstractmethod
import asyncio
import json
import logging
class Tool(ABC):
def __init__(
self,
name: str,
description: str
):
self.name = name
self.description = description
self.logger = logging.getLogger(name)
@abstractmethod
async def execute(
self,
params: Dict[str, Any]
) -> Dict[str, Any]:
"""执行工具调用"""
pass
def validate_params(
self,
params: Dict[str, Any]
) -> bool:
"""验证参数"""
return True
class ToolRegistry:
def __init__(self):
self.tools = {}
def register_tool(
self,
tool: Tool
):
"""注册工具"""
self.tools[tool.name] = tool
def get_tool(
self,
name: str
) -> Optional[Tool]:
"""获取工具"""
return self.tools.get(name)
def list_tools(self) -> List[Dict[str, str]]:
"""列出所有工具"""
return [
{
'name': tool.name,
'description': tool.description
}
for tool in self.tools.values()
]
2. 工具管理
class ToolManager:
def __init__(self):
self.registry = ToolRegistry()
self.execution_history = []
async def execute_tool(
self,
tool_name: str,
params: Dict[str, Any]
) -> Dict[str, Any]:
"""执行工具调用"""
tool = self.registry.get_tool(tool_name)
if not tool:
raise ValueError(f"未知的工具: {tool_name}")
# 验证参数
if not tool.validate_params(params):
raise ValueError(f"无效的参数: {params}")
try:
# 执行工具
result = await tool.execute(params)
# 记录执行历史
self.execution_history.append({
'tool': tool_name,
'params': params,
'result': result,
'status': 'success'
})
return result
except Exception as e:
# 记录错误
self.execution_history.append({
'tool': tool_name,
'params': params,
'error': str(e),
'status': 'error'
})
raise
工具示例 🔧
1. 文件操作
class FileSystemTool(Tool):
def __init__(self):
super().__init__(
"file_system",
"文件系统操作工具"
)
async def execute(
self,
params: Dict[str, Any]
) -> Dict[str, Any]:
"""执行文件操作"""
operation = params.get('operation')
path = params.get('path')
if operation == 'read':
return await self._read_file(path)
elif operation == 'write':
content = params.get('content')
return await self._write_file(path, content)
elif operation == 'delete':
return await self._delete_file(path)
else:
raise ValueError(f"不支持的操作: {operation}")
async def _read_file(
self,
path: str
) -> Dict[str, Any]:
"""读取文件"""
try:
async with aiofiles.open(path, 'r') as f:
content = await f.read()
return {
'success': True,
'content': content
}
except Exception as e:
return {
'success': False,
'error': str(e)
}
async def _write_file(
self,
path: str,
content: str
) -> Dict[str, Any]:
"""写入文件"""
try:
async with aiofiles.open(path, 'w') as f:
await f.write(content)
return {
'success': True
}
except Exception as e:
return {
'success': False,
'error': str(e)
}
async def _delete_file(
self,
path: str
) -> Dict[str, Any]:
"""删除文件"""
try:
os.remove(path)
return {
'success': True
}
except Exception as e:
return {
'success': False,
'error': str(e)
}
2. API调用
class APITool(Tool):
def __init__(
self,
base_url: str,
headers: Optional[Dict[str, str]] = None
):
super().__init__(
"api_client",
"API调用工具"
)
self.base_url = base_url
self.headers = headers or {}
self.session = aiohttp.ClientSession(
headers=self.headers
)
async def execute(
self,
params: Dict[str, Any]
) -> Dict[str, Any]:
"""执行API调用"""
method = params.get('method', 'GET')
endpoint = params.get('endpoint')
data = params.get('data')
url = f"{self.base_url}{endpoint}"
try:
async with self.session.request(
method,
url,
json=data
) as response:
result = await response.json()
return {
'success': True,
'status_code': response.status,
'data': result
}
except Exception as e:
return {
'success': False,
'error': str(e)
}
async def close(self):
"""关闭会话"""
await self.session.close()
错误处理 🚨
1. 重试机制
class RetryHandler:
def __init__(
self,
max_retries: int = 3,
delay: float = 1.0
):
self.max_retries = max_retries
self.delay = delay
async def execute_with_retry(
self,
func: callable,
*args,
**kwargs
) -> Any:
"""执行带重试的操作"""
retries = 0
last_error = None
while retries < self.max_retries:
try:
return await func(*args, **kwargs)
except Exception as e:
last_error = e
retries += 1
if retries < self.max_retries:
await asyncio.sleep(
self.delay * retries
)
raise last_error
class RetryableTool(Tool):
def __init__(
self,
name: str,
description: str,
max_retries: int = 3
):
super().__init__(name, description)
self.retry_handler = RetryHandler(max_retries)
async def execute(
self,
params: Dict[str, Any]
) -> Dict[str, Any]:
"""执行带重试的工具调用"""
return await self.retry_handler.execute_with_retry(
self._execute_internal,
params
)
@abstractmethod
async def _execute_internal(
self,
params: Dict[str, Any]
) -> Dict[str, Any]:
"""实际的执行逻辑"""
pass
2. 错误恢复
class ErrorRecovery:
def __init__(self):
self.error_handlers = {}
def register_handler(
self,
error_type: type,
handler: callable
):
"""注册错误处理器"""
self.error_handlers[error_type] = handler
async def handle_error(
self,
error: Exception,
context: Dict[str, Any]
) -> Optional[Dict[str, Any]]:
"""处理错误"""
for error_type, handler in self.error_handlers.items():
if isinstance(error, error_type):
return await handler(error, context)
return None
class RecoverableTool(RetryableTool):
def __init__(
self,
name: str,
description: str
):
super().__init__(name, description)
self.error_recovery = ErrorRecovery()
# 注册默认错误处理器
self.error_recovery.register_handler(
ValueError,
self._handle_value_error
)
self.error_recovery.register_handler(
IOError,
self._handle_io_error
)
async def execute(
self,
params: Dict[str, Any]
) -> Dict[str, Any]:
"""执行带错误恢复的工具调用"""
try:
return await super().execute(params)
except Exception as e:
# 尝试恢复
recovery_result = await self.error_recovery.handle_error(
e,
{'params': params}
)
if recovery_result:
return recovery_result
raise
async def _handle_value_error(
self,
error: ValueError,
context: Dict[str, Any]
) -> Optional[Dict[str, Any]]:
"""处理参数错误"""
params = context['params']
# 实现参数修正逻辑
return None
async def _handle_io_error(
self,
error: IOError,
context: Dict[str, Any]
) -> Optional[Dict[str, Any]]:
"""处理IO错误"""
# 实现IO错误恢复逻辑
return None
性能优化 ⚡
1. 并行执行
class ParallelExecutor:
def __init__(
self,
max_workers: int = 5
):
self.max_workers = max_workers
self.semaphore = asyncio.Semaphore(max_workers)
async def execute_parallel(
self,
tasks: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
"""并行执行多个任务"""
async def wrapped_task(task):
async with self.semaphore:
return await task
# 创建任务
coroutines = [
wrapped_task(task)
for task in tasks
]
# 并行执行
results = await asyncio.gather(
*coroutines,
return_exceptions=True
)
return results
class OptimizedToolManager(ToolManager):
def __init__(
self,
max_workers: int = 5
):
super().__init__()
self.parallel_executor = ParallelExecutor(
max_workers
)
async def execute_batch(
self,
tasks: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
"""批量执行工具调用"""
# 准备任务
coroutines = [
self.execute_tool(
task['tool'],
task['params']
)
for task in tasks
]
# 并行执行
return await self.parallel_executor.execute_parallel(
coroutines
)
2. 缓存优化
class CacheManager:
def __init__(
self,
ttl: int = 3600
):
self.cache = {}
self.ttl = ttl
def get(
self,
key: str
) -> Optional[Any]:
"""获取缓存"""
if key in self.cache:
entry = self.cache[key]
if time.time() - entry['timestamp'] < self.ttl:
return entry['value']
else:
del self.cache[key]
return None
def set(
self,
key: str,
value: Any
):
"""设置缓存"""
self.cache[key] = {
'value': value,
'timestamp': time.time()
}
class CachedTool(Tool):
def __init__(
self,
name: str,
description: str,
ttl: int = 3600
):
super().__init__(name, description)
self.cache = CacheManager(ttl)
async def execute(
self,
params: Dict[str, Any]
) -> Dict[str, Any]:
"""执行带缓存的工具调用"""
# 生成缓存键
cache_key = self._generate_cache_key(params)
# 检查缓存
cached_result = self.cache.get(cache_key)
if cached_result:
return cached_result
# 执行调用
result = await super().execute(params)
# 更新缓存
self.cache.set(cache_key, result)
return result
def _generate_cache_key(
self,
params: Dict[str, Any]
) -> str:
"""生成缓存键"""
return json.dumps(
params,
sort_keys=True
)
最佳实践 ✨
工具设计
- 保持接口简单
- 实现参数验证
- 提供详细文档
错误处理
- 实现重试机制
- 提供错误恢复
- 记录详细日志
性能优化
- 使用并行执行
- 实现结果缓存
- 控制资源使用
安全考虑
- 验证输入数据
- 控制访问权限
- 保护敏感信息
小结 📝
本章我们学习了工具集成的核心内容:
基础框架
- 工具抽象
- 注册机制
- 执行流程
实现技术
- 文件操作
- API调用
- 错误处理
优化方法
- 并行执行
- 缓存机制
- 资源控制
通过这些知识,我们可以为智能体集成各种外部工具,显著扩展其能力范围。在下一章中,我们将探讨如何使用MetaGPT框架开发更复杂的智能体系统。