"""Gradio app that screenshots a website and analyzes it for deceptive design patterns via OCR, a YOLO ensemble, and Gemini."""
| import os | |
| import shutil | |
| import tempfile | |
| import time | |
| import uuid | |
| from pathlib import Path | |
| import gradio as gr | |
| import pandas as pd | |
| import pybboxes as pbx | |
| from PIL import Image | |
| from huggingface_hub import CommitScheduler | |
| # Import our custom modules | |
| from py_files import yolo | |
| from py_files import dataset_upload | |
| from py_files.ocr import get_text_from_image_doc | |
# Global debug mode variables.
# When DEBUG_MODE is True, take_screenshot_and_process skips the real
# pipeline and replays the pre-loaded data below through the mock generator.
DEBUG_MODE = False
DEBUG_TABLE_DF = None  # pandas DataFrame of pre-analyzed results (see load_debug_table_data)
DEBUG_ORIGINAL_IMAGE = None  # file path of the pre-captured original screenshot
DEBUG_ANNOTATED_IMAGE = None  # file path of the pre-annotated screenshot
def take_screenshot_and_process(url, gemini_api_key):
    """
    Take a screenshot of the provided URL and process it for deceptive pattern detection.

    Generator: yields (dataframe, status_message, image_path, eval_dir_for_cleanup)
    tuples so the UI can render progress in real time. The caller owns
    eval_dir and should dispose of it via cleanup_temp_directory().

    Raises:
        gr.Error: on invalid input or any processing failure (after first
            yielding an error status so the UI can show it).
    """
    print(f"\n[CONSOLE] ===== STARTING ANALYSIS PROCESS =====")
    print(f"[CONSOLE] URL: {url}")
    print(f"[CONSOLE] Gemini API Key provided: {'Yes' if gemini_api_key else 'No'}")
    # Debug mode: replay pre-loaded data instead of running the real pipeline.
    if DEBUG_MODE:
        print(f"[CONSOLE] ===== DEBUG MODE ENABLED =====")
        print(f"[CONSOLE] [DEBUG MODE] Using pre-loaded debug data instead of actual analysis")
        eval_dir = tempfile.mkdtemp()
        print(f"[CONSOLE] [DEBUG MODE] Created temporary directory: {eval_dir}")
        yield from create_mock_analysis_with_debug_data(
            DEBUG_TABLE_DF,
            DEBUG_ORIGINAL_IMAGE,
            DEBUG_ANNOTATED_IMAGE,
            eval_dir
        )
        return
    # Normal mode - validate inputs before doing any work.
    if not url or not url.startswith(("http://", "https://")):
        print(f"[CONSOLE] ERROR: Invalid URL format - {url}")
        yield (None, "β Invalid URL format - please use http:// or https://", None, None)
        raise gr.Error("Please enter a valid URL (starting with http:// or https://).")
    if not gemini_api_key:
        print(f"[CONSOLE] ERROR: No Gemini API key provided")
        yield (None, "β No Gemini API key provided", None, None)
        raise gr.Error("Please provide a Gemini API Key.")
    # Downstream modules read the key from the environment.
    os.environ["GEMINI_API"] = gemini_api_key
    print(f"[CONSOLE] Gemini API key set in environment")
    eval_dir = tempfile.mkdtemp()
    print(f"[CONSOLE] Created temporary directory: {eval_dir}")
    try:
        # Step 1: Taking screenshot
        print(f"[CONSOLE] STEP 1/6: Taking screenshot of the website...")
        yield (None, "Step 1/6: Taking screenshot of the website...", None, eval_dir)
        screenshots_dir = os.path.join(eval_dir, "screenshots")
        ocr_dir = os.path.join(eval_dir, "ocr")
        yolo_dir = os.path.join(eval_dir, "yolo")
        csv_yolo_dir = os.path.join(eval_dir, "csv_with_yolo")
        gemini_fs_dir = os.path.join(eval_dir, "gemini_fs")
        for d in [screenshots_dir, ocr_dir, yolo_dir, csv_yolo_dir, gemini_fs_dir]:
            os.makedirs(d, exist_ok=True)
            print(f"[CONSOLE] Created directory: {d}")
        # Take screenshot using Selenium
        image_path = os.path.join(screenshots_dir, "screenshot.png")
        print(f"[CONSOLE] Taking screenshot, saving to: {image_path}")
        image = take_website_screenshot(url, image_path)
        print(f"[CONSOLE] Screenshot completed successfully")
        # Display the original screenshot immediately
        print(f"[CONSOLE] Displaying original screenshot")
        yield (None, "π· Screenshot captured! Starting analysis...", image_path, eval_dir)
        # Step 2: Setup directories (directories already created above; this
        # step exists so the UI progress matches the 6-step pipeline)
        print(f"[CONSOLE] STEP 2/6: Setting up processing directories...")
        yield (None, "Step 2/6: Setting up processing directories...", image_path, eval_dir)
        # Step 3: Run OCR
        print(f"[CONSOLE] STEP 3/6: Running OCR analysis...")
        yield (None, "Step 3/6: Running OCR analysis...", image_path, eval_dir)
        csv_path = os.path.join(ocr_dir, "screenshot.csv")
        print(f"[CONSOLE] Running OCR on image...")
        ocr_result = get_text_from_image_doc(image)[0]
        ocr_df = ocr_result.get_dataframe(image)
        ocr_df.to_csv(csv_path, index=False)
        print(f"[CONSOLE] OCR completed, saved to: {csv_path}")
        print(f"[CONSOLE] OCR found {len(ocr_df)} text elements")
        # Step 4: Run YOLO object detection
        print(f"[CONSOLE] STEP 4/6: Running YOLO object detection...")
        yield (None, "Step 4/6: Running YOLO object detection...", image_path, eval_dir)
        yolo_result_path = os.path.join(yolo_dir, "screenshot.txt")
        print(f"[CONSOLE] Loading YOLO ensemble models...")
        models = yolo.YoloEnsemble(weights=["models/vision/16.pt", "models/vision/15.pt", "models/vision/14.pt"])
        print(f"[CONSOLE] Running YOLO prediction with confidence threshold 0.3...")
        results = models.predict(image_path, conf=0.3, verbose=True)
        if results[0].boxes is None:
            print(f"[CONSOLE] YOLO: No objects detected")
            # Write an empty file so the combine step sees "no detections"
            with open(yolo_result_path, 'w') as f:
                f.write("")
        else:
            print(f"[CONSOLE] YOLO: {len(results[0].boxes)} objects detected")
            results[0].save_txt(yolo_result_path)
        print(f"[CONSOLE] YOLO results saved to: {yolo_result_path}")
        # Step 5: Combine OCR and YOLO results
        print(f"[CONSOLE] STEP 5/6: Combining OCR and element detection results...")
        yield (None, "Step 5/6: Combining OCR and element detection results...", image_path, eval_dir)
        combined_csv_path = os.path.join(csv_yolo_dir, "screenshot.csv")
        print(f"[CONSOLE] Combining OCR and YOLO results...")
        combined_df = combine_ocr_yolo_results_original(ocr_df, yolo_result_path, image)
        combined_df.to_csv(combined_csv_path, index=False)
        print(f"[CONSOLE] Combined results saved to: {combined_csv_path}")
        print(f"[CONSOLE] Combined dataframe has {len(combined_df)} rows")
        # Step 6: Analyze with Gemini
        print(f"[CONSOLE] STEP 6/6: Analyzing for deceptive patterns with Gemini...")
        yield (None, "Step 6/6: Analyzing for deceptive patterns with Gemini...", image_path, eval_dir)
        # Generator version so notifications stream to the UI in real time
        from py_files.gemini_analysis import few_shots_generator
        yield (None, "π§ Preparing data for Gemini analysis...", image_path, eval_dir)
        os.makedirs(gemini_fs_dir, exist_ok=True)
        print(f"[CONSOLE] Running Gemini few_shots analysis...")
        print(f"[CONSOLE] Input file: {combined_csv_path}")
        yield (None, f"π Processing {len(combined_df)} UI elements for deceptive pattern analysis...", image_path, eval_dir)
        final_df = None
        try:
            for status, data in few_shots_generator(eval_dir=eval_dir, files=[combined_csv_path], api_key=gemini_api_key):
                if status == 'notification':
                    # Forward progress notifications to the UI immediately
                    yield None, data, image_path, eval_dir
                elif status == 'result':
                    final_df = data
                    break
            print(f"[CONSOLE] Gemini analysis completed")
        except gr.Error:
            # gr.Error is meant for the UI - let it propagate
            print(f"[CONSOLE] Gemini analysis raised gr.Error, propagating...")
            raise
        except Exception as gemini_error:
            # Any other Gemini failure: report it and fall through with
            # final_df still None so the failure branch below handles it
            print(f"[CONSOLE] Unexpected error in Gemini analysis: {str(gemini_error)}")
            error_msg = f"β Gemini analysis failed: {str(gemini_error)}"
            yield (None, error_msg, image_path, eval_dir)
        if final_df is not None:
            print(f"[CONSOLE] Final analysis result: {len(final_df)} rows detected")
            deceptive_count = len(final_df[final_df['Deceptive Design Category'].str.lower() != 'non-deceptive']) if 'Deceptive Design Category' in final_df.columns else 0
            total_count = len(final_df)
            yield (None, f"π Analysis complete! Found {deceptive_count} deceptive patterns out of {total_count} UI elements", image_path, eval_dir)
            yield (None, "π¨ Creating annotated screenshot with colored highlights...", image_path, eval_dir)
            annotated_path = create_annotated_screenshot(image_path, final_df, eval_dir)
            print(f"[CONSOLE] Annotated screenshot created at: {annotated_path}")
            # create_annotated_screenshot returns the original path when
            # annotation itself failed - report that to the user
            status_message = "β Analysis complete! All elements annotated with colored bounding boxes."
            if annotated_path == image_path:
                status_message = "β Analysis complete! (Note: Screenshot annotation failed, showing original)"
            yield (final_df, status_message, annotated_path, eval_dir)
        else:
            # FIX: previously this path yielded two competing error statuses
            # (one keeping the screenshot, a second blanking it); emit a
            # single clear failure message and keep the screenshot visible.
            print(f"[CONSOLE] Gemini analysis failed completely")
            yield (None, "β Gemini analysis failed - please check your API key and try again", image_path, eval_dir)
        print(f"[CONSOLE] ===== ANALYSIS PROCESS COMPLETED =====")
    except Exception as e:
        print(f"[CONSOLE] ERROR in take_screenshot_and_process: {str(e)}")
        print(f"[CONSOLE] Exception type: {type(e).__name__}")
        # Surface the error in the UI before raising for Gradio's handler
        error_msg = f"β Error occurred: {str(e)}"
        yield (None, error_msg, None, eval_dir)
        raise gr.Error(f"Error processing website: {str(e)}")
