File size: 8,620 Bytes
93c9664
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
"""
Mock test for NDJSON patch protocol logic.
Tests the state management and patch application without requiring the actual model.
"""

import asyncio
import copy
import json
import sys
from pathlib import Path
from typing import Any, AsyncGenerator, Dict, Iterable, List, Optional

# When this script is executed directly, make the directory that contains it
# importable by putting it at the very front of the module search path.
project_root = Path(__file__).parent
sys.path[:0] = [str(project_root)]


class MockNDJSONTester:
    """Mock tester that simulates the NDJSON patch protocol.

    A stream is a sequence of patch dicts (``{"op": ..., "field": ...,
    "value": ...}``) applied one at a time to an initially empty structured
    state. After each patch an event is yielded carrying the patch, an
    independent snapshot of the state, a ``done`` flag and a running
    (simulated) token count. A final summary event follows the patches.
    """

    def _empty_state(self) -> Dict[str, Any]:
        """Return the initial empty structured state that patches build up."""
        return {
            "title": None,
            "main_summary": None,
            "key_points": [],
            "category": None,
            "sentiment": None,
            "read_time_min": None,
        }

    def _apply_patch(self, state: Dict[str, Any], patch: Dict[str, Any]) -> bool:
        """Apply a single patch to ``state`` in place.

        Supported ops:
          * ``"set"``    -- ``state[field] = value``
          * ``"append"`` -- append ``value`` to ``state[field]``; a missing or
            non-list value is first replaced by an empty list
          * ``"done"``   -- no state change; signals logical completion

        Patches without a ``field`` (other than ``"done"``) and patches with an
        unrecognized op leave ``state`` unchanged.

        Returns:
            True if this is a ``"done"`` patch, False otherwise.
        """
        op = patch.get("op")
        if op == "done":
            return True

        field = patch.get("field")
        if not field:
            return False

        if op == "set":
            state[field] = patch.get("value")
        elif op == "append":
            # Ensure list exists for list-like fields (e.g. key_points)
            if not isinstance(state.get(field), list):
                state[field] = []
            state[field].append(patch.get("value"))

        return False

    @staticmethod
    def _default_patches() -> List[Dict[str, Any]]:
        """Return a fresh list of canned patches resembling model output."""
        return [
            {"op": "set", "field": "title", "value": "Qwen2.5-0.5B: Efficient AI for Edge Computing"},
            {"op": "set", "field": "category", "value": "Tech"},
            {"op": "set", "field": "sentiment", "value": "positive"},
            {"op": "set", "field": "read_time_min", "value": 3},
            {"op": "set", "field": "main_summary", "value": "Qwen2.5-0.5B is a compact language model optimized for resource-constrained environments. Despite its small size of 0.5 billion parameters, it demonstrates strong performance on instruction following and basic reasoning tasks while requiring significantly less memory and computational resources than larger models."},
            {"op": "append", "field": "key_points", "value": "Compact 0.5B parameter model designed for edge devices and mobile platforms"},
            {"op": "append", "field": "key_points", "value": "Strong performance on instruction following despite small size"},
            {"op": "append", "field": "key_points", "value": "Supports multiple languages and diverse task types"},
            {"op": "append", "field": "key_points", "value": "Significantly lower memory and computational requirements than larger models"},
            {"op": "append", "field": "key_points", "value": "Ideal for applications requiring efficiency and low latency"},
            {"op": "done"},
        ]

    async def simulate_ndjson_stream(
        self,
        patches: Optional[Iterable[Dict[str, Any]]] = None,
        *,
        delay: float = 0.05,
    ) -> AsyncGenerator[Dict[str, Any], None]:
        """Simulate NDJSON patch streaming.

        Args:
            patches: Patches to stream. Defaults to a built-in realistic set
                that ends with a ``"done"`` patch.
            delay: Seconds to sleep after each patch event, simulating
                streaming latency (pass 0 to stream without waiting).

        Yields:
            One event per patch: ``{"delta", "state", "done", "tokens_used"}``.
            Streaming stops after the first ``"done"`` patch. A final event
            with ``delta=None``, ``done=True`` and ``latency_ms`` always follows.
        """
        source = self._default_patches() if patches is None else patches

        state = self._empty_state()
        token_count = 0

        for patch in source:
            token_count += 5  # Simulated token cost per patch

            is_done = self._apply_patch(state, patch)

            yield {
                "delta": patch,
                # Deep copy: a shallow dict() copy would share list values such
                # as key_points, so later "append" patches would silently rewrite
                # the state of events already handed to the consumer.
                "state": copy.deepcopy(state),
                "done": is_done,
                "tokens_used": token_count,
            }

            await asyncio.sleep(delay)

            if is_done:
                break

        # Final summary event. The latency is a fixed placeholder; the mock
        # does not measure elapsed time.
        yield {
            "delta": None,
            "state": copy.deepcopy(state),
            "done": True,
            "tokens_used": token_count,
            "latency_ms": 523.45,
        }


