"""Event apis.""" import base64 import io import logging import os from datetime import datetime from functools import reduce from pathlib import Path from urllib.parse import unquote import cv2 import numpy as np from flask import ( Blueprint, current_app, jsonify, make_response, request, ) from peewee import JOIN, DoesNotExist, fn, operator from PIL import Image from playhouse.shortcuts import model_to_dict from frigate.const import ( CLIPS_DIR, ) from frigate.embeddings import EmbeddingsContext from frigate.embeddings.embeddings import get_metadata from frigate.models import Event, ReviewSegment, Timeline from frigate.object_processing import TrackedObject from frigate.util.builtin import get_tz_modifiers logger = logging.getLogger(__name__) EventBp = Blueprint("events", __name__) DEFAULT_TIME_RANGE = "00:00,24:00" @EventBp.route("/events") def events(): camera = request.args.get("camera", "all") cameras = request.args.get("cameras", "all") # handle old camera arg if cameras == "all" and camera != "all": cameras = camera label = unquote(request.args.get("label", "all")) labels = request.args.get("labels", "all") # handle old label arg if labels == "all" and label != "all": labels = label sub_label = request.args.get("sub_label", "all") sub_labels = request.args.get("sub_labels", "all") # handle old sub_label arg if sub_labels == "all" and sub_label != "all": sub_labels = sub_label zone = request.args.get("zone", "all") zones = request.args.get("zones", "all") # handle old label arg if zones == "all" and zone != "all": zones = zone limit = request.args.get("limit", 100) after = request.args.get("after", type=float) before = request.args.get("before", type=float) time_range = request.args.get("time_range", DEFAULT_TIME_RANGE) has_clip = request.args.get("has_clip", type=int) has_snapshot = request.args.get("has_snapshot", type=int) in_progress = request.args.get("in_progress", type=int) include_thumbnails = request.args.get("include_thumbnails", default=1, type=int) favorites = request.args.get("favorites", type=int) min_score = request.args.get("min_score", type=float) max_score = request.args.get("max_score", type=float) is_submitted = request.args.get("is_submitted", type=int) min_length = request.args.get("min_length", type=float) max_length = request.args.get("max_length", type=float) sort = request.args.get("sort", type=str) clauses = [] selected_columns = [ Event.id, Event.camera, Event.label, Event.zones, Event.start_time, Event.end_time, Event.has_clip, Event.has_snapshot, Event.plus_id, Event.retain_indefinitely, Event.sub_label, Event.top_score, Event.false_positive, Event.box, Event.data, ] if camera != "all": clauses.append((Event.camera == camera)) if cameras != "all": camera_list = cameras.split(",") clauses.append((Event.camera << camera_list)) if labels != "all": label_list = labels.split(",") clauses.append((Event.label << label_list)) if sub_labels != "all": # use matching so joined sub labels are included # for example a sub label 'bob' would get events # with sub labels 'bob' and 'bob, john' sub_label_clauses = [] filtered_sub_labels = sub_labels.split(",") if "None" in filtered_sub_labels: filtered_sub_labels.remove("None") sub_label_clauses.append((Event.sub_label.is_null())) for label in filtered_sub_labels: sub_label_clauses.append( (Event.sub_label.cast("text") == label) ) # include exact matches # include this label when part of a list sub_label_clauses.append((Event.sub_label.cast("text") % f"*{label},*")) sub_label_clauses.append((Event.sub_label.cast("text") % f"*, {label}*")) sub_label_clause = reduce(operator.or_, sub_label_clauses) clauses.append((sub_label_clause)) if zones != "all": # use matching so events with multiple zones # still match on a search where any zone matches zone_clauses = [] filtered_zones = zones.split(",") if "None" in filtered_zones: filtered_zones.remove("None") zone_clauses.append((Event.zones.length() == 0)) for zone in filtered_zones: zone_clauses.append((Event.zones.cast("text") % f'*"{zone}"*')) zone_clause = reduce(operator.or_, zone_clauses) clauses.append((zone_clause)) if after: clauses.append((Event.start_time > after)) if before: clauses.append((Event.start_time < before)) if time_range != DEFAULT_TIME_RANGE: # get timezone arg to ensure browser times are used tz_name = request.args.get("timezone", default="utc", type=str) hour_modifier, minute_modifier, _ = get_tz_modifiers(tz_name) times = time_range.split(",") time_after = times[0] time_before = times[1] start_hour_fun = fn.strftime( "%H:%M", fn.datetime(Event.start_time, "unixepoch", hour_modifier, minute_modifier), ) # cases where user wants events overnight, ex: from 20:00 to 06:00 # should use or operator if time_after > time_before: clauses.append( ( reduce( operator.or_, [(start_hour_fun > time_after), (start_hour_fun < time_before)], ) ) ) # all other cases should be and operator else: clauses.append((start_hour_fun > time_after)) clauses.append((start_hour_fun < time_before)) if has_clip is not None: clauses.append((Event.has_clip == has_clip)) if has_snapshot is not None: clauses.append((Event.has_snapshot == has_snapshot)) if in_progress is not None: clauses.append((Event.end_time.is_null(in_progress))) if include_thumbnails: selected_columns.append(Event.thumbnail) if favorites: clauses.append((Event.retain_indefinitely == favorites)) if max_score is not None: clauses.append((Event.data["score"] <= max_score)) if min_score is not None: clauses.append((Event.data["score"] >= min_score)) if min_length is not None: clauses.append(((Event.end_time - Event.start_time) >= min_length)) if max_length is not None: clauses.append(((Event.end_time - Event.start_time) <= max_length)) if is_submitted is not None: if is_submitted == 0: clauses.append((Event.plus_id.is_null())) elif is_submitted > 0: clauses.append((Event.plus_id != "")) if len(clauses) == 0: clauses.append((True)) if sort: if sort == "score_asc": order_by = Event.data["score"].asc() elif sort == "score_desc": order_by = Event.data["score"].desc() elif sort == "date_asc": order_by = Event.start_time.asc() elif sort == "date_desc": order_by = Event.start_time.desc() else: order_by = Event.start_time.desc() events = ( Event.select(*selected_columns) .where(reduce(operator.and_, clauses)) .order_by(order_by) .limit(limit) .dicts() .iterator() ) return jsonify(list(events)) @EventBp.route("/events/explore") def events_explore(): limit = request.args.get("limit", 10, type=int) subquery = Event.select( Event.id, Event.camera, Event.label, Event.zones, Event.start_time, Event.end_time, Event.has_clip, Event.has_snapshot, Event.plus_id, Event.retain_indefinitely, Event.sub_label, Event.top_score, Event.false_positive, Event.box, Event.data, fn.rank() .over(partition_by=[Event.label], order_by=[Event.start_time.desc()]) .alias("rank"), fn.COUNT(Event.id).over(partition_by=[Event.label]).alias("event_count"), ).alias("subquery") query = ( Event.select( subquery.c.id, subquery.c.camera, subquery.c.label, subquery.c.zones, subquery.c.start_time, subquery.c.end_time, subquery.c.has_clip, subquery.c.has_snapshot, subquery.c.plus_id, subquery.c.retain_indefinitely, subquery.c.sub_label, subquery.c.top_score, subquery.c.false_positive, subquery.c.box, subquery.c.data, subquery.c.event_count, ) .from_(subquery) .where(subquery.c.rank <= limit) .order_by(subquery.c.event_count.desc(), subquery.c.start_time.desc()) .dicts() ) events = query.iterator() return jsonify(list(events)) @EventBp.route("/event_ids") def event_ids(): idString = request.args.get("ids") ids = idString.split(",") if not ids: return make_response( jsonify({"success": False, "message": "Valid list of ids must be sent"}), 400, ) try: events = Event.select().where(Event.id << ids).dicts().iterator() return jsonify(list(events)) except Exception: return make_response( jsonify({"success": False, "message": "Events not found"}), 400 ) @EventBp.route("/events/search") def events_search(): query = request.args.get("query", type=str) search_type = request.args.get("search_type", "thumbnail,description", type=str) include_thumbnails = request.args.get("include_thumbnails", default=1, type=int) limit = request.args.get("limit", 50, type=int) # Filters cameras = request.args.get("cameras", "all", type=str) labels = request.args.get("labels", "all", type=str) zones = request.args.get("zones", "all", type=str) after = request.args.get("after", type=float) before = request.args.get("before", type=float) if not query: return make_response( jsonify( { "success": False, "message": "A search query must be supplied", } ), 400, ) if not current_app.frigate_config.semantic_search.enabled: return make_response( jsonify( { "success": False, "message": "Semantic search is not enabled", } ), 400, ) context: EmbeddingsContext = current_app.embeddings selected_columns = [ Event.id, Event.camera, Event.label, Event.sub_label, Event.zones, Event.start_time, Event.end_time, Event.has_clip, Event.has_snapshot, Event.data, Event.plus_id, ReviewSegment.thumb_path, ] if include_thumbnails: selected_columns.append(Event.thumbnail) # Build the where clause for the embeddings query embeddings_filters = [] if cameras != "all": camera_list = cameras.split(",") embeddings_filters.append({"camera": {"$in": camera_list}}) if labels != "all": label_list = labels.split(",") embeddings_filters.append({"label": {"$in": label_list}}) if zones != "all": filtered_zones = zones.split(",") zone_filters = [{f"zones_{zone}": {"$eq": True}} for zone in filtered_zones] if len(zone_filters) > 1: embeddings_filters.append({"$or": zone_filters}) else: embeddings_filters.append(zone_filters[0]) if after: embeddings_filters.append({"start_time": {"$gt": after}}) if before: embeddings_filters.append({"start_time": {"$lt": before}}) where = None if len(embeddings_filters) > 1: where = {"$and": embeddings_filters} elif len(embeddings_filters) == 1: where = embeddings_filters[0] thumb_ids = {} desc_ids = {} if search_type == "similarity": # Grab the ids of events that match the thumbnail image embeddings try: search_event: Event = Event.get(Event.id == query) except DoesNotExist: return make_response( jsonify( { "success": False, "message": "Event not found", } ), 404, ) thumbnail = base64.b64decode(search_event.thumbnail) img = np.array(Image.open(io.BytesIO(thumbnail)).convert("RGB")) thumb_result = context.embeddings.thumbnail.query( query_images=[img], n_results=limit, where=where, ) thumb_ids = dict( zip( thumb_result["ids"][0], context.thumb_stats.normalize(thumb_result["distances"][0]), ) ) else: search_types = search_type.split(",") if "thumbnail" in search_types: thumb_result = context.embeddings.thumbnail.query( query_texts=[query], n_results=limit, where=where, ) # Do a rudimentary normalization of the difference in distances returned by CLIP and MiniLM. thumb_ids = dict( zip( thumb_result["ids"][0], context.thumb_stats.normalize(thumb_result["distances"][0]), ) ) if "description" in search_types: desc_result = context.embeddings.description.query( query_texts=[query], n_results=limit, where=where, ) desc_ids = dict( zip( desc_result["ids"][0], context.desc_stats.normalize(desc_result["distances"][0]), ) ) results = {} for event_id in thumb_ids.keys() | desc_ids: min_distance = min( i for i in (thumb_ids.get(event_id), desc_ids.get(event_id)) if i is not None ) results[event_id] = { "distance": min_distance, "source": "thumbnail" if min_distance == thumb_ids.get(event_id) else "description", } if not results: return jsonify([]) # Get the event data events = ( Event.select(*selected_columns) .join( ReviewSegment, JOIN.LEFT_OUTER, on=(fn.json_extract(ReviewSegment.data, "$.detections").contains(Event.id)), ) .where(Event.id << list(results.keys())) .dicts() .iterator() ) events = list(events) events = [ {k: v for k, v in event.items() if k != "data"} | { k: v for k, v in event["data"].items() if k in ["type", "score", "top_score", "description"] } | { "search_distance": results[event["id"]]["distance"], "search_source": results[event["id"]]["source"], } for event in events ] events = sorted(events, key=lambda x: x["search_distance"])[:limit] return jsonify(events) @EventBp.route("/events/summary") def events_summary(): tz_name = request.args.get("timezone", default="utc", type=str) hour_modifier, minute_modifier, seconds_offset = get_tz_modifiers(tz_name) has_clip = request.args.get("has_clip", type=int) has_snapshot = request.args.get("has_snapshot", type=int) clauses = [] if has_clip is not None: clauses.append((Event.has_clip == has_clip)) if has_snapshot is not None: clauses.append((Event.has_snapshot == has_snapshot)) if len(clauses) == 0: clauses.append((True)) groups = ( Event.select( Event.camera, Event.label, Event.sub_label, fn.strftime( "%Y-%m-%d", fn.datetime( Event.start_time, "unixepoch", hour_modifier, minute_modifier ), ).alias("day"), Event.zones, fn.COUNT(Event.id).alias("count"), ) .where(reduce(operator.and_, clauses)) .group_by( Event.camera, Event.label, Event.sub_label, (Event.start_time + seconds_offset).cast("int") / (3600 * 24), Event.zones, ) ) return jsonify([e for e in groups.dicts()]) @EventBp.route("/events/", methods=("GET",)) def event(id): try: return model_to_dict(Event.get(Event.id == id)) except DoesNotExist: return "Event not found", 404 @EventBp.route("/events//retain", methods=("POST",)) def set_retain(id): try: event = Event.get(Event.id == id) except DoesNotExist: return make_response( jsonify({"success": False, "message": "Event " + id + " not found"}), 404 ) event.retain_indefinitely = True event.save() return make_response( jsonify({"success": True, "message": "Event " + id + " retained"}), 200 ) @EventBp.route("/events//plus", methods=("POST",)) def send_to_plus(id): if not current_app.plus_api.is_active(): message = "PLUS_API_KEY environment variable is not set" logger.error(message) return make_response( jsonify( { "success": False, "message": message, } ), 400, ) include_annotation = ( request.json.get("include_annotation") if request.is_json else None ) try: event = Event.get(Event.id == id) except DoesNotExist: message = f"Event {id} not found" logger.error(message) return make_response(jsonify({"success": False, "message": message}), 404) # events from before the conversion to relative dimensions cant include annotations if event.data.get("box") is None: include_annotation = None if event.end_time is None: logger.error(f"Unable to load clean png for in-progress event: {event.id}") return make_response( jsonify( { "success": False, "message": "Unable to load clean png for in-progress event", } ), 400, ) if event.plus_id: message = "Already submitted to plus" logger.error(message) return make_response(jsonify({"success": False, "message": message}), 400) # load clean.png try: filename = f"{event.camera}-{event.id}-clean.png" image = cv2.imread(os.path.join(CLIPS_DIR, filename)) except Exception: logger.error(f"Unable to load clean png for event: {event.id}") return make_response( jsonify( {"success": False, "message": "Unable to load clean png for event"} ), 400, ) if image is None or image.size == 0: logger.error(f"Unable to load clean png for event: {event.id}") return make_response( jsonify( {"success": False, "message": "Unable to load clean png for event"} ), 400, ) try: plus_id = current_app.plus_api.upload_image(image, event.camera) except Exception as ex: logger.exception(ex) return make_response( jsonify({"success": False, "message": "Error uploading image"}), 400, ) # store image id in the database event.plus_id = plus_id event.save() if include_annotation is not None: box = event.data["box"] try: current_app.plus_api.add_annotation( event.plus_id, box, event.label, ) except ValueError: message = "Error uploading annotation, unsupported label provided." logger.error(message) return make_response( jsonify({"success": False, "message": message}), 400, ) except Exception as ex: logger.exception(ex) return make_response( jsonify({"success": False, "message": "Error uploading annotation"}), 400, ) return make_response(jsonify({"success": True, "plus_id": plus_id}), 200) @EventBp.route("/events//false_positive", methods=("PUT",)) def false_positive(id): if not current_app.plus_api.is_active(): message = "PLUS_API_KEY environment variable is not set" logger.error(message) return make_response( jsonify( { "success": False, "message": message, } ), 400, ) try: event = Event.get(Event.id == id) except DoesNotExist: message = f"Event {id} not found" logger.error(message) return make_response(jsonify({"success": False, "message": message}), 404) # events from before the conversion to relative dimensions cant include annotations if event.data.get("box") is None: message = "Events prior to 0.13 cannot be submitted as false positives" logger.error(message) return make_response(jsonify({"success": False, "message": message}), 400) if event.false_positive: message = "False positive already submitted to Frigate+" logger.error(message) return make_response(jsonify({"success": False, "message": message}), 400) if not event.plus_id: plus_response = send_to_plus(id) if plus_response.status_code != 200: return plus_response # need to refetch the event now that it has a plus_id event = Event.get(Event.id == id) region = event.data["region"] box = event.data["box"] # provide top score if score is unavailable score = ( (event.data["top_score"] if event.data["top_score"] else event.top_score) if event.data["score"] is None else event.data["score"] ) try: current_app.plus_api.add_false_positive( event.plus_id, region, box, score, event.label, event.model_hash, event.model_type, event.detector_type, ) except ValueError: message = "Error uploading false positive, unsupported label provided." logger.error(message) return make_response( jsonify({"success": False, "message": message}), 400, ) except Exception as ex: logger.exception(ex) return make_response( jsonify({"success": False, "message": "Error uploading false positive"}), 400, ) event.false_positive = True event.save() return make_response(jsonify({"success": True, "plus_id": event.plus_id}), 200) @EventBp.route("/events//retain", methods=("DELETE",)) def delete_retain(id): try: event = Event.get(Event.id == id) except DoesNotExist: return make_response( jsonify({"success": False, "message": "Event " + id + " not found"}), 404 ) event.retain_indefinitely = False event.save() return make_response( jsonify({"success": True, "message": "Event " + id + " un-retained"}), 200 ) @EventBp.route("/events//sub_label", methods=("POST",)) def set_sub_label(id): try: event: Event = Event.get(Event.id == id) except DoesNotExist: return make_response( jsonify({"success": False, "message": "Event " + id + " not found"}), 404 ) json: dict[str, any] = request.get_json(silent=True) or {} new_sub_label = json.get("subLabel") new_score = json.get("subLabelScore") if new_sub_label is None: return make_response( jsonify( { "success": False, "message": "A sub label must be supplied", } ), 400, ) if new_sub_label and len(new_sub_label) > 100: return make_response( jsonify( { "success": False, "message": new_sub_label + " exceeds the 100 character limit for sub_label", } ), 400, ) if new_score is not None and (new_score > 1.0 or new_score < 0): return make_response( jsonify( { "success": False, "message": new_score + " does not fit within the expected bounds 0 <= score <= 1.0", } ), 400, ) if not event.end_time: # update tracked object tracked_obj: TrackedObject = ( current_app.detected_frames_processor.camera_states[ event.camera ].tracked_objects.get(event.id) ) if tracked_obj: tracked_obj.obj_data["sub_label"] = (new_sub_label, new_score) # update timeline items Timeline.update( data=Timeline.data.update({"sub_label": (new_sub_label, new_score)}) ).where(Timeline.source_id == id).execute() event.sub_label = new_sub_label if new_score: data = event.data data["sub_label_score"] = new_score event.data = data event.save() return make_response( jsonify( { "success": True, "message": "Event " + id + " sub label set to " + new_sub_label, } ), 200, ) @EventBp.route("/events//description", methods=("POST",)) def set_description(id): try: event: Event = Event.get(Event.id == id) except DoesNotExist: return make_response( jsonify({"success": False, "message": "Event " + id + " not found"}), 404 ) json: dict[str, any] = request.get_json(silent=True) or {} new_description = json.get("description") if new_description is None or len(new_description) == 0: return make_response( jsonify( { "success": False, "message": "description cannot be empty", } ), 400, ) event.data["description"] = new_description event.save() # If semantic search is enabled, update the index if current_app.frigate_config.semantic_search.enabled: context: EmbeddingsContext = current_app.embeddings context.embeddings.description.upsert( documents=[new_description], metadatas=[get_metadata(event)], ids=[id], ) return make_response( jsonify( { "success": True, "message": "Event " + id + " description set to " + new_description, } ), 200, ) @EventBp.route("/events/", methods=("DELETE",)) def delete_event(id): try: event = Event.get(Event.id == id) except DoesNotExist: return make_response( jsonify({"success": False, "message": "Event " + id + " not found"}), 404 ) media_name = f"{event.camera}-{event.id}" if event.has_snapshot: media = Path(f"{os.path.join(CLIPS_DIR, media_name)}.jpg") media.unlink(missing_ok=True) media = Path(f"{os.path.join(CLIPS_DIR, media_name)}-clean.png") media.unlink(missing_ok=True) if event.has_clip: media = Path(f"{os.path.join(CLIPS_DIR, media_name)}.mp4") media.unlink(missing_ok=True) event.delete_instance() Timeline.delete().where(Timeline.source_id == id).execute() # If semantic search is enabled, update the index if current_app.frigate_config.semantic_search.enabled: context: EmbeddingsContext = current_app.embeddings context.embeddings.thumbnail.delete(ids=[id]) context.embeddings.description.delete(ids=[id]) return make_response( jsonify({"success": True, "message": "Event " + id + " deleted"}), 200 ) @EventBp.route("/events//