# germeval2025/subtask_2/exp027-1.py
# Christian Rene Thelen
import os
import pickle
import sys
import time
import numpy as np
import pandas as pd
import torch
import wandb
from datasets import Dataset
from multiset import Multiset
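# Multiset comes from the third-party 'multiset' package (pip install multiset)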
from sklearn.model_selection import train_test_split, StratifiedKFold
from transformers import (
AutoTokenizer,
AutoModelForTokenClassification,
TrainingArguments,
Trainer,
DataCollatorForTokenClassification,
EarlyStoppingCallback
)
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = '0'
os.environ["WANDB_PROJECT"]="GermEval2025-Substask2"
os.environ["WANDB_LOG_MODEL"]="false"
experiment_name = 'exp027-1'
ALL_LABELS = ["affection declaration", "agreement", "ambiguous",
              "compliment", "encouragement", "gratitude", "group membership",
              "implicit", "positive feedback", "sympathy"]
def fine_grained_flausch_by_label(gold, predicted):
    # NOTE: adds a 'cid' helper column to both DataFrames in place
    gold['cid'] = gold['document'] + "_" + gold['comment_id'].apply(str)
    predicted['cid'] = predicted['document'] + "_" + predicted['comment_id'].apply(str)
# annotation sets (predicted)
pred_spans = Multiset()
pred_spans_loose = Multiset()
pred_types = Multiset()
# annotation sets (gold)
gold_spans = Multiset()
gold_spans_loose = Multiset()
gold_types = Multiset()
    for row in predicted.itertuples(index=False):
        pred_spans.add((row.cid, row.type, row.start, row.end))
        pred_spans_loose.add((row.cid, row.start, row.end))
        pred_types.add((row.cid, row.type))
    for row in gold.itertuples(index=False):
        gold_spans.add((row.cid, row.type, row.start, row.end))
        gold_spans_loose.add((row.cid, row.start, row.end))
        gold_types.add((row.cid, row.type))
    # precision = true_pos / (true_pos + false_pos)
    # recall    = true_pos / (true_pos + false_neg)
    # f_1       = 2 * prec * rec / (prec + rec)
results = {'TOTAL': {'STRICT': {},'SPANS': {},'TYPES': {}}}
# label-wise evaluation (only for strict and type)
for label in ALL_LABELS:
results[label] = {'STRICT': {},'TYPES': {}}
        gold_spans_x = set(filter(lambda x: x[1] == label, gold_spans))
        pred_spans_x = set(filter(lambda x: x[1] == label, pred_spans))
        gold_types_x = set(filter(lambda x: x[1] == label, gold_types))
        pred_types_x = set(filter(lambda x: x[1] == label, pred_types))
# strict: spans + type must match
        ### NOTE: `x and y / x` evaluates to 0 if x == 0 and to y / x otherwise
        ### (guards against division by zero)
        strict_p = float(len(pred_spans_x)) and float(len(gold_spans_x.intersection(pred_spans_x))) / len(pred_spans_x)
        strict_r = float(len(gold_spans_x)) and float(len(gold_spans_x.intersection(pred_spans_x))) / len(gold_spans_x)
        strict_f = (strict_p + strict_r) and 2 * strict_p * strict_r / (strict_p + strict_r)
results[label]['STRICT']['prec'] = strict_p
results[label]['STRICT']['rec'] = strict_r
results[label]['STRICT']['f1'] = strict_f
# detection mode: only types must match (per post)
        types_p = float(len(pred_types_x)) and float(len(gold_types_x.intersection(pred_types_x))) / len(pred_types_x)
        types_r = float(len(gold_types_x)) and float(len(gold_types_x.intersection(pred_types_x))) / len(gold_types_x)
        types_f = (types_p + types_r) and 2 * types_p * types_r / (types_p + types_r)
results[label]['TYPES']['prec'] = types_p
results[label]['TYPES']['rec'] = types_r
results[label]['TYPES']['f1'] = types_f
# Overall evaluation
# strict: spans + type must match
    strict_p = float(len(pred_spans)) and float(len(gold_spans.intersection(pred_spans))) / len(pred_spans)
    strict_r = float(len(gold_spans)) and float(len(gold_spans.intersection(pred_spans))) / len(gold_spans)
    strict_f = (strict_p + strict_r) and 2 * strict_p * strict_r / (strict_p + strict_r)
results['TOTAL']['STRICT']['prec'] = strict_p
results['TOTAL']['STRICT']['rec'] = strict_r
results['TOTAL']['STRICT']['f1'] = strict_f
# spans: spans must match
    spans_p = float(len(pred_spans_loose)) and float(len(gold_spans_loose.intersection(pred_spans_loose))) / len(pred_spans_loose)
    spans_r = float(len(gold_spans_loose)) and float(len(gold_spans_loose.intersection(pred_spans_loose))) / len(gold_spans_loose)
    spans_f = (spans_p + spans_r) and 2 * spans_p * spans_r / (spans_p + spans_r)
results['TOTAL']['SPANS']['prec'] = spans_p
results['TOTAL']['SPANS']['rec'] = spans_r
results['TOTAL']['SPANS']['f1'] = spans_f
# detection mode: only types must match (per post)
    types_p = float(len(pred_types)) and float(len(gold_types.intersection(pred_types))) / len(pred_types)
    types_r = float(len(gold_types)) and float(len(gold_types.intersection(pred_types))) / len(gold_types)
    types_f = (types_p + types_r) and 2 * types_p * types_r / (types_p + types_r)
results['TOTAL']['TYPES']['prec'] = types_p
results['TOTAL']['TYPES']['rec'] = types_r
results['TOTAL']['TYPES']['f1'] = types_f
return results
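
# A minimal usage sketch (not part of the original pipeline; documents and
# offsets below are invented) showing the expected input schema and the three
# evaluation modes of fine_grained_flausch_by_label:
def _flausch_metric_demo():
    gold = pd.DataFrame([
        {'document': 'doc1', 'comment_id': 1, 'type': 'compliment', 'start': 0, 'end': 10},
        {'document': 'doc1', 'comment_id': 2, 'type': 'gratitude', 'start': 5, 'end': 12},
    ])
    pred = pd.DataFrame([
        # exact match -> counts for STRICT, SPANS and TYPES
        {'document': 'doc1', 'comment_id': 1, 'type': 'compliment', 'start': 0, 'end': 10},
        # correct comment and type, wrong offsets -> counts for TYPES only
        {'document': 'doc1', 'comment_id': 2, 'type': 'gratitude', 'start': 0, 'end': 12},
    ])
    results = fine_grained_flausch_by_label(gold, pred)
    # here: TOTAL/STRICT f1 == 0.5, TOTAL/SPANS f1 == 0.5, TOTAL/TYPES f1 == 1.0
    return results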
class SpanClassifierWithStrictF1:
def __init__(self, model_name="deepset/gbert-base"):
self.model_name = model_name
self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.labels = [
"O",
"B-positive feedback", "B-compliment", "B-affection declaration", "B-encouragement", "B-gratitude", "B-agreement", "B-ambiguous", "B-implicit", "B-group membership", "B-sympathy",
"I-positive feedback", "I-compliment", "I-affection declaration", "I-encouragement", "I-gratitude", "I-agreement", "I-ambiguous", "I-implicit", "I-group membership", "I-sympathy"
]
self.label2id = {label: i for i, label in enumerate(self.labels)}
self.id2label = {i: label for i, label in enumerate(self.labels)}
def create_dataset(self, comments_df, spans_df):
"""Erstelle Dataset mit BIO-Labels und speichere Evaluation-Daten"""
examples = []
        eval_data = []  # for the strict F1 computation
spans_grouped = spans_df.groupby(['document', 'comment_id'])
for _, row in comments_df.iterrows():
text = row['comment']
document = row['document']
comment_id = row['comment_id']
key = (document, comment_id)
            # gold spans for this comment
if key in spans_grouped.groups:
true_spans = [(span_type, int(start), int(end))
for span_type, start, end in
spans_grouped.get_group(key)[['type', 'start', 'end']].values]
else:
true_spans = []
            # tokenization
tokenized = self.tokenizer(text, truncation=True, max_length=512,
return_offsets_mapping=True)
            # build the BIO labels
            span_values = (spans_grouped.get_group(key)[['start', 'end', 'type']].values
                           if key in spans_grouped.groups else [])
            labels = self._create_bio_labels(tokenized['offset_mapping'], span_values)
examples.append({
'input_ids': tokenized['input_ids'],
'attention_mask': tokenized['attention_mask'],
'labels': labels
})
            # store evaluation data
eval_data.append({
'text': text,
'offset_mapping': tokenized['offset_mapping'],
'true_spans': true_spans,
'document': document,
'comment_id': comment_id
})
return examples, eval_data
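
    # Expected input schema (as used above): comments_df provides the columns
    # 'document', 'comment_id' and 'comment'; spans_df provides 'document',
    # 'comment_id', 'type', 'start' and 'end', with character offsets into the
    # comment text.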
def _create_bio_labels(self, offset_mapping, spans):
"""Erstelle BIO-Labels für Tokens"""
labels = [0] * len(offset_mapping) # 0 = "O"
for start, end, type_label in spans:
for i, (token_start, token_end) in enumerate(offset_mapping):
                if token_start is None or token_start == token_end:  # special tokens have zero-length offsets
continue
                # token overlaps the span
if token_start < end and token_end > start:
if token_start <= start:
if labels[i] != 0:
                            # don't overwrite labels of overlapping spans; just skip this span
break
                        labels[i] = self.label2id[f'B-{type_label}']  # e.g. "B-compliment"
else:
                        labels[i] = self.label2id[f'I-{type_label}']  # e.g. "I-compliment"
return labels
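
    # Worked toy example (values invented for illustration): with
    # offset_mapping = [(0, 0), (0, 4), (5, 9), (0, 0)] (special tokens at the
    # edges) and spans = [(0, 9, 'compliment')], _create_bio_labels returns
    # [0, 2, 12, 0], i.e. [O, B-compliment, I-compliment, O] under label2id.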
def _predictions_to_dataframe(self, predictions_list, comments_df_subset):
"""Konvertiere Vorhersagen zu DataFrame für Flausch-Metrik"""
pred_data = []
for i, pred in enumerate(predictions_list):
if i < len(comments_df_subset):
row = comments_df_subset.iloc[i]
document = row['document']
comment_id = row['comment_id']
for span in pred['spans']:
pred_data.append({
'document': document,
'comment_id': comment_id,
'type': span['type'],
'start': span['start'],
'end': span['end']
})
return pd.DataFrame(pred_data)
# --- helper that builds a DataFrame of spans from eval data + predictions ---
def _build_span_dfs(self, eval_data, batch_pred_spans):
"""
eval_data: list of dicts with keys document, comment_id, true_spans
batch_pred_spans: list of lists of (type, start, end)
returns (gold_df, pred_df) suitable for fine_grained_flausch_by_label
"""
rows_gold = []
rows_pred = []
for item, pred_spans in zip(eval_data, batch_pred_spans):
doc = item['document']
cid = item['comment_id']
# gold
for t, s, e in item['true_spans']:
rows_gold.append({
'document': doc,
'comment_id': cid,
'type': t,
'start': s,
'end': e
})
# pred
for t, s, e in pred_spans:
rows_pred.append({
'document': doc,
'comment_id': cid,
'type': t,
'start': s,
'end': e
})
gold_df = pd.DataFrame(rows_gold, columns=['document','comment_id','type','start','end'])
pred_df = pd.DataFrame(rows_pred, columns=['document','comment_id','type','start','end'])
return gold_df, pred_df
def compute_metrics(self, eval_pred):
"""
Called by the HF-Trainer at each evaluation step.
We collect batch predictions, reconstruct gold/pred spans,
call fine_grained_flausch_by_label and return the TOTAL/STRICT metrics.
"""
logits, labels = eval_pred
preds = np.argmax(logits, axis=2)
        # reconstruct spans per example in the eval set
batch_pred_spans = []
for i, (p_seq, lab_seq) in enumerate(zip(preds, labels)):
# skip padding (-100)
valid_preds = []
valid_offsets = []
offsets = self.current_eval_data[i]['offset_mapping']
            for j, (p, lab) in enumerate(zip(p_seq, lab_seq)):
                if lab != -100:
                    valid_preds.append(int(p))
                    valid_offsets.append(offsets[j])
# convert to spans
pred_spans = self._predictions_to_spans(valid_preds, valid_offsets,
self.current_eval_data[i]['text'])
# to (type, start, end)-tuples
batch_pred_spans.append([(sp['type'], sp['start'], sp['end'])
for sp in pred_spans])
# build the gold/pred DataFrames
gold_df, pred_df = self._build_span_dfs(self.current_eval_data,
batch_pred_spans)
        # compute the fine-grained metrics
results = fine_grained_flausch_by_label(gold_df, pred_df)
# extract the TOTAL/STRICT metrics
total = results['TOTAL']['STRICT']
        return {
            'strict_prec': float(total['prec']),
            'strict_rec': float(total['rec']),
            'strict_f1': float(total['f1']),
        }
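
    # NOTE: compute_metrics assumes self.current_eval_data is aligned
    # index-for-index with the Trainer's eval_dataset; this holds because both
    # are derived from the same train_test_split(..., random_state=42) calls in
    # train() and cross_validate() below.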
def evaluate_by_label(self, comments_df, spans_df):
"""
Replace evaluate_strict_f1. Runs a full pass over all comments,
uses self.predict() to get spans, then calls your fine_grained_flausch_by_label
and prints & returns the TOTAL metrics.
"""
# 1) run predictions
texts = comments_df['comment'].tolist()
docs = comments_df['document'].tolist()
cids = comments_df['comment_id'].tolist()
preds = self.predict(texts)
# 2) build gold and pred lists
gold_rows = []
        for _, row in comments_df.iterrows():
            # all gold spans for this comment
            group = spans_df[
                (spans_df.document == row['document']) &
                (spans_df.comment_id == row['comment_id'])
            ]
for _, sp in group.iterrows():
gold_rows.append({
'document': row['document'],
'comment_id': row['comment_id'],
'type': sp['type'],
'start': sp['start'],
'end': sp['end']
})
pred_rows = []
for doc, cid, p in zip(docs, cids, preds):
for sp in p['spans']:
pred_rows.append({
'document': doc,
'comment_id': cid,
'type': sp['type'],
'start': sp['start'],
'end': sp['end']
})
gold_df = pd.DataFrame(gold_rows, columns=['document','comment_id','type','start','end'])
pred_df = pd.DataFrame(pred_rows, columns=['document','comment_id','type','start','end'])
# 3) call fine-grained
results = fine_grained_flausch_by_label(gold_df, pred_df)
# 4) extract and print
total = results['TOTAL']
print("\n=== EVALUATION BY FLAUSCH METRICS ===")
for mode in ['STRICT','SPANS','TYPES']:
m = total[mode]
print(f"{mode:6} P={m['prec']:.4f} R={m['rec']:.4f} F1={m['f1']:.4f}")
return results
def _predictions_to_spans(self, predicted_labels, offset_mapping, text):
"""Konvertiere Token-Vorhersagen zu Spans"""
spans = []
current_span = None
for i, label_id in enumerate(predicted_labels):
if i >= len(offset_mapping):
break
label = self.id2label[label_id]
            token_start, token_end = offset_mapping[i]
            if token_start is None or token_start == token_end:
                continue  # special tokens / zero-length offsets
            # offsets may arrive as tensors (see predict()); force plain ints
            token_start, token_end = int(token_start), int(token_end)
if label.startswith('B-'):
if current_span:
spans.append(current_span)
current_span = {
'type': label[2:],
'start': token_start,
'end': token_end,
'text': text[token_start:token_end]
}
elif label.startswith('I-') and current_span:
current_span['end'] = token_end
current_span['text'] = text[current_span['start']:current_span['end']]
else:
if current_span:
spans.append(current_span)
current_span = None
if current_span:
spans.append(current_span)
return spans
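
    # Worked toy example (values invented for illustration): for the text
    # "Danke schön!" with predicted labels [O, B-gratitude, I-gratitude, O]
    # and offsets [(0, 0), (0, 5), (6, 11), (0, 0)], _predictions_to_spans
    # yields one span: {'type': 'gratitude', 'start': 0, 'end': 11,
    # 'text': 'Danke schön'}.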
def predict(self, texts):
"""Vorhersage für neue Texte"""
if not hasattr(self, 'model'):
raise ValueError("Modell muss erst trainiert werden!")
predictions = []
device = next(self.model.parameters()).device
for text in texts:
            # tokenization
inputs = self.tokenizer(text, return_tensors="pt", truncation=True,
max_length=512, return_offsets_mapping=True)
offset_mapping = inputs.pop('offset_mapping')
inputs = {k: v.to(device) for k, v in inputs.items()}
            # forward pass
with torch.no_grad():
outputs = self.model(**inputs)
predicted_labels = torch.argmax(outputs.logits, dim=2)[0].cpu().numpy()
            # extract spans
spans = self._predictions_to_spans(predicted_labels, offset_mapping[0], text)
predictions.append({'text': text, 'spans': spans})
return predictions
def train(self, comments_df, spans_df, experiment_name):
wandb.init(project=os.environ["WANDB_PROJECT"], name=f"{experiment_name}",
group=experiment_name)
        # build the dataset for this run
examples, eval_data = self.create_dataset(comments_df, spans_df)
train_examples, val_examples = train_test_split(examples, test_size=0.1, random_state=42)
        # split the evaluation data the same way
train_indices, val_indices = train_test_split(range(len(examples)), test_size=0.1, random_state=42)
self.current_eval_data = [eval_data[i] for i in val_indices]
test_comments = comments_df.iloc[val_indices].reset_index(drop=True)
train_dataset = Dataset.from_list(train_examples)
val_dataset = Dataset.from_list(val_examples)
        # initialize a fresh model
model = AutoModelForTokenClassification.from_pretrained(
self.model_name,
num_labels=len(self.labels),
id2label=self.id2label,
label2id=self.label2id
)
        # training arguments
fold_output_dir = f"{experiment_name}"
training_args = TrainingArguments(
output_dir=fold_output_dir,
learning_rate=2e-5,
warmup_steps=400,
per_device_train_batch_size=32,
per_device_eval_batch_size=16,
num_train_epochs=20,
eval_strategy="steps",
eval_steps=40,
save_strategy="steps",
save_steps=40,
load_best_model_at_end=True,
metric_for_best_model="strict_f1",
greater_is_better=True,
logging_steps=10,
logging_strategy="steps",
report_to="all",
disable_tqdm=False,
seed=42,
save_total_limit=3,
)
# Trainer
trainer = Trainer(
model=model,
args=training_args,
train_dataset=train_dataset,
eval_dataset=val_dataset,
data_collator=DataCollatorForTokenClassification(self.tokenizer),
compute_metrics=self.compute_metrics,
            callbacks=[EarlyStoppingCallback(early_stopping_patience=87)]
            # NOTE: patience is counted in evaluation calls (here every
            # eval_steps=40 steps), not in raw training steps
)
# Training
print(f"Training auf {len(train_dataset)} Beispielen")
print(f"Validation auf {len(val_dataset)} Beispielen")
trainer.train()
        # keep the current model
self.model = model
        # evaluate the model on the held-out data
        print(f"Evaluating on {len(test_comments)} test examples")
metrics = self.evaluate_by_label(test_comments, spans_df)
wandb.log({
'strict_f1': metrics['TOTAL']['STRICT']['f1'],
'strict_precision': metrics['TOTAL']['STRICT']['prec'],
'strict_recall': metrics['TOTAL']['STRICT']['rec'],
'spans_f1': metrics['TOTAL']['SPANS']['f1'],
'types_f1': metrics['TOTAL']['TYPES']['f1']
})
        # save the model weights
torch.save(model.state_dict(), f'{fold_output_dir}_model.pth')
torch.cuda.memory.empty_cache()
wandb.finish()
return trainer
def cross_validate(self, comments_df, spans_df, n_splits=5, output_dir_prefix="span-classifier-cv"):
"""Führe n-fache Kreuzvalidierung mit StratifiedKFold durch"""
# Erstelle Label für Stratifizierung (basierend auf dem ersten Span types eines Kommentars)
strat_labels = []
spans_grouped = spans_df.groupby(['document', 'comment_id'])
for _, row in comments_df.iterrows():
key = (row['document'], row['comment_id'])
            # first span type if the comment has any spans, else 0
            strat_label = (spans_grouped.get_group(key).iloc[0]['type']
                           if key in spans_grouped.groups else 0)
            strat_labels.append(strat_label)
        # set up StratifiedKFold
skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)
        # collect metrics for every fold
fold_metrics = []
        # iterate over the folds
for fold, (train_idx, test_idx) in enumerate(skf.split(range(len(comments_df)), strat_labels)):
if '--fold' in sys.argv:
fold_arg = int(sys.argv[sys.argv.index('--fold') + 1])
if fold + 1 != fold_arg:
continue
wandb.init(project=os.environ["WANDB_PROJECT"], name=f"{experiment_name}-fold-{fold+1}",
group=experiment_name)
print(f"\n{'='*50}")
print(f"Fold {fold+1}/{n_splits}")
print(f"{'='*50}")
            # comments for this fold
train_comments = comments_df.iloc[train_idx].reset_index(drop=True)
test_comments = comments_df.iloc[test_idx].reset_index(drop=True)
            # rebuild the dataset for this fold
examples, eval_data = self.create_dataset(train_comments, spans_df)
train_examples, val_examples = train_test_split(examples, test_size=0.1, random_state=42)
            # split the evaluation data the same way
train_indices, val_indices = train_test_split(range(len(examples)), test_size=0.1, random_state=42)
self.current_eval_data = [eval_data[i] for i in val_indices]
train_dataset = Dataset.from_list(train_examples)
val_dataset = Dataset.from_list(val_examples)
            # re-initialize the model
model = AutoModelForTokenClassification.from_pretrained(
self.model_name,
num_labels=len(self.labels),
id2label=self.id2label,
label2id=self.label2id
)
            # training arguments
fold_output_dir = f"{output_dir_prefix}-fold-{fold+1}"
training_args = TrainingArguments(
output_dir=fold_output_dir,
learning_rate=2e-5,
warmup_steps=400,
per_device_train_batch_size=32,
per_device_eval_batch_size=16,
num_train_epochs=15,
eval_strategy="steps",
eval_steps=40,
save_strategy="steps",
save_steps=40,
load_best_model_at_end=True,
metric_for_best_model="strict_f1",
greater_is_better=True,
logging_steps=10,
logging_strategy="steps",
report_to="all",
disable_tqdm=False,
seed=42,
save_total_limit=3,
)
# Trainer
trainer = Trainer(
model=model,
args=training_args,
train_dataset=train_dataset,
eval_dataset=val_dataset,
data_collator=DataCollatorForTokenClassification(self.tokenizer),
compute_metrics=self.compute_metrics,
                callbacks=[EarlyStoppingCallback(early_stopping_patience=87)]
                # NOTE: patience is counted in evaluation calls (here every
                # eval_steps=40 steps), not in raw training steps
)
# Training
print(f"Training auf {len(train_dataset)} Beispielen")
print(f"Validation auf {len(val_dataset)} Beispielen")
trainer.train()
            # keep the current model
self.model = model
            # evaluate the model on the held-out fold
            print(f"Evaluating on {len(test_comments)} test examples")
flausch_results = self.evaluate_by_label(test_comments, spans_df)
            # extract the headline metrics for fold_metrics
metrics = {
'strict_f1': flausch_results['TOTAL']['STRICT']['f1'],
'strict_precision': flausch_results['TOTAL']['STRICT']['prec'],
'strict_recall': flausch_results['TOTAL']['STRICT']['rec'],
'spans_f1': flausch_results['TOTAL']['SPANS']['f1'],
'spans_precision': flausch_results['TOTAL']['SPANS']['prec'],
'spans_recall': flausch_results['TOTAL']['SPANS']['rec'],
'types_f1': flausch_results['TOTAL']['TYPES']['f1'],
'types_precision': flausch_results['TOTAL']['TYPES']['prec'],
'types_recall': flausch_results['TOTAL']['TYPES']['rec'],
'full_results': flausch_results
}
fold_metrics.append(metrics)
wandb.log(metrics, step=fold + 1)
            # save the model weights
torch.save(model.state_dict(), f'{fold_output_dir}_model.pth')
test_predictions = self.predict(test_comments['comment'].tolist())
            # persist metrics and predictions
with open(f"test_results.{experiment_name}.fold-{fold+1}.pkl", "wb") as p:
pickle.dump((train_comments, test_comments, test_predictions, train_examples, val_examples), p)
with open(f"scores.{experiment_name}.txt", 'a') as f:
                f.write(f'[{time.strftime("%Y-%m-%d %H:%M:%S")}] Fold {fold+1} results:\n')
                f.write(f"[{experiment_name} fold-{fold+1}] {metrics}\n")
torch.cuda.memory.empty_cache()
wandb.finish()
        # print the summary
        print("\n" + "="*50)
        print("Cross-validation finished")
print("="*50)
        # average metrics
avg_f1 = np.mean([m['strict_f1'] for m in fold_metrics])
avg_precision = np.mean([m['strict_precision'] for m in fold_metrics])
avg_recall = np.mean([m['strict_recall'] for m in fold_metrics])
print(f"\nDurchschnittliche Metriken über {n_splits} Folds:")
print(f"Precision: {avg_precision:.10f}")
print(f"Recall: {avg_recall:.10f}")
print(f"F1-Score: {avg_f1:.10f}")
        # standard deviations
std_f1 = np.std([m['strict_f1'] for m in fold_metrics])
std_precision = np.std([m['strict_precision'] for m in fold_metrics])
std_recall = np.std([m['strict_recall'] for m in fold_metrics])
print(f"\nStandardabweichung über {n_splits} Folds:")
print(f"Precision: {std_precision:.10f}")
print(f"Recall: {std_recall:.10f}")
print(f"F1-Score: {std_f1:.10f}")
        # per-fold results
for fold, metrics in enumerate(fold_metrics):
print(f"\nFold {fold+1} Ergebnisse:")
print(f"Precision: {metrics['strict_precision']:.4f}")
print(f"Recall: {metrics['strict_recall']:.4f}")
print(f"F1-Score: {metrics['strict_f1']:.4f}")
return {
'fold_metrics': fold_metrics,
'avg_metrics': {
'strict_f1': avg_f1,
'strict_precision': avg_precision,
'strict_recall': avg_recall
},
'std_metrics': {
'strict_f1': std_f1,
'strict_precision': std_precision,
'strict_recall': std_recall
}
}
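
# A minimal reload sketch (not part of the original pipeline): the .pth files
# written by cross_validate()/train() contain only a state_dict, so inference
# needs a freshly built model of the same shape. The path below follows the
# naming used in cross_validate and is illustrative.
def _load_checkpoint_demo(path='exp027-1-fold-1_model.pth'):
    clf = SpanClassifierWithStrictF1('deepset/gbert-large')
    model = AutoModelForTokenClassification.from_pretrained(
        clf.model_name,
        num_labels=len(clf.labels),
        id2label=clf.id2label,
        label2id=clf.label2id
    )
    model.load_state_dict(torch.load(path, map_location='cpu'))
    model.eval()
    clf.model = model  # predict() only needs self.model and the tokenizer
    return clf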
# load data
comments: pd.DataFrame = pd.read_csv("../../share-GermEval2025-data/Data/training data/comments.csv")
task1: pd.DataFrame = pd.read_csv("../../share-GermEval2025-data/Data/training data/task1.csv")
task2: pd.DataFrame = pd.read_csv("../../share-GermEval2025-data/Data/training data/task2.csv")
comments = comments.merge(task1, on=["document", "comment_id"])
test_data: pd.DataFrame = pd.read_csv("../../share-GermEval2025-data/Data/test data/comments.csv")
# select a subset of the data for the experiment (e.g. 17000 comments); here all comments are used
experiment_data = comments
# classifier with strict F1
classifier = SpanClassifierWithStrictF1('deepset/gbert-large')
# run 5-fold cross-validation
cv_results = classifier.cross_validate(
experiment_data,
task2,
n_splits=5,
output_dir_prefix=experiment_name
)
# write results to text file
with open(f"scores.{experiment_name}.txt", 'a') as f:
f.write(f'[{time.strftime("%Y-%m-%d %H:%M:%S")}] KFold cross validation of {experiment_name}\n')
f.write(f'{cv_results}\n')
# optional: train a final model on all data
trainer = classifier.train(experiment_data, task2, f'{experiment_name}-final')
torch.save(classifier.model.state_dict(), f'{experiment_name}_final_model.pth')
# test predictions with the final model
test_texts = ["Das ist ein toller Kommentar!", "Schlechter Text hier.",
"Sehr gutes Video. Danke! Ich finde Dich echt toll!", "Du bist doof!", "Das Licht ist echt gut.",
"Team Einhorn", "Macht unbedingt weiter so!", "Das sehe ich ganz genauso.", "Stimmt, Du hast vollkommen Recht!",
"Ich bin so dankbar ein #Lochinator zu sein"]
predictions = classifier.predict(test_texts)
for pred in predictions:
print(f"\nText: {pred['text']}")
for span in pred['spans']:
print(f" Span: '{span['text']}' ({span['start']}-{span['end']}) - {span['type']}")