import datetime import logging import os import tempfile import unittest from unittest.mock import MagicMock from peewee import DoesNotExist from peewee_migrate import Router from playhouse.sqlite_ext import SqliteExtDatabase from playhouse.sqliteq import SqliteQueueDatabase from frigate.config import FrigateConfig from frigate.models import Event, Recordings from frigate.storage import StorageMaintainer from frigate.test.const import TEST_DB, TEST_DB_CLEANUPS class TestHttp(unittest.TestCase): def setUp(self): # setup clean database for each test run migrate_db = SqliteExtDatabase("test.db") del logging.getLogger("peewee_migrate").handlers[:] router = Router(migrate_db) router.run() migrate_db.close() self.db = SqliteQueueDatabase(TEST_DB) models = [Event, Recordings] self.db.bind(models) self.test_dir = tempfile.mkdtemp() self.minimal_config = { "mqtt": {"host": "mqtt"}, "cameras": { "front_door": { "ffmpeg": { "inputs": [ {"path": "rtsp://10.0.0.1:554/video", "roles": ["detect"]} ] }, "detect": { "height": 1080, "width": 1920, "fps": 5, }, } }, } self.double_cam_config = { "mqtt": {"host": "mqtt"}, "cameras": { "front_door": { "ffmpeg": { "inputs": [ {"path": "rtsp://10.0.0.1:554/video", "roles": ["detect"]} ] }, "detect": { "height": 1080, "width": 1920, "fps": 5, }, }, "back_door": { "ffmpeg": { "inputs": [ {"path": "rtsp://10.0.0.2:554/video", "roles": ["detect"]} ] }, "detect": { "height": 1080, "width": 1920, "fps": 5, }, }, }, } def tearDown(self): if not self.db.is_closed(): self.db.close() try: for file in TEST_DB_CLEANUPS: os.remove(file) except OSError: pass def test_segment_calculations(self): """Test that the segment calculations are correct.""" config = FrigateConfig(**self.double_cam_config) storage = StorageMaintainer(config, MagicMock()) time_keep = datetime.datetime.now().timestamp() rec_fd_id = "1234567.frontdoor" rec_bd_id = "1234568.backdoor" _insert_mock_recording( rec_fd_id, os.path.join(self.test_dir, f"{rec_fd_id}.tmp"), time_keep, time_keep + 10, camera="front_door", seg_size=4, seg_dur=10, ) _insert_mock_recording( rec_bd_id, os.path.join(self.test_dir, f"{rec_bd_id}.tmp"), time_keep + 10, time_keep + 20, camera="back_door", seg_size=8, seg_dur=20, ) storage.calculate_camera_bandwidth() assert storage.camera_storage_stats == { "front_door": {"bandwidth": 1440, "needs_refresh": True}, "back_door": {"bandwidth": 2880, "needs_refresh": True}, } def test_segment_calculations_with_zero_segments(self): """Ensure segment calculation does not fail when migrating from previous version.""" config = FrigateConfig(**self.minimal_config) storage = StorageMaintainer(config, MagicMock()) time_keep = datetime.datetime.now().timestamp() rec_fd_id = "1234567.frontdoor" _insert_mock_recording( rec_fd_id, os.path.join(self.test_dir, f"{rec_fd_id}.tmp"), time_keep, time_keep + 10, camera="front_door", seg_size=0, seg_dur=10, ) storage.calculate_camera_bandwidth() assert storage.camera_storage_stats == { "front_door": {"bandwidth": 0, "needs_refresh": True}, } def test_storage_cleanup(self): """Ensure that all recordings are cleaned up when necessary.""" config = FrigateConfig(**self.minimal_config) storage = StorageMaintainer(config, MagicMock()) id = "123456.keep" time_keep = datetime.datetime.now().timestamp() _insert_mock_event( id, time_keep, time_keep + 30, True, ) rec_k_id = "1234567.keep" rec_k2_id = "1234568.keep" rec_k3_id = "1234569.keep" _insert_mock_recording( rec_k_id, os.path.join(self.test_dir, f"{rec_k_id}.tmp"), time_keep, time_keep + 10, ) _insert_mock_recording( rec_k2_id, os.path.join(self.test_dir, f"{rec_k2_id}.tmp"), time_keep + 10, time_keep + 20, ) _insert_mock_recording( rec_k3_id, os.path.join(self.test_dir, f"{rec_k3_id}.tmp"), time_keep + 20, time_keep + 30, ) id2 = "7890.delete" time_delete = datetime.datetime.now().timestamp() - 360 _insert_mock_event(id2, time_delete, time_delete + 30, False) rec_d_id = "78901.delete" rec_d2_id = "78902.delete" rec_d3_id = "78903.delete" _insert_mock_recording( rec_d_id, os.path.join(self.test_dir, f"{rec_d_id}.tmp"), time_delete, time_delete + 10, ) _insert_mock_recording( rec_d2_id, os.path.join(self.test_dir, f"{rec_d2_id}.tmp"), time_delete + 10, time_delete + 20, ) _insert_mock_recording( rec_d3_id, os.path.join(self.test_dir, f"{rec_d3_id}.tmp"), time_delete + 20, time_delete + 30, ) storage.calculate_camera_bandwidth() storage.reduce_storage_consumption() with self.assertRaises(DoesNotExist): assert Recordings.get(Recordings.id == rec_k_id) assert Recordings.get(Recordings.id == rec_k2_id) assert Recordings.get(Recordings.id == rec_k3_id) Recordings.get(Recordings.id == rec_d_id) Recordings.get(Recordings.id == rec_d2_id) Recordings.get(Recordings.id == rec_d3_id) def test_storage_cleanup_keeps_retained(self): """Ensure that all recordings are cleaned up when necessary.""" config = FrigateConfig(**self.minimal_config) storage = StorageMaintainer(config, MagicMock()) id = "123456.keep" time_keep = datetime.datetime.now().timestamp() _insert_mock_event( id, time_keep, time_keep + 30, True, ) rec_k_id = "1234567.keep" rec_k2_id = "1234568.keep" rec_k3_id = "1234569.keep" _insert_mock_recording( rec_k_id, os.path.join(self.test_dir, f"{rec_k_id}.tmp"), time_keep, time_keep + 10, ) _insert_mock_recording( rec_k2_id, os.path.join(self.test_dir, f"{rec_k2_id}.tmp"), time_keep + 10, time_keep + 20, ) _insert_mock_recording( rec_k3_id, os.path.join(self.test_dir, f"{rec_k3_id}.tmp"), time_keep + 20, time_keep + 30, ) time_delete = datetime.datetime.now().timestamp() - 7200 for i in range(0, 59): id = f"{123456 + i}.delete" _insert_mock_recording( id, os.path.join(self.test_dir, f"{id}.tmp"), time_delete, time_delete + 600, ) storage.calculate_camera_bandwidth() storage.reduce_storage_consumption() assert Recordings.get(Recordings.id == rec_k_id) assert Recordings.get(Recordings.id == rec_k2_id) assert Recordings.get(Recordings.id == rec_k3_id) def _insert_mock_event(id: str, start: int, end: int, retain: bool) -> Event: """Inserts a basic event model with a given id.""" return Event.insert( id=id, label="Mock", camera="front_door", start_time=start, end_time=end, top_score=100, false_positive=False, zones=list(), thumbnail="", region=[], box=[], area=0, has_clip=True, has_snapshot=True, retain_indefinitely=retain, ).execute() def _insert_mock_recording( id: str, file: str, start: int, end: int, camera="front_door", seg_size=8, seg_dur=10, ) -> Event: """Inserts a basic recording model with a given id.""" # we must open the file so storage maintainer will delete it with open(file, "w"): pass return Recordings.insert( id=id, camera=camera, path=file, start_time=start, end_time=end, duration=seg_dur, motion=True, objects=True, segment_size=seg_size, ).execute()