Add step + percent progress for exports (#22915)

* backend

* improve frontend Job typing

* progress frontend

* i18n

* tests
This commit is contained in:
Josh Hawkins
2026-04-17 13:18:12 -05:00
committed by GitHub
parent a94d1b5d9e
commit 74fcd720d3
14 changed files with 1216 additions and 106 deletions

View File

@@ -107,6 +107,14 @@ class ExportJobModel(BaseModel):
default=None,
description="Result metadata for completed jobs",
)
current_step: str = Field(
default="queued",
description="Current execution step (queued, preparing, encoding, encoding_retry, finalizing)",
)
progress_percent: float = Field(
default=0.0,
description="Progress percentage of the current step (0.0 - 100.0)",
)
ExportJobsResponse = List[ExportJobModel]

View File

@@ -7,11 +7,13 @@ import time
from dataclasses import dataclass
from pathlib import Path
from queue import Full, Queue
from typing import Any, Optional
from typing import Any, Callable, Optional
from peewee import DoesNotExist
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import FrigateConfig
from frigate.const import UPDATE_JOB_STATE
from frigate.jobs.job import Job
from frigate.models import Export
from frigate.record.export import PlaybackSourceEnum, RecordingExporter
@@ -23,6 +25,16 @@ logger = logging.getLogger(__name__)
# Prevents a runaway client from unbounded memory growth.
MAX_QUEUED_EXPORT_JOBS = 100
# Minimum interval between progress broadcasts. FFmpeg can emit progress
# events many times per second; we coalesce them so the WebSocket isn't
# flooded with redundant updates.
PROGRESS_BROADCAST_MIN_INTERVAL = 1.0
# Delay before removing a completed job from the in-memory map. Gives the
# frontend a chance to receive the final state via WebSocket before SWR
# polling takes over.
COMPLETED_JOB_CLEANUP_DELAY = 5.0
class ExportQueueFullError(RuntimeError):
    """Signals that the export queue cannot accept any more jobs."""
@@ -43,6 +55,8 @@ class ExportJob(Job):
ffmpeg_input_args: Optional[str] = None
ffmpeg_output_args: Optional[str] = None
cpu_fallback: bool = False
current_step: str = "queued"
progress_percent: float = 0.0
def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary for API responses.
@@ -64,6 +78,8 @@ class ExportJob(Job):
"end_time": self.end_time,
"error_message": self.error_message,
"results": self.results,
"current_step": self.current_step,
"progress_percent": self.progress_percent,
}
@@ -91,6 +107,38 @@ class ExportQueueWorker(threading.Thread):
self.manager.queue.task_done()
class JobStatePublisher:
    """Best-effort, one-shot publisher of job state payloads.

    A fresh :py:class:`InterProcessRequestor` is opened per call and torn
    down immediately afterwards. REQ sockets must strictly alternate
    send/recv, so sharing one socket between the API thread and worker
    threads would corrupt REQ/REP state; short-lived sockets avoid that.

    With the 1s broadcast throttle in place, per-call socket setup cost
    is negligible. Keeping this as a class also lets tests substitute a
    no-op instance instead of stubbing ZMQ — see ``BaseTestHttp.setUp``.
    """

    def publish(self, payload: dict[str, Any]) -> None:
        """Send one payload to the dispatcher; failures are logged, never raised."""
        requestor = None
        try:
            requestor = InterProcessRequestor()
            requestor.send_data(UPDATE_JOB_STATE, payload)
        except Exception as err:
            if requestor is None:
                # Could not even open the socket — nothing to clean up.
                logger.warning("Failed to open job state requestor: %s", err)
                return
            logger.debug("Job state broadcast failed: %s", err)
        finally:
            if requestor is not None:
                try:
                    requestor.stop()
                except Exception:
                    pass
