Spaces:
Sleeping
Sleeping
| from datetime import datetime | |
| from typing import List, Optional | |
| import numpy as np | |
| from lpm_kernel.L1.bio import Note, Chunk, Bio, ShadeInfo, ShadeMergeInfo | |
| from lpm_kernel.L1.l1_generator import L1Generator | |
| from lpm_kernel.common.repository.database_session import DatabaseSession | |
| from lpm_kernel.file_data.document_service import document_service | |
| from lpm_kernel.models.l1 import L1Bio | |
| from lpm_kernel.models.l1 import ( | |
| L1GenerationResult, | |
| L1Version, | |
| GlobalBioDTO, | |
| StatusBioDTO, | |
| ) | |
| from lpm_kernel.models.status_biography import StatusBiography | |
| from lpm_kernel.configs.logging import get_train_process_logger | |
| logger = get_train_process_logger() | |
| def extract_notes_from_documents(documents) -> tuple[List[Note], list]: | |
| """Extract Note objects and memory list from documents | |
| Args: | |
| documents: Document list containing L0 data | |
| Returns: | |
| tuple: (notes_list, memory_list) | |
| - notes_list: List of Note objects | |
| - memory_list: List of memory dictionaries for clustering | |
| """ | |
| notes_list = [] | |
| memory_list = [] | |
| for doc in documents: | |
| doc_id = doc.get("id") | |
| doc_embedding = document_service.get_document_embedding(doc_id) | |
| chunks = document_service.get_document_chunks(doc_id) | |
| all_chunk_embeddings = document_service.get_chunk_embeddings_by_document_id( | |
| doc_id | |
| ) | |
| if not doc_embedding: | |
| logger.warning(f"Document {doc_id} missing document embedding") | |
| continue | |
| if not chunks: | |
| logger.warning(f"Document {doc_id} missing chunks") | |
| continue | |
| if not all_chunk_embeddings: | |
| logger.warning(f"Document {doc_id} missing chunk embeddings") | |
| continue | |
| # Ensure create_time is in string format | |
| create_time = doc.get("create_time") | |
| if isinstance(create_time, datetime): | |
| create_time = create_time.strftime("%Y-%m-%d %H:%M:%S") | |
| # Get document insight and summary | |
| insight_data = doc.get("insight", {}) | |
| summary_data = doc.get("summary", {}) | |
| if insight_data is None: | |
| insight_data = {} | |
| if summary_data is None: | |
| summary_data = {} | |
| # Build Note object | |
| note = Note( | |
| noteId=doc_id, | |
| content=doc.get("raw_content", ""), | |
| createTime=create_time, | |
| memoryType="TEXT", | |
| embedding=np.array(doc_embedding), | |
| chunks=[ | |
| Chunk( | |
| id=f"{chunk.id}", | |
| document_id=doc_id, | |
| content=chunk.content, | |
| embedding=np.array(all_chunk_embeddings.get(chunk.id)) | |
| if all_chunk_embeddings.get(chunk.id) | |
| else None, | |
| tags=chunk.tags if hasattr(chunk, "tags") else None, | |
| topic=chunk.topic if hasattr(chunk, "topic") else None, | |
| ) | |
| for chunk in chunks | |
| if all_chunk_embeddings.get(chunk.id) | |
| ], | |
| title=insight_data.get("title", ""), | |
| summary=summary_data.get("summary", ""), | |
| insight=insight_data.get("insight", ""), | |
| tags=summary_data.get("keywords", []), | |
| ) | |
| notes_list.append(note) | |
| memory_list.append({"memoryId": str(doc_id), "embedding": doc_embedding}) | |
| return notes_list, memory_list | |
| def generate_l1_from_l0() -> L1GenerationResult: | |
| """Generate L1 level knowledge representation from L0 data""" | |
| l1_generator = L1Generator() | |
| # 1. Prepare data | |
| documents = document_service.list_documents_with_l0() | |
| logger.info(f"Found {len(documents)} documents with L0 data") | |
| # 2. Extract notes and memories | |
| notes_list, memory_list = extract_notes_from_documents(documents) | |
| if not notes_list or not memory_list: | |
| logger.error("No valid documents found for processing") | |
| return None | |
| try: | |
| # 3. Generate L1 data | |
| # 3.1 Generate topics | |
| clusters = l1_generator.gen_topics_for_shades( | |
| old_cluster_list=[], old_outlier_memory_list=[], new_memory_list=memory_list | |
| ) | |
| logger.info(f"Generated clusters: {bool(clusters)}") | |
| # 3.2 Generate chunk topics | |
| chunk_topics = l1_generator.generate_topics(notes_list) | |
| logger.info(f"Generated chunk topics: {bool(chunk_topics)}") | |
| # Add log in l1_manager.py | |
| logger.info(f"chunk_topics content: {chunk_topics}") | |
| # 3.3 Generate features for each cluster and merge them | |
| shades = generate_shades(clusters, l1_generator, notes_list) | |
| shades_merge_infos = convert_from_shades_to_merge_info(shades) | |
| logger.info(f"Generated {len(shades)} shades") | |
| merged_shades = l1_generator.merge_shades(shades_merge_infos) | |
| logger.info(f"Merged shades success: {merged_shades.success}") | |
| logger.info( | |
| f"Number of merged shades: {len(merged_shades.merge_shade_list) if merged_shades.success else 0}" | |
| ) | |
| # 3.4 Generate global biography | |
| bio = l1_generator.gen_global_biography( | |
| old_profile=Bio( | |
| shadesList=merged_shades.merge_shade_list | |
| if merged_shades.success | |
| else [] | |
| ), | |
| cluster_list=clusters.get("clusterList", []), | |
| ) | |
| logger.info(f"Generated global biography: {bio}") | |
| # 4. Build result object | |
| result = L1GenerationResult( | |
| bio=bio, clusters=clusters, chunk_topics=chunk_topics | |
| ) | |
| logger.info("L1 generation completed successfully") | |
| return result | |
| except Exception as e: | |
| logger.error(f"Error in L1 generation: {str(e)}", exc_info=True) | |
| raise | |
| def generate_shades(clusters, l1_generator, notes_list): | |
| shades = [] | |
| if clusters and "clusterList" in clusters: | |
| for cluster in clusters.get("clusterList", []): | |
| cluster_memory_ids = [ | |
| str(m.get("memoryId")) for m in cluster.get("memoryList", []) | |
| ] | |
| logger.info( | |
| f"Processing cluster with {len(cluster_memory_ids)} memories" | |
| ) | |
| cluster_notes = [ | |
| note for note in notes_list if str(note.id) in cluster_memory_ids | |
| ] | |
| if cluster_notes: | |
| shade = l1_generator.gen_shade_for_cluster([], cluster_notes, []) | |
| if shade: | |
| shades.append(shade) | |
| logger.info( | |
| f"Generated shade for cluster: {shade.name if hasattr(shade, 'name') else 'Unknown'}" | |
| ) | |
| return shades | |
| def convert_from_shades_to_merge_info(shades: List[ShadeInfo]) -> List[ShadeMergeInfo]: | |
| return [ShadeMergeInfo( | |
| id=shade.id, | |
| name=shade.name, | |
| aspect=shade.aspect, | |
| icon=shade.icon, | |
| desc_third_view=shade.desc_third_view, | |
| content_third_view=shade.content_third_view, | |
| desc_second_view=shade.desc_second_view, | |
| content_second_view=shade.content_second_view, | |
| cluster_info=None | |
| ) for shade in shades] | |
| def store_status_bio(status_bio: Bio) -> None: | |
| """Store status biography to database | |
| Args: | |
| status_bio (Bio): Generated status biography object | |
| """ | |
| try: | |
| with DatabaseSession.session() as session: | |
| # Delete old status biography (if exists) | |
| session.query(StatusBiography).delete() | |
| # Insert new status biography | |
| new_bio = StatusBiography( | |
| content=status_bio.content_second_view, | |
| content_third_view=status_bio.content_third_view, | |
| summary=status_bio.summary_second_view, | |
| summary_third_view=status_bio.summary_third_view, | |
| ) | |
| session.add(new_bio) | |
| session.commit() | |
| except Exception as e: | |
| logger.error(f"Error storing status biography: {str(e)}", exc_info=True) | |
| raise | |
| def get_latest_status_bio() -> Optional[StatusBioDTO]: | |
| """Get the latest status biography | |
| Returns: | |
| Optional[StatusBioDTO]: Data transfer object for status biography, returns None if not found | |
| """ | |
| try: | |
| with DatabaseSession.session() as session: | |
| # Get the latest status biography | |
| latest_bio = ( | |
| session.query(StatusBiography) | |
| .order_by(StatusBiography.create_time.desc()) | |
| .first() | |
| ) | |
| if not latest_bio: | |
| return None | |
| # Convert to DTO and return | |
| return StatusBioDTO.from_model(latest_bio) | |
| except Exception as e: | |
| logger.error(f"Error getting status biography: {str(e)}", exc_info=True) | |
| return None | |
| def get_latest_global_bio() -> Optional[GlobalBioDTO]: | |
| """Get the latest global biography | |
| Returns: | |
| Optional[GlobalBioDTO]: Data transfer object for global biography, returns None if not found | |
| """ | |
| try: | |
| with DatabaseSession.session() as session: | |
| # Get the latest version of L1 data | |
| latest_version = ( | |
| session.query(L1Version).order_by(L1Version.version.desc()).first() | |
| ) | |
| if not latest_version: | |
| return None | |
| # Get bio data for this version | |
| bio = ( | |
| session.query(L1Bio) | |
| .filter(L1Bio.version == latest_version.version) | |
| .first() | |
| ) | |
| if not bio: | |
| return None | |
| # Convert to DTO and return | |
| return GlobalBioDTO.from_model(bio) | |
| except Exception as e: | |
| logger.error(f"Error getting global biography: {str(e)}", exc_info=True) | |
| return None | |
| def generate_and_store_status_bio() -> Bio: | |
| """Generate and store status biography | |
| Returns: | |
| Bio: Generated status biography object | |
| """ | |
| # Generate status biography | |
| status_bio = generate_status_bio() | |
| if status_bio: | |
| # Store to database | |
| store_status_bio(status_bio) | |
| return status_bio | |
| def generate_status_bio() -> Bio: | |
| """Generate status biography | |
| Returns: | |
| Bio: Generated status biography | |
| """ | |
| l1_generator = L1Generator() | |
| try: | |
| # 1. Get all documents and extract notes | |
| documents = document_service.list_documents_with_l0() | |
| notes_list, _ = extract_notes_from_documents(documents) | |
| if not notes_list: | |
| logger.error("No valid notes found for status bio generation") | |
| return None | |
| # 2. Generate status biography | |
| # Currently we only use notes, todos and chats are empty lists for now | |
| current_time = datetime.now().strftime("%Y-%m-%d") | |
| status_bio = l1_generator.gen_status_biography( | |
| cur_time=current_time, | |
| notes=notes_list, | |
| todos=[], # Empty for now | |
| chats=[], # Empty for now | |
| ) | |
| logger.info("Status biography generated successfully") | |
| return status_bio | |
| except Exception as e: | |
| logger.error(f"Error generating status bio: {str(e)}", exc_info=True) | |
| raise | |