def cleanup_temp_directory(eval_dir):
    """
    Clean up temporary files after image has been displayed to the frontend.
    This should be called after the UI has had time to display the image.
    """
    if not eval_dir:
        return
    try:
        print(f"[CONSOLE] Cleaning up temporary directory: {eval_dir}")
        if not os.path.exists(eval_dir):
            print(f"[CONSOLE] Temp directory {eval_dir} does not exist or was already cleaned up")
            return
        shutil.rmtree(eval_dir)
        print(f"[CONSOLE] Cleanup completed successfully")
    except Exception as cleanup_error:
        print(f"[CONSOLE] WARNING: Failed to cleanup temp directory: {cleanup_error}")
        # rmtree failed - fall back to removing entries one at a time.
        try:
            if eval_dir and os.path.exists(eval_dir):
                # First pass: delete every regular file we can reach.
                for parent, _subdirs, filenames in os.walk(eval_dir):
                    for filename in filenames:
                        try:
                            os.remove(os.path.join(parent, filename))
                            print(f"[CONSOLE] Removed individual file: {filename}")
                        except Exception as file_error:
                            print(f"[CONSOLE] Failed to remove file {filename}: {file_error}")
                # Second pass: remove now-empty directories bottom-up.
                for parent, subdirs, _filenames in os.walk(eval_dir, topdown=False):
                    for subdir in subdirs:
                        try:
                            os.rmdir(os.path.join(parent, subdir))
                        except Exception:
                            pass
                # Finally remove the root directory itself.
                os.rmdir(eval_dir)
                print(f"[CONSOLE] Manual cleanup completed")
        except Exception as manual_cleanup_error:
            print(f"[CONSOLE] ERROR: Complete cleanup failure: {manual_cleanup_error}")
            print(f"[CONSOLE] Temp directory may not be fully cleaned: {eval_dir}")
def take_website_screenshot(url, output_path):
    """
    Take a screenshot of a website using Selenium WebDriver.

    Args:
        url: page to capture.
        output_path: filesystem path where the PNG screenshot is written.

    Returns:
        The captured screenshot loaded as a PIL Image.

    Raises:
        Exception: wrapping any WebDriver failure as "Screenshot failed: ...".
    """
    print(f"[CONSOLE] take_website_screenshot: Starting selenium screenshot capture for {url}")
    print(f"[CONSOLE] Output path: {output_path}")
    from selenium import webdriver
    from selenium.webdriver.chrome.options import Options
    import time
    try:
        # Headless Chrome at a fixed viewport so screenshots are reproducible
        print(f"[CONSOLE] Setting up Chrome WebDriver in headless mode...")
        chrome_options = Options()
        chrome_options.add_argument("--headless")
        chrome_options.add_argument("--no-sandbox")
        chrome_options.add_argument("--disable-dev-shm-usage")
        chrome_options.add_argument("--window-size=1280,1024")
        print(f"[CONSOLE] Creating Chrome WebDriver instance...")
        # Force a light color scheme so captures are consistent regardless of
        # the host's dark-mode preference.
        css_to_inject = ":root { color-scheme: only light; }"
        javascript_code = """
        var style = document.createElement('style');
        style.type = 'text/css';
        style.innerHTML = arguments[0];
        document.head.appendChild(style);
        """
        driver = webdriver.Chrome(options=chrome_options)
        driver.set_window_size(1280, 1024)
        try:
            driver.set_page_load_timeout(30)
            print(f"[CONSOLE] Navigating to URL: {url}")
            driver.get(url)
            # BUGFIX: inject the CSS *after* navigation. The previous code
            # injected it into the initial about:blank page, and driver.get()
            # discards any DOM changes made there, making the injection a no-op.
            driver.execute_script(javascript_code, css_to_inject)
            # Give the page time to render before capturing
            print(f"[CONSOLE] Waiting for page to load... (5 secs)")
            time.sleep(5)
            print(f"[CONSOLE] Taking screenshot...")
            driver.save_screenshot(output_path)
            print(f"[CONSOLE] Screenshot saved to: {output_path}")
            image = Image.open(output_path)
            print(f"[CONSOLE] Screenshot completed successfully, image size: {image.size}")
            return image
        finally:
            # Always release the browser, even on failure
            print(f"[CONSOLE] Closing WebDriver...")
            driver.quit()
    except Exception as e:
        print(f"[CONSOLE] Exception in selenium screenshot: {str(e)}")
        print(f"[CONSOLE] Exception type: {type(e).__name__}")
        # Preserve the original traceback for debugging
        raise Exception(f"Screenshot failed: {str(e)}") from e
