Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| from azure.cosmos import CosmosClient, exceptions | |
| import os | |
| import pandas as pd | |
| import json | |
| from datetime import datetime | |
| import shutil | |
| from github import Github | |
| from git import Repo | |
| import base64 | |
| # Cosmos DB configuration | |
| ENDPOINT = "https://acae-afd.documents.azure.com:443/" | |
| SUBSCRIPTION_ID = "003fba60-5b3f-48f4-ab36-3ed11bc40816" | |
| DATABASE_NAME = os.environ.get("COSMOS_DATABASE_NAME") | |
| Key = os.environ.get("Key") | |
| # GitHub configuration | |
| def download_github_repo(url, local_path): | |
| if os.path.exists(local_path): | |
| shutil.rmtree(local_path) | |
| Repo.clone_from(url, local_path) | |
| def create_zip_file(source_dir, output_filename): | |
| shutil.make_archive(output_filename, 'zip', source_dir) | |
| def create_repo(g, repo_name): | |
| user = g.get_user() | |
| return user.create_repo(repo_name) | |
| def push_to_github(local_path, repo, github_token): | |
| repo_url = f"https://{github_token}@github.com/{repo.full_name}.git" | |
| local_repo = Repo(local_path) | |
| if 'origin' in [remote.name for remote in local_repo.remotes]: | |
| origin = local_repo.remote('origin') | |
| origin.set_url(repo_url) | |
| else: | |
| origin = local_repo.create_remote('origin', repo_url) | |
| if not local_repo.heads: | |
| local_repo.git.checkout('-b', 'main') | |
| current_branch = 'main' | |
| else: | |
| current_branch = local_repo.active_branch.name | |
| local_repo.git.add(A=True) | |
| if local_repo.is_dirty(): | |
| local_repo.git.commit('-m', 'Initial commit') | |
| origin.push(refspec=f'{current_branch}:{current_branch}') | |
| def get_base64_download_link(file_path, file_name): | |
| with open(file_path, "rb") as file: | |
| contents = file.read() | |
| base64_encoded = base64.b64encode(contents).decode() | |
| return f'<a href="data:application/zip;base64,{base64_encoded}" download="{file_name}">Download {file_name}</a>' | |
| # Function to fetch all databases and containers | |
| def fetch_db_structure(client): | |
| structure = {} | |
| for database in client.list_databases(): | |
| db_name = database['id'] | |
| structure[db_name] = [] | |
| db_client = client.get_database_client(db_name) | |
| for container in db_client.list_containers(): | |
| structure[db_name].append(container['id']) | |
| return structure | |
| # Function to fetch items from a container | |
| def fetch_items(client, db_name, container_name): | |
| container = client.get_database_client(db_name).get_container_client(container_name) | |
| items = list(container.read_all_items()) | |
| return items | |
| # Function to render sidebar | |
| def render_sidebar(client, structure): | |
| st.sidebar.title("π Cosmos DB Manager") | |
| # Database and Container selection | |
| selected_db = st.sidebar.selectbox("ποΈ Select Database", list(structure.keys())) | |
| selected_container = st.sidebar.selectbox("π¦ Select Container", structure[selected_db]) | |
| # Fetch items for the selected container | |
| items = fetch_items(client, selected_db, selected_container) | |
| # Item selection | |
| item_ids = [item['id'] for item in items] | |
| selected_item = st.sidebar.selectbox("π Select Item", ["Create New Item"] + item_ids) | |
| # CRUD buttons | |
| if selected_item != "Create New Item": | |
| if st.sidebar.button("ποΈ Edit Item"): | |
| st.session_state.action = "edit" | |
| st.session_state.selected_item = selected_item | |
| if st.sidebar.button("ποΈ Delete Item"): | |
| st.session_state.action = "delete" | |
| st.session_state.selected_item = selected_item | |
| else: | |
| if st.sidebar.button("β Create New Item"): | |
| st.session_state.action = "create" | |
| # GitHub section | |
| st.sidebar.subheader("π GitHub Operations") | |
| github_token = os.environ.get("GITHUB") | |
| source_repo = st.sidebar.text_input("π Source Repo URL", value="https://github.com/AaronCWacker/AIExamples-8-24-Streamlit") | |
| new_repo_name = st.sidebar.text_input("π New Repo Name", value=f"AIExample-Clone-{datetime.now().strftime('%Y%m%d_%H%M%S')}") | |
| if st.sidebar.button("π₯ Clone Repository"): | |
| clone_github_repo(github_token, source_repo, new_repo_name) | |
| if st.sidebar.button("π€ Push to New Repository"): | |
| push_to_new_repo(github_token, source_repo, new_repo_name) | |
| # Archive data button | |
| if st.sidebar.button("π¦ Archive All Data"): | |
| download_link = archive_all_data(client) | |
| st.sidebar.markdown(download_link, unsafe_allow_html=True) | |
| # Logout button | |
| if st.sidebar.button("πͺ Logout"): | |
| st.session_state.logged_in = False | |
| st.session_state.action = None | |
| st.session_state.selected_item = None | |
| st.rerun() | |
| return selected_db, selected_container, selected_item | |
| # GitHub operations | |
| def clone_github_repo(github_token, source_repo, new_repo_name): | |
| if github_token and source_repo: | |
| try: | |
| local_path = f"./temp_repo_{datetime.now().strftime('%Y%m%d_%H%M%S')}" | |
| download_github_repo(source_repo, local_path) | |
| zip_filename = f"{new_repo_name}.zip" | |
| create_zip_file(local_path, zip_filename[:-4]) | |
| st.sidebar.markdown(get_base64_download_link(zip_filename, zip_filename), unsafe_allow_html=True) | |
| st.sidebar.success("Repository cloned successfully!") | |
| except Exception as e: | |
| st.sidebar.error(f"An error occurred: {str(e)}") | |
| finally: | |
| if os.path.exists(local_path): | |
| shutil.rmtree(local_path) | |
| if os.path.exists(zip_filename): | |
| os.remove(zip_filename) | |
| else: | |
| st.sidebar.error("Please ensure GitHub token is set in environment variables and source repository URL is provided.") | |
| def push_to_new_repo(github_token, source_repo, new_repo_name): | |
| if github_token and source_repo: | |
| try: | |
| g = Github(github_token) | |
| new_repo = create_repo(g, new_repo_name) | |
| local_path = f"./temp_repo_{datetime.now().strftime('%Y%m%d_%H%M%S')}" | |
| download_github_repo(source_repo, local_path) | |
| push_to_github(local_path, new_repo, github_token) | |
| st.sidebar.success(f"Repository pushed successfully to {new_repo.html_url}") | |
| except Exception as e: | |
| st.sidebar.error(f"An error occurred: {str(e)}") | |
| finally: | |
| if os.path.exists(local_path): | |
| shutil.rmtree(local_path) | |
| else: | |
| st.sidebar.error("Please ensure GitHub token is set in environment variables and source repository URL is provided.") | |
| # Function to archive all data | |
| def archive_all_data(client): | |
| try: | |
| base_dir = "./cosmos_archive" | |
| if os.path.exists(base_dir): | |
| shutil.rmtree(base_dir) | |
| os.makedirs(base_dir) | |
| for database in client.list_databases(): | |
| db_name = database['id'] | |
| db_dir = os.path.join(base_dir, db_name) | |
| os.makedirs(db_dir) | |
| db_client = client.get_database_client(db_name) | |
| for container in db_client.list_containers(): | |
| container_name = container['id'] | |
| container_dir = os.path.join(db_dir, container_name) | |
| os.makedirs(container_dir) | |
| container_client = db_client.get_container_client(container_name) | |
| items = list(container_client.read_all_items()) | |
| with open(os.path.join(container_dir, f"{container_name}.json"), 'w') as f: | |
| json.dump(items, f, indent=2) | |
| archive_name = f"cosmos_archive_{datetime.now().strftime('%Y%m%d_%H%M%S')}" | |
| shutil.make_archive(archive_name, 'zip', base_dir) | |
| return get_base64_download_link(f"{archive_name}.zip", f"{archive_name}.zip") | |
| except Exception as e: | |
| return f"An error occurred while archiving data: {str(e)}" | |
| # New function to convert text to JSON | |
| def text_to_json(text): | |
| lines = text.split('\n') | |
| json_data = {} | |
| for line in lines: | |
| if ':' in line: | |
| key, value = line.split(':', 1) | |
| json_data[key.strip()] = value.strip() | |
| else: | |
| # If there's no colon, use the whole line as a key with an empty string as value | |
| json_data[line.strip()] = "" | |
| return json_data | |
| # Cosmos DB operations | |
| def create_item(client, db_name, container_name): | |
| st.subheader("Create New Item") | |
| item_id = st.text_input("Item ID") | |
| item_data = st.text_area("Item Data (Text format)", | |
| value="π Cosmos DB and GitHub Integration\nCreate New Item\nItem ID\nItem Data (Text format)\nπΎ Save New Item") | |
| if st.button("πΎ Save New Item"): | |
| try: | |
| container = client.get_database_client(db_name).get_container_client(container_name) | |
| json_data = text_to_json(item_data) | |
| new_item = {"id": item_id, "content": json_data} | |
| container.create_item(body=new_item) | |
| st.success("New item created successfully!") | |
| st.json(new_item) # Display the created item in JSON format | |
| st.rerun() | |
| except Exception as e: | |
| st.error(f"Error creating item: {str(e)}") | |
| def edit_item(client, db_name, container_name, item_id): | |
| container = client.get_database_client(db_name).get_container_client(container_name) | |
| item = container.read_item(item=item_id, partition_key=item_id) | |
| st.subheader(f"Editing Item: {item_id}") | |
| edited_item = {} | |
| for key, value in item.items(): | |
| if key not in ['_rid', '_self', '_etag', '_attachments', '_ts']: | |
| if key == 'content': | |
| # Convert the JSON content back to text for editing | |
| text_content = "\n".join([f"{k}: {v}" for k, v in value.items()]) | |
| edited_text = st.text_area(key, text_content) | |
| edited_item[key] = text_to_json(edited_text) | |
| else: | |
| edited_item[key] = st.text_input(key, value) | |
| if st.button("πΎ Save Changes"): | |
| try: | |
| container.upsert_item(body=edited_item) | |
| st.success("Item updated successfully!") | |
| st.json(edited_item) # Display the updated item in JSON format | |
| st.rerun() | |
| except Exception as e: | |
| st.error(f"Error updating item: {str(e)}") | |
| def delete_item(client, db_name, container_name, item_id): | |
| st.subheader(f"Delete Item: {item_id}") | |
| st.write("Are you sure you want to delete this item?") | |
| if st.button("ποΈ Confirm Delete"): | |
| try: | |
| container = client.get_database_client(db_name).get_container_client(container_name) | |
| container.delete_item(item=item_id, partition_key=item_id) | |
| st.success(f"Item {item_id} deleted successfully!") | |
| st.rerun() | |
| except Exception as e: | |
| st.error(f"Error deleting item: {str(e)}") | |
| # Main Streamlit app | |
| def main(): | |
| st.set_page_config(layout="wide") | |
| st.title("π Cosmos DB and GitHub Integration") | |
| # Initialize session state | |
| if 'logged_in' not in st.session_state: | |
| st.session_state.logged_in = False | |
| if 'action' not in st.session_state: | |
| st.session_state.action = None | |
| if 'selected_item' not in st.session_state: | |
| st.session_state.selected_item = None | |
| # Login section | |
| if not st.session_state.logged_in: | |
| st.subheader("π Login") | |
| input_key = Key # Get the key from the environment variable | |
| if st.button("π Login"): | |
| if input_key: | |
| st.session_state.primary_key = input_key | |
| st.session_state.logged_in = True | |
| st.rerun() | |
| else: | |
| st.error("Invalid key. Please check your environment variables.") | |
| else: | |
| # Initialize Cosmos DB client | |
| try: | |
| client = CosmosClient(ENDPOINT, credential=st.session_state.primary_key) | |
| # Fetch database structure | |
| structure = fetch_db_structure(client) | |
| # Render sidebar and get selections | |
| selected_db, selected_container, selected_item = render_sidebar(client, structure) | |
| # Main area | |
| if st.session_state.action == "create": | |
| create_item(client, selected_db, selected_container) | |
| elif st.session_state.action == "edit": | |
| edit_item(client, selected_db, selected_container, st.session_state.selected_item) | |
| elif st.session_state.action == "delete": | |
| delete_item(client, selected_db, selected_container, st.session_state.selected_item) | |
| else: | |
| st.write("Select an action from the sidebar to get started.") | |
| except exceptions.CosmosHttpResponseError as e: | |
| st.error(f"Failed to connect to Cosmos DB. HTTP error: {str(e)}. Status code: {e.status_code}") | |
| except Exception as e: | |
| st.error(f"An unexpected error occurred: {str(e)}") | |
| if __name__ == "__main__": | |
| main() |