mirror of
https://github.com/blakeblackshear/frigate.git
synced 2025-01-21 00:06:44 +01:00
240 lines
8.1 KiB
Python
240 lines
8.1 KiB
Python
|
import datetime
|
||
|
import json
|
||
|
import logging
|
||
|
import os
|
||
|
import unittest
|
||
|
from unittest.mock import MagicMock, patch
|
||
|
|
||
|
from peewee import DoesNotExist
|
||
|
from peewee_migrate import Router
|
||
|
from playhouse.sqlite_ext import SqliteExtDatabase
|
||
|
from playhouse.sqliteq import SqliteQueueDatabase
|
||
|
from playhouse.shortcuts import model_to_dict
|
||
|
|
||
|
from frigate.config import FrigateConfig
|
||
|
from frigate.http import create_app
|
||
|
from frigate.models import Event, Recordings
|
||
|
from frigate.storage import StorageMaintainer
|
||
|
|
||
|
from frigate.test.const import TEST_DB, TEST_DB_CLEANUPS
|
||
|
|
||
|
|
||
|
class TestHttp(unittest.TestCase):
|
||
|
def setUp(self):
|
||
|
# setup clean database for each test run
|
||
|
migrate_db = SqliteExtDatabase("test.db")
|
||
|
del logging.getLogger("peewee_migrate").handlers[:]
|
||
|
router = Router(migrate_db)
|
||
|
router.run()
|
||
|
migrate_db.close()
|
||
|
self.db = SqliteQueueDatabase(TEST_DB)
|
||
|
models = [Event, Recordings]
|
||
|
self.db.bind(models)
|
||
|
|
||
|
self.minimal_config = {
|
||
|
"mqtt": {"host": "mqtt"},
|
||
|
"cameras": {
|
||
|
"front_door": {
|
||
|
"ffmpeg": {
|
||
|
"inputs": [
|
||
|
{"path": "rtsp://10.0.0.1:554/video", "roles": ["detect"]}
|
||
|
]
|
||
|
},
|
||
|
"detect": {
|
||
|
"height": 1080,
|
||
|
"width": 1920,
|
||
|
"fps": 5,
|
||
|
},
|
||
|
}
|
||
|
},
|
||
|
}
|
||
|
self.double_cam_config = {
|
||
|
"mqtt": {"host": "mqtt"},
|
||
|
"cameras": {
|
||
|
"front_door": {
|
||
|
"ffmpeg": {
|
||
|
"inputs": [
|
||
|
{"path": "rtsp://10.0.0.1:554/video", "roles": ["detect"]}
|
||
|
]
|
||
|
},
|
||
|
"detect": {
|
||
|
"height": 1080,
|
||
|
"width": 1920,
|
||
|
"fps": 5,
|
||
|
},
|
||
|
},
|
||
|
"back_door": {
|
||
|
"ffmpeg": {
|
||
|
"inputs": [
|
||
|
{"path": "rtsp://10.0.0.2:554/video", "roles": ["detect"]}
|
||
|
]
|
||
|
},
|
||
|
"detect": {
|
||
|
"height": 1080,
|
||
|
"width": 1920,
|
||
|
"fps": 5,
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
}
|
||
|
|
||
|
def tearDown(self):
|
||
|
if not self.db.is_closed():
|
||
|
self.db.close()
|
||
|
|
||
|
try:
|
||
|
for file in TEST_DB_CLEANUPS:
|
||
|
os.remove(file)
|
||
|
except OSError:
|
||
|
pass
|
||
|
|
||
|
def test_segment_calculations(self):
|
||
|
"""Test that the segment calculations are correct."""
|
||
|
config = FrigateConfig(**self.double_cam_config)
|
||
|
storage = StorageMaintainer(config, MagicMock())
|
||
|
|
||
|
time_keep = datetime.datetime.now().timestamp()
|
||
|
rec_fd_id = "1234567.frontdoor"
|
||
|
rec_bd_id = "1234568.backdoor"
|
||
|
_insert_mock_recording(
|
||
|
rec_fd_id,
|
||
|
time_keep,
|
||
|
time_keep + 10,
|
||
|
camera="front_door",
|
||
|
seg_size=4,
|
||
|
seg_dur=10,
|
||
|
)
|
||
|
_insert_mock_recording(
|
||
|
rec_bd_id,
|
||
|
time_keep + 10,
|
||
|
time_keep + 20,
|
||
|
camera="back_door",
|
||
|
seg_size=8,
|
||
|
seg_dur=20,
|
||
|
)
|
||
|
storage.calculate_camera_bandwidth()
|
||
|
assert storage.camera_storage_stats == {
|
||
|
"front_door": {"bandwidth": 1440, "needs_refresh": True},
|
||
|
"back_door": {"bandwidth": 2880, "needs_refresh": True},
|
||
|
}
|
||
|
|
||
|
def test_segment_calculations_with_zero_segments(self):
|
||
|
"""Ensure segment calculation does not fail when migrating from previous version."""
|
||
|
config = FrigateConfig(**self.minimal_config)
|
||
|
storage = StorageMaintainer(config, MagicMock())
|
||
|
|
||
|
time_keep = datetime.datetime.now().timestamp()
|
||
|
rec_fd_id = "1234567.frontdoor"
|
||
|
_insert_mock_recording(
|
||
|
rec_fd_id,
|
||
|
time_keep,
|
||
|
time_keep + 10,
|
||
|
camera="front_door",
|
||
|
seg_size=0,
|
||
|
seg_dur=10,
|
||
|
)
|
||
|
storage.calculate_camera_bandwidth()
|
||
|
assert storage.camera_storage_stats == {
|
||
|
"front_door": {"bandwidth": 0, "needs_refresh": True},
|
||
|
}
|
||
|
|
||
|
def test_storage_cleanup(self):
|
||
|
"""Ensure that all recordings are cleaned up when necessary."""
|
||
|
config = FrigateConfig(**self.minimal_config)
|
||
|
storage = StorageMaintainer(config, MagicMock())
|
||
|
|
||
|
id = "123456.keep"
|
||
|
time_keep = datetime.datetime.now().timestamp()
|
||
|
_insert_mock_event(id, time_keep, time_keep + 30, True)
|
||
|
rec_k_id = "1234567.keep"
|
||
|
rec_k2_id = "1234568.keep"
|
||
|
rec_k3_id = "1234569.keep"
|
||
|
_insert_mock_recording(rec_k_id, time_keep, time_keep + 10)
|
||
|
_insert_mock_recording(rec_k2_id, time_keep + 10, time_keep + 20)
|
||
|
_insert_mock_recording(rec_k3_id, time_keep + 20, time_keep + 30)
|
||
|
|
||
|
id2 = "7890.delete"
|
||
|
time_delete = datetime.datetime.now().timestamp() - 360
|
||
|
_insert_mock_event(id2, time_delete, time_delete + 30, False)
|
||
|
rec_d_id = "78901.delete"
|
||
|
rec_d2_id = "78902.delete"
|
||
|
rec_d3_id = "78903.delete"
|
||
|
_insert_mock_recording(rec_d_id, time_delete, time_delete + 10)
|
||
|
_insert_mock_recording(rec_d2_id, time_delete + 10, time_delete + 20)
|
||
|
_insert_mock_recording(rec_d3_id, time_delete + 20, time_delete + 30)
|
||
|
|
||
|
storage.calculate_camera_bandwidth()
|
||
|
storage.reduce_storage_consumption()
|
||
|
with self.assertRaises(DoesNotExist):
|
||
|
assert Recordings.get(Recordings.id == rec_k_id)
|
||
|
assert Recordings.get(Recordings.id == rec_k2_id)
|
||
|
assert Recordings.get(Recordings.id == rec_k3_id)
|
||
|
Recordings.get(Recordings.id == rec_d_id)
|
||
|
Recordings.get(Recordings.id == rec_d2_id)
|
||
|
Recordings.get(Recordings.id == rec_d3_id)
|
||
|
|
||
|
def test_storage_cleanup_keeps_retained(self):
|
||
|
"""Ensure that all recordings are cleaned up when necessary."""
|
||
|
config = FrigateConfig(**self.minimal_config)
|
||
|
storage = StorageMaintainer(config, MagicMock())
|
||
|
|
||
|
id = "123456.keep"
|
||
|
time_keep = datetime.datetime.now().timestamp()
|
||
|
_insert_mock_event(id, time_keep, time_keep + 30, True)
|
||
|
rec_k_id = "1234567.keep"
|
||
|
rec_k2_id = "1234568.keep"
|
||
|
rec_k3_id = "1234569.keep"
|
||
|
_insert_mock_recording(rec_k_id, time_keep, time_keep + 10)
|
||
|
_insert_mock_recording(rec_k2_id, time_keep + 10, time_keep + 20)
|
||
|
_insert_mock_recording(rec_k3_id, time_keep + 20, time_keep + 30)
|
||
|
|
||
|
time_delete = datetime.datetime.now().timestamp() - 7200
|
||
|
for i in range(0, 59):
|
||
|
_insert_mock_recording(
|
||
|
f"{123456 + i}.delete", time_delete, time_delete + 600
|
||
|
)
|
||
|
|
||
|
storage.calculate_camera_bandwidth()
|
||
|
storage.reduce_storage_consumption()
|
||
|
assert Recordings.get(Recordings.id == rec_k_id)
|
||
|
assert Recordings.get(Recordings.id == rec_k2_id)
|
||
|
assert Recordings.get(Recordings.id == rec_k3_id)
|
||
|
|
||
|
|
||
|
def _insert_mock_event(id: str, start: int, end: int, retain: bool) -> Event:
|
||
|
"""Inserts a basic event model with a given id."""
|
||
|
return Event.insert(
|
||
|
id=id,
|
||
|
label="Mock",
|
||
|
camera="front_door",
|
||
|
start_time=start,
|
||
|
end_time=end,
|
||
|
top_score=100,
|
||
|
false_positive=False,
|
||
|
zones=list(),
|
||
|
thumbnail="",
|
||
|
region=[],
|
||
|
box=[],
|
||
|
area=0,
|
||
|
has_clip=True,
|
||
|
has_snapshot=True,
|
||
|
retain_indefinitely=retain,
|
||
|
).execute()
|
||
|
|
||
|
|
||
|
def _insert_mock_recording(
|
||
|
id: str, start: int, end: int, camera="front_door", seg_size=8, seg_dur=10
|
||
|
) -> Event:
|
||
|
"""Inserts a basic recording model with a given id."""
|
||
|
return Recordings.insert(
|
||
|
id=id,
|
||
|
camera=camera,
|
||
|
path=f"/recordings/{id}",
|
||
|
start_time=start,
|
||
|
end_time=end,
|
||
|
duration=seg_dur,
|
||
|
motion=True,
|
||
|
objects=True,
|
||
|
segment_size=seg_size,
|
||
|
).execute()
|