File size: 13,558 Bytes
01d5a5d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
"""
Database Migration Manager

This module provides functionality to manage database migrations in a systematic way.
It ensures that migrations are applied in order and only once.
"""
import os
import importlib.util
import sqlite3
from datetime import datetime
from lpm_kernel.common.logging import logger
class MigrationManager:
    """Apply, track and revert schema migrations for a SQLite database.

    Migration scripts are Python files named ``V<version>__<description>.py``.
    Each script exposes ``upgrade(conn)`` and, optionally, ``downgrade(conn)``
    and a module-level ``description`` string. Applied versions are recorded
    in the ``schema_migrations`` table so a migration is skipped once applied.

    Versions are compared as plain strings, so they only order as expected
    when they share a fixed width (e.g. ``YYYYMMDDHHMMSS`` timestamps).
    """

    def __init__(self, db_path):
        """
        Initialize the migration manager

        Args:
            db_path: Path to the SQLite database file
        """
        self.db_path = db_path
        self._ensure_migration_table()

    @staticmethod
    def _default_migrations_dir():
        """Return the 'migrations' directory located next to this module."""
        return os.path.join(os.path.dirname(__file__), "migrations")

    @staticmethod
    def _parse_version(filename):
        """
        Extract the version from a migration filename

        Args:
            filename: File name such as 'V20250420221300__description.py'

        Returns:
            The version string without the leading 'V', or None when the
            name does not follow the 'V<version>__<description>' format
        """
        stem, separator, _ = filename.partition('__')
        if not separator or len(stem) < 2 or not stem.startswith('V'):
            return None
        # Strip only the prefix; a 'V' inside the version must be preserved.
        return stem[1:]

    @staticmethod
    def _load_migration_module(version, migrations_dir, migration_file):
        """
        Import a migration script from disk

        Args:
            version: Version of the migration (used to name the module)
            migrations_dir: Directory containing the script
            migration_file: File name of the script

        Returns:
            The executed module object

        Raises:
            Exception: Whatever importing the script raises; the failure is
                       logged before being re-raised
        """
        module_path = os.path.join(migrations_dir, migration_file)
        try:
            spec = importlib.util.spec_from_file_location(
                f"migration_{version}", module_path
            )
            if spec is None or spec.loader is None:
                raise ImportError(f"Cannot create import spec for {module_path}")
            migration_module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(migration_module)
        except Exception as e:
            logger.error(f"Failed to load migration {migration_file}: {str(e)}")
            raise
        return migration_module

    def _run_in_transaction(self, step, record_sql, record_params):
        """
        Run a migration step and its bookkeeping statement atomically

        Args:
            step: Callable taking the SQLite connection (upgrade/downgrade)
            record_sql: Statement that updates schema_migrations
            record_params: Parameters bound to record_sql

        Raises:
            Exception: Anything raised by the step or by SQLite; the
                       transaction is rolled back before re-raising
        """
        conn = sqlite3.connect(self.db_path)
        try:
            conn.execute("BEGIN TRANSACTION")
            step(conn)
            conn.execute(record_sql, record_params)
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            # Always release the connection, even when BEGIN itself fails.
            conn.close()

    def _ensure_migration_table(self):
        """Create migration tracking table if it doesn't exist"""
        conn = sqlite3.connect(self.db_path)
        try:
            conn.execute("""
            CREATE TABLE IF NOT EXISTS schema_migrations (
                version VARCHAR(50) PRIMARY KEY,
                description TEXT,
                applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );
            """)
            conn.commit()
        finally:
            conn.close()
        logger.debug("Migration tracking table checked/created")

    def get_applied_migrations(self):
        """
        Get list of already applied migrations

        Returns:
            List of applied migration versions, sorted ascending (as strings)
        """
        conn = sqlite3.connect(self.db_path)
        try:
            rows = conn.execute(
                "SELECT version FROM schema_migrations ORDER BY version"
            ).fetchall()
        finally:
            conn.close()
        return [row[0] for row in rows]

    def apply_migrations(self, migrations_dir=None):
        """
        Apply all pending migrations from the migrations directory

        Files whose names do not follow 'V<version>__<description>.py' are
        skipped with a warning. Each migration runs in its own transaction;
        the first failure is re-raised and stops processing, leaving earlier
        migrations of the same call applied.

        Args:
            migrations_dir: Directory containing migration scripts.
                            If None, use 'migrations' subdirectory

        Returns:
            List of migration versions applied by this call

        Raises:
            Exception: If a migration script cannot be loaded or its
                       upgrade fails
        """
        if migrations_dir is None:
            migrations_dir = self._default_migrations_dir()

        # Ensure migrations directory exists
        os.makedirs(migrations_dir, exist_ok=True)

        applied = set(self.get_applied_migrations())

        migration_files = []
        for filename in os.listdir(migrations_dir):
            if not filename.endswith('.py') or filename.startswith('__'):
                continue
            version = self._parse_version(filename)
            if version is None:
                logger.warning(f"Skipping invalid migration filename: {filename}")
                continue
            migration_files.append((version, filename))

        # Sort by version; the file name breaks ties so the order never
        # depends on what os.listdir happens to return.
        migration_files.sort()

        applied_in_session = []
        for version, migration_file in migration_files:
            if version in applied:
                logger.debug(f"Skipping already applied migration: {migration_file}")
                continue

            migration_module = self._load_migration_module(
                version, migrations_dir, migration_file
            )
            description = getattr(migration_module, 'description', migration_file)

            try:
                self._run_in_transaction(
                    migration_module.upgrade,
                    "INSERT INTO schema_migrations (version, description) VALUES (?, ?)",
                    (version, description),
                )
            except Exception as e:
                logger.error(f"Error applying migration {migration_file}: {str(e)}")
                raise

            applied_in_session.append(version)

        return applied_in_session

    def downgrade_migration(self, version, migrations_dir=None):
        """
        Downgrade a specific migration by version

        Args:
            version: Version of the migration to downgrade
            migrations_dir: Directory containing migration scripts.
                            If None, use 'migrations' subdirectory

        Returns:
            True if downgrade was successful. False if the version is not
            applied, its script is missing, or the script has no downgrade

        Raises:
            Exception: If the script cannot be loaded or its downgrade fails
        """
        if migrations_dir is None:
            migrations_dir = self._default_migrations_dir()

        # Check if migration is applied
        if version not in self.get_applied_migrations():
            logger.warning(f"Migration version {version} is not applied, cannot downgrade")
            return False

        # Match the full 'V<version>__' prefix so that, for example, version
        # '1' can never select the script that belongs to version '10'.
        prefix = f'V{version}__'
        migration_file = next(
            (
                f for f in sorted(os.listdir(migrations_dir))
                if f.endswith('.py') and f.startswith(prefix)
            ),
            None,
        )
        if not migration_file:
            logger.error(f"Migration file for version {version} not found")
            return False

        migration_module = self._load_migration_module(
            version, migrations_dir, migration_file
        )

        # Check if downgrade method exists
        if not hasattr(migration_module, 'downgrade'):
            logger.error(f"Migration {migration_file} does not have a downgrade method")
            return False

        try:
            self._run_in_transaction(
                migration_module.downgrade,
                "DELETE FROM schema_migrations WHERE version = ?",
                (version,),
            )
        except Exception as e:
            logger.error(f"Error downgrading migration {migration_file}: {str(e)}")
            raise

        return True

    def downgrade_to_version(self, target_version=None, migrations_dir=None):
        """
        Downgrade migrations to a specific version

        Migrations are reverted newest first and processing stops at the
        first one that fails or cannot be downgraded.

        Args:
            target_version: Version to downgrade to. This version itself stays
                            applied; every later one is reverted.
                            If None, downgrade all migrations.
            migrations_dir: Directory containing migration scripts

        Returns:
            List of downgraded migration versions, newest first
        """
        if migrations_dir is None:
            migrations_dir = self._default_migrations_dir()

        applied = self.get_applied_migrations()
        if not applied:
            return []

        if target_version is None:
            # Downgrade all migrations
            to_downgrade = list(applied)
        else:
            if target_version not in applied:
                logger.error(f"Target version {target_version} is not applied")
                return []
            # 'applied' is sorted ascending, so everything after the target
            # is newer than it.
            to_downgrade = applied[applied.index(target_version) + 1:]

        # Sort in reverse order to downgrade newest first
        to_downgrade.sort(reverse=True)

        downgraded = []
        for version in to_downgrade:
            try:
                succeeded = self.downgrade_migration(version, migrations_dir)
            except Exception as e:
                logger.error(f"Error during downgrade of {version}: {str(e)}")
                break
            if not succeeded:
                logger.error(f"Failed to downgrade migration {version}, stopping")
                break
            downgraded.append(version)

        return downgraded

    def create_migration(self, description, migrations_dir=None):
        """
        Create a new migration file with template code

        The version is the current local time formatted as YYYYMMDDHHMMSS.

        Args:
            description: Short description of what the migration does
            migrations_dir: Directory to create migration in.
                            If None, use 'migrations' subdirectory

        Returns:
            Path to the created migration file
        """
        if migrations_dir is None:
            migrations_dir = self._default_migrations_dir()

        # Ensure migrations directory exists
        os.makedirs(migrations_dir, exist_ok=True)

        # Get current timestamp for version
        timestamp = datetime.now().strftime("%Y%m%d%H%M%S")

        # Format description for filename (lowercase, underscores)
        safe_description = description.lower().replace(' ', '_').replace('-', '_')
        safe_description = ''.join(c for c in safe_description if c.isalnum() or c == '_')

        filename = f"V{timestamp}__{safe_description}.py"
        filepath = os.path.join(migrations_dir, filename)

        # Escape the characters that could terminate or corrupt the generated
        # module docstring; the assignment below relies on repr() instead.
        doc_description = description.replace('\\', '\\\\').replace('"""', '\\"\\"\\"')

        # Create migration file with template
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(f'''"""
Migration: {doc_description}
Version: {timestamp}
"""

description = {description!r}

def upgrade(conn):
    """
    Apply the migration
    
    Args:
        conn: SQLite connection object
    """
    cursor = conn.cursor()
    
    # TODO: Implement your migration logic here
    # Example:
    # cursor.execute("""
    #     CREATE TABLE IF NOT EXISTS new_table (
    #         id INTEGER PRIMARY KEY AUTOINCREMENT,
    #         name TEXT NOT NULL
    #     )
    # """)
    
    # No need to commit, the migration manager handles transactions

def downgrade(conn):
    """
    Revert the migration
    
    Args:
        conn: SQLite connection object
    """
    cursor = conn.cursor()
    
    # TODO: Implement your downgrade logic here
    # Example:
    # cursor.execute("DROP TABLE IF EXISTS new_table")
    
    # No need to commit, the migration manager handles transactions
''')

        return filepath