# MCDA / src/streamlit_app.py
# Uploaded by NavyDevilDoc: "Update src/streamlit_app.py" (commit 360b70c, verified)
import streamlit as st
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from mcda_v4 import UtilityCalculator
# Page configuration. Kept at module level so it runs before any other
# Streamlit call made from main().
st.set_page_config(
    page_title="MCDA Utility Calculator",
    # Must be a real emoji; the previous value was a mis-decoded byte sequence.
    page_icon="⚖️",
    layout="wide",
)
def main():
    """Render the page header and the "About" panel, then start manual entry.

    Takes no arguments and returns nothing; all output goes to the Streamlit
    page. The rest of the workflow is delegated to ``manual_interface()``.
    """
    st.title("⚖️ Multi-Criteria Decision Analysis (MCDA) Calculator")
    st.markdown("Compare products and alternatives using weighted criteria analysis")

    # Built line by line so the rendered markdown does not depend on how this
    # source file happens to be indented.
    about_text = "\n".join([
        "**What is MCDA?**",
        "Multi-Criteria Decision Analysis helps you make objective decisions when comparing",
        "products, services, or alternatives across multiple criteria.",
        "",
        "**How to use:**",
        "1. Define your evaluation categories (Performance, Cost, Quality, etc.)",
        "2. Add the products/options you want to compare",
        "3. Adjust category weights based on importance",
        "4. View rankings and detailed analysis",
        "",
        "**Features:**",
        "• Interactive data entry with real-time editing",
        "• Multiple aggregation methods (Weighted Sum, Geometric Mean, Penalty System)",
        "• Export results as CSV or JSON",
        "• Professional visualizations and analysis",
    ])

    # Add info about the tool (removed Excel reference)
    with st.expander("📖 About This Tool"):
        st.write(about_text)

    # Go directly to manual interface
    manual_interface()
def manual_interface():
    """Handle manual data entry interface.

    Runs the workflow top to bottom on every Streamlit rerun:

    1. read category names and their optimisation direction,
    2. reconcile products stored in ``st.session_state`` with the current
       category list,
    3. build a ``UtilityCalculator`` and collect products,
    4. hand over to ``adjust_weights`` and ``display_results``.

    Returns nothing. The function stops early (after showing a message) when
    the categories are unusable, the calculator cannot be built, or no
    products have been entered yet.
    """
    st.header("✏️ Manual Data Entry")

    # Step 1: Define categories
    st.subheader("1. Define Categories")
    col1, col2 = st.columns(2)

    with col1:
        categories_input = st.text_area(
            "Enter categories (one per line):",
            value="Performance\nCost\nReliability",
            height=100,
            help="Enter each category on a new line"
        )
        entered = [cat.strip() for cat in categories_input.split('\n') if cat.strip()]
        # A repeated name would create two checkboxes with the same label and
        # collapse into one dict entry below; keep the first occurrence only.
        categories = list(dict.fromkeys(entered))
        if len(categories) < len(entered):
            st.warning("⚠️ Duplicate category names were ignored.")

    with col2:
        st.write("**Optimization Direction**")
        maximize = {}
        for cat in categories:
            # Everything is maximised by default except a category called "cost".
            maximize[cat] = st.checkbox(f"Maximize {cat}", value=(cat.lower() != 'cost'))

    if len(categories) < 2:
        st.warning("⚠️ Please enter at least 2 categories to continue.")
        return

    # Product records store their label under the 'name' key, so a category
    # with that exact name would overwrite it.
    if 'name' in categories:
        st.error("❌ 'name' is reserved for the product name. Please rename that category.")
        return

    # Auto-update products when categories change
    if 'previous_categories' not in st.session_state:
        st.session_state.previous_categories = categories

    if st.session_state.previous_categories != categories:
        if 'products' in st.session_state and st.session_state.products:
            st.info("🔄 Categories changed. Auto-updating existing products...")
            updated_products = []
            for product in st.session_state.products:
                updated_product = {'name': product['name']}
                # Keep existing category values, add new ones with 0;
                # values of removed categories are dropped.
                for cat in categories:
                    updated_product[cat] = product.get(cat, 0.0)
                updated_products.append(updated_product)
            st.session_state.products = updated_products
            st.success(f"✅ Updated {len(updated_products)} products for new categories. New categories set to 0.")
        st.session_state.previous_categories = categories

    # Step 2: Enter product data
    st.subheader("2. Enter Product Data")

    # Create calculator
    try:
        calc = UtilityCalculator(categories, maximize)
    except Exception as e:
        st.error(f"❌ Error creating calculator: {e}")
        return

    # Data entry interface
    products_data = data_entry_interface(categories)
    if not products_data:
        return

    # Add products to calculator. Kept separate from the rendering step below
    # so that each failure is reported with an accurate message.
    try:
        calc.add_products_batch(products_data)
    except Exception as e:
        st.error(f"❌ Error adding products: {e}")
        st.info("Try editing the products to ensure all categories have values.")
        return

    try:
        # Weight adjustment
        adjust_weights(calc)
        # Results
        display_results(calc)
    except Exception as e:
        st.error(f"❌ Error calculating results: {e}")
