#!/usr/bin/env python3
"""
NZ Legislation Loophole Analysis Dataset Creation Tool
This script processes New Zealand legislation text to create a finetuning dataset for AI models
that can identify potential loopholes, ambiguities, and unintended consequences in legal text.
The script:
1. Loads and cleans NZ legislation text, preserving legal structure and terminology
2. Chunks the text into manageable sections with overlap for context
3. Uses an LLM to analyze each chunk for legal issues
4. Generates a structured dataset for training AI models on legal loophole detection
Usage:
python trl.py
Requirements:
- llama-cpp-python with GGUF model support
- psutil for memory monitoring
- Input file: nz-legislation.txt containing NZ legislation in JSON lines format
Output:
- JSON dataset saved to nz_legislation_dataset/nz_legislation_loophole_dataset.json
"""
import os
import json
import re
import time
import psutil
from typing import List, Dict, Any
from llama_cpp import Llama
# Placeholder classes and functions for missing dependencies
class ProgressManager:
    """Simple placeholder for progress tracking"""
    def __init__(self):
        pass

def show_memory_usage(label: str):
    """Simple memory usage display"""
    process = psutil.Process(os.getpid())
    memory_mb = process.memory_info().rss / 1024 / 1024
    print(f"{label}: {memory_mb:.2f} MB")
# Configuration for NZ Legislation Loophole Analysis Dataset Creation
INPUT_FILE = "nz-legislation.txt" # Path to New Zealand legislation JSON dataset
OUTPUT_DIR = "nz_legislation_dataset" # Directory to save the dataset
CHUNK_SIZE = 4096 # Size of text chunks for processing legislation sections
CHUNK_OVERLAP = 256 # Overlap between chunks to maintain context
BATCH_SIZE = 16 # Number of chunks to process at once
MODEL_PATH = "qwen3.gguf" # Path to your Qwen3 GGUF model
MAX_TOKENS = 4096 # Maximum tokens for model response
# Ensure output directory exists
os.makedirs(OUTPUT_DIR, exist_ok=True)
def load_model(progress_manager: ProgressManager = None):
    """Load the LLM model for text generation with progress tracking"""
    if progress_manager is None:
        progress_manager = ProgressManager()
    print("Loading LLM model...")
    show_memory_usage("Initial memory usage")
    start_time = time.time()
    try:
        # Assign to `model` so the return below works on the success path as well
        model = Llama.from_pretrained(
            repo_id="DavidAU/Qwen3-Zero-Coder-Reasoning-0.8B-NEO-EX-GGUF",
            filename="Qwen3-Zero-Coder-Reasoning-0.8B-NEO-EX-D_AU-IQ4_XS-imat.gguf",
            n_ctx=40960,       # Context length
            n_threads=8,       # Adjust based on your CPU
            verbose=False,
            n_gpu_layers=-1,   # Use all available GPU layers
            n_batch=4096,      # Batch size for processing
            logits_all=False,  # Optimize for text generation
            use_mlock=True,    # Lock model in memory if possible
            use_mmap=True,     # Use memory mapping for better performance
        )
    except Exception as e:
        print(f"Error loading model: {e}")
        print("Trying with basic configuration...")
        # Fallback to basic configuration using the local GGUF file
        model = Llama(
            model_path=MODEL_PATH,
            n_ctx=40960,
            n_threads=8,
            verbose=False,
            n_gpu_layers=-1,
            n_batch=4096
        )
    load_time = time.time() - start_time
    print(f"LLM model loaded in {load_time:.2f}s")
    show_memory_usage("Memory after model load")
    return model
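# Note: Llama.from_pretrained fetches and caches the GGUF from the Hugging Face
# Hub (it relies on the huggingface_hub package being installed); the fallback
# path instead loads the local file named by MODEL_PATH, assumed here to be the
# same model saved locally as "qwen3.gguf".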
def clean_text(text: str) -> str:
    """Clean and normalize text for better embedding quality, optimized for legal/legislative content"""
    # Remove excessive whitespace but preserve paragraph structure
    text = re.sub(r'[ \t]+', ' ', text)      # Replace multiple spaces/tabs with a single space
    text = re.sub(r'\n\s*\n', '\n\n', text)  # Preserve paragraph breaks but clean up
    text = re.sub(r'\n{3,}', '\n\n', text)   # Reduce excessive newlines to double
    # Remove control characters (tab and newline survive) but preserve legal formatting
    text = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\x9f]', '', text)
    # Normalize quotes and apostrophes for legal text. This must happen before the
    # character filter below, which would otherwise strip the curly variants outright.
    text = re.sub(r'[\u201c\u201d]', '"', text)  # Normalize double quotes
    text = re.sub(r'[\u2018\u2019]', "'", text)  # Normalize single quotes
    text = re.sub(r'`', "'", text)               # Replace backticks with apostrophes
    # Keep legal punctuation and symbols, plus the common Maori vowels (ā, ē, ī, ō, ū).
    # The Maori characters must be in the allowed set before filtering, or they would
    # be stripped here with no way to restore them later ("wh" is plain ASCII and is
    # already covered by \w).
    allowed_chars = r'\w\s\.\,\!\?\;\:\-\(\)\[\]\{\}\"\'\/\@\#\$\%\^\&\*\+\=\~\`°§āēīōūĀĒĪŌŪ'
    text = re.sub(r'[^' + allowed_chars + ']', '', text)
    # Clean up legal numbering and references
    text = re.sub(r'section\s+(\d+)', r'section \1', text, flags=re.IGNORECASE)  # Normalize section references
    # Normalize "<day> <Month>" date formatting in a single pass over all twelve months
    months = ('January', 'February', 'March', 'April', 'May', 'June', 'July',
              'August', 'September', 'October', 'November', 'December')
    text = re.sub(r'(\d+)\s*(' + '|'.join(months) + r')',
                  lambda m: f"{m.group(1)} {m.group(2).capitalize()}",
                  text, flags=re.IGNORECASE)
    # Clean up punctuation spacing in legal text
    text = re.sub(r'\s+([\.!\?\,\;\:])', r'\1', text)   # Remove space before punctuation
    text = re.sub(r'([\.!\?\,\;\:])\s*', r'\1 ', text)  # Ensure space after punctuation
    # Handle legal citations and references (generic patterns)
    # Normalize whitespace in "<Name> Act <year>" citations, keeping the year
    text = re.sub(r'(\b\w+(?:\s+\w+)*)\s+Act\s+(\d{4})', r'\1 Act \2', text)
    # Clean up amendment references (generic patterns)
    text = re.sub(r'[Aa]mendment\(s\)\s+incorporated\s+in\s+the\s+[Aa]ct\(s\)', 'Amendments incorporated', text)
    text = re.sub(r'section\s+\d+\(\d+\)\([a-zA-Z]\)', lambda m: m.group(0).lower(), text)  # Normalize subsection references
    # Generic pattern for legal document sections: normalize internal whitespace
    text = re.sub(r'(\b(?:section|part|chapter|article|clause|subsection|paragraph))\s+(\d+[a-zA-Z]*)',
                  lambda m: f"{m.group(1)} {m.group(2)}", text, flags=re.IGNORECASE)
    # NZ-specific legal enhancements: normalize casing of key terms and references
    text = re.sub(r'\b[Nn]ew\s+[Zz]ealand\b', 'New Zealand', text)
    text = re.sub(r'\b[Pp]arliament\b', 'Parliament', text)
    text = re.sub(r'\b[Cc]rown\b', 'Crown', text)
    text = re.sub(r'\b[Gg]overnment\b', 'Government', text)
    # Handle NZ-specific legal citations (e.g., "NZB" references, Treaty of Waitangi)
    text = re.sub(r'\b[Nn][Zz][Bb]\s+(\d+)', r'NZB \1', text)
    text = re.sub(r'[Tt]reaty\s+[Oo]f\s+[Ww]aitangi', 'Treaty of Waitangi', text, flags=re.IGNORECASE)
    # Remove empty lines and trim while preserving legal structure; section headers
    # such as "1:" keep their prefix because stripping only touches edge whitespace
    lines = [line.strip() for line in text.split('\n') if line.strip()]
    return '\n'.join(lines).strip()
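# Rough worked example (hypothetical input; output follows the rules above):
#   clean_text('1:   The  Crown\n\n\n\nmay act under the Crimes Act 1961 .')
#   -> '1: The Crown\nmay act under the Crimes Act 1961.'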
# Constants for prompt formatting
REASONING_START = "<start_working_out>"
REASONING_END = "<end_working_out>"
SOLUTION_START = "<SOLUTION>"
SOLUTION_END = "</SOLUTION>"
def create_system_prompt(text: str) -> str:
    """Create a system prompt for analyzing legislative text for loopholes and ambiguities"""
    return f"""You are a legal expert analyzing New Zealand legislation for loopholes and ambiguities.
LEGISLATION TEXT:
{text}
TASK: Analyze this legislative text and identify potential loopholes, ambiguities, or unintended consequences.
REASONING: Provide a structured analysis in the following format:
1. **Text Meaning**: Explain what the text means and its intended purpose
2. **Key Assumptions**: Identify any assumptions the text makes that could be exploited
3. **Exploitable Interpretations**: Discuss how the text could be interpreted or applied in ways that circumvent its intended purpose
4. **Critical Loopholes**: Identify specific loopholes, ambiguities, or unintended consequences that could be used to bypass the legislation
5. **Circumvention Strategies**: Suggest practical methods or scenarios for exploiting these loopholes to achieve objectives contrary to the legislation's intent
Write your complete analysis between {REASONING_START} and {REASONING_END}.
Then provide your overall conclusion between {SOLUTION_START} and {SOLUTION_END}.
"""
def generate_chat_template(system_prompt: str) -> str:
    """
    Generate a chat template using the GGUF model's native chat format.
    This uses the proper message structure with BOS/EOS tokens for better model compatibility.
    """
    # Build the chat using the ChatML structure the GGUF template expects
    chat_messages = []
    # System message
    if system_prompt:
        chat_messages.append("<|im_start|>system")
        chat_messages.append(system_prompt)
        chat_messages.append("<|im_end|>")
    # User message with the analysis request
    chat_messages.append("<|im_start|>user")
    chat_messages.append("Analyze the given legislative text for loopholes, ambiguities, and unintended consequences. Provide a structured legal analysis following the specified format.")
    chat_messages.append("<|im_end|>")
    # Assistant message with generation prompt
    chat_messages.append("<|im_start|>assistant")
    chat_messages.append("")  # Empty for generation
    return "\n".join(chat_messages)
def chunk_text(text: str, chunk_size: int = CHUNK_SIZE, overlap: int = CHUNK_OVERLAP) -> List[str]:
    """Split text into overlapping chunks for processing"""
    if len(text) <= chunk_size:
        return [text]
    chunks = []
    start = 0
    while start < len(text):
        end = start + chunk_size
        chunk = text[start:end]
        # Try to end the chunk at a sentence boundary if possible
        if end < len(text):
            # Look for sentence endings in the last 100 characters
            sentence_end = max(
                chunk.rfind('. ', max(0, len(chunk) - 100)),
                chunk.rfind('! ', max(0, len(chunk) - 100)),
                chunk.rfind('? ', max(0, len(chunk) - 100))
            )
            if sentence_end != -1:
                chunk = chunk[:sentence_end + 2]  # Include the sentence ending
        chunks.append(chunk)
        start = end - overlap if end < len(text) else len(text)
    return chunks
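# Illustration of the stride: with CHUNK_SIZE=4096 and CHUNK_OVERLAP=256, a
# 10,000-character document yields chunks starting at offsets 0, 3840, and 7680.
# Chunk ends may shift earlier when a sentence ending is found in the last 100
# characters, but the next start is always computed from the fixed end, so the
# overlap still covers any truncated tail.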
def generate_response(model, prompt: str, max_tokens: int = MAX_TOKENS) -> str:
    """
    Generate a response from the model for a given prompt, with parameters tuned for legal analysis.

    Parameter explanations:
    - temperature=0.3: balanced creativity for legal analysis (not too random, not too deterministic)
    - top_p=0.85: nucleus sampling; considers the top 85% probability mass for coherent legal text
    - top_k=50: top-k sampling; considers the top 50 tokens for better legal terminology selection
    - min_p=0.05: minimum probability threshold to avoid low-quality tokens

    Anti-repetition parameters:
    - repeat_penalty=1.15: penalizes repeated phrases (15% penalty)
    - presence_penalty=0.1: encourages topic diversity across the response
    - frequency_penalty=0.1: reduces overuse of frequent tokens

    Advanced sampling:
    - typical_p=0.95: focuses on typical token probabilities for legal text patterns
    - tfs_z=0.95: tail-free sampling for more natural legal reasoning
    - mirostat_mode=2: Mirostat v2 for perplexity-controlled generation (note that when
      Mirostat is active, llama.cpp generally bypasses the top-k/top-p/tfs/typical samplers)
    - mirostat_tau=4.0: target entropy level for legal analysis
    - mirostat_eta=0.15: learning rate for perplexity adaptation
    """
    try:
        response = model(
            prompt,
            max_tokens=max_tokens,
            # Core generation parameters
            temperature=0.3,
            top_p=0.85,
            top_k=50,
            min_p=0.05,
            # Anti-repetition parameters
            repeat_penalty=1.15,
            presence_penalty=0.1,
            frequency_penalty=0.1,
            # Advanced sampling parameters
            typical_p=0.95,
            tfs_z=0.95,
            mirostat_mode=2,
            mirostat_tau=4.0,
            mirostat_eta=0.15,
            # Stop at the closing solution tag. (Stopping at REASONING_END would cut
            # generation off before the conclusion the prompt asks for.)
            stop=[SOLUTION_END]
        )
        return response['choices'][0]['text'].strip()
    except Exception as e:
        print(f"Error generating response: {e}")
        # Retry with basic parameters if the advanced ones are unsupported
        try:
            response = model(
                prompt,
                max_tokens=max_tokens,
                temperature=0.3,
                top_p=0.85,
                top_k=50,
                repeat_penalty=1.15,
                stop=[SOLUTION_END]
            )
            return response['choices'][0]['text'].strip()
        except Exception as e2:
            print(f"Fallback also failed: {e2}")
            return ""
def parse_legislation_json(file_path: str) -> List[Dict[str, Any]]:
    """Parse the JSON lines format of NZ legislation dataset"""
    legislation_entries = []
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            for line_num, line in enumerate(f, 1):
                line = line.strip()
                if not line:
                    continue
                try:
                    entry = json.loads(line)
                    if 'id' in entry and 'text' in entry:
                        legislation_entries.append(entry)
                    else:
                        print(f"Warning: Line {line_num} missing required fields, skipping")
                except json.JSONDecodeError as e:
                    print(f"Warning: Could not parse line {line_num}: {e}")
                    continue
    except Exception as e:
        print(f"Error reading legislation file: {e}")
        return []
    print(f"Successfully parsed {len(legislation_entries)} legislation entries")
    return legislation_entries
def create_finetuning_dataset(input_file: str, model, output_file: str = None) -> List[Dict[str, Any]]:
    """Create a finetuning dataset by processing NZ legislation JSON dataset with incremental saving"""
    if output_file is None:
        output_file = os.path.join(OUTPUT_DIR, "nz_legislation_loophole_dataset.json")
    # Create temporary file paths
    temp_file = output_file.replace('.json', '_temp.jsonl')
    backup_file = output_file.replace('.json', '_backup.json')
    print(f"Parsing legislation dataset from {input_file}")
    legislation_entries = parse_legislation_json(input_file)
    if not legislation_entries:
        print("No legislation entries found to process")
        return []
    dataset = []
    total_entries = len(legislation_entries)
    saved_count = 0
    print(f"Processing {total_entries} legislation entries...")
    print(f"Dataset will be saved incrementally to: {temp_file}")
    try:
        # Open temporary file for incremental saving
        with open(temp_file, 'w', encoding='utf-8') as temp_f:
            for entry_num, entry in enumerate(legislation_entries, 1):
                legislation_id = entry.get('id', f'entry_{entry_num}')
                title = entry.get('title', 'Unknown Title')
                year = entry.get('year', 'Unknown Year')
                raw_text = entry.get('text', '')
                print(f"\nProcessing entry {entry_num}/{total_entries}: {title} ({year}) - ID: {legislation_id}")
                # Clean the legislation text
                cleaned_text = clean_text(raw_text)
                # Chunk the text if it's too long
                chunks = chunk_text(cleaned_text)
                print(f"  - Text length: {len(raw_text)} characters")
                print(f"  - Number of chunks: {len(chunks)}")
                # Process each chunk
                for chunk_id, chunk in enumerate(chunks):
                    # Create prompt for this chunk
                    system_prompt = create_system_prompt(chunk)
                    full_prompt = generate_chat_template(system_prompt)
                    # Generate response
                    response = generate_response(model, full_prompt)
                    # Print response for monitoring
                    print(f"\n📝 **Generated Analysis for {title} (Chunk {chunk_id + 1}/{len(chunks)})**:")
                    print(f"   Response length: {len(response)} characters")
                    # Show a short preview of the analysis
                    preview = response.replace('\n', ' ').strip()[:200]
                    print(f"   Preview: {preview}")
                    # Check for key analysis elements
                    has_reasoning = REASONING_START in response or 'reasoning' in response.lower()
                    has_loopholes = 'loophole' in response.lower() or 'ambiguity' in response.lower() or 'issue' in response.lower()
                    has_recommendations = 'recommend' in response.lower() or 'suggest' in response.lower()
                    print(f"   Analysis quality: {'✅' if has_reasoning else '❌'} Reasoning | {'✅' if has_loopholes else '❌'} Loopholes | {'✅' if has_recommendations else '❌'} Recommendations")
                    # Add to dataset with metadata
                    dataset_entry = {
                        "prompt": full_prompt,
                        "response": response,
                        "legislation_id": legislation_id,
                        "title": title,
                        "year": year,
                        "chunk_id": chunk_id,
                        "total_chunks": len(chunks),
                        "text_length": len(chunk),
                        "original_text_length": len(raw_text)
                    }
                    # Save entry immediately to temporary file (JSON Lines format)
                    json.dump(dataset_entry, temp_f, ensure_ascii=False)
                    temp_f.write('\n')
                    temp_f.flush()  # Force write to disk
                    dataset.append(dataset_entry)
                    saved_count += 1
                    # Progress update every 10 entries
                    if saved_count % 10 == 0:
                        print(f"  ✓ Saved {saved_count} entries so far...")
        print("\n✓ All entries processed and saved to temporary file")
        print(f"✓ Total entries saved: {saved_count}")
        # Create backup of existing file if it exists
        if os.path.exists(output_file):
            print("Creating backup of existing dataset...")
            os.rename(output_file, backup_file)
        # Convert JSON Lines to final JSON format
        print("Converting to final JSON format...")
        with open(temp_file, 'r', encoding='utf-8') as temp_f:
            final_dataset = [json.loads(line) for line in temp_f if line.strip()]
        # Save final consolidated JSON file
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(final_dataset, f, indent=2, ensure_ascii=False)
        print(f"✓ Final dataset saved to: {output_file}")
        # Clean up temporary file
        if os.path.exists(temp_file):
            os.remove(temp_file)
            print("✓ Temporary file cleaned up")
        # Clean up backup file if everything succeeded
        if os.path.exists(backup_file):
            os.remove(backup_file)
            print("✓ Backup file cleaned up")
        print("\n🎉 Dataset creation complete!")
        print(f"   • Processed {total_entries} legislation documents")
        print(f"   • Generated {len(final_dataset)} analysis entries")
        # One dataset entry is written per chunk, so the entry count is the chunk count
        print(f"   • Total chunks processed: {len(final_dataset)}")
        return final_dataset
    except KeyboardInterrupt:
        print("\n⚠️ Process interrupted by user")
        print(f"   • Partial dataset saved to: {temp_file}")
        print(f"   • {saved_count} entries saved so far")
        print("   • You can resume processing or use the temporary file")
        raise
    except Exception as e:
        print(f"\n❌ Error during processing: {e}")
        print(f"   • Partial dataset saved to: {temp_file}")
        print(f"   • {saved_count} entries saved so far")
        if os.path.exists(backup_file):
            print("   • Original dataset restored from backup")
            os.rename(backup_file, output_file)
        raise
def main():
    """Main execution function"""
    print("Starting NZ Legislation Loophole Analysis Dataset Creation")
    print("=" * 60)
    # Load the model
    model = load_model()
    # Create the dataset
    dataset = create_finetuning_dataset(INPUT_FILE, model)
    # Cleanup
    if hasattr(model, 'close'):
        model.close()
    print("\nDataset creation completed successfully!")
    print(f"Output saved to: {os.path.join(OUTPUT_DIR, 'nz_legislation_loophole_dataset.json')}")

if __name__ == "__main__":
    main()