class ExportJobManager:
"""Concurrency-limited manager for queued export jobs."""
@@ -99,6 +147,7 @@ class ExportJobManager:
config: FrigateConfig,
max_concurrent: int,
max_queued: int = MAX_QUEUED_EXPORT_JOBS,
publisher: Optional[JobStatePublisher] = None,
) -> None:
self.config = config
self.max_concurrent = max(1, max_concurrent)
@@ -107,6 +156,68 @@ class ExportJobManager:
self.lock = threading.Lock()
self.workers: list[ExportQueueWorker] = []
self.started = False
self.publisher = publisher if publisher is not None else JobStatePublisher()
self._last_broadcast_monotonic: float = 0.0
self._broadcast_throttle_lock = threading.Lock()
def _broadcast_all_jobs(self, force: bool = False) -> None:
    """Publish aggregate export job state via the job_state WS topic.

    When ``force`` is False, broadcasts within
    ``PROGRESS_BROADCAST_MIN_INTERVAL`` of the previous one are skipped
    to avoid flooding the WebSocket with rapid progress updates.
    ``force`` bypasses the throttle and is used for status transitions
    (enqueue/start/finish) where the frontend needs the latest state.
    """
    now = time.monotonic()
    # Throttle check and timestamp update happen atomically under their
    # own lock so two concurrent callers can't both pass the interval test.
    with self._broadcast_throttle_lock:
        if (
            not force
            and now - self._last_broadcast_monotonic
            < PROGRESS_BROADCAST_MIN_INTERVAL
        ):
            return
        # NOTE: forced broadcasts also reset the timestamp, pushing the
        # next unforced broadcast out by a full interval.
        self._last_broadcast_monotonic = now

    # Snapshot active jobs under the jobs lock; terminal jobs (success /
    # failed) are excluded from the payload.
    with self.lock:
        active = [
            j
            for j in self.jobs.values()
            if j.status in (JobStatusTypesEnum.queued, JobStatusTypesEnum.running)
        ]
        any_running = any(j.status == JobStatusTypesEnum.running for j in active)
        payload: dict[str, Any] = {
            "job_type": "export",
            "status": "running" if any_running else "queued",
            "results": {"jobs": [j.to_dict() for j in active]},
        }

    # publish() is best-effort, but guard anyway so a misbehaving
    # publisher (e.g. a test double) can never break the manager.
    try:
        self.publisher.publish(payload)
    except Exception as err:
        logger.warning("Publisher raised during job state broadcast: %s", err)
def _make_progress_callback(self, job: ExportJob) -> Callable[[str, float], None]:
    """Return a closure that records progress on *job* and broadcasts it.

    The broadcast is unforced, so rapid invocations from the exporter are
    coalesced by the manager's throttle.
    """

    def _record(step: str, percent: float) -> None:
        job.current_step = step
        job.progress_percent = percent
        self._broadcast_all_jobs()

    return _record
def _schedule_job_cleanup(self, job_id: str) -> None:
    """Remove *job_id* from the in-memory map once the cleanup delay passes.

    The delay gives WebSocket consumers a window to observe the final job
    state before the entry disappears from ``self.jobs``.
    """

    def _evict() -> None:
        with self.lock:
            self.jobs.pop(job_id, None)

    # Daemon timer so a pending cleanup never blocks process shutdown.
    reaper = threading.Timer(COMPLETED_JOB_CLEANUP_DELAY, _evict)
    reaper.daemon = True
    reaper.start()
def ensure_started(self) -> None:
"""Ensure worker threads are started exactly once."""
@@ -151,6 +262,8 @@ class ExportJobManager:
with self.lock:
self.jobs[job.id] = job
self._broadcast_all_jobs(force=True)
return job.id
def get_job(self, job_id: str) -> Optional[ExportJob]:
@@ -215,6 +328,7 @@ class ExportJobManager:
"""Execute a queued export job."""
job.status = JobStatusTypesEnum.running
job.start_time = time.time()
self._broadcast_all_jobs(force=True)
exporter = RecordingExporter(
self.config,
@@ -229,6 +343,7 @@ class ExportJobManager:
job.ffmpeg_input_args,
job.ffmpeg_output_args,
job.cpu_fallback,
on_progress=self._make_progress_callback(job),
)
try:
@@ -257,6 +372,8 @@ class ExportJobManager:
job.error_message = str(err)
finally:
job.end_time = time.time()
self._broadcast_all_jobs(force=True)
self._schedule_job_cleanup(job.id)
_job_manager: Optional[ExportJobManager] = None

View File