def combine_ocr_yolo_results_original(ocr_df, yolo_result_path, image):
    """
    Combine OCR results with YOLO detection results using the original logic.

    Adds an 'Element Type' column to ocr_df: each OCR text row is labeled
    'text' unless it can be matched to a YOLO-detected UI element (button,
    checkbox, radio button, switch).

    Args:
        ocr_df: OCR results; must contain 'Top Co-ordinates' and
            'Bottom Co-ordinates' columns formatted like "(x, y)" for
            matching to work (otherwise everything is labeled 'text').
        yolo_result_path: path to a space-separated YOLO label file.
        image: PIL-style image object; only .size is read here.

    Returns:
        ocr_df with the 'Element Type' column inserted as the second column.
    """
    W, H = image.size
    # No YOLO file (or an empty one) means no detections: label all as text.
    if not os.path.exists(yolo_result_path) or os.path.getsize(yolo_result_path) == 0:
        ocr_df['Element Type'] = 'text'
        return ocr_df
    # NOTE(review): the columns are named x1..y2 but at this point hold YOLO
    # format values (center-x, center-y, width, height, normalized) - the
    # pbx conversion below turns them into pixel corner coordinates (voc).
    yolo_df = pd.read_csv(yolo_result_path, sep=" ", names=["class", "x1", "y1", "x2", "y2"])
    # Convert each normalized YOLO box to absolute voc pixel coordinates.
    for j in range(len(yolo_df)):
        scaled = pbx.convert_bbox(
            [yolo_df.iloc[j]['x1'], yolo_df.iloc[j]['y1'], yolo_df.iloc[j]['x2'], yolo_df.iloc[j]['y2']],
            from_type="yolo", to_type="voc", image_size=(W, H)
        )
        yolo_df.iat[j, 1], yolo_df.iat[j, 2], yolo_df.iat[j, 3], yolo_df.iat[j, 4] = scaled
    # YOLO class id -> human-readable UI element type.
    cls_dict = {
        0: "button", 1: "checked checkbox", 2: "unchecked checkbox",
        3: "checked radio button", 4: "unchecked radio button",
        5: "checked switch", 6: "unchecked switch"
    }
    # Without coordinate columns we cannot match detections to text rows.
    if 'Top Co-ordinates' not in ocr_df.columns or 'Bottom Co-ordinates' not in ocr_df.columns:
        ocr_df['Element Type'] = 'text'
        return ocr_df
    # Flatten "(x1, y1)" + "(x2, y2)" into one "x1, y1, x2, y2" string per row.
    ocr_df['Coordinates'] = (
        ocr_df['Top Co-ordinates'].astype(str).str.replace('(', '', regex=False).str.replace(')', '', regex=False) + ', ' +
        ocr_df['Bottom Co-ordinates'].astype(str).str.replace('(', '', regex=False).str.replace(')', '', regex=False)
    )
    ele_types = ["text"] * len(ocr_df)
    bboxes = yolo_df[['x1', 'y1', 'x2', 'y2']].values.tolist()
    clss = yolo_df['class'].tolist()
    # NOTE(review): Series.tolist() always returns a list, so this guard
    # appears to be dead code kept from the original implementation.
    if not isinstance(clss, list):
        clss = [clss]
    coords = ocr_df['Coordinates'].tolist()
    # Match each YOLO detection against the OCR text boxes.
    for ele_cls, ele_rect in zip(clss, bboxes):
        distance_dict = {}  # horizontal gap -> OCR row index (checkbox/radio candidates)
        for ci, coord in enumerate(coords):
            try:
                rect_text = list(map(float, coord.split(',')))
            except (ValueError, AttributeError):
                continue  # Skip if coordinate string is invalid
            if ele_cls == 0:  # button: first overlapping text box wins
                if yolo.do_rectangles_overlap(ele_rect, rect_text):
                    ele_types[ci] = cls_dict[ele_cls]
                    break
            elif ele_cls in [1, 2, 3, 4]:  # checkbox or radio: label sits to the right
                e_y1, e_y2 = ele_rect[1], ele_rect[3]
                r_y1, r_y2 = rect_text[1], rect_text[3]
                text_mid_y = (r_y1 + r_y2) / 2
                # Candidate if vertically centered on the control, to its right,
                # and within 100px horizontally.
                if e_y1 < text_mid_y < e_y2 and rect_text[0] > ele_rect[0] and rect_text[0] - ele_rect[2] < 100:
                    distance_dict[rect_text[0] - ele_rect[2]] = ci
        # For non-button elements, assign the type to the nearest candidate text.
        if ele_cls > 0 and len(distance_dict) > 0:
            ele_types[sorted(distance_dict.items(), key=lambda x: x[0])[0][1]] = cls_dict[ele_cls]
    ocr_df['Element Type'] = ele_types
    ocr_df = ocr_df.drop(columns=['Coordinates'])
    # Reorder columns so 'Element Type' comes right after the first column.
    cols = ocr_df.columns.tolist()
    cols = cols[:1] + cols[-1:] + cols[1:-1]
    ocr_df = ocr_df[cols]
    return ocr_df
def create_result_display(df):
    """
    Build a short HTML summary (totals of deceptive vs non-deceptive
    elements) for the analysis results dataframe.
    """
    if df is None or df.empty:
        return "No results to display."
    if 'Deceptive Design Category' not in df.columns:
        return "Analysis completed, but results format is unexpected."
    # Anything not explicitly labeled 'non-deceptive' counts as deceptive.
    total = len(df)
    flagged = len(df[df['Deceptive Design Category'].str.lower() != 'non-deceptive'])
    return f"""
    <div style="padding: 20px; border: 1px solid var(--border-color-primary); border-radius: 8px; background-color: var(--block-background-fill); color: var(--body-text-color);">
        <h3 style="color: var(--body-text-color); margin-top: 0;">Analysis Results</h3>
        <p style="color: var(--body-text-color);"><strong>Total elements analyzed:</strong> {total}</p>
        <p style="color: var(--body-text-color);"><strong>Potentially deceptive elements:</strong> {flagged}</p>
        <p style="color: var(--body-text-color);"><strong>Non-deceptive elements:</strong> {total - flagged}</p>
    </div>
    """
