# codefuse-chatbot/dev_opsgpt/connector/agents/executor_agent.py
# (code-viewer header residue: 217 lines, 11 KiB, Python)

import copy
import json
import re
import traceback
from typing import Any, Iterator, List, Optional, Tuple, Union

from langchain.prompts.chat import ChatPromptTemplate
from loguru import logger
from pydantic import BaseModel

from dev_opsgpt.connector.schema import (
    Memory, Task, Env, Role, Message, ActionStatus
)
from dev_opsgpt.llm_models import getChatModel
from dev_opsgpt.connector.configs.prompts import EXECUTOR_PROMPT_INPUT, BEGIN_PROMPT_INPUT
from dev_opsgpt.connector.utils import parse_section

from .base_agent import BaseAgent
class ExecutorAgent(BaseAgent):
    """Agent that answers a query, or walks the steps of a plan, via the LLM.

    When the incoming message carries a usable ``PLAN`` list in its
    ``parsed_output``, each plan step becomes one "Question" that is sent to
    the LLM; otherwise the raw ``input_query`` is used as the single question.
    """

    def __init__(
        self,
        role: Role,
        task: Optional[Task] = None,
        memory: Optional[Memory] = None,
        chat_turn: int = 1,
        do_search: bool = False,
        do_doc_retrieval: bool = False,
        do_tool_retrieval: bool = False,
        temperature: float = 0.2,
        stop: Union[List[str], str] = None,
        do_filter: bool = True,
        do_use_self_memory: bool = True,
        focus_agents: Optional[List[str]] = None,
        focus_message_keys: Optional[List[str]] = None,
    ):
        """Forward all settings to ``BaseAgent`` and enable run-all-steps mode.

        ``focus_agents`` and ``focus_message_keys`` default to ``None`` and are
        replaced by a fresh empty list per instance, so instances never share
        one mutable default list.
        """
        super().__init__(
            role, task, memory, chat_turn, do_search, do_doc_retrieval,
            do_tool_retrieval, temperature, stop, do_filter, do_use_self_memory,
            [] if focus_agents is None else focus_agents,
            [] if focus_message_keys is None else focus_message_keys,
        )
        self.do_all_task = True  # run all remaining plan steps in one call

    def arun(
        self,
        query: Message,
        history: Optional[Memory] = None,
        background: Optional[Memory] = None,
        memory_pool: Optional[Memory] = None,
    ) -> Iterator[Message]:
        """Execute the query (or its plan) and yield the resulting message(s).

        This is a generator. In run-all-steps mode the running output message
        is yielded after every plan step; in every mode the finalized output
        message is yielded last.

        Args:
            query: Incoming message. ``parsed_output`` may hold ``PLAN`` (list
                of step descriptions) and ``PLAN_STEP`` (index to start from).
            history: Conversation history forwarded to prompt creation.
            background: Background memory forwarded to prompt creation.
            memory_pool: Shared memory; the final output message is appended
                to it when it is not ``None``.

        Yields:
            The output ``Message`` (the same object is updated across steps).
        """
        # Scratch memory that accumulates the question/answer/observation
        # messages of this run; it is handed to every step as ``react_memory``.
        task_executor_memory = Memory(messages=[])
        output_message = Message(
            role_name=self.role.role_name,
            role_type="ai",
            role_content=query.input_query,
            step_content="",
            input_query=query.input_query,
            tools=query.tools,
            parsed_output_list=[query.parsed_output],
            customed_kargs=query.customed_kargs,
        )

        self_memory = self.memory if self.do_use_self_memory else None

        plan = query.parsed_output.get("PLAN")
        plan_step = int(query.parsed_output.get("PLAN_STEP", 0))

        # No usable plan: PLAN is missing/None, is a plain string, or the
        # requested step lies beyond the end of the plan. Fall back to asking
        # the original input query directly.
        if plan is None or isinstance(plan, str) or plan_step >= len(plan):
            query_c = copy.deepcopy(query)
            query_c = self.start_action_step(query_c)
            query_c.parsed_output = {"Question": query_c.input_query}
            task_executor_memory.append(query_c)
            for output_message, task_executor_memory in self._arun_step(
                output_message, query_c, self_memory, history, background,
                memory_pool, task_executor_memory,
            ):
                pass
        else:
            logger.debug(f"{plan}")
            if self.do_all_task:
                # Run every remaining plan step, one LLM round per step.
                # NOTE(review): unlike the other branches, this one does not
                # call start_action_step on the copied query — confirm intended.
                for task_content in plan[plan_step:]:
                    query_c = copy.deepcopy(query)
                    query_c.parsed_output = {"Question": task_content}
                    task_executor_memory.append(query_c)
                    for output_message, task_executor_memory in self._arun_step(
                        output_message, query_c, self_memory, history,
                        background, memory_pool, task_executor_memory,
                    ):
                        pass
                    yield output_message
            else:
                # Run only the step selected by PLAN_STEP.
                query_c = copy.deepcopy(query)
                query_c = self.start_action_step(query_c)
                task_content = query_c.parsed_output["PLAN"][plan_step]
                query_c.parsed_output = {"Question": task_content}
                task_executor_memory.append(query_c)
                for output_message, task_executor_memory in self._arun_step(
                    output_message, query_c, self_memory, history, background,
                    memory_pool, task_executor_memory,
                ):
                    pass
                output_message.parsed_output.update({"CURRENT_STEP": plan_step})

        # Record both sides of the exchange in the agent's own history.
        self.append_history(query)
        self.append_history(output_message)
        output_message.input_query = output_message.role_content
        output_message = self.end_action_step(output_message)
        # memory_pool is optional; only update it when the caller supplied one.
        if memory_pool is not None:
            memory_pool.append(output_message)
        yield output_message

    def _arun_step(
        self,
        output_message: Message,
        query: Message,
        self_memory: Memory,
        history: Memory,
        background: Memory,
        memory_pool: Memory,
        react_memory: Memory,
    ) -> Iterator[Tuple[Message, Memory]]:
        """Run one LLM round for ``query`` and yield the updated state once.

        Builds the prompt, calls ``self.llm.predict``, parses and routes the
        reply through ``self.message_utils``, and records the reply (plus any
        observation message returned by the router) in ``react_memory`` and in
        ``output_message.parsed_output_list``.

        Yields:
            A single ``(output_message, react_memory)`` tuple.
        """
        prompt = self.create_prompt(
            query, self_memory, history, background,
            memory_pool=memory_pool, react_memory=react_memory,
        )
        content = self.llm.predict(prompt)
        logger.debug(f"{self.role.role_name} content: {content}")

        output_message.role_content = content
        output_message.step_content += "\n" + output_message.role_content
        output_message = self.message_utils.parser(output_message)
        # The router picks an action (code or tool) from the parsed output and
        # may return an observation message describing its result.
        output_message, observation_message = self.message_utils.step_router(output_message)
        output_message.parsed_output_list.append(output_message.parsed_output)

        # Store a snapshot: output_message keeps being mutated by later steps.
        react_memory.append(copy.deepcopy(output_message))
        if observation_message:
            react_memory.append(observation_message)
            output_message.parsed_output_list.append(observation_message.parsed_output)
            logger.debug(f"{observation_message.role_name} content: {observation_message.role_content}")
        yield output_message, react_memory

    def create_prompt(
        self,
        query: Message,
        memory: Optional[Memory] = None,
        history: Optional[Memory] = None,
        background: Optional[Memory] = None,
        memory_pool: Optional[Memory] = None,
        react_memory: Optional[Memory] = None,
        prompt_mamnger=None,
    ) -> str:
        """Assemble the LLM prompt for one execution step.

        The prompt is the role prompt (with tool descriptions filled in),
        followed by ``BEGIN_PROMPT_INPUT`` and then one section per key listed
        in the role prompt's 'Input Format' section ("Origin Query",
        "DocInfos", "Context", "Question"). When no input keys are found, the
        parsed outputs stored in ``react_memory`` are appended instead.

        ``memory_pool`` and ``react_memory`` may be ``None``; they then
        contribute nothing to the prompt. ``prompt_mamnger`` is accepted but
        not used.
        """
        doc_infos = self.create_doc_prompt(query)
        code_infos = self.create_codedoc_prompt(query)
        formatted_tools, tool_names, _ = self.create_tools_prompt(query)
        # NOTE(review): the results of these four helpers are not used in the
        # prompt below; the calls are kept only in case the base-class helpers
        # have side effects — confirm and remove if they are pure.
        self.create_task_prompt(query)
        self.create_background_prompt(background, control_key="step_content")
        self.create_history_prompt(history)
        self.create_selfmemory_prompt(memory, control_key="step_content")

        # Context taken from the shared memory pool, restricted by
        # select_memory_by_agent_key; 'Action Status' entries are left out.
        if memory_pool is None:
            memory_pool_context = ""
        else:
            selected_pool = self.select_memory_by_agent_key(memory_pool)
            memory_pool_context = "\n\n".join(
                f"*{k}*\n{v}"
                for parsed_output in selected_pool.get_parserd_output_list()
                for k, v in parsed_output.items()
                if k not in ['Action Status']
            )

        # The Chinese literals are the values compared against to detect
        # "no document info" / "no code info"; they are runtime strings and
        # must stay exactly as they are.
        DocInfos = ""
        if doc_infos is not None and doc_infos != "" and doc_infos != "不存在知识库辅助信息":
            DocInfos += f"\nDocument Information: {doc_infos}"
        if code_infos is not None and code_infos != "" and code_infos != "不存在代码库辅助信息":
            DocInfos += f"\nCodeBase Infomation: {code_infos}"

        prompt = self.role.role_prompt.format(
            formatted_tools=formatted_tools, tool_names=tool_names,
        )

        react_outputs = react_memory.get_parserd_output() if react_memory is not None else []
        last_agent_parsed_output = "\n".join(
            "\n".join(f"*{k}*\n{v}" for k, v in _dict.items())
            for _dict in query.parsed_output_list
        )
        # Everything recorded so far except the most recent entry.
        react_parsed_output = "\n".join(
            "\n".join(f"*{k}_context*\n{v}" for k, v in _dict.items())
            for _dict in react_outputs[:-1]
        )

        prompt += "\n" + BEGIN_PROMPT_INPUT

        input_keys = parse_section(self.role.role_prompt, 'Input Format')
        if input_keys:
            for input_key in input_keys:
                if input_key == "Origin Query":
                    # `or ""` keeps a missing origin query from raising TypeError.
                    prompt += "\n**Origin Query:**\n" + (query.origin_query or "")
                elif input_key == "DocInfos":
                    prompt += "\n**DocInfos:**\n" + DocInfos
                elif input_key == "Context":
                    if self.focus_agents and memory_pool_context:
                        context = memory_pool_context
                    else:
                        context = last_agent_parsed_output
                    prompt += "\n**Context:**\n" + context + f"\n{react_parsed_output}"
                elif input_key == "Question":
                    prompt += "\n**Question:**\n" + (query.parsed_output.get("Question") or "")
        else:
            # No declared input format: dump every recorded parsed output.
            input_query = "\n".join(
                "\n".join(f"**{k}:**\n{v}" for k, v in _dict.items())
                for _dict in react_outputs
            )
            prompt += "\n" + input_query

        # Collapse doubled braces until none remain (presumably left over from
        # escaping literal braces for str.format in the role prompt).
        while "{{" in prompt or "}}" in prompt:
            prompt = prompt.replace("{{", "{")
            prompt = prompt.replace("}}", "}")
        return prompt

    def set_task(self, do_all_task):
        """Choose between running all remaining plan steps or a single step."""
        self.do_all_task = do_all_task