Spaces:
Sleeping
Sleeping
File size: 13,558 Bytes
01d5a5d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 |
"""
Database Migration Manager
This module provides functionality to manage database migrations in a systematic way.
It ensures that migrations are applied in order and only once.
"""
import os
import importlib.util
import sqlite3
from datetime import datetime
from lpm_kernel.common.logging import logger
class MigrationManager:
    """Apply, track and revert versioned schema migrations for a SQLite database.

    Migration scripts are Python files named ``V<version>__<description>.py``
    that expose ``upgrade(conn)`` and, optionally, ``downgrade(conn)`` and a
    ``description`` string. Every applied version is recorded in the
    ``schema_migrations`` table so that a script runs at most once.

    Versions are compared as plain strings, both when sorting the files and in
    the ``ORDER BY`` of the tracking query. Fixed-width versions such as the
    ``%Y%m%d%H%M%S`` timestamps generated by :meth:`create_migration`
    therefore sort chronologically; variable-width numbers do not.
    """

    # Skeleton written by create_migration(); filled in with str.format().
    _MIGRATION_TEMPLATE = '''"""
Migration: {doc_description}
Version: {timestamp}
"""

description = {description!r}


def upgrade(conn):
    """
    Apply the migration

    Args:
        conn: SQLite connection object
    """
    cursor = conn.cursor()

    # TODO: Implement your migration logic here
    # Example:
    # cursor.execute("""
    #     CREATE TABLE IF NOT EXISTS new_table (
    #         id INTEGER PRIMARY KEY AUTOINCREMENT,
    #         name TEXT NOT NULL
    #     )
    # """)

    # No need to commit, the migration manager handles transactions


def downgrade(conn):
    """
    Revert the migration

    Args:
        conn: SQLite connection object
    """
    cursor = conn.cursor()

    # TODO: Implement your downgrade logic here
    # Example:
    # cursor.execute("DROP TABLE IF EXISTS new_table")

    # No need to commit, the migration manager handles transactions
'''

    def __init__(self, db_path):
        """
        Initialize the migration manager

        Args:
            db_path: Path to the SQLite database file
        """
        self.db_path = db_path
        self._ensure_migration_table()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _default_migrations_dir():
        """Return the 'migrations' directory located next to this module."""
        return os.path.join(os.path.dirname(__file__), "migrations")

    @staticmethod
    def _parse_version(filename):
        """Return the version encoded in a migration filename, or None.

        A valid name looks like ``V20250420221300__description.py``: a literal
        ``V``, a non-empty version, a double underscore, then the description.
        Only the leading ``V`` is stripped, so a version that itself contains
        the letter ``V`` is kept intact.
        """
        if not filename.endswith('.py') or filename.startswith('__'):
            return None
        prefix, separator, _ = filename.partition('__')
        if not separator or not prefix.startswith('V') or len(prefix) < 2:
            return None
        return prefix[1:]

    def _find_migration_file(self, version, migrations_dir):
        """Return the filename whose parsed version equals *version*, or None.

        The comparison is exact: version ``1`` never matches ``V10__x.py``.
        The listing is sorted so the result does not depend on directory order.
        """
        for filename in sorted(os.listdir(migrations_dir)):
            if self._parse_version(filename) == version:
                return filename
        return None

    def _load_migration_module(self, migrations_dir, migration_file, version):
        """Import a migration script from its path and return the module.

        Import failures are logged and re-raised unchanged.
        """
        module_path = os.path.join(migrations_dir, migration_file)
        module_name = f"migration_{version}"
        try:
            spec = importlib.util.spec_from_file_location(module_name, module_path)
            migration_module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(migration_module)
        except Exception as e:
            logger.error(f"Failed to load migration {migration_file}: {str(e)}")
            raise
        return migration_module

    def _run_in_transaction(self, step, bookkeeping_sql, bookkeeping_params, error_prefix):
        """Run *step(conn)* and the bookkeeping statement in one transaction.

        Both are committed together. On any exception the transaction is
        rolled back, the error is logged once as ``"<error_prefix>: <error>"``
        and the exception is re-raised. The connection is always closed, even
        when ``BEGIN TRANSACTION`` itself fails.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            # Explicit BEGIN so the script's statements and the bookkeeping
            # row are committed or rolled back as a unit.
            conn.execute("BEGIN TRANSACTION")
            step(conn)
            conn.execute(bookkeeping_sql, bookkeeping_params)
            conn.commit()
        except Exception as e:
            conn.rollback()
            logger.error(f"{error_prefix}: {str(e)}")
            raise
        finally:
            conn.close()

    def _ensure_migration_table(self):
        """Create migration tracking table if it doesn't exist"""
        conn = sqlite3.connect(self.db_path)
        try:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS schema_migrations (
                    version VARCHAR(50) PRIMARY KEY,
                    description TEXT,
                    applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );
            """)
            conn.commit()
        finally:
            # Close even if table creation fails.
            conn.close()
        logger.debug("Migration tracking table checked/created")

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def get_applied_migrations(self):
        """
        Get list of already applied migrations

        Returns:
            List of applied migration versions, sorted ascending as strings
        """
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.execute(
                "SELECT version FROM schema_migrations ORDER BY version"
            )
            return [row[0] for row in cursor.fetchall()]
        finally:
            conn.close()

    def apply_migrations(self, migrations_dir=None):
        """
        Apply all pending migrations from the migrations directory

        Each migration runs in its own transaction together with the insert
        into ``schema_migrations``. The first failure aborts the run;
        migrations applied earlier in the same call stay applied.

        Args:
            migrations_dir: Directory containing migration scripts.
                If None, use 'migrations' subdirectory

        Returns:
            List of migration versions applied by this call

        Raises:
            AttributeError: If a pending script has no ``upgrade`` function.
            Exception: Whatever a script raises while loading or upgrading.
        """
        if migrations_dir is None:
            migrations_dir = self._default_migrations_dir()

        # Ensure migrations directory exists
        os.makedirs(migrations_dir, exist_ok=True)

        # A set gives O(1) "already applied?" checks in the loop below.
        applied = set(self.get_applied_migrations())

        # Collect (version, filename) pairs for every well-formed script.
        migration_files = []
        for filename in os.listdir(migrations_dir):
            if not filename.endswith('.py') or filename.startswith('__'):
                continue
            version = self._parse_version(filename)
            if version is None:
                logger.warning(
                    f"Skipping invalid migration filename: {filename}, "
                    f"error: expected V<version>__<description>.py"
                )
                continue
            migration_files.append((version, filename))

        # Sort by version (string comparison)
        migration_files.sort(key=lambda item: item[0])

        applied_in_session = []
        for version, migration_file in migration_files:
            if version in applied:
                logger.debug(f"Skipping already applied migration: {migration_file}")
                continue

            migration_module = self._load_migration_module(
                migrations_dir, migration_file, version
            )

            upgrade = getattr(migration_module, 'upgrade', None)
            if upgrade is None:
                logger.error(f"Migration {migration_file} does not have an upgrade method")
                raise AttributeError(
                    f"Migration {migration_file} does not have an upgrade method"
                )

            # Fall back to the filename when the script has no description.
            description = getattr(migration_module, 'description', migration_file)

            self._run_in_transaction(
                upgrade,
                "INSERT INTO schema_migrations (version, description) VALUES (?, ?)",
                (version, description),
                f"Error applying migration {migration_file}",
            )
            applied_in_session.append(version)

        return applied_in_session

    def downgrade_migration(self, version, migrations_dir=None):
        """
        Downgrade a specific migration by version

        The script's ``downgrade(conn)`` and the removal of the tracking row
        run in a single transaction.

        Args:
            version: Version of the migration to downgrade
            migrations_dir: Directory containing migration scripts.
                If None, use 'migrations' subdirectory

        Returns:
            True if downgrade was successful. False if the version is not
            applied, no script exists for it, or the script has no
            ``downgrade`` function.

        Raises:
            Exception: Whatever the script raises while loading or downgrading.
        """
        if migrations_dir is None:
            migrations_dir = self._default_migrations_dir()

        # Check if migration is applied
        if version not in self.get_applied_migrations():
            logger.warning(f"Migration version {version} is not applied, cannot downgrade")
            return False

        # Exact version match, not a prefix match.
        migration_file = self._find_migration_file(version, migrations_dir)
        if not migration_file:
            logger.error(f"Migration file for version {version} not found")
            return False

        migration_module = self._load_migration_module(
            migrations_dir, migration_file, version
        )

        # Check if downgrade method exists
        if not hasattr(migration_module, 'downgrade'):
            logger.error(f"Migration {migration_file} does not have a downgrade method")
            return False

        self._run_in_transaction(
            migration_module.downgrade,
            "DELETE FROM schema_migrations WHERE version = ?",
            (version,),
            f"Error downgrading migration {migration_file}",
        )
        return True

    def downgrade_to_version(self, target_version=None, migrations_dir=None):
        """
        Downgrade migrations to a specific version

        Migrations are reverted newest first. The run stops at the first
        migration that fails or cannot be downgraded; errors are logged and
        not propagated.

        Args:
            target_version: Version to end up at. Every applied version that
                sorts after it is reverted; the target itself stays applied.
                If None, downgrade all migrations.
            migrations_dir: Directory containing migration scripts.
                If None, use 'migrations' subdirectory

        Returns:
            List of downgraded migration versions, in the order reverted
        """
        if migrations_dir is None:
            migrations_dir = self._default_migrations_dir()

        applied = self.get_applied_migrations()
        if not applied:
            return []

        # Determine which migrations to downgrade
        if target_version is None:
            candidates = applied
        else:
            if target_version not in applied:
                logger.error(f"Target version {target_version} is not applied")
                return []
            # Everything after the target in ascending order.
            candidates = applied[applied.index(target_version) + 1:]

        downgraded = []
        # sorted() returns a new list, so `applied` is left untouched.
        for version in sorted(candidates, reverse=True):
            try:
                succeeded = self.downgrade_migration(version, migrations_dir)
            except Exception as e:
                logger.error(f"Error during downgrade of {version}: {str(e)}")
                break
            if not succeeded:
                logger.error(f"Failed to downgrade migration {version}, stopping")
                break
            downgraded.append(version)

        return downgraded

    def create_migration(self, description, migrations_dir=None):
        """
        Create a new migration file with template code

        The version is the current local time formatted as ``%Y%m%d%H%M%S``.
        The description is embedded as a Python literal, so quotes,
        backslashes and newlines in it still yield an importable file.

        Args:
            description: Short description of what the migration does
            migrations_dir: Directory to create migration in.
                If None, use 'migrations' subdirectory

        Returns:
            Path to the created migration file
        """
        if migrations_dir is None:
            migrations_dir = self._default_migrations_dir()

        # Ensure migrations directory exists
        os.makedirs(migrations_dir, exist_ok=True)

        # Get current timestamp for version
        timestamp = datetime.now().strftime("%Y%m%d%H%M%S")

        # Format description for filename (lowercase, underscores)
        safe_description = description.lower().replace(' ', '_').replace('-', '_')
        safe_description = ''.join(
            c for c in safe_description if c.isalnum() or c == '_'
        )

        filename = f"V{timestamp}__{safe_description}.py"
        filepath = os.path.join(migrations_dir, filename)

        # Escape what would otherwise end or corrupt the generated module
        # docstring: backslashes (escape sequences) and triple double quotes.
        doc_description = description.replace('\\', '\\\\').replace('"""', '\\"\\"\\"')

        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(self._MIGRATION_TEMPLATE.format(
                doc_description=doc_description,
                timestamp=timestamp,
                description=description,
            ))

        return filepath
|