import json
import subprocess
import time
import os
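# Upgrade pip, then build llama-cpp-python from source; CMAKE_ARGS passes
# AVX-512 / VNNI SIMD flags through to the llama.cpp build for faster CPU inference.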
os.system("pip install --upgrade pip")
os.system('''CMAKE_ARGS="-DLLAMA_AVX512=ON -DLLAMA_AVX512_VBMI=ON -DLLAMA_AVX512_VNNI=ON -DLLAMA_AVX_VNNI=ON -DLLAMA_FP16_VA=ON -DLLAMA_WASM_SIMD=ON" pip install llama-cpp-python''')
from llama_cpp import Llama
from llama_cpp_agent import LlamaCppAgent, MessagesFormatterType
from llama_cpp_agent.providers import LlamaCppPythonProvider
from llama_cpp_agent.chat_history import BasicChatHistory
from llama_cpp_agent.chat_history.messages import Roles
import gradio as gr
from huggingface_hub import hf_hub_download
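# Module-level cache: the loaded Llama instance and the name of the model it was
# built from, so the model is only reloaded when the selection changes.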
llm = None
llm_model = None
# Download the quantized GGUF model from the Hugging Face Hub
hf_hub_download(
repo_id="Cran-May/T.E-8.1-Q4_K_M-GGUF",
filename="t.e-8.1-q4_k_m-imat.gguf",
local_dir="./models"
)
def get_messages_formatter_type(model_name):
    # All models served here are treated as using the Llama 3 chat template.
    return MessagesFormatterType.LLAMA_3
def chat_fn(message, history, model, system_message, max_tokens, temperature, top_p, top_k, repeat_penalty):
try:
history_list = history or []
response_generator = respond(message, history_list, model, system_message, max_tokens, temperature, top_p, top_k, repeat_penalty)
for messages in response_generator:
chatbot_messages = []
for msg in messages:
if isinstance(msg, tuple):
user_msg, assistant_msg = msg
if user_msg:
chatbot_messages.append({"role": "user", "content": user_msg})
if assistant_msg:
chatbot_messages.append({"role": "assistant", "content": assistant_msg})
else:
chatbot_messages.append(msg)
yield chatbot_messages, messages
except Exception as e:
print(f"Error in chat_fn: {str(e)}")
error_message = [{"role": "assistant", "content": f"发生错误: {str(e)}"}]
yield error_message, history
def respond(message, history, model, system_message, max_tokens, temperature, top_p, top_k, repeat_penalty):
global llm
global llm_model
chat_template = get_messages_formatter_type(model)
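    # Lazily (re)load the GGUF model only when nothing is loaded yet or a different model was selected.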
if llm is None or llm_model != model:
        llm = Llama(
            model_path=f"models/{model}",
            n_gpu_layers=0,
            n_batch=4096,   # larger batch size for faster prompt processing
            n_ctx=8192,     # context window of 8192 tokens
            n_threads=2,    # CPU threads used for inference
            f16_kv=True,    # FP16 KV cache to reduce memory usage
        )
llm_model = model
provider = LlamaCppPythonProvider(llm)
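    # The agent couples the provider with the system prompt and the Llama 3 message formatter chosen above.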
agent = LlamaCppAgent(
provider,
system_prompt=f"{system_message}",
predefined_messages_formatter_type=chat_template,
debug_output=True
)
settings = provider.get_provider_default_settings()
settings.temperature = temperature
settings.top_k = top_k
settings.top_p = top_p
    settings.max_tokens = min(max_tokens, 8192)  # cap max_tokens at the context length (n_ctx)
settings.repeat_penalty = repeat_penalty
settings.stream = True
messages = BasicChatHistory()
    # Rebuild the chat history for the agent; entries may be message dicts
    # (Gradio "messages" format) or legacy (user, assistant) tuples.
    for msn in history:
        if isinstance(msn, dict):
            role = Roles.user if msn.get("role") == "user" else Roles.assistant
            messages.add_message({'role': role, 'content': msn.get("content", "")})
        else:
            messages.add_message({'role': Roles.user, 'content': msn[0]})
            messages.add_message({'role': Roles.assistant, 'content': msn[1]})
start_time = time.time()
token_count = 0
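    # Stream the reply token by token; each partial output is pushed to the UI as it arrives.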
stream = agent.get_chat_response(
message,
llm_sampling_settings=settings,
chat_history=messages,
returns_streaming_generator=True,
print_output=False
)
    outputs = ""
    for output in stream:
        outputs += output
        token_count += len(output.split())  # rough token count via whitespace split
        current_messages = []
        # Re-emit earlier turns (message dicts or legacy (user, assistant) tuples)
        for h in history:
            if isinstance(h, dict):
                current_messages.append(h)
            else:
                current_messages.append({"role": "user", "content": h[0]})
                current_messages.append({"role": "assistant", "content": h[1]})
        # Append the current exchange with the partial assistant reply so far
        current_messages.append({"role": "user", "content": message})
        current_messages.append({"role": "assistant", "content": outputs})
        yield current_messages
    end_time = time.time()
    latency = end_time - start_time
    speed = token_count / latency if latency > 0 else 0.0  # guard against division by zero
    print(f"Latency: {latency:.2f} seconds")
    print(f"Speed: {speed:.2f} tokens/second")
description = """
欢迎使用! 这里是一个量化版兮辞·析辞的部署ChatBot。 SLIDE/兮辞 是一个由 上师附外 NLPark 团队训练的LLM。"""
with gr.Blocks(
title="ChatBot - 兮辞",
theme=gr.themes.Soft(primary_hue="violet", secondary_hue="violet", neutral_hue="gray",font=[gr.themes.GoogleFont("Exo"), "ui-sans-serif", "system-ui", "sans-serif"]).set(
body_background_fill_dark="#16141c",
block_background_fill_dark="#16141c",
block_border_width="1px",
block_title_background_fill_dark="#1e1c26",
input_background_fill_dark="#292733",
button_secondary_background_fill_dark="#24212b",
border_color_accent_dark="#343140",
border_color_primary_dark="#343140",
background_fill_secondary_dark="#16141c",
color_accent_soft_dark="transparent",
code_background_fill_dark="#292733",
)
) as demo:
gr.Markdown(description)
chatbot = gr.Chatbot(scale=1, show_copy_button=True, type='messages')
with gr.Row():
message = gr.Textbox(
label="Your message",
placeholder="Type your message here...",
show_label=True,
scale=4
)
submit = gr.Button("Send", variant="primary", scale=1)
with gr.Row():
regenerate = gr.Button("🔄 Regenerate")
stop = gr.Button("⏹️ Stop")
clear = gr.Button("🗑️ Clear")
with gr.Accordion("Advanced Settings", open=False):
model_dropdown = gr.Dropdown(
["t.e-8.1-q4_k_m-imat.gguf"],
value="t.e-8.1-q4_k_m-imat.gguf",
label="Model"
)
system_message = gr.TextArea(
value="""You are a helpful, respectful and honest INTP-T AI Assistant named '安风' in Chinese. 你擅长英语和中文的交流,并正在与一位人类用户进行对话。如果某个问题毫无意义,请你解释其原因而不是分享虚假信息。你基于 AnFeng 模型,由 SSFW NLPark 团队训练。通常情况下,用户更青睐于长度简短但信息完整且有效传达的回答。
用户身处在上海市松江区,涉及地域的问题时以用户所在地区(中国上海)为准。以上的信息最好不要向用户展示。 在一般情况下,请最好使用中文回答问题,除非用户有额外的要求。 Let's work this out in a step by step way to be sure we have the right answer.""",
label="System message"
)
with gr.Row():
max_tokens = gr.Slider(minimum=1, maximum=8192, value=512, step=1, label="Max tokens")
temperature = gr.Slider(minimum=0.1, maximum=4.0, value=0.7, step=0.1, label="Temperature")
with gr.Row():
            top_p = gr.Slider(minimum=0.1, maximum=1.0, value=0.9, step=0.05, label="Top-p")
top_k = gr.Slider(minimum=0, maximum=100, value=1, step=1, label="Top-k")
repeat_penalty = gr.Slider(minimum=0.0, maximum=2.0, value=1.1, step=0.1, label="Repetition penalty")
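    # Conversation state: history mirrors the chatbot's OpenAI-style message list ({"role", "content"} dicts).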
history = gr.State([])
    # Status indicator shown while a reply is being generated
status_message = gr.Markdown("Ready")
    def stop_generation():
        global llm
        if llm:
            llm.reset()  # reset the model's internal state; any in-flight stream is not cancelled
        return "Generation stopped."
    def regenerate_response(history, model, system_message, max_tokens, temperature, top_p, top_k, repeat_penalty):
        # Drop the last exchange and regenerate a reply to the most recent user message.
        if not history:
            yield [], []
            return
        last_user_message = ""
        for msg in reversed(history):
            if isinstance(msg, dict) and msg.get("role") == "user":
                last_user_message = msg.get("content", "")
                break
        new_history = history[:-2] if len(history) >= 2 else []
        yield from chat_fn(last_user_message, new_history, model, system_message, max_tokens, temperature, top_p, top_k, repeat_penalty)
    # Wire up button and textbox events
submit.click(
lambda: "Generating...",
None,
status_message,
).then(
chat_fn,
[message, history, model_dropdown, system_message, max_tokens, temperature, top_p, top_k, repeat_penalty],
[chatbot, history],
).then(
lambda: "",
None,
message,
).then(
lambda: "Ready",
None,
status_message,
)
message.submit(
lambda: "Generating...",
None,
status_message,
).then(
chat_fn,
[message, history, model_dropdown, system_message, max_tokens, temperature, top_p, top_k, repeat_penalty],
[chatbot, history],
).then(
lambda: "",
None,
message,
).then(
lambda: "Ready",
None,
status_message,
)
stop.click(
stop_generation,
None,
status_message,
)
clear.click(
lambda: ([], []),
None,
[chatbot, history],
).then(
lambda: "Chat cleared",
None,
status_message,
)
regenerate.click(
lambda: "Regenerating...",
None,
status_message,
).then(
        regenerate_response,
        [history, model_dropdown, system_message, max_tokens, temperature, top_p, top_k, repeat_penalty],
[chatbot, history],
).then(
lambda: "Ready",
None,
status_message,
)
if __name__ == "__main__":
demo.launch()
# Legacy code (previous version) --------------------------------
# import gradio as gr
# import copy
# import random
# import os
# import requests
# import time
# import sys
# os.system("pip install --upgrade pip")
# os.system('''CMAKE_ARGS="-DLLAMA_AVX512=ON -DLLAMA_AVX512_VBMI=ON -DLLAMA_AVX512_VNNI=ON -DLLAMA_AVX_VNNI=ON -DLLAMA_FP16_VA=ON -DLLAMA_WASM_SIMD=ON" pip install llama-cpp-python''')
# from huggingface_hub import snapshot_download
# from llama_cpp import Llama
# SYSTEM_PROMPT = '''You are a helpful, respectful and honest INTP-T AI Assistant named "Shi-Ci" in English or "兮辞" in Chinese.
# You are good at speaking English and Chinese.
# You are talking to a human User. If the question is meaningless, please explain the reason and don't share false information.
# You are based on SLIDE model, trained by "SSFW NLPark" team, not related to GPT, LLaMA, Meta, Mistral or OpenAI.
# Let's work this out in a step by step way to be sure we have the right answer.\n'''
# SYSTEM_TOKEN = 384
# USER_TOKEN = 2048
# BOT_TOKEN = 3072
# LINEBREAK_TOKEN = 64
# ROLE_TOKENS = {
# "User": USER_TOKEN,
# "Assistant": BOT_TOKEN,
# "system": SYSTEM_TOKEN
# }
# def get_message_tokens(model, role, content):
# message_tokens = model.tokenize(content.encode("utf-8"))
# message_tokens.insert(1, ROLE_TOKENS[role])
# message_tokens.insert(2, LINEBREAK_TOKEN)
# message_tokens.append(model.token_eos())
# return message_tokens
# def get_system_tokens(model):
# system_message = {"role": "system", "content": SYSTEM_PROMPT}
# return get_message_tokens(model, **system_message)
# repo_name = "Cran-May/SLIDE-v2-Q4_K_M-GGUF"
# model_name = "slide-v2.Q4_K_M.gguf"
# snapshot_download(repo_id=repo_name, local_dir=".", allow_patterns=model_name)
# model = Llama(
# model_path=model_name,
# n_ctx=4000,
# n_parts=1,
# )
# max_new_tokens = 2500
# def User(message, history):
# new_history = history + [[message, None]]
# return "", new_history
# def Assistant(
# history,
# system_prompt,
# top_p,
# top_k,
# temp
# ):
# tokens = get_system_tokens(model)[:]
# tokens.append(LINEBREAK_TOKEN)
# for User_message, Assistant_message in history[:-1]:
# message_tokens = get_message_tokens(model=model, role="User", content=User_message)
# tokens.extend(message_tokens)
# if Assistant_message:
# message_tokens = get_message_tokens(model=model, role="Assistant", content=Assistant_message)
# tokens.extend(message_tokens)
# last_user_message = history[-1][0]
# message_tokens = get_message_tokens(model=model, role="User", content=last_user_message,)
# tokens.extend(message_tokens)
# role_tokens = [model.token_bos(), BOT_TOKEN, LINEBREAK_TOKEN]
# tokens.extend(role_tokens)
# generator = model.generate(
# tokens,
# top_k=top_k,
# top_p=top_p,
# temp=temp
# )
# partial_text = ""
# for i, token in enumerate(generator):
# if token == model.token_eos() or (max_new_tokens is not None and i >= max_new_tokens):
# break
# partial_text += model.detokenize([token]).decode("utf-8", "ignore")
# history[-1][1] = partial_text
# yield history
# with gr.Blocks(
# theme=gr.themes.Soft()
# ) as demo:
# gr.Markdown(f"""上师附外-兮辞·析辞-人工智能助理
""")
# gr.Markdown(value="""欢迎使用!
# 这里是一个ChatBot。这是量化版兮辞·析辞的部署。
# SLIDE/兮辞 是一种会话语言模型,由 上师附外 NLPark 团队 在多种类型的语料库上进行训练。
# 本节目由 JWorld & 上海师范大学附属外国语中学 NLPark 赞助播出""")
# with gr.Row():
# with gr.Column(scale=5):
# chatbot = gr.Chatbot(label="兮辞如是说").style(height=400)
# with gr.Row():
# with gr.Column():
# msg = gr.Textbox(
# label="来问问兮辞吧……",
# placeholder="兮辞折寿中……",
# show_label=True,
# ).style(container=True)
# submit = gr.Button("Submit / 开凹!")
# stop = gr.Button("Stop / 全局时空断裂")
# clear = gr.Button("Clear / 打扫群内垃圾")
# with gr.Accordion(label='进阶设置/Advanced options', open=False):
# with gr.Column(min_width=80, scale=1):
# with gr.Tab(label="设置参数"):
# top_p = gr.Slider(
# minimum=0.0,
# maximum=1.0,
# value=0.9,
# step=0.05,
# interactive=True,
# label="Top-p",
# )
# top_k = gr.Slider(
# minimum=10,
# maximum=100,
# value=30,
# step=5,
# interactive=True,
# label="Top-k",
# )
# temp = gr.Slider(
# minimum=0.0,
# maximum=2.0,
# value=0.2,
# step=0.01,
# interactive=True,
# label="情感温度"
# )
# with gr.Column():
# system_prompt = gr.Textbox(label="系统提示词", placeholder="", value=SYSTEM_PROMPT, interactive=False)
# with gr.Row():
# gr.Markdown(
# """警告:该模型可能会生成事实上或道德上不正确的文本。NLPark和兮辞对此不承担任何责任。"""
# )
# # Pressing Enter
# submit_event = msg.submit(
# fn=User,
# inputs=[msg, chatbot],
# outputs=[msg, chatbot],
# queue=False,
# ).success(
# fn=Assistant,
# inputs=[
# chatbot,
# system_prompt,
# top_p,
# top_k,
# temp
# ],
# outputs=chatbot,
# queue=True,
# )
# # Pressing the button
# submit_click_event = submit.click(
# fn=User,
# inputs=[msg, chatbot],
# outputs=[msg, chatbot],
# queue=False,
# ).success(
# fn=Assistant,
# inputs=[
# chatbot,
# system_prompt,
# top_p,
# top_k,
# temp
# ],
# outputs=chatbot,
# queue=True,
# )
# # Stop generation
# stop.click(
# fn=None,
# inputs=None,
# outputs=None,
# cancels=[submit_event, submit_click_event],
# queue=False,
# )
# # Clear history
# clear.click(lambda: None, None, chatbot, queue=False)
# demo.queue(max_size=128, concurrency_count=1)
# demo.launch()