# Preprocessing script: downloads ZINC SMILES shards and an ORD reaction dump,
# canonicalizes/deduplicates the SMILES, and writes train/valid/test CSV splits.
import os
import re
import subprocess
import sys
import warnings

import pandas as pd
from rdkit import Chem, RDLogger
from sklearn.model_selection import train_test_split

# Make the project root importable before pulling in the local utils module.
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from utils import remove_atom_mapping, seed_everything
| seed_everything(seed=42) | |
| # Disable RDKit warnings and Python warnings | |
| RDLogger.DisableLog("rdApp.*") | |
| warnings.filterwarnings("ignore") | |
| script_dir = os.path.abspath(os.path.dirname(__file__)) | |
| project_root = os.path.abspath(os.path.join(script_dir, "..")) | |
| data_dir = os.path.join(project_root, "data") | |
| files_to_download = [ | |
| "1ZPsoUYb4HcxFzK_ac9rb_pQj7oO3Gagh", | |
| "1XwkxxHiaWFbSNhGyxnv6hAliutIMNrIp", | |
| "1yIwUH_OhER9nuMo9HjBhBmyc6zvmrSPA", | |
| "1skFRirstIUijhieshvJEScBD2aB3H1YU", | |
| "1fa2MyLdN1vcA7Rysk8kLQENE92YejS9B", | |
| ] | |
| for file_id in files_to_download: | |
| subprocess.run( | |
| f"gdown 'https://drive.google.com/uc?export=download&id={file_id}'", shell=True | |
| ) | |
| # Move downloaded files to data directory | |
| subprocess.run("mv *.smi " + data_dir, shell=True) | |
| subprocess.run("mv *.tsv " + data_dir, shell=True) | |
| # Function to process SMILES files and save canonicalized versions | |
| def process_smiles_files(file_paths): | |
| unique_smiles = set() | |
| for file_path in file_paths: | |
| suppl = Chem.SmilesMolSupplier(file_path) | |
| for mol in suppl: | |
| if mol is not None: | |
| try: | |
| sm = Chem.MolToSmiles(mol, canonical=True) | |
| unique_smiles.add(sm) | |
| except: | |
| continue | |
| df = pd.DataFrame({"smiles": list(unique_smiles)}) | |
| df.to_csv(os.path.join(data_dir, "ZINC-canonicalized.csv"), index=False) | |
| train, valid = train_test_split(df, test_size=0.1) | |
| # Save train and validation data | |
| train.to_csv(os.path.join(data_dir, "ZINC-canonicalized-train.csv"), index=False) | |
| valid.to_csv(os.path.join(data_dir, "ZINC-canonicalized-valid.csv"), index=False) | |
| # Process 16_p files | |
| process_smiles_files([os.path.join(data_dir, f"16_p{i}.smi") for i in range(4)]) | |
| # Load reaction data | |
| ord_df = pd.read_csv( | |
| os.path.join(data_dir, "all_ord_reaction_uniq_with_attr20240506_v1.tsv"), | |
| sep="\t", | |
| names=["id", "input", "product", "condition"], | |
| ) | |
| def data_split(row): | |
| categories = [ | |
| "CATALYST", | |
| "REACTANT", | |
| "REAGENT", | |
| "SOLVENT", | |
| "INTERNAL_STANDARD", | |
| "NoData", | |
| ] | |
| data = {cat: [] for cat in categories} | |
| input_data = row["input"] | |
| if isinstance(input_data, str): | |
| for item in input_data.split("."): | |
| for cat in categories: | |
| if cat in item: | |
| data[cat].append(item[item.find(":") + 1 :]) | |
| break | |
| for key, value in data.items(): | |
| data[key] = ".".join(value) | |
| product_data = row["product"] | |
| if isinstance(product_data, str): | |
| product_data = product_data.replace(".PRODUCT", "PRODUCT") | |
| pro_lis = [] | |
| for item in product_data.split("PRODUCT:"): | |
| if item != "": | |
| pro_lis.append(item) | |
| data["PRODUCT"] = ".".join(pro_lis) | |
| else: | |
| data["PRODUCT"] = None | |
| condition_data = row["condition"] | |
| if isinstance(condition_data, str): | |
| data["YIELD"] = ( | |
| float(condition_data.split(":")[1]) if "YIELD" in condition_data else None | |
| ) | |
| temp_pos = condition_data.find("TEMP") | |
| data["TEMP"] = ( | |
| float(condition_data[temp_pos:].split(":")[1]) | |
| if "TEMP" in condition_data | |
| else None | |
| ) | |
| else: | |
| data["YIELD"] = None | |
| data["TEMP"] = None | |
| return list(data.values()) | |
| # Split data and create cleaned DataFrame | |
| categories = [ | |
| "CATALYST", | |
| "REACTANT", | |
| "REAGENT", | |
| "SOLVENT", | |
| "INTERNAL_STANDARD", | |
| "NoData", | |
| "PRODUCT", | |
| "YIELD", | |
| "TEMP", | |
| ] | |
| cleaned_data = {cat: [] for cat in categories} | |
| for _, row in ord_df.iterrows(): | |
| split_data = data_split(row) | |
| for i, value in enumerate(split_data): | |
| cleaned_data[categories[i]].append(value) | |
| cleaned_df = pd.DataFrame(cleaned_data) | |
| # Apply remove_atom_mapping function to relevant columns | |
| for column in [ | |
| "CATALYST", | |
| "REACTANT", | |
| "REAGENT", | |
| "SOLVENT", | |
| "INTERNAL_STANDARD", | |
| "NoData", | |
| "PRODUCT", | |
| ]: | |
| cleaned_df[column] = cleaned_df[column].apply( | |
| lambda x: remove_atom_mapping(x) if isinstance(x, str) else None | |
| ) | |
| # Save cleaned DataFrame | |
| cleaned_df.to_csv(os.path.join(data_dir, "preprocessed_ord.tsv"), index=False) | |
| train, valid = train_test_split(cleaned_df, test_size=int(len(cleaned_df) * 0.1)) | |
| train, test = train_test_split(train, test_size=int(len(cleaned_df) * 0.1)) | |
| # Save train and validation data | |
| train.to_csv(os.path.join(data_dir, "preprocessed_ord_train.csv"), index=False) | |
| valid.to_csv(os.path.join(data_dir, "preprocessed_ord_valid.csv"), index=False) | |
| test.to_csv(os.path.join(data_dir, "preprocessed_ord_test.csv"), index=False) | |