Spaces:
Sleeping
Sleeping
| # Import libraries and references: | |
| import anthropic | |
| import base64 | |
| import glob | |
| import hashlib | |
| import json | |
| import os | |
| import pandas as pd | |
| import pytz | |
| import random | |
| import re | |
| import shutil | |
| import streamlit as st | |
| import time | |
| import traceback | |
| import uuid | |
| import zipfile | |
| from PIL import Image | |
| from azure.cosmos import CosmosClient, exceptions | |
| from datetime import datetime | |
| from git import Repo | |
| from github import Github | |
| from gradio_client import Client | |
| from urllib.parse import quote | |
# App Configuration - Because every app needs a good costume!
# Page identity: titles, help/bug links and the icon string passed to Streamlit.
Site_Name = 'πGitCosmosπ - AI Azure Cosmos DB and Github Agent'
title = "πGitCosmosπ - AI Azure Cosmos DB and Github Agent"
helpURL = 'https://huggingface.co/awacke1'
bugURL = 'https://huggingface.co/spaces/awacke1/AzureCosmosDBUI/'
icons = 'πππ«'

# Page configuration is issued before any other Streamlit call in this module.
st.set_page_config(
    page_title=title,
    page_icon=icons,
    layout="wide",
    initial_sidebar_state="auto",
    menu_items={'Get Help': helpURL, 'Report a bug': bugURL, 'About': title},
)

# Cosmos DB configuration: fixed endpoint, names and key read from the environment.
ENDPOINT = "https://acae-afd.documents.azure.com:443/"
DATABASE_NAME = os.getenv("COSMOS_DATABASE_NAME")
CONTAINER_NAME = os.getenv("COSMOS_CONTAINER_NAME")
Key = os.getenv("Key")

# URLs of the hosted app and of the Azure portal data explorer.
LOCAL_APP_URL = "https://huggingface.co/spaces/awacke1/AzureCosmosDBUI"
CosmosDBUrl = 'https://portal.azure.com/#@AaronCWackergmail.onmicrosoft.com/resource/subscriptions/003fba60-5b3f-48f4-ab36-3ed11bc40816/resourceGroups/datasets/providers/Microsoft.DocumentDB/databaseAccounts/acae-afd/dataExplorer'

# Anthropic client, keyed from the ANTHROPIC_API_KEY environment variable.
anthropicclient = anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))

# Make sure the chat history list exists in the session state.
if "chat_history" not in st.session_state:
    st.session_state["chat_history"] = []
# Helper Functions - The unsung heroes of our code
# Get a file download link - Making file sharing as easy as stealing candy from a baby
def get_download_link(file_path):
    """Return an HTML anchor embedding the file as a base64 ``data:`` URI.

    The file at *file_path* is read in binary mode; its base name is used
    both as the ``download`` attribute and in the visible link text.
    """
    file_name = os.path.basename(file_path)
    with open(file_path, "rb") as handle:
        b64 = base64.b64encode(handle.read()).decode()
    return f'<a href="data:file/txt;base64,{b64}" download="{file_name}">Download {file_name}π</a>'
# Generate a unique ID - Because being unique is important (just ask your mother)
def generate_unique_id():
    """Build, display and return a new unique identifier.

    The identifier is a UTC timestamp (down to microseconds) followed by a
    random UUID4, joined with a dash. It is also echoed to the page with
    ``st.write``.

    Returns:
        str: the generated identifier. Callers in this file store it as a
        document ``id``, so it must actually be returned.
    """
    timestamp = datetime.utcnow().strftime('%Y%m%d%H%M%S%f')
    unique_uuid = str(uuid.uuid4())
    returnValue = f"{timestamp}-{unique_uuid}"
    st.write('New Unique ID:' + returnValue)
    return returnValue
# Generate a filename - Naming files like a pro (or a very confused librarian)
def generate_filename(prompt, file_type):
    """Return ``<MMDD_HHMM><sanitized prompt>.<file_type>``.

    The timestamp is taken in the ``US/Central`` zone. Every run of
    non-word characters is removed from *prompt*, and the remainder is cut
    to 90 characters.
    """
    stamp = datetime.now(pytz.timezone('US/Central')).strftime("%m%d_%H%M")
    cleaned = re.sub(r'\W+', '', prompt)
    return f"{stamp}{cleaned[:90]}.{file_type}"
# Create and save a file - Because data hoarding is a legitimate hobby
def create_file(filename, prompt, response, should_save=True):
    """Write *prompt*, a blank line, then *response* to *filename* (UTF-8).

    Nothing is written when *should_save* is false. Always returns ``None``.
    """
    if should_save:
        with open(filename, 'w', encoding='utf-8') as out:
            out.write(prompt + "\n\n" + response)
# Load file content - Bringing words back from the digital grave
def load_file(file_name):
    """Return the full text of *file_name*, decoded as UTF-8."""
    with open(file_name, "r", encoding='utf-8') as src:
        return src.read()
# Display glossary entity - Making search fun again (as if it ever was)
def display_glossary_entity(k):
    """Render *k* followed by a row of search links for it.

    Each entry of ``search_urls`` maps a link label to a function that builds
    a URL from the URL-quoted term. The links are emitted as raw HTML anchors
    opening in a new tab.
    """
    # NOTE(review): two of the labels below look identical in this copy of
    # the file; if they really are equal the later entry replaces the earlier
    # one in the dict. Confirm the intended glyphs.
    search_urls = {
        "ππArXiv": lambda k: f"/?q={quote(k)}",
        "π": lambda k: f"https://en.wikipedia.org/wiki/{quote(k)}",
        "π": lambda k: f"https://www.google.com/search?q={quote(k)}",
        "π₯": lambda k: f"https://www.youtube.com/results?search_query={quote(k)}",
    }
    anchors = []
    for emoji, url in search_urls.items():
        anchors.append(f"<a href='{url(k)}' target='_blank'>{emoji}</a>")
    links_md = ' '.join(anchors)
    st.markdown(f"{k} {links_md}", unsafe_allow_html=True)
# Create zip of files - Squeezing files together like sardines in a can
def create_zip_of_files(files):
    """Write every path in *files* into ``all_files.zip`` and return that name.

    The archive is created (or overwritten) relative to the current working
    directory.
    """
    archive_path = "all_files.zip"
    with zipfile.ZipFile(archive_path, 'w') as archive:
        for member in files:
            archive.write(member)
    return archive_path