def data_entry_interface(categories):
    """Create a data entry interface for products.

    Products live in ``st.session_state.products`` as a list of dicts of the
    form ``{'name': <str>, <category>: <score>, ...}``. The interface offers
    an "add" form, an editable table, per-product edit/delete controls and
    bulk operations (clear all, JSON export).

    Args:
        categories: Ordered list of category names; one numeric input is
            rendered per category. The caller is expected to pass at least
            one category.

    Returns:
        The current list of product dicts, or ``[]`` when there are none.
    """
    import json  # only needed for the JSON export button

    # Initialize session state for products
    if 'products' not in st.session_state:
        st.session_state.products = []

    # Add new product form
    with st.expander("➕ Add New Product", expanded=len(st.session_state.products) == 0):
        with st.form("add_product"):
            col1, col2 = st.columns([1, 2])
            with col1:
                product_name = st.text_input("Product Name", placeholder="e.g., Product A")
            with col2:
                scores = {}
                cols = st.columns(len(categories))
                for col, cat in zip(cols, categories):
                    with col:
                        scores[cat] = st.number_input(f"{cat}", value=0.0, step=1.0)
            submitted = st.form_submit_button("Add Product")

            if submitted:
                # Surrounding whitespace would make "A" and "A " look like
                # different products.
                product_name = product_name.strip()
                existing_names = [p['name'] for p in st.session_state.products]
                if not product_name:
                    st.error("❌ Please enter a product name.")
                elif product_name in existing_names:
                    st.error(f"❌ Product '{product_name}' already exists. Please use a different name.")
                else:
                    st.session_state.products.append({'name': product_name, **scores})
                    st.success(f"✅ Added {product_name}")
                    st.rerun()

    if not st.session_state.products:
        return []

    # Display current products with edit/delete options
    st.write("**Current Products:**")
    df = pd.DataFrame(st.session_state.products)

    st.write("*You can edit values directly in the table below:*")
    edited_df = st.data_editor(
        df,
        use_container_width=True,
        num_rows="dynamic",  # lets the user add/delete rows in the table
        key="products_editor"
    )
    # Update session state with edited data
    st.session_state.products = edited_df.to_dict('records')

    # The table allows deleting rows, so the list may have just become empty;
    # nothing below makes sense without at least one product.
    if not st.session_state.products:
        return []

    # Individual product management
    st.write("**Manage Individual Products:**")
    col1, col2, col3 = st.columns([2, 1, 1])
    with col1:
        product_names = [p['name'] for p in st.session_state.products]
        selected_product = st.selectbox(
            "Select product to manage:",
            options=product_names,
            key="product_selector"
        )
    with col2:
        if st.button("✏️ Edit Selected", key="edit_button"):
            if 'edit_mode' not in st.session_state:
                st.session_state.edit_mode = {}
            st.session_state.edit_mode[selected_product] = True
            st.rerun()
    with col3:
        if st.button("🗑️ Delete Selected", key="delete_button", type="secondary"):
            st.session_state.products = [
                p for p in st.session_state.products if p['name'] != selected_product
            ]
            # Forget any pending edit so a later product with the same name
            # does not open in edit mode.
            if 'edit_mode' in st.session_state:
                st.session_state.edit_mode.pop(selected_product, None)
            st.success(f"✅ Deleted {selected_product}")
            st.rerun()

    # Edit mode for selected product
    editing = (
        'edit_mode' in st.session_state
        and st.session_state.edit_mode.get(selected_product)
    )
    product_to_edit = next(
        (p for p in st.session_state.products if p['name'] == selected_product),
        None,
    )
    if editing and product_to_edit is not None:
        st.write(f"**Editing: {selected_product}**")
        with st.form(f"edit_product_{selected_product}"):
            col1, col2 = st.columns([1, 2])
            with col1:
                new_name = st.text_input(
                    "Product Name",
                    value=selected_product,
                    key=f"edit_name_{selected_product}"
                )
            with col2:
                new_scores = {}
                cols = st.columns(len(categories))
                for col, cat in zip(cols, categories):
                    with col:
                        current_value = product_to_edit.get(cat, 0.0)
                        # Rows added through the table can carry blank cells
                        # (None/NaN); start those at 0 instead of failing.
                        if pd.isna(current_value):
                            current_value = 0.0
                        new_scores[cat] = st.number_input(
                            f"{cat}",
                            value=float(current_value),
                            step=1.0,
                            key=f"edit_{cat}_{selected_product}"
                        )
            col_save, col_cancel = st.columns(2)
            with col_save:
                save_changes = st.form_submit_button("💾 Save Changes", type="primary")
            with col_cancel:
                cancel_edit = st.form_submit_button("❌ Cancel")

        if save_changes:
            new_name = new_name.strip()
            other_names = [
                p['name'] for p in st.session_state.products
                if p['name'] != selected_product
            ]
            if not new_name:
                st.error("❌ Please enter a product name.")
            elif new_name in other_names:
                st.error(f"❌ Product '{new_name}' already exists. Please use a different name.")
            else:
                # Update the product in place so its position is preserved
                for i, product in enumerate(st.session_state.products):
                    if product['name'] == selected_product:
                        st.session_state.products[i] = {'name': new_name, **new_scores}
                        break
                # Clear edit mode
                st.session_state.edit_mode[selected_product] = False
                st.success(f"✅ Updated product: {new_name}")
                st.rerun()

        if cancel_edit:
            # Clear edit mode
            st.session_state.edit_mode[selected_product] = False
            st.rerun()

    # Bulk operations
    st.write("**Bulk Operations:**")
    col1, col2 = st.columns(2)
    with col1:
        if st.button("🗑️ Clear All Products", type="secondary"):
            st.session_state.products = []
            if 'edit_mode' in st.session_state:
                st.session_state.edit_mode = {}
            st.success("✅ Cleared all products")
            st.rerun()
    with col2:
        # Export current products to JSON for backup
        products_json = json.dumps(st.session_state.products, indent=2)
        st.download_button(
            label="📥 Export Products (JSON)",
            data=products_json,
            file_name="products_backup.json",
            mime="application/json"
        )

    return st.session_state.products
