# Spaces: Sleeping (page-status text captured together with the source; not part of the program)
import json

import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import streamlit as st

from mcda_v4 import UtilityCalculator
# Page configuration: browser-tab title, tab icon and full-width layout,
# applied at import time before main() draws any content.
st.set_page_config(page_title="MCDA Utility Calculator", page_icon="βοΈ", layout="wide")
def main():
    """Draw the page title, tagline and collapsible help panel, then start manual entry."""
    st.title("βοΈ Multi-Criteria Decision Analysis (MCDA) Calculator")
    st.markdown("Compare products and alternatives using weighted criteria analysis")

    # Help text for the "About" panel (no Excel reference any more).
    about_text = """
    **What is MCDA?**
    Multi-Criteria Decision Analysis helps you make objective decisions when comparing
    products, services, or alternatives across multiple criteria.
    **How to use:**
    1. Define your evaluation categories (Performance, Cost, Quality, etc.)
    2. Add the products/options you want to compare
    3. Adjust category weights based on importance
    4. View rankings and detailed analysis
    **Features:**
    β’ Interactive data entry with real-time editing
    β’ Multiple aggregation methods (Weighted Sum, Geometric Mean, Penalty System)
    β’ Export results as CSV or JSON
    β’ Professional visualizations and analysis
    """
    with st.expander("π About This Tool"):
        st.write(about_text)

    # Manual entry is the only input mode, so hand over to it directly.
    manual_interface()
def _align_products_with_categories(products, categories):
    """Rebuild each product with exactly `categories` as score keys (missing ones get 0.0)."""
    return [
        {'name': product['name'], **{cat: product.get(cat, 0.0) for cat in categories}}
        for product in products
    ]


def manual_interface():
    """Drive the manual workflow: categories, product entry, weights and results.

    Reads the category list and per-category optimisation direction from the
    page, keeps the products stored in ``st.session_state.products`` aligned
    with the current categories, builds a ``UtilityCalculator`` and, once
    products exist, renders the weight controls and the results.

    Returns early (after showing a message) when fewer than two distinct
    categories are entered or when the calculator cannot be created.
    """
    st.header("βοΈ Manual Data Entry")

    # Step 1: Define categories
    st.subheader("1. Define Categories")
    col1, col2 = st.columns(2)
    with col1:
        categories_input = st.text_area(
            "Enter categories (one per line):",
            value="Performance\nCost\nReliability",
            height=100,
            help="Enter each category on a new line"
        )
        entered = [line.strip() for line in categories_input.split('\n') if line.strip()]
        # Repeated names would create several identical widgets further down
        # (same label, same arguments), which Streamlit rejects. Keep the first
        # occurrence of each name, preserving the order typed by the user.
        categories = list(dict.fromkeys(entered))
        if len(categories) < len(entered):
            st.caption("Duplicate category names were ignored.")
    with col2:
        st.write("**Optimization Direction**")
        maximize = {}
        for cat in categories:
            # Everything is maximised by default except a category named "cost".
            maximize[cat] = st.checkbox(f"Maximize {cat}", value=(cat.lower() != 'cost'))

    if len(categories) < 2:
        st.warning("β οΈ Please enter at least 2 categories to continue.")
        return

    # Auto-update products when categories change
    if 'previous_categories' not in st.session_state:
        st.session_state.previous_categories = categories
    if st.session_state.previous_categories != categories:
        existing_products = st.session_state.get('products')
        if existing_products:
            st.info("π Categories changed. Auto-updating existing products...")
            # Keep scores of surviving categories, drop removed ones, default new ones to 0.
            updated_products = _align_products_with_categories(existing_products, categories)
            st.session_state.products = updated_products
            st.success(f"β Updated {len(updated_products)} products for new categories. New categories set to 0.")
        st.session_state.previous_categories = categories

    # Step 2: Enter product data
    st.subheader("2. Enter Product Data")
    try:
        calc = UtilityCalculator(categories, maximize)
    except Exception as e:
        st.error(f"β Error creating calculator: {e}")
        return

    products_data = data_entry_interface(categories)
    if not products_data:
        return

    # UI boundary: any failure while loading products, configuring weights or
    # rendering results is reported on the page instead of aborting the script.
    try:
        calc.add_products_batch(products_data)
        adjust_weights(calc)
        display_results(calc)
    except Exception as e:
        st.error(f"β Error adding products: {e}")
        st.info("Try editing the products to ensure all categories have values.")
