"""
App entry uses Fin-o1-14B GGUF via llama.cpp on CPU-only Spaces.
Removed heavy transformers/peft and Google Gemini imports.
"""
import os
import json
import time
import random
from collections import defaultdict
from datetime import date, datetime, timedelta
import gradio as gr
import pandas as pd
import finnhub
# Removed Google Generative AI and transformer-based imports (not used)
from io import StringIO
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
try:
    from transformers import AutoModel  # minimal placeholder to avoid breaking any implicit refs
except Exception:
    AutoModel = None
from huggingface_hub import hf_hub_download
try:
from llama_cpp import Llama
except Exception:
Llama = None
import platform
import sys
try:
import psutil # optional, for CPU/RAM stats
except Exception:
psutil = None
try:
import torch # optional
except Exception:
torch = None
try:
import transformers as transformers_mod # to get version
except Exception:
transformers_mod = None
try:
import huggingface_hub as hfhub # to get version
except Exception:
hfhub = None
try:
import llama_cpp as llama_cpp_mod # to get version
except Exception:
llama_cpp_mod = None
# Suppress gRPC log noise (left over from the removed Google integration)
os.environ['GRPC_VERBOSITY'] = 'ERROR'
os.environ['GRPC_TRACE'] = ''
# Suppress other warnings
import warnings
warnings.filterwarnings('ignore', category=UserWarning)
warnings.filterwarnings('ignore', category=FutureWarning)
# ---------- CONFIGURATION ----------------------------------------------------
GEMINI_MODEL = "gemini-2.5-pro"  # unused; retained after removing the Gemini integration
# Fin-o1-14B GGUF via llama.cpp configuration (CPU-only)
FIN_O1_REPO = os.getenv("FIN_O1_REPO", "mradermacher/Fin-o1-14B-GGUF")
# Default to Q4_K_S for balance of quality/size; override via env
FIN_O1_QUANT = os.getenv("FIN_O1_QUANT", "Q4_K_S")
FIN_O1_FILENAME = os.getenv("FIN_O1_FILENAME", f"Fin-o1-14B.{FIN_O1_QUANT}.gguf")
# Optional local loading on CPU: set either FIN_O1_LOCAL_PATH to a file
# or FIN_O1_LOCAL_DIR to a directory containing the GGUF file(s)
FIN_O1_LOCAL_PATH = os.getenv("FIN_O1_LOCAL_PATH", "")
FIN_O1_LOCAL_DIR = os.getenv("FIN_O1_LOCAL_DIR", "")
FIN_O1_LOCAL_ONLY = os.getenv("FIN_O1_LOCAL_ONLY", "false").lower() in {"1", "true", "yes"}
FIN_O1_PRECACHE = os.getenv("FIN_O1_PRECACHE", "true").lower() in {"1", "true", "yes"}
LLAMA_CONTEXT_SIZE = int(os.getenv("LLAMA_CONTEXT_SIZE", "2048"))
LLAMA_THREADS = int(os.getenv("LLAMA_THREADS", str(os.cpu_count() or 2)))
LLAMA_N_GPU_LAYERS = int(os.getenv("LLAMA_N_GPU_LAYERS", "0")) # CPU-only on HF Spaces
# KV-cache quantization settings (requires a recent llama-cpp-python with type_k/type_v support)
LLAMA_KV_TYPE_K = os.getenv("LLAMA_KV_TYPE_K", "q4_0")
LLAMA_KV_TYPE_V = os.getenv("LLAMA_KV_TYPE_V", "q4_0")
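# Example (hypothetical values) for tuning a CPU-only Space; set these as Space
# variables/secrets or shell exports before launch:
#   FIN_O1_QUANT=Q3_K_M        # smaller quant if RAM is tight
#   LLAMA_CONTEXT_SIZE=1024    # shorter context shrinks the KV cache
#   LLAMA_THREADS=2            # match the Space's vCPU count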
# Singleton for llama.cpp model
_LLAMA_INSTANCE = None
def _resolve_fin_o1_gguf_path() -> str:
"""Download (if needed) and return local path to the desired GGUF.
Tries preferred filename first, then common variants and case patterns.
"""
preferred = FIN_O1_FILENAME
# 1) Prefer explicit local file path
if FIN_O1_LOCAL_PATH and os.path.isfile(FIN_O1_LOCAL_PATH):
print(f"📂 Using local GGUF file: {FIN_O1_LOCAL_PATH}")
return FIN_O1_LOCAL_PATH
# 2) Prefer local directory search if provided
if FIN_O1_LOCAL_DIR and os.path.isdir(FIN_O1_LOCAL_DIR):
dir_candidates = [
os.path.join(FIN_O1_LOCAL_DIR, preferred),
os.path.join(FIN_O1_LOCAL_DIR, preferred.replace("Fin-o1-14B", "fin-o1-14b")),
os.path.join(FIN_O1_LOCAL_DIR, preferred.replace("fin-o1-14b", "Fin-o1-14B")),
os.path.join(FIN_O1_LOCAL_DIR, "Fin-o1-14B.IQ3_M.gguf"),
os.path.join(FIN_O1_LOCAL_DIR, "fin-o1-14b.IQ3_M.gguf"),
os.path.join(FIN_O1_LOCAL_DIR, "Fin-o1-14B.Q3_K_M.gguf"),
os.path.join(FIN_O1_LOCAL_DIR, "fin-o1-14b.Q3_K_M.gguf"),
os.path.join(FIN_O1_LOCAL_DIR, "Fin-o1-14B.Q4_0.gguf"),
os.path.join(FIN_O1_LOCAL_DIR, "Fin-o1-14B.Q5_0.gguf"),
os.path.join(FIN_O1_LOCAL_DIR, "Fin-o1-14B.Q6_K.gguf"),
]
for p in dir_candidates:
if os.path.isfile(p):
print(f"📂 Using local GGUF from directory: {p}")
return p
# 3) Hosted download candidates (skipped if local-only)
candidates = [
preferred,
# Case variants
preferred.replace("Fin-o1-14B", "fin-o1-14b"),
preferred.replace("fin-o1-14b", "Fin-o1-14B"),
# Alternate common quants
"Fin-o1-14B.IQ3_M.gguf",
"fin-o1-14b.IQ3_M.gguf",
"Fin-o1-14B.Q3_K_M.gguf",
"fin-o1-14b.Q3_K_M.gguf",
"Fin-o1-14B.Q4_0.gguf",
"Fin-o1-14B.Q5_0.gguf",
"Fin-o1-14B.Q6_K.gguf",
]
if FIN_O1_LOCAL_ONLY:
raise RuntimeError("Local-only mode enabled but no local GGUF found. Set FIN_O1_LOCAL_PATH or FIN_O1_LOCAL_DIR.")
last_err = None
for fname in candidates:
try:
print(f"⬇️ Trying GGUF: {FIN_O1_REPO}/{fname}")
return hf_hub_download(repo_id=FIN_O1_REPO, filename=fname, repo_type="model")
except Exception as e:
last_err = e
continue
raise RuntimeError(f"Failed to download GGUF from {FIN_O1_REPO}. Last error: {last_err}")
def _get_llama_instance() -> Llama:
global _LLAMA_INSTANCE
if _LLAMA_INSTANCE is not None:
return _LLAMA_INSTANCE
if Llama is None:
raise RuntimeError("llama_cpp is not installed. This app variant requires llama-cpp-python.")
# Basic version guard: Fin-o1-14B GGUFs require a recent llama.cpp
try:
ver = getattr(llama_cpp_mod, "__version__", "0.0.0") or "0.0.0"
def _ver_tuple(s: str):
parts = [p for p in s.split(".") if p.isdigit()]
return tuple(int(p) for p in (parts + ["0", "0"])[:3])
if _ver_tuple(ver) < _ver_tuple("0.3.0"):
print(f"⚠️ Detected llama-cpp-python {ver}. Recommend >= 0.3.0 for these GGUFs.")
print(" Try: pip install -U llama-cpp-python")
except Exception:
pass
model_path = _resolve_fin_o1_gguf_path()
def try_make_llama(ctx_size: int) -> Llama | None:
base_kwargs = dict(
model_path=model_path,
n_ctx=ctx_size,
n_threads=LLAMA_THREADS,
n_gpu_layers=LLAMA_N_GPU_LAYERS,
verbose=False,
)
# Try a few safe loader configurations
configs = [
{"use_mmap": True, "use_mlock": False, "with_kv": True},
{"use_mmap": False, "use_mlock": False, "with_kv": True},
{"use_mmap": False, "use_mlock": False, "with_kv": False},
]
for cfg in configs:
kwargs = dict(base_kwargs)
kwargs["use_mmap"] = cfg["use_mmap"]
kwargs["use_mlock"] = cfg["use_mlock"]
if cfg["with_kv"]:
try:
kwargs["kv_overrides"] = {"type_k": LLAMA_KV_TYPE_K, "type_v": LLAMA_KV_TYPE_V}
except Exception:
pass
else:
kwargs.pop("kv_overrides", None)
try:
return Llama(**kwargs)
except TypeError:
kwargs.pop("kv_overrides", None)
try:
return Llama(**kwargs)
except Exception as e2:
print(f"⚠️ Llama init failed (cfg no-kv) at n_ctx={ctx_size}: {e2}")
except Exception as e:
print(f"⚠️ Llama init failed (mmap={cfg['use_mmap']}, kv={cfg['with_kv']}) at n_ctx={ctx_size}: {e}")
return None
# Try multiple context sizes to reduce memory pressure if needed
for ctx in [LLAMA_CONTEXT_SIZE, 1536, 1024, 768, 512]:
llama = try_make_llama(ctx)
if llama is not None:
if ctx != LLAMA_CONTEXT_SIZE:
print(f"ℹ️ Loaded with reduced context size {ctx} due to previous failures")
_LLAMA_INSTANCE = llama
return _LLAMA_INSTANCE
# As a last resort, try alternate quant files and retry
fallback_files = [
# Prefer more compatible/lighter quants first
"Fin-o1-14B.IQ3_M.gguf",
"Fin-o1-14B.Q3_K_M.gguf",
"Fin-o1-14B.Q4_0.gguf",
"Fin-o1-14B.Q5_0.gguf",
"Fin-o1-14B.Q4_K_S.gguf",
"Fin-o1-14B.Q5_K_S.gguf",
# name variants
"fin-o1-14b.IQ3_M.gguf",
"fin-o1-14b.Q3_K_M.gguf",
"fin-o1-14b.Q4_0.gguf",
"fin-o1-14b.Q5_0.gguf",
"fin-o1-14b.Q4_K_S.gguf",
"fin-o1-14b.Q5_K_S.gguf",
]
for alt in fallback_files:
try:
# If local-only is requested, skip remote alternates entirely
if FIN_O1_LOCAL_ONLY:
continue
alt_path = hf_hub_download(repo_id=FIN_O1_REPO, filename=alt, repo_type="model")
print(f"ℹ️ Retrying with alternate GGUF: {alt}")
# IMPORTANT: point the loader to the newly downloaded alt file
# so that subsequent init attempts actually use it.
model_path = alt_path
for ctx in [1024, 768, 512]:
llama = try_make_llama(ctx)
if llama is not None:
_LLAMA_INSTANCE = llama
return _LLAMA_INSTANCE
print(f"⚠️ Alternate GGUF init failed for {alt}; trying next candidate...")
except Exception as e:
print(f"⚠️ Alternate GGUF load failed for {alt}: {e}")
raise RuntimeError("Failed to initialize llama.cpp after multiple attempts and fallbacks.")
# RapidAPI Configuration
RAPIDAPI_HOST = "alpha-vantage.p.rapidapi.com"
# Load Finnhub API keys from single secret (multiple keys separated by newlines)
FINNHUB_KEYS_RAW = os.getenv("FINNHUB_KEYS", "")
if FINNHUB_KEYS_RAW:
FINNHUB_KEYS = [key.strip() for key in FINNHUB_KEYS_RAW.split('\n') if key.strip()]
else:
FINNHUB_KEYS = []
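# Example secret layout (hypothetical keys) -- one key per line inside a single
# FINNHUB_KEYS secret:
#   ab12cd34ef56gh78
#   ij90kl12mn34op56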
# Load RapidAPI keys from single secret (multiple keys separated by newlines)
RAPIDAPI_KEYS_RAW = os.getenv("RAPIDAPI_KEYS", "")
if RAPIDAPI_KEYS_RAW:
RAPIDAPI_KEYS = [key.strip() for key in RAPIDAPI_KEYS_RAW.split('\n') if key.strip()]
else:
RAPIDAPI_KEYS = []
# Load Google API keys from single secret (multiple keys separated by newlines)
GOOGLE_API_KEYS_RAW = os.getenv("GOOGLE_API_KEYS", "")
if GOOGLE_API_KEYS_RAW:
GOOGLE_API_KEYS = [key.strip() for key in GOOGLE_API_KEYS_RAW.split('\n') if key.strip()]
else:
GOOGLE_API_KEYS = []
# Filter out empty keys
FINNHUB_KEYS = [key for key in FINNHUB_KEYS if key.strip()]
GOOGLE_API_KEYS = [key for key in GOOGLE_API_KEYS if key.strip()]
# Validate that we have at least one key for each service
if not FINNHUB_KEYS:
print("⚠️ Warning: No Finnhub API keys found in secrets")
if not RAPIDAPI_KEYS:
print("⚠️ Warning: No RapidAPI keys found in secrets")
if not GOOGLE_API_KEYS:
print("⚠️ Warning: No Google API keys found in secrets")
# Pick a random API key to start with (if available)
GOOGLE_API_KEY = random.choice(GOOGLE_API_KEYS) if GOOGLE_API_KEYS else None
print("=" * 50)
print("🚀 FinRobot Forecaster Starting Up...")
print("=" * 50)
if FINNHUB_KEYS:
print(f"📊 Finnhub API: {len(FINNHUB_KEYS)} keys loaded")
else:
print("📊 Finnhub API: Not configured")
if RAPIDAPI_KEYS:
print(f"📈 RapidAPI Alpha Vantage: {RAPIDAPI_HOST} ({len(RAPIDAPI_KEYS)} keys loaded)")
else:
print("📈 RapidAPI Alpha Vantage: Not configured")
if GOOGLE_API_KEYS:
print(f"🤖 Google Gemini API: {len(GOOGLE_API_KEYS)} keys loaded")
else:
print("🤖 Google Gemini API: Not configured")
print("✅ Application started successfully!")
print("=" * 50)
print("ℹ️ Fin-o1 via llama.cpp will be used for inference (no Google calls).")
try:
if FIN_O1_PRECACHE and not FIN_O1_LOCAL_ONLY:
# Warm the HF cache by resolving the model once at startup
_ = _resolve_fin_o1_gguf_path()
print("✅ GGUF cache warmed (HF Hub)")
elif FIN_O1_PRECACHE and FIN_O1_LOCAL_ONLY:
# In local-only mode, just verify local presence if path/dir provided
_ = _resolve_fin_o1_gguf_path()
print("✅ GGUF local presence verified")
except Exception as e:
print(f"⚠️ GGUF pre-cache/verify skipped: {e}")
# Configure the Finnhub client (if keys are available)
if FINNHUB_KEYS:
# Configure with first key for initial setup
finnhub_client = finnhub.Client(api_key=FINNHUB_KEYS[0])
print(f"✅ Finnhub configured with {len(FINNHUB_KEYS)} keys")
else:
finnhub_client = None
print("⚠️ Finnhub not configured - will use mock news data")
# Create a requests session with a retry strategy
def create_session():
session = requests.Session()
retry_strategy = Retry(
total=3,
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504],
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("http://", adapter)
session.mount("https://", adapter)
return session
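# With backoff_factor=1, urllib3 waits with exponential backoff between the
# three retries for the listed status codes (on the order of 1s, 2s, 4s; exact
# timing varies by urllib3 version).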
# Create the global session
requests_session = create_session()
SYSTEM_PROMPT = (
"You are a seasoned stock-market analyst. "
"Given recent company news and optional basic financials, "
"return:\n"
"[Positive Developments] – 2-4 bullets\n"
"[Potential Concerns] – 2-4 bullets\n"
"[Prediction & Analysis] – a one-week price outlook with rationale."
)
# ---------- UTILITY HELPERS ----------------------------------------
def today() -> str:
return date.today().strftime("%Y-%m-%d")
def n_weeks_before(date_string: str, n: int) -> str:
return (datetime.strptime(date_string, "%Y-%m-%d") -
timedelta(days=7 * n)).strftime("%Y-%m-%d")
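# Example: n_weeks_before("2024-03-15", 2) -> "2024-03-01"; a negative n looks
# forward instead: n_weeks_before("2024-03-15", -1) -> "2024-03-22".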
def get_debug_info() -> str:
"""Return a human-readable debug string for the Debug Info tab."""
lines = []
# Model info
source = (
f"local path={FIN_O1_LOCAL_PATH}" if (FIN_O1_LOCAL_PATH and os.path.isfile(FIN_O1_LOCAL_PATH)) else
(f"local dir={FIN_O1_LOCAL_DIR}" if (FIN_O1_LOCAL_DIR and os.path.isdir(FIN_O1_LOCAL_DIR)) else
f"hf repo={FIN_O1_REPO}")
)
model_name = f"Fin-o1 via llama.cpp ({source} :: {FIN_O1_FILENAME})"
lines.append(f"Model: {model_name}")
lines.append(f"LLAMA context size: {LLAMA_CONTEXT_SIZE}")
lines.append(f"LLAMA threads: {LLAMA_THREADS}")
lines.append(f"LLAMA n_gpu_layers: {LLAMA_N_GPU_LAYERS}")
# Library versions
def v(mod, default="unknown"):
try:
return getattr(mod, "__version__", default) or default
except Exception:
return default
libs = {
"python": sys.version.split(" ")[0],
"gradio": getattr(gr, "__version__", "unknown"),
"pandas": getattr(pd, "__version__", "unknown"),
"requests": getattr(requests, "__version__", "unknown"),
"finnhub": getattr(finnhub, "__version__", "unknown"),
"transformers": v(transformers_mod),
"huggingface_hub": v(hfhub),
"llama_cpp": v(llama_cpp_mod),
"torch": getattr(torch, "__version__", "not installed"),
}
lines.append("\nLibraries:")
for name, ver in libs.items():
lines.append(f"- {name}: {ver}")
# Torch device info (if available)
if torch is not None:
try:
cuda = torch.cuda.is_available()
lines.append(f"\nTorch CUDA available: {cuda}")
if cuda:
lines.append(f"CUDA device count: {torch.cuda.device_count()}")
lines.append(f"CUDA current device: {torch.cuda.current_device()}")
lines.append(f"CUDA device name: {torch.cuda.get_device_name(0)}")
except Exception as e:
lines.append(f"Torch device query error: {e}")
else:
lines.append("\nTorch: not installed")
# System info
lines.append("\nSystem:")
lines.append(f"- Platform: {platform.platform()}")
lines.append(f"- Machine: {platform.machine()}")
lines.append(f"- Processor: {platform.processor() or 'unknown'}")
lines.append(f"- CPU count: {os.cpu_count()}")
# CPU/RAM stats
if psutil is not None:
try:
cpu_percent = psutil.cpu_percent(interval=0.5)
vm = psutil.virtual_memory()
lines.append("\nRuntime stats:")
lines.append(f"- CPU usage: {cpu_percent:.1f}%")
lines.append(f"- RAM used: {vm.percent:.1f}% ({vm.used / (1024**3):.2f} GB / {vm.total / (1024**3):.2f} GB)")
except Exception as e:
lines.append(f"Runtime stats error: {e}")
else:
lines.append("\nRuntime stats: psutil not installed")
# Keys presence (counts only, not values)
lines.append("\nAPI keys loaded:")
lines.append(f"- FINNHUB_KEYS: {len(FINNHUB_KEYS)}")
lines.append(f"- RAPIDAPI_KEYS: {len(RAPIDAPI_KEYS)}")
lines.append(f"- GOOGLE_API_KEYS: {len(GOOGLE_API_KEYS)}")
return "\n".join(lines)
# ---------- DATA FETCHING --------------------------------------------------
def get_stock_data(symbol: str, steps: list[str]) -> pd.DataFrame:
    # Try every RapidAPI Alpha Vantage key
for rapidapi_key in RAPIDAPI_KEYS:
try:
print(f"📈 Fetching stock data for {symbol} via RapidAPI (key: {rapidapi_key[:8]}...)")
# RapidAPI Alpha Vantage endpoint
url = f"https://{RAPIDAPI_HOST}/query"
headers = {
"X-RapidAPI-Host": RAPIDAPI_HOST,
"X-RapidAPI-Key": rapidapi_key
}
params = {
"function": "TIME_SERIES_DAILY",
"symbol": symbol,
"outputsize": "full",
"datatype": "csv"
}
            # Retry up to 3 times with the current RapidAPI key
for attempt in range(3):
try:
resp = requests_session.get(url, headers=headers, params=params, timeout=30)
if not resp.ok:
print(f"RapidAPI HTTP error {resp.status_code} with key {rapidapi_key[:8]}..., attempt {attempt + 1}")
time.sleep(2 ** attempt)
continue
text = resp.text.strip()
if text.startswith("{"):
info = resp.json()
msg = info.get("Note") or info.get("Error Message") or info.get("Information") or str(info)
if "rate limit" in msg.lower() or "quota" in msg.lower():
print(f"RapidAPI rate limit hit with key {rapidapi_key[:8]}..., trying next key")
                            break  # Try the next key
raise RuntimeError(f"RapidAPI Alpha Vantage Error: {msg}")
# Parse CSV data
df = pd.read_csv(StringIO(text))
date_col = "timestamp" if "timestamp" in df.columns else df.columns[0]
df[date_col] = pd.to_datetime(df[date_col])
df = df.sort_values(date_col).set_index(date_col)
data = {"Start Date": [], "End Date": [], "Start Price": [], "End Price": []}
for i in range(len(steps) - 1):
s_date = pd.to_datetime(steps[i])
e_date = pd.to_datetime(steps[i+1])
seg = df.loc[s_date:e_date]
if seg.empty:
raise RuntimeError(
f"RapidAPI Alpha Vantage cannot get {symbol} data for {steps[i]}{steps[i+1]}"
)
data["Start Date"].append(seg.index[0])
data["Start Price"].append(seg["close"].iloc[0])
data["End Date"].append(seg.index[-1])
data["End Price"].append(seg["close"].iloc[-1])
time.sleep(1) # RapidAPI has higher limits
print(f"✅ Successfully retrieved {symbol} data via RapidAPI (key: {rapidapi_key[:8]}...)")
return pd.DataFrame(data)
except requests.exceptions.Timeout:
print(f"RapidAPI timeout with key {rapidapi_key[:8]}..., attempt {attempt + 1}")
if attempt < 2:
time.sleep(5 * (attempt + 1))
continue
else:
break
except requests.exceptions.RequestException as e:
print(f"RapidAPI request error with key {rapidapi_key[:8]}..., attempt {attempt + 1}: {e}")
if attempt < 2:
time.sleep(3)
continue
else:
break
except Exception as e:
print(f"RapidAPI Alpha Vantage failed with key {rapidapi_key[:8]}...: {e}")
            continue  # Try the next key
    # Fallback: generate mock data if every RapidAPI key fails
print("⚠️ All RapidAPI keys failed, using mock data for demonstration...")
return create_mock_stock_data(symbol, steps)
def create_mock_stock_data(symbol: str, steps: list[str]) -> pd.DataFrame:
"""Tạo mock data để demo khi API không hoạt động"""
import numpy as np
data = {"Start Date": [], "End Date": [], "Start Price": [], "End Price": []}
    # Different base prices for different symbols
base_prices = {
"AAPL": 180.0, "MSFT": 350.0, "GOOGL": 140.0,
"TSLA": 200.0, "NVDA": 450.0, "AMZN": 150.0
}
base_price = base_prices.get(symbol.upper(), 150.0)
for i in range(len(steps) - 1):
s_date = pd.to_datetime(steps[i])
e_date = pd.to_datetime(steps[i+1])
        # Generate random prices with a slight upward drift
        start_price = base_price + np.random.normal(0, 5)
        end_price = start_price + np.random.normal(2, 8)  # mild upward bias
        data["Start Date"].append(s_date)
        data["Start Price"].append(round(start_price, 2))
        data["End Date"].append(e_date)
        data["End Price"].append(round(end_price, 2))
        base_price = end_price  # carry the base price into the next week
return pd.DataFrame(data)
def current_basics(symbol: str, curday: str) -> dict:
# Check if Finnhub is configured
if not FINNHUB_KEYS:
print(f"⚠️ Finnhub not configured, skipping financial basics for {symbol}")
return {}
    # Try every Finnhub API key
for api_key in FINNHUB_KEYS:
try:
client = finnhub.Client(api_key=api_key)
            raw = client.company_basic_financials(symbol, "all")
            if not raw.get("series"):
continue
merged = defaultdict(dict)
for metric, vals in raw["series"]["quarterly"].items():
for v in vals:
merged[v["period"]][metric] = v["v"]
latest = max((p for p in merged if p <= curday), default=None)
if latest is None:
continue
d = dict(merged[latest])
d["period"] = latest
return d
except Exception as e:
print(f"Error getting basics for {symbol} with key {api_key[:8]}...: {e}")
            time.sleep(2)  # Small delay before trying the next key
continue
return {}
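# current_basics returns the most recent quarterly metrics at or before curday,
# shaped like {"<metric>": <value>, ..., "period": "YYYY-MM-DD"} (metric names
# follow Finnhub's company_basic_financials response).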
def attach_news(symbol: str, df: pd.DataFrame) -> pd.DataFrame:
news_col = []
for _, row in df.iterrows():
start = row["Start Date"].strftime("%Y-%m-%d")
end = row["End Date"].strftime("%Y-%m-%d")
        time.sleep(2)  # Longer delay to avoid rate limits
# Check if Finnhub is configured
if not FINNHUB_KEYS:
print(f"⚠️ Finnhub not configured, using mock news for {symbol}")
news_data = create_mock_news(symbol, start, end)
news_col.append(json.dumps(news_data))
continue
        # Try every Finnhub API key
news_data = []
for api_key in FINNHUB_KEYS:
try:
client = finnhub.Client(api_key=api_key)
weekly = client.company_news(symbol, _from=start, to=end)
weekly_fmt = [
{
"date" : datetime.fromtimestamp(n["datetime"]).strftime("%Y%m%d%H%M%S"),
"headline": n["headline"],
"summary" : n["summary"],
}
for n in weekly
]
weekly_fmt.sort(key=lambda x: x["date"])
news_data = weekly_fmt
                break  # Success; stop trying keys
except Exception as e:
print(f"Error with Finnhub key {api_key[:8]}... for {symbol} from {start} to {end}: {e}")
                time.sleep(3)  # Small delay before trying the next key
continue
        # If no news came back, fall back to mock news
if not news_data:
news_data = create_mock_news(symbol, start, end)
news_col.append(json.dumps(news_data))
df["News"] = news_col
return df
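# Each row's "News" cell holds a JSON-encoded list like:
#   [{"date": "20240310123000", "headline": "...", "summary": "..."}, ...]
# sorted by date; mock items use the same shape.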
def create_mock_news(symbol: str, start: str, end: str) -> list:
"""Tạo mock news data khi API không hoạt động"""
mock_news = [
{
"date": f"{start}120000",
"headline": f"{symbol} Shows Strong Performance in Recent Trading",
"summary": f"Company {symbol} has demonstrated resilience in the current market conditions with positive investor sentiment."
},
{
"date": f"{end}090000",
"headline": f"Analysts Maintain Positive Outlook for {symbol}",
"summary": f"Financial analysts continue to recommend {symbol} based on strong fundamentals and growth prospects."
}
]
return mock_news
# ---------- PROMPT CONSTRUCTION -------------------------------------------
def sample_news(news: list[str], k: int = 5) -> list[str]:
if len(news) <= k:
return news
return [news[i] for i in sorted(random.sample(range(len(news)), k))]
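# Example: sample_news(["a", "b", "c", "d", "e", "f"], k=3) keeps the original
# ordering of the sampled items, e.g. ["a", "c", "f"].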
def make_prompt(symbol: str, df: pd.DataFrame, curday: str, use_basics=False) -> str:
    # Try every Finnhub API key to fetch the company profile
company_blurb = f"[Company Introduction]:\n{symbol} is a publicly traded company.\n"
if FINNHUB_KEYS:
for api_key in FINNHUB_KEYS:
try:
client = finnhub.Client(api_key=api_key)
prof = client.company_profile2(symbol=symbol)
company_blurb = (
f"[Company Introduction]:\n{prof['name']} operates in the "
f"{prof['finnhubIndustry']} sector ({prof['country']}). "
f"Founded {prof['ipo']}, market cap {prof['marketCapitalization']:.1f} "
f"{prof['currency']}; ticker {symbol} on {prof['exchange']}.\n"
)
                break  # Success; stop trying keys
except Exception as e:
print(f"Error getting company profile for {symbol} with key {api_key[:8]}...: {e}")
                time.sleep(2)  # Small delay before trying the next key
continue
else:
print(f"⚠️ Finnhub not configured, using basic company info for {symbol}")
# Past weeks block
past_block = ""
for _, row in df.iterrows():
term = "increased" if row["End Price"] > row["Start Price"] else "decreased"
head = (f"From {row['Start Date']:%Y-%m-%d} to {row['End Date']:%Y-%m-%d}, "
f"{symbol}'s stock price {term} from "
f"{row['Start Price']:.2f} to {row['End Price']:.2f}.")
news_items = json.loads(row["News"])
summaries = [
f"[Headline] {n['headline']}\n[Summary] {n['summary']}\n"
for n in news_items
if not n["summary"].startswith("Looking for stock market analysis")
]
past_block += "\n" + head + "\n" + "".join(sample_news(summaries, 5))
# Optional basic financials
if use_basics:
basics = current_basics(symbol, curday)
if basics:
basics_txt = "\n".join(f"{k}: {v}" for k, v in basics.items() if k != "period")
basics_block = (f"\n[Basic Financials] (reported {basics['period']}):\n{basics_txt}\n")
else:
basics_block = "\n[Basic Financials]: not available\n"
else:
basics_block = "\n[Basic Financials]: not requested\n"
    horizon = f"{curday} to {n_weeks_before(curday, -1)}"  # n=-1 steps one week forward
final_user_msg = (
company_blurb
+ past_block
+ basics_block
+ f"\nBased on all information before {curday}, analyse positive "
"developments and potential concerns for {symbol}, then predict its "
f"price movement for next week ({horizon})."
)
return final_user_msg
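# The assembled prompt has this overall shape:
#   [Company Introduction]: ...
#   From YYYY-MM-DD to YYYY-MM-DD, SYMBOL's stock price increased from X to Y.
#   [Headline] ... [Summary] ...      (up to 5 sampled items per week)
#   [Basic Financials] ...            (or "not requested"/"not available")
#   Based on all information before <curday>, analyse ... (<horizon>).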
# ---------- LLM CALL -------------------------------------------------------
def chat_completion(prompt: str,
model: str = "Fin-o1-14B-GGUF",
temperature: float = 0.2,
stream: bool = False,
symbol: str = "STOCK") -> str:
"""Generate completion using Fin-o1-14B GGUF via llama.cpp.
Note: streaming is not implemented for llama.cpp in this app's UI path.
"""
try:
llama = _get_llama_instance()
full_prompt = f"{SYSTEM_PROMPT}\n\n{prompt}"
# Conservative defaults for CPU-only inference on 16GB RAM
max_tokens = int(os.getenv("LLAMA_MAX_TOKENS", "1024"))
top_p = float(os.getenv("LLAMA_TOP_P", "0.9"))
top_k = int(os.getenv("LLAMA_TOP_K", "40"))
repeat_penalty = float(os.getenv("LLAMA_REPEAT_PENALTY", "1.1"))
if stream:
print("ℹ️ Streaming not enabled for llama.cpp path; generating non-streaming output.")
result = llama.create_completion(
prompt=full_prompt,
temperature=temperature,
max_tokens=max_tokens,
top_p=top_p,
top_k=top_k,
repeat_penalty=repeat_penalty,
)
return result.get("choices", [{}])[0].get("text", "")
except Exception as e:
print(f"⚠️ Fin-o1 llama.cpp inference failed for {symbol}: {e}")
return create_mock_ai_response(symbol)
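# Streaming sketch (unused by the UI): llama-cpp-python yields incremental
# chunks when create_completion(..., stream=True) is used. Kept here only as a
# reference for wiring streaming into Gradio later.
def _stream_completion(prompt: str, temperature: float = 0.2):
    llama = _get_llama_instance()
    for chunk in llama.create_completion(
        prompt=f"{SYSTEM_PROMPT}\n\n{prompt}",
        temperature=temperature,
        max_tokens=int(os.getenv("LLAMA_MAX_TOKENS", "1024")),
        stream=True,
    ):
        yield chunk.get("choices", [{}])[0].get("text", "")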
def create_mock_ai_response(symbol: str) -> str:
"""Tạo mock AI response khi Google API không hoạt động"""
return f"""
[Positive Developments]
• Strong market position and brand recognition for {symbol}
• Recent quarterly earnings showing growth potential
• Positive analyst sentiment and institutional investor interest
• Technological innovation and market expansion opportunities
[Potential Concerns]
• Market volatility and economic uncertainty
• Competitive pressures in the industry
• Regulatory changes that may impact operations
• Global economic factors affecting stock performance
[Prediction & Analysis]
Based on the current market conditions and company fundamentals, {symbol} is expected to show moderate growth over the next week. The stock may experience some volatility but should maintain an upward trend with a potential price increase of 2-5%. This prediction is based on current market sentiment and technical analysis patterns.
Note: This is a demonstration response using mock data. For real investment decisions, please consult with qualified financial professionals.
"""
# ---------- MAIN PREDICTION FUNCTION -----------------------------------------
def predict(symbol: str = "AAPL",
            curday: str | None = None,
            n_weeks: int = 3,
            use_basics: bool = False,
            stream: bool = False) -> tuple[str, str]:
    # Resolve the default at call time; a `curday: str = today()` default would
    # be frozen at import time in a long-running Space.
    if curday is None:
        curday = today()
    try:
steps = [n_weeks_before(curday, n) for n in range(n_weeks + 1)][::-1]
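        # e.g. curday="2024-03-15", n_weeks=3 ->
        #   ["2024-02-23", "2024-03-01", "2024-03-08", "2024-03-15"]
        # i.e. n_weeks one-week windows, oldest first, ending at curday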
df = get_stock_data(symbol, steps)
df = attach_news(symbol, df)
prompt_info = make_prompt(symbol, df, curday, use_basics)
answer = chat_completion(prompt_info, stream=stream, symbol=symbol)
return prompt_info, answer
except Exception as e:
error_msg = f"Error in prediction: {str(e)}"
print(f"Prediction error: {e}") # Log the error for debugging
return error_msg, error_msg
# ---------- HUGGINGFACE SPACES INTERFACE -----------------------------------------
def hf_predict(symbol, n_weeks, use_basics):
# 1. get curday
curday = date.today().strftime("%Y-%m-%d")
# 2. call predict
prompt, answer = predict(
symbol=symbol.upper(),
curday=curday,
n_weeks=int(n_weeks),
use_basics=bool(use_basics),
stream=False
)
return prompt, answer
# ---------- GRADIO INTERFACE -----------------------------------------
def create_interface():
with gr.Blocks(
title="FinRobot Forecaster",
theme=gr.themes.Soft(),
css="""
.gradio-container {
max-width: 1200px !important;
margin: auto !important;
}
        #model_prompt_textbox textarea,
        #analysis_results_textbox textarea {
            overflow-y: auto !important;
            max-height: none !important;
            min-height: 400px !important;
            resize: vertical !important;
            white-space: pre-wrap !important;
            word-wrap: break-word !important;
            height: auto !important;
        }
        #model_prompt_textbox,
        #analysis_results_textbox {
            height: auto !important;
        }
.textarea textarea {
overflow-y: auto !important;
max-height: 500px !important;
resize: vertical !important;
}
.textarea {
height: auto !important;
min-height: 300px !important;
}
.gradio-textbox {
height: auto !important;
max-height: none !important;
}
.gradio-textbox textarea {
height: auto !important;
max-height: none !important;
overflow-y: auto !important;
}
"""
) as demo:
gr.Markdown("""
# 🤖 FinRobot Forecaster
**AI-powered stock market analysis and prediction using advanced language models**
This application analyzes stock market data, company news, and financial metrics to provide comprehensive market insights and predictions.
⚠️ **Note**: Free API keys have daily rate limits. If you encounter errors, the app will use mock data for demonstration purposes.
""")
with gr.Row():
with gr.Column(scale=1):
symbol = gr.Textbox(
label="Stock Symbol",
value="AAPL",
placeholder="Enter stock symbol (e.g., AAPL, MSFT, GOOGL)",
info="Enter the ticker symbol of the stock you want to analyze"
)
n_weeks = gr.Slider(
1, 6,
value=3,
step=1,
label="Historical Weeks to Analyze",
info="Number of weeks of historical data to include in analysis"
)
use_basics = gr.Checkbox(
label="Include Basic Financials",
value=True,
info="Include basic financial metrics in the analysis"
)
btn = gr.Button(
"🚀 Run Analysis",
variant="primary"
)
with gr.Column(scale=2):
with gr.Tabs():
with gr.Tab("📊 Analysis Results"):
gr.Markdown("**AI Analysis & Prediction**")
output_answer = gr.Textbox(
label="",
lines=40,
show_copy_button=True,
interactive=False,
placeholder="AI analysis and predictions will appear here...",
container=True,
scale=1,
elem_id="analysis_results_textbox"
)
with gr.Tab("🔍 Model Prompt"):
gr.Markdown("**Generated Prompt**")
output_prompt = gr.Textbox(
label="",
lines=40,
show_copy_button=True,
interactive=False,
placeholder="Generated prompt will appear here...",
container=True,
scale=1,
elem_id="model_prompt_textbox"
)
with gr.Tab("🛠️ Debug Info"):
gr.Markdown("**Environment & Runtime Diagnostics**")
debug_text = gr.Textbox(
label="",
lines=30,
show_copy_button=True,
interactive=False,
value=get_debug_info(),
container=True,
scale=1,
elem_id="debug_info_textbox"
)
refresh_debug = gr.Button("🔄 Refresh Debug Info")
# Examples
gr.Examples(
examples=[
["AAPL", 3, False],
["MSFT", 4, True],
["GOOGL", 2, False],
["TSLA", 5, True],
["NVDA", 3, True]
],
inputs=[symbol, n_weeks, use_basics],
label="💡 Try these examples"
)
# Event handlers
btn.click(
fn=hf_predict,
inputs=[symbol, n_weeks, use_basics],
outputs=[output_prompt, output_answer],
show_progress=True
)
# Debug refresh handler
def _refresh_debug():
return get_debug_info()
refresh_debug.click(
fn=_refresh_debug,
inputs=[],
outputs=[debug_text],
show_progress=False
)
# Footer
gr.Markdown("""
---
**Disclaimer**: This application is for educational and research purposes only.
The predictions and analysis provided should not be considered as financial advice.
Always consult with qualified financial professionals before making investment decisions.
""")
return demo
# Expose a top-level Gradio app instance for HF Spaces import-time discovery
demo = create_interface()
# ---------- MAIN EXECUTION -----------------------------------------
if __name__ == "__main__":
demo.launch(
server_name="0.0.0.0",
server_port=7860,
share=False,
show_error=True,
debug=False,
quiet=True
)