工具集成:扩展智能体能力

Header Image

工具集成概述 🛠️

工具集成是扩展智能体能力的关键方法。通过集成外部工具和API,智能体可以执行更多复杂的任务,如文件操作、网络请求、数据处理等。

核心概念

  1. 工具抽象:统一的工具调用接口
  2. 能力扩展:集成外部功能
  3. 错误处理:异常管理机制
  4. 性能优化:高效调用策略

工具框架 🏗️

1. 基础架构

from typing import List, Dict, Any, Optional
from abc import ABC, abstractmethod
import asyncio
import json
import logging

class Tool(ABC):
    def __init__(
        self,
        name: str,
        description: str
    ):
        self.name = name
        self.description = description
        self.logger = logging.getLogger(name)
        
    @abstractmethod
    async def execute(
        self,
        params: Dict[str, Any]
    ) -> Dict[str, Any]:
        """执行工具调用"""
        pass
        
    def validate_params(
        self,
        params: Dict[str, Any]
    ) -> bool:
        """验证参数"""
        return True

class ToolRegistry:
    def __init__(self):
        self.tools = {}
        
    def register_tool(
        self,
        tool: Tool
    ):
        """注册工具"""
        self.tools[tool.name] = tool
        
    def get_tool(
        self,
        name: str
    ) -> Optional[Tool]:
        """获取工具"""
        return self.tools.get(name)
        
    def list_tools(self) -> List[Dict[str, str]]:
        """列出所有工具"""
        return [
            {
                'name': tool.name,
                'description': tool.description
            }
            for tool in self.tools.values()
        ]

2. 工具管理

class ToolManager:
    def __init__(self):
        self.registry = ToolRegistry()
        self.execution_history = []
        
    async def execute_tool(
        self,
        tool_name: str,
        params: Dict[str, Any]
    ) -> Dict[str, Any]:
        """执行工具调用"""
        tool = self.registry.get_tool(tool_name)
        
        if not tool:
            raise ValueError(f"未知的工具: {tool_name}")
            
        # 验证参数
        if not tool.validate_params(params):
            raise ValueError(f"无效的参数: {params}")
            
        try:
            # 执行工具
            result = await tool.execute(params)
            
            # 记录执行历史
            self.execution_history.append({
                'tool': tool_name,
                'params': params,
                'result': result,
                'status': 'success'
            })
            
            return result
            
        except Exception as e:
            # 记录错误
            self.execution_history.append({
                'tool': tool_name,
                'params': params,
                'error': str(e),
                'status': 'error'
            })
            
            raise

工具示例 🔧

1. 文件操作

class FileSystemTool(Tool):
    def __init__(self):
        super().__init__(
            "file_system",
            "文件系统操作工具"
        )
        
    async def execute(
        self,
        params: Dict[str, Any]
    ) -> Dict[str, Any]:
        """执行文件操作"""
        operation = params.get('operation')
        path = params.get('path')
        
        if operation == 'read':
            return await self._read_file(path)
        elif operation == 'write':
            content = params.get('content')
            return await self._write_file(path, content)
        elif operation == 'delete':
            return await self._delete_file(path)
        else:
            raise ValueError(f"不支持的操作: {operation}")
            
    async def _read_file(
        self,
        path: str
    ) -> Dict[str, Any]:
        """读取文件"""
        try:
            async with aiofiles.open(path, 'r') as f:
                content = await f.read()
                return {
                    'success': True,
                    'content': content
                }
        except Exception as e:
            return {
                'success': False,
                'error': str(e)
            }
            
    async def _write_file(
        self,
        path: str,
        content: str
    ) -> Dict[str, Any]:
        """写入文件"""
        try:
            async with aiofiles.open(path, 'w') as f:
                await f.write(content)
                return {
                    'success': True
                }
        except Exception as e:
            return {
                'success': False,
                'error': str(e)
            }
            
    async def _delete_file(
        self,
        path: str
    ) -> Dict[str, Any]:
        """删除文件"""
        try:
            os.remove(path)
            return {
                'success': True
            }
        except Exception as e:
            return {
                'success': False,
                'error': str(e)
            }

