|
|
import pandas as pd |
|
|
import numpy as np |
|
|
import random |
|
|
import copy |
|
|
import optuna |
|
|
import matplotlib.pyplot as plt |
|
|
import seaborn as sns |
|
|
from skopt import gp_minimize |
|
|
from skopt.space import Real, Categorical |
|
|
from skopt.utils import use_named_args |
|
|
from statsmodels.tsa.arima.model import ARIMA |
|
|
import numpy_financial as npf |
|
|
|
|
|
optuna.logging.set_verbosity(optuna.logging.WARNING) |
|
|
|
|
|
PROJECT_YEARS = 15 |
|
|
BASE_CAPACITY_KTA = 300 |
|
|
INFLATION_RATE = 0.015 |
|
|
TAX_RATE = 0.10 |
|
|
DEPRECIATION_YEARS = 15 |
|
|
|
|
|
TECHNOLOGY_DATA = { |
|
|
"JNC": {"capex_base_M": 180.0, "opex_base_cents_kg": 150.0}, |
|
|
"Hoechst_AG": {"capex_base_M": 230.0, "opex_base_cents_kg": 155.0}, |
|
|
"BF_Goodrich": {"capex_base_M": 280.0, "opex_base_cents_kg": 160.0}, |
|
|
"Shin_Etsu_1991": {"capex_base_M": 260.0, "opex_base_cents_kg": 155.0}, |
|
|
"Shin_Etsu_2004": {"capex_base_M": 1500.0, "opex_base_cents_kg": 150.0}, |
|
|
"Vinnolit": {"capex_base_M": 240.0, "opex_base_cents_kg": 155.0}, |
|
|
"QVC_Qatar": {"capex_base_M": 200.0, "opex_base_cents_kg": 145.0}, |
|
|
"SP_Chemicals": {"capex_base_M": 250.0, "opex_base_cents_kg": 145.0}, |
|
|
"Engro_Pakistan": {"capex_base_M": 1400.0, "opex_base_cents_kg": 155.0}, |
|
|
"Formosa_BR_USA": {"capex_base_M": 380.0, "opex_base_cents_kg": 150.0}, |
|
|
"Shintech_USA_Exp": {"capex_base_M": 1700.0, "opex_base_cents_kg": 160.0}, |
|
|
"Zhongtai_China": {"capex_base_M": 1100.0, "opex_base_cents_kg": 155.0}, |
|
|
"Shintech_USA_2021": {"capex_base_M": 1700.0, "opex_base_cents_kg": 160.0}, |
|
|
"Reliance_India_2024": {"capex_base_M": 2200.0, "opex_base_cents_kg": 155.0}, |
|
|
"Orbia_Germany_2023": {"capex_base_M": 180.0, "opex_base_cents_kg": 155.0}, |
|
|
"Westlake_USA_2022": {"capex_base_M": 850.0, "opex_base_cents_kg": 160.0} |
|
|
} |
|
|
|
|
|
STRATEGY_DATA = { |
|
|
'Integrated_Production': {'sourcing_cost_per_ton_pvc': 450.0, 'byproducts': {'caustic_soda_ton': 1.1, 'surplus_edc_ton': 0.523}}, |
|
|
'Purchase_VCM': {'sourcing_cost_per_ton_pvc': 650.0, 'byproducts': {'caustic_soda_ton': 0, 'surplus_edc_ton': 0}} |
|
|
} |
|
|
|
|
|
PRODUCT_PRICES_USD_PER_TON = { |
|
|
'pvc_s65_export': 1100, 'pvc_s70_domestic': 950, |
|
|
'byproduct_caustic_soda': 450, 'byproduct_surplus_edc': 170 |
|
|
} |
|
|
|
|
|
OPTIMIZATION_SPACE = { |
|
|
'capacity_kta': (500, 600), |
|
|
'technology': ["Engro_Pakistan", "Shin_Etsu_2004"], |
|
|
'sourcing_strategy': ['Integrated_Production'], |
|
|
'export_market_mix': (0.6, 0.8), |
|
|
'sell_byproducts': [True] |
|
|
} |
|
|
|
|
|
def forecast_prices(base_price, years=PROJECT_YEARS, cagr=0.04): |
|
|
historical = [base_price * (1 + cagr + random.uniform(-0.03, 0.03)) ** i for i in range(-5, 0)] |
|
|
model = ARIMA(historical, order=(1, 1, 0)) |
|
|
fit = model.fit() |
|
|
forecast = fit.forecast(steps=years) |
|
|
return [p * (1 + INFLATION_RATE) ** i for i, p in enumerate(forecast)] |
|
|
|
|
|
def calculate_project_kpis(**params): |
|
|
tech_data = TECHNOLOGY_DATA[params['technology']] |
|
|
scaling_factor = (params['capacity_kta'] / BASE_CAPACITY_KTA) ** 0.65 |
|
|
total_capex = tech_data['capex_base_M'] * scaling_factor * 1_000_000 |
|
|
capacity_tons = params['capacity_kta'] * 1000 |
|
|
|
|
|
price_s65_forecast = forecast_prices(PRODUCT_PRICES_USD_PER_TON['pvc_s65_export']) |
|
|
price_s70_forecast = forecast_prices(PRODUCT_PRICES_USD_PER_TON['pvc_s70_domestic']) |
|
|
byproduct_caustic_forecast = forecast_prices(PRODUCT_PRICES_USD_PER_TON['byproduct_caustic_soda']) |
|
|
byproduct_edc_forecast = forecast_prices(PRODUCT_PRICES_USD_PER_TON['byproduct_surplus_edc']) |
|
|
|
|
|
cash_flows = [-total_capex] |
|
|
depreciation_annual = total_capex / DEPRECIATION_YEARS |
|
|
|
|
|
for year in range(1, PROJECT_YEARS + 1): |
|
|
infl_factor = (1 + INFLATION_RATE) ** (year - 1) |
|
|
price_s65 = price_s65_forecast[year - 1] |
|
|
price_s70 = price_s70_forecast[year - 1] * 0.95 if params['capacity_kta'] >= 550 else price_s70_forecast[year - 1] |
|
|
revenue_export = (capacity_tons * params['export_market_mix']) * price_s65 |
|
|
revenue_domestic = (capacity_tons * (1 - params['export_market_mix'])) * price_s70 |
|
|
pvc_revenue = revenue_export + revenue_domestic |
|
|
|
|
|
byproduct_revenue = 0 |
|
|
if params['sourcing_strategy'] == 'Integrated_Production' and params['sell_byproducts']: |
|
|
byproducts = STRATEGY_DATA['Integrated_Production']['byproducts'] |
|
|
byproduct_revenue += (byproducts['caustic_soda_ton'] * capacity_tons * byproduct_caustic_forecast[year - 1]) |
|
|
byproduct_revenue += (byproducts['surplus_edc_ton'] * capacity_tons * byproduct_edc_forecast[year - 1]) |
|
|
|
|
|
total_revenue = pvc_revenue + byproduct_revenue |
|
|
|
|
|
opex_sourcing = STRATEGY_DATA['Integrated_Production']['sourcing_cost_per_ton_pvc'] * capacity_tons * infl_factor |
|
|
opex_base = (tech_data['opex_base_cents_kg'] / 100) * capacity_tons * infl_factor |
|
|
total_opex = opex_sourcing + opex_base |
|
|
|
|
|
ebitda = total_revenue - total_opex |
|
|
taxable_income = ebitda - depreciation_annual |
|
|
taxes = max(taxable_income * TAX_RATE, 0) |
|
|
net_income = taxable_income - taxes |
|
|
free_cash_flow = net_income + depreciation_annual |
|
|
cash_flows.append(free_cash_flow) |
|
|
|
|
|
irr = npf.irr(cash_flows) * 100 if npf.irr(cash_flows) > 0 else -1.0 |
|
|
cumulative_cash_flow = np.cumsum(cash_flows) |
|
|
payback_period_years = np.where(cumulative_cash_flow > 0)[0] |
|
|
payback_period = payback_period_years[0] + 1 if len(payback_period_years) > 0 else float('inf') |
|
|
annual_profit = np.mean([cf for cf in cash_flows[1:] if cf > 0]) |
|
|
|
|
|
return {"irr": irr, "annual_profit": annual_profit, "total_capex": total_capex, "payback_period": payback_period} |
|
|
|
|
|
def run_optimizations_without_ml(): |
|
|
print("\n--- Running Optimization Algorithms ---") |
|
|
results = [] |
|
|
results.append(run_bayesian_optimization()) |
|
|
results.append(run_genetic_algorithm()) |
|
|
results.append(run_optuna_direct()) |
|
|
return results |
|
|
|
|
|
def run_genetic_algorithm(): |
|
|
print("Running Genetic Algorithm (GA)...") |
|
|
population = [{k: random.choice(v) if isinstance(v, list) else random.uniform(*v) for k,v in OPTIMIZATION_SPACE.items()} for _ in range(40)] |
|
|
best_overall_individual = None |
|
|
best_overall_fitness = -float('inf') |
|
|
for _ in range(80): |
|
|
fitnesses = [calculate_project_kpis(**ind)['irr'] for ind in population] |
|
|
if max(fitnesses) > best_overall_fitness: |
|
|
best_overall_fitness = max(fitnesses) |
|
|
best_overall_individual = population[np.argmax(fitnesses)] |
|
|
selected = [max(random.sample(list(zip(population, fitnesses)), 5), key=lambda i: i[1])[0] for _ in range(50)] |
|
|
next_gen = [] |
|
|
for i in range(0, 50, 2): |
|
|
p1, p2 = selected[i], selected[i+1] |
|
|
c1, c2 = copy.deepcopy(p1), copy.deepcopy(p2) |
|
|
if random.random() < 0.9: c1['technology'], c2['technology'] = p2['technology'], c1['technology'] |
|
|
if random.random() < 0.2: c1['export_market_mix'] = random.uniform(*OPTIMIZATION_SPACE['export_market_mix']) |
|
|
next_gen.extend([c1, c2]) |
|
|
population = next_gen |
|
|
kpis = calculate_project_kpis(**best_overall_individual) |
|
|
return {"Method": "Genetic Algorithm", **kpis, "Params": best_overall_individual} |
|
|
|
|
|
def run_bayesian_optimization(): |
|
|
print("Running Bayesian Optimization...") |
|
|
skopt_space = [ |
|
|
Real(OPTIMIZATION_SPACE['capacity_kta'][0], OPTIMIZATION_SPACE['capacity_kta'][1], name='capacity_kta'), |
|
|
Categorical(OPTIMIZATION_SPACE['technology'], name='technology'), |
|
|
Categorical(OPTIMIZATION_SPACE['sourcing_strategy'], name='sourcing_strategy'), |
|
|
Real(OPTIMIZATION_SPACE['export_market_mix'][0], OPTIMIZATION_SPACE['export_market_mix'][1], name='export_market_mix'), |
|
|
Categorical(OPTIMIZATION_SPACE['sell_byproducts'], name='sell_byproducts') |
|
|
] |
|
|
@use_named_args(skopt_space) |
|
|
def objective(**params): |
|
|
return -calculate_project_kpis(**params)['irr'] |
|
|
res = gp_minimize(objective, skopt_space, n_calls=90, random_state=42, n_initial_points=20) |
|
|
best_params = {space.name: val for space, val in zip(skopt_space, res.x)} |
|
|
kpis = calculate_project_kpis(**best_params) |
|
|
return {"Method": "Bayesian Opt", **kpis, "Params": best_params} |
|
|
|
|
|
def run_optuna_direct(): |
|
|
print("Running Optuna (TPE)...") |
|
|
def objective(trial): |
|
|
params = { |
|
|
"capacity_kta": trial.suggest_float("capacity_kta", *OPTIMIZATION_SPACE['capacity_kta']), |
|
|
"technology": trial.suggest_categorical("technology", OPTIMIZATION_SPACE['technology']), |
|
|
"sourcing_strategy": trial.suggest_categorical("sourcing_strategy", OPTIMIZATION_SPACE['sourcing_strategy']), |
|
|
"export_market_mix": trial.suggest_float("export_market_mix", *OPTIMIZATION_SPACE['export_market_mix']), |
|
|
"sell_byproducts": trial.suggest_categorical("sell_byproducts", OPTIMIZATION_SPACE['sell_byproducts']) |
|
|
} |
|
|
return calculate_project_kpis(**params)['irr'] |
|
|
study = optuna.create_study(direction="maximize") |
|
|
study.optimize(objective, n_trials=150, n_jobs=-1) |
|
|
kpis = calculate_project_kpis(**study.best_params) |
|
|
return {"Method": "Optuna (TPE - Direct)", **kpis, "Params": study.best_params} |
|
|
|
|
|
def display_and_save_results(df_results): |
|
|
print("\n--- Final Results and Comparison ---") |
|
|
df_display = pd.DataFrame() |
|
|
df_display['Method'] = df_results['Method'] |
|
|
df_display['Optimal IRR (%)'] = df_results['irr'].map('{:,.2f}%'.format) |
|
|
df_display['Annual Profit ($M)'] = (df_results['annual_profit'] / 1_000_000).map('{:,.1f}'.format) |
|
|
df_display['CAPEX ($M)'] = (df_results['total_capex'] / 1_000_000).map('{:,.1f}'.format) |
|
|
df_display['Payback (Yrs)'] = df_results['payback_period'].map('{:,.1f}'.format) |
|
|
param_df = pd.DataFrame(df_results['Params'].tolist()) |
|
|
param_df['capacity_kta'] = param_df['capacity_kta'].round(1) |
|
|
param_df['export_market_mix'] = (param_df['export_market_mix'] * 100).round(1).astype(str) + '%' |
|
|
df_display = pd.concat([df_display, param_df.rename(columns={ |
|
|
'capacity_kta': 'Capacity (KTA)', 'technology': 'Technology', 'sourcing_strategy': 'Sourcing', |
|
|
'export_market_mix': 'Export Mix', 'sell_byproducts': 'Sell Byproducts' |
|
|
})], axis=1) |
|
|
|
|
|
print("\n✅ **Final Comparison of Optimal Scenarios (Sorted by Best IRR)**") |
|
|
print("="*120) |
|
|
print(df_display.to_string(index=False)) |
|
|
print("="*120) |
|
|
df_display.to_csv("results.csv", index=False, encoding='utf-8-sig') |
|
|
|
|
|
def create_kpi_comparison_dashboard(df_results): |
|
|
print("\n--- Generating KPI Comparison Dashboard ---") |
|
|
df_plot = df_results.sort_values(by='irr', ascending=False) |
|
|
df_plot['annual_profit_M'] = df_plot['annual_profit'] / 1_000_000 |
|
|
df_plot['total_capex_M'] = df_plot['total_capex'] / 1_000_000 |
|
|
|
|
|
fig, axes = plt.subplots(2, 2, figsize=(20, 14)) |
|
|
fig.suptitle('Dashboard: Comprehensive Comparison of Optimization Methods', fontsize=22, weight='bold') |
|
|
palettes = ['viridis', 'plasma', 'magma', 'cividis'] |
|
|
metrics = [ |
|
|
('irr', 'Optimal IRR (%)', axes[0, 0]), |
|
|
('annual_profit_M', 'Annual Profit ($M)', axes[0, 1]), |
|
|
('total_capex_M', 'Total CAPEX ($M)', axes[1, 0]), |
|
|
('payback_period', 'Payback Period (Years)', axes[1, 1]) |
|
|
] |
|
|
|
|
|
for i, (metric, title, ax) in enumerate(metrics): |
|
|
sns.barplot(x=metric, y='Method', data=df_plot, ax=ax, palette=palettes[i]) |
|
|
ax.set_title(title, fontsize=16, weight='bold') |
|
|
ax.set_xlabel('') |
|
|
ax.set_ylabel('') |
|
|
for p in ax.patches: |
|
|
width = p.get_width() |
|
|
ax.text(width * 1.01, p.get_y() + p.get_height() / 2, |
|
|
f'{width:,.2f}', |
|
|
va='center', fontsize=11) |
|
|
|
|
|
plt.tight_layout(rect=[0, 0.03, 1, 0.95]) |
|
|
plt.savefig("static/images/kpi_dashboard.png", bbox_inches='tight') |
|
|
print("\n✅ A KPI dashboard graph has been saved as 'kpi_dashboard.png'") |
|
|
|
|
|
def run_sensitivity_analysis(best_params, base_irr): |
|
|
global PRODUCT_PRICES_USD_PER_TON, STRATEGY_DATA |
|
|
print("\n--- Sensitivity Analysis on Best Scenario ---") |
|
|
print(f"Analyzing sensitivity around the base IRR of {base_irr:,.2f}%") |
|
|
|
|
|
sensitivity_vars = { |
|
|
'Byproduct Caustic Soda Price': ('price', 'byproduct_caustic_soda'), |
|
|
'Sourcing Cost Integrated': ('strategy', 'Integrated_Production', 'sourcing_cost_per_ton_pvc'), |
|
|
'Domestic PVC S70 Price': ('price', 'pvc_s70_domestic') |
|
|
} |
|
|
variations = [-0.20, -0.10, 0.10, 0.20] |
|
|
results = [] |
|
|
original_prices = copy.deepcopy(PRODUCT_PRICES_USD_PER_TON) |
|
|
original_strategies = copy.deepcopy(STRATEGY_DATA) |
|
|
|
|
|
for key, path in sensitivity_vars.items(): |
|
|
for var in variations: |
|
|
PRODUCT_PRICES_USD_PER_TON = copy.deepcopy(original_prices) |
|
|
STRATEGY_DATA = copy.deepcopy(original_strategies) |
|
|
if path[0] == 'price': |
|
|
base_value = PRODUCT_PRICES_USD_PER_TON[path[1]] |
|
|
PRODUCT_PRICES_USD_PER_TON[path[1]] = base_value * (1 + var) |
|
|
else: |
|
|
base_value = STRATEGY_DATA[path[1]][path[2]] |
|
|
STRATEGY_DATA[path[1]][path[2]] = base_value * (1 + var) |
|
|
|
|
|
kpis = calculate_project_kpis(**best_params) |
|
|
results.append({ |
|
|
'Variable': key, 'Change': f'{var:+.0%}', |
|
|
'New IRR (%)': kpis['irr'], 'IRR Delta (%)': kpis['irr'] - base_irr |
|
|
}) |
|
|
|
|
|
PRODUCT_PRICES_USD_PER_TON = original_prices |
|
|
STRATEGY_DATA = original_strategies |
|
|
|
|
|
df_sensitivity = pd.DataFrame(results) |
|
|
print("\n✅ **Sensitivity Analysis Results**") |
|
|
print("="*60) |
|
|
print(df_sensitivity.to_string(index=False)) |
|
|
print("="*60) |
|
|
|
|
|
tornado_data = [] |
|
|
for var_name in df_sensitivity['Variable'].unique(): |
|
|
subset = df_sensitivity[df_sensitivity['Variable'] == var_name] |
|
|
min_delta = subset['IRR Delta (%)'].min() |
|
|
max_delta = subset['IRR Delta (%)'].max() |
|
|
tornado_data.append({ |
|
|
'Variable': var_name, |
|
|
'Min_Delta': min_delta, |
|
|
'Max_Delta': max_delta, |
|
|
'Range': max_delta - min_delta |
|
|
}) |
|
|
df_tornado = pd.DataFrame(tornado_data).sort_values('Range', ascending=True) |
|
|
|
|
|
fig, ax = plt.subplots(figsize=(12, 8)) |
|
|
y = np.arange(len(df_tornado)) |
|
|
ax.barh(y, df_tornado['Max_Delta'], color='mediumseagreen', label='Positive Impact') |
|
|
ax.barh(y, df_tornado['Min_Delta'], color='lightcoral', label='Negative Impact') |
|
|
ax.set_yticks(y) |
|
|
ax.set_yticklabels(df_tornado['Variable'], fontsize=12) |
|
|
ax.axvline(0, color='black', linewidth=0.8, linestyle='--') |
|
|
ax.set_title('Tornado Chart: IRR Sensitivity to Key Variables', fontsize=18, pad=20, weight='bold') |
|
|
ax.set_xlabel(f'Change in IRR (%) from Base IRR ({base_irr:.2f}%)', fontsize=14) |
|
|
ax.set_ylabel('Variable', fontsize=14) |
|
|
ax.legend() |
|
|
ax.grid(axis='x', linestyle='--', alpha=0.7) |
|
|
for i, (p, n) in enumerate(zip(df_tornado['Max_Delta'], df_tornado['Min_Delta'])): |
|
|
ax.text(p, i, f' +{p:.2f}%', va='center', ha='left', color='darkgreen') |
|
|
ax.text(n, i, f' {n:.2f}%', va='center', ha='right', color='darkred') |
|
|
plt.tight_layout() |
|
|
plt.savefig("static/images/sensitivity_analysis_tornado.png", bbox_inches='tight') |
|
|
print("\n✅ A sensitivity analysis Tornado chart has been saved as 'sensitivity_analysis_tornado.png'") |
|
|
|
|
|
if __name__ == "__main__": |
|
|
optimization_results = run_optimizations_without_ml() |
|
|
df_results = pd.DataFrame(optimization_results).sort_values(by="irr", ascending=False).reset_index(drop=True) |
|
|
df_results.round(2) |
|
|
display_and_save_results(df_results) |
|
|
create_kpi_comparison_dashboard(df_results) |
|
|
|
|
|
if not df_results.empty: |
|
|
best_scenario = df_results.iloc[0] |
|
|
run_sensitivity_analysis(best_scenario['Params'], best_scenario['irr']) |