|
|
|
|
|
|
|
|
|
|
|
|
|
|
import gc
import io
import json
import logging
import os
import random
import shutil
import subprocess
import sys
import tempfile
import time
import traceback
import warnings
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import numpy as np
import torch
import yaml
from einops import rearrange
from huggingface_hub import hf_hub_download

warnings.filterwarnings("ignore")
logging.getLogger("huggingface_hub").setLevel(logging.ERROR)
logging.basicConfig(level=logging.INFO, format='[%(levelname)s] %(message)s')

DEPS_DIR = Path("/data")
LTX_VIDEO_REPO_DIR = DEPS_DIR / "LTX-Video"
BASE_CONFIG_PATH = LTX_VIDEO_REPO_DIR / "configs"
DEFAULT_CONFIG_FILE = BASE_CONFIG_PATH / "ltxv-13b-0.9.8-distilled-fp8.yaml"
LTX_REPO_ID = "Lightricks/LTX-Video"
RESULTS_DIR = Path("/app/output")
DEFAULT_FPS = 24.0
FRAMES_ALIGNMENT = 8


def add_deps_to_path():
    """Adds the local LTX-Video repository to sys.path so its modules can be imported."""
    repo_path = str(LTX_VIDEO_REPO_DIR.resolve())
    if repo_path not in sys.path:
        sys.path.insert(0, repo_path)
    print(f"[DEBUG] Repo added to sys.path: {repo_path}")


add_deps_to_path()

from ltx_video.pipelines.pipeline_ltx_video import ConditioningItem, LTXMultiScalePipeline
from ltx_video.utils.skip_layer_strategy import SkipLayerStrategy
from ltx_video.models.autoencoders.vae_encode import un_normalize_latents, normalize_latents
from ltx_video.pipelines.pipeline_ltx_video import adain_filter_latent
from api.ltx.inference import (
    create_ltx_video_pipeline,
    create_latent_upsampler,
    load_image_to_tensor_with_resize_and_crop,
    seed_everething,
)

from api.gpu_manager import gpu_manager
from managers.vae_manager import vae_manager_singleton
from tools.video_encode_tool import video_encode_tool_singleton


def seed_everything(seed: int):
    """Sets the seed for reproducibility across all relevant libraries."""
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)


def calculate_padding(orig_h: int, orig_w: int, target_h: int, target_w: int) -> Tuple[int, int, int, int]:
    """Calculates symmetric padding values to reach a target dimension."""
    pad_h = target_h - orig_h
    pad_w = target_w - orig_w
    pad_top = pad_h // 2
    pad_bottom = pad_h - pad_top
    pad_left = pad_w // 2
    pad_right = pad_w - pad_left
    return (pad_left, pad_right, pad_top, pad_bottom)
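
# Illustrative example (not from the original source): padding a 480x704 frame up to
# 512x736 splits the 32-pixel difference evenly on each axis, in the
# (left, right, top, bottom) order expected by torch.nn.functional.pad:
#   calculate_padding(480, 704, 512, 736)  ->  (16, 16, 16, 16)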
|
|
|
|
|
def log_tensor_info(tensor: torch.Tensor, name: str = "Tensor"):
    """Logs detailed information about a PyTorch tensor for debugging."""
    if not isinstance(tensor, torch.Tensor):
        logging.debug(f"'{name}' is not a tensor.")
        return

    info_str = (
        f"--- Tensor: {name} ---\n"
        f" - Shape: {tuple(tensor.shape)}\n"
        f" - Dtype: {tensor.dtype}\n"
        f" - Device: {tensor.device}\n"
    )
    if tensor.numel() > 0:
        try:
            info_str += (
                f" - Min: {tensor.min().item():.4f} | "
                f"Max: {tensor.max().item():.4f} | "
                f"Mean: {tensor.mean().item():.4f}\n"
            )
        except Exception:
            pass
    logging.debug(info_str + "----------------------")


class VideoService:
    """
    Backend service for orchestrating video generation using the LTX-Video pipeline.
    Encapsulates model loading, state management, and the logic for multi-stage
    video generation (low-resolution, upscale).
    """

    def __init__(self):
        t0 = time.perf_counter()
        print("[DEBUG] Initializing VideoService...")

        target_device = gpu_manager.get_ltx_device()
        print(f"[DEBUG] LTX was assigned to device: {target_device}")

        self.config = self._load_config()
        self.pipeline, self.latent_upsampler = self._load_models()

        self.move_to_device(target_device)

        self._apply_precision_policy()
        vae_manager_singleton.attach_pipeline(
            self.pipeline,
            device=self.device,
            autocast_dtype=self.runtime_autocast_dtype
        )
        self._tmp_dirs = set()
        print(f"[DEBUG] VideoService ready. boot_time={time.perf_counter()-t0:.3f}s")

    def move_to_device(self, device):
        """Moves the pipeline models to the specified device."""
        print(f"[LTX] Moving models to {device}...")
        self.device = torch.device(device)
        self.pipeline.to(self.device)
        if self.latent_upsampler:
            self.latent_upsampler.to(self.device)
        print(f"[LTX] Models are now on {self.device}.")

    def move_to_cpu(self):
        """Moves the models to the CPU to free VRAM."""
        self.move_to_device(torch.device("cpu"))
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

    def _load_config(self):
        """Loads the default pipeline configuration from the LTX-Video repository."""
        with open(DEFAULT_CONFIG_FILE, "r") as file:
            return yaml.safe_load(file)

    def finalize(self, keep_paths=None, extra_paths=None, clear_gpu=True):
        """Releases host and GPU memory after a generation run."""
        print("[DEBUG] Finalize: starting cleanup...")
        keep = set(keep_paths or [])
        extras = set(extra_paths or [])
        gc.collect()
        try:
            if clear_gpu and torch.cuda.is_available():
                torch.cuda.empty_cache()
                try:
                    torch.cuda.ipc_collect()
                except Exception:
                    pass
        except Exception as e:
            print(f"[DEBUG] Finalize: GPU cleanup failed: {e}")

    def _load_models(self):
        t0 = time.perf_counter()
        print("[DEBUG] Downloading main checkpoint...")
        distilled_model_path = hf_hub_download(
            repo_id=LTX_REPO_ID,
            filename=self.config["checkpoint_path"],
            local_dir=os.getenv("HF_HOME"),
            cache_dir=os.getenv("HF_HOME_CACHE"),
            token=os.getenv("HF_TOKEN"),
        )
        self.config["checkpoint_path"] = distilled_model_path
        print(f"[DEBUG] Checkpoint at: {distilled_model_path}")

        print("[DEBUG] Downloading spatial upscaler...")
        spatial_upscaler_path = hf_hub_download(
            repo_id=LTX_REPO_ID,
            filename=self.config["spatial_upscaler_model_path"],
            local_dir=os.getenv("HF_HOME"),
            cache_dir=os.getenv("HF_HOME_CACHE"),
            token=os.getenv("HF_TOKEN"),
        )
        self.config["spatial_upscaler_model_path"] = spatial_upscaler_path
        print(f"[DEBUG] Upscaler at: {spatial_upscaler_path}")

        print("[DEBUG] Building pipeline...")
        pipeline = create_ltx_video_pipeline(
            ckpt_path=self.config["checkpoint_path"],
            precision=self.config["precision"],
            text_encoder_model_name_or_path=self.config["text_encoder_model_name_or_path"],
            sampler=self.config["sampler"],
            device="cpu",
            enhance_prompt=False,
            prompt_enhancer_image_caption_model_name_or_path=self.config["prompt_enhancer_image_caption_model_name_or_path"],
            prompt_enhancer_llm_model_name_or_path=self.config["prompt_enhancer_llm_model_name_or_path"],
        )
        print("[DEBUG] Pipeline ready.")

        latent_upsampler = None
        if self.config.get("spatial_upscaler_model_path"):
            print("[DEBUG] Building latent_upsampler...")
            latent_upsampler = create_latent_upsampler(self.config["spatial_upscaler_model_path"], device="cpu")
            print("[DEBUG] Upsampler ready.")
        print(f"[DEBUG] _load_models() total time={time.perf_counter()-t0:.3f}s")
        return pipeline, latent_upsampler

    def _register_tmp_dir(self, d: str):
        """Tracks a temporary directory so it can be cleaned up later."""
        if d and os.path.isdir(d):
            self._tmp_dirs.add(d)
            print(f"[DEBUG] Registered tmp dir: {d}")

    @torch.no_grad()
    def _upsample_latents_internal(self, latents: torch.Tensor) -> torch.Tensor:
        """Spatially upscales latents with the latent upsampler, preserving VAE normalization."""
        if not self.latent_upsampler:
            raise ValueError("Latent upsampler is not loaded.")
        try:
            latents_unnormalized = un_normalize_latents(latents, self.pipeline.vae, vae_per_channel_normalize=True)
            upsampled_latents = self.latent_upsampler(latents_unnormalized)
            return normalize_latents(upsampled_latents, self.pipeline.vae, vae_per_channel_normalize=True)
        finally:
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
                torch.cuda.ipc_collect()
            self.finalize(keep_paths=[])

    def _load_tensor(self, caminho):
        """Loads a latent tensor from an in-memory tensor, raw bytes, or a file path."""
        if isinstance(caminho, torch.Tensor):
            return caminho
        if isinstance(caminho, (bytes, bytearray)):
            return torch.load(io.BytesIO(caminho))
        return torch.load(caminho)

    def generate_narrative_low(self, prompt: str, **kwargs) -> Tuple[Optional[str], Optional[str], Optional[int]]:
        """
        [ORCHESTRATOR] Generates a video from a multi-line prompt, creating a sequence of scenes.

        Returns:
            A tuple of (video_path, latents_path, used_seed).
        """
        logging.info("Starting narrative low-res generation...")
        used_seed = self._resolve_seed(kwargs.get("seed"))
        # Remove the seed from kwargs so it is not passed twice to the worker below.
        kwargs.pop("seed", None)
        seed_everything(used_seed)

        prompt_list = [p.strip() for p in prompt.splitlines() if p.strip()]
        if not prompt_list:
            raise ValueError("Prompt is empty or contains no valid lines.")

        num_chunks = len(prompt_list)
        total_frames = self._calculate_aligned_frames(kwargs.get("duration", 4.0))
        frames_per_chunk = (total_frames // num_chunks // FRAMES_ALIGNMENT) * FRAMES_ALIGNMENT
        overlap_frames = self.config.get("overlap_frames", 8)
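
        # Illustrative walkthrough (assumed numbers, not from the original source):
        # duration=8.0 s at 24 fps -> total_frames = 193 (aligned to 8, plus 1);
        # with 3 prompt lines, frames_per_chunk = (193 // 3 // 8) * 8 = 64, so chunk 1
        # renders 64 frames and chunks 2..3 render 64 + overlap_frames each, with the
        # overlapping region re-used as conditioning and trimmed from the saved latents.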
|
|
|
|
|
        all_latents_paths = []
        overlap_condition_item = None

        try:
            for i, chunk_prompt in enumerate(prompt_list):
                logging.info(f"Generating narrative chunk {i+1}/{num_chunks}: '{chunk_prompt[:50]}...'")

                current_frames = frames_per_chunk
                if i > 0:
                    current_frames += overlap_frames

                current_conditions = kwargs.get("initial_conditions", []) if i == 0 else []
                if overlap_condition_item:
                    current_conditions.append(overlap_condition_item)

                chunk_latents = self._generate_single_chunk_low(
                    prompt=chunk_prompt,
                    num_frames=current_frames,
                    seed=used_seed + i,
                    conditioning_items=current_conditions,
                    **kwargs
                )

                if chunk_latents is None:
                    raise RuntimeError(f"Failed to generate latents for chunk {i+1}.")

                # Keep the tail of this chunk as conditioning for the next one.
                if i < num_chunks - 1:
                    overlap_latents = chunk_latents[:, :, -overlap_frames:, :, :].clone()
                    log_tensor_info(overlap_latents, f"Overlap Latents from chunk {i+1}")
                    overlap_condition_item = ConditioningItem(
                        media_item=overlap_latents, media_frame_number=0, conditioning_strength=1.0
                    )

                # Drop the overlapping frames already produced by the previous chunk.
                if i > 0:
                    chunk_latents = chunk_latents[:, :, overlap_frames:, :, :]

                chunk_path = RESULTS_DIR / f"chunk_{i}_{used_seed}.pt"
                torch.save(chunk_latents.cpu(), chunk_path)
                all_latents_paths.append(chunk_path)

            return self._finalize_generation(all_latents_paths, "narrative_video", used_seed)

        except Exception as e:
            logging.error(f"Error during narrative generation: {e}")
            traceback.print_exc()
            return None, None, None
        finally:
            for path in all_latents_paths:
                if os.path.exists(path):
                    os.remove(path)
            self.finalize()

    def generate_single_low(self, **kwargs) -> Tuple[Optional[str], Optional[str], Optional[int]]:
        """
        [ORCHESTRATOR] Generates a video from a single prompt in one go.

        Returns:
            A tuple of (video_path, latents_path, used_seed).
        """
        logging.info("Starting single-prompt low-res generation...")
        used_seed = self._resolve_seed(kwargs.get("seed"))
        # Remove the seed from kwargs so it is not passed twice to the worker below.
        kwargs.pop("seed", None)
        seed_everything(used_seed)

        try:
            total_frames = self._calculate_aligned_frames(kwargs.get("duration", 4.0), min_frames=9)

            final_latents = self._generate_single_chunk_low(
                num_frames=total_frames,
                seed=used_seed,
                conditioning_items=kwargs.get("initial_conditions", []),
                **kwargs
            )

            if final_latents is None:
                raise RuntimeError("Failed to generate latents.")

            latents_path = RESULTS_DIR / f"single_{used_seed}.pt"
            torch.save(final_latents.cpu(), latents_path)
            return self._finalize_generation([latents_path], "single_video", used_seed)

        except Exception as e:
            logging.error(f"Error during single generation: {e}")
            traceback.print_exc()
            return None, None, None
        finally:
            self.finalize()

    def _generate_single_chunk_low(
        self, prompt: str, negative_prompt: str, height: int, width: int, num_frames: int, seed: int,
        conditioning_items: List[ConditioningItem], ltx_configs_override: Optional[Dict], **kwargs
    ) -> Optional[torch.Tensor]:
        """
        [WORKER] Generates a single chunk of latents. This is the core generation unit.
        Returns the raw latents tensor on the target device, or None on failure.
        """
        height_padded, width_padded = (self._align(d) for d in (height, width))
        downscale_factor = self.config.get("downscale_factor", 0.6666666)
        vae_scale_factor = self.pipeline.vae_scale_factor

        downscaled_height = self._align(int(height_padded * downscale_factor), vae_scale_factor)
        downscaled_width = self._align(int(width_padded * downscale_factor), vae_scale_factor)
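
        # Worked example (illustrative; assumes vae_scale_factor == 32): a requested
        # 736x1280 frame stays 736x1280 after alignment to 8, is downscaled by ~2/3 to
        # 490x853, and is then rounded up to 512x864 so the first low-res pass runs on
        # VAE-aligned dimensions before any latent upscaling.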
|
|
|
|
|
        first_pass_config = self.config.get("first_pass", {}).copy()
        if ltx_configs_override:
            first_pass_config.update(self._prepare_guidance_overrides(ltx_configs_override))

        pipeline_kwargs = {
            "prompt": prompt,
            "negative_prompt": negative_prompt,
            "height": downscaled_height,
            "width": downscaled_width,
            "num_frames": num_frames,
            "frame_rate": DEFAULT_FPS,
            "generator": torch.Generator(device=self.device).manual_seed(seed),
            "output_type": "latent",
            "conditioning_items": conditioning_items,
            **first_pass_config
        }

        logging.debug(f"Pipeline call args: { {k: v for k, v in pipeline_kwargs.items() if k != 'conditioning_items'} }")

        with torch.autocast(device_type=self.device.type, dtype=self.runtime_autocast_dtype, enabled=self.device.type == 'cuda'):
            latents_raw = self.pipeline(**pipeline_kwargs).images

        log_tensor_info(latents_raw, f"Raw Latents for '{prompt[:40]}...'")
        return latents_raw

    def _finalize_generation(self, latents_paths: List[Path], base_filename: str, seed: int) -> Tuple[str, str, int]:
        """
        Loads latents from paths, concatenates them, decodes to video, and saves both.
        """
        logging.info("Finalizing generation: decoding latents to video.")

        all_tensors_cpu = [torch.load(p) for p in latents_paths]
        final_latents_cpu = torch.cat(all_tensors_cpu, dim=2)

        final_latents_path = RESULTS_DIR / f"latents_{base_filename}_{seed}.pt"
        torch.save(final_latents_cpu, final_latents_path)
        logging.info(f"Final latents saved to: {final_latents_path}")

        final_latents_gpu = final_latents_cpu.to(self.device)
        log_tensor_info(final_latents_gpu, "Final Concatenated Latents")

        with torch.autocast(device_type=self.device.type, dtype=self.runtime_autocast_dtype, enabled=self.device.type == 'cuda'):
            pixel_tensor = vae_manager_singleton.decode(
                final_latents_gpu,
                decode_timestep=float(self.config.get("decode_timestep", 0.05))
            )

        video_path = self._save_and_log_video(pixel_tensor, f"{base_filename}_{seed}")
        return str(video_path), str(final_latents_path), seed

    def prepare_condition_items(self, items_list: List, height: int, width: int, num_frames: int) -> List[ConditioningItem]:
        """Prepares a list of ConditioningItem objects from file paths or tensors."""
        if not items_list:
            return []

        height_padded, width_padded = self._align(height), self._align(width)
        padding_values = calculate_padding(height, width, height_padded, width_padded)

        conditioning_items = []
        for media, frame, weight in items_list:
            tensor = self._prepare_conditioning_tensor(media, height, width, padding_values)
            safe_frame = max(0, min(int(frame), num_frames - 1))
            conditioning_items.append(ConditioningItem(tensor, safe_frame, float(weight)))
        return conditioning_items
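
    # Illustrative usage (hypothetical paths): each entry is (media, frame_index, strength).
    # The frame index is clamped to the video length and the strength is cast to float:
    #   service.prepare_condition_items(
    #       [("/data/first_frame.png", 0, 1.0), ("/data/mid_frame.png", 48, 0.6)],
    #       height=736, width=1280, num_frames=97,
    #   )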
|
|
|
|
|
    def _prepare_conditioning_tensor(self, media_path: str, height: int, width: int, padding: Tuple) -> torch.Tensor:
        """Loads and processes an image to be a conditioning tensor."""
        tensor = load_image_to_tensor_with_resize_and_crop(media_path, height, width)
        tensor = torch.nn.functional.pad(tensor, padding)
        log_tensor_info(tensor, f"Prepared Conditioning Tensor from {media_path}")
        return tensor.to(self.device, dtype=self.runtime_autocast_dtype)

    def _prepare_guidance_overrides(self, ltx_configs: Dict) -> Dict:
        """Parses UI presets for guidance into pipeline-compatible arguments."""
        overrides = {}
        preset = ltx_configs.get("guidance_preset", "Padrão (Recomendado)")

        if preset == "Agressivo":
            overrides["guidance_scale"] = [1, 2, 8, 12, 8, 2, 1]
            overrides["stg_scale"] = [0, 0, 5, 6, 5, 3, 2]
        elif preset == "Suave":
            overrides["guidance_scale"] = [1, 1, 4, 5, 4, 1, 1]
            overrides["stg_scale"] = [0, 0, 2, 2, 2, 1, 0]
        elif preset == "Customizado":
            try:
                overrides["guidance_scale"] = json.loads(ltx_configs["guidance_scale_list"])
                overrides["stg_scale"] = json.loads(ltx_configs["stg_scale_list"])
            except (json.JSONDecodeError, KeyError) as e:
                logging.warning(f"Failed to parse custom guidance values: {e}. Falling back to defaults.")

        if overrides:
            logging.info(f"Applying '{preset}' guidance preset overrides.")
        return overrides
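
    # Illustrative "Customizado" payload (assumed shape, not from the original source):
    # the two lists are JSON strings so they can travel through a UI/form field and are
    # decoded here into per-step guidance and STG scales:
    #   {
    #       "guidance_preset": "Customizado",
    #       "guidance_scale_list": "[1, 1, 6, 8, 6, 1, 1]",
    #       "stg_scale_list": "[0, 0, 3, 4, 3, 1, 0]",
    #   }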
|
|
|
|
|
    def _save_and_log_video(self, pixel_tensor: torch.Tensor, base_filename: str) -> Path:
        """Saves a pixel tensor to an MP4 file and returns the final path."""
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_path = os.path.join(temp_dir, f"{base_filename}.mp4")
            video_encode_tool_singleton.save_video_from_tensor(
                pixel_tensor, temp_path, fps=DEFAULT_FPS
            )
            final_path = RESULTS_DIR / f"{base_filename}.mp4"
            shutil.move(temp_path, final_path)
            logging.info(f"Video saved successfully to: {final_path}")
            return final_path

    def _apply_precision_policy(self):
        """Sets the autocast dtype based on the configuration file."""
        precision = str(self.config.get("precision", "bfloat16")).lower()
        if precision in ["float8_e4m3fn", "bfloat16"]:
            self.runtime_autocast_dtype = torch.bfloat16
        elif precision == "mixed_precision":
            self.runtime_autocast_dtype = torch.float16
        else:
            self.runtime_autocast_dtype = torch.float32
        logging.info(f"Runtime precision policy set for autocast: {self.runtime_autocast_dtype}")

    def _align(self, dim: int, alignment: int = FRAMES_ALIGNMENT) -> int:
        """Rounds a dimension up to the next multiple of `alignment`."""
        return ((dim - 1) // alignment + 1) * alignment
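
    # Illustrative values: _align(1) -> 8, _align(8) -> 8, _align(9) -> 16, and with an
    # explicit alignment, _align(853, 32) -> 864 (always rounds up, never down).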
|
|
|
|
|
    def _calculate_aligned_frames(self, duration_s: float, min_frames: int = 1) -> int:
        """Calculates the total number of frames based on duration, ensuring alignment."""
        num_frames = int(round(duration_s * DEFAULT_FPS))
        aligned_frames = self._align(num_frames)

        final_frames = max(aligned_frames + 1, min_frames)
        return final_frames
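
    # Illustrative value: 4.0 s at 24 fps -> 96 frames, already a multiple of 8, so the
    # result is 96 + 1 = 97; the +1 yields 8*k + 1 frame counts, consistent with the
    # min_frames=9 default used by generate_single_low.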
|
|
|
|
|
    def _resolve_seed(self, seed: Optional[int]) -> int:
        """Returns the given seed or generates a new random one."""
        return random.randint(0, 2**32 - 1) if seed is None else int(seed)


try:
    video_generation_service = VideoService()
    logging.info("Global VideoService instance created successfully.")
except Exception as e:
    logging.critical(f"Failed to initialize VideoService: {e}")
    traceback.print_exc()
    sys.exit(1)
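

# Illustrative usage (assumed keyword arguments; the exact API surface is defined by the
# callers that feed kwargs into this service, so treat these names as a sketch only):
#
#   video_path, latents_path, seed = video_generation_service.generate_single_low(
#       prompt="A red fox running through fresh snow",
#       negative_prompt="blurry, low quality",
#       height=736,
#       width=1280,
#       duration=4.0,
#       ltx_configs_override={"guidance_preset": "Agressivo"},
#   )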