mirror of
https://github.com/blakeblackshear/frigate.git
synced 2026-04-19 23:08:08 +02:00
* generic job infrastructure * types and dispatcher changes for jobs * save data in memory only for completed jobs * implement media sync job and endpoints * change logs to debug * websocket hook and types * frontend * i18n * docs tweaks * endpoint descriptions * tweak docs
22 lines
730 B
Python
22 lines
730 B
Python
"""Generic base class for long-running background jobs."""
|
|
|
|
from dataclasses import asdict, dataclass, field
|
|
from typing import Any, Optional
|
|
|
|
|
|
@dataclass
|
|
class Job:
|
|
"""Base class for long-running background jobs."""
|
|
|
|
id: str = field(default_factory=lambda: __import__("uuid").uuid4().__str__()[:12])
|
|
job_type: str = "" # Must be set by subclasses
|
|
status: str = "queued" # queued, running, success, failed, cancelled
|
|
results: Optional[dict[str, Any]] = None
|
|
start_time: Optional[float] = None
|
|
end_time: Optional[float] = None
|
|
error_message: Optional[str] = None
|
|
|
|
def to_dict(self) -> dict[str, Any]:
|
|
"""Convert to dictionary for WebSocket transmission."""
|
|
return asdict(self)
|