def create_annotated_screenshot(image_path, df, eval_dir=None):
    """
    Create an annotated copy of the screenshot with a colored bounding box
    and label drawn for every analyzed UI element.

    Args:
        image_path: path of the original screenshot PNG.
        df: analysis results; rows are read for coordinates (three column
            conventions supported) plus 'Deceptive Design Category' and
            'Deceptive Design Subtype'.
        eval_dir: optional managed temp directory to save the result into;
            when omitted a system temp file is used instead.

    Returns:
        Path to the annotated PNG, or image_path unchanged if annotation
        fails for any reason.
    """
    from PIL import Image, ImageDraw, ImageFont
    import tempfile
    print(f"[CONSOLE] Creating annotated screenshot from: {image_path}")
    try:
        image = Image.open(image_path)
        annotated_image = image.copy()
        draw = ImageDraw.Draw(annotated_image)
        # Define colors for different deceptive pattern categories
        color_map = {
            'forced-action': '#FF0000',  # Red
            'interface-interference': '#FF8C00',  # Dark Orange
            'obstruction': '#800080',  # Purple
            'sneaking': '#FF1493',  # Deep Pink
            'confirmshaming': '#FF4500',  # Orange Red
            'nudge': '#32CD32',  # Lime Green
            'fake-scarcity-fake-urgency': '#FFD700',  # Gold
            'hard-to-cancel': '#DC143C',  # Crimson
            'pre-selection': '#8A2BE2',  # Blue Violet
            'visual-interference': '#FF6347',  # Tomato
            'jargon': '#4169E1',  # Royal Blue
            'hidden-subscription': '#B22222',  # Fire Brick
            'hidden-costs': '#CD5C5C',  # Indian Red
            'disguised-ads': '#FF69B4',  # Hot Pink
            'trick-wording': '#FF7F50',  # Coral
            'non-deceptive': '#90EE90'  # Light Green (for non-deceptive elements)
        }
        default_color = '#FFFF00'  # Yellow, for unknown categories
        # Prefer a readable truetype font; fall back to PIL's built-in one.
        # (FIX: the bare `except:` clauses were narrowed to `except Exception:`.)
        try:
            font = ImageFont.truetype("arial.ttf", 18)
        except Exception:
            try:
                font = ImageFont.load_default().font_variant(size=18)
            except Exception:
                font = ImageFont.load_default()
        deceptive_count = 0
        non_deceptive_count = 0
        # Label rectangles already drawn, used to avoid overlapping labels
        used_text_regions = []

        # FIX: hoisted out of the per-row loop - the original redefined this
        # closure on every iteration.
        def check_overlap(x, y, w, h, used_regions):
            """Return True if the rect (x, y, w, h) intersects any used region."""
            new_rect = (x, y, x + w, y + h)
            for used_rect in used_regions:
                if not (new_rect[2] < used_rect[0] or new_rect[0] > used_rect[2] or
                        new_rect[3] < used_rect[1] or new_rect[1] > used_rect[3]):
                    return True
            return False

        # FIX: loop-invariant column check hoisted out of the row loop.
        if 'Deceptive Design Category' in df.columns:
            for idx, row in df.iterrows():
                category = str(row.get('Deceptive Design Category', '')).lower().strip()
                subtype = str(row.get('Deceptive Design Subtype', '')).lower().strip()
                # Count deceptive vs non-deceptive elements
                if category == 'non-deceptive' or category == 'not-applicable':
                    non_deceptive_count += 1
                else:
                    deceptive_count += 1
                # Get bounding box coordinates, trying three column conventions
                x1, y1, x2, y2 = None, None, None, None
                # Method 1: 'Top Co-ordinates' / 'Bottom Co-ordinates' columns
                try:
                    top_coords = row.get('Top Co-ordinates')
                    bottom_coords = row.get('Bottom Co-ordinates')
                    if top_coords is not None and bottom_coords is not None:
                        # Parse tuple strings like "(10, 20)" or tuple objects
                        if isinstance(top_coords, str):
                            top_coords = top_coords.strip('()')
                            x1, y1 = map(float, top_coords.split(','))
                        elif isinstance(top_coords, (tuple, list)):
                            x1, y1 = float(top_coords[0]), float(top_coords[1])
                        if isinstance(bottom_coords, str):
                            bottom_coords = bottom_coords.strip('()')
                            x2, y2 = map(float, bottom_coords.split(','))
                        elif isinstance(bottom_coords, (tuple, list)):
                            x2, y2 = float(bottom_coords[0]), float(bottom_coords[1])
                except (ValueError, TypeError, AttributeError):
                    # Method 2: direct coordinate columns (x1, y1, x2, y2)
                    try:
                        x1 = float(row.get('x1', 0))
                        y1 = float(row.get('y1', 0))
                        x2 = float(row.get('x2', 0))
                        y2 = float(row.get('y2', 0))
                    except (ValueError, TypeError):
                        # Method 3: alternative column names (X1, Y1, X2, Y2)
                        try:
                            x1 = float(row.get('X1', 0))
                            y1 = float(row.get('Y1', 0))
                            x2 = float(row.get('X2', 0))
                            y2 = float(row.get('Y2', 0))
                        except (ValueError, TypeError):
                            print(f"[CONSOLE] Warning: Could not extract coordinates for row {idx}")
                            continue
                # Validate that all coordinates were successfully extracted
                if any(coord is None for coord in [x1, y1, x2, y2]):
                    print(f"[CONSOLE] Warning: Missing coordinates for row {idx}")
                    continue
                # Clamp coordinates to the image bounds
                x1 = max(0, min(x1, image.width))
                x2 = max(0, min(x2, image.width))
                y1 = max(0, min(y1, image.height))
                y2 = max(0, min(y2, image.height))
                # Normalize corner ordering
                if x1 > x2:
                    x1, x2 = x2, x1
                if y1 > y2:
                    y1, y2 = y2, y1
                # Skip if box is too small or invalid
                if (x2 - x1) < 5 or (y2 - y1) < 5:
                    continue
                # Choose color based on category or subtype
                color = color_map.get(category, color_map.get(subtype, default_color))
                draw.rectangle([x1, y1, x2, y2], outline=color, width=2)
                # Label: "category" or "category: subtype"
                text = f"{category}"
                if subtype and subtype != 'not-applicable' and subtype != 'n/a':
                    text = f"{category}: {subtype}"
                text_bbox = draw.textbbox((0, 0), text, font=font)
                text_width = text_bbox[2] - text_bbox[0]
                text_height = text_bbox[3] - text_bbox[1]
                # Try placements in order until one fits without overlapping
                text_x = x1
                text_y = None
                padding = 4
                # Position 1: Above the bounding box
                candidate_y = y1 - text_height - padding
                if candidate_y >= 0:  # Within image bounds
                    if text_x + text_width > image.width:
                        text_x = image.width - text_width
                    if text_x < 0:
                        text_x = 0
                    if not check_overlap(text_x, candidate_y, text_width, text_height, used_text_regions):
                        text_y = candidate_y
                # Position 2: Below the bounding box
                if text_y is None:
                    candidate_y = y2 + padding
                    if candidate_y + text_height <= image.height:  # Within image bounds
                        text_x = x1
                        if text_x + text_width > image.width:
                            text_x = image.width - text_width
                        if text_x < 0:
                            text_x = 0
                        if not check_overlap(text_x, candidate_y, text_width, text_height, used_text_regions):
                            text_y = candidate_y
                # Position 3: To the right of the bounding box
                if text_y is None:
                    candidate_x = x2 + padding
                    if candidate_x + text_width <= image.width:  # Within image bounds
                        candidate_y = y1
                        if candidate_y + text_height > image.height:
                            candidate_y = image.height - text_height
                        if candidate_y < 0:
                            candidate_y = 0
                        if not check_overlap(candidate_x, candidate_y, text_width, text_height, used_text_regions):
                            text_x = candidate_x
                            text_y = candidate_y
                # Position 4: To the left of the bounding box
                if text_y is None:
                    candidate_x = x1 - text_width - padding
                    if candidate_x >= 0:  # Within image bounds
                        candidate_y = y1
                        if candidate_y + text_height > image.height:
                            candidate_y = image.height - text_height
                        if candidate_y < 0:
                            candidate_y = 0
                        if not check_overlap(candidate_x, candidate_y, text_width, text_height, used_text_regions):
                            text_x = candidate_x
                            text_y = candidate_y
                # Position 5: Grid-scan the image for any free spot (fallback)
                if text_y is None:
                    step_size = 20
                    found = False
                    for scan_y in range(0, image.height - text_height, step_size):
                        if found:
                            break
                        for scan_x in range(0, image.width - text_width, step_size):
                            if not check_overlap(scan_x, scan_y, text_width, text_height, used_text_regions):
                                text_x = scan_x
                                text_y = scan_y
                                found = True
                                break
                # Last resort: place at top-left corner (may overlap)
                if text_y is None:
                    text_x = 0
                    text_y = 0
                # Draw label background, then the label text itself
                draw.rectangle([text_x, text_y, text_x + text_width, text_y + text_height],
                               fill=color, outline=color)
                draw.text((text_x, text_y), text, fill='white', font=font)
                # Reserve this region so later labels avoid it
                used_text_regions.append((text_x, text_y, text_x + text_width, text_y + text_height))
        print(f"[CONSOLE] Annotated screenshot created with {deceptive_count} deceptive patterns and {non_deceptive_count} non-deceptive elements highlighted")
        # Save annotated image
        if eval_dir:
            # Into the managed temp directory that will be cleaned up later
            temp_filename = os.path.join(eval_dir, "annotated_screenshot.png")
            annotated_image.save(temp_filename)
            return temp_filename
        else:
            # Fallback to a system temp file (caller is responsible for it)
            temp_file = tempfile.NamedTemporaryFile(suffix='.png', delete=False)
            annotated_image.save(temp_file.name)
            return temp_file.name
    except Exception as e:
        print(f"[CONSOLE] Error creating annotated screenshot: {e}")
        print(f"[CONSOLE] Falling back to original image: {image_path}")
        # Return the original image path as fallback
        return image_path
def load_debug_table_data(repo_id, split_name):
    """Load pre-analyzed table from HuggingFace dataset.

    Returns a DataFrame with the standard result columns. On any failure
    (including the `datasets` package being unavailable) a small fallback
    table with the SAME column layout is returned instead of raising.
    """
    print(f"[CONSOLE] [DEBUG MODE] Loading table data from repo: {repo_id}, split: {split_name}")
    columns = ["Text", "Element Type", "Top Co-ordinates", "Bottom Co-ordinates", "Font Size", "Background Color", "Font Color", "Deceptive Design Category", "Deceptive Design Subtype", "Reasoning"]
    try:
        # FIX: import moved inside the try so a missing `datasets` package
        # degrades to the fallback table instead of crashing.
        from datasets import load_dataset
        dataset = load_dataset(repo_id, split=split_name)
        df = dataset.to_pandas()
        df = df[columns]
        print(f"[CONSOLE] [DEBUG MODE] Loaded table with {len(df)} rows")
        return df
    except Exception as e:
        print(f"[CONSOLE] [DEBUG MODE] Error loading table data: {e}")
        # FIX: the fallback now carries every column the success path
        # returns, so downstream consumers see a consistent schema.
        return pd.DataFrame({
            'Text': ['Sample Button', 'Sample Checkbox'],
            'Element Type': ['button', 'checked checkbox'],
            'Top Co-ordinates': ['(100, 100)', '(200, 200)'],
            'Bottom Co-ordinates': ['(200, 150)', '(250, 230)'],
            'Font Size': ['12', '12'],
            'Background Color': ['#FFFFFF', '#FFFFFF'],
            'Font Color': ['#000000', '#000000'],
            'Deceptive Design Category': ['forced-action', 'non-deceptive'],
            'Deceptive Design Subtype': ['obstruction', 'not-applicable'],
            'Reasoning': ['debug fallback row', 'debug fallback row']
        })[columns]
def load_debug_images(repo_id, image_id):
    """Load original and annotated images from HuggingFace dataset.

    Returns:
        (original_path, annotated_path) tuple of temp-file paths; either is
        None when the record, the image, or the `datasets` package is
        unavailable. Callers own the temp files.
    """
    print(f"[CONSOLE] [DEBUG MODE] Loading images from repo: {repo_id}, image_id: {image_id}")

    def _save_image_to_temp(pil_image, label):
        # Helper: persist one image to a fresh temp PNG; returns its path.
        # FIX: the temp file is only created once we know the object is
        # saveable - the old code leaked an empty NamedTemporaryFile when
        # the record held a non-image value.
        if not pil_image or not hasattr(pil_image, 'save'):
            return None
        tmp = tempfile.NamedTemporaryFile(suffix='.png', delete=False)
        tmp.close()  # release the handle so save() can rewrite it on all platforms
        pil_image.save(tmp.name)
        print(f"[CONSOLE] [DEBUG MODE] {label} image saved to: {tmp.name}")
        return tmp.name

    try:
        # FIX: import moved inside the try so a missing `datasets` package
        # is handled like any other load failure (returns (None, None)).
        from datasets import load_dataset
        dataset = load_dataset(repo_id, split='train')
        # Find the record with matching ID
        for record in dataset:
            if record.get('id') == image_id:
                original_path = _save_image_to_temp(record.get('image'), 'Original')
                annotated_path = _save_image_to_temp(record.get('annotated'), 'Annotated')
                return original_path, annotated_path
        print(f"[CONSOLE] [DEBUG MODE] Image ID '{image_id}' not found in dataset")
        return None, None
    except Exception as e:
        print(f"[CONSOLE] [DEBUG MODE] Error loading images: {e}")
        return None, None
