mirror of
				https://github.com/blakeblackshear/frigate.git
				synced 2025-10-27 10:52:11 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			310 lines
		
	
	
		
			9.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			310 lines
		
	
	
		
			9.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import datetime
 | 
						|
import logging
 | 
						|
import os
 | 
						|
import tempfile
 | 
						|
import unittest
 | 
						|
from unittest.mock import MagicMock
 | 
						|
 | 
						|
from peewee import DoesNotExist
 | 
						|
from peewee_migrate import Router
 | 
						|
from playhouse.sqlite_ext import SqliteExtDatabase
 | 
						|
from playhouse.sqliteq import SqliteQueueDatabase
 | 
						|
 | 
						|
from frigate.config import FrigateConfig
 | 
						|
from frigate.models import Event, Recordings
 | 
						|
from frigate.storage import StorageMaintainer
 | 
						|
from frigate.test.const import TEST_DB, TEST_DB_CLEANUPS
 | 
						|
 | 
						|
 | 
						|
class TestHttp(unittest.TestCase):
    """Tests for StorageMaintainer bandwidth calculation and recording cleanup.

    NOTE(review): the class name says "Http" although every test here drives
    ``frigate.storage``; the name is kept so existing test selectors still work.
    """

    @staticmethod
    def _camera_config(rtsp_host):
        """Return the config dict for one detect-only camera streaming from rtsp_host."""
        return {
            "ffmpeg": {
                "inputs": [
                    {"path": f"rtsp://{rtsp_host}:554/video", "roles": ["detect"]}
                ]
            },
            "detect": {
                "height": 1080,
                "width": 1920,
                "fps": 5,
            },
        }

    def _segment_path(self, rec_id):
        """Return the path of the mock segment file for rec_id inside the scratch dir."""
        return os.path.join(self.test_dir, f"{rec_id}.tmp")

    def _insert_retained_fixture(self, start):
        """Insert one retained 30s event plus three back-to-back 10s recordings.

        Args:
            start: timestamp at which the event and the first recording begin.

        Returns:
            The ids of the three recordings, in chronological order.
        """
        _insert_mock_event("123456.keep", start, start + 30, True)
        rec_ids = ["1234567.keep", "1234568.keep", "1234569.keep"]
        for index, rec_id in enumerate(rec_ids):
            offset = index * 10
            _insert_mock_recording(
                rec_id,
                self._segment_path(rec_id),
                start + offset,
                # add the integer offsets first so the float sum matches
                # start + 10 / start + 20 / start + 30 exactly
                start + (offset + 10),
            )
        return rec_ids

    def setUp(self):
        """Create a migrated database, a scratch directory and the test configs."""
        # setup clean database for each test run; migrations run against the
        # same file the models are bound to below
        migrate_db = SqliteExtDatabase(TEST_DB)
        # drop peewee_migrate's log handlers before the router runs
        # (presumably to keep migration output out of the test log)
        del logging.getLogger("peewee_migrate").handlers[:]
        router = Router(migrate_db)
        router.run()
        migrate_db.close()
        self.db = SqliteQueueDatabase(TEST_DB)
        models = [Event, Recordings]
        self.db.bind(models)

        # scratch directory for mock segment files, removed again after the
        # test so repeated runs do not accumulate temp directories
        scratch = tempfile.TemporaryDirectory()
        self.addCleanup(scratch.cleanup)
        self.test_dir = scratch.name

        self.minimal_config = {
            "mqtt": {"host": "mqtt"},
            "cameras": {
                "front_door": self._camera_config("10.0.0.1"),
            },
        }
        self.double_cam_config = {
            "mqtt": {"host": "mqtt"},
            "cameras": {
                "front_door": self._camera_config("10.0.0.1"),
                "back_door": self._camera_config("10.0.0.2"),
            },
        }

    def tearDown(self):
        """Close the database and remove the database files, best effort."""
        if not self.db.is_closed():
            self.db.close()

        # handle each file separately so one missing file does not stop the
        # remaining files from being removed
        for file in TEST_DB_CLEANUPS:
            try:
                os.remove(file)
            except OSError:
                pass

    def test_segment_calculations(self):
        """Test that the segment calculations are correct."""
        config = FrigateConfig(**self.double_cam_config)
        storage = StorageMaintainer(config, MagicMock())

        time_keep = datetime.datetime.now().timestamp()
        rec_fd_id = "1234567.frontdoor"
        rec_bd_id = "1234568.backdoor"
        _insert_mock_recording(
            rec_fd_id,
            self._segment_path(rec_fd_id),
            time_keep,
            time_keep + 10,
            camera="front_door",
            seg_size=4,
            seg_dur=10,
        )
        _insert_mock_recording(
            rec_bd_id,
            self._segment_path(rec_bd_id),
            time_keep + 10,
            time_keep + 20,
            camera="back_door",
            seg_size=8,
            seg_dur=20,
        )
        storage.calculate_camera_bandwidth()
        self.assertEqual(
            storage.camera_storage_stats,
            {
                "front_door": {"bandwidth": 1440, "needs_refresh": True},
                "back_door": {"bandwidth": 2880, "needs_refresh": True},
            },
        )

    def test_segment_calculations_with_zero_segments(self):
        """Ensure segment calculation does not fail when migrating from previous version."""
        config = FrigateConfig(**self.minimal_config)
        storage = StorageMaintainer(config, MagicMock())

        time_keep = datetime.datetime.now().timestamp()
        rec_fd_id = "1234567.frontdoor"
        _insert_mock_recording(
            rec_fd_id,
            self._segment_path(rec_fd_id),
            time_keep,
            time_keep + 10,
            camera="front_door",
            seg_size=0,
            seg_dur=10,
        )
        storage.calculate_camera_bandwidth()
        self.assertEqual(
            storage.camera_storage_stats,
            {
                "front_door": {"bandwidth": 0, "needs_refresh": True},
            },
        )

    def test_storage_cleanup(self):
        """Ensure that all recordings are cleaned up when necessary."""
        config = FrigateConfig(**self.minimal_config)
        storage = StorageMaintainer(config, MagicMock())

        time_keep = datetime.datetime.now().timestamp()
        keep_ids = self._insert_retained_fixture(time_keep)

        time_delete = datetime.datetime.now().timestamp() - 360
        _insert_mock_event("7890.delete", time_delete, time_delete + 30, False)
        delete_ids = ["78901.delete", "78902.delete", "78903.delete"]
        for index, rec_id in enumerate(delete_ids):
            offset = index * 10
            _insert_mock_recording(
                rec_id,
                self._segment_path(rec_id),
                time_delete + offset,
                time_delete + (offset + 10),
            )

        storage.calculate_camera_bandwidth()
        storage.reduce_storage_consumption()

        # Check every id in its own assertRaises: a single shared block stops
        # at the first DoesNotExist and would leave the other ids unverified.
        for rec_id in keep_ids + delete_ids:
            with self.subTest(recording=rec_id):
                with self.assertRaises(DoesNotExist):
                    Recordings.get(Recordings.id == rec_id)

    def test_storage_cleanup_keeps_retained(self):
        """Ensure recordings covered by a retained event survive this cleanup run."""
        config = FrigateConfig(**self.minimal_config)
        storage = StorageMaintainer(config, MagicMock())

        time_keep = datetime.datetime.now().timestamp()
        keep_ids = self._insert_retained_fixture(time_keep)

        # 59 older recordings that are not covered by any event
        time_delete = datetime.datetime.now().timestamp() - 7200
        for i in range(59):
            rec_id = f"{123456 + i}.delete"
            _insert_mock_recording(
                rec_id,
                self._segment_path(rec_id),
                time_delete,
                time_delete + 600,
            )

        storage.calculate_camera_bandwidth()
        storage.reduce_storage_consumption()

        # get() raises DoesNotExist if the row was removed
        for rec_id in keep_ids:
            self.assertIsNotNone(Recordings.get(Recordings.id == rec_id))
 | 
						|
 | 
						|
 | 
						|
