import base64
import datetime
import json
import logging
import os
import time
from functools import reduce

import cv2
import gevent
import numpy as np
from flask import (Blueprint, Flask, Response, current_app, jsonify,
                   make_response, request)
from flask_sockets import Sockets
from peewee import SqliteDatabase, operator, fn, DoesNotExist
from playhouse.shortcuts import model_to_dict

from frigate.const import CLIPS_DIR
from frigate.models import Event
from frigate.stats import stats_snapshot
from frigate.util import calculate_region
from frigate.version import VERSION

logger = logging.getLogger(__name__)

bp = Blueprint('frigate', __name__)
ws = Blueprint('ws', __name__)


class MqttBackend():
    """Interface for registering and updating WebSocket clients."""

    def __init__(self, mqtt_client, topic_prefix):
        """Store the mqtt client and topic prefix; start with no clients."""
        self.clients = list()
        self.mqtt_client = mqtt_client
        self.topic_prefix = topic_prefix

    def register(self, client):
        """Register a WebSocket connection for Mqtt updates."""
        self.clients.append(client)

    def publish(self, message):
        """Publish a websocket message to mqtt.

        ``message`` is expected to be a JSON object with ``topic`` and
        ``payload`` keys and an optional ``retain`` key (default False).
        The topic is prefixed with ``topic_prefix`` before publishing.
        Messages that cannot be parsed are logged and dropped.
        """
        try:
            json_message = json.loads(message)
            json_message = {
                'topic': f"{self.topic_prefix}/{json_message['topic']}",
                'payload': json_message['payload'],
                'retain': json_message.get('retain', False)
            }
        except Exception:
            # covers invalid JSON, missing keys and non-object payloads
            logger.warning("Unable to parse websocket message as valid json.")
            return

        logger.debug(f"Publishing mqtt message from websockets at {json_message['topic']}.")
        self.mqtt_client.publish(json_message['topic'], json_message['payload'], retain=json_message['retain'])

    def run(self):
        """Attach a callback that relays mqtt messages to all clients."""

        def send(client, userdata, message):
            """Sends mqtt messages to clients."""
            try:
                logger.debug(f"Received mqtt message on {message.topic}.")
                ws_message = json.dumps({
                    'topic': message.topic.replace(f"{self.topic_prefix}/", ""),
                    'payload': message.payload.decode()
                })
            except Exception:
                # if the payload can't be decoded don't relay to clients
                logger.debug(f"MQTT payload for {message.topic} wasn't text. Skipping...")
                return

            # Iterate over a snapshot: removing from the list being iterated
            # would skip the client that follows a dead one.
            for ws_client in list(self.clients):
                try:
                    ws_client.send(ws_message)
                except Exception:
                    logger.debug("Removing websocket client due to a closed connection.")
                    self.clients.remove(ws_client)

        self.mqtt_client.message_callback_add(f"{self.topic_prefix}/#", send)

    def start(self):
        """Maintains mqtt subscription in the background."""
        gevent.spawn(self.run)


def create_app(frigate_config, database: SqliteDatabase, stats_tracking, detected_frames_processor, mqtt_client):
    """Build the Flask application.

    Registers the ``bp`` blueprint on the app and the ``ws`` blueprint on
    the sockets extension, opens a database connection before each request
    and closes it on teardown, and attaches the config, stats tracking,
    detected frames processor and a started ``MqttBackend`` to the app.
    """
    app = Flask(__name__)
    sockets = Sockets(app)

    @app.before_request
    def _db_connect():
        database.connect()

    @app.teardown_request
    def _db_close(exc):
        if not database.is_closed():
            database.close()

    app.frigate_config = frigate_config
    app.stats_tracking = stats_tracking
    app.detected_frames_processor = detected_frames_processor

    app.register_blueprint(bp)
    sockets.register_blueprint(ws)

    app.mqtt_backend = MqttBackend(mqtt_client, frigate_config.mqtt.topic_prefix)
    app.mqtt_backend.start()

    return app


@bp.route('/')
def is_healthy():
    """Return a fixed liveness message."""
    return "Frigate is running. Alive and healthy!"