def create_mock_analysis_with_debug_data(debug_table_df, debug_original_image, debug_annotated_image, eval_dir):
    """
    Simulate the analysis pipeline using pre-loaded debug data.

    Yields (dataframe, status_message, image_path, eval_dir) progress tuples
    with the same messages, ordering, and artificial delays as the real
    pipeline, so the UI behaves identically in debug mode.

    Args:
        debug_table_df: Pre-loaded results DataFrame, or None.
        debug_original_image: Path to the pre-loaded original screenshot, or None.
        debug_annotated_image: Path to the pre-loaded annotated screenshot, or None.
        eval_dir: Temporary working directory for this mock run.
    """
    print("[CONSOLE] [DEBUG MODE] Starting mock analysis pipeline")
    try:
        # Recreate the directory layout the real pipeline expects.
        layout = {
            name: os.path.join(eval_dir, name)
            for name in ("screenshots", "ocr", "yolo", "csv_with_yolo", "gemini_fs")
        }
        for directory in layout.values():
            os.makedirs(directory, exist_ok=True)
        # Step 1: mock screenshot capture.
        print("[CONSOLE] [DEBUG MODE] STEP 1/6: Mock screenshot capture")
        yield (None, "Step 1/6: Taking screenshot of the website...", None, eval_dir)
        time.sleep(2)
        screenshot_path = os.path.join(layout["screenshots"], "screenshot.png")
        if debug_original_image and os.path.exists(debug_original_image):
            shutil.copy(debug_original_image, screenshot_path)
            print(f"[CONSOLE] [DEBUG MODE] Copied original image to: {screenshot_path}")
        yield (None, "π· Screenshot captured! Starting analysis...", screenshot_path, eval_dir)
        # Steps 2-5 all share the same shape: log, announce, pause, and
        # optionally log completion — so they are table-driven here.
        intermediate_steps = (
            ("STEP 2/6: Setting up directories", "Step 2/6: Setting up processing directories...", 0.5, None),
            ("STEP 3/6: Mock OCR analysis", "Step 3/6: Running OCR analysis...", 0.2, "Mock OCR completed"),
            ("STEP 4/6: Mock YOLO detection", "Step 4/6: Running YOLO object detection...", 1, "Mock YOLO completed"),
            ("STEP 5/6: Mock combining results", "Step 5/6: Combining OCR and element detection results...", 0.3, "Mock combining completed"),
        )
        for console_msg, status_msg, delay, done_msg in intermediate_steps:
            print(f"[CONSOLE] [DEBUG MODE] {console_msg}")
            yield (None, status_msg, screenshot_path, eval_dir)
            time.sleep(delay)
            if done_msg is not None:
                print(f"[CONSOLE] [DEBUG MODE] {done_msg}")
        # Step 6: mock Gemini analysis.
        print("[CONSOLE] [DEBUG MODE] STEP 6/6: Mock Gemini analysis")
        yield (None, "Step 6/6: Analyzing for deceptive patterns with Gemini...", screenshot_path, eval_dir)
        yield (None, "π§ Preparing data for Gemini analysis...", screenshot_path, eval_dir)
        total_elements = 0 if debug_table_df is None else len(debug_table_df)
        yield (None, f"π Processing {total_elements} UI elements for deceptive pattern analysis...", screenshot_path, eval_dir)
        time.sleep(0.4)
        print("[CONSOLE] [DEBUG MODE] Mock Gemini analysis completed")
        # Count every row whose category is anything other than "non-deceptive".
        if debug_table_df is not None and 'Deceptive Design Category' in debug_table_df.columns:
            categories = debug_table_df['Deceptive Design Category'].str.lower()
            deceptive_count = int((categories != 'non-deceptive').sum())
        else:
            deceptive_count = 0
        yield (None, f"π Analysis complete! Found {deceptive_count} deceptive patterns out of {total_elements} UI elements", screenshot_path, eval_dir)
        yield (None, "π¨ Creating annotated screenshot with colored highlights...", screenshot_path, eval_dir)
        # Prefer the pre-annotated debug image; fall back to the raw screenshot.
        annotated_path = screenshot_path
        if debug_annotated_image and os.path.exists(debug_annotated_image):
            annotated_path = os.path.join(eval_dir, "annotated_screenshot.png")
            shutil.copy(debug_annotated_image, annotated_path)
            print(f"[CONSOLE] [DEBUG MODE] Copied annotated image to: {annotated_path}")
        status_message = "β Analysis complete! All elements annotated with colored bounding boxes."
        yield (debug_table_df, status_message, annotated_path, eval_dir)
        print("[CONSOLE] [DEBUG MODE] Mock analysis pipeline completed successfully")
    except Exception as e:
        print(f"[CONSOLE] [DEBUG MODE] Error in mock analysis: {str(e)}")
        yield (None, f"β Error in debug mode: {str(e)}", None, eval_dir)
        raise gr.Error(f"Debug mode error: {str(e)}")