def _insert_mock_event(id: str, start: float, end: float, retain: bool) -> int:
    """Insert a basic event row with the given id.

    Args:
        id: value stored in the event's ``id`` field.
        start: event start timestamp (callers pass ``datetime.timestamp()`` floats).
        end: event end timestamp.
        retain: value stored in ``retain_indefinitely``.

    Returns:
        The result of executing the insert query, not an ``Event`` instance.
        NOTE(review): peewee documents this as the key of the new row; the
        ``int`` annotation assumes SQLite's row id - confirm.
    """
    return Event.insert(
        id=id,
        label="Mock",
        camera="front_door",
        start_time=start,
        end_time=end,
        top_score=100,
        false_positive=False,
        zones=[],
        thumbnail="",
        region=[],
        box=[],
        area=0,
        has_clip=True,
        has_snapshot=True,
        retain_indefinitely=retain,
    ).execute()
 | 
						|
 | 
						|
 | 
						|
def _insert_mock_recording(
    id: str,
    file: str,
    start: float,
    end: float,
    camera: str = "front_door",
    seg_size: int = 8,
    seg_dur: int = 10,
) -> int:
    """Create an empty segment file and insert a basic recording row for it.

    Args:
        id: value stored in the recording's ``id`` field.
        file: path of the segment file; it is created (or truncated) empty.
        start: recording start timestamp (callers pass ``datetime.timestamp()`` floats).
        end: recording end timestamp.
        camera: camera name stored on the row.
        seg_size: value stored in ``segment_size``.
        seg_dur: value stored in ``duration``.

    Returns:
        The result of executing the insert query, not a model instance.
        NOTE(review): peewee documents this as the key of the new row; the
        ``int`` annotation assumes SQLite's row id - confirm.
    """
    # the file has to exist on disk so the storage maintainer has something
    # to delete
    with open(file, "w"):
        pass

    return Recordings.insert(
        id=id,
        camera=camera,
        path=file,
        start_time=start,
        end_time=end,
        duration=seg_dur,
        motion=True,
        objects=True,
        segment_size=seg_size,
    ).execute()
 |