# Get video HTML - Making videos play nice (or at least trying to)
def get_video_html(video_path, width="100%"):
    """Return a ``<video>`` element with the file inlined as a base64 data URI.

    Args:
        video_path: path of the video file; it is read in binary mode and
            labelled as ``video/mp4``.
        width: value for the element's ``width`` attribute.

    Returns:
        str: HTML markup for an autoplaying, looping player with controls.
    """
    # Read through a context manager so the handle is closed deterministically.
    with open(video_path, 'rb') as video_file:
        encoded = base64.b64encode(video_file.read()).decode()
    video_url = f"data:video/mp4;base64,{encoded}"
    return f'''
    <video width="{width}" controls autoplay loop>
        <source src="{video_url}" type="video/mp4">
        Your browser does not support the video tag.
    </video>
    '''
# Get audio HTML - Let the music play (and hope it's not Baby Shark)
def get_audio_html(audio_path, width="100%"):
    """Return an ``<audio>`` element with the file inlined as a base64 data URI.

    Args:
        audio_path: path of the audio file; it is read in binary mode and
            labelled as ``audio/mpeg``.
        width: CSS width applied through the element's ``style`` attribute.

    Returns:
        str: HTML markup for an audio player with controls.
    """
    # Read through a context manager so the handle is closed deterministically.
    with open(audio_path, 'rb') as audio_file:
        encoded = base64.b64encode(audio_file.read()).decode()
    audio_url = f"data:audio/mpeg;base64,{encoded}"
    return f'''
    <audio controls style="width:{width}">
        <source src="{audio_url}" type="audio/mpeg">
        Your browser does not support the audio element.
    </audio>
    '''
# Cosmos DB functions - Where data goes to live its best life
# Get databases - Collecting databases like Pokemon cards
def get_databases(client):
    """Return the ``id`` of every database yielded by ``client.list_databases()``."""
    database_ids = []
    for entry in client.list_databases():
        database_ids.append(entry['id'])
    return database_ids
# Get containers - Finding where all the good stuff is hidden
def get_containers(database):
    """Return the ``id`` of every container yielded by ``database.list_containers()``."""
    container_ids = []
    for entry in database.list_containers():
        container_ids.append(entry['id'])
    return container_ids
# Get documents - Retrieving the sacred texts (or just some JSON)
def get_documents(container, limit=None):
    """Return documents from *container*, most recently modified first.

    Args:
        container: object exposing ``query_items`` (a Cosmos container client
            in this file).
        limit: optional maximum number of documents to return. ``None``
            returns everything the query yields.

    Returns:
        list: the documents, ordered by ``_ts`` descending.
    """
    from itertools import islice

    query = "SELECT * FROM c ORDER BY c._ts DESC"
    # max_item_count is still forwarded as before, but on its own it did not
    # stop list() from draining the whole iterator, so the cap is enforced
    # here while consuming the results.
    results = container.query_items(
        query=query,
        enable_cross_partition_query=True,
        max_item_count=limit,
    )
    if limit is None:
        return list(results)
    return list(islice(results, limit))
# Insert record - Adding new data (and crossing fingers it doesn't break anything)
def insert_record(container, record):
    """Create *record* in *container* and report the outcome.

    Returns:
        tuple: ``(True, message)`` on success, otherwise ``(False, message)``
        describing the Cosmos HTTP error or any other exception.
    """
    try:
        container.create_item(body=record)
    except exceptions.CosmosHttpResponseError as http_err:
        return False, f"HTTP error occurred: {str(http_err)} π¨"
    except Exception as err:
        return False, f"An unexpected error occurred: {str(err)} π±"
    else:
        return True, "Record inserted successfully! π"
# Update record - Giving data a makeover
def update_record(container, updated_record):
    """Upsert *updated_record* into *container* and report the outcome.

    Returns:
        tuple: ``(True, message)`` naming the record's ``id`` on success;
        ``(False, message)`` with the Cosmos HTTP error text, or with a full
        traceback for any other exception.
    """
    try:
        container.upsert_item(body=updated_record)
        # Built inside the try so a missing 'id' is reported, not raised.
        message = f"Record with id {updated_record['id']} successfully updated. π οΈ"
        return True, message
    except exceptions.CosmosHttpResponseError as http_err:
        return False, f"HTTP error occurred: {str(http_err)} π¨"
    except Exception:
        return False, f"An unexpected error occurred: {traceback.format_exc()} π±"
# Delete record - Saying goodbye to data (it's not you, it's me)
def delete_record(container, record):
    """Delete *record* from *container* and report the outcome.

    The record's ``id`` is passed both as the item id and as the partition
    key value.

    Returns:
        tuple: ``(True, message)`` on success; ``(False, message)`` with the
        Cosmos HTTP error text, or with a full traceback for any other
        exception.
    """
    try:
        container.delete_item(item=record['id'], partition_key=record['id'])
        message = f"Record with id {record['id']} successfully deleted. ποΈ"
        return True, message
    except exceptions.CosmosHttpResponseError as http_err:
        return False, f"HTTP error occurred: {str(http_err)} π¨"
    except Exception:
        return False, f"An unexpected error occurred: {traceback.format_exc()} π±"
# Save to Cosmos DB - Preserving data for future generations (or just until the next update)
def save_to_cosmos_db(container, query, response1, response2):
    """Store a query and its two responses as a new document in *container*.

    The document gets an id from ``generate_unique_id()`` and a UTC ISO
    timestamp. Outcomes are reported on the page via ``st.success`` /
    ``st.error``; on success the cached document list in the session state is
    refreshed. Returns ``None`` in every case.
    """
    try:
        if not container:
            st.error("Cosmos DB container is not initialized.")
            return
        record = {
            "id": generate_unique_id(),
            "query": query,
            "response1": response1,
            "response2": response2,
            "timestamp": datetime.utcnow().isoformat()
        }
        try:
            container.create_item(body=record)
            st.success(f"Record saved successfully with ID: {record['id']}")
            # Reload so the displayed documents include the new record.
            st.session_state.documents = get_documents(container)
        except exceptions.CosmosHttpResponseError as http_err:
            st.error(f"Error saving record to Cosmos DB: {http_err}")
    except Exception as err:
        st.error(f"An unexpected error occurred: {str(err)}")
# GitHub functions - Where code goes to socialize
# Download GitHub repo - Cloning repos like it's going out of style
def download_github_repo(url, local_path):
    """Clone the repository at *url* into *local_path*.

    Any existing directory at *local_path* is removed first, so the clone
    always starts from an empty target.
    """
    target_exists = os.path.exists(local_path)
    if target_exists:
        shutil.rmtree(local_path)
    Repo.clone_from(url, local_path)
