"""Tests for export progress tracking, broadcast, and FFmpeg parsing.""" import io import unittest from unittest.mock import MagicMock, patch from frigate.jobs.export import ( PROGRESS_BROADCAST_MIN_INTERVAL, ExportJob, ExportJobManager, ) from frigate.record.export import PlaybackSourceEnum, RecordingExporter from frigate.types import JobStatusTypesEnum def _make_exporter( end_minus_start: int = 100, ffmpeg_input_args=None, ffmpeg_output_args=None, on_progress=None, ) -> RecordingExporter: """Build a RecordingExporter without invoking its real __init__ side effects (which create directories and require a full FrigateConfig).""" exporter = RecordingExporter.__new__(RecordingExporter) exporter.config = MagicMock() exporter.export_id = "test_export" exporter.camera = "front" exporter.user_provided_name = None exporter.user_provided_image = None exporter.start_time = 1_000 exporter.end_time = 1_000 + end_minus_start exporter.playback_source = PlaybackSourceEnum.recordings exporter.export_case_id = None exporter.ffmpeg_input_args = ffmpeg_input_args exporter.ffmpeg_output_args = ffmpeg_output_args exporter.cpu_fallback = False exporter.on_progress = on_progress return exporter class TestExportJobToDict(unittest.TestCase): def test_to_dict_includes_progress_fields(self) -> None: job = ExportJob(camera="front", request_start_time=0, request_end_time=10) result = job.to_dict() assert "current_step" in result assert "progress_percent" in result assert result["current_step"] == "queued" assert result["progress_percent"] == 0.0 def test_to_dict_reflects_updated_progress(self) -> None: job = ExportJob(camera="front", request_start_time=0, request_end_time=10) job.current_step = "encoding" job.progress_percent = 42.5 result = job.to_dict() assert result["current_step"] == "encoding" assert result["progress_percent"] == 42.5 class TestExpectedOutputDuration(unittest.TestCase): def test_normal_export_uses_input_duration(self) -> None: exporter = _make_exporter(end_minus_start=600) assert exporter._expected_output_duration_seconds() == 600.0 def test_timelapse_uses_setpts_factor(self) -> None: exporter = _make_exporter( end_minus_start=1000, ffmpeg_input_args="-y", ffmpeg_output_args="-vf setpts=0.04*PTS -r 30", ) # 1000s input * 0.04 = 40s of output assert exporter._expected_output_duration_seconds() == 40.0 def test_unknown_factor_falls_back_to_input_duration(self) -> None: exporter = _make_exporter( end_minus_start=300, ffmpeg_input_args="-y", ffmpeg_output_args="-c:v libx264 -preset veryfast", ) assert exporter._expected_output_duration_seconds() == 300.0 def test_zero_factor_falls_back_to_input_duration(self) -> None: exporter = _make_exporter( end_minus_start=300, ffmpeg_input_args="-y", ffmpeg_output_args="-vf setpts=0*PTS", ) assert exporter._expected_output_duration_seconds() == 300.0 def test_uses_actual_recorded_seconds_when_available(self) -> None: """If the DB shows only 120s of saved recordings inside a 1h requested range, progress should be computed against 120s.""" exporter = _make_exporter(end_minus_start=3600) exporter._sum_source_duration_seconds = lambda: 120.0 # type: ignore[method-assign] assert exporter._expected_output_duration_seconds() == 120.0 def test_actual_recorded_seconds_scaled_by_setpts(self) -> None: """Recorded duration must still be scaled by the timelapse factor.""" exporter = _make_exporter( end_minus_start=3600, ffmpeg_input_args="-y", ffmpeg_output_args="-vf setpts=0.04*PTS -r 30", ) exporter._sum_source_duration_seconds = lambda: 600.0 # type: ignore[method-assign] # 600s * 0.04 = 24s of output assert exporter._expected_output_duration_seconds() == 24.0 def test_db_failure_falls_back_to_requested_range(self) -> None: exporter = _make_exporter(end_minus_start=300) exporter._sum_source_duration_seconds = lambda: None # type: ignore[method-assign] assert exporter._expected_output_duration_seconds() == 300.0 class TestProgressFlagInjection(unittest.TestCase): def test_inserts_before_output_path(self) -> None: exporter = _make_exporter() cmd = ["ffmpeg", "-i", "input.m3u8", "-c", "copy", "/tmp/output.mp4"] result = exporter._inject_progress_flags(cmd) assert result == [ "ffmpeg", "-i", "input.m3u8", "-c", "copy", "-progress", "pipe:2", "-nostats", "/tmp/output.mp4", ] def test_handles_empty_cmd(self) -> None: exporter = _make_exporter() assert exporter._inject_progress_flags([]) == [] class TestFfmpegProgressParsing(unittest.TestCase): """Verify percentage calculation from FFmpeg ``-progress`` output.""" def _run_with_stderr( self, stderr_text: str, expected_duration_seconds: int = 90, ) -> list[tuple[str, float]]: """Helper: run _run_ffmpeg_with_progress against a mocked Popen whose stderr emits the supplied text. Returns the list of (step, percent) tuples that the on_progress callback received.""" captured: list[tuple[str, float]] = [] def on_progress(step: str, percent: float) -> None: captured.append((step, percent)) exporter = _make_exporter( end_minus_start=expected_duration_seconds, on_progress=on_progress, ) fake_proc = MagicMock() fake_proc.stdin = io.StringIO() fake_proc.stderr = io.StringIO(stderr_text) fake_proc.returncode = 0 fake_proc.wait = MagicMock(return_value=0) with patch("frigate.record.export.sp.Popen", return_value=fake_proc): returncode, _stderr = exporter._run_ffmpeg_with_progress( ["ffmpeg", "-i", "x.m3u8", "/tmp/out.mp4"], "playlist", step="encoding" ) assert returncode == 0 return captured def test_parses_out_time_us_into_percent(self) -> None: # 90s duration; 45s out_time => 50% stderr = "out_time_us=45000000\nprogress=continue\n" captured = self._run_with_stderr(stderr, expected_duration_seconds=90) # The first call is the synchronous 0.0 emit before Popen runs. assert captured[0] == ("encoding", 0.0) assert any(percent == 50.0 for step, percent in captured if step == "encoding") def test_progress_end_emits_100_percent(self) -> None: stderr = "out_time_us=10000000\nprogress=end\n" captured = self._run_with_stderr(stderr, expected_duration_seconds=90) assert captured[-1] == ("encoding", 100.0) def test_clamps_overshoot_at_100(self) -> None: # 150s of output reported against 90s expected duration. stderr = "out_time_us=150000000\nprogress=continue\n" captured = self._run_with_stderr(stderr, expected_duration_seconds=90) encoding_values = [p for s, p in captured if s == "encoding" and p > 0] assert all(p <= 100.0 for p in encoding_values) assert encoding_values[-1] == 100.0 def test_ignores_garbage_lines(self) -> None: stderr = ( "frame= 120 fps= 30 q=23.0 size= 512kB\n" "out_time_us=not-a-number\n" "out_time_us=30000000\n" "progress=continue\n" ) captured = self._run_with_stderr(stderr, expected_duration_seconds=90) # We expect 0.0 (from initial emit) plus the 30s/90s = 33.33...% step encoding_percents = sorted({round(p, 2) for s, p in captured}) assert 0.0 in encoding_percents assert any(abs(p - (30 / 90 * 100)) < 0.01 for p in encoding_percents) class TestBroadcastAggregation(unittest.TestCase): """Verify ExportJobManager broadcast payload shape and throttling.""" def _make_manager(self) -> tuple[ExportJobManager, MagicMock]: """Build a manager with an injected mock publisher. Returns ``(manager, publisher)`` so tests can assert on broadcast payloads without touching ZMQ at all.""" config = MagicMock() publisher = MagicMock() manager = ExportJobManager( config, max_concurrent=2, max_queued=10, publisher=publisher ) return manager, publisher @staticmethod def _last_payload(publisher: MagicMock) -> dict: return publisher.publish.call_args.args[0] def test_empty_jobs_broadcasts_empty_list(self) -> None: manager, publisher = self._make_manager() manager._broadcast_all_jobs(force=True) publisher.publish.assert_called_once() payload = self._last_payload(publisher) assert payload["job_type"] == "export" assert payload["status"] == "queued" assert payload["results"]["jobs"] == [] def test_single_running_job_payload(self) -> None: manager, publisher = self._make_manager() job = ExportJob(camera="front", request_start_time=0, request_end_time=10) job.status = JobStatusTypesEnum.running job.current_step = "encoding" job.progress_percent = 75.0 manager.jobs[job.id] = job manager._broadcast_all_jobs(force=True) payload = self._last_payload(publisher) assert payload["status"] == "running" assert len(payload["results"]["jobs"]) == 1 broadcast_job = payload["results"]["jobs"][0] assert broadcast_job["current_step"] == "encoding" assert broadcast_job["progress_percent"] == 75.0 def test_multiple_jobs_broadcast(self) -> None: manager, publisher = self._make_manager() for i, status in enumerate( (JobStatusTypesEnum.queued, JobStatusTypesEnum.running) ): job = ExportJob( id=f"job_{i}", camera="front", request_start_time=0, request_end_time=10, ) job.status = status manager.jobs[job.id] = job manager._broadcast_all_jobs(force=True) payload = self._last_payload(publisher) assert payload["status"] == "running" assert len(payload["results"]["jobs"]) == 2 def test_completed_jobs_are_excluded(self) -> None: manager, publisher = self._make_manager() active = ExportJob(id="active", camera="front") active.status = JobStatusTypesEnum.running finished = ExportJob(id="done", camera="front") finished.status = JobStatusTypesEnum.success manager.jobs[active.id] = active manager.jobs[finished.id] = finished manager._broadcast_all_jobs(force=True) payload = self._last_payload(publisher) ids = [j["id"] for j in payload["results"]["jobs"]] assert ids == ["active"] def test_throttle_skips_rapid_unforced_broadcasts(self) -> None: manager, publisher = self._make_manager() job = ExportJob(camera="front") job.status = JobStatusTypesEnum.running manager.jobs[job.id] = job manager._broadcast_all_jobs(force=True) # Immediately following non-forced broadcasts should be skipped. for _ in range(5): manager._broadcast_all_jobs(force=False) assert publisher.publish.call_count == 1 def test_throttle_allows_broadcast_after_interval(self) -> None: manager, publisher = self._make_manager() job = ExportJob(camera="front") job.status = JobStatusTypesEnum.running manager.jobs[job.id] = job with patch("frigate.jobs.export.time.monotonic") as mock_mono: mock_mono.return_value = 100.0 manager._broadcast_all_jobs(force=True) mock_mono.return_value = 100.0 + PROGRESS_BROADCAST_MIN_INTERVAL + 0.01 manager._broadcast_all_jobs(force=False) assert publisher.publish.call_count == 2 def test_force_bypasses_throttle(self) -> None: manager, publisher = self._make_manager() job = ExportJob(camera="front") job.status = JobStatusTypesEnum.running manager.jobs[job.id] = job manager._broadcast_all_jobs(force=True) manager._broadcast_all_jobs(force=True) assert publisher.publish.call_count == 2 def test_publisher_exceptions_do_not_propagate(self) -> None: """A failing publisher must not break the manager: broadcasts are best-effort since the dispatcher may not be available (tests, startup races).""" manager, publisher = self._make_manager() publisher.publish.side_effect = RuntimeError("comms down") job = ExportJob(camera="front") job.status = JobStatusTypesEnum.running manager.jobs[job.id] = job # Swallow our own RuntimeError if the manager doesn't; the real # JobStatePublisher handles its own exceptions internally, so the # manager can stay naive. But if something bubbles up it should # not escape _broadcast_all_jobs — enforce that contract here. try: manager._broadcast_all_jobs(force=True) except RuntimeError: self.fail("_broadcast_all_jobs must tolerate publisher failures") def test_progress_callback_updates_job_and_broadcasts(self) -> None: manager, _publisher = self._make_manager() job = ExportJob(camera="front") job.status = JobStatusTypesEnum.running manager.jobs[job.id] = job callback = manager._make_progress_callback(job) callback("encoding", 33.0) assert job.current_step == "encoding" assert job.progress_percent == 33.0 class TestSchedulesCleanup(unittest.TestCase): def test_schedule_job_cleanup_removes_after_delay(self) -> None: config = MagicMock() manager = ExportJobManager(config, max_concurrent=1, max_queued=1) job = ExportJob(id="cleanup_me", camera="front") manager.jobs[job.id] = job with patch("frigate.jobs.export.threading.Timer") as mock_timer: manager._schedule_job_cleanup(job.id) mock_timer.assert_called_once() delay, fn = mock_timer.call_args.args assert delay > 0 # Invoke the callback directly to confirm it removes the job. fn() assert job.id not in manager.jobs if __name__ == "__main__": unittest.main()