def _render_add_product_form(categories):
    """Draw the "Add New Product" form and append a valid submission to session state."""
    products = st.session_state.products
    with st.expander("β Add New Product", expanded=len(products) == 0):
        with st.form("add_product"):
            col1, col2 = st.columns([1, 2])
            with col1:
                product_name = st.text_input("Product Name", placeholder="e.g., Product A")
            with col2:
                scores = {}
                cols = st.columns(len(categories))
                for column, cat in zip(cols, categories):
                    with column:
                        scores[cat] = st.number_input(f"{cat}", value=0.0, step=1.0)
            submitted = st.form_submit_button("Add Product")

        if not submitted:
            return
        cleaned_name = product_name.strip()
        if not cleaned_name:
            st.warning("Please enter a product name.")
        elif any(p['name'] == cleaned_name for p in products):
            st.error(f"β Product '{cleaned_name}' already exists. Please use a different name.")
        else:
            products.append({'name': cleaned_name, **scores})
            st.success(f"β Added {cleaned_name}")
            st.rerun()


def _clean_editor_records(edited_df):
    """Turn the edited table into product dicts, skipping unnamed rows and zero-filling blanks."""
    cleaned = []
    for row in edited_df.to_dict('records'):
        name = row.get('name')
        # Rows freshly added in the editor have no name yet; they are not products.
        if pd.isna(name) or not str(name).strip():
            continue
        product = {'name': str(name).strip()}
        for column, value in row.items():
            if column != 'name':
                # Blank cells come back as NaN/None; store 0.0 like other defaults.
                product[column] = 0.0 if pd.isna(value) else value
        cleaned.append(product)
    return cleaned


def _render_edit_form(selected_product, categories):
    """Draw the edit form for one product and apply a validated save or a cancel."""
    products = st.session_state.products
    edit_mode = st.session_state.edit_mode
    product_to_edit = next(p for p in products if p['name'] == selected_product)

    st.write(f"**Editing: {selected_product}**")
    with st.form(f"edit_product_{selected_product}"):
        col1, col2 = st.columns([1, 2])
        with col1:
            new_name = st.text_input("Product Name", value=selected_product)
        with col2:
            new_scores = {}
            cols = st.columns(len(categories))
            for column, cat in zip(cols, categories):
                with column:
                    new_scores[cat] = st.number_input(
                        f"{cat}",
                        value=float(product_to_edit.get(cat, 0.0)),
                        step=1.0,
                        key=f"edit_{cat}_{selected_product}"
                    )
        col_save, col_cancel = st.columns(2)
        with col_save:
            save_changes = st.form_submit_button("πΎ Save Changes", type="primary")
        with col_cancel:
            cancel_edit = st.form_submit_button("β Cancel")

    if cancel_edit:
        edit_mode[selected_product] = False
        st.rerun()
    if not save_changes:
        return

    cleaned_name = new_name.strip()
    other_names = {p['name'] for p in products if p['name'] != selected_product}
    if not cleaned_name:
        st.error("Product name cannot be empty.")
    elif cleaned_name in other_names:
        # Same uniqueness rule as the add form, so a rename cannot create duplicates.
        st.error(f"β Product '{cleaned_name}' already exists. Please use a different name.")
    else:
        for index, product in enumerate(products):
            if product['name'] == selected_product:
                products[index] = {'name': cleaned_name, **new_scores}
                break
        edit_mode[selected_product] = False
        st.success(f"β Updated product: {cleaned_name}")
        st.rerun()


def _render_product_manager(categories):
    """Draw the select / edit / delete controls for a single product."""
    if 'edit_mode' not in st.session_state:
        st.session_state.edit_mode = {}
    edit_mode = st.session_state.edit_mode
    products = st.session_state.products

    st.write("**Manage Individual Products:**")
    col1, col2, col3 = st.columns([2, 1, 1])
    with col1:
        selected_product = st.selectbox(
            "Select product to manage:",
            options=[p['name'] for p in products],
            key="product_selector"
        )
    with col2:
        if st.button("βοΈ Edit Selected", key="edit_button"):
            edit_mode[selected_product] = True
            st.rerun()
    with col3:
        if st.button("ποΈ Delete Selected", key="delete_button", type="secondary"):
            st.session_state.products = [p for p in products if p['name'] != selected_product]
            # Forget the edit flag so a later product with the same name does not
            # reopen in edit mode.
            edit_mode.pop(selected_product, None)
            st.success(f"β Deleted {selected_product}")
            st.rerun()

    if edit_mode.get(selected_product):
        _render_edit_form(selected_product, categories)