# Create zip file - Squeezing files tighter than your budget
def create_zip_file(source_dir, output_filename):
    """Zip the contents of *source_dir* into ``<output_filename>.zip``.

    *output_filename* is the archive path without the ``.zip`` suffix.
    Returns ``None``.
    """
    shutil.make_archive(base_name=output_filename, format='zip', root_dir=source_dir)
# Create repo - Building digital homes for lonely code
def create_repo(g, repo_name):
    """Create a repository named *repo_name* for the user returned by ``g.get_user()``.

    Returns whatever that user's ``create_repo`` call returns.
    """
    return g.get_user().create_repo(repo_name)
# Push to GitHub - Sending code to the cloud (hopefully not the rainy kind)
def push_to_github(local_path, repo, github_token):
    """Commit everything in the repo at *local_path* and push it to *repo*.

    The ``origin`` remote is created, or re-pointed if it already exists, at
    a token-authenticated HTTPS URL built from ``repo.full_name``. When the
    local repository has no heads yet a ``main`` branch is checked out;
    otherwise the active branch is used. All changes are staged, committed
    as ``Initial commit`` if the tree is dirty, and the branch is pushed to
    the same name on the remote.
    """
    repo_url = f"https://{github_token}@github.com/{repo.full_name}.git"
    local_repo = Repo(local_path)

    remote_names = [remote.name for remote in local_repo.remotes]
    if 'origin' not in remote_names:
        origin = local_repo.create_remote('origin', repo_url)
    else:
        origin = local_repo.remote('origin')
        origin.set_url(repo_url)

    if local_repo.heads:
        current_branch = local_repo.active_branch.name
    else:
        # No branch exists yet: start one named 'main'.
        local_repo.git.checkout('-b', 'main')
        current_branch = 'main'

    local_repo.git.add(A=True)
    if local_repo.is_dirty():
        local_repo.git.commit('-m', 'Initial commit')
    origin.push(refspec=f'{current_branch}:{current_branch}')
def save_or_clone_to_cosmos_db(container, document=None, clone_id=None):
    """Save *document* as a new item, or clone the item with id *clone_id*.

    A fresh id is generated for every attempt. When *clone_id* is given, the
    existing item is read (using the id as partition key) and a new document
    is built from its ``originalText`` and ``qtPrompts`` plus clone metadata.
    Otherwise *document* itself is stamped with ``id`` and ``created_at``
    (it is modified in place) and saved.

    An HTTP 409 response triggers a retry with exponential backoff plus
    jitter, up to ``max_retries`` attempts.

    Returns:
        tuple: ``(success, message)``.
    """
    def generate_complex_unique_id():
        stamp = datetime.utcnow().strftime('%Y%m%d%H%M%S%f')
        alphabet = 'abcdefghijklmnopqrstuvwxyz0123456789'
        random_component = ''.join(random.choices(alphabet, k=8))
        return f"{stamp}-{random_component}-{str(uuid.uuid4())}"

    max_retries = 10
    base_delay = 0.1
    for attempt in range(max_retries):
        try:
            new_id = generate_complex_unique_id()
            if not clone_id:
                if document is None:
                    return False, "No document provided for saving"
                document['id'] = new_id
                document['created_at'] = datetime.utcnow().isoformat()
                new_doc = document
            else:
                try:
                    existing_doc = container.read_item(item=clone_id, partition_key=clone_id)
                except exceptions.CosmosResourceNotFoundError:
                    return False, f"Document with ID {clone_id} not found for cloning."
                new_doc = {
                    'id': new_id,
                    'originalText': existing_doc.get('originalText', ''),
                    'qtPrompts': existing_doc.get('qtPrompts', []),
                    'cloned_from': clone_id,
                    'cloned_at': datetime.utcnow().isoformat()
                }
            response = container.create_item(body=new_doc)
            return True, f"{'Cloned' if clone_id else 'New'} document saved successfully with ID: {response['id']}"
        except exceptions.CosmosHttpResponseError as e:
            if e.status_code != 409:
                return False, f"Error saving to Cosmos DB: {str(e)}"
            # Conflict: wait (exponential backoff + jitter), then try again
            # with a newly generated id.
            delay = base_delay * (2 ** attempt) + random.uniform(0, 0.1)
            time.sleep(delay)
        except Exception as e:
            return False, f"An unexpected error occurred: {str(e)}"
    return False, "Failed to save document after maximum retries."
# Archive current container - Packing up data like you're moving to a new digital house
def archive_current_container(database_name, container_name, client):
    """Dump every item of a container to JSON files, zip them, and return a link.

    Items are written to ``./cosmos_archive_current_container/<container_name>/``
    (the base directory is recreated on each call), one ``<id>.json`` file per
    item. The directory is then zipped into a timestamped archive in the
    current working directory.

    Returns:
        str: the HTML download link from ``get_download_link`` on success, or
        an error message if anything raises.
    """
    try:
        base_dir = "./cosmos_archive_current_container"
        if os.path.exists(base_dir):
            shutil.rmtree(base_dir)
        os.makedirs(base_dir)

        container_client = client.get_database_client(database_name).get_container_client(container_name)
        items = list(container_client.read_all_items())

        container_dir = os.path.join(base_dir, container_name)
        os.makedirs(container_dir)
        for item in items:
            # Items lacking an 'id' fall back to a timestamp-based name.
            fallback_id = f"unknown_{datetime.now().strftime('%Y%m%d%H%M%S')}"
            item_id = item.get('id', fallback_id)
            item_path = os.path.join(container_dir, f"{item_id}.json")
            with open(item_path, 'w') as f:
                json.dump(item, f, indent=2)

        stamp = datetime.now().strftime('%Y%m%d%H%M%S')
        archive_name = f"{container_name}_archive_{stamp}"
        shutil.make_archive(archive_name, 'zip', base_dir)
        return get_download_link(f"{archive_name}.zip")
    except Exception as e:
        return f"An error occurred while archiving data: {str(e)} π’"
def gen_AI_IO_filename(display_query, output):
    """Build a Markdown filename from a timestamp, the query and the output.

    Args:
        display_query: the (possibly already shortened) query text; only its
            first 50 characters are used.
        output: the generated text; its first 100 characters are reduced to
            alphanumerics joined by underscores.

    Returns:
        str: ``"<timestamp> - <query> - <output snippet>.md"``, where the
        timestamp is America/Chicago time in 12-hour form with microseconds.
    """
    # Current time in the Central time zone; %f is microseconds.
    now_central = datetime.now(pytz.timezone("America/Chicago"))
    timestamp = now_central.strftime("%Y-%m-%d-%I-%M-%S-%f-%p")

    # The query is free text and ends up in a path passed to open(), so
    # replace path separators and other characters that are invalid in
    # filenames instead of letting the write fail.
    safe_query = re.sub(r'[\\/:*?"<>|\x00-\x1f]+', '_', display_query[:50])
    output_snippet = re.sub(r'[^A-Za-z0-9]+', '_', output[:100])

    return f"{timestamp} - {safe_query} - {output_snippet}.md"
