from __future__ import annotations

import functools, json, logging, re
from difflib import SequenceMatcher
from io import StringIO
from typing import Dict, List, Tuple

import pandas as pd
import regex
import tiktoken
from bs4 import BeautifulSoup

from config import CFG
from web_helpers import retry

enc = tiktoken.get_encoding("cl100k_base")


def _tok(s: str) -> int:
    """Number of cl100k_base tokens in *s*."""
    return len(enc.encode(s))


@functools.lru_cache(maxsize=1)
def _nlp():
    """Load the spaCy pipeline lazily and cache it."""
    import spacy
    return spacy.load("en_core_web_sm")


def _openai_client():
    """Import OpenAI lazily to avoid overhead when not needed."""
    import importlib
    mod = importlib.import_module("openai")
    # openai>=1.0 exposes a client class; fall back to the legacy module API.
    return mod.OpenAI() if hasattr(mod, "OpenAI") else mod


DATE_PATS = [re.compile(p, re.I) for p in [
    r"\d{4}-\d{2}-\d{2}",
    r"(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\s+\d{1,2},\s+\d{4}",
    r"(?:January|February|March|April|May|June|July|August|September|October|November|December)\s+\d{1,2},\s+\d{4}",
    r"\d{1,2}\s+(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\s+\d{4}",
    r"\b\d{4}/\d{2}\b",
    r"\b\d{4}\b(?!\s*(?:%|million|billion|thousand))",
]]
EMAIL_PAT = re.compile(r"[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+")
URL_PAT = re.compile(r"https?://[^\s\)]+")
PHONE_PAT = re.compile(r"\+?\d[\d\s\-().]{7,}\d")
CURR_PAT = re.compile(r"(\$\s?\d+(?:,\d{3})*(?:\.\d+)?|\d+(?:,\d{3})*(?:\.\d+)?\s*(USD|EUR|GBP|INR|¥|₩|₹|€))", re.I)
DEF_PAT = re.compile(r"([A-Z][A-Za-z0-9\s]+?)\s+(is|are|refers to|means)\s+(.*?)(?:[\.\n])")

MD_TABLE_PAT = re.compile(
    r"(?:^\|.*?\|\n?)+(?:^\|[-:\s|]+\|\n?)?(?:^\|.*?\|\n?)+", re.M)
CSV_PAT = re.compile(r"((?:^.*?,.*?\n){2,})", re.M)
TSV_PAT = re.compile(r"((?:^.*?\t.*?\n){2,})", re.M)
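
# Illustrative matches for the patterns above (hand-checked sketches, not a
# test suite):
#     DATE_PATS[0].search("released 2023-07-14").group()      -> "2023-07-14"
#     CURR_PAT.search("revenue of $1,250.5 million").group()  -> "$1,250.5"
#     DEF_PAT.search("Photosynthesis is how plants feed.\n").groups()
#         -> ("Photosynthesis", "is", "how plants feed")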


def deduplicate_items(items: List[str], *, similarity: float = 0.5,
                      other: List[str] | None = None) -> List[str]:
    """Drop near-duplicates; prefer the longest variant of each item."""
    if not items:
        return []
    other = other or []

    def _clean(x: str) -> str:
        x = re.sub(r'\[edit\]|\[\d+\]', '', x)
        return re.sub(r'\s+', ' ', x).strip()

    out, out_clean = [], []
    for orig in items:
        clean = _clean(orig)
        dup = False
        for ref in out_clean + [_clean(o) for o in other]:
            sim = SequenceMatcher(None, clean, ref).ratio()
            if sim >= similarity or clean in ref or ref in clean:
                dup = True
                # Keep the longest spelling, but only when the matched ref
                # lives in `out`; refs drawn from `other` cannot be swapped
                # (indexing them into out_clean would raise ValueError).
                if ref in out_clean and clean not in out_clean and len(clean) > len(ref):
                    idx = out_clean.index(ref)
                    out[idx], out_clean[idx] = orig, clean
                break
        if not dup:
            out.append(orig)
            out_clean.append(clean)
    return out
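
# Usage sketch: keep one copy of near-duplicate headings, preferring the
# longest spelling (containment counts as a duplicate):
#     deduplicate_items(["Results[edit]", "Results and discussion"])
#     -> ["Results and discussion"]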


def extract_facts_and_tables(text: str) -> Tuple[str, List[str], List[str]]:
    """Split *text* into (narrative, facts, tables)."""
    facts, spans = [], []

    def _add(match):
        facts.append(match.group())
        spans.append(match.span())

    for pat in (*DATE_PATS, EMAIL_PAT, URL_PAT, PHONE_PAT, CURR_PAT, DEF_PAT):
        for m in pat.finditer(text):
            _add(m)

    # Keep the full sentence around each matched fact as context.
    doc = _nlp()(text)
    ctx = [sent.text.strip() for sent in doc.sents
           if any(sent.start_char <= start < sent.end_char for start, _ in spans)]
    facts.extend(ctx)
    facts = sorted(set(facts))

    tables = []
    cut_spans = list(spans)  # regions to cut out of the narrative

    # Markdown tables: drop separator rows, reparse as pipe-delimited CSV.
    for m in MD_TABLE_PAT.finditer(text):
        cleaned = "\n".join(l for l in m.group().splitlines()
                            if l.strip() and not re.match(r"^\|[-:\s|]+\|$", l))
        if len(cleaned.splitlines()) < 2:
            continue
        cut_spans.append(m.span())
        try:
            df = pd.read_csv(StringIO(cleaned), sep="|").dropna(how="all", axis=1)
            tables.append(df.to_markdown(index=False))
        except Exception:
            tables.append(cleaned)

    # HTML tables are re-serialised by BeautifulSoup, so offsets into `text`
    # are unavailable; remember the raw markup for plain string removal.
    html_raw = []
    soup = BeautifulSoup(text, "lxml")
    for html_tbl in soup.find_all("table"):
        html_raw.append(str(html_tbl))
        try:
            df = pd.read_html(StringIO(str(html_tbl)))[0]
            tables.append(df.to_markdown(index=False))
        except Exception:
            tables.append(str(html_tbl))

    # Inline CSV/TSV runs. These loose patterns can match ordinary prose
    # containing commas, so the runs are collected as tables but deliberately
    # left in the narrative.
    for pat, sep in ((CSV_PAT, ","), (TSV_PAT, "\t")):
        for m in pat.finditer(text):
            try:
                df = pd.read_csv(StringIO(m.group(1)), sep=sep)
                if not df.empty:
                    tables.append(df.to_markdown(index=False))
            except Exception:
                pass

    # Cut fact/table spans while their offsets are still valid: merge
    # overlapping spans, then slice from the end so earlier offsets hold.
    merged: List[Tuple[int, int]] = []
    for s, e in sorted(set(cut_spans)):
        if merged and s <= merged[-1][1]:
            merged[-1] = (merged[-1][0], max(merged[-1][1], e))
        else:
            merged.append((s, e))
    narrative = text
    for s, e in reversed(merged):
        narrative = narrative[:s] + " " + narrative[e:]
    for raw in html_raw:
        narrative = narrative.replace(raw, " ")
    narrative = re.sub(r"\s{2,}", " ", narrative).strip()

    return narrative, facts, tables
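
# Example (hypothetical snippet): the date and e-mail land in `facts`, the
# markdown table is pulled into `tables`, and `narrative` keeps what remains.
#     narrative, facts, tables = extract_facts_and_tables(
#         "Launched on 2019-04-11. Contact: team@example.com\n"
#         "| q | rev |\n|---|-----|\n| Q1 | 10 |\n")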


def _summarise(text: str, pct: float, model: str) -> str:
    """Ask the LLM to shrink *text* to roughly *pct* of its token count."""
    target_tokens = int(_tok(text) * pct)
    sys_prompt = (
        "You are an expert abstractor. Summarize the text below to "
        f"approximately {pct*100:.0f}% of its original length (≈{target_tokens} tokens), "
        "while **retaining all key facts, figures, names, dates, places, and events**. "
        "Ensure the summary remains accurate, informative, and faithful to the original content."
    )
    client = _openai_client()
    rsp = client.chat.completions.create(
        model=model, temperature=0.2,
        messages=[{"role": "system", "content": sys_prompt},
                  {"role": "user", "content": text}],
        max_tokens=CFG.output_limit_per_link,
    )
    return rsp.choices[0].message.content


def compress_text(text: str, *, pct: float = 0.3,
                  model: str = "gpt-4o-mini") -> str:
    """Compress *text*: summarise the narrative, keep facts and tables verbatim."""
    facts_tables_budget = CFG.output_limit_per_link - CFG.disable_narrative_compress_thresh
    narrative, facts, tables = extract_facts_and_tables(text)

    # Short narratives pass through untouched; long ones get summarised.
    if _tok(narrative) > CFG.disable_narrative_compress_thresh:
        narrative_txt = _summarise(narrative, pct, model)
    else:
        narrative_txt = narrative

    # Spend the remaining budget on verbatim facts first, then tables.
    facts_txt, used = trim_to_budget(facts, facts_tables_budget, is_table=False)
    tables_txt, _ = trim_to_budget(tables, facts_tables_budget - used, is_table=True)

    parts = [narrative_txt]
    if facts_txt:
        parts.append("Facts:\n" + facts_txt)
    if tables_txt:
        parts.append("Tables:\n" + tables_txt)
    return "\n\n".join(parts)
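
# Usage sketch (`page_text` is any fetched page body):
#     short = compress_text(page_text, pct=0.3)
# The narrative is summarised only when it exceeds
# CFG.disable_narrative_compress_thresh tokens; facts and tables are kept
# verbatim within the leftover CFG.output_limit_per_link budget.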


EXTRACTOR_SYS_PROMPT = (
    "You are a highly skilled information extraction agent. Your job is to analyze long, complex webpages "
    "in the context of a specific user goal. You excel at identifying relevant sections, capturing supporting evidence "
    "in full original context, and providing logically structured summaries. Always ensure precision, completeness, "
    "and alignment with the user's intent."
)

EXTRACTOR_PROMPT_TEMPLATE = """You are a highly skilled information extraction agent. Your task is to analyze the following webpage content in light of a specific user goal, and extract accurate, well-structured information using plain text format.

## Webpage Content
{webpage_content}

## User Goal
{goal}

## Task Guidelines
1. **Rationale**: Briefly explain why this content is relevant to the user's goal.
2. **Evidence**: Quote the most relevant parts of the webpage that directly support or address the goal. Use bullet points or numbered lines separated by newlines.
3. **Summary**: Provide a clear, logically structured summary of the extracted evidence that addresses the user's goal.

## Output Format
Your response must follow **exactly this format** with the three sections:
Rationale: <one paragraph>
Evidence: <first point>\n<second point>...
Summary: <concise paragraph summarizing the evidence>
"""


def extract_regex(text: str) -> Dict[str, str]:
    """Parse the three labelled sections out of the LLM's plain-text reply."""

    def extract_section(header: str) -> str:
        # Capture everything after "<Header>:" up to the next "<Header>:" line.
        pattern = rf"{header}:\s*(.*?)(?=\n[A-Z][a-z]+:|\Z)"
        match = re.search(pattern, text, re.DOTALL | re.IGNORECASE)
        return match.group(1).strip() if match else "(not found)"

    return {
        "rationale": extract_section("Rationale"),
        "evidence": extract_section("Evidence"),
        "summary": extract_section("Summary"),
    }
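
# The parser is pure regex, so a quick check is safe to run:
#     extract_regex("Rationale: relevant.\nEvidence: - q1\nSummary: short.")
#     -> {'rationale': 'relevant.', 'evidence': '- q1', 'summary': 'short.'}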


def query_text(
    url: str,
    text: str,
    goal: str,
    *,
    model: str = "gpt-4.1-mini",
    max_attempts: int = 3,
) -> Dict[str, str]:
    """Goal-oriented extractor with retries → compress fallback → token-trim fallback."""
    prompt = EXTRACTOR_PROMPT_TEMPLATE.format(
        webpage_content=text[:15_000],
        goal=goal,
    )
    client = _openai_client()

    for attempt in range(1, max_attempts + 1):
        try:
            rsp = client.chat.completions.create(
                model=model,
                temperature=0.0,
                messages=[
                    {"role": "system", "content": EXTRACTOR_SYS_PROMPT},
                    {"role": "user", "content": prompt},
                ],
                max_tokens=1024,
            ).choices[0].message.content

            extracted = extract_regex(rsp)

            # Accept the reply only if it carries substantive evidence/summary.
            if len(extracted.get("evidence", "")) + len(extracted.get("summary", "")) > 20:
                return {
                    "extracted_info": (
                        f"The useful information in {url} for goal “{goal}”:\n\n"
                        f"Rationale:\n{extracted.get('rationale')}\n\n"
                        f"Evidence:\n{extracted.get('evidence')}\n\n"
                        f"Summary:\n{extracted.get('summary')}"
                    )
                }

            raise ValueError("LLM returned empty or malformed extraction")

        except Exception as e:
            logging.warning("Attempt %d/%d failed for query-based extraction: %s",
                            attempt, max_attempts, e)

    # Fallback 1: structure-preserving compression of the whole page.
    try:
        compressed = compress_text(text, model=model)
        return {
            "extracted_info": (
                f"Goal-based extraction failed after {max_attempts} attempts; "
                f"returning compressed webpage:\n\n{compressed}"
            )
        }
    except Exception as ce:
        logging.error("compress_text also failed: %s", ce)

    # Fallback 2: hard token truncation of the raw text. (trim_to_budget
    # expects a list of items, so truncate directly with the tokenizer.)
    return {
        "extracted_info": (
            "Goal-based extraction and compression both failed; "
            "returning truncated webpage content:\n\n" +
            enc.decode(enc.encode(text)[:CFG.output_limit_per_link])
        )
    }
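
# Usage sketch (hypothetical URL and goal):
#     result = query_text("https://example.com/report", page_text,
#                         goal="find the 2023 revenue figure")
#     print(result["extracted_info"])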


def trim_to_budget(items: List[str], budget: int, *,
                   is_table: bool) -> Tuple[str, int]:
    """Join *items* until *budget* tokens are spent; flag how many were dropped."""
    build, used = [], 0
    for it in items:
        toks = _tok(it)
        if used + toks > budget:
            break
        build.append(it)
        used += toks
    if len(build) < len(items):
        build.append(f"[{len(items)-len(build)} {'tables' if is_table else 'facts'} omitted]")
    joined = "\n\n".join(build) if is_table else "\n".join(build)
    return joined, _tok(joined)
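
# Usage sketch, mirroring compress_text's budgeting: spend the token budget
# on facts first, then hand the remainder to tables.
#     facts_txt, used = trim_to_budget(facts, 200, is_table=False)
#     tables_txt, _ = trim_to_budget(tables, 200 - used, is_table=True)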