| # Create the Gradio interface | |
def create_interface():
    """Build and return the Gradio Blocks UI for the deceptive pattern detector.

    Layout: header HTML, Privacy/How-to tabs, a two-column row (inputs +
    status on the left, screenshot preview on the right), a full-width
    results dataframe, and a CSV download button. Event wiring happens
    further down in this function.
    """
    # NOTE(review): these module-level names are only read by the nested
    # handlers, so the `global` declaration is not strictly required — kept
    # as-is. They are created at module import time (CommitScheduler setup).
    global scheduler, dataset_dir, jsonl_path
    with gr.Blocks(title="Deceptive Pattern Detector", theme=gr.themes.Soft()) as demo:
        # Page header: title plus a pill-style link to the arXiv paper.
        gr.HTML("""
<div style="text-align: center; margin-bottom: 30px;">
<h1>π Deceptive Pattern Detector</h1>
<p style="font-size: 18px; color: #666;">
Enter a website URL to analyze for deceptive design patterns
</p>
<div style="margin-top: 12px;">
<a href="https://arxiv.org/abs/2411.07441" target="_blank" rel="noopener noreferrer" aria-label="Read our arXiv paper"
style="display: inline-block; padding: 10px 14px; border-radius: 999px; background: #2563eb; color: white; text-decoration: none; font-weight: 600; box-shadow: 0 6px 16px rgba(37, 99, 235, 0.35);">
π Read our paper on arXiv
</a>
</div>
</div>
""")
        # How to Use section - collapsible accordion with tab format
        with gr.Tabs():
            with gr.TabItem("π Privacy Policy"):
                # Static privacy statement (no data bound to it).
                gr.HTML("""
<div style="padding: 20px; background-color: var(--block-background-fill); border-radius: 8px; border-left: 4px solid #28a745; border: 1px solid var(--border-color-primary);">
<div style="display: flex; gap: 20px; flex-wrap: wrap; align-items: stretch;">
<!-- Left Column: Privacy Highlights -->
<div style="flex: 1; min-width: 300px; display: flex; flex-direction: column;">
<div style="margin-bottom: 16px; padding: 16px; background-color: var(--block-background-fill); border-radius: 6px; border: 1px solid #10b981; opacity: 0.9;">
<div style="margin-bottom: 12px;">
<strong style="color: #10b981;">π API Keys:</strong> <span style="color: var(--body-text-color); font-size: 14px; line-height: 1.6;">We <strong style="color: #dc2626;">NEVER</strong> save or store your Gemini API keys. They are only used temporarily in memory during your analysis session and are immediately discarded.</span>
</div>
<div style="margin-bottom: 0;">
<strong style="color: #8b5cf6;">π« No PII Storage:</strong> <span style="color: var(--body-text-color); font-size: 14px; line-height: 1.6;">We do not store any Personally Identifiable Information (PII), including API keys, user identifiers, or sensitive data from analyzed websites.</span>
</div>
</div>
</div>
<!-- Right Column: Data Usage -->
<div style="flex: 1; min-width: 300px; display: flex; flex-direction: column; gap: 12px;">
<div style="padding: 16px; background-color: var(--block-background-fill); border-radius: 6px; border: 1px solid #f59e0b; opacity: 0.9; flex-grow: 1;">
<strong style="color: #f59e0b;">π Website URLs & Classifications:</strong>
<ul style="margin: 8px 0; padding-left: 20px; color: var(--body-text-color); font-size: 14px; line-height: 1.6;">
<li style="margin-bottom: 6px;">We <strong style="color: #f59e0b;">may</strong> save the websites you analyze (URLs only) and their corresponding deceptive pattern classifications</li>
<li style="margin-bottom: 6px;">This data helps us improve our detection system and fine-tune our framework</li>
<li style="margin-bottom: 6px;">No personal information is linked to this data</li>
<li>This data is used solely for research and system improvement purposes</li>
</ul>
</div>
</div>
</div>
<div style="margin-top: 20px; padding: 15px; background-color: var(--block-background-fill); border-radius: 6px; border: 1px solid #10b981; text-align: center;">
<strong style="color: #10b981;">β Summary:</strong> <span style="color: var(--body-text-color); font-size: 14px;">Your API keys are never stored. Anonymized URL and classification data may be retained for system improvement.</span>
</div>
</div>
""")
            with gr.TabItem("βΉοΈ How to Use"):
                # Static usage instructions and disclaimer.
                gr.HTML("""
<div style="padding: 20px; background-color: var(--block-background-fill); border-radius: 8px; border-left: 4px solid #2196F3; border: 1px solid var(--border-color-primary);">
<div style="display: flex; gap: 20px; flex-wrap: wrap; align-items: stretch;">
<!-- Left Column: Steps -->
<div style="flex: 1; min-width: 300px; display: flex; flex-direction: column;">
<ol style="margin: 0; color: var(--body-text-color); line-height: 1.6; flex-grow: 1;">
<li style="margin-bottom: 12px;"><strong>Enter URL:</strong> Provide the website URL you want to analyze (must start with http:// or https://)</li>
<li style="margin-bottom: 12px;"><strong>API Key:</strong> Enter your Google Gemini API key (get a free one at <a href="https://makersuite.google.com/app/apikey" target="_blank" style="color: #2196F3;">Google AI Studio</a>). We may make 1-2 Gemini-2.5-Pro API calls per analysis.</li>
<li style="margin-bottom: 12px;"><strong>Analyze:</strong> Click the analyze button and watch as the screenshot appears and the analysis runs</li>
<li style="margin-bottom: 12px;"><strong>Review:</strong> The annotated screenshot will show all elements with colored bounding boxes (light green for non-deceptive, various colors for deceptive patterns). Rerun the analysis if the detailed results and annotation mismatch.</li>
<li style="margin-bottom: 12px;"><strong>Note:</strong> E2E Analysis time may range from <5 sec to 5 mins based on various factors such as cloud infrastructure, demand, amount of text on page</li>
</ol>
</div>
<!-- Right Column: Disclaimer and Technical Info -->
<div style="flex: 1; min-width: 300px; display: flex; flex-direction: column; gap: 12px;">
<div style="padding: 12px; background-color: var(--block-background-fill); border-radius: 4px; border: 1px solid var(--border-color-accent); opacity: 0.9; flex-grow: 1;">
<strong style="color: #ff9800;">β οΈ Disclaimer:</strong> <span style="color: var(--body-text-color); font-size: 14px; line-height: 1.6;">This tool uses AI analysis and may not catch all deceptive patterns or may flag legitimate design elements. Use as a supplementary guide only.</span>
</div>
<div style="padding: 12px; background-color: var(--block-background-fill); border-radius: 4px; border: 1px solid var(--border-color-accent); opacity: 0.9; flex-grow: 1;">
<strong style="color: var(--body-text-color);">π· Screenshot Method:</strong>
<ul style="margin: 8px 0; padding-left: 20px; color: var(--body-text-color); font-size: 14px; line-height: 1.6;">
<li style="margin-bottom: 6px;"><strong>Selenium WebDriver:</strong> Automatic screenshots using Chrome in headless mode (~1280x1080)</li>
<li><span style="color: #dc2626; font-weight: bold;">Static capture of front page only (no scrolling), with 5 second wait from initial page load</span></li>
</ul>
</div>
</div>
</div>
</div>
""")
        with gr.Row():
            with gr.Column(scale=2):
                # Input section
                gr.Markdown("### π Website Analysis")
                # with gr.Tabs():
                # with gr.TabItem("π± URL Analysis"):
                url_input = gr.Textbox(
                    type="text",
                    label="Website URL (Required)",
                    placeholder="https://example.com"
                )
                # Password field so the key is masked in the browser.
                gemini_api_key = gr.Textbox(
                    type="password",
                    label="Gemini API Key (Required)",
                    placeholder="Enter your Google Gemini API key...",
                    info='Create your free API key by visiting <a href="https://makersuite.google.com/app/apikey" target="_blank" style="color: #2196F3;">Google AI Studio</a>'
                )
                # Expandable guide for getting API key
                with gr.Accordion("β How to get a free Gemini API key (Step-by-step guide)", open=False):
                    gr.HTML("""
<div style="padding: 15px; background-color: var(--block-background-fill); border-radius: 8px; border-left: 4px solid #4CAF50;">
<h4 style="color: var(--body-text-color); margin-top: 0; margin-bottom: 15px;">π Get Your Free Google Gemini API Key:</h4>
<div style="margin-bottom: 20px;">
<ol style="color: var(--body-text-color); line-height: 1.8; margin: 0; padding-left: 20px;">
<li style="margin-bottom: 10px;">
<strong>Visit Google AI Studio:</strong> Go to
<a href="https://makersuite.google.com/app/apikey" target="_blank" style="color: #2196F3; text-decoration: underline;">
https://makersuite.google.com/app/apikey
</a>
</li>
<li style="margin-bottom: 10px;">
<strong>Sign in:</strong> Use your Google account to sign in (create one if needed)
</li>
<li style="margin-bottom: 10px;">
<strong>Create API Key:</strong> Click the "Create API Key" button
</li>
<li style="margin-bottom: 10px;">
<strong>Select Project:</strong> Choose an existing Google Cloud project or create a new one
</li>
<li style="margin-bottom: 10px;">
<strong>Copy Key:</strong> Once generated, copy the API key to your clipboard
</li>
<li style="margin-bottom: 10px;">
<strong>Paste Here:</strong> Paste the API key into the field above and start analyzing!
</li>
</ol>
</div>
<div style="padding: 12px; background-color: var(--background-fill-secondary); border-radius: 6px; border: 1px solid var(--border-color-accent);">
<div style="display: flex; align-items: center; gap: 8px; margin-bottom: 8px;">
<span style="font-size: 16px;">π‘</span>
<strong style="color: var(--body-text-color);">Pro Tips:</strong>
</div>
<ul style="color: var(--body-text-color); font-size: 14px; line-height: 1.6; margin: 0; padding-left: 20px;">
<li style="margin-bottom: 6px;">This tool typically uses 1-2 API calls per analysis</li>
<li>Your API key is never <strong>STORED</strong> by this application</li>
</ul>
</div>
</div>
""")
                analyze_url_btn = gr.Button(
                    "π Analyze Website URL",
                    variant="primary",
                    size="lg"
                )
                # Status moved to left column
                gr.Markdown("### π Analysis Status")
                status_text = gr.Textbox(
                    label="Status",
                    value="Ready to analyze...",
                    lines=2,
                    interactive=False
                )
                # Results display moved to left column
                results_display = gr.HTML(
                    value="<div style='text-align: center; padding: 40px; color: var(--body-text-color); opacity: 0.7;'>Enter a URL and click analyze to see results here.</div>"
                )
            with gr.Column(scale=3):
                # Screenshot section - only screenshot in right column
                gr.Markdown("### π· Website Screenshot")
                # Placeholder container for screenshot; hidden once the
                # handler swaps in the real screenshot_display image.
                screenshot_placeholder = gr.HTML(
                    value="""
<div style="
border: 2px dashed var(--border-color-primary);
border-radius: 12px;
padding: 60px 20px;
text-align: center;
background-color: var(--block-background-fill);
background-image:
radial-gradient(circle at 20% 50%, var(--border-color-accent) 0%, transparent 50%),
radial-gradient(circle at 80% 50%, var(--border-color-accent) 0%, transparent 50%);
background-size: 100px 100px;
background-position: 0 0, 50px 50px;
min-height: 400px;
display: flex;
flex-direction: column;
justify-content: center;
align-items: center;
opacity: 0.8;
">
<div style="
background-color: var(--block-background-fill);
padding: 20px 30px;
border-radius: 8px;
border: 1px solid var(--border-color-primary);
backdrop-filter: blur(10px);
">
<h3 style="
color: var(--body-text-color);
margin: 0 0 10px 0;
font-size: 20px;
">π· Screenshot Preview Area</h3>
<p style="
color: var(--body-text-color);
margin: 0;
opacity: 0.8;
font-size: 16px;
line-height: 1.5;
">
Website screenshots will appear here during analysis.<br>
<span style="font-size: 14px; opacity: 0.7;">
Original screenshot β Annotated with deceptive pattern highlights
</span>
</p>
</div>
</div>
""",
                    visible=True
                )
                screenshot_display = gr.Image(
                    label="Website Screenshot",
                    visible=False,
                    interactive=False
                )
        # Detailed results table spanning both columns (full width)
        results_dataframe = gr.Dataframe(
            label="Detailed Results (Scroll right to see all columns)",
            visible=False,
            wrap=True,
            column_widths=["14%", "5%", "8%", "8%", "3%", "11%", "11%", "9%", "9%", "19%"] # First column (Text) gets 15% width, others auto-sized
        )
        # Download button for results CSV
        download_btn = gr.DownloadButton(
            label="π₯ Download Results as CSV",
            visible=False,
            variant="secondary"
        )