# Search glossary - Finding needles in digital haystacks
def search_glossary(query):
    """Run *query* through the ArXiv RAG Space, display and save the answers.

    Three remote calls are made through ``gradio_client``: ``/ask_llm`` with
    Mixtral-8x7B, ``/ask_llm`` with Mistral-7B, and ``/update_with_rag_md``
    with Mistral-7B. Each answer is rendered with ``st.markdown`` and then
    written to its own Markdown file.

    Args:
        query: the search text.

    Returns:
        tuple: ``(result, result2, response2)`` - the two ``/ask_llm``
        answers and the ``/update_with_rag_md`` response, whose elements 0
        and 1 are the ones displayed and saved.
    """
    # NOTE(review): main() unpacks four values from this call while three are
    # returned here; confirm which side reflects the intended contract.
    st.markdown(f"### π SearchGlossary for: {query}")

    # NOTE(review): the two selections below are displayed but the model and
    # database names passed to the remote calls are fixed; id(query) is also
    # not stable between reruns, so the widgets will not keep their state.
    model_options = ['mistralai/Mixtral-8x7B-Instruct-v0.1', 'mistralai/Mistral-7B-Instruct-v0.2']
    model_choice = st.selectbox('π§ Select LLM Model', options=model_options, index=1, key=f"model_choice_{id(query)}")
    database_options = ['Semantic Search', 'Arxiv Search - Latest - (EXPERIMENTAL)']
    database_choice = st.selectbox('π Select Database', options=database_options, index=0, key=f"database_choice_{id(query)}")

    # Limit the query shown on the page (and used in filenames) to 80 characters.
    display_query = query[:80] + "..." if len(query) > 80 else query
    # Report the option that was actually selected, not the whole option list.
    st.markdown(f"π΅οΈββοΈ Running ArXiV AI Analysis with Query: {display_query} - ML model: {model_choice} and Option: {database_choice}")

    client = Client("awacke1/Arxiv-Paper-Search-And-QA-RAG-Pattern")

    result = client.predict(
        prompt=query,
        llm_model_picked="mistralai/Mixtral-8x7B-Instruct-v0.1",
        stream_outputs=True,
        api_name="/ask_llm"
    )
    st.markdown("# Mixtral-8x7B-Instruct-v0.1")
    st.markdown(result)

    result2 = client.predict(
        prompt=query,
        llm_model_picked="mistralai/Mistral-7B-Instruct-v0.2",
        stream_outputs=True,
        api_name="/ask_llm"
    )
    st.markdown("# Mistral-7B-Instruct-v0.2")
    st.markdown(result2)

    response2 = client.predict(
        message=query,
        llm_results_use=10,
        database_choice="Semantic Search",
        llm_model_picked="mistralai/Mistral-7B-Instruct-v0.2",
        api_name="/update_with_rag_md"
    )
    st.markdown("# Mistral-7B-Instruct-v0.2 update_with_rag_md 0")
    st.markdown(response2[0])
    st.markdown("# Mistral-7B-Instruct-v0.2 update_with_rag_md 1")
    st.markdown(response2[1])

    # Persist each answer to its own Markdown file, in display order.
    for answer in (result, result2, response2[0], response2[1]):
        filename = gen_AI_IO_filename(display_query, answer)
        create_file(filename, query, answer)
        st.markdown(f"β File saved as: `{filename}`")

    return result, result2, response2
# Generate a safe filename from the first line of content
def generate_filename_from_content(content, file_type="md"):
    """Derive a filename from the first line of *content*.

    The first line is cut to 90 characters, stripped of everything except
    word characters, whitespace and hyphens, then cut to 50 characters and
    trimmed.

    Args:
        content: text whose first line seeds the name.
        file_type: extension to append, without the dot.

    Returns:
        str: ``"<name>.<file_type>"``; ``"untitled"`` is used as the name
        when nothing usable remains.
    """
    first_line = content.split('\n', 1)[0][:90]
    safe_name = re.sub(r'[^\w\s-]', '', first_line)[:50].strip()
    # An empty stem would produce a hidden ".<ext>" file that the "*.md"
    # listing elsewhere in this file never picks up.
    if not safe_name:
        safe_name = "untitled"
    return f"{safe_name}.{file_type}"
# Create and save a file named after its content
def create_file_from_content(content, should_save=True):
    """Write *content* to a file named by ``generate_filename_from_content``.

    Returns:
        The filename that was written (UTF-8), or ``None`` when
        *should_save* is false and nothing is written.
    """
    if should_save:
        filename = generate_filename_from_content(content)
        with open(filename, 'w', encoding='utf-8') as out:
            out.write(content)
        return filename
    return None
# Display list of saved .md files in the sidebar
def display_saved_files_in_sidebar():
    """List saved Markdown files in the sidebar with download and delete buttons.

    Every ``*.md`` file in the working directory whose name does not start
    with ``readme`` (case-insensitive) is shown, in reverse name order, as a
    three-column row: name, download button, delete button. Deleting a file
    removes it from disk and reruns the app.
    """
    all_files = sorted(
        (name for name in glob.glob("*.md") if not name.lower().startswith('readme')),
        reverse=True,
    )
    st.sidebar.markdown("## π Saved Markdown Files")
    for file in all_files:
        col1, col2, col3 = st.sidebar.columns([6, 2, 1])
        with col1:
            st.markdown(f"π {file}")
        # Read via a context manager so the handle is not left open.
        with open(file, 'rb') as handle:
            file_bytes = handle.read()
        # Widgets are created with st.* inside the column contexts: calling
        # st.sidebar.* here would place them in the sidebar root instead of
        # the column.
        with col2:
            st.download_button(
                label="β¬οΈ Download",
                data=file_bytes,
                file_name=file,
                key=f"download_{file}"
            )
        with col3:
            if st.button("π", key=f"delete_{file}"):
                os.remove(file)
                st.rerun()
