26.2 异步子Agent任务
约 1387 字大约 5 分钟
异步子Agent任务概述
异步子Agent任务是Claude Code中处理长时间运行任务的重要机制。通过异步执行,子Agent可以在后台执行任务,主Agent可以继续处理其他请求,从而提高整体效率和响应速度。
异步任务的基本概念
1. 同步 vs 异步
示例:同步 vs 异步
def process_task(task):
    """Run the three-stage pipeline synchronously and return the final result.

    Each stage blocks until it finishes and consumes the output of the
    previous stage, so the total time is the sum of the three stages.

    NOTE(review): ``agent1``, ``agent2`` and ``agent3`` are not defined in
    this snippet; they are presumably agent objects exposing ``execute`` --
    confirm against the surrounding project.
    """
    result1 = agent1.execute(task)
    result2 = agent2.execute(result1)
    result3 = agent3.execute(result2)
    return result3
# 执行时间: 10 + 15 + 20 = 45秒

异步执行:
async def process_task_async(task):
    """Run the three-stage pipeline with awaits and return the final result.

    The stages are awaited one after another because each stage needs the
    previous stage's output, so this function does not run the stages in
    parallel. What it gains over the synchronous version is that every
    ``await`` hands control back to the event loop, which can run other
    coroutines while a stage is pending.

    NOTE(review): ``agent1``, ``agent2`` and ``agent3`` are not defined in
    this snippet; they are presumably agent objects exposing
    ``execute_async`` -- confirm against the surrounding project.
    """
    result1 = await agent1.execute_async(task)
    result2 = await agent2.execute_async(result1)
    result3 = await agent3.execute_async(result2)
    return result3
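# 执行时间: 仍约为 45 秒——三个步骤前后依赖,只能依次 await;区别在于等待期间事件循环可以处理其他任务。
# 只有当子任务彼此独立、使用 asyncio.gather 并发执行时,总耗时才约为 20 秒(取最慢的一个)。

2. 异步任务的优势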
| 特性 | 同步任务 | 异步任务 |
|---|---|---|
| 执行方式 | 顺序执行 | 并发执行 |
| 响应时间 | 长 | 短 |
| 资源利用率 | 低 | 高 |
| 用户体验 | 差 | 好 |
| 复杂度 | 低 | 中 |
异步子Agent实现
1. 基础异步子Agent
示例:基础异步子Agent
用户请求: "创建一个异步代码生成子Agent"
Claude Code 生成的代码:
import asyncio
from typing import Dict, Any
import logging
logger = logging.getLogger(__name__)
class AsyncCodeGenerationAgent:
    """Background sub-agent that generates code for queued tasks.

    Tasks are submitted with ``submit_task``, handled one at a time in FIFO
    order by a single background coroutine, and their outcomes are stored in
    ``results`` keyed by task id. A task that raises is recorded with
    ``status == 'failed'`` so that ``get_result`` never waits on it forever.
    """

    # Queue marker that stop() uses to wake an idle processor immediately
    # instead of waiting for its poll timeout. It is never treated as a task.
    _WAKE = object()

    def __init__(self, context, simulated_delay: float = 2.0):
        """Create an idle agent.

        Args:
            context: Opaque value stored on the instance as ``context``.
            simulated_delay: Seconds ``_generate_code`` sleeps to imitate a
                long-running job. Defaults to the original 2 seconds.
        """
        self.context = context
        self.task_queue = asyncio.Queue()
        self.results = {}
        self.running = False
        self.simulated_delay = simulated_delay
        # The event loop keeps only weak references to tasks, so the agent
        # must hold the processor task itself to keep it alive.
        self._processor = None

    async def start(self):
        """Start the background processor; a no-op if it is already running."""
        if self._processor is not None and not self._processor.done():
            return
        self.running = True
        logger.info("AsyncCodeGenerationAgent started")
        self._processor = asyncio.create_task(self._process_tasks())

    async def stop(self):
        """Stop the processor and wait until it has actually exited.

        A task that is already being processed is allowed to finish; tasks
        still waiting in the queue are left there.
        """
        self.running = False
        processor, self._processor = self._processor, None
        if processor is not None:
            await self.task_queue.put(self._WAKE)
            await processor
        logger.info("AsyncCodeGenerationAgent stopped")

    async def submit_task(self, task_id: str, task: Dict[str, Any]):
        """Queue ``task`` for processing under the identifier ``task_id``."""
        await self.task_queue.put((task_id, task))
        logger.info("Task %s submitted", task_id)

    async def get_result(
        self, task_id: str, timeout: "float | None" = None
    ) -> Dict[str, Any]:
        """Wait for the outcome of ``task_id`` and return it.

        Args:
            task_id: Identifier passed to ``submit_task``.
            timeout: Maximum seconds to wait. ``None`` (the default) waits
                indefinitely, as the original implementation did.

        Returns:
            The result dict stored for the task; check its ``'status'`` key
            (``'completed'`` or ``'failed'``).

        Raises:
            TimeoutError: If ``timeout`` elapses before a result is stored.
        """
        loop = asyncio.get_running_loop()
        deadline = None if timeout is None else loop.time() + timeout
        while task_id not in self.results:
            if deadline is not None and loop.time() > deadline:
                raise TimeoutError(f"Task {task_id} timeout after {timeout}s")
            await asyncio.sleep(0.1)
        return self.results[task_id]

    async def _process_tasks(self):
        """Consume the queue until ``running`` becomes false."""
        while self.running:
            try:
                # Poll with a timeout so a cleared ``running`` flag is
                # noticed even when nothing arrives on the queue.
                item = await asyncio.wait_for(
                    self.task_queue.get(),
                    timeout=1.0,
                )
            except asyncio.TimeoutError:
                continue

            if item is self._WAKE:
                continue

            task_id, task = item
            logger.info("Processing task %s", task_id)
            try:
                # Hand the queue id to the generator so the result carries it.
                result = await self._generate_code({**task, 'task_id': task_id})
            except Exception as e:
                logger.error("Error processing task: %s", e)
                result = {
                    'error': str(e),
                    'status': 'failed',
                    'task_id': task_id,
                }
            else:
                logger.info("Task %s completed", task_id)
            self.results[task_id] = result

    async def _generate_code(self, task: Dict[str, Any]) -> Dict[str, Any]:
        """Produce a stub function for ``task`` after a simulated delay.

        Args:
            task: Must contain ``'name'`` and ``'function_name'``; an optional
                ``'task_id'`` is echoed back in the result.

        Raises:
            KeyError: If ``'name'`` or ``'function_name'`` is missing.
        """
        await asyncio.sleep(self.simulated_delay)
        code = (
            "\n"
            f"# Generated code for {task['name']}\n"
            f"def {task['function_name']}():\n"
            "    pass\n"
        )
        return {
            'code': code,
            'status': 'completed',
            'task_id': task.get('task_id'),
        }
# 使用示例
async def main():
    """Demonstrate the agent: submit three tasks and print each status."""
    agent = AsyncCodeGenerationAgent({})
    await agent.start()
    try:
        task_numbers = (1, 2, 3)

        for number in task_numbers:
            await agent.submit_task(
                f'task{number}',
                {'name': f'Task {number}', 'function_name': f'func{number}'},
            )

        # Collect every result before printing, in submission order.
        results = [
            await agent.get_result(f'task{number}') for number in task_numbers
        ]
        for number, result in zip(task_numbers, results):
            print(f"Task {number} result:", result['status'])
    finally:
        # Stop the agent even if a step above fails.
        await agent.stop()
if __name__ == '__main__':
    asyncio.run(main())

异步子Agent特点:
- 使用asyncio实现异步处理
- 任务队列管理
- 结果存储和检索
- 后台任务处理
2. 高级异步子Agent
示例:高级异步子Agent
用户请求: "创建一个支持任务优先级和超时的异步子Agent"
Claude Code 生成的代码:
import asyncio
import heapq
import logging
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional
logger = logging.getLogger(__name__)
class AsyncPriorityAgent:
    """Asynchronous sub-agent with task priorities and per-task timeouts.

    Submitted tasks are kept in a heap guarded by ``task_lock``. A pool of
    ``max_workers`` worker coroutines pops the highest-priority task first
    (tasks with equal priority run in submission order), executes it, and
    stores the outcome in ``results`` keyed by task id. Counters for each
    outcome are kept in ``task_stats``.
    """

    def __init__(self, context, max_workers=4, delay_scale: float = 1.0):
        """Create an idle agent.

        Args:
            context: Opaque value stored on the instance as ``context``.
            max_workers: Number of worker coroutines created by ``start``.
            delay_scale: Multiplier applied to the simulated task durations;
                1.0 keeps the original timings, 0 removes the delays.
        """
        self.context = context
        self.task_queue = []
        self.task_lock = asyncio.Lock()
        self.results = {}
        self.running = False
        self.max_workers = max_workers
        self.workers = []
        self.delay_scale = delay_scale
        # Tie-breaker for heap entries so that two tasks with the same
        # priority never fall through to comparing their dict payloads.
        self._sequence = 0
        self.task_stats = {
            'submitted': 0,
            'completed': 0,
            'failed': 0,
            'timeout': 0,
        }

    async def start(self):
        """Start the worker pool; a no-op if the agent is already running."""
        if self.running:
            # A second start would otherwise exceed max_workers.
            return
        self.running = True
        logger.info(
            "AsyncPriorityAgent started with %s workers", self.max_workers
        )
        for i in range(self.max_workers):
            worker = asyncio.create_task(self._worker(f"worker-{i}"))
            self.workers.append(worker)

    async def stop(self):
        """Signal the workers to stop and wait until all of them exit.

        Workers finish the task they are currently executing; tasks still in
        the queue are left there.
        """
        self.running = False
        workers, self.workers = self.workers, []
        await asyncio.gather(*workers, return_exceptions=True)
        logger.info("AsyncPriorityAgent stopped")

    async def submit_task(
        self,
        task_id: str,
        task: Dict[str, Any],
        priority: int = 0,
        timeout: Optional[float] = None
    ):
        """Queue a task.

        Args:
            task_id: Key under which the outcome is stored in ``results``.
            task: Task payload; its optional ``'type'`` selects the handler.
            priority: Larger values are executed first.
            timeout: Maximum execution time in seconds, or ``None`` for no
                limit.
        """
        task_data = {
            'task_id': task_id,
            'task': task,
            'priority': priority,
            'timeout': timeout,
            'submitted_at': datetime.now(timezone.utc),
            'status': 'pending',
        }
        async with self.task_lock:
            # heapq is a min-heap, so the priority is negated.
            heapq.heappush(
                self.task_queue, (-priority, self._sequence, task_data)
            )
            self._sequence += 1
            self.task_stats['submitted'] += 1
        logger.info("Task %s submitted with priority %s", task_id, priority)

    async def get_result(self, task_id: str, timeout: float = 30.0) -> Dict[str, Any]:
        """Wait for the outcome of ``task_id`` and return it.

        Args:
            task_id: Identifier passed to ``submit_task``.
            timeout: Maximum seconds to wait for an outcome to appear.

        Returns:
            The stored outcome dict; its ``'status'`` is ``'completed'``,
            ``'timeout'`` or ``'failed'``.

        Raises:
            TimeoutError: If no outcome is stored within ``timeout`` seconds.
        """
        # Measure the wait on the loop's monotonic clock so wall-clock
        # adjustments cannot shorten or extend it.
        loop = asyncio.get_running_loop()
        started = loop.time()
        while True:
            if task_id in self.results:
                return self.results[task_id]
            if loop.time() - started > timeout:
                raise TimeoutError(f"Task {task_id} timeout after {timeout}s")
            await asyncio.sleep(0.1)

    async def _worker(self, worker_name: str):
        """Worker loop: pop and run tasks until ``running`` becomes false."""
        logger.info("%s started", worker_name)
        while self.running:
            try:
                task_data = await self._get_task()
                if task_data is None:
                    # Queue is empty; poll again shortly.
                    await asyncio.sleep(0.1)
                    continue
                await self._run_task(worker_name, task_data)
            except Exception as e:
                logger.error("%s error: %s", worker_name, e)
                await asyncio.sleep(1)
        logger.info("%s stopped", worker_name)

    async def _run_task(self, worker_name: str, task_data: Dict[str, Any]):
        """Execute one task and record its outcome and statistics."""
        task_id = task_data['task_id']
        timeout = task_data.get('timeout')
        logger.info("%s processing task %s", worker_name, task_id)
        try:
            # ``timeout=None`` makes wait_for wait without a limit, so a
            # timeout of 0 is honoured rather than treated as "no timeout".
            result = await asyncio.wait_for(
                self._execute_task(task_data['task']),
                timeout=timeout,
            )
        except asyncio.TimeoutError:
            outcome = {
                'error': 'Task timeout',
                'status': 'timeout',
                'worker': worker_name,
            }
            logger.warning("%s task %s timeout", worker_name, task_id)
        except Exception as e:
            outcome = {
                'error': str(e),
                'status': 'failed',
                'worker': worker_name,
            }
            logger.error("%s task %s failed: %s", worker_name, task_id, e)
        else:
            outcome = {
                'result': result,
                'status': 'completed',
                'worker': worker_name,
                'completed_at': datetime.now(timezone.utc),
            }
            logger.info("%s completed task %s", worker_name, task_id)
        self.results[task_id] = outcome
        # The status names match the counter keys in task_stats.
        self.task_stats[outcome['status']] += 1

    async def _get_task(self) -> Optional[Dict[str, Any]]:
        """Pop the highest-priority task, or return ``None`` if none is queued."""
        async with self.task_lock:
            if not self.task_queue:
                return None
            _, _, task_data = heapq.heappop(self.task_queue)
            task_data['status'] = 'processing'
            return task_data

    async def _execute_task(self, task: Dict[str, Any]) -> Any:
        """Dispatch ``task`` to the handler selected by its ``'type'`` key.

        Unknown or missing types go to ``_default_task``.
        """
        handlers = {
            'code_generation': self._generate_code,
            'code_review': self._review_code,
            'test_generation': self._generate_tests,
        }
        handler = handlers.get(task.get('type', 'default'), self._default_task)
        return await handler(task)

    async def _generate_code(self, task: Dict[str, Any]) -> str:
        """Return a stub function for ``task`` after a simulated delay.

        ``'function_name'`` falls back to ``generated_function`` when the
        task does not provide one.

        Raises:
            KeyError: If ``'name'`` is missing from ``task``.
        """
        await asyncio.sleep(2 * self.delay_scale)
        function_name = task.get('function_name', 'generated_function')
        return (
            "\n"
            f"# Generated code for {task['name']}\n"
            f"def {function_name}():\n"
            "    pass\n"
        )

    async def _review_code(self, task: Dict[str, Any]) -> Dict[str, Any]:
        """Return an empty review report after a simulated delay."""
        await asyncio.sleep(1.5 * self.delay_scale)
        return {
            'issues': [],
            'suggestions': [],
            'metrics': {},
        }

    async def _generate_tests(self, task: Dict[str, Any]) -> str:
        """Return a stub unittest module after a simulated delay."""
        await asyncio.sleep(1 * self.delay_scale)
        return (
            "\n"
            "import unittest\n"
            "class TestGeneratedCode(unittest.TestCase):\n"
            "    pass\n"
        )

    async def _default_task(self, task: Dict[str, Any]) -> Any:
        """Handle tasks with no recognised type after a simulated delay."""
        await asyncio.sleep(1 * self.delay_scale)
        return {'result': 'completed'}

    def get_stats(self) -> Dict[str, Any]:
        """Return a snapshot of counters, queue length and live worker count."""
        return {
            'stats': self.task_stats.copy(),
            'queue_size': len(self.task_queue),
            'active_workers': sum(
                1 for worker in self.workers if not worker.done()
            ),
        }
# 使用示例
async def main():
    """Demonstrate priorities and timeouts: submit four tasks, print each status.

    NOTE(review): the original example is cut off after the fourth
    ``get_result`` call. The fourth block below is completed by following
    the pattern of the first three, and the agent is stopped at the end;
    confirm against the original tutorial whether more steps followed.
    """
    agent = AsyncPriorityAgent({}, max_workers=4)
    await agent.start()
    try:
        # Code-generation tasks carry an explicit function name so that the
        # generator has everything it needs.
        await agent.submit_task(
            'task1',
            {'name': 'Low Priority', 'type': 'code_generation',
             'function_name': 'low_priority_task'},
            priority=1,
        )
        await agent.submit_task(
            'task2',
            {'name': 'High Priority', 'type': 'code_review'},
            priority=10,
        )
        await agent.submit_task(
            'task3',
            {'name': 'Medium Priority', 'type': 'test_generation'},
            priority=5,
        )
        await agent.submit_task(
            'task4',
            {'name': 'Timeout Task', 'type': 'code_generation',
             'function_name': 'timeout_task'},
            priority=8,
            timeout=1.0,
        )

        for number in range(1, 5):
            try:
                result = await agent.get_result(f'task{number}')
                print(f"Task {number} result:", result['status'])
            except TimeoutError as e:
                print(f"Task {number} error: {e}")
    finally:
        # Stop the workers even if a step above fails.
        await agent.stop()
if __name__ == '__main__':
    asyncio.run(main())
任务监控特点:
- 实时状态跟踪
- 任务统计
- 事件记录
- 状态查询