# codefuse-chatbot/tests/sandbox_test.py
#
# 164 lines
# 4.7 KiB
# Python

import json
from uuid import uuid4
import uuid
import requests
from websockets.sync.client import connect as ws_connect_sync
from websocket import create_connection
from websockets.client import connect as ws_connect
import sys, os
# Add the parent of the directory containing this file to sys.path, so that
# packages living next to the tests directory can be imported when the script
# is run directly.
src_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir))
sys.path.append(src_dir)
from dev_opsgpt.sandbox.pycodebox import PyCodeBox
from pathlib import Path
# python = Path(sys.executable).absolute()
# print(Path(python).as_posix())
# print(python)
# print(sys.executable)
# import requests
# # Set the URL of the Jupyter Notebook server
# url = 'http://localhost:5050' # or the URL of your own Jupyter server
# # Send a GET request to fetch the Jupyter Notebook login page
# response = requests.get(url)
# # Check the response status code
# if response.status_code == 200:
# # Report that the connection succeeded
# print('connect success')
# else:
# print('connect fail')
# import subprocess
# jupyter = subprocess.Popen(
# [
# "jupyter", "notebook",
# "--NotebookApp.token=mytoken",
# f"--port=4949",
# "--no-browser",
# "--ServerApp.disable_check_xsrf=True",
# ],
# stderr=subprocess.PIPE,
# stdin=subprocess.PIPE,
# stdout=subprocess.PIPE,
# )
# Test 1
import time, psutil
from loguru import logger
# Build a sandbox pointed at localhost:5050 with do_remote=False.
# NOTE(review): presumably do_remote=False makes PyCodeBox start its own local
# Jupyter server on that port -- confirm against PyCodeBox.
pycodebox = PyCodeBox(
    remote_url="http://localhost:5050",
    remote_ip="http://localhost",
    remote_port="5050",
    token="mytoken",
    do_code_exe=True,
    do_remote=False,
)

# Snippet wrapped in triple backticks, with code execution requested.
result = pycodebox.chat("```print('hello world!')```", do_code_exe=True)
print(result)

# Same snippet without the backticks, with code execution switched off.
result = pycodebox.chat("print('hello world!')", do_code_exe=False)
print(result)

# Clean-up: terminate every process whose name contains "jupyter" and whose
# command line mentions port=5050.
for process in psutil.process_iter(["pid", "name", "cmdline"]):
    # psutil stores None for attributes it was not allowed to read, so
    # normalise both fields before applying string operations.
    cmdline = str(process.info["cmdline"]).lower()
    name = (process.info["name"] or "").lower()
    if "port=5050" not in cmdline or "jupyter" not in name:
        continue

    logger.warning(f'port=5050, {process.info}')
    try:
        # Terminate the matching process.
        process.terminate()
    except (psutil.NoSuchProcess, psutil.AccessDenied) as err:
        # The process may already be gone, or may belong to another user;
        # neither should abort the rest of the clean-up.
        logger.warning(f"could not terminate pid={process.info['pid']}: {err}")
# Test 2
# with PyCodeBox(remote_url="http://localhost:5050",
# remote_ip="http://localhost",
# remote_port="5050",
# token="mytoken",
# do_code_exe=True,
# do_remote=True) as codebox:
# result = codebox.run("print('hello world!')")
# print(result)
# with PyCodeBox(
# remote_ip="http://localhost",
# remote_port="5555",
# token="mytoken",
# do_code_exe=True,
# do_remote=False
# ) as codebox:
# result = codebox.run("print('hello world!')")
# print(result)
# headers = {'Authorization': 'Token mytoken', 'token': 'mytoken'}
# kernel_url = "http://localhost:5050/api/kernels"
# response = requests.get(kernel_url, headers=headers)
# if len(response.json())>0:
# kernel_id = response.json()[0]["id"]
# else:
# response = requests.post(kernel_url, headers=headers)
# kernel_id = response.json()["id"]
# print(f"ws://localhost:5050/api/kernels/{kernel_id}/channels?token=mytoken")
# ws = create_connection(f"ws://localhost:5050/api/kernels/{kernel_id}/channels?token=mytoken", headers=headers)
# code_text = "print('hello world!')"
# # code_text = "import matplotlib.pyplot as plt\n\nplt.figure(figsize=(4,2))\nplt.plot([1,2,3,4,5])\nplt.show()"
# ws.send(
# json.dumps(
# {
# "header": {
# "msg_id": (msg_id := uuid4().hex),
# "msg_type": "execute_request",
# },
# "parent_header": {},
# "metadata": {},
# "content": {
# "code": code_text,
# "silent": True,
# "store_history": True,
# "user_expressions": {},
# "allow_stdin": False,
# "stop_on_error": True,
# },
# "channel": "shell",
# "buffers": [],
# }
# )
# )
# while True:
# received_msg = json.loads(ws.recv())
# if received_msg["msg_type"] == "stream":
# result_msg = received_msg # found the result message
# break
# elif received_msg["header"]["msg_type"] == "execute_result":
# result_msg = received_msg # found the result message
# break
# elif received_msg["header"]["msg_type"] == "display_data":
# result_msg = received_msg # found the result message
# break
# print(received_msg)