Spaces:
Running
Running
| import os | |
| import time | |
| import base64 | |
| import random | |
| import json | |
| import requests | |
| from datetime import datetime, timedelta, timezone | |
| from flask import Flask, request, jsonify, Response | |
| from huggingface_hub import InferenceClient | |
| app = Flask(__name__) | |
| app.secret_key = os.getenv("FLASK_SECRET_KEY") | |
| # ==== API KEYS ==== | |
| GROQ_API_KEY_1 = os.getenv("GROQ_API_KEY_1") | |
| GROQ_API_KEY_2 = os.getenv("GROQ_API_KEY_2") | |
| GROQ_API_KEY_3 = os.getenv("GROQ_API_KEY_3") | |
| SERPAPI_KEY = os.getenv("SERPAPI_KEY") | |
| HF_TOKEN = os.getenv("HF_TOKEN") | |
| # ==== URL ==== | |
| GROQ_URL_CHAT = "https://api.groq.com/openai/v1/chat/completions" | |
| GROQ_URL_TTS = "https://api.groq.com/openai/v1/audio/speech" | |
| GROQ_URL_STT = "https://api.groq.com/openai/v1/audio/transcriptions" | |
| # ==== SYSTEM PROMPT ==== | |
| SYSTEM_PROMPT = ( | |
| "You are Talk GTE β a friendly AI assistant created by Vibow AI. " | |
| "GTE means Generative Text Expert in Vibow AI. " | |
| "Vibow AI created in 29 June 2025 and Talk GTE created in 23 October 2025. " | |
| "The owner of Vibow AI is Nick Mclen. " | |
| "Talk GTE have approximately 1 trillion parameters. " | |
| "Stay positive, kind, and expert. " | |
| "Always capitalize the first letter of sentences. " | |
| "If the user requests code, always use triple backticks (```). " | |
| "Be concise, neutral, and accurate. " | |
| "Sometimes use emoji but relevant." | |
| ) | |
| # ========================= | |
| # π€ STT | |
| # ========================= | |
| def transcribe_audio(file_path: str) -> str: | |
| try: | |
| print(f"[STT] π€ Starting transcription for: {file_path}") | |
| headers = {"Authorization": f"Bearer {GROQ_API_KEY_2}"} | |
| files = { | |
| "file": (os.path.basename(file_path), open(file_path, "rb"), "audio/wav"), | |
| "model": (None, "whisper-large-v3-turbo"), | |
| } | |
| res = requests.post(GROQ_URL_STT, headers=headers, files=files, timeout=60) | |
| res.raise_for_status() | |
| text = res.json().get("text", "") | |
| print(f"[STT] β Transcription success: {text[:50]}...") | |
| return text | |
| except Exception as e: | |
| print(f"[STT] β Error: {e}") | |
| return "" | |
| finally: | |
| if os.path.exists(file_path): | |
| os.remove(file_path) | |
| print(f"[STT] ποΈ Deleted temp file: {file_path}") | |
| # ========================= | |
| # π TTS | |
| # ========================= | |
| def text_to_speech(text: str) -> bytes: | |
| try: | |
| print(f"[TTS] π Converting text to speech: {text[:50]}...") | |
| headers = {"Authorization": f"Bearer {GROQ_API_KEY_3}"} | |
| data = {"model": "playai-tts", "voice": "Celeste-PlayAI", "input": text} | |
| res = requests.post(GROQ_URL_TTS, headers=headers, json=data, timeout=60) | |
| if res.status_code != 200: | |
| print(f"[TTS] β Error: {res.text}") | |
| return b"" | |
| print(f"[TTS] β Audio generated successfully ({len(res.content)} bytes)") | |
| return res.content | |
| except Exception as e: | |
| print(f"[TTS] β Exception: {e}") | |
| return b"" | |
| def serpapi_search(query: str, location=None, num_results=3): | |
| print(f"\n[SEARCH] π Starting search for: '{query}'") | |
| indonesian_keywords = ["di jakarta", "di bali", "di bekasi", "di surabaya", "di bandung", | |
| "di indonesia", "di yogyakarta", "di medan", "di semarang", | |
| "termurah", "terbaik di", "dekat", "murah"] | |
| is_indonesian_query = any(kw in query.lower() for kw in indonesian_keywords) | |
| if is_indonesian_query: | |
| country = "id" | |
| lang = "id" | |
| search_location = location or "Indonesia" | |
| else: | |
| country = "us" | |
| lang = "en" | |
| search_location = location or "" | |
| url = "https://serpapi.com/search.json" | |
| params = { | |
| "q": query, | |
| "location": search_location, | |
| "engine": "google", | |
| "api_key": SERPAPI_KEY, | |
| "num": num_results, | |
| "gl": country, | |
| "hl": lang | |
| } | |
| try: | |
| # --- TEXT SEARCH --- | |
| r = requests.get(url, params=params, timeout=10) | |
| r.raise_for_status() | |
| data = r.json() | |
| text_block = f"π **Hasil Google untuk:** {query}\n\n" | |
| if "organic_results" in data: | |
| for i, item in enumerate(data["organic_results"][:num_results], 1): | |
| title = item.get("title", "") | |
| snippet = item.get("snippet", "") | |
| link = item.get("link", "") | |
| text_block += f"**{i}. {title}**\n{snippet}\nπ {link}\n\n" | |
| # --- IMAGE SEARCH --- | |
| img_params = { | |
| "q": query, | |
| "engine": "google_images", | |
| "api_key": SERPAPI_KEY, | |
| "num": 3, | |
| "gl": country, | |
| "hl": lang | |
| } | |
| img_r = requests.get(url, params=img_params, timeout=10) | |
| img_r.raise_for_status() | |
| img_data = img_r.json() | |
| if "images_results" in img_data: | |
| for img in img_data["images_results"][:3]: | |
| img_url = img.get("original", img.get("thumbnail", "")) | |
| if img_url: | |
| text_block += f"\n" | |
| print("[SEARCH] β Search text assembled for AI stream.") | |
| return text_block.strip() | |
| except Exception as e: | |
| print(f"[SEARCH] β Error: {e}") | |
| return f"Tidak dapat menemukan hasil untuk: {query}" | |
| # ========================= | |
| # π¬ Stream Chat | |
| # ========================= | |
| def stream_chat(prompt: str, history=None): | |
| wib = timezone(timedelta(hours=7)) | |
| now = datetime.now(wib) | |
| sys_prompt = SYSTEM_PROMPT + f"\nCurrent time: {now.strftime('%A, %d %B %Y β %H:%M:%S WIB')}." | |
| messages = [{"role": "system", "content": sys_prompt}] | |
| if history: | |
| messages += history | |
| messages.append({"role": "user", "content": prompt}) | |
| payload = { | |
| "model": "moonshotai/kimi-k2-instruct-0905", | |
| "messages": messages, | |
| "temperature": 0.7, | |
| "max_tokens": 4188, | |
| "stream": True, | |
| } | |
| headers = {"Authorization": f"Bearer {GROQ_API_KEY_1}"} | |
| for line in requests.post(GROQ_URL_CHAT, headers=headers, json=payload, stream=True).iter_lines(): | |
| if not line: | |
| continue | |
| line = line.decode() | |
| if line.startswith("data: "): | |
| data = line[6:] | |
| if data == "[DONE]": | |
| break | |
| try: | |
| delta = json.loads(data)["choices"][0]["delta"].get("content", "") | |
| if delta: | |
| print("[DEBUG] πΉ AI chunk:", delta) | |
| yield delta | |
| except: | |
| continue | |
| # ========================= | |
| # π¬ Text-to-Video (HF + fal-ai) | |
| # ========================= | |
| def generate_video(prompt: str, model="Wan-AI/Wan2.2-T2V-A14B-Diffusers"): | |
| try: | |
| print(f"[VIDEO] π¬ Generating video for prompt: {prompt}") | |
| client = InferenceClient(provider="fal-ai", api_key=HF_TOKEN) | |
| video_bytes = client.text_to_video(prompt=prompt, model=model) | |
| if not video_bytes: | |
| print("[VIDEO] β Empty response from HF model") | |
| return None | |
| file_name = f"/tmp/video_{int(time.time())}.mp4" | |
| with open(file_name, "wb") as f: | |
| f.write(video_bytes) | |
| print(f"[VIDEO] β Video saved to {file_name}") | |
| return file_name | |
| except Exception as e: | |
| print(f"[VIDEO] β Error: {e}") | |
| return None | |
| # ========================= | |
| # π Chat Endpoint (Text + Voice) | |
| # ========================= | |
| def chat(): | |
| print("\n" + "="*60) | |
| print(f"[REQUEST] π¨ New request at {datetime.now().strftime('%H:%M:%S')}") | |
| if "audio" in request.files: | |
| # π€ Voice Mode | |
| audio = request.files["audio"] | |
| temp = f"/tmp/{time.time()}_{random.randint(1000,9999)}.wav" | |
| audio.save(temp) | |
| user_text = transcribe_audio(temp) | |
| keywords = ["hotel", "mall", "resort", "villa", "tempat wisata", "restaurant", "cafe"] | |
| has_keyword = any(k in user_text.lower() for k in keywords) | |
| if has_keyword: | |
| serp = serpapi_search(user_text) | |
| text = "\n".join([f"{r['title']} β {r['snippet']} β {r['link']}" for r in serp["results"]]) | |
| imgs = " ".join(serp["images"]) | |
| user_text = f"{user_text}\n\nGoogle Results:\n{text}\n\nImages: {imgs}\n\nExplain & recommend." | |
| ai = "".join(chunk for chunk in stream_chat(user_text)) | |
| audio_bytes = text_to_speech(ai) | |
| # Debug final JSON | |
| debug_json = { | |
| "mode": "voice", | |
| "transcript": user_text, | |
| "reply_text": ai, | |
| "audio_base64": "data:audio/mp3;base64," + base64.b64encode(audio_bytes).decode() | |
| } | |
| return jsonify(debug_json) | |
| # π¬ Text Mode | |
| data = request.get_json(force=True) | |
| prompt = data.get("prompt", "") | |
| history = data.get("history", []) | |
| keywords = ["hotel", "mall", "resort", "villa", "tempat wisata", "restaurant", "cafe"] | |
| has_keyword = any(k in prompt.lower() for k in keywords) | |
| if has_keyword: | |
| serp_text = serpapi_search(prompt) | |
| prompt = f"{prompt}\n\n{serp_text}\n\nπ§ Explain this search." | |
| def generate(): | |
| for chunk in stream_chat(prompt, history): | |
| yield chunk | |
| return Response(generate(), mimetype="text/plain") | |
| # ========================= | |
| # ποΈ Video Endpoint | |
| # ========================= | |
| def video(): | |
| try: | |
| data = request.get_json(force=True) | |
| prompt = data.get("prompt", "") | |
| if not prompt: | |
| return jsonify({"error": "Missing prompt"}), 400 | |
| # Jalankan text-to-video | |
| video_path = generate_video(prompt) | |
| if not video_path or not os.path.exists(video_path): | |
| return jsonify({"error": "Failed to generate video"}), 500 | |
| # Encode ke base64 biar mudah dikirim via API | |
| with open(video_path, "rb") as f: | |
| video_b64 = base64.b64encode(f.read()).decode() | |
| print(f"[VIDEO] π₯ Returning base64 video ({len(video_b64)} chars)") | |
| return jsonify({ | |
| "prompt": prompt, | |
| "video_base64": f"data:video/mp4;base64,{video_b64}" | |
| }) | |
| except Exception as e: | |
| print(f"[VIDEO] β Exception in endpoint: {e}") | |
| return jsonify({"error": str(e)}), 500 | |
| # ========================= | |
| # βΆοΈ Run | |
| # ========================= | |
| if __name__ == "__main__": | |
| print("\n" + "="*60) | |
| print("π Vibow Talk GTE Server Running") | |
| print("π Search keywords: hotel, mall, resort, villa, tempat wisata, restaurant, cafe") | |
| print("πΌοΈ Image extraction: ENABLED") | |
| print("π Global search: ENABLED (auto-detect region)") | |
| print("="*60 + "\n") | |
| app.run(host="0.0.0.0", port=7860, debug=True, threaded=True) |