mirror of
				https://github.com/blakeblackshear/frigate.git
				synced 2025-10-27 10:52:11 +01:00 
			
		
		
		
	* Fix the `Any` typing hint treewide There has been confusion between the Any type[1] and the any function[2] in typing hints. [1] https://docs.python.org/3/library/typing.html#typing.Any [2] https://docs.python.org/3/library/functions.html#any * Fix typing for various frame_shape members Frame shapes are most likely defined by height and width, so a single int cannot express that. * Wrap gpu stats functions in Optional[] These can return `None`, so they need to be `Type | None`, which is what `Optional` expresses very nicely. * Fix return type in get_latest_segment_datetime Returns a datetime object, not an integer. * Make the return type of FrameManager.write optional This is necessary since the SharedMemoryFrameManager.write function can return None. * Fix total_seconds() return type in get_tz_modifiers The function returns a float, not an int. https://docs.python.org/3/library/datetime.html#datetime.timedelta.total_seconds * Account for floating point results in to_relative_box Because the function uses division the return types may either be int or float. * Resolve ruff deprecation warning The config has been split into formatter and linter, and the global options are deprecated.
		
			
				
	
	
		
			59 lines
		
	
	
		
			1.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			59 lines
		
	
	
		
			1.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| """Facilitates communication between processes."""
 | |
| 
 | |
| import multiprocessing as mp
 | |
| from multiprocessing.synchronize import Event as MpEvent
 | |
| from typing import Any, Optional
 | |
| 
 | |
| import zmq
 | |
| 
 | |
# ZeroMQ endpoint shared by both sides of the config channel:
# ConfigPublisher binds to it and ConfigSubscriber connects to it.
SOCKET_PUB_SUB = "ipc:///tmp/cache/config"
 | |
| 
 | |
| 
 | |
class ConfigPublisher:
    """Broadcasts config changes to other processes over a ZeroMQ PUB socket."""

    def __init__(self) -> None:
        """Create the PUB socket and bind it to ``SOCKET_PUB_SUB``."""
        # Set by stop(); nothing in this class reads it.
        self.stop_event: MpEvent = mp.Event()

        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.PUB)
        self.socket.bind(SOCKET_PUB_SUB)

    def publish(self, topic: str, payload: Any) -> None:
        """Send ``payload`` under ``topic``; no reply is read back.

        The message goes out as two frames: the topic string first (flagged
        with ``zmq.SNDMORE`` so that more frames follow), then the payload
        via ``send_pyobj``.
        """
        self.socket.send_string(topic, flags=zmq.SNDMORE)
        self.socket.send_pyobj(payload)

    def stop(self) -> None:
        """Set the stop event, then close the socket and destroy the context."""
        self.stop_event.set()
        self.socket.close()
        self.context.destroy()
 | |
| 
 | |
| 
 | |
class ConfigSubscriber:
    """Simplifies receiving an updated config."""

    def __init__(self, topic: str, exact: bool = False) -> None:
        """Connect a SUB socket to ``SOCKET_PUB_SUB`` and subscribe to ``topic``.

        Args:
            topic: Subscription filter handed to ``zmq.SUBSCRIBE``.
            exact: When true, ``check_for_update`` only reports messages whose
                topic is equal to ``topic``. When false, every message the
                socket delivers is reported.
        """
        self.topic = topic
        self.exact = exact
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.SUB)
        self.socket.setsockopt_string(zmq.SUBSCRIBE, topic)
        self.socket.connect(SOCKET_PUB_SUB)

    def check_for_update(self) -> tuple[Optional[str], Optional[Any]]:
        """Poll once, without blocking, for a published config update.

        Returns:
            ``(topic, payload)`` when a message was received and passed the
            ``exact`` filter, otherwise ``(None, None)``. A 2-tuple is always
            returned (never a bare ``None``), so callers can unpack the
            result unconditionally.
        """
        try:
            topic = self.socket.recv_string(flags=zmq.NOBLOCK)
            # NOTE(review): recv_pyobj unpickles the frame; this is only safe
            # while every writer on the endpoint is trusted.
            obj = self.socket.recv_pyobj()
        except zmq.ZMQError:
            # Best effort: nothing pending (or any other ZeroMQ failure) is
            # reported to the caller as "no update".
            return (None, None)

        # In exact mode a message for a different topic is consumed and
        # dropped rather than reported.
        if self.exact and self.topic != topic:
            return (None, None)

        return (topic, obj)

    def stop(self) -> None:
        """Close the socket and destroy the ZeroMQ context."""
        self.socket.close()
        self.context.destroy()
 |