#!/usr/bin/env python3
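"""
Video highlight detection with SmolVLM2 (via transformers) and ffmpeg.

The script describes the whole video with the model, derives two candidate sets
of highlight criteria, splits the video into fixed-length segments, asks the
model for a strict YES/NO verdict on each segment, and concatenates the kept
segments (optionally with fade effects) into a highlights reel.

Example invocation (the script name and file paths are illustrative):

    python video_highlights.py input.mp4 \
        --output highlights.mp4 \
        --segment-length 10 \
        --save-analysis

Requires ffmpeg and ffprobe on PATH.
"""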

import os
import json
import torch
import warnings
from transformers import AutoProcessor, AutoModelForImageTextToText
import subprocess
import logging
import argparse
from typing import Dict

# Suppress warnings
os.environ["TOKENIZERS_PARALLELISM"] = "false"
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", message=".*torchvision.*")
warnings.filterwarnings("ignore", message=".*torchcodec.*")

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def get_video_duration_seconds(video_path: str) -> float:
    """Use ffprobe to get video duration in seconds."""
    cmd = [
        "ffprobe",
        "-v", "quiet",
        "-print_format", "json",
        "-show_format",
        video_path
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError(f"ffprobe failed for {video_path}: {result.stderr.strip()}")
    info = json.loads(result.stdout)
    return float(info["format"]["duration"])

class VideoHighlightDetector:
    def __init__(
        self,
        model_path: str,
        device: str = None,
        batch_size: int = 8
    ):
        # Auto-detect device if not specified
        if device is None:
            if torch.cuda.is_available():
                device = "cuda"
            elif hasattr(torch.backends, 'mps') and torch.backends.mps.is_available():
                device = "mps"
            else:
                device = "cpu"
        
        self.device = device
        self.batch_size = batch_size
        
        # Initialize model and processor
        self.processor = AutoProcessor.from_pretrained(model_path)
        self.model = AutoModelForImageTextToText.from_pretrained(
            model_path,
            torch_dtype=torch.bfloat16,
            # _attn_implementation="flash_attention_2"
        ).to(device)
        
        # Store model path for reference
        self.model_path = model_path
    
    def analyze_video_content(self, video_path: str) -> str:
        """Analyze video content to determine its type and description."""
        system_message = "You are a helpful assistant that can understand videos. Describe what type of video this is and what's happening in it."
        messages = [
            {
                "role": "system",
                "content": [{"type": "text", "text": system_message}]
            },
            {
                "role": "user",
                "content": [
                    {"type": "video", "path": video_path},
                    {"type": "text", "text": "What type of video is this and what's happening in it? Be specific about the content type and general activities you observe."}
                ]
            }
        ]
        
        inputs = self.processor.apply_chat_template(
            messages,
            add_generation_prompt=True,
            tokenize=True,
            return_dict=True,
            return_tensors="pt"
        ).to(self.device)
        
        outputs = self.model.generate(**inputs, max_new_tokens=512, do_sample=True, temperature=0.7)
        response = self.processor.decode(outputs[0], skip_special_tokens=True).lower()
        # The decoded text echoes the prompt; keep only the assistant reply (fall back to the full text if the tag is absent)
        return response.split("assistant: ")[-1].strip()
    
    def determine_highlights(self, video_description: str, prompt_num: int = 1) -> str:
        """Determine what constitutes highlights based on video description with different prompts."""
        system_prompts = {
            1: "You are a highlight editor. List archetypal dramatic moments that would make compelling highlights if they appear in the video. Each moment should be specific enough to be recognizable but generic enough to potentially exist in other videos of this type.",
            2: "You are a helpful visual-language assistant that can understand videos and edit. You are tasked helping the user to create highlight reels for videos. Highlights should be rare and important events in the video in question."
        }
        user_prompts = {
            1: "List potential highlight moments to look for in this video:",
            2: "List dramatic moments that would make compelling highlights if they appear in the video. Each moment should be specific enough to be recognizable but generic enough to potentially exist in any video of this type:"
        }
        
        
        messages = [
            {
                "role": "system",
                "content": [{"type": "text", "text": system_prompts[prompt_num]}]
            },
            {
                "role": "user",
                "content": [{"type": "text", "text": f"""Here is a description of a video:\n\n{video_description}\n\n{user_prompts[prompt_num]}"""}]
            }
        ]
        
        print(f"Using prompt {prompt_num} for highlight detection")
        
        inputs = self.processor.apply_chat_template(
            messages,
            add_generation_prompt=True,
            tokenize=True,
            return_dict=True,
            return_tensors="pt"
        ).to(self.device)
        
        outputs = self.model.generate(**inputs, max_new_tokens=256, do_sample=True, temperature=0.7)
        response = self.processor.decode(outputs[0], skip_special_tokens=True)
        
        # Extract the actual response with better formatting
        if "Assistant: " in response:
            clean_response = response.split("Assistant: ")[1]
        elif "assistant: " in response.lower():
            clean_response = response.lower().split("assistant: ")[1]
        else:
            # If no assistant tag found, try to extract meaningful content
            parts = response.split("User:")
            if len(parts) > 1:
                clean_response = parts[-1].strip()
            else:
                clean_response = response
                
        return clean_response.strip()
    
    def process_segment(self, video_path: str, highlight_types: str) -> bool:
        """Process a video segment and determine if it contains highlights."""
        messages = [
            {
                "role": "system",
                "content": [{"type": "text", "text": "You are a STRICT video highlight analyzer. You must be very selective and only identify truly exceptional moments. Most segments should be rejected. Only select segments with high dramatic value, clear action, strong visual interest, or significant events. Be critical and selective."}]
            },
            {
                "role": "user",
                "content": [
                    {"type": "video", "path": video_path},
                    {"type": "text", "text": f"""Looking for these highlights:\n{highlight_types}\n\nDoes this video segment match ANY of these highlights?\n\nAnswer with ONE WORD ONLY:\nYES or NO\n\nNothing else. Just YES or NO."""}]
            }
        ]
        
        try:
            inputs = self.processor.apply_chat_template(
                messages,
                add_generation_prompt=True,
                tokenize=True,
                return_dict=True,
                return_tensors="pt"
            ).to(self.device)
            
            outputs = self.model.generate(
                **inputs,
                max_new_tokens=8,   # Force very short responses
                do_sample=False     # Greedy decoding for a consistent YES/NO (temperature has no effect without sampling)
            )
            response = self.processor.decode(outputs[0], skip_special_tokens=True)
            
            # Extract assistant response
            if "Assistant:" in response:
                response = response.split("Assistant:")[-1].strip()
            elif "assistant:" in response:
                response = response.split("assistant:")[-1].strip()
            
            response = response.lower()
            print(f"   πŸ€– AI Response: {response}")
            
            # Simple yes/no detection - AI returns simple answers
            response_clean = response.strip().replace("'", "").replace("-", "").replace(".", "").strip()
            
            if response_clean.startswith("no"):
                return False
            elif response_clean.startswith("yes"):
                return True
            else:
                # Default to no if unclear
                return False
                
        except Exception as e:
            print(f"   ❌ Error processing segment: {str(e)}")
            return False
    
    def _concatenate_scenes(
        self,
        video_path: str,
        scene_times: list,
        output_path: str,
        with_effects: bool = True
    ):
        """Concatenate selected scenes into final video with optional effects."""
        if not scene_times:
            logger.warning("No scenes to concatenate, skipping.")
            return
            
        if with_effects:
            self._concatenate_with_effects(video_path, scene_times, output_path)
        else:
            self._concatenate_basic(video_path, scene_times, output_path)
    
    def _concatenate_basic(self, video_path: str, scene_times: list, output_path: str):
        """Basic concatenation without effects."""
        filter_complex_parts = []
        concat_inputs = []
        for i, (start_sec, end_sec) in enumerate(scene_times):
            filter_complex_parts.append(
                f"[0:v]trim=start={start_sec}:end={end_sec},"
                f"setpts=PTS-STARTPTS[v{i}];"
            )
            filter_complex_parts.append(
                f"[0:a]atrim=start={start_sec}:end={end_sec},"
                f"asetpts=PTS-STARTPTS[a{i}];"
            )
            concat_inputs.append(f"[v{i}][a{i}]")
        
        concat_filter = f"{''.join(concat_inputs)}concat=n={len(scene_times)}:v=1:a=1[outv][outa]"
        filter_complex = "".join(filter_complex_parts) + concat_filter
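        # For two illustrative scenes, e.g. [(0, 10), (30, 40)], the assembled graph
        # looks roughly like:
        #   [0:v]trim=start=0:end=10,setpts=PTS-STARTPTS[v0];
        #   [0:a]atrim=start=0:end=10,asetpts=PTS-STARTPTS[a0];
        #   [0:v]trim=start=30:end=40,setpts=PTS-STARTPTS[v1];
        #   [0:a]atrim=start=30:end=40,asetpts=PTS-STARTPTS[a1];
        #   [v0][a0][v1][a1]concat=n=2:v=1:a=1[outv][outa]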
        
        cmd = [
            "ffmpeg",
            "-y",
            "-i", video_path,
            "-filter_complex", filter_complex,
            "-map", "[outv]",
            "-map", "[outa]",
            "-c:v", "libx264",
            "-c:a", "aac",
            output_path
        ]
        
        logger.info(f"Running ffmpeg command: {' '.join(cmd)}")
        subprocess.run(cmd, check=True, capture_output=True, text=True)
    
    def _concatenate_with_effects(self, video_path: str, scene_times: list, output_path: str):
        """Concatenate with fade effects between segments."""
        if len(scene_times) == 1:
            # Single segment - just extract with fade in/out
            start_sec, end_sec = scene_times[0]
            duration = end_sec - start_sec
            fade_duration = min(0.5, duration / 4)  # 0.5s or 25% of duration, whichever is shorter
            
            cmd = [
                "ffmpeg", "-y",
                "-i", video_path,
                "-ss", str(start_sec),
                "-t", str(duration),
                "-vf", f"fade=in:0:{int(fade_duration*30)},fade=out:{int((duration-fade_duration)*30)}:{int(fade_duration*30)}",
                "-af", f"afade=in:st=0:d={fade_duration},afade=out:st={duration-fade_duration}:d={fade_duration}",
                "-c:v", "libx264", "-c:a", "aac",
                output_path
            ]
        else:
            # Multiple segments - create with crossfade transitions  
            filter_parts = []
            audio_parts = []
            
            for i, (start_sec, end_sec) in enumerate(scene_times):
                duration = end_sec - start_sec
                fade_duration = min(0.3, duration / 6)  # Shorter fades for multiple segments (frame counts below assume ~30 fps)
                
                # Video with fade
                filter_parts.append(
                    f"[0:v]trim=start={start_sec}:end={end_sec},setpts=PTS-STARTPTS,"
                    f"fade=in:0:{int(fade_duration*30)},fade=out:{int((duration-fade_duration)*30)}:{int(fade_duration*30)}[v{i}]"
                )
                
                # Audio with fade
                audio_parts.append(
                    f"[0:a]atrim=start={start_sec}:end={end_sec},asetpts=PTS-STARTPTS,"
                    f"afade=in:st=0:d={fade_duration},afade=out:st={duration-fade_duration}:d={fade_duration}[a{i}]"
                )
            
            # Concatenate all segments
            video_concat = "".join([f"[v{i}]" for i in range(len(scene_times))])
            audio_concat = "".join([f"[a{i}]" for i in range(len(scene_times))])
            
            filter_complex = (
                ";".join(filter_parts) + ";" +
                ";".join(audio_parts) + ";" +
                f"{video_concat}concat=n={len(scene_times)}:v=1:a=0[outv];" +
                f"{audio_concat}concat=n={len(scene_times)}:v=0:a=1[outa]"
            )
            
            cmd = [
                "ffmpeg", "-y",
                "-i", video_path,
                "-filter_complex", filter_complex,
                "-map", "[outv]", "-map", "[outa]",
                "-c:v", "libx264", "-c:a", "aac",
                output_path
            ]
        
        logger.info(f"Running ffmpeg command with effects: {' '.join(cmd)}")
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode != 0:
            logger.error(f"FFmpeg error: {result.stderr}")
            # Fall back to basic concatenation
            logger.info("Falling back to basic concatenation...")
            self._concatenate_basic(video_path, scene_times, output_path)

    def process_video(self, video_path: str, output_path: str, segment_length: float = 10.0, with_effects: bool = True) -> Dict:
        """Process video using exact HuggingFace approach."""
        print("πŸš€ Starting HuggingFace Exact Video Highlight Detection")
        print(f"πŸ“ Input: {video_path}")
        print(f"πŸ“ Output: {output_path}")
        print(f"⏱️ Segment Length: {segment_length}s")
        print(f"🎨 With Effects: {with_effects}")
        print()
        
        # Get video duration
        duration = get_video_duration_seconds(video_path)
        if duration <= 0:
            return {"error": "Could not determine video duration"}
        
        print(f"πŸ“Ή Video duration: {duration:.1f}s ({duration/60:.1f} minutes)")
        
        # Check if video is too short for meaningful highlights
        if duration < segment_length * 2:
            return {
                "error": f"Video too short ({duration:.1f}s). Need at least {segment_length * 2:.1f}s for meaningful highlights.",
                "video_description": "Video too short for analysis",
                "total_segments": 0,
                "selected_segments": 0
            }
        
        # Step 1: Analyze overall video content
        print("🎬 Step 1: Analyzing overall video content...")
        video_desc = self.analyze_video_content(video_path)
        print(f"πŸ“ Video Description: {video_desc}")
        print()
        
        # Step 2: Get two different sets of highlights
        print("🎯 Step 2: Determining highlight types (2 variations)...")
        highlights1 = self.determine_highlights(video_desc, prompt_num=1)
        highlights2 = self.determine_highlights(video_desc, prompt_num=2)
        
        print(f"🎯 Highlight Set 1: {highlights1}")
        print()
        print(f"🎯 Highlight Set 2: {highlights2}")
        print()
        
        # Step 3: Split video into segments
        temp_dir = os.path.join("/tmp", "temp_segments")
        os.makedirs(temp_dir, mode=0o755, exist_ok=True)
        
        kept_segments1 = []
        kept_segments2 = []
        segments_processed = 0
        # Count the segments the loop below actually produces (the final one may be shorter than segment_length)
        total_segments = len(range(0, int(duration), int(segment_length)))
        
        print(f"πŸ” Step 3: Processing {total_segments} segments of {segment_length}s each...")
        
        for start_time in range(0, int(duration), int(segment_length)):
            progress = int((segments_processed / total_segments) * 100) if total_segments > 0 else 0
            end_time = min(start_time + segment_length, duration)
            
            print(f"πŸ“Š Processing segment {segments_processed+1}/{total_segments} ({progress}%)")
            print(f"   ⏰ Time: {start_time}s - {end_time:.1f}s")
            
            # Create segment
            segment_path = f"{temp_dir}/segment_{start_time}.mp4"
            
            cmd = [
                "ffmpeg",
                "-y",
                "-v", "quiet",  # Suppress FFmpeg output
                "-i", video_path,
                "-ss", str(start_time),
                "-t", str(segment_length),
                "-c:v", "libx264",
                "-preset", "ultrafast",  # Use ultrafast preset for speed
                "-pix_fmt", "yuv420p",  # Ensure compatible pixel format
                segment_path
            ]
            subprocess.run(cmd, check=True, capture_output=True)
            
            # Process segment with both highlight sets
            if self.process_segment(segment_path, highlights1):
                print("   βœ… KEEPING SEGMENT FOR SET 1")
                kept_segments1.append((start_time, end_time))
            else:
                print("   ❌ REJECTING SEGMENT FOR SET 1")
            
            if self.process_segment(segment_path, highlights2):
                print("   βœ… KEEPING SEGMENT FOR SET 2") 
                kept_segments2.append((start_time, end_time))
            else:
                print("   ❌ REJECTING SEGMENT FOR SET 2")
            
            # Clean up segment file
            os.remove(segment_path)
            segments_processed += 1
            print()
        
        # Remove temp directory
        os.rmdir(temp_dir)
        
        # Calculate percentages of video kept for each highlight set
        total_duration = duration
        duration1 = sum(end - start for start, end in kept_segments1)
        duration2 = sum(end - start for start, end in kept_segments2)
        
        percent1 = (duration1 / total_duration) * 100
        percent2 = (duration2 / total_duration) * 100
        
        print(f"πŸ“Š Results Summary:")
        print(f"   🎯 Highlight set 1: {percent1:.1f}% of video ({len(kept_segments1)} segments)")
        print(f"   🎯 Highlight set 2: {percent2:.1f}% of video ({len(kept_segments2)} segments)")
        
        # Choose the set with lower percentage unless it's zero
        final_segments = kept_segments2 if (0 < percent2 <= percent1 or percent1 == 0) else kept_segments1
        selected_set = "2" if final_segments == kept_segments2 else "1"
        percent_used = percent2 if final_segments == kept_segments2 else percent1
        
        print(f"πŸ† Selected Set {selected_set} with {len(final_segments)} segments ({percent_used:.1f}% of video)")
        
        if not final_segments:
            return {
                "error": "No highlights detected in the video with either set of criteria",
                "video_description": video_desc,
                "highlights1": highlights1,
                "highlights2": highlights2,
                "total_segments": total_segments
            }
        
        # Step 4: Create final video
        print(f"🎬 Step 4: Creating final highlights video...")
        self._concatenate_scenes(video_path, final_segments, output_path, with_effects)
        
        print("βœ… Highlights video created successfully!")
        print(f"πŸŽ‰ SUCCESS! Created highlights with {len(final_segments)} segments")
        print(f"   πŸ“Ή Total highlight duration: {sum(end - start for start, end in final_segments):.1f}s")
        print(f"   πŸ“Š Percentage of original video: {percent_used:.1f}%")
        
        # Return analysis results
        return {
            "success": True,
            "video_description": video_desc,
            "highlights1": highlights1,
            "highlights2": highlights2,
            "selected_set": selected_set,
            "total_segments": total_segments,
            "selected_segments": len(final_segments),
            "selected_times": final_segments,
            "total_duration": sum(end - start for start, end in final_segments),
            "compression_ratio": percent_used / 100,
            "output_path": output_path
        }
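
# Minimal programmatic usage sketch (paths are illustrative; the class API above
# is the source of truth):
#
#   detector = VideoHighlightDetector("HuggingFaceTB/SmolVLM2-256M-Video-Instruct")
#   results = detector.process_video("input.mp4", "highlights.mp4", segment_length=10.0)
#   if results.get("success"):
#       print(f"Kept {results['selected_segments']} of {results['total_segments']} segments")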


def main():
    parser = argparse.ArgumentParser(description='HuggingFace Exact Video Highlights')
    parser.add_argument('video_path', help='Path to input video file')
    parser.add_argument('--output', required=True, help='Path to output highlights video')
    parser.add_argument('--save-analysis', action='store_true', help='Save analysis results to JSON')
    parser.add_argument('--segment-length', type=float, default=10.0, help='Length of each segment in seconds (default: 10.0)')
    parser.add_argument('--model', default='HuggingFaceTB/SmolVLM2-256M-Video-Instruct', help='SmolVLM2 model to use')
    
    args = parser.parse_args()
    
    # Validate input file
    if not os.path.exists(args.video_path):
        print(f"❌ Error: Video file not found: {args.video_path}")
        return
    
    # Create output directory if needed
    output_dir = os.path.dirname(args.output)
    if output_dir and not os.path.exists(output_dir):
        os.makedirs(output_dir)
    
    print(f"πŸš€ HuggingFace Exact SmolVLM2 Video Highlights")
    print(f"   Model: {args.model}")
    print()
    
    try:
        # Initialize detector; the device (CUDA, MPS, or CPU) is auto-detected by VideoHighlightDetector
        print(f"πŸ”₯ Loading {args.model} for HuggingFace Exact Analysis...")
        detector = VideoHighlightDetector(
            model_path=args.model,
            batch_size=16
        )
        print("βœ… SmolVLM2 loaded successfully!")
        print()
        
        # Process video
        results = detector.process_video(
            video_path=args.video_path,
            output_path=args.output,
            segment_length=args.segment_length
        )
        if "error" in results:
            print(f"❌ {results['error']}")

        # Save analysis if requested
        if args.save_analysis:
            analysis_file = args.output.replace('.mp4', '_exact_analysis.json')
            with open(analysis_file, 'w') as f:
                json.dump(results, f, indent=2, default=str)
            print(f"πŸ“Š Analysis saved: {analysis_file}")
        
    except Exception as e:
        print(f"❌ Error: {str(e)}")
        import traceback
        traceback.print_exc()

if __name__ == "__main__":
    main()