SummarizerApp / test_v4_ndjson_mock.py
ming
feat: Add V4 NDJSON patch-based structured summarization
93c9664
raw
history blame
8.62 kB
"""
Mock test for NDJSON patch protocol logic.
Tests the state management and patch application without requiring the actual model.
"""
import asyncio
import json
import sys
from pathlib import Path
from typing import Any, AsyncGenerator, Dict
# Add project root to path
project_root = Path(__file__).parent
sys.path.insert(0, str(project_root))
class MockNDJSONTester:
"""Mock tester that simulates the NDJSON protocol."""
def _empty_state(self) -> Dict[str, Any]:
"""Initial empty structured state that patches will build up."""
return {
"title": None,
"main_summary": None,
"key_points": [],
"category": None,
"sentiment": None,
"read_time_min": None,
}
def _apply_patch(self, state: Dict[str, Any], patch: Dict[str, Any]) -> bool:
"""
Apply a single patch to the state.
Returns True if this is a 'done' patch (signals logical completion).
"""
op = patch.get("op")
if op == "done":
return True
field = patch.get("field")
if not field:
return False
if op == "set":
state[field] = patch.get("value")
elif op == "append":
# Ensure list exists for list-like fields (e.g. key_points)
if not isinstance(state.get(field), list):
state[field] = []
state[field].append(patch.get("value"))
return False
async def simulate_ndjson_stream(self) -> AsyncGenerator[Dict[str, Any], None]:
"""Simulate NDJSON patch streaming with realistic test data."""
# Simulate NDJSON patches that a model would generate
mock_patches = [
{"op": "set", "field": "title", "value": "Qwen2.5-0.5B: Efficient AI for Edge Computing"},
{"op": "set", "field": "category", "value": "Tech"},
{"op": "set", "field": "sentiment", "value": "positive"},
{"op": "set", "field": "read_time_min", "value": 3},
{"op": "set", "field": "main_summary", "value": "Qwen2.5-0.5B is a compact language model optimized for resource-constrained environments. Despite its small size of 0.5 billion parameters, it demonstrates strong performance on instruction following and basic reasoning tasks while requiring significantly less memory and computational resources than larger models."},
{"op": "append", "field": "key_points", "value": "Compact 0.5B parameter model designed for edge devices and mobile platforms"},
{"op": "append", "field": "key_points", "value": "Strong performance on instruction following despite small size"},
{"op": "append", "field": "key_points", "value": "Supports multiple languages and diverse task types"},
{"op": "append", "field": "key_points", "value": "Significantly lower memory and computational requirements than larger models"},
{"op": "append", "field": "key_points", "value": "Ideal for applications requiring efficiency and low latency"},
{"op": "done"}
]
# Initialize state
state = self._empty_state()
token_count = 0
# Process each patch
for i, patch in enumerate(mock_patches):
token_count += 5 # Simulate token usage
# Apply patch to state
is_done = self._apply_patch(state, patch)
# Yield structured event
yield {
"delta": patch,
"state": dict(state), # Copy state
"done": is_done,
"tokens_used": token_count,
}
# Simulate streaming delay
await asyncio.sleep(0.05)
if is_done:
break
# Final event with latency
yield {
"delta": None,
"state": dict(state),
"done": True,
"tokens_used": token_count,
"latency_ms": 523.45,
}
async def test_mock_ndjson():
"""Test the NDJSON protocol with mock data."""
print("=" * 80)
print("MOCK TEST: NDJSON Patch-Based Streaming Protocol")
print("=" * 80)
print("\nThis test simulates the NDJSON protocol without requiring the actual model.")
print("It validates the patch application logic and event structure.\n")
tester = MockNDJSONTester()
event_count = 0
final_state = None
total_tokens = 0
print("=" * 80)
print("STREAMING EVENTS")
print("=" * 80)
async for event in tester.simulate_ndjson_stream():
event_count += 1
# Extract event data
delta = event.get("delta")
state = event.get("state")
done = event.get("done", False)
tokens_used = event.get("tokens_used", 0)
latency_ms = event.get("latency_ms")
total_tokens = tokens_used
# Print event details
print(f"\n--- Event #{event_count} ---")
if delta:
print(f"Delta: {json.dumps(delta, ensure_ascii=False)}")
else:
print(f"Delta: None (final event)")
if done and latency_ms:
print(f"Done: {done} | Tokens: {tokens_used} | Latency: {latency_ms}ms")
else:
print(f"Done: {done} | Tokens: {tokens_used}")
# Store final state
if state:
final_state = state
# Show what field was updated
if delta and "op" in delta:
op = delta.get("op")
if op == "set":
field = delta.get("field")
value = delta.get("value")
value_str = str(value)[:80] + "..." if len(str(value)) > 80 else str(value)
print(f" β†’ Set {field}: {value_str}")
elif op == "append":
field = delta.get("field")
value = delta.get("value")
value_str = str(value)[:80] + "..." if len(str(value)) > 80 else str(value)
print(f" β†’ Append to {field}: {value_str}")
elif op == "done":
print(f" β†’ Model signaled completion")
# Print current state summary
if state and not done:
fields_set = [k for k, v in state.items() if v is not None and (not isinstance(v, list) or len(v) > 0)]
print(f" State has: {', '.join(fields_set)}")
print("\n" + "=" * 80)
print("FINAL RESULTS")
print("=" * 80)
print(f"\nTotal events: {event_count}")
print(f"Total tokens: {total_tokens}")
if final_state:
print("\n--- Final Structured State ---")
print(json.dumps(final_state, indent=2, ensure_ascii=False))
# Validate structure
print("\n--- Validation ---")
required_fields = ["title", "main_summary", "key_points", "category", "sentiment", "read_time_min"]
all_valid = True
for field in required_fields:
value = final_state.get(field)
if field == "key_points":
if isinstance(value, list) and len(value) > 0:
print(f"βœ… {field}: {len(value)} items")
else:
print(f"❌ {field}: empty or not a list")
all_valid = False
else:
if value is not None:
value_str = str(value)[:50] + "..." if len(str(value)) > 50 else str(value)
print(f"βœ… {field}: {value_str}")
else:
print(f"❌ {field}: None")
all_valid = False
# Check sentiment is valid
sentiment = final_state.get("sentiment")
valid_sentiments = ["positive", "negative", "neutral"]
if sentiment in valid_sentiments:
print(f"βœ… sentiment value is valid: {sentiment}")
else:
print(f"❌ sentiment value is invalid: {sentiment} (expected one of {valid_sentiments})")
all_valid = False
print("\n" + "=" * 80)
if all_valid:
print("βœ… ALL VALIDATIONS PASSED - Protocol is working correctly!")
else:
print("⚠️ Some validations failed - check the output above")
print("=" * 80)
else:
print("\n❌ No final state received!")
if __name__ == "__main__":
print("\nπŸ§ͺ Mock Test: NDJSON Patch-Based Protocol\n")
asyncio.run(test_mock_ndjson())