import os
import time
import re
import html
from typing import Optional, Dict, Any, List
from urllib.parse import urlsplit
from datetime import datetime, timezone

import httpx
import trafilatura
import gradio as gr
from dateutil import parser as dateparser
from limits import parse
from limits.aio.storage import MemoryStorage
from limits.aio.strategies import MovingWindowRateLimiter

from analytics import record_request, last_n_days_count_df
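
# Serper (https://serper.dev) provides Google web and news results over a JSON
# API. The key is read from the environment so no credentials live in the code;
# if it is missing, `search` returns a structured error instead of raising.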
SERPER_API_KEY = os.getenv("SERPER_API_KEY")
SERPER_SEARCH_ENDPOINT = "https://google.serper.dev/search"
SERPER_NEWS_ENDPOINT = "https://google.serper.dev/news"
HEADERS = {"X-API-KEY": SERPER_API_KEY or "", "Content-Type": "application/json"}
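
# One in-memory moving-window limiter is shared by both tools: `search` and
# `fetch` hit the same "global" key, so the 360/hour budget is pooled across
# the whole server rather than tracked per tool or per client.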
storage = MemoryStorage()
limiter = MovingWindowRateLimiter(storage)
rate_limit = parse("360/hour")


def _domain_from_url(url: str) -> str:
    try:
        netloc = urlsplit(url).netloc
        # Strip only a leading "www." (str.replace would also hit later matches).
        return netloc.removeprefix("www.")
    except Exception:
        return ""


def _iso_date_or_unknown(date_str: Optional[str]) -> Optional[str]:
    if not date_str:
        return None
    try:
        return dateparser.parse(date_str, fuzzy=True).strftime("%Y-%m-%d")
    except Exception:
        return None
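
# Illustrative, not executed:
#   _iso_date_or_unknown("Jan 5, 2024") -> "2024-01-05"
#   _iso_date_or_unknown("not a date") -> None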


def _extract_title_from_html(html_text: str) -> Optional[str]:
    m = re.search(r"<title[^>]*>(.*?)</title>", html_text, re.IGNORECASE | re.DOTALL)
    if not m:
        return None
    title = re.sub(r"\s+", " ", m.group(1)).strip()
    return html.unescape(title) if title else None
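
# Illustrative, not executed:
#   _extract_title_from_html("<title>A &amp; B</title>") -> "A & B"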


async def search(
    query: str, search_type: str = "search", num_results: Optional[int] = 4
) -> Dict[str, Any]:
    """
    Perform a web or news search via Serper and return metadata ONLY.
    Does NOT fetch or extract content from result URLs.
    """
    start_time = time.time()

    if not query or not query.strip():
        await record_request("search")
        return {"error": "Missing 'query'. Please provide a search query string."}

    if num_results is None:
        num_results = 4
    num_results = max(1, min(20, int(num_results)))
    if search_type not in ["search", "news"]:
        search_type = "search"

    if not SERPER_API_KEY:
        await record_request("search")
        return {
            "error": "SERPER_API_KEY is not set. Export SERPER_API_KEY and try again."
        }

    try:
        if not await limiter.hit(rate_limit, "global"):
            await record_request("search")
            return {"error": "Rate limit exceeded. Limit: 360 requests/hour."}

        endpoint = (
            SERPER_NEWS_ENDPOINT if search_type == "news" else SERPER_SEARCH_ENDPOINT
        )
        payload: Dict[str, Any] = {"q": query, "num": num_results}
        if search_type == "news":
            payload["type"] = "news"
            payload["page"] = 1

        async with httpx.AsyncClient(timeout=15) as client:
            resp = await client.post(endpoint, headers=HEADERS, json=payload)

        if resp.status_code != 200:
            await record_request("search")
            return {
                "error": f"Search API returned status {resp.status_code}. Check your API key and query."
            }
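
        # Serper nests web hits under "organic" and news hits under "news".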
        data = resp.json()
        raw_results: List[Dict[str, Any]] = (
            data.get("news", []) if search_type == "news" else data.get("organic", [])
        )
        if not raw_results:
            await record_request("search")
            return {
                "query": query,
                "search_type": search_type,
                "count": 0,
                "results": [],
                "message": f"No {search_type} results found.",
            }

        formatted: List[Dict[str, Any]] = []
        for idx, r in enumerate(raw_results[:num_results], start=1):
            item = {
                "position": idx,
                "title": r.get("title"),
                "link": r.get("link"),
                "domain": _domain_from_url(r.get("link", "")),
                "snippet": r.get("snippet") or r.get("description"),
            }
            if search_type == "news":
                item["source"] = r.get("source")
                item["date"] = _iso_date_or_unknown(r.get("date"))
            formatted.append(item)

        await record_request("search")
        return {
            "query": query,
            "search_type": search_type,
            "count": len(formatted),
            "results": formatted,
            "duration_s": round(time.time() - start_time, 2),
        }

    except Exception as e:
        await record_request("search")
        return {"error": f"Search failed: {str(e)}"}


async def fetch(url: str, timeout: int = 20) -> Dict[str, Any]:
    """
    Fetch a single URL and extract the main readable content.
    """
    start_time = time.time()

    if not url or not isinstance(url, str):
        await record_request("fetch")
        return {"error": "Missing 'url'. Please provide a valid URL string."}
    if not url.lower().startswith(("http://", "https://")):
        await record_request("fetch")
        return {"error": "URL must start with http:// or https://."}

    try:
        if not await limiter.hit(rate_limit, "global"):
            await record_request("fetch")
            return {"error": "Rate limit exceeded. Limit: 360 requests/hour."}

        async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as client:
            resp = await client.get(url)
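
        # trafilatura strips navigation, ads, and other boilerplate, keeping
        # the main readable text of the page.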
        text = resp.text or ""
        content = (
            trafilatura.extract(
                text,
                include_formatting=False,
                include_comments=False,
            )
            or ""
        )

        title = _extract_title_from_html(text) or ""
        final_url_str = str(resp.url) if hasattr(resp, "url") else url
        domain = _domain_from_url(final_url_str)
        word_count = len(content.split()) if content else 0

        result = {
            "url": url,
            "final_url": final_url_str,
            "domain": domain,
            "status_code": resp.status_code,
            "title": title,
            "fetched_at": datetime.now(timezone.utc).isoformat(),
            "word_count": word_count,
            "content": content.strip(),
            "duration_s": round(time.time() - start_time, 2),
        }

        await record_request("fetch")
        return result

    except httpx.HTTPError as e:
        await record_request("fetch")
        return {"error": f"Network error while fetching: {str(e)}"}
    except Exception as e:
        await record_request("fetch")
        return {"error": f"Unexpected error while fetching: {str(e)}"}
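

# The Blocks UI is a human-facing demo of the same two functions; MCP clients
# get them via the `gr.api(...)` registrations near the bottom of the file.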
with gr.Blocks(title="Web Search MCP Server") as demo:
    gr.HTML(
        """
        <div style="background-color: rgba(59, 130, 246, 0.1); border: 1px solid rgba(59, 130, 246, 0.3); border-radius: 8px; padding: 12px; margin-bottom: 16px; text-align: center;">
            <p style="color: rgb(59, 130, 246); margin: 0; font-size: 14px; font-weight: 500;">
                🤖 Community resource - please use responsibly to keep this service available for everyone
            </p>
        </div>
        """
    )

    gr.Markdown("# 🔍 Web Search MCP Server")
    gr.Markdown(
        "This server provides two composable MCP tools: **search** (metadata only) and **fetch** (single-URL extraction)."
    )

    with gr.Tabs():
        with gr.Tab("App"):
            with gr.Row():
                with gr.Column(scale=3):
                    gr.Markdown("## Search (metadata only)")
                    query_input = gr.Textbox(
                        label="Search Query",
                        placeholder='e.g. "OpenAI news", "climate change 2024", "React hooks useState"',
                        info="Required",
                    )
                    search_type_input = gr.Radio(
                        choices=["search", "news"],
                        value="search",
                        label="Search Type",
                        info="Choose general web search or news",
                    )
                    num_results_input = gr.Slider(
                        minimum=1,
                        maximum=20,
                        value=4,
                        step=1,
                        label="Number of Results",
                        info="Optional (default 4)",
                    )
                    search_button = gr.Button("Run Search", variant="primary")
                    search_output = gr.JSON(
                        label="Search Results (metadata only)",
                    )

                    gr.Examples(
                        examples=[
                            ["OpenAI GPT-5 latest developments", "news", 5],
                            ["React hooks useState", "search", 4],
                            ["Apple Vision Pro reviews", "search", 4],
                            ["Tesla stock price today", "news", 6],
                        ],
                        inputs=[query_input, search_type_input, num_results_input],
                        outputs=search_output,
                        fn=search,
                        cache_examples=False,
                    )

                with gr.Column(scale=2):
                    gr.Markdown("## Fetch (single URL → extracted content)")
                    url_input = gr.Textbox(
                        label="URL",
                        placeholder="https://example.com/article",
                        info="Required: the URL to fetch and extract",
                    )
                    timeout_input = gr.Slider(
                        minimum=5,
                        maximum=60,
                        value=20,
                        step=1,
                        label="Timeout (seconds)",
                        info="Optional (default 20)",
                    )
                    fetch_button = gr.Button("Fetch & Extract", variant="primary")
                    fetch_output = gr.JSON(label="Fetched Content (structured)")

                    gr.Examples(
                        examples=[
                            ["https://news.ycombinator.com/"],
                            ["https://www.python.org/dev/peps/pep-0008/"],
                            ["https://en.wikipedia.org/wiki/Model_Context_Protocol"],
                        ],
                        inputs=[url_input],
                        outputs=fetch_output,
                        fn=fetch,
                        cache_examples=False,
                    )
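
            # `api_name=False` keeps these UI callbacks out of the API page and
            # MCP tool list; only the `gr.api(...)` registrations are exposed.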
            search_button.click(
                fn=search,
                inputs=[query_input, search_type_input, num_results_input],
                outputs=search_output,
                api_name=False,
            )
            fetch_button.click(
                fn=fetch,
                inputs=[url_input, timeout_input],
                outputs=fetch_output,
                api_name=False,
            )
with gr.Tab("Analytics"): |
|
|
gr.Markdown("## Community Usage Analytics") |
|
|
gr.Markdown("Daily request counts (UTC), split by tool.") |
|
|
|
|
|
with gr.Row(): |
|
|
with gr.Column(): |
|
|
search_plot = gr.BarPlot( |
|
|
value=last_n_days_count_df("search", 14), |
|
|
x="date", |
|
|
y="count", |
|
|
title="Daily Search Count", |
|
|
tooltip=["date", "count", "full_date"], |
|
|
height=350, |
|
|
x_label_angle=-45, |
|
|
container=False, |
|
|
) |
|
|
with gr.Column(): |
|
|
fetch_plot = gr.BarPlot( |
|
|
value=last_n_days_count_df("fetch", 14), |
|
|
x="date", |
|
|
y="count", |
|
|
title="Daily Fetch Count", |
|
|
tooltip=["date", "count", "full_date"], |
|
|
height=350, |
|
|
x_label_angle=-45, |
|
|
container=False, |
|
|
) |
|
|
|
|
|
|
|
|
demo.load( |
|
|
fn=lambda: ( |
|
|
last_n_days_count_df("search", 14), |
|
|
last_n_days_count_df("fetch", 14), |
|
|
), |
|
|
outputs=[search_plot, fetch_plot], |
|
|
api_name=False, |
|
|
) |
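
    # `gr.api` registers a plain function as a named API endpoint; with
    # `mcp_server=True` (below), Gradio also publishes each endpoint as an MCP
    # tool whose schema comes from the function's type hints and docstring.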
    gr.api(search, api_name="search")
    gr.api(fetch, api_name="fetch")
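
# `show_api=True` keeps the "Use via API" page visible; `mcp_server=True`
# additionally serves the registered tools over the Model Context Protocol
# (at /gradio_api/mcp/sse).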


if __name__ == "__main__":
    demo.launch(mcp_server=True, show_api=True)