# KC Robot AI V4.1 – Cloud Brain Intelligent Assistant
# Flask server: Chat (HF), TTS, STT, Telegram poller, REST API for ESP32
# Features: bilingual greetings, radar detect, OLED lines, TTS/STT, Telegram,
# HuggingFace brain
import os
import io
import time
import json
import threading
import logging
from typing import Optional, List, Tuple

import requests
from flask import Flask, request, jsonify, send_file, render_template_string

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("kcrobot.v4.1")
app = Flask(__name__)

# ====== Config from env / Secrets ======
HF_API_TOKEN = os.getenv("HF_API_TOKEN", "")
HF_MODEL = os.getenv("HF_MODEL", "google/flan-t5-large")
HF_TTS_MODEL = os.getenv("HF_TTS_MODEL", "facebook/tts_transformer-en-ljspeech")
HF_STT_MODEL = os.getenv("HF_STT_MODEL", "openai/whisper-small")
TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN", "")
PORT = int(os.getenv("PORT", 7860))

HF_HEADERS = {"Authorization": f"Bearer {HF_API_TOKEN}"} if HF_API_TOKEN else {}
if not HF_API_TOKEN:
    logger.warning("⚠️ HF_API_TOKEN not set. Put HF_API_TOKEN in Secrets.")

# ====== In-memory storage ======
# (user text, bot answer) pairs; presence events use "__presence__" as the
# first element. NOTE(review): this list is never trimmed, so it grows for
# the lifetime of the process.
CONV: List[Tuple[str, str]] = []
# Most recent lines returned by GET /display.
DISPLAY_LINES: List[str] = []

# Guards the one-time start of the background Telegram poller.
_BACKGROUND_LOCK = threading.Lock()
_BACKGROUND_STARTED = False


def push_display(line: str, limit=6):
    """Append *line* to DISPLAY_LINES, keeping only the newest *limit* lines.

    The list is trimmed in place so that any other holder of the
    DISPLAY_LINES reference keeps seeing the current contents.
    """
    DISPLAY_LINES.append(line)
    excess = len(DISPLAY_LINES) - limit
    if limit > 0 and excess > 0:
        del DISPLAY_LINES[:excess]


# ====== Hugging Face API helpers ======
def hf_text_generate(prompt: str, model: Optional[str] = None,
                     max_new_tokens: int = 256) -> str:
    """Send *prompt* to the Hugging Face inference endpoint and return text.

    Args:
        prompt: Text passed as the request's "inputs".
        model: Model id; falls back to HF_MODEL when omitted.
        max_new_tokens: Forwarded in the request's "parameters".

    Returns:
        The "generated_text" field when the response is a non-empty list of
        dicts or a dict carrying that key; otherwise ``str()`` of the payload.

    Raises:
        RuntimeError: If the endpoint answers with a non-200 status.
    """
    model = model or HF_MODEL
    url = f"https://api-inference.huggingface.co/models/{model}"
    payload = {
        "inputs": prompt,
        "parameters": {"max_new_tokens": max_new_tokens, "temperature": 0.7},
        "options": {"wait_for_model": True},
    }
    r = requests.post(url, headers=HF_HEADERS, json=payload, timeout=120)
    if r.status_code != 200:
        raise RuntimeError(f"HF text generation failed: {r.status_code}: {r.text}")
    data = r.json()
    if isinstance(data, list) and data and isinstance(data[0], dict):
        return data[0].get("generated_text", "")
    if isinstance(data, dict) and "generated_text" in data:
        return data["generated_text"]
    return str(data)


def hf_tts_get_mp3(text: str, model: Optional[str] = None) -> bytes:
    """Request speech audio for *text* and return the raw response bytes.

    Args:
        text: Text passed as the request's "inputs".
        model: Model id; falls back to HF_TTS_MODEL when omitted.

    Raises:
        RuntimeError: If the endpoint answers with a non-200 status.
    """
    model = model or HF_TTS_MODEL
    url = f"https://api-inference.huggingface.co/models/{model}"
    headers = dict(HF_HEADERS)
    headers["Content-Type"] = "application/json"
    # The whole body is read immediately, so streaming mode is not needed.
    r = requests.post(url, headers=headers, json={"inputs": text}, timeout=120)
    if r.status_code != 200:
        raise RuntimeError(f"HF TTS failed: {r.status_code}: {r.text}")
    return r.content