async def test_mock_ndjson():
    """Test the NDJSON protocol with mock data."""
    
    print("=" * 80)
    print("MOCK TEST: NDJSON Patch-Based Streaming Protocol")
    print("=" * 80)
    print("\nThis test simulates the NDJSON protocol without requiring the actual model.")
    print("It validates the patch application logic and event structure.\n")
    
    tester = MockNDJSONTester()
    
    event_count = 0
    final_state = None
    total_tokens = 0
    
    print("=" * 80)
    print("STREAMING EVENTS")
    print("=" * 80)
    
    async for event in tester.simulate_ndjson_stream():
        event_count += 1
        
        # Extract event data
        delta = event.get("delta")
        state = event.get("state")
        done = event.get("done", False)
        tokens_used = event.get("tokens_used", 0)
        latency_ms = event.get("latency_ms")
        
        total_tokens = tokens_used
        
        # Print event details
        print(f"\n--- Event #{event_count} ---")
        
        if delta:
            print(f"Delta: {json.dumps(delta, ensure_ascii=False)}")
        else:
            print(f"Delta: None (final event)")
        
        if done and latency_ms:
            print(f"Done: {done} | Tokens: {tokens_used} | Latency: {latency_ms}ms")
        else:
            print(f"Done: {done} | Tokens: {tokens_used}")
        
        # Store final state
        if state:
            final_state = state
        
        # Show what field was updated
        if delta and "op" in delta:
            op = delta.get("op")
            if op == "set":
                field = delta.get("field")
                value = delta.get("value")
                value_str = str(value)[:80] + "..." if len(str(value)) > 80 else str(value)
                print(f"  β†’ Set {field}: {value_str}")
            elif op == "append":
                field = delta.get("field")
                value = delta.get("value")
                value_str = str(value)[:80] + "..." if len(str(value)) > 80 else str(value)
                print(f"  β†’ Append to {field}: {value_str}")
            elif op == "done":
                print(f"  β†’ Model signaled completion")
        
        # Print current state summary
        if state and not done:
            fields_set = [k for k, v in state.items() if v is not None and (not isinstance(v, list) or len(v) > 0)]
            print(f"  State has: {', '.join(fields_set)}")
    
    print("\n" + "=" * 80)
    print("FINAL RESULTS")
    print("=" * 80)
    
    print(f"\nTotal events: {event_count}")
    print(f"Total tokens: {total_tokens}")
    
    if final_state:
        print("\n--- Final Structured State ---")
        print(json.dumps(final_state, indent=2, ensure_ascii=False))
        
        # Validate structure
        print("\n--- Validation ---")
        required_fields = ["title", "main_summary", "key_points", "category", "sentiment", "read_time_min"]
        
        all_valid = True
        for field in required_fields:
            value = final_state.get(field)
            if field == "key_points":
                if isinstance(value, list) and len(value) > 0:
                    print(f"βœ… {field}: {len(value)} items")
                else:
                    print(f"❌ {field}: empty or not a list")
                    all_valid = False
            else:
                if value is not None:
                    value_str = str(value)[:50] + "..." if len(str(value)) > 50 else str(value)
                    print(f"βœ… {field}: {value_str}")
                else:
                    print(f"❌ {field}: None")
                    all_valid = False
        
        # Check sentiment is valid
        sentiment = final_state.get("sentiment")
        valid_sentiments = ["positive", "negative", "neutral"]
        if sentiment in valid_sentiments:
            print(f"βœ… sentiment value is valid: {sentiment}")
        else:
            print(f"❌ sentiment value is invalid: {sentiment} (expected one of {valid_sentiments})")
            all_valid = False
        
        print("\n" + "=" * 80)
        if all_valid:
            print("βœ… ALL VALIDATIONS PASSED - Protocol is working correctly!")
        else:
            print("⚠️  Some validations failed - check the output above")
        print("=" * 80)
    else:
        print("\n❌ No final state received!")


if __name__ == "__main__":
    # Script entry point: print a banner, then run the async mock test.
    print("\n🧪 Mock Test: NDJSON Patch-Based Protocol\n")
    asyncio.run(test_mock_ndjson())