blakeblackshear.frigate/frigate/app.py


import datetime
import logging
import multiprocessing as mp
import os
import secrets
import shutil
from multiprocessing import Queue
from multiprocessing.synchronize import Event as MpEvent
from typing import Optional

import psutil
import uvicorn
from peewee_migrate import Router
from playhouse.sqlite_ext import SqliteExtDatabase

import frigate.util as util
from frigate.api.auth import hash_password
from frigate.api.fastapi_app import create_fastapi_app
from frigate.camera import CameraMetrics, PTZMetrics
from frigate.comms.config_updater import ConfigPublisher
from frigate.comms.dispatcher import Communicator, Dispatcher
from frigate.comms.event_metadata_updater import (
    EventMetadataPublisher,
    EventMetadataTypeEnum,
)
from frigate.comms.inter_process import InterProcessCommunicator
from frigate.comms.mqtt import MqttClient
from frigate.comms.webpush import WebPushClient
from frigate.comms.ws import WebSocketClient
from frigate.comms.zmq_proxy import ZmqProxy
from frigate.config.config import FrigateConfig
from frigate.const import (
    CACHE_DIR,
    CLIPS_DIR,
    CONFIG_DIR,
    EXPORT_DIR,
    MODEL_CACHE_DIR,
    RECORD_DIR,
)
from frigate.db.sqlitevecq import SqliteVecQueueDatabase
from frigate.embeddings import EmbeddingsContext, manage_embeddings
from frigate.events.audio import AudioProcessor
from frigate.events.cleanup import EventCleanup
from frigate.events.external import ExternalEventProcessor
from frigate.events.maintainer import EventProcessor
from frigate.models import (
    Event,
    Export,
    Previews,
    Recordings,
    RecordingsToDelete,
    Regions,
    ReviewSegment,
    Timeline,
    User,
)
from frigate.object_detection import ObjectDetectProcess
from frigate.object_processing import TrackedObjectProcessor
from frigate.output.output import output_frames
from frigate.ptz.autotrack import PtzAutoTrackerThread
from frigate.ptz.onvif import OnvifController
from frigate.record.cleanup import RecordingCleanup
from frigate.record.export import migrate_exports
from frigate.record.record import manage_recordings
from frigate.review.review import manage_review_segments
from frigate.service_manager import ServiceManager
from frigate.stats.emitter import StatsEmitter
from frigate.stats.util import stats_init
from frigate.storage import StorageMaintainer
from frigate.timeline import TimelineProcessor
from frigate.util.builtin import empty_and_close_queue
from frigate.util.object import get_camera_regions_grid
from frigate.version import VERSION
from frigate.video import capture_camera, track_camera
from frigate.watchdog import FrigateWatchdog

logger = logging.getLogger(__name__)


