|
|
import streamlit as st |
|
|
from llama_cpp import Llama |
|
|
from huggingface_hub import hf_hub_download |
|
|
import os |
|
|
import gc |
|
|
import shutil |
|
|
|
|
|
|
|
|
# Catalogue of selectable GGUF models, keyed by the label shown in the model
# picker. Each entry records the Hugging Face repo to download from, the GGUF
# filename inside that repo, and a human-readable description for the title.
MODELS = {
    label: {"repo_id": repo_id, "filename": filename, "description": description}
    for label, repo_id, filename, description in (
        (
            "Qwen2.5-7B-Instruct (Q2_K)",
            "Qwen/Qwen2.5-7B-Instruct-GGUF",
            "qwen2.5-7b-instruct-q2_k.gguf",
            "Qwen2.5-7B Instruct (Q2_K)",
        ),
        (
            "Gemma-3-4B-IT (Q4_K_M)",
            "unsloth/gemma-3-4b-it-GGUF",
            "gemma-3-4b-it-Q4_K_M.gguf",
            "Gemma 3 4B IT (Q4_K_M)",
        ),
        (
            "Phi-4-mini-Instruct (Q4_K_M)",
            "unsloth/Phi-4-mini-instruct-GGUF",
            "Phi-4-mini-instruct-Q4_K_M.gguf",
            "Phi-4 Mini Instruct (Q4_K_M)",
        ),
        (
            "Meta-Llama-3.1-8B-Instruct (Q2_K)",
            "MaziyarPanahi/Meta-Llama-3.1-8B-Instruct-GGUF",
            "Meta-Llama-3.1-8B-Instruct.Q2_K.gguf",
            "Meta-Llama-3.1-8B-Instruct (Q2_K)",
        ),
        (
            "DeepSeek-R1-Distill-Llama-8B (Q2_K)",
            "unsloth/DeepSeek-R1-Distill-Llama-8B-GGUF",
            "DeepSeek-R1-Distill-Llama-8B-Q2_K.gguf",
            "DeepSeek-R1-Distill-Llama-8B (Q2_K)",
        ),
        (
            "Mistral-7B-Instruct-v0.3 (IQ3_XS)",
            "MaziyarPanahi/Mistral-7B-Instruct-v0.3-GGUF",
            "Mistral-7B-Instruct-v0.3.IQ3_XS.gguf",
            "Mistral-7B-Instruct-v0.3 (IQ3_XS)",
        ),
        (
            "Qwen2.5-Coder-7B-Instruct (Q2_K)",
            "Qwen/Qwen2.5-Coder-7B-Instruct-GGUF",
            "qwen2.5-coder-7b-instruct-q2_k.gguf",
            "Qwen2.5-Coder-7B-Instruct (Q2_K)",
        ),
    )
}
|
|
|
|
|
|
|
|
# Sidebar: model picker, prompt and sampling controls, plus two maintenance
# buttons for the on-disk model cache. The values bound here are read further
# down the script when the model is loaded and when a reply is generated.
with st.sidebar:
    st.header("⚙️ Settings")

    selected_model_name = st.selectbox("Select Model", list(MODELS))
    system_prompt = st.text_area(
        "System Prompt",
        value="You are a helpful assistant.",
        height=80,
    )

    # Sampling controls, passed straight to create_chat_completion below.
    max_tokens = st.slider("Max tokens", min_value=64, max_value=2048, value=512, step=32)
    temperature = st.slider("Temperature", min_value=0.1, max_value=2.0, value=0.7)
    top_k = st.slider("Top-K", min_value=1, max_value=100, value=40)
    top_p = st.slider("Top-P", min_value=0.1, max_value=1.0, value=0.95)
    repeat_penalty = st.slider("Repetition Penalty", min_value=1.0, max_value=2.0, value=1.1)

    # Maintenance: delete every downloaded .gguf file under models/.
    if st.button("🧹 Clear All Cached Models"):
        try:
            cached_files = [
                entry for entry in os.listdir("models") if entry.endswith(".gguf")
            ]
            for entry in cached_files:
                os.remove(os.path.join("models", entry))
            st.success("Model cache cleared.")
        except Exception as e:
            st.error(f"Failed to clear models: {e}")

    # Maintenance: report used/free space of the filesystem holding the
    # current working directory, in units of 1024**3 bytes.
    if st.button("📦 Show Disk Usage"):
        try:
            _total_bytes, used_bytes, free_bytes = shutil.disk_usage(".")
            used = used_bytes / (1024**3)
            free = free_bytes / (1024**3)
            st.info(f"Disk Used: {used:.2f} GB | Free: {free:.2f} GB")
        except Exception as e:
            st.error(f"Disk usage error: {e}")
