#!/usr/bin/env python3
"""
NZ Legislation Loophole Analysis Dataset Creation Tool

This script processes New Zealand legislation text to create a finetuning
dataset for AI models that can identify potential loopholes, ambiguities,
and unintended consequences in legal text.

The script:
1. Loads and cleans NZ legislation text, preserving legal structure and terminology
2. Chunks the text into manageable sections with overlap for context
3. Uses an LLM to analyze each chunk for legal issues
4. Generates a structured dataset for training AI models on legal loophole detection

Usage:
    python trl.py

Requirements:
    - llama-cpp-python with GGUF model support
    - psutil for memory monitoring
    - Input file: nz-legislation.txt containing NZ legislation in JSON lines format

Output:
    - JSON dataset saved to nz_legislation_dataset/nz_legislation_loophole_dataset.json
"""

import os
import json
import re
import time
from typing import List, Dict, Any

import psutil
from llama_cpp import Llama


# Placeholder classes and functions for missing dependencies
class ProgressManager:
    """Simple placeholder for progress tracking"""

    def __init__(self):
        pass


def show_memory_usage(label: str):
    """Simple memory usage display"""
    process = psutil.Process(os.getpid())
    memory_mb = process.memory_info().rss / 1024 / 1024
    print(f"{label}: {memory_mb:.2f} MB")


# Configuration for NZ Legislation Loophole Analysis Dataset Creation
INPUT_FILE = "nz-legislation.txt"      # Path to New Zealand legislation JSON dataset
OUTPUT_DIR = "nz_legislation_dataset"  # Directory to save the dataset
CHUNK_SIZE = 4096      # Size of text chunks for processing legislation sections
CHUNK_OVERLAP = 256    # Overlap between chunks to maintain context
BATCH_SIZE = 16        # Number of chunks to process at once
MODEL_PATH = "qwen3.gguf"  # Path to your Qwen3 GGUF model (used by the fallback loader)
MAX_TOKENS = 4096      # Maximum tokens for model response

# Ensure output directory exists
os.makedirs(OUTPUT_DIR, exist_ok=True)


def load_model(progress_manager: ProgressManager = None):
    """Load the LLM model for text generation with progress tracking"""
    if progress_manager is None:
        progress_manager = ProgressManager()

    print("Loading LLM model...")
    show_memory_usage("Initial memory usage")
    start_time = time.time()

    try:
        model = Llama.from_pretrained(
            repo_id="DavidAU/Qwen3-Zero-Coder-Reasoning-0.8B-NEO-EX-GGUF",
            filename="Qwen3-Zero-Coder-Reasoning-0.8B-NEO-EX-D_AU-IQ4_XS-imat.gguf",
            n_ctx=40960,       # Context length
            n_threads=8,       # Adjust based on your CPU
            verbose=False,
            n_gpu_layers=-1,   # Use all available GPU layers
            n_batch=4096,      # Batch size for processing
            logits_all=False,  # Optimize for text generation
            use_mlock=True,    # Lock model in memory if possible
            use_mmap=True,     # Use memory mapping for better performance
        )
    except Exception as e:
        print(f"Error loading model: {e}")
        print("Trying with basic configuration...")
        # Fallback to basic configuration using the local model path
        model = Llama(
            model_path=MODEL_PATH,
            n_ctx=40960,
            n_threads=8,
            verbose=False,
            n_gpu_layers=-1,
            n_batch=4096,
        )

    load_time = time.time() - start_time
    print(f"LLM model loaded in {load_time:.2f}s")
    show_memory_usage("Memory after model load")
    return model
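# A minimal smoke test for the loader (illustrative only -- it assumes the GGUF
# weights are reachable; the prompt and token count are arbitrary):
#
#   model = load_model()
#   out = model("Hello", max_tokens=8)
#   print(out["choices"][0]["text"])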
def clean_text(text: str) -> str:
    """Clean and normalize text for better embedding quality, optimized for
    legal/legislative content. Section numbers like "1:" and the surrounding
    legal structure are preserved throughout."""
    # Remove excessive whitespace but preserve paragraph structure
    text = re.sub(r'[ \t]+', ' ', text)      # Collapse runs of spaces/tabs
    text = re.sub(r'\n\s*\n', '\n\n', text)  # Preserve paragraph breaks but clean up
    text = re.sub(r'\n{3,}', '\n\n', text)   # Reduce excessive newlines to double

    # Remove control characters (except tab and newline) but preserve legal formatting
    text = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\x9f]', '', text)

    # Normalize quotes and apostrophes BEFORE the character filter below,
    # otherwise curly quotes would be stripped rather than converted
    text = re.sub(r'[\u201c\u201d]', '"', text)  # Normalize double quotes
    text = re.sub(r'[\u2018\u2019]', "'", text)  # Normalize single quotes
    text = re.sub(r'`', "'", text)               # Replace backticks with apostrophes

    # Keep legal punctuation and symbols. Note that \w already matches Unicode
    # letters, including Maori vowels with macrons (ā, ē, ī, ō, ū); they are
    # whitelisted explicitly as well for clarity.
    allowed_chars = r'\w\s\.\,\!\?\;\:\-\(\)\[\]\{\}\"\'\/\@\#\$\%\^\&\*\+\=\~°§'
    maori_chars = 'āēīōūĀĒĪŌŪ'
    text = re.sub(r'[^' + allowed_chars + maori_chars + ']', '', text)

    # Clean up legal numbering and references: normalize spacing in section
    # references (the keyword is also lowercased for consistency)
    text = re.sub(r'section\s+(\d+)', r'section \1', text, flags=re.IGNORECASE)

    # Clean date formatting: "1January" / "1  january" -> "1 January"
    months = ('January', 'February', 'March', 'April', 'May', 'June', 'July',
              'August', 'September', 'October', 'November', 'December')
    for month in months:
        pattern = r'(\d+)\s*[' + month[0] + month[0].lower() + r']' + month[1:]
        text = re.sub(pattern, r'\1 ' + month, text)

    # Clean up punctuation spacing in legal text
    text = re.sub(r'\s+([.!?,;:])', r'\1', text)  # Remove space before punctuation
    # Ensure a single space after punctuation while preserving line breaks
    text = re.sub(r'([.!?,;:])[ \t]*(?=\S)', r'\1 ', text)

    # Handle legal citations and references (generic patterns)
    # Normalize whitespace in act names with years, e.g. "Crimes  Act  1961" -> "Crimes Act 1961"
    text = re.sub(r'(\b\w+(?:\s+\w+)*)\s+Act\s+(\d{4})', r'\1 Act \2', text)

    # Clean up amendment references (generic patterns)
    text = re.sub(r'[Aa]mendment\(s\)\s+incorporated\s+in\s+the\s+[Aa]ct\(s\)',
                  'Amendments incorporated', text)
    text = re.sub(r'section\s+\d+\(\d+\)\([a-zA-Z]\)',
                  lambda m: m.group(0).lower(), text)  # Lowercase detailed section references

    # Generic pattern for legal document sections: normalize internal spacing
    text = re.sub(r'(\b(?:section|part|chapter|article|clause|subsection|paragraph))\s+(\d+[a-zA-Z]*)',
                  lambda m: f"{m.group(1)} {m.group(2)}", text, flags=re.IGNORECASE)

    # NZ-specific legal enhancements: normalize capitalization of key terms
    text = re.sub(r'\b[Nn]ew\s+[Zz]ealand\b', 'New Zealand', text)
    text = re.sub(r'\b[Pp]arliament\b', 'Parliament', text)
    text = re.sub(r'\b[Cc]rown\b', 'Crown', text)
    text = re.sub(r'\b[Gg]overnment\b', 'Government', text)

    # Handle NZ-specific legal citations (e.g., "NZB" references, Treaty of Waitangi)
    text = re.sub(r'\b[Nn][Zz][Bb]\s+(\d+)', r'NZB \1', text)
    text = re.sub(r'[Tt]reaty\s+[Oo]f\s+[Ww]aitangi', 'Treaty of Waitangi', text,
                  flags=re.IGNORECASE)

    # Remove empty lines and trim each line while preserving legal structure
    # (section headers such as "1:" pass through unchanged)
    lines = [line.strip() for line in text.split('\n') if line.strip()]
    text = '\n'.join(lines)

    # Final cleanup
    return text.strip()
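# Illustrative behaviour on a hypothetical fragment:
#
#   clean_text('5:  Interpretation\n\n\nIn this Act ,  "person" includes...')
#   -> '5: Interpretation\nIn this Act, "person" includes...'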
# Delimiter tags for structuring model output. NOTE: the original tag values
# were lost in this copy of the script (they appeared as empty strings); plain
# angle-bracket tags are assumed here -- adjust to match your model's training.
REASONING_START = "<REASONING>"
REASONING_END = "</REASONING>"
SOLUTION_START = "<SOLUTION>"
SOLUTION_END = "</SOLUTION>"


def create_system_prompt(text: str) -> str:
    """Create a system prompt for analyzing legislative text for loopholes and ambiguities"""
    return f"""You are a legal expert analyzing New Zealand legislation for loopholes and ambiguities.

LEGISLATION TEXT:
{text}

TASK: Analyze this legislative text and identify potential loopholes, ambiguities, or unintended consequences.

REASONING: Provide a structured analysis in the following format:
1. **Text Meaning**: Explain what the text means and its intended purpose
2. **Key Assumptions**: Identify any assumptions the text makes that could be exploited
3. **Exploitable Interpretations**: Discuss how the text could be interpreted or applied in ways that circumvent its intended purpose
4. **Critical Loopholes**: Identify specific loopholes, ambiguities, or unintended consequences that could be used to bypass the legislation
5. **Circumvention Strategies**: Suggest practical methods or scenarios for exploiting these loopholes to achieve objectives contrary to the legislation's intent

Write your complete analysis between {REASONING_START} and {REASONING_END}.
Then provide your overall conclusion between {SOLUTION_START} and {SOLUTION_END}.
"""


def generate_chat_template(system_prompt: str) -> str:
    """
    Generate a chat template using the model's native ChatML format.
    This uses the proper message structure with im_start/im_end markers
    for better model compatibility.
    """
    # Build the chat using the ChatML template structure
    chat_messages = []

    # System message
    if system_prompt:
        chat_messages.append("<|im_start|>system")
        chat_messages.append(system_prompt)
        chat_messages.append("<|im_end|>")

    # User message with the analysis request
    chat_messages.append("<|im_start|>user")
    chat_messages.append(
        "Analyze the given legislative text for loopholes, ambiguities, and "
        "unintended consequences. Provide a structured legal analysis "
        "following the specified format."
    )
    chat_messages.append("<|im_end|>")

    # Assistant message with generation prompt
    chat_messages.append("<|im_start|>assistant")
    chat_messages.append("")  # Empty: generation continues from here

    return "\n".join(chat_messages)
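# For reference, the rendered prompt looks like:
#
#   <|im_start|>system
#   You are a legal expert analyzing New Zealand legislation ... <legislation chunk> ...
#   <|im_end|>
#   <|im_start|>user
#   Analyze the given legislative text for loopholes, ...
#   <|im_end|>
#   <|im_start|>assistant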
def chunk_text(text: str, chunk_size: int = CHUNK_SIZE, overlap: int = CHUNK_OVERLAP) -> List[str]:
    """Split text into overlapping chunks for processing"""
    if len(text) <= chunk_size:
        return [text]

    chunks = []
    start = 0
    while start < len(text):
        end = start + chunk_size
        chunk = text[start:end]

        # Try to end the chunk at a sentence boundary if possible:
        # look for sentence endings in the last 100 characters
        if end < len(text):
            sentence_end = max(
                chunk.rfind('. ', max(0, len(chunk) - 100)),
                chunk.rfind('! ', max(0, len(chunk) - 100)),
                chunk.rfind('? ', max(0, len(chunk) - 100)),
            )
            if sentence_end != -1:
                chunk = chunk[:sentence_end + 2]  # Include the sentence ending

        chunks.append(chunk)
        start = end - overlap if end < len(text) else len(text)

    return chunks


def generate_response(model, prompt: str, max_tokens: int = MAX_TOKENS) -> str:
    """
    Generate a response from the model for a given prompt with parameters
    tuned for legal analysis.

    Parameter explanations:
    - temperature=0.3: balanced creativity for legal analysis (not too random, not too deterministic)
    - top_p=0.85: nucleus sampling -- considers the top 85% probability mass for coherent legal text
    - top_k=50: top-k sampling -- considers the top 50 tokens for better legal terminology selection
    - min_p=0.05: minimum probability threshold to avoid low-quality tokens

    Anti-repetition parameters:
    - repeat_penalty=1.15: penalizes repetition of phrases (15% penalty)
    - presence_penalty=0.1: encourages topic diversity across the response
    - frequency_penalty=0.1: reduces overuse of frequent tokens

    Advanced sampling:
    - typical_p=0.95: focuses on typical token probabilities for legal text patterns
    - tfs_z=0.95: tail-free sampling for more natural legal reasoning
    - mirostat_mode=2: Mirostat v2 for perplexity-controlled generation
    - mirostat_tau=4.0: target entropy level for legal analysis
    - mirostat_eta=0.15: learning rate for perplexity adaptation
    """
    try:
        response = model(
            prompt,
            max_tokens=max_tokens,
            # Core generation parameters
            temperature=0.3,
            top_p=0.85,
            top_k=50,
            min_p=0.05,
            # Anti-repetition parameters
            repeat_penalty=1.15,
            presence_penalty=0.1,
            frequency_penalty=0.1,
            # Advanced sampling parameters
            typical_p=0.95,
            tfs_z=0.95,
            mirostat_mode=2,
            mirostat_tau=4.0,
            mirostat_eta=0.15,
            # Stopping conditions. NOTE: besides the solution delimiter, the
            # original stop strings were lost; the ChatML end-of-turn marker
            # used in generate_chat_template is assumed here.
            stop=[SOLUTION_END, "<|im_end|>"],
        )
        return response['choices'][0]['text'].strip()
    except Exception as e:
        print(f"Error generating response: {e}")
        # Try with fallback parameters if the advanced ones fail
        try:
            response = model(
                prompt,
                max_tokens=max_tokens,
                temperature=0.3,
                top_p=0.85,
                top_k=50,
                repeat_penalty=1.15,
                stop=[SOLUTION_END, "<|im_end|>"],
            )
            return response['choices'][0]['text'].strip()
        except Exception as e2:
            print(f"Fallback also failed: {e2}")
            return ""


def parse_legislation_json(file_path: str) -> List[Dict[str, Any]]:
    """Parse the JSON lines format of the NZ legislation dataset"""
    legislation_entries = []
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            for line_num, line in enumerate(f, 1):
                line = line.strip()
                if not line:
                    continue
                try:
                    entry = json.loads(line)
                    if 'id' in entry and 'text' in entry:
                        legislation_entries.append(entry)
                    else:
                        print(f"Warning: Line {line_num} missing required fields, skipping")
                except json.JSONDecodeError as e:
                    print(f"Warning: Could not parse line {line_num}: {e}")
                    continue
    except Exception as e:
        print(f"Error reading legislation file: {e}")
        return []

    print(f"Successfully parsed {len(legislation_entries)} legislation entries")
    return legislation_entries
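# Expected input format: one JSON object per line. Per the checks above, 'id'
# and 'text' are required; 'title' and 'year' are optional. A hypothetical line:
#
#   {"id": "act-1993-082", "title": "Example Act", "year": "1993",
#    "text": "1: Short title This Act may be cited as..."}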
def create_finetuning_dataset(input_file: str, model, output_file: str = None) -> List[Dict[str, Any]]:
    """Create a finetuning dataset by processing the NZ legislation JSON
    dataset, saving results incrementally as it goes."""
    if output_file is None:
        output_file = os.path.join(OUTPUT_DIR, "nz_legislation_loophole_dataset.json")

    # Create temporary file paths
    temp_file = output_file.replace('.json', '_temp.jsonl')
    backup_file = output_file.replace('.json', '_backup.json')

    print(f"Parsing legislation dataset from {input_file}")
    legislation_entries = parse_legislation_json(input_file)
    if not legislation_entries:
        print("No legislation entries found to process")
        return []

    dataset = []
    total_entries = len(legislation_entries)
    saved_count = 0

    print(f"Processing {total_entries} legislation entries...")
    print(f"Dataset will be saved incrementally to: {temp_file}")

    try:
        # Open temporary file for incremental saving
        with open(temp_file, 'w', encoding='utf-8') as temp_f:
            for entry_num, entry in enumerate(legislation_entries, 1):
                legislation_id = entry.get('id', f'entry_{entry_num}')
                title = entry.get('title', 'Unknown Title')
                year = entry.get('year', 'Unknown Year')
                raw_text = entry.get('text', '')

                print(f"\nProcessing entry {entry_num}/{total_entries}: {title} ({year}) - ID: {legislation_id}")

                # Clean the legislation text, then chunk it if it's too long
                cleaned_text = clean_text(raw_text)
                chunks = chunk_text(cleaned_text)
                print(f" - Text length: {len(raw_text)} characters")
                print(f" - Number of chunks: {len(chunks)}")

                # Process each chunk
                for chunk_id, chunk in enumerate(chunks):
                    # Create the prompt for this chunk and generate a response
                    system_prompt = create_system_prompt(chunk)
                    full_prompt = generate_chat_template(system_prompt)
                    response = generate_response(model, full_prompt)

                    # Print response details for monitoring
                    print(f"\n📝 **Generated Analysis for {title} (Chunk {chunk_id + 1}/{len(chunks)})**:")
                    print(f"   Response length: {len(response)} characters")

                    # Show a short preview of the analysis
                    preview = response.replace('\n', ' ').strip()[:200]
                    print(f"   Preview: {preview}")

                    # Check for key analysis elements
                    has_reasoning = REASONING_START in response or 'reasoning' in response.lower()
                    has_loopholes = ('loophole' in response.lower()
                                     or 'ambiguity' in response.lower()
                                     or 'issue' in response.lower())
                    has_recommendations = 'recommend' in response.lower() or 'suggest' in response.lower()
                    print(f"   Analysis quality: {'✅' if has_reasoning else '❌'} Reasoning | "
                          f"{'✅' if has_loopholes else '❌'} Loopholes | "
                          f"{'✅' if has_recommendations else '❌'} Recommendations")

                    # Add to dataset with metadata
                    dataset_entry = {
                        "prompt": full_prompt,
                        "response": response,
                        "legislation_id": legislation_id,
                        "title": title,
                        "year": year,
                        "chunk_id": chunk_id,
                        "total_chunks": len(chunks),
                        "text_length": len(chunk),
                        "original_text_length": len(raw_text),
                    }

                    # Save the entry immediately to the temporary file (JSON Lines format)
                    json.dump(dataset_entry, temp_f, ensure_ascii=False)
                    temp_f.write('\n')
                    temp_f.flush()  # Force write to disk

                    dataset.append(dataset_entry)
                    saved_count += 1

                    # Progress update every 10 entries
                    if saved_count % 10 == 0:
                        print(f"   ✓ Saved {saved_count} entries so far...")

        print(f"\n✓ All entries processed and saved to temporary file")
        print(f"✓ Total entries saved: {saved_count}")

        # Create a backup of the existing output file if it exists
        if os.path.exists(output_file):
            print(f"Creating backup of existing dataset...")
            os.rename(output_file, backup_file)
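        # The temp file is JSON Lines, so each entry is durable as soon as it
        # is flushed; the consolidation below rewrites it as a single JSON
        # array, which is what most finetuning toolchains expect.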
        # Convert JSON Lines to the final JSON format
        print(f"Converting to final JSON format...")
        with open(temp_file, 'r', encoding='utf-8') as temp_f:
            lines = temp_f.readlines()

        final_dataset = []
        for line in lines:
            if line.strip():
                final_dataset.append(json.loads(line))

        # Save the final consolidated JSON file
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(final_dataset, f, indent=2, ensure_ascii=False)
        print(f"✓ Final dataset saved to: {output_file}")

        # Clean up the temporary file
        if os.path.exists(temp_file):
            os.remove(temp_file)
            print(f"✓ Temporary file cleaned up")

        # Clean up the backup file if everything succeeded
        if os.path.exists(backup_file):
            os.remove(backup_file)
            print(f"✓ Backup file cleaned up")

        print(f"\n🎉 Dataset creation complete!")
        print(f"   • Processed {total_entries} legislation documents")
        # Each dataset entry corresponds to one chunk, so the entry count is
        # also the total number of chunks processed
        print(f"   • Generated {len(final_dataset)} analysis entries (chunks)")

        return final_dataset

    except KeyboardInterrupt:
        print(f"\n⚠️ Process interrupted by user")
        print(f"   • Partial dataset saved to: {temp_file}")
        print(f"   • {saved_count} entries saved so far")
        print(f"   • You can resume processing or use the temporary file")
        raise
    except Exception as e:
        print(f"\n❌ Error during processing: {e}")
        print(f"   • Partial dataset saved to: {temp_file}")
        print(f"   • {saved_count} entries saved so far")
        if os.path.exists(backup_file):
            print(f"   • Original dataset restored from backup")
            os.rename(backup_file, output_file)
        raise


def main():
    """Main execution function"""
    print("Starting NZ Legislation Loophole Analysis Dataset Creation")
    print("=" * 60)

    # Load the model
    model = load_model()

    # Create the dataset
    create_finetuning_dataset(INPUT_FILE, model)

    # Cleanup
    if hasattr(model, 'close'):
        model.close()

    print("\nDataset creation completed successfully!")
    print(f"Output saved to: {os.path.join(OUTPUT_DIR, 'nz_legislation_loophole_dataset.json')}")


if __name__ == "__main__":
    main()
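# If a run is interrupted, partial results remain in the *_temp.jsonl file and
# can be loaded directly. A minimal sketch (path follows the defaults above):
#
#   import json
#   with open("nz_legislation_dataset/nz_legislation_loophole_dataset_temp.jsonl") as f:
#       partial = [json.loads(line) for line in f if line.strip()]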