2. API调用

class APITool(Tool):
    def __init__(
        self,
        base_url: str,
        headers: Optional[Dict[str, str]] = None
    ):
        super().__init__(
            "api_client",
            "API调用工具"
        )
        self.base_url = base_url
        self.headers = headers or {}
        self.session = aiohttp.ClientSession(
            headers=self.headers
        )
        
    async def execute(
        self,
        params: Dict[str, Any]
    ) -> Dict[str, Any]:
        """执行API调用"""
        method = params.get('method', 'GET')
        endpoint = params.get('endpoint')
        data = params.get('data')
        
        url = f"{self.base_url}{endpoint}"
        
        try:
            async with self.session.request(
                method,
                url,
                json=data
            ) as response:
                result = await response.json()
                return {
                    'success': True,
                    'status_code': response.status,
                    'data': result
                }
        except Exception as e:
            return {
                'success': False,
                'error': str(e)
            }
            
    async def close(self):
        """关闭会话"""
        await self.session.close()

错误处理 🚨

1. 重试机制

class RetryHandler:
    def __init__(
        self,
        max_retries: int = 3,
        delay: float = 1.0
    ):
        self.max_retries = max_retries
        self.delay = delay
        
    async def execute_with_retry(
        self,
        func: callable,
        *args,
        **kwargs
    ) -> Any:
        """执行带重试的操作"""
        retries = 0
        last_error = None
        
        while retries < self.max_retries:
            try:
                return await func(*args, **kwargs)
            except Exception as e:
                last_error = e
                retries += 1
                
                if retries < self.max_retries:
                    await asyncio.sleep(
                        self.delay * retries
                    )
                    
        raise last_error

class RetryableTool(Tool):
    def __init__(
        self,
        name: str,
        description: str,
        max_retries: int = 3
    ):
        super().__init__(name, description)
        self.retry_handler = RetryHandler(max_retries)
        
    async def execute(
        self,
        params: Dict[str, Any]
    ) -> Dict[str, Any]:
        """执行带重试的工具调用"""
        return await self.retry_handler.execute_with_retry(
            self._execute_internal,
            params
        )
        
    @abstractmethod
    async def _execute_internal(
        self,
        params: Dict[str, Any]
    ) -> Dict[str, Any]:
        """实际的执行逻辑"""
        pass

2. 错误恢复

class ErrorRecovery:
    def __init__(self):
        self.error_handlers = {}
        
    def register_handler(
        self,
        error_type: type,
        handler: callable
    ):
        """注册错误处理器"""
        self.error_handlers[error_type] = handler
        
    async def handle_error(
        self,
        error: Exception,
        context: Dict[str, Any]
    ) -> Optional[Dict[str, Any]]:
        """处理错误"""
        for error_type, handler in self.error_handlers.items():
            if isinstance(error, error_type):
                return await handler(error, context)
        return None

class RecoverableTool(RetryableTool):
    def __init__(
        self,
        name: str,
        description: str
    ):
        super().__init__(name, description)
        self.error_recovery = ErrorRecovery()
        
        # 注册默认错误处理器
        self.error_recovery.register_handler(
            ValueError,
            self._handle_value_error
        )
        self.error_recovery.register_handler(
            IOError,
            self._handle_io_error
        )
        
    async def execute(
        self,
        params: Dict[str, Any]
    ) -> Dict[str, Any]:
        """执行带错误恢复的工具调用"""
        try:
            return await super().execute(params)
        except Exception as e:
            # 尝试恢复
            recovery_result = await self.error_recovery.handle_error(
                e,
                {'params': params}
            )
            
            if recovery_result:
                return recovery_result
            raise
            
    async def _handle_value_error(
        self,
        error: ValueError,
        context: Dict[str, Any]
    ) -> Optional[Dict[str, Any]]:
        """处理参数错误"""
        params = context['params']
        # 实现参数修正逻辑
        return None
        
    async def _handle_io_error(
        self,
        error: IOError,
        context: Dict[str, Any]
    ) -> Optional[Dict[str, Any]]:
        """处理IO错误"""
        # 实现IO错误恢复逻辑
        return None