def hf_stt_from_bytes(audio_bytes: bytes, model: Optional[str] = None) -> str:
    """Upload raw audio bytes for transcription and return the text.

    Args:
        audio_bytes: Audio payload sent as the request body.
        model: Model id; falls back to HF_STT_MODEL when omitted.

    Returns:
        The "text" field of a dict response, or ``str()`` of the payload
        when the field is missing or the response is not a dict.

    Raises:
        RuntimeError: If the endpoint answers with a non-200 status.
    """
    model = model or HF_STT_MODEL
    url = f"https://api-inference.huggingface.co/models/{model}"
    headers = dict(HF_HEADERS)
    headers["Content-Type"] = "application/octet-stream"
    r = requests.post(url, headers=headers, data=audio_bytes, timeout=180)
    if r.status_code != 200:
        raise RuntimeError(f"HF STT failed: {r.status_code}: {r.text}")
    j = r.json()
    if isinstance(j, dict):
        return j.get("text", str(j))
    return str(j)


def _json_object_from_request() -> dict:
    """Return the request body as a dict, or {} if it is not a JSON object."""
    data = request.get_json(force=True, silent=True)
    return data if isinstance(data, dict) else {}


# ====== API for ESP32 ======
@app.route("/ask", methods=["POST"])
def api_ask():
    """Answer a question: JSON {"text", "lang"} in, {"answer"} out.

    Responds 400 with {"error": "no text"} when the text is missing or
    blank, and 500 with {"error": ...} when text generation fails.
    """
    data = _json_object_from_request()
    raw_text = data.get("text", "")
    # Tolerate null / non-string values instead of failing on .strip().
    text = ("" if raw_text is None else str(raw_text)).strip()
    lang = data.get("lang", "auto")
    if not text:
        return jsonify({"error": "no text"}), 400

    if lang == "vi":
        prompt = "Bạn là trợ lý thông minh, trả lời bằng tiếng Việt:\n" + text
    elif lang == "en":
        prompt = "You are a helpful assistant. Answer in English:\n" + text
    else:
        prompt = "Bạn là trợ lý song ngữ Việt-Anh, trả lời theo ngôn ngữ người dùng:\n" + text

    try:
        ans = hf_text_generate(prompt)
    except Exception as e:
        logger.error("Text generation failed: %s", e)
        return jsonify({"error": str(e)}), 500

    CONV.append((text, ans))
    push_display("YOU: " + text[:40])
    push_display("BOT: " + ans[:40])
    return jsonify({"answer": ans})


@app.route("/presence", methods=["POST"])
def api_presence():
    """Record a presence event and return a bilingual greeting.

    Reads an optional "note" from the JSON body, stores the greeting in
    CONV, pushes a "RADAR: ..." display line and, when TELEGRAM_TOKEN is
    set, sends a Telegram notification (failures are logged, not raised).
    """
    data = _json_object_from_request()
    note = data.get("note", "Có người tới gần!")
    if not isinstance(note, str):
        # Slicing below requires a string.
        note = str(note)

    greeting_vi = f"Xin chào! {note}"
    greeting_en = "Hello there! Nice to see you."
    combined = f"{greeting_vi}\n{greeting_en}"

    CONV.append(("__presence__", combined))
    push_display("RADAR: " + note[:40])

    if TELEGRAM_TOKEN:
        try:
            send_telegram_message(f"⚠️ Phát hiện có người: {note}")
        except Exception as e:
            logger.error("Telegram send failed: %s", e)
    return jsonify({"greeting": combined})


@app.route("/display", methods=["GET"])
def api_display():
    """Return the current display lines and the number of CONV entries."""
    return jsonify({"lines": DISPLAY_LINES, "conv_len": len(CONV)})