def clone_record(container, clone_id):
    """Copy the item with id *clone_id* into a new item and refresh the list.

    The copy receives a new ``id`` from ``generate_unique_id()``, a ``name``
    equal to that id and a fresh ``createdAt``; the ``_rid``, ``_self``,
    ``_etag``, ``_attachments`` and ``_ts`` fields are set to ``None`` before
    the item is created. Results are reported with ``st.success`` /
    ``st.error``. Returns ``None``.
    """
    try:
        source_doc = container.read_item(item=clone_id, partition_key=clone_id)
        new_doc = source_doc.copy()
        new_id = generate_unique_id()
        new_doc['id'] = new_id
        new_doc['name'] = new_id
        new_doc['createdAt'] = datetime.utcnow().isoformat()
        # Blank out the underscore-prefixed fields carried over from the source.
        for system_field in ('_rid', '_self', '_etag', '_attachments', '_ts'):
            new_doc[system_field] = None

        container.create_item(body=new_doc)
        st.success(f"Cloned document saved successfully with ID: {new_doc['id']} π")

        # Reload the cached document list so the clone shows up.
        refreshed = container.query_items(
            query="SELECT * FROM c ORDER BY c._ts DESC",
            enable_cross_partition_query=True
        )
        st.session_state.documents = list(refreshed)
    except exceptions.CosmosResourceNotFoundError:
        st.error(f"Document with ID {clone_id} not found for cloning.")
    except exceptions.CosmosHttpResponseError as http_err:
        st.error(f"HTTP error occurred: {str(http_err)} π¨")
    except Exception as err:
        st.error(f"An unexpected error occurred: {str(err)} π±")
def create_new_blank_record(container):
    """Create an empty document shaped like the most recent one.

    The newest document (by ``_ts``) supplies the field names; each field is
    set to an empty string, then ``id`` (from ``generate_unique_id()``) and
    ``createdAt`` are filled in. If the container yields no documents the new
    record contains only those two fields. Results are reported with
    ``st.success`` / ``st.error`` and the cached document list is refreshed.
    Returns ``None``.
    """
    try:
        latest_docs = container.query_items(
            query="SELECT * FROM c ORDER BY c._ts DESC",
            enable_cross_partition_query=True,
            max_item_count=1
        )
        # The query result is consumed as an iterator: take the first item,
        # or None when nothing is yielded (indexing it with [0] would raise).
        latest_doc = next(iter(latest_docs), None)
        template = latest_doc if latest_doc is not None else {}

        # Skip the underscore-prefixed metadata fields rather than sending
        # them back blanked; the same set is stripped by the clone view in
        # main().
        system_fields = {'_rid', '_self', '_etag', '_attachments', '_ts'}
        new_doc = {key: "" for key in template if key not in system_fields}
        new_doc['id'] = generate_unique_id()
        new_doc['createdAt'] = datetime.utcnow().isoformat()

        container.create_item(body=new_doc)
        st.success(f"New blank document saved successfully with ID: {new_doc['id']} π")

        # Reload the cached document list so the new record shows up.
        st.session_state.documents = list(container.query_items(
            query="SELECT * FROM c ORDER BY c._ts DESC",
            enable_cross_partition_query=True
        ))
    except exceptions.CosmosHttpResponseError as e:
        st.error(f"HTTP error occurred: {str(e)} π¨")
    except Exception as e:
        st.error(f"An unexpected error occurred: {str(e)} π±")
| # Function to preprocess the pasted content | |
def preprocess_text(text):
    """Escape pasted text so it can sit inside a JSON string literal.

    In order: every newline variant (CRLF, CR, LF) becomes the two-character
    sequence ``\\n``; double quotes are backslash-escaped; tabs become single
    spaces; non-ASCII characters are dropped; leading and trailing whitespace
    is stripped.
    """
    # CRLF is listed first so it collapses to one escape, not two.
    escaped = re.sub(r'\r\n|\r|\n', r'\\n', text)
    escaped = escaped.replace('"', '\\"')
    escaped = escaped.replace('\t', ' ')
    ascii_only = re.sub(r'[^\x00-\x7F]+', '', escaped)
    return ascii_only.strip()
