mirror of
https://github.com/blakeblackshear/frigate.git
synced 2025-07-30 13:48:07 +02:00
* Implement model training via ZMQ and add model states to represent training * Get model updates working * Improve toasts and model state * Clean up logging * Add back in
621 lines
19 KiB
Python
621 lines
19 KiB
Python
"""Object classification APIs."""
|
|
|
|
import datetime
|
|
import logging
|
|
import os
|
|
import shutil
|
|
from typing import Any
|
|
|
|
import cv2
|
|
from fastapi import APIRouter, Depends, Request, UploadFile
|
|
from fastapi.responses import JSONResponse
|
|
from pathvalidate import sanitize_filename
|
|
from peewee import DoesNotExist
|
|
from playhouse.shortcuts import model_to_dict
|
|
|
|
from frigate.api.auth import require_role
|
|
from frigate.api.defs.request.classification_body import (
|
|
AudioTranscriptionBody,
|
|
RenameFaceBody,
|
|
)
|
|
from frigate.api.defs.tags import Tags
|
|
from frigate.config import FrigateConfig
|
|
from frigate.config.camera import DetectConfig
|
|
from frigate.const import CLIPS_DIR, FACE_DIR
|
|
from frigate.embeddings import EmbeddingsContext
|
|
from frigate.models import Event
|
|
from frigate.util.path import get_event_snapshot
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
router = APIRouter(tags=[Tags.events])
|
|
|
|
|
|
@router.get("/faces")
|
|
def get_faces():
|
|
face_dict: dict[str, list[str]] = {}
|
|
|
|
if not os.path.exists(FACE_DIR):
|
|
return JSONResponse(status_code=200, content={})
|
|
|
|
for name in os.listdir(FACE_DIR):
|
|
face_dir = os.path.join(FACE_DIR, name)
|
|
|
|
if not os.path.isdir(face_dir):
|
|
continue
|
|
|
|
face_dict[name] = []
|
|
|
|
for file in filter(
|
|
lambda f: (f.lower().endswith((".webp", ".png", ".jpg", ".jpeg"))),
|
|
os.listdir(face_dir),
|
|
):
|
|
face_dict[name].append(file)
|
|
|
|
return JSONResponse(status_code=200, content=face_dict)
|
|
|
|
|
|
@router.post("/faces/reprocess", dependencies=[Depends(require_role(["admin"]))])
|
|
def reclassify_face(request: Request, body: dict = None):
|
|
if not request.app.frigate_config.face_recognition.enabled:
|
|
return JSONResponse(
|
|
status_code=400,
|
|
content={"message": "Face recognition is not enabled.", "success": False},
|
|
)
|
|
|
|
json: dict[str, Any] = body or {}
|
|
training_file = os.path.join(
|
|
FACE_DIR, f"train/{sanitize_filename(json.get('training_file', ''))}"
|
|
)
|
|
|
|
if not training_file or not os.path.isfile(training_file):
|
|
return JSONResponse(
|
|
content=(
|
|
{
|
|
"success": False,
|
|
"message": f"Invalid filename or no file exists: {training_file}",
|
|
}
|
|
),
|
|
status_code=404,
|
|
)
|
|
|
|
context: EmbeddingsContext = request.app.embeddings
|
|
response = context.reprocess_face(training_file)
|
|
|
|
return JSONResponse(
|
|
content=response,
|
|
status_code=200,
|
|
)
|
|
|
|
|
|
@router.post("/faces/train/{name}/classify")
|
|
def train_face(request: Request, name: str, body: dict = None):
|
|
if not request.app.frigate_config.face_recognition.enabled:
|
|
return JSONResponse(
|
|
status_code=400,
|
|
content={"message": "Face recognition is not enabled.", "success": False},
|
|
)
|
|
|
|
json: dict[str, Any] = body or {}
|
|
training_file_name = sanitize_filename(json.get("training_file", ""))
|
|
training_file = os.path.join(FACE_DIR, f"train/{training_file_name}")
|
|
event_id = json.get("event_id")
|
|
|
|
if not training_file_name and not event_id:
|
|
return JSONResponse(
|
|
content=(
|
|
{
|
|
"success": False,
|
|
"message": "A training file or event_id must be passed.",
|
|
}
|
|
),
|
|
status_code=400,
|
|
)
|
|
|
|
if training_file_name and not os.path.isfile(training_file):
|
|
return JSONResponse(
|
|
content=(
|
|
{
|
|
"success": False,
|
|
"message": f"Invalid filename or no file exists: {training_file_name}",
|
|
}
|
|
),
|
|
status_code=404,
|
|
)
|
|
|
|
sanitized_name = sanitize_filename(name)
|
|
new_name = f"{sanitized_name}-{datetime.datetime.now().timestamp()}.webp"
|
|
new_file_folder = os.path.join(FACE_DIR, f"{sanitized_name}")
|
|
|
|
if not os.path.exists(new_file_folder):
|
|
os.mkdir(new_file_folder)
|
|
|
|
if training_file_name:
|
|
shutil.move(training_file, os.path.join(new_file_folder, new_name))
|
|
else:
|
|
try:
|
|
event: Event = Event.get(Event.id == event_id)
|
|
except DoesNotExist:
|
|
return JSONResponse(
|
|
content=(
|
|
{
|
|
"success": False,
|
|
"message": f"Invalid event_id or no event exists: {event_id}",
|
|
}
|
|
),
|
|
status_code=404,
|
|
)
|
|
|
|
snapshot = get_event_snapshot(event)
|
|
face_box = event.data["attributes"][0]["box"]
|
|
detect_config: DetectConfig = request.app.frigate_config.cameras[
|
|
event.camera
|
|
].detect
|
|
|
|
# crop onto the face box minus the bounding box itself
|
|
x1 = int(face_box[0] * detect_config.width) + 2
|
|
y1 = int(face_box[1] * detect_config.height) + 2
|
|
x2 = x1 + int(face_box[2] * detect_config.width) - 4
|
|
y2 = y1 + int(face_box[3] * detect_config.height) - 4
|
|
face = snapshot[y1:y2, x1:x2]
|
|
cv2.imwrite(os.path.join(new_file_folder, new_name), face)
|
|
|
|
context: EmbeddingsContext = request.app.embeddings
|
|
context.clear_face_classifier()
|
|
|
|
return JSONResponse(
|
|
content=(
|
|
{
|
|
"success": True,
|
|
"message": f"Successfully saved {training_file_name} as {new_name}.",
|
|
}
|
|
),
|
|
status_code=200,
|
|
)
|
|
|
|
|
|
@router.post("/faces/{name}/create", dependencies=[Depends(require_role(["admin"]))])
|
|
async def create_face(request: Request, name: str):
|
|
if not request.app.frigate_config.face_recognition.enabled:
|
|
return JSONResponse(
|
|
status_code=400,
|
|
content={"message": "Face recognition is not enabled.", "success": False},
|
|
)
|
|
|
|
os.makedirs(
|
|
os.path.join(FACE_DIR, sanitize_filename(name.replace(" ", "_"))), exist_ok=True
|
|
)
|
|
return JSONResponse(
|
|
status_code=200,
|
|
content={"success": False, "message": "Successfully created face folder."},
|
|
)
|
|
|
|
|
|
@router.post("/faces/{name}/register", dependencies=[Depends(require_role(["admin"]))])
|
|
async def register_face(request: Request, name: str, file: UploadFile):
|
|
if not request.app.frigate_config.face_recognition.enabled:
|
|
return JSONResponse(
|
|
status_code=400,
|
|
content={"message": "Face recognition is not enabled.", "success": False},
|
|
)
|
|
|
|
context: EmbeddingsContext = request.app.embeddings
|
|
result = context.register_face(name, await file.read())
|
|
|
|
if not isinstance(result, dict):
|
|
return JSONResponse(
|
|
status_code=500,
|
|
content={
|
|
"success": False,
|
|
"message": "Could not process request. Try restarting Frigate.",
|
|
},
|
|
)
|
|
|
|
return JSONResponse(
|
|
status_code=200 if result.get("success", True) else 400,
|
|
content=result,
|
|
)
|
|
|
|
|
|
@router.post("/faces/recognize")
|
|
async def recognize_face(request: Request, file: UploadFile):
|
|
if not request.app.frigate_config.face_recognition.enabled:
|
|
return JSONResponse(
|
|
status_code=400,
|
|
content={"message": "Face recognition is not enabled.", "success": False},
|
|
)
|
|
|
|
context: EmbeddingsContext = request.app.embeddings
|
|
result = context.recognize_face(await file.read())
|
|
|
|
if not isinstance(result, dict):
|
|
return JSONResponse(
|
|
status_code=500,
|
|
content={
|
|
"success": False,
|
|
"message": "Could not process request. Try restarting Frigate.",
|
|
},
|
|
)
|
|
|
|
return JSONResponse(
|
|
status_code=200 if result.get("success", True) else 400,
|
|
content=result,
|
|
)
|
|
|
|
|
|
@router.post("/faces/{name}/delete", dependencies=[Depends(require_role(["admin"]))])
|
|
def deregister_faces(request: Request, name: str, body: dict = None):
|
|
if not request.app.frigate_config.face_recognition.enabled:
|
|
return JSONResponse(
|
|
status_code=400,
|
|
content={"message": "Face recognition is not enabled.", "success": False},
|
|
)
|
|
|
|
json: dict[str, Any] = body or {}
|
|
list_of_ids = json.get("ids", "")
|
|
|
|
context: EmbeddingsContext = request.app.embeddings
|
|
context.delete_face_ids(
|
|
name, map(lambda file: sanitize_filename(file), list_of_ids)
|
|
)
|
|
return JSONResponse(
|
|
content=({"success": True, "message": "Successfully deleted faces."}),
|
|
status_code=200,
|
|
)
|
|
|
|
|
|
@router.put("/faces/{old_name}/rename", dependencies=[Depends(require_role(["admin"]))])
|
|
def rename_face(request: Request, old_name: str, body: RenameFaceBody):
|
|
if not request.app.frigate_config.face_recognition.enabled:
|
|
return JSONResponse(
|
|
status_code=400,
|
|
content={"message": "Face recognition is not enabled.", "success": False},
|
|
)
|
|
|
|
context: EmbeddingsContext = request.app.embeddings
|
|
try:
|
|
context.rename_face(old_name, body.new_name)
|
|
return JSONResponse(
|
|
content={
|
|
"success": True,
|
|
"message": f"Successfully renamed face to {body.new_name}.",
|
|
},
|
|
status_code=200,
|
|
)
|
|
except ValueError as e:
|
|
logger.error(e)
|
|
return JSONResponse(
|
|
status_code=400,
|
|
content={
|
|
"message": "Error renaming face. Check Frigate logs.",
|
|
"success": False,
|
|
},
|
|
)
|
|
|
|
|
|
@router.put("/lpr/reprocess")
|
|
def reprocess_license_plate(request: Request, event_id: str):
|
|
if not request.app.frigate_config.lpr.enabled:
|
|
message = "License plate recognition is not enabled."
|
|
logger.error(message)
|
|
return JSONResponse(
|
|
content=(
|
|
{
|
|
"success": False,
|
|
"message": message,
|
|
}
|
|
),
|
|
status_code=400,
|
|
)
|
|
|
|
try:
|
|
event = Event.get(Event.id == event_id)
|
|
except DoesNotExist:
|
|
message = f"Event {event_id} not found"
|
|
logger.error(message)
|
|
return JSONResponse(
|
|
content=({"success": False, "message": message}), status_code=404
|
|
)
|
|
|
|
context: EmbeddingsContext = request.app.embeddings
|
|
response = context.reprocess_plate(model_to_dict(event))
|
|
|
|
return JSONResponse(
|
|
content=response,
|
|
status_code=200,
|
|
)
|
|
|
|
|
|
@router.put("/reindex", dependencies=[Depends(require_role(["admin"]))])
|
|
def reindex_embeddings(request: Request):
|
|
if not request.app.frigate_config.semantic_search.enabled:
|
|
message = (
|
|
"Cannot reindex tracked object embeddings, Semantic Search is not enabled."
|
|
)
|
|
logger.error(message)
|
|
return JSONResponse(
|
|
content=(
|
|
{
|
|
"success": False,
|
|
"message": message,
|
|
}
|
|
),
|
|
status_code=400,
|
|
)
|
|
|
|
context: EmbeddingsContext = request.app.embeddings
|
|
response = context.reindex_embeddings()
|
|
|
|
if response == "started":
|
|
return JSONResponse(
|
|
content={
|
|
"success": True,
|
|
"message": "Embeddings reindexing has started.",
|
|
},
|
|
status_code=202, # 202 Accepted
|
|
)
|
|
elif response == "in_progress":
|
|
return JSONResponse(
|
|
content={
|
|
"success": False,
|
|
"message": "Embeddings reindexing is already in progress.",
|
|
},
|
|
status_code=409, # 409 Conflict
|
|
)
|
|
else:
|
|
return JSONResponse(
|
|
content={
|
|
"success": False,
|
|
"message": "Failed to start reindexing.",
|
|
},
|
|
status_code=500,
|
|
)
|
|
|
|
|
|
@router.put("/audio/transcribe")
|
|
def transcribe_audio(request: Request, body: AudioTranscriptionBody):
|
|
event_id = body.event_id
|
|
|
|
try:
|
|
event = Event.get(Event.id == event_id)
|
|
except DoesNotExist:
|
|
message = f"Event {event_id} not found"
|
|
logger.error(message)
|
|
return JSONResponse(
|
|
content=({"success": False, "message": message}), status_code=404
|
|
)
|
|
|
|
if not request.app.frigate_config.cameras[event.camera].audio_transcription.enabled:
|
|
message = f"Audio transcription is not enabled for {event.camera}."
|
|
logger.error(message)
|
|
return JSONResponse(
|
|
content=(
|
|
{
|
|
"success": False,
|
|
"message": message,
|
|
}
|
|
),
|
|
status_code=400,
|
|
)
|
|
|
|
context: EmbeddingsContext = request.app.embeddings
|
|
response = context.transcribe_audio(model_to_dict(event))
|
|
|
|
if response == "started":
|
|
return JSONResponse(
|
|
content={
|
|
"success": True,
|
|
"message": "Audio transcription has started.",
|
|
},
|
|
status_code=202, # 202 Accepted
|
|
)
|
|
elif response == "in_progress":
|
|
return JSONResponse(
|
|
content={
|
|
"success": False,
|
|
"message": "Audio transcription for a speech event is currently in progress. Try again later.",
|
|
},
|
|
status_code=409, # 409 Conflict
|
|
)
|
|
else:
|
|
return JSONResponse(
|
|
content={
|
|
"success": False,
|
|
"message": "Failed to transcribe audio.",
|
|
},
|
|
status_code=500,
|
|
)
|
|
|
|
|
|
# custom classification training
|
|
|
|
|
|
@router.get("/classification/{name}/dataset")
|
|
def get_classification_dataset(name: str):
|
|
dataset_dict: dict[str, list[str]] = {}
|
|
|
|
dataset_dir = os.path.join(CLIPS_DIR, sanitize_filename(name), "dataset")
|
|
|
|
if not os.path.exists(dataset_dir):
|
|
return JSONResponse(status_code=200, content={})
|
|
|
|
for name in os.listdir(dataset_dir):
|
|
category_dir = os.path.join(dataset_dir, name)
|
|
|
|
if not os.path.isdir(category_dir):
|
|
continue
|
|
|
|
dataset_dict[name] = []
|
|
|
|
for file in filter(
|
|
lambda f: (f.lower().endswith((".webp", ".png", ".jpg", ".jpeg"))),
|
|
os.listdir(category_dir),
|
|
):
|
|
dataset_dict[name].append(file)
|
|
|
|
return JSONResponse(status_code=200, content=dataset_dict)
|
|
|
|
|
|
@router.get("/classification/{name}/train")
|
|
def get_classification_images(name: str):
|
|
train_dir = os.path.join(CLIPS_DIR, sanitize_filename(name), "train")
|
|
|
|
if not os.path.exists(train_dir):
|
|
return JSONResponse(status_code=200, content=[])
|
|
|
|
return JSONResponse(
|
|
status_code=200,
|
|
content=list(
|
|
filter(
|
|
lambda f: (f.lower().endswith((".webp", ".png", ".jpg", ".jpeg"))),
|
|
os.listdir(train_dir),
|
|
)
|
|
),
|
|
)
|
|
|
|
|
|
@router.post("/classification/{name}/train")
|
|
async def train_configured_model(request: Request, name: str):
|
|
config: FrigateConfig = request.app.frigate_config
|
|
|
|
if name not in config.classification.custom:
|
|
return JSONResponse(
|
|
content=(
|
|
{
|
|
"success": False,
|
|
"message": f"{name} is not a known classification model.",
|
|
}
|
|
),
|
|
status_code=404,
|
|
)
|
|
|
|
context: EmbeddingsContext = request.app.embeddings
|
|
context.start_classification_training(name)
|
|
return JSONResponse(
|
|
content={"success": True, "message": "Started classification model training."},
|
|
status_code=200,
|
|
)
|
|
|
|
|
|
@router.post(
|
|
"/classification/{name}/dataset/{category}/delete",
|
|
dependencies=[Depends(require_role(["admin"]))],
|
|
)
|
|
def delete_classification_dataset_images(
|
|
request: Request, name: str, category: str, body: dict = None
|
|
):
|
|
config: FrigateConfig = request.app.frigate_config
|
|
|
|
if name not in config.classification.custom:
|
|
return JSONResponse(
|
|
content=(
|
|
{
|
|
"success": False,
|
|
"message": f"{name} is not a known classification model.",
|
|
}
|
|
),
|
|
status_code=404,
|
|
)
|
|
|
|
json: dict[str, Any] = body or {}
|
|
list_of_ids = json.get("ids", "")
|
|
folder = os.path.join(
|
|
CLIPS_DIR, sanitize_filename(name), "dataset", sanitize_filename(category)
|
|
)
|
|
|
|
for id in list_of_ids:
|
|
file_path = os.path.join(folder, id)
|
|
|
|
if os.path.isfile(file_path):
|
|
os.unlink(file_path)
|
|
|
|
return JSONResponse(
|
|
content=({"success": True, "message": "Successfully deleted faces."}),
|
|
status_code=200,
|
|
)
|
|
|
|
|
|
@router.post(
|
|
"/classification/{name}/dataset/categorize",
|
|
dependencies=[Depends(require_role(["admin"]))],
|
|
)
|
|
def categorize_classification_image(request: Request, name: str, body: dict = None):
|
|
config: FrigateConfig = request.app.frigate_config
|
|
|
|
if name not in config.classification.custom:
|
|
return JSONResponse(
|
|
content=(
|
|
{
|
|
"success": False,
|
|
"message": f"{name} is not a known classification model.",
|
|
}
|
|
),
|
|
status_code=404,
|
|
)
|
|
|
|
json: dict[str, Any] = body or {}
|
|
category = sanitize_filename(json.get("category", ""))
|
|
training_file_name = sanitize_filename(json.get("training_file", ""))
|
|
training_file = os.path.join(CLIPS_DIR, name, "train", training_file_name)
|
|
|
|
if training_file_name and not os.path.isfile(training_file):
|
|
return JSONResponse(
|
|
content=(
|
|
{
|
|
"success": False,
|
|
"message": f"Invalid filename or no file exists: {training_file_name}",
|
|
}
|
|
),
|
|
status_code=404,
|
|
)
|
|
|
|
new_name = f"{category}-{datetime.datetime.now().timestamp()}.png"
|
|
new_file_folder = os.path.join(CLIPS_DIR, name, "dataset", category)
|
|
|
|
if not os.path.exists(new_file_folder):
|
|
os.mkdir(new_file_folder)
|
|
|
|
# use opencv because webp images can not be used to train
|
|
img = cv2.imread(training_file)
|
|
cv2.imwrite(os.path.join(new_file_folder, new_name), img)
|
|
os.unlink(training_file)
|
|
|
|
return JSONResponse(
|
|
content=({"success": True, "message": "Successfully deleted faces."}),
|
|
status_code=200,
|
|
)
|
|
|
|
|
|
@router.post(
|
|
"/classification/{name}/train/delete",
|
|
dependencies=[Depends(require_role(["admin"]))],
|
|
)
|
|
def delete_classification_train_images(request: Request, name: str, body: dict = None):
|
|
config: FrigateConfig = request.app.frigate_config
|
|
|
|
if name not in config.classification.custom:
|
|
return JSONResponse(
|
|
content=(
|
|
{
|
|
"success": False,
|
|
"message": f"{name} is not a known classification model.",
|
|
}
|
|
),
|
|
status_code=404,
|
|
)
|
|
|
|
json: dict[str, Any] = body or {}
|
|
list_of_ids = json.get("ids", "")
|
|
folder = os.path.join(CLIPS_DIR, sanitize_filename(name), "train")
|
|
|
|
for id in list_of_ids:
|
|
file_path = os.path.join(folder, id)
|
|
|
|
if os.path.isfile(file_path):
|
|
os.unlink(file_path)
|
|
|
|
return JSONResponse(
|
|
content=({"success": True, "message": "Successfully deleted faces."}),
|
|
status_code=200,
|
|
)
|