Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, Request, Header, HTTPException, Query | |
| from fastapi.responses import HTMLResponse, JSONResponse | |
| from fastapi.openapi.docs import get_swagger_ui_html | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel | |
| from datetime import datetime | |
| import uuid | |
| import os | |
| import openai | |
| from transformers import pipeline | |
| import logging, traceback | |
| from typing import Optional, List, Union | |
| from model import ( | |
| summarize_review, smart_summarize, detect_industry, | |
| detect_product_category, detect_emotion, answer_followup, answer_only, | |
| assess_churn_risk, extract_pain_points # β Added extract_pain_points | |
| ) | |
# Application object. The built-in Swagger route is switched off (docs_url=None);
# ReDoc stays on /redoc and the OpenAPI schema on /openapi.json.
# NOTE(review): presumably custom_swagger_ui() is meant to replace the default
# docs page — confirm it is registered on a route.
app = FastAPI(
    title="π§ ChurnSight AI",
    description="Multilingual GenAI for smarter feedback β summarization, sentiment, emotion, aspects, Q&A and tags.",
    version="2025.1.0",
    openapi_url="/openapi.json",
    docs_url=None,
    redoc_url="/redoc",
)
# CORS: every origin, method and header is accepted.
# NOTE(review): wildcard origins combined with allow_credentials=True is the
# most permissive setting available; confirm that credentialed cross-origin
# requests are really needed, otherwise list the allowed origins explicitly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
logging.basicConfig(level=logging.INFO)

# API key that the endpoints compare against. It can be overridden through the
# CHURNSIGHT_API_KEY environment variable so the secret does not have to live
# in source control; the previous literal is kept as the default.
VALID_API_KEY = os.environ.get("CHURNSIGHT_API_KEY", "my-secret-key")

# Shared in-memory churn log: one dict per analysed review, appended to by the
# analysis endpoints and capped there at the newest 1000 entries.
log_store = []
# NOTE(review): no route decorator is visible for this handler in this copy of
# the file — confirm it is registered on `app`.
def root() -> str:
    """Return the static HTML banner that shows the backend is running."""
    return "<h1>ChurnSight AI Backend is Running</h1>"
# NOTE(review): no route decorator is visible for this handler in this copy of
# the file — confirm it is registered on `app`.
def custom_swagger_ui():
    """Build the Swagger UI page for this app's OpenAPI schema.

    Returns:
        The HTML response produced by ``get_swagger_ui_html`` with a custom
        title, favicon and CDN-hosted Swagger UI assets.
    """
    # The asset URLs previously contained the text "[email protected]" where
    # the "package@version" path segment belongs (an e-mail obfuscation
    # artifact), so the bundle and stylesheet could not load.
    # NOTE(review): the originally pinned version is not recoverable from the
    # file; major version 5 is used here — confirm or pin an exact release.
    return get_swagger_ui_html(
        openapi_url=app.openapi_url,
        title="π§ Swagger UI - ChurnSight AI",
        swagger_favicon_url="https://cdn-icons-png.flaticon.com/512/3794/3794616.png",
        swagger_js_url="https://cdn.jsdelivr.net/npm/swagger-ui-dist@5/swagger-ui-bundle.js",
        swagger_css_url="https://cdn.jsdelivr.net/npm/swagger-ui-dist@5/swagger-ui.css",
    )
# NOTE(review): no @app.exception_handler registration is visible for this
# function in this copy of the file — confirm it is installed.
async def exception_handler(request: Request, exc: Exception):
    """Log an unhandled exception and answer with a generic 500 payload.

    Args:
        request: The request being served when the exception escaped.
        exc: The exception that was not handled elsewhere.

    Returns:
        A ``JSONResponse`` with status 500 and a fixed, non-revealing detail.
    """
    # Format the traceback from the exception that was passed in rather than
    # from the interpreter's "currently handled" exception, which is only set
    # when the caller happens to invoke this from inside an except block.
    trace = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))
    logging.error(f"Unhandled Exception: {trace}")
    return JSONResponse(
        status_code=500,
        content={"detail": "Internal Server Error. Please contact support."},
    )
| # ==== SCHEMAS ==== | |
class ReviewInput(BaseModel):
    """Request body for analysing a single review."""

    # Review text to analyse.
    text: str
    # Model name handed to the sentiment-analysis pipeline.
    model: str = "distilbert-base-uncased-finetuned-sst-2-english"
    # Explicit industry; detection is used when empty or containing "auto".
    industry: Optional[str] = None
    # When true, pain points are extracted from the text.
    aspects: bool = False
    # Optional follow-up question(s) answered against the review text.
    follow_up: Optional[Union[str, List[str]]] = None
    # Explicit product category; detection is used when empty or containing "auto".
    product_category: Optional[str] = None
    # Optional device label supplied by the caller.
    device: Optional[str] = None
    # When true, the non-brief summary uses two clusters instead of one.
    intelligence: Optional[bool] = False
    # "brief" selects the short summary; any other value the smart summary.
    verbosity: Optional[str] = "detailed"