def _render_bulk_operations():
    """Draw the "clear all" button and the JSON export of the current products."""
    st.write("**Bulk Operations:**")
    col1, col2 = st.columns(2)
    with col1:
        if st.button("ποΈ Clear All Products", type="secondary"):
            st.session_state.products = []
            if 'edit_mode' in st.session_state:
                st.session_state.edit_mode = {}
            st.success("β Cleared all products")
            st.rerun()
    with col2:
        # Export current products to JSON for backup
        products_json = json.dumps(st.session_state.products, indent=2)
        st.download_button(
            label="π₯ Export Products (JSON)",
            data=products_json,
            file_name="products_backup.json",
            mime="application/json"
        )


def data_entry_interface(categories):
    """Create the data entry interface for products.

    Products live in ``st.session_state.products`` as dicts with a ``'name'``
    key plus one score per category. The function draws the add form, an
    editable table, per-product edit/delete controls and bulk operations.

    Args:
        categories: Category names; one numeric input is drawn per category.

    Returns:
        The list of product dicts currently stored in session state, or an
        empty list when there are no products (including when the editable
        table has just been emptied).
    """
    if 'products' not in st.session_state:
        st.session_state.products = []

    _render_add_product_form(categories)

    if not st.session_state.products:
        return []

    st.write("**Current Products:**")
    st.write("*You can edit values directly in the table below:*")
    edited_df = st.data_editor(
        pd.DataFrame(st.session_state.products),
        use_container_width=True,
        num_rows="dynamic",  # lets the user add and delete rows in the table
        key="products_editor"
    )
    st.session_state.products = _clean_editor_records(edited_df)

    # The table may have just removed every product; without this guard the
    # controls below would run with no product selected.
    if not st.session_state.products:
        return []

    _render_product_manager(categories)
    _render_bulk_operations()
    return st.session_state.products
def adjust_weights(calc):
    """Create the weight, aggregation-method and penalty configuration controls.

    Reads ``calc.categories``, ``calc.weights`` and ``calc.maximize`` and
    writes the user's choices back through ``calc.set_weights``,
    ``calc.set_aggregation_method``, ``calc.set_thresholds`` and
    ``calc.set_objectives``.

    Args:
        calc: The calculator to configure; it is modified in place.
    """
    st.subheader("3. Adjust Category Weights")
    col1, col2 = st.columns([2, 1])
    with col1:
        st.write("**Adjust the importance of each category:**")
        new_weights = {}
        for cat in calc.categories:
            current = float(calc.weights[cat])
            new_weights[cat] = st.slider(
                f"{cat.title()}",
                min_value=0.0,
                max_value=1.0,
                # Keep the default inside the slider range and of the same
                # numeric type as the bounds.
                value=min(max(current, 0.0), 1.0),
                step=0.05,
                help=f"Current weight: {current:.2f}",
                # Labels are title-cased, so "cost" and "Cost" would otherwise
                # yield two identical widgets; the key keeps them distinct.
                key=f"weight_{cat}"
            )
        # Normalize weights to sum to 1
        total_weight = sum(new_weights.values())
        if total_weight > 0:
            calc.set_weights({cat: weight / total_weight for cat, weight in new_weights.items()})
        else:
            # Nothing to normalise against; say so instead of silently ignoring the sliders.
            st.warning("All weights are 0, so the calculator's existing weights are kept.")
    with col2:
        st.write("**Current Weights:**")
        weight_df = pd.DataFrame({
            'Category': calc.categories,
            'Weight': [f"{calc.weights[cat]:.2f}" for cat in calc.categories]
        })
        st.dataframe(weight_df, use_container_width=True)

    # Aggregation method selection
    st.write("**Aggregation Method:**")
    method_labels = {
        'weighted_sum': 'Weighted Sum (No Penalty)',
        'geometric_mean': 'Geometric Mean (Penalty for Poor Performance)',
        'threshold_penalty': 'Threshold/Objective Penalty System'
    }
    aggregation_method = st.radio(
        "Choose how to combine category scores:",
        options=['weighted_sum', 'geometric_mean', 'threshold_penalty'],
        format_func=method_labels.get,
        help="Weighted Sum: Full compensation between criteria. Geometric Mean: Penalizes poor performance. Threshold/Objective: Three-zone penalty system with elimination below thresholds."
    )
    calc.set_aggregation_method(aggregation_method)

    if aggregation_method == 'threshold_penalty':
        st.write("**Configure Thresholds and Objectives:**")
        st.info("π Set minimum acceptable values (thresholds) and target values (objectives) for each category.")
        # For minimised categories lower is better, so the acceptable limit sits
        # ABOVE the target; defaults and help text follow the direction.
        # NOTE(review): assumes UtilityCalculator compares raw values
        # direction-aware (threshold > objective when minimising) - confirm.
        col1, col2 = st.columns(2)
        with col1:
            st.write("**Thresholds (Minimum Acceptable):**")
            thresholds = {}
            for cat in calc.categories:
                if calc.maximize[cat]:
                    default = 50.0
                    hint = f"Minimum acceptable value for {cat} (maximize). Below this = elimination."
                else:
                    default = 80.0
                    hint = f"Maximum acceptable value for {cat} (minimize). Above this = elimination."
                thresholds[cat] = st.number_input(
                    f"{cat.title()} threshold",
                    value=default,
                    step=1.0,
                    help=hint,
                    key=f"threshold_{cat}"
                )
        with col2:
            st.write("**Objectives (Target Values):**")
            objectives = {}
            for cat in calc.categories:
                if calc.maximize[cat]:
                    default = 80.0
                    hint = f"Target value for {cat} (maximize). At/above this = full score."
                else:
                    default = 50.0
                    hint = f"Target value for {cat} (minimize). At/below this = full score."
                objectives[cat] = st.number_input(
                    f"{cat.title()} objective",
                    value=default,
                    step=1.0,
                    help=hint,
                    key=f"objective_{cat}"
                )

        # Validate and apply threshold/objective configuration
        try:
            calc.set_thresholds(thresholds)
            calc.set_objectives(objectives)
            validation_errors = calc.validate_penalty_configuration()
            if validation_errors:
                st.error("β Configuration Issues:")
                for error in validation_errors:
                    st.error(f"β’ {error}")
            else:
                st.success("β Threshold/Objective configuration is valid")
            # Show penalty zones explanation
            with st.expander("π How the Penalty System Works"):
                st.write("""
                **Three-Zone System for each category:**
                π΄ **Zone 1 - Elimination**: Below threshold β Score = 0
                - Products failing to meet minimum requirements are heavily penalized
                π‘ **Zone 2 - Penalty**: Between threshold and objective β Linear scale (0-50)
                - Graduated penalty that decreases as you approach the objective
                π’ **Zone 3 - Full Reward**: At/above objective β Full normalized score (50-100)
                - Products meeting targets compete on standard normalization
                **Example**: Reliability (maximize, threshold=80, objective=95)
                - Score 70: Gets 0 (below threshold)
                - Score 87: Gets ~23 (between threshold/objective)
                - Score 98: Gets ~90 (above objective, normalized against other qualified products)
                """)
        except Exception as e:
            st.error(f"β Error configuring penalty system: {e}")

    # Re-applied after the penalty configuration, as in the original flow.
    # NOTE(review): probably redundant with the call above - confirm against
    # UtilityCalculator before removing.
    calc.set_aggregation_method(aggregation_method)