|
|
|
|
|
|
|
|
# Resolve the sidebar choice to its catalogue entry and to the local path
# where its GGUF file is expected to live.
selected_model = MODELS[selected_model_name]
model_path = os.path.join("models", selected_model["filename"])

# Seed the per-session slots on first run: which model is loaded (by display
# name) and the loaded model object itself. Existing values are left alone.
for _state_key in ("model_name", "llm"):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = None

# Make sure the download/cache directory exists before anything touches it.
os.makedirs("models", exist_ok=True)
|
|
|
|
|
|
|
|
def cleanup_old_models():
    """Delete every cached ``.gguf`` file in ``models/`` except the selected one.

    Reads the module-level ``selected_model`` to decide which filename to
    keep. A file that cannot be deleted is reported with ``st.warning`` and
    the sweep continues with the remaining files.
    """
    keep_name = selected_model["filename"]
    for entry in os.listdir("models"):
        # Skip anything that is not a model file, and skip the active model.
        if not entry.endswith(".gguf") or entry == keep_name:
            continue
        try:
            os.remove(os.path.join("models", entry))
        except Exception as err:
            st.warning(f"Couldn't delete old model {entry}: {err}")
|
|
|
|
|
def download_model():
    """Download the selected model's GGUF file into ``./models``.

    The repo id and filename come from the module-level ``selected_model``.
    A spinner naming the file is shown while ``hf_hub_download`` runs; any
    exception it raises propagates to the caller.
    """
    target_file = selected_model["filename"]
    request = {
        "repo_id": selected_model["repo_id"],
        "filename": target_file,
        "local_dir": "./models",
        # NOTE(review): newer huggingface_hub releases reportedly deprecate
        # this flag; confirm against the pinned version before removing it.
        "local_dir_use_symlinks": False,
    }
    with st.spinner(f"Downloading {target_file}..."):
        hf_hub_download(**request)
|
|
|
|
|
def try_load_model(path):
    """Try to construct a ``Llama`` model from the GGUF file at *path*.

    Returns the ``Llama`` instance on success. On any ``Exception`` the
    error is not raised; its message is returned as a ``str`` instead, so
    callers distinguish success from failure with ``isinstance(result, str)``.
    """
    # Fixed loader settings used for every model in this app.
    loader_options = {
        "n_ctx": 1024,
        "n_threads": 2,
        "n_threads_batch": 2,
        "n_batch": 4,
        "n_gpu_layers": 0,
        "use_mlock": False,
        "use_mmap": True,
        "verbose": False,
    }
    try:
        loaded = Llama(model_path=path, **loader_options)
    except Exception as load_error:
        return str(load_error)
    return loaded
