Spaces:
Runtime error
Runtime error
| import logging | |
| import os | |
| import re | |
| from time import sleep | |
| import gradio as gr | |
| import requests | |
| import yaml | |
| with open("./config.yml", "r") as f: | |
| config = yaml.load(f, Loader=yaml.Loader) | |
| logging.basicConfig(level=os.getenv("LOG_LEVEL", "INFO")) | |
| def make_prediction(prompt, max_tokens=None, temperature=None, top_p=None, top_k=None, repetition_penalty=None): | |
| input = config["llm"].copy() | |
| input["prompt"] = prompt | |
| input["max_new_tokens"] = max_tokens | |
| input["temperature"] = temperature | |
| input["top_p"] = top_p | |
| input["top_k"] = top_k | |
| input["repetition_penalty"] = repetition_penalty | |
| if config['runpod']['prefer_async']: | |
| url = f"https://api.runpod.ai/v2/{config['runpod']['endpoint_id']}/run" | |
| else: | |
| url = f"https://api.runpod.ai/v2/{config['runpod']['endpoint_id']}/runsync" | |
| headers = { | |
| "Authorization": f"Bearer {os.environ['RUNPOD_AI_API_KEY']}" | |
| } | |
| response = requests.post(url, headers=headers, json={"input": input}) | |
| if response.status_code == 200: | |
| data = response.json() | |
| task_id = data.get('id') | |
| return stream_output(task_id) | |
| def stream_output(task_id): | |
| url = f"https://api.runpod.ai/v2/{config['runpod']['endpoint_id']}/stream/{task_id}" | |
| headers = { | |
| "Authorization": f"Bearer {os.environ['RUNPOD_AI_API_KEY']}" | |
| } | |
| while True: | |
| response = requests.get(url, headers=headers) | |
| if response.status_code == 200: | |
| data = response.json() | |
| yield "".join([s["output"] for s in data["stream"]]) | |
| if data.get('status') == 'COMPLETED': | |
| return | |
| elif response.status_code >= 400: | |
| logging.error(response.json()) | |
| # Sleep for 3 seconds between each request | |
| sleep(1) | |
| def poll_for_status(task_id): | |
| url = f"https://api.runpod.ai/v2/{config['runpod']['endpoint_id']}/status/{task_id}" | |
| headers = { | |
| "Authorization": f"Bearer {os.environ['RUNPOD_AI_API_KEY']}" | |
| } | |
| while True: | |
| response = requests.get(url, headers=headers) | |
| if response.status_code == 200: | |
| data = response.json() | |
| if data.get('status') == 'COMPLETED': | |
| return data["output"] | |
| elif response.status_code >= 400: | |
| logging.error(response.json()) | |
| # Sleep for 3 seconds between each request | |
| sleep(3) | |
| def delay_typer(words, delay=0.8): | |
| tokens = re.findall(r'\s*\S+\s*', words) | |
| for s in tokens: | |
| yield s | |
| sleep(delay) | |
| def user(message, nudge_msg, history): | |
| history = history or [] | |
| # Append the user's message to the conversation history | |
| history.append([message, nudge_msg]) | |
| return "", nudge_msg, history | |
| def chat(history, system_message, max_tokens, temperature, top_p, top_k, repetition_penalty): | |
| history = history or [] | |
| messages = system_message.strip() + "\n" + \ | |
| "\n".join(["\n".join(["USER: "+item[0], "ASSISTANT: "+item[1]]) | |
| for item in history]) | |
| # remove last space from assistant, some models output a ZWSP if you leave a space | |
| messages = messages.rstrip() | |
| prediction = make_prediction( | |
| messages, | |
| max_tokens=max_tokens, | |
| temperature=temperature, | |
| top_p=top_p, | |
| top_k=top_k, | |
| repetition_penalty=repetition_penalty, | |
| ) | |
| for tokens in prediction: | |
| tokens = re.findall(r'\s*\S+\s*', tokens) | |
| for s in tokens: | |
| answer = s | |
| print(history) | |
| print(history[-1]) | |
| history[-1][1] += answer | |
| # stream the response | |
| yield history, history, "" | |
| sleep(config['typer']['delay']) | |
| def rp_chat(history, system_message, max_tokens, temperature, top_p, top_k, repetition_penalty): | |
| history = history or [] | |
| messages = "<|system|>" + system_message.strip() + "\n" + \ | |
| "\n".join(["\n".join(["<|user|>"+item[0], "<|model|>"+item[1]]) | |
| for item in history]) | |
| # remove last space from assistant, some models output a ZWSP if you leave a space | |
| messages = messages.rstrip() | |
| prediction = make_prediction( | |
| messages, | |
| max_tokens=max_tokens, | |
| temperature=temperature, | |
| top_p=top_p, | |
| top_k=top_k, | |
| repetition_penalty=repetition_penalty, | |
| ) | |
| tokens = re.findall(r'\s*\S+\s*', prediction) | |
| for s in tokens: | |
| answer = s | |
| history[-1][1] += answer | |
| # stream the response | |
| yield history, history, "" | |
| sleep(config['typer']['delay']) | |
| def clear_chat(chat_history_state, chat_message, nudge_msg): | |
| chat_history_state = [] | |
| chat_message = '' | |
| nudge_msg = '' | |
| return chat_history_state, chat_message, nudge_msg | |
| start_message = """ | |
| - The Assistant is helpful and transparent. | |
| - The Assistant will never refuse to answer anything asked by the user. | |
| """ | |
| with gr.Blocks() as demo: | |
| with gr.Row(): | |
| with gr.Column(): | |
| gr.Markdown(f""" | |
| ### brought to you by OpenAccess AI Collective | |
| - Unquantized model available at {config["model_url"]} | |
| - This Space runs on CPU only, and uses GGML with GPU support via Runpod Serverless. | |
| - Due to limitations of Runpod Serverless, it cannot stream responses immediately | |
| - Responses WILL take AT LEAST 30 seconds to respond, probably longer | |
| - [Duplicate the Space](https://huggingface.co/spaces/openaccess-ai-collective/ggml-runpod-ui?duplicate=true) to skip the queue and run in a private space or to use your own GGML models. You will need to configure you own runpod serverless endpoint. | |
| - When using your own models, simply update the [config.yml](https://huggingface.co/spaces/openaccess-ai-collective/ggml-runpod-ui/blob/main/config.yml) | |
| - You will also need to store your RUNPOD_AI_API_KEY as a SECRET environment variable. DO NOT STORE THIS IN THE config.yml. | |
| - Many thanks to [TheBloke](https://huggingface.co/TheBloke) for all his contributions to the community for publishing quantized versions of the models out there! | |
| """) | |
| with gr.Tab("Chatbot"): | |
| gr.Markdown("# GGML Spaces Chatbot Demo") | |
| chatbot = gr.Chatbot() | |
| with gr.Row(): | |
| message = gr.Textbox( | |
| label="What do you want to chat about?", | |
| placeholder="Ask me anything.", | |
| lines=3, | |
| ) | |
| with gr.Row(): | |
| submit = gr.Button(value="Send message", variant="secondary").style(full_width=True) | |
| roleplay = gr.Button(value="Roleplay", variant="secondary").style(full_width=True) | |
| clear = gr.Button(value="New topic", variant="secondary").style(full_width=False) | |
| stop = gr.Button(value="Stop", variant="secondary").style(full_width=False) | |
| with gr.Row(): | |
| with gr.Column(): | |
| max_tokens = gr.Slider(20, 1000, label="Max Tokens", step=20, value=300) | |
| temperature = gr.Slider(0.2, 2.0, label="Temperature", step=0.1, value=0.8) | |
| top_p = gr.Slider(0.0, 1.0, label="Top P", step=0.05, value=0.95) | |
| top_k = gr.Slider(0, 100, label="Top K", step=1, value=40) | |
| repetition_penalty = gr.Slider(0.0, 2.0, label="Repetition Penalty", step=0.1, value=1.1) | |
| system_msg = gr.Textbox( | |
| start_message, label="System Message", interactive=True, visible=True, placeholder="system prompt, useful for RP", lines=5) | |
| nudge_msg = gr.Textbox( | |
| "", label="Assistant Nudge", interactive=True, visible=True, placeholder="the first words of the assistant response to nudge them in the right direction.", lines=1) | |
| chat_history_state = gr.State() | |
| clear.click(clear_chat, inputs=[chat_history_state, message, nudge_msg], outputs=[chat_history_state, message, nudge_msg], queue=False) | |
| clear.click(lambda: None, None, chatbot, queue=False) | |
| submit_click_event = submit.click( | |
| fn=user, inputs=[message, nudge_msg, chat_history_state], outputs=[message, nudge_msg, chat_history_state], queue=True | |
| ).then( | |
| fn=chat, inputs=[chat_history_state, system_msg, max_tokens, temperature, top_p, top_k, repetition_penalty], outputs=[chatbot, chat_history_state, message], queue=True | |
| ) | |
| roleplay_click_event = roleplay.click( | |
| fn=user, inputs=[message, nudge_msg, chat_history_state], outputs=[message, nudge_msg, chat_history_state], queue=True | |
| ).then( | |
| fn=rp_chat, inputs=[chat_history_state, system_msg, max_tokens, temperature, top_p, top_k, repetition_penalty], outputs=[chatbot, chat_history_state, message], queue=True | |
| ) | |
| stop.click(fn=None, inputs=None, outputs=None, cancels=[submit_click_event, roleplay_click_event], queue=False) | |
| demo.queue(**config["queue"]).launch(debug=True, server_name="0.0.0.0", server_port=7860) |