"""Media sync job management with background execution.""" import logging import os import threading from dataclasses import dataclass, field from datetime import datetime from typing import Optional, cast from frigate.comms.inter_process import InterProcessRequestor from frigate.const import CONFIG_DIR, UPDATE_JOB_STATE from frigate.jobs.job import Job from frigate.jobs.manager import ( get_current_job, get_job_by_id, job_is_running, set_current_job, ) from frigate.types import JobStatusTypesEnum from frigate.util.media import sync_all_media, write_orphan_report logger = logging.getLogger(__name__) @dataclass class MediaSyncJob(Job): """In-memory job state for media sync operations.""" job_type: str = "media_sync" dry_run: bool = False media_types: list[str] = field(default_factory=lambda: ["all"]) force: bool = False verbose: bool = False class MediaSyncRunner(threading.Thread): """Thread-based runner for media sync jobs.""" def __init__(self, job: MediaSyncJob) -> None: super().__init__(daemon=True, name="media_sync") self.job = job self.requestor = InterProcessRequestor() def run(self) -> None: """Execute the media sync job and broadcast status updates.""" try: # Update job status to running self.job.status = JobStatusTypesEnum.running self.job.start_time = datetime.now().timestamp() self._broadcast_status() # Execute sync with provided parameters logger.debug( f"Starting media sync job {self.job.id}: " f"media_types={self.job.media_types}, " f"dry_run={self.job.dry_run}, " f"force={self.job.force}" ) results = sync_all_media( dry_run=self.job.dry_run, media_types=self.job.media_types, force=self.job.force, ) # Write verbose report if requested if self.job.verbose: report_dir = os.path.join(CONFIG_DIR, "media_sync") os.makedirs(report_dir, exist_ok=True) report_path = os.path.join(report_dir, f"{self.job.id}.txt") write_orphan_report( results, report_path, job_id=self.job.id, dry_run=self.job.dry_run, ) logger.info( "Media sync verbose orphan report written to %s", report_path ) # Store results and mark as complete self.job.results = results.to_dict() self.job.status = JobStatusTypesEnum.success self.job.end_time = datetime.now().timestamp() logger.debug(f"Media sync job {self.job.id} completed successfully") self._broadcast_status() except Exception as e: logger.error(f"Media sync job {self.job.id} failed: {e}", exc_info=True) self.job.status = JobStatusTypesEnum.failed self.job.error_message = str(e) self.job.end_time = datetime.now().timestamp() self._broadcast_status() finally: if self.requestor: self.requestor.stop() def _broadcast_status(self) -> None: """Broadcast job status update via IPC to all WebSocket subscribers.""" try: self.requestor.send_data( UPDATE_JOB_STATE, self.job.to_dict(), ) except Exception as e: logger.warning(f"Failed to broadcast media sync status: {e}") def start_media_sync_job( dry_run: bool = False, media_types: Optional[list[str]] = None, force: bool = False, verbose: bool = False, ) -> Optional[str]: """Start a new media sync job if none is currently running. Returns job ID on success, None if job already running. """ # Check if a job is already running if job_is_running("media_sync"): current = get_current_job("media_sync") logger.warning( f"Media sync job {current.id if current else 'unknown'} is already running. Rejecting new request." ) return None # Create and start new job job = MediaSyncJob( dry_run=dry_run, media_types=media_types or ["all"], force=force, verbose=verbose, ) logger.debug(f"Creating new media sync job: {job.id}") set_current_job(job) # Start the background runner runner = MediaSyncRunner(job) runner.start() return job.id def get_current_media_sync_job() -> Optional[MediaSyncJob]: """Get the current running/queued media sync job, if any.""" return cast(Optional[MediaSyncJob], get_current_job("media_sync")) def get_media_sync_job_by_id(job_id: str) -> Optional[MediaSyncJob]: """Get media sync job by ID. Currently only tracks the current job.""" return cast(Optional[MediaSyncJob], get_job_by_id("media_sync", job_id))