Spaces:
Sleeping
Sleeping
| """ | |
| Mock test for NDJSON patch protocol logic. | |
| Tests the state management and patch application without requiring the actual model. | |
| """ | |
| import asyncio | |
| import json | |
| import sys | |
| from pathlib import Path | |
| from typing import Any, AsyncGenerator, Dict | |
| # Add project root to path | |
| project_root = Path(__file__).parent | |
| sys.path.insert(0, str(project_root)) | |
| class MockNDJSONTester: | |
| """Mock tester that simulates the NDJSON protocol.""" | |
| def _empty_state(self) -> Dict[str, Any]: | |
| """Initial empty structured state that patches will build up.""" | |
| return { | |
| "title": None, | |
| "main_summary": None, | |
| "key_points": [], | |
| "category": None, | |
| "sentiment": None, | |
| "read_time_min": None, | |
| } | |
| def _apply_patch(self, state: Dict[str, Any], patch: Dict[str, Any]) -> bool: | |
| """ | |
| Apply a single patch to the state. | |
| Returns True if this is a 'done' patch (signals logical completion). | |
| """ | |
| op = patch.get("op") | |
| if op == "done": | |
| return True | |
| field = patch.get("field") | |
| if not field: | |
| return False | |
| if op == "set": | |
| state[field] = patch.get("value") | |
| elif op == "append": | |
| # Ensure list exists for list-like fields (e.g. key_points) | |
| if not isinstance(state.get(field), list): | |
| state[field] = [] | |
| state[field].append(patch.get("value")) | |
| return False | |
| async def simulate_ndjson_stream(self) -> AsyncGenerator[Dict[str, Any], None]: | |
| """Simulate NDJSON patch streaming with realistic test data.""" | |
| # Simulate NDJSON patches that a model would generate | |
| mock_patches = [ | |
| {"op": "set", "field": "title", "value": "Qwen2.5-0.5B: Efficient AI for Edge Computing"}, | |
| {"op": "set", "field": "category", "value": "Tech"}, | |
| {"op": "set", "field": "sentiment", "value": "positive"}, | |
| {"op": "set", "field": "read_time_min", "value": 3}, | |
| {"op": "set", "field": "main_summary", "value": "Qwen2.5-0.5B is a compact language model optimized for resource-constrained environments. Despite its small size of 0.5 billion parameters, it demonstrates strong performance on instruction following and basic reasoning tasks while requiring significantly less memory and computational resources than larger models."}, | |
| {"op": "append", "field": "key_points", "value": "Compact 0.5B parameter model designed for edge devices and mobile platforms"}, | |
| {"op": "append", "field": "key_points", "value": "Strong performance on instruction following despite small size"}, | |
| {"op": "append", "field": "key_points", "value": "Supports multiple languages and diverse task types"}, | |
| {"op": "append", "field": "key_points", "value": "Significantly lower memory and computational requirements than larger models"}, | |
| {"op": "append", "field": "key_points", "value": "Ideal for applications requiring efficiency and low latency"}, | |
| {"op": "done"} | |
| ] | |
| # Initialize state | |
| state = self._empty_state() | |
| token_count = 0 | |
| # Process each patch | |
| for i, patch in enumerate(mock_patches): | |
| token_count += 5 # Simulate token usage | |
| # Apply patch to state | |
| is_done = self._apply_patch(state, patch) | |
| # Yield structured event | |
| yield { | |
| "delta": patch, | |
| "state": dict(state), # Copy state | |
| "done": is_done, | |
| "tokens_used": token_count, | |
| } | |
| # Simulate streaming delay | |
| await asyncio.sleep(0.05) | |
| if is_done: | |
| break | |
| # Final event with latency | |
| yield { | |
| "delta": None, | |
| "state": dict(state), | |
| "done": True, | |
| "tokens_used": token_count, | |
| "latency_ms": 523.45, | |
| } | |
| async def test_mock_ndjson(): | |
| """Test the NDJSON protocol with mock data.""" | |
| print("=" * 80) | |
| print("MOCK TEST: NDJSON Patch-Based Streaming Protocol") | |
| print("=" * 80) | |
| print("\nThis test simulates the NDJSON protocol without requiring the actual model.") | |
| print("It validates the patch application logic and event structure.\n") | |
| tester = MockNDJSONTester() | |
| event_count = 0 | |
| final_state = None | |
| total_tokens = 0 | |
| print("=" * 80) | |
| print("STREAMING EVENTS") | |
| print("=" * 80) | |
| async for event in tester.simulate_ndjson_stream(): | |
| event_count += 1 | |
| # Extract event data | |
| delta = event.get("delta") | |
| state = event.get("state") | |
| done = event.get("done", False) | |
| tokens_used = event.get("tokens_used", 0) | |
| latency_ms = event.get("latency_ms") | |
| total_tokens = tokens_used | |
| # Print event details | |
| print(f"\n--- Event #{event_count} ---") | |
| if delta: | |
| print(f"Delta: {json.dumps(delta, ensure_ascii=False)}") | |
| else: | |
| print(f"Delta: None (final event)") | |
| if done and latency_ms: | |
| print(f"Done: {done} | Tokens: {tokens_used} | Latency: {latency_ms}ms") | |
| else: | |
| print(f"Done: {done} | Tokens: {tokens_used}") | |
| # Store final state | |
| if state: | |
| final_state = state | |
| # Show what field was updated | |
| if delta and "op" in delta: | |
| op = delta.get("op") | |
| if op == "set": | |
| field = delta.get("field") | |
| value = delta.get("value") | |
| value_str = str(value)[:80] + "..." if len(str(value)) > 80 else str(value) | |
| print(f" β Set {field}: {value_str}") | |
| elif op == "append": | |
| field = delta.get("field") | |
| value = delta.get("value") | |
| value_str = str(value)[:80] + "..." if len(str(value)) > 80 else str(value) | |
| print(f" β Append to {field}: {value_str}") | |
| elif op == "done": | |
| print(f" β Model signaled completion") | |
| # Print current state summary | |
| if state and not done: | |
| fields_set = [k for k, v in state.items() if v is not None and (not isinstance(v, list) or len(v) > 0)] | |
| print(f" State has: {', '.join(fields_set)}") | |
| print("\n" + "=" * 80) | |
| print("FINAL RESULTS") | |
| print("=" * 80) | |
| print(f"\nTotal events: {event_count}") | |
| print(f"Total tokens: {total_tokens}") | |
| if final_state: | |
| print("\n--- Final Structured State ---") | |
| print(json.dumps(final_state, indent=2, ensure_ascii=False)) | |
| # Validate structure | |
| print("\n--- Validation ---") | |
| required_fields = ["title", "main_summary", "key_points", "category", "sentiment", "read_time_min"] | |
| all_valid = True | |
| for field in required_fields: | |
| value = final_state.get(field) | |
| if field == "key_points": | |
| if isinstance(value, list) and len(value) > 0: | |
| print(f"β {field}: {len(value)} items") | |
| else: | |
| print(f"β {field}: empty or not a list") | |
| all_valid = False | |
| else: | |
| if value is not None: | |
| value_str = str(value)[:50] + "..." if len(str(value)) > 50 else str(value) | |
| print(f"β {field}: {value_str}") | |
| else: | |
| print(f"β {field}: None") | |
| all_valid = False | |
| # Check sentiment is valid | |
| sentiment = final_state.get("sentiment") | |
| valid_sentiments = ["positive", "negative", "neutral"] | |
| if sentiment in valid_sentiments: | |
| print(f"β sentiment value is valid: {sentiment}") | |
| else: | |
| print(f"β sentiment value is invalid: {sentiment} (expected one of {valid_sentiments})") | |
| all_valid = False | |
| print("\n" + "=" * 80) | |
| if all_valid: | |
| print("β ALL VALIDATIONS PASSED - Protocol is working correctly!") | |
| else: | |
| print("β οΈ Some validations failed - check the output above") | |
| print("=" * 80) | |
| else: | |
| print("\nβ No final state received!") | |
| if __name__ == "__main__": | |
| print("\nπ§ͺ Mock Test: NDJSON Patch-Based Protocol\n") | |
| asyncio.run(test_mock_ndjson()) | |