def adjust_weights(calc):
    """Create weight adjustment interface.

    Renders one slider per category, normalises the chosen weights so they
    sum to 1 and pushes them into ``calc``. Also lets the user pick the
    aggregation method and, for the threshold/objective penalty method,
    configure a threshold and an objective per category.

    Args:
        calc: The calculator to configure. This function reads
            ``calc.categories``, ``calc.weights`` and ``calc.maximize`` and
            calls ``set_weights``, ``set_aggregation_method``,
            ``set_thresholds``, ``set_objectives`` and
            ``validate_penalty_configuration`` on it.

    Returns:
        None. ``calc`` is modified in place.
    """
    st.subheader("3. Adjust Category Weights")
    col1, col2 = st.columns([2, 1])

    with col1:
        st.write("**Adjust the importance of each category:**")
        # Weight sliders
        new_weights = {}
        for cat in calc.categories:
            current = float(calc.weights[cat])
            new_weights[cat] = st.slider(
                f"{cat.title()}",
                min_value=0.0,
                max_value=1.0,
                # The slider needs a plain float inside [min_value, max_value].
                value=min(max(current, 0.0), 1.0),
                step=0.05,
                help=f"Current weight: {current:.2f}",
                # Labels are title-cased, so "cost" and "COST" would otherwise
                # produce two identical widgets.
                key=f"weight_{cat}"
            )

        # Normalize weights to sum to 1
        total_weight = sum(new_weights.values())
        if total_weight > 0:
            normalized_weights = {cat: weight / total_weight for cat, weight in new_weights.items()}
            calc.set_weights(normalized_weights)
        else:
            # Nothing to normalise; the calculator keeps whatever it had.
            st.warning("⚠️ All weights are 0, so they were not applied. Raise at least one slider.")

    with col2:
        # Display current weights
        st.write("**Current Weights:**")
        weight_df = pd.DataFrame({
            'Category': calc.categories,
            'Weight': [f"{calc.weights[cat]:.2f}" for cat in calc.categories]
        })
        st.dataframe(weight_df, use_container_width=True)

    # Add aggregation method selection
    st.write("**Aggregation Method:**")
    method_labels = {
        'weighted_sum': 'Weighted Sum (No Penalty)',
        'geometric_mean': 'Geometric Mean (Penalty for Poor Performance)',
        'threshold_penalty': 'Threshold/Objective Penalty System'
    }
    aggregation_method = st.radio(
        "Choose how to combine category scores:",
        options=list(method_labels),
        format_func=method_labels.get,
        help="Weighted Sum: Full compensation between criteria. Geometric Mean: Penalizes poor performance. Threshold/Objective: Three-zone penalty system with elimination below thresholds."
    )

    # Update calculator's aggregation method
    calc.set_aggregation_method(aggregation_method)

    # Add threshold/objective configuration for penalty system
    if aggregation_method == 'threshold_penalty':
        st.write("**Configure Thresholds and Objectives:**")
        # NOTE(review): the original icon bytes were not recoverable; 📝 is a best guess.
        st.info("📝 Set minimum acceptable values (thresholds) and target values (objectives) for each category.")

        # Defaults depend on direction: when maximising, the objective sits
        # above the threshold; when minimising it sits below. This matches the
        # zone colouring used on the results page.
        # NOTE(review): confirm against validate_penalty_configuration().
        low_default, high_default = 50.0, 80.0

        col1, col2 = st.columns(2)
        with col1:
            st.write("**Thresholds (Minimum Acceptable):**")
            thresholds = {}
            for cat in calc.categories:
                if calc.maximize[cat]:
                    default = low_default
                    hint = f"Minimum acceptable value for {cat} (maximize). Below this = elimination."
                else:
                    default = high_default
                    hint = f"Maximum acceptable value for {cat} (minimize). Above this = elimination."
                thresholds[cat] = st.number_input(
                    f"{cat.title()} threshold",
                    value=default,
                    step=1.0,
                    help=hint,
                    key=f"threshold_{cat}"
                )
        with col2:
            st.write("**Objectives (Target Values):**")
            objectives = {}
            for cat in calc.categories:
                if calc.maximize[cat]:
                    default = high_default
                    hint = f"Target value for {cat} (maximize). At/above this = full score."
                else:
                    default = low_default
                    hint = f"Target value for {cat} (minimize). At/below this = full score."
                objectives[cat] = st.number_input(
                    f"{cat.title()} objective",
                    value=default,
                    step=1.0,
                    help=hint,
                    key=f"objective_{cat}"
                )

        # Validate and apply threshold/objective configuration
        try:
            calc.set_thresholds(thresholds)
            calc.set_objectives(objectives)
            validation_errors = calc.validate_penalty_configuration()
        except Exception as e:
            st.error(f"❌ Error configuring penalty system: {e}")
        else:
            if validation_errors:
                st.error("❌ Configuration Issues:")
                for error in validation_errors:
                    st.error(f"• {error}")
            else:
                st.success("✅ Threshold/Objective configuration is valid")

        # Show penalty zones explanation. Static help text, so it is shown
        # whether or not the configuration above was accepted.
        penalty_help = "\n".join([
            "**Three-Zone System for each category:**",
            "",
            "🔴 **Zone 1 - Elimination**: Below threshold → Score = 0",
            "- Products failing to meet minimum requirements are heavily penalized",
            "",
            "🟡 **Zone 2 - Penalty**: Between threshold and objective → Linear scale (0-50)",
            "- Graduated penalty that decreases as you approach the objective",
            "",
            "🟢 **Zone 3 - Full Reward**: At/above objective → Full normalized score (50-100)",
            "- Products meeting targets compete on standard normalization",
            "",
            "**Example**: Reliability (maximize, threshold=80, objective=95)",
            "- Score 70: Gets 0 (below threshold)",
            "- Score 87: Gets ~23 (between threshold/objective)",
            "- Score 98: Gets ~90 (above objective, normalized against other qualified products)",
        ])
        with st.expander("📖 How the Penalty System Works"):
            st.write(penalty_help)

    # Update calculator's aggregation method
    # NOTE(review): repeats the call made right after the radio button. Kept
    # because it is not visible here whether the threshold/objective setters
    # touch the aggregation method; drop it if they do not.
    calc.set_aggregation_method(aggregation_method)
