TanmayTomar committed
Commit 0cec2c9 · verified · 1 Parent(s): 82a2213

Upload 8 files

Files changed (9)
  1. .gitattributes +2 -0
  2. Dockerfile +22 -0
  3. IMG_PIPELINE.py +28 -0
  4. TEXT_PIPELINE.py +50 -0
  5. app.py +105 -0
  6. data.csv +3 -0
  7. evidence_index.faiss +3 -0
  8. pmo_func.py +313 -0
  9. requirements.txt +0 -0
.gitattributes CHANGED
@@ -33,3 +33,5 @@ saved_model/**/* filter=lfs diff=lfs merge=lfs -text
  *.zip filter=lfs diff=lfs merge=lfs -text
  *.zst filter=lfs diff=lfs merge=lfs -text
  *tfevents* filter=lfs diff=lfs merge=lfs -text
+ data.csv filter=lfs diff=lfs merge=lfs -text
+ evidence_index.faiss filter=lfs diff=lfs merge=lfs -text
Dockerfile ADDED
@@ -0,0 +1,22 @@
+ # Use an official Python runtime
+ FROM python:3.11-slim
+
+ # Set the working directory
+ WORKDIR /app
+
+ # Set environment variables
+ ENV HF_HUB_DISABLE_SYMLINKS_WARNING=1
+ ENV PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True
+
+ # Copy and install dependencies
+ COPY requirements.txt .
+ RUN pip install --no-cache-dir -r requirements.txt
+
+ # Copy the rest of your application code
+ COPY . .
+
+ # Expose the port the app runs on
+ EXPOSE 7860
+
+ # Run the FastAPI server when the container launches (the app module lives in app.py, so the target is app:app; bind the exposed port)
+ CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "7860"]
IMG_PIPELINE.py ADDED
@@ -0,0 +1,28 @@
+ from TEXT_PIPELINE import run_text_pipeline
+
+ def run_img_pipeline(img_pth: str, state: dict):
+     """
+     Orchestrates the image analysis workflow using pre-loaded tools.
+     """
+     # Unpack the image-specific tools
+     manipulation_analyzer = state['manipulation_analyzer']
+     ocr_analyzer = state['ocr_analyzer']
+
+     # Run the analyses
+     manipulation_results = manipulation_analyzer.run_image_forensics(img_pth)
+     in_image_report = ocr_analyzer.get_in_image_anal(img_pth)
+     rev_img_search_res = ocr_analyzer.rev_img_search(img_pth)
+
+     text_analysis_report = {}
+
+     # If text is found, run the text pipeline, passing all the necessary state
+     if in_image_report.get("Extracted Text", "").strip():
+         text_analysis_report = run_text_pipeline(in_image_report["Extracted Text"], state)
+
+     return {
+         'image_manipulation_report': manipulation_results,
+         'in_image_content_report': in_image_report,
+         'reverse_image_search_report': rev_img_search_res,
+         'extracted_text_analysis_report': text_analysis_report
+     }
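For local testing, a minimal sketch of the `state` dict this function expects — the key names mirror what app.py's lifespan hook builds; the instantiation and the image path below are assumptions for illustration, not part of the committed code:

    from pmo_func import img_manipulation, OCR
    from IMG_PIPELINE import run_img_pipeline

    # Image-only state; add the text-pipeline keys (retriever, reranker, classifier,
    # summarizer, fact_checker, df, evidence_corpus, faiss_index) if the image may
    # contain text, since run_text_pipeline is invoked on any OCR-extracted text.
    state = {
        'manipulation_analyzer': img_manipulation(),
        'ocr_analyzer': OCR(),
    }
    report = run_img_pipeline('sample.jpg', state)  # 'sample.jpg' is a placeholder path
    print(report['image_manipulation_report'])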
TEXT_PIPELINE.py ADDED
@@ -0,0 +1,50 @@
+ def run_text_pipeline(claim: str, state: dict):
+     """
+     Executes the text analysis pipeline using pre-loaded tools.
+     """
+     # Unpack all the necessary tools and data from the state dictionary
+     retriever = state['retriever']
+     reranker = state['reranker']
+     classifier = state['classifier']
+     summarizer = state['summarizer']
+     fact_checker = state['fact_checker']
+     df = state['df']
+     evidence_corpus = state['evidence_corpus']
+     faiss_index = state['faiss_index']
+
+     # --- RAG Pipeline ---
+     retrieved_docs, indices = retriever.retrieve_evidence(claim, faiss_index, evidence_corpus)
+     reranked_docs = reranker.rerank_evidendce(claim, retrieved_docs)
+
+     if not reranked_docs:
+         # --- Fallback to Google Fact Check ---
+         print("No results from RAG, trying Google Fact Check...")
+         result = fact_checker.check_claim(claim)
+         # result['source'] is a list, so use its first entry as the dict key
+         return {
+             "final_verdict": result.get('verdict', 'NEUTRAL'),
+             "explanation": result.get('summary', 'Could not verify claim.'),
+             "source": {(result.get('source') or ['Unknown'])[0]: (result.get('URLs') or ['#'])[0]} if result else {}
+         }
+
+     final_verdict, _ = classifier(claim, reranked_docs)
+     top_evidence_for_summary = reranked_docs[:3]
+     _, explanation = summarizer(claim, top_evidence_for_summary, final_verdict)
+
+     # Get sources from the original dataframe, realigned to the corpus built from
+     # df['text'].dropna() so row positions match the FAISS indices
+     sources_dict = {}
+     if len(indices) > 0 and 'source' in df.columns and 'url' in df.columns:
+         df_valid = df.dropna(subset=['text']).reset_index(drop=True)
+         df_rel = df_valid.iloc[indices]
+         # Handle potential duplicate sources by taking the first URL for each source
+         sources_dict = df_rel.groupby('source')['url'].first().to_dict()
+
+     return {
+         "final_verdict": final_verdict,
+         "explanation": explanation,
+         "source": sources_dict
+     }
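A minimal driver sketch, assuming the same assets app.py loads at startup (data.csv with text/source/url columns and the prebuilt FAISS index); the claim string is a placeholder:

    import pandas as pd
    import faiss
    from pmo_func import retriver, reranker, Classifier, summarizer, FactChecker
    from TEXT_PIPELINE import run_text_pipeline

    df = pd.read_csv('data.csv', low_memory=False)
    state = {
        'retriever': retriver(),
        'reranker': reranker(),
        'classifier': Classifier(),
        'summarizer': summarizer(),
        'fact_checker': FactChecker(),
        'df': df,
        'evidence_corpus': df['text'].dropna().tolist(),
        'faiss_index': faiss.read_index('evidence_index.faiss'),
    }
    print(run_text_pipeline("A sample claim to verify.", state))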
app.py ADDED
@@ -0,0 +1,105 @@
+ from fastapi import FastAPI, UploadFile, File, Form, HTTPException
+ from fastapi.responses import JSONResponse, FileResponse
+ from fastapi.staticfiles import StaticFiles
+ from fastapi.middleware.cors import CORSMiddleware
+ from contextlib import asynccontextmanager
+ import uvicorn
+ import os
+ import shutil
+ import pandas as pd
+ import faiss
+ # Import your classes and pipeline functions
+ from pmo_func import retriver, reranker, Classifier, summarizer, img_manipulation, OCR, FactChecker
+ from TEXT_PIPELINE import run_text_pipeline
+ from IMG_PIPELINE import run_img_pipeline
+
+ # This dictionary will hold all our initialized models and data
+ app_state = {}
+
+ @asynccontextmanager
+ async def lifespan(app: FastAPI):
+     """Loads all models and data once when the server starts up."""
+     print("--- 🚀 Server starting up... Loading all models... 🚀 ---")
+     app_state['retriever'] = retriver()
+     app_state['reranker'] = reranker()
+     app_state['classifier'] = Classifier()
+     app_state['summarizer'] = summarizer()
+     app_state['manipulation_analyzer'] = img_manipulation()
+     app_state['ocr_analyzer'] = OCR()
+     app_state['fact_checker'] = FactChecker()
+
+     try:
+         df = pd.read_csv('data.csv', low_memory=False)
+         app_state['evidence_corpus'] = df['text'].dropna().tolist()
+         app_state['df'] = df
+     except Exception as e:
+         print(f"CRITICAL ERROR: Could not load data.csv: {e}")
+         app_state['evidence_corpus'] = []
+         app_state['df'] = pd.DataFrame()
+
+     index_file = "evidence_index.faiss"
+     if os.path.exists(index_file):
+         app_state['faiss_index'] = faiss.read_index(index_file)
+     elif app_state['evidence_corpus']:
+         print("Building FAISS index for the first time...")
+         app_state['faiss_index'] = app_state['retriever'].build_faiss_idx(app_state['evidence_corpus'])
+     else:
+         app_state['faiss_index'] = None
+
+     print("--- ✅ All models and data loaded successfully! ✅ ---")
+     yield
+     print("--- Shutting down ---")
+
+ app = FastAPI(lifespan=lifespan)
+
+ app.add_middleware(
+     CORSMiddleware,
+     allow_origins=["*"],  # Allows all origins (fine for a hackathon)
+     allow_credentials=True,
+     allow_methods=["*"],  # Allows all methods
+     allow_headers=["*"],  # Allows all headers
+ )
+
+ # Mounts the 'frontend_by_gemini' folder at the '/static' URL path (the folder must exist alongside app.py)
+ app.mount("/static", StaticFiles(directory="frontend_by_gemini"), name="static")
+ # Mounts the root directory to serve files like 'ela_result.png'
+ app.mount("/results", StaticFiles(directory="."), name="results")
+
+ @app.get("/")
+ async def read_index():
+     return FileResponse('frontend_by_gemini/index.html')
+
+ @app.post("/analyze")
+ async def analyze_content(
+     text_input: str = Form(None),
+     image_file: UploadFile = File(None)
+ ):
+     # This logic correctly prioritizes the image if both are sent
+     if image_file and image_file.filename:
+         try:
+             temp_dir = "temp_uploads"
+             os.makedirs(temp_dir, exist_ok=True)
+             temp_path = os.path.join(temp_dir, image_file.filename)
+             with open(temp_path, "wb") as buffer:
+                 shutil.copyfileobj(image_file.file, buffer)
+
+             report = run_img_pipeline(temp_path, app_state)
+             shutil.rmtree(temp_dir)
+             return JSONResponse(content=report)
+         except Exception as e:
+             print(f"Error in image pipeline: {e}")
+             raise HTTPException(status_code=500, detail="Error processing image.")
+
+     elif text_input:
+         try:
+             report = run_text_pipeline(text_input, app_state)
+             return JSONResponse(content=report)
+         except Exception as e:
+             print(f"Error in text pipeline: {e}")
+             raise HTTPException(status_code=500, detail="Error processing text.")
+
+     else:
+         raise HTTPException(status_code=400, detail="No valid input provided.")
+
+ if __name__ == "__main__":
+     # The FastAPI instance lives in this file (app.py), so uvicorn targets app:app
+     uvicorn.run("app:app", host="0.0.0.0", port=int(os.environ.get("PORT", 8080)), reload=True)
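For quick manual testing of the endpoint, a client-side sketch (assumes the server is reachable at http://localhost:8080, the default when app.py is run directly; sample.jpg is a placeholder):

    import requests

    BASE = "http://localhost:8080"  # adjust to your deployment (the container binds 7860)

    # Text claim
    r = requests.post(f"{BASE}/analyze", data={"text_input": "A sample claim to verify."})
    print(r.json())

    # Image upload; the endpoint prioritizes the image if both inputs are sent
    with open("sample.jpg", "rb") as f:
        r = requests.post(f"{BASE}/analyze", files={"image_file": ("sample.jpg", f, "image/jpeg")})
    print(r.json())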
data.csv ADDED
@@ -0,0 +1,3 @@
+ version https://git-lfs.github.com/spec/v1
+ oid sha256:54f359a9ec2dd519c4ac6f24ff1002f6b6aab1ed3a227422fa4e5ef63c93afc0
+ size 401398654
evidence_index.faiss ADDED
@@ -0,0 +1,3 @@
+ version https://git-lfs.github.com/spec/v1
+ oid sha256:9e2b94c8c2f9c2411f93b6b3edbdb5b400355dd4176f65bb1c93bbeb63e4f9e6
+ size 542823981
pmo_func.py ADDED
@@ -0,0 +1,313 @@
+ import numpy as np
+ import faiss
+ from sentence_transformers import SentenceTransformer
+ from sentence_transformers.cross_encoder import CrossEncoder
+ from transformers import pipeline
+ from PIL import Image, ImageChops, ImageEnhance
+ import torch
+ from google.cloud import vision
+ import os
+ import io
+ from transformers import AutoModelForSequenceClassification, AutoTokenizer
+ from transformers import T5Tokenizer, T5ForConditionalGeneration
+ from dotenv import load_dotenv
+ import requests
+ from bs4 import BeautifulSoup
+ import trafilatura as tra
+
+ DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
+
+ class retriver:
+     def __init__(self):
+         self.retrivermodel = SentenceTransformer('all-MiniLM-L6-v2')
+
+     def build_faiss_idx(self, evidence_corpus):
+         embeddings = self.retrivermodel.encode(evidence_corpus)
+         index = faiss.IndexFlatIP(embeddings.shape[1])
+         index.add(np.array(embeddings, dtype=np.float32))
+         faiss.write_index(index, "evidence_index.faiss")
+         return index
+
+     def retrieve_evidence(self, claim, index, evidence_corpus, top_k=10):
+         claim_embedding = self.retrivermodel.encode([claim])
+         distances, indices = index.search(np.array(claim_embedding, dtype=np.float32), top_k)
+         retrieved_docs = [evidence_corpus[i] for i in indices[0]]
+         return retrieved_docs, indices[0]
+
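+ # NOTE: IndexFlatIP ranks by raw inner product. If cosine similarity is the
+ # intent (an assumption), pass normalize_embeddings=True to encode() in both
+ # methods above so scores become length-invariant.
+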
+ class reranker:
+     def __init__(self):
+         self.reranker_model = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2', device=DEVICE)
+
+     def rerank_evidendce(self, claim, evidence_list):
+         sentence_pairs = [[claim, evidence] for evidence in evidence_list]
+         scores = self.reranker_model.predict(sentence_pairs)
+         # Sort by score only, so ties never fall back to comparing the texts themselves
+         scored_evidence = sorted(zip(scores, evidence_list), key=lambda p: p[0], reverse=True)
+         return scored_evidence
+
+ class Classifier:
+     def __init__(self):
+         self.model_name = "MoritzLaurer/DeBERTa-v3-base-mnli-fever-anli"
+         self.label_names = ["entailment", "neutral", "contradiction"]
+         self.device = torch.device(DEVICE)
+         print(f"Classifier device: {self.device}")
+         self.model = AutoModelForSequenceClassification.from_pretrained(self.model_name).to(self.device)
+         self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
+         self.model.eval()
+
+     def classify(self, claim, top_evidence):
+         verdicts = []
+         evidences = [e[1] for e in top_evidence]
+         if not evidences:
+             return "NEUTRAL", []
+
+         # NLI convention: each evidence passage is the premise, the claim is the hypothesis
+         inputs = self.tokenizer(evidences, [claim] * len(evidences), return_tensors="pt", padding=True, truncation=True, max_length=512)
+         with torch.no_grad():
+             inputs = {k: v.to(self.device) for k, v in inputs.items()}
+             outputs = self.model(**inputs)
+
+         probs = torch.softmax(outputs.logits, dim=-1)
+         for i, evidence in enumerate(evidences):
+             pred = torch.argmax(probs[i]).item()
+             verdicts.append({
+                 "evidence": evidence,
+                 "verdict": self.label_names[pred],
+                 "scores": {name: float(probs[i][j]) for j, name in enumerate(self.label_names)}
+             })
+
+         # top_evidence is sorted by reranker score, so verdicts[0] is the strongest match
+         top_verdict_info = verdicts[0]
+         if top_verdict_info["verdict"] == "entailment" and top_verdict_info["scores"]["entailment"] > 0.8:
+             result = "TRUE"
+         elif top_verdict_info["verdict"] == "contradiction" and top_verdict_info["scores"]["contradiction"] > 0.8:
+             result = "FALSE"
+         else:
+             # for/else: NEUTRAL only when no high-confidence contradiction breaks out of the loop
+             for v in verdicts[1:]:
+                 if v["verdict"] == "contradiction" and v["scores"]["contradiction"] > 0.9:
+                     result = "FALSE"
+                     break
+             else:
+                 result = "NEUTRAL"
+         return result, verdicts
+
+     def __call__(self, claim, evidences):
+         return self.classify(claim, evidences)
+
+ class summarizer:
+     def __init__(self):
+         self.model_name = "google/flan-t5-base"  # Using a smaller model for server efficiency
+         self.model = T5ForConditionalGeneration.from_pretrained(self.model_name)
+         self.tokenizer = T5Tokenizer.from_pretrained(self.model_name)
+         self.device = torch.device(DEVICE)
+         self.model.to(self.device)
+         self.model.eval()
+         print(f"Summarizer device: {self.device}")
+
+     def forward(self, claim, top_evidence, verdict, max_input_len=1024, max_output_len=150):
+         evidence_texts = [e[1] for e in top_evidence]
+         if not evidence_texts:
+             return verdict, "No evidence was provided to generate a summary."
+
+         # Join outside the f-string: backslashes inside f-string expressions are a
+         # SyntaxError before Python 3.12, and the Dockerfile pins python:3.11
+         evidence_block = "\n---\n".join(evidence_texts)
+         input_text = f"""Claim: "{claim}"\nVerdict: {verdict}\nEvidence:\n{evidence_block}\n\nWrite a short, neutral explanation for why the verdict is {verdict}, based only on the evidence provided."""
+         inputs = self.tokenizer(input_text, return_tensors="pt", truncation=True, max_length=max_input_len).to(self.device)
+
+         with torch.no_grad():
+             summary_ids = self.model.generate(inputs["input_ids"], max_length=max_output_len, num_beams=4, early_stopping=True)
+
+         summary = self.tokenizer.decode(summary_ids[0], skip_special_tokens=True)
+         return verdict, summary
+
+     def __call__(self, claim, top_evidence, verdict):
+         return self.forward(claim, top_evidence, verdict)
+
+ class FactChecker:
+     def __init__(self):
+         self.factcheck_api = "https://factchecktools.googleapis.com/v1alpha1/claims:search"
+         self.google_search = "https://www.google.com/search"
+         load_dotenv()
+         self.factcheck_api_key = os.getenv("GOOGLE_FACT_CHECK_API_KEY")
+         # Lazy load heavy models
+         self.reranker = None
+         self.classifier = None
+         self.summarizer = None
+
+     def check_google_factcheck(self, claim: str, pages: int = 5):
+         if not self.factcheck_api_key:
+             print("Google FactCheck API key not found in .env file.")
+             return None
+
+         params = {'key': self.factcheck_api_key, 'query': claim, 'languageCode': 'en-US', 'pageSize': pages}
+         try:
+             response = requests.get(self.factcheck_api, params=params, timeout=10)
+             response.raise_for_status()
+             data = response.json()
+             if 'claims' in data and data['claims']:
+                 claim_data = data['claims'][0]
+                 review = claim_data.get('claimReview', [{}])[0]
+                 return {
+                     'claim': claim_data.get('text', claim),
+                     'verdict': review.get('textualRating', 'Unknown'),
+                     'summary': f"Rated by {review.get('publisher', {}).get('name', 'Unknown')}",
+                     'source': [review.get('publisher', {}).get('name', 'Unknown')],
+                     'method': 'google_factcheck',
+                     'URLs': [review.get('url', '')]
+                 }
+         except Exception as e:
+             print(f"FactCheck API error: {e}")
+         return None
+
+     def google_news_search(self, query: str, num_pages: int = 1):
+         print("Searching the Web...")
+         headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"}
+         articles_gg = []
+         for page in range(num_pages):
+             params = {"q": query, "tbm": "nws", 'start': page * 10}
+             try:
+                 res = requests.get(self.google_search, params=params, headers=headers, timeout=15)
+                 soup = BeautifulSoup(res.text, 'html.parser')
+                 # Note: This selector is fragile and may break if Google changes its HTML.
+                 for article_link in soup.select("a.WlydOe"):
+                     title_div = article_link.find('div', class_="n0jPhd")
+                     source_div = article_link.find('div', class_="MgUUmf")
+
+                     if not (title_div and source_div): continue
+
+                     title = title_div.text
+                     a_url = article_link['href']
+                     source = source_div.text
+
+                     content = tra.extract(tra.fetch_url(a_url)) if a_url else "No content extracted"
+                     articles_gg.append({'title': title, 'url': a_url, 'text': content or "", 'source': source})
+             except Exception as e:
+                 print(f"Error during web search: {e}")
+
+         top_evidences = [d.get('text', '') for d in articles_gg]
+         urls = [d.get('url', '') for d in articles_gg]
+         return top_evidences, urls, articles_gg
+
+     def search_and_analyze_claim(self, claim: str):
+         print("Performing web analysis...")
+
+         if self.reranker is None:
+             print("Loading AI models for web analysis...")
+             self.reranker = reranker()
+             self.classifier = Classifier()
+             self.summarizer = summarizer()
+
+         top_evidences, urls, article_list = self.google_news_search(claim)
+
+         if not top_evidences:
+             return {'claim': claim, 'verdict': 'Unverifiable', 'summary': 'No relevant sources found.', 'source': [], 'method': 'web_search', 'URLs': []}
+
+         reranked_articles = self.reranker.rerank_evidendce(claim, top_evidences)
+         if not reranked_articles:
+             return {'claim': claim, 'verdict': 'Unverifiable', 'summary': 'No relevant sources found after reranking.', 'source': [], 'method': 'web_search', 'URLs': []}
+
+         verdict, _ = self.classifier(claim, reranked_articles)
+         _, summary = self.summarizer(claim, reranked_articles[:3], verdict)
+
+         return {
+             'claim': claim,
+             'verdict': verdict,
+             'summary': summary,
+             'source': [arc.get('source', '') for arc in article_list],
+             'method': 'web_analysis',
+             'URLs': urls
+         }
+
+     def check_claim(self, claim: str):
+         """Main function to check a claim using the fallback pipeline."""
+         print(f"\n--- Checking claim: '{claim}' ---")
+         factcheck_result = self.check_google_factcheck(claim)
+         if factcheck_result:
+             print("Found result in FactCheck database.")
+             return factcheck_result
+
+         print("No FactCheck result, falling back to live web analysis...")
+         return self.search_and_analyze_claim(claim)
+
+ class img_manipulation:
+     def __init__(self):
+         self.GEN_AI_IMAGE = pipeline("image-classification", model="umm-maybe/AI-image-detector", device=DEVICE)
+
+     def Gen_AI_IMG(self, img_pth):
+         try:
+             with Image.open(img_pth) as img:
+                 img = img.convert('RGB')
+                 result = self.GEN_AI_IMAGE(img)
+             proba = next((item['score'] for item in result if item['label'] == 'artificial'), 0.0)
+             return proba * 100
+         except Exception as e:
+             print(f'AI image detection error: {e}')
+             return 0.0
+
+     def generated_image(self, img_pth, quality=90, scale=15):
+         try:
+             with Image.open(img_pth) as orig_img:
+                 orig_img = orig_img.convert('RGB')
+                 temp_path = 'temp_resaved.jpg'
+                 orig_img.save(temp_path, 'JPEG', quality=quality)
+                 with Image.open(temp_path) as resaved_img:
+                     ela_image = ImageChops.difference(orig_img, resaved_img)
+             os.remove(temp_path)
+             ela_data = np.array(ela_image)
+             mean_intensity = ela_data.mean()
+             # A mean pixel difference of 25 (out of 255) maps to a 100% score
+             scaled_score = min(100.0, (mean_intensity / 25.0) * 100)
+
+             # Save the ELA image and return its path for serving
+             ela_path = "ela_result.png"
+             enhancer = ImageEnhance.Brightness(ela_image)
+             max_diff = max(1, max([ex[1] for ex in ela_image.getextrema()]))
+             ela_image_enhanced = enhancer.enhance(scale / max_diff)
+             ela_image_enhanced.save(ela_path)
+             # Cast to a plain float so the report stays JSON-serializable
+             return float(scaled_score), ela_path
+         except Exception as e:
+             print(f'ELA generation error: {e}')
+             return 0.0, None
+
+     def run_image_forensics(self, image_path):
+         ai_score = self.Gen_AI_IMG(image_path)
+         classic_score, ela_path = self.generated_image(image_path)
+         return {
+             "ai_generated_score_percent": ai_score,
+             "classic_edit_score_percent": classic_score,
+             "ela_image_path": ela_path
+         }
+
+ class OCR:
+     def __init__(self, key_path='GOOGLE_VISION_API.json'):
+         # Expects a Google Cloud service-account JSON file at key_path
+         os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = key_path
+         self.client = vision.ImageAnnotatorClient()
+
+     def _get_full_vision_analysis(self, img_pth):
+         try:
+             with open(img_pth, 'rb') as image_file:
+                 content = image_file.read()
+             image = vision.Image(content=content)
+             features = [{'type_': vision.Feature.Type.DOCUMENT_TEXT_DETECTION}, {'type_': vision.Feature.Type.SAFE_SEARCH_DETECTION}, {'type_': vision.Feature.Type.LANDMARK_DETECTION}, {'type_': vision.Feature.Type.LOGO_DETECTION}, {'type_': vision.Feature.Type.WEB_DETECTION}]
+             response = self.client.annotate_image({'image': image, 'features': features})
+             return response, None
+         except Exception as e:
+             return None, str(e)
+
+     def get_in_image_anal(self, img_pth):
+         response, error = self._get_full_vision_analysis(img_pth)
+         if error: return {'error': error}
+         report = {}
+         if response.full_text_annotation: report['Extracted Text'] = response.full_text_annotation.text
+         if response.safe_search_annotation:
+             safe = response.safe_search_annotation
+             report['Safe Search'] = {'adult': vision.Likelihood(safe.adult).name, 'violence': vision.Likelihood(safe.violence).name}
+         entities = []
+         if response.landmark_annotations: entities.extend([f'Landmark: {l.description}' for l in response.landmark_annotations])
+         if response.logo_annotations: entities.extend([f'Logo: {l.description}' for l in response.logo_annotations])
+         if entities: report['Identified Entities'] = entities
+         return report
+
+     def rev_img_search(self, img_pth):
+         response, error = self._get_full_vision_analysis(img_pth)
+         if error: return {'error': error}
+         report = {}
+         if response.web_detection and response.web_detection.pages_with_matching_images:
+             matches = [{'title': p.page_title, 'url': p.url} for p in response.web_detection.pages_with_matching_images[:5]]
+             report['Reverse Image Matches'] = matches
+         return report
+
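As a quick sanity check of the fallback chain, a minimal sketch (the claim is a placeholder; the first stage needs GOOGLE_FACT_CHECK_API_KEY in a .env file, and the web-analysis fallback lazily loads the reranker, classifier, and summarizer models on first use):

    from pmo_func import FactChecker

    checker = FactChecker()
    result = checker.check_claim("A sample claim to verify.")
    print(result['method'], result['verdict'])
    print(result['summary'])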
requirements.txt ADDED
Binary file (8.01 kB)