# Main function - "All the world's a stage, and all the code merely players" -Shakespeare, probably
| def main(): | |
| st.markdown("### πGitπCosmosπ« - Azure Cosmos DB and Github Agent") | |
| # π² Session state vars - "Life is like a session state, you never know what you're gonna get" | |
| if 'logged_in' not in st.session_state: | |
| st.session_state.logged_in = False | |
| if 'selected_records' not in st.session_state: | |
| st.session_state.selected_records = [] | |
| if 'client' not in st.session_state: | |
| st.session_state.client = None | |
| if 'selected_database' not in st.session_state: | |
| st.session_state.selected_database = None | |
| if 'selected_container' not in st.session_state: | |
| st.session_state.selected_container = None | |
| if 'selected_document_id' not in st.session_state: | |
| st.session_state.selected_document_id = None | |
| if 'current_index' not in st.session_state: | |
| st.session_state.current_index = 0 | |
| if 'cloned_doc' not in st.session_state: | |
| st.session_state.cloned_doc = None | |
| # π Query processing - "To search or not to search, that is the query" | |
| try: | |
| query_params = st.query_params | |
| query = query_params.get('q') or query_params.get('query') or '' | |
| if query: | |
| result, result2, result3, response2 = search_glossary(query) | |
| # πΎ Save results - "Every file you save is a future you pave" | |
| try: | |
| if st.button("Save AI Output"): | |
| filename = create_file_from_content(result) | |
| st.success(f"File saved: {filename}") | |
| filename = create_file_from_content(result2) | |
| st.success(f"File saved: {filename}") | |
| filename = create_file_from_content(result3) | |
| st.success(f"File saved: {filename}") | |
| filename = create_file_from_content(response2) | |
| st.success(f"File saved: {filename}") | |
| display_saved_files_in_sidebar() | |
| except Exception as e: | |
| st.error(f"An unexpected error occurred: {str(e)} π±") | |
| # π Cosmos DB operations - "In Cosmos DB we trust, but we still handle errors we must" | |
| try: | |
| save_to_cosmos_db(st.session_state.cosmos_container, query, result, result) | |
| save_to_cosmos_db(st.session_state.cosmos_container, query, result2, result2) | |
| save_to_cosmos_db(st.session_state.cosmos_container, query, result3, result3) | |
| save_to_cosmos_db(st.session_state.cosmos_container, query, response2[0], response2[0]) | |
| save_to_cosmos_db(st.session_state.cosmos_container, query, response2[1], response2[1]) | |
| except exceptions.CosmosHttpResponseError as e: | |
| st.error(f"HTTP error occurred: {str(e)} π¨") | |
| except Exception as e: | |
| st.error(f"An unexpected error occurred: {str(e)} π±") | |
| st.stop() | |
| except Exception as e: | |
| st.markdown(' ') | |
| # π Auth check - "With great keys come great connectivity" | |
| if Key: | |
| st.session_state.primary_key = Key | |
| st.session_state.logged_in = True | |
| else: | |
| st.error("Cosmos DB Key is not set in environment variables. πβ") | |
| return | |
| if st.session_state.logged_in: | |
| # π DB initialization - "In the beginning, there was connection string..." | |
| try: | |
| if st.session_state.client is None: | |
| st.session_state.client = CosmosClient(ENDPOINT, credential=st.session_state.primary_key) | |
| # π Navigation setup - "Navigation is not about where you are, but where you're going" | |
| st.sidebar.title("πGitπCosmosπ«ποΈNavigator") | |
| databases = get_databases(st.session_state.client) | |
| selected_db = st.sidebar.selectbox("ποΈ Select Database", databases) | |
| st.markdown(CosmosDBUrl) | |
| # π State management - "Change is the only constant in state management" | |
| if selected_db != st.session_state.selected_database: | |
| st.session_state.selected_database = selected_db | |
| st.session_state.selected_container = None | |
| st.session_state.selected_document_id = None | |
| st.session_state.current_index = 0 | |
| st.rerun() | |
| if st.session_state.selected_database: | |
| database = st.session_state.client.get_database_client(st.session_state.selected_database) | |
| containers = get_containers(database) | |
| selected_container = st.sidebar.selectbox("π Select Container", containers) | |
| # π Container state handling - "Container changes, state arranges" | |
| if selected_container != st.session_state.selected_container: | |
| st.session_state.selected_container = selected_container | |
| st.session_state.selected_document_id = None | |
| st.session_state.current_index = 0 | |
| st.rerun() | |
| if st.session_state.selected_container: | |
| container = database.get_container_client(st.session_state.selected_container) | |
| # π¦ Export functionality - "Pack it, zip it, ship it" | |
| if st.sidebar.button("π¦ Export Container Data"): | |
| download_link = archive_current_container(st.session_state.selected_database, | |
| st.session_state.selected_container, | |
| st.session_state.client) | |
| if download_link.startswith('<a'): | |
| st.markdown(download_link, unsafe_allow_html=True) | |
| else: | |
| st.error(download_link) | |
| # π Document handling - "Document, document, on the wall, who's the most recent of them all?" | |
| documents = get_documents(container) | |
| total_docs = len(documents) | |
| # Add a slider to let the user choose how many documents to display | |
| num_docs_to_display = st.slider( | |
| "Select number of documents to display", 1, 20, 1 | |
| ) | |
| # Adjust the document display logic based on the slider value | |
| if total_docs > num_docs_to_display: | |
| documents_to_display = documents[:num_docs_to_display] | |
| st.sidebar.info(f"Showing top {num_docs_to_display} most recent documents.") | |
| else: | |
| documents_to_display = documents | |
| st.sidebar.info(f"Showing all {len(documents_to_display)} documents.") | |
| if documents_to_display: | |
| # π¨ View options - "Different strokes for different folks" | |
| view_options = ['Show as Markdown', 'Show as Code Editor', 'Show as Run AI', 'Clone Document', 'New Record'] | |
| selected_view = st.sidebar.selectbox("Select Viewer/Editor", view_options, index=2) | |
| if selected_view == 'Show as Markdown': | |
| Label = '#### π Markdown view - Mark it down, mark it up' | |
| st.markdown(Label) | |
| total_docs = len(documents) | |
| doc = documents[st.session_state.current_index] | |
| # st.markdown(f"#### Document ID: {doc.get('id', '')}") | |
| # π΅οΈ Value extraction - "Finding spaces in all the right places" | |
| values_with_space = [] | |
| def extract_values(obj): | |
| if isinstance(obj, dict): | |
| for k, v in obj.items(): | |
| extract_values(v) | |
| elif isinstance(obj, list): | |
| for item in obj: | |
| extract_values(item) | |
| elif isinstance(obj, str): | |
| if ' ' in obj: | |
| values_with_space.append(obj) | |
| extract_values(doc) | |
| st.markdown("#### π Links for Extracted Texts") | |
| for term in values_with_space: | |
| display_glossary_entity(term) | |
| content = json.dumps(doc, indent=2) | |
| st.markdown(f"```json\n{content}\n```") | |
| # β¬ οΈβ‘οΈ Navigation - "Left and right, day and night" | |
| col_prev, col_next = st.columns([1, 1]) | |
| with col_prev: | |
| if st.button("β¬ οΈ Previous", key='prev_markdown'): | |
| if st.session_state.current_index > 0: | |
| st.session_state.current_index -= 1 | |
| st.rerun() | |
| with col_next: | |
| if st.button("β‘οΈ Next", key='next_markdown'): | |
| if st.session_state.current_index < total_docs - 1: | |
| st.session_state.current_index += 1 | |
| st.rerun() | |
| elif selected_view == 'Show as Code Editor': | |
| Label = '#### π» Code editor view' | |
| st.markdown(Label) | |
| total_docs = len(documents) | |
| doc = documents[st.session_state.current_index] | |
| # st.markdown(f"#### Document ID: {doc.get('id', '')}") | |
| doc_str = st.text_area("Edit Document", | |
| value=json.dumps(doc, indent=2), | |
| height=300, | |
| key=f'code_editor_{st.session_state.current_index}') | |
| col_prev, col_next = st.columns([1, 1]) | |
| with col_prev: | |
| if st.button("β¬ οΈ Previous", key='prev_code'): | |
| if st.session_state.current_index > 0: | |
| st.session_state.current_index -= 1 | |
| st.rerun() | |
| with col_next: | |
| if st.button("β‘οΈ Next", key='next_code'): | |
| if st.session_state.current_index < total_docs - 1: | |
| st.session_state.current_index += 1 | |
| st.rerun() | |
| col_save, col_delete = st.columns([1, 1]) | |
| with col_save: | |
| if st.button("πΎ Save Changes", key=f'save_button_{st.session_state.current_index}'): | |
| try: | |
| updated_doc = json.loads(doc_str) | |
| response = container.upsert_item(body=updated_doc) | |
| if response: | |
| st.success(f"Document {updated_doc['id']} saved successfully.") | |
| st.session_state.selected_document_id = updated_doc['id'] | |
| st.rerun() | |
| except Exception as e: | |
| st.error(f"Error saving document: {str(e)}") | |
| with col_delete: | |
| if st.button("ποΈ Delete", key=f'delete_button_{st.session_state.current_index}'): | |
| try: | |
| current_doc = json.loads(doc_str) | |
| # Direct deletion using container method with id and partition key | |
| delete = container.delete_item(current_doc["id"], current_doc["id"]) | |
| if delete: | |
| st.success(f"Document {current_doc['id']} deleted successfully.") | |
| if st.session_state.current_index > 0: | |
| st.session_state.current_index -= 1 | |
| st.rerun() | |
| except Exception as e: | |
| st.error(f"Error deleting document: {str(e)}") | |
| elif selected_view == 'Show as Run AI': | |
| Label = '#### βοΈ Run AI with wisdom, save with precision' | |
| st.markdown(Label) | |
| num_cols = len(documents_to_display) | |
| cols = st.columns(num_cols) | |
| for idx, (col, doc) in enumerate(zip(cols, documents_to_display)): | |
| with col: | |
| # st.markdown(f"##### Document ID: {doc.get('id', '')}") | |
| editable_id = st.text_input("ID", value=doc.get('id', ''), key=f'edit_id_{idx}') | |
| editable_doc = doc.copy() | |
| editable_doc.pop('id', None) | |
| # st.markdown(f"##### Document Name: {doc.get('name', '')}") | |
| editable_id = st.text_input("Name", value=doc.get('name', ''), key=f'edit_name_{idx}') | |
| editable_doc = doc.copy() | |
| editable_doc.pop('name', None) | |
| doc_str = st.text_area("Document Content (in JSON format)", | |
| value=json.dumps(editable_doc, indent=2), | |
| height=300, | |
| key=f'doc_str_{idx}') | |
| # πΎπ€ Save and AI operations | |
| col_ai, col_delete = st.columns(2) | |
| with col_ai: | |
| if st.button("π€ Run AI", key=f'run_with_ai_button_{idx}'): | |
| # Experiment to get content fields π€π€π€π€π€π€π€π€π€π€π€π€π€ | |
| total_docs = len(documents) | |
| doc = documents[st.session_state.current_index] | |
# Accumulator filled in place by extract_values2 with every string value
# that contains at least one space character.
values_with_space = []