def _zone_color(score, threshold, objective, maximize):
    """Return the marker colour ('red', 'orange' or 'green') for one raw score."""
    if maximize:
        if score < threshold:
            return 'red'  # below threshold
        if score < objective:
            return 'orange'  # between threshold and objective
        return 'green'  # at/above objective
    if score > threshold:
        return 'red'  # above threshold (bad when minimising)
    if score > objective:
        return 'orange'  # between objective and threshold
    return 'green'  # at/below objective


def _build_radar_figure(calc, product_names):
    """Build a radar chart of the normalised category scores of the given products."""
    normalized = calc.normalize_scores()
    figure = go.Figure()
    for product in product_names:
        figure.add_trace(go.Scatterpolar(
            r=[normalized[product][cat] for cat in calc.categories],
            theta=calc.categories,
            fill='toself',
            name=product
        ))
    figure.update_layout(
        polar=dict(radialaxis=dict(visible=True, range=[0, 100])),
        showlegend=True,
        title="Normalized Scores by Category"
    )
    return figure


def _build_penalty_figure(calc):
    """Build the strip chart of raw scores per category, coloured by penalty zone."""
    figure = go.Figure()
    for row, cat in enumerate(calc.categories):
        threshold = calc.thresholds[cat]
        objective = calc.objectives[cat]
        # One horizontal strip per category, products ordered by raw score.
        product_scores = sorted(
            ((name, calc.products[name][cat]) for name in calc.products),
            key=lambda pair: pair[1]
        )
        figure.add_trace(go.Scatter(
            x=[score for _, score in product_scores],
            y=[row] * len(product_scores),
            mode='markers',
            marker=dict(
                size=12,
                color=[
                    _zone_color(score, threshold, objective, calc.maximize[cat])
                    for _, score in product_scores
                ]
            ),
            text=[name for name, _ in product_scores],
            name=f'{cat} scores',
            showlegend=False
        ))
        figure.add_vline(x=threshold, line=dict(color='red', dash='dash'),
                         annotation_text=f'{cat} threshold')
        figure.add_vline(x=objective, line=dict(color='green', dash='dash'),
                         annotation_text=f'{cat} objective')
    figure.update_layout(
        title="Product Scores vs Thresholds/Objectives",
        xaxis_title="Score Value",
        yaxis=dict(
            tickmode='array',
            tickvals=list(range(len(calc.categories))),
            ticktext=calc.categories
        ),
        height=max(300, len(calc.categories) * 60)
    )
    return figure


