import os import time import re import html from typing import Optional, Dict, Any, List from urllib.parse import urlsplit from datetime import datetime, timezone import httpx import trafilatura import gradio as gr from dateutil import parser as dateparser from limits import parse from limits.aio.storage import MemoryStorage from limits.aio.strategies import MovingWindowRateLimiter from analytics import record_request, last_n_days_count_df # ────────────────────────────────────────────────────────────────────────────── # Configuration # ────────────────────────────────────────────────────────────────────────────── SERPER_API_KEY = os.getenv("SERPER_API_KEY") SERPER_SEARCH_ENDPOINT = "https://google.serper.dev/search" SERPER_NEWS_ENDPOINT = "https://google.serper.dev/news" HEADERS = {"X-API-KEY": SERPER_API_KEY or "", "Content-Type": "application/json"} # Rate limiting (shared by both tools) storage = MemoryStorage() limiter = MovingWindowRateLimiter(storage) rate_limit = parse("360/hour") # shared global limit across search + fetch # ────────────────────────────────────────────────────────────────────────────── # Helpers # ────────────────────────────────────────────────────────────────────────────── def _domain_from_url(url: str) -> str: try: netloc = urlsplit(url).netloc return netloc.replace("www.", "") except Exception: return "" def _iso_date_or_unknown(date_str: Optional[str]) -> Optional[str]: if not date_str: return None try: return dateparser.parse(date_str, fuzzy=True).strftime("%Y-%m-%d") except Exception: return None def _extract_title_from_html(html_text: str) -> Optional[str]: m = re.search(r"]*>(.*?)", html_text, re.IGNORECASE | re.DOTALL) if not m: return None title = re.sub(r"\s+", " ", m.group(1)).strip() return html.unescape(title) if title else None # ────────────────────────────────────────────────────────────────────────────── # Tool: search (metadata only) # ────────────────────────────────────────────────────────────────────────────── async def search( query: str, search_type: str = "search", num_results: Optional[int] = 4 ) -> Dict[str, Any]: """ Perform a web or news search via Serper and return metadata ONLY. Does NOT fetch or extract content from result URLs. """ start_time = time.time() # Validate inputs if not query or not query.strip(): await record_request("search") return {"error": "Missing 'query'. Please provide a search query string."} if num_results is None: num_results = 4 num_results = max(1, min(20, int(num_results))) if search_type not in ["search", "news"]: search_type = "search" # Check API key if not SERPER_API_KEY: await record_request("search") return { "error": "SERPER_API_KEY is not set. Export SERPER_API_KEY and try again." } try: # Rate limit if not await limiter.hit(rate_limit, "global"): await record_request("search") return {"error": "Rate limit exceeded. Limit: 360 requests/hour."} endpoint = ( SERPER_NEWS_ENDPOINT if search_type == "news" else SERPER_SEARCH_ENDPOINT ) payload: Dict[str, Any] = {"q": query, "num": num_results} if search_type == "news": payload["type"] = "news" payload["page"] = 1 async with httpx.AsyncClient(timeout=15) as client: resp = await client.post(endpoint, headers=HEADERS, json=payload) if resp.status_code != 200: await record_request("search") return { "error": f"Search API returned status {resp.status_code}. Check your API key and query." } data = resp.json() raw_results: List[Dict[str, Any]] = ( data.get("news", []) if search_type == "news" else data.get("organic", []) ) if not raw_results: await record_request("search") return { "query": query, "search_type": search_type, "count": 0, "results": [], "message": f"No {search_type} results found.", } formatted: List[Dict[str, Any]] = [] for idx, r in enumerate(raw_results[:num_results], start=1): item = { "position": idx, "title": r.get("title"), "link": r.get("link"), "domain": _domain_from_url(r.get("link", "")), "snippet": r.get("snippet") or r.get("description"), } if search_type == "news": item["source"] = r.get("source") item["date"] = _iso_date_or_unknown(r.get("date")) formatted.append(item) await record_request("search") return { "query": query, "search_type": search_type, "count": len(formatted), "results": formatted, "duration_s": round(time.time() - start_time, 2), } except Exception as e: await record_request("search") return {"error": f"Search failed: {str(e)}"} # ────────────────────────────────────────────────────────────────────────────── # Tool: fetch (single URL fetch + extraction) # ────────────────────────────────────────────────────────────────────────────── async def fetch(url: str, timeout: int = 20) -> Dict[str, Any]: """ Fetch a single URL and extract the main readable content. """ start_time = time.time() if not url or not isinstance(url, str): await record_request("fetch") return {"error": "Missing 'url'. Please provide a valid URL string."} if not url.lower().startswith(("http://", "https://")): await record_request("fetch") return {"error": "URL must start with http:// or https://."} try: # Rate limit if not await limiter.hit(rate_limit, "global"): await record_request("fetch") return {"error": "Rate limit exceeded. Limit: 360 requests/hour."} async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as client: resp = await client.get(url) text = resp.text or "" content = ( trafilatura.extract( text, include_formatting=False, include_comments=False, ) or "" ) title = _extract_title_from_html(text) or "" final_url_str = str(resp.url) if hasattr(resp, "url") else url domain = _domain_from_url(final_url_str) word_count = len(content.split()) if content else 0 result = { "url": url, "final_url": final_url_str, "domain": domain, "status_code": resp.status_code, "title": title, "fetched_at": datetime.now(timezone.utc).isoformat(), "word_count": word_count, "content": content.strip(), "duration_s": round(time.time() - start_time, 2), } await record_request("fetch") return result except httpx.HTTPError as e: await record_request("fetch") return {"error": f"Network error while fetching: {str(e)}"} except Exception as e: await record_request("fetch") return {"error": f"Unexpected error while fetching: {str(e)}"} # ────────────────────────────────────────────────────────────────────────────── # Gradio UI # ────────────────────────────────────────────────────────────────────────────── with gr.Blocks(title="Web Search MCP Server") as demo: gr.HTML( """

