Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| from azure.cosmos import CosmosClient, exceptions | |
| import os | |
| import pandas as pd | |
| import traceback | |
| import shutil | |
| from github import Github | |
| from git import Repo | |
| from datetime import datetime | |
| import base64 | |
| import json | |
| import uuid # π² For generating unique IDs | |
| from urllib.parse import quote # π For encoding URLs | |
| from gradio_client import Client # π For connecting to Gradio apps | |
| import anthropic | |
| import pytz | |
| import re | |
| from PIL import Image | |
| import glob | |
| from streamlit.components.v1 import html | |
# Page-level Streamlit settings: browser-tab title and icon, wide layout,
# automatic sidebar state, and the three entries of the app menu.
_PAGE_MENU_ITEMS = {
    'Get Help': 'https://huggingface.co/awacke1',
    'Report a bug': 'https://huggingface.co/spaces/awacke1',
    'About': 'πGitπCosmosπ« - Azure Cosmos DB and GitHub Agent, Now with Claude!',
}
_PAGE_SETTINGS = {
    "page_title": "π€GitπCosmosπ« & Claudeπ§ ",
    "page_icon": "π€ππ«π",
    "layout": "wide",
    "initial_sidebar_state": "auto",
    "menu_items": _PAGE_MENU_ITEMS,
}
st.set_page_config(**_PAGE_SETTINGS)
# Cosmos DB configuration. The endpoint is fixed; the database name, container
# name and account key are read from the environment and are None when unset.
ENDPOINT = "https://acae-afd.documents.azure.com:443/"
DATABASE_NAME = os.getenv("COSMOS_DATABASE_NAME")
CONTAINER_NAME = os.getenv("COSMOS_CONTAINER_NAME")
Key = os.getenv("Key")  # Cosmos account key; main() only connects when this is set

# Anthropic client used by chat_with_claude(); the API key comes from the
# ANTHROPIC_API_KEY environment variable.
_anthropic_api_key = os.getenv("ANTHROPIC_API_KEY")
client_anthropic = anthropic.Anthropic(api_key=_anthropic_api_key)

# Gradio client bound to the ArXiv paper search / QA Space.
client_gradio = Client("awacke1/Arxiv-Paper-Search-And-QA-RAG-Pattern")
# GitHub helpers
def download_github_repo(url, local_path):
    """Clone the repository at ``url`` into ``local_path``.

    Whatever already exists at ``local_path`` is removed with
    ``shutil.rmtree`` before cloning, so the result is always a fresh clone.
    """
    already_present = os.path.exists(local_path)
    if already_present:
        shutil.rmtree(local_path)
    Repo.clone_from(url, local_path)
def create_zip_file(source_dir, output_filename):
    """Pack the contents of ``source_dir`` into a zip archive.

    ``output_filename`` is the archive path *without* extension;
    ``shutil.make_archive`` appends ``.zip`` itself.
    """
    shutil.make_archive(
        base_name=output_filename,
        format='zip',
        root_dir=source_dir,
    )
def create_repo(g, repo_name):
    """Create a repository called ``repo_name`` for the user behind ``g``.

    ``g`` must expose ``get_user()``; the value returned by that user's
    ``create_repo(repo_name)`` is passed straight back to the caller.
    """
    return g.get_user().create_repo(repo_name)
def push_to_github(local_path, repo, github_token):
    """Commit everything under ``local_path`` and push it to ``repo``.

    Steps, in order:
      1. Point the ``origin`` remote at a token-authenticated HTTPS URL built
         from ``repo.full_name`` (creating the remote if it does not exist).
      2. Use the active branch, or create ``main`` when the repository has no
         branch heads yet.
      3. Stage all changes, commit them as "Initial commit" when the working
         tree is dirty, and push the branch to the same-named remote branch.
    """
    authed_url = f"https://{github_token}@github.com/{repo.full_name}.git"
    working_copy = Repo(local_path)

    remote_names = [remote.name for remote in working_copy.remotes]
    if 'origin' not in remote_names:
        remote = working_copy.create_remote('origin', authed_url)
    else:
        remote = working_copy.remote('origin')
        remote.set_url(authed_url)

    if working_copy.heads:
        branch = working_copy.active_branch.name
    else:
        # No branch exists yet: create and switch to 'main'.
        working_copy.git.checkout('-b', 'main')
        branch = 'main'

    working_copy.git.add(A=True)
    if working_copy.is_dirty():
        working_copy.git.commit('-m', 'Initial commit')

    remote.push(refspec=f'{branch}:{branch}')
