import gradio as gr
import subprocess
import re
import os
import uuid
import json
import random
import html
import threading
from queue import Queue, Empty
from typing import Optional, List, Any, Dict, Tuple
from run_question import answer_question_recall
# --------------------------------------------------------------------
# Required part – start the serper host server (required for fathom search)
# (left UNCHANGED as requested)
# --------------------------------------------------------------------
port = os.environ["PORT_SERPER_HOST"]
# lsof = subprocess.run(["lsof", "-i", f":{port}"], capture_output=True)
# if len(lsof.stdout) == 0:
# print("Starting serper host server")
# _ = subprocess.Popen(["nohup", "sh", "./web_agents_5/host_serper2.sh"])
try:
print("Starting serper host server")
_ = subprocess.Popen(["nohup", "sh", "./web_agents_5/host_serper2.sh"])
except Exception as e:
print(f"Failed to start serper host server: {e}")
pass
# --------------------------------------------------------------------
# Rendering options
# --------------------------------------------------------------------
ORDER_TOOL_CALLS_FIRST = False # keep assistant text before calls
SPACER = '
' # small vertical gap
MIN_TOOLCALLOUT_CHARS = 160 # hide calls in *content* if text is too short
PREVIEW_LIMIT = 135 # condensed preview length
BOXED_WRAP_WIDTH = int(os.getenv("BOXED_WRAP_WIDTH", "130")) # boxed soft-wrap width (chars)
# --------------------------------------------------------------------
# Helpers
# --------------------------------------------------------------------
INVISIBLES = ('\ufeff', '\u200b', '\u200d') # BOM/ZWSP/ZWNJ
def _strip_leading_garbage(md: str) -> str:
if not isinstance(md, str):
md = str(md)
# remove invisible chars
for ch in INVISIBLES:
md = md.replace(ch, '')
# normalise newlines
md = md.replace('\r\n', '\n').replace('\r', '\n')
# drop ALL leading blank/space-only lines
md = re.sub(r'^\s*\n', '', md, count=0, flags=re.M)
return md
def _ensure_md_blocks(md: str) -> str:
# ensure a blank line after any Markdown heading
md = re.sub(r'(^|\n)(#{1,6}[^\n]+)\n(?!\n)', r'\1\2\n\n', md)
return md
def format_math(text: str) -> str:
"""
No-op math formatter.
Previously this function attempted to:
- Convert [ ... ] into $$ ... $$, and
- Convert \( ... \) into $ ... $
That interfered with already-sanitized LaTeX (ensure_display_math), producing
nested $$ inside $$ blocks and breaking KaTeX rendering.
We now leave the string unchanged to avoid introducing invalid math delimiters
post-sanitization.
"""
if not isinstance(text, str):
return str(text)
return text
def generate_conversation_id():
return str(uuid.uuid4())[:8]
def _maybe_json_load(s: Any) -> Any:
if isinstance(s, (dict, list, int, float, bool)) or s is None:
return s
if not isinstance(s, str):
return s
for _ in range(2):
try:
s = json.loads(s)
except Exception:
break
return s
def _pretty_json(anything: Any) -> str:
obj = _maybe_json_load(anything)
if isinstance(obj, (dict, list, int, float, bool)) or obj is None:
try:
return json.dumps(obj, ensure_ascii=False, indent=2)
except Exception:
return str(obj)
if isinstance(obj, str):
if "\\n" in obj or "\\t" in obj:
obj = obj.replace("\\n", "\n").replace("\\t", "\t")
return obj
return str(obj)
def _code_block(lang: str, text: Any) -> str:
s = str(text) if not isinstance(text, str) else text
s = s.replace("```", "``\u200b`") # avoid fence confusion
return f"```{lang}\n{s}\n```"
def _normalize_tool_call(tc: Any) -> Dict[str, Any]:
if isinstance(tc, dict):
return {
"name": tc.get("name") or "tool",
"arguments": tc.get("arguments") or tc.get("args") or {}
}
parsed = _maybe_json_load(tc)
if isinstance(parsed, dict):
return {
"name": parsed.get("name") or "tool",
"arguments": parsed.get("arguments") or parsed.get("args") or {}
}
return {"name": "tool", "arguments": {"raw": str(tc)}}
def _letter(i: int) -> str:
return f"{chr(96 + i)})" # 1->a), 2->b), ...
def _summarize_call(tc: Any) -> Tuple[str, str]:
norm = _normalize_tool_call(tc)
name = (norm.get("name") or "tool").lower()
args = norm.get("arguments") or {}
if name == "search_urls":
return "Search", str(args.get("query", "")).strip() or "(no query)"
if name == "query_url":
return "Query URL", str(args.get("url", "")).strip() or "(no url)"
label = norm.get("name") or "tool"
val = json.dumps(args, ensure_ascii=False) if isinstance(args, (dict, list)) else str(args)
if len(val) > PREVIEW_LIMIT:
val = val[:PREVIEW_LIMIT] + "…"
return label, val
def _render_tool_calls_details(tool_calls: List[Any], tool_results: Optional[List[Any]] = None) -> str:
if not tool_calls:
return ""
tool_results = tool_results or []
parts = []
parts.append("Tool calls:")
parts.append(SPACER)
for i, tc in enumerate(tool_calls, 1):
label, value = _summarize_call(tc)
summary_line = f"{_letter(i)} {label}: {value}"
call_json = _pretty_json(tc)
result_json = _pretty_json(tool_results[i - 1]) if (i - 1) < len(tool_results) else None
parts.append(f"{summary_line}
\n")
parts.append("Tool Call
\n\n")
parts.append(_code_block("json", call_json))
parts.append("\n")
parts.append("Tool Result
\n\n")
if result_json is None:
parts.append("_(pending)_\n")
else:
parts.append(_code_block("json", result_json))
parts.append("\n")
parts.append(" \n")
parts.append(SPACER)
parts.append(SPACER)
return "".join(parts).strip()
def _strip_tool_json_blobs(s: str) -> str:
out, i, n = [], 0, len(s)
while i < n:
ch = s[i]
if ch == '{':
depth, k = 1, i + 1
while k < n:
c = s[k]
if c == '\\' and k + 1 < n:
k += 2; continue
if c == '{':
depth += 1
elif c == '}':
depth -= 1
if depth == 0:
blob = s[i:k+1]; lc = blob.lower()
if '"name"' in lc and '"arguments"' in lc:
i = k + 1 # drop suspected tool JSON
break
else:
out.append(blob); i = k + 1; break
k += 1
else:
out.append(ch); i += 1
else:
out.append(ch); i += 1
return "".join(out)
def _plain_preview(md: str, limit: int = PREVIEW_LIMIT) -> str:
md = _strip_tool_json_blobs(md or "")
s = re.sub(r"<[^>]+>", "", md)
s = re.sub(r"```.*?```", "", s, flags=re.S)
s = re.sub(r"`{1,3}.*?`{1,3}", "", s, flags=re.S)
s = re.sub(r"[*_#>\-]+", "", s)
s = s.replace("\n", " ").strip()
if len(s) > limit:
s = s[:limit].rstrip() + "…"
return s or "(no preview)"
def _assistant_section(text_md: str) -> str:
text_md = ensure_display_math(text_md)
return (
f"Assistant Response:\n\n"
f""
f"{format_math(text_md).strip()}"
f"
\n\n"
f"{SPACER}"
)
def _wrap_turn(inner_html: str) -> str:
return (
f""
f"{inner_html}
"
)
def _wrap_condensed_step(inner_html: str) -> str:
"""Yellow sub-bubble wrapper for each condensed step inside the blue cluster."""
return (
f""
f"{inner_html}
"
)
def _condensed_step_block(assistant_text: str,
tool_calls: Optional[List[Any]] = None,
tool_results: Optional[List[Any]] = None,
last_open: bool = False) -> str:
"""A single condensed step: details with assistant preview + (full assistant + tool calls) inside, wrapped yellow.
last_open: when True, render this step expanded by default.
"""
preview = _plain_preview(assistant_text, PREVIEW_LIMIT)
inner_parts = []
inner_parts.append(_assistant_section(assistant_text))
if tool_calls:
inner_parts.append(_render_tool_calls_details(tool_calls, tool_results))
inner_html = "".join(inner_parts).strip()
open_attr = " open" if last_open else ""
details_html = (
f"Thinking (preview): {preview}
\n\n"
f"{inner_html}\n\n "
)
return _wrap_condensed_step(details_html)
def _cluster_wrapper(inner_html: str) -> str:
"""Blue cluster bubble that holds all condensed steps. Expanded by default."""
return (
f""
f"Initial thinking, steps, tool calls
\n\n"
f"{inner_html}\n\n"
f"
"
)
# import re
# LATEX_SPECIALS = {'%': r'\%', '#': r'\#', '&': r'\&', '_': r'\_'}
LATEX_ESC_MAP_IN_MATH = {
'%': r'\%',
'#': r'\#',
# NOTE: do NOT escape & or _ in math; they’re valid there
}
def _escape_unescaped_dollars(s: str) -> str:
# $ -> \$ unless already escaped
return re.sub(r'(? str:
return re.sub(r'\*\*(.+?)\*\*', r'\\textbf{\1}', s)
def _escape_other_specials_in_math(s: str) -> str:
return re.sub(r'(? str:
"""
Remove any standalone $$ tokens that appear inside math content (not proper $$…$$ pairs).
We do this BEFORE escaping $, so they don't turn into literal '\$\$'.
"""
# Collapse isolated $$ on their own lines or with spaces around
s = re.sub(r'(^|\s)\$\$(\s|$)', lambda m: (m.group(1) or '') + (m.group(2) or ''), s)
# Also remove any leftover pairs of $$ that aren't part of a proper $$…$$ block
# (very conservative: delete $$ that are not followed by content then $$ soon)
s = re.sub(r'\$\$(?=\s*(?:\\end|\)|}|$))', '', s)
s = re.sub(r'(?:(?<=\\begin\{aligned\})|\n)\s*\$\$\s*', '', s)
return s
def _mask_text_blocks(s: str):
"""
Temporarily mask \text{...} spans (balanced) so we don’t insert \allowbreak inside them.
Returns masked string and list of (token, original).
"""
out, tokens = [], []
i, n = 0, len(s)
tcount = 0
while i < n:
j = s.find(r'\text{', i)
if j == -1:
out.append(s[i:]); break
out.append(s[i:j])
# balance braces
k = j + len(r'\text{'); depth = 1
while k < n and depth > 0:
c = s[k]
if c == '\\' and k + 1 < n:
k += 2; continue
if c == '{': depth += 1
elif c == '}': depth -= 1
k += 1
block = s[j:k] if depth == 0 else s[j:]
token = f'__TEXTBLOCK_{tcount}__'
tokens.append((token, block))
out.append(token)
i = k if depth == 0 else n
tcount += 1
return ''.join(out), tokens
def _unmask_text_blocks(s: str, tokens) -> str:
for token, block in tokens:
s = s.replace(token, block)
return s
def _mask_command_blocks(s: str, commands: List[str]):
"""
Mask balanced \cmd{...} spans for specific single-argument commands so we can
safely wrap surrounding plain text into \text{...} without splitting inside these blocks.
IMPORTANT:
- Only mask single-argument commands (e.g., \textbf{...}, \emph{...}).
- Do NOT mask multi-argument commands (e.g., \href{..}{..}) to avoid brace imbalance.
Returns (masked_string, tokens) where tokens is a list of (token, original_block).
"""
try:
tokenized = s or ""
tokens: List[Tuple[str, str]] = []
tcount = 0
for cmd in (commands or []):
pattern = r'\\' + cmd + r'{'
i = 0
n = len(tokenized)
while i < n:
j = tokenized.find(pattern, i)
if j == -1:
break
# scan balanced braces for this single-arg command
k = j + len(pattern)
depth = 1
while k < n and depth > 0:
c = tokenized[k]
if c == '\\' and k + 1 < n:
k += 2
continue
if c == '{':
depth += 1
elif c == '}':
depth -= 1
k += 1
if depth == 0:
block = tokenized[j:k]
token = f'__CMDBLOCK_{tcount}__'
tokens.append((token, block))
tokenized = tokenized[:j] + token + tokenized[k:]
n = len(tokenized)
i = j + len(token)
tcount += 1
else:
# Unbalanced command; do not mask to avoid dropping trailing content
i = j + len(pattern)
return tokenized, tokens
except Exception:
return s, []
def _add_break_opportunities_outside_text(s: str) -> str:
"""
Add \allowbreak only outside \text{...} spans.
"""
masked, toks = _mask_text_blocks(s)
# Soft breaks after slashes
masked = re.sub(r'(? str:
r"""
Inside math, handle model patterns like: $$Label$$(https://example.com)
Be robust to \allowbreak{} sprinkled by earlier passes.
1) $$Label$$(url) -> \href{url}{Label}
2) any remaining $$...$$ -> \text{...}
"""
# allow break markers inside URLs
AB = r'(?:\\allowbreak\{\})*'
# url = protocol://... (allow \allowbreak{} anywhere)
url_pat = rf'(https?{AB}:{AB}//{AB}[^)]+?)'
# 1) $$Label$$(url) -> \href{url}{Label}
s = re.sub(
rf'\$\$(.+?)\$\$\({url_pat}\)',
r'\\href{\2}{\1}',
s,
flags=re.S
)
# 2) Any remaining $$...$$ (malformed or not a link) -> \text{...}
s = re.sub(r'\$\$(.+?)\$\$', r'\\text{\1}', s, flags=re.S)
return s
# --- add this final safety net (remove ANY nested $$ left in math) ---
def _strip_all_nested_dollars(s: str) -> str:
"""
After all transforms, if any '$$' survive inside math content,
convert the smallest $$...$$ spans to \text{...}. Repeat until gone.
"""
# keep replacing until no $$ remain
while '$$' in s:
new_s = re.sub(r'\$\$(.+?)\$\$', r'\\text{\1}', s, flags=re.S)
if new_s == s:
# if we couldn't match a pair, drop lone $$ defensively
new_s = s.replace('$$', '')
s = new_s
return s
def _cleanup_tex_command_braces_and_percent(s: str) -> str:
"""
Final cleanup for LaTeX text:
- Fix escaped brace after single-argument commands: \textbf\{ -> \textbf{, \emph\{ -> \emph{
- Collapse double-backslash percent: \\% -> \% (avoids '%' commenting the rest of the line after a linebreak)
Patterns are conservative and won't affect '\\\\' row breaks since they aren't followed by '%'.
"""
try:
# 1) Repair malformed single-arg command braces
s = re.sub(r'\\(text|textbf|emph)\\\{', r'\\\1{', s)
# 2) Ensure percent is escaped with exactly one backslash
s = re.sub(r'\\{2}%', r'\\%', s)
return s
except Exception:
return s
def _normalize_escaped_closing_braces_for_single_arg_commands(s: str, commands: List[str]) -> str:
"""
Regex-normalize cases like '\textbf{Batters\}:' -> '\textbf{Batters}:'
Works for simple single-arg commands where the content has no nested braces.
This is a fast-path fix; more complex cases are handled by the scanner-based repair.
"""
try:
out = s or ""
for cmd in (commands or []):
# \cmd{ ... \}} -> \cmd{ ... }}
out = re.sub(rf'\\{cmd}\s*\{{([^{{}}]*?)\\\}}', rf'\\{cmd}{{\1}}', out)
return out
except Exception:
return s
def _repair_unclosed_single_arg_commands(s: str, commands: List[str]) -> str:
"""
Heuristically repair single-argument commands like \textbf{...}, \emph{...}, \text{...}
that were incorrectly closed with a literal '\}' instead of '}' (leaving the group
unbalanced and causing 'File ended while scanning ...').
Supports optional whitespace between the command and '{'.
Only affects the specified single-argument commands and only when their top-level
argument fails to close.
"""
try:
out = s or ""
for cmd in (commands or []):
pat = re.compile(r'\\' + cmd + r'\s*{')
i = 0
while True:
m = pat.search(out, i)
if not m:
break
j = m.start()
k = m.end()
n = len(out)
depth = 1
last_esc_close_top = -1
while k < n:
c = out[k]
if c == '\\' and k + 1 < n:
# record top-level literal '\}' so we can convert it if needed
if out[k + 1] == '}' and depth == 1:
last_esc_close_top = k
k += 2
continue
if c == '{':
depth += 1
elif c == '}':
depth -= 1
if depth == 0:
break
k += 1
if depth == 0:
# Properly closed, continue after this block
i = k + 1
continue
# Unclosed: promote the last top-level '\}' to '}' and rescan
if last_esc_close_top != -1:
out = out[:last_esc_close_top] + out[last_esc_close_top + 1:]
# Rescan from the same command start (structure changed)
i = j
continue
# Nothing to fix; skip past this occurrence to avoid infinite loop
i = m.end()
return out
except Exception:
return s
def _repair_unclosed_single_arg_commands_in_text_groups(s: str, commands: List[str]) -> str:
"""
Within each \text{...} group, repair unclosed single-arg commands like \textbf{...}, \emph{...}
where the closing brace was emitted as '\}' and thus remained unbalanced inside the text group.
This confines the scan to the text group's inner content so outer braces aren't misinterpreted.
"""
try:
out: List[str] = []
i, n = 0, len(s or "")
text_pat = re.compile(r'\\text\s*{')
while i < n:
m = text_pat.search(s, i)
if not m:
out.append(s[i:])
break
# emit up to and including '\text{'
out.append(s[i:m.end()])
# find matching closing '}' for this \text{
k = m.end()
depth = 1
while k < n and depth > 0:
c = s[k]
if c == '\\' and k + 1 < n:
k += 2
continue
if c == '{':
depth += 1
elif c == '}':
depth -= 1
k += 1
if depth != 0:
# unmatched -> emit rest and stop
out.append(s[m.end():])
break
inner = s[m.end():k-1]
fixed_inner = _repair_unclosed_single_arg_commands(inner, commands)
out.append(fixed_inner)
out.append('}')
i = k
return ''.join(out) or s
except Exception:
return s
def _unwrap_envs_from_text(s: str) -> str:
"""
If the model emits \text{ \begin{env} ... \end{env} }, unwrap the environment
from the \text{...} wrapper so it's in math mode again.
Run repeatedly until nothing to unwrap.
"""
# generic LaTeX environments, e.g., aligned, align, cases, bmatrix, etc.
env_pat = re.compile(r'\\text\s*\{\s*(\\begin\{[a-zA-Z*]+\}.*?\\end\{[a-zA-Z*]+\})\s*\}', re.S)
# also handle \text{\boxed{...}} -> \boxed{...}
boxed_pat = re.compile(r'\\text\s*\{\s*(\\boxed\{.*?\})\s*\}', re.S)
while True:
new_s = env_pat.sub(r'\1', s)
new_s = boxed_pat.sub(r'\1', new_s)
if new_s == s:
break
s = new_s
return s
def _sanitize_inside_math(s: str) -> str:
# 0) FIX: get environments out of \text{...}
s = _unwrap_envs_from_text(s)
# 1) markdown -> tex
s = _markdown_bold_to_tex(s)
# 2) convert $$Label$$(url) and $$...$$ early (handles \allowbreak in URLs)
s = _fix_inline_links_in_math(s)
# 3) escape raw $ (prices) AFTER dealing with $$…$$
s = _escape_unescaped_dollars(s)
# 4) escape only % and # (leave & and _ alone for math)
s = _escape_other_specials_in_math(s)
# 5) add wrap hints (avoid inside \text{...})
# s = _add_break_opportunities_outside_text(s)
# 6) FINAL guard: if any $$ survived, neutralize them
s = _strip_all_nested_dollars(s)
return s
def _force_multiline_box_content(s: str) -> str:
"""
If a \boxed{...} inner content appears to be plain text (no multiline constructs),
render it as multiple lines with left alignment.
Heuristics (applied in order):
- If inner already has \begin{...}, \end{...}, \aligned, '\\\\' or '&', leave as-is.
- If it contains explicit '\\' breaks without an environment, split on them.
- If content looks like a JSON array [ {...}, {...}, ... ], render one object per row.
- Split on ' > ' (or any whitespace '>' whitespace) to break comparative/ranking summaries.
- Prefer splitting at numbered list markers like '1. ', '2. '.
- Otherwise honor explicit newlines.
- As fallback for very long text, split at ' - ' bullets.
Formatting:
- Use \\begin{array}{l} ... \\end{array} so rows are left-aligned.
- Wrap a row into \\text{...} when it has no TeX commands already.
- Escape { } _ & inside \\text{...}.
"""
def _fmt_lines_array(lines: List[str]) -> str:
out = []
for l in lines:
l = l.strip()
if not l:
continue
if re.search(r'\\[a-zA-Z]+', l):
# Mixed content: keep existing TeX commands, but wrap plain text runs in \text{...}
try:
masked_text, toks_text = _mask_text_blocks(l)
masked_cmd, toks_cmd = _mask_command_blocks(masked_text, ["textbf", "emph"])
# Split around tokens and wrap non-token chunks
parts = re.split(r'(__TEXTBLOCK_\d+__|__CMDBLOCK_\d+__)', masked_cmd)
rebuilt = []
for part in parts:
if not part:
continue
if part.startswith('__TEXTBLOCK_') or part.startswith('__CMDBLOCK_'):
rebuilt.append(part)
else:
# wrap plain chunk in \text{...} with text-mode escaping
esc = (
part.replace('{', r'\{')
.replace('}', r'\}')
.replace('_', r'\_')
.replace('&', r'\&')
.replace('%', r'\%')
.replace('#', r'\#')
)
rebuilt.append(r'\text{' + esc + '}')
mixed = "".join(rebuilt)
# Unmask in reverse order: commands first, then existing \text blocks
mixed = _unmask_text_blocks(mixed, toks_cmd)
mixed = _unmask_text_blocks(mixed, toks_text)
out.append(mixed)
except Exception:
out.append(l)
else:
esc = (
l.replace('{', r'\{')
.replace('}', r'\}')
.replace('_', r'\_')
.replace('&', r'\&')
.replace('%', r'\%')
.replace('#', r'\#')
)
out.append(r'\text{' + esc + '}')
return r'\begin{array}{l}' + r' \\\\ '.join(out) + r'\end{array}'
try:
txt = s or ""
# Already structured -> leave unchanged
# NOTE: Do not treat a bare '&' as "structured"; allow multiline formatting even if '&' exists in plain text.
if re.search(r'(\\begin\{|\\end\{|\\aligned|\\\\)', txt):
return txt
# SAFETY: Avoid inserting breaks inside \text{...} groups by masking them first
masked_txt, toks = _mask_text_blocks(txt)
# Force a line break after every ':' except URL schemes like 'http://' or 'https://'
masked_txt = re.sub(r':(?!//)', ':\n', masked_txt)
# Force a line break before every ' - ' bullet if not already at a new line.
# Conservative rule: only when surrounded by spaces, so hyphens in words/URLs aren't affected.
masked_txt = re.sub(r'\s-\s', '\n- ', masked_txt)
# Enforce max line length: 130 chars (BOXED_WRAP_WIDTH) by wrapping at the NEXT whitespace >= width
# 1) Split on existing/inserted newlines first
parts = [ln.strip() for ln in re.split(r'\s*\n+\s*', masked_txt) if ln.strip()]
if not parts:
parts = [masked_txt.strip()]
# 2) Soft-wrap each part by spaces without breaking masked \text{...} tokens.
# - Prefer the first whitespace at or after the width threshold.
# - If a single token exceeds width and has no whitespace, keep it on one line (do not split tokens).
W = BOXED_WRAP_WIDTH
def _wrap_masked_by_space_next(s: str, width: int) -> List[str]:
out: List[str] = []
s = s.strip()
while len(s) > width:
# find first whitespace at or after width
m = re.search(r'\s', s[width:])
if m:
cut = width + m.start()
out.append(s[:cut].rstrip())
s = s[cut+1:].lstrip() # drop the breaking whitespace
continue
# no whitespace ahead; if there's whitespace before, break there
prev = s.rfind(' ')
if prev != -1:
out.append(s[:prev].rstrip())
s = s[prev+1:].lstrip()
else:
# single long token; emit as a line without splitting
out.append(s)
s = ""
if s:
out.append(s)
return out
wrapped_masked: List[str] = []
for p in parts:
wrapped_masked.extend(_wrap_masked_by_space_next(p, W))
# 3) If we produced multiple rows, unmask and return as left-aligned array
if len(wrapped_masked) > 1:
unmasked_lines = [_unmask_text_blocks(line, toks) for line in wrapped_masked]
return _fmt_lines_array(unmasked_lines)
# Single line after wrapping; unmask back and continue with other heuristics
txt = _unmask_text_blocks(wrapped_masked[0], toks)
# 0) If explicit math line breaks (\\) are present without an environment, split on them
if re.search(r'\\\\', txt):
lines = [ln.strip() for ln in re.split(r'\s*\\\\\s*', txt) if ln.strip()]
if lines:
return _fmt_lines_array(lines)
# 1) JSON array lines (e.g., [ {..}, {..}, ... ])
m = re.match(r'^\s*\[(.*)\]\s*$', txt, flags=re.S)
if m:
inner = m.group(1)
parts = re.split(r'\},\s*', inner)
lines = []
for p in parts:
p = p.strip()
if not p:
continue
if not p.endswith('}'):
p = p + '}'
p = p.lstrip(',')
lines.append(p)
if lines:
return _fmt_lines_array(lines)
# 2) Split on '>' separators commonly used in rankings
gt_parts = [p.strip() for p in re.split(r'\s*>\s*', txt) if p and p.strip()]
if len(gt_parts) > 1:
lines = []
for idx, part in enumerate(gt_parts):
lines.append(part if idx == 0 else ('> ' + part))
return _fmt_lines_array(lines)
# 3) Assumptions block: split heading and bullets/newlines into separate rows
# Supports '**Assumptions**:' or '\textbf{Assumptions}:' or plain 'Assumptions:'
m_assume = re.search(r'(?im)^\s*(?:\*{0,2}\s*)?(?:\\textbf\{)?Assumptions(?:\})?\s*:?', txt)
if m_assume:
head = txt[:m_assume.start()].strip()
tail = txt[m_assume.start():].strip()
# Split head as numbered list or by newlines
head_tokens = re.split(r'(?=(?:^|\n|\s)\d+\.\s)', head)
head_lines = [t.strip() for t in head_tokens if t and t.strip()]
if len(head_lines) <= 1:
head_lines = [ln.strip() for ln in re.split(r'\s*\n+\s*', head) if ln.strip()]
# Split tail by newlines first; if it stays one line, split by '- ' bullets
tail_lines = [ln.strip() for ln in re.split(r'\s*\n+\s*', tail) if ln.strip()]
if len(tail_lines) <= 1:
tail_lines = [p.strip() for p in re.split(r'\s+-\s+', tail) if p.strip()]
lines = head_lines + tail_lines
if lines:
return _fmt_lines_array(lines)
# 4) Prefer split at numbered list markers like '1. ', '2. '
tokens = re.split(r'(?=(?:^|\n|\s)\d+\.\s)', txt)
lines = [t.strip() for t in tokens if t and t.strip()]
# 4) If not split, honor explicit newlines if present
if len(lines) <= 1:
nl_lines = [ln.strip() for ln in re.split(r'\s*\n+\s*', txt) if ln.strip()]
if len(nl_lines) > 1:
lines = nl_lines
# 5) Fallback for very long text: split on bullet-like dashes
if len(lines) <= 1 and len(txt.strip()) > 120:
bullet_lines = [b.strip() for b in re.split(r'\s+-\s+', txt) if b.strip()]
if len(bullet_lines) > 1:
lines = bullet_lines
if len(lines) > 1:
return _fmt_lines_array(lines)
# 8) Width-based soft wrap at 80 chars (split at nearest space)
W = BOXED_WRAP_WIDTH
txt_stripped = txt.strip()
if len(txt_stripped) > W:
def _soft_wrap_by_space(t: str, width: int = 80) -> List[str]:
rows: List[str] = []
s2 = t.strip()
while len(s2) > width:
cut = s2.rfind(' ', 0, width + 1)
if cut == -1:
cut = width
rows.append(s2[:cut].strip())
s2 = s2[cut:].lstrip()
if s2:
rows.append(s2)
return rows
soft_lines = _soft_wrap_by_space(txt_stripped, W)
if len(soft_lines) > 1:
return _fmt_lines_array(soft_lines)
# Final safeguard for single-line boxed content:
# Render upright (non-italic). If TeX commands are present, wrap only plain fragments
# in \text{...}; otherwise wrap the whole line in \text{...}.
def _escape_text_mode_local(s2: str) -> str:
return (
s2.replace('{', r'\{')
.replace('}', r'\}')
.replace('_', r'\_')
.replace('&', r'\&')
.replace('%', r'\%')
.replace('#', r'\#')
)
def _wrap_plain_outside_commands_local(s2: str) -> str:
n2 = len(s2)
i2 = 0
parts2: List[str] = []
plain2: List[str] = []
def flush_plain2():
if not plain2:
return
chunk2 = ''.join(plain2)
if chunk2:
parts2.append(r'\text{' + _escape_text_mode_local(chunk2) + '}')
plain2.clear()
while i2 < n2:
ch2 = s2[i2]
if ch2 == '\\':
# finish accumulated plain text before emitting the command
flush_plain2()
j2 = i2 + 1
# read command name (letters)
while j2 < n2 and s2[j2].isalpha():
j2 += 1
# handle escapes like \%, \_, \{, \}
if j2 == i2 + 1:
if i2 + 1 < n2:
parts2.append(s2[i2:i2+2])
i2 += 2
else:
parts2.append('\\')
i2 += 1
continue
# capture following balanced brace groups (arguments), if any
k2 = j2
while k2 < n2 and s2[k2] == '{':
depth2 = 1
t = k2 + 1
while t < n2 and depth2 > 0:
c3 = s2[t]
if c3 == '\\' and t + 1 < n2:
t += 2
continue
if c3 == '{':
depth2 += 1
elif c3 == '}':
depth2 -= 1
t += 1
k2 = t
if depth2 != 0:
break
parts2.append(s2[i2:(k2 if k2 > j2 else j2)])
i2 = (k2 if k2 > j2 else j2)
else:
plain2.append(ch2)
i2 += 1
flush_plain2()
return ''.join(parts2)
if not re.search(r'\\[a-zA-Z]+', txt):
# No TeX commands at all → wrap whole line
esc = _escape_text_mode_local(txt_stripped)
return r'\text{' + esc + '}'
else:
# Mixed content on a single line → wrap only the plain fragments
return _wrap_plain_outside_commands_local(txt)
except Exception:
return s
def ensure_display_math(s: str) -> str:
"""
Sanitize math content robustly WITHOUT double-wrapping:
- Sanitize content of EXISTING $$…$$ blocks in place.
- For \begin{aligned}…\end{aligned}: if outside $$, wrap; if inside, just sanitize.
- For \boxed{…}: if outside $$, wrap; if inside, just sanitize.
- ALSO: unwrap envs accidentally wrapped in \text{...} at the whole-string level.
"""
if not isinstance(s, str) or not s:
return s
# 🔧 unwrap envs/boxed placed incorrectly inside \text{...} (top level)
s = _unwrap_envs_from_text(s)
display_blocks = []
def _sanitize_display(m):
start, end = m.span()
content = m.group(1)
content = _sanitize_inside_math(content)
display_blocks.append((start, start + 2, end - 2, end))
return "$$\n" + content + "\n$$"
s = re.sub(r'\$\$(.*?)\$\$', _sanitize_display, s, flags=re.S)
# Recompute display '$$...$$' spans in the NEW string so inside checks are accurate
def _collect_display_spans_in_text(text: str):
spans = []
i3, n3 = 0, len(text)
while i3 < n3:
a = text.find('$$', i3)
if a == -1:
break
b = text.find('$$', a + 2)
if b == -1:
break
spans.append((a, b + 2))
i3 = b + 2
return spans
display_spans = _collect_display_spans_in_text(s)
def _inside_display(idx: int) -> bool:
for (a, b) in display_spans:
if a <= idx <= b:
return True
return False
# Collect boxed spans to prevent wrapping aligned/array inside \boxed{...} with $$
def _collect_boxed_spans(text: str):
spans = []
i2, n2 = 0, len(text)
while i2 < n2:
j2 = text.find(r'\boxed{', i2)
if j2 == -1:
break
k2 = j2 + len(r'\boxed{'); depth2 = 1
while k2 < n2 and depth2 > 0:
c2 = text[k2]
if c2 == '\\' and k2 + 1 < n2:
k2 += 2; continue
if c2 == '{':
depth2 += 1
elif c2 == '}':
depth2 -= 1
if depth2 == 0:
spans.append((j2, k2))
break
k2 += 1
i2 = k2 if k2 > j2 else j2 + 1
return spans
boxed_spans = _collect_boxed_spans(s)
def _inside_boxed(idx: int) -> bool:
for (a, b) in boxed_spans:
if a <= idx <= b:
return True
return False
# aligned (only wrap with $$ if not already inside display or boxed)
def _proc_aligned(m):
start, _ = m.span(1)
blk = _sanitize_inside_math(m.group(1))
return blk if (_inside_display(start) or _inside_boxed(start)) else "$$\n" + blk + "\n$$"
s = re.sub(r'(\\begin\{aligned\}.*?\\end\{aligned\})', _proc_aligned, s, flags=re.S)
# Pre-normalize escaped closing braces like '\}' inside single-arg commands to real '}' so subsequent scans see balanced braces
s = _normalize_escaped_closing_braces_for_single_arg_commands(s, ["textbf", "emph"])
# boxed (balanced-scan)
out = []; i, n = 0, len(s)
while i < n:
j = s.find(r'\boxed{', i)
if j == -1:
out.append(s[i:]); break
out.append(s[i:j])
k = j + len(r'\boxed{'); depth = 1
while k < n and depth > 0:
c = s[k]
if c == '\\' and k + 1 < n:
k += 2; continue
if c == '{': depth += 1
elif c == '}': depth -= 1
k += 1
if depth != 0:
out.append(s[j:]); i = n; break
inner = s[j+len(r'\boxed{'):k-1]
inner = _sanitize_inside_math(inner)
inner = _force_multiline_box_content(inner)
fixed = r'\boxed{' + inner + '}'
out.append(fixed if _inside_display(j) else "$$\n" + fixed + "\n$$")
i = k
s = "".join(out)
# 🔧 unwrap again in case earlier steps exposed a fresh \text{ \begin...\end... }
s = _unwrap_envs_from_text(s)
# 🔧 final cleanup for malformed commands and percent escapes
s = _cleanup_tex_command_braces_and_percent(s)
# 🔧 normalize simple '\}' -> '}' within single-arg commands (fast path)
s = _normalize_escaped_closing_braces_for_single_arg_commands(s, ["textbf", "emph", "text"])
# 🔧 first, repair unclosed single-arg commands INSIDE \text{...} groups
s = _repair_unclosed_single_arg_commands_in_text_groups(s, ["textbf", "emph"])
# 🔧 then, global repair for any remaining single-arg commands
s = _repair_unclosed_single_arg_commands(s, ["textbf", "emph", "text"])
return s
def _final_response_bubble(text_md: str) -> str:
"""Standalone GREEN bubble for the last/primary narrative."""
# Log exactly final response before/after LaTeX processing (only two prints)
# try:
# # print("FINAL_RESPONSE_RAW_BEFORE_LATEX:\n", text_md)
# pass
# except Exception:
# pass
text_md = ensure_display_math(text_md)
# try:
# # print("FINAL_RESPONSE_STRUCTURED_AFTER_LATEX:\n", format_math(text_md).strip())
# pass
# except Exception:
# pass
nlc = ' \n\n'
return (
f""
f"
Final Response:\n\n"
f"
"
f"{nlc + format_math(text_md).strip()}"
f"
\n\n"
f"
"
)
def _final_answer_block(answer: Any) -> str:
if answer is None:
return "**Final Answer**:\n\n_(not available)_\n\n" + SPACER
s = str(answer).strip()
return f"**Final Answer**:\n\n$$\\boxed{{{s}}}$$\n\n{SPACER}"
def _remove_progress_bubble_if_any(history: List[Dict[str, str]],
progress_bubble_idx: Optional[int]) -> Optional[int]:
if progress_bubble_idx is None:
return None
del history[progress_bubble_idx]
return None
def _spinner_text_bubble(label: str, frame: str) -> str:
"""ASCII spinner frame bubble."""
return (
f""
f"
{frame}
"
f"
{label}
"
f"
"
)
# --------------------------------------------------------------------
# Streaming generator with TEXT spinner at ~10 Hz via background worker thread
# --------------------------------------------------------------------
def generate_response(
user_message: str,
max_tokens: int,
temperature: float,
top_p: float,
history_state: list,
convo_id: str,
raw_prompts: dict,
deepresearch_on_value: bool,
summary_llm: str
):
if not user_message.strip():
# Empty input: yield a single no-op update with sidebar placeholders to keep arity consistent
yield history_state, history_state, raw_prompts, "", ""
return
old_prompt = raw_prompts.get(convo_id)
history = history_state + [{"role": "user", "content": user_message}]
# One cluster bubble for *this* turn; we update it as steps arrive
cluster_idx: Optional[int] = None
steps: List[Dict[str, Any]] = [] # each: {"text": str, "calls": list, "results": list}
progress_bubble_idx: Optional[int] = None
spinner_frames = ["-", "\\", "|", "/"]
spinner_i = 0
spinner_label = "" # "Working… fetching results" or "Working… synthesizing LLM response"
# NEW: Track sources during this turn
visited_urls: List[str] = []
visited_set: set = set()
search_urls_all: List[str] = []
search_set: set = set()
sources_bubble_idx: Optional[int] = None
# Sidebar sources data structures
visited_meta_map: Dict[str, Dict[str, str]] = {} # url -> {"title": str, "snippet": str}
search_meta_map: Dict[str, Dict[str, str]] = {} # url -> {"title": str, "snippet": str}
visited_html_str: str = ""
search_html_str: str = ""
# --- helpers for the Sources sidebar (cards) ---
def _truncate_50(s: Any) -> str:
t = str(_pretty_json(s))
t = re.sub(r'\s+', ' ', t).strip()
return (t[:50] + "…") if len(t) > 50 else t
def _card_html(url: str, title: Optional[str], snippet: Optional[str]) -> str:
esc_url = html.escape(url, quote=True)
esc_title = html.escape((title or url).strip(), quote=True)
esc_snip = html.escape((snippet or "").strip(), quote=True)
return (
""
f"
"
f"
{_truncate_50(esc_snip)}
"
"
"
)
def _parse_search_entries(res: Any) -> List[Dict[str, str]]:
"""
Parse markdown-like search results into list of {url, title, snippet}.
Fallback to bare URLs if no rich structure is found.
"""
text = _pretty_json(res)
if not isinstance(text, str):
text = str(text)
entries: List[Dict[str, str]] = []
pat = re.compile(
r'###\s*\d+\.\s*(?P.*?)\s*\n\*\*URL\*\*:\s*(?P\S+)\s*\n\*\*Snippet\*\*:\s*(?P.*?)(?:\n---|\Z)',
re.S
)
for m in pat.finditer(text):
url = (m.group('url') or '').strip()
title = (m.group('title') or '').strip()
snip = (m.group('snip') or '').strip()
if url:
entries.append({'url': url, 'title': title, 'snippet': snip})
if not entries:
for u in _extract_urls_from_result(res):
entries.append({'url': u, 'title': '', 'snippet': ''})
return entries
def _extract_query_summary(res: Any) -> str:
"""
Robustly extract a concise summary from a tool result.
Priority:
1) If result is a dict, look for common summary-like fields.
2) If text contains "Summary:" use the LAST occurrence closest to the end,
and return everything after "Summary:" up to the next blank line or next
labeled section (e.g., "URL:", "Title:", "---") or end of string.
3) Otherwise, fallback to the first 1–2 sentences.
Always returns a non-empty string if any meaningful text is present.
"""
splits = res.split("Summary:")
last_part = splits[-1]
return last_part.strip() or "(info unavailable)"
# # 1) Structured result
# try:
# if isinstance(res, dict):
# for k in ("Summary", "summary", "short_summary", "abstract"):
# v = res.get(k)
# if isinstance(v, str) and v.strip():
# return v.strip()
# except Exception:
# pass
# # 2) Stringify and try "Summary:" block (take the LAST occurrence)
# text = _pretty_json(res)
# if not isinstance(text, str):
# text = str(text)
# try:
# # Find last "Summary:" (case-insensitive)
# matches = list(re.finditer(r'(?is)\bSummary\s*:', text))
# if matches:
# start = matches[-1].end()
# tail = text[start:]
# # Stop at next blank line OR next labeled section like "Word:" at line start OR '---'
# stop_pat = re.compile(r'(?im)^\s*$|^\s*[A-Z][A-Za-z0-9 _\-]{0,24}:\s|^\s*---\s*$', re.M)
# mstop = stop_pat.search(tail)
# segment = tail[:mstop.start()] if mstop else tail
# seg = segment.strip()
# if seg:
# return seg
# except Exception:
# pass
# # Also accept LAST "Synopsis:" or "TL;DR:" as proxies
# try:
# matches2 = list(re.finditer(r'(?is)\b(?:Synopsis|TL;DR)\s*:', text))
# if matches2:
# start = matches2[-1].end()
# tail = text[start:]
# stop_pat = re.compile(r'(?im)^\s*$|^\s*[A-Z][A-Za-z0-9 _\-]{0,24}:\s|^\s*---\s*$', re.M)
# mstop = stop_pat.search(tail)
# segment = tail[:mstop.start()] if mstop else tail
# seg = segment.strip()
# if seg:
# return seg
# except Exception:
# pass
# # 3) Fallback: first 1–2 sentences
# s = re.sub(r'\s+', ' ', (text or '').strip())
# if not s:
# return "(summary unavailable)"
# sentences = re.split(r'(?<=[.!?])\s+', s)
# head = " ".join(sentences[:2]).strip()
# return head or "(summary unavailable)"
# def _render_sidebar_lists(visited_list: List[str], search_list: List[str]) -> Tuple[str, str]:
def _render_sidebar_lists(visited_list: List[str]) -> str:
"""
Build HTML strings for the visited and search sections as card lists.
"""
# visited cards
# print("rendering info")
v_parts: List[str] = []
for u in visited_list:
meta = visited_meta_map.get(u, {})
v_parts.append(_card_html(u, meta.get('title') or '', meta.get('snippet') or '(fetching…)'))
visited_html = "".join(v_parts) or "(none yet)
"
# search cards
# s_parts: List[str] = []
# for u in search_list:
# meta = search_meta_map.get(u, {})
# s_parts.append(_card_html(u, meta.get('title') or '', meta.get('snippet') or ''))
# search_html = "".join(s_parts) or "No search results extracted yet.
"
# return visited_html, search_html
return visited_html
def _extract_urls_from_result(res: Any) -> List[str]:
"""
Robustly extract URLs from tool results.
Handles:
- dicts (look for 'url', 'link', 'href' keys, and scan values)
- lists/tuples
- strings (prefer '**URL**: https://...' lines; else generic URL regex)
Preserves encounter order and uniqueness for this call result.
"""
found: List[str] = []
seen: set = set()
def _add(u: str):
if not isinstance(u, str):
return
u = u.strip()
if not u:
return
# trim trailing punctuation/brackets
u2 = re.sub(r'[)\]\}>,;]+$', '', u)
if u2 not in seen:
seen.add(u2)
found.append(u2)
def _walk(x: Any):
if x is None:
return
if isinstance(x, str):
s = x
# Prefer labeled lines like: **URL**: https://example.com
for m in re.findall(r'\*\*URL\*\*:\s*(https?://\S+)', s):
_add(m)
# Fallback to generic URL scan
for m in re.findall(r'https?://[^\s<>()\]\}"]+', s):
_add(m)
elif isinstance(x, dict):
# common fields
for k in ("url", "link", "href"):
v = x.get(k)
if isinstance(v, str):
_add(v)
for v in x.values():
_walk(v)
elif isinstance(x, (list, tuple, set)):
for v in x:
_walk(v)
else:
s = str(x)
for m in re.findall(r'https?://[^\s<>()\]\}"]+', s):
_add(m)
_walk(res)
return found
# --- URL normalizer for robust matching across variants (https/http, www., fragments, trailing slash) ---
def _normalize_url(u: str) -> str:
try:
s = (u or "").strip()
if not s:
return ""
# strip fragment
s = re.sub(r'#.*$', '', s)
# normalize scheme/host case; drop www.
m = re.match(r'^(https?)://([^/]+)(/.*|$)', s, flags=re.I)
if m:
scheme = m.group(1).lower()
host = m.group(2).lower()
host = host[4:] if host.startswith('www.') else host
rest = m.group(3) or ''
# drop trailing slash except for root
if rest.endswith('/') and len(rest) > 1:
rest = rest[:-1]
return f"{scheme}://{host}{rest}"
# fallback: drop trailing slash
if s.endswith('/') and len(s) > 1:
s = s[:-1]
return s
except Exception:
return (u or "").strip()
def _search_meta_snippet_for_url(search_meta_map: Dict[str, Dict[str, str]], url: str) -> str:
"""Find a snippet in search_meta_map using normalized URL comparison."""
try:
n = _normalize_url(url)
for k, meta in (search_meta_map or {}).items():
if _normalize_url(k) == n:
return (meta.get("snippet") or "").strip()
except Exception:
pass
return ""
def _upsert_sources_bubble():
"""
Repurposed: update the sidebar HTML strings instead of inserting a chat bubble.
"""
# nonlocal visited_html_str, search_html_str
# search_dedup = [u for u in search_urls_all if u not in visited_set]
# visited_html_str, search_html_str = _render_sidebar_lists(visited_urls, search_dedup)
nonlocal visited_html_str
# global visited_urls
visited_html_str = _render_sidebar_lists(visited_urls)
# Start model worker in background thread and stream events through a queue
q: Queue = Queue()
def _worker():
try:
# print("send")
# print(f"DeepResearch_on : {deepresearch_on_value}")
gen = answer_question_recall(
question=user_message,
temperature=float(temperature),
top_p=float(top_p),
max_new_tokens=int(max_tokens),
old_prompt=old_prompt,
deepresearch_on=deepresearch_on_value,
summary_llm=summary_llm
)
while True:
tag, out = next(gen) # will raise StopIteration when done
q.put((tag, out))
if tag == "answer":
break
except StopIteration as e:
# propagate any final value if present
q.put(("stop", getattr(e, "value", None)))
except Exception as e:
q.put(("error", str(e)))
t = threading.Thread(target=_worker, daemon=True)
t.start()
# Main render loop: poll queue at ~10 Hz; animate text spinner when queue is empty
while True:
try:
tag, out = q.get(timeout=0.1) # 10 Hz
# Remove spinner bubble BEFORE rendering the next event
progress_bubble_idx = _remove_progress_bubble_if_any(history, progress_bubble_idx)
if tag == "assistant_resp":
assistant_text, tool_calls = out or ("", [])
# print(f"NUMBER OF TOOL_CALLS:{len(out[1]), len(tool_calls)}\n\nTOOL_CALLS:\n{out[1]}\n\n")
steps.append({"text": assistant_text, "calls": tool_calls or [], "results": []})
# Extract query_url targets early (from arguments)
for call in (tool_calls or []):
norm = _normalize_tool_call(call)
name = (norm.get("name") or "").lower()
if name == "query_url":
u = str((norm.get("arguments") or {}).get("url", "")).strip()
if u and u not in visited_set:
visited_set.add(u)
visited_urls.append(u)
# Initialize placeholder meta for visited
if u not in visited_meta_map:
# print("appending visited url (from call):", u)
visited_meta_map[u] = {"title": "", "snippet": "(fetching…)"}
# Re-render the BLUE cluster bubble with YELLOW condensed step bubbles
condensed_html_list = []
for idx, tstep in enumerate(steps):
condensed_html_list.append(
_condensed_step_block(
tstep["text"],
tstep["calls"],
tstep["results"],
# last_open=(idx == len(steps) - 1)
last_open=True
)
)
cluster_html = _cluster_wrapper("".join(condensed_html_list).strip() or "_(no steps)_")
if cluster_idx is None:
history.append({"role": "assistant", "content": cluster_html})
cluster_idx = len(history) - 1
else:
history[cluster_idx]["content"] = cluster_html
# Update sidebar sources HTML
_upsert_sources_bubble()
yield history, history, raw_prompts, visited_html_str, search_html_str
# After assistant_resp, spinner for fetching results
spinner_label = "Working… fetching results"
frame = spinner_frames[spinner_i % len(spinner_frames)]
history.append({"role": "assistant", "content": _spinner_text_bubble(spinner_label, frame)})
progress_bubble_idx = len(history) - 1
spinner_i += 1
yield history, history, raw_prompts, visited_html_str, search_html_str
elif tag == "tool_results":
results_list = out[0] or []
# print(f"NUMBER OF TOOL_RESULTS:{len(out[0]), len(results_list)}\n\nTOOL_RESULTS:\n{out[0]}\n\n")
if steps:
steps[-1]["results"] = results_list
# Align results to calls and extract sources + meta
calls = steps[-1].get("calls") or []
for i, call in enumerate(calls):
norm = _normalize_tool_call(call)
name = (norm.get("name") or "").lower()
if name == "query_url":
# print("got query url")
u = str((norm.get("arguments") or {}).get("url", "")).strip()
if u and u not in visited_set:
visited_set.add(u)
visited_urls.append(u)
# Update visited meta from result summary (aligned by index if available)
res_i = results_list[i] if i < len(results_list) else None
# print("result for query url:\n", res_i.replace('\n',' '))
if u:
if u not in visited_meta_map:
# print(u, "not in visited_meta_map, adding")
visited_meta_map[u] = {"title": "", "snippet": ""}
# print("extracted query summary for visited url:\n", _extract_query_summary(res_i)[:50].replace('\n',' '))
visited_meta_map[u]["snippet"] = _extract_query_summary(res_i)
elif name == "search_urls":
res_i = results_list[i] if i < len(results_list) else None
# Parse structured search entries and update meta + list
for entry in _parse_search_entries(res_i):
u = entry.get("url", "").strip()
if not u:
continue
if u not in search_set and u not in visited_set:
search_set.add(u)
search_urls_all.append(u)
search_meta_map[u] = {
"title": entry.get("title", "").strip(),
"snippet": entry.get("snippet", "").strip(),
}
# # --- SECOND PASS: robustly fill snippets even when results/calls are misaligned ---
# url_summary_map: Dict[str, str] = {}
# for r in (results_list or []):
# try:
# sumtxt = _extract_query_summary(r).strip()
# except Exception:
# sumtxt = ""
# if not sumtxt:
# continue
# # Associate this summary with all URLs detected in this result (normalized)
# for u2 in _extract_urls_from_result(r):
# if not isinstance(u2, str) or not u2.strip():
# continue
# key = _normalize_url(u2.strip())
# if key and key not in url_summary_map:
# url_summary_map[key] = sumtxt
# # For any visited URL still missing a snippet, try normalized url_summary_map first, then search_meta_map (normalized)
# for vu in list(visited_urls):
# current_snip = (visited_meta_map.get(vu, {}) or {}).get("snippet", "")
# needs_fill = (not current_snip) or (current_snip.strip() == "(fetching…)")
# if not needs_fill:
# continue
# key = _normalize_url(vu)
# fill = url_summary_map.get(key, "")
# if fill:
# visited_meta_map.setdefault(vu, {"title": "", "snippet": ""})
# visited_meta_map[vu]["snippet"] = fill
# continue
# # Fallback to search result snippet if available (normalized match)
# search_snip = _search_meta_snippet_for_url(search_meta_map, vu)
# if search_snip:
# visited_meta_map.setdefault(vu, {"title": "", "snippet": ""})
# visited_meta_map[vu]["snippet"] = search_snip
# Re-render cluster with results folded into the last step
condensed_html_list = []
for idx, tstep in enumerate(steps):
condensed_html_list.append(
_condensed_step_block(
tstep["text"],
tstep["calls"],
tstep["results"],
# last_open=(idx == len(steps) - 1)
last_open=True
)
)
cluster_html = _cluster_wrapper("".join(condensed_html_list).strip())
if cluster_idx is not None:
history[cluster_idx]["content"] = cluster_html
# Update sidebar sources HTML
_upsert_sources_bubble()
yield history, history, raw_prompts, visited_html_str, search_html_str
# Switch spinner to synthesizing
spinner_label = "Working… synthesizing LLM response"
frame = spinner_frames[spinner_i % len(spinner_frames)]
history.append({"role": "assistant", "content": _spinner_text_bubble(spinner_label, frame)})
progress_bubble_idx = len(history) - 1
spinner_i += 1
yield history, history, raw_prompts, visited_html_str, search_html_str
elif tag == "end":
chat_str = out[0]
raw_prompts[convo_id] = chat_str
# Move the *last* step out of the blue cluster → GREEN Final Response
final_resp_html = ""
if steps:
last = steps[-1]
final_resp_html = _final_response_bubble(last["text"])
prev_steps = steps[:-1]
else:
prev_steps = []
# Update (or remove) the cluster bubble
if cluster_idx is not None:
if prev_steps:
condensed_html_list = []
for tstep in prev_steps:
condensed_html_list.append(
_condensed_step_block(tstep["text"], tstep["calls"], tstep["results"], last_open=True)
)
cluster_html = _cluster_wrapper("".join(condensed_html_list).strip())
history[cluster_idx]["content"] = cluster_html
else:
# Remove empty cluster bubble and adjust indices after deletion
deleted_index = cluster_idx
del history[cluster_idx]
cluster_idx = None
if sources_bubble_idx is not None and sources_bubble_idx > deleted_index:
sources_bubble_idx -= 1
# Update sidebar sources HTML
_upsert_sources_bubble()
# Insert final response AFTER sources bubble (or cluster if sources missing)
if sources_bubble_idx is not None:
insert_at = sources_bubble_idx + 1
elif cluster_idx is not None:
insert_at = cluster_idx + 1
else:
insert_at = len(history)
if final_resp_html:
history.insert(insert_at, {"role": "assistant", "content": final_resp_html})
insert_at += 1
# Status marker
history.insert(insert_at, {"role": "assistant", "content": f"{SPACER}✅ Information retrieved\n\n{SPACER}"})
yield history, history, raw_prompts, visited_html_str, search_html_str
# no spinner after end; continue loop for 'answer'
spinner_label = ""
elif tag == "answer":
answer = out[0] if out else None
final_block = _final_answer_block(answer)
yield history, history, raw_prompts, visited_html_str, search_html_str
break
else:
break
except Empty:
# No new model events -> animate text spinner at ~10 Hz
if progress_bubble_idx is not None and spinner_label:
frame = spinner_frames[spinner_i % len(spinner_frames)]
history[progress_bubble_idx]["content"] = _spinner_text_bubble(spinner_label, frame)
spinner_i += 1
yield history, history, raw_prompts, visited_html_str, search_html_str
# else: nothing to animate
except StopIteration as e:
tag, out = e.value if e.value else (None, None)
if tag != "answer":
break
answer = out[0] if out else None
final_block = _final_answer_block(answer)
yield history, history, raw_prompts, visited_html_str, search_html_str
break
# end while
# (worker thread will end after 'answer'; daemon=True so it won't hang)
# 🔄 Updated examples for SEARCH use-cases
example_messages = {
"DeepResearch on the latest GST policy": (
"Do a detailed deepresearch on the recent GST rate change, find all major rate changes and tell me about its implications. Find the official government sources and also tell me about how it will affect the general public."
),
"UPSC 2025 Prelims Question": (
"Among a list of European countries (Austria, Bulgaria, Croatia, Serbia, Sweden, North Macedonia), how many are NATO members?"
),
"DeepResearch on inflammation in treatments for cancer": (
"Access recent cancer studies that used antibody-drug treatments and do a detailed deepresearch on the studies. Tell me about how long after starting did lung inflammation usually appear (average weeks), and how many out of 100 had a serious cases? Discuss about the possible reasons, results, and conclusions drawn from these studies."
),
"Which Indian math model was post-trained for ~$499?": (
"Identify the Indian AI lab which reportedly post-trained a math model for about $499 with performance comparable to o3-mini. Give model name, lab name, and brief method summary."
),
"DeepResearch analysis on EV Adoption": (
"Do an in-depth, data-driven comparison of EV adoption in the US, Europe, China, and India. Cover: (1) Market overview—penetration rates, 5–10 year trends, and key policies/incentives shaping growth. (2) Consumer behavior—attitudes and purchase drivers (environment, TCO, tech). (3) Charging—public/private build-out, accessibility, tech differences, and major initiatives. (4) Automakers—top OEM strategies, production capacity, partnerships, and cross-border competition. (5) Barriers vs. opportunities—price, range, supply chain vs. battery advances, renewables integration, smart mobility. (6) Outlook to 2030—regional growth projections, impacts on global trends and emerging markets. Use specific examples and recent quantitative data"
)
}
def pick_random_example():
try:
return gr.update(value=random.choice(list(example_messages.values())))
except Exception:
return gr.update(value="")
# --------------------------------------------------------------------
# CLEAR helpers: wipe UI + internal memory for the current conversation
# --------------------------------------------------------------------
def _clear_current_conversation(convo_id: str,
conversations: Dict[str, Any],
raw_prompts: Dict[str, str]):
if convo_id in conversations:
conversations[convo_id]["messages"] = []
if convo_id in raw_prompts:
del raw_prompts[convo_id]
return conversations, raw_prompts
with gr.Blocks(theme=gr.themes.Soft()) as demo:
conversations_state = gr.State({})
current_convo_id = gr.State(generate_conversation_id())
history_state = gr.State([])
raw_prompts_state = gr.State({}) # convo_id -> raw chat_str
# UI state for sources sidebar
sources_visible = gr.State(False)
# print("Search is initialised to True")
deepresearch_on = gr.State(True) # default: search enabled
# summary_llm = gr.State("gpt-4.1-mini") # default: GPT-3.5 for summaries
summary_llm = gr.State(os.environ["SUMMARY_HF_MODEL_URL"])
# deepresearch = gr.State(True) # default: deepresearch enabled
# Header
gr.HTML(
"""
"""
)
with gr.Sidebar(width=270, open=False):
gr.HTML("""
Fathom-Search-4B + Fathom-Synthesizer-4B
""")
gr.Markdown("## 💬 Conversations")
conversation_selector = gr.Radio(choices=[], label="Select Conversation", interactive=True, elem_id="conv_selector")
new_convo_button = gr.Button("New Conversation ➕")
gr.Markdown("## ⚙️ Settings")
max_tokens_slider = gr.Slider(minimum=8000, maximum=40000, step=1000, value=40000, label="Max Tokens", elem_id="slider")
with gr.Accordion("Advanced Settings", open=False):
temperature_slider = gr.Slider(minimum=0.0, maximum=2.0, value=0.6, label="Temperature", elem_id="slider")
top_p_slider = gr.Slider(minimum=0.0, maximum=1.0, value=0.95, label="Top-p", elem_id="slider")
gr.Markdown("Note: Closing this demo window will clear saved conversations.")
gr.Markdown("""
We sincerely acknowledge [VIDraft](https://huggingface.co/VIDraft) for their Phi 4 Reasoning Plus [space](https://huggingface.co/spaces/VIDraft/phi-4-reasoning-plus), which inspired parts of this demo UI.
""")
# Sources UI moved to dedicated right Sidebar (see below)
with gr.Sidebar(open=False, visible=False, width=240, elem_id="sources_sidebar", position="right") as sources_sidebar:
gr.Markdown("# Sources")
visited_sources_html = gr.HTML("(none yet)
")
# Hidden placeholder to preserve outputs arity; no UI for search
search_sources_html = gr.HTML("", visible=False)
with gr.Row():
with gr.Column(elem_id="right_col"):
chatbot = gr.Chatbot(label="Chat", type="messages", height=520, elem_id="main_chat", autoscroll=False)
with gr.Row(elem_id="composer_row", equal_height=True):
user_input = gr.Textbox(label="User Input", placeholder="Ask a web question…", lines=1, scale=5, min_width=280, elem_id="composer_input")
with gr.Column(scale=2, min_width=220):
with gr.Row(elem_id="buttons_grid"): # one container
submit_button = gr.Button("Send", variant="primary")
clear_button = gr.Button("Clear")
search_button = gr.Button("DeepResearch ON", variant="huggingface")
sources_button = gr.Button("Sources")
gr.Markdown("### Example questions")
with gr.Row():
example1_button = gr.Button("DeepResearch on the latest GST policy", scale=1)
example2_button = gr.Button("UPSC 2025 Prelims Question", scale=1)
example3_button = gr.Button("DeepResearch on inflammation in treatments for cancer", scale=1)
example4_button = gr.Button("Which Indian math model was post-trained for ~$499?", scale=1)
example5_button = gr.Button("DeepResearch analysis on EV Adoption", scale=1)
# Sources moved to Sidebar Accordion (defined in Sidebar above)
# Placeholders kept for variable references:
# visited_sources_html and search_sources_html are now created inside Sidebar Accordion.
# ---------- helper functions ----------
def update_conversation_list(conversations):
return [conversations[cid]["title"] for cid in conversations]
def toggle_sources(is_open):
return gr.update(open=not is_open), (not is_open)
def toggle_search(is_on, convo_id, conversations):
if convo_id in conversations:
conversations[convo_id]["deepresearch_on"] = not is_on
return gr.update(value = "DeepResearch OFF" if is_on else "DeepResearch ON", variant="secondary" if is_on else "huggingface"), (not is_on), conversations
def render_visited_html(urls, meta=None, url_trunc=72, summary_trunc=100):
if not urls:
return "(none yet)
"
cards = []
meta = meta or {}
for u in urls:
href_url = u # full URL in href
# Truncate display URL to 72 chars
disp = u if len(u) <= url_trunc else (u[:url_trunc] + "…")
esc_disp = html.escape(disp, quote=True)
esc_href = html.escape(href_url, quote=True)
# Truncate summary to 100 chars
raw_snip = (meta.get(u) or "").strip()
snip_trunc = raw_snip if len(raw_snip) <= summary_trunc else (raw_snip[:summary_trunc] + "…")
esc_snip = html.escape(snip_trunc, quote=True)
cards.append(
""
)
return "".join(cards)
def extract_urls_from_html(html_str):
if not isinstance(html_str, str):
return []
# Match href='https://...' or href="https://..."
return re.findall(r"href=(?:'|\")(https?://[^'\\\"]+)(?:'|\")", html_str)
def extract_url_snippets_from_visited_html(html_str):
"""
Parse visited_sources_html (cards) and extract [(url, snippet)] pairs.
"""
pairs = []
if not isinstance(html_str, str) or not html_str.strip():
return pairs
card_pat = re.compile(
r"",
re.S
)
for m in card_pat.finditer(html_str):
url = (m.group(1) or "").strip()
snippet_raw = (m.group(2) or "").strip()
# Strip any HTML tags from snippet
snippet = re.sub(r"<[^>]+>", "", snippet_raw)
if url:
pairs.append((url, snippet))
return pairs
def open_sources(vis):
return gr.update(open=True), True
# Keep State in sync when user clicks the sidebar chevron manually
def mark_sources_open():
return True
def mark_sources_closed():
return False
def get_visited_html_for_selected(selected_title, conversations):
for cid, convo in conversations.items():
if convo["title"] == selected_title:
src = convo.get("sources") or {}
urls = src.get("visited", [])
meta_map = src.get("meta", {})
return render_visited_html(urls, meta=meta_map)
return render_visited_html([])
def start_new_conversation(conversations):
new_id = generate_conversation_id()
conversations[new_id] = {
"title": f"New Conversation {new_id}",
"messages": [],
"sources": {"visited": [], "meta": {}},
"deepresearch_on": True
}
return new_id, [], gr.update(choices=update_conversation_list(conversations), value=conversations[new_id]["title"]), conversations, gr.update(value = "DeepResearch ON", variant="huggingface"), True
def load_conversation(selected_title, conversations):
for cid, convo in conversations.items():
if convo["title"] == selected_title:
return cid, convo["messages"], convo["messages"], gr.update(value = "DeepResearch ON" if convo["deepresearch_on"] else "DeepResearch OFF", variant="huggingface" if convo["deepresearch_on"] else "secondary"), convo["deepresearch_on"]
return current_convo_id.value, history_state.value, history_state.value, gr.update(value = "DeepResearch ON" if history_state["deepresearch_on"] else "DeepResearch OFF", variant="huggingface" if history_state["deepresearch_on"] else "secondary"), history_state["deepresearch_on"]
def send_message(user_message, max_tokens, temperature, top_p, convo_id, history, conversations, raw_prompts, deepresearch_on_value, summary_llm):
if convo_id not in conversations:
title = " ".join(user_message.strip().split()[:5])
titles_in_convo = []
for conver_id in conversations:
if title in conversations[conver_id]["title"]:
titles_in_convo.append(conversations[conver_id]["title"])
while title in titles_in_convo:
title += f" {generate_conversation_id()}"
# print(f"Initializing the conversation dictionary with convo id: {convo_id} and deep research on: {deepresearch_on_value}")
conversations[convo_id] = {"title": title, "messages": history, "sources": {"visited": [], "meta": {}}, "deepresearch_on": deepresearch_on_value}
if "sources" not in conversations[convo_id]:
conversations[convo_id]["sources"] = {"visited": [], "meta": {}}
else:
conversations[convo_id]["sources"].setdefault("visited", [])
conversations[convo_id]["sources"].setdefault("meta", {})
if conversations[convo_id]["title"].startswith("New Conversation"):
title = " ".join(user_message.strip().split()[:5])
titles_in_convo = []
for conver_id in conversations:
if title in conversations[conver_id]["title"]:
titles_in_convo.append(conversations[conver_id]["title"])
while title in titles_in_convo:
title += f" {generate_conversation_id()}"
conversations[convo_id]["title"] = title
# Immediately open the Sources sidebar if closed (before streaming begins)
try:
conv_sources = conversations[convo_id]["sources"]
except Exception:
conv_sources = conversations[convo_id].setdefault("sources", {"visited": [], "meta": {}})
visited_union_html = render_visited_html(conv_sources.get("visited", []), meta=conv_sources.get("meta", {}))
# panel_update = gr.update(open=True) if not sources_open else gr.update()
# sources_open = True if not sources_open else sources_open
# panel_update = gr.update(open=False)
# sources_open = False
# print(user_message)
# Yield a preliminary update so the Sources sidebar opens instantly on Send
yield (
history, # keep chat unchanged until streaming starts
history,
gr.update(choices=update_conversation_list(conversations), value=conversations[convo_id]["title"]),
conversations,
raw_prompts,
gr.update(value=visited_union_html),
gr.update(value=""),
# panel_update,
# sources_open,
# gr.update(value = "DeepResearch ON" if deepresearch_on_value else "DeepResearch OFF", variant="huggingface" if deepresearch_on_value else "secondary"),
# deepresearch_on_value,
)
for updated_history, new_history, new_raw_prompts, visited_html, search_html in generate_response(
user_message, max_tokens, temperature, top_p, history, convo_id, raw_prompts, deepresearch_on_value, summary_llm
):
# Persist messages
conversations[convo_id]["messages"] = new_history
raw_prompts = new_raw_prompts
conv_sources = conversations[convo_id]["sources"]
visited_list = conv_sources["visited"]
meta_map = conv_sources["meta"]
# Merge visited URLs from this turn
turn_urls = extract_urls_from_html(visited_html)
for u in turn_urls:
if u not in visited_list:
visited_list.append(u)
# Merge snippets with priority: search results > visited summaries
url_snips_search = extract_url_snippets_from_visited_html(search_html)
for u, snip in url_snips_search:
snip = (snip or "").strip()
if not snip:
continue
# Only seed from search when nothing exists or we only have a placeholder
existing = (meta_map.get(u) or "").strip()
if not existing or existing == "(fetching…)":
meta_map[u] = snip
url_snips_visited = extract_url_snippets_from_visited_html(visited_html)
for u, snip in url_snips_visited:
snip = (snip or "").strip()
if not snip:
continue
# Prefer summaries from visited (tool results). Overwrite placeholders or search snippets.
existing = (meta_map.get(u) or "").strip()
# Do not write placeholder "(fetching…)" over a real one
if snip == "(fetching…)":
# Only seed placeholder if nothing exists yet
if not existing:
meta_map[u] = snip
else:
# Real summary: overwrite empty, placeholder, or different content
if not existing or existing == "(fetching…)" or snip != existing:
meta_map[u] = snip
# Render union with per-convo meta
visited_union_html = render_visited_html(visited_list, meta=meta_map)
# Open the Sources sidebar immediately on first send if currently closed; otherwise keep as-is.
# panel_update = gr.update(open=True) if not sources_open else gr.update()
# sources_open = True if not sources_open else sources_open
yield (
updated_history,
new_history,
gr.update(choices=update_conversation_list(conversations), value=conversations[convo_id]["title"]),
conversations,
raw_prompts,
gr.update(value=visited_union_html),
gr.update(value=""),
# panel_update,
# sources_open,
# gr.update(value = "DeepResearch ON" if deepresearch_on_value else "DeepResearch OFF", variant="huggingface" if deepresearch_on_value else "secondary"),
# deepresearch_on_value,
)
def clear_current(user_convo_id, conversations, raw_prompts):
"""
UI clear + memory clear for the *current* conversation.
"""
conversations, raw_prompts = _clear_current_conversation(user_convo_id, conversations, raw_prompts)
if user_convo_id in conversations:
conversations[user_convo_id].setdefault("sources", {"visited": [], "meta": {}})
conversations[user_convo_id]["sources"]["visited"] = []
conversations[user_convo_id]["sources"]["meta"] = {}
return (
[], # chatbot (cleared)
[], # history_state (cleared)
gr.update(choices=update_conversation_list(conversations),
value=conversations.get(user_convo_id, {}).get("title", "")),
conversations,
raw_prompts,
gr.update(value=""), # visited_sources_html cleared
gr.update(value=""), # search_sources_html cleared (kept as hidden placeholder)
)
submit_button.click(
fn=send_message,
inputs=[user_input, max_tokens_slider, temperature_slider, top_p_slider, current_convo_id, history_state, conversations_state, raw_prompts_state, deepresearch_on, summary_llm],
outputs=[chatbot, history_state, conversation_selector, conversations_state, raw_prompts_state, visited_sources_html, search_sources_html], #, search_button, deepresearch_on],
concurrency_limit=16
).then(
fn=lambda: gr.update(value=""),
inputs=None,
outputs=user_input
)
# Clear also wipes raw_prompts_state for this conversation
clear_button.click(
fn=clear_current,
inputs=[current_convo_id, conversations_state, raw_prompts_state],
outputs=[chatbot, history_state, conversation_selector, conversations_state, raw_prompts_state, visited_sources_html, search_sources_html]
)
new_convo_button.click(
fn=start_new_conversation,
inputs=[conversations_state],
outputs=[current_convo_id, history_state, conversation_selector, conversations_state, search_button, deepresearch_on]
).then(
fn=lambda: gr.update(value=""),
inputs=None,
outputs=visited_sources_html
)
conversation_selector.change(
fn=load_conversation,
inputs=[conversation_selector, conversations_state],
outputs=[current_convo_id, history_state, chatbot, search_button, deepresearch_on]
).then(
fn=get_visited_html_for_selected,
inputs=[conversation_selector, conversations_state],
outputs=visited_sources_html
)
example1_button.click(fn=lambda: gr.update(value=example_messages["DeepResearch on the latest GST policy"]), inputs=None, outputs=user_input)
example2_button.click(fn=lambda: gr.update(value=example_messages["UPSC 2025 Prelims Question"]), inputs=None, outputs=user_input)
example3_button.click(fn=lambda: gr.update(value=example_messages["DeepResearch on inflammation in treatments for cancer"]), inputs=None, outputs=user_input)
example4_button.click(fn=lambda: gr.update(value=example_messages["Which Indian math model was post-trained for ~$499?"]), inputs=None, outputs=user_input)
example5_button.click(fn=lambda: gr.update(value=example_messages["DeepResearch analysis on EV Adoption"]), inputs=None, outputs=user_input)
# Prefill user input with a random example on initial page load
demo.load(fn=pick_random_example, inputs=None, outputs=user_input)
# Wire up sources sidebar interactions
sources_sidebar.expand(
fn=mark_sources_open,
inputs=None,
outputs=[sources_visible]
)
sources_sidebar.collapse(
fn=mark_sources_closed,
inputs=None,
outputs=[sources_visible]
)
sources_button.click(
fn=toggle_sources,
inputs=[sources_visible],
outputs=[sources_sidebar, sources_visible]
)
search_button.click(
fn=toggle_search,
inputs=[deepresearch_on, current_convo_id, conversations_state],
outputs=[search_button, deepresearch_on, conversations_state]
)
# ).then(
# fn=lambda x: print(f"Search toggled to {x}"),
# inputs=deepresearch_on,
# outputs=None
# )
# demo.launch(share=True, ssr_mode=False)
if __name__ == "__main__":
demo.queue().launch(share=True, ssr_mode=False)