class FrigateApp:
    def __init__(self, config: FrigateConfig) -> None:
        self.stop_event: MpEvent = mp.Event()
        self.detection_queue: Queue = mp.Queue()
        self.detectors: dict[str, ObjectDetectProcess] = {}
        self.detection_out_events: dict[str, MpEvent] = {}
        self.detection_shms: list[mp.shared_memory.SharedMemory] = []
        self.log_queue: Queue = mp.Queue()
        self.camera_metrics: dict[str, CameraMetrics] = {}
        self.ptz_metrics: dict[str, PTZMetrics] = {}
        self.processes: dict[str, int] = {}
        self.embeddings: Optional[EmbeddingsContext] = None
        self.region_grids: dict[str, list[list[dict[str, int]]]] = {}
        self.config = config

    def ensure_dirs(self) -> None:
        for d in [
            CONFIG_DIR,
            RECORD_DIR,
            f"{CLIPS_DIR}/cache",
            CACHE_DIR,
            MODEL_CACHE_DIR,
            EXPORT_DIR,
        ]:
            if not os.path.exists(d) and not os.path.islink(d):
                logger.info(f"Creating directory: {d}")
                os.makedirs(d)
            else:
                logger.debug(f"Skipping directory: {d}")

    def init_camera_metrics(self) -> None:
        # create camera_metrics
        for camera_name in self.config.cameras.keys():
            self.camera_metrics[camera_name] = CameraMetrics()
            self.ptz_metrics[camera_name] = PTZMetrics(
                autotracker_enabled=self.config.cameras[
                    camera_name
                ].onvif.autotracking.enabled
            )

    def init_queues(self) -> None:
        # Queue for cameras to push tracked objects to
        self.detected_frames_queue: Queue = mp.Queue(
            maxsize=sum(camera.enabled for camera in self.config.cameras.values()) * 2
        )

        # Queue for timeline events
        self.timeline_queue: Queue = mp.Queue()

    def init_database(self) -> None:
        def vacuum_db(db: SqliteExtDatabase) -> None:
            logger.info("Running database vacuum")
            db.execute_sql("VACUUM;")

            try:
                with open(f"{CONFIG_DIR}/.vacuum", "w") as f:
                    f.write(str(datetime.datetime.now().timestamp()))
            except PermissionError:
                logger.error("Unable to write to /config to save DB state")

        def cleanup_timeline_db(db: SqliteExtDatabase) -> None:
            db.execute_sql(
                "DELETE FROM timeline WHERE source_id NOT IN (SELECT id FROM event);"
            )

            try:
                with open(f"{CONFIG_DIR}/.timeline", "w") as f:
                    f.write(str(datetime.datetime.now().timestamp()))
            except PermissionError:
                logger.error("Unable to write to /config to save DB state")

        # Migrate DB schema
        migrate_db = SqliteExtDatabase(self.config.database.path)

        # Run migrations
        del logging.getLogger("peewee_migrate").handlers[:]
        router = Router(migrate_db)

        if len(router.diff) > 0:
            logger.info("Making backup of DB before migrations...")
            shutil.copyfile(
                self.config.database.path,
                self.config.database.path.replace("frigate.db", "backup.db"),
            )

        router.run()

        # this is a temporary check to clean up user DB from beta
        # will be removed before final release
        if not os.path.exists(f"{CONFIG_DIR}/.timeline"):
            cleanup_timeline_db(migrate_db)

        # check if vacuum needs to be run
        if os.path.exists(f"{CONFIG_DIR}/.vacuum"):
            with open(f"{CONFIG_DIR}/.vacuum") as f:
                try:
                    timestamp = round(float(f.readline()))
                except Exception:
                    timestamp = 0

                if (
                    timestamp
                    < (
                        datetime.datetime.now() - datetime.timedelta(weeks=2)
                    ).timestamp()
                ):
                    vacuum_db(migrate_db)
        else:
            vacuum_db(migrate_db)

        migrate_db.close()

    def init_go2rtc(self) -> None:
        for proc in psutil.process_iter(["pid", "name"]):
            if proc.info["name"] == "go2rtc":
                logger.info(f"go2rtc process pid: {proc.info['pid']}")
                self.processes["go2rtc"] = proc.info["pid"]

    def init_recording_manager(self) -> None:
        recording_process = util.Process(
            target=manage_recordings,
            name="recording_manager",
            args=(self.config,),
        )
        recording_process.daemon = True
        self.recording_process = recording_process
        recording_process.start()
        self.processes["recording"] = recording_process.pid or 0
        logger.info(f"Recording process started: {recording_process.pid}")

    def init_review_segment_manager(self) -> None:
        review_segment_process = util.Process(
            target=manage_review_segments,
            name="review_segment_manager",
            args=(self.config,),
        )
        review_segment_process.daemon = True
        self.review_segment_process = review_segment_process
        review_segment_process.start()
        self.processes["review_segment"] = review_segment_process.pid or 0
        logger.info(f"Review process started: {review_segment_process.pid}")

    def init_embeddings_manager(self) -> None:
        if not self.config.semantic_search.enabled:
            return

        embedding_process = util.Process(
            target=manage_embeddings,
            name="embeddings_manager",
            args=(self.config,),
        )
        embedding_process.daemon = True
        self.embedding_process = embedding_process
        embedding_process.start()
        self.processes["embeddings"] = embedding_process.pid or 0
        logger.info(f"Embedding process started: {embedding_process.pid}")

    def bind_database(self) -> None:
        """Bind db to the main process."""
        # NOTE: all db accessing processes need to be created before the db can be bound to the main process
        self.db = SqliteVecQueueDatabase(
            self.config.database.path,
            pragmas={
                "auto_vacuum": "FULL",  # Does not defragment database
                "cache_size": -512 * 1000,  # 512MB of cache
                "synchronous": "NORMAL",  # Safe when using WAL https://www.sqlite.org/pragma.html#pragma_synchronous
            },
            timeout=max(
                60, 10 * len([c for c in self.config.cameras.values() if c.enabled])
            ),
            load_vec_extension=self.config.semantic_search.enabled,
        )
        models = [
            Event,
            Export,
            Previews,
            Recordings,
            RecordingsToDelete,
            Regions,
            ReviewSegment,
            Timeline,
            User,
        ]
        self.db.bind(models)

    def check_db_data_migrations(self) -> None:
        # check if exports need to be migrated
        if not os.path.exists(f"{CONFIG_DIR}/.exports"):
            try:
                with open(f"{CONFIG_DIR}/.exports", "w") as f:
                    f.write(str(datetime.datetime.now().timestamp()))
            except PermissionError:
                logger.error("Unable to write to /config to save export state")

            migrate_exports(self.config.ffmpeg, list(self.config.cameras.keys()))

    def init_embeddings_client(self) -> None:
        if self.config.semantic_search.enabled:
            # Create a client for other processes to use
            self.embeddings = EmbeddingsContext(self.db)

    def init_external_event_processor(self) -> None:
        self.external_event_processor = ExternalEventProcessor(self.config)

    def init_inter_process_communicator(self) -> None:
        self.inter_process_communicator = InterProcessCommunicator()
        self.inter_config_updater = ConfigPublisher()
        self.event_metadata_updater = EventMetadataPublisher(
            EventMetadataTypeEnum.regenerate_description
        )
        self.inter_zmq_proxy = ZmqProxy()

    def init_onvif(self) -> None:
        self.onvif_controller = OnvifController(self.config, self.ptz_metrics)

    def init_dispatcher(self) -> None:
        comms: list[Communicator] = []

        if self.config.mqtt.enabled:
            comms.append(MqttClient(self.config))

        if self.config.notifications.enabled_in_config:
            comms.append(WebPushClient(self.config))

        comms.append(WebSocketClient(self.config))
        comms.append(self.inter_process_communicator)

        self.dispatcher = Dispatcher(
            self.config,
            self.inter_config_updater,
            self.onvif_controller,
            self.ptz_metrics,
            comms,
        )

    def start_detectors(self) -> None:
        for name in self.config.cameras.keys():
            self.detection_out_events[name] = mp.Event()

            try:
                largest_frame = max(
                    [
                        det.model.height * det.model.width * 3
                        if det.model is not None
                        else 320
                        for det in self.config.detectors.values()
                    ]
                )
                shm_in = mp.shared_memory.SharedMemory(
                    name=name,
                    create=True,
                    size=largest_frame,
                )
            except FileExistsError:
                shm_in = mp.shared_memory.SharedMemory(name=name)

            try:
                shm_out = mp.shared_memory.SharedMemory(
                    name=f"out-{name}", create=True, size=20 * 6 * 4
                )
            except FileExistsError:
                shm_out = mp.shared_memory.SharedMemory(name=f"out-{name}")

            self.detection_shms.append(shm_in)
            self.detection_shms.append(shm_out)

        for name, detector_config in self.config.detectors.items():
            self.detectors[name] = ObjectDetectProcess(
                name,
                self.detection_queue,
                self.detection_out_events,
                detector_config,
            )

    def start_ptz_autotracker(self) -> None:
        self.ptz_autotracker_thread = PtzAutoTrackerThread(
            self.config,
            self.onvif_controller,
            self.ptz_metrics,
            self.dispatcher,
            self.stop_event,
        )
        self.ptz_autotracker_thread.start()

    def start_detected_frames_processor(self) -> None:
        self.detected_frames_processor = TrackedObjectProcessor(
            self.config,
            self.dispatcher,
            self.detected_frames_queue,
            self.ptz_autotracker_thread,
            self.stop_event,
        )
        self.detected_frames_processor.start()

    def start_video_output_processor(self) -> None:
        output_processor = util.Process(
            target=output_frames,
            name="output_processor",
            args=(self.config,),
        )
        output_processor.daemon = True
        self.output_processor = output_processor
        output_processor.start()
        logger.info(f"Output process started: {output_processor.pid}")

    def init_historical_regions(self) -> None:
        # delete region grids for removed or renamed cameras
        cameras = list(self.config.cameras.keys())
        Regions.delete().where(~(Regions.camera << cameras)).execute()

        # create or update region grids for each camera
        for camera in self.config.cameras.values():
            assert camera.name is not None
            self.region_grids[camera.name] = get_camera_regions_grid(
                camera.name,
                camera.detect,
                max(self.config.model.width, self.config.model.height),
            )

    def start_camera_processors(self) -> None:
        for name, config in self.config.cameras.items():
            if not self.config.cameras[name].enabled:
                logger.info(f"Camera processor not started for disabled camera {name}")
                continue

            camera_process = util.Process(
                target=track_camera,
                name=f"camera_processor:{name}",
                args=(
                    name,
                    config,
                    self.config.model,
                    self.config.model.merged_labelmap,
                    self.detection_queue,
                    self.detection_out_events[name],
                    self.detected_frames_queue,
                    self.camera_metrics[name],
                    self.ptz_metrics[name],
                    self.region_grids[name],
                ),
                daemon=True,
            )
            self.camera_metrics[name].process = camera_process
            camera_process.start()
            logger.info(f"Camera processor started for {name}: {camera_process.pid}")

    def start_camera_capture_processes(self) -> None:
        shm_frame_count = self.shm_frame_count()

        for name, config in self.config.cameras.items():
            if not self.config.cameras[name].enabled:
                logger.info(f"Capture process not started for disabled camera {name}")
                continue

            capture_process = util.Process(
                target=capture_camera,
                name=f"camera_capture:{name}",
                args=(name, config, shm_frame_count, self.camera_metrics[name]),
            )
            capture_process.daemon = True
            self.camera_metrics[name].capture_process = capture_process
            capture_process.start()
            logger.info(f"Capture process started for {name}: {capture_process.pid}")

    def start_audio_processor(self) -> None:
        audio_cameras = [
            c
            for c in self.config.cameras.values()
            if c.enabled and c.audio.enabled_in_config
        ]

        if audio_cameras:
            proc = AudioProcessor(audio_cameras, self.camera_metrics).start(wait=True)
            self.processes["audio_detector"] = proc.pid or 0

    def start_timeline_processor(self) -> None:
        self.timeline_processor = TimelineProcessor(
            self.config, self.timeline_queue, self.stop_event
        )
        self.timeline_processor.start()

    def start_event_processor(self) -> None:
        self.event_processor = EventProcessor(
            self.config,
            self.timeline_queue,
            self.stop_event,
        )
        self.event_processor.start()

    def start_event_cleanup(self) -> None:
        self.event_cleanup = EventCleanup(self.config, self.stop_event, self.db)
        self.event_cleanup.start()

    def start_record_cleanup(self) -> None:
        self.record_cleanup = RecordingCleanup(self.config, self.stop_event)
        self.record_cleanup.start()

    def start_storage_maintainer(self) -> None:
        self.storage_maintainer = StorageMaintainer(self.config, self.stop_event)
        self.storage_maintainer.start()

    def start_stats_emitter(self) -> None:
        self.stats_emitter = StatsEmitter(
            self.config,
            stats_init(
                self.config, self.camera_metrics, self.detectors, self.processes
            ),
            self.stop_event,
        )
        self.stats_emitter.start()

    def start_watchdog(self) -> None:
        self.frigate_watchdog = FrigateWatchdog(self.detectors, self.stop_event)
        self.frigate_watchdog.start()

    def shm_frame_count(self) -> int:
        total_shm = round(shutil.disk_usage("/dev/shm").total / pow(2, 20), 1)

        # required for log files + nginx cache
        min_req_shm = 40 + 10

        if self.config.birdseye.restream:
            min_req_shm += 8

        available_shm = total_shm - min_req_shm
        cam_total_frame_size = 0.0

        for camera in self.config.cameras.values():
            if camera.enabled and camera.detect.width and camera.detect.height:
                cam_total_frame_size += round(
                    (camera.detect.width * camera.detect.height * 1.5 + 270480)
                    / 1048576,
                    1,
                )

        if cam_total_frame_size == 0.0:
            return 0

        shm_frame_count = min(50, int(available_shm / (cam_total_frame_size)))

        logger.debug(
            f"Calculated total camera size {available_shm} / {cam_total_frame_size} :: {shm_frame_count} frames for each camera in SHM"
        )

        if shm_frame_count < 10:
            logger.warning(
                f"The current SHM size of {total_shm}MB is too small, recommend increasing it to at least {round(min_req_shm + cam_total_frame_size * 10)}MB."
            )

        return shm_frame_count

    def init_auth(self) -> None:
        if self.config.auth.enabled:
            if User.select().count() == 0:
                password = secrets.token_hex(16)
                password_hash = hash_password(
                    password, iterations=self.config.auth.hash_iterations
                )
                User.insert(
                    {
                        User.username: "admin",
                        User.password_hash: password_hash,
                        User.notification_tokens: [],
                    }
                ).execute()

                logger.info("********************************************************")
                logger.info("********************************************************")
                logger.info("*** Auth is enabled, but no users exist. ***")
                logger.info("*** Created a default user: ***")
                logger.info("*** User: admin ***")
                logger.info(f"*** Password: {password} ***")
                logger.info("********************************************************")
                logger.info("********************************************************")
            elif self.config.auth.reset_admin_password:
                password = secrets.token_hex(16)
                password_hash = hash_password(
                    password, iterations=self.config.auth.hash_iterations
                )
                User.replace(
                    username="admin",
                    password_hash=password_hash,
                    notification_tokens=[],
                ).execute()

                logger.info("********************************************************")
                logger.info("********************************************************")
                logger.info("*** Reset admin password set in the config. ***")
                logger.info(f"*** Password: {password} ***")
                logger.info("********************************************************")
                logger.info("********************************************************")

    def start(self) -> None:
        logger.info(f"Starting Frigate ({VERSION})")

        # Ensure global state.
        self.ensure_dirs()

        # Start frigate services.
        self.init_camera_metrics()
        self.init_queues()
        self.init_database()
        self.init_onvif()
        self.init_recording_manager()
        self.init_review_segment_manager()
        self.init_go2rtc()
        self.start_detectors()
        self.init_embeddings_manager()
        self.bind_database()
        self.check_db_data_migrations()
        self.init_inter_process_communicator()
        self.init_dispatcher()
        self.init_embeddings_client()
        self.start_video_output_processor()
        self.start_ptz_autotracker()
        self.init_historical_regions()
        self.start_detected_frames_processor()
        self.start_camera_processors()
        self.start_camera_capture_processes()
        self.start_audio_processor()
        self.start_storage_maintainer()
        self.init_external_event_processor()
        self.start_stats_emitter()
        self.start_timeline_processor()
        self.start_event_processor()
        self.start_event_cleanup()
        self.start_record_cleanup()
        self.start_watchdog()
        self.init_auth()

        try:
            uvicorn.run(
                create_fastapi_app(
                    self.config,
                    self.db,
                    self.embeddings,
                    self.detected_frames_processor,
                    self.storage_maintainer,
                    self.onvif_controller,
                    self.external_event_processor,
                    self.stats_emitter,
                    self.event_metadata_updater,
                ),
                host="127.0.0.1",
                port=5001,
                log_level="error",
            )
        finally:
            self.stop()

    def stop(self) -> None:
        logger.info("Stopping...")

        self.stop_event.set()

        # set an end_time on entries without an end_time before exiting
        Event.update(
            end_time=datetime.datetime.now().timestamp(), has_snapshot=False
        ).where(Event.end_time == None).execute()
        ReviewSegment.update(end_time=datetime.datetime.now().timestamp()).where(
            ReviewSegment.end_time == None
        ).execute()

        # ensure the capture processes are done
        for camera, metrics in self.camera_metrics.items():
            capture_process = metrics.capture_process
            if capture_process is not None:
                logger.info(f"Waiting for capture process for {camera} to stop")
                capture_process.terminate()
                capture_process.join()

        # ensure the camera processors are done
        for camera, metrics in self.camera_metrics.items():
            camera_process = metrics.process
            if camera_process is not None:
                logger.info(f"Waiting for process for {camera} to stop")
                camera_process.terminate()
                camera_process.join()
                logger.info(f"Closing frame queue for {camera}")
                empty_and_close_queue(metrics.frame_queue)

        # ensure the detectors are done
        for detector in self.detectors.values():
            detector.stop()

        empty_and_close_queue(self.detection_queue)
        logger.info("Detection queue closed")

        self.detected_frames_processor.join()
        empty_and_close_queue(self.detected_frames_queue)
        logger.info("Detected frames queue closed")

        self.timeline_processor.join()
        self.event_processor.join()
        empty_and_close_queue(self.timeline_queue)
        logger.info("Timeline queue closed")

        self.output_processor.terminate()
        self.output_processor.join()

        self.recording_process.terminate()
        self.recording_process.join()

        self.review_segment_process.terminate()
        self.review_segment_process.join()

        self.external_event_processor.stop()
        self.dispatcher.stop()
        self.ptz_autotracker_thread.join()

        self.event_cleanup.join()
        self.record_cleanup.join()
        self.stats_emitter.join()
        self.frigate_watchdog.join()
        self.db.stop()

        # Save embeddings stats to disk
        if self.embeddings:
            self.embeddings.stop()

        # Stop Communicators
        self.inter_process_communicator.stop()
        self.inter_config_updater.stop()
        self.event_metadata_updater.stop()
        self.inter_zmq_proxy.stop()

        while len(self.detection_shms) > 0:
            shm = self.detection_shms.pop()
            shm.close()
            shm.unlink()

        ServiceManager.current().shutdown(wait=True)
        os._exit(os.EX_OK)