1. 面试题目 #
在构建基于大型语言模型(LLM)的Agent时,您是否遇到过Agent陷入死循环的问题?请描述这种问题的典型场景、根本原因,并详细阐述如何从提示词设计、状态管理和兜底机制等多个维度来解决这类问题。同时,请解释为什么大型模型有时会出现这种"语境短视"的行为。
2. 参考答案 #
2.1 Agent 死循环问题的典型场景 #
在构建基于LLM的Agent时,一个常见的死循环场景是Agent在执行多步骤任务(如查询物流信息)时,无法正确判断任务的完成状态或下一步操作,从而反复执行相同的步骤。
示例场景: Agent被指示查询订单的物流信息。其调用链通常是:"先查订单号 → 得到物流单号 → 查物流单号 → 返回物流信息"。
然而,在某些情况下,Agent在获取到物流单号后,会将其与原始订单号一起再次输入给LLM。LLM可能由于"语境短视"特性,再次返回物流单号,并且没有明确的结束标志(endFlag)。这导致Agent在Java侧陷入无限循环调用,无法终止。
2.2 根本原因分析 #
Agent死循环问题的根本原因主要有以下几点:
输入提示词设计不明确: LLM Agent接收到的提示词未能清晰地指示其在不同阶段应执行的具体动作和判断条件。
缺乏明确的状态流转判断: Agent没有内置或被告知如何根据当前任务进度和已获取的信息进行状态转换,导致无法识别任务是否已完成或进入下一阶段。
欠缺调用次数/语义差异的兜底保护策略: 没有机制来限制Agent的重复调用次数,或在语义输出没有进展时强制终止或切换策略。
LLM的"语境短视"特性: 大型模型更擅长"单次输入-输出"映射,缺乏对"多步流程状态"的长期记忆。当同时输入订单号和物流单号时,模型可能仅关注"物流单号"是历史输出结果,而忽略了"这是下一步的输入条件",从而无法做出正确的决策。
2.3 解决方案与优化措施 #
为了解决Agent的死循环问题,可以从以下三个主要方面进行改造和优化:
2.3.1 优化提示词设计与强制声明 #
核心原理: 通过精心设计的提示词,明确告知Agent在不同阶段应执行的具体动作和判断条件。
实现方法:
class PromptOptimizer:
def __init__(self):
self.task_templates = {
'logistics_query': self.create_logistics_prompt(),
'order_processing': self.create_order_prompt(),
'data_extraction': self.create_extraction_prompt()
}
def create_logistics_prompt(self):
"""创建物流查询的优化提示词"""
return """
你是一个物流查询助手,请按以下步骤执行任务:
步骤1:获取订单号
- 如果用户提供了订单号,直接进入步骤2
- 如果没有,请询问用户提供订单号
步骤2:查询物流单号
- 使用订单号查询对应的物流单号
- 如果成功获取物流单号,进入步骤3
- 如果失败,返回错误信息并结束
步骤3:查询物流信息
- 使用物流单号查询详细物流信息
- 返回完整的物流跟踪信息
重要约束:
1. 每个步骤只能执行一次
2. 如果已获取物流单号,直接查询物流信息,不要重复生成
3. 任务完成时,必须返回 endFlag: true
4. 如果遇到错误,返回 endFlag: false 并说明原因
当前状态:{current_state}
用户输入:{user_input}
"""
def create_state_aware_prompt(self, task_type, current_state, user_input):
"""创建状态感知的提示词"""
base_prompt = self.task_templates[task_type]
# 添加状态信息
state_info = f"""
当前任务状态:
- 步骤:{current_state.get('step', 1)}
- 已获取信息:{current_state.get('collected_data', {})}
- 执行次数:{current_state.get('execution_count', 0)}
- 是否完成:{current_state.get('is_completed', False)}
"""
return base_prompt.format(
current_state=state_info,
user_input=user_input
)2.3.2 引入状态变量与状态流转判断 #
核心原理: 通过状态管理机制,跟踪任务执行进度,并根据当前状态决定下一步操作。
实现方法:
class StatefulAgent:
def __init__(self, llm_client):
self.llm_client = llm_client
self.state_manager = StateManager()
self.prompt_optimizer = PromptOptimizer()
def execute_task(self, task_type, user_input):
"""执行任务,支持状态管理"""
# 1. 获取当前状态
current_state = self.state_manager.get_current_state(task_type)
# 2. 检查是否已完成
if current_state.get('is_completed', False):
return self.handle_completed_task(current_state)
# 3. 检查执行次数限制
if current_state.get('execution_count', 0) >= 5:
return self.handle_max_attempts_reached()
# 4. 构建状态感知的提示词
prompt = self.prompt_optimizer.create_state_aware_prompt(
task_type, current_state, user_input
)
# 5. 调用LLM
response = self.llm_client.generate(prompt)
# 6. 解析响应并更新状态
parsed_response = self.parse_response(response)
updated_state = self.update_state(current_state, parsed_response)
# 7. 保存状态
self.state_manager.save_state(task_type, updated_state)
return parsed_response
def update_state(self, current_state, response):
"""更新任务状态"""
updated_state = current_state.copy()
# 更新执行次数
updated_state['execution_count'] = current_state.get('execution_count', 0) + 1
# 更新步骤
if 'next_step' in response:
updated_state['step'] = response['next_step']
# 更新收集的数据
if 'collected_data' in response:
updated_state['collected_data'].update(response['collected_data'])
# 检查是否完成
if response.get('endFlag', False):
updated_state['is_completed'] = True
updated_state['completion_time'] = time.time()
return updated_state
class StateManager:
def __init__(self):
self.states = {}
self.max_retention_time = 3600 # 1小时
def get_current_state(self, task_type):
"""获取当前任务状态"""
if task_type not in self.states:
self.states[task_type] = {
'step': 1,
'collected_data': {},
'execution_count': 0,
'is_completed': False,
'created_time': time.time()
}
return self.states[task_type]
def save_state(self, task_type, state):
"""保存任务状态"""
self.states[task_type] = state
def cleanup_expired_states(self):
"""清理过期状态"""
current_time = time.time()
expired_tasks = []
for task_type, state in self.states.items():
if current_time - state.get('created_time', 0) > self.max_retention_time:
expired_tasks.append(task_type)
for task_type in expired_tasks:
del self.states[task_type]2.3.3 设置调用兜底机制 #
核心原理: 通过多种兜底机制,防止Agent陷入无限循环,确保系统的稳定性和可靠性。
实现方法:
class SafeguardMechanism:
def __init__(self):
self.max_attempts = 5
self.max_execution_time = 300 # 5分钟
self.semantic_threshold = 0.8 # 语义相似度阈值
def check_execution_limits(self, state):
"""检查执行限制"""
# 1. 检查最大尝试次数
if state.get('execution_count', 0) >= self.max_attempts:
return {
'should_stop': True,
'reason': 'max_attempts_reached',
'message': '已达到最大尝试次数,任务终止'
}
# 2. 检查执行时间
if time.time() - state.get('created_time', 0) > self.max_execution_time:
return {
'should_stop': True,
'reason': 'timeout',
'message': '任务执行超时,任务终止'
}
return {'should_stop': False}
def check_semantic_progress(self, current_response, previous_responses):
"""检查语义进展"""
if len(previous_responses) < 2:
return {'has_progress': True}
# 计算当前响应与历史响应的相似度
similarities = []
for prev_response in previous_responses[-3:]: # 检查最近3次响应
similarity = self.calculate_similarity(
current_response,
prev_response
)
similarities.append(similarity)
# 如果相似度都很高,说明没有进展
if all(sim > self.semantic_threshold for sim in similarities):
return {
'has_progress': False,
'reason': 'semantic_stagnation',
'message': '检测到语义停滞,可能陷入循环'
}
return {'has_progress': True}
def check_end_flag(self, response):
"""检查结束标志"""
if response.get('endFlag', False):
return {
'should_stop': True,
'reason': 'normal_completion',
'message': '任务正常完成'
}
return {'should_stop': False}
def calculate_similarity(self, text1, text2):
"""计算文本相似度"""
# 使用简单的词汇重叠度计算
words1 = set(text1.lower().split())
words2 = set(text2.lower().split())
if not words1 or not words2:
return 0.0
intersection = len(words1.intersection(words2))
union = len(words1.union(words2))
return intersection / union if union > 0 else 0.0
class RobustAgent:
def __init__(self, llm_client):
self.llm_client = llm_client
self.state_manager = StateManager()
self.safeguard = SafeguardMechanism()
self.response_history = []
def execute_with_safeguards(self, task_type, user_input):
"""带兜底机制的任务执行"""
try:
# 1. 获取当前状态
current_state = self.state_manager.get_current_state(task_type)
# 2. 检查执行限制
limit_check = self.safeguard.check_execution_limits(current_state)
if limit_check['should_stop']:
return self.handle_termination(limit_check)
# 3. 执行任务
response = self.execute_task(task_type, user_input, current_state)
# 4. 检查语义进展
progress_check = self.safeguard.check_semantic_progress(
response, self.response_history
)
if not progress_check['has_progress']:
return self.handle_semantic_stagnation(progress_check)
# 5. 检查结束标志
end_check = self.safeguard.check_end_flag(response)
if end_check['should_stop']:
return self.handle_normal_completion(response)
# 6. 记录响应历史
self.response_history.append(response)
if len(self.response_history) > 10: # 只保留最近10次响应
self.response_history.pop(0)
# 7. 更新状态
updated_state = self.update_state(current_state, response)
self.state_manager.save_state(task_type, updated_state)
return response
except Exception as e:
return self.handle_exception(e)
def handle_termination(self, termination_info):
"""处理任务终止"""
return {
'status': 'terminated',
'reason': termination_info['reason'],
'message': termination_info['message'],
'endFlag': True
}
def handle_semantic_stagnation(self, stagnation_info):
"""处理语义停滞"""
return {
'status': 'stagnated',
'reason': stagnation_info['reason'],
'message': stagnation_info['message'],
'endFlag': True
}
def handle_normal_completion(self, response):
"""处理正常完成"""
return {
'status': 'completed',
'response': response,
'endFlag': True
}
def handle_exception(self, exception):
"""处理异常"""
return {
'status': 'error',
'message': f'任务执行出错: {str(exception)}',
'endFlag': True
}2.4 高级优化策略 #
2.4.1 智能重试机制 #
class IntelligentRetryMechanism:
def __init__(self):
self.retry_strategies = {
'prompt_refinement': self.refine_prompt,
'context_expansion': self.expand_context,
'strategy_switch': self.switch_strategy
}
def should_retry(self, state, response):
"""判断是否应该重试"""
# 检查是否达到重试条件
if state.get('execution_count', 0) >= 3:
return False
# 检查响应质量
if self.is_low_quality_response(response):
return True
# 检查是否陷入循环
if self.is_circular_response(response, state):
return True
return False
def get_retry_strategy(self, state, response):
"""获取重试策略"""
if self.is_circular_response(response, state):
return 'strategy_switch'
elif self.is_low_quality_response(response):
return 'prompt_refinement'
else:
return 'context_expansion'
def refine_prompt(self, original_prompt, state):
"""优化提示词"""
refined_prompt = f"""
{original_prompt}
注意:当前已执行 {state.get('execution_count', 0)} 次,
请确保提供明确的下一步指示,避免重复操作。
"""
return refined_prompt
def expand_context(self, context, state):
"""扩展上下文"""
expanded_context = f"""
{context}
历史执行记录:
{self.format_execution_history(state)}
请基于以上历史记录,避免重复操作。
"""
return expanded_context
def switch_strategy(self, current_strategy, state):
"""切换策略"""
# 从对话式切换到指令式
if current_strategy == 'conversational':
return 'instructional'
# 从指令式切换到模板式
elif current_strategy == 'instructional':
return 'template'
else:
return 'conversational'2.4.2 动态超时调整 #
class DynamicTimeoutManager:
def __init__(self):
self.base_timeout = 60 # 基础超时时间
self.max_timeout = 300 # 最大超时时间
self.timeout_multiplier = 1.5 # 超时倍数
def calculate_timeout(self, task_complexity, execution_count):
"""计算动态超时时间"""
# 根据任务复杂度调整
complexity_factor = self.get_complexity_factor(task_complexity)
# 根据执行次数调整
retry_factor = min(execution_count * 0.2, 1.0)
# 计算最终超时时间
timeout = self.base_timeout * complexity_factor * (1 + retry_factor)
return min(timeout, self.max_timeout)
def get_complexity_factor(self, complexity):
"""获取复杂度因子"""
complexity_factors = {
'simple': 1.0,
'medium': 1.5,
'complex': 2.0,
'very_complex': 2.5
}
return complexity_factors.get(complexity, 1.0)2.5 LLM"语境短视"行为的解释 #
大型模型之所以有时会表现出"语境短视",是因为它们在设计上更擅长于处理局部上下文和单次输入-输出映射。尽管它们拥有庞大的参数量和训练数据,但在处理需要多步流程状态的复杂任务时,其"长期记忆"和对任务整体流程的理解能力相对较弱。
具体表现:
- 缺乏状态记忆: LLM无法有效记住之前执行过的步骤和获取的信息
- 局部优化: 模型倾向于优化当前输入-输出对,而忽略整体任务目标
- 上下文混淆: 当历史输出作为新输入时,模型可能无法区分这是中间结果还是新的查询
解决方案:
- 外部状态管理: 通过外部系统维护任务状态,而不是依赖LLM的记忆
- 明确的状态标识: 在每次调用时明确告知模型当前状态
- 结构化输出: 要求模型输出结构化的响应,包含状态信息
- 多轮对话管理: 通过对话管理机制,确保上下文的一致性
2.6 实际应用示例 #
# 完整的防死循环Agent实现
class AntiLoopAgent:
def __init__(self, llm_client):
self.llm_client = llm_client
self.state_manager = StateManager()
self.safeguard = SafeguardMechanism()
self.retry_mechanism = IntelligentRetryMechanism()
self.timeout_manager = DynamicTimeoutManager()
def execute_task_safely(self, task_type, user_input):
"""安全执行任务,防止死循环"""
start_time = time.time()
while True:
# 1. 获取当前状态
current_state = self.state_manager.get_current_state(task_type)
# 2. 检查各种限制
if self.should_terminate(current_state, start_time):
return self.create_termination_response()
# 3. 构建优化的提示词
prompt = self.build_optimized_prompt(task_type, current_state, user_input)
# 4. 执行任务
response = self.llm_client.generate(prompt)
# 5. 解析和验证响应
parsed_response = self.parse_and_validate_response(response)
# 6. 检查是否完成
if parsed_response.get('endFlag', False):
return self.create_completion_response(parsed_response)
# 7. 更新状态
self.update_state(current_state, parsed_response)
# 8. 检查是否需要重试
if self.retry_mechanism.should_retry(current_state, parsed_response):
retry_strategy = self.retry_mechanism.get_retry_strategy(
current_state, parsed_response
)
self.apply_retry_strategy(retry_strategy, current_state)
continue
# 9. 返回当前响应
return parsed_response
def should_terminate(self, state, start_time):
"""检查是否应该终止"""
# 检查执行次数
if state.get('execution_count', 0) >= 5:
return True
# 检查执行时间
if time.time() - start_time > 300: # 5分钟
return True
# 检查语义停滞
if self.safeguard.check_semantic_progress(
state.get('last_response', ''),
state.get('response_history', [])
)['has_progress'] == False:
return True
return False2.7 总结 #
通过以上多层次的解决方案,可以有效防止LLM Agent陷入死循环:
- 提示词优化: 明确的任务步骤和完成条件
- 状态管理: 外部状态跟踪和流转控制
- 兜底机制: 多重保护措施确保系统稳定性
- 智能重试: 动态调整策略和超时时间
- 异常处理: 完善的错误处理和恢复机制
这些措施不仅解决了死循环问题,还提升了Agent的整体可靠性和用户体验。