Add GenAI Backend Streaming and Chat (#22152)

* Add basic chat page with entry

* Add chat history

* processing

* Add markdown

* Improvements

* Adjust timing format

* Reduce fields in response

* More time parsing improvements

* Show tool calls separately from message

* Add title

* Improve UI handling

* Support streaming

* Full streaming support

* Fix tool calling

* Add copy button

* Improvements to UI

* Improve default behavior

* Implement message editing

* Add sub label to event tool filtering

* Cleanup

* Cleanup UI and prompt

* Cleanup UI bubbles

* Fix loading

* Add support for markdown tables

* Add thumbnail images to object results

* Add a starting state for chat

* Clenaup
This commit is contained in:
Nicolas Mowen
2026-02-27 09:07:30 -07:00
committed by GitHub
parent e7250f24cb
commit fa1f9a1fa4
20 changed files with 3097 additions and 291 deletions

View File

@@ -3,12 +3,13 @@
import base64
import json
import logging
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
import time
from datetime import datetime
from typing import Any, Dict, Generator, List, Optional
import cv2
from fastapi import APIRouter, Body, Depends, Request
from fastapi.responses import JSONResponse
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel
from frigate.api.auth import (
@@ -20,15 +21,60 @@ from frigate.api.defs.request.chat_body import ChatCompletionRequest
from frigate.api.defs.response.chat_response import (
ChatCompletionResponse,
ChatMessageResponse,
ToolCall,
)
from frigate.api.defs.tags import Tags
from frigate.api.event import events
from frigate.genai.utils import build_assistant_message_for_conversation
logger = logging.getLogger(__name__)
router = APIRouter(tags=[Tags.chat])
def _chunk_content(content: str, chunk_size: int = 80) -> Generator[str, None, None]:
"""Yield content in word-aware chunks for streaming."""
if not content:
return
words = content.split(" ")
current: List[str] = []
current_len = 0
for w in words:
current.append(w)
current_len += len(w) + 1
if current_len >= chunk_size:
yield " ".join(current) + " "
current = []
current_len = 0
if current:
yield " ".join(current)
def _format_events_with_local_time(
events_list: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
"""Add human-readable local start/end times to each event for the LLM."""
result = []
for evt in events_list:
if not isinstance(evt, dict):
result.append(evt)
continue
copy_evt = dict(evt)
try:
start_ts = evt.get("start_time")
end_ts = evt.get("end_time")
if start_ts is not None:
dt_start = datetime.fromtimestamp(start_ts)
copy_evt["start_time_local"] = dt_start.strftime("%Y-%m-%d %I:%M:%S %p")
if end_ts is not None:
dt_end = datetime.fromtimestamp(end_ts)
copy_evt["end_time_local"] = dt_end.strftime("%Y-%m-%d %I:%M:%S %p")
except (TypeError, ValueError, OSError):
pass
result.append(copy_evt)
return result
class ToolExecuteRequest(BaseModel):
"""Request model for tool execution."""
@@ -52,19 +98,25 @@ def get_tool_definitions() -> List[Dict[str, Any]]:
"Search for detected objects in Frigate by camera, object label, time range, "
"zones, and other filters. Use this to answer questions about when "
"objects were detected, what objects appeared, or to find specific object detections. "
"An 'object' in Frigate represents a tracked detection (e.g., a person, package, car)."
"An 'object' in Frigate represents a tracked detection (e.g., a person, package, car). "
"When the user asks about a specific name (person, delivery company, animal, etc.), "
"filter by sub_label only and do not set label."
),
"parameters": {
"type": "object",
"properties": {
"camera": {
"type": "string",
"description": "Camera name to filter by (optional). Use 'all' for all cameras.",
"description": "Camera name to filter by (optional).",
},
"label": {
"type": "string",
"description": "Object label to filter by (e.g., 'person', 'package', 'car').",
},
"sub_label": {
"type": "string",
"description": "Name of a person, delivery company, animal, etc. When filtering by a specific name, use only sub_label; do not set label.",
},
"after": {
"type": "string",
"description": "Start time in ISO 8601 format (e.g., '2024-01-01T00:00:00Z').",
@@ -80,8 +132,8 @@ def get_tool_definitions() -> List[Dict[str, Any]]:
},
"limit": {
"type": "integer",
"description": "Maximum number of objects to return (default: 10).",
"default": 10,
"description": "Maximum number of objects to return (default: 25).",
"default": 25,
},
},
},
@@ -119,14 +171,13 @@ def get_tool_definitions() -> List[Dict[str, Any]]:
summary="Get available tools",
description="Returns OpenAI-compatible tool definitions for function calling.",
)
def get_tools(request: Request) -> JSONResponse:
def get_tools() -> JSONResponse:
"""Get list of available tools for LLM function calling."""
tools = get_tool_definitions()
return JSONResponse(content={"tools": tools})
async def _execute_search_objects(
request: Request,
arguments: Dict[str, Any],
allowed_cameras: List[str],
) -> JSONResponse:
@@ -136,23 +187,26 @@ async def _execute_search_objects(
This searches for detected objects (events) in Frigate using the same
logic as the events API endpoint.
"""
# Parse ISO 8601 timestamps to Unix timestamps if provided
# Parse after/before as server local time; convert to Unix timestamp
after = arguments.get("after")
before = arguments.get("before")
def _parse_as_local_timestamp(s: str):
s = s.replace("Z", "").strip()[:19]
dt = datetime.strptime(s, "%Y-%m-%dT%H:%M:%S")
return time.mktime(dt.timetuple())
if after:
try:
after_dt = datetime.fromisoformat(after.replace("Z", "+00:00"))
after = after_dt.timestamp()
except (ValueError, AttributeError):
after = _parse_as_local_timestamp(after)
except (ValueError, AttributeError, TypeError):
logger.warning(f"Invalid 'after' timestamp format: {after}")
after = None
if before:
try:
before_dt = datetime.fromisoformat(before.replace("Z", "+00:00"))
before = before_dt.timestamp()
except (ValueError, AttributeError):
before = _parse_as_local_timestamp(before)
except (ValueError, AttributeError, TypeError):
logger.warning(f"Invalid 'before' timestamp format: {before}")
before = None
@@ -165,15 +219,14 @@ async def _execute_search_objects(
# Build query parameters compatible with EventsQueryParams
query_params = EventsQueryParams(
camera=arguments.get("camera", "all"),
cameras=arguments.get("camera", "all"),
label=arguments.get("label", "all"),
labels=arguments.get("label", "all"),
sub_labels=arguments.get("sub_label", "all").lower(),
zones=zones,
zone=zones,
after=after,
before=before,
limit=arguments.get("limit", 10),
limit=arguments.get("limit", 25),
)
try:
@@ -202,7 +255,6 @@ async def _execute_search_objects(
description="Execute a tool function call from an LLM.",
)
async def execute_tool(
request: Request,
body: ToolExecuteRequest = Body(...),
allowed_cameras: List[str] = Depends(get_allowed_cameras_for_filter),
) -> JSONResponse:
@@ -218,7 +270,7 @@ async def execute_tool(
logger.debug(f"Executing tool: {tool_name} with arguments: {arguments}")
if tool_name == "search_objects":
return await _execute_search_objects(request, arguments, allowed_cameras)
return await _execute_search_objects(arguments, allowed_cameras)
return JSONResponse(
content={
@@ -334,7 +386,7 @@ async def _execute_tool_internal(
This is used by the chat completion endpoint to execute tools.
"""
if tool_name == "search_objects":
response = await _execute_search_objects(request, arguments, allowed_cameras)
response = await _execute_search_objects(arguments, allowed_cameras)
try:
if hasattr(response, "body"):
body_str = response.body.decode("utf-8")
@@ -349,15 +401,109 @@ async def _execute_tool_internal(
elif tool_name == "get_live_context":
camera = arguments.get("camera")
if not camera:
logger.error(
"Tool get_live_context failed: camera parameter is required. "
"Arguments: %s",
json.dumps(arguments),
)
return {"error": "Camera parameter is required"}
return await _execute_get_live_context(request, camera, allowed_cameras)
else:
logger.error(
"Tool call failed: unknown tool %r. Expected one of: search_objects, get_live_context. "
"Arguments received: %s",
tool_name,
json.dumps(arguments),
)
return {"error": f"Unknown tool: {tool_name}"}
async def _execute_pending_tools(
pending_tool_calls: List[Dict[str, Any]],
request: Request,
allowed_cameras: List[str],
) -> tuple[List[ToolCall], List[Dict[str, Any]]]:
"""
Execute a list of tool calls; return (ToolCall list for API response, tool result dicts for conversation).
"""
tool_calls_out: List[ToolCall] = []
tool_results: List[Dict[str, Any]] = []
for tool_call in pending_tool_calls:
tool_name = tool_call["name"]
tool_args = tool_call.get("arguments") or {}
tool_call_id = tool_call["id"]
logger.debug(
f"Executing tool: {tool_name} (id: {tool_call_id}) with arguments: {json.dumps(tool_args, indent=2)}"
)
try:
tool_result = await _execute_tool_internal(
tool_name, tool_args, request, allowed_cameras
)
if isinstance(tool_result, dict) and tool_result.get("error"):
logger.error(
"Tool call %s (id: %s) returned error: %s. Arguments: %s",
tool_name,
tool_call_id,
tool_result.get("error"),
json.dumps(tool_args),
)
if tool_name == "search_objects" and isinstance(tool_result, list):
tool_result = _format_events_with_local_time(tool_result)
_keys = {
"id",
"camera",
"label",
"zones",
"start_time_local",
"end_time_local",
"sub_label",
"event_count",
}
tool_result = [
{k: evt[k] for k in _keys if k in evt}
for evt in tool_result
if isinstance(evt, dict)
]
result_content = (
json.dumps(tool_result)
if isinstance(tool_result, (dict, list))
else (tool_result if isinstance(tool_result, str) else str(tool_result))
)
tool_calls_out.append(
ToolCall(name=tool_name, arguments=tool_args, response=result_content)
)
tool_results.append(
{
"role": "tool",
"tool_call_id": tool_call_id,
"content": result_content,
}
)
except Exception as e:
logger.error(
"Error executing tool %s (id: %s): %s. Arguments: %s",
tool_name,
tool_call_id,
e,
json.dumps(tool_args),
exc_info=True,
)
error_content = json.dumps({"error": f"Tool execution failed: {str(e)}"})
tool_calls_out.append(
ToolCall(name=tool_name, arguments=tool_args, response=error_content)
)
tool_results.append(
{
"role": "tool",
"tool_call_id": tool_call_id,
"content": error_content,
}
)
return (tool_calls_out, tool_results)
@router.post(
"/chat/completion",
response_model=ChatCompletionResponse,
dependencies=[Depends(allow_any_authenticated())],
summary="Chat completion with tool calling",
description=(
@@ -369,7 +515,7 @@ async def chat_completion(
request: Request,
body: ChatCompletionRequest = Body(...),
allowed_cameras: List[str] = Depends(get_allowed_cameras_for_filter),
) -> JSONResponse:
):
"""
Chat completion endpoint with tool calling support.
@@ -394,9 +540,9 @@ async def chat_completion(
tools = get_tool_definitions()
conversation = []
current_datetime = datetime.now(timezone.utc)
current_datetime = datetime.now()
current_date_str = current_datetime.strftime("%Y-%m-%d")
current_time_str = current_datetime.strftime("%H:%M:%S %Z")
current_time_str = current_datetime.strftime("%I:%M:%S %p")
cameras_info = []
config = request.app.frigate_config
@@ -429,9 +575,12 @@ async def chat_completion(
system_prompt = f"""You are a helpful assistant for Frigate, a security camera NVR system. You help users answer questions about their cameras, detected objects, and events.
Current date and time: {current_date_str} at {current_time_str} (UTC)
Current server local date and time: {current_date_str} at {current_time_str}
When users ask questions about "today", "yesterday", "this week", etc., use the current date above as reference.
Do not start your response with phrases like "I will check...", "Let me see...", or "Let me look...". Answer directly.
Always present times to the user in the server's local timezone. When tool results include start_time_local and end_time_local, use those exact strings when listing or describing detection times—do not convert or invent timestamps. Do not use UTC or ISO format with Z for the user-facing answer unless the tool result only provides Unix timestamps without local time fields.
When users ask about "today", "yesterday", "this week", etc., use the current date above as reference.
When searching for objects or events, use ISO 8601 format for dates (e.g., {current_date_str}T00:00:00Z for the start of today).
Always be accurate with time calculations based on the current date provided.{cameras_section}{live_image_note}"""
@@ -471,6 +620,7 @@ Always be accurate with time calculations based on the current date provided.{ca
conversation.append(msg_dict)
tool_iterations = 0
tool_calls: List[ToolCall] = []
max_iterations = body.max_tool_iterations
logger.debug(
@@ -478,6 +628,81 @@ Always be accurate with time calculations based on the current date provided.{ca
f"{len(tools)} tool(s) available, max_iterations={max_iterations}"
)
# True LLM streaming when client supports it and stream requested
if body.stream and hasattr(genai_client, "chat_with_tools_stream"):
stream_tool_calls: List[ToolCall] = []
stream_iterations = 0
async def stream_body_llm():
nonlocal conversation, stream_tool_calls, stream_iterations
while stream_iterations < max_iterations:
logger.debug(
f"Streaming LLM (iteration {stream_iterations + 1}/{max_iterations}) "
f"with {len(conversation)} message(s)"
)
async for event in genai_client.chat_with_tools_stream(
messages=conversation,
tools=tools if tools else None,
tool_choice="auto",
):
kind, value = event
if kind == "content_delta":
yield (
json.dumps({"type": "content", "delta": value}).encode(
"utf-8"
)
+ b"\n"
)
elif kind == "message":
msg = value
if msg.get("finish_reason") == "error":
yield (
json.dumps(
{
"type": "error",
"error": "An error occurred while processing your request.",
}
).encode("utf-8")
+ b"\n"
)
return
pending = msg.get("tool_calls")
if pending:
stream_iterations += 1
conversation.append(
build_assistant_message_for_conversation(
msg.get("content"), pending
)
)
executed_calls, tool_results = await _execute_pending_tools(
pending, request, allowed_cameras
)
stream_tool_calls.extend(executed_calls)
conversation.extend(tool_results)
yield (
json.dumps(
{
"type": "tool_calls",
"tool_calls": [
tc.model_dump() for tc in stream_tool_calls
],
}
).encode("utf-8")
+ b"\n"
)
break
else:
yield (json.dumps({"type": "done"}).encode("utf-8") + b"\n")
return
else:
yield json.dumps({"type": "done"}).encode("utf-8") + b"\n"
return StreamingResponse(
stream_body_llm(),
media_type="application/x-ndjson",
headers={"X-Accel-Buffering": "no"},
)
try:
while tool_iterations < max_iterations:
logger.debug(
@@ -499,117 +724,71 @@ Always be accurate with time calculations based on the current date provided.{ca
status_code=500,
)
assistant_message = {
"role": "assistant",
"content": response.get("content"),
}
if response.get("tool_calls"):
assistant_message["tool_calls"] = [
{
"id": tc["id"],
"type": "function",
"function": {
"name": tc["name"],
"arguments": json.dumps(tc["arguments"]),
},
}
for tc in response["tool_calls"]
]
conversation.append(assistant_message)
conversation.append(
build_assistant_message_for_conversation(
response.get("content"), response.get("tool_calls")
)
)
tool_calls = response.get("tool_calls")
if not tool_calls:
pending_tool_calls = response.get("tool_calls")
if not pending_tool_calls:
logger.debug(
f"Chat completion finished with final answer (iterations: {tool_iterations})"
)
final_content = response.get("content") or ""
if body.stream:
async def stream_body() -> Any:
if tool_calls:
yield (
json.dumps(
{
"type": "tool_calls",
"tool_calls": [
tc.model_dump() for tc in tool_calls
],
}
).encode("utf-8")
+ b"\n"
)
# Stream content in word-sized chunks for smooth UX
for part in _chunk_content(final_content):
yield (
json.dumps({"type": "content", "delta": part}).encode(
"utf-8"
)
+ b"\n"
)
yield json.dumps({"type": "done"}).encode("utf-8") + b"\n"
return StreamingResponse(
stream_body(),
media_type="application/x-ndjson",
)
return JSONResponse(
content=ChatCompletionResponse(
message=ChatMessageResponse(
role="assistant",
content=response.get("content"),
content=final_content,
tool_calls=None,
),
finish_reason=response.get("finish_reason", "stop"),
tool_iterations=tool_iterations,
tool_calls=tool_calls,
).model_dump(),
)
# Execute tools
tool_iterations += 1
logger.debug(
f"Tool calls detected (iteration {tool_iterations}/{max_iterations}): "
f"{len(tool_calls)} tool(s) to execute"
f"{len(pending_tool_calls)} tool(s) to execute"
)
tool_results = []
for tool_call in tool_calls:
tool_name = tool_call["name"]
tool_args = tool_call["arguments"]
tool_call_id = tool_call["id"]
logger.debug(
f"Executing tool: {tool_name} (id: {tool_call_id}) with arguments: {json.dumps(tool_args, indent=2)}"
)
try:
tool_result = await _execute_tool_internal(
tool_name, tool_args, request, allowed_cameras
)
if isinstance(tool_result, dict):
result_content = json.dumps(tool_result)
result_summary = tool_result
if isinstance(tool_result, dict) and isinstance(
tool_result.get("content"), list
):
result_count = len(tool_result.get("content", []))
result_summary = {
"count": result_count,
"sample": tool_result.get("content", [])[:2]
if result_count > 0
else [],
}
logger.debug(
f"Tool {tool_name} (id: {tool_call_id}) completed successfully. "
f"Result: {json.dumps(result_summary, indent=2)}"
)
elif isinstance(tool_result, str):
result_content = tool_result
logger.debug(
f"Tool {tool_name} (id: {tool_call_id}) completed successfully. "
f"Result length: {len(result_content)} characters"
)
else:
result_content = str(tool_result)
logger.debug(
f"Tool {tool_name} (id: {tool_call_id}) completed successfully. "
f"Result type: {type(tool_result).__name__}"
)
tool_results.append(
{
"role": "tool",
"tool_call_id": tool_call_id,
"content": result_content,
}
)
except Exception as e:
logger.error(
f"Error executing tool {tool_name} (id: {tool_call_id}): {e}",
exc_info=True,
)
error_content = json.dumps({"error": "Tool execution failed"})
tool_results.append(
{
"role": "tool",
"tool_call_id": tool_call_id,
"content": error_content,
}
)
logger.debug(
f"Tool {tool_name} (id: {tool_call_id}) failed. Error result added to conversation."
)
executed_calls, tool_results = await _execute_pending_tools(
pending_tool_calls, request, allowed_cameras
)
tool_calls.extend(executed_calls)
conversation.extend(tool_results)
logger.debug(
f"Added {len(tool_results)} tool result(s) to conversation. "
@@ -628,6 +807,7 @@ Always be accurate with time calculations based on the current date provided.{ca
),
finish_reason="length",
tool_iterations=tool_iterations,
tool_calls=tool_calls,
).model_dump(),
)

View File

@@ -39,3 +39,7 @@ class ChatCompletionRequest(BaseModel):
"user message as multimodal content. Use with get_live_context for detection info."
),
)
stream: bool = Field(
default=False,
description="If true, stream the final assistant response in the body as newline-delimited JSON.",
)

View File

@@ -5,8 +5,8 @@ from typing import Any, Optional
from pydantic import BaseModel, Field
class ToolCall(BaseModel):
"""A tool call from the LLM."""
class ToolCallInvocation(BaseModel):
"""A tool call requested by the LLM (before execution)."""
id: str = Field(description="Unique identifier for this tool call")
name: str = Field(description="Tool name to call")
@@ -20,11 +20,24 @@ class ChatMessageResponse(BaseModel):
content: Optional[str] = Field(
default=None, description="Message content (None if tool calls present)"
)
tool_calls: Optional[list[ToolCall]] = Field(
tool_calls: Optional[list[ToolCallInvocation]] = Field(
default=None, description="Tool calls if LLM wants to call tools"
)
class ToolCall(BaseModel):
"""A tool that was executed during the completion, with its response."""
name: str = Field(description="Tool name that was called")
arguments: dict[str, Any] = Field(
default_factory=dict, description="Arguments passed to the tool"
)
response: str = Field(
default="",
description="The response or result returned from the tool execution",
)
class ChatCompletionResponse(BaseModel):
"""Response from chat completion."""
@@ -35,3 +48,7 @@ class ChatCompletionResponse(BaseModel):
tool_iterations: int = Field(
default=0, description="Number of tool call iterations performed"
)
tool_calls: list[ToolCall] = Field(
default_factory=list,
description="List of tool calls that were executed during this completion",
)