Chat improvements (#22823)

* Add score fusion helpers for find_similar_objects chat tool

* Add candidate query builder for find_similar_objects chat tool

* register find_similar_objects chat tool definition

* implement _execute_find_similar_objects chat tool dispatcher

* Dispatch find_similar_objects in chat tool executor

* Teach chat system prompt when to use find_similar_objects

* Add i18n strings for find_similar_objects chat tool

* Add frontend extractor for find_similar_objects tool response

* Render anchor badge and similarity scores in chat results

* formatting

* filter similarity results in python, not sqlite-vec

* extract pure chat helpers to chat_util module

* Teach chat system prompt about attached_event marker

* Add parseAttachedEvent and prependAttachment helpers

* Add i18n strings for chat event attachments

* Add ChatAttachmentChip component

* Make chat thumbnails attach to composer on click

* Render attachment chip in user chat bubbles

* Add ChatQuickReplies pill row component

* Add ChatPaperclipButton with event picker popover

* Wire event attachments into chat composer and messages

* add ability to stop streaming

* tweak cursor to appear at the end of the same line as the streaming response

* use abort signal

* add tooltip

* display label and camera on attachment chip
This commit is contained in:
Josh Hawkins
2026-04-09 15:31:37 -05:00
committed by GitHub
parent 556d5d8c9d
commit 98c2fe00c1
12 changed files with 1318 additions and 109 deletions

View File

@@ -3,9 +3,11 @@
import base64
import json
import logging
import operator
import time
from datetime import datetime
from typing import Any, Dict, Generator, List, Optional
from functools import reduce
from typing import Any, Dict, List, Optional
import cv2
from fastapi import APIRouter, Body, Depends, Request
@@ -17,6 +19,14 @@ from frigate.api.auth import (
get_allowed_cameras_for_filter,
require_camera_access,
)
from frigate.api.chat_util import (
chunk_content,
distance_to_score,
format_events_with_local_time,
fuse_scores,
hydrate_event,
parse_iso_to_timestamp,
)
from frigate.api.defs.query.events_query_parameters import EventsQueryParams
from frigate.api.defs.request.chat_body import ChatCompletionRequest
from frigate.api.defs.response.chat_response import (
@@ -32,55 +42,13 @@ from frigate.jobs.vlm_watch import (
start_vlm_watch_job,
stop_vlm_watch_job,
)
from frigate.models import Event
logger = logging.getLogger(__name__)
router = APIRouter(tags=[Tags.chat])
def _chunk_content(content: str, chunk_size: int = 80) -> Generator[str, None, None]:
"""Yield content in word-aware chunks for streaming."""
if not content:
return
words = content.split(" ")
current: List[str] = []
current_len = 0
for w in words:
current.append(w)
current_len += len(w) + 1
if current_len >= chunk_size:
yield " ".join(current) + " "
current = []
current_len = 0
if current:
yield " ".join(current)
def _format_events_with_local_time(
events_list: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
"""Add human-readable local start/end times to each event for the LLM."""
result = []
for evt in events_list:
if not isinstance(evt, dict):
result.append(evt)
continue
copy_evt = dict(evt)
try:
start_ts = evt.get("start_time")
end_ts = evt.get("end_time")
if start_ts is not None:
dt_start = datetime.fromtimestamp(start_ts)
copy_evt["start_time_local"] = dt_start.strftime("%Y-%m-%d %I:%M:%S %p")
if end_ts is not None:
dt_end = datetime.fromtimestamp(end_ts)
copy_evt["end_time_local"] = dt_end.strftime("%Y-%m-%d %I:%M:%S %p")
except (TypeError, ValueError, OSError):
pass
result.append(copy_evt)
return result
class ToolExecuteRequest(BaseModel):
"""Request model for tool execution."""
@@ -158,6 +126,76 @@ def get_tool_definitions() -> List[Dict[str, Any]]:
"required": [],
},
},
{
"type": "function",
"function": {
"name": "find_similar_objects",
"description": (
"Find tracked objects that are visually and semantically similar "
"to a specific past event. Use this when the user references a "
"particular object they have seen and wants to find other "
"sightings of the same or similar one ('that green car', 'the "
"person in the red jacket', 'the package that was delivered'). "
"Prefer this over search_objects whenever the user's intent is "
"'find more like this specific one.' Use search_objects first "
"only if you need to locate the anchor event. Requires semantic "
"search to be enabled."
),
"parameters": {
"type": "object",
"properties": {
"event_id": {
"type": "string",
"description": "The id of the anchor event to find similar objects to.",
},
"after": {
"type": "string",
"description": "Start time in ISO 8601 format (e.g., '2024-01-01T00:00:00Z').",
},
"before": {
"type": "string",
"description": "End time in ISO 8601 format (e.g., '2024-01-01T23:59:59Z').",
},
"cameras": {
"type": "array",
"items": {"type": "string"},
"description": "Optional list of cameras to restrict to. Defaults to all.",
},
"labels": {
"type": "array",
"items": {"type": "string"},
"description": "Optional list of labels to restrict to. Defaults to the anchor event's label.",
},
"sub_labels": {
"type": "array",
"items": {"type": "string"},
"description": "Optional list of sub_labels (names) to restrict to.",
},
"zones": {
"type": "array",
"items": {"type": "string"},
"description": "Optional list of zones. An event matches if any of its zones overlap.",
},
"similarity_mode": {
"type": "string",
"enum": ["visual", "semantic", "fused"],
"description": "Which similarity signal(s) to use. 'fused' (default) combines visual and semantic.",
"default": "fused",
},
"min_score": {
"type": "number",
"description": "Drop matches with a similarity score below this threshold (0.0-1.0).",
},
"limit": {
"type": "integer",
"description": "Maximum number of matches to return (default: 10).",
"default": 10,
},
},
"required": ["event_id"],
},
},
},
{
"type": "function",
"function": {
@@ -434,6 +472,166 @@ async def _execute_search_objects(
)
async def _execute_find_similar_objects(
    request: Request,
    arguments: Dict[str, Any],
    allowed_cameras: List[str],
) -> Dict[str, Any]:
    """Execute the find_similar_objects tool.

    Returns a plain dict (not JSONResponse) so the chat loop can embed it
    directly in tool-result messages.

    Args:
        request: FastAPI request; supplies ``app.frigate_config`` and
            ``app.embeddings`` (the vector-search context).
        arguments: Raw tool-call arguments from the LLM. Only ``event_id``
            is required; everything else is an optional filter. Values are
            untrusted and may be missing or malformed.
        allowed_cameras: Cameras this user is allowed to see (RBAC);
            results are always restricted to this set.

    Returns:
        On success: ``{"anchor", "results", "similarity_mode",
        "candidate_truncated"}``. On failure: ``{"error", "message"}`` —
        callers detect failure by the presence of the ``"error"`` key.
    """
    # 1. Semantic search enabled?
    config = request.app.frigate_config
    if not getattr(config.semantic_search, "enabled", False):
        return {
            "error": "semantic_search_disabled",
            "message": (
                "Semantic search must be enabled to find similar objects. "
                "Enable it in the Frigate config under semantic_search."
            ),
        }
    context = request.app.embeddings
    if context is None:
        return {
            "error": "semantic_search_disabled",
            "message": "Embeddings context is not available.",
        }

    # 2. Anchor lookup.
    event_id = arguments.get("event_id")
    if not event_id:
        return {"error": "missing_event_id", "message": "event_id is required."}
    try:
        anchor = Event.get(Event.id == event_id)
    except Event.DoesNotExist:
        return {
            "error": "anchor_not_found",
            "message": f"Could not find event {event_id}.",
        }

    # 3. Parse params.
    after = parse_iso_to_timestamp(arguments.get("after"))
    before = parse_iso_to_timestamp(arguments.get("before"))
    cameras = arguments.get("cameras")
    if cameras:
        # Respect RBAC: intersect with the user's allowed cameras.
        cameras = [c for c in cameras if c in allowed_cameras]
    else:
        cameras = list(allowed_cameras) if allowed_cameras else None
    # Default to the anchor's label so "more like this" stays on-category
    # unless the LLM explicitly widens the search.
    labels = arguments.get("labels") or [anchor.label]
    sub_labels = arguments.get("sub_labels")
    zones = arguments.get("zones")
    similarity_mode = arguments.get("similarity_mode", "fused")
    if similarity_mode not in ("visual", "semantic", "fused"):
        similarity_mode = "fused"
    min_score = arguments.get("min_score")
    # NOTE(review): limit comes straight from the LLM; a non-numeric value
    # would raise ValueError here — confirm upstream validation or intent.
    limit = int(arguments.get("limit", 10))
    limit = max(1, min(limit, 50))

    # 4. Run similarity searches. We deliberately do NOT pass event_ids into
    # the vec queries — the IN filter on sqlite-vec is broken in the installed
    # version (see frigate/embeddings/__init__.py). Mirror the pattern used by
    # frigate/api/event.py events_search: fetch top-k globally, then intersect
    # with the structured filters via Peewee.
    visual_distances: Dict[str, float] = {}
    description_distances: Dict[str, float] = {}
    try:
        if similarity_mode in ("visual", "fused"):
            # Rows are (event_id, distance) pairs; smaller distance = closer.
            rows = context.search_thumbnail(anchor)
            visual_distances = {row[0]: row[1] for row in rows}
        if similarity_mode in ("semantic", "fused"):
            # Best available text for the anchor: its description, else its
            # sub_label, else the plain label.
            query_text = (
                (anchor.data or {}).get("description")
                or anchor.sub_label
                or anchor.label
            )
            rows = context.search_description(query_text)
            description_distances = {row[0]: row[1] for row in rows}
    except Exception:
        logger.exception("Similarity search failed")
        return {
            "error": "similarity_search_failed",
            "message": "Failed to run similarity search.",
        }

    vec_ids = set(visual_distances) | set(description_distances)
    # Never return the anchor as its own best match.
    vec_ids.discard(anchor.id)
    # vec layer returns up to k=100 per modality; flag when we hit that ceiling
    # so the LLM can mention there may be more matches beyond what we saw.
    candidate_truncated = (
        len(visual_distances) >= 100 or len(description_distances) >= 100
    )
    if not vec_ids:
        return {
            "anchor": hydrate_event(anchor),
            "results": [],
            "similarity_mode": similarity_mode,
            "candidate_truncated": candidate_truncated,
        }

    # 5. Apply structured filters, intersected with vec hits.
    clauses = [Event.id.in_(list(vec_ids))]
    if after is not None:
        clauses.append(Event.start_time >= after)
    if before is not None:
        clauses.append(Event.start_time <= before)
    if cameras:
        clauses.append(Event.camera.in_(cameras))
    if labels:
        clauses.append(Event.label.in_(labels))
    if sub_labels:
        clauses.append(Event.sub_label.in_(sub_labels))
    if zones:
        # Mirror the pattern used by frigate/api/event.py for JSON-array zone match.
        zone_clauses = [Event.zones.cast("text") % f'*"{zone}"*' for zone in zones]
        clauses.append(reduce(operator.or_, zone_clauses))
    eligible = {e.id: e for e in Event.select().where(reduce(operator.and_, clauses))}

    # 6. Fuse and rank. Events with neither a visual nor a description score
    # (fuse_scores -> None) are dropped, as are those under min_score.
    scored: List[tuple[str, float]] = []
    for eid in eligible:
        v_score = (
            distance_to_score(visual_distances[eid], context.thumb_stats)
            if eid in visual_distances
            else None
        )
        d_score = (
            distance_to_score(description_distances[eid], context.desc_stats)
            if eid in description_distances
            else None
        )
        fused = fuse_scores(v_score, d_score)
        if fused is None:
            continue
        if min_score is not None and fused < min_score:
            continue
        scored.append((eid, fused))
    # Highest similarity first, capped at the requested limit.
    scored.sort(key=lambda pair: pair[1], reverse=True)
    scored = scored[:limit]
    results = [hydrate_event(eligible[eid], score=score) for eid, score in scored]

    return {
        "anchor": hydrate_event(anchor),
        "results": results,
        "similarity_mode": similarity_mode,
        "candidate_truncated": candidate_truncated,
    }
@router.post(
"/chat/execute",
dependencies=[Depends(allow_any_authenticated())],
@@ -459,6 +657,13 @@ async def execute_tool(
if tool_name == "search_objects":
return await _execute_search_objects(arguments, allowed_cameras)
if tool_name == "find_similar_objects":
result = await _execute_find_similar_objects(
request, arguments, allowed_cameras
)
status_code = 200 if "error" not in result else 400
return JSONResponse(content=result, status_code=status_code)
if tool_name == "set_camera_state":
result = await _execute_set_camera_state(request, arguments)
return JSONResponse(
@@ -642,6 +847,8 @@ async def _execute_tool_internal(
except (json.JSONDecodeError, AttributeError) as e:
logger.warning(f"Failed to extract tool result: {e}")
return {"error": "Failed to parse tool result"}
elif tool_name == "find_similar_objects":
return await _execute_find_similar_objects(request, arguments, allowed_cameras)
elif tool_name == "set_camera_state":
return await _execute_set_camera_state(request, arguments)
elif tool_name == "get_live_context":
@@ -664,8 +871,9 @@ async def _execute_tool_internal(
return _execute_get_recap(arguments, allowed_cameras)
else:
logger.error(
"Tool call failed: unknown tool %r. Expected one of: search_objects, get_live_context, "
"start_camera_watch, stop_camera_watch, get_profile_status, get_recap. Arguments received: %s",
"Tool call failed: unknown tool %r. Expected one of: search_objects, find_similar_objects, "
"get_live_context, start_camera_watch, stop_camera_watch, get_profile_status, get_recap. "
"Arguments received: %s",
tool_name,
json.dumps(arguments),
)
@@ -927,7 +1135,7 @@ async def _execute_pending_tools(
json.dumps(tool_args),
)
if tool_name == "search_objects" and isinstance(tool_result, list):
tool_result = _format_events_with_local_time(tool_result)
tool_result = format_events_with_local_time(tool_result)
_keys = {
"id",
"camera",
@@ -1080,7 +1288,9 @@ Do not start your response with phrases like "I will check...", "Let me see...",
Always present times to the user in the server's local timezone. When tool results include start_time_local and end_time_local, use those exact strings when listing or describing detection times—do not convert or invent timestamps. Do not use UTC or ISO format with Z for the user-facing answer unless the tool result only provides Unix timestamps without local time fields.
When users ask about "today", "yesterday", "this week", etc., use the current date above as reference.
When searching for objects or events, use ISO 8601 format for dates (e.g., {current_date_str}T00:00:00Z for the start of today).
Always be accurate with time calculations based on the current date provided.{cameras_section}"""
Always be accurate with time calculations based on the current date provided.
When a user refers to a specific object they have seen or describe with identifying details ("that green car", "the person in the red jacket", "a package left today"), prefer the find_similar_objects tool over search_objects. Use search_objects first only to locate the anchor event, then pass its id to find_similar_objects. For generic queries like "show me all cars today", keep using search_objects. If a user message begins with [attached_event:<id>], treat that event id as the anchor for any similarity or "tell me more" request in the same message and call find_similar_objects with that id.{cameras_section}"""
conversation.append(
{
@@ -1118,6 +1328,9 @@ Always be accurate with time calculations based on the current date provided.{ca
async def stream_body_llm():
nonlocal conversation, stream_tool_calls, stream_iterations
while stream_iterations < max_iterations:
if await request.is_disconnected():
logger.debug("Client disconnected, stopping chat stream")
return
logger.debug(
f"Streaming LLM (iteration {stream_iterations + 1}/{max_iterations}) "
f"with {len(conversation)} message(s)"
@@ -1127,6 +1340,9 @@ Always be accurate with time calculations based on the current date provided.{ca
tools=tools if tools else None,
tool_choice="auto",
):
if await request.is_disconnected():
logger.debug("Client disconnected, stopping chat stream")
return
kind, value = event
if kind == "content_delta":
yield (
@@ -1156,6 +1372,11 @@ Always be accurate with time calculations based on the current date provided.{ca
msg.get("content"), pending
)
)
if await request.is_disconnected():
logger.debug(
"Client disconnected before tool execution"
)
return
(
executed_calls,
tool_results,
@@ -1240,7 +1461,7 @@ Always be accurate with time calculations based on the current date provided.{ca
+ b"\n"
)
# Stream content in word-sized chunks for smooth UX
for part in _chunk_content(final_content):
for part in chunk_content(final_content):
yield (
json.dumps({"type": "content", "delta": part}).encode(
"utf-8"

135
frigate/api/chat_util.py Normal file
View File

@@ -0,0 +1,135 @@
"""Pure, stateless helpers used by the chat tool dispatchers.
These were extracted from frigate/api/chat.py to keep that module focused on
route handlers, tool dispatchers, and streaming loop internals. Nothing in
this file touches the FastAPI request, the embeddings context, or the chat
loop state — all inputs and outputs are plain data.
"""
import logging
import math
import time
from datetime import datetime
from typing import Any, Dict, Generator, List, Optional
from frigate.embeddings.util import ZScoreNormalization
from frigate.models import Event
logger = logging.getLogger(__name__)
# Similarity fusion weights for find_similar_objects.
# Visual dominates because the feature's primary use case is "same specific object."
# If these change, update the test in test_chat_find_similar_objects.py.
VISUAL_WEIGHT = 0.65
DESCRIPTION_WEIGHT = 0.35
def chunk_content(content: str, chunk_size: int = 80) -> Generator[str, None, None]:
    """Yield *content* in word-aware pieces for smooth streamed output.

    Words are accumulated until the running length (each word plus one
    trailing space) reaches ``chunk_size``, at which point the piece is
    emitted with a trailing space. Any leftover words form a final piece
    without a trailing space. Empty input produces no chunks.
    """
    if not content:
        return
    pending: List[str] = []
    pending_len = 0
    for token in content.split(" "):
        pending.append(token)
        pending_len += len(token) + 1
        if pending_len >= chunk_size:
            yield " ".join(pending) + " "
            pending.clear()
            pending_len = 0
    if pending:
        yield " ".join(pending)
def format_events_with_local_time(
    events_list: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
    """Return copies of *events_list* entries with local-time strings added.

    Dict entries are shallow-copied and annotated with ``start_time_local``
    and ``end_time_local`` (12-hour, server-local) computed from the unix
    ``start_time`` / ``end_time`` fields. Non-dict entries pass through
    unchanged; unconvertible timestamps are silently skipped.
    """
    fmt = "%Y-%m-%d %I:%M:%S %p"
    out: List[Dict[str, Any]] = []
    for item in events_list:
        if not isinstance(item, dict):
            out.append(item)
            continue
        enriched = dict(item)
        try:
            start = item.get("start_time")
            if start is not None:
                enriched["start_time_local"] = datetime.fromtimestamp(start).strftime(fmt)
            end = item.get("end_time")
            if end is not None:
                enriched["end_time_local"] = datetime.fromtimestamp(end).strftime(fmt)
        except (TypeError, ValueError, OSError):
            # Best effort: keep whatever local fields were set before the failure.
            pass
        out.append(enriched)
    return out
def distance_to_score(distance: float, stats: ZScoreNormalization) -> float:
"""Convert a cosine distance to a [0, 1] similarity score.
Uses the existing ZScoreNormalization stats maintained by EmbeddingsContext
to normalize across deployments, then a bounded sigmoid. Lower distance ->
higher score. If stats are uninitialized (stddev == 0), returns a neutral
0.5 so the fallback ordering by raw distance still dominates.
"""
if stats.stddev == 0:
return 0.5
z = (distance - stats.mean) / stats.stddev
# Sigmoid on -z so that small distance (good) -> high score.
return 1.0 / (1.0 + math.exp(z))
def fuse_scores(
    visual_score: Optional[float],
    description_score: Optional[float],
) -> Optional[float]:
    """Blend visual and description similarity into a single score.

    A missing modality (e.g., no description embedding for this event)
    contributes nothing: the present side's score is returned as-is, with
    no penalty. If both are missing, returns None and the caller should
    drop the event. With both present, a fixed weighted average is used
    (visual weighted heavier; see VISUAL_WEIGHT / DESCRIPTION_WEIGHT).
    """
    present = [s for s in (visual_score, description_score) if s is not None]
    if not present:
        return None
    if len(present) == 1:
        return present[0]
    return VISUAL_WEIGHT * visual_score + DESCRIPTION_WEIGHT * description_score
def parse_iso_to_timestamp(value: Optional[str]) -> Optional[float]:
    """Interpret an ISO-8601 string as server-local time -> unix timestamp.

    Mirrors the parsing _execute_search_objects uses so both tools accept the
    same format from the LLM. Any 'Z' suffix is stripped (the value is treated
    as local time regardless) and precision beyond whole seconds is ignored.
    Returns None for None input or an unparseable string.
    """
    if value is None:
        return None
    try:
        # Drop 'Z', trim whitespace, and cut to YYYY-MM-DDTHH:MM:SS (19 chars).
        trimmed = value.replace("Z", "").strip()[:19]
        parsed = datetime.strptime(trimmed, "%Y-%m-%dT%H:%M:%S")
        return time.mktime(parsed.timetuple())
    except (ValueError, AttributeError, TypeError):
        logger.warning("Invalid timestamp format: %s", value)
        return None
def hydrate_event(event: Event, score: Optional[float] = None) -> Dict[str, Any]:
    """Serialize an Event row into the plain-dict shape that
    find_similar_objects returns to the chat loop.

    The optional *score* (fused similarity, 0-1) is included only when
    provided, so anchor events carry no ``score`` key.
    """
    fields = ("id", "camera", "label", "sub_label", "start_time", "end_time", "zones")
    payload: Dict[str, Any] = {name: getattr(event, name) for name in fields}
    if score is not None:
        payload["score"] = score
    return payload