codefuse-chatbot/dev_opsgpt/connector/agents/base_agent.py

349 lines
17 KiB
Python
Raw Normal View History

from pydantic import BaseModel
from typing import List, Union
import re
import copy
import json
import traceback
import uuid
from loguru import logger
from dev_opsgpt.connector.schema import (
Memory, Task, Env, Role, Message, ActionStatus, CodeDoc, Doc
)
from configs.server_config import SANDBOX_SERVER
from dev_opsgpt.sandbox import PyCodeBox, CodeBoxResponse
from dev_opsgpt.tools import DDGSTool, DocRetrieval, CodeRetrieval
from dev_opsgpt.connector.configs.prompts import BASE_PROMPT_INPUT, QUERY_CONTEXT_DOC_PROMPT_INPUT, BEGIN_PROMPT_INPUT
from dev_opsgpt.connector.message_process import MessageUtils
from dev_opsgpt.connector.configs.agent_config import REACT_PROMPT_INPUT, QUERY_CONTEXT_PROMPT_INPUT, PLAN_PROMPT_INPUT
from dev_opsgpt.llm_models import getChatModel, getExtraModel
from dev_opsgpt.connector.utils import parse_section
class BaseAgent:
def __init__(
self,
role: Role,
task: Task = None,
memory: Memory = None,
chat_turn: int = 1,
do_search: bool = False,
do_doc_retrieval: bool = False,
do_tool_retrieval: bool = False,
temperature: float = 0.2,
stop: Union[List[str], str] = None,
do_filter: bool = True,
do_use_self_memory: bool = True,
focus_agents: List[str] = [],
focus_message_keys: List[str] = [],
# prompt_mamnger: PromptManager
):
self.task = task
self.role = role
self.message_utils = MessageUtils(role)
self.llm = self.create_llm_engine(temperature, stop)
self.memory = self.init_history(memory)
self.chat_turn = chat_turn
self.do_search = do_search
self.do_doc_retrieval = do_doc_retrieval
self.do_tool_retrieval = do_tool_retrieval
self.focus_agents = focus_agents
self.focus_message_keys = focus_message_keys
self.do_filter = do_filter
self.do_use_self_memory = do_use_self_memory
# self.prompt_manager = None
def run(self, query: Message, history: Memory = None, background: Memory = None, memory_pool: Memory=None) -> Message:
'''agent reponse from multi-message'''
message = None
for message in self.arun(query, history, background, memory_pool):
pass
return message
def arun(self, query: Message, history: Memory = None, background: Memory = None, memory_pool: Memory=None) -> Message:
'''agent reponse from multi-message'''
# insert query into memory
query_c = copy.deepcopy(query)
query_c = self.start_action_step(query_c)
self_memory = self.memory if self.do_use_self_memory else None
# create your llm prompt
prompt = self.create_prompt(query_c, self_memory, history, background, memory_pool=memory_pool)
content = self.llm.predict(prompt)
logger.debug(f"{self.role.role_name} prompt: {prompt}")
logger.debug(f"{self.role.role_name} content: {content}")
output_message = Message(
role_name=self.role.role_name,
role_type="ai", #self.role.role_type,
role_content=content,
step_content=content,
input_query=query_c.input_query,
tools=query_c.tools,
parsed_output_list=[query.parsed_output],
customed_kargs=query_c.customed_kargs
)
# common parse llm' content to message
output_message = self.message_utils.parser(output_message)
if self.do_filter:
output_message = self.message_utils.filter(output_message)
# action step
output_message, observation_message = self.message_utils.step_router(output_message, history, background, memory_pool=memory_pool)
output_message.parsed_output_list.append(output_message.parsed_output)
if observation_message:
output_message.parsed_output_list.append(observation_message.parsed_output)
# update self_memory
self.append_history(query_c)
self.append_history(output_message)
# logger.info(f"{self.role.role_name} currenct question: {output_message.input_query}\nllm_step_run: {output_message.role_content}")
output_message.input_query = output_message.role_content
# output_message.parsed_output_list.append(output_message.parsed_output) # 与上述重复?
# end
output_message = self.message_utils.inherit_extrainfo(query, output_message)
output_message = self.end_action_step(output_message)
# update memory pool
memory_pool.append(output_message)
yield output_message
def create_prompt(
self, query: Message, memory: Memory =None, history: Memory = None, background: Memory = None, memory_pool: Memory=None, prompt_mamnger=None) -> str:
'''
prompt engineer, contains role\task\tools\docs\memory
'''
#
doc_infos = self.create_doc_prompt(query)
code_infos = self.create_codedoc_prompt(query)
#
formatted_tools, tool_names, _ = self.create_tools_prompt(query)
task_prompt = self.create_task_prompt(query)
background_prompt = self.create_background_prompt(background, control_key="step_content")
history_prompt = self.create_history_prompt(history)
selfmemory_prompt = self.create_selfmemory_prompt(memory, control_key="step_content")
# extra_system_prompt = self.role.role_prompt
prompt = self.role.role_prompt.format(**{"formatted_tools": formatted_tools, "tool_names": tool_names})
#
memory_pool_select_by_agent_key = self.select_memory_by_agent_key(memory_pool)
memory_pool_select_by_agent_key_context = '\n\n'.join([f"*{k}*\n{v}" for parsed_output in memory_pool_select_by_agent_key.get_parserd_output_list() for k, v in parsed_output.items() if k not in ['Action Status']])
# input_query = query.input_query
# # logger.debug(f"{self.role.role_name} extra_system_prompt: {self.role.role_prompt}")
# # logger.debug(f"{self.role.role_name} input_query: {input_query}")
# # logger.debug(f"{self.role.role_name} doc_infos: {doc_infos}")
# # logger.debug(f"{self.role.role_name} tool_names: {tool_names}")
# if "**Context:**" in self.role.role_prompt:
# # logger.debug(f"parsed_output_list: {query.parsed_output_list}")
# # input_query = "'''" + "\n".join([f"###{k}###\n{v}" for i in query.parsed_output_list for k,v in i.items() if "Action Status" !=k]) + "'''"
# context = "\n".join([f"*{k}*\n{v}" for i in query.parsed_output_list for k,v in i.items() if "Action Status" !=k])
# # context = history_prompt or '""'
# # logger.debug(f"parsed_output_list: {t}")
# prompt += "\n" + QUERY_CONTEXT_PROMPT_INPUT.format(**{"context": context, "query": query.origin_query})
# else:
# prompt += "\n" + PLAN_PROMPT_INPUT.format(**{"query": input_query})
task = query.task or self.task
if task_prompt is not None:
prompt += "\n" + task.task_prompt
DocInfos = ""
if doc_infos is not None and doc_infos!="" and doc_infos!="不存在知识库辅助信息":
DocInfos += f"\nDocument Information: {doc_infos}"
if code_infos is not None and code_infos!="" and code_infos!="不存在代码库辅助信息":
DocInfos += f"\nCodeBase Infomation: {code_infos}"
# if selfmemory_prompt:
# prompt += "\n" + selfmemory_prompt
# if background_prompt:
# prompt += "\n" + background_prompt
# if history_prompt:
# prompt += "\n" + history_prompt
input_query = query.input_query
# logger.debug(f"{self.role.role_name} extra_system_prompt: {self.role.role_prompt}")
# logger.debug(f"{self.role.role_name} input_query: {input_query}")
# logger.debug(f"{self.role.role_name} doc_infos: {doc_infos}")
# logger.debug(f"{self.role.role_name} tool_names: {tool_names}")
# extra_system_prompt = self.role.role_prompt
input_keys = parse_section(self.role.role_prompt, 'Input Format')
prompt = self.role.role_prompt.format(**{"formatted_tools": formatted_tools, "tool_names": tool_names})
prompt += "\n" + BEGIN_PROMPT_INPUT
for input_key in input_keys:
if input_key == "Origin Query":
prompt += "\n**Origin Query:**\n" + query.origin_query
elif input_key == "Context":
context = "\n".join([f"*{k}*\n{v}" for i in query.parsed_output_list for k,v in i.items() if "Action Status" !=k])
if history:
context = history_prompt + "\n" + context
if not context:
context = "there is no context"
if self.focus_agents and memory_pool_select_by_agent_key_context:
context = memory_pool_select_by_agent_key_context
prompt += "\n**Context:**\n" + context + "\n" + input_query
elif input_key == "DocInfos":
if DocInfos:
prompt += "\n**DocInfos:**\n" + DocInfos
else:
prompt += "\n**DocInfos:**\n" + "Empty"
elif input_key == "Question":
prompt += "\n**Question:**\n" + input_query
# if "**Context:**" in self.role.role_prompt:
# # logger.debug(f"parsed_output_list: {query.parsed_output_list}")
# # input_query = "'''" + "\n".join([f"###{k}###\n{v}" for i in query.parsed_output_list for k,v in i.items() if "Action Status" !=k]) + "'''"
# context = "\n".join([f"*{k}*\n{v}" for i in query.parsed_output_list for k,v in i.items() if "Action Status" !=k])
# if history:
# context = history_prompt + "\n" + context
# if not context:
# context = "there is no context"
# # logger.debug(f"parsed_output_list: {t}")
# if "DocInfos" in prompt:
# prompt += "\n" + QUERY_CONTEXT_DOC_PROMPT_INPUT.format(**{"context": context, "query": query.origin_query, "DocInfos": DocInfos})
# else:
# prompt += "\n" + QUERY_CONTEXT_PROMPT_INPUT.format(**{"context": context, "query": query.origin_query, "DocInfos": DocInfos})
# else:
# prompt += "\n" + BASE_PROMPT_INPUT.format(**{"query": input_query})
# prompt = extra_system_prompt.format(**{"query": input_query, "doc_infos": doc_infos, "formatted_tools": formatted_tools, "tool_names": tool_names})
while "{{" in prompt or "}}" in prompt:
prompt = prompt.replace("{{", "{")
prompt = prompt.replace("}}", "}")
# logger.debug(f"{self.role.role_name} prompt: {prompt}")
return prompt
def create_doc_prompt(self, message: Message) -> str:
''''''
db_docs = message.db_docs
search_docs = message.search_docs
doc_infos = "\n".join([doc.get_snippet() for doc in db_docs] + [doc.get_snippet() for doc in search_docs])
return doc_infos or "不存在知识库辅助信息"
def create_codedoc_prompt(self, message: Message) -> str:
''''''
code_docs = message.code_docs
doc_infos = "\n".join([doc.get_code() for doc in code_docs])
return doc_infos or "不存在代码库辅助信息"
def create_tools_prompt(self, message: Message) -> str:
tools = message.tools
tool_strings = []
tools_descs = []
for tool in tools:
args_schema = re.sub("}", "}}}}", re.sub("{", "{{{{", str(tool.args)))
tool_strings.append(f"{tool.name}: {tool.description}, args: {args_schema}")
tools_descs.append(f"{tool.name}: {tool.description}")
formatted_tools = "\n".join(tool_strings)
tools_desc_str = "\n".join(tools_descs)
tool_names = ", ".join([tool.name for tool in tools])
return formatted_tools, tool_names, tools_desc_str
def create_task_prompt(self, message: Message) -> str:
task = message.task or self.task
return "\n任务目标: " + task.task_prompt if task is not None else None
def create_background_prompt(self, background: Memory, control_key="role_content") -> str:
background_message = None if background is None else background.to_str_messages(content_key=control_key)
# logger.debug(f"background_message: {background_message}")
if background_message:
background_message = re.sub("}", "}}", re.sub("{", "{{", background_message))
return "\n背景信息: " + background_message if background_message else None
def create_history_prompt(self, history: Memory, control_key="role_content") -> str:
history_message = None if history is None else history.to_str_messages(content_key=control_key)
if history_message:
history_message = re.sub("}", "}}", re.sub("{", "{{", history_message))
return "\n补充对话信息: " + history_message if history_message else None
def create_selfmemory_prompt(self, selfmemory: Memory, control_key="role_content") -> str:
selfmemory_message = None if selfmemory is None else selfmemory.to_str_messages(content_key=control_key)
if selfmemory_message:
selfmemory_message = re.sub("}", "}}", re.sub("{", "{{", selfmemory_message))
return "\n补充自身对话信息: " + selfmemory_message if selfmemory_message else None
def init_history(self, memory: Memory = None) -> Memory:
return Memory(messages=[])
def update_history(self, message: Message):
self.memory.append(message)
def append_history(self, message: Message):
self.memory.append(message)
def clear_history(self, ):
self.memory.clear()
self.memory = self.init_history()
def create_llm_engine(self, temperature=0.2, stop=None):
return getChatModel(temperature=temperature, stop=stop)
def registry_actions(self, actions):
'''registry llm's actions'''
self.action_list = actions
def start_action_step(self, message: Message) -> Message:
'''do action before agent predict '''
# action_json = self.start_action()
# message["customed_kargs"]["xx"] = action_json
return message
def end_action_step(self, message: Message) -> Message:
'''do action after agent predict '''
# action_json = self.end_action()
# message["customed_kargs"]["xx"] = action_json
return message
def token_usage(self, ):
'''calculate the usage of token'''
pass
def select_memory_by_key(self, memory: Memory) -> Memory:
return Memory(
messages=[self.select_message_by_key(message) for message in memory.messages
if self.select_message_by_key(message) is not None]
)
def select_memory_by_agent_key(self, memory: Memory) -> Memory:
return Memory(
messages=[self.select_message_by_agent_key(message) for message in memory.messages
if self.select_message_by_agent_key(message) is not None]
)
def select_message_by_agent_key(self, message: Message) -> Message:
# assume we focus all agents
if self.focus_agents == []:
return message
return None if message is None or message.role_name not in self.focus_agents else self.select_message_by_key(message)
def select_message_by_key(self, message: Message) -> Message:
# assume we focus all key contents
if message is None:
return message
if self.focus_message_keys == []:
return message
message_c = copy.deepcopy(message)
message_c.parsed_output = {k: v for k,v in message_c.parsed_output.items() if k in self.focus_message_keys}
message_c.parsed_output_list = [{k: v for k,v in parsed_output.items() if k in self.focus_message_keys} for parsed_output in message_c.parsed_output_list]
return message_c
def get_memory(self, content_key="role_content"):
return self.memory.to_tuple_messages(content_key="step_content")
def get_memory_str(self, content_key="role_content"):
return "\n".join([": ".join(i) for i in self.memory.to_tuple_messages(content_key="step_content")])