class BulkReviewInput(BaseModel):
    """Request body for analysing several reviews in one call.

    The list-valued optional fields are read by position: entry ``i`` applies
    to ``reviews[i]``.
    """

    # Review texts to analyse.
    reviews: List[str]
    # Model name handed to the sentiment-analysis pipeline.
    model: str = "distilbert-base-uncased-finetuned-sst-2-english"
    # Per-review industry overrides.
    industry: Optional[List[str]] = None
    # When true, pain points are extracted for every review.
    aspects: bool = False
    # Per-review product-category overrides.
    product_category: Optional[List[str]] = None
    # Per-review device labels.
    device: Optional[List[str]] = None
    # Per-review follow-up question(s).
    follow_up: Optional[List[Union[str, List[str]]]] = None
    # When true, summaries use two clusters instead of one.
    intelligence: Optional[bool] = False
    # NOTE(review): not read by any code visible in this file — confirm usage.
    explain_bulk: Optional[bool] = False
class FollowUpRequest(BaseModel):
    """Request body for asking one follow-up question about a review."""

    # Review text the question refers to.
    text: str
    # Question to answer from the text.
    question: str
    # Requested answer verbosity.
    # NOTE(review): the follow-up handler visible in this file does not read
    # this field — confirm whether it should be forwarded.
    verbosity: Optional[str] = "brief"
| # ==== HELPERS ==== | |
def auto_fill(value: Optional[str], fallback: str) -> str:
    """Return ``value`` unless it is missing or asks for auto-detection.

    Args:
        value: Caller-supplied value; may be ``None`` or empty.
        fallback: Value to use when ``value`` is unusable.

    Returns:
        ``fallback`` when ``value`` is ``None``, empty, whitespace-only, or
        equals "auto-detect" (case-insensitive, surrounding whitespace
        ignored); otherwise ``value`` unchanged.
    """
    # Normalise only for the comparison so a padded "auto-detect" or a blank
    # string is not mistaken for a real value; the returned value is untouched.
    normalized = value.strip().lower() if value else ""
    if not normalized or normalized == "auto-detect":
        return fallback
    return value
| # ==== ENDPOINTS ==== | |
# NOTE(review): no route decorator is visible for this handler in this copy of
# the file — confirm it is registered on `app`.
async def analyze(data: ReviewInput, x_api_key: str = Header(None)):
    """Analyse one review: summary, sentiment, emotion, churn risk, metadata.

    Args:
        data: The review and its analysis options.
        x_api_key: API key taken from the request header, if any.

    Returns:
        A dict with ``summary``, ``sentiment``, ``emotion``,
        ``product_category``, ``device``, ``industry``, ``churn_risk`` and
        ``pain_points``, plus ``follow_up`` when a follow-up was requested.

    Raises:
        HTTPException: 401 for a wrong API key, 400 when the text has fewer
            than 20 words, 500 when any analysis step fails.
    """
    # NOTE(review): a request that sends no key at all passes this check; only
    # a wrong key is rejected. Confirm that anonymous access is intended.
    if x_api_key and x_api_key != VALID_API_KEY:
        raise HTTPException(status_code=401, detail="β Invalid API key")
    if len(data.text.split()) < 20:
        raise HTTPException(status_code=400, detail="β οΈ Review too short for analysis (min. 20 words).")

    # verbosity is Optional, so an explicit null must not crash on .lower().
    verbosity = data.verbosity or "detailed"

    try:
        # === Generate Summary ===
        if verbosity.lower() == "brief":
            summary = summarize_review(data.text, max_len=40, min_len=8)
        else:
            summary = smart_summarize(data.text, n_clusters=2 if data.intelligence else 1)

        # === Sentiment + Emotion ===
        # NOTE(review): the pipeline is rebuilt on every request from a
        # caller-chosen model name; consider caching and an allow-list.
        sentiment_pipeline = pipeline("sentiment-analysis", model=data.model)
        sentiment = sentiment_pipeline(data.text)[0]
        emotion = detect_emotion(data.text)
        churn_risk = assess_churn_risk(sentiment["label"], emotion)

        # === Auto-detect metadata ===
        if not data.industry or "auto" in data.industry.lower():
            industry = detect_industry(data.text)
        else:
            industry = data.industry
        if not data.product_category or "auto" in data.product_category.lower():
            product_category = detect_product_category(data.text)
        else:
            product_category = data.product_category
        # Honour the caller's device like the bulk endpoint does; "Web" is
        # still the value when none is supplied.
        device = auto_fill(data.device, "Web")

        # === Optional: Pain Points ===
        pain_points = extract_pain_points(data.text) if data.aspects else []

        # === Log entry ===
        log_store.append({
            "timestamp": datetime.now(),
            "product": product_category,
            "churn_risk": churn_risk,
            "user_id": str(uuid.uuid4()),
        })
        # Trim in place so every holder of the list sees the capped log.
        if len(log_store) > 1000:
            del log_store[:-1000]

        # === Final API Response ===
        response = {
            "summary": summary,
            "sentiment": sentiment,
            "emotion": emotion,
            "product_category": product_category,
            "device": device,
            "industry": industry,
            "churn_risk": churn_risk,
            "pain_points": pain_points,
        }
        if data.follow_up:
            response["follow_up"] = answer_followup(data.text, data.follow_up, verbosity=verbosity)
        return response
    except Exception:
        logging.error(f"π₯ Unexpected analysis failure: {traceback.format_exc()}")
        raise HTTPException(status_code=500, detail="Internal Server Error during analysis.")