def display_results(calc):
    """Display analysis results: rankings, detailed table and charts.

    Uses ``calc.rank_products()`` (a sequence of ``(name, score)`` pairs, best
    first as far as this view is concerned), ``calc.get_results_df()`` and
    ``calc.normalize_scores()``, plus the ``categories``, ``maximize``,
    ``products``, ``thresholds``, ``objectives``, ``aggregation_method`` and
    ``use_penalties`` attributes.

    Args:
        calc: A configured calculator that already contains the products.
    """
    st.subheader("π Results")

    method_names = {
        'weighted_sum': 'Weighted Sum (No Penalty)',
        'geometric_mean': 'Geometric Mean (Penalty for Poor Performance)',
        'threshold_penalty': 'Threshold/Objective Penalty System'
    }
    method_name = method_names.get(calc.aggregation_method, calc.aggregation_method)
    st.info(f"π§ Using: **{method_name}**")

    # The penalty views only make sense when that method is selected and enabled.
    penalties_active = calc.aggregation_method == 'threshold_penalty' and calc.use_penalties
    if penalties_active:
        with st.expander("π― Current Threshold/Objective Settings"):
            penalty_config = pd.DataFrame({
                'Category': calc.categories,
                'Direction': ['Maximize' if calc.maximize[cat] else 'Minimize' for cat in calc.categories],
                'Threshold': [calc.thresholds[cat] for cat in calc.categories],
                'Objective': [calc.objectives[cat] for cat in calc.categories]
            })
            st.dataframe(penalty_config, use_container_width=True)

    rankings = calc.rank_products()
    results_df = calc.get_results_df()

    tab1, tab2, tab3 = st.tabs(["π Rankings", "π Detailed Results", "π Visualizations"])

    with tab1:
        st.write("**Product Rankings:**")
        num_products = len(rankings)
        # Medals for the first three places, blanks for the rest; the slice keeps
        # the column exactly as long as the ranking (also with fewer than three).
        medals = (["π₯", "π₯", "π₯"] + [""] * num_products)[:num_products]
        ranking_df = pd.DataFrame({
            'Rank': range(1, num_products + 1),
            'Medal': medals,
            'Product': [name for name, _ in rankings],
            'Utility Score': [f"{score:.1f}" for _, score in rankings]
        })
        st.dataframe(ranking_df, use_container_width=True, hide_index=True)

    with tab2:
        st.write("**Detailed Analysis:**")
        st.dataframe(results_df, use_container_width=True)
        st.download_button(
            label="π₯ Download Results as CSV",
            data=results_df.to_csv(index=False),
            file_name="mcda_results.csv",
            mime="text/csv"
        )

    with tab3:
        if len(rankings) > 1:
            # Bar chart of utility scores
            fig_bar = px.bar(
                x=[name for name, _ in rankings],
                y=[score for _, score in rankings],
                title="Utility Scores by Product",
                labels={'x': 'Product', 'y': 'Utility Score'}
            )
            fig_bar.update_layout(showlegend=False)
            st.plotly_chart(fig_bar, use_container_width=True)

            # Radar chart, limited to the top 3 products for clarity. It is built
            # and rendered in the same branch, directly under its heading.
            st.write("**Category Comparison (Top Products):**")
            top_products = [name for name, _ in rankings[:3]]
            st.plotly_chart(_build_radar_figure(calc, top_products), use_container_width=True)

            if penalties_active:
                st.write("**Penalty Zone Analysis:**")
                st.write("π΄ Red: Below threshold (eliminated) | π Orange: Between threshold/objective (penalized) | π’ Green: Above objective (full score)")
                st.plotly_chart(_build_penalty_figure(calc), use_container_width=True)
        else:
            st.info("Add at least two products to see the comparison charts.")
# Start the app only when this file is the entry script, not when it is imported.
if __name__ == "__main__":
    main()