from datetime import datetime
from typing import List, Optional
import numpy as np
from lpm_kernel.L1.bio import Note, Chunk, Bio, ShadeInfo, ShadeMergeInfo
from lpm_kernel.L1.l1_generator import L1Generator
from lpm_kernel.common.repository.database_session import DatabaseSession
from lpm_kernel.file_data.document_service import document_service
from lpm_kernel.models.l1 import (
    L1Bio,
    L1GenerationResult,
    L1Version,
    GlobalBioDTO,
    StatusBioDTO,
)
from lpm_kernel.models.status_biography import StatusBiography
from lpm_kernel.configs.logging import get_train_process_logger

logger = get_train_process_logger()


def extract_notes_from_documents(documents) -> tuple[List[Note], list]:
    """Extract Note objects and a memory list from documents.

    Args:
        documents: Document list containing L0 data.

    Returns:
        tuple: (notes_list, memory_list)
            - notes_list: List of Note objects.
            - memory_list: List of memory dicts used for clustering.
    """
notes_list = []
memory_list = []
    for doc in documents:
        doc_id = doc.get("id")

        doc_embedding = document_service.get_document_embedding(doc_id)
        if not doc_embedding:
            logger.warning(f"Document {doc_id} missing document embedding")
            continue

        chunks = document_service.get_document_chunks(doc_id)
        if not chunks:
            logger.warning(f"Document {doc_id} missing chunks")
            continue

        all_chunk_embeddings = document_service.get_chunk_embeddings_by_document_id(
            doc_id
        )
        if not all_chunk_embeddings:
            logger.warning(f"Document {doc_id} missing chunk embeddings")
            continue
# Ensure create_time is in string format
create_time = doc.get("create_time")
if isinstance(create_time, datetime):
create_time = create_time.strftime("%Y-%m-%d %H:%M:%S")
        # Get document insight and summary (either may be absent or None)
        insight_data = doc.get("insight") or {}
        summary_data = doc.get("summary") or {}
# Build Note object
note = Note(
noteId=doc_id,
content=doc.get("raw_content", ""),
createTime=create_time,
memoryType="TEXT",
embedding=np.array(doc_embedding),
            chunks=[
                Chunk(
                    id=f"{chunk.id}",
                    document_id=doc_id,
                    content=chunk.content,
                    embedding=np.array(all_chunk_embeddings[chunk.id]),
                    tags=getattr(chunk, "tags", None),
                    topic=getattr(chunk, "topic", None),
                )
                for chunk in chunks
                if all_chunk_embeddings.get(chunk.id)
            ],
title=insight_data.get("title", ""),
summary=summary_data.get("summary", ""),
insight=insight_data.get("insight", ""),
tags=summary_data.get("keywords", []),
)
notes_list.append(note)
memory_list.append({"memoryId": str(doc_id), "embedding": doc_embedding})
return notes_list, memory_list
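

# Example usage (a sketch; assumes document_service is backed by a populated
# database and that each document dict carries "id", "raw_content", "insight",
# and "summary" entries as consumed above):
#
#     documents = document_service.list_documents_with_l0()
#     notes, memories = extract_notes_from_documents(documents)
#     # `memories` pairs each document id with its embedding, e.g.
#     # [{"memoryId": "42", "embedding": [...]}, ...]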


def generate_l1_from_l0() -> Optional[L1GenerationResult]:
    """Generate the L1 level knowledge representation from L0 data.

    Returns:
        Optional[L1GenerationResult]: Generation result, or None when no
        valid documents are available.
    """
l1_generator = L1Generator()
# 1. Prepare data
documents = document_service.list_documents_with_l0()
logger.info(f"Found {len(documents)} documents with L0 data")
# 2. Extract notes and memories
notes_list, memory_list = extract_notes_from_documents(documents)
if not notes_list or not memory_list:
logger.error("No valid documents found for processing")
return None
try:
# 3. Generate L1 data
# 3.1 Generate topics
clusters = l1_generator.gen_topics_for_shades(
old_cluster_list=[], old_outlier_memory_list=[], new_memory_list=memory_list
)
logger.info(f"Generated clusters: {bool(clusters)}")
# 3.2 Generate chunk topics
chunk_topics = l1_generator.generate_topics(notes_list)
logger.info(f"Generated chunk topics: {bool(chunk_topics)}")
        # Log the full chunk-topic payload to aid debugging
        logger.info(f"chunk_topics content: {chunk_topics}")
# 3.3 Generate features for each cluster and merge them
shades = generate_shades(clusters, l1_generator, notes_list)
shades_merge_infos = convert_from_shades_to_merge_info(shades)
logger.info(f"Generated {len(shades)} shades")
merged_shades = l1_generator.merge_shades(shades_merge_infos)
logger.info(f"Merged shades success: {merged_shades.success}")
logger.info(
f"Number of merged shades: {len(merged_shades.merge_shade_list) if merged_shades.success else 0}"
)
# 3.4 Generate global biography
bio = l1_generator.gen_global_biography(
old_profile=Bio(
shadesList=merged_shades.merge_shade_list
if merged_shades.success
else []
),
cluster_list=clusters.get("clusterList", []),
)
logger.info(f"Generated global biography: {bio}")
# 4. Build result object
result = L1GenerationResult(
bio=bio, clusters=clusters, chunk_topics=chunk_topics
)
logger.info("L1 generation completed successfully")
return result
except Exception as e:
logger.error(f"Error in L1 generation: {str(e)}", exc_info=True)
raise
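

# A minimal invocation sketch (assumes L0 extraction has already populated
# the document store; persistence of the result is handled by the caller):
#
#     result = generate_l1_from_l0()
#     if result is not None:
#         print(result.bio)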


def generate_shades(clusters, l1_generator, notes_list):
    """Generate one shade (feature summary) per memory cluster.

    Args:
        clusters: Clustering result dict containing a "clusterList" key.
        l1_generator: L1Generator instance used to produce shades.
        notes_list: All Note objects extracted from documents.

    Returns:
        list: Shade objects for every cluster with at least one matching note.
    """
    shades = []
if clusters and "clusterList" in clusters:
for cluster in clusters.get("clusterList", []):
cluster_memory_ids = [
str(m.get("memoryId")) for m in cluster.get("memoryList", [])
]
logger.info(
f"Processing cluster with {len(cluster_memory_ids)} memories"
)
cluster_notes = [
note for note in notes_list if str(note.id) in cluster_memory_ids
]
if cluster_notes:
shade = l1_generator.gen_shade_for_cluster([], cluster_notes, [])
if shade:
shades.append(shade)
                    logger.info(
                        f"Generated shade for cluster: {getattr(shade, 'name', 'Unknown')}"
                    )
return shades
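

# Example flow (a sketch; cluster "memoryId" values and Note ids must agree
# as strings for the membership test above to match):
#
#     shades = generate_shades(clusters, l1_generator, notes_list)
#     merge_infos = convert_from_shades_to_merge_info(shades)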


def convert_from_shades_to_merge_info(shades: List[ShadeInfo]) -> List[ShadeMergeInfo]:
    """Convert ShadeInfo objects into ShadeMergeInfo objects for merging."""
    return [
        ShadeMergeInfo(
            id=shade.id,
            name=shade.name,
            aspect=shade.aspect,
            icon=shade.icon,
            desc_third_view=shade.desc_third_view,
            content_third_view=shade.content_third_view,
            desc_second_view=shade.desc_second_view,
            content_second_view=shade.content_second_view,
            cluster_info=None,
        )
        for shade in shades
    ]
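

# Example (a sketch): each ShadeMergeInfo copies the shade's fields verbatim,
# and cluster_info is deliberately left as None at this stage.
#
#     merge_infos = convert_from_shades_to_merge_info(shades)
#     merged = L1Generator().merge_shades(merge_infos)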


def store_status_bio(status_bio: Bio) -> None:
    """Store the status biography, replacing any previously stored one.

    Args:
        status_bio (Bio): Generated status biography object.
    """
try:
with DatabaseSession.session() as session:
# Delete old status biography (if exists)
session.query(StatusBiography).delete()
# Insert new status biography
new_bio = StatusBiography(
content=status_bio.content_second_view,
content_third_view=status_bio.content_third_view,
summary=status_bio.summary_second_view,
summary_third_view=status_bio.summary_third_view,
)
session.add(new_bio)
session.commit()
except Exception as e:
logger.error(f"Error storing status biography: {str(e)}", exc_info=True)
raise


def get_latest_status_bio() -> Optional[StatusBioDTO]:
    """Get the latest status biography.

    Returns:
        Optional[StatusBioDTO]: DTO for the status biography, or None if not
        found (or if the query fails).
    """
try:
with DatabaseSession.session() as session:
# Get the latest status biography
latest_bio = (
session.query(StatusBiography)
.order_by(StatusBiography.create_time.desc())
.first()
)
if not latest_bio:
return None
# Convert to DTO and return
return StatusBioDTO.from_model(latest_bio)
except Exception as e:
logger.error(f"Error getting status biography: {str(e)}", exc_info=True)
return None
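

# Example usage (a sketch; returns None both when no biography has been
# stored yet and when the underlying query raises):
#
#     status = get_latest_status_bio()
#     if status is not None:
#         logger.info(f"Latest status bio: {status}")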


def get_latest_global_bio() -> Optional[GlobalBioDTO]:
    """Get the latest global biography.

    Returns:
        Optional[GlobalBioDTO]: DTO for the global biography, or None if not
        found (or if the query fails).
    """
try:
with DatabaseSession.session() as session:
# Get the latest version of L1 data
latest_version = (
session.query(L1Version).order_by(L1Version.version.desc()).first()
)
if not latest_version:
return None
# Get bio data for this version
bio = (
session.query(L1Bio)
.filter(L1Bio.version == latest_version.version)
.first()
)
if not bio:
return None
# Convert to DTO and return
return GlobalBioDTO.from_model(bio)
except Exception as e:
logger.error(f"Error getting global biography: {str(e)}", exc_info=True)
return None


def generate_and_store_status_bio() -> Optional[Bio]:
    """Generate and store the status biography.

    Returns:
        Optional[Bio]: Generated status biography, or None if generation
        produced no result.
    """
# Generate status biography
status_bio = generate_status_bio()
if status_bio:
# Store to database
store_status_bio(status_bio)
return status_bio


def generate_status_bio() -> Optional[Bio]:
    """Generate a status biography from all L0 documents.

    Returns:
        Optional[Bio]: Generated status biography, or None when no valid
        notes are available.
    """
l1_generator = L1Generator()
try:
# 1. Get all documents and extract notes
documents = document_service.list_documents_with_l0()
notes_list, _ = extract_notes_from_documents(documents)
if not notes_list:
logger.error("No valid notes found for status bio generation")
return None
        # 2. Generate status biography
        # Currently only notes are used; todos and chats remain empty for now
current_time = datetime.now().strftime("%Y-%m-%d")
status_bio = l1_generator.gen_status_biography(
cur_time=current_time,
notes=notes_list,
todos=[], # Empty for now
chats=[], # Empty for now
)
logger.info("Status biography generated successfully")
return status_bio
except Exception as e:
logger.error(f"Error generating status bio: {str(e)}", exc_info=True)
raise
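

if __name__ == "__main__":
    # Minimal manual smoke test (a sketch; assumes the database and
    # document_service are configured for this environment).
    l1_result = generate_l1_from_l0()
    if l1_result is None:
        logger.error("L1 generation produced no result")
    else:
        logger.info("L1 generation finished")
    status_bio = generate_and_store_status_bio()
    if status_bio is not None:
        logger.info("Status biography generated and stored")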