File size: 4,039 Bytes
f9b1ad5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
99bdd87
 
 
f9b1ad5
 
 
 
 
 
 
 
 
 
 
99bdd87
 
 
 
 
 
 
 
f9b1ad5
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
"""
Context analyzer for domain detection
Determines which limitation checks are relevant
"""

import re
from typing import List, Dict, Any, Optional
# from collections import Counter

# Domain keywords mapping: domain name -> keywords that signal that domain.
# Keywords are written in lowercase because the functions in this module
# lowercase message content before looking for them.
# "medicine" and "healthcare" are kept as separate domains; a health-related
# industry hint in analyze_conversation_context adds both.
DOMAIN_KEYWORDS: Dict[str, List[str]] = {
    "mathematics": ["math", "calculus", "algebra", "geometry", "proof", "theorem", "equation"],
    "physics": ["physics", "force", "energy", "quantum", "relativity", "mechanics"],
    "medicine": ["medical", "diagnosis", "treatment", "symptom", "disease", "patient", "doctor"],
    "healthcare": ["health", "medication", "drug", "therapy", "clinical"],
    "law": ["legal", "law", "court", "regulation", "compliance", "attorney", "contract"],
    "finance": ["financial", "investment", "stock", "portfolio", "trading", "tax"],
    "coding": ["code", "programming", "function", "class", "debug", "git", "api"],
    "file_system": ["file", "directory", "path", "write", "delete", "permission"],
}

async def analyze_conversation_context(
    conversation_history: Optional[List[Dict[str, str]]] = None,
    user_context: Optional[Dict[str, Any]] = None,
    threshold: float = 0.3
) -> List[str]:
    """
    Analyze conversation to detect relevant domains

    The result is the union of three strategies:
      1. Recency-weighted keyword scores over the whole history; a domain
         is kept when its score is at least ``threshold``.
      2. The ``industry`` hint in ``user_context`` (health/medical,
         tech/software, finance/bank; the first matching group wins).
      3. Any domain whose keyword starts a word in the last message; this
         ignores ``threshold``.

    Args:
        conversation_history: Recent messages [{"role": "user", "content": "..."}]
        user_context: User metadata {"industry": "healthcare", "role": "developer"}
        threshold: Minimum confidence to include domain (0-1)

    Returns:
        Alphabetically sorted list of detected domains,
        e.g., ["coding", "mathematics"]
    """
    detected_domains = set()

    # Strategy 1: Keyword matching in conversation
    if conversation_history:
        domain_scores = _score_domains_by_keywords(conversation_history)
        detected_domains.update(
            domain
            for domain, score in domain_scores.items()
            if score >= threshold
        )

    # Strategy 2: User context hints
    if user_context and "industry" in user_context:
        industry = str(user_context["industry"]).lower()
        # Map industry to domains
        if "health" in industry or "medical" in industry:
            detected_domains.update(["medicine", "healthcare"])
        elif "tech" in industry or "software" in industry:
            detected_domains.add("coding")
        elif "finance" in industry or "bank" in industry:
            detected_domains.add("finance")

    # Strategy 3: Always include if explicitly mentioned in last message
    if conversation_history:
        raw_content = conversation_history[-1].get("content")
        # "content" may be present but None or not a string; treat that
        # as an empty message instead of failing on .lower().
        last_message = raw_content.lower() if isinstance(raw_content, str) else ""

        for domain, keywords in DOMAIN_KEYWORDS.items():
            # Anchor each keyword at a word start so that e.g. "tax" does
            # not fire on "syntax" or "git" on "digit", while suffixed
            # forms such as "symptoms" still match.
            if any(
                re.search(r"\b" + re.escape(kw), last_message)
                for kw in keywords
            ):
                detected_domains.add(domain)

    # Sort so callers get a stable order; plain set iteration order varies
    # between interpreter runs.
    return sorted(detected_domains)


def _score_domains_by_keywords(
    conversation_history: List[Dict[str, str]],
    recent_weight: float = 2.0
) -> Dict[str, float]:
    """
    Score domains based on keyword frequency (recent messages weighted higher)

    For every message, each domain earns one point per distinct keyword
    that starts a word in the lowercased content, multiplied by a recency
    weight that grows linearly from 1.0 for the first message towards
    ``recent_weight`` for later ones. Totals are then divided by the
    largest total, so the strongest domain scores 1.0.

    Args:
        conversation_history: Messages [{"role": "user", "content": "..."}].
            Messages whose "content" is missing or not a string are skipped.
        recent_weight: Upper bound the recency weight approaches for the
            most recent messages.

    Returns:
        Dict of {domain: score} normalized 0-1; empty if there are no
        messages, all zeros if no keyword matched anywhere.
    """
    total_messages = len(conversation_history)
    if total_messages == 0:
        return {}

    # Compile once per call instead of once per message. The leading \b
    # anchors each keyword at a word start so that e.g. "tax" does not
    # count for "syntax" or "api" for "capital", while suffixed forms such
    # as "medications" still count.
    domain_patterns = {
        domain: [re.compile(r"\b" + re.escape(kw)) for kw in keywords]
        for domain, keywords in DOMAIN_KEYWORDS.items()
    }
    domain_counts: Dict[str, float] = {domain: 0.0 for domain in domain_patterns}

    for i, message in enumerate(conversation_history):
        raw_content = message.get("content")
        if not isinstance(raw_content, str):
            # "content" may be present but None or not a string; there is
            # nothing to scan, so skip rather than fail on .lower().
            continue
        content = raw_content.lower()

        # Weight recent messages higher
        recency_weight = 1.0 + (i / total_messages) * (recent_weight - 1.0)

        for domain, patterns in domain_patterns.items():
            matches = sum(1 for pattern in patterns if pattern.search(content))
            domain_counts[domain] += matches * recency_weight

    # Normalize scores (prevent division by zero)
    if not domain_counts:
        return {}

    max_count = max(domain_counts.values())
    if max_count == 0:
        return {domain: 0.0 for domain in domain_counts}

    return {
        domain: count / max_count
        for domain, count in domain_counts.items()
    }