def _penalties_active(calc):
    """Return True when the threshold/objective penalty method is selected and enabled."""
    return bool(calc.aggregation_method == 'threshold_penalty' and calc.use_penalties)


def _penalty_zone_color(score, threshold, objective, maximize):
    """Return 'red', 'orange' or 'green' for a raw score relative to its zone limits."""
    if maximize:
        if score < threshold:
            return 'red'  # Below threshold
        if score < objective:
            return 'orange'  # Between threshold and objective
        return 'green'  # At/above objective
    if score > threshold:
        return 'red'  # Above threshold (bad for minimize)
    if score > objective:
        return 'orange'  # Between objective and threshold
    return 'green'  # At/below objective (good for minimize)


def _build_radar_figure(calc, product_names):
    """Build a radar chart of the normalized per-category scores of the given products."""
    fig = go.Figure()
    normalized = calc.normalize_scores()
    for product in product_names:
        fig.add_trace(go.Scatterpolar(
            r=[normalized[product][cat] for cat in calc.categories],
            theta=calc.categories,
            fill='toself',
            name=product
        ))
    fig.update_layout(
        polar=dict(radialaxis=dict(visible=True, range=[0, 100])),
        showlegend=True,
        title="Normalized Scores by Category"
    )
    return fig


def _build_penalty_figure(calc):
    """Build a chart with one row per category: raw product scores plus threshold/objective marks."""
    fig = go.Figure()
    half_row = 0.4  # vertical half-length of a threshold/objective mark

    for row, cat in enumerate(calc.categories):
        threshold = calc.thresholds[cat]
        objective = calc.objectives[cat]

        # All product scores for this category, lowest first
        product_scores = sorted(
            ((name, calc.products[name][cat]) for name in calc.products),
            key=lambda item: item[1]
        )
        names = [name for name, _ in product_scores]
        scores = [score for _, score in product_scores]
        colors = [
            _penalty_zone_color(score, threshold, objective, calc.maximize[cat])
            for score in scores
        ]

        fig.add_trace(go.Scatter(
            x=scores,
            y=[row] * len(scores),
            mode='markers',
            marker=dict(size=12, color=colors),
            text=names,
            name=f'{cat} scores',
            showlegend=False
        ))

        # Draw the limits on this category's row only. A full-height vertical
        # line would cross every other category's row and read as their limit.
        for value, color, label in (
            (threshold, 'red', 'threshold'),
            (objective, 'green', 'objective'),
        ):
            fig.add_shape(
                type='line',
                x0=value, x1=value,
                y0=row - half_row, y1=row + half_row,
                line=dict(color=color, dash='dash')
            )
            fig.add_annotation(
                x=value, y=row + half_row,
                text=f'{cat} {label}',
                showarrow=False,
                yanchor='bottom'
            )

    fig.update_layout(
        title="Product Scores vs Thresholds/Objectives",
        xaxis_title="Score Value",
        yaxis=dict(
            tickmode='array',
            tickvals=list(range(len(calc.categories))),
            ticktext=calc.categories,
            # Leave room for the marks on the first and last rows.
            range=[-0.6, len(calc.categories) - 0.3]
        ),
        height=max(300, len(calc.categories) * 60)
    )
    return fig


