Spaces:
victor
/
Runtime error

web / app.py
victor's picture
victor HF Staff
Refactor README and analytics for improved clarity and functionality; update app.py to enhance search and fetch tools with better error handling and analytics tracking.
58d88bf
raw
history blame
15.6 kB
import os
import time
import re
import html
from typing import Optional, Dict, Any, List
from urllib.parse import urlsplit
from datetime import datetime, timezone
import httpx
import trafilatura
import gradio as gr
from dateutil import parser as dateparser
from limits import parse
from limits.aio.storage import MemoryStorage
from limits.aio.strategies import MovingWindowRateLimiter
from analytics import record_request, last_n_days_count_df
# ──────────────────────────────────────────────────────────────────────────────
# Configuration
# ──────────────────────────────────────────────────────────────────────────────
SERPER_API_KEY = os.getenv("SERPER_API_KEY")
SERPER_SEARCH_ENDPOINT = "https://google.serper.dev/search"
SERPER_NEWS_ENDPOINT = "https://google.serper.dev/news"
HEADERS = {"X-API-KEY": SERPER_API_KEY or "", "Content-Type": "application/json"}
# Rate limiting (shared by both tools)
storage = MemoryStorage()
limiter = MovingWindowRateLimiter(storage)
rate_limit = parse("360/hour") # shared global limit across search + fetch
# ──────────────────────────────────────────────────────────────────────────────
# Helpers
# ──────────────────────────────────────────────────────────────────────────────
def _domain_from_url(url: str) -> str:
try:
netloc = urlsplit(url).netloc
return netloc.replace("www.", "")
except Exception:
return ""
def _iso_date_or_unknown(date_str: Optional[str]) -> Optional[str]:
if not date_str:
return None
try:
return dateparser.parse(date_str, fuzzy=True).strftime("%Y-%m-%d")
except Exception:
return None
def _extract_title_from_html(html_text: str) -> Optional[str]:
m = re.search(r"<title[^>]*>(.*?)</title>", html_text, re.IGNORECASE | re.DOTALL)
if not m:
return None
title = re.sub(r"\s+", " ", m.group(1)).strip()
return html.unescape(title) if title else None
# ──────────────────────────────────────────────────────────────────────────────
# Tool: search (metadata only)
# ──────────────────────────────────────────────────────────────────────────────
async def search(
query: str, search_type: str = "search", num_results: Optional[int] = 4
) -> Dict[str, Any]:
"""
Perform a web or news search via Serper and return metadata ONLY.
Does NOT fetch or extract content from result URLs.
"""
start_time = time.time()
# Validate inputs
if not query or not query.strip():
await record_request("search")
return {"error": "Missing 'query'. Please provide a search query string."}
if num_results is None:
num_results = 4
num_results = max(1, min(20, int(num_results)))
if search_type not in ["search", "news"]:
search_type = "search"
# Check API key
if not SERPER_API_KEY:
await record_request("search")
return {
"error": "SERPER_API_KEY is not set. Export SERPER_API_KEY and try again."
}
try:
# Rate limit
if not await limiter.hit(rate_limit, "global"):
await record_request("search")
return {"error": "Rate limit exceeded. Limit: 360 requests/hour."}
endpoint = (
SERPER_NEWS_ENDPOINT if search_type == "news" else SERPER_SEARCH_ENDPOINT
)
payload: Dict[str, Any] = {"q": query, "num": num_results}
if search_type == "news":
payload["type"] = "news"
payload["page"] = 1
async with httpx.AsyncClient(timeout=15) as client:
resp = await client.post(endpoint, headers=HEADERS, json=payload)
if resp.status_code != 200:
await record_request("search")
return {
"error": f"Search API returned status {resp.status_code}. Check your API key and query."
}
data = resp.json()
raw_results: List[Dict[str, Any]] = (
data.get("news", []) if search_type == "news" else data.get("organic", [])
)
if not raw_results:
await record_request("search")
return {
"query": query,
"search_type": search_type,
"count": 0,
"results": [],
"message": f"No {search_type} results found.",
}
formatted: List[Dict[str, Any]] = []
for idx, r in enumerate(raw_results[:num_results], start=1):
item = {
"position": idx,
"title": r.get("title"),
"link": r.get("link"),
"domain": _domain_from_url(r.get("link", "")),
"snippet": r.get("snippet") or r.get("description"),
}
if search_type == "news":
item["source"] = r.get("source")
item["date"] = _iso_date_or_unknown(r.get("date"))
formatted.append(item)
await record_request("search")
return {
"query": query,
"search_type": search_type,
"count": len(formatted),
"results": formatted,
"duration_s": round(time.time() - start_time, 2),
}
except Exception as e:
await record_request("search")
return {"error": f"Search failed: {str(e)}"}
# ──────────────────────────────────────────────────────────────────────────────
# Tool: fetch (single URL fetch + extraction)
# ──────────────────────────────────────────────────────────────────────────────
async def fetch(url: str, timeout: int = 20) -> Dict[str, Any]:
"""
Fetch a single URL and extract the main readable content.
"""
start_time = time.time()
if not url or not isinstance(url, str):
await record_request("fetch")
return {"error": "Missing 'url'. Please provide a valid URL string."}
if not url.lower().startswith(("http://", "https://")):
await record_request("fetch")
return {"error": "URL must start with http:// or https://."}
try:
# Rate limit
if not await limiter.hit(rate_limit, "global"):
await record_request("fetch")
return {"error": "Rate limit exceeded. Limit: 360 requests/hour."}
async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as client:
resp = await client.get(url)
text = resp.text or ""
content = (
trafilatura.extract(
text,
include_formatting=False,
include_comments=False,
)
or ""
)
title = _extract_title_from_html(text) or ""
final_url_str = str(resp.url) if hasattr(resp, "url") else url
domain = _domain_from_url(final_url_str)
word_count = len(content.split()) if content else 0
result = {
"url": url,
"final_url": final_url_str,
"domain": domain,
"status_code": resp.status_code,
"title": title,
"fetched_at": datetime.now(timezone.utc).isoformat(),
"word_count": word_count,
"content": content.strip(),
"duration_s": round(time.time() - start_time, 2),
}
await record_request("fetch")
return result
except httpx.HTTPError as e:
await record_request("fetch")
return {"error": f"Network error while fetching: {str(e)}"}
except Exception as e:
await record_request("fetch")
return {"error": f"Unexpected error while fetching: {str(e)}"}
# ──────────────────────────────────────────────────────────────────────────────
# Gradio UI
# ──────────────────────────────────────────────────────────────────────────────
with gr.Blocks(title="Web Search MCP Server") as demo:
gr.HTML(
"""
<div style="background-color: rgba(59, 130, 246, 0.1); border: 1px solid rgba(59, 130, 246, 0.3); border-radius: 8px; padding: 12px; margin-bottom: 16px; text-align: center;">
<p style="color: rgb(59, 130, 246); margin: 0; font-size: 14px; font-weight: 500;">
🀝 Community resource β€” please use responsibly to keep this service available for everyone
</p>
</div>
"""
)
gr.Markdown("# πŸ” Web Search MCP Server")
gr.Markdown(
"This server provides two composable MCP tools: **search** (metadata only) and **fetch** (single-URL extraction)."
)
with gr.Tabs():
with gr.Tab("App"):
with gr.Row():
# ── Search panel ───────────────────────────────────────────────
with gr.Column(scale=3):
gr.Markdown("## Search (metadata only)")
query_input = gr.Textbox(
label="Search Query",
placeholder='e.g. "OpenAI news", "climate change 2024", "React hooks useState"',
info="Required",
)
search_type_input = gr.Radio(
choices=["search", "news"],
value="search",
label="Search Type",
info="Choose general web search or news",
)
num_results_input = gr.Slider(
minimum=1,
maximum=20,
value=4,
step=1,
label="Number of Results",
info="Optional (default 4)",
)
search_button = gr.Button("Run Search", variant="primary")
search_output = gr.JSON(
label="Search Results (metadata only)",
)
gr.Examples(
examples=[
["OpenAI GPT-5 latest developments", "news", 5],
["React hooks useState", "search", 4],
["Apple Vision Pro reviews", "search", 4],
["Tesla stock price today", "news", 6],
],
inputs=[query_input, search_type_input, num_results_input],
outputs=search_output,
fn=search,
cache_examples=False,
)
# ── Fetch panel ────────────────────────────────────────────────
with gr.Column(scale=2):
gr.Markdown("## Fetch (single URL β†’ extracted content)")
url_input = gr.Textbox(
label="URL",
placeholder="https://example.com/article",
info="Required: the URL to fetch and extract",
)
timeout_input = gr.Slider(
minimum=5,
maximum=60,
value=20,
step=1,
label="Timeout (seconds)",
info="Optional (default 20)",
)
fetch_button = gr.Button("Fetch & Extract", variant="primary")
fetch_output = gr.JSON(label="Fetched Content (structured)")
gr.Examples(
examples=[
["https://news.ycombinator.com/"],
["https://www.python.org/dev/peps/pep-0008/"],
["https://en.wikipedia.org/wiki/Model_Context_Protocol"],
],
inputs=[url_input],
outputs=fetch_output,
fn=fetch,
cache_examples=False,
)
# Wire up buttons
search_button.click(
fn=search,
inputs=[query_input, search_type_input, num_results_input],
outputs=search_output,
api_name=False,
)
fetch_button.click(
fn=fetch,
inputs=[url_input, timeout_input],
outputs=fetch_output,
api_name=False,
)
with gr.Tab("Analytics"):
gr.Markdown("## Community Usage Analytics")
gr.Markdown("Daily request counts (UTC), split by tool.")
with gr.Row():
with gr.Column():
search_plot = gr.BarPlot(
value=last_n_days_count_df("search", 14),
x="date",
y="count",
title="Daily Search Count",
tooltip=["date", "count", "full_date"],
height=350,
x_label_angle=-45,
container=False,
)
with gr.Column():
fetch_plot = gr.BarPlot(
value=last_n_days_count_df("fetch", 14),
x="date",
y="count",
title="Daily Fetch Count",
tooltip=["date", "count", "full_date"],
height=350,
x_label_angle=-45,
container=False,
)
# Refresh analytics on load
demo.load(
fn=lambda: (
last_n_days_count_df("search", 14),
last_n_days_count_df("fetch", 14),
),
outputs=[search_plot, fetch_plot],
api_name=False,
)
# Expose MCP tools
gr.api(search, api_name="search")
gr.api(fetch, api_name="fetch")
if __name__ == "__main__":
# Launch with MCP server enabled
demo.launch(mcp_server=True, show_api=True)