# NOTE(review): no route decorator is visible for this handler in this copy of
# the file — confirm it is registered on `app`.
async def followup(request: FollowUpRequest, x_api_key: str = Header(None)):
    """Answer a single follow-up question about a review.

    Args:
        request: The review text and the question to answer.
        x_api_key: API key taken from the request header, if any.

    Returns:
        ``{"answer": ...}`` with the result of ``answer_only``.

    Raises:
        HTTPException: 401 for a wrong API key, 400 when the question is empty
            or the text has fewer than 10 words, 500 when answering fails.
    """
    # NOTE(review): a request that sends no key at all passes this check.
    if x_api_key and x_api_key != VALID_API_KEY:
        raise HTTPException(status_code=401, detail="Invalid API key")

    # Validate before entering the try block: raised inside it, this 400 was
    # caught by the generic handler below and reported to the client as a 500.
    if not request.question or len(request.text.split()) < 10:
        raise HTTPException(status_code=400, detail="Question or text is too short.")

    try:
        return {"answer": answer_only(request.text, request.question)}
    except Exception:
        logging.error(f"β Follow-up failed: {traceback.format_exc()}")
        raise HTTPException(status_code=500, detail="Follow-up generation failed.")
# NOTE(review): no route decorator is visible for this handler in this copy of
# the file — confirm it is registered on `app`.
async def get_churn_log(x_api_key: str = Header(None)):
    """Return the in-memory churn log.

    Args:
        x_api_key: API key taken from the request header, if any.

    Returns:
        ``{"log": log_store}`` — the shared list of logged churn entries.

    Raises:
        HTTPException: 401 when a key is supplied and does not match.
    """
    # NOTE(review): a request that sends no key at all passes this check, so
    # the log is readable anonymously. Confirm that is intended.
    if x_api_key and x_api_key != VALID_API_KEY:
        raise HTTPException(status_code=401, detail="Unauthorized")
    return {"log": log_store}
def _item_at(values, index):
    """Return ``values[index]``, or ``None`` when the list is absent or too short."""
    if values and index < len(values):
        return values[index]
    return None


# NOTE(review): no route decorator is visible for this handler in this copy of
# the file — confirm it is registered on `app`.
async def bulk_analyze(data: BulkReviewInput, token: str = Query(None)):
    """Analyse a batch of reviews and log a churn entry for each one analysed.

    Args:
        data: The reviews plus optional per-review metadata, matched by index.
        token: API token taken from the query string; must equal the API key.

    Returns:
        ``{"results": [...]}``. Blank reviews are skipped; reviews under
        20 words yield ``{"review", "error"}``; the rest yield the full
        analysis dict.

    Raises:
        HTTPException: 401 when the token is missing or wrong, 500 when
            processing fails.
    """
    if token != VALID_API_KEY:
        raise HTTPException(status_code=401, detail="β Unauthorized: Invalid API token")

    try:
        results = []
        # Built once per request and reused for every review.
        sentiment_pipeline = pipeline("sentiment-analysis", model=data.model)
        cluster_count = 2 if data.intelligence else 1

        for i, review_text in enumerate(data.reviews):
            if not review_text.strip():
                continue  # Skip empty reviews
            if len(review_text.split()) < 20:
                results.append({
                    "review": review_text,
                    "error": "Too short to analyze",
                })
                continue

            summary = smart_summarize(review_text, n_clusters=cluster_count)
            sentiment = sentiment_pipeline(review_text)[0]
            emotion = detect_emotion(review_text)
            churn = assess_churn_risk(sentiment["label"], emotion)
            pain = extract_pain_points(review_text) if data.aspects else []

            # Metadata lists may be shorter than `reviews`; a missing entry
            # falls back to detection instead of raising IndexError, which
            # used to fail the whole batch with a 500.
            ind = auto_fill(_item_at(data.industry, i), detect_industry(review_text))
            prod = auto_fill(_item_at(data.product_category, i), detect_product_category(review_text))
            dev = auto_fill(_item_at(data.device, i), "Web")

            result = {
                "review": review_text,
                "summary": summary,
                "sentiment": sentiment["label"],
                "score": sentiment["score"],
                "emotion": emotion,
                "industry": ind,
                "product_category": prod,
                "device": dev,
                "churn_risk": churn,
                "pain_points": pain,
            }

            # Optional follow-up for this position
            if data.follow_up and i < len(data.follow_up):
                follow_q = data.follow_up[i]
                result["follow_up"] = answer_followup(review_text, follow_q)

            # Log churn entry
            log_store.append({
                "timestamp": datetime.now(),
                "product": prod,
                "churn_risk": churn,
                "user_id": str(uuid.uuid4()),
            })
            results.append(result)

            # Cap log size; trimmed in place so every holder of the list
            # sees the capped log.
            if len(log_store) > 1000:
                del log_store[:-1000]

        return {"results": results}
    except Exception:
        logging.error(f"π₯ Bulk processing failed: {traceback.format_exc()}")
        raise HTTPException(status_code=500, detail="Failed to analyze bulk reviews")
