import base64
import datetime
import json
import logging
import os
import time
from functools import reduce

import cv2
import gevent
import numpy as np
from flask import (
    Blueprint,
    Flask,
    Response,
    current_app,
    jsonify,
    make_response,
    request,
)
from flask_sockets import Sockets
from peewee import SqliteDatabase, operator, fn, DoesNotExist
from playhouse.shortcuts import model_to_dict

from frigate.const import CLIPS_DIR
from frigate.models import Event
from frigate.stats import stats_snapshot
from frigate.util import calculate_region
from frigate.version import VERSION

logger = logging.getLogger(__name__)

bp = Blueprint("frigate", __name__)
ws = Blueprint("ws", __name__)


class MqttBackend:
    """Bridge between an MQTT client and registered WebSocket clients.

    Messages received over MQTT under ``topic_prefix`` are relayed to every
    registered WebSocket client, and JSON messages handed to ``publish`` are
    forwarded to MQTT under the same prefix.
    """

    def __init__(self, mqtt_client, topic_prefix):
        self.clients = list()
        self.mqtt_client = mqtt_client
        self.topic_prefix = topic_prefix

    def register(self, client):
        """Register a WebSocket connection for Mqtt updates."""
        self.clients.append(client)

    def publish(self, message):
        """Publish a JSON-encoded websocket message to MQTT.

        ``message`` must decode to an object with a ``topic`` and a
        ``payload`` key; ``retain`` is optional and defaults to False. The
        topic is prefixed with ``topic_prefix`` before publishing. Messages
        that cannot be parsed are logged and dropped.
        """
        try:
            parsed = json.loads(message)
            topic = f"{self.topic_prefix}/{parsed['topic']}"
            # Plain subscript: the original subscripted the bound ``get``
            # method, which raised on every message and dropped it.
            payload = parsed["payload"]
            retain = parsed.get("retain", False)
        except (TypeError, ValueError, KeyError):
            logger.warning("Unable to parse websocket message as valid json.")
            return

        logger.debug(f"Publishing mqtt message from websockets at {topic}.")
        self.mqtt_client.publish(topic, payload, retain=retain)

    def run(self):
        """Subscribe to every topic under the prefix and relay to clients."""

        def send(client, userdata, message):
            """Sends mqtt messages to clients."""
            try:
                logger.debug(f"Received mqtt message on {message.topic}.")
                ws_message = json.dumps(
                    {
                        "topic": message.topic.replace(f"{self.topic_prefix}/", ""),
                        "payload": message.payload.decode(),
                    }
                )
            except Exception:
                # if the payload can't be decoded don't relay to clients
                logger.debug(
                    f"MQTT payload for {message.topic} wasn't text. Skipping..."
                )
                return

            # Iterate over a snapshot: removing from the list being iterated
            # would skip the client that follows each removed one.
            for ws_client in list(self.clients):
                try:
                    ws_client.send(ws_message)
                except Exception:
                    logger.debug(
                        "Removing websocket client due to a closed connection."
                    )
                    self.clients.remove(ws_client)

        self.mqtt_client.message_callback_add(f"{self.topic_prefix}/#", send)

    def start(self):
        """Maintains mqtt subscription in the background."""
        gevent.spawn(self.run)


def create_app(
    frigate_config,
    database: SqliteDatabase,
    stats_tracking,
    detected_frames_processor,
    mqtt_client,
):
    """Build the Flask application serving the HTTP and WebSocket API.

    The database connection is opened before each request and closed when
    the request is torn down. The config, stats tracking and detected-frames
    processor are attached to the app so views can reach them through
    ``current_app``. An ``MqttBackend`` is created and started before the
    app is returned.
    """
    app = Flask(__name__)
    sockets = Sockets(app)

    @app.before_request
    def _db_connect():
        database.connect()

    @app.teardown_request
    def _db_close(exc):
        if not database.is_closed():
            database.close()

    app.frigate_config = frigate_config
    app.stats_tracking = stats_tracking
    app.detected_frames_processor = detected_frames_processor

    app.register_blueprint(bp)
    sockets.register_blueprint(ws)

    app.mqtt_backend = MqttBackend(mqtt_client, frigate_config.mqtt.topic_prefix)
    app.mqtt_backend.start()

    return app


@bp.route("/")
def is_healthy():
    """Liveness endpoint: always answers with a fixed status string."""
    return "Frigate is running. Alive and healthy!"