| # Event handlers | |
def save_results_to_csv(df, url):
    """Save the results dataframe to a CSV file named after the analyzed site.

    Args:
        df: Results as a pandas DataFrame (or DataFrame-convertible object).
        url: The analyzed website URL; used to derive a filesystem-safe
            CSV filename.

    Returns:
        Path to the written CSV file in the system temp directory, or
        None when there is nothing to save (df is None or empty).
    """
    if df is None or (isinstance(df, pd.DataFrame) and df.empty):
        return None
    # Normalize the URL, then map every filesystem-unsafe character to "_"
    # in one str.translate pass (replaces a 17-deep .replace chain; all
    # targets map to "_", which is not itself a target, so the result is
    # identical to sequential replacement).
    unsafe_chars = './-=?&%:#\'"*<>| '
    sanitizer = str.maketrans({ch: "_" for ch in unsafe_chars})
    base = url.lower().replace("http://", "").replace("https://", "").strip().replace("www.", "")
    # Trim if too long and ensure it doesn't end with underscore.
    safe_filename = base.translate(sanitizer)[:100].rstrip("_")
    csv_filename = f"{safe_filename}.csv"
    # Write directly into the system temp directory. The previous
    # NamedTemporaryFile/copy/remove round-trip produced a file in the
    # same directory anyway, so this is equivalent but leak-free.
    final_path = os.path.join(tempfile.gettempdir(), csv_filename)
    frame = df if isinstance(df, pd.DataFrame) else pd.DataFrame(df)
    frame.to_csv(final_path, index=False)
    print(f"[CONSOLE] CSV file created for download: {final_path} (filename: {csv_filename})")
    return final_path
def handle_url_analysis(url, api_key):
    """Handle URL analysis with screenshot capture.

    Generator bound to the analyze button. Streams 6-tuples of
    (status_text, placeholder_update, screenshot_update, results_html,
    dataframe_update, download_button_update) so the UI updates live while
    take_screenshot_and_process(url, api_key) runs. On success, results are
    uploaded to the datasets (skipped in DEBUG_MODE) and a CSV is offered
    for download; temp files are cleaned up in every exit path.
    """
    print(f"[CONSOLE] handle_url_analysis called with URL: {url}")
    print(f"[CONSOLE] API key provided: {'Yes' if api_key else 'No'}")
    eval_dir_for_cleanup = None  # Track eval_dir for cleanup
    try:
        print(f"[CONSOLE] Starting analysis generator for URL: {url}")
        # Clear any previous error messages at the start of new analysis
        yield (
            "π Starting new analysis...",
            gr.update(visible=True),  # Show placeholder initially
            gr.update(visible=False),  # Hide screenshot initially
            "<div style='text-align: center; padding: 20px; color: var(--body-text-color); opacity: 0.7;'>Preparing analysis...</div>",  # Clear previous errors
            gr.update(visible=False),  # Hide dataframe
            gr.update(visible=False)  # Hide download button
        )
        analysis_generator = take_screenshot_and_process(url, api_key)
        final_result = None
        final_image = None
        original_image = None  # Track original screenshot separately for dataset upload
        print(f"[CONSOLE] Processing generator results...")
        for result_tuple in analysis_generator:
            if len(result_tuple) == 4:
                dataframe_result, status_update, image_path, eval_dir = result_tuple
                eval_dir_for_cleanup = eval_dir  # Store for cleanup
                if dataframe_result is None:
                    # Progress update - show screenshot if available and clear any previous error messages
                    if image_path:
                        # Store the first image as original (before annotation)
                        if original_image is None:
                            original_image = image_path
                        final_image = image_path  # Update the current image for display
                        yield (
                            status_update,
                            gr.update(visible=False),  # Hide placeholder
                            gr.update(value=image_path, visible=True, label="π· Original Screenshot"),  # Show original screenshot
                            "<div style='text-align: center; padding: 20px; color: var(--body-text-color); opacity: 0.7;'>Analysis in progress...</div>",  # Clear previous errors
                            gr.update(visible=False),
                            gr.update(visible=False)  # Hide download button
                        )
                    else:
                        yield (
                            status_update,
                            gr.update(visible=True),  # Keep placeholder visible
                            gr.update(visible=False),  # Hide screenshot
                            "<div style='text-align: center; padding: 20px; color: var(--body-text-color); opacity: 0.7;'>Analysis in progress...</div>",  # Clear previous errors
                            gr.update(visible=False),
                            gr.update(visible=False)  # Hide download button
                        )
                else:
                    print(f"[CONSOLE] Received final result with {len(dataframe_result)} rows")
                    final_result = dataframe_result
                    final_status = status_update
                    final_image = image_path  # This will be the annotated image for display
                    # Store the first image as original if not already set
                    if original_image is None:
                        original_image = image_path
                    # Clear any previous errors when we get successful results
                    yield (
                        final_status,
                        gr.update(visible=False),  # Hide placeholder
                        gr.update(value=final_image, visible=True, label="π― Annotated Screenshot (Analysis Complete)") if final_image else gr.update(visible=False),
                        "<div style='text-align: center; padding: 20px; color: var(--body-text-color); opacity: 0.7;'>Processing results...</div>",  # Clear previous errors
                        gr.update(visible=False),
                        gr.update(visible=False)  # Hide download button
                    )
                    break
        # Generator approach provides real-time notifications automatically
        if final_result is not None:
            print(f"[CONSOLE] Creating result display HTML")
            results_html = create_result_display(final_result)
            print(f"[CONSOLE] Yielding final results to UI")
            # Encode filesystem-unsafe URL characters with reversible
            # _xNNx_ codes in a single str.translate pass. Equivalent to
            # the former 16-deep .replace chain because no replaced
            # character occurs inside any replacement string.
            char_codes = {
                ".": "_x01x_", "/": "_x02x_", "-": "_x03x_", "=": "_x04x_",
                "?": "_x05x_", "&": "_x06x_", "%": "_x07x_", ":": "_x08x_",
                "#": "_x09x_", "'": "_x10x_", '"': "_x11x_", "*": "_x12x_",
                "<": "_x13x_", ">": "_x14x_", "|": "_x15x_",
            }
            save_url = url.lower().replace("http://", "").replace("https://", "").strip().replace("www.", "")
            save_url = save_url.translate(str.maketrans(char_codes))
            # Suffix a UUID so repeat analyses of the same URL stay unique.
            save_url = save_url + "__" + str(uuid.uuid4()).replace("-", "_")
            save_dict = {
                save_url: final_result
            }
            # Create DataFrame for image dataset with "id" and "image" columns
            # Use original screenshot (not annotated) for dataset upload
            dataset_image_path = original_image if original_image else final_image
            annotated_image_path = final_image if final_image else original_image
            print(f"[CONSOLE] Using image for dataset upload: {dataset_image_path} (original: {original_image}, final: {final_image})")
            print(f"[CONSOLE] Using annotated image for display: {annotated_image_path} (original: {original_image}, final: {final_image})")
            if dataset_image_path and os.path.exists(dataset_image_path) and annotated_image_path and os.path.exists(annotated_image_path):
                try:
                    # Load the original image using PIL
                    pil_image = Image.open(dataset_image_path)
                    pil_final = Image.open(annotated_image_path)
                    # Convert to RGB if needed (removes alpha channel if present)
                    if pil_image.mode != 'RGB':
                        pil_image = pil_image.convert('RGB')
                    if pil_final.mode != 'RGB':
                        pil_final = pil_final.convert('RGB')
                    image_df = pd.DataFrame([{"id": save_url, "image": pil_image, "annotated_image": pil_final}])
                    print(f"[CONSOLE] Loaded original image for dataset: {dataset_image_path} -> PIL Image {pil_image.size}")
                except Exception as e:
                    print(f"[CONSOLE] Error loading image {dataset_image_path}: {e}")
                    # Fallback to path if image loading fails
                    image_df = pd.DataFrame([{"id": save_url, "image": dataset_image_path, "annotated_image": annotated_image_path}])
            else:
                print(f"[CONSOLE] Warning: Image path not found or invalid: {dataset_image_path}")
                image_df = pd.DataFrame([{"id": save_url, "image": None, "annotated_image": None}])
            # Debug runs replay canned data — don't pollute the datasets.
            if not DEBUG_MODE:
                dataset_upload.update_dataset_with_new_splits(save_dict)
                dataset_upload.update_dataset_with_new_images(image_df, scheduler=scheduler, dataset_dir=dataset_dir, jsonl_path=jsonl_path)
            # Prepare CSV for download
            csv_file_path = save_results_to_csv(final_result, url)
            # Show final results with annotated screenshot
            yield (
                final_status,
                gr.update(visible=False),  # Hide placeholder
                gr.update(value=final_image, visible=True, label="π― Annotated Screenshot (Analysis Complete)") if final_image else gr.update(visible=False),
                results_html,
                gr.update(value=final_result, visible=True),
                gr.update(value=csv_file_path, visible=True) if csv_file_path else gr.update(visible=False)
            )
            # Clean up temporary files after successful display
            # Add small delay to let frontend finish loading images before cleanup
            time.sleep(5)  # Give frontend time to load the images
            cleanup_temp_directory(eval_dir_for_cleanup)
        else:
            print(f"[CONSOLE] No final result generated, analysis failed")
            # Clean up temp files even on failure
            cleanup_temp_directory(eval_dir_for_cleanup)
            yield (
                "β Analysis failed - no results generated",
                gr.update(visible=True),  # Show placeholder again
                gr.update(visible=False, label="Website Screenshot"),  # Hide screenshot and reset label
                "<div style='color: #ef4444; text-align: center; background-color: var(--block-background-fill); padding: 15px; border-radius: 8px; border: 1px solid #ef4444; opacity: 0.9;'>Analysis failed. Please check your Gemini API key and try again.</div>",
                gr.update(visible=False),
                gr.update(visible=False)  # Hide download button
            )
    except Exception as e:
        print(f"[CONSOLE] Exception in handle_url_analysis: {str(e)}")
        print(f"[CONSOLE] Exception type: {type(e).__name__}")
        # Clean up temp files on exception
        cleanup_temp_directory(eval_dir_for_cleanup)
        error_msg = f"β Error: {str(e)}"
        yield (
            error_msg,
            gr.update(visible=True),  # Show placeholder again
            gr.update(visible=False, label="Website Screenshot"),  # Hide screenshot and reset label
            f"<div style='color: #ef4444; text-align: center; background-color: var(--block-background-fill); padding: 15px; border-radius: 8px; border: 1px solid #ef4444; opacity: 0.9;'>{error_msg}</div>",
            gr.update(visible=False),
            gr.update(visible=False)  # Hide download button
        )
        # BUG FIX: the exact-class check `e.__class__ == gr.exceptions.Error`
        # missed subclasses; isinstance re-raises all Gradio errors so the
        # UI can surface them.
        if isinstance(e, gr.exceptions.Error):
            raise e
