diff --git a/configs/model_config.py.example b/configs/model_config.py.example index e6d2390..46c0604 100644 --- a/configs/model_config.py.example +++ b/configs/model_config.py.example @@ -51,6 +51,17 @@ EMBEDDING_MODEL = "text2vec-base" # Embedding 模型运行设备 EMBEDDING_DEVICE = "cuda" if torch.cuda.is_available() else "mps" if torch.backends.mps.is_available() else "cpu" +ONLINE_LLM_MODEL = { + # 线上模型。请在server_config中为每个在线API设置不同的端口 + + "openai-api": { + "model_name": "gpt-3.5-turbo", + "api_base_url": "https://api.openai.com/v1", + "api_key": "", + "openai_proxy": "", + }, +} + llm_model_dict = { "chatglm-6b": { @@ -123,6 +134,7 @@ llm_model_dict = llm_model_dict_c # LLM 名称 LLM_MODEL = "gpt-3.5-turbo" +LLM_MODELs = ["chatglm2-6b"] USE_FASTCHAT = "gpt" not in LLM_MODEL # 判断是否进行fastchat # LLM 运行设备 diff --git a/configs/server_config.py.example b/configs/server_config.py.example index 8f3dd7a..e1b5f18 100644 --- a/configs/server_config.py.example +++ b/configs/server_config.py.example @@ -76,7 +76,7 @@ SANDBOX_SERVER = { # 这些模型必须是在model_config.llm_model_dict中正确配置的。 # 在启动startup.py时,可用通过`--model-worker --model-name xxxx`指定模型,不指定则为LLM_MODEL FSCHAT_MODEL_WORKERS = { - LLM_MODEL: { + "default": { "host": DEFAULT_BIND_HOST, "port": 20002, "device": LLM_DEVICE, @@ -100,8 +100,13 @@ FSCHAT_MODEL_WORKERS = { # "stream_interval": 2, # "no_register": False, }, + "chatglm2-6b": { + "port": 20003 + }, + "baichuan2-7b-base": { + "port": 20004 + } } - # fastchat multi model worker server FSCHAT_MULTI_MODEL_WORKERS = { # todo diff --git a/dev_opsgpt/service/llm_api.py b/dev_opsgpt/service/llm_api.py index 00d67e6..b13c696 100644 --- a/dev_opsgpt/service/llm_api.py +++ b/dev_opsgpt/service/llm_api.py @@ -1,7 +1,14 @@ +# Attention: code copied from https://github.com/chatchat-space/Langchain-Chatchat/blob/master/server/llm_api.py + from multiprocessing import Process, Queue import multiprocessing as mp import sys import os +from typing import List, Union, Dict +import httpx +import asyncio +import datetime +import argparse src_dir = os.path.join( os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) @@ -10,8 +17,13 @@ sys.path.append(src_dir) sys.path.append(os.path.dirname(os.path.dirname(__file__))) from configs.model_config import llm_model_dict, LLM_MODEL, LLM_DEVICE, LOG_PATH, logger -from dev_opsgpt.utils.server_utils import MakeFastAPIOffline - +from configs.server_config import ( + FSCHAT_CONTROLLER, FSCHAT_MODEL_WORKERS, FSCHAT_OPENAI_API +) +from dev_opsgpt.utils.server_utils import ( + MakeFastAPIOffline, +) +from fastapi import FastAPI host_ip = "0.0.0.0" controller_port = 20001 @@ -19,7 +31,10 @@ model_worker_port = 20002 openai_api_port = 8888 base_url = "http://127.0.0.1:{}" + os.environ['PATH'] = os.environ.get("PATH", "") + os.pathsep +log_verbose = True + def set_httpx_timeout(timeout=60.0): import httpx @@ -28,144 +43,409 @@ def set_httpx_timeout(timeout=60.0): httpx._config.DEFAULT_TIMEOUT_CONFIG.write = timeout -def create_controller_app( - dispatch_method="shortest_queue", +def get_model_worker_config(model_name: str = None) -> dict: + ''' + 加载model worker的配置项。 + 优先级:FSCHAT_MODEL_WORKERS[model_name] > ONLINE_LLM_MODEL[model_name] > FSCHAT_MODEL_WORKERS["default"] + ''' + from configs.model_config import ONLINE_LLM_MODEL + from configs.server_config import FSCHAT_MODEL_WORKERS + # from server import model_workers + + config = FSCHAT_MODEL_WORKERS.get("default", {}).copy() + # config.update(ONLINE_LLM_MODEL.get(model_name, {}).copy()) + 
config.update(FSCHAT_MODEL_WORKERS.get(model_name, {}).copy()) + + # if model_name in ONLINE_LLM_MODEL: + # config["online_api"] = True + # if provider := config.get("provider"): + # try: + # config["worker_class"] = getattr(model_workers, provider) + # except Exception as e: + # msg = f"在线模型 ‘{model_name}’ 的provider没有正确配置" + # logger.error(f'{e.__class__.__name__}: {msg}', + # exc_info=e if log_verbose else None) + # 本地模型 + if model_name in llm_model_dict: + path = llm_model_dict[model_name]["local_model_path"] + config["model_path"] = path + if path and os.path.isdir(path): + config["model_path_exists"] = True + config["device"] = LLM_DEVICE + return config + + +def get_all_model_worker_configs() -> dict: + result = {} + model_names = set(FSCHAT_MODEL_WORKERS.keys()) + for name in model_names: + if name != "default": + result[name] = get_model_worker_config(name) + return result + + +def fschat_controller_address() -> str: + from configs.server_config import FSCHAT_CONTROLLER + + host = FSCHAT_CONTROLLER["host"] + if host == "0.0.0.0": + host = "127.0.0.1" + port = FSCHAT_CONTROLLER["port"] + return f"http://{host}:{port}" + + +def fschat_model_worker_address(model_name: str = LLM_MODEL) -> str: + if model := get_model_worker_config(model_name): # TODO: depends fastchat + host = model["host"] + if host == "0.0.0.0": + host = "127.0.0.1" + port = model["port"] + return f"http://{host}:{port}" + return "" + + +def fschat_openai_api_address() -> str: + from configs.server_config import FSCHAT_OPENAI_API + + host = FSCHAT_OPENAI_API["host"] + if host == "0.0.0.0": + host = "127.0.0.1" + port = FSCHAT_OPENAI_API["port"] + return f"http://{host}:{port}/v1" + + +def set_httpx_config( + timeout: float = 300, + proxy: Union[str, Dict] = None, ): + ''' + 设置httpx默认timeout。httpx默认timeout是5秒,在请求LLM回答时不够用。 + 将本项目相关服务加入无代理列表,避免fastchat的服务器请求错误。(windows下无效) + 对于chatgpt等在线API,如要使用代理需要手动配置。搜索引擎的代理如何处置还需考虑。 + ''' + + import httpx + import os + + httpx._config.DEFAULT_TIMEOUT_CONFIG.connect = timeout + httpx._config.DEFAULT_TIMEOUT_CONFIG.read = timeout + httpx._config.DEFAULT_TIMEOUT_CONFIG.write = timeout + + # 在进程范围内设置系统级代理 + proxies = {} + if isinstance(proxy, str): + for n in ["http", "https", "all"]: + proxies[n + "_proxy"] = proxy + elif isinstance(proxy, dict): + for n in ["http", "https", "all"]: + if p := proxy.get(n): + proxies[n + "_proxy"] = p + elif p := proxy.get(n + "_proxy"): + proxies[n + "_proxy"] = p + + for k, v in proxies.items(): + os.environ[k] = v + + # set host to bypass proxy + no_proxy = [x.strip() for x in os.environ.get("no_proxy", "").split(",") if x.strip()] + no_proxy += [ + # do not use proxy for locahost + "http://127.0.0.1", + "http://localhost", + ] + # do not use proxy for user deployed fastchat servers + for x in [ + fschat_controller_address(), + fschat_model_worker_address(), + fschat_openai_api_address(), + ]: + host = ":".join(x.split(":")[:2]) + if host not in no_proxy: + no_proxy.append(host) + os.environ["NO_PROXY"] = ",".join(no_proxy) + + # TODO: 简单的清除系统代理不是个好的选择,影响太多。似乎修改代理服务器的bypass列表更好。 + # patch requests to use custom proxies instead of system settings + def _get_proxies(): + return proxies + + import urllib.request + urllib.request.getproxies = _get_proxies + + # 自动检查torch可用的设备。分布式部署时,不运行LLM的机器上可以不装torch + + +def get_httpx_client( + use_async: bool = False, + proxies: Union[str, Dict] = None, + timeout: float = 300, + **kwargs, +) -> Union[httpx.Client, httpx.AsyncClient]: + ''' + helper to get httpx client with default proxies that bypass local addesses. 
+ ''' + default_proxies = { + # do not use proxy for locahost + "all://127.0.0.1": None, + "all://localhost": None, + } + # do not use proxy for user deployed fastchat servers + for x in [ + fschat_controller_address(), + fschat_model_worker_address(), + fschat_openai_api_address(), + ]: + host = ":".join(x.split(":")[:2]) + default_proxies.update({host: None}) + + # get proxies from system envionrent + # proxy not str empty string, None, False, 0, [] or {} + default_proxies.update({ + "http://": (os.environ.get("http_proxy") + if os.environ.get("http_proxy") and len(os.environ.get("http_proxy").strip()) + else None), + "https://": (os.environ.get("https_proxy") + if os.environ.get("https_proxy") and len(os.environ.get("https_proxy").strip()) + else None), + "all://": (os.environ.get("all_proxy") + if os.environ.get("all_proxy") and len(os.environ.get("all_proxy").strip()) + else None), + }) + for host in os.environ.get("no_proxy", "").split(","): + if host := host.strip(): + # default_proxies.update({host: None}) # Origin code + default_proxies.update({'all://' + host: None}) # PR 1838 fix, if not add 'all://', httpx will raise error + + # merge default proxies with user provided proxies + if isinstance(proxies, str): + proxies = {"all://": proxies} + + if isinstance(proxies, dict): + default_proxies.update(proxies) + + # construct Client + kwargs.update(timeout=timeout, proxies=default_proxies) + + if log_verbose: + logger.info(f'{get_httpx_client.__class__.__name__}:kwargs: {kwargs}') + + if use_async: + return httpx.AsyncClient(**kwargs) + else: + return httpx.Client(**kwargs) + + +def create_controller_app( + dispatch_method: str, + log_level: str = "INFO", +) -> FastAPI: import fastchat.constants fastchat.constants.LOGDIR = LOG_PATH - from fastchat.serve.controller import app, Controller + from fastchat.serve.controller import app, Controller, logger + logger.setLevel(log_level) controller = Controller(dispatch_method) sys.modules["fastchat.serve.controller"].controller = controller MakeFastAPIOffline(app) app.title = "FastChat Controller" + app._controller = controller return app -def create_model_worker_app( - worker_address=base_url.format(model_worker_port), - controller_address=base_url.format(controller_port), - model_path=llm_model_dict[LLM_MODEL].get("local_model_path"), - device=LLM_DEVICE, - gpus=None, - max_gpu_memory="8GiB", - load_8bit=False, - cpu_offloading=None, - gptq_ckpt=None, - gptq_wbits=16, - gptq_groupsize=-1, - gptq_act_order=False, - awq_ckpt=None, - awq_wbits=16, - awq_groupsize=-1, - model_names=[LLM_MODEL], - num_gpus=1, # not in fastchat - conv_template=None, - limit_worker_concurrency=5, - stream_interval=2, - no_register=False, -): +def create_model_worker_app(log_level: str = "INFO", **kwargs) -> FastAPI: + """ + kwargs包含的字段如下: + host: + port: + model_names:[`model_name`] + controller_address: + worker_address: + + 对于Langchain支持的模型: + langchain_model:True + 不会使用fschat + 对于online_api: + online_api:True + worker_class: `provider` + 对于离线模型: + model_path: `model_name_or_path`,huggingface的repo-id或本地路径 + device:`LLM_DEVICE` + """ import fastchat.constants fastchat.constants.LOGDIR = LOG_PATH - from fastchat.serve.model_worker import app, GptqConfig, AWQConfig, ModelWorker, worker_id import argparse - import threading - import fastchat.serve.model_worker - - # workaround to make program exit with Ctrl+c - # it should be deleted after pr is merged by fastchat - def _new_init_heart_beat(self): - self.register_to_controller() - self.heart_beat_thread = 
threading.Thread( - target=fastchat.serve.model_worker.heart_beat_worker, args=(self,), daemon=True, - ) - self.heart_beat_thread.start() - ModelWorker.init_heart_beat = _new_init_heart_beat parser = argparse.ArgumentParser() - args = parser.parse_args() - args.model_path = model_path - args.model_names = model_names - args.device = device - args.load_8bit = load_8bit - args.gptq_ckpt = gptq_ckpt - args.gptq_wbits = gptq_wbits - args.gptq_groupsize = gptq_groupsize - args.gptq_act_order = gptq_act_order - args.awq_ckpt = awq_ckpt - args.awq_wbits = awq_wbits - args.awq_groupsize = awq_groupsize - args.gpus = gpus - args.num_gpus = num_gpus - args.max_gpu_memory = max_gpu_memory - args.cpu_offloading = cpu_offloading - args.worker_address = worker_address - args.controller_address = controller_address - args.conv_template = conv_template - args.limit_worker_concurrency = limit_worker_concurrency - args.stream_interval = stream_interval - args.no_register = no_register + args = parser.parse_args([]) - if args.gpus: - if len(args.gpus.split(",")) < args.num_gpus: - raise ValueError( - f"Larger --num-gpus ({args.num_gpus}) than --gpus {args.gpus}!" + for k, v in kwargs.items(): + setattr(args, k, v) + if worker_class := kwargs.get("langchain_model"): #Langchian支持的模型不用做操作 + from fastchat.serve.base_model_worker import app + worker = "" + # 在线模型API + elif worker_class := kwargs.get("worker_class"): + from fastchat.serve.base_model_worker import app + + worker = worker_class(model_names=args.model_names, + controller_addr=args.controller_address, + worker_addr=args.worker_address) + # sys.modules["fastchat.serve.base_model_worker"].worker = worker + sys.modules["fastchat.serve.base_model_worker"].logger.setLevel(log_level) + # 本地模型 + else: + from configs.model_config import VLLM_MODEL_DICT + if kwargs["model_names"][0] in VLLM_MODEL_DICT and args.infer_turbo == "vllm": + import fastchat.serve.vllm_worker + from fastchat.serve.vllm_worker import VLLMWorker, app, worker_id + from vllm import AsyncLLMEngine + from vllm.engine.arg_utils import AsyncEngineArgs,EngineArgs + + args.tokenizer = args.model_path # 如果tokenizer与model_path不一致在此处添加 + args.tokenizer_mode = 'auto' + args.trust_remote_code= True + args.download_dir= None + args.load_format = 'auto' + args.dtype = 'auto' + args.seed = 0 + args.worker_use_ray = False + args.pipeline_parallel_size = 1 + args.tensor_parallel_size = 1 + args.block_size = 16 + args.swap_space = 4 # GiB + args.gpu_memory_utilization = 0.90 + args.max_num_batched_tokens = None # 一个批次中的最大令牌(tokens)数量,这个取决于你的显卡和大模型设置,设置太大显存会不够 + args.max_num_seqs = 256 + args.disable_log_stats = False + args.conv_template = None + args.limit_worker_concurrency = 5 + args.no_register = False + args.num_gpus = 4 # vllm worker的切分是tensor并行,这里填写显卡的数量 + args.engine_use_ray = False + args.disable_log_requests = False + + # 0.2.1 vllm后要加的参数, 但是这里不需要 + args.max_model_len = None + args.revision = None + args.quantization = None + args.max_log_len = None + args.tokenizer_revision = None + + # 0.2.2 vllm需要新加的参数 + args.max_paddings = 256 + + if args.model_path: + args.model = args.model_path + if args.num_gpus > 1: + args.tensor_parallel_size = args.num_gpus + + for k, v in kwargs.items(): + setattr(args, k, v) + + engine_args = AsyncEngineArgs.from_cli_args(args) + engine = AsyncLLMEngine.from_engine_args(engine_args) + + worker = VLLMWorker( + controller_addr = args.controller_address, + worker_addr = args.worker_address, + worker_id = worker_id, + model_path = args.model_path, + model_names = 
args.model_names, + limit_worker_concurrency = args.limit_worker_concurrency, + no_register = args.no_register, + llm_engine = engine, + conv_template = args.conv_template, + ) + sys.modules["fastchat.serve.vllm_worker"].engine = engine + sys.modules["fastchat.serve.vllm_worker"].worker = worker + sys.modules["fastchat.serve.vllm_worker"].logger.setLevel(log_level) + + else: + from fastchat.serve.model_worker import app, GptqConfig, AWQConfig, ModelWorker, worker_id + + args.gpus = "0" # GPU的编号,如果有多个GPU,可以设置为"0,1,2,3" + args.max_gpu_memory = "22GiB" + args.num_gpus = 1 # model worker的切分是model并行,这里填写显卡的数量 + + args.load_8bit = False + args.cpu_offloading = None + args.gptq_ckpt = None + args.gptq_wbits = 16 + args.gptq_groupsize = -1 + args.gptq_act_order = False + args.awq_ckpt = None + args.awq_wbits = 16 + args.awq_groupsize = -1 + args.model_names = [""] + args.conv_template = None + args.limit_worker_concurrency = 5 + args.stream_interval = 2 + args.no_register = False + args.embed_in_truncate = False + for k, v in kwargs.items(): + setattr(args, k, v) + if args.gpus: + if args.num_gpus is None: + args.num_gpus = len(args.gpus.split(',')) + if len(args.gpus.split(",")) < args.num_gpus: + raise ValueError( + f"Larger --num-gpus ({args.num_gpus}) than --gpus {args.gpus}!" + ) + os.environ["CUDA_VISIBLE_DEVICES"] = args.gpus + gptq_config = GptqConfig( + ckpt=args.gptq_ckpt or args.model_path, + wbits=args.gptq_wbits, + groupsize=args.gptq_groupsize, + act_order=args.gptq_act_order, + ) + awq_config = AWQConfig( + ckpt=args.awq_ckpt or args.model_path, + wbits=args.awq_wbits, + groupsize=args.awq_groupsize, ) - os.environ["CUDA_VISIBLE_DEVICES"] = args.gpus - if gpus and num_gpus is None: - num_gpus = len(gpus.split(',')) - args.num_gpus = num_gpus + worker = ModelWorker( + controller_addr=args.controller_address, + worker_addr=args.worker_address, + worker_id=worker_id, + model_path=args.model_path, + model_names=args.model_names, + limit_worker_concurrency=args.limit_worker_concurrency, + no_register=args.no_register, + device=args.device, + num_gpus=args.num_gpus, + max_gpu_memory=args.max_gpu_memory, + load_8bit=args.load_8bit, + cpu_offloading=args.cpu_offloading, + gptq_config=gptq_config, + awq_config=awq_config, + stream_interval=args.stream_interval, + conv_template=args.conv_template, + embed_in_truncate=args.embed_in_truncate, + ) + sys.modules["fastchat.serve.model_worker"].args = args + sys.modules["fastchat.serve.model_worker"].gptq_config = gptq_config + # sys.modules["fastchat.serve.model_worker"].worker = worker + sys.modules["fastchat.serve.model_worker"].logger.setLevel(log_level) - gptq_config = GptqConfig( - ckpt=gptq_ckpt or model_path, - wbits=args.gptq_wbits, - groupsize=args.gptq_groupsize, - act_order=args.gptq_act_order, - ) - awq_config = AWQConfig( - ckpt=args.awq_ckpt or args.model_path, - wbits=args.awq_wbits, - groupsize=args.awq_groupsize, - ) - - # torch.multiprocessing.set_start_method('spawn') - worker = ModelWorker( - controller_addr=args.controller_address, - worker_addr=args.worker_address, - worker_id=worker_id, - model_path=args.model_path, - model_names=args.model_names, - limit_worker_concurrency=args.limit_worker_concurrency, - no_register=args.no_register, - device=args.device, - num_gpus=args.num_gpus, - max_gpu_memory=args.max_gpu_memory, - load_8bit=args.load_8bit, - cpu_offloading=args.cpu_offloading, - gptq_config=gptq_config, - awq_config=awq_config, - stream_interval=args.stream_interval, - conv_template=args.conv_template, - ) - - 
sys.modules["fastchat.serve.model_worker"].worker = worker - sys.modules["fastchat.serve.model_worker"].args = args - sys.modules["fastchat.serve.model_worker"].gptq_config = gptq_config - MakeFastAPIOffline(app) - app.title = f"FastChat LLM Server ({LLM_MODEL})" + app.title = f"FastChat LLM Server ({args.model_names[0]})" + app._worker = worker return app def create_openai_api_app( - controller_address=base_url.format(controller_port), - api_keys=[], -): + controller_address: str, + api_keys: List = [], + log_level: str = "INFO", +) -> FastAPI: import fastchat.constants fastchat.constants.LOGDIR = LOG_PATH from fastchat.serve.openai_api_server import app, CORSMiddleware, app_settings + from fastchat.utils import build_logger + logger = build_logger("openai_api", "openai_api.log") + logger.setLevel(log_level) app.add_middleware( CORSMiddleware, @@ -175,6 +455,7 @@ def create_openai_api_app( allow_headers=["*"], ) + sys.modules["fastchat.serve.openai_api_server"].logger = logger app_settings.controller_address = controller_address app_settings.api_keys = api_keys @@ -183,98 +464,509 @@ def create_openai_api_app( return app -def run_controller(q): - import uvicorn - app = create_controller_app() - +def _set_app_event(app: FastAPI, started_event: mp.Event = None): @app.on_event("startup") async def on_startup(): - set_httpx_timeout() - q.put(1) - - uvicorn.run(app, host=host_ip, port=controller_port) + if started_event is not None: + started_event.set() -def run_model_worker(q, *args, **kwargs): +def run_controller(log_level: str = "INFO", started_event: mp.Event = None): import uvicorn - app = create_model_worker_app(*args, **kwargs) + import httpx + from fastapi import Body + import time + import sys + # from server.utils import set_httpx_config + set_httpx_config() - @app.on_event("startup") - async def on_startup(): - set_httpx_timeout() - while True: - no = q.get() - if no != 1: - q.put(no) + app = create_controller_app( + dispatch_method=FSCHAT_CONTROLLER.get("dispatch_method"), + log_level=log_level, + ) + _set_app_event(app, started_event) + + # add interface to release and load model worker + @app.post("/release_worker") + def release_worker( + model_name: str = Body(..., description="要释放模型的名称", samples=["chatglm-6b"]), + # worker_address: str = Body(None, description="要释放模型的地址,与名称二选一", samples=[FSCHAT_CONTROLLER_address()]), + new_model_name: str = Body(None, description="释放后加载该模型"), + keep_origin: bool = Body(False, description="不释放原模型,加载新模型") + ) -> Dict: + available_models = app._controller.list_models() + if new_model_name in available_models: + msg = f"要切换的LLM模型 {new_model_name} 已经存在" + logger.info(msg) + return {"code": 500, "msg": msg} + + if new_model_name: + logger.info(f"开始切换LLM模型:从 {model_name} 到 {new_model_name}") + else: + logger.info(f"即将停止LLM模型: {model_name}") + + if model_name not in available_models: + msg = f"the model {model_name} is not available" + logger.error(msg) + return {"code": 500, "msg": msg} + + worker_address = app._controller.get_worker_address(model_name) + if not worker_address: + msg = f"can not find model_worker address for {model_name}" + logger.error(msg) + return {"code": 500, "msg": msg} + + with get_httpx_client() as client: + r = client.post(worker_address + "/release", + json={"new_model_name": new_model_name, "keep_origin": keep_origin}) + if r.status_code != 200: + msg = f"failed to release model: {model_name}" + logger.error(msg) + return {"code": 500, "msg": msg} + + if new_model_name: + timer = 300 # wait for new model_worker register 
+ while timer > 0: + models = app._controller.list_models() + if new_model_name in models: + break + time.sleep(1) + timer -= 1 + if timer > 0: + msg = f"sucess change model from {model_name} to {new_model_name}" + logger.info(msg) + return {"code": 200, "msg": msg} else: - break - q.put(2) + msg = f"failed change model from {model_name} to {new_model_name}" + logger.error(msg) + return {"code": 500, "msg": msg} + else: + msg = f"sucess to release model: {model_name}" + logger.info(msg) + return {"code": 200, "msg": msg} - uvicorn.run(app, host=host_ip, port=model_worker_port) + host = FSCHAT_CONTROLLER["host"] + port = FSCHAT_CONTROLLER["port"] + + if log_level == "ERROR": + sys.stdout = sys.__stdout__ + sys.stderr = sys.__stderr__ + + uvicorn.run(app, host=host, port=port, log_level=log_level.lower()) -def run_openai_api(q): +def run_model_worker( + model_name: str = LLM_MODEL, + controller_address: str = "", + log_level: str = "INFO", + q: mp.Queue = None, + started_event: mp.Event = None, +): import uvicorn - app = create_openai_api_app() + from fastapi import Body + import sys + set_httpx_config() - @app.on_event("startup") - async def on_startup(): - set_httpx_timeout() - while True: - no = q.get() - if no != 2: - q.put(no) + kwargs = get_model_worker_config(model_name) + host = kwargs.pop("host") + port = kwargs.pop("port") + kwargs["model_names"] = [model_name] + kwargs["controller_address"] = controller_address or fschat_controller_address() + kwargs["worker_address"] = fschat_model_worker_address(model_name) + model_path = kwargs.get("model_path", "") + kwargs["model_path"] = model_path + # kwargs["gptq_wbits"] = 4 + + app = create_model_worker_app(log_level=log_level, **kwargs) + _set_app_event(app, started_event) + if log_level == "ERROR": + sys.stdout = sys.__stdout__ + sys.stderr = sys.__stderr__ + + # add interface to release and load model + @app.post("/release") + def release_model( + new_model_name: str = Body(None, description="释放后加载该模型"), + keep_origin: bool = Body(False, description="不释放原模型,加载新模型") + ) -> Dict: + if keep_origin: + if new_model_name: + q.put([model_name, "start", new_model_name]) + else: + if new_model_name: + q.put([model_name, "replace", new_model_name]) else: - break - q.put(3) + q.put([model_name, "stop", None]) + return {"code": 200, "msg": "done"} - uvicorn.run(app, host=host_ip, port=openai_api_port) + uvicorn.run(app, host=host, port=port, log_level=log_level.lower()) + + +def run_openai_api(log_level: str = "INFO", started_event: mp.Event = None): + import uvicorn + import sys + set_httpx_config() + + controller_addr = fschat_controller_address() + app = create_openai_api_app(controller_addr, log_level=log_level) # TODO: not support keys yet. 
+ _set_app_event(app, started_event) + + host = FSCHAT_OPENAI_API["host"] + port = FSCHAT_OPENAI_API["port"] + if log_level == "ERROR": + sys.stdout = sys.__stdout__ + sys.stderr = sys.__stderr__ + uvicorn.run(app, host=host, port=port) + + +def parse_args() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser() + parser.add_argument( + "-a", + "--all-webui", + action="store_true", + help="run fastchat's controller/openai_api/model_worker servers, run api.py and webui.py", + dest="all_webui", + ) + parser.add_argument( + "--all-api", + action="store_true", + help="run fastchat's controller/openai_api/model_worker servers, run api.py", + dest="all_api", + ) + parser.add_argument( + "--llm-api", + action="store_true", + help="run fastchat's controller/openai_api/model_worker servers", + dest="llm_api", + ) + parser.add_argument( + "-o", + "--openai-api", + action="store_true", + help="run fastchat's controller/openai_api servers", + dest="openai_api", + ) + parser.add_argument( + "-m", + "--model-worker", + action="store_true", + help="run fastchat's model_worker server with specified model name. " + "specify --model-name if not using default LLM_MODELS", + dest="model_worker", + ) + parser.add_argument( + "-n", + "--model-name", + type=str, + nargs="+", + default=[LLM_MODEL], + help="specify model name for model worker. " + "add addition names with space seperated to start multiple model workers.", + dest="model_name", + ) + parser.add_argument( + "-c", + "--controller", + type=str, + help="specify controller address the worker is registered to. default is FSCHAT_CONTROLLER", + dest="controller_address", + ) + parser.add_argument( + "--api", + action="store_true", + help="run api.py server", + dest="api", + ) + parser.add_argument( + "-p", + "--api-worker", + action="store_true", + help="run online model api such as zhipuai", + dest="api_worker", + ) + parser.add_argument( + "-w", + "--webui", + action="store_true", + help="run webui.py server", + dest="webui", + ) + parser.add_argument( + "-q", + "--quiet", + action="store_true", + help="减少fastchat服务log信息", + dest="quiet", + ) + parser.add_argument( + "-i", + "--lite", + action="store_true", + help="以Lite模式运行:仅支持在线API的LLM对话、搜索引擎对话", + dest="lite", + ) + args = parser.parse_args() + return args, parser + + +def dump_server_info(after_start=False, args=None): + import platform + import langchain + import fastchat + + print("\n") + print("=" * 30 + "Langchain-Chatchat Configuration" + "=" * 30) + print(f"操作系统:{platform.platform()}.") + print(f"python版本:{sys.version}") + print(f"langchain版本:{langchain.__version__}. fastchat版本:{fastchat.__version__}") + print("\n") + + models = [LLM_MODEL] + if args and args.model_name: + models = args.model_name + + print(f"当前启动的LLM模型:{models} @ {LLM_DEVICE}") + + for model in models: + print(get_model_worker_config(model)) + + if after_start: + print("\n") + print(f"服务端运行信息:") + print(f" OpenAI API Server: {fschat_openai_api_address()}") + print("\n") + + +async def start_main_server(): + import time + import signal + + def handler(signalname): + """ + Python 3.9 has `signal.strsignal(signalnum)` so this closure would not be needed. + Also, 3.8 includes `signal.valid_signals()` that can be used to create a mapping for the same purpose. 
+ """ + def f(signal_received, frame): + raise KeyboardInterrupt(f"{signalname} received") + return f + + # This will be inherited by the child process if it is forked (not spawned) + signal.signal(signal.SIGINT, handler("SIGINT")) + signal.signal(signal.SIGTERM, handler("SIGTERM")) + + mp.set_start_method("spawn") + manager = mp.Manager() + run_mode = None + + queue = manager.Queue() + args, parser = parse_args() + logger.debug(f"args: {args}") + + dump_server_info(args=args) + + if len(sys.argv) > 1: + logger.info(f"正在启动服务:") + logger.info(f"如需查看 llm_api 日志,请前往 {LOG_PATH}") + + processes = {"online_api": {}, "model_worker": {}} + + def process_count(): + return len(processes) + len(processes["online_api"]) + len(processes["model_worker"]) - 2 + + if args.quiet or not log_verbose: + log_level = "ERROR" + else: + log_level = "INFO" + + controller_started = manager.Event() + process = Process( + target=run_controller, + name=f"controller", + kwargs=dict(log_level=log_level, started_event=controller_started), + daemon=True, + ) + processes["controller"] = process + + process = Process( + target=run_openai_api, + name=f"openai_api", + daemon=True, + ) + processes["openai_api"] = process + + model_worker_started = [] + for model_name in args.model_name: + config = get_model_worker_config(model_name) + if not config.get("online_api"): + e = manager.Event() + model_worker_started.append(e) + process = Process( + target=run_model_worker, + name=f"model_worker - {model_name}", + kwargs=dict(model_name=model_name, + controller_address=args.controller_address, + log_level=log_level, + q=queue, + started_event=e), + daemon=True, + ) + processes["model_worker"][model_name] = process + + for model_name in args.model_name: + config = get_model_worker_config(model_name) + if (config.get("online_api") + and config.get("worker_class") + and model_name in FSCHAT_MODEL_WORKERS): + e = manager.Event() + model_worker_started.append(e) + process = Process( + target=run_model_worker, + name=f"api_worker - {model_name}", + kwargs=dict(model_name=model_name, + controller_address=args.controller_address, + log_level=log_level, + q=queue, + started_event=e), + daemon=True, + ) + processes["online_api"][model_name] = process + + + if process_count() == 0: + parser.print_help() + else: + try: + # 保证任务收到SIGINT后,能够正常退出 + if p:= processes.get("controller"): + p.start() + p.name = f"{p.name} ({p.pid})" + controller_started.wait() # 等待controller启动完成 + + if p:= processes.get("openai_api"): + p.start() + p.name = f"{p.name} ({p.pid})" + + for n, p in processes.get("model_worker", {}).items(): + p.start() + p.name = f"{p.name} ({p.pid})" + + for n, p in processes.get("online_api", []).items(): + p.start() + p.name = f"{p.name} ({p.pid})" + + # 等待所有model_worker启动完成 + for e in model_worker_started: + e.wait() + + dump_server_info(after_start=True, args=args) + + while True: + cmd = queue.get() # 收到切换模型的消息 + e = manager.Event() + if isinstance(cmd, list): + model_name, cmd, new_model_name = cmd + if cmd == "start": # 运行新模型 + logger.info(f"准备启动新模型进程:{new_model_name}") + process = Process( + target=run_model_worker, + name=f"model_worker - {new_model_name}", + kwargs=dict(model_name=new_model_name, + controller_address=args.controller_address, + log_level=log_level, + q=queue, + started_event=e), + daemon=True, + ) + process.start() + process.name = f"{process.name} ({process.pid})" + processes["model_worker"][new_model_name] = process + e.wait() + logger.info(f"成功启动新模型进程:{new_model_name}") + elif cmd == "stop": + if process := 
processes["model_worker"].get(model_name): + time.sleep(1) + process.terminate() + process.join() + logger.info(f"停止模型进程:{model_name}") + else: + logger.error(f"未找到模型进程:{model_name}") + elif cmd == "replace": + if process := processes["model_worker"].pop(model_name, None): + logger.info(f"停止模型进程:{model_name}") + start_time = datetime.now() + time.sleep(1) + process.terminate() + process.join() + process = Process( + target=run_model_worker, + name=f"model_worker - {new_model_name}", + kwargs=dict(model_name=new_model_name, + controller_address=args.controller_address, + log_level=log_level, + q=queue, + started_event=e), + daemon=True, + ) + process.start() + process.name = f"{process.name} ({process.pid})" + processes["model_worker"][new_model_name] = process + e.wait() + timing = datetime.now() - start_time + logger.info(f"成功启动新模型进程:{new_model_name}。用时:{timing}。") + else: + logger.error(f"未找到模型进程:{model_name}") + + + # for process in processes.get("model_worker", {}).values(): + # process.join() + # for process in processes.get("online_api", {}).values(): + # process.join() + + # for name, process in processes.items(): + # if name not in ["model_worker", "online_api"]: + # if isinstance(p, dict): + # for work_process in p.values(): + # work_process.join() + # else: + # process.join() + except Exception as e: + logger.error(e) + logger.warning("Caught KeyboardInterrupt! Setting stop event...") + finally: + # Send SIGINT if process doesn't exit quickly enough, and kill it as last resort + # .is_alive() also implicitly joins the process (good practice in linux) + # while alive_procs := [p for p in processes.values() if p.is_alive()]: + + for p in processes.values(): + logger.warning("Sending SIGKILL to %s", p) + # Queues and other inter-process communication primitives can break when + # process is killed, but we don't care here + + if isinstance(p, dict): + for process in p.values(): + process.kill() + else: + p.kill() + + for p in processes.values(): + logger.info("Process status: %s", p) if __name__ == "__main__": - mp.set_start_method("spawn") - queue = Queue() - logger.info(llm_model_dict[LLM_MODEL]) - model_path = llm_model_dict[LLM_MODEL]["local_model_path"] - logger.info(f"如需查看 llm_api 日志,请前往 {LOG_PATH}") - - if not model_path: - logger.error("local_model_path 不能为空") + if sys.version_info < (3, 10): + loop = asyncio.get_event_loop() else: - controller_process = Process( - target=run_controller, - name=f"controller({os.getpid()})", - args=(queue,), - daemon=True, - ) - controller_process.start() - - model_worker_process = Process( - target=run_model_worker, - name=f"model_worker({os.getpid()})", - args=(queue,), - # kwargs={"load_8bit": True}, - daemon=True, - ) - model_worker_process.start() - - openai_api_process = Process( - target=run_openai_api, - name=f"openai_api({os.getpid()})", - args=(queue,), - daemon=True, - ) - openai_api_process.start() - try: - model_worker_process.join() - controller_process.join() - openai_api_process.join() - except KeyboardInterrupt: - model_worker_process.terminate() - controller_process.terminate() - openai_api_process.terminate() + loop = asyncio.get_running_loop() + except RuntimeError: + loop = asyncio.new_event_loop() + + asyncio.set_event_loop(loop) + # 同步调用协程代码 + loop.run_until_complete(start_main_server()) + # 服务启动后接口调用示例: # import openai diff --git a/dev_opsgpt/service/llm_api.py.bak b/dev_opsgpt/service/llm_api.py.bak new file mode 100644 index 0000000..e69de29 diff --git a/examples/agent_examples/codeChatXX_examply.py 
b/examples/agent_examples/codeChatXX_examply.py new file mode 100644 index 0000000..263e36e --- /dev/null +++ b/examples/agent_examples/codeChatXX_examply.py @@ -0,0 +1,59 @@ +import os, sys, requests + +src_dir = os.path.join( + os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +) +sys.path.append(src_dir) + +from dev_opsgpt.tools import ( + toLangchainTools, get_tool_schema, DDGSTool, DocRetrieval, + TOOL_DICT, TOOL_SETS + ) + +from configs.model_config import * +from dev_opsgpt.connector.phase import BasePhase +from dev_opsgpt.connector.agents import BaseAgent +from dev_opsgpt.connector.chains import BaseChain +from dev_opsgpt.connector.schema import ( + Message, Memory, load_role_configs, load_phase_configs, load_chain_configs + ) +from dev_opsgpt.connector.configs import AGETN_CONFIGS, CHAIN_CONFIGS, PHASE_CONFIGS +import importlib + +tools = toLangchainTools([TOOL_DICT[i] for i in TOOL_SETS if i in TOOL_DICT]) + + +role_configs = load_role_configs(AGETN_CONFIGS) +chain_configs = load_chain_configs(CHAIN_CONFIGS) +phase_configs = load_phase_configs(PHASE_CONFIGS) + +agent_module = importlib.import_module("dev_opsgpt.connector.agents") + + +phase_name = "codeChatPhase" +phase = BasePhase(phase_name, + task = None, + phase_config = PHASE_CONFIGS, + chain_config = CHAIN_CONFIGS, + role_config = AGETN_CONFIGS, + do_summary=False, + do_code_retrieval=True, + do_doc_retrieval=False, + do_search=False, + ) + +# 代码一共有多少类 => 基于cypher +# 代码库里有哪些函数,返回5个就行 => 基于cypher +# remove 这个函数是做什么的 => 基于标签 +# 有没有函数已经实现了从字符串删除指定字符串的功能,使用的话可以怎么使用,写个java代码 => 基于描述 +# 有根据我以下的需求用 java 开发一个方法:输入为字符串,将输入中的 .java 字符串给删除掉,然后返回新的字符串 => 基于描述 + +# round-1 +query_content = "代码一共有多少类" +query = Message( + role_name="user", role_type="human", + role_content=query_content, input_query=query_content, origin_query=query_content, + code_engine_name="client", score_threshold=1.0, top_k=3, cb_search_type="cypher" + ) + +output_message1, _ = phase.step(query) diff --git a/requirements.txt b/requirements.txt index 168a06e..df8b74c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,7 @@ langchain==0.0.266 openai==0.28.1 sentence_transformers -fschat==0.2.24 +fschat==0.2.33 transformers>=4.31.0 # torch~=2.0.0 fastapi~=0.99.1 @@ -45,3 +45,10 @@ javalang==0.13.0 jsonref==1.1.0 chromadb==0.4.17 nebula3-python==3.1.0 + + +# qwen model +protobuf==3.20.* +transformers_stream_generator +einops +
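The llm_api.py diff above ends with a truncated comment hinting at a client-side call once the services are up ("服务启动后接口调用示例", i.e. "example API call after the services start: import openai ..."). As an illustration only (not part of this diff), the sketch below shows such a call with the pinned openai==0.28.1 client, assuming start_main_server() was launched (e.g. python dev_opsgpt/service/llm_api.py --model-name chatglm2-6b), that FSCHAT_OPENAI_API binds to the module default 127.0.0.1:8888, and that chatglm2-6b is one of the started workers; adjust host, port and model name to your server_config.

    # Illustrative client call against the fastchat OpenAI-compatible endpoint
    # started by llm_api.py; host, port and model name below are assumptions.
    import openai  # openai==0.28.1, as pinned in requirements.txt

    openai.api_base = "http://127.0.0.1:8888/v1"  # see fschat_openai_api_address()
    openai.api_key = "EMPTY"                      # API keys are not enforced (TODO in run_openai_api)

    resp = openai.ChatCompletion.create(
        model="chatglm2-6b",                      # must match a registered model worker
        messages=[{"role": "user", "content": "Hello"}],
    )
    print(resp["choices"][0]["message"]["content"])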
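The /release_worker route added to the controller app in this diff can be used to stop or hot-swap a running worker. A minimal sketch, assuming the controller listens on the module default 127.0.0.1:20001 (the real address comes from FSCHAT_CONTROLLER, see fschat_controller_address()), that chatglm2-6b is currently running, and that baichuan2-7b-base is configured in FSCHAT_MODEL_WORKERS as in the server_config changes above:

    # Illustrative call to the controller's /release_worker endpoint;
    # the address and model names are assumptions, adjust to your deployment.
    import requests

    resp = requests.post(
        "http://127.0.0.1:20001/release_worker",
        json={
            "model_name": "chatglm2-6b",            # worker to release
            "new_model_name": "baichuan2-7b-base",  # worker to start in its place
            "keep_origin": False,                   # False -> "replace", True -> "start" alongside
        },
    )
    print(resp.json())  # {"code": 200, "msg": "..."} on success

The handler forwards the request to the selected worker's /release route, which puts a [model_name, cmd, new_model_name] message on the queue consumed by start_main_server(), where the corresponding child process is stopped, replaced, or started.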