Spaces:
Paused
Paused
| # aduc_framework/tools/video_encode_tool.py | |
| # | |
| # Versão 1.4.0 (Conjunto de Ferramentas de Vídeo Finalizado) | |
| # Copyright (C) August 4, 2025 Carlos Rodrigues dos Santos | |
| # | |
| # Este módulo atua como o especialista central para todas as operações de | |
| # manipulação e codificação de vídeo. Ele abstrai as interações com | |
| # FFmpeg e imageio, fornecendo uma API limpa e robusta para o resto do framework. | |
| # - save_video_from_tensor: Converte um tensor de pixel em um arquivo .mp4. | |
| # - extract_..._frame: Extrai frames específicos de clipes de vídeo. | |
| # - concatenate_videos: Monta o filme final a partir dos clipes de cena. | |
| import os | |
| import subprocess | |
| import logging | |
| import random | |
| import time | |
| import shutil | |
| from typing import List, Optional, Tuple | |
| import imageio | |
| import numpy as np | |
| import torch | |
| logger = logging.getLogger(__name__) | |
| class VideoToolError(Exception): | |
| """Exceção personalizada para erros originados do VideoEncodeTool.""" | |
| pass | |
| class VideoEncodeTool: | |
| def __init__(self, frame_log_every=8): | |
| self.frame_log_every = frame_log_every | |
| """ | |
| Um especialista para lidar com tarefas de codificação e manipulação de vídeo. | |
| """ | |
| def save_video_from_tensor(self, pixel_5d: torch.Tensor, path: str, fps: int = 24, progress_callback=None): | |
| """ | |
| Espera pixel_5d em [0,1], shape (B,C,T,H,W). | |
| Escreve MP4 incremental, convertendo cada frame para (H,W,C) uint8. | |
| """ | |
| # Move para CPU apenas para formar os frames HWC uint8 com baixo overhead | |
| device = "cuda" if pixel_5d.is_cuda else "cpu" | |
| B, C, T, H, W = pixel_5d.shape | |
| if B != 1: | |
| # Mantemos simples: um vídeo por chamada (B=1) | |
| raise ValueError(f"Esperado B=1, recebido B={B}") | |
| with imageio.get_writer(path, fps=int(fps), codec="libx264", quality=8) as writer: | |
| for i in range(T): | |
| frame_chw = pixel_5d[0, :, i] # (C,H,W) | |
| frame_hwc_u8 = (frame_chw.permute(1, 2, 0) | |
| .clamp(0, 1) | |
| .mul(255) | |
| .to(torch.uint8) | |
| .cpu() | |
| .numpy()) | |
| writer.append_data(frame_hwc_u8) | |
| if progress_callback: | |
| progress_callback(i + 1, T) | |
| if i % self.frame_log_every == 0: | |
| print(f"[DEBUG] [Encoder] frame {i}/{T} gravado ({H}x{W}@{fps}fps)") | |
| def extract_first_frame(self, video_path: str, output_image_path: str) -> str: | |
| """ | |
| Extrai o primeiro frame de um arquivo de vídeo e o salva como uma imagem. | |
| """ | |
| logger.info(f"Extraindo primeiro frame de '{os.path.basename(video_path)}'...") | |
| cmd = ['ffmpeg', '-y', '-v', 'error', '-i', video_path, '-vframes', '1', '-q:v', '2', output_image_path] | |
| try: | |
| subprocess.run(cmd, check=True, capture_output=True, text=True) | |
| return output_image_path | |
| except subprocess.CalledProcessError as e: | |
| logger.error(f"FFmpeg (extract_first_frame) falhou: {e.stderr}") | |
| raise VideoToolError(f"Falha ao extrair o primeiro frame de {video_path}") | |
| def extract_last_frame(self, video_path: str, output_image_path: str) -> str: | |
| """ | |
| Extrai o último frame de um arquivo de vídeo e o salva como uma imagem. | |
| """ | |
| logger.info(f"Extraindo último frame de '{os.path.basename(video_path)}'...") | |
| cmd = ['ffmpeg', '-y', '-v', 'error', '-sseof', '-0.1', '-i', video_path, '-vframes', '1', '-q:v', '2', output_image_path] | |
| try: | |
| subprocess.run(cmd, check=True, capture_output=True, text=True) | |
| return output_image_path | |
| except subprocess.CalledProcessError as e: | |
| logger.error(f"FFmpeg (extract_last_frame) falhou: {e.stderr}") | |
| raise VideoToolError(f"Falha ao extrair o último frame de {video_path}") | |
| def create_transition_bridge(self, start_image_path: str, end_image_path: str, | |
| duration: float, fps: int, target_resolution: Tuple[int, int], | |
| workspace_dir: str, effect: Optional[str] = None) -> str: | |
| """ | |
| Cria um clipe de vídeo curto que transiciona entre duas imagens estáticas. | |
| """ | |
| output_path = os.path.join(workspace_dir, f"bridge_{int(time.time())}_{random.randint(100, 999)}.mp4") | |
| width, height = target_resolution | |
| fade_effects = ["fade", "wipeleft", "wiperight", "wipeup", "wipedown", "dissolve", "fadeblack", "fadewhite", "radial", "rectcrop", "circleopen", "circleclose", "horzopen", "horzclose"] | |
| selected_effect = effect if effect and effect.strip() else random.choice(fade_effects) | |
| transition_duration = max(0.1, duration) | |
| cmd = (f"ffmpeg -y -v error -loop 1 -t {transition_duration} -i \"{start_image_path}\" -loop 1 -t {transition_duration} -i \"{end_image_path}\" " | |
| f"-filter_complex \"[0:v]scale={width}:{height},setsar=1[v0];[1:v]scale={width}:{height},setsar=1[v1];" | |
| f"[v0][v1]xfade=transition={selected_effect}:duration={transition_duration}:offset=0[out]\" " | |
| f"-map \"[out]\" -c:v libx264 -r {fps} -pix_fmt yuv420p \"{output_path}\"") | |
| logger.info(f"Criando ponte de transição com efeito '{selected_effect}'...") | |
| try: | |
| subprocess.run(cmd, shell=True, check=True, text=True) | |
| except subprocess.CalledProcessError as e: | |
| raise VideoToolError(f"Falha ao criar vídeo de transição: {e.stderr}") | |
| return output_path | |
| def concatenate_videos(self, video_paths: List[str], output_path: str, workspace_dir: str, start:int= 0, overlap:int=3) -> str: | |
| """ | |
| Concatena múltiplos vídeos MP4, removendo exatamente o último frame | |
| de cada vídeo (exceto o último), salvando os cortes e recriando a lista a cada execução. | |
| """ | |
| if not video_paths: | |
| raise VideoToolError("Nenhum fragmento de vídeo fornecido para concatenação.") | |
| # Se houver apenas um clipe, apenas copia | |
| if len(video_paths) == 1: | |
| shutil.copy(video_paths[0], output_path) | |
| print(f"[Concat] Apenas um clipe fornecido. Copiado para '{output_path}'.") | |
| return output_path | |
| # Diretório fixo para guardar os recortes | |
| trimmed_dir = os.path.join(workspace_dir, "trimmed_parts") | |
| os.makedirs(trimmed_dir, exist_ok=True) | |
| # Remove possíveis restos de execuções anteriores | |
| for old in os.listdir(trimmed_dir): | |
| try: | |
| os.remove(os.path.join(trimmed_dir, old)) | |
| except Exception: | |
| pass | |
| processed_videos = [] | |
| try: | |
| for i, base in enumerate(video_paths): | |
| abs_base = os.path.abspath(base) | |
| base_name = os.path.basename(abs_base) | |
| video_podado = os.path.join(trimmed_dir, f"cut_{i}_{base_name}") | |
| # Conta frames via ffprobe | |
| probe_cmd = [ | |
| "ffprobe", "-v", "error", | |
| "-select_streams", "v:0", | |
| "-count_frames", | |
| "-show_entries", "stream=nb_read_frames", | |
| "-of", "default=nokey=1:noprint_wrappers=1", | |
| abs_base | |
| ] | |
| result = subprocess.run(probe_cmd, capture_output=True, text=True, check=True) | |
| total_frames = int(result.stdout.strip()) | |
| print(f"[Trim] {base_name} → total_frames={total_frames}") | |
| # Calcula range do trim | |
| start_frame = start | |
| end_frame = total_frames if i == len(video_paths) - 1 else total_frames - overlap | |
| if i < len(video_paths) - 1: | |
| # Corta com trim frame-a-frame | |
| cmd_fim = ( | |
| f'ffmpeg -y -hide_banner -loglevel error -i "{abs_base}" ' | |
| f'-vf "trim=start_frame={start_frame}:end_frame={end_frame},setpts=PTS-STARTPTS" ' | |
| f'-an "{video_podado}"' | |
| ) | |
| print(f"[CmdTrim] {cmd_fim}") | |
| subprocess.run(cmd_fim, shell=True, check=True) | |
| print(f"[TrimOK] {base_name}: corte {end_frame}/{total_frames} frames → {os.path.basename(video_podado)}") | |
| processed_videos.append(video_podado) | |
| else: | |
| processed_videos.append(abs_base) | |
| print(f"[Keep] Último vídeo sem corte: {base_name}") | |
| # Gera lista de concatenação do zero | |
| list_file_path = os.path.join(workspace_dir, "concat_list.txt") | |
| if os.path.exists(list_file_path): | |
| os.remove(list_file_path) | |
| with open(list_file_path, "w", encoding="utf-8") as f: | |
| for p in processed_videos: | |
| f.write(f"file '{os.path.abspath(p)}'\n") | |
| # Executa concatenação final | |
| cmd_concat = ( | |
| f'ffmpeg -y -hide_banner -loglevel error -f concat -safe 0 ' | |
| f'-i "{list_file_path}" -c copy "{output_path}"' | |
| ) | |
| print(f"[Concat] Executando concatenação final:\n{cmd_concat}") | |
| subprocess.run(cmd_concat, shell=True, check=True) | |
| print("[ConcatOK] Concatenação concluída com sucesso.") | |
| return output_path | |
| except subprocess.CalledProcessError as e: | |
| logger.error(f"[ConcatERR] Erro FFmpeg: {e}") | |
| raise VideoToolError("Falha durante concatenação de vídeos.") | |
| def concatenate_videos2(self, video_paths: List[str], output_path: str, workspace_dir: str) -> str: | |
| """ | |
| Concatena múltiplos clipes de vídeo em um único arquivo sem re-codificar. | |
| """ | |
| if not video_paths: | |
| raise VideoToolError("Nenhum fragmento de vídeo fornecido para concatenação.") | |
| # Se houver apenas um clipe, apenas o copie para o destino final. | |
| if len(video_paths) == 1: | |
| shutil.copy(video_paths[0], output_path) | |
| logger.info(f"Apenas um clipe fornecido. Copiado para '{output_path}'.") | |
| return output_path | |
| list_file_path = os.path.join(workspace_dir, f"concat_list_{int(time.time())}.txt") | |
| try: | |
| with open(list_file_path, 'w', encoding='utf-8') as f: | |
| for path in video_paths: | |
| # Garante que o caminho seja absoluto para o ffmpeg encontrar | |
| f.write(f"file '{os.path.abspath(path)}'\n") | |
| cmd_list = ['ffmpeg', '-y', '-f', 'concat', '-safe', '0', '-i', list_file_path, '-c', 'copy', output_path] | |
| logger.info(f"Concatenando {len(video_paths)} clipes para '{os.path.basename(output_path)}'...") | |
| subprocess.run(cmd_list, check=True, capture_output=True, text=True) | |
| logger.info("Concatenação FFmpeg bem-sucedida.") | |
| return output_path | |
| except subprocess.CalledProcessError as e: | |
| logger.error(f"Falha ao montar o vídeo final com FFmpeg: {e.stderr}") | |
| raise VideoToolError(f"Falha ao montar o vídeo final com FFmpeg.") | |
| finally: | |
| if os.path.exists(list_file_path): | |
| os.remove(list_file_path) | |
| # --- Instância Singleton --- | |
| video_encode_tool_singleton = VideoEncodeTool() |