"""
NZ Legislation Loophole Analysis Dataset Creation Tool

This script processes New Zealand legislation text to create a finetuning dataset
for AI models that can identify potential loopholes, ambiguities, and unintended
consequences in legal text.

The script:
1. Loads and cleans NZ legislation text, preserving legal structure and terminology
2. Chunks the text into manageable sections with overlap for context
3. Uses an LLM to analyze each chunk for legal issues
4. Generates a structured dataset for training AI models on legal loophole detection

Usage:
    python trl.py

Requirements:
- llama-cpp-python with GGUF model support
- psutil for memory monitoring
- Input file: nz-legislation.txt containing NZ legislation in JSON lines format

Output:
- JSON dataset saved to nz_legislation_dataset/nz_legislation_loophole_dataset.json
"""

import os
import json
import re
import time
from typing import List, Dict, Any

import psutil
from llama_cpp import Llama

class ProgressManager:
    """Simple placeholder for progress tracking."""

    def __init__(self):
        pass


def show_memory_usage(label: str):
    """Print the current process's resident memory usage."""
    process = psutil.Process(os.getpid())
    memory_mb = process.memory_info().rss / 1024 / 1024
    print(f"{label}: {memory_mb:.2f} MB")

# Configuration
INPUT_FILE = "nz-legislation.txt"
OUTPUT_DIR = "nz_legislation_dataset"
CHUNK_SIZE = 4096           # chunk size in characters (not tokens)
CHUNK_OVERLAP = 256         # characters shared between consecutive chunks
BATCH_SIZE = 16             # currently unused; reserved for batched generation
MODEL_PATH = "qwen3.gguf"   # local fallback model path
MAX_TOKENS = 4096           # generation budget per chunk

os.makedirs(OUTPUT_DIR, exist_ok=True)

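# Rough sizing note (an assumption, not measured): at ~4 characters per token
# for English legal prose, a 4096-character chunk is on the order of 1,000
# tokens, which fits comfortably inside the 40,960-token context window
# configured in load_model() below.
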
def load_model(progress_manager: ProgressManager = None):
    """Load the LLM model for text generation with progress tracking."""
    if progress_manager is None:
        progress_manager = ProgressManager()

    print("Loading LLM model...")
    show_memory_usage("Initial memory usage")

    start_time = time.time()
    try:
        # Preferred path: pull the quantized GGUF model from the Hugging Face Hub.
        model = Llama.from_pretrained(
            repo_id="DavidAU/Qwen3-Zero-Coder-Reasoning-0.8B-NEO-EX-GGUF",
            filename="Qwen3-Zero-Coder-Reasoning-0.8B-NEO-EX-D_AU-IQ4_XS-imat.gguf",
            n_ctx=40960,
            n_threads=8,
            verbose=False,
            n_gpu_layers=-1,
            n_batch=4096,
            logits_all=False,
            use_mlock=True,
            use_mmap=True,
        )
    except Exception as e:
        print(f"Error loading model: {e}")
        print("Trying with basic configuration...")
        # Fallback: load a local GGUF file with a minimal configuration.
        model = Llama(
            model_path=MODEL_PATH,
            n_ctx=40960,
            n_threads=8,
            verbose=False,
            n_gpu_layers=-1,
            n_batch=4096,
        )

    load_time = time.time() - start_time
    print(f"LLM model loaded in {load_time:.2f}s")
    show_memory_usage("Memory after model load")

    return model

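# Minimal usage sketch (assumes the GGUF weights above are downloadable or
# already in the local Hugging Face cache):
#
#   model = load_model()
#   out = model("Q: What is 2 + 2?\nA:", max_tokens=8)
#   print(out["choices"][0]["text"])
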

def clean_text(text: str) -> str:
    """Clean and normalize text for better embedding quality, optimized for
    legal/legislative content."""
    # Normalize typographic quotes to ASCII *before* the character whitelist
    # below, which would otherwise strip them outright.
    text = re.sub(r'[“”]', '"', text)
    text = re.sub(r'[‘’]', "'", text)
    text = re.sub(r'`', "'", text)

    # Collapse runs of spaces/tabs and excess blank lines while preserving
    # paragraph breaks and the "NN:" line-number prefixes used by the corpus.
    text = re.sub(r'[ \t]+', ' ', text)
    text = re.sub(r'\n\s*\n', '\n\n', text)
    text = re.sub(r'\n{3,}', '\n\n', text)

    # Strip control characters (keeping \t and \n).
    text = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\x9f]', '', text)

    # Whitelist of characters to keep. \w already matches Unicode word
    # characters, including the macron vowels (ā ē ī ō ū) used in Māori text,
    # so a single filtering pass is sufficient.
    allowed_chars = r'\w\s\.\,\!\?\;\:\-\(\)\[\]\{\}\"\'\/\@\#\$\%\^\&\*\+\=\~\`°§'
    text = re.sub(r'[^' + allowed_chars + ']', '', text)

    # Normalize spacing in "section N" references and in day-month dates.
    text = re.sub(r'section\s+(\d+)', r'section \1', text, flags=re.IGNORECASE)
    for month in ('January', 'February', 'March', 'April', 'May', 'June',
                  'July', 'August', 'September', 'October', 'November',
                  'December'):
        text = re.sub(rf'(\d+)\s*[{month[0]}{month[0].lower()}]{month[1:]}',
                      rf'\1 {month}', text)

    # Tidy punctuation spacing: no space before, one space after -- without
    # consuming newlines or splitting decimal/subsection numbers.
    text = re.sub(r'[ \t]+([.!?,;:])', r'\1', text)
    text = re.sub(r'([.!?,;:])[ \t]*(?=[^\s\d])', r'\1 ', text)

    # Normalize spacing in Act citations, keeping the year.
    text = re.sub(r'(\b\w+(?:\s+\w+)*)\s+Act\s+(\d{4})', r'\1 Act \2', text)

    # Standardize boilerplate and subsection-reference casing.
    text = re.sub(r'[Aa]mendment\(s\)\s+incorporated\s+in\s+the\s+[Aa]ct\(s\)',
                  'Amendments incorporated', text)
    text = re.sub(r'section\s+\d+\(\d+\)\([a-zA-Z]\)',
                  lambda m: m.group(0).lower(), text)

    # Normalize spacing in structural references (section/part/chapter/...).
    text = re.sub(r'(\b(?:section|part|chapter|article|clause|subsection|paragraph))\s+(\d+[a-zA-Z]*)',
                  lambda m: f"{m.group(1)} {m.group(2)}", text, flags=re.IGNORECASE)

    # Canonical casing for common NZ legal terms.
    text = re.sub(r'\b[Nn]ew\s+[Zz]ealand\b', 'New Zealand', text)
    text = re.sub(r'\b[Pp]arliament\b', 'Parliament', text)
    text = re.sub(r'\b[Cc]rown\b', 'Crown', text)
    text = re.sub(r'\b[Gg]overnment\b', 'Government', text)
    text = re.sub(r'\b[Nn][Zz][Bb]\s+(\d+)', r'NZB \1', text)
    text = re.sub(r'[Tt]reaty\s+[Oo]f\s+[Ww]aitangi', 'Treaty of Waitangi',
                  text, flags=re.IGNORECASE)

    # Strip leading/trailing whitespace per line and drop empty lines.
    lines = [line.strip() for line in text.split('\n') if line.strip()]
    text = '\n'.join(lines)

    return text.strip()

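# Worked example (illustrative; exact output depends on the passes above):
#
#   clean_text("the  crimes  act 1961 applies from 1   january 2005\x0c")
#   -> "the crimes act 1961 applies from 1 January 2005"
#
# The month normalization, whitespace collapsing, and control-character
# stripping are visible here; "act" is left as-is because the Act-citation
# regex matches the capitalized form only.
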
REASONING_START = "<start_working_out>"
REASONING_END = "<end_working_out>"
SOLUTION_START = "<SOLUTION>"
SOLUTION_END = "</SOLUTION>"

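# The completion the prompt below asks for is expected to look like:
#
#   <start_working_out> ...structured analysis... <end_working_out>
#   <SOLUTION> ...overall conclusion... </SOLUTION>
#
# generate_response() keys its stop sequence off SOLUTION_END.
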
def create_system_prompt(text: str) -> str:
    """Create a system prompt for analyzing legislative text for loopholes and ambiguities."""
    return f"""You are a legal expert analyzing New Zealand legislation for loopholes and ambiguities.

LEGISLATION TEXT:
{text}

TASK: Analyze this legislative text and identify potential loopholes, ambiguities, or unintended consequences.

REASONING: Provide a structured analysis in the following format:

1. **Text Meaning**: Explain what the text means and its intended purpose
2. **Key Assumptions**: Identify any assumptions the text makes that could be exploited
3. **Exploitable Interpretations**: Discuss how the text could be interpreted or applied in ways that circumvent its intended purpose
4. **Critical Loopholes**: Identify specific loopholes, ambiguities, or unintended consequences that could be used to bypass the legislation
5. **Circumvention Strategies**: Suggest practical methods or scenarios for exploiting these loopholes to achieve objectives contrary to the legislation's intent

Write your complete analysis between {REASONING_START} and {REASONING_END}.

Then provide your overall conclusion between {SOLUTION_START} and {SOLUTION_END}.
"""

def generate_chat_template(system_prompt: str) -> str:
    """
    Render the prompt in ChatML format (<|im_start|>/<|im_end|> markers), the
    chat format used by Qwen-family GGUF models, ending with an open assistant
    turn for the model to complete.
    """
    chat_messages = []

    # Optional system turn carrying the legislation text and instructions.
    if system_prompt:
        chat_messages.append("<|im_start|>system")
        chat_messages.append(system_prompt)
        chat_messages.append("<|im_end|>")

    # Fixed user turn requesting the analysis.
    chat_messages.append("<|im_start|>user")
    chat_messages.append("Analyze the given legislative text for loopholes, ambiguities, and unintended consequences. Provide a structured legal analysis following the specified format.")
    chat_messages.append("<|im_end|>")

    # Open assistant turn: generation continues from here.
    chat_messages.append("<|im_start|>assistant")
    chat_messages.append("")

    return "\n".join(chat_messages)

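# For a system prompt of "You are X", the rendered template is:
#
#   <|im_start|>system
#   You are X
#   <|im_end|>
#   <|im_start|>user
#   Analyze the given legislative text ... following the specified format.
#   <|im_end|>
#   <|im_start|>assistant
#
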
def chunk_text(text: str, chunk_size: int = CHUNK_SIZE, overlap: int = CHUNK_OVERLAP) -> List[str]:
    """Split text into overlapping chunks for processing."""
    if len(text) <= chunk_size:
        return [text]

    chunks = []
    start = 0
    while start < len(text):
        end = start + chunk_size
        chunk = text[start:end]

        # For all but the final chunk, try to end on a sentence boundary found
        # within the last 100 characters of the chunk.
        if end < len(text):
            sentence_end = max(
                chunk.rfind('. ', max(0, len(chunk) - 100)),
                chunk.rfind('! ', max(0, len(chunk) - 100)),
                chunk.rfind('? ', max(0, len(chunk) - 100)),
            )
            if sentence_end != -1:
                chunk = chunk[:sentence_end + 2]

        chunks.append(chunk)
        # Step forward, re-reading the last `overlap` characters for context.
        # (The boundary trim above removes at most 100 characters, so the
        # overlap still covers any text dropped from the end of this chunk.)
        start = end - overlap if end < len(text) else len(text)

    return chunks

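# Worked example (illustrative numbers): for a 10,000-character text with the
# defaults chunk_size=4096 and overlap=256, chunks start at offsets 0, 3840,
# and 7680 -- each new chunk re-reads the previous chunk's final 256 characters.
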
def generate_response(model, prompt: str, max_tokens: int = MAX_TOKENS) -> str:
    """
    Generate a response from the model for a given prompt with parameters tuned
    for legal analysis.

    Core sampling:
    - temperature=0.3: mostly deterministic, with some room for varied phrasing
    - top_p=0.85: nucleus sampling over the top 85% of probability mass
    - top_k=50: restrict sampling to the 50 most likely tokens
    - min_p=0.05: minimum probability threshold to avoid low-quality tokens

    Anti-repetition:
    - repeat_penalty=1.15: penalizes repeated phrases (15% penalty)
    - presence_penalty=0.1: encourages topic diversity across the response
    - frequency_penalty=0.1: reduces overuse of frequent tokens

    Advanced sampling:
    - typical_p=0.95: locally typical sampling
    - tfs_z=0.95: tail-free sampling
    - mirostat_mode=2: Mirostat v2 perplexity-controlled generation
    - mirostat_tau=4.0: target entropy level
    - mirostat_eta=0.15: Mirostat learning rate
    """
    try:
        response = model(
            prompt,
            max_tokens=max_tokens,
            temperature=0.3,
            top_p=0.85,
            top_k=50,
            min_p=0.05,
            repeat_penalty=1.15,
            presence_penalty=0.1,
            frequency_penalty=0.1,
            typical_p=0.95,
            tfs_z=0.95,
            mirostat_mode=2,
            mirostat_tau=4.0,
            mirostat_eta=0.15,
            # Stop once the closing solution tag is emitted. (Stopping on
            # REASONING_END would cut generation off before the conclusion.)
            stop=[SOLUTION_END],
        )
        return response['choices'][0]['text'].strip()
    except Exception as e:
        print(f"Error generating response: {e}")
        # Retry with a minimal parameter set in case an advanced sampler
        # argument is unsupported by the installed llama-cpp-python.
        try:
            response = model(
                prompt,
                max_tokens=max_tokens,
                temperature=0.3,
                top_p=0.85,
                top_k=50,
                repeat_penalty=1.15,
                stop=[SOLUTION_END],
            )
            return response['choices'][0]['text'].strip()
        except Exception as e2:
            print(f"Fallback also failed: {e2}")
            return ""

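# Note (an assumption based on llama.cpp's sampler chain, worth verifying for
# the installed version): when mirostat_mode is non-zero, mirostat replaces the
# top_k/top_p/tfs_z/typical_p samplers, so those settings may have no effect
# and generation is governed by temperature plus the mirostat targets.
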
def parse_legislation_json(file_path: str) -> List[Dict[str, Any]]:
    """Parse the JSON lines format of the NZ legislation dataset."""
    legislation_entries = []

    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            for line_num, line in enumerate(f, 1):
                line = line.strip()
                if not line:
                    continue
                try:
                    entry = json.loads(line)
                    if 'id' in entry and 'text' in entry:
                        legislation_entries.append(entry)
                    else:
                        print(f"Warning: Line {line_num} missing required fields, skipping")
                except json.JSONDecodeError as e:
                    print(f"Warning: Could not parse line {line_num}: {e}")
                    continue
    except Exception as e:
        print(f"Error reading legislation file: {e}")
        return []

    print(f"Successfully parsed {len(legislation_entries)} legislation entries")
    return legislation_entries

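# Usage sketch:
#
#   entries = parse_legislation_json(INPUT_FILE)
#   if entries:
#       print(entries[0]['id'], len(entries[0]['text']))
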
def create_finetuning_dataset(input_file: str, model, output_file: str = None) -> List[Dict[str, Any]]:
    """Create a finetuning dataset by processing the NZ legislation JSON dataset,
    saving incrementally so an interrupted run loses no completed work."""
    if output_file is None:
        output_file = os.path.join(OUTPUT_DIR, "nz_legislation_loophole_dataset.json")

    # Work files: a JSONL temp file written entry-by-entry, and a backup of any
    # existing dataset so a failed run can be rolled back.
    temp_file = output_file.replace('.json', '_temp.jsonl')
    backup_file = output_file.replace('.json', '_backup.json')

    print(f"Parsing legislation dataset from {input_file}")
    legislation_entries = parse_legislation_json(input_file)

    if not legislation_entries:
        print("No legislation entries found to process")
        return []

    dataset = []
    total_entries = len(legislation_entries)
    saved_count = 0

    print(f"Processing {total_entries} legislation entries...")
    print(f"Dataset will be saved incrementally to: {temp_file}")

    try:
        with open(temp_file, 'w', encoding='utf-8') as temp_f:
            for entry_num, entry in enumerate(legislation_entries, 1):
                legislation_id = entry.get('id', f'entry_{entry_num}')
                title = entry.get('title', 'Unknown Title')
                year = entry.get('year', 'Unknown Year')
                raw_text = entry.get('text', '')

                print(f"\nProcessing entry {entry_num}/{total_entries}: {title} ({year}) - ID: {legislation_id}")

                cleaned_text = clean_text(raw_text)
                chunks = chunk_text(cleaned_text)

                print(f"  - Text length: {len(raw_text)} characters")
                print(f"  - Number of chunks: {len(chunks)}")

                for chunk_id, chunk in enumerate(chunks):
                    # Build the prompt for this chunk and generate the analysis.
                    system_prompt = create_system_prompt(chunk)
                    full_prompt = generate_chat_template(system_prompt)
                    response = generate_response(model, full_prompt)

                    print(f"\n**Generated Analysis for {title} (Chunk {chunk_id + 1}/{len(chunks)})**:")
                    print(f"   Response length: {len(response)} characters")

                    # Show a short single-line preview of the response.
                    preview = response.replace('\n', ' ').strip()[:200]
                    print(f"   Preview: {preview}")

                    # Crude keyword checks on the quality of the analysis.
                    has_reasoning = '<start_working_out>' in response or 'reasoning' in response.lower()
                    has_loopholes = ('loophole' in response.lower()
                                     or 'ambiguity' in response.lower()
                                     or 'issue' in response.lower())
                    has_recommendations = 'recommend' in response.lower() or 'suggest' in response.lower()

                    print(f"   Analysis quality: {'✓' if has_reasoning else '✗'} Reasoning | "
                          f"{'✓' if has_loopholes else '✗'} Loopholes | "
                          f"{'✓' if has_recommendations else '✗'} Recommendations")

                    dataset_entry = {
                        "prompt": full_prompt,
                        "response": response,
                        "legislation_id": legislation_id,
                        "title": title,
                        "year": year,
                        "chunk_id": chunk_id,
                        "total_chunks": len(chunks),
                        "text_length": len(chunk),
                        "original_text_length": len(raw_text),
                    }

                    # Persist immediately so completed work survives a crash.
                    json.dump(dataset_entry, temp_f, ensure_ascii=False)
                    temp_f.write('\n')
                    temp_f.flush()

                    dataset.append(dataset_entry)
                    saved_count += 1

                    if saved_count % 10 == 0:
                        print(f"   ✓ Saved {saved_count} entries so far...")

        print(f"\n✓ All entries processed and saved to temporary file")
        print(f"✓ Total entries saved: {saved_count}")

        # Back up any existing dataset before overwriting it.
        if os.path.exists(output_file):
            print("Creating backup of existing dataset...")
            os.rename(output_file, backup_file)

        # Convert the JSONL temp file into the final JSON array.
        print("Converting to final JSON format...")
        final_dataset = []
        with open(temp_file, 'r', encoding='utf-8') as temp_f:
            for line in temp_f:
                if line.strip():
                    final_dataset.append(json.loads(line))

        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(final_dataset, f, indent=2, ensure_ascii=False)

        print(f"✓ Final dataset saved to: {output_file}")

        # Clean up the work files now that the final dataset is in place.
        if os.path.exists(temp_file):
            os.remove(temp_file)
            print("✓ Temporary file cleaned up")
        if os.path.exists(backup_file):
            os.remove(backup_file)
            print("✓ Backup file cleaned up")

        print(f"\nDataset creation complete!")
        print(f"  • Processed {total_entries} legislation documents")
        print(f"  • Generated {len(final_dataset)} analysis entries (one per chunk)")

        return final_dataset

    except KeyboardInterrupt:
        print(f"\n⚠️ Process interrupted by user")
        print(f"  • Partial dataset saved to: {temp_file}")
        print(f"  • {saved_count} entries saved so far")
        print(f"  • You can resume processing or use the temporary file")
        raise

    except Exception as e:
        print(f"\n❌ Error during processing: {e}")
        print(f"  • Partial dataset saved to: {temp_file}")
        print(f"  • {saved_count} entries saved so far")
        if os.path.exists(backup_file):
            os.rename(backup_file, output_file)
            print(f"  • Original dataset restored from backup")
        raise

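# If a run is interrupted, the temp JSONL written above can be reloaded
# directly. This helper is a sketch added for illustration (hypothetical; it
# is not called anywhere in this script):
def load_partial_dataset(jsonl_path: str) -> List[Dict[str, Any]]:
    """Load dataset entries from an interrupted run's temporary .jsonl file."""
    entries = []
    with open(jsonl_path, 'r', encoding='utf-8') as f:
        for line in f:
            if line.strip():
                entries.append(json.loads(line))
    return entries
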
def main():
    """Main execution function."""
    print("Starting NZ Legislation Loophole Analysis Dataset Creation")
    print("=" * 60)

    # Load the model, build the dataset, then release the model.
    model = load_model()
    dataset = create_finetuning_dataset(INPUT_FILE, model)

    if hasattr(model, 'close'):
        model.close()

    print("\nDataset creation completed successfully!")
    print(f"Output saved to: {os.path.join(OUTPUT_DIR, 'nz_legislation_loophole_dataset.json')}")


if __name__ == "__main__":
    main()