@bp.route("/events/summary") def events_summary(): has_clip = request.args.get("has_clip", type=int) has_snapshot = request.args.get("has_snapshot", type=int) clauses = [] if not has_clip is None: clauses.append((Event.has_clip == has_clip)) if not has_snapshot is None: clauses.append((Event.has_snapshot == has_snapshot)) if len(clauses) == 0: clauses.append((1 == 1)) groups = ( Event.select( Event.camera, Event.label, fn.strftime( "%Y-%m-%d", fn.datetime(Event.start_time, "unixepoch", "localtime") ).alias("day"), Event.zones, fn.COUNT(Event.id).alias("count"), ) .where(reduce(operator.and_, clauses)) .group_by( Event.camera, Event.label, fn.strftime( "%Y-%m-%d", fn.datetime(Event.start_time, "unixepoch", "localtime") ), Event.zones, ) ) return jsonify([e for e in groups.dicts()]) @bp.route("/events/") def event(id): try: return model_to_dict(Event.get(Event.id == id)) except DoesNotExist: return "Event not found", 404 @bp.route("/events//thumbnail.jpg") def event_thumbnail(id): format = request.args.get("format", "ios") thumbnail_bytes = None try: event = Event.get(Event.id == id) thumbnail_bytes = base64.b64decode(event.thumbnail) except DoesNotExist: # see if the object is currently being tracked try: camera_states = current_app.detected_frames_processor.camera_states.values() for camera_state in camera_states: if id in camera_state.tracked_objects: tracked_obj = camera_state.tracked_objects.get(id) if not tracked_obj is None: thumbnail_bytes = tracked_obj.get_thumbnail() except: return "Event not found", 404 if thumbnail_bytes is None: return "Event not found", 404 # android notifications prefer a 2:1 ratio if format == "android": jpg_as_np = np.frombuffer(thumbnail_bytes, dtype=np.uint8) img = cv2.imdecode(jpg_as_np, flags=1) thumbnail = cv2.copyMakeBorder( img, 0, 0, int(img.shape[1] * 0.5), int(img.shape[1] * 0.5), cv2.BORDER_CONSTANT, (0, 0, 0), ) ret, jpg = cv2.imencode(".jpg", thumbnail, [int(cv2.IMWRITE_JPEG_QUALITY), 70]) thumbnail_bytes = jpg.tobytes() 
response = make_response(thumbnail_bytes) response.headers["Content-Type"] = "image/jpg" return response @bp.route("/events//snapshot.jpg") def event_snapshot(id): jpg_bytes = None try: event = Event.get(Event.id == id) if not event.has_snapshot: return "Snapshot not available", 404 # read snapshot from disk with open( os.path.join(CLIPS_DIR, f"{event.camera}-{id}.jpg"), "rb" ) as image_file: jpg_bytes = image_file.read() except DoesNotExist: # see if the object is currently being tracked try: camera_states = current_app.detected_frames_processor.camera_states.values() for camera_state in camera_states: if id in camera_state.tracked_objects: tracked_obj = camera_state.tracked_objects.get(id) if not tracked_obj is None: jpg_bytes = tracked_obj.get_jpg_bytes( timestamp=request.args.get("timestamp", type=int), bounding_box=request.args.get("bbox", type=int), crop=request.args.get("crop", type=int), height=request.args.get("h", type=int), ) except: return "Event not found", 404 except: return "Event not found", 404 response = make_response(jpg_bytes) response.headers["Content-Type"] = "image/jpg" return response @bp.route("/events") def events(): limit = request.args.get("limit", 100) camera = request.args.get("camera") label = request.args.get("label") zone = request.args.get("zone") after = request.args.get("after", type=float) before = request.args.get("before", type=float) has_clip = request.args.get("has_clip", type=int) has_snapshot = request.args.get("has_snapshot", type=int) include_thumbnails = request.args.get("include_thumbnails", default=1, type=int) clauses = [] excluded_fields = [] if camera: clauses.append((Event.camera == camera)) if label: clauses.append((Event.label == label)) if zone: clauses.append((Event.zones.cast("text") % f'*"{zone}"*')) if after: clauses.append((Event.start_time >= after)) if before: clauses.append((Event.start_time <= before)) if not has_clip is None: clauses.append((Event.has_clip == has_clip)) if not has_snapshot is None: 
clauses.append((Event.has_snapshot == has_snapshot)) if not include_thumbnails: excluded_fields.append(Event.thumbnail) if len(clauses) == 0: clauses.append((1 == 1)) events = ( Event.select() .where(reduce(operator.and_, clauses)) .order_by(Event.start_time.desc()) .limit(limit) ) return jsonify([model_to_dict(e, exclude=excluded_fields) for e in events]) @bp.route("/config") def config(): return jsonify(current_app.frigate_config.to_dict()) @bp.route("/version") def version(): return VERSION @bp.route("/stats") def stats(): stats = stats_snapshot(current_app.stats_tracking) return jsonify(stats) @bp.route("//