from pydantic import BaseModel
from typing import List, Union, Tuple, Any
import re
import json
import traceback
import copy
from loguru import logger

from langchain.prompts.chat import ChatPromptTemplate

from dev_opsgpt.connector.schema import (
    Memory, Task, Env, Role, Message, ActionStatus
)
from dev_opsgpt.llm_models import getChatModel
from dev_opsgpt.connector.configs.prompts import EXECUTOR_PROMPT_INPUT, BEGIN_PROMPT_INPUT
from dev_opsgpt.connector.utils import parse_section

from .base_agent import BaseAgent


class ExecutorAgent(BaseAgent):

    def __init__(
            self,
            role: Role,
            task: Task = None,
            memory: Memory = None,
            chat_turn: int = 1,
            do_search: bool = False,
            do_doc_retrieval: bool = False,
            do_tool_retrieval: bool = False,
            temperature: float = 0.2,
            stop: Union[List[str], str] = None,
            do_filter: bool = True,
            do_use_self_memory: bool = True,
            focus_agents: List[str] = [],
            focus_message_keys: List[str] = [],
            # prompt_mamnger: PromptManager
            ):

        super().__init__(role, task, memory, chat_turn, do_search, do_doc_retrieval,
                         do_tool_retrieval, temperature, stop, do_filter, do_use_self_memory,
                         focus_agents, focus_message_keys
                         )
        self.do_all_task = True  # run all tasks

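    # `arun` dispatches on the planner output carried in `query.parsed_output`.
    # A sketch of the expected shape (key names are taken from this file; that
    # the upstream planner agent produces them this way is an assumption):
    #
    #   query.parsed_output = {
    #       "PLAN": ["step 1 ...", "step 2 ..."],  # list of task descriptions
    #       "PLAN_STEP": 0,                        # index of the next step to run
    #   }
    #
    # If PLAN is missing, is a plain string, or PLAN_STEP is out of range,
    # the agent falls back to answering the raw input_query directly.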
    def arun(self, query: Message, history: Memory = None, background: Memory = None, memory_pool: Memory = None) -> Message:
        '''agent response from multi-message (a generator: yields intermediate and final Message objects)'''
        # insert query into memory
        task_executor_memory = Memory(messages=[])
        # insert query
        output_message = Message(
            role_name=self.role.role_name,
            role_type="ai",  # self.role.role_type,
            role_content=query.input_query,
            step_content="",
            input_query=query.input_query,
            tools=query.tools,
            parsed_output_list=[query.parsed_output]
        )

        self_memory = self.memory if self.do_use_self_memory else None

        plan_step = int(query.parsed_output.get("PLAN_STEP", 0))
        # fall back to the raw query when PLAN is missing, PLAN is a plain str,
        # or the requested step index is beyond the end of the plan
        if "PLAN" not in query.parsed_output or isinstance(query.parsed_output.get("PLAN", []), str) or plan_step >= len(query.parsed_output.get("PLAN", [])):
            query_c = copy.deepcopy(query)
            query_c.parsed_output = {"Question": query_c.input_query}
            task_executor_memory.append(query_c)
            for output_message, task_executor_memory in self._arun_step(output_message, query_c, self_memory, history, background, memory_pool, task_executor_memory):
                pass
            # task_executor_memory.append(query_c)
            # content = "the execution step of the plan exceeds the planned scope."
            # output_message.parsed_dict = {"Thought": content, "Action Status": "finished", "Action": content}
            # task_executor_memory.append(output_message)

        elif "PLAN" in query.parsed_output:
            logger.debug(f"{query.parsed_output['PLAN']}")
            if self.do_all_task:
                # run all remaining tasks step by step
                for task_content in query.parsed_output["PLAN"][plan_step:]:
                    # create your llm prompt
                    query_c = copy.deepcopy(query)
                    query_c.parsed_output = {"Question": task_content}
                    task_executor_memory.append(query_c)
                    for output_message, task_executor_memory in self._arun_step(output_message, query_c, self_memory, history, background, memory_pool, task_executor_memory):
                        pass
                    yield output_message
            else:
                # run only the current plan step
                query_c = copy.deepcopy(query)
                task_content = query_c.parsed_output["PLAN"][plan_step]
                query_c.parsed_output = {"Question": task_content}
                task_executor_memory.append(query_c)
                for output_message, task_executor_memory in self._arun_step(output_message, query_c, self_memory, history, background, memory_pool, task_executor_memory):
                    pass
                output_message.parsed_output.update({"CURRENT_STEP": plan_step})

        # update self_memory
        self.append_history(query)
        self.append_history(output_message)
        # logger.info(f"{self.role.role_name} current question: {output_message.input_query}\nllm_executor_run: {output_message.step_content}")
        # logger.info(f"{self.role.role_name} current parsed_output_list: {output_message.parsed_output_list}")
        output_message.input_query = output_message.role_content
        # update memory pool
        memory_pool.append(output_message)
        yield output_message

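    # Minimal driving sketch for `arun` (illustrative only: `my_role`, `query`
    # and `pool` are assumed to be a configured Role, an incoming Message and a
    # shared Memory from dev_opsgpt.connector.schema):
    #
    #   executor = ExecutorAgent(role=my_role)
    #   for msg in executor.arun(query, memory_pool=pool):
    #       # one Message per executed plan step, then the final aggregate
    #       logger.debug(msg.step_content)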
    def _arun_step(self, output_message: Message, query: Message, self_memory: Memory,
                   history: Memory, background: Memory, memory_pool: Memory,
                   react_memory: Memory) -> Tuple[Message, Memory]:
        '''execute the llm predict by created prompt; yields (output_message, react_memory)'''
        prompt = self.create_prompt(query, self_memory, history, background, memory_pool=memory_pool, react_memory=react_memory)
        content = self.llm.predict(prompt)
        # logger.debug(f"{self.role.role_name} prompt: {prompt}")
        logger.debug(f"{self.role.role_name} content: {content}")

        output_message.role_content = content
        output_message.role_contents += [content]
        output_message.step_content += "\n" + output_message.role_content
        output_message.step_contents += [output_message.role_content]  # was a no-op `+` expression

        output_message = self.message_utils.parser(output_message)
        # according to the parsed output, choose one action: code execution or tool invocation
        output_message, observation_message = self.message_utils.step_router(output_message)
        # logger.debug(f"{self.role.role_name} content: {content}")
        # update parsed_output_list
        output_message.parsed_output_list.append(output_message.parsed_output)

        react_message = copy.deepcopy(output_message)
        react_memory.append(react_message)
        if observation_message:
            react_memory.append(observation_message)
            output_message.parsed_output_list.append(observation_message.parsed_output)
            logger.debug(f"{observation_message.role_name} content: {observation_message.role_content}")
        yield output_message, react_memory

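    # `step_router` (from self.message_utils, defined elsewhere in the package)
    # inspects the parsed "Action Status" and, per the comment above, executes
    # either generated code or a tool call. A sketch of the yielded contract:
    #
    #   for out_msg, memory in agent._arun_step(...):
    #       out_msg.parsed_output       # dict parsed from the latest LLM reply
    #       out_msg.parsed_output_list  # accumulated per-step parses (+ observations)
    #       memory                      # the react memory, grown by 1-2 messages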
    def create_prompt(
            self, query: Message, memory: Memory = None, history: Memory = None, background: Memory = None, memory_pool: Memory = None, react_memory: Memory = None, prompt_mamnger=None) -> str:
        '''assemble the prompt from role / task / tools / docs / memory'''
        # retrieval context
        doc_infos = self.create_doc_prompt(query)
        code_infos = self.create_codedoc_prompt(query)
        # tools and task
        formatted_tools, tool_names = self.create_tools_prompt(query)
        task_prompt = self.create_task_prompt(query)
        background_prompt = self.create_background_prompt(background, control_key="step_content")
        history_prompt = self.create_history_prompt(history)
        selfmemory_prompt = self.create_selfmemory_prompt(memory, control_key="step_content")

        # context from the agents this executor focuses on
        memory_pool_select_by_agent_key = self.select_memory_by_agent_key(memory_pool)
        memory_pool_select_by_agent_key_context = '\n\n'.join([
            f"*{k}*\n{v}" for parsed_output in memory_pool_select_by_agent_key.get_parserd_output_list() for k, v in parsed_output.items() if k not in ['Action Status']
        ])

        DocInfos = ""
        # "不存在知识库辅助信息" is the retriever's sentinel for "no knowledge-base information available"
        if doc_infos is not None and doc_infos != "" and doc_infos != "不存在知识库辅助信息":
            DocInfos += f"\nDocument Information: {doc_infos}"

        # "不存在代码库辅助信息" is the retriever's sentinel for "no codebase information available"
        if code_infos is not None and code_infos != "" and code_infos != "不存在代码库辅助信息":
            DocInfos += f"\nCodeBase Information: {code_infos}"

        # extra_system_prompt = self.role.role_prompt
        prompt = self.role.role_prompt.format(**{"formatted_tools": formatted_tools, "tool_names": tool_names})

        # input_query = react_memory.to_tuple_messages(content_key="role_content")
        # logger.debug(f"get_parserd_dict {react_memory.get_parserd_output()}")
        input_query = "\n".join(["\n".join([f"**{k}:**\n{v}" for k, v in _dict.items()]) for _dict in react_memory.get_parserd_output()])
        # input_query = query.input_query + "\n".join([f"{v}" for k, v in input_query if v])
        last_agent_parsed_output = "\n".join(["\n".join([f"*{k}*\n{v}" for k, v in _dict.items()]) for _dict in query.parsed_output_list])
        react_parsed_output = "\n".join(["\n".join([f"*{k}_context*\n{v}" for k, v in _dict.items()]) for _dict in react_memory.get_parserd_output()[:-1]])

        prompt += "\n" + BEGIN_PROMPT_INPUT

        # fill in only the sections declared by the role prompt's 'Input Format' block
        input_keys = parse_section(self.role.role_prompt, 'Input Format')
        if input_keys:
            for input_key in input_keys:
                if input_key == "Origin Query":
                    prompt += "\n**Origin Query:**\n" + query.origin_query
                elif input_key == "DocInfos":
                    prompt += "\n**DocInfos:**\n" + DocInfos
                elif input_key == "Context":
                    if self.focus_agents and memory_pool_select_by_agent_key_context:
                        context = memory_pool_select_by_agent_key_context
                    else:
                        context = last_agent_parsed_output
                    prompt += "\n**Context:**\n" + context + f"\n{react_parsed_output}"
                elif input_key == "Question":
                    prompt += "\n**Question:**\n" + query.parsed_output.get("Question", "")
        else:
            prompt += "\n" + input_query

        task = query.task or self.task
        # if task_prompt is not None:
        #     prompt += "\n" + task.task_prompt

        # if selfmemory_prompt:
        #     prompt += "\n" + selfmemory_prompt

        # if background_prompt:
        #     prompt += "\n" + background_prompt

        # if history_prompt:
        #     prompt += "\n" + history_prompt

        # prompt = extra_system_prompt.format(**{"query": input_query, "doc_infos": doc_infos, "formatted_tools": formatted_tools, "tool_names": tool_names})
        # collapse doubled braces left over from str.format escaping, e.g. "{{x}}" -> "{x}"
        while "{{" in prompt or "}}" in prompt:
            prompt = prompt.replace("{{", "{")
            prompt = prompt.replace("}}", "}")
        return prompt

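    # How `create_prompt` consumes the role prompt (a sketch; the exact section
    # syntax is defined by `parse_section` in dev_opsgpt.connector.utils, and
    # this example layout is an assumption):
    #
    #   #### Input Format
    #   **Origin Query:** the user's original request
    #   **Context:** outputs of previous agents and react steps
    #   **Question:** the current plan step to execute
    #
    # Each key extracted by `parse_section` ("Origin Query", "DocInfos",
    # "Context", "Question") selects one of the branches above; if the role
    # prompt declares no 'Input Format' section at all, the react-memory dump
    # is appended instead.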
    def set_task(self, do_all_task):
        '''set task exec type'''
        self.do_all_task = do_all_task
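
# Minimal end-to-end sketch (illustrative only: the Role field values and the
# surrounding chain setup are assumptions, not part of this module):
#
#   role = Role(role_name="executor", role_type="assistant",
#               role_prompt=my_executor_prompt)  # assumed template with an 'Input Format' section
#   executor = ExecutorAgent(role=role, chat_turn=1)
#   executor.set_task(False)  # run one plan step per call instead of all steps
#   for msg in executor.arun(query, memory_pool=Memory(messages=[])):
#       print(msg.role_content)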