File size: 4,311 Bytes
695fbf0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import os
from typing import Dict, Tuple

_CACHE: Dict[str, Tuple[float, Dict[str, Dict[str, object]]]] = {}

_MSG_PREFIX = "[MagicNodes][Presets]"

def _root_dir() -> str:
    # .../MagicNodes/mod/easy -> .../MagicNodes
    return os.path.dirname(os.path.dirname(os.path.dirname(__file__)))

def _pressets_dir() -> str:
    return os.path.join(_root_dir(), "pressets")

def _cfg_path(kind: str) -> str:
    # kind examples: "mg_cade25", "mg_controlfusion"
    return os.path.join(_pressets_dir(), f"{kind}.cfg")

def _parse_value(raw: str):
    s = raw.strip()
    if not s:
        return ""
    low = s.lower()
    if low in ("true", "false"):
        return low == "true"
    try:
        if "." in s or "e" in low:
            return float(s)
        return int(s)
    except Exception:
        pass
    # variable substitution
    s = s.replace("$(ROOT)", _root_dir())
    if (s.startswith('"') and s.endswith('"')) or (s.startswith("'") and s.endswith("'")):
        s = s[1:-1]
    return s

def _load_kind(kind: str) -> Dict[str, Dict[str, object]]:
    path = _cfg_path(kind)
    if not os.path.isfile(path):
        print(f"{_MSG_PREFIX} No configuration file for '{kind}' found; loaded defaults — results may be unpredictable!")
        return {}
    try:
        mtime = os.path.getmtime(path)
        cached = _CACHE.get(path)
        if cached and cached[0] == mtime:
            return cached[1]

        data: Dict[str, Dict[str, object]] = {}
        cur_section = None
        with open(path, "r", encoding="utf-8") as f:
            for ln, line in enumerate(f, start=1):
                line = line.strip()
                if not line or line.startswith("#") or line.startswith(";"):
                    continue
                if line.startswith("[") and line.endswith("]"):
                    cur_section = line[1:-1].strip().lower()
                    data.setdefault(cur_section, {})
                    continue
                if ":" in line:
                    if cur_section is None:
                        print(f"{_MSG_PREFIX} Parse warning at line {ln}: key outside of any [section]; ignored")
                        continue
                    k, v = line.split(":", 1)
                    key = k.strip()
                    try:
                        val = _parse_value(v)
                    except Exception:
                        print(f"{_MSG_PREFIX} Missing or invalid parameter '{key}'; this may affect results!")
                        continue
                    data[cur_section][key] = val
                else:
                    print(f"{_MSG_PREFIX} Unknown line at {ln}: '{line}'; ignored")

        _CACHE[path] = (mtime, data)
        return data
    except Exception as e:
        print(f"{_MSG_PREFIX} Failed to read '{path}': {e}. Loaded defaults — results may be unpredictable!")
        return {}

def get(kind: str, step: str) -> Dict[str, object]:
    """Return dict of parameters for a given kind and step.
    step accepts 'Step 1', '1', 'step1', case-insensitive.
    """
    data = _load_kind(kind)
    if not data:
        return {}
    label = step.strip().lower().replace(" ", "")
    if label.startswith("step"):
        key = label
    elif label.isdigit():
        key = f"step{label}"
    else:
        key = f"step{label}"

    if key not in data:
        # Special case: CF is intentionally not applied on Step 1 in this pipeline.
        # Suppress noisy log for missing 'Step 1' in mg_controlfusion.
        if kind == "mg_controlfusion" and key in ("step1", "1"):
            return {}
        print(f"{_MSG_PREFIX} Preset step '{step}' not found for '{kind}'; using defaults")
        return {}
    res = dict(data[key])
    # Side-effect: when CADE presets are loaded, optionally enable KV pruning in attention
    try:
        if kind == "mg_cade25":
            from .. import mg_sagpu_attention as sa_patch  # local import to avoid cycles
            kv_enable = bool(res.get("kv_prune_enable", False))
            kv_keep = float(res.get("kv_keep", 0.85))
            kv_min = int(res.get("kv_min_tokens", 128)) if "kv_min_tokens" in res else 128
            if hasattr(sa_patch, "set_kv_prune"):
                sa_patch.set_kv_prune(kv_enable, kv_keep, kv_min)
    except Exception:
        pass
    return res