def get_base64_download_link(file_path, file_name):
    """Return an HTML ``<a>`` tag that downloads ``file_path`` as ``file_name``.

    The file is read in binary mode and embedded in the link as a
    base64 ``data:application/zip`` URL.
    """
    with open(file_path, "rb") as source:
        payload = base64.b64encode(source.read()).decode()
    return f'<a href="data:application/zip;base64,{payload}" download="{file_name}">β¬οΈ Download {file_name}</a>'
# Cosmos DB functions
def insert_record(container, record):
    """Insert ``record`` into ``container`` and report the outcome.

    Returns a ``(success, message)`` tuple. Failures are never raised: a
    Cosmos HTTP error and any other exception are each turned into a
    ``(False, <description>)`` result.
    """
    try:
        container.create_item(body=record)
    except exceptions.CosmosHttpResponseError as http_err:
        return False, f"HTTP error occurred: {http_err!s} π¨"
    except Exception as err:
        return False, f"An unexpected error occurred: {err!s} π±"
    else:
        return True, "Record inserted successfully! π"
# Unique ID generation
def generate_unique_id():
    """Return a newly generated random UUID (``uuid.uuid4``) as a string."""
    fresh_id = uuid.uuid4()
    return str(fresh_id)
# Claude chat handling
def chat_with_claude(user_input):
    """Send ``user_input`` to Claude as a single user message and return the reply text.

    Uses the module-level ``client_anthropic`` with the
    ``claude-3-sonnet-20240229`` model and a 1000-token response limit.
    Only the text of the first content block of the response is returned.
    """
    conversation = [{"role": "user", "content": user_input}]
    reply = client_anthropic.messages.create(
        model="claude-3-sonnet-20240229",
        max_tokens=1000,
        messages=conversation,
    )
    first_block = reply.content[0]
    return first_block.text
# File handling
def generate_filename(prompt, file_type):
    """Build a file name of the form ``<MMDD_HHMM>_<prompt slug>.<file_type>``.

    The timestamp is the current time in the US/Central zone. The slug is
    ``prompt`` with every run of non-word characters collapsed to a single
    underscore, truncated to 90 characters.
    """
    now_central = datetime.now(pytz.timezone('US/Central'))
    timestamp = now_central.strftime("%m%d_%H%M")
    slug = re.sub(r'\W+', '_', prompt)
    slug = slug[:90]
    return f"{timestamp}_{slug}.{file_type}"
def create_file(filename, prompt, response, should_save=True):
    """Write ``prompt`` and ``response`` to ``filename`` as UTF-8 text.

    The two texts are separated by one blank line. Nothing is written when
    ``should_save`` is false. Always returns ``None``.
    """
    if should_save:
        with open(filename, 'w', encoding='utf-8') as out:
            out.write(prompt + "\n\n" + response)
def load_file(file_name):
    """Return the full contents of ``file_name``, decoded as UTF-8 text."""
    with open(file_name, "r", encoding='utf-8') as source:
        return source.read()
# Image and media handling
def get_video_html(video_path, width="100%"):
    """Return an HTML ``<video>`` element that plays ``video_path`` inline.

    The file is read in binary mode and embedded as a base64
    ``data:video/mp4`` URL, so the markup is self-contained. The player is
    rendered with controls and set to autoplay, muted, in a loop.

    Args:
        video_path: Path of the video file to embed.
        width: Value for the element's ``width`` attribute.

    Returns:
        The HTML markup as a string.
    """
    # Read through a context manager so the file handle is closed right away
    # instead of being left for the garbage collector.
    with open(video_path, 'rb') as video_file:
        encoded = base64.b64encode(video_file.read()).decode()
    video_url = f"data:video/mp4;base64,{encoded}"
    return f'''
    <video width="{width}" controls autoplay muted loop>
    <source src="{video_url}" type="video/mp4">
    Your browser does not support the video tag.
    </video>
    '''
