Spaces:
Sleeping
Sleeping
| from core.base_agent import BaseAgent | |
| from core.database import db | |
| from core.workflow import DevelopmentWorkflow | |
| from typing import Dict, Any | |
| class ProjectManagerAgent(BaseAgent): | |
| def __init__(self): | |
| super().__init__("Project Manager") | |
| self.workflow = DevelopmentWorkflow() | |
| self.create_chain(""" | |
| You are a Project Manager. Your task is to oversee the development process and provide status updates. | |
| Current Status: | |
| {input} | |
| Please provide a comprehensive status report including: | |
| 1. Project progress | |
| 2. Any blockers or issues | |
| 3. Next steps | |
| 4. Quality metrics | |
| 5. Recommendations | |
| Please provide clear, actionable insights. | |
| """) | |
| async def start_project(self, requirements: str) -> Dict[str, Any]: | |
| """Start a new project with the given requirements""" | |
| # Run the complete workflow | |
| result = await self.workflow.run(requirements) | |
| # Store all artifacts in the database | |
| self._store_artifacts(result) | |
| return { | |
| "status": "success", | |
| "user_stories": result["user_stories"], | |
| "design": result["design"], | |
| "code": result["code"], | |
| "test_results": result["test_results"], | |
| "messages": result["messages"], | |
| "message": "Project completed successfully" | |
| } | |
| def _store_artifacts(self, result: Dict[str, Any]): | |
| """Store all project artifacts in the database""" | |
| # Store user stories | |
| db.store_artifact( | |
| "user_stories", | |
| result["user_stories"], | |
| { | |
| "type": "user_story", | |
| "source": "business_analyst", | |
| "status": "created" | |
| } | |
| ) | |
| # Store design | |
| db.store_artifact( | |
| "designs", | |
| result["design"], | |
| { | |
| "type": "design", | |
| "source": "designer", | |
| "status": "created" | |
| } | |
| ) | |
| # Store code | |
| db.store_artifact( | |
| "code", | |
| result["code"], | |
| { | |
| "type": "code", | |
| "source": "developer", | |
| "status": "created", | |
| "version": "1.0.0" | |
| } | |
| ) | |
| # Store test results | |
| db.store_artifact( | |
| "test_results", | |
| result["test_results"], | |
| { | |
| "type": "test_result", | |
| "source": "tester", | |
| "status": "executed" | |
| } | |
| ) | |
| async def get_status(self) -> Dict[str, Any]: | |
| """Get the current project status""" | |
| # Query all artifacts from the database | |
| user_stories = db.query_artifacts("user_stories", "status:created") | |
| designs = db.query_artifacts("designs", "status:created") | |
| code = db.query_artifacts("code", "status:created") | |
| test_results = db.query_artifacts("test_results", "status:executed") | |
| status = { | |
| "user_stories": len(user_stories["ids"]), | |
| "designs": len(designs["ids"]), | |
| "code": len(code["ids"]), | |
| "test_results": len(test_results["ids"]) | |
| } | |
| return { | |
| "status": "success", | |
| "data": status, | |
| "message": "Status retrieved successfully" | |
| } | |
| async def check_quality(self) -> Dict[str, Any]: | |
| """Check the quality of project artifacts""" | |
| result = await self.process({ | |
| "input": "Checking quality of all project artifacts" | |
| }) | |
| return { | |
| "status": "success", | |
| "quality_report": result, | |
| "message": "Quality check completed successfully" | |
| } |