# Already set via os.environ; nothing else needed.
def _extract_tagged_line(lines, tag):
    """Return the text after the first colon of the first line starting with ``tag``.

    Matching is case-insensitive and ignores leading whitespace. The
    placeholder string is returned when no line matches.
    """
    wanted = tag.lower()
    for line in lines:
        if line.strip().lower().startswith(wanted):
            return line.split(":", 1)[-1].strip()
    return "β"


# NOTE(review): no route decorator is visible for this handler in this copy of
# the file — confirm it is registered on `app`.
async def root_cause_analysis(payload: dict, x_api_key: str = Header(None)):
    """Ask a chat model for the problem, cause and suggested fix in a feedback text.

    Args:
        payload: Request body; its ``"text"`` entry holds the feedback.
        x_api_key: API key taken from the request header, if any.

    Returns:
        A dict with ``problem``, ``cause``, ``suggestion`` and ``model_used``,
        or a 500 ``JSONResponse`` when an unexpected error occurs.

    Raises:
        HTTPException: 401 for a wrong API key, 400 when the text is missing,
            not a string, or shorter than 5 words, 500 when every model
            attempt fails.
    """
    # NOTE(review): a request that sends no key at all passes this check.
    if x_api_key and x_api_key != VALID_API_KEY:
        raise HTTPException(status_code=401, detail="Invalid API key")

    try:
        # A non-string "text" (null, number, list) is treated as missing
        # rather than crashing on .strip().
        raw_text = payload.get("text", "")
        text = raw_text.strip() if isinstance(raw_text, str) else ""
        if not text or len(text.split()) < 5:
            raise HTTPException(status_code=400, detail="Insufficient input for root cause analysis.")

        prompt = (
            "\n"
            "Analyze the following customer feedback and extract:\n"
            "1. The main problem\n"
            "2. The possible root cause\n"
            "3. A suggested fix or which team might need to handle it\n"
            f"Feedback: '''{text}'''\n"
            "Format your answer as:\n"
            "Problem: ...\n"
            "Cause: ...\n"
            "Suggestion: ...\n"
        )

        # Models to try in order; the first successful reply wins.
        models_to_try = ["gpt-4", "gpt-4o-mini", "gpt-3.5-turbo"]
        last_error = None
        for model_name in models_to_try:
            try:
                response = openai.chat.completions.create(
                    model=model_name,
                    messages=[{"role": "user", "content": prompt}],
                )
                output = response.choices[0].message.content
                lines = output.splitlines()
                return {
                    "problem": _extract_tagged_line(lines, "Problem"),
                    "cause": _extract_tagged_line(lines, "Cause"),
                    "suggestion": _extract_tagged_line(lines, "Suggestion"),
                    "model_used": model_name,
                }
            except Exception as e:
                last_error = str(e)
                logging.warning(f"Model {model_name} failed: {last_error}")
                continue

        # If all models fail
        raise HTTPException(status_code=500, detail=f"All model attempts failed. Last error: {last_error}")
    except HTTPException:
        # Deliberate HTTP errors (the 400 above, the all-models-failed 500)
        # must reach the client unchanged instead of being rewrapped below.
        raise
    except Exception as e:
        logging.error(f"Root cause analysis failed: {traceback.format_exc()}")
        return JSONResponse(status_code=500, content={"detail": f"Root cause generation failed: {str(e)}"})