@@ -4,13 +4,14 @@ import datetime
import logging
import os
import random
import re
import shutil
import string
import subprocess as sp
import threading
from enum import Enum
from pathlib import Path
from typing import Optional
from typing import Callable, Optional
from peewee import DoesNotExist
@@ -36,6 +37,10 @@ logger = logging.getLogger(__name__)
DEFAULT_TIME_LAPSE_FFMPEG_ARGS = "-vf setpts=0.04*PTS -r 30"
TIMELAPSE_DATA_INPUT_ARGS = "-an -skip_frame nokey"
# Matches the setpts factor used in timelapse exports (e.g. setpts=0.04*PTS).
# Captures the floating-point factor so we can scale expected duration.
SETPTS_FACTOR_RE = re.compile(r"setpts=([0-9]*\.?[0-9]+)\*PTS")
# ffmpeg flags that can read from or write to arbitrary files
BLOCKED_FFMPEG_ARGS = frozenset(
{
@@ -116,6 +121,7 @@ class RecordingExporter(threading.Thread):
ffmpeg_input_args: Optional[str] = None,
ffmpeg_output_args: Optional[str] = None,
cpu_fallback: bool = False,
on_progress: Optional[Callable[[str, float], None]] = None,
) -> None:
super().__init__()
self.config = config
@@ -130,10 +136,213 @@ class RecordingExporter(threading.Thread):
self.ffmpeg_input_args = ffmpeg_input_args
self.ffmpeg_output_args = ffmpeg_output_args
self.cpu_fallback = cpu_fallback
self.on_progress = on_progress
# ensure export thumb dir
Path(os.path.join(CLIPS_DIR, "export")).mkdir(exist_ok=True)
def _emit_progress(self, step: str, percent: float) -> None:
"""Invoke the progress callback if one was supplied."""
if self.on_progress is None:
return
try:
self.on_progress(step, max(0.0, min(100.0, percent)))
except Exception:
logger.exception("Export progress callback failed")
def _expected_output_duration_seconds(self) -> float:
"""Compute the expected duration of the output video in seconds.
Users often request a wide time range (e.g. a full hour) when only
a few minutes of recordings actually live on disk for that span,
so the requested range overstates the work and progress would
plateau very early. We sum the actual saved seconds from the
Recordings/Previews tables and use that as the input duration.
Timelapse exports then scale this by the setpts factor.
"""
requested_duration = max(0.0, float(self.end_time - self.start_time))
recorded = self._sum_source_duration_seconds()
input_duration = (
recorded if recorded is not None and recorded > 0 else requested_duration
)
if not self.ffmpeg_output_args:
return input_duration
match = SETPTS_FACTOR_RE.search(self.ffmpeg_output_args)
if match is None:
return input_duration
try:
factor = float(match.group(1))
except ValueError:
return input_duration
if factor <= 0:
return input_duration
return input_duration * factor
def _sum_source_duration_seconds(self) -> Optional[float]:
    """Sum saved-video seconds inside [start_time, end_time].

    Queries Recordings or Previews depending on the playback source,
    clamps each segment to the requested range, and returns the total.

    Returns ``None`` on any error so the caller can fall back to the
    requested range duration without losing progress reporting.
    """
    try:
        if self.playback_source == PlaybackSourceEnum.recordings:
            # Select segments overlapping the requested range: either
            # endpoint falls inside it, or the segment fully spans it
            # (segment starts before the range and ends after it).
            rows = (
                Recordings.select(Recordings.start_time, Recordings.end_time)
                .where(
                    Recordings.start_time.between(self.start_time, self.end_time)
                    | Recordings.end_time.between(self.start_time, self.end_time)
                    | (
                        (self.start_time > Recordings.start_time)
                        & (self.end_time < Recordings.end_time)
                    )
                )
                .where(Recordings.camera == self.camera)
                .iterator()
            )
        else:
            # Same overlap predicate against the Previews table.
            rows = (
                Previews.select(Previews.start_time, Previews.end_time)
                .where(
                    Previews.start_time.between(self.start_time, self.end_time)
                    | Previews.end_time.between(self.start_time, self.end_time)
                    | (
                        (self.start_time > Previews.start_time)
                        & (self.end_time < Previews.end_time)
                    )
                )
                .where(Previews.camera == self.camera)
                .iterator()
            )
    except Exception:
        logger.exception(
            "Failed to sum source duration for export %s", self.export_id
        )
        return None

    total = 0.0
    try:
        # Clamp each row to the requested window so partially overlapping
        # segments contribute only their in-range portion.
        for row in rows:
            clipped_start = max(float(row.start_time), float(self.start_time))
            clipped_end = min(float(row.end_time), float(self.end_time))
            if clipped_end > clipped_start:
                total += clipped_end - clipped_start
    except Exception:
        # .iterator() evaluates lazily, so DB errors can surface here too.
        logger.exception(
            "Failed to read recording rows for export %s", self.export_id
        )
        return None
    return total
def _inject_progress_flags(self, ffmpeg_cmd: list[str]) -> list[str]:
"""Insert FFmpeg progress reporting flags before the output path.
``-progress pipe:2`` writes structured key=value lines to stderr,
``-nostats`` suppresses the noisy default stats output.
"""
if not ffmpeg_cmd:
return ffmpeg_cmd
return ffmpeg_cmd[:-1] + ["-progress", "pipe:2", "-nostats", ffmpeg_cmd[-1]]
def _run_ffmpeg_with_progress(
    self,
    ffmpeg_cmd: list[str],
    playlist_lines: str | list[str],
    step: str = "encoding",
) -> tuple[int, str]:
    """Run an FFmpeg export command, parsing progress events from stderr.

    Returns ``(returncode, captured_stderr)``. Stdout is left attached to
    the parent process so we don't have to drain it (and risk a deadlock
    if the buffer fills). Progress percent is computed against the
    expected output duration; values are clamped to [0, 100] inside
    :py:meth:`_emit_progress`.
    """
    # Run at low priority so exports don't starve detection/recording.
    cmd = ["nice", "-n", str(PROCESS_PRIORITY_LOW)] + self._inject_progress_flags(
        ffmpeg_cmd
    )
    if isinstance(playlist_lines, list):
        stdin_payload = "\n".join(playlist_lines)
    else:
        stdin_payload = playlist_lines

    expected_duration = self._expected_output_duration_seconds()
    # Emit a synchronous 0% before launching so the UI shows the step
    # immediately, even if FFmpeg takes a while to print progress lines.
    self._emit_progress(step, 0.0)

    proc = sp.Popen(
        cmd,
        stdin=sp.PIPE,
        stderr=sp.PIPE,
        text=True,
        encoding="ascii",
        errors="replace",
    )
    assert proc.stdin is not None
    assert proc.stderr is not None
    try:
        proc.stdin.write(stdin_payload)
    except (BrokenPipeError, OSError):
        # FFmpeg may have rejected the input early; still wait for it
        # to terminate so the returncode is meaningful.
        pass
    finally:
        try:
            proc.stdin.close()
        except (BrokenPipeError, OSError):
            pass

    captured: list[str] = []
    try:
        for raw_line in proc.stderr:
            captured.append(raw_line)
            line = raw_line.strip()
            if not line:
                continue
            if line.startswith("out_time_us="):
                # Structured `-progress` output: microseconds of output
                # produced so far. Skip when no duration estimate exists.
                if expected_duration <= 0:
                    continue
                try:
                    out_time_us = int(line.split("=", 1)[1])
                except (ValueError, IndexError):
                    continue
                if out_time_us < 0:
                    continue
                out_seconds = out_time_us / 1_000_000.0
                percent = (out_seconds / expected_duration) * 100.0
                self._emit_progress(step, percent)
            elif line == "progress=end":
                self._emit_progress(step, 100.0)
                break
    except Exception:
        logger.exception("Failed reading FFmpeg progress for %s", self.export_id)

    proc.wait()
    # Drain any remaining stderr so callers can log it on failure.
    try:
        remaining = proc.stderr.read()
        if remaining:
            captured.append(remaining)
    except Exception:
        pass
    return proc.returncode, "".join(captured)
def get_datetime_from_timestamp(self, timestamp: int) -> str:
    """Format a Unix *timestamp* as a local-time ``YYYY-MM-DD HH:MM:SS`` string."""
    local_dt = datetime.datetime.fromtimestamp(timestamp)
    return local_dt.strftime("%Y-%m-%d %H:%M:%S")
@@ -406,6 +615,7 @@ class RecordingExporter(threading.Thread):
logger.debug(
f"Beginning export for {self.camera} from {self.start_time} to {self.end_time}"
)
self._emit_progress("preparing", 0.0)
export_name = (
self.user_provided_name
or f"{self.camera.replace('_', ' ')} {self.get_datetime_from_timestamp(self.start_time)} {self.get_datetime_from_timestamp(self.end_time)}"
@@ -443,16 +653,23 @@ class RecordingExporter(threading.Thread):
except DoesNotExist:
return
p = sp.run(
["nice", "-n", str(PROCESS_PRIORITY_LOW)] + ffmpeg_cmd,
input="\n".join(playlist_lines),
encoding="ascii",
capture_output=True,
# When neither custom ffmpeg arg is set the default path uses
# `-c copy` (stream copy — no re-encoding). Report that as a
# distinct step so the UI doesn't mislabel a remux as encoding.
# The retry branch below always re-encodes because cpu_fallback
# requires custom args; it stays "encoding_retry".
is_stream_copy = (
self.ffmpeg_input_args is None and self.ffmpeg_output_args is None
)
initial_step = "copying" if is_stream_copy else "encoding"
returncode, stderr = self._run_ffmpeg_with_progress(
ffmpeg_cmd, playlist_lines, step=initial_step
)
# If export failed and cpu_fallback is enabled, retry without hwaccel
if (
p.returncode != 0
returncode != 0
and self.cpu_fallback
and self.ffmpeg_input_args is not None
and self.ffmpeg_output_args is not None
@@ -470,23 +687,21 @@ class RecordingExporter(threading.Thread):
video_path, use_hwaccel=False
)
p = sp.run(
["nice", "-n", str(PROCESS_PRIORITY_LOW)] + ffmpeg_cmd,
input="\n".join(playlist_lines),
encoding="ascii",
capture_output=True,
returncode, stderr = self._run_ffmpeg_with_progress(
ffmpeg_cmd, playlist_lines, step="encoding_retry"
)
if p.returncode != 0:
if returncode != 0:
logger.error(
f"Failed to export {self.playback_source.value} for command {' '.join(ffmpeg_cmd)}"
)
logger.error(p.stderr)
logger.error(stderr)
Path(video_path).unlink(missing_ok=True)
Export.delete().where(Export.id == self.export_id).execute()
Path(thumb_path).unlink(missing_ok=True)
return
else:
self._emit_progress("finalizing", 100.0)
Export.update({Export.in_progress: False}).where(
Export.id == self.export_id
).execute()

View File

@@ -2,6 +2,7 @@ import datetime
import logging
import os
import unittest
from unittest.mock import patch
from fastapi import Request
from fastapi.testclient import TestClient
@@ -14,6 +15,7 @@ from frigate.api.fastapi_app import create_fastapi_app
from frigate.config import FrigateConfig
from frigate.const import BASE_DIR, CACHE_DIR
from frigate.debug_replay import DebugReplayManager
from frigate.jobs.export import JobStatePublisher
from frigate.models import Event, Recordings, ReviewSegment
from frigate.review.types import SeverityEnum
from frigate.test.const import TEST_DB, TEST_DB_CLEANUPS
@@ -44,6 +46,19 @@ class BaseTestHttp(unittest.TestCase):
self.db = SqliteQueueDatabase(TEST_DB)
self.db.bind(models)
# The export job manager broadcasts via JobStatePublisher on
# enqueue/start/finish. There is no dispatcher process bound to
# the IPC socket in tests, so a real publish() would block on
# recv_json forever. Replace publish with a no-op for the
# lifetime of this test; the lookup goes through the class so any
# already-instantiated publisher (the singleton manager's) picks
# up the no-op too.
publisher_patch = patch.object(
JobStatePublisher, "publish", lambda self, payload: None
)
publisher_patch.start()
self.addCleanup(publisher_patch.stop)
self.minimal_config = {
"mqtt": {"host": "mqtt"},
"cameras": {

View File

@@ -0,0 +1,385 @@
"""Tests for export progress tracking, broadcast, and FFmpeg parsing."""
import io
import unittest
from unittest.mock import MagicMock, patch
from frigate.jobs.export import (
PROGRESS_BROADCAST_MIN_INTERVAL,
ExportJob,
ExportJobManager,
)
from frigate.record.export import PlaybackSourceEnum, RecordingExporter
from frigate.types import JobStatusTypesEnum
def _make_exporter(
    end_minus_start: int = 100,
    ffmpeg_input_args=None,
    ffmpeg_output_args=None,
    on_progress=None,
) -> RecordingExporter:
    """Construct a RecordingExporter while bypassing its real __init__,
    which creates directories and requires a full FrigateConfig."""
    exporter = RecordingExporter.__new__(RecordingExporter)
    attrs = {
        "config": MagicMock(),
        "export_id": "test_export",
        "camera": "front",
        "user_provided_name": None,
        "user_provided_image": None,
        "start_time": 1_000,
        "end_time": 1_000 + end_minus_start,
        "playback_source": PlaybackSourceEnum.recordings,
        "export_case_id": None,
        "ffmpeg_input_args": ffmpeg_input_args,
        "ffmpeg_output_args": ffmpeg_output_args,
        "cpu_fallback": False,
        "on_progress": on_progress,
    }
    for name, value in attrs.items():
        setattr(exporter, name, value)
    return exporter
class TestExportJobToDict(unittest.TestCase):
    """Serialization of the new progress fields on ExportJob.to_dict()."""

    @staticmethod
    def _new_job() -> ExportJob:
        return ExportJob(camera="front", request_start_time=0, request_end_time=10)

    def test_to_dict_includes_progress_fields(self) -> None:
        payload = self._new_job().to_dict()
        # Defaults: jobs start queued at 0%.
        assert payload.get("current_step") == "queued"
        assert payload.get("progress_percent") == 0.0

    def test_to_dict_reflects_updated_progress(self) -> None:
        job = self._new_job()
        job.current_step = "encoding"
        job.progress_percent = 42.5
        payload = job.to_dict()
        assert payload["current_step"] == "encoding"
        assert payload["progress_percent"] == 42.5
class TestExpectedOutputDuration(unittest.TestCase):
    # _expected_output_duration_seconds combines the requested range, the
    # actually-recorded seconds (when the DB has them), and the timelapse
    # setpts factor. Tests stub _sum_source_duration_seconds where needed;
    # unstubbed cases rely on the method's fallback to the requested range.

    def test_normal_export_uses_input_duration(self) -> None:
        exporter = _make_exporter(end_minus_start=600)
        assert exporter._expected_output_duration_seconds() == 600.0

    def test_timelapse_uses_setpts_factor(self) -> None:
        exporter = _make_exporter(
            end_minus_start=1000,
            ffmpeg_input_args="-y",
            ffmpeg_output_args="-vf setpts=0.04*PTS -r 30",
        )
        # 1000s input * 0.04 = 40s of output
        assert exporter._expected_output_duration_seconds() == 40.0

    def test_unknown_factor_falls_back_to_input_duration(self) -> None:
        # Output args without a setpts filter must not scale the estimate.
        exporter = _make_exporter(
            end_minus_start=300,
            ffmpeg_input_args="-y",
            ffmpeg_output_args="-c:v libx264 -preset veryfast",
        )
        assert exporter._expected_output_duration_seconds() == 300.0

    def test_zero_factor_falls_back_to_input_duration(self) -> None:
        # A factor of 0 would yield a nonsensical zero-length estimate.
        exporter = _make_exporter(
            end_minus_start=300,
            ffmpeg_input_args="-y",
            ffmpeg_output_args="-vf setpts=0*PTS",
        )
        assert exporter._expected_output_duration_seconds() == 300.0

    def test_uses_actual_recorded_seconds_when_available(self) -> None:
        """If the DB shows only 120s of saved recordings inside a 1h
        requested range, progress should be computed against 120s."""
        exporter = _make_exporter(end_minus_start=3600)
        exporter._sum_source_duration_seconds = lambda: 120.0  # type: ignore[method-assign]
        assert exporter._expected_output_duration_seconds() == 120.0

    def test_actual_recorded_seconds_scaled_by_setpts(self) -> None:
        """Recorded duration must still be scaled by the timelapse factor."""
        exporter = _make_exporter(
            end_minus_start=3600,
            ffmpeg_input_args="-y",
            ffmpeg_output_args="-vf setpts=0.04*PTS -r 30",
        )
        exporter._sum_source_duration_seconds = lambda: 600.0  # type: ignore[method-assign]
        # 600s * 0.04 = 24s of output
        assert exporter._expected_output_duration_seconds() == 24.0

    def test_db_failure_falls_back_to_requested_range(self) -> None:
        # None signals a DB error; the requested range is the fallback.
        exporter = _make_exporter(end_minus_start=300)
        exporter._sum_source_duration_seconds = lambda: None  # type: ignore[method-assign]
        assert exporter._expected_output_duration_seconds() == 300.0
class TestProgressFlagInjection(unittest.TestCase):
    """Placement and edge cases of _inject_progress_flags."""

    def test_inserts_before_output_path(self) -> None:
        exporter = _make_exporter()
        original = ["ffmpeg", "-i", "input.m3u8", "-c", "copy", "/tmp/output.mp4"]
        injected = exporter._inject_progress_flags(original)
        # Everything before the output path is untouched, the three
        # progress flags are spliced in, and the path stays last.
        assert injected[:5] == original[:5]
        assert injected[5:8] == ["-progress", "pipe:2", "-nostats"]
        assert injected[-1] == "/tmp/output.mp4"
        assert len(injected) == len(original) + 3

    def test_handles_empty_cmd(self) -> None:
        assert _make_exporter()._inject_progress_flags([]) == []
class TestFfmpegProgressParsing(unittest.TestCase):
    """Verify percentage calculation from FFmpeg ``-progress`` output."""

    def _run_with_stderr(
        self,
        stderr_text: str,
        expected_duration_seconds: int = 90,
    ) -> list[tuple[str, float]]:
        """Helper: run _run_ffmpeg_with_progress against a mocked Popen
        whose stderr emits the supplied text. Returns the list of
        (step, percent) tuples that the on_progress callback received."""
        captured: list[tuple[str, float]] = []

        def on_progress(step: str, percent: float) -> None:
            captured.append((step, percent))

        exporter = _make_exporter(
            end_minus_start=expected_duration_seconds,
            on_progress=on_progress,
        )
        # io.StringIO stands in for the child's pipes: stdin swallows the
        # playlist write, stderr replays the canned progress output.
        fake_proc = MagicMock()
        fake_proc.stdin = io.StringIO()
        fake_proc.stderr = io.StringIO(stderr_text)
        fake_proc.returncode = 0
        fake_proc.wait = MagicMock(return_value=0)
        with patch("frigate.record.export.sp.Popen", return_value=fake_proc):
            returncode, _stderr = exporter._run_ffmpeg_with_progress(
                ["ffmpeg", "-i", "x.m3u8", "/tmp/out.mp4"], "playlist", step="encoding"
            )
        assert returncode == 0
        return captured

    def test_parses_out_time_us_into_percent(self) -> None:
        # 90s duration; 45s out_time => 50%
        stderr = "out_time_us=45000000\nprogress=continue\n"
        captured = self._run_with_stderr(stderr, expected_duration_seconds=90)
        # The first call is the synchronous 0.0 emit before Popen runs.
        assert captured[0] == ("encoding", 0.0)
        assert any(percent == 50.0 for step, percent in captured if step == "encoding")

    def test_progress_end_emits_100_percent(self) -> None:
        # progress=end forces a final 100% regardless of out_time seen.
        stderr = "out_time_us=10000000\nprogress=end\n"
        captured = self._run_with_stderr(stderr, expected_duration_seconds=90)
        assert captured[-1] == ("encoding", 100.0)

    def test_clamps_overshoot_at_100(self) -> None:
        # 150s of output reported against 90s expected duration.
        stderr = "out_time_us=150000000\nprogress=continue\n"
        captured = self._run_with_stderr(stderr, expected_duration_seconds=90)
        encoding_values = [p for s, p in captured if s == "encoding" and p > 0]
        assert all(p <= 100.0 for p in encoding_values)
        assert encoding_values[-1] == 100.0

    def test_ignores_garbage_lines(self) -> None:
        # Non-progress stats lines and malformed out_time_us values must
        # be skipped without aborting the parse loop.
        stderr = (
            "frame= 120 fps= 30 q=23.0 size= 512kB\n"
            "out_time_us=not-a-number\n"
            "out_time_us=30000000\n"
            "progress=continue\n"
        )
        captured = self._run_with_stderr(stderr, expected_duration_seconds=90)
        # We expect 0.0 (from initial emit) plus the 30s/90s = 33.33...% step
        encoding_percents = sorted({round(p, 2) for s, p in captured})
        assert 0.0 in encoding_percents
        assert any(abs(p - (30 / 90 * 100)) < 0.01 for p in encoding_percents)
class TestBroadcastAggregation(unittest.TestCase):
    """Verify ExportJobManager broadcast payload shape and throttling."""

    def _make_manager(self) -> tuple[ExportJobManager, MagicMock]:
        """Build a manager with an injected mock publisher. Returns
        ``(manager, publisher)`` so tests can assert on broadcast payloads
        without touching ZMQ at all."""
        config = MagicMock()
        publisher = MagicMock()
        manager = ExportJobManager(
            config, max_concurrent=2, max_queued=10, publisher=publisher
        )
        return manager, publisher

    @staticmethod
    def _last_payload(publisher: MagicMock) -> dict:
        # Most recent positional argument handed to publisher.publish().
        return publisher.publish.call_args.args[0]

    def test_empty_jobs_broadcasts_empty_list(self) -> None:
        manager, publisher = self._make_manager()
        manager._broadcast_all_jobs(force=True)
        publisher.publish.assert_called_once()
        payload = self._last_payload(publisher)
        assert payload["job_type"] == "export"
        # With nothing running, the aggregate status reports "queued".
        assert payload["status"] == "queued"
        assert payload["results"]["jobs"] == []

    def test_single_running_job_payload(self) -> None:
        manager, publisher = self._make_manager()
        job = ExportJob(camera="front", request_start_time=0, request_end_time=10)
        job.status = JobStatusTypesEnum.running
        job.current_step = "encoding"
        job.progress_percent = 75.0
        manager.jobs[job.id] = job
        manager._broadcast_all_jobs(force=True)
        payload = self._last_payload(publisher)
        assert payload["status"] == "running"
        assert len(payload["results"]["jobs"]) == 1
        broadcast_job = payload["results"]["jobs"][0]
        assert broadcast_job["current_step"] == "encoding"
        assert broadcast_job["progress_percent"] == 75.0

    def test_multiple_jobs_broadcast(self) -> None:
        # One queued + one running job => aggregate status "running".
        manager, publisher = self._make_manager()
        for i, status in enumerate(
            (JobStatusTypesEnum.queued, JobStatusTypesEnum.running)
        ):
            job = ExportJob(
                id=f"job_{i}",
                camera="front",
                request_start_time=0,
                request_end_time=10,
            )
            job.status = status
            manager.jobs[job.id] = job
        manager._broadcast_all_jobs(force=True)
        payload = self._last_payload(publisher)
        assert payload["status"] == "running"
        assert len(payload["results"]["jobs"]) == 2

    def test_completed_jobs_are_excluded(self) -> None:
        # Terminal jobs must not appear in the broadcast job list.
        manager, publisher = self._make_manager()
        active = ExportJob(id="active", camera="front")
        active.status = JobStatusTypesEnum.running
        finished = ExportJob(id="done", camera="front")
        finished.status = JobStatusTypesEnum.success
        manager.jobs[active.id] = active
        manager.jobs[finished.id] = finished
        manager._broadcast_all_jobs(force=True)
        payload = self._last_payload(publisher)
        ids = [j["id"] for j in payload["results"]["jobs"]]
        assert ids == ["active"]

    def test_throttle_skips_rapid_unforced_broadcasts(self) -> None:
        manager, publisher = self._make_manager()
        job = ExportJob(camera="front")
        job.status = JobStatusTypesEnum.running
        manager.jobs[job.id] = job
        manager._broadcast_all_jobs(force=True)
        # Immediately following non-forced broadcasts should be skipped.
        for _ in range(5):
            manager._broadcast_all_jobs(force=False)
        assert publisher.publish.call_count == 1

    def test_throttle_allows_broadcast_after_interval(self) -> None:
        manager, publisher = self._make_manager()
        job = ExportJob(camera="front")
        job.status = JobStatusTypesEnum.running
        manager.jobs[job.id] = job
        # Patch the clock so the test never actually sleeps.
        with patch("frigate.jobs.export.time.monotonic") as mock_mono:
            mock_mono.return_value = 100.0
            manager._broadcast_all_jobs(force=True)
            mock_mono.return_value = 100.0 + PROGRESS_BROADCAST_MIN_INTERVAL + 0.01
            manager._broadcast_all_jobs(force=False)
        assert publisher.publish.call_count == 2

    def test_force_bypasses_throttle(self) -> None:
        manager, publisher = self._make_manager()
        job = ExportJob(camera="front")
        job.status = JobStatusTypesEnum.running
        manager.jobs[job.id] = job
        manager._broadcast_all_jobs(force=True)
        manager._broadcast_all_jobs(force=True)
        assert publisher.publish.call_count == 2

    def test_publisher_exceptions_do_not_propagate(self) -> None:
        """A failing publisher must not break the manager: broadcasts are
        best-effort since the dispatcher may not be available (tests,
        startup races)."""
        manager, publisher = self._make_manager()
        publisher.publish.side_effect = RuntimeError("comms down")
        job = ExportJob(camera="front")
        job.status = JobStatusTypesEnum.running
        manager.jobs[job.id] = job
        # Swallow our own RuntimeError if the manager doesn't; the real
        # JobStatePublisher handles its own exceptions internally, so the
        # manager can stay naive. But if something bubbles up it should
        # not escape _broadcast_all_jobs — enforce that contract here.
        try:
            manager._broadcast_all_jobs(force=True)
        except RuntimeError:
            self.fail("_broadcast_all_jobs must tolerate publisher failures")

    def test_progress_callback_updates_job_and_broadcasts(self) -> None:
        # The callback built by _make_progress_callback mutates the job
        # in place; the (throttled) broadcast is a side effect.
        manager, _publisher = self._make_manager()
        job = ExportJob(camera="front")
        job.status = JobStatusTypesEnum.running
        manager.jobs[job.id] = job
        callback = manager._make_progress_callback(job)
        callback("encoding", 33.0)
        assert job.current_step == "encoding"
        assert job.progress_percent == 33.0
class TestSchedulesCleanup(unittest.TestCase):
    """Deferred removal of finished jobs from the manager's in-memory map."""

    def test_schedule_job_cleanup_removes_after_delay(self) -> None:
        manager = ExportJobManager(MagicMock(), max_concurrent=1, max_queued=1)
        job = ExportJob(id="cleanup_me", camera="front")
        manager.jobs[job.id] = job
        # Patch Timer so no real clock is involved.
        with patch("frigate.jobs.export.threading.Timer") as timer_cls:
            manager._schedule_job_cleanup(job.id)
            timer_cls.assert_called_once()
            delay, callback = timer_cls.call_args.args
            assert delay > 0
            # Invoke the callback directly to confirm it removes the job.
            callback()
            assert job.id not in manager.jobs
if __name__ == "__main__":
unittest.main()