""" |
|
|
Performance Monitor |
|
|
|
|
|
Monitors system performance metrics for the NZ Legislation Loophole Analysis application. |
|
|
Tracks memory usage, CPU utilization, processing times, and other performance indicators. |
|
|
""" |
|
|
|
|
|
import time |
|
|
import threading |
|
|
import psutil |
|
|
from typing import Dict, Any, Optional, List |
|
|
from collections import deque |
|
|
import streamlit as st |
|
|
|
|
|
class PerformanceMonitor:
    """Performance monitoring system"""

    def __init__(self, max_history: int = 1000):
        """
        Initialize performance monitor

        Args:
            max_history: Maximum number of historical data points to keep
        """
        self.max_history = max_history
        self.lock = threading.RLock()

        # Rolling histories of (timestamp, value) samples and per-operation durations
        self.memory_history = deque(maxlen=max_history)
        self.cpu_history = deque(maxlen=max_history)
        self.processing_times = deque(maxlen=max_history)

        # Latest snapshot of all tracked metrics
        self.current_metrics = {
            'memory_usage_mb': 0,
            'memory_percent': 0,
            'cpu_percent': 0,
            'active_threads': 0,
            'processing_time_avg': 0,
            'processing_time_max': 0,
            'processing_time_min': 0,
            'total_processed_chunks': 0,
            'chunks_per_second': 0
        }

        # Timing state for processing operations
        self.processing_start_time = None
        self.last_chunk_time = time.time()

        # Start the background sampling thread
        self.monitoring = True
        self.monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
        self.monitor_thread.start()

    def _monitor_loop(self):
        """Background monitoring loop"""
        while self.monitoring:
            try:
                self._update_metrics()
                time.sleep(1)
            except Exception as e:
                print(f"Performance monitoring error: {e}")
                time.sleep(5)

    def _update_metrics(self):
        """Update current performance metrics"""
        process = psutil.Process()

        # Sample outside the lock: cpu_percent(interval=0.1) blocks for ~100 ms
        memory_info = process.memory_info()
        memory_usage_mb = memory_info.rss / 1024 / 1024
        memory_percent = process.memory_percent()
        cpu_percent = process.cpu_percent(interval=0.1)
        active_threads = len(process.threads())
        current_time = time.time()

        with self.lock:
            self.current_metrics.update({
                'memory_usage_mb': memory_usage_mb,
                'memory_percent': memory_percent,
                'cpu_percent': cpu_percent,
                'active_threads': active_threads
            })

            # Append timestamped samples to the rolling histories
            self.memory_history.append((current_time, memory_usage_mb))
            self.cpu_history.append((current_time, cpu_percent))

    def start_processing_timer(self):
        """Start timing a processing operation"""
        self.processing_start_time = time.time()

    def end_processing_timer(self) -> float:
        """End timing and return elapsed time"""
        if self.processing_start_time is None:
            return 0.0

        elapsed = time.time() - self.processing_start_time
        self.processing_start_time = None

        with self.lock:
            self.processing_times.append(elapsed)

            # Refresh aggregate timing statistics
            self.current_metrics['processing_time_avg'] = sum(self.processing_times) / len(self.processing_times)
            self.current_metrics['processing_time_max'] = max(self.processing_times)
            self.current_metrics['processing_time_min'] = min(self.processing_times)

        return elapsed

    def record_chunk_processing(self):
        """Record that a chunk has been processed"""
        current_time = time.time()

        with self.lock:
            self.current_metrics['total_processed_chunks'] += 1

            # Smooth throughput with an exponential moving average (90% old, 10% new)
            time_diff = current_time - self.last_chunk_time
            if time_diff > 0:
                current_cps = 1.0 / time_diff
                self.current_metrics['chunks_per_second'] = (
                    0.9 * self.current_metrics['chunks_per_second'] + 0.1 * current_cps
                )

            self.last_chunk_time = current_time

    def get_stats(self) -> Dict[str, Any]:
        """Get current performance statistics"""
        with self.lock:
            return self.current_metrics.copy()

    def get_memory_history(self, time_window_seconds: int = 300) -> List[tuple]:
        """Get memory usage history within time window"""
        current_time = time.time()
        cutoff_time = current_time - time_window_seconds

        with self.lock:
            return [(t, v) for t, v in self.memory_history if t >= cutoff_time]

    def get_cpu_history(self, time_window_seconds: int = 300) -> List[tuple]:
        """Get CPU usage history within time window"""
        current_time = time.time()
        cutoff_time = current_time - time_window_seconds

        with self.lock:
            return [(t, v) for t, v in self.cpu_history if t >= cutoff_time]

    def get_processing_time_stats(self) -> Dict[str, Any]:
        """Get processing time statistics"""
        with self.lock:
            if not self.processing_times:
                return {
                    'count': 0,
                    'average': 0,
                    'maximum': 0,
                    'minimum': 0,
                    'median': 0
                }

            sorted_times = sorted(self.processing_times)

            return {
                'count': len(self.processing_times),
                'average': sum(self.processing_times) / len(self.processing_times),
                'maximum': max(self.processing_times),
                'minimum': min(self.processing_times),
                'median': sorted_times[len(sorted_times) // 2]
            }

    def get_system_info(self) -> Dict[str, Any]:
        """Get system information"""
        virtual_memory = psutil.virtual_memory()
        return {
            'cpu_count': psutil.cpu_count(logical=False),
            'cpu_count_logical': psutil.cpu_count(logical=True),
            'total_memory_gb': virtual_memory.total / (1024 ** 3),
            'available_memory_gb': virtual_memory.available / (1024 ** 3),
            'python_version': f"{platform.python_implementation()} {platform.python_version()}",
            'platform': platform.platform()
        }

    def reset_stats(self):
        """Reset performance statistics"""
        with self.lock:
            self.processing_times.clear()
            self.current_metrics['total_processed_chunks'] = 0
            self.current_metrics['chunks_per_second'] = 0
            self.current_metrics['processing_time_avg'] = 0
            self.current_metrics['processing_time_max'] = 0
            self.current_metrics['processing_time_min'] = 0

    def cleanup(self):
        """Cleanup resources"""
        self.monitoring = False
        if self.monitor_thread.is_alive():
            self.monitor_thread.join(timeout=2)

    def get_performance_report(self) -> Dict[str, Any]:
        """Generate a comprehensive performance report"""
        return {
            'current_metrics': self.get_stats(),
            'processing_stats': self.get_processing_time_stats(),
            'system_info': self.get_system_info(),
            'memory_history_count': len(self.memory_history),
            'cpu_history_count': len(self.cpu_history),
            'processing_times_count': len(self.processing_times)
        }

    def check_memory_threshold(self, threshold_mb: int) -> bool:
        """Check if memory usage is above threshold"""
        with self.lock:
            return self.current_metrics['memory_usage_mb'] > threshold_mb

    def check_cpu_threshold(self, threshold_percent: float) -> bool:
        """Check if CPU usage is above threshold"""
        with self.lock:
            return self.current_metrics['cpu_percent'] > threshold_percent

    def get_recommendations(self) -> List[str]:
        """Get performance recommendations based on current metrics"""
        recommendations = []
        metrics = self.get_stats()

        # Memory pressure
        if metrics['memory_usage_mb'] > 7000:
            recommendations.append("High memory usage detected. Consider reducing batch size or chunk size.")
        elif metrics['memory_usage_mb'] > 5000:
            recommendations.append("Moderate memory usage. Monitor closely during processing.")

        # CPU load
        if metrics['cpu_percent'] > 90:
            recommendations.append("High CPU usage. Consider reducing processing intensity.")
        elif metrics['cpu_percent'] > 70:
            recommendations.append("Elevated CPU usage. Monitor for sustained spikes during processing.")

        # Per-operation processing time
        avg_time = metrics.get('processing_time_avg', 0)
        if avg_time > 10:
            recommendations.append("Slow processing detected. Consider using a more powerful model or optimizing settings.")
        elif avg_time > 5:
            recommendations.append("Moderate processing speed. Consider increasing batch size if memory allows.")

        # Throughput (only meaningful once some chunks have been processed)
        chunks_per_second = metrics.get('chunks_per_second', 0)
        if metrics['total_processed_chunks'] > 0 and chunks_per_second < 1:
            recommendations.append("Low processing throughput. Consider optimizing chunk size or model parameters.")

        if not recommendations:
            recommendations.append("Performance is optimal. All metrics are within normal ranges.")

        return recommendations


# Module-level singleton so the whole application shares one monitor
_performance_instance = None
_performance_lock = threading.Lock()


def get_performance_monitor(max_history: int = 1000) -> PerformanceMonitor:
    """Get or create global performance monitor instance"""
    global _performance_instance

    with _performance_lock:
        if _performance_instance is None:
            _performance_instance = PerformanceMonitor(max_history)

    return _performance_instance
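
# A minimal usage sketch (assumes psutil is installed): it exercises the public
# API defined above by timing a few simulated operations and printing the report.
# The time.sleep() calls are placeholders for real chunk-processing work.
if __name__ == "__main__":
    monitor = get_performance_monitor(max_history=100)

    for _ in range(3):
        monitor.start_processing_timer()
        time.sleep(0.2)  # stand-in for real chunk processing
        monitor.record_chunk_processing()
        elapsed = monitor.end_processing_timer()
        print(f"Chunk processed in {elapsed:.3f}s")

    report = monitor.get_performance_report()
    print("Current metrics:", report['current_metrics'])
    print("Recommendations:", monitor.get_recommendations())

    monitor.cleanup()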