🤝 Community resource — please use responsibly to keep this service available for everyone

""" ) gr.Markdown("# 🔍 Web Search MCP Server") gr.Markdown( "This server provides two composable MCP tools: **search** (metadata only) and **fetch** (single-URL extraction)." ) with gr.Tabs(): with gr.Tab("App"): with gr.Row(): # ── Search panel ─────────────────────────────────────────────── with gr.Column(scale=3): gr.Markdown("## Search (metadata only)") query_input = gr.Textbox( label="Search Query", placeholder='e.g. "OpenAI news", "climate change 2024", "React hooks useState"', info="Required", ) search_type_input = gr.Radio( choices=["search", "news"], value="search", label="Search Type", info="Choose general web search or news", ) num_results_input = gr.Slider( minimum=1, maximum=20, value=4, step=1, label="Number of Results", info="Optional (default 4)", ) search_button = gr.Button("Run Search", variant="primary") search_output = gr.JSON( label="Search Results (metadata only)", ) gr.Examples( examples=[ ["OpenAI GPT-5 latest developments", "news", 5], ["React hooks useState", "search", 4], ["Apple Vision Pro reviews", "search", 4], ["Tesla stock price today", "news", 6], ], inputs=[query_input, search_type_input, num_results_input], outputs=search_output, fn=search, cache_examples=False, ) # ── Fetch panel ──────────────────────────────────────────────── with gr.Column(scale=2): gr.Markdown("## Fetch (single URL → extracted content)") url_input = gr.Textbox( label="URL", placeholder="https://example.com/article", info="Required: the URL to fetch and extract", ) timeout_input = gr.Slider( minimum=5, maximum=60, value=20, step=1, label="Timeout (seconds)", info="Optional (default 20)", ) fetch_button = gr.Button("Fetch & Extract", variant="primary") fetch_output = gr.JSON(label="Fetched Content (structured)") gr.Examples( examples=[ ["https://news.ycombinator.com/"], ["https://www.python.org/dev/peps/pep-0008/"], ["https://en.wikipedia.org/wiki/Model_Context_Protocol"], ], inputs=[url_input], outputs=fetch_output, fn=fetch, cache_examples=False, ) # Wire up buttons search_button.click( fn=search, inputs=[query_input, search_type_input, num_results_input], outputs=search_output, api_name=False, ) fetch_button.click( fn=fetch, inputs=[url_input, timeout_input], outputs=fetch_output, api_name=False, ) with gr.Tab("Analytics"): gr.Markdown("## Community Usage Analytics") gr.Markdown("Daily request counts (UTC), split by tool.") with gr.Row(): with gr.Column(): search_plot = gr.BarPlot( value=last_n_days_count_df("search", 14), x="date", y="count", title="Daily Search Count", tooltip=["date", "count", "full_date"], height=350, x_label_angle=-45, container=False, ) with gr.Column(): fetch_plot = gr.BarPlot( value=last_n_days_count_df("fetch", 14), x="date", y="count", title="Daily Fetch Count", tooltip=["date", "count", "full_date"], height=350, x_label_angle=-45, container=False, ) # Refresh analytics on load demo.load( fn=lambda: ( last_n_days_count_df("search", 14), last_n_days_count_df("fetch", 14), ), outputs=[search_plot, fetch_plot], api_name=False, ) # Expose MCP tools gr.api(search, api_name="search") gr.api(fetch, api_name="fetch") if __name__ == "__main__": # Launch with MCP server enabled demo.launch(mcp_server=True, show_api=True)