# General import os import kagglehub import pandas as pd import json from typing import Literal from datasets import load_dataset import random #Markdown from IPython.display import Markdown, display, Image # Image from PIL import Image # langchain for llms from langchain_groq import ChatGroq # Langchain from langchain.prompts import PromptTemplate, ChatPromptTemplate from langchain.output_parsers import StructuredOutputParser, ResponseSchema from langchain_core.output_parsers import JsonOutputParser from langchain_core.messages import HumanMessage from langgraph.checkpoint.memory import MemorySaver from langgraph.graph import END, START, StateGraph, MessagesState from langgraph.prebuilt import ToolNode from langchain_core.tools import tool # Hugging Face from transformers import AutoModelForImageClassification, AutoProcessor from langchain_huggingface import HuggingFaceEmbeddings # Extra libraries from pydantic import BaseModel, Field, model_validator # Advanced RAG from langchain_core.documents import Document from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain.embeddings import HuggingFaceEmbeddings from langchain_community.vectorstores import Chroma from langchain.retrievers.multi_query import MultiQueryRetriever from langchain_core.runnables import RunnablePassthrough from langchain_core.output_parsers import StrOutputParser # ## APIs os.environ["SERPER_API_KEY"] = os.getenv("SERPER_API_KEY") os.environ["GROQ_API_KEY"] = os.getenv("GROQ_API_KEY") os.environ["HF_TOKEN"] = os.getenv("HF_TOKEN") GROQ_API_KEY = os.environ["GROQ_API_KEY"] HF_TOKEN = os.environ["HF_TOKEN"] # ## Setup LLM (Llama 3.3 via Groq) # Note: Model 3.2 70b is not available on Groq any more # We will be using 3.3 from Now on os.environ["GROQ_API_KEY"] = GROQ_API_KEY #model_3_2 = 'llama-3.2-11b-text-preview' => his model has been removed from Groq platform model_3_2_small = 'llama-3.1-8b-instant' # Smaller Model 3 Billion parameters if you need speed model_3_3 ='llama-3.3-70b-versatile' # Very Large and Versatile Model with 70 Billion parameters llm = ChatGroq( model= model_3_3, # temperature=0, max_tokens=None, timeout=None, max_retries=2, # groq_api_key=os.getenv("GROQ_API_KEY") # other params... ) # A test message # new text: response = llm.invoke("hi, Please generate 10 unique Dutch names for both male and female?") response display(Markdown(response.content)) # # First Agent: Chatbot Agent from typing import Annotated from typing_extensions import TypedDict from langgraph.graph import StateGraph, START, END from langgraph.graph.message import add_messages class ChatState(TypedDict): # Messages have the type "list". The `add_messages` function # in the annotation defines how this state key should be updated # (in this case, it appends messages to the list, rather than overwriting them) messages: Annotated[list, add_messages] chat_graph = StateGraph(ChatState) def chatbot_agent(state: ChatState): return {"messages": [llm.invoke(state["messages"])]} # The first argument is the unique node name # The second argument is the function or object that will be called whenever # the node is used. chat_graph.add_node("chatbot_agent", chatbot_agent) chat_graph.add_edge(START, "chatbot_agent") chat_graph.add_edge("chatbot_agent", END) # Finally, we'll want to be able to run our graph. To do so, call "compile()" # We basically now give our AI Agent graph_app = chat_graph.compile() # Persistent state to maintain conversation history persistent_state = {"messages": []} # Start with an empty message list from IPython.display import Image, display display(Image(graph_app.get_graph(xray=True).draw_mermaid_png())) from typing import Annotated from typing_extensions import TypedDict from langgraph.graph import StateGraph, START, END from langgraph.graph.message import add_messages from IPython.display import display, Markdown class ChatState(TypedDict): messages: Annotated[list, add_messages] chat_graph = StateGraph(ChatState) def chatbot_agent(state: ChatState): # Assuming `llm` is your language model that can handle the conversation history return {"messages": [llm.invoke(state["messages"])]} chat_graph.add_node("chatbot_agent", chatbot_agent) chat_graph.add_edge(START, "chatbot_agent") chat_graph.add_edge("chatbot_agent", END) graph_app = chat_graph.compile() # Persistent state to maintain conversation history persistent_state = {"messages": []} # Start with an empty message list def stream_graph_updates(user_input: str): global persistent_state # Append the user's message to the persistent state persistent_state["messages"].append(("user", user_input)) is_finished = False for event in graph_app.stream(persistent_state): for value in event.values(): last_msg = value["messages"][-1] display(Markdown("Assistant: " + last_msg.content)) # Append the assistant's response to the persistent state persistent_state["messages"].append(("assistant", last_msg.content)) finish_reason = last_msg.response_metadata.get("finish_reason") if finish_reason == "stop": is_finished = True break if is_finished: break while True: try: user_input = input('User:') if user_input.lower() in ["quit", "exit", "q"]: print("Thank you and Goodbye!") break stream_graph_updates(user_input) except Exception as e: print(f"An error occurred: {e}") break # # Second Agent: Add Search to Chatbot to make it Stronger from langchain_community.tools import GoogleSerperResults from typing import List, Annotated from langchain_core.messages import BaseMessage from langgraph.prebuilt import ToolNode, create_react_agent import operator import functools class ChatState(TypedDict): # Messages have the type "list". The `add_messages` function # in the annotation defines how this state key should be updated # (in this case, it appends messages to the list, rather than overwriting them) messages: Annotated[list, add_messages] def agent_node(state, agent, name): result = agent.invoke(state) return { "messages": [HumanMessage(content=result["messages"][-1].content, name=name)] } class SearchState(TypedDict): # A message is added after each team member finishes messages: Annotated[List[BaseMessage], operator.add] # Search Tool serper_tool = GoogleSerperResults( num_results=5, # how many Google results to return ) search_agent = create_react_agent(llm, tools=[serper_tool]) search_node = functools.partial(agent_node, agent=search_agent, name="search_agent") # The first argument is the unique node name # The second argument is the function or object that will be called whenever # the node is used. search_graph = StateGraph(SearchState) search_graph.add_node("search_agent", search_node) search_graph.add_edge(START, "search_agent") search_graph.add_edge("search_agent", END) # Finally, we'll want to be able to run our graph. To do so, call "compile()" # We basically now give our AI Agent search_app = search_graph.compile() from IPython.display import Image, display display(Image(search_app.get_graph(xray=True).draw_mermaid_png())) from langchain_community.tools import GoogleSerperResults from typing import List, Annotated from langchain_core.messages import BaseMessage, HumanMessage from langgraph.prebuilt import ToolNode, create_react_agent from langgraph.graph import StateGraph, START, END from langgraph.graph.message import add_messages from IPython.display import display, Markdown import operator import functools class ChatState(TypedDict): messages: Annotated[List[BaseMessage], operator.add] def agent_node(state, agent, name): result = agent.invoke(state) return { "messages": [HumanMessage(content=result["messages"][-1].content, name=name)] } class SearchState(TypedDict): messages: Annotated[List[BaseMessage], operator.add] # Search Tool serper_tool = GoogleSerperResults(num_results=5) # how many Google results to return search_agent = create_react_agent(llm, tools=[serper_tool]) search_node = functools.partial(agent_node, agent=search_agent, name="search_agent") # Create the search graph search_graph = StateGraph(SearchState) search_graph.add_node("search_agent", search_node) search_graph.add_edge(START, "search_agent") search_graph.add_edge("search_agent", END) # Compile the search graph search_app = search_graph.compile() # Persistent state to maintain conversation history persistent_state = {"messages": []} # Start with an empty message list def stream_graph_updates(user_input: str): global persistent_state # Append the user's message to the persistent state persistent_state["messages"].append(HumanMessage(content=user_input)) # Display "Searching the Web Now..." message display(Markdown("**Assistant:** Searching the Web Now...")) is_finished = False for event in search_app.stream(persistent_state): for value in event.values(): last_msg = value["messages"][-1] display(Markdown("**Assistant:** " + last_msg.content)) # Append the assistant's response to the persistent state persistent_state["messages"].append(last_msg) finish_reason = last_msg.response_metadata.get("finish_reason") if finish_reason == "stop": is_finished = True break if is_finished: break while True: try: user_input = input('User:') if user_input.lower() in ["quit", "exit", "q"]: print("Thank you and Goodbye!") break stream_graph_updates(user_input) except Exception as e: print(f"An error occurred: {e}") break # # Step 1: Medical Database Preparation # This step involves preparing and enhancing patient data to be used throughout the simulation. # ## 1.1 Load Dataset # ### 1.1.1 Disease Symptoms and Patient Profile Dataset # Ensure you have downloaded it and placed it in your project directory. # - https://www.kaggle.com/datasets/uom190346a/disease-symptoms-and-patient-profile-dataset # Download latest version path = kagglehub.dataset_download("uom190346a/disease-symptoms-and-patient-profile-dataset") print("Path to dataset files:", path) patient_df = pd.read_csv(path+'/Disease_symptom_and_patient_profile_dataset.csv') patient_df.shape patient_df.head() # Calculate the counts of each gender female_count = patient_df[patient_df['Gender'] == 'Female'].shape[0] male_count = patient_df[patient_df['Gender'] == 'Male'].shape[0] # Calculate the ratio ratio = female_count / male_count print(f"The ratio of Female to Male is {ratio}:1") patient_df['Disease'].value_counts().head(20) # **prepare_medical_dataset Code in One Plalce** def prepare_medical_dataset(path, file_name): patient_df = pd.read_csv(path+file_name) return patient_df path = kagglehub.dataset_download("uom190346a/disease-symptoms-and-patient-profile-dataset") file_name = '/Disease_symptom_and_patient_profile_dataset.csv' patient_df = prepare_medical_dataset(path, file_name) # ### 1.1.2 Chest X-Ray Images (Pneumonia) # # - https://huggingface.co/lxyuan/vit-xray-pneumonia-classification # - https://huggingface.co/datasets/keremberke/chest-xray-classification # # #from datasets import load_dataset #patient_x_ray_path = "keremberke/chest-xray-classification" #x_ray_ds = load_dataset(patient_x_ray_path, name="full") from datasets import load_dataset x_ray_ds = load_dataset("keremberke/chest-xray-classification", name="full") random_index = random.randint(0, x_ray_ds['train'].shape[0] - 1) patient_x_ray = random_row = x_ray_ds['train'][random_index]['image'] from datasets import load_dataset x_ray_ds = load_dataset("keremberke/chest-xray-classification", name="full") x_ray_ds['train'].shape[0] # Assuming x_ray_ds['train'] is a dataset where we want to pick a random row import random random_index = random.randint(0, x_ray_ds['train'].shape[0] - 1) patient_x_ray = x_ray_ds['train'][random_index]['image'] patient_x_ray type(patient_x_ray) #!pip install --upgrade accelerate==0.31.0 #!pip install --upgrade huggingface-hub>=0.23.0 from transformers import pipeline # Model in Hugging Face: https://huggingface.co/lxyuan/vit-xray-pneumonia-classification # vit-xray-pneumonia-classification classifier = pipeline(model="lxyuan/vit-xray-pneumonia-classification") patient_x_ray_results = classifier(patient_x_ray) patient_x_ray_results # Find the label with the highest score patient_x_ray_label = max(patient_x_ray_results, key=lambda x: x['score'])['label'] print(patient_x_ray_label) # Model in Hugging Face: https://huggingface.co/lxyuan/vit-xray-pneumonia-classification # vit-xray-pneumonia-classification classifier = pipeline(model="lxyuan/vit-xray-pneumonia-classification") patient_x_ray_results = classifier(patient_x_ray) # Find the label with the highest score and its score highest = max(patient_x_ray_results, key=lambda x: x['score']) highest_score_label = highest['label'] highest_score = highest['score'] * 100 # Convert to percentage # Choose the correct verb based on the label verb = "is" if highest_score_label == "NORMAL" else "has" # Print the result dynamically print(f"Patient {verb} {highest_score_label} with Probability of ca. {highest_score:.0f}%") # ## 1.2 Generate Synthetic Data with LLMs # Generate culturally appropriate Dutch names and unique alphanumeric IDs for each patient. # ### 1.2.1 Generate Random Names and IDs for Patience # This Code Goes Slower because of Llama 3.3 70b being very big and slow LLM # comparing to llama 3.2 11b # Switch to model_3_2_smal when running this code # === Step 1: Define Response Schemas === # Define the structure of the expected JSON output. # ResponseSchema for First_Name first_name_schema = ResponseSchema( name="First_Name", description="The first name of the patient." ) # ResponseSchema for Last_Name last_name_schema = ResponseSchema( name="Last_Name", description="The last name of the patient." ) # ResponseSchema for Patient_ID patient_id_schema = ResponseSchema( name="Patient_ID", description="A unique 13-character alphanumeric patient identifier." ) # ResponseSchema for Patient_ID gender_schema = ResponseSchema( name="G_Gender", description="Indicate the first name you generate belong which Gender: Male or Female" ) # Aggregate all response schemas response_schemas = [ first_name_schema, last_name_schema, patient_id_schema, gender_schema ] # === Step 2: Set Up the Output Parser === # Initialize the StructuredOutputParser with the defined response schemas. output_parser = StructuredOutputParser.from_response_schemas(response_schemas) # Get the format instructions to include in the prompt format_instructions = output_parser.get_format_instructions() # === Step 3: Craft the Prompt === # Create a prompt that instructs the LLM to generate only the structured JSON data. # Define the prompt template using ChatPromptTemplate prompt_template = ChatPromptTemplate.from_template(""" you MUST Generate a list of {n} Dutch names along with a unique 13-character alphanumeric Patient_ID for each gender provided. Always Use {genders} to generate a First_Name which belong to the right Gender, two category is possible: 'Male' or 'Female'. Ensure the names are culturally appropriate for the Netherlands. Generate unique names, no repetitions, and ensure diversity. The ratio of Female to Male is {ratio}:1 {format_instructions} Genders: {genders} **IMPORTANT:** Do not include any explanations, code, or additional text. you MUST ALWAYS generate Dutch names and Patient_ID according {format_instructions} and NEVER return empty values. YOU MUST Provide only the JSON array as specified. JSON array Should have exactly {n} rows and 3 columns """) # Determine the number of patients n_patients = len(patient_df) #n_patients = 120 # Calculate the counts of each gender female_count = patient_df[patient_df['Gender'] == 'Female'].shape[0] male_count = patient_df[patient_df['Gender'] == 'Male'].shape[0] # Calculate the ratio ratio = female_count / male_count # Prepare the list of genders genders = patient_df['Gender'].tolist() # === Step 6: Generate the Prompt === # Format the prompt with the number of patients and their genders. formatted_prompt = prompt_template.format( n=n_patients, ratio = ratio, genders=', '.join(genders), format_instructions=format_instructions ) # Invoke the model with s Smaller Llama Model for Speed model_3_2_small = 'llama-3.1-8b-instant' # if you need speed llm = ChatGroq( model= model_3_2_small, # temperature=0, max_tokens=None, timeout=None, max_retries=2 ) output = llm.invoke(formatted_prompt, timeout=1000) display(Markdown(output.content)) output_parser = JsonOutputParser() json_output = output_parser.invoke(output) json_output all_patients = [] generated_patients = pd.DataFrame(json_output) generated_patients.head(5) generated_patients.shape # Adjusted LLM parameters (if supported) llm.temperature = 0.9 # Increases randomness all_patients_name_id = pd.DataFrame() output_parser = JsonOutputParser() while all_patients_name_id.shape[0] < n_patients: output = llm.invoke(formatted_prompt) json_output = output_parser.invoke(output) generated_patients = pd.DataFrame(json_output) all_patients_name_id = pd.concat([generated_patients, all_patients_name_id], axis = 0) print(f"len all_patients_name_id: {len(all_patients_name_id)}") all_patients_name_id = all_patients_name_id.drop_duplicates() print(f"len all_patients_name_id after droping duplicates: {len(all_patients_name_id)}") all_patients_name_id.rename(columns = {"G_Gender": "Gender"}, inplace= True) all_patients_name_id.head(10) gender_counts = patient_df['Gender'].value_counts() gender_counts all_patients_name_id['Gender'].value_counts() # Step 1: Count the number of males and females in patient_df gender_counts = patient_df['Gender'].value_counts() # Step 2: Select the required number of unique males and females from all_patients_name_id unique_males = all_patients_name_id[all_patients_name_id['Gender'] == 'Male'].drop_duplicates().head(gender_counts['Male']) unique_females = all_patients_name_id[all_patients_name_id['Gender'] == 'Female'].drop_duplicates().head(gender_counts['Female']) patient_male = patient_df[patient_df['Gender'] == 'Male'].reset_index(drop=True) patient_female = patient_df[patient_df['Gender'] == 'Female'].reset_index(drop=True) updated_male_patients = pd.concat([patient_male.reset_index(drop=True), unique_males[0:patient_male.shape[0]].reset_index(drop=True)], axis = 1) updated_female_patients = pd.concat([patient_female.reset_index(drop=True), unique_females[0:patient_female.shape[0]].reset_index(drop=True)], axis = 1) # Step 3: Concatenate patient_df with the selected rows from all_patients_name_id updated_patient_df = pd.concat([updated_male_patients, updated_female_patients], axis = 0) updated_patient_df.shape[0] # Display the final concatenated dataframe updated_patient_df updated_patient_df = updated_patient_df.loc[:, ~updated_patient_df.columns.duplicated()] updated_patient_df updated_patient_df['Gender'].value_counts() # #### 1.2.1.1 Select a Random Patient # Pick a Random Patient: A female between 20 and 29 and with Pneumonia as Positive so that later we can check X-Ray Agent mask = (updated_patient_df['Gender'] == 'Female') & \ (updated_patient_df["Age"].between(20, 29)) & \ (updated_patient_df['Difficulty Breathing'] == 'Yes') & \ (updated_patient_df['Outcome Variable'] == 'Positive') selected_patients = updated_patient_df[mask].reset_index(drop=True) selected_patients.head() selected_patient = selected_patients.iloc[0] selected_patient # # Step 2: Create IDentity Photo for the Front Desk Agent # ## 2.1 Build the Vision Model for Gender Classification (Image Classification Task) # In[46]: # Use a pipeline as a high-level helper from transformers import pipeline pipe = pipeline("image-classification", model="rizvandwiki/gender-classification") # In[47]: # Load model directly from transformers import AutoImageProcessor, AutoModelForImageClassification processor = AutoImageProcessor.from_pretrained("rizvandwiki/gender-classification") model = AutoModelForImageClassification.from_pretrained("rizvandwiki/gender-classification") # In machine learning, particularly in classification tasks, logits are the raw, unnormalized outputs produced by a model's final layer before any activation function is applied. These outputs represent the model's confidence scores for each class and are essential for subsequent probability calculations. # In[48]: from transformers import AutoModelForImageClassification, AutoProcessor from PIL import Image import requests # Load the model and processor model_name = "rizvandwiki/gender-classification" model = AutoModelForImageClassification.from_pretrained(model_name) processor = AutoProcessor.from_pretrained(model_name) # Load the image from URL or local path image_url = "https://thispersondoesnotexist.com" image = Image.open(requests.get(image_url, stream=True).raw) # Prepare the image for the model inputs = processor(images=image, return_tensors="pt") # Perform inference outputs = model(**inputs) logits = outputs.logits predicted_class = logits.argmax(-1).item() # Map prediction to class label classes = model.config.id2label gender_label = classes[predicted_class] print(f"Predicted Gender: {gender_label}") import matplotlib.pyplot as plt # Display the image and prediction plt.imshow(image) plt.axis('off') # Hide axes plt.title(f"Predicted Gender: {gender_label}") plt.show() # ## 2.2 Build the Vision Model for Age Classification (Image Classification Task) # Load age classification model age_model_name = "nateraw/vit-age-classifier" age_model = AutoModelForImageClassification.from_pretrained(age_model_name) age_processor = AutoProcessor.from_pretrained(age_model_name) # Age Prediction age_inputs = age_processor(images=image, return_tensors="pt") age_outputs = age_model(**age_inputs) age_logits = age_outputs.logits age_prediction = age_logits.argmax(-1).item() age_label = age_model.config.id2label[age_prediction] age_label # Display the image with both predictions plt.imshow(image) plt.axis('off') plt.title(f"Predicted Gender: {gender_label}, Predicted Age: {age_label}") plt.show() # # Step 3: Start Building Multi-Agents # # Define Each AI Agent # We'll define agents for: # # * Administration Front Desk # * Physician for General Health Examination + Blood Laboratory # * X-Ray Image Department # ## 3.1 Hospital Front Desk Agent # # # **--IMPORTANT NOTE--**
# 1. Don't forget to save one photo from https://thispersondoesnotexist.com/ #
as female.jpg and save it to this Path "/content/sample_data/' #
which is standard path within your Google Colab # # --- # 2. Don't Forget to Save one of the images from the x-ray-dataset
**Load Dataset in this way:**
# patient_x_ray_path = "keremberke/chest-xray-classification"
# x_ray_ds = load_dataset(patient_x_ray_path, name="full") #
Then save one image labelled as x-ray-chest.jpg to the path "/content/sample_data/' patient_x_ray_path = "keremberke/chest-xray-classification" x_ray_ds = load_dataset(patient_x_ray_path, name="full") from typing import List, Tuple, Dict, Any, Sequence, Annotated, Literal from typing_extensions import TypedDict from langchain_core.messages import BaseMessage import operator import functools from langchain_core.messages import HumanMessage from langgraph.checkpoint.memory import MemorySaver from langgraph.graph import END, START, StateGraph, MessagesState from langgraph.prebuilt import ToolNode, create_react_agent from langchain_core.tools import tool from transformers import AutoModelForImageClassification, AutoProcessor from PIL import Image from pydantic import BaseModel from langchain_core.output_parsers import StrOutputParser, JsonOutputParser from langchain_core.prompts import ChatPromptTemplate # Annotated in python allows developers to declare the type of a reference and provide additional information related to it. # Literal, after that the value are exact and literal #----------------- Build Fucntions that Agents use ---------------------- def patient_verification_tool(image_Path, selected_patient_data, updated_patient_df) -> str: """Detects the gender from an image provided as a file path.""" from PIL import Image print(image_Path) model = AutoModelForImageClassification.from_pretrained("rizvandwiki/gender-classification") processor = AutoProcessor.from_pretrained("rizvandwiki/gender-classification") image = Image.open(image_Path) inputs = processor(images=image, return_tensors="pt") outputs = model(**inputs) predicted_class = outputs.logits.argmax(-1).item() print(f"Predicted Gender Of Patient is : {model.config.id2label[predicted_class]}") predicted_gender = model.config.id2label[predicted_class] from PIL import Image model = AutoModelForImageClassification.from_pretrained("nateraw/vit-age-classifier") processor = AutoProcessor.from_pretrained("nateraw/vit-age-classifier") image = Image.open(image_Path) inputs = processor(images=image, return_tensors="pt") outputs = model(**inputs) predicted_class = outputs.logits.argmax(-1).item() print(f"predicted Age Class: {model.config.id2label[predicted_class]}") predicted_age_range = model.config.id2label[predicted_class] # Parse the age range string (e.g., "20-29") age_min, age_max = map(int, predicted_age_range.split('-')) print(f"age_mi: {age_min}, age_max: {age_max}") # Verify against the DataFrame matching_row = updated_patient_df[ (updated_patient_df["First_Name"] == selected_patient["First_Name"]) & (updated_patient_df["Last_Name"] == selected_patient["Last_Name"]) & (updated_patient_df["Patient_ID"] == selected_patient["Patient_ID"]) & (updated_patient_df["Gender"].str.lower() == predicted_gender) & (updated_patient_df["Age"].between(age_min, age_max)) ] print(f"matching_row {matching_row} ") if not matching_row.empty: patient_verification = f'''Verification successful. Patient is : {selected_patient["First_Name"]} {selected_patient["Last_Name"]} with ID {selected_patient["Patient_ID"]} which is {predicted_gender} in age range of {predicted_age_range} can proceed to the physician.''' else: patient_verification = "ID not verified. Patient cannot proceed." return patient_verification #------------------- Define Agents----------------------------- class AgentState(TypedDict): initial_prompt : str messages: Annotated[List[BaseMessage], operator.add] patient_verification : str def front_desk_agent(state, image_Path, selected_patient_data, updated_patient_df): initial_prompt = state["initial_prompt"] # Call function patient_verification = patient_verification_tool(image_Path, selected_patient_data, updated_patient_df) print(patient_verification) return {"patient_verification": patient_verification} #----------------------------------------------------------------- # Build the LangGraph for Hospital Front Desk # #----------------------------------------------------------------- image_Path = "female.jpg" selected_patient_data = selected_patient.to_dict() updated_patient_df front_desk_agent_node = functools.partial(front_desk_agent, image_Path = image_Path, selected_patient_data=selected_patient_data, updated_patient_df =updated_patient_df) # 6. Set up the Langgraph state graph FrontDeskGraph = StateGraph(AgentState) # Define nodes for workflow FrontDeskGraph.add_node("front_desk_agent", front_desk_agent_node) FrontDeskGraph.add_edge(START, "front_desk_agent") FrontDeskGraph.add_edge("front_desk_agent", END) # Initialize memory to persist state between graph runs FrontDeskWorkflow = FrontDeskGraph.compile() from IPython.display import Markdown, display, Image display(Image(FrontDeskWorkflow.get_graph(xray=True).draw_mermaid_png())) initial_prompt = "You are Front Desk Administrator in an Hospital in the Netherlands. Start Verification of the following Patient:" # Run the workflow inputs = {"initial_prompt" : initial_prompt } output = FrontDeskWorkflow.invoke(inputs) output display(Markdown(output['patient_verification'])) # ## 3.2 Pysician Agent def question_patient_symptoms(selected_patient_data) -> str: """Asks the patient about symptoms, generates responses, and summarizes the answers based on patient data.""" symptoms_questions = { "Cough": "\nAre you coughing?\n", "Fatigue": "\nDo you feel fatigue?\n", "\nDifficulty Breathing": "Do you have difficulty breathing?\n" } conversation = [] for symptom, question in symptoms_questions.items(): conversation.append(f"\nPhysician: {question}") response = selected_patient_data.get(symptom, "No") answer = "Yes" if response == "Yes" else "No" conversation.append(f"\nPatient: {answer}") first_name = selected_patient_data.get("First_Name", "") last_name = selected_patient_data.get("Last_Name", "") patient_id = selected_patient_data.get("Patient_ID", "") gender = selected_patient_data.get("Gender", "") age = selected_patient_data.get("Age", "") profile = f"\nYou are {first_name} {last_name}, a {age} years old {gender} with Patient ID: {patient_id}." summary = profile +"I gathered that you are experiencing the following: " summaries = [] for symptom in symptoms_questions.keys(): response = selected_patient_data.get(symptom, "No") if response == "Yes": summaries.append(f"you are experiencing {symptom.lower()}") else: summaries.append(f"\nI am glad you are not experiencing {symptom.lower()}") summary += "; ".join(summaries) + "." conversation.append(f"\nPhysician: {summary}") return "\n".join(conversation) def perform_examination(selected_patient_data) -> str: """Performs examination by reporting fever, blood pressure, and cholesterol level from patient data.""" fever = selected_patient_data.get("Fever", "Unknown") blood_pressure = selected_patient_data.get("Blood Pressure", "Unknown") cholesterol = selected_patient_data.get("Cholesterol Level", "Unknown") return f"Examination Results: Fever - {fever}, Blood Pressure - {blood_pressure}, Cholesterol Level - {cholesterol}" def diagnose_patient(selected_patient_data) -> str: """Provides diagnosis based on Disease and Outcome columns in patient data.""" disease = selected_patient_data.get("Disease", "Unknown Disease") outcome = selected_patient_data.get("Outcome Variable", "Unknown Outcome") if outcome == 'Positive': diagnosis = 'Make X-Ray from Chest' else: diagnosis = 'Rest to Recover' return f"Diagnosis: {disease}. Test Result: {outcome}. Final Diagnosis: {diagnosis}", diagnosis class AgentState(TypedDict): initial_prompt : str messages: Annotated[List[BaseMessage], operator.add] question_patient_symptoms: str examination_patient: str diagnosis_patient: str diagnosis : str def physician_agent(state, selected_patient_data): question_patient= question_patient_symptoms(selected_patient_data) examination = perform_examination(selected_patient_data) diagnosis_report, diagnosis = diagnose_patient(selected_patient_data) return {"question_patient_symptoms": question_patient, "examination_patient": examination, "diagnosis_patient": diagnosis_report, "diagnosis": diagnosis} selected_patient_data = selected_patient.to_dict() physician_agent_node = functools.partial(physician_agent, selected_patient_data=selected_patient_data) # 6. Set up the Langgraph state graph PhysicianGraph = StateGraph(AgentState) # Define nodes for workflow PhysicianGraph.add_node("physician_agent", physician_agent_node) PhysicianGraph.add_edge(START, "physician_agent") PhysicianGraph.add_edge("physician_agent", END) # Initialize memory to persist state between graph runs PhysicianWorkflow = PhysicianGraph.compile() display(Image(PhysicianWorkflow.get_graph(xray=True).draw_mermaid_png())) initial_prompt = "You are a Very Experience Doctor in an Hospital in the Netherlands. Start a conversation with the patient and determine \ symptoms and give diagnosis" # Run the workflow inputs = {"initial_prompt" : initial_prompt } output = PhysicianWorkflow.invoke(inputs) output display(Markdown(output['question_patient_symptoms'])) display(Markdown(output['examination_patient'])) display(Markdown(output['diagnosis_patient'])) # ## 3.3 Radiologist def examine_X_ray_image(patient_x_ray_path) -> str: """Use Vision Models to recognise if the X-Ray Image of Patient is NORMAL or PNEUMONIA""" # Model in Hugging Face: https://huggingface.co/lxyuan/vit-xray-pneumonia-classification # vit-xray-pneumonia-classification x_ray_ds = load_dataset(patient_x_ray_path, name="full") random_index = random.randint(0, x_ray_ds['train'].shape[0] - 1) patient_x_ray_image = x_ray_ds['train'][random_index]['image'] classifier = pipeline(model="lxyuan/vit-xray-pneumonia-classification") patient_x_ray_results = classifier(patient_x_ray_image) # Find the label with the highest score and its score highest = max(patient_x_ray_results, key=lambda x: x['score']) highest_score_label = highest['label'] highest_score = highest['score'] * 100 # Convert to percentage # Choose the correct verb based on the label verb = "is" if highest_score_label == "NORMAL" else "has" return f"Patient {verb} {highest_score_label} with Probability of ca. {highest_score:.0f}%" class AgentState(TypedDict): initial_prompt : str messages: Annotated[List[BaseMessage], operator.add] pneumonia_detection: str def radiologist_agent(state, patient_x_ray_path): pneumonia_detection = examine_X_ray_image(patient_x_ray_path) return {"pneumonia_detection": pneumonia_detection} patient_x_ray_path = "keremberke/chest-xray-classification" radiologist_agent_node = functools.partial(radiologist_agent, patient_x_ray_path=patient_x_ray_path) # 6. Set up the Langgraph state graph RadiologistGraph = StateGraph(AgentState) # Define nodes for workflow RadiologistGraph.add_node("radiologist_agent", radiologist_agent_node) RadiologistGraph.add_edge(START, "radiologist_agent") RadiologistGraph.add_edge("radiologist_agent", END) # Initialize memory to persist state between graph runs RadiologistWorkflow = RadiologistGraph.compile() display(Image(RadiologistWorkflow.get_graph(xray=True).draw_mermaid_png())) initial_prompt = "You are a Very Experienced Radiologist in an Hospital in the Netherlands. Diagnose if the patient has pneumonia" # Run the workflow inputs = {"initial_prompt" : initial_prompt } output = RadiologistWorkflow.invoke(inputs) output display(Markdown(output['pneumonia_detection'])) # # Step 4: Putting All Agents in One Graph from langchain_core.output_parsers import StrOutputParser, JsonOutputParser from langchain_core.prompts import ChatPromptTemplate selected_patient_data = selected_patient.to_dict() image_Path = "female.jpg" patient_x_ray_image = patient_x_ray def patient_verification_tool(image_Path, selected_patient_data, updated_patient_df) -> str: """Detects the gender from an image provided as a file path.""" from PIL import Image print(image_Path) model = AutoModelForImageClassification.from_pretrained("rizvandwiki/gender-classification") processor = AutoProcessor.from_pretrained("rizvandwiki/gender-classification") image = Image.open(image_Path) inputs = processor(images=image, return_tensors="pt") outputs = model(**inputs) predicted_class = outputs.logits.argmax(-1).item() print(f"Predicted Gender Of Patient is : {model.config.id2label[predicted_class]}") predicted_gender = model.config.id2label[predicted_class] from PIL import Image model = AutoModelForImageClassification.from_pretrained("nateraw/vit-age-classifier") processor = AutoProcessor.from_pretrained("nateraw/vit-age-classifier") image = Image.open(image_Path) inputs = processor(images=image, return_tensors="pt") outputs = model(**inputs) predicted_class = outputs.logits.argmax(-1).item() print(f"predicted Age Class: {model.config.id2label[predicted_class]}") predicted_age_range = model.config.id2label[predicted_class] # Parse the age range string (e.g., "20-29") age_min, age_max = map(int, predicted_age_range.split('-')) print(f"age_mi: {age_min}, age_max: {age_max}") # Verify against the DataFrame matching_row = updated_patient_df[ (updated_patient_df["First_Name"] == selected_patient["First_Name"]) & (updated_patient_df["Last_Name"] == selected_patient["Last_Name"]) & (updated_patient_df["Patient_ID"] == selected_patient["Patient_ID"]) & (updated_patient_df["Gender"].str.lower() == predicted_gender) & (updated_patient_df["Age"].between(age_min, age_max)) ] print(f"matching_row {matching_row} ") if not matching_row.empty: patient_verification = f'''Verification successful. Patient is : {selected_patient["First_Name"]} {selected_patient["Last_Name"]} with ID {selected_patient["Patient_ID"]} which is {predicted_gender} in age range of {predicted_age_range} can proceed to the physician.''' else: patient_verification = "ID not verified. Patient cannot proceed." return patient_verification def question_patient_symptoms(selected_patient_data) -> str: """Asks the patient about symptoms, generates responses, and summarizes the answers based on patient data.""" symptoms_questions = { "Cough": "\nAre you coughing?\n", "Fatigue": "\nDo you feel fatigue?\n", "\nDifficulty Breathing": "Do you have difficulty breathing?\n" } conversation = [] for symptom, question in symptoms_questions.items(): conversation.append(f"\nPhysician: {question}") response = selected_patient_data.get(symptom, "No") answer = "Yes" if response == "Yes" else "No" conversation.append(f"\nPatient: {answer}") first_name = selected_patient_data.get("First_Name", "") last_name = selected_patient_data.get("Last_Name", "") patient_id = selected_patient_data.get("Patient_ID", "") gender = selected_patient_data.get("Gender", "") age = selected_patient_data.get("Age", "") profile = f"\nYou are {first_name} {last_name}, a {age} years old {gender} with Patient ID: {patient_id}." summary = profile +"I gathered that you are experiencing the following: " summaries = [] for symptom in symptoms_questions.keys(): response = selected_patient_data.get(symptom, "No") if response == "Yes": summaries.append(f"you are experiencing {symptom.lower()}") else: summaries.append(f"\nI am glad you are not experiencing {symptom.lower()}") summary += "; ".join(summaries) + "." conversation.append(f"\nPhysician: {summary}") return "\n".join(conversation) def perform_examination(selected_patient_data) -> str: """Performs examination by reporting fever, blood pressure, and cholesterol level from patient data.""" fever = selected_patient_data.get("Fever", "Unknown") blood_pressure = selected_patient_data.get("Blood Pressure", "Unknown") cholesterol = selected_patient_data.get("Cholesterol Level", "Unknown") return f"Examination Results: Fever - {fever}, Blood Pressure - {blood_pressure}, Cholesterol Level - {cholesterol}" def diagnose_patient(selected_patient_data) -> str: """Provides diagnosis based on Disease and Outcome columns in patient data.""" disease = selected_patient_data.get("Disease", "Unknown Disease") outcome = selected_patient_data.get("Outcome Variable", "Unknown Outcome") if outcome == 'Positive': diagnosis = 'Make X-Ray from Chest' else: diagnosis = 'Rest to Recover' return f"Diagnosis: {disease}. Test Result: {outcome}. Final Diagnosis: {diagnosis}", diagnosis def examine_X_ray_image(patient_x_ray_path) -> str: """Use Vision Models to recognise if the X-Ray Image of Patient is NORMAL or PNEUMONIA""" # Model in Hugging Face: https://huggingface.co/lxyuan/vit-xray-pneumonia-classification # vit-xray-pneumonia-classification x_ray_ds = load_dataset(patient_x_ray_path, name="full") random_index = random.randint(0, x_ray_ds['train'].shape[0] - 1) patient_x_ray_image = x_ray_ds['train'][random_index]['image'] classifier = pipeline(model="lxyuan/vit-xray-pneumonia-classification") patient_x_ray_results = classifier(patient_x_ray_image) # Find the label with the highest score and its score highest = max(patient_x_ray_results, key=lambda x: x['score']) highest_score_label = highest['label'] highest_score = highest['score'] * 100 # Convert to percentage # Choose the correct verb based on the label verb = "is" if highest_score_label == "NORMAL" else "has" return f"Patient {verb} {highest_score_label} with Probability of ca. {highest_score:.0f}%" # The agent state is the input to each node in the graph class AgentState(TypedDict): # The annotation tells the graph that new messages will always # be added to the current states initial_prompt : str messages: Annotated[List[BaseMessage], operator.add] patient_verification : str question_patient_symptoms: str examination_patient: str diagnosis_patient: str diagnosis : str pneumonia_detection: str def front_desk_agent(state, image_Path, selected_patient_data, updated_patient_df): initial_prompt = state["initial_prompt"] patient_verification = patient_verification_tool(image_Path, selected_patient_data, updated_patient_df) print(patient_verification) return {"patient_verification": patient_verification} def physician_agent(state, selected_patient_data): question_patient= question_patient_symptoms(selected_patient_data) examination = perform_examination(selected_patient_data) diagnosis_report, diagnosis = diagnose_patient(selected_patient_data) pneumonia_detection = examine_X_ray_image(patient_x_ray_path) return {"question_patient_symptoms": question_patient, "examination_patient": examination, "diagnosis_patient": diagnosis_report, "diagnosis": diagnosis} def radiologist_agent(state, patient_x_ray_path): pneumonia_detection = examine_X_ray_image(patient_x_ray_path) return {"pneumonia_detection": pneumonia_detection} def decide_on_radiologist(state): if state["diagnosis"] == 'Make X-Ray from Chest': return 'radiologist' else: return '' image_Path = "female.jpg" selected_patient_data = selected_patient.to_dict() updated_patient_df patient_x_ray_path = "keremberke/chest-xray-classification" front_desk_agent_node = functools.partial(front_desk_agent, image_Path = image_Path, selected_patient_data=selected_patient_data, updated_patient_df =updated_patient_df) physician_agent_node = functools.partial(physician_agent, selected_patient_data=selected_patient_data) radiologist_agent_node = functools.partial(radiologist_agent, patient_x_ray_path=patient_x_ray_path) def decide_on_radiologist(state): if state["diagnosis"] == 'Make X-Ray from Chest': return 'radiologist' else: return 'end' # 6. Set up the Langgraph state graph HospitalGraph = StateGraph(AgentState) # Define nodes for workflow HospitalGraph.add_node("front_desk_agent", front_desk_agent_node) HospitalGraph.add_node("physician_agent", physician_agent_node) HospitalGraph.add_node("radiologist_agent", radiologist_agent_node) HospitalGraph.add_edge(START, "front_desk_agent") HospitalGraph.add_edge("front_desk_agent", "physician_agent") HospitalGraph.add_conditional_edges("physician_agent", decide_on_radiologist, {'radiologist': "radiologist_agent", 'end': END}) # Initialize memory to persist state between graph runs HospitalWorkflow = HospitalGraph.compile() display(Image(HospitalWorkflow.get_graph(xray=True).draw_mermaid_png())) initial_prompt = "Start with the following Patient" # Run the workflow inputs = {"initial_prompt" : initial_prompt } output = HospitalWorkflow.invoke(inputs) output display(Markdown(output['patient_verification'])) display(Markdown(output['question_patient_symptoms'])) display(Markdown(output['examination_patient'])) display(Markdown(output['diagnosis_patient'])) display(Markdown(output['pneumonia_detection'])) # # Step 5: Gradio Dashboard # ## 5.1 Build the Hospital Dashboard APP # In[69]: x_ray_image_path = 'x-ray-chest.png' import gradio as gr info = ( f"**First Name:** {selected_patient_data['First_Name']}\n\n" f"**Last Name:** {selected_patient_data['Last_Name']}\n\n" f"**Patient ID:** {selected_patient_data['Patient_ID']}" ) def verify_age_gender(): """ Function to verify age and gender. """ # Placeholder logic: In a real scenario, perform necessary checks or computations initial_prompt = "You are Front Desk Administrator in an Hospital in the Netherlands. Start Verification of the following Patient:" inputs = {"initial_prompt" : initial_prompt } output = FrontDeskWorkflow.invoke(inputs) verification_message = '✅ ' + output['patient_verification'] return verification_message, gr.update(visible=True) def physician_examination(): initial_prompt = "You are a Very Experience Doctor in an Hospital in the Netherlands. Start a conversation with the patient and determine \ symptoms and give diagnosis" # Run the workflow inputs = {"initial_prompt" : initial_prompt } output = PhysicianWorkflow.invoke(inputs) output_all = f''' 🩺 {output['question_patient_symptoms']}\n 💓 {output['examination_patient']}\n 🌬️ {output['diagnosis_patient']}''' return output_all, gr.update(visible=True) def pneumonia_detection(): initial_prompt = "You are a Very Experienced Radiologist in an Hospital in the Netherlands. Diagnose if the patient has pneumonia" inputs = {"initial_prompt" : initial_prompt } output = RadiologistWorkflow.invoke(inputs) pneumonia_detection = 'From X-Ray Image 🖼️ ' + output['pneumonia_detection'] return pneumonia_detection def take_xray_image(): return gr.update(visible=True), gr.update(visible=True) with gr.Blocks() as demo: with gr.Row(): with gr.Column(scale=1): gr.Markdown(info) # Add a Button below the Markdown verify_button = gr.Button("Verify Age and Gender") # Add an output component to display verification status verification_output = gr.Textbox(label="Verification Status", interactive=False, lines=5, max_lines=None) # Add a Button below the Markdown physician_button = gr.Button("Get Examination at Physician", visible=False) physician_output = gr.Textbox(label="Examination by Physician Placeholder", interactive=False, lines=35, max_lines=None) x_ray_button = gr.Button("Take Chest X-Ray Image", visible=False) # Display X-Ray Image (Initially Hidden) xray_image_display = gr.Image(value=x_ray_image_path, label="X-Ray Image", visible=False) radiologist_button = gr.Button("Go to Radiologist", visible=False) # Add an output component to display verification status radiologist_output = gr.Textbox(label="Radiologist Placeholder", interactive=False, lines=5, max_lines=None) with gr.Column(scale=1): gr.Image(value=image_Path, label="Static Image", show_label=True) # Define the button's action: When clicked, call verify_age_gender and display the result verify_button.click(fn=verify_age_gender, inputs=None, outputs=[verification_output, physician_button]) physician_button.click(fn=physician_examination, inputs=None, outputs=[physician_output, x_ray_button]) x_ray_button.click(fn=take_xray_image, inputs=None, outputs=[xray_image_display, radiologist_button]) radiologist_button.click(fn=pneumonia_detection, inputs=None, outputs=[radiologist_output]) # ## 5.2 Run the App # Launch the app #demo.launch(share=True, debug=False) #demo.launch(share=True, debug=False, allowed_paths=[dataDir], ssr_mode=False) demo.launch(share=True, debug=False, ssr_mode=False)