import base64
import datetime
import logging
import os
import time
from functools import reduce

import cv2
import numpy as np
from flask import (Blueprint, Flask, Response, current_app, jsonify,
                   make_response, request)
from peewee import SqliteDatabase, operator, fn, DoesNotExist
from playhouse.shortcuts import model_to_dict

from frigate.const import CLIPS_DIR
from frigate.models import Event
from frigate.stats import stats_snapshot
from frigate.util import calculate_region
from frigate.version import VERSION

logger = logging.getLogger(__name__)

bp = Blueprint('frigate', __name__)


def create_app(frigate_config, database: SqliteDatabase, stats_tracking, detected_frames_processor):
    """Build the Flask application that serves the ``frigate`` blueprint.

    The supplied objects are stored as attributes on the app so that views
    can reach them through ``current_app``. A database connection is opened
    before every request and closed again when the request is torn down.

    Args:
        frigate_config: stored as ``app.frigate_config``.
        database: peewee database connected/closed around each request.
        stats_tracking: stored as ``app.stats_tracking``.
        detected_frames_processor: stored as ``app.detected_frames_processor``.

    Returns:
        The configured ``Flask`` instance with ``bp`` registered.
    """
    app = Flask(__name__)

    # Shared state that the blueprint views read via ``current_app``.
    app.frigate_config = frigate_config
    app.stats_tracking = stats_tracking
    app.detected_frames_processor = detected_frames_processor

    @app.before_request
    def _open_connection():
        # Wrapped in a function (rather than registering ``database.connect``
        # directly) so the hook itself always returns None.
        database.connect()

    @app.teardown_request
    def _release_connection(exc):
        if database.is_closed():
            return
        database.close()

    app.register_blueprint(bp)

    return app


@bp.route('/')
def is_healthy():
    """Liveness endpoint: respond with a fixed plain-text status message."""
    status_message = "Frigate is running. Alive and healthy!"
    return status_message
@bp.route('/events/summary') def events_summary(): has_clip = request.args.get('has_clip', type=int) has_snapshot = request.args.get('has_snapshot', type=int) clauses = [] if not has_clip is None: clauses.append((Event.has_clip == has_clip)) if not has_snapshot is None: clauses.append((Event.has_snapshot == has_snapshot)) if len(clauses) == 0: clauses.append((1 == 1)) groups = ( Event .select( Event.camera, Event.label, fn.strftime('%Y-%m-%d', fn.datetime(Event.start_time, 'unixepoch', 'localtime')).alias('day'), Event.zones, fn.COUNT(Event.id).alias('count') ) .where(reduce(operator.and_, clauses)) .group_by( Event.camera, Event.label, fn.strftime('%Y-%m-%d', fn.datetime(Event.start_time, 'unixepoch', 'localtime')), Event.zones ) ) return jsonify([e for e in groups.dicts()]) @bp.route('/events/') def event(id): try: return model_to_dict(Event.get(Event.id == id)) except DoesNotExist: return "Event not found", 404 @bp.route('/events//thumbnail.jpg') def event_thumbnail(id): format = request.args.get('format', 'ios') thumbnail_bytes = None try: event = Event.get(Event.id == id) thumbnail_bytes = base64.b64decode(event.thumbnail) except DoesNotExist: # see if the object is currently being tracked try: for camera_state in current_app.detected_frames_processor.camera_states.values(): if id in camera_state.tracked_objects: tracked_obj = camera_state.tracked_objects.get(id) if not tracked_obj is None: thumbnail_bytes = tracked_obj.get_thumbnail() except: return "Event not found", 404 if thumbnail_bytes is None: return "Event not found", 404 # android notifications prefer a 2:1 ratio if format == 'android': jpg_as_np = np.frombuffer(thumbnail_bytes, dtype=np.uint8) img = cv2.imdecode(jpg_as_np, flags=1) thumbnail = cv2.copyMakeBorder(img, 0, 0, int(img.shape[1]*0.5), int(img.shape[1]*0.5), cv2.BORDER_CONSTANT, (0,0,0)) ret, jpg = cv2.imencode('.jpg', thumbnail, [int(cv2.IMWRITE_JPEG_QUALITY), 70]) thumbnail_bytes = jpg.tobytes() response = make_response(thumbnail_bytes) 
response.headers['Content-Type'] = 'image/jpg' return response @bp.route('/events//snapshot.jpg') def event_snapshot(id): jpg_bytes = None try: event = Event.get(Event.id == id) if not event.has_snapshot: return "Snapshot not available", 404 # read snapshot from disk with open(os.path.join(CLIPS_DIR, f"{event.camera}-{id}.jpg"), 'rb') as image_file: jpg_bytes = image_file.read() except DoesNotExist: # see if the object is currently being tracked try: for camera_state in current_app.detected_frames_processor.camera_states.values(): if id in camera_state.tracked_objects: tracked_obj = camera_state.tracked_objects.get(id) if not tracked_obj is None: jpg_bytes = tracked_obj.get_jpg_bytes( timestamp=request.args.get('timestamp', type=int), bounding_box=request.args.get('bbox', type=int), crop=request.args.get('crop', type=int), height=request.args.get('h', type=int) ) except: return "Event not found", 404 except: return "Event not found", 404 response = make_response(jpg_bytes) response.headers['Content-Type'] = 'image/jpg' return response @bp.route('/events') def events(): limit = request.args.get('limit', 100) camera = request.args.get('camera') label = request.args.get('label') zone = request.args.get('zone') after = request.args.get('after', type=float) before = request.args.get('before', type=float) has_clip = request.args.get('has_clip', type=int) has_snapshot = request.args.get('has_snapshot', type=int) include_thumbnails = request.args.get('include_thumbnails', default=1, type=int) clauses = [] excluded_fields = [] if camera: clauses.append((Event.camera == camera)) if label: clauses.append((Event.label == label)) if zone: clauses.append((Event.zones.cast('text') % f"*\"{zone}\"*")) if after: clauses.append((Event.start_time >= after)) if before: clauses.append((Event.start_time <= before)) if not has_clip is None: clauses.append((Event.has_clip == has_clip)) if not has_snapshot is None: clauses.append((Event.has_snapshot == has_snapshot)) if not 
include_thumbnails: excluded_fields.append(Event.thumbnail) if len(clauses) == 0: clauses.append((1 == 1)) events = (Event.select() .where(reduce(operator.and_, clauses)) .order_by(Event.start_time.desc()) .limit(limit)) return jsonify([model_to_dict(e, exclude=excluded_fields) for e in events]) @bp.route('/config') def config(): return jsonify(current_app.frigate_config.to_dict()) @bp.route('/version') def version(): return VERSION @bp.route('/stats') def stats(): stats = stats_snapshot(current_app.stats_tracking) return jsonify(stats) @bp.route('//