mirror of
https://github.com/blakeblackshear/frigate.git
synced 2026-02-20 13:54:36 +01:00
* generic job infrastructure * types and dispatcher changes for jobs * save data in memory only for completed jobs * implement media sync job and endpoints * change logs to debug * websocket hook and types * frontend * i18n * docs tweaks * endpoint descriptions * tweak docs
71 lines
2.5 KiB
Python
71 lines
2.5 KiB
Python
"""Generic job management for long-running background tasks."""
|
|
|
|
import threading
|
|
from typing import Optional
|
|
|
|
from frigate.jobs.job import Job
|
|
from frigate.types import JobStatusTypesEnum
|
|
|
|
# Module-wide bookkeeping, keyed by job type: one lock per type and at most
# one "current" job per type (None once that slot has been cleared).
_job_locks: dict[str, threading.Lock] = {}
_current_jobs: dict[str, Optional[Job]] = {}

# Finished jobs retained so they can still be looked up afterwards; the key
# is the pair (job_type, job_id).
_completed_jobs: dict[tuple[str, str], Job] = {}
|
|
|
|
|
|
def _get_lock(job_type: str) -> threading.Lock:
    """Return the lock associated with ``job_type``, creating it on first use.

    Args:
        job_type: Identifier of the job category whose lock is wanted.

    Returns:
        The single ``threading.Lock`` shared by all callers for ``job_type``.
    """
    # Fast path: the lock already exists, so no new Lock object is allocated.
    lock = _job_locks.get(job_type)
    if lock is None:
        # setdefault inserts-or-returns in one dict operation, so threads
        # racing on the first call for a job type all end up with the same
        # Lock. A separate "not in" check followed by an assignment could
        # hand each thread its own lock, defeating mutual exclusion.
        lock = _job_locks.setdefault(job_type, threading.Lock())
    return lock
|
|
|
|
|
|
def set_current_job(job: Job) -> None:
    """Register ``job`` as the current job for its job type.

    If the job being replaced has already finished (success, failed or
    cancelled) it is first moved into the completed-jobs store so it can
    still be fetched by id. A previous job in any other state is simply
    overwritten.

    Args:
        job: The job to install as current; its ``job_type`` selects the slot.
    """
    finished_states = (
        JobStatusTypesEnum.success,
        JobStatusTypesEnum.failed,
        JobStatusTypesEnum.cancelled,
    )
    kind = job.job_type

    with _get_lock(kind):
        previous = _current_jobs.get(kind)
        if previous and previous.status in finished_states:
            # Archive the finished job before its slot is reused.
            _completed_jobs[(kind, previous.id)] = previous

        _current_jobs[kind] = job
|
|
|
|
|
|
def clear_current_job(job_type: str, job_id: Optional[str] = None) -> None:
    """Reset the current-job slot for ``job_type`` to ``None``.

    Args:
        job_type: Job category whose slot should be cleared.
        job_id: When given, the slot is only cleared if it is already empty
            or holds the job with this id. When omitted, whatever job is
            current is cleared.

    Job types that have never had a slot are left untouched.
    """
    with _get_lock(job_type):
        if job_type not in _current_jobs:
            return

        active = _current_jobs[job_type]
        # Leave the slot alone when the caller named a specific job and a
        # different one is occupying it.
        if active is not None and job_id is not None and active.id != job_id:
            return

        _current_jobs[job_type] = None
|
|
|
|
|
|
def get_current_job(job_type: str) -> Optional[Job]:
    """Look up the job currently registered for ``job_type``.

    Args:
        job_type: Job category to query.

    Returns:
        The job stored in the current slot, or ``None`` when the slot is
        empty or the job type is unknown.
    """
    with _get_lock(job_type):
        active = _current_jobs.get(job_type)
    return active
|
|
|
|
|
|
def get_job_by_id(job_type: str, job_id: str) -> Optional[Job]:
    """Find a job of ``job_type`` by its id.

    The current job is consulted first; if it does not match, the
    completed-jobs store is searched.

    Args:
        job_type: Job category the job belongs to.
        job_id: Identifier of the job to retrieve.

    Returns:
        The matching job, or ``None`` if neither the current slot nor the
        completed-jobs store holds it.
    """
    with _get_lock(job_type):
        active = _current_jobs.get(job_type)
        if not active or active.id != job_id:
            # Not the current job: fall back to the archive of finished jobs.
            return _completed_jobs.get((job_type, job_id))
        return active
|
|
|
|
|
|
def job_is_running(job_type: str) -> bool:
    """Report whether ``job_type`` has a current job that is queued or running.

    Args:
        job_type: Job category to query.

    Returns:
        ``True`` when a current job exists and its status is ``"queued"`` or
        ``"running"``; ``False`` otherwise.
    """
    active = get_current_job(job_type)
    if active is None:
        return False
    # NOTE(review): status is compared against raw strings here, while
    # set_current_job compares JobStatusTypesEnum members. Presumably the
    # enum is string-valued; confirm against frigate.types.
    return active.status in ("queued", "running")
|