from __future__ import annotations

import json, os, re, time, logging
from pathlib import Path
from typing import Dict, List, Iterable, Tuple, Optional

import yaml
import requests

log = logging.getLogger(__name__)

# -------------------------
# Text cleaning & chunking
# -------------------------

_MD_FRONTMATTER = re.compile(r"^---\s*\n.*?\n---\s*\n", re.DOTALL)

def normalize_text(text: str) -> str:
    lines = [ln.strip() for ln in text.splitlines()]
    cleaned = []
    for ln in lines:
        if not ln:
            continue
        if sum(ch.isalnum() for ch in ln) < 3:
            continue
        cleaned.append(ln)
    s = "\n".join(cleaned)
    s = re.sub(r"\n{3,}", "\n\n", s)
    return s.strip()

def md_to_text(md: str) -> str:
    md = _MD_FRONTMATTER.sub("", md)
    md = re.sub(r"```.*?```", "", md, flags=re.DOTALL)   # drop fenced code
    md = re.sub(r"!\[[^\]]*\]\([^)]+\)", "", md)         # drop images
    md = re.sub(r"\[([^\]]+)\]\([^)]+\)", r"\1", md)     # links -> label
    md = re.sub(r"^\s{0,3}#{1,6}\s*", "", md, flags=re.MULTILINE)
    md = md.replace("`", "")
    md = re.sub(r"^\s*[-*+]\s+", "• ", md, flags=re.MULTILINE)
    md = re.sub(r"^\s*>\s?", "", md, flags=re.MULTILINE)
    return normalize_text(md)
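
# Illustrative before/after for md_to_text (not part of the pipeline):
#
#   md_to_text("# Title\n\nSee the [docs](https://example.com)!")
#   -> "Title\nSee the docs!"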

def chunk_text(text: str, max_chars: int = 800, overlap: int = 120) -> List[str]:
    paras = [p.strip() for p in text.split("\n\n") if p.strip()]
    out: List[str] = []
    buf = ""
    for p in paras:
        if len(p) > max_chars:
            if buf:  # flush first so chunks stay in document order
                out.append(buf)
                buf = ""
            i = 0
            while i < len(p):
                j = min(i + max_chars, len(p))
                out.append(p[i:j])
                if j == len(p):  # done; stepping back by overlap would re-emit the tail
                    break
                i = j - overlap if j - overlap > i else j
            continue
        if len(buf) + 2 + len(p) <= max_chars:
            buf = (buf + "\n\n" + p) if buf else p
        else:
            if buf:
                out.append(buf)
            buf = p
    if buf:
        out.append(buf)
    return out
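
# Illustrative trace of the chunker (not part of the pipeline): with the
# defaults, a single 2000-char paragraph is sliced into windows starting at
# offsets 0, 680, and 1360, i.e. chunk lengths [800, 800, 640], each window
# overlapping its predecessor by 120 chars.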

def write_jsonl(records: Iterable[Dict], out_path: Path) -> None:
    out_path.parent.mkdir(parents=True, exist_ok=True)
    with out_path.open("w", encoding="utf-8") as f:
        for rec in records:
            f.write(json.dumps(rec, ensure_ascii=False) + "\n")

# -------------------------
# GitHub API helpers
# -------------------------

def gh_session() -> requests.Session:
    s = requests.Session()
    s.headers.update({
        "Accept": "application/vnd.github+json",
        "User-Agent": "matrix-ai-rag-builder/1.0",
    })
    tok = os.getenv("GITHUB_TOKEN")
    if tok:
        s.headers["Authorization"] = f"Bearer {tok}"
    return s

def gh_get_json(url: str, sess: requests.Session, max_retries: int = 3) -> Dict | List:
    backoff = 1.0
    r = None
    for _ in range(max_retries):
        r = sess.get(url, timeout=25)
        if r.status_code == 403 and "rate limit" in r.text.lower():
            log.warning("GitHub rate-limited; sleeping %.1fs", backoff)
            time.sleep(backoff)
            backoff = min(backoff * 2, 30)
            continue
        r.raise_for_status()
        return r.json()
    if r is not None:
        r.raise_for_status()  # surface the last (rate-limited) response
    return {}

def gh_list_org_repos(org: str, sess: requests.Session) -> List[Dict]:
    repos: List[Dict] = []
    page = 1
    while True:
        url = f"https://api.github.com/orgs/{org}/repos?per_page=100&page={page}"
        js = gh_get_json(url, sess)
        if not js:
            break
        repos.extend(js)
        if len(js) < 100:  # short page -> last page
            break
        page += 1
    return repos

def gh_list_tree(owner: str, repo: str, branch: str, sess: requests.Session) -> List[Dict]:
    url = f"https://api.github.com/repos/{owner}/{repo}/git/trees/{branch}?recursive=1"
    js = gh_get_json(url, sess)
    return js.get("tree", []) if isinstance(js, dict) else []

def gh_fetch_raw(owner: str, repo: str, branch: str, path: str, sess: requests.Session) -> Optional[str]:
    raw_url = f"https://raw.githubusercontent.com/{owner}/{repo}/{branch}/{path}"
    r = sess.get(raw_url, timeout=25)
    if r.status_code == 404 and branch == "main":  # try master fallback
        raw_url = f"https://raw.githubusercontent.com/{owner}/{repo}/master/{path}"
        r = sess.get(raw_url, timeout=25)
    if r.status_code == 200:
        return r.text
    return None
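
# Illustrative composition of the helpers above ("<owner>"/"<repo>" are
# placeholders; export GITHUB_TOKEN to lift the anonymous API rate limit):
#
#   sess = gh_session()
#   tree = gh_list_tree("<owner>", "<repo>", "main", sess)
#   md_paths = [e["path"] for e in tree
#               if e.get("type") == "blob" and e["path"].endswith(".md")]
#   readme = gh_fetch_raw("<owner>", "<repo>", "main", "README.md", sess)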

# -------------------------
# Builders
# -------------------------

def ingest_github_repo(owner: str, name: str, branch: str, docs_paths: List[str],
                       include_readme: bool,
                       exts: Tuple[str, ...] = (".md", ".mdx", ".txt")) -> List[Tuple[str, str]]:
    sess = gh_session()
    out: List[Tuple[str, str]] = []
    # README
    if include_readme:
        for candidate in ("README.md", "readme.md", "README.MD"):
            t = gh_fetch_raw(owner, name, branch, candidate, sess)
            if t:
                out.append((f"github:{owner}/{name}/{candidate}", md_to_text(t)))
                break
    # Tree -> docs paths
    tree = gh_list_tree(owner, name, branch, sess)
    if not tree:
        return out
    wanted_dirs = [p.strip("/").lower() for p in docs_paths]
    for entry in tree:
        if entry.get("type") != "blob":
            continue
        path = entry.get("path", "")
        lower = path.lower()
        if not lower.endswith(exts):
            continue
        if any(lower.startswith(d + "/") for d in wanted_dirs):
            t = gh_fetch_raw(owner, name, branch, path, sess)
            if not t:
                continue
            txt = md_to_text(t) if lower.endswith((".md", ".mdx")) else normalize_text(t)
            if txt:
                out.append((f"github:{owner}/{name}/{path}", txt))
    return out

def ingest_github_sources(cfg: Dict) -> List[Tuple[str, str]]:
    out: List[Tuple[str, str]] = []
    gh = cfg.get("github") or {}
    sess = gh_session()
    # explicit repos
    for repo in (gh.get("repos") or []):
        owner = repo["owner"]
        name = repo["name"]
        branch = repo.get("branch", "main")
        docs_paths = repo.get("docs_paths", ["docs"])
        include_readme = bool(repo.get("include_readme", True))
        out.extend(ingest_github_repo(owner, name, branch, docs_paths, include_readme))
    # whole org scan (README + docs/)
    for org in (gh.get("orgs") or []):
        try:
            repos = gh_list_org_repos(org, sess)
        except Exception as e:
            log.warning("Failed to list org %s: %s", org, e)
            continue
        for r in repos:
            owner = r["owner"]["login"]
            name = r["name"]
            default_branch = r.get("default_branch", "main")
            out.extend(ingest_github_repo(owner, name, default_branch, ["docs"], include_readme=True))
    return out

def ingest_local_sources(cfg: Dict) -> List[Tuple[str, str]]:
    out: List[Tuple[str, str]] = []
    local = cfg.get("local") or {}
    paths = local.get("paths") or []
    glob_pat = local.get("glob", "**/*.md")
    for p in paths:
        fp = Path(p)
        if fp.is_file():
            try:
                raw = fp.read_text(encoding="utf-8", errors="ignore")
                txt = md_to_text(raw) if fp.suffix.lower() in {".md", ".mdx"} else normalize_text(raw)
                if txt:
                    out.append((str(fp), txt))
            except Exception as e:
                log.warning("Failed reading %s: %s", fp, e)
        elif fp.is_dir():
            for f in fp.rglob(glob_pat):
                try:
                    raw = f.read_text(encoding="utf-8", errors="ignore")
                    txt = md_to_text(raw) if f.suffix.lower() in {".md", ".mdx"} else normalize_text(raw)
                    if txt:
                        out.append((str(f), txt))
                except Exception as e:
                    log.warning("Failed reading %s: %s", f, e)
    return out

def build_kb_from_config(config_path: str = "configs/rag_sources.yaml",
                         out_jsonl: str = "data/kb.jsonl",
                         max_chars: int = 800,
                         overlap: int = 120,
                         minlen: int = 200,
                         dedupe: bool = True) -> int:
    cfg: Dict = {}
    p = Path(config_path)
    if p.exists():
        cfg = yaml.safe_load(p.read_text(encoding="utf-8")) or {}
    else:
        log.warning("rag_sources.yaml not found at %s (using defaults)", p)
    records: List[Dict] = []
    # GitHub
    try:
        gh_docs = ingest_github_sources(cfg)
        for src, text in gh_docs:
            for chunk in chunk_text(text, max_chars, overlap):
                if len(chunk) >= minlen:
                    records.append({"text": chunk, "source": src})
    except Exception as e:
        log.warning("GitHub ingest failed: %s", e)
    # Local
    try:
        loc_docs = ingest_local_sources(cfg)
        for src, text in loc_docs:
            for chunk in chunk_text(text, max_chars, overlap):
                if len(chunk) >= minlen:
                    records.append({"text": chunk, "source": src})
    except Exception as e:
        log.warning("Local ingest failed: %s", e)
    # URLs (optional). Note: bodies are normalized as-is, so HTML tags are
    # kept -- point this at plain-text or Markdown endpoints.
    for url in (cfg.get("urls") or []):
        try:
            r = requests.get(url, timeout=25)
            r.raise_for_status()
            txt = normalize_text(r.text)
            for chunk in chunk_text(txt, max_chars, overlap):
                if len(chunk) >= minlen:
                    records.append({"text": chunk, "source": url})
        except Exception as e:
            log.warning("URL ingest failed for %s: %s", url, e)
    if dedupe:
        seen = set()
        deduped: List[Dict] = []
        for rec in records:
            h = hash(rec["text"])
            if h in seen:
                continue
            seen.add(h)
            deduped.append(rec)
        records = deduped
    if not records:
        log.warning("No KB records produced.")
        return 0
    out_path = Path(out_jsonl)
    write_jsonl(records, out_path)
    log.info("Wrote %d chunks to %s", len(records), out_path)
    return len(records)
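
# Example configs/rag_sources.yaml (illustrative sketch: the keys shown are
# exactly the ones read above, every section is optional, and the owner/
# repo/org names are placeholders):
#
#   github:
#     repos:
#       - owner: some-org
#         name: some-repo
#         branch: main
#         docs_paths: [docs]
#         include_readme: true
#     orgs:
#       - some-org          # scans each repo's README plus its docs/ tree
#   local:
#     paths: [./docs, ./notes.md]
#     glob: "**/*.md"
#   urls:
#     - https://example.com/faq.txt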

def ensure_kb(out_jsonl: str = "data/kb.jsonl",
              config_path: str = "configs/rag_sources.yaml",
              skip_if_exists: bool = True) -> bool:
    """
    If kb.jsonl exists -> return True.
    Else -> build from config and return True on success.
    """
    out = Path(out_jsonl)
    if skip_if_exists and out.exists() and out.stat().st_size > 0:
        log.info("KB already present at %s (skipping build)", out)
        return True
    n = build_kb_from_config(config_path=config_path, out_jsonl=out_jsonl)
    return n > 0
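
# Minimal standalone entry point (a sketch -- this module may instead be
# driven by a larger app; nothing above requires it):
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    ok = ensure_kb()
    raise SystemExit(0 if ok else 1)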