def extract_values2(obj):
    """Recursively collect space-containing strings from ``obj``.

    Walks dicts (values only; keys are never inspected) and lists to any
    depth. Each ``str`` leaf containing a space is appended to the
    enclosing ``values_with_space`` list in traversal order. Leaves of any
    other type (numbers, ``None``, tuples, ...) are ignored.

    Args:
        obj: A JSON-like value: dict, list, str, or any other scalar.

    Returns:
        None. Results are accumulated in ``values_with_space``.
    """
    if isinstance(obj, dict):
        # Only the values are needed, so iterate the values view directly
        # instead of unpacking unused keys from .items().
        for value in obj.values():
            extract_values2(value)
    elif isinstance(obj, list):
        for item in obj:
            extract_values2(item)
    elif isinstance(obj, str) and ' ' in obj:
        values_with_space.append(obj)
| extract_values2(doc) | |
| #st.markdown("#### π Links for Extracted Texts") | |
| for term in values_with_space: | |
| display_glossary_entity(term) | |
| search_glossary(term) | |
| #content = json.dumps(doc, indent=2) | |
| #st.markdown(f"```json\n{content}\n```") | |
| # Experiment to get content fields π€π€π€π€π€π€π€π€π€π€π€π€π€ | |
| #search_glossary(json.dumps(editable_doc, indent=2)) | |
| elif selected_view == 'Clone Document': | |
| st.markdown("#### Clone a document:") | |
| for idx, doc in enumerate(documents_to_display): | |
| st.markdown(f"##### Original Document ID: {doc.get('id', '')}") | |
| if st.button("π Clone Document", key=f'clone_button_{idx}'): | |
| # Create new document with unique ID and name | |
| new_doc = { | |
| 'id': str(uuid.uuid4()), | |
| 'name': f"Clone_{str(uuid.uuid4())[:8]}", | |
| **{k: v for k, v in doc.items() if k not in ['id', 'name', '_rid', '_self', '_etag', '_attachments', '_ts']} | |
| } | |
| # Show editable preview | |
| edited_doc = st.text_area( | |
| "Edit cloned document:", | |
| value=json.dumps(new_doc, indent=2), | |
| height=300, | |
| key=f'edit_clone_{idx}' | |
| ) | |
| if st.button("πΎ Save Clone", key=f'save_clone_{idx}'): | |
| try: | |
| final_doc = json.loads(edited_doc) | |
| # Use container.create_item() instead of update_record() | |
| response = container.create_item(body=final_doc) | |
| if response: | |
| st.success(f"New cloned document created with ID: {final_doc['id']}") | |
| st.rerun() | |
| else: | |
| st.error("Failed to create new document") | |
| except Exception as e: | |
| st.error(f"Error creating document: {str(e)}") | |
| elif selected_view == 'New Record': | |
| st.markdown("#### Create a new document:") | |
| if st.button("π€ Insert Auto-Generated Record"): | |
| auto_doc = { | |
| "id": generate_unique_id(), | |
| "name": f"Auto-generated Record {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", | |
| "content": "This is an auto-generated record.", | |
| "timestamp": datetime.now().isoformat() | |
| } | |
| success, message = save_or_clone_to_cosmos_db(container, document=auto_doc) | |
| if success: | |
| st.success(message) | |
| st.rerun() | |
| else: | |
| st.error(message) | |
| else: | |
| new_id = st.text_input("ID", value=generate_unique_id(), key='new_id') | |
| default_doc = { | |
| "id": new_id, | |
| "name": "New Document", | |
| "content": "", | |
| "timestamp": datetime.now().isoformat() | |
| } | |
| new_doc_str = st.text_area("Document Content (in JSON format)", | |
| value=json.dumps(default_doc, indent=2), | |
| height=300) | |
| if st.button("β Create New Document"): | |
| try: | |
| # Preprocess the text before loading it into JSON | |
| cleaned_doc_str = preprocess_text(new_doc_str) | |
| new_doc = json.loads(cleaned_doc_str) | |
| new_doc['id'] = new_id # Ensure ID matches input field | |
| success, message = insert_record(container, new_doc) | |
| if success: | |
| st.success(f"New document created with id: {new_doc['id']} π") | |
| st.session_state.selected_document_id = new_doc['id'] | |
| st.rerun() | |
| else: | |
| st.error(message) | |
| except json.JSONDecodeError as e: | |
| st.error(f"Invalid JSON: {str(e)} π«") | |
| st.subheader(f"π Container: {st.session_state.selected_container}") | |
| if st.session_state.selected_container: | |
| if documents_to_display: | |
| Label = '#### π Data display - Data tells tales that words cannot' | |
| st.markdown(Label) | |
| df = pd.DataFrame(documents_to_display) | |
| st.dataframe(df) | |
| else: | |
| st.info("No documents to display. π§") | |
| Label = '#### π GitHub integration - Git happens' | |
| st.subheader("π GitHub Operations") | |
| github_token = os.environ.get("GITHUB") | |
| source_repo = st.text_input("Source GitHub Repository URL", | |
| value="https://github.com/AaronCWacker/AIExamples-8-24-Streamlit") | |
| new_repo_name = st.text_input("New Repository Name (for cloning)", | |
| value=f"AIExample-Clone-{datetime.now().strftime('%Y%m%d_%H%M%S')}") | |
| col1, col2 = st.columns(2) | |
| with col1: | |
| if st.button("π₯ Clone Repository"): | |
| if github_token and source_repo: | |
| st.markdown(Label) | |
| try: | |
| local_path = f"./temp_repo_{datetime.now().strftime('%Y%m%d%H%M%S')}" | |
| download_github_repo(source_repo, local_path) | |
| zip_filename = f"{new_repo_name}.zip" | |
| create_zip_file(local_path, zip_filename[:-4]) | |
| st.markdown(get_download_link(zip_filename), unsafe_allow_html=True) | |
| st.success("Repository cloned successfully! π") | |
| except Exception as e: | |
| st.error(f"An error occurred: {str(e)} π’") | |
| finally: | |
| if os.path.exists(local_path): | |
| shutil.rmtree(local_path) | |
| if os.path.exists(zip_filename): | |
| os.remove(zip_filename) | |
| else: | |
| st.error("Please ensure GitHub token is set in environment variables and source repository URL is provided. πβ") | |
| with col2: | |
| if st.button("π€ Push to New Repository"): | |
| if github_token and source_repo: | |
| st.markdown(Label) | |
| try: | |
| g = Github(github_token) | |
| new_repo = create_repo(g, new_repo_name) | |
| local_path = f"./temp_repo_{datetime.now().strftime('%Y%m%d%H%M%S')}" | |
| download_github_repo(source_repo, local_path) | |
| push_to_github(local_path, new_repo, github_token) | |
| st.success(f"Repository pushed successfully to {new_repo.html_url} π") | |
| except Exception as e: | |
| st.error(f"An error occurred: {str(e)} π’") | |
| finally: | |
| if os.path.exists(local_path): | |
| shutil.rmtree(local_path) | |
| else: | |
| st.error("Please ensure GitHub token is set in environment variables and source repository URL is provided. πβ") | |
| st.subheader("π¬ Chat with Claude") | |
| user_input = st.text_area("Message π¨:", height=100) | |
| if st.button("Send π¨"): | |
| Label = '#### π¬ Chat functionality - Every chat is a chance to learn' | |
| st.markdown(Label) | |
| if user_input: | |
| response = client.messages.create( | |
| model="claude-3-sonnet-20240229", | |
| max_tokens=1000, | |
| messages=[ | |
| {"role": "user", "content": user_input} | |
| ] | |
| ) | |
| st.write("Claude's reply π§ :") | |
| st.write(response.content[0].text) | |
| filename = generate_filename(user_input, "md") | |
| create_file(filename, user_input, response.content[0].text) | |
| st.session_state.chat_history.append({"user": user_input, "claude": response.content[0].text}) | |
| # Save to Cosmos DB | |
| save_to_cosmos_db(container, user_input, response.content[0].text, "") | |
| # π Chat history display - "History repeats itself, first as chat, then as wisdom" | |
| st.subheader("Past Conversations π") | |
| for chat in st.session_state.chat_history: | |
| st.text_area("You said π¬:", chat["user"], height=100, disabled=True) | |
| st.text_area("Claude replied π€:", chat["claude"], height=200, disabled=True) | |
| st.markdown("---") | |
| # π File editor - "Edit with care, save with flair" | |
| if hasattr(st.session_state, 'current_file'): | |
| st.subheader(f"Editing: {st.session_state.current_file} π ") | |
| new_content = st.text_area("File Content βοΈ:", st.session_state.file_content, height=300) | |
| # Preprocess the text before loading it into JSON - Added to protect copy paste into JSON to keep format. | |
| cleaned_doc_str = preprocess_text(new_content) | |
| new_doc = json.loads(cleaned_doc_str) | |
| new_content = cleaned_doc_str | |
| if st.button("Save Changes πΎ"): | |
| with open(st.session_state.current_file, 'w', encoding='utf-8') as file: | |
| file.write(new_content) | |
| st.success("File updated successfully! π") | |
| # π File management - "Manage many, maintain order" | |
| st.sidebar.title("π File Management") | |
| all_files = glob.glob("*.md") | |
| all_files.sort(reverse=True) | |
| if st.sidebar.button("π Delete All Files"): | |
| for file in all_files: | |
| os.remove(file) | |
| st.rerun() | |
| if st.sidebar.button("β¬οΈ Download All Files"): | |
| zip_file = create_zip_of_files(all_files) | |
| st.sidebar.markdown(get_download_link(zip_file), unsafe_allow_html=True) | |
| for file in all_files: | |
| col1, col2, col3, col4 = st.sidebar.columns([1,3,1,1]) | |
| with col1: | |
| if st.button("π", key="view_"+file): | |
| st.session_state.current_file = file | |
| st.session_state.file_content = load_file(file) | |
| with col2: | |
| st.markdown(get_download_link(file), unsafe_allow_html=True) | |
| with col3: | |
| if st.button("π", key="edit_"+file): | |
| st.session_state.current_file = file | |
| st.session_state.file_content = load_file(file) | |
| with col4: | |
| if st.button("π", key="delete_"+file): | |
| os.remove(file) | |
| st.rerun() | |
| except exceptions.CosmosHttpResponseError as e: | |
| st.error(f"Failed to connect to Cosmos DB. HTTP error: {str(e)} π¨") | |
| except Exception as e: | |
| st.error(f"An unexpected error occurred: {str(e)} π±") | |
| if st.session_state.logged_in and st.sidebar.button("πͺ Logout"): | |
| Label = '#### πͺ Logout - All good things must come to an end' | |
| st.markdown(Label) | |
| st.session_state.logged_in = False | |
| st.session_state.selected_records.clear() | |
| st.session_state.client = None | |
| st.session_state.selected_database = None | |
| st.session_state.selected_container = None | |
| st.session_state.selected_document_id = None | |
| st.session_state.current_index = 0 | |
| st.rerun() | |
# Script entry point: call main() only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()