diff --git a/frigate/api/defs/response/export_response.py b/frigate/api/defs/response/export_response.py index b796ba9ac..10b4a7e64 100644 --- a/frigate/api/defs/response/export_response.py +++ b/frigate/api/defs/response/export_response.py @@ -107,6 +107,14 @@ class ExportJobModel(BaseModel): default=None, description="Result metadata for completed jobs", ) + current_step: str = Field( + default="queued", + description="Current execution step (queued, preparing, encoding, encoding_retry, finalizing)", + ) + progress_percent: float = Field( + default=0.0, + description="Progress percentage of the current step (0.0 - 100.0)", + ) ExportJobsResponse = List[ExportJobModel] diff --git a/frigate/jobs/export.py b/frigate/jobs/export.py index 4540f7dd8..a74b91713 100644 --- a/frigate/jobs/export.py +++ b/frigate/jobs/export.py @@ -7,11 +7,13 @@ import time from dataclasses import dataclass from pathlib import Path from queue import Full, Queue -from typing import Any, Optional +from typing import Any, Callable, Optional from peewee import DoesNotExist +from frigate.comms.inter_process import InterProcessRequestor from frigate.config import FrigateConfig +from frigate.const import UPDATE_JOB_STATE from frigate.jobs.job import Job from frigate.models import Export from frigate.record.export import PlaybackSourceEnum, RecordingExporter @@ -23,6 +25,16 @@ logger = logging.getLogger(__name__) # Prevents a runaway client from unbounded memory growth. MAX_QUEUED_EXPORT_JOBS = 100 +# Minimum interval between progress broadcasts. FFmpeg can emit progress +# events many times per second; we coalesce them so the WebSocket isn't +# flooded with redundant updates. +PROGRESS_BROADCAST_MIN_INTERVAL = 1.0 + +# Delay before removing a completed job from the in-memory map. Gives the +# frontend a chance to receive the final state via WebSocket before SWR +# polling takes over. +COMPLETED_JOB_CLEANUP_DELAY = 5.0 + class ExportQueueFullError(RuntimeError): """Raised when the export queue is at capacity.""" @@ -43,6 +55,8 @@ class ExportJob(Job): ffmpeg_input_args: Optional[str] = None ffmpeg_output_args: Optional[str] = None cpu_fallback: bool = False + current_step: str = "queued" + progress_percent: float = 0.0 def to_dict(self) -> dict[str, Any]: """Convert to dictionary for API responses. @@ -64,6 +78,8 @@ class ExportJob(Job): "end_time": self.end_time, "error_message": self.error_message, "results": self.results, + "current_step": self.current_step, + "progress_percent": self.progress_percent, } @@ -91,6 +107,38 @@ class ExportQueueWorker(threading.Thread): self.manager.queue.task_done() +class JobStatePublisher: + """Publishes a single job state payload to the dispatcher. + + Each call opens a short-lived :py:class:`InterProcessRequestor`, sends + the payload, and closes the socket. The short-lived design avoids + REQ/REP state corruption that would arise from sharing a single REQ + socket across the API thread and worker threads (REQ sockets must + strictly alternate send/recv). + + With the 1s broadcast throttle in place, socket creation overhead is + negligible. The class also exists so tests can substitute a no-op + instance instead of stubbing ZMQ — see ``BaseTestHttp.setUp``. + """ + + def publish(self, payload: dict[str, Any]) -> None: + try: + requestor = InterProcessRequestor() + except Exception as err: + logger.warning("Failed to open job state requestor: %s", err) + return + + try: + requestor.send_data(UPDATE_JOB_STATE, payload) + except Exception as err: + logger.debug("Job state broadcast failed: %s", err) + finally: + try: + requestor.stop() + except Exception: + pass + + class ExportJobManager: """Concurrency-limited manager for queued export jobs.""" @@ -99,6 +147,7 @@ class ExportJobManager: config: FrigateConfig, max_concurrent: int, max_queued: int = MAX_QUEUED_EXPORT_JOBS, + publisher: Optional[JobStatePublisher] = None, ) -> None: self.config = config self.max_concurrent = max(1, max_concurrent) @@ -107,6 +156,68 @@ class ExportJobManager: self.lock = threading.Lock() self.workers: list[ExportQueueWorker] = [] self.started = False + self.publisher = publisher if publisher is not None else JobStatePublisher() + self._last_broadcast_monotonic: float = 0.0 + self._broadcast_throttle_lock = threading.Lock() + + def _broadcast_all_jobs(self, force: bool = False) -> None: + """Publish aggregate export job state via the job_state WS topic. + + When ``force`` is False, broadcasts within + ``PROGRESS_BROADCAST_MIN_INTERVAL`` of the previous one are skipped + to avoid flooding the WebSocket with rapid progress updates. + ``force`` bypasses the throttle and is used for status transitions + (enqueue/start/finish) where the frontend needs the latest state. + """ + now = time.monotonic() + with self._broadcast_throttle_lock: + if ( + not force + and now - self._last_broadcast_monotonic + < PROGRESS_BROADCAST_MIN_INTERVAL + ): + return + self._last_broadcast_monotonic = now + + with self.lock: + active = [ + j + for j in self.jobs.values() + if j.status in (JobStatusTypesEnum.queued, JobStatusTypesEnum.running) + ] + + any_running = any(j.status == JobStatusTypesEnum.running for j in active) + payload: dict[str, Any] = { + "job_type": "export", + "status": "running" if any_running else "queued", + "results": {"jobs": [j.to_dict() for j in active]}, + } + + try: + self.publisher.publish(payload) + except Exception as err: + logger.warning("Publisher raised during job state broadcast: %s", err) + + def _make_progress_callback(self, job: ExportJob) -> Callable[[str, float], None]: + """Build a callback the exporter can invoke during execution.""" + + def on_progress(step: str, percent: float) -> None: + job.current_step = step + job.progress_percent = percent + self._broadcast_all_jobs() + + return on_progress + + def _schedule_job_cleanup(self, job_id: str) -> None: + """Drop a completed job from ``self.jobs`` after a short delay.""" + + def cleanup() -> None: + with self.lock: + self.jobs.pop(job_id, None) + + timer = threading.Timer(COMPLETED_JOB_CLEANUP_DELAY, cleanup) + timer.daemon = True + timer.start() def ensure_started(self) -> None: """Ensure worker threads are started exactly once.""" @@ -151,6 +262,8 @@ class ExportJobManager: with self.lock: self.jobs[job.id] = job + self._broadcast_all_jobs(force=True) + return job.id def get_job(self, job_id: str) -> Optional[ExportJob]: @@ -215,6 +328,7 @@ class ExportJobManager: """Execute a queued export job.""" job.status = JobStatusTypesEnum.running job.start_time = time.time() + self._broadcast_all_jobs(force=True) exporter = RecordingExporter( self.config, @@ -229,6 +343,7 @@ class ExportJobManager: job.ffmpeg_input_args, job.ffmpeg_output_args, job.cpu_fallback, + on_progress=self._make_progress_callback(job), ) try: @@ -257,6 +372,8 @@ class ExportJobManager: job.error_message = str(err) finally: job.end_time = time.time() + self._broadcast_all_jobs(force=True) + self._schedule_job_cleanup(job.id) _job_manager: Optional[ExportJobManager] = None diff --git a/frigate/record/export.py b/frigate/record/export.py index 173a55624..9d7a9eb0c 100644 --- a/frigate/record/export.py +++ b/frigate/record/export.py @@ -4,13 +4,14 @@ import datetime import logging import os import random +import re import shutil import string import subprocess as sp import threading from enum import Enum from pathlib import Path -from typing import Optional +from typing import Callable, Optional from peewee import DoesNotExist @@ -36,6 +37,10 @@ logger = logging.getLogger(__name__) DEFAULT_TIME_LAPSE_FFMPEG_ARGS = "-vf setpts=0.04*PTS -r 30" TIMELAPSE_DATA_INPUT_ARGS = "-an -skip_frame nokey" +# Matches the setpts factor used in timelapse exports (e.g. setpts=0.04*PTS). +# Captures the floating-point factor so we can scale expected duration. +SETPTS_FACTOR_RE = re.compile(r"setpts=([0-9]*\.?[0-9]+)\*PTS") + # ffmpeg flags that can read from or write to arbitrary files BLOCKED_FFMPEG_ARGS = frozenset( { @@ -116,6 +121,7 @@ class RecordingExporter(threading.Thread): ffmpeg_input_args: Optional[str] = None, ffmpeg_output_args: Optional[str] = None, cpu_fallback: bool = False, + on_progress: Optional[Callable[[str, float], None]] = None, ) -> None: super().__init__() self.config = config @@ -130,10 +136,213 @@ class RecordingExporter(threading.Thread): self.ffmpeg_input_args = ffmpeg_input_args self.ffmpeg_output_args = ffmpeg_output_args self.cpu_fallback = cpu_fallback + self.on_progress = on_progress # ensure export thumb dir Path(os.path.join(CLIPS_DIR, "export")).mkdir(exist_ok=True) + def _emit_progress(self, step: str, percent: float) -> None: + """Invoke the progress callback if one was supplied.""" + if self.on_progress is None: + return + try: + self.on_progress(step, max(0.0, min(100.0, percent))) + except Exception: + logger.exception("Export progress callback failed") + + def _expected_output_duration_seconds(self) -> float: + """Compute the expected duration of the output video in seconds. + + Users often request a wide time range (e.g. a full hour) when only + a few minutes of recordings actually live on disk for that span, + so the requested range overstates the work and progress would + plateau very early. We sum the actual saved seconds from the + Recordings/Previews tables and use that as the input duration. + Timelapse exports then scale this by the setpts factor. + """ + requested_duration = max(0.0, float(self.end_time - self.start_time)) + + recorded = self._sum_source_duration_seconds() + input_duration = ( + recorded if recorded is not None and recorded > 0 else requested_duration + ) + + if not self.ffmpeg_output_args: + return input_duration + + match = SETPTS_FACTOR_RE.search(self.ffmpeg_output_args) + if match is None: + return input_duration + + try: + factor = float(match.group(1)) + except ValueError: + return input_duration + + if factor <= 0: + return input_duration + + return input_duration * factor + + def _sum_source_duration_seconds(self) -> Optional[float]: + """Sum saved-video seconds inside [start_time, end_time]. + + Queries Recordings or Previews depending on the playback source, + clamps each segment to the requested range, and returns the total. + Returns ``None`` on any error so the caller can fall back to the + requested range duration without losing progress reporting. + """ + try: + if self.playback_source == PlaybackSourceEnum.recordings: + rows = ( + Recordings.select(Recordings.start_time, Recordings.end_time) + .where( + Recordings.start_time.between(self.start_time, self.end_time) + | Recordings.end_time.between(self.start_time, self.end_time) + | ( + (self.start_time > Recordings.start_time) + & (self.end_time < Recordings.end_time) + ) + ) + .where(Recordings.camera == self.camera) + .iterator() + ) + else: + rows = ( + Previews.select(Previews.start_time, Previews.end_time) + .where( + Previews.start_time.between(self.start_time, self.end_time) + | Previews.end_time.between(self.start_time, self.end_time) + | ( + (self.start_time > Previews.start_time) + & (self.end_time < Previews.end_time) + ) + ) + .where(Previews.camera == self.camera) + .iterator() + ) + except Exception: + logger.exception( + "Failed to sum source duration for export %s", self.export_id + ) + return None + + total = 0.0 + try: + for row in rows: + clipped_start = max(float(row.start_time), float(self.start_time)) + clipped_end = min(float(row.end_time), float(self.end_time)) + if clipped_end > clipped_start: + total += clipped_end - clipped_start + except Exception: + logger.exception( + "Failed to read recording rows for export %s", self.export_id + ) + return None + + return total + + def _inject_progress_flags(self, ffmpeg_cmd: list[str]) -> list[str]: + """Insert FFmpeg progress reporting flags before the output path. + + ``-progress pipe:2`` writes structured key=value lines to stderr, + ``-nostats`` suppresses the noisy default stats output. + """ + if not ffmpeg_cmd: + return ffmpeg_cmd + return ffmpeg_cmd[:-1] + ["-progress", "pipe:2", "-nostats", ffmpeg_cmd[-1]] + + def _run_ffmpeg_with_progress( + self, + ffmpeg_cmd: list[str], + playlist_lines: str | list[str], + step: str = "encoding", + ) -> tuple[int, str]: + """Run an FFmpeg export command, parsing progress events from stderr. + + Returns ``(returncode, captured_stderr)``. Stdout is left attached to + the parent process so we don't have to drain it (and risk a deadlock + if the buffer fills). Progress percent is computed against the + expected output duration; values are clamped to [0, 100] inside + :py:meth:`_emit_progress`. + """ + cmd = ["nice", "-n", str(PROCESS_PRIORITY_LOW)] + self._inject_progress_flags( + ffmpeg_cmd + ) + + if isinstance(playlist_lines, list): + stdin_payload = "\n".join(playlist_lines) + else: + stdin_payload = playlist_lines + + expected_duration = self._expected_output_duration_seconds() + + self._emit_progress(step, 0.0) + + proc = sp.Popen( + cmd, + stdin=sp.PIPE, + stderr=sp.PIPE, + text=True, + encoding="ascii", + errors="replace", + ) + + assert proc.stdin is not None + assert proc.stderr is not None + + try: + proc.stdin.write(stdin_payload) + except (BrokenPipeError, OSError): + # FFmpeg may have rejected the input early; still wait for it + # to terminate so the returncode is meaningful. + pass + finally: + try: + proc.stdin.close() + except (BrokenPipeError, OSError): + pass + + captured: list[str] = [] + + try: + for raw_line in proc.stderr: + captured.append(raw_line) + line = raw_line.strip() + + if not line: + continue + + if line.startswith("out_time_us="): + if expected_duration <= 0: + continue + try: + out_time_us = int(line.split("=", 1)[1]) + except (ValueError, IndexError): + continue + if out_time_us < 0: + continue + out_seconds = out_time_us / 1_000_000.0 + percent = (out_seconds / expected_duration) * 100.0 + self._emit_progress(step, percent) + elif line == "progress=end": + self._emit_progress(step, 100.0) + break + except Exception: + logger.exception("Failed reading FFmpeg progress for %s", self.export_id) + + proc.wait() + + # Drain any remaining stderr so callers can log it on failure. + try: + remaining = proc.stderr.read() + if remaining: + captured.append(remaining) + except Exception: + pass + + return proc.returncode, "".join(captured) + def get_datetime_from_timestamp(self, timestamp: int) -> str: # return in iso format return datetime.datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:%M:%S") @@ -406,6 +615,7 @@ class RecordingExporter(threading.Thread): logger.debug( f"Beginning export for {self.camera} from {self.start_time} to {self.end_time}" ) + self._emit_progress("preparing", 0.0) export_name = ( self.user_provided_name or f"{self.camera.replace('_', ' ')} {self.get_datetime_from_timestamp(self.start_time)} {self.get_datetime_from_timestamp(self.end_time)}" @@ -443,16 +653,23 @@ class RecordingExporter(threading.Thread): except DoesNotExist: return - p = sp.run( - ["nice", "-n", str(PROCESS_PRIORITY_LOW)] + ffmpeg_cmd, - input="\n".join(playlist_lines), - encoding="ascii", - capture_output=True, + # When neither custom ffmpeg arg is set the default path uses + # `-c copy` (stream copy — no re-encoding). Report that as a + # distinct step so the UI doesn't mislabel a remux as encoding. + # The retry branch below always re-encodes because cpu_fallback + # requires custom args; it stays "encoding_retry". + is_stream_copy = ( + self.ffmpeg_input_args is None and self.ffmpeg_output_args is None + ) + initial_step = "copying" if is_stream_copy else "encoding" + + returncode, stderr = self._run_ffmpeg_with_progress( + ffmpeg_cmd, playlist_lines, step=initial_step ) # If export failed and cpu_fallback is enabled, retry without hwaccel if ( - p.returncode != 0 + returncode != 0 and self.cpu_fallback and self.ffmpeg_input_args is not None and self.ffmpeg_output_args is not None @@ -470,23 +687,21 @@ class RecordingExporter(threading.Thread): video_path, use_hwaccel=False ) - p = sp.run( - ["nice", "-n", str(PROCESS_PRIORITY_LOW)] + ffmpeg_cmd, - input="\n".join(playlist_lines), - encoding="ascii", - capture_output=True, + returncode, stderr = self._run_ffmpeg_with_progress( + ffmpeg_cmd, playlist_lines, step="encoding_retry" ) - if p.returncode != 0: + if returncode != 0: logger.error( f"Failed to export {self.playback_source.value} for command {' '.join(ffmpeg_cmd)}" ) - logger.error(p.stderr) + logger.error(stderr) Path(video_path).unlink(missing_ok=True) Export.delete().where(Export.id == self.export_id).execute() Path(thumb_path).unlink(missing_ok=True) return else: + self._emit_progress("finalizing", 100.0) Export.update({Export.in_progress: False}).where( Export.id == self.export_id ).execute() diff --git a/frigate/test/http_api/base_http_test.py b/frigate/test/http_api/base_http_test.py index 2ca4aafd0..32d110962 100644 --- a/frigate/test/http_api/base_http_test.py +++ b/frigate/test/http_api/base_http_test.py @@ -2,6 +2,7 @@ import datetime import logging import os import unittest +from unittest.mock import patch from fastapi import Request from fastapi.testclient import TestClient @@ -14,6 +15,7 @@ from frigate.api.fastapi_app import create_fastapi_app from frigate.config import FrigateConfig from frigate.const import BASE_DIR, CACHE_DIR from frigate.debug_replay import DebugReplayManager +from frigate.jobs.export import JobStatePublisher from frigate.models import Event, Recordings, ReviewSegment from frigate.review.types import SeverityEnum from frigate.test.const import TEST_DB, TEST_DB_CLEANUPS @@ -44,6 +46,19 @@ class BaseTestHttp(unittest.TestCase): self.db = SqliteQueueDatabase(TEST_DB) self.db.bind(models) + # The export job manager broadcasts via JobStatePublisher on + # enqueue/start/finish. There is no dispatcher process bound to + # the IPC socket in tests, so a real publish() would block on + # recv_json forever. Replace publish with a no-op for the + # lifetime of this test; the lookup goes through the class so any + # already-instantiated publisher (the singleton manager's) picks + # up the no-op too. + publisher_patch = patch.object( + JobStatePublisher, "publish", lambda self, payload: None + ) + publisher_patch.start() + self.addCleanup(publisher_patch.stop) + self.minimal_config = { "mqtt": {"host": "mqtt"}, "cameras": { diff --git a/frigate/test/test_export_progress.py b/frigate/test/test_export_progress.py new file mode 100644 index 000000000..616a63503 --- /dev/null +++ b/frigate/test/test_export_progress.py @@ -0,0 +1,385 @@ +"""Tests for export progress tracking, broadcast, and FFmpeg parsing.""" + +import io +import unittest +from unittest.mock import MagicMock, patch + +from frigate.jobs.export import ( + PROGRESS_BROADCAST_MIN_INTERVAL, + ExportJob, + ExportJobManager, +) +from frigate.record.export import PlaybackSourceEnum, RecordingExporter +from frigate.types import JobStatusTypesEnum + + +def _make_exporter( + end_minus_start: int = 100, + ffmpeg_input_args=None, + ffmpeg_output_args=None, + on_progress=None, +) -> RecordingExporter: + """Build a RecordingExporter without invoking its real __init__ side + effects (which create directories and require a full FrigateConfig).""" + exporter = RecordingExporter.__new__(RecordingExporter) + exporter.config = MagicMock() + exporter.export_id = "test_export" + exporter.camera = "front" + exporter.user_provided_name = None + exporter.user_provided_image = None + exporter.start_time = 1_000 + exporter.end_time = 1_000 + end_minus_start + exporter.playback_source = PlaybackSourceEnum.recordings + exporter.export_case_id = None + exporter.ffmpeg_input_args = ffmpeg_input_args + exporter.ffmpeg_output_args = ffmpeg_output_args + exporter.cpu_fallback = False + exporter.on_progress = on_progress + return exporter + + +class TestExportJobToDict(unittest.TestCase): + def test_to_dict_includes_progress_fields(self) -> None: + job = ExportJob(camera="front", request_start_time=0, request_end_time=10) + result = job.to_dict() + + assert "current_step" in result + assert "progress_percent" in result + assert result["current_step"] == "queued" + assert result["progress_percent"] == 0.0 + + def test_to_dict_reflects_updated_progress(self) -> None: + job = ExportJob(camera="front", request_start_time=0, request_end_time=10) + job.current_step = "encoding" + job.progress_percent = 42.5 + + result = job.to_dict() + + assert result["current_step"] == "encoding" + assert result["progress_percent"] == 42.5 + + +class TestExpectedOutputDuration(unittest.TestCase): + def test_normal_export_uses_input_duration(self) -> None: + exporter = _make_exporter(end_minus_start=600) + assert exporter._expected_output_duration_seconds() == 600.0 + + def test_timelapse_uses_setpts_factor(self) -> None: + exporter = _make_exporter( + end_minus_start=1000, + ffmpeg_input_args="-y", + ffmpeg_output_args="-vf setpts=0.04*PTS -r 30", + ) + # 1000s input * 0.04 = 40s of output + assert exporter._expected_output_duration_seconds() == 40.0 + + def test_unknown_factor_falls_back_to_input_duration(self) -> None: + exporter = _make_exporter( + end_minus_start=300, + ffmpeg_input_args="-y", + ffmpeg_output_args="-c:v libx264 -preset veryfast", + ) + assert exporter._expected_output_duration_seconds() == 300.0 + + def test_zero_factor_falls_back_to_input_duration(self) -> None: + exporter = _make_exporter( + end_minus_start=300, + ffmpeg_input_args="-y", + ffmpeg_output_args="-vf setpts=0*PTS", + ) + assert exporter._expected_output_duration_seconds() == 300.0 + + def test_uses_actual_recorded_seconds_when_available(self) -> None: + """If the DB shows only 120s of saved recordings inside a 1h + requested range, progress should be computed against 120s.""" + exporter = _make_exporter(end_minus_start=3600) + exporter._sum_source_duration_seconds = lambda: 120.0 # type: ignore[method-assign] + assert exporter._expected_output_duration_seconds() == 120.0 + + def test_actual_recorded_seconds_scaled_by_setpts(self) -> None: + """Recorded duration must still be scaled by the timelapse factor.""" + exporter = _make_exporter( + end_minus_start=3600, + ffmpeg_input_args="-y", + ffmpeg_output_args="-vf setpts=0.04*PTS -r 30", + ) + exporter._sum_source_duration_seconds = lambda: 600.0 # type: ignore[method-assign] + # 600s * 0.04 = 24s of output + assert exporter._expected_output_duration_seconds() == 24.0 + + def test_db_failure_falls_back_to_requested_range(self) -> None: + exporter = _make_exporter(end_minus_start=300) + exporter._sum_source_duration_seconds = lambda: None # type: ignore[method-assign] + assert exporter._expected_output_duration_seconds() == 300.0 + + +class TestProgressFlagInjection(unittest.TestCase): + def test_inserts_before_output_path(self) -> None: + exporter = _make_exporter() + cmd = ["ffmpeg", "-i", "input.m3u8", "-c", "copy", "/tmp/output.mp4"] + + result = exporter._inject_progress_flags(cmd) + + assert result == [ + "ffmpeg", + "-i", + "input.m3u8", + "-c", + "copy", + "-progress", + "pipe:2", + "-nostats", + "/tmp/output.mp4", + ] + + def test_handles_empty_cmd(self) -> None: + exporter = _make_exporter() + assert exporter._inject_progress_flags([]) == [] + + +class TestFfmpegProgressParsing(unittest.TestCase): + """Verify percentage calculation from FFmpeg ``-progress`` output.""" + + def _run_with_stderr( + self, + stderr_text: str, + expected_duration_seconds: int = 90, + ) -> list[tuple[str, float]]: + """Helper: run _run_ffmpeg_with_progress against a mocked Popen + whose stderr emits the supplied text. Returns the list of + (step, percent) tuples that the on_progress callback received.""" + captured: list[tuple[str, float]] = [] + + def on_progress(step: str, percent: float) -> None: + captured.append((step, percent)) + + exporter = _make_exporter( + end_minus_start=expected_duration_seconds, + on_progress=on_progress, + ) + + fake_proc = MagicMock() + fake_proc.stdin = io.StringIO() + fake_proc.stderr = io.StringIO(stderr_text) + fake_proc.returncode = 0 + fake_proc.wait = MagicMock(return_value=0) + + with patch("frigate.record.export.sp.Popen", return_value=fake_proc): + returncode, _stderr = exporter._run_ffmpeg_with_progress( + ["ffmpeg", "-i", "x.m3u8", "/tmp/out.mp4"], "playlist", step="encoding" + ) + + assert returncode == 0 + return captured + + def test_parses_out_time_us_into_percent(self) -> None: + # 90s duration; 45s out_time => 50% + stderr = "out_time_us=45000000\nprogress=continue\n" + captured = self._run_with_stderr(stderr, expected_duration_seconds=90) + + # The first call is the synchronous 0.0 emit before Popen runs. + assert captured[0] == ("encoding", 0.0) + assert any(percent == 50.0 for step, percent in captured if step == "encoding") + + def test_progress_end_emits_100_percent(self) -> None: + stderr = "out_time_us=10000000\nprogress=end\n" + captured = self._run_with_stderr(stderr, expected_duration_seconds=90) + + assert captured[-1] == ("encoding", 100.0) + + def test_clamps_overshoot_at_100(self) -> None: + # 150s of output reported against 90s expected duration. + stderr = "out_time_us=150000000\nprogress=continue\n" + captured = self._run_with_stderr(stderr, expected_duration_seconds=90) + + encoding_values = [p for s, p in captured if s == "encoding" and p > 0] + assert all(p <= 100.0 for p in encoding_values) + assert encoding_values[-1] == 100.0 + + def test_ignores_garbage_lines(self) -> None: + stderr = ( + "frame= 120 fps= 30 q=23.0 size= 512kB\n" + "out_time_us=not-a-number\n" + "out_time_us=30000000\n" + "progress=continue\n" + ) + captured = self._run_with_stderr(stderr, expected_duration_seconds=90) + + # We expect 0.0 (from initial emit) plus the 30s/90s = 33.33...% step + encoding_percents = sorted({round(p, 2) for s, p in captured}) + assert 0.0 in encoding_percents + assert any(abs(p - (30 / 90 * 100)) < 0.01 for p in encoding_percents) + + +class TestBroadcastAggregation(unittest.TestCase): + """Verify ExportJobManager broadcast payload shape and throttling.""" + + def _make_manager(self) -> tuple[ExportJobManager, MagicMock]: + """Build a manager with an injected mock publisher. Returns + ``(manager, publisher)`` so tests can assert on broadcast payloads + without touching ZMQ at all.""" + config = MagicMock() + publisher = MagicMock() + manager = ExportJobManager( + config, max_concurrent=2, max_queued=10, publisher=publisher + ) + return manager, publisher + + @staticmethod + def _last_payload(publisher: MagicMock) -> dict: + return publisher.publish.call_args.args[0] + + def test_empty_jobs_broadcasts_empty_list(self) -> None: + manager, publisher = self._make_manager() + manager._broadcast_all_jobs(force=True) + + publisher.publish.assert_called_once() + payload = self._last_payload(publisher) + assert payload["job_type"] == "export" + assert payload["status"] == "queued" + assert payload["results"]["jobs"] == [] + + def test_single_running_job_payload(self) -> None: + manager, publisher = self._make_manager() + job = ExportJob(camera="front", request_start_time=0, request_end_time=10) + job.status = JobStatusTypesEnum.running + job.current_step = "encoding" + job.progress_percent = 75.0 + manager.jobs[job.id] = job + + manager._broadcast_all_jobs(force=True) + + payload = self._last_payload(publisher) + assert payload["status"] == "running" + assert len(payload["results"]["jobs"]) == 1 + broadcast_job = payload["results"]["jobs"][0] + assert broadcast_job["current_step"] == "encoding" + assert broadcast_job["progress_percent"] == 75.0 + + def test_multiple_jobs_broadcast(self) -> None: + manager, publisher = self._make_manager() + for i, status in enumerate( + (JobStatusTypesEnum.queued, JobStatusTypesEnum.running) + ): + job = ExportJob( + id=f"job_{i}", + camera="front", + request_start_time=0, + request_end_time=10, + ) + job.status = status + manager.jobs[job.id] = job + + manager._broadcast_all_jobs(force=True) + + payload = self._last_payload(publisher) + assert payload["status"] == "running" + assert len(payload["results"]["jobs"]) == 2 + + def test_completed_jobs_are_excluded(self) -> None: + manager, publisher = self._make_manager() + active = ExportJob(id="active", camera="front") + active.status = JobStatusTypesEnum.running + finished = ExportJob(id="done", camera="front") + finished.status = JobStatusTypesEnum.success + manager.jobs[active.id] = active + manager.jobs[finished.id] = finished + + manager._broadcast_all_jobs(force=True) + + payload = self._last_payload(publisher) + ids = [j["id"] for j in payload["results"]["jobs"]] + assert ids == ["active"] + + def test_throttle_skips_rapid_unforced_broadcasts(self) -> None: + manager, publisher = self._make_manager() + job = ExportJob(camera="front") + job.status = JobStatusTypesEnum.running + manager.jobs[job.id] = job + + manager._broadcast_all_jobs(force=True) + # Immediately following non-forced broadcasts should be skipped. + for _ in range(5): + manager._broadcast_all_jobs(force=False) + + assert publisher.publish.call_count == 1 + + def test_throttle_allows_broadcast_after_interval(self) -> None: + manager, publisher = self._make_manager() + job = ExportJob(camera="front") + job.status = JobStatusTypesEnum.running + manager.jobs[job.id] = job + + with patch("frigate.jobs.export.time.monotonic") as mock_mono: + mock_mono.return_value = 100.0 + manager._broadcast_all_jobs(force=True) + + mock_mono.return_value = 100.0 + PROGRESS_BROADCAST_MIN_INTERVAL + 0.01 + manager._broadcast_all_jobs(force=False) + + assert publisher.publish.call_count == 2 + + def test_force_bypasses_throttle(self) -> None: + manager, publisher = self._make_manager() + job = ExportJob(camera="front") + job.status = JobStatusTypesEnum.running + manager.jobs[job.id] = job + + manager._broadcast_all_jobs(force=True) + manager._broadcast_all_jobs(force=True) + + assert publisher.publish.call_count == 2 + + def test_publisher_exceptions_do_not_propagate(self) -> None: + """A failing publisher must not break the manager: broadcasts are + best-effort since the dispatcher may not be available (tests, + startup races).""" + manager, publisher = self._make_manager() + publisher.publish.side_effect = RuntimeError("comms down") + + job = ExportJob(camera="front") + job.status = JobStatusTypesEnum.running + manager.jobs[job.id] = job + + # Swallow our own RuntimeError if the manager doesn't; the real + # JobStatePublisher handles its own exceptions internally, so the + # manager can stay naive. But if something bubbles up it should + # not escape _broadcast_all_jobs — enforce that contract here. + try: + manager._broadcast_all_jobs(force=True) + except RuntimeError: + self.fail("_broadcast_all_jobs must tolerate publisher failures") + + def test_progress_callback_updates_job_and_broadcasts(self) -> None: + manager, _publisher = self._make_manager() + job = ExportJob(camera="front") + job.status = JobStatusTypesEnum.running + manager.jobs[job.id] = job + + callback = manager._make_progress_callback(job) + callback("encoding", 33.0) + + assert job.current_step == "encoding" + assert job.progress_percent == 33.0 + + +class TestSchedulesCleanup(unittest.TestCase): + def test_schedule_job_cleanup_removes_after_delay(self) -> None: + config = MagicMock() + manager = ExportJobManager(config, max_concurrent=1, max_queued=1) + job = ExportJob(id="cleanup_me", camera="front") + manager.jobs[job.id] = job + + with patch("frigate.jobs.export.threading.Timer") as mock_timer: + manager._schedule_job_cleanup(job.id) + mock_timer.assert_called_once() + delay, fn = mock_timer.call_args.args + assert delay > 0 + + # Invoke the callback directly to confirm it removes the job. + fn() + assert job.id not in manager.jobs + + +if __name__ == "__main__": + unittest.main() diff --git a/web/e2e/specs/export.spec.ts b/web/e2e/specs/export.spec.ts index 605e2dca4..4db98d5e9 100644 --- a/web/e2e/specs/export.spec.ts +++ b/web/e2e/specs/export.spec.ts @@ -732,3 +732,200 @@ test.describe("Multi-Review Export @high", () => { }); }); }); + +test.describe("Export Page - Active Job Progress @medium", () => { + test("encoding job renders percent label and progress bar", async ({ + frigateApp, + }) => { + // Override the default empty mock with an encoding job. Per-test + // page.route registrations win over those set by the api-mocker. + await frigateApp.page.route("**/api/jobs/export", (route) => + route.fulfill({ + json: [ + { + id: "job-encoding", + job_type: "export", + status: "running", + camera: "front_door", + name: "Encoding Sample", + export_case_id: null, + request_start_time: 1775407931, + request_end_time: 1775408531, + start_time: 1775407932, + end_time: null, + error_message: null, + results: null, + current_step: "encoding", + progress_percent: 42, + }, + ], + }), + ); + + await frigateApp.goto("/export"); + + await expect(frigateApp.page.getByText("Encoding Sample")).toBeVisible(); + // Step label and percent are rendered together as text near the + // progress bar (separated by a middle dot), not in a corner badge. + await expect(frigateApp.page.getByText(/Encoding\s*·\s*42%/)).toBeVisible(); + }); + + test("queued job shows queued badge", async ({ frigateApp }) => { + await frigateApp.page.route("**/api/jobs/export", (route) => + route.fulfill({ + json: [ + { + id: "job-queued", + job_type: "export", + status: "queued", + camera: "front_door", + name: "Queued Sample", + export_case_id: null, + request_start_time: 1775407931, + request_end_time: 1775408531, + start_time: null, + end_time: null, + error_message: null, + results: null, + current_step: "queued", + progress_percent: 0, + }, + ], + }), + ); + + await frigateApp.goto("/export"); + + await expect(frigateApp.page.getByText("Queued Sample")).toBeVisible(); + await expect( + frigateApp.page.getByText("Queued", { exact: true }), + ).toBeVisible(); + }); + + test("active job hides matching in_progress export row", async ({ + frigateApp, + }) => { + // The backend inserts the Export row with in_progress=True before + // FFmpeg starts encoding, so the same id appears in BOTH /jobs/export + // and /exports during the run. The page must show the rich progress + // card from the active jobs feed and suppress the binary-spinner + // ExportCard from the exports feed; otherwise the older binary + // spinner replaces the percent label as soon as SWR re-polls. + await frigateApp.page.route("**/api/jobs/export", (route) => + route.fulfill({ + json: [ + { + id: "shared-id", + job_type: "export", + status: "running", + camera: "front_door", + name: "Shared Id Encoding", + export_case_id: null, + request_start_time: 1775407931, + request_end_time: 1775408531, + start_time: 1775407932, + end_time: null, + error_message: null, + results: null, + current_step: "encoding", + progress_percent: 67, + }, + ], + }), + ); + + await frigateApp.page.route("**/api/exports**", (route) => { + if (route.request().method() !== "GET") { + return route.fallback(); + } + return route.fulfill({ + json: [ + { + id: "shared-id", + camera: "front_door", + name: "Shared Id Encoding", + date: 1775407931, + video_path: "/exports/shared-id.mp4", + thumb_path: "/exports/shared-id-thumb.jpg", + in_progress: true, + export_case_id: null, + }, + ], + }); + }); + + await frigateApp.goto("/export"); + + // The progress label must be present — proving the rich card won. + await expect(frigateApp.page.getByText(/Encoding\s*·\s*67%/)).toBeVisible(); + + // And only ONE card should be visible for that id, not two. + const titles = frigateApp.page.getByText("Shared Id Encoding"); + await expect(titles).toHaveCount(1); + }); + + test("stream copy job shows copying label", async ({ frigateApp }) => { + // Default (non-custom) exports use `-c copy`, which is a remux, not + // a real encode. The step label should read "Copying" so users + // aren't misled into thinking re-encoding is happening. + await frigateApp.page.route("**/api/jobs/export", (route) => + route.fulfill({ + json: [ + { + id: "job-copying", + job_type: "export", + status: "running", + camera: "front_door", + name: "Copy Sample", + export_case_id: null, + request_start_time: 1775407931, + request_end_time: 1775408531, + start_time: 1775407932, + end_time: null, + error_message: null, + results: null, + current_step: "copying", + progress_percent: 80, + }, + ], + }), + ); + + await frigateApp.goto("/export"); + + await expect(frigateApp.page.getByText("Copy Sample")).toBeVisible(); + await expect(frigateApp.page.getByText(/Copying\s*·\s*80%/)).toBeVisible(); + }); + + test("encoding retry job shows retry label", async ({ frigateApp }) => { + await frigateApp.page.route("**/api/jobs/export", (route) => + route.fulfill({ + json: [ + { + id: "job-retry", + job_type: "export", + status: "running", + camera: "front_door", + name: "Retry Sample", + export_case_id: null, + request_start_time: 1775407931, + request_end_time: 1775408531, + start_time: 1775407932, + end_time: null, + error_message: null, + results: null, + current_step: "encoding_retry", + progress_percent: 12, + }, + ], + }), + ); + + await frigateApp.goto("/export"); + + await expect(frigateApp.page.getByText("Retry Sample")).toBeVisible(); + await expect( + frigateApp.page.getByText(/Encoding \(retry\)\s*·\s*12%/), + ).toBeVisible(); + }); +}); diff --git a/web/public/locales/en/views/exports.json b/web/public/locales/en/views/exports.json index 5e64952d8..0dd47d342 100644 --- a/web/public/locales/en/views/exports.json +++ b/web/public/locales/en/views/exports.json @@ -58,7 +58,12 @@ "jobCard": { "defaultName": "{{camera}} export", "queued": "Queued", - "running": "Running" + "running": "Running", + "preparing": "Preparing", + "copying": "Copying", + "encoding": "Encoding", + "encodingRetry": "Encoding (retry)", + "finalizing": "Finalizing" }, "caseView": { "noDescription": "No description", diff --git a/web/src/api/ws.ts b/web/src/api/ws.ts index 845ad1fa6..909a1bb5d 100644 --- a/web/src/api/ws.ts +++ b/web/src/api/ws.ts @@ -811,10 +811,10 @@ export function useTriggers(): { payload: TriggerStatus } { return { payload: parsed }; } -export function useJobStatus( +export function useJobStatus( jobType: string, revalidateOnFocus: boolean = true, -): { payload: Job | null } { +): { payload: Job | null } { const { value: { payload }, send: sendCommand, @@ -846,7 +846,7 @@ export function useJobStatus( // eslint-disable-next-line react-hooks/exhaustive-deps }, [revalidateOnFocus]); - return { payload: currentJob as Job | null }; + return { payload: currentJob as Job | null }; } export function useWsMessageSubscribe(callback: (msg: WsFeedMessage) => void) { diff --git a/web/src/components/card/ExportCard.tsx b/web/src/components/card/ExportCard.tsx index 724179128..966aab4dc 100644 --- a/web/src/components/card/ExportCard.tsx +++ b/web/src/components/card/ExportCard.tsx @@ -1,6 +1,7 @@ import ActivityIndicator from "../indicators/activity-indicator"; import { Button } from "../ui/button"; -import { useCallback, useMemo, useRef, useState } from "react"; +import { Progress } from "../ui/progress"; +import { useCallback, useEffect, useMemo, useRef, useState } from "react"; import { isMobile } from "react-device-detect"; import { FiMoreVertical } from "react-icons/fi"; import { Skeleton } from "../ui/skeleton"; @@ -128,6 +129,14 @@ export function ExportCard({ exportedRecording.thumb_path.length > 0, ); + // Resync the skeleton state whenever the backing export changes. The + // list keys by id now, so in practice the component remounts instead + // of receiving new props — but this keeps the card honest if a parent + // ever reuses the instance across different exports. + useEffect(() => { + setLoading(exportedRecording.thumb_path.length > 0); + }, [exportedRecording.thumb_path]); + // selection const cardRef = useRef(null); @@ -392,8 +401,35 @@ export function ActiveExportJobCard({ camera: cameraName, }); }, [cameraName, job.name, t]); - const statusLabel = - job.status === "queued" ? t("jobCard.queued") : t("jobCard.running"); + + const step = job.current_step + ? job.current_step + : job.status === "queued" + ? "queued" + : "preparing"; + const percent = Math.round(job.progress_percent ?? 0); + + const stepLabel = useMemo(() => { + switch (step) { + case "queued": + return t("jobCard.queued"); + case "preparing": + return t("jobCard.preparing"); + case "copying": + return t("jobCard.copying"); + case "encoding": + return t("jobCard.encoding"); + case "encoding_retry": + return t("jobCard.encodingRetry"); + case "finalizing": + return t("jobCard.finalizing"); + default: + return t("jobCard.running"); + } + }, [step, t]); + + const hasDeterminateProgress = + step === "copying" || step === "encoding" || step === "encoding_retry"; return (
-
- {statusLabel} -
-
- +
+
+ {stepLabel} + {hasDeterminateProgress && ` · ${percent}%`} +
+ {step === "queued" ? ( + + ) : hasDeterminateProgress ? ( + + ) : ( +
+
+
+ )}
{displayName}
diff --git a/web/src/components/filter/ExportActionGroup.tsx b/web/src/components/filter/ExportActionGroup.tsx index 92e5f251b..d2dd789fe 100644 --- a/web/src/components/filter/ExportActionGroup.tsx +++ b/web/src/components/filter/ExportActionGroup.tsx @@ -30,6 +30,7 @@ type ExportActionGroupProps = { cases?: ExportCase[]; currentCaseId?: string; mutate: () => void; + deleteExports: (ids: string[]) => Promise; }; export default function ExportActionGroup({ selectedExports, @@ -38,6 +39,7 @@ export default function ExportActionGroup({ cases, currentCaseId, mutate, + deleteExports, }: ExportActionGroupProps) { const { t } = useTranslation(["views/exports", "common"]); const isAdmin = useIsAdmin(); @@ -50,27 +52,24 @@ export default function ExportActionGroup({ const onDelete = useCallback(() => { const ids = selectedExports.map((e) => e.id); - axios - .post("exports/delete", { ids }) - .then((resp) => { - if (resp.status === 200) { - toast.success(t("bulkToast.success.delete"), { - position: "top-center", - }); - setSelectedExports([]); - mutate(); - } + deleteExports(ids) + .then(() => { + toast.success(t("bulkToast.success.delete"), { + position: "top-center", + }); + setSelectedExports([]); }) .catch((error) => { const errorMessage = - error.response?.data?.message || - error.response?.data?.detail || + error?.response?.data?.message || + error?.response?.data?.detail || "Unknown error"; - toast.error(t("bulkToast.error.deleteFailed", { errorMessage }), { - position: "top-center", - }); + toast.error( + t("bulkToast.error.deleteFailed", { errorMessage: errorMessage }), + { position: "top-center" }, + ); }); - }, [selectedExports, setSelectedExports, mutate, t]); + }, [selectedExports, setSelectedExports, deleteExports, t]); const [deleteDialogOpen, setDeleteDialogOpen] = useState(false); const [bypassDialog, setBypassDialog] = useState(false); @@ -92,36 +91,54 @@ export default function ExportActionGroup({ const [removeDialogOpen, setRemoveDialogOpen] = useState(false); const [deleteExportsOnRemove, setDeleteExportsOnRemove] = useState(false); + const [isRemovingFromCase, setIsRemovingFromCase] = useState(false); const handleRemoveFromCase = useCallback(() => { const ids = selectedExports.map((e) => e.id); + const deleting = deleteExportsOnRemove; + setIsRemovingFromCase(true); - const request = deleteExportsOnRemove - ? axios.post("exports/delete", { ids }) - : axios.post("exports/reassign", { ids, export_case_id: null }); + const request = deleting + ? deleteExports(ids) + : axios + .post("exports/reassign", { ids, export_case_id: null }) + .then(() => { + mutate(); + }); request - .then((resp) => { - if (resp.status === 200) { - toast.success(t("bulkToast.success.remove"), { - position: "top-center", - }); - setSelectedExports([]); - mutate(); - setRemoveDialogOpen(false); - setDeleteExportsOnRemove(false); - } + .then(() => { + const successKey = deleting + ? "bulkToast.success.delete" + : "bulkToast.success.remove"; + toast.success(t(successKey), { position: "top-center" }); + setSelectedExports([]); + setRemoveDialogOpen(false); + setDeleteExportsOnRemove(false); }) .catch((error) => { const errorMessage = - error.response?.data?.message || - error.response?.data?.detail || + error?.response?.data?.message || + error?.response?.data?.detail || "Unknown error"; - toast.error(t("bulkToast.error.reassignFailed", { errorMessage }), { + const errorKey = deleting + ? "bulkToast.error.deleteFailed" + : "bulkToast.error.reassignFailed"; + toast.error(t(errorKey, { errorMessage: errorMessage }), { position: "top-center", }); + }) + .finally(() => { + setIsRemovingFromCase(false); }); - }, [selectedExports, deleteExportsOnRemove, setSelectedExports, mutate, t]); + }, [ + selectedExports, + deleteExportsOnRemove, + setSelectedExports, + mutate, + deleteExports, + t, + ]); // ── Case picker ───────────────────────────────────────────────── @@ -243,6 +260,7 @@ export default function ExportActionGroup({ { + if (isRemovingFromCase) return; if (!open) { setRemoveDialogOpen(false); setDeleteExportsOnRemove(false); @@ -274,15 +292,17 @@ export default function ExportActionGroup({ id="bulk-delete-exports-switch" checked={deleteExportsOnRemove} onCheckedChange={setDeleteExportsOnRemove} + disabled={isRemovingFromCase} />
- + {t("button.cancel", { ns: "common" })} {t("button.delete", { ns: "common" })} diff --git a/web/src/pages/Exports.tsx b/web/src/pages/Exports.tsx index 8d9466e3d..4f9b78c6d 100644 --- a/web/src/pages/Exports.tsx +++ b/web/src/pages/Exports.tsx @@ -1,4 +1,5 @@ import { baseUrl } from "@/api/baseUrl"; +import { useJobStatus } from "@/api/ws"; import { ActiveExportJobCard, CaseCard, @@ -87,23 +88,45 @@ function Exports() { // Data const { data: cases, mutate: updateCases } = useSWR("cases"); - const { data: activeExportJobs } = useSWR("jobs/export", { - refreshInterval: (latestJobs) => ((latestJobs ?? []).length > 0 ? 2000 : 0), - }); - // Keep polling exports while there are queued/running jobs OR while any - // existing export is still marked in_progress. Without the second clause, - // a stale in_progress=true snapshot can stick if the activeExportJobs poll - // clears before the rawExports poll fires — SWR cancels the pending - // rawExports refresh and the UI freezes on spinners until a manual reload. + + // The HTTP fetch hydrates the page on first paint and on focus. Once the + // WebSocket is connected, the `job_state` topic delivers progress updates + // in real time, so periodic polling here would only add noise. + const { data: pollExportJobs, mutate: updateActiveJobs } = useSWR< + ExportJob[] + >("jobs/export", { refreshInterval: 0 }); + + const { payload: exportJobState } = useJobStatus<{ jobs: ExportJob[] }>( + "export", + ); + const wsExportJobs = useMemo( + () => exportJobState?.results?.jobs ?? [], + [exportJobState], + ); + + // Merge: a job present in the WS payload is authoritative (it has the + // freshest progress); the SWR snapshot fills in jobs that haven't yet + // arrived over the socket (e.g. before the first WS message after a + // page load). Once we've seen at least one WS message, we trust the WS + // payload as the complete active set. + const hasWsState = exportJobState !== null; + const activeExportJobs = useMemo(() => { + if (hasWsState) { + return wsExportJobs; + } + return pollExportJobs ?? []; + }, [hasWsState, wsExportJobs, pollExportJobs]); + + // Keep polling exports while any existing export is still marked + // in_progress so the UI flips from spinner to playable card without a + // manual reload. Once active jobs disappear from the WS feed we also + // mutate() below to fetch newly-completed exports immediately. const { data: rawExports, mutate: updateExports } = useSWR( exportSearchParams && Object.keys(exportSearchParams).length > 0 ? ["exports", exportSearchParams] : "exports", { refreshInterval: (latestExports) => { - if ((activeExportJobs?.length ?? 0) > 0) { - return 2000; - } if ((latestExports ?? []).some((exp) => exp.in_progress)) { return 2000; } @@ -112,22 +135,40 @@ function Exports() { }, ); + // When one or more active jobs disappear from the WS feed, refresh the + // exports list so newly-finished items appear without waiting for focus- + // based SWR revalidation. Clear the HTTP jobs snapshot once the live set is + // empty so a stale poll result does not resurrect completed jobs. + const previousActiveJobIdsRef = useRef>(new Set()); + useEffect(() => { + const previousIds = previousActiveJobIdsRef.current; + const currentIds = new Set(activeExportJobs.map((job) => job.id)); + const removedJob = Array.from(previousIds).some( + (id) => !currentIds.has(id), + ); + + if (removedJob) { + updateExports(); + updateCases(); + } + + if (previousIds.size > 0 && currentIds.size === 0) { + updateActiveJobs([], false); + } + previousActiveJobIdsRef.current = currentIds; + }, [activeExportJobs, updateExports, updateCases, updateActiveJobs]); + const visibleActiveJobs = useMemo(() => { - const existingExportIds = new Set((rawExports ?? []).map((exp) => exp.id)); const filteredCameras = exportFilter?.cameras; return (activeExportJobs ?? []).filter((job) => { - if (existingExportIds.has(job.id)) { - return false; - } - if (filteredCameras && filteredCameras.length > 0) { return filteredCameras.includes(job.camera); } return true; }); - }, [activeExportJobs, exportFilter?.cameras, rawExports]); + }, [activeExportJobs, exportFilter?.cameras]); const activeJobsByCase = useMemo<{ [caseId: string]: ExportJob[] }>(() => { const grouped: { [caseId: string]: ExportJob[] } = {}; @@ -144,9 +185,26 @@ function Exports() { return grouped; }, [visibleActiveJobs]); + // The backend inserts the Export row with in_progress=True before the + // FFmpeg encode kicks off, so the same id is briefly present in BOTH + // rawExports and the active job list. The ActiveExportJobCard renders + // step + percent; the ExportCard would render a binary spinner. To + // avoid that downgrade, hide the rawExport entry while there's a + // matching active job — once the job leaves the active list the + // exports SWR refresh kicks in and the regular card takes over. + const activeJobIds = useMemo>( + () => new Set(visibleActiveJobs.map((job) => job.id)), + [visibleActiveJobs], + ); + + const visibleExports = useMemo( + () => (rawExports ?? []).filter((exp) => !activeJobIds.has(exp.id)), + [activeJobIds, rawExports], + ); + const exportsByCase = useMemo<{ [caseId: string]: Export[] }>(() => { const grouped: { [caseId: string]: Export[] } = {}; - (rawExports ?? []).forEach((exp) => { + visibleExports.forEach((exp) => { const caseId = exp.export_case ?? exp.export_case_id ?? "none"; if (!grouped[caseId]) { grouped[caseId] = []; @@ -155,7 +213,7 @@ function Exports() { grouped[caseId].push(exp); }); return grouped; - }, [rawExports]); + }, [visibleExports]); const filteredCases = useMemo(() => { if (!cases) return []; @@ -184,6 +242,34 @@ function Exports() { updateCases(); }, [updateExports, updateCases]); + // Deletes one or more exports and keeps the UI in sync. SWR's default + // mutate() keeps the stale list visible until the revalidation GET + // returns, which can be seconds for large batches — long enough for + // users to click on a card whose underlying file is already gone. + // Strip the deleted ids from the cache up front, then fire the POST, + // then revalidate to reconcile with server truth. + const deleteExports = useCallback( + async (ids: string[]): Promise => { + const idSet = new Set(ids); + const removeDeleted = (current: Export[] | undefined) => + current ? current.filter((exp) => !idSet.has(exp.id)) : current; + + await updateExports(removeDeleted, { revalidate: false }); + + try { + await axios.post("exports/delete", { ids }); + await updateExports(); + await updateCases(); + } catch (err) { + // On failure, pull fresh state from the server so any items that + // weren't actually deleted reappear in the UI. + await updateExports(); + throw err; + } + }, + [updateExports, updateCases], + ); + // Search const [search, setSearch] = useState(""); @@ -208,7 +294,9 @@ function Exports() { return false; } - setSelected(rawExports.find((exp) => exp.id == id)); + // Use visibleExports so deep links to a still-encoding id don't try + // to open a player against a half-written video file. + setSelected(visibleExports.find((exp) => exp.id == id)); return true; }); @@ -260,7 +348,7 @@ function Exports() { const currentExports = selectedCaseId ? exportsByCase[selectedCaseId] || [] : exports; - const visibleExports = currentExports.filter((e) => { + const selectable = currentExports.filter((e) => { if (e.in_progress) return false; if (!search) return true; return e.name @@ -268,8 +356,8 @@ function Exports() { .replaceAll("_", " ") .includes(search.toLowerCase()); }); - if (selectedExports.length < visibleExports.length) { - setSelectedExports(visibleExports); + if (selectedExports.length < selectable.length) { + setSelectedExports(selectable); } else { setSelectedExports([]); } @@ -293,15 +381,19 @@ function Exports() { return; } - axios - .post("exports/delete", { ids: [deleteClip.file] }) - .then((response) => { - if (response.status == 200) { - setDeleteClip(undefined); - mutate(); - } + deleteExports([deleteClip.file]) + .then(() => setDeleteClip(undefined)) + .catch((error) => { + const errorMessage = + error?.response?.data?.message || + error?.response?.data?.detail || + "Unknown error"; + toast.error( + t("bulkToast.error.deleteFailed", { errorMessage: errorMessage }), + { position: "top-center" }, + ); }); - }, [deleteClip, mutate]); + }, [deleteClip, deleteExports, t]); const onHandleRename = useCallback( (id: string, update: string) => { @@ -629,6 +721,7 @@ function Exports() { cases={cases} currentCaseId={selectedCaseId} mutate={mutate} + deleteExports={deleteExports} /> ) : ( <> @@ -893,7 +986,7 @@ function AllExportsView({ ))} {filteredExports.map((item) => ( e.id === item.id)} diff --git a/web/src/types/export.ts b/web/src/types/export.ts index 1926b533b..113f0f103 100644 --- a/web/src/types/export.ts +++ b/web/src/types/export.ts @@ -59,6 +59,14 @@ export type StartExportResponse = { status?: string | null; }; +export type ExportJobStep = + | "queued" + | "preparing" + | "copying" + | "encoding" + | "encoding_retry" + | "finalizing"; + export type ExportJob = { id: string; job_type: string; @@ -77,6 +85,8 @@ export type ExportJob = { video_path?: string; thumb_path?: string; } | null; + current_step?: ExportJobStep; + progress_percent?: number; }; export type CameraActivitySegment = { diff --git a/web/src/types/ws.ts b/web/src/types/ws.ts index 6e22345eb..1ed4ca039 100644 --- a/web/src/types/ws.ts +++ b/web/src/types/ws.ts @@ -146,11 +146,11 @@ export type MediaSyncResults = { totals: MediaSyncTotals; }; -export type Job = { +export type Job = { id: string; job_type: string; status: string; - results?: MediaSyncResults; + results?: TResults; start_time?: number; end_time?: number; error_message?: string; diff --git a/web/src/views/settings/MediaSyncSettingsView.tsx b/web/src/views/settings/MediaSyncSettingsView.tsx index 5ff72b8c8..57f3f9685 100644 --- a/web/src/views/settings/MediaSyncSettingsView.tsx +++ b/web/src/views/settings/MediaSyncSettingsView.tsx @@ -13,7 +13,7 @@ import { Switch } from "@/components/ui/switch"; import { LuCheck, LuX } from "react-icons/lu"; import { cn } from "@/lib/utils"; import { formatUnixTimestampToDateTime } from "@/utils/dateUtil"; -import { MediaSyncStats } from "@/types/ws"; +import { MediaSyncResults, MediaSyncStats } from "@/types/ws"; export default function MediaSyncSettingsView() { const { t } = useTranslation("views/settings"); @@ -35,7 +35,8 @@ export default function MediaSyncSettingsView() { ]; // Subscribe to media sync status via WebSocket - const { payload: currentJob } = useJobStatus("media_sync"); + const { payload: currentJob } = useJobStatus("media_sync"); + const mediaSyncResults = currentJob?.results; const isJobRunning = Boolean( currentJob && @@ -301,7 +302,7 @@ export default function MediaSyncSettingsView() {
)} - {currentJob?.results && ( + {mediaSyncResults && (

{t("maintenance.sync.results")} @@ -309,7 +310,7 @@ export default function MediaSyncSettingsView() {

{/* Individual media type results */}
- {Object.entries(currentJob.results) + {Object.entries(mediaSyncResults) .filter(([key]) => key !== "totals") .map(([mediaType, stats]) => { const mediaStats = stats as MediaSyncStats; @@ -386,7 +387,7 @@ export default function MediaSyncSettingsView() { })}
{/* Totals */} - {currentJob.results.totals && ( + {mediaSyncResults.totals && (

{t("maintenance.sync.resultsFields.totals")} @@ -399,7 +400,7 @@ export default function MediaSyncSettingsView() { )} - {currentJob.results.totals.files_checked} + {mediaSyncResults.totals.files_checked}

@@ -410,12 +411,12 @@ export default function MediaSyncSettingsView() { 0 + mediaSyncResults.totals.orphans_found > 0 ? "font-medium text-yellow-500" : "font-medium" } > - {currentJob.results.totals.orphans_found} + {mediaSyncResults.totals.orphans_found}
@@ -427,13 +428,12 @@ export default function MediaSyncSettingsView() { - 0 + mediaSyncResults.totals.orphans_deleted > 0 ? "text-success" : "text-muted-foreground", )} > - {currentJob.results.totals.orphans_deleted} + {mediaSyncResults.totals.orphans_deleted}