# Connect the analyze buttons
# The six outputs map 1:1 to the 6-tuples yielded by handle_url_analysis:
# status text, placeholder HTML, screenshot image, results HTML,
# results dataframe, and the CSV download button.
print(f"[CONSOLE] Setting up button click handlers")
analyze_url_btn.click(
fn=handle_url_analysis,
inputs=[url_input, gemini_api_key],
outputs=[status_text, screenshot_placeholder, screenshot_display, results_display, results_dataframe, download_btn],
show_progress="full"
)
# Hand the assembled Blocks app back to the caller (launched in __main__).
return demo
# Create unique directory for this session using temp directory
# These three module-level names (dataset_dir, jsonl_path, scheduler) are
# consumed by the upload path inside create_interface(); do not rename.
session_id = str(uuid.uuid4())[:8]
temp_base = Path(tempfile.gettempdir()) / "deceptive_pattern_images"
dataset_dir = temp_base / f"{session_id}"
dataset_dir.mkdir(parents=True, exist_ok=True)
jsonl_path = dataset_dir / "metadata.jsonl"
# Background scheduler that commits the session folder to the HF image
# dataset. NOTE: IMAGE_REPO_ID and HF_TOKEN are accessed with [] — a
# missing env var raises KeyError at import time (fail-fast by design?
# confirm). `every=1` is presumably the commit interval in minutes —
# verify against huggingface_hub CommitScheduler docs.
scheduler = CommitScheduler(
repo_id=os.environ["IMAGE_REPO_ID"],
repo_type="dataset",
folder_path=dataset_dir,
path_in_repo=dataset_dir.name,
token=os.environ["HF_TOKEN"],
every=1
)
| # Create and launch the interface | |
if __name__ == "__main__":
    # Deferred import: only needed when running as a script.
    from py_files.utils import decrypt_system_prompts

    # Decrypt the Gemini system prompts unless both are already on disk.
    if os.path.exists("./system_prompt.txt") and os.path.exists("./system_prompt_thinking.txt"):
        print("[CONSOLE] System prompts already decrypted, skipping decryption step")
    else:
        print("[CONSOLE] Decrypting system prompts...")
        if not decrypt_system_prompts():
            print("[CONSOLE] Failed to decrypt system prompts, exiting...")
            # raise SystemExit instead of exit(): exit() is a site-module
            # convenience and may be absent under some launchers.
            raise SystemExit(1)
    # ===== DEBUG MODE CONFIGURATION =====
    # Check if debug mode is enabled via environment variable
    debug_mode_env = os.environ.get("DEBUG_MODE", "false").lower()
    if debug_mode_env in ["true", "1", "yes", "on"]:
        DEBUG_MODE = True
        print(f"[CONSOLE] ===== DEBUG MODE ENABLED =====")
        # Get debug configuration from environment variables.
        debug_table_split = os.environ.get("DEBUG_TABLE_SPLIT", "")
        # BUG FIX: this previously re-read DEBUG_TABLE_SPLIT into
        # debug_image_id. Honor a dedicated DEBUG_IMAGE_ID variable,
        # falling back to the table split name (the two IDs share the same
        # save_url key when produced by this app, so the fallback preserves
        # the old behavior when DEBUG_IMAGE_ID is unset).
        debug_image_id = os.environ.get("DEBUG_IMAGE_ID", debug_table_split)
        print(f"[CONSOLE] [DEBUG MODE] Table Split: {debug_table_split}")
        print(f"[CONSOLE] [DEBUG MODE] Image ID: {debug_image_id}")
        # Load debug data from HuggingFace datasets
        try:
            repo_id = os.environ.get("REPO_ID")
            image_repo_id = os.environ.get("IMAGE_REPO_ID")
            if not repo_id or not image_repo_id:
                print(f"[CONSOLE] [DEBUG MODE] ERROR: REPO_ID or IMAGE_REPO_ID not set in environment")
                print(f"[CONSOLE] [DEBUG MODE] REPO_ID: {repo_id}")
                print(f"[CONSOLE] [DEBUG MODE] IMAGE_REPO_ID: {image_repo_id}")
            else:
                print(f"[CONSOLE] [DEBUG MODE] Loading data from REPO_ID: {repo_id}")
                print(f"[CONSOLE] [DEBUG MODE] Loading images from IMAGE_REPO_ID: {image_repo_id}")
                # Load table data
                DEBUG_TABLE_DF = load_debug_table_data(repo_id, debug_table_split)
                print(f"[CONSOLE] [DEBUG MODE] Table loaded: {len(DEBUG_TABLE_DF) if DEBUG_TABLE_DF is not None else 0} rows")
                # Load images
                DEBUG_ORIGINAL_IMAGE, DEBUG_ANNOTATED_IMAGE = load_debug_images(image_repo_id, debug_image_id)
                print(f"[CONSOLE] [DEBUG MODE] Original image: {DEBUG_ORIGINAL_IMAGE}")
                print(f"[CONSOLE] [DEBUG MODE] Annotated image: {DEBUG_ANNOTATED_IMAGE}")
                if DEBUG_TABLE_DF is None or DEBUG_ORIGINAL_IMAGE is None:
                    print(f"[CONSOLE] [DEBUG MODE] WARNING: Failed to load debug data, debug mode may not work correctly")
        except Exception as e:
            # Best-effort: a broken debug setup should not stop the app.
            print(f"[CONSOLE] [DEBUG MODE] ERROR loading debug data: {e}")
            print(f"[CONSOLE] [DEBUG MODE] Debug mode will use fallback dummy data")
    else:
        print(f"[CONSOLE] Debug mode is OFF (set DEBUG_MODE=true to enable)")
    print(f"[CONSOLE] ===== STARTING GRADIO APPLICATION =====")
    print(f"[CONSOLE] Creating Gradio interface...")
    demo = create_interface()
    print(f"[CONSOLE] Interface created successfully")
    print(f"[CONSOLE] Launching server on 0.0.0.0:7860...")
    # queue() enables the generator-based streaming handlers.
    demo.queue().launch(server_name="0.0.0.0", server_port=7860)