Spaces:
Runtime error
Runtime error
| from typing import List, Dict | |
| import httpx | |
| import gradio as gr | |
| import pandas as pd | |
| from huggingface_hub import HfApi, ModelCard | |
| import base64 | |
| import io | |
| import zipfile | |
| import asyncio | |
| import aiohttp | |
| from pathlib import Path | |
| import emoji | |
| def search_hub(query: str, search_type: str) -> pd.DataFrame: | |
| api = HfApi() | |
| if search_type == "Models": | |
| results = api.list_models(search=query) | |
| data = [{"id": model.modelId, "author": model.author, "downloads": model.downloads, "link": f"https://huggingface.co/{model.modelId}"} for model in results] | |
| elif search_type == "Datasets": | |
| results = api.list_datasets(search=query) | |
| data = [{"id": dataset.id, "author": dataset.author, "downloads": dataset.downloads, "link": f"https://huggingface.co/datasets/{dataset.id}"} for dataset in results] | |
| elif search_type == "Spaces": | |
| results = api.list_spaces(search=query) | |
| data = [{"id": space.id, "author": space.author, "link": f"https://huggingface.co/spaces/{space.id}"} for space in results] | |
| else: | |
| data = [] | |
| for i, item in enumerate(data, 1): | |
| item['number'] = i | |
| item['formatted_link'] = format_link(item, i, search_type) | |
| return pd.DataFrame(data) | |
| def format_link(item: Dict, number: int, search_type: str) -> str: | |
| link = item['link'] | |
| readme_link = f"{link}/blob/main/README.md" | |
| title = f"{number}. {item['id']}" | |
| metadata = f"Author: {item['author']}" | |
| if 'downloads' in item: | |
| metadata += f", Downloads: {item['downloads']}" | |
| html = f""" | |
| <div style="margin-bottom: 10px;"> | |
| <strong>{title}</strong><br> | |
| <a href="{link}" target="_blank" style="color: #4a90e2; text-decoration: none;">View {search_type[:-1]}</a> | | |
| <a href="{readme_link}" target="_blank" style="color: #4a90e2; text-decoration: none;">View README</a><br> | |
| <small>{metadata}</small> | |
| </div> | |
| """ | |
| return html | |
| async def download_readme(session: aiohttp.ClientSession, item: Dict) -> tuple[str, str]: | |
| """Download README.md file for a given item.""" | |
| item_id = item['id'] | |
| raw_url = f"https://huggingface.co/{item_id}/raw/main/README.md" | |
| try: | |
| async with session.get(raw_url) as response: | |
| if response.status == 200: | |
| content = await response.text() | |
| return item_id.replace('/', '_'), content | |
| return item_id.replace('/', '_'), f"# Error downloading README for {item_id}\nStatus code: {response.status}" | |
| except Exception as e: | |
| return item_id.replace('/', '_'), f"# Error downloading README for {item_id}\nError: {str(e)}" | |
| async def download_all_readmes(data: List[Dict]) -> tuple[str, str]: | |
| """Download all README files and create a zip archive.""" | |
| if not data: | |
| return "", "No results to download" | |
| zip_buffer = io.BytesIO() | |
| status_message = "Downloading READMEs..." | |
| async with aiohttp.ClientSession() as session: | |
| tasks = [download_readme(session, item) for item in data] | |
| results = await asyncio.gather(*tasks) | |
| with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file: | |
| for filename, content in results: | |
| zip_file.writestr(f"{filename}.md", content) | |
| zip_buffer.seek(0) | |
| base64_zip = base64.b64encode(zip_buffer.getvalue()).decode() | |
| download_link = f""" | |
| <div style="margin-top: 10px;"> | |
| <a href="data:application/zip;base64,{base64_zip}" | |
| download="readmes.zip" | |
| style="display: inline-block; padding: 10px 20px; | |
| background-color: #4CAF50; color: white; | |
| text-decoration: none; border-radius: 5px;"> | |
| 📥 Download READMEs Archive | |
| </a> | |
| </div> | |
| """ | |
| return download_link, "READMEs ready for download!" | |
| def display_results(df: pd.DataFrame): | |
| if df is not None and not df.empty: | |
| html = "<div style='max-height: 400px; overflow-y: auto;'>" | |
| for _, row in df.iterrows(): | |
| html += row['formatted_link'] | |
| html += "</div>" | |
| return html | |
| else: | |
| return "<p>No results found.</p>" | |
| def SwarmyTime(data: List[Dict]) -> Dict: | |
| """Aggregates all content from the given data.""" | |
| aggregated = { | |
| "total_items": len(data), | |
| "unique_authors": set(), | |
| "total_downloads": 0, | |
| "item_types": {"Models": 0, "Datasets": 0, "Spaces": 0} | |
| } | |
| for item in data: | |
| aggregated["unique_authors"].add(item.get("author", "Unknown")) | |
| aggregated["total_downloads"] += item.get("downloads", 0) | |
| if "modelId" in item: | |
| aggregated["item_types"]["Models"] += 1 | |
| elif "dataset" in item.get("id", ""): | |
| aggregated["item_types"]["Datasets"] += 1 | |
| else: | |
| aggregated["item_types"]["Spaces"] += 1 | |
| aggregated["unique_authors"] = len(aggregated["unique_authors"]) | |
| return aggregated | |
| with gr.Blocks() as demo: | |
| gr.Markdown("## Search the Hugging Face Hub") | |
| with gr.Row(): | |
| search_query = gr.Textbox(label="Search Query", value="awacke1") | |
| search_type = gr.Radio(["Models", "Datasets", "Spaces"], label="Search Type", value="Models") | |
| search_button = gr.Button("Search") | |
| results_html = gr.HTML(label="Search Results") | |
| download_button = gr.Button("📚 Download All READMEs", visible=False) | |
| download_status = gr.Markdown("", label="Download Status") | |
| download_area = gr.HTML("", label="Download Link") | |
| metadata_output = gr.Textbox(label="Metadata", lines=10) | |
| aggregated_output = gr.JSON(label="Aggregated Content") | |
| current_results = gr.State([]) | |
| def search_and_aggregate(query, search_type): | |
| df = search_hub(query, search_type) | |
| data = df.to_dict('records') | |
| aggregated = SwarmyTime(data) | |
| html_results = display_results(df) | |
| show_download = len(data) > 0 | |
| return [ | |
| html_results, # results_html | |
| show_download, # download_button visible | |
| "", # download_status | |
| "", # download_area | |
| aggregated, # aggregated_output | |
| data # current_results | |
| ] | |
| async def handle_download(data): | |
| if not data: | |
| return ["No results to download", ""] | |
| download_link, status = await download_all_readmes(data) | |
| return [status, download_link] | |
| search_button.click( | |
| search_and_aggregate, | |
| inputs=[search_query, search_type], | |
| outputs=[ | |
| results_html, | |
| download_button, | |
| download_status, | |
| download_area, | |
| aggregated_output, | |
| current_results | |
| ] | |
| ) | |
| download_button.click( | |
| handle_download, | |
| inputs=[current_results], | |
| outputs=[download_status, download_area] | |
| ) | |
| demo.launch(debug=True) |