def get_audio_html(audio_path, width="100%"):
    """Return an HTML ``<audio>`` element that plays ``audio_path`` inline.

    The file is read in binary mode and embedded as a base64 data URL, so the
    markup is self-contained.

    Args:
        audio_path: Path of the audio file to embed.
        width: CSS width applied to the player.

    Returns:
        The HTML markup as a string.
    """
    # Paths ending in .wav are labelled audio/wav; every other path keeps
    # the audio/mpeg type.
    is_wav = str(audio_path).lower().endswith(".wav")
    mime_type = "audio/wav" if is_wav else "audio/mpeg"
    # Read through a context manager so the file handle is closed right away.
    with open(audio_path, 'rb') as audio_file:
        encoded = base64.b64encode(audio_file.read()).decode()
    audio_url = f"data:{mime_type};base64,{encoded}"
    return f'''
    <audio controls style="width: {width};">
    <source src="{audio_url}" type="{mime_type}">
    Your browser does not support the audio element.
    </audio>
    '''
# Streamlit layout
def _render_cosmos_sidebar():
    """Show the Cosmos DB sidebar controls and the selected container's documents."""
    st.sidebar.header("Cosmos DB Controls")
    if Key:
        st.session_state.primary_key = Key
        st.session_state.logged_in = True

    # "logged_in" is only stored once a key has been seen. Read it with a
    # default: attribute access on a missing session-state entry raises.
    if not st.session_state.get("logged_in", False):
        st.sidebar.warning("Not connected to Cosmos DB: the 'Key' environment variable is not set.")
        return

    st.sidebar.write("π Connected to Cosmos DB!")
    cosmos_client = CosmosClient(ENDPOINT, credential=st.session_state.primary_key)
    databases = [db['id'] for db in cosmos_client.list_databases()]
    selected_db = st.sidebar.selectbox("Select Database", databases)
    if not selected_db:
        return

    db_client = cosmos_client.get_database_client(selected_db)
    containers = [container['id'] for container in db_client.list_containers()]
    selected_container = st.sidebar.selectbox("Select Container", containers)
    if not selected_container:
        return

    container = db_client.get_container_client(selected_container)
    documents = list(container.read_all_items())
    if documents:
        st.write("π Document List:")
        st.dataframe(pd.DataFrame(documents))
    else:
        st.write("π No documents in this container.")


def _render_chat():
    """Show the Claude prompt box, handle a submitted prompt and list past exchanges."""
    st.header("Chat with Claude π€π§ ")
    user_input = st.text_area("Ask Claude anything:")
    if st.button("Send") and user_input:
        response = chat_with_claude(user_input)
        st.write(f"Claude says: {response}")
        # Persist the exchange to a markdown file named after the prompt.
        filename = generate_filename(user_input, "md")
        create_file(filename, user_input, response)
        if 'chat_history' not in st.session_state:
            st.session_state.chat_history = []
        st.session_state.chat_history.append({"user": user_input, "claude": response})

    if "chat_history" in st.session_state:
        st.subheader("Past Conversations π")
        for idx, chat in enumerate(st.session_state.chat_history):
            # Explicit per-entry keys keep the widgets distinct even when two
            # history entries hold identical text.
            st.text_area("You said π¬:", chat["user"], height=100, disabled=True,
                         key=f"chat_history_user_{idx}")
            st.text_area("Claude replied π€:", chat["claude"], height=200, disabled=True,
                         key=f"chat_history_claude_{idx}")
            st.markdown("---")


def _render_media_galleries():
    """Show the image, video and audio files found in the working directory."""
    st.subheader("Image Gallery πΌ")
    image_files = glob.glob("*.png") + glob.glob("*.jpg")
    cols = st.columns(3)
    for idx, image_file in enumerate(image_files):
        # Images are dealt round-robin across the three columns.
        with cols[idx % 3]:
            # Close the image file as soon as it has been handed to Streamlit.
            with Image.open(image_file) as img:
                st.image(img)

    st.subheader("Video Gallery π₯")
    for video_file in glob.glob("*.mp4"):
        st.markdown(get_video_html(video_file), unsafe_allow_html=True)

    st.subheader("Audio Gallery πΆ")
    for audio_file in glob.glob("*.mp3") + glob.glob("*.wav"):
        st.markdown(get_audio_html(audio_file), unsafe_allow_html=True)


def main():
    """Render the whole page: titles, Cosmos DB browser, Claude chat and media galleries."""
    st.title("πGitπCosmosπ« & Claudeπ§ - The Ultimate App")
    st.sidebar.title("π§ Claudeπ & Cosmos Explorer")
    _render_cosmos_sidebar()
    _render_chat()
    _render_media_galleries()


if __name__ == "__main__":
    main()