性能优化 ⚡

1. 并行执行

class ParallelExecutor:
    def __init__(
        self,
        max_workers: int = 5
    ):
        self.max_workers = max_workers
        self.semaphore = asyncio.Semaphore(max_workers)
        
    async def execute_parallel(
        self,
        tasks: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """并行执行多个任务"""
        async def wrapped_task(task):
            async with self.semaphore:
                return await task
                
        # 创建任务
        coroutines = [
            wrapped_task(task)
            for task in tasks
        ]
        
        # 并行执行
        results = await asyncio.gather(
            *coroutines,
            return_exceptions=True
        )
        
        return results

class OptimizedToolManager(ToolManager):
    def __init__(
        self,
        max_workers: int = 5
    ):
        super().__init__()
        self.parallel_executor = ParallelExecutor(
            max_workers
        )
        
    async def execute_batch(
        self,
        tasks: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """批量执行工具调用"""
        # 准备任务
        coroutines = [
            self.execute_tool(
                task['tool'],
                task['params']
            )
            for task in tasks
        ]
        
        # 并行执行
        return await self.parallel_executor.execute_parallel(
            coroutines
        )

2. 缓存优化

class CacheManager:
    def __init__(
        self,
        ttl: int = 3600
    ):
        self.cache = {}
        self.ttl = ttl
        
    def get(
        self,
        key: str
    ) -> Optional[Any]:
        """获取缓存"""
        if key in self.cache:
            entry = self.cache[key]
            if time.time() - entry['timestamp'] < self.ttl:
                return entry['value']
            else:
                del self.cache[key]
        return None
        
    def set(
        self,
        key: str,
        value: Any
    ):
        """设置缓存"""
        self.cache[key] = {
            'value': value,
            'timestamp': time.time()
        }
        
class CachedTool(Tool):
    def __init__(
        self,
        name: str,
        description: str,
        ttl: int = 3600
    ):
        super().__init__(name, description)
        self.cache = CacheManager(ttl)
        
    async def execute(
        self,
        params: Dict[str, Any]
    ) -> Dict[str, Any]:
        """执行带缓存的工具调用"""
        # 生成缓存键
        cache_key = self._generate_cache_key(params)
        
        # 检查缓存
        cached_result = self.cache.get(cache_key)
        if cached_result:
            return cached_result
            
        # 执行调用
        result = await super().execute(params)
        
        # 更新缓存
        self.cache.set(cache_key, result)
        
        return result
        
    def _generate_cache_key(
        self,
        params: Dict[str, Any]
    ) -> str:
        """生成缓存键"""
        return json.dumps(
            params,
            sort_keys=True
        )

最佳实践 ✨

  1. 工具设计

    • 保持接口简单
    • 实现参数验证
    • 提供详细文档
  2. 错误处理

    • 实现重试机制
    • 提供错误恢复
    • 记录详细日志
  3. 性能优化

    • 使用并行执行
    • 实现结果缓存
    • 控制资源使用
  4. 安全考虑

    • 验证输入数据
    • 控制访问权限
    • 保护敏感信息

小结 📝

本章我们学习了工具集成的核心内容:

  1. 基础框架

    • 工具抽象
    • 注册机制
    • 执行流程
  2. 实现技术

    • 文件操作
    • API调用
    • 错误处理
  3. 优化方法

    • 并行执行
    • 缓存机制
    • 资源控制

通过这些知识,我们可以为智能体集成各种外部工具,显著扩展其能力范围。在下一章中,我们将探讨如何使用MetaGPT框架开发更复杂的智能体系统。