def display_results(calc):
    """Display analysis results.

    Shows the active aggregation method, the penalty settings when the
    threshold/objective method is in use, and three tabs: rankings, the
    detailed results table with a CSV download, and charts.

    Args:
        calc: A configured calculator with products loaded. This function
            reads ``aggregation_method``, ``use_penalties``, ``categories``,
            ``maximize``, ``thresholds``, ``objectives`` and ``products``, and
            calls ``rank_products()``, ``get_results_df()`` and
            ``normalize_scores()``.

    Returns:
        None.
    """
    st.subheader("📊 Results")

    # Display current aggregation method
    method_names = {
        'weighted_sum': 'Weighted Sum (No Penalty)',
        'geometric_mean': 'Geometric Mean (Penalty for Poor Performance)',
        'threshold_penalty': 'Threshold/Objective Penalty System'
    }
    method_name = method_names.get(calc.aggregation_method, calc.aggregation_method)
    st.info(f"🔧 Using: **{method_name}**")

    # Show penalty configuration if threshold penalty is active
    if _penalties_active(calc):
        with st.expander("🎯 Current Threshold/Objective Settings"):
            penalty_config = pd.DataFrame({
                'Category': calc.categories,
                'Direction': ['Maximize' if calc.maximize[cat] else 'Minimize' for cat in calc.categories],
                'Threshold': [calc.thresholds[cat] for cat in calc.categories],
                'Objective': [calc.objectives[cat] for cat in calc.categories]
            })
            st.dataframe(penalty_config, use_container_width=True)

    # Get results; rankings is iterated as (name, score) pairs, best first
    rankings = calc.rank_products()
    results_df = calc.get_results_df()

    # Create tabs for different views
    tab1, tab2, tab3 = st.tabs(["🏆 Rankings", "📋 Detailed Results", "📈 Visualizations"])

    with tab1:
        st.write("**Product Rankings:**")
        num_products = len(rankings)
        # Medals for the first three places, blanks for the rest; the slice
        # also covers the case of fewer than three products.
        medals = (["🥇", "🥈", "🥉"] + [""] * num_products)[:num_products]
        ranking_df = pd.DataFrame({
            'Rank': range(1, num_products + 1),
            'Medal': medals,
            'Product': [name for name, _ in rankings],
            'Utility Score': [f"{score:.1f}" for _, score in rankings]
        })
        st.dataframe(ranking_df, use_container_width=True, hide_index=True)

    with tab2:
        st.write("**Detailed Analysis:**")
        st.dataframe(results_df, use_container_width=True)

        # Download button
        csv = results_df.to_csv(index=False)
        st.download_button(
            label="📥 Download Results as CSV",
            data=csv,
            file_name="mcda_results.csv",
            mime="text/csv"
        )

    with tab3:
        if len(rankings) < 2:
            # Comparison charts need something to compare against.
            st.info("Add at least two products to see comparison charts.")
            return

        # Bar chart of utility scores
        fig_bar = px.bar(
            x=[name for name, _ in rankings],
            y=[score for _, score in rankings],
            title="Utility Scores by Product",
            labels={'x': 'Product', 'y': 'Utility Score'}
        )
        fig_bar.update_layout(showlegend=False)
        st.plotly_chart(fig_bar, use_container_width=True)

        # Radar chart, limited to the top 3 products for clarity. Rendered
        # directly under its heading.
        st.write("**Category Comparison (Top Products):**")
        top_products = [name for name, _ in rankings[:3]]
        st.plotly_chart(_build_radar_figure(calc, top_products), use_container_width=True)

        # Penalty zone visualization for threshold_penalty method
        if _penalties_active(calc):
            st.write("**Penalty Zone Analysis:**")
            penalty_fig = _build_penalty_figure(calc)
            # Legend explanation
            st.write("🔴 Red: Below threshold (eliminated) | 🟠 Orange: Between threshold/objective (penalized) | 🟢 Green: Above objective (full score)")
            st.plotly_chart(penalty_fig, use_container_width=True)
# Start the app only when this file is run as a script, not when imported.
if __name__ == "__main__":
    main()