1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
| import asyncio import time import threading from dataclasses import dataclass from typing import Dict, List, Optional import logging
@dataclass
class ToolCallRecord:
    """One tool invocation as tracked by the execution monitor.

    A record is created when a call starts; ``end_time``, ``output_data``
    and ``error`` remain ``None`` until the call is closed out.
    """

    tool_name: str  # name of the tool that was invoked
    start_time: float  # start timestamp, in seconds
    end_time: Optional[float]  # end timestamp; None while still running
    input_data: str  # input payload (the monitor stores a truncated copy)
    output_data: Optional[str]  # tool output text, if any
    error: Optional[str]  # error message when the call failed, else None
    thread_id: int  # identifier of the thread that started the call
class AgentExecutionMonitor:
    """Agent execution monitor.

    Tracks tool calls that are in flight and those that have finished, so
    that stuck calls and same-tool concurrency can be inspected via
    :meth:`analyze_deadlock_patterns`. Every mutation of the shared call
    state happens under ``self.lock``.
    """

    def __init__(self):
        # call_id -> record, for calls that have started but not yet ended.
        self.active_calls: Dict[str, ToolCallRecord] = {}
        # Finished calls in completion order.
        # NOTE(review): this list grows for the monitor's whole lifetime;
        # long-running agents may want to trim it periodically.
        self.completed_calls: List[ToolCallRecord] = []
        # Incrementing sequence number embedded in call IDs for uniqueness.
        self.call_counter = 0
        self.lock = threading.Lock()

    def start_tool_call(self, tool_name: str, input_data: str) -> str:
        """Record the start of a tool call.

        Args:
            tool_name: Name of the tool being invoked.
            input_data: Tool input; only the first 200 characters are kept.

        Returns:
            The call ID to pass to :meth:`end_tool_call` when the call ends.
        """
        # NOTE: timestamps use wall-clock time.time(), so computed durations
        # can be skewed if the system clock is adjusted mid-call.
        now = time.time()
        record = ToolCallRecord(
            tool_name=tool_name,
            start_time=now,
            end_time=None,
            input_data=input_data[:200],
            output_data=None,
            error=None,
            thread_id=threading.get_ident(),
        )
        with self.lock:
            # Read-and-increment of the counter must be atomic: done outside
            # the lock, two threads starting the same tool in the same
            # millisecond could build identical IDs and one active record
            # would silently overwrite the other.
            call_id = f"{tool_name}_{self.call_counter}_{int(now * 1000)}"
            self.call_counter += 1
            self.active_calls[call_id] = record
        logging.info("工具调用开始: %s (ID: %s)", tool_name, call_id)
        return call_id

    def end_tool_call(self, call_id: str, output_data: Optional[str] = None,
                      error: Optional[str] = None) -> None:
        """Record the end of a tool call and move it to ``completed_calls``.

        Args:
            call_id: ID previously returned by :meth:`start_tool_call`.
            output_data: Tool output; only the first 200 characters are kept.
            error: Error message if the call failed, else ``None``.

        An unknown or already-ended ``call_id`` is ignored with a warning
        rather than raising, so a duplicate end notification cannot crash
        the caller.
        """
        with self.lock:
            record = self.active_calls.pop(call_id, None)
            if record is not None:
                record.end_time = time.time()
                # `is not None` so an empty-string result is stored as ""
                # instead of being conflated with "no output".
                record.output_data = (
                    output_data[:200] if output_data is not None else None
                )
                record.error = error
                self.completed_calls.append(record)

        # Logging happens outside the lock to keep the critical section short.
        if record is None:
            logging.warning("工具调用结束: 未找到调用 (ID: %s)", call_id)
            return
        duration = record.end_time - record.start_time
        logging.info("工具调用结束: %s (耗时: %.2fs)", record.tool_name, duration)

    def get_active_calls(self) -> List["ToolCallRecord"]:
        """Return a snapshot list of the currently active tool calls."""
        with self.lock:
            return list(self.active_calls.values())

    def analyze_deadlock_patterns(self, long_running_threshold: float = 30.0) -> Dict:
        """Summarise the active calls to help spot hangs and contention.

        This is a heuristic snapshot, not real deadlock detection. It flags
        calls older than ``long_running_threshold`` seconds and tools that
        currently have more than one active call.

        Args:
            long_running_threshold: Age in seconds above which an active call
                counts as long-running (defaults to 30, the previous
                hard-coded value).

        Returns:
            A dict with keys ``active_calls_count``, ``long_running_calls``,
            ``long_running_details`` (tool / duration / thread_id per slow
            call), ``tool_patterns`` (tool name -> list of active records)
            and ``potential_cycles`` (tools with concurrent active calls).
        """
        active_calls = self.get_active_calls()
        current_time = time.time()

        long_running_calls = [
            call for call in active_calls
            if current_time - call.start_time > long_running_threshold
        ]

        # Group active calls by tool name, preserving insertion order.
        tool_patterns: Dict[str, List[ToolCallRecord]] = {}
        for call in active_calls:
            tool_patterns.setdefault(call.tool_name, []).append(call)

        # The same tool being in flight more than once at a time is reported
        # as a possible cycle / re-entrancy.
        potential_cycles = [
            {
                'tool': tool_name,
                'concurrent_calls': len(calls),
                'threads': [call.thread_id for call in calls],
            }
            for tool_name, calls in tool_patterns.items()
            if len(calls) > 1
        ]

        return {
            'active_calls_count': len(active_calls),
            'long_running_calls': len(long_running_calls),
            'long_running_details': [
                {
                    'tool': call.tool_name,
                    'duration': current_time - call.start_time,
                    'thread_id': call.thread_id,
                }
                for call in long_running_calls
            ],
            'tool_patterns': tool_patterns,
            'potential_cycles': potential_cycles,
        }
class MonitoredBaseTool:
    """Base class for tools whose executions are reported to a monitor.

    Subclasses wrap their real coroutine with :meth:`_run_with_monitoring`
    so every call is registered on the monitor and always closed out,
    whether it returns, raises, or is cancelled.
    """

    def __init__(self, monitor: "AgentExecutionMonitor"):
        self.monitor = monitor

    async def _run_with_monitoring(self, tool_name: str, input_data: str, actual_func):
        """Await ``actual_func(input_data)`` while recording it on the monitor.

        Args:
            tool_name: Name under which the call is recorded.
            input_data: Value passed to ``actual_func`` and recorded as input.
            actual_func: Async callable invoked as ``await actual_func(input_data)``.

        Returns:
            Whatever ``actual_func`` returns. Its ``str()`` form is recorded
            as the call output.

        Raises:
            Re-raises anything ``actual_func`` raises (including
            ``asyncio.CancelledError``) after recording it as an error.
        """
        call_id = self.monitor.start_tool_call(tool_name, input_data)
        try:
            result = await actual_func(input_data)
            self.monitor.end_tool_call(call_id, str(result))
            return result
        except BaseException as exc:
            # BaseException, not Exception: asyncio.CancelledError (a
            # BaseException since Python 3.8), KeyboardInterrupt, etc. would
            # otherwise leave the call in the monitor's active set forever,
            # where it would later be misreported as a long-running call.
            # Some exceptions (e.g. CancelledError) have an empty str(), so
            # fall back to the type name to keep the error non-empty.
            self.monitor.end_tool_call(call_id, error=str(exc) or type(exc).__name__)
            raise
|