secondme-api / lpm_kernel /database /migration_manager.py
Gemini
feat: add detailed logging
01d5a5d
"""
Database Migration Manager
This module provides functionality to manage database migrations in a systematic way.
It ensures that migrations are applied in order and only once.
"""
import os
import importlib.util
import sqlite3
from datetime import datetime
from lpm_kernel.common.logging import logger
class MigrationManager:
"""Manages database migrations for SQLite database"""
def __init__(self, db_path):
"""
Initialize the migration manager
Args:
db_path: Path to the SQLite database file
"""
self.db_path = db_path
self._ensure_migration_table()
def _ensure_migration_table(self):
"""Create migration tracking table if it doesn't exist"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS schema_migrations (
version VARCHAR(50) PRIMARY KEY,
description TEXT,
applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
""")
conn.commit()
conn.close()
logger.debug("Migration tracking table checked/created")
def get_applied_migrations(self):
"""
Get list of already applied migrations
Returns:
List of applied migration versions
"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("SELECT version FROM schema_migrations ORDER BY version")
versions = [row[0] for row in cursor.fetchall()]
conn.close()
return versions
def apply_migrations(self, migrations_dir=None):
"""
Apply all pending migrations from the migrations directory
Args:
migrations_dir: Directory containing migration scripts.
If None, use 'migrations' subdirectory
Returns:
List of applied migration versions
"""
if migrations_dir is None:
migrations_dir = os.path.join(os.path.dirname(__file__), "migrations")
# logger.info(f"Using default migrations directory: {migrations_dir}")
# Ensure migrations directory exists
os.makedirs(migrations_dir, exist_ok=True)
# Get already applied migrations
applied = self.get_applied_migrations()
# logger.info(f"Found {len(applied)} previously applied migrations")
# Get all migration files and sort them
migration_files = []
for f in os.listdir(migrations_dir):
if f.endswith('.py') and not f.startswith('__'):
try:
# Extract version from filename (format: V20250420221300__description.py)
version = f.split('__')[0].replace('V', '')
migration_files.append((version, f))
# logger.info(f"Found migration file: {f}")
except Exception as e:
logger.warning(f"Skipping invalid migration filename: {f}, error: {e}")
# Sort by version
migration_files.sort(key=lambda x: x[0])
# logger.info(f"Found {len(migration_files)} migration files: {', '.join([f[1] for f in migration_files])}")
applied_in_session = []
# Apply each migration that hasn't been applied yet
for version, migration_file in migration_files:
if version in applied:
logger.debug(f"Skipping already applied migration: {migration_file}")
continue
# logger.info(f"Applying migration: {migration_file}")
# Load the migration module
module_path = os.path.join(migrations_dir, migration_file)
module_name = f"migration_{version}"
try:
# Import the migration module dynamically
spec = importlib.util.spec_from_file_location(module_name, module_path)
migration_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(migration_module)
# Get migration description
description = getattr(migration_module, 'description', migration_file)
# Connect to database and start transaction
conn = sqlite3.connect(self.db_path)
conn.execute("BEGIN TRANSACTION")
try:
# Execute the migration
migration_module.upgrade(conn)
# Record the migration
conn.execute(
"INSERT INTO schema_migrations (version, description) VALUES (?, ?)",
(version, description)
)
# Commit the transaction
conn.commit()
# logger.info(f"Successfully applied migration: {migration_file}")
applied_in_session.append(version)
except Exception as e:
# Rollback on error
conn.rollback()
logger.error(f"Error applying migration {migration_file}: {str(e)}")
raise
finally:
conn.close()
except Exception as e:
logger.error(f"Failed to load migration {migration_file}: {str(e)}")
raise
# if not applied_in_session:
# # logger.info("No new migrations to apply")
# else:
# logger.info(f"Applied {len(applied_in_session)} new migrations")
return applied_in_session
def downgrade_migration(self, version, migrations_dir=None):
"""
Downgrade a specific migration by version
Args:
version: Version of the migration to downgrade
migrations_dir: Directory containing migration scripts
Returns:
True if downgrade was successful, False otherwise
"""
if migrations_dir is None:
migrations_dir = os.path.join(os.path.dirname(__file__), "migrations")
# logger.info(f"Using default migrations directory: {migrations_dir}")
# Check if migration is applied
applied = self.get_applied_migrations()
if version not in applied:
logger.warning(f"Migration version {version} is not applied, cannot downgrade")
return False
# Find migration file
migration_file = None
for f in os.listdir(migrations_dir):
if f.endswith('.py') and not f.startswith('__') and f.startswith(f'V{version}'):
migration_file = f
break
if not migration_file:
logger.error(f"Migration file for version {version} not found")
return False
# logger.info(f"Downgrading migration: {migration_file}")
# Load the migration module
module_path = os.path.join(migrations_dir, migration_file)
module_name = f"migration_{version}"
try:
# Import the migration module dynamically
spec = importlib.util.spec_from_file_location(module_name, module_path)
migration_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(migration_module)
# Check if downgrade method exists
if not hasattr(migration_module, 'downgrade'):
logger.error(f"Migration {migration_file} does not have a downgrade method")
return False
# Connect to database and start transaction
conn = sqlite3.connect(self.db_path)
conn.execute("BEGIN TRANSACTION")
try:
# Execute the downgrade
migration_module.downgrade(conn)
# Remove the migration record
conn.execute(
"DELETE FROM schema_migrations WHERE version = ?",
(version,)
)
# Commit the transaction
conn.commit()
# logger.info(f"Successfully downgraded migration: {migration_file}")
return True
except Exception as e:
# Rollback on error
conn.rollback()
logger.error(f"Error downgrading migration {migration_file}: {str(e)}")
raise
finally:
conn.close()
except Exception as e:
logger.error(f"Failed to load migration {migration_file}: {str(e)}")
raise
def downgrade_to_version(self, target_version=None, migrations_dir=None):
"""
Downgrade migrations to a specific version
Args:
target_version: Version to downgrade to (inclusive). If None, downgrade all migrations.
migrations_dir: Directory containing migration scripts
Returns:
List of downgraded migration versions
"""
if migrations_dir is None:
migrations_dir = os.path.join(os.path.dirname(__file__), "migrations")
# logger.info(f"Using default migrations directory: {migrations_dir}")
# Get applied migrations
applied = self.get_applied_migrations()
# logger.info(f"Found {len(applied)} applied migrations")
if not applied:
# logger.info("No migrations to downgrade")
return []
# Determine which migrations to downgrade
to_downgrade = []
if target_version is None:
# Downgrade all migrations
to_downgrade = applied
else:
# Find target version index
if target_version not in applied:
logger.error(f"Target version {target_version} is not applied")
return []
target_index = applied.index(target_version)
# Get all versions after target_version
to_downgrade = applied[target_index + 1:]
# Sort in reverse order to downgrade newest first
to_downgrade.sort(reverse=True)
downgraded = []
for version in to_downgrade:
try:
if self.downgrade_migration(version, migrations_dir):
downgraded.append(version)
else:
logger.error(f"Failed to downgrade migration {version}, stopping")
break
except Exception as e:
logger.error(f"Error during downgrade of {version}: {str(e)}")
break
# if not downgraded:
# logger.info("No migrations were downgraded")
# else:
# logger.info(f"Downgraded {len(downgraded)} migrations: {', '.join(downgraded)}")
return downgraded
def create_migration(self, description, migrations_dir=None):
"""
Create a new migration file with template code
Args:
description: Short description of what the migration does
migrations_dir: Directory to create migration in
Returns:
Path to the created migration file
"""
if migrations_dir is None:
migrations_dir = os.path.join(os.path.dirname(__file__), "migrations")
# Ensure migrations directory exists
os.makedirs(migrations_dir, exist_ok=True)
# Get current timestamp for version
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
# Format description for filename (lowercase, underscores)
safe_description = description.lower().replace(' ', '_').replace('-', '_')
safe_description = ''.join(c for c in safe_description if c.isalnum() or c == '_')
# Create filename
filename = f"V{timestamp}__{safe_description}.py"
filepath = os.path.join(migrations_dir, filename)
# Create migration file with template
with open(filepath, 'w') as f:
f.write(f'''"""
Migration: {description}
Version: {timestamp}
"""
description = "{description}"
def upgrade(conn):
"""
Apply the migration
Args:
conn: SQLite connection object
"""
cursor = conn.cursor()
# TODO: Implement your migration logic here
# Example:
# cursor.execute("""
# CREATE TABLE IF NOT EXISTS new_table (
# id INTEGER PRIMARY KEY AUTOINCREMENT,
# name TEXT NOT NULL
# )
# """)
# No need to commit, the migration manager handles transactions
def downgrade(conn):
"""
Revert the migration
Args:
conn: SQLite connection object
"""
cursor = conn.cursor()
# TODO: Implement your downgrade logic here
# Example:
# cursor.execute("DROP TABLE IF EXISTS new_table")
# No need to commit, the migration manager handles transactions
''')
# logger.info(f"Created new migration: {filename}")
return filepath