mirror of
				https://github.com/blakeblackshear/frigate.git
				synced 2025-10-27 10:52:11 +01:00 
			
		
		
		
	* Fix embeddings reading frames * Fix event update reading * Formatting * Pin AIO http to fix build failure * Pin starlette
		
			
				
	
	
		
			52 lines
		
	
	
		
			1.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			52 lines
		
	
	
		
			1.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
"""Facilitates communication between processes."""
 | 
						|
 | 
						|
from typing import Any

from frigate.events.types import EventStateEnum, EventTypeEnum

from .zmq_proxy import Publisher, Subscriber
 | 
						|
 | 
						|
 | 
						|
class EventUpdatePublisher(Publisher):
    """Publishes events (objects, audio, manual).

    The constructor hands the ``"update"`` topic to the base ``Publisher``.
    NOTE(review): the base class presumably joins it with ``topic_base`` to
    form ``"event/update"`` -- confirm in ``zmq_proxy``.
    """

    # Topic prefix used by every event publisher/subscriber in this module.
    topic_base = "event/"

    def __init__(self) -> None:
        super().__init__("update")

    def publish(
        self, payload: tuple[EventTypeEnum, EventStateEnum, str, str, dict[str, Any]]
    ) -> None:
        """Publish a single event update.

        Args:
            payload: 5-tuple of event type, event state, two strings and a
                dict of event data. NOTE(review): the two strings are
                presumably the camera name and the event id -- confirm
                against callers.
        """
        # Overridden only to pin down the payload type; the base class
        # performs the actual publish.
        super().publish(payload)
 | 
						|
 | 
						|
 | 
						|
class EventUpdateSubscriber(Subscriber):
    """Receives event updates.

    Uses the same ``topic_base`` and ``"update"`` topic as
    ``EventUpdatePublisher``.
    """

    # Topic prefix; identical to the one declared on the update publisher.
    topic_base = "event/"

    def __init__(self) -> None:
        # Hand the "update" topic to the base Subscriber.
        topic = "update"
        super().__init__(topic)
 | 
						|
 | 
						|
 | 
						|
class EventEndPublisher(Publisher):
    """Publishes events that have ended.

    The constructor hands the ``"finalized"`` topic to the base
    ``Publisher``. NOTE(review): the base class presumably joins it with
    ``topic_base`` to form ``"event/finalized"`` -- confirm in ``zmq_proxy``.
    """

    # Topic prefix used by every event publisher/subscriber in this module.
    topic_base = "event/"

    def __init__(self) -> None:
        super().__init__("finalized")

    def publish(
        self, payload: tuple[EventTypeEnum, EventStateEnum, str, dict[str, Any]]
    ) -> None:
        """Publish a single ended event.

        Args:
            payload: 4-tuple of event type, event state, one string and a
                dict of event data. NOTE(review): the string is presumably
                the camera name -- confirm against callers.
        """
        # Overridden only to pin down the payload type; the base class
        # performs the actual publish.
        super().publish(payload)
 | 
						|
 | 
						|
 | 
						|
class EventEndSubscriber(Subscriber):
    """Receives events that have ended.

    Uses the same ``topic_base`` and ``"finalized"`` topic as
    ``EventEndPublisher``.
    """

    # Topic prefix; identical to the one declared on the end publisher.
    topic_base = "event/"

    def __init__(self) -> None:
        # Hand the "finalized" topic to the base Subscriber.
        topic = "finalized"
        super().__init__(topic)
 |