|
|
|
|
|
def validate_or_download_model():
    """Return a loaded model for the current selection, downloading if needed.

    Flow:
      1. If ``model_path`` does not exist, remove other cached models and
         download the selected one.
      2. Try to load it. On success, return the loaded model.
      3. On failure, warn, delete the (presumably corrupt) file, clean up,
         re-download, and try once more.
      4. If the second load also fails, show an error and halt the script
         run with ``st.stop()``.

    Returns:
        The object returned by ``try_load_model`` on a successful load.
    """
    if not os.path.exists(model_path):
        cleanup_old_models()
        download_model()

    result = try_load_model(model_path)
    if not isinstance(result, str):
        return result

    # try_load_model signals failure by returning the error message as a str.
    st.warning(f"Initial load failed: {result}\nAttempting re-download...")

    # Best-effort removal of the bad file. A missing file is fine; any other
    # OS-level failure is surfaced instead of being silently swallowed, and
    # non-OS exceptions (e.g. KeyboardInterrupt) are no longer caught at all.
    try:
        os.remove(model_path)
    except FileNotFoundError:
        pass
    except OSError as e:
        st.warning(f"Couldn't delete model file {model_path}: {e}")

    cleanup_old_models()
    download_model()

    result = try_load_model(model_path)
    if isinstance(result, str):
        st.error(f"Model still failed after re-download: {result}")
        st.stop()
    return result
|
|
|
|
|
|
|
|
# (Re)load the model only when the sidebar selection differs from the model
# this session currently holds; otherwise reuse the one already stored.
_session = st.session_state
_selection_changed = _session.model_name != selected_model_name

if _selection_changed:
    # Drop the previous model and force a collection before loading the
    # next one, so both are not kept referenced at the same time.
    if _session.llm is not None:
        del _session.llm
        gc.collect()

    _session.llm = validate_or_download_model()
    _session.model_name = selected_model_name

llm = _session.llm
|
|
|
|
|
|
|
|
# Conversation state is kept in st.session_state so it survives Streamlit
# re-executing this script on every interaction.
if "chat_history" not in st.session_state:
    st.session_state.chat_history = []

st.title(f"🧠 {selected_model['description']} (Streamlit + GGUF)")
st.caption(f"Powered by `llama.cpp` | Model: {selected_model['filename']}")

# A trailing user message means an earlier run appended the question but never
# stored a reply (the run was interrupted or generation raised). Nothing is
# still generating for it at this point in a fresh run, so drop it; leaving it
# in place would keep the history unbalanced for the rest of the session.
if (
    st.session_state.chat_history
    and st.session_state.chat_history[-1]["role"] == "user"
):
    st.session_state.chat_history.pop()
    st.warning(
        "Your previous message was interrupted before the assistant replied "
        "and has been dropped. Please send it again."
    )

# Replay the stored conversation: each run starts from an empty page, so
# earlier turns must be redrawn or they vanish from the UI.
for past_message in st.session_state.chat_history:
    with st.chat_message(past_message["role"]):
        st.markdown(past_message["content"])

user_input = st.chat_input("Ask something...")

if user_input:
    st.session_state.chat_history.append({"role": "user", "content": user_input})

    with st.chat_message("user"):
        st.markdown(user_input)

    # Keep the prompt bounded to the most recent turns. The history ends with
    # the new user message, so an odd-sized window always begins on a user
    # message; an even-sized one would begin on an assistant reply and break
    # the user/assistant alternation.
    MAX_TURNS = 8
    trimmed_history = st.session_state.chat_history[-(MAX_TURNS * 2 - 1):]
    messages = [{"role": "system", "content": system_prompt}] + trimmed_history

    with st.chat_message("assistant"):
        full_response = ""
        response_area = st.empty()

        stream = llm.create_chat_completion(
            messages=messages,
            max_tokens=max_tokens,
            temperature=temperature,
            top_k=top_k,
            top_p=top_p,
            repeat_penalty=repeat_penalty,
            stream=True,
        )

        # Re-render the accumulated text on each streamed chunk.
        for chunk in stream:
            if "choices" in chunk:
                # A delta may lack "content" or carry None; treat both as "".
                delta = chunk["choices"][0]["delta"].get("content") or ""
                full_response += delta
                response_area.markdown(full_response)

    st.session_state.chat_history.append({"role": "assistant", "content": full_response})
|
|
|