# ====== Web UI (simple) ======
@app.route("/")
def index():
    """Serve a static status page."""
    # NOTE(review): the original markup was lost; the tags here are a
    # minimal reconstruction around the surviving text.
    return render_template_string(
        """
<h2>🤖 KC Robot AI V4.1 - Cloud Brain Running</h2>
<p>Bilingual &amp; Smart Connected to ESP32</p>
"""
    )


# ====== Telegram ======
def send_telegram_message(text: str):
    """Send *text* to the chat named by the TELEGRAM_CHATID env variable.

    Does nothing when TELEGRAM_TOKEN or TELEGRAM_CHATID is unset. A non-200
    reply is logged; network errors from ``requests`` propagate to the caller.
    """
    if not TELEGRAM_TOKEN:
        return
    chat_id = os.getenv("TELEGRAM_CHATID", "")
    if not chat_id:
        logger.warning("TELEGRAM_CHATID not set; Telegram message skipped.")
        return
    url = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage"
    r = requests.post(url, json={"chat_id": chat_id, "text": text}, timeout=10)
    if r.status_code != 200:
        logger.warning("Telegram sendMessage returned %s: %s", r.status_code, r.text)


def _handle_telegram_update(base: str, update: dict) -> None:
    """Process one update: answer "/ask <q>" with text, "/say <t>" with audio."""
    msg = update.get("message") or {}
    text = msg.get("text") or ""
    chat_id = (msg.get("chat") or {}).get("id")
    if chat_id is None or not text:
        # Nothing to reply to (e.g. non-text or non-message update).
        return

    command = text.lower()
    if command.startswith("/ask "):
        q = text[5:].strip()
        if not q:
            return
        ans = hf_text_generate(q)
        requests.post(base + "/sendMessage",
                      json={"chat_id": chat_id, "text": ans}, timeout=30)
    elif command.startswith("/say "):
        t = text[5:].strip()
        if not t:
            return
        mp3 = hf_tts_get_mp3(t)
        files = {"audio": ("robot.mp3", mp3, "audio/mpeg")}
        requests.post(base + "/sendAudio", files=files,
                      data={"chat_id": chat_id}, timeout=60)


def telegram_poll_loop():
    """Long-poll Telegram getUpdates forever and dispatch each update.

    Returns immediately when TELEGRAM_TOKEN is unset. Any error, including
    an ``ok: false`` reply from Telegram, is logged and followed by a
    3-second pause so a persistent failure cannot become a tight loop.
    """
    if not TELEGRAM_TOKEN:
        return
    logger.info("Starting Telegram poller...")
    offset = None
    base = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}"
    while True:
        try:
            params = {"timeout": 30}
            if offset:
                params["offset"] = offset
            r = requests.get(base + "/getUpdates", params=params, timeout=35)
            j = r.json()
            if not (isinstance(j, dict) and j.get("ok")):
                # Error replies return at once; back off before retrying.
                logger.error("TG getUpdates failed: %s", j)
                time.sleep(3)
                continue
            for u in j.get("result", []):
                # Advance first so a failing update is not fetched again.
                offset = u["update_id"] + 1
                _handle_telegram_update(base, u)
        except Exception as e:
            logger.error("TG poll error: %s", e)
            time.sleep(3)


def start_background():
    """Start the Telegram poller thread once; later calls are no-ops."""
    global _BACKGROUND_STARTED
    if not TELEGRAM_TOKEN:
        return
    with _BACKGROUND_LOCK:
        if _BACKGROUND_STARTED:
            return
        _BACKGROUND_STARTED = True
        threading.Thread(target=telegram_poll_loop, daemon=True).start()


# `before_first_request` no longer exists in Flask 2.3+, so the poller is
# started from a per-request hook; start_background() is idempotent.
@app.before_request
def _startup():
    """Ensure background workers are running before a request is handled."""
    if not _BACKGROUND_STARTED:
        start_background()


if __name__ == "__main__":
    start_background()
    logger.info("🚀 KC Robot AI V4.1 running on port %s", PORT)
    print("Xin chào chủ nhân! Em là KC Robot — rất vui được gặp bạn.\nHello master! I’m KC Robot, your smart assistant.")
    app.run(host="0.0.0.0", port=PORT)