blakeblackshear.frigate/frigate/util/classification.py
Nicolas Mowen 0a02c665fc
Improve the tablet layout (#19320)
* Improve the tablet layout

* Update imports sort

* Fix more imports
2025-07-29 09:11:52 -06:00

175 lines
5.7 KiB
Python

"""Util for classification models."""
import logging
import os
import cv2
import numpy as np
from frigate.comms.embeddings_updater import EmbeddingsRequestEnum, EmbeddingsRequestor
from frigate.comms.inter_process import InterProcessRequestor
from frigate.const import (
CLIPS_DIR,
MODEL_CACHE_DIR,
PROCESS_PRIORITY_LOW,
UPDATE_MODEL_STATE,
)
from frigate.log import redirect_output_to_logger
from frigate.types import ModelStatusTypesEnum
from frigate.util.process import FrigateProcess
BATCH_SIZE = 16
EPOCHS = 50
LEARNING_RATE = 0.001
logger = logging.getLogger(__name__)
class ClassificationTrainingProcess(FrigateProcess):
def __init__(self, model_name: str) -> None:
super().__init__(
stop_event=None,
priority=PROCESS_PRIORITY_LOW,
name=f"model_training:{model_name}",
)
self.model_name = model_name
def run(self) -> None:
self.pre_run_setup()
self.__train_classification_model()
def __generate_representative_dataset_factory(self, dataset_dir: str):
def generate_representative_dataset():
image_paths = []
for root, dirs, files in os.walk(dataset_dir):
for file in files:
if file.lower().endswith((".jpg", ".jpeg", ".png")):
image_paths.append(os.path.join(root, file))
for path in image_paths[:300]:
img = cv2.imread(path)
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
img = cv2.resize(img, (224, 224))
img_array = np.array(img, dtype=np.float32) / 255.0
img_array = img_array[None, ...]
yield [img_array]
return generate_representative_dataset
@redirect_output_to_logger(logger, logging.DEBUG)
def __train_classification_model(self) -> bool:
"""Train a classification model."""
# import in the function so that tensorflow is not initialized multiple times
import tensorflow as tf
from tensorflow.keras import layers, models, optimizers
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.preprocessing.image import ImageDataGenerator
logger.info(f"Kicking off classification training for {self.model_name}.")
dataset_dir = os.path.join(CLIPS_DIR, self.model_name, "dataset")
model_dir = os.path.join(MODEL_CACHE_DIR, self.model_name)
num_classes = len(
[
d
for d in os.listdir(dataset_dir)
if os.path.isdir(os.path.join(dataset_dir, d))
]
)
# Start with imagenet base model with 35% of channels in each layer
base_model = MobileNetV2(
input_shape=(224, 224, 3),
include_top=False,
weights="imagenet",
alpha=0.35,
)
base_model.trainable = False # Freeze pre-trained layers
model = models.Sequential(
[
base_model,
layers.GlobalAveragePooling2D(),
layers.Dense(128, activation="relu"),
layers.Dropout(0.3),
layers.Dense(num_classes, activation="softmax"),
]
)
model.compile(
optimizer=optimizers.Adam(learning_rate=LEARNING_RATE),
loss="categorical_crossentropy",
metrics=["accuracy"],
)
# create training set
datagen = ImageDataGenerator(rescale=1.0 / 255, validation_split=0.2)
train_gen = datagen.flow_from_directory(
dataset_dir,
target_size=(224, 224),
batch_size=BATCH_SIZE,
class_mode="categorical",
subset="training",
)
# write labelmap
class_indices = train_gen.class_indices
index_to_class = {v: k for k, v in class_indices.items()}
sorted_classes = [index_to_class[i] for i in range(len(index_to_class))]
with open(os.path.join(model_dir, "labelmap.txt"), "w") as f:
for class_name in sorted_classes:
f.write(f"{class_name}\n")
# train the model
model.fit(train_gen, epochs=EPOCHS, verbose=0)
# convert model to tflite
converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.representative_dataset = (
self.__generate_representative_dataset_factory(dataset_dir)
)
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
converter.inference_input_type = tf.uint8
converter.inference_output_type = tf.uint8
tflite_model = converter.convert()
# write model
with open(os.path.join(model_dir, "model.tflite"), "wb") as f:
f.write(tflite_model)
@staticmethod
def kickoff_model_training(
embeddingRequestor: EmbeddingsRequestor, model_name: str
) -> None:
requestor = InterProcessRequestor()
requestor.send_data(
UPDATE_MODEL_STATE,
{
"model": model_name,
"state": ModelStatusTypesEnum.training,
},
)
# run training in sub process so that
# tensorflow will free CPU / GPU memory
# upon training completion
training_process = ClassificationTrainingProcess(model_name)
training_process.start()
training_process.join()
# reload model and mark training as complete
embeddingRequestor.send_data(
EmbeddingsRequestEnum.reload_classification_model.value,
{"model_name": model_name},
)
requestor.send_data(
UPDATE_MODEL_STATE,
{
"model": model_name,
"state": ModelStatusTypesEnum.complete,
},
)
requestor.stop()