aaronjosephd's picture
feat: disable similarity worker for NER-only benchmark
79c8e6d
import pandas as pd
from fastapi import FastAPI, UploadFile, File, Form, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import List, Dict, Any, Optional
import os
import fitz # PyMuPDF
import torch
import spacy
import re
from bs4 import BeautifulSoup
import emoji
import subprocess
import json
import sys
import pathlib
import uuid
# --- Text Cleaning Functions ---
def old_refined_text_cleaning(text: str) -> str:
"""The OLD cleaning function used for the annotation phase. Removes #, +, / etc."""
if not isinstance(text, str):
return ""
text = BeautifulSoup(text, "html.parser").get_text()
url_pattern = r'(?:(?:https?|ftp)://)?(?:\S+(?::\S*)?@)?(?:(?!(?:10|127)(?:\.\d{1,3}){3})(?!(?:169\.254|192\.168)(?:\.\d{1,3}){2})(?!172\.(?:1[6-9]|2\d|3[0-1])(?:\.\d{1,3}){2})(?:[1-9]\d?|1\d\d|2[01]\d|22[0-3])(?:\.(?:1?\d{1,2}|2[0-4]\d|25[0-5])){2}(?:\.(?:[1-9]\d?|1\d\d|2[0-4]\d|25[0-4]))|(?:(?:[a-z\u00a1-\uffff0-9]-*)*[a-z\u00a1-\uffff0-9]+)(?:\.(?:[a-z\u00a1-\uffff0-9]-*)*[a-z\u00a1-\uffff0-9]+)*(?:\.(?:[a-z\u00a1-\uffff]{2,})))(?::\d{2,5})?(?:/\S*)?'
text = re.sub(url_pattern, '', text)
text = re.sub(r'\S+@\S+\s?', '', text)
text = emoji.demojize(text)
text = re.sub(r':[a-zA-Z_]+:', '', text)
text = text.replace('\\', ' ')
text = re.sub(r'[#*•]', ' ', text)
text = re.sub(r'\{.*?\}', ' ', text)
text = re.sub(r'[^a-zA-Z0-9\s.,!?-]', ' ', text)
text = re.sub(r'\s+', ' ', text)
text = re.sub(r'\s([,.!?-])', r'\1', text)
text = text.strip()
text = text.lower()
return text
def new_refined_text_cleaning(text: str) -> str:
"""The NEW, improved cleaning function. Keeps technical symbols."""
if not isinstance(text, str):
return ""
text = BeautifulSoup(text, "html.parser").get_text()
url_pattern = r'(?:(?:https?|ftp)://)?(?:\S+(?::\S*)?@)?(?:(?!(?:10|127)(?:\.\d{1,3}){3})(?!(?:169\.254|192\.168)(?:\.\d{1,3}){2})(?!172\.(?:1[6-9]|2\d|3[0-1])(?:\.\d{1,3}){2})(?:[1-9]\d?|1\d\d|2[01]\d|22[0-3])(?:\.(?:1?\d{1,2}|2[0-4]\d|25[0-5])){2}(?:\.(?:[1-9]\d?|1\d\d|2[0-4]\d|25[0-4]))|(?:(?:[a-z\u00a1-\uffff0-9]-*)*[a-z\u00a1-\uffff0-9]+)(?:\.(?:[a-z\u00a1-\uffff0-9]-*)*[a-z\u00a1-\uffff0-9]+)*(?:\.(?:[a-z\u00a1-\uffff]{2,})))(?::\d{2,5})?(?:/\S*)?'
text = re.sub(url_pattern, '', text)
text = re.sub(r'\S+@\S+\s?', '', text)
text = emoji.demojize(text)
text = re.sub(r':[a-zA-Z_]+:', '', text)
text = text.replace('\\', ' ')
text = re.sub(r'[*•]', ' ', text) # Keep '#' from old regex r'[#*•]' to preserve C#
text = re.sub(r'\{.*?\}', ' ', text)
# Keep '#', '+', '/', '()', and '_' to preserve technical terms.
text = re.sub(r'[^a-zA-Z0-9_#+()/\s.,!?-]', ' ', text)
text = re.sub(r'\s+', ' ', text)
text = re.sub(r'\s([,.!?-])', r'\1', text)
text = text.strip()
text = text.lower()
return text
# --- Pydantic Models for API Response Structure ---
class SkillCount(BaseModel):
skill: str
count: int
class ToolCount(BaseModel):
tool: str
count: int
class RoleSkill(BaseModel):
cmo_role_match: str
skill: str
count: int
class RoleTool(BaseModel):
cmo_role_match: str
tool: str
count: int
class ExperienceDistribution(BaseModel):
year: int
count: int
class SkillCooccurrence(BaseModel):
skill_A: str
skill_B: str
count: int
class ToolCooccurrence(BaseModel):
tool_A: str
tool_B: str
count: int
class JobRoleDistribution(BaseModel):
cmo_role_match: str
count: int
class RoleInsightsResponse(BaseModel):
top_skills: List[RoleSkill]
total_skills: int
top_tools: List[RoleTool]
total_tools: int
average_experience: Optional[float] = None
experience_distribution: List[ExperienceDistribution]
total_experience_distribution: int
skill_co_occurrence: List[SkillCooccurrence]
total_skill_co_occurrence: int
tool_co_occurrence: List[ToolCooccurrence]
total_tool_co_occurrence: int
class MarketInsightsResponse(BaseModel):
top_overall_skills: List[SkillCount]
total_overall_skills: int
top_overall_tools: List[ToolCount]
total_overall_tools: int
experience_distribution: List[ExperienceDistribution]
total_experience_distribution: int
skill_co_occurrence: List[SkillCooccurrence]
total_skill_co_occurrence: int
tool_co_occurrence: List[ToolCooccurrence]
total_tool_co_occurrence: int
average_experience: Optional[float] = None
class SimilarJob(BaseModel):
job_title: str
similarity_score: float
cmo_role_match: str
url: Optional[str] = None
class SkillDetail(BaseModel):
name: str
count: int
class GapAnalysis(BaseModel):
user_skills: List[SkillDetail]
user_tools: List[SkillDetail]
missing_skills: List[SkillDetail]
matching_skills: List[SkillDetail]
missing_tools: List[SkillDetail]
matching_tools: List[SkillDetail]
total_user_skills: int
total_user_tools: int
total_missing_skills: int
total_matching_skills: int
total_missing_tools: int
total_matching_tools: int
class AnalysisResult(BaseModel):
similar_jobs: List[SimilarJob]
total_similar_jobs: int
gap_analysis: GapAnalysis
recommendations: Dict[str, Any]
session_id: str
# --- App instantiation ---
app = FastAPI(
title="Skill Gap Analyzer API",
description="API for market insights and resume analysis.",
version="1.3.0", # Version bump
)
# --- CORS Middleware ---
origins = [
"http://localhost:5173",
"http://127.0.0.1:5173",
"http://localhost:5174",
]
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# --- In-memory storage for models and data ---
DB = {}
@app.on_event("startup")
async def startup_event():
DB['similarity_cache'] = {}
"""Load models and data into memory on application startup."""
print("INFO: Loading models and data...")
backend_dir = os.path.dirname(os.path.abspath(__file__))
model_path = os.path.join(backend_dir, "ner_model")
# --- Load Pre-computed Insights ---
insights_path = os.path.join(backend_dir, 'market_insights.json')
with open(insights_path, 'r') as f:
DB['insights'] = json.load(f)
print("INFO: Market insights loaded successfully.")
# --- Load other necessary data ---
# This is still needed for the similarity worker and gap analysis source
DB['market_data'] = pd.read_csv(os.path.join(backend_dir, 'final_prototype_postings.csv'))
# --- Load Models ---
print(f"INFO: Loading NER model from {model_path}...")
DB['ner_model'] = spacy.load(model_path)
print("INFO: NER model loaded successfully.")
print("INFO: Models and data loaded successfully.")
@app.get("/", tags=["General"])
async def read_root():
return {"message": "Welcome to the Skill Gap Analyzer API v1.3"}
@app.get("/roles", response_model=List[str], tags=["Market Insights"])
async def get_roles():
roles = sorted(DB['insights']['by_role'].keys())
return ["Overall Market"] + roles
@app.get("/job_roles_distribution", response_model=List[JobRoleDistribution], tags=["Market Insights"])
async def get_job_roles_distribution():
return DB['insights']['job_role_distribution']
@app.get("/market_insights", response_model=MarketInsightsResponse, tags=["Market Insights"])
async def get_market_insights(page: int = 1, limit: int = 20):
start = (page - 1) * limit
end = page * limit
overall_data = DB['insights']['overall_market']
top_skills = overall_data.get('top_skills', [])
top_tools = overall_data.get('top_tools', [])
exp_dist = overall_data.get('experience_distribution', [])
skill_co = overall_data.get('skill_co_occurrence', [])
tool_co = overall_data.get('tool_co_occurrence', [])
avg_exp = overall_data.get('average_experience')
return {
"top_overall_skills": top_skills[start:end],
"total_overall_skills": len(top_skills),
"top_overall_tools": top_tools[start:end],
"total_overall_tools": len(top_tools),
"experience_distribution": exp_dist[start:end],
"total_experience_distribution": len(exp_dist),
"skill_co_occurrence": skill_co[start:end],
"total_skill_co_occurrence": len(skill_co),
"tool_co_occurrence": tool_co[start:end],
"total_tool_co_occurrence": len(tool_co),
"average_experience": avg_exp,
}
@app.get("/market_insights/{role:path}", response_model=RoleInsightsResponse, tags=["Market Insights"])
async def get_role_insights(role: str, page: int = 1, limit: int = 10):
start = (page - 1) * limit
end = page * limit
role_data = DB['insights']['by_role'].get(role)
if not role_data:
raise HTTPException(status_code=404, detail="Role not found")
top_skills = role_data.get('top_skills', [])
top_tools = role_data.get('top_tools', [])
exp_dist = role_data.get('experience_distribution', [])
skill_co = role_data.get('skill_co_occurrence', [])
tool_co = role_data.get('tool_co_occurrence', [])
avg_exp = role_data.get('average_experience')
return {
"top_skills": top_skills[start:end],
"total_skills": len(top_skills),
"top_tools": top_tools[start:end],
"total_tools": len(top_tools),
"average_experience": avg_exp,
"experience_distribution": exp_dist[start:end],
"total_experience_distribution": len(exp_dist),
"skill_co_occurrence": skill_co[start:end],
"total_skill_co_occurrence": len(skill_co),
"tool_co_occurrence": tool_co[start:end],
"total_tool_co_occurrence": len(tool_co),
}
@app.post("/analyze_resume", response_model=AnalysisResult, tags=["Resume Analysis"])
async def analyze_resume(
resume_file: UploadFile = File(...),
target_role: Optional[str] = Form(None),
limit: Optional[int] = Form(10) # This limit is now for the initial page load
):
# --- PDF Processing ---
if not resume_file or not resume_file.filename.lower().endswith('.pdf'):
raise HTTPException(status_code=400, detail="Invalid file type. Please upload a PDF.")
pdf_bytes = await resume_file.read()
MAX_FILE_SIZE = 1 * 1024 * 1024 # 1MB
if len(pdf_bytes) > MAX_FILE_SIZE:
raise HTTPException(
status_code=413,
detail="File is too large. Please upload a PDF under 1MB."
)
resume_text = ""
try:
with fitz.open(stream=pdf_bytes, filetype="pdf") as doc:
for page in doc:
resume_text += page.get_text()
except Exception as e:
raise HTTPException(status_code=422, detail=f"Failed to process PDF file: {e}")
if not resume_text or resume_text.isspace():
raise HTTPException(
status_code=422,
detail="Could not extract any text from the provided PDF. The document may be empty, image-based, or corrupted."
)
# --- Text Cleaning ---
ner_cleaned_text = old_refined_text_cleaning(resume_text)
similarity_cleaned_text = new_refined_text_cleaning(resume_text)
# --- NER Processing ---
doc = DB['ner_model'](ner_cleaned_text)
user_skills = [ent.text for ent in doc.ents if ent.label_ == "SKILL"]
user_tools = [ent.text for ent in doc.ents if ent.label_ == "TOOL"]
# --- Similarity Search (DISABLED for NER-only benchmarking) ---
all_similar_jobs = []
total_similar_jobs = 0
# The similarity worker subprocess call is bypassed for this benchmark.
# The original code for similarity search was here.
# --- Cache the full results ---
session_id = str(uuid.uuid4())
# Simple cache eviction: Keep cache size under a limit (e.g., 50)
if len(DB['similarity_cache']) > 50:
try:
oldest_key = next(iter(DB['similarity_cache']))
del DB['similarity_cache'][oldest_key]
except (StopIteration, KeyError):
# Handle edge cases where cache might be empty or key is gone
pass
DB['similarity_cache'][session_id] = all_similar_jobs
# --- Gap Analysis (remains the same) ---
if target_role and target_role != "Overall Market":
role_data = DB['insights']['by_role'].get(target_role, {})
market_skills_list = role_data.get('top_skills', [])
market_tools_list = role_data.get('top_tools', [])
else:
overall_data = DB['insights']['overall_market']
market_skills_list = overall_data.get('top_skills', [])
market_tools_list = overall_data.get('top_tools', [])
market_skill_freq = {s['skill'].lower(): s['count'] for s in market_skills_list}
market_tool_freq = {t['tool'].lower(): t['count'] for t in market_tools_list}
user_skills_lower = {s.lower() for s in user_skills}
user_tools_lower = {t.lower() for t in user_tools}
missing_skills = [{"name": s['skill'], "count": s['count']} for s in market_skills_list if s['skill'].lower() not in user_skills_lower]
matching_skills = [{"name": s['skill'], "count": s['count']} for s in market_skills_list if s['skill'].lower() in user_skills_lower]
missing_tools = [{"name": t['tool'], "count": t['count']} for t in market_tools_list if t['tool'].lower() not in user_tools_lower]
matching_tools = [{"name": t['tool'], "count": t['count']} for t in market_tools_list if t['tool'].lower() in user_tools_lower]
user_skills_with_freq = [{"name": s, "count": market_skill_freq.get(s.lower(), 0)} for s in user_skills]
user_tools_with_freq = [{"name": t, "count": market_tool_freq.get(t.lower(), 0)} for t in user_tools]
gap_analysis = {
"user_skills": user_skills_with_freq,
"user_tools": user_tools_with_freq,
"missing_skills": missing_skills,
"matching_skills": matching_skills,
"missing_tools": missing_tools,
"matching_tools": matching_tools,
"total_user_skills": len(user_skills),
"total_user_tools": len(user_tools),
"total_missing_skills": len(missing_skills),
"total_matching_skills": len(matching_skills),
"total_missing_tools": len(missing_tools),
"total_matching_tools": len(matching_tools),
}
# --- Recommendation Generation (remains the same) ---
all_user_entities = user_skills_lower.union(user_tools_lower)
recommendations = {
"message": "Based on your resume, focusing on these skills and tools could improve your market alignment. We also recommend looking at co-occurring skills for your existing strengths.",
"skills_to_learn": missing_skills[:5],
"tools_to_learn": missing_tools[:5],
"based_on_your_strengths": {}
}
skill_co_data = []
tool_co_data = []
if target_role and target_role != "Overall Market":
role_data = DB['insights']['by_role'].get(target_role, {})
skill_co_data = role_data.get('skill_co_occurrence', [])
tool_co_data = role_data.get('tool_co_occurrence', [])
else:
overall_data = DB['insights']['overall_market']
skill_co_data = overall_data.get('skill_co_occurrence', [])
tool_co_data = overall_data.get('tool_co_occurrence', [])
df_list = []
if skill_co_data:
skills_df = pd.DataFrame(skill_co_data)
if 'skill_A' in skills_df.columns and 'skill_B' in skills_df.columns:
skills_df = skills_df.rename(columns={'skill_A': 'entity_A', 'skill_B': 'entity_B'})
df_list.append(skills_df)
if tool_co_data:
tools_df = pd.DataFrame(tool_co_data)
if 'tool_A' in tools_df.columns and 'tool_B' in tools_df.columns:
tools_df = tools_df.rename(columns={'tool_A': 'entity_A', 'tool_B': 'entity_B'})
df_list.append(tools_df)
if df_list:
co_occurrence_df = pd.concat(df_list, ignore_index=True)
if 'entity_A' in co_occurrence_df.columns and 'entity_B' in co_occurrence_df.columns:
for entity in all_user_entities:
related_A = co_occurrence_df[co_occurrence_df['entity_B'].str.lower() == entity]['entity_A'].tolist()
related_B = co_occurrence_df[co_occurrence_df['entity_A'].str.lower() == entity]['entity_B'].tolist()
related_entities = related_A + related_B
recommended = [s for s in related_entities if s.lower() not in all_user_entities]
if recommended:
unique_recommended = list(dict.fromkeys(recommended))
recommendations["based_on_your_strengths"][entity] = unique_recommended[:3]
# --- Final Response ---
return {
"similar_jobs": all_similar_jobs[:limit], # Return only the first page
"total_similar_jobs": total_similar_jobs,
"gap_analysis": gap_analysis,
"recommendations": recommendations,
"session_id": session_id,
}
@app.get("/similar_jobs/{session_id}", response_model=List[SimilarJob], tags=["Resume Analysis"])
async def get_more_similar_jobs(session_id: str, page: int = 1, limit: int = 10):
"""
Gets a paginated list of similar jobs from the cache.
"""
if session_id not in DB['similarity_cache']:
raise HTTPException(status_code=404, detail="Session not found or expired.")
full_job_list = DB['similarity_cache'][session_id]
start_index = (page - 1) * limit
end_index = page * limit
return full_job_list[start_index:end_index]