mirror of
https://github.com/blakeblackshear/frigate.git
synced 2024-12-19 19:06:16 +01:00
36cbffcc5e
* Initial re-implementation of semantic search * put docker-compose back and make reindex match docs * remove debug code and fix import * fix docs * manually build pysqlite3 as binaries are only available for x86-64 * update comment in build_pysqlite3.sh * only embed objects * better error handling when genai fails * ask ollama to pull requested model at startup * update ollama docs * address some PR review comments * fix lint * use IPC to write description, update docs for reindex * remove gemini-pro-vision from docs as it will be unavailable soon * fix OpenAI doc available models * fix api error in gemini and metadata for embeddings
44 lines
1.1 KiB
Python
44 lines
1.1 KiB
Python
"""Facilitates communication between processes."""
|
|
|
|
from enum import Enum
from typing import Any, Optional

from .zmq_proxy import Publisher, Subscriber
|
|
|
|
|
|
class DetectionTypeEnum(str, Enum):
    """Detection topic kinds shared by the detection publisher and subscriber.

    Each member is also a ``str``; its value is the topic text handed to the
    publisher/subscriber base class.
    """

    # Empty value: contributes nothing to the topic text.
    # NOTE(review): presumably this makes a subscriber match every detection
    # topic by prefix -- confirm against zmq_proxy.
    all = ""
    api = "api"
    video = "video"
    audio = "audio"
|
|
|
|
|
|
class DetectionPublisher(Publisher):
    """Simplifies publishing video and audio detections."""

    # Prefix shared with DetectionSubscriber, which strips it again when
    # decoding a received topic.
    # NOTE(review): presumably the base Publisher prepends this to the topic
    # passed to __init__ -- confirm in zmq_proxy.
    topic_base = "detection/"

    def __init__(self, topic: DetectionTypeEnum) -> None:
        """Create a publisher for one detection type.

        Args:
            topic: Detection type to publish under; its string value is
                passed to the base ``Publisher`` as the topic.
        """
        # Pass the value straight through instead of rebinding the enum
        # parameter to a str.
        super().__init__(topic.value)
|
|
|
|
|
|
class DetectionSubscriber(Subscriber):
    """Simplifies receiving video and audio detections."""

    # Prefix carried by every received topic; stripped in _return_object
    # before the remainder is mapped back to a DetectionTypeEnum.
    topic_base = "detection/"

    def __init__(self, topic: DetectionTypeEnum) -> None:
        """Create a subscriber for one detection type.

        Args:
            topic: Detection type to subscribe to; its string value is
                passed to the base ``Subscriber`` as the topic.
        """
        super().__init__(topic.value)

    def check_for_update(
        self, timeout: Optional[float] = None
    ) -> Optional[tuple[Optional[DetectionTypeEnum], Any]]:
        """Check for a new detection by delegating to the base ``Subscriber``.

        Args:
            timeout: Forwarded unchanged to ``Subscriber.check_for_update``;
                the meaning of ``None`` is defined by the base class.

        Returns:
            Whatever the base class returns.
            NOTE(review): presumably the ``(detection_type, payload)`` tuple
            built by ``_return_object``, i.e. ``(None, None)`` when nothing
            was received -- confirm in zmq_proxy.
        """
        return super().check_for_update(timeout)

    def _return_object(
        self, topic: str, payload: Any
    ) -> tuple[Optional[DetectionTypeEnum], Any]:
        """Convert a raw topic and payload into ``(detection_type, payload)``.

        Args:
            topic: Full topic string, expected to start with ``topic_base``.
            payload: Decoded message payload, or ``None`` when there is none.

        Returns:
            ``(None, None)`` when ``payload`` is ``None``; otherwise the
            detection type decoded from the topic together with the payload.

        Raises:
            KeyError: If the topic suffix matches neither a member value nor
                a member name of ``DetectionTypeEnum``.
        """
        if payload is None:
            return (None, None)

        suffix = topic[len(self.topic_base) :]
        try:
            # Topics are built from the enum *value*, so resolve by value
            # first; this also handles DetectionTypeEnum.all, whose value
            # ("") differs from its name.
            detection_type = DetectionTypeEnum(suffix)
        except ValueError:
            # Fall back to the original by-name lookup so existing behaviour
            # (including KeyError for unknown topics) is preserved.
            detection_type = DetectionTypeEnum[suffix]
        return (detection_type, payload)
|