@bp.route('/events/summary') def events_summary(): has_clip = request.args.get('has_clip', type=int) has_snapshot = request.args.get('has_snapshot', type=int) clauses = [] if not has_clip is None: clauses.append((Event.has_clip == has_clip)) if not has_snapshot is None: clauses.append((Event.has_snapshot == has_snapshot)) if len(clauses) == 0: clauses.append((1 == 1)) groups = ( Event .select( Event.camera, Event.label, fn.strftime('%Y-%m-%d', fn.datetime(Event.start_time, 'unixepoch', 'localtime')).alias('day'), Event.zones, fn.COUNT(Event.id).alias('count') ) .where(reduce(operator.and_, clauses)) .group_by( Event.camera, Event.label, fn.strftime('%Y-%m-%d', fn.datetime(Event.start_time, 'unixepoch', 'localtime')), Event.zones ) ) return jsonify([e for e in groups.dicts()]) @bp.route('/events/') def event(id): try: return model_to_dict(Event.get(Event.id == id)) except DoesNotExist: return "Event not found", 404 @bp.route('/events//thumbnail.jpg') def event_thumbnail(id): format = request.args.get('format', 'ios') thumbnail_bytes = None try: event = Event.get(Event.id == id) thumbnail_bytes = base64.b64decode(event.thumbnail) except DoesNotExist: # see if the object is currently being tracked try: for camera_state in current_app.detected_frames_processor.camera_states.values(): if id in camera_state.tracked_objects: tracked_obj = camera_state.tracked_objects.get(id) if not tracked_obj is None: thumbnail_bytes = tracked_obj.get_thumbnail() except: return "Event not found", 404 if thumbnail_bytes is None: return "Event not found", 404 # android notifications prefer a 2:1 ratio if format == 'android': jpg_as_np = np.frombuffer(thumbnail_bytes, dtype=np.uint8) img = cv2.imdecode(jpg_as_np, flags=1) thumbnail = cv2.copyMakeBorder(img, 0, 0, int(img.shape[1]*0.5), int(img.shape[1]*0.5), cv2.BORDER_CONSTANT, (0,0,0)) ret, jpg = cv2.imencode('.jpg', thumbnail, [int(cv2.IMWRITE_JPEG_QUALITY), 70]) thumbnail_bytes = jpg.tobytes() response = make_response(thumbnail_bytes) 
response.headers['Content-Type'] = 'image/jpg' return response @bp.route('/events//snapshot.jpg') def event_snapshot(id): jpg_bytes = None try: event = Event.get(Event.id == id) if not event.has_snapshot: return "Snapshot not available", 404 # read snapshot from disk with open(os.path.join(CLIPS_DIR, f"{event.camera}-{id}.jpg"), 'rb') as image_file: jpg_bytes = image_file.read() except DoesNotExist: # see if the object is currently being tracked try: for camera_state in current_app.detected_frames_processor.camera_states.values(): if id in camera_state.tracked_objects: tracked_obj = camera_state.tracked_objects.get(id) if not tracked_obj is None: jpg_bytes = tracked_obj.get_jpg_bytes( timestamp=request.args.get('timestamp', type=int), bounding_box=request.args.get('bbox', type=int), crop=request.args.get('crop', type=int), height=request.args.get('h', type=int) ) except: return "Event not found", 404 except: return "Event not found", 404 response = make_response(jpg_bytes) response.headers['Content-Type'] = 'image/jpg' return response @bp.route('/events') def events(): limit = request.args.get('limit', 100) camera = request.args.get('camera') label = request.args.get('label') zone = request.args.get('zone') after = request.args.get('after', type=float) before = request.args.get('before', type=float) has_clip = request.args.get('has_clip', type=int) has_snapshot = request.args.get('has_snapshot', type=int) include_thumbnails = request.args.get('include_thumbnails', default=1, type=int) clauses = [] excluded_fields = [] if camera: clauses.append((Event.camera == camera)) if label: clauses.append((Event.label == label)) if zone: clauses.append((Event.zones.cast('text') % f"*\"{zone}\"*")) if after: clauses.append((Event.start_time >= after)) if before: clauses.append((Event.start_time <= before)) if not has_clip is None: clauses.append((Event.has_clip == has_clip)) if not has_snapshot is None: clauses.append((Event.has_snapshot == has_snapshot)) if not 
include_thumbnails: excluded_fields.append(Event.thumbnail) if len(clauses) == 0: clauses.append((1 == 1)) events = (Event.select() .where(reduce(operator.and_, clauses)) .order_by(Event.start_time.desc()) .limit(limit)) return jsonify([model_to_dict(e, exclude=excluded_fields) for e in events]) @bp.route('/config') def config(): return jsonify(current_app.frigate_config.to_dict()) @bp.route('/version') def version(): return VERSION @bp.route('/stats') def stats(): stats = stats_snapshot(current_app.stats_tracking) return jsonify(stats) @bp.route('//