From 858367c98a84fdec273010e1c2be9de033e23bf3 Mon Sep 17 00:00:00 2001 From: Nicolas Mowen Date: Mon, 16 Feb 2026 17:40:11 -0700 Subject: [PATCH] Full streaming support --- frigate/api/chat.py | 292 ++++++++++++++++++++----------------- frigate/genai/llama_cpp.py | 253 ++++++++++++++++++++++---------- frigate/genai/ollama.py | 245 ++++++++++++++++++++----------- frigate/genai/utils.py | 70 +++++++++ web/src/pages/Chat.tsx | 30 +++- web/vite.config.ts | 2 +- 6 files changed, 591 insertions(+), 301 deletions(-) create mode 100644 frigate/genai/utils.py diff --git a/frigate/api/chat.py b/frigate/api/chat.py index 939e399df..5ff8f4a99 100644 --- a/frigate/api/chat.py +++ b/frigate/api/chat.py @@ -25,6 +25,7 @@ from frigate.api.defs.response.chat_response import ( ) from frigate.api.defs.tags import Tags from frigate.api.event import events +from frigate.genai.utils import build_assistant_message_for_conversation logger = logging.getLogger(__name__) @@ -403,6 +404,78 @@ async def _execute_tool_internal( return {"error": f"Unknown tool: {tool_name}"} +async def _execute_pending_tools( + pending_tool_calls: List[Dict[str, Any]], + request: Request, + allowed_cameras: List[str], +) -> tuple[List[ToolCall], List[Dict[str, Any]]]: + """ + Execute a list of tool calls; return (ToolCall list for API response, tool result dicts for conversation). 
+ """ + tool_calls_out: List[ToolCall] = [] + tool_results: List[Dict[str, Any]] = [] + for tool_call in pending_tool_calls: + tool_name = tool_call["name"] + tool_args = tool_call.get("arguments") or {} + tool_call_id = tool_call["id"] + logger.debug( + f"Executing tool: {tool_name} (id: {tool_call_id}) with arguments: {json.dumps(tool_args, indent=2)}" + ) + try: + tool_result = await _execute_tool_internal( + tool_name, tool_args, request, allowed_cameras + ) + if tool_name == "search_objects" and isinstance(tool_result, list): + tool_result = _format_events_with_local_time(tool_result) + _keys = { + "id", + "camera", + "label", + "zones", + "start_time_local", + "end_time_local", + "sub_label", + "event_count", + } + tool_result = [ + {k: evt[k] for k in _keys if k in evt} + for evt in tool_result + if isinstance(evt, dict) + ] + result_content = ( + json.dumps(tool_result) + if isinstance(tool_result, (dict, list)) + else (tool_result if isinstance(tool_result, str) else str(tool_result)) + ) + tool_calls_out.append( + ToolCall(name=tool_name, arguments=tool_args, response=result_content) + ) + tool_results.append( + { + "role": "tool", + "tool_call_id": tool_call_id, + "content": result_content, + } + ) + except Exception as e: + logger.error( + f"Error executing tool {tool_name} (id: {tool_call_id}): {e}", + exc_info=True, + ) + error_content = json.dumps({"error": f"Tool execution failed: {str(e)}"}) + tool_calls_out.append( + ToolCall(name=tool_name, arguments=tool_args, response=error_content) + ) + tool_results.append( + { + "role": "tool", + "tool_call_id": tool_call_id, + "content": error_content, + } + ) + return (tool_calls_out, tool_results) + + @router.post( "/chat/completion", dependencies=[Depends(allow_any_authenticated())], @@ -527,6 +600,81 @@ Always be accurate with time calculations based on the current date provided.{ca f"{len(tools)} tool(s) available, max_iterations={max_iterations}" ) + # True LLM streaming when client supports it and 
stream requested + if body.stream and hasattr(genai_client, "chat_with_tools_stream"): + stream_tool_calls: List[ToolCall] = [] + stream_iterations = 0 + + async def stream_body_llm(): + nonlocal conversation, stream_tool_calls, stream_iterations + while stream_iterations < max_iterations: + logger.debug( + f"Streaming LLM (iteration {stream_iterations + 1}/{max_iterations}) " + f"with {len(conversation)} message(s)" + ) + async for event in genai_client.chat_with_tools_stream( + messages=conversation, + tools=tools if tools else None, + tool_choice="auto", + ): + kind, value = event + if kind == "content_delta": + yield ( + json.dumps({"type": "content", "delta": value}).encode( + "utf-8" + ) + + b"\n" + ) + elif kind == "message": + msg = value + if msg.get("finish_reason") == "error": + yield ( + json.dumps( + { + "type": "error", + "error": "An error occurred while processing your request.", + } + ).encode("utf-8") + + b"\n" + ) + return + pending = msg.get("tool_calls") + if pending: + stream_iterations += 1 + conversation.append( + build_assistant_message_for_conversation( + msg.get("content"), pending + ) + ) + executed_calls, tool_results = await _execute_pending_tools( + pending, request, allowed_cameras + ) + stream_tool_calls.extend(executed_calls) + conversation.extend(tool_results) + yield ( + json.dumps( + { + "type": "tool_calls", + "tool_calls": [ + tc.model_dump() for tc in stream_tool_calls + ], + } + ).encode("utf-8") + + b"\n" + ) + break + else: + yield (json.dumps({"type": "done"}).encode("utf-8") + b"\n") + return + else: + yield json.dumps({"type": "done"}).encode("utf-8") + b"\n" + + return StreamingResponse( + stream_body_llm(), + media_type="application/x-ndjson", + headers={"X-Accel-Buffering": "no"}, + ) + try: while tool_iterations < max_iterations: logger.debug( @@ -548,23 +696,11 @@ Always be accurate with time calculations based on the current date provided.{ca status_code=500, ) - assistant_message = { - "role": "assistant", - 
"content": response.get("content"), - } - if response.get("tool_calls"): - assistant_message["tool_calls"] = [ - { - "id": tc["id"], - "type": "function", - "function": { - "name": tc["name"], - "arguments": json.dumps(tc["arguments"]), - }, - } - for tc in response["tool_calls"] - ] - conversation.append(assistant_message) + conversation.append( + build_assistant_message_for_conversation( + response.get("content"), response.get("tool_calls") + ) + ) pending_tool_calls = response.get("tool_calls") if not pending_tool_calls: @@ -574,6 +710,7 @@ Always be accurate with time calculations based on the current date provided.{ca final_content = response.get("content") or "" if body.stream: + async def stream_body() -> Any: if tool_calls: yield ( @@ -590,8 +727,9 @@ Always be accurate with time calculations based on the current date provided.{ca # Stream content in word-sized chunks for smooth UX for part in _chunk_content(final_content): yield ( - json.dumps({"type": "content", "delta": part}) - .encode("utf-8") + json.dumps({"type": "content", "delta": part}).encode( + "utf-8" + ) + b"\n" ) yield json.dumps({"type": "done"}).encode("utf-8") + b"\n" @@ -614,123 +752,15 @@ Always be accurate with time calculations based on the current date provided.{ca ).model_dump(), ) - # Execute tools tool_iterations += 1 logger.debug( f"Tool calls detected (iteration {tool_iterations}/{max_iterations}): " f"{len(pending_tool_calls)} tool(s) to execute" ) - tool_results = [] - - for tool_call in pending_tool_calls: - tool_name = tool_call["name"] - tool_args = tool_call["arguments"] - tool_call_id = tool_call["id"] - - logger.debug( - f"Executing tool: {tool_name} (id: {tool_call_id}) with arguments: {json.dumps(tool_args, indent=2)}" - ) - - try: - tool_result = await _execute_tool_internal( - tool_name, tool_args, request, allowed_cameras - ) - - # Add local time fields to search_objects results so the LLM doesn't hallucinate timestamps - if tool_name == "search_objects" and 
isinstance(tool_result, list): - tool_result = _format_events_with_local_time(tool_result) - _keys = { - "id", - "camera", - "label", - "zones", - "start_time_local", - "end_time_local", - "sub_label", - "event_count", - } - tool_result = [ - {k: evt[k] for k in _keys if k in evt} - for evt in tool_result - if isinstance(evt, dict) - ] - - if isinstance(tool_result, dict): - result_content = json.dumps(tool_result) - result_summary = tool_result - if isinstance(tool_result, dict) and isinstance( - tool_result.get("content"), list - ): - result_count = len(tool_result.get("content", [])) - result_summary = { - "count": result_count, - "sample": tool_result.get("content", [])[:2] - if result_count > 0 - else [], - } - logger.debug( - f"Tool {tool_name} (id: {tool_call_id}) completed successfully. " - f"Result: {json.dumps(result_summary, indent=2)}" - ) - elif isinstance(tool_result, list): - result_content = json.dumps(tool_result) - logger.debug( - f"Tool {tool_name} (id: {tool_call_id}) completed successfully. " - f"Result: {len(tool_result)} item(s)" - ) - elif isinstance(tool_result, str): - result_content = tool_result - logger.debug( - f"Tool {tool_name} (id: {tool_call_id}) completed successfully. " - f"Result length: {len(result_content)} characters" - ) - else: - result_content = str(tool_result) - logger.debug( - f"Tool {tool_name} (id: {tool_call_id}) completed successfully. 
" - f"Result type: {type(tool_result).__name__}" - ) - - tool_calls.append( - ToolCall( - name=tool_name, - arguments=tool_args or {}, - response=result_content, - ) - ) - tool_results.append( - { - "role": "tool", - "tool_call_id": tool_call_id, - "content": result_content, - } - ) - except Exception as e: - logger.error( - f"Error executing tool {tool_name} (id: {tool_call_id}): {e}", - exc_info=True, - ) - error_content = json.dumps( - {"error": f"Tool execution failed: {str(e)}"} - ) - tool_calls.append( - ToolCall( - name=tool_name, - arguments=tool_args or {}, - response=error_content, - ) - ) - tool_results.append( - { - "role": "tool", - "tool_call_id": tool_call_id, - "content": error_content, - } - ) - logger.debug( - f"Tool {tool_name} (id: {tool_call_id}) failed. Error result added to conversation." - ) - + executed_calls, tool_results = await _execute_pending_tools( + pending_tool_calls, request, allowed_cameras + ) + tool_calls.extend(executed_calls) conversation.extend(tool_results) logger.debug( f"Added {len(tool_results)} tool result(s) to conversation. 
" diff --git a/frigate/genai/llama_cpp.py b/frigate/genai/llama_cpp.py index fafef74ae..f938f8b40 100644 --- a/frigate/genai/llama_cpp.py +++ b/frigate/genai/llama_cpp.py @@ -5,10 +5,12 @@ import json import logging from typing import Any, Optional +import httpx import requests from frigate.config import GenAIProviderEnum from frigate.genai import GenAIClient, register_genai_provider +from frigate.genai.utils import parse_tool_calls_from_message logger = logging.getLogger(__name__) @@ -99,7 +101,76 @@ class LlamaCppClient(GenAIClient): def get_context_size(self) -> int: """Get the context window size for llama.cpp.""" - return self.genai_config.provider_options.get("context_size", 4096) + return self.provider_options.get("context_size", 4096) + + def _build_payload( + self, + messages: list[dict[str, Any]], + tools: Optional[list[dict[str, Any]]], + tool_choice: Optional[str], + stream: bool = False, + ) -> dict[str, Any]: + """Build request payload for chat completions (sync or stream).""" + openai_tool_choice = None + if tool_choice: + if tool_choice == "none": + openai_tool_choice = "none" + elif tool_choice == "auto": + openai_tool_choice = "auto" + elif tool_choice == "required": + openai_tool_choice = "required" + + payload: dict[str, Any] = {"messages": messages} + if stream: + payload["stream"] = True + if tools: + payload["tools"] = tools + if openai_tool_choice is not None: + payload["tool_choice"] = openai_tool_choice + provider_opts = { + k: v for k, v in self.provider_options.items() if k != "context_size" + } + payload.update(provider_opts) + return payload + + def _message_from_choice(self, choice: dict[str, Any]) -> dict[str, Any]: + """Parse OpenAI-style choice into {content, tool_calls, finish_reason}.""" + message = choice.get("message", {}) + content = message.get("content") + content = content.strip() if content else None + tool_calls = parse_tool_calls_from_message(message) + finish_reason = choice.get("finish_reason") or ( + "tool_calls" if 
tool_calls else "stop" if content else "error" + ) + return { + "content": content, + "tool_calls": tool_calls, + "finish_reason": finish_reason, + } + + @staticmethod + def _streamed_tool_calls_to_list( + tool_calls_by_index: dict[int, dict[str, Any]], + ) -> Optional[list[dict[str, Any]]]: + """Convert streamed tool_calls index map to list of {id, name, arguments}.""" + if not tool_calls_by_index: + return None + result = [] + for idx in sorted(tool_calls_by_index.keys()): + t = tool_calls_by_index[idx] + args_str = t.get("arguments") or "{}" + try: + arguments = json.loads(args_str) + except json.JSONDecodeError: + arguments = {} + result.append( + { + "id": t.get("id", ""), + "name": t.get("name", ""), + "arguments": arguments, + } + ) + return result if result else None def chat_with_tools( self, @@ -122,31 +193,8 @@ class LlamaCppClient(GenAIClient): "tool_calls": None, "finish_reason": "error", } - try: - openai_tool_choice = None - if tool_choice: - if tool_choice == "none": - openai_tool_choice = "none" - elif tool_choice == "auto": - openai_tool_choice = "auto" - elif tool_choice == "required": - openai_tool_choice = "required" - - payload = { - "messages": messages, - } - - if tools: - payload["tools"] = tools - if openai_tool_choice is not None: - payload["tool_choice"] = openai_tool_choice - - provider_opts = { - k: v for k, v in self.provider_options.items() if k != "context_size" - } - payload.update(provider_opts) - + payload = self._build_payload(messages, tools, tool_choice, stream=False) response = requests.post( f"{self.provider}/v1/chat/completions", json=payload, @@ -154,60 +202,13 @@ class LlamaCppClient(GenAIClient): ) response.raise_for_status() result = response.json() - if result is None or "choices" not in result or len(result["choices"]) == 0: return { "content": None, "tool_calls": None, "finish_reason": "error", } - - choice = result["choices"][0] - message = choice.get("message", {}) - - content = message.get("content") - if content: 
- content = content.strip() - else: - content = None - - tool_calls = None - if "tool_calls" in message and message["tool_calls"]: - tool_calls = [] - for tool_call in message["tool_calls"]: - try: - function_data = tool_call.get("function", {}) - arguments_str = function_data.get("arguments", "{}") - arguments = json.loads(arguments_str) - except (json.JSONDecodeError, KeyError, TypeError) as e: - logger.warning( - f"Failed to parse tool call arguments: {e}, " - f"tool: {function_data.get('name', 'unknown')}" - ) - arguments = {} - - tool_calls.append( - { - "id": tool_call.get("id", ""), - "name": function_data.get("name", ""), - "arguments": arguments, - } - ) - - finish_reason = "error" - if "finish_reason" in choice and choice["finish_reason"]: - finish_reason = choice["finish_reason"] - elif tool_calls: - finish_reason = "tool_calls" - elif content: - finish_reason = "stop" - - return { - "content": content, - "tool_calls": tool_calls, - "finish_reason": finish_reason, - } - + return self._message_from_choice(result["choices"][0]) except requests.exceptions.Timeout as e: logger.warning("llama.cpp request timed out: %s", str(e)) return { @@ -219,8 +220,7 @@ class LlamaCppClient(GenAIClient): error_detail = str(e) if hasattr(e, "response") and e.response is not None: try: - error_body = e.response.text - error_detail = f"{str(e)} - Response: {error_body[:500]}" + error_detail = f"{str(e)} - Response: {e.response.text[:500]}" except Exception: pass logger.warning("llama.cpp returned an error: %s", error_detail) @@ -236,3 +236,106 @@ class LlamaCppClient(GenAIClient): "tool_calls": None, "finish_reason": "error", } + + async def chat_with_tools_stream( + self, + messages: list[dict[str, Any]], + tools: Optional[list[dict[str, Any]]] = None, + tool_choice: Optional[str] = "auto", + ): + """Stream chat with tools via OpenAI-compatible streaming API.""" + if self.provider is None: + logger.warning( + "llama.cpp provider has not been initialized. 
Check your llama.cpp configuration." + ) + yield ( + "message", + { + "content": None, + "tool_calls": None, + "finish_reason": "error", + }, + ) + return + try: + payload = self._build_payload(messages, tools, tool_choice, stream=True) + content_parts: list[str] = [] + tool_calls_by_index: dict[int, dict[str, Any]] = {} + finish_reason = "stop" + + async with httpx.AsyncClient(timeout=float(self.timeout)) as client: + async with client.stream( + "POST", + f"{self.provider}/v1/chat/completions", + json=payload, + ) as response: + response.raise_for_status() + async for line in response.aiter_lines(): + if not line.startswith("data: "): + continue + data_str = line[6:].strip() + if data_str == "[DONE]": + break + try: + data = json.loads(data_str) + except json.JSONDecodeError: + continue + choices = data.get("choices") or [] + if not choices: + continue + delta = choices[0].get("delta", {}) + if choices[0].get("finish_reason"): + finish_reason = choices[0]["finish_reason"] + if delta.get("content"): + content_parts.append(delta["content"]) + yield ("content_delta", delta["content"]) + for tc in delta.get("tool_calls") or []: + idx = tc.get("index", 0) + if idx not in tool_calls_by_index: + tool_calls_by_index[idx] = { + "id": tc.get("id", ""), + "name": tc.get("name", ""), + "arguments": "", + } + t = tool_calls_by_index[idx] + if tc.get("id"): + t["id"] = tc["id"] + if tc.get("name"): + t["name"] = tc["name"] + if tc.get("arguments"): + t["arguments"] += tc["arguments"] + + full_content = "".join(content_parts).strip() or None + tool_calls_list = self._streamed_tool_calls_to_list(tool_calls_by_index) + if tool_calls_list: + finish_reason = "tool_calls" + yield ( + "message", + { + "content": full_content, + "tool_calls": tool_calls_list, + "finish_reason": finish_reason, + }, + ) + except httpx.HTTPStatusError as e: + logger.warning("llama.cpp streaming HTTP error: %s", e) + yield ( + "message", + { + "content": None, + "tool_calls": None, + "finish_reason": 
"error", + }, + ) + except Exception as e: + logger.warning( + "Unexpected error in llama.cpp chat_with_tools_stream: %s", str(e) + ) + yield ( + "message", + { + "content": None, + "tool_calls": None, + "finish_reason": "error", + }, + ) diff --git a/frigate/genai/ollama.py b/frigate/genai/ollama.py index 6e9a4f5d5..4efedf64a 100644 --- a/frigate/genai/ollama.py +++ b/frigate/genai/ollama.py @@ -1,15 +1,16 @@ """Ollama Provider for Frigate AI.""" -import json import logging from typing import Any, Optional from httpx import RemoteProtocolError, TimeoutException +from ollama import AsyncClient as OllamaAsyncClient from ollama import Client as ApiClient from ollama import ResponseError from frigate.config import GenAIProviderEnum from frigate.genai import GenAIClient, register_genai_provider +from frigate.genai.utils import parse_tool_calls_from_message logger = logging.getLogger(__name__) @@ -88,6 +89,73 @@ class OllamaClient(GenAIClient): "num_ctx", 4096 ) + def _build_request_params( + self, + messages: list[dict[str, Any]], + tools: Optional[list[dict[str, Any]]], + tool_choice: Optional[str], + stream: bool = False, + ) -> dict[str, Any]: + """Build request_messages and params for chat (sync or stream).""" + request_messages = [] + for msg in messages: + msg_dict = { + "role": msg.get("role"), + "content": msg.get("content", ""), + } + if msg.get("tool_call_id"): + msg_dict["tool_call_id"] = msg["tool_call_id"] + if msg.get("name"): + msg_dict["name"] = msg["name"] + if msg.get("tool_calls"): + msg_dict["tool_calls"] = msg["tool_calls"] + request_messages.append(msg_dict) + + request_params: dict[str, Any] = { + "model": self.genai_config.model, + "messages": request_messages, + **self.provider_options, + } + if stream: + request_params["stream"] = True + if tools: + request_params["tools"] = tools + if tool_choice: + request_params["tool_choice"] = ( + "none" + if tool_choice == "none" + else "required" + if tool_choice == "required" + else "auto" + ) + return 
request_params + + def _message_from_response(self, response: dict[str, Any]) -> dict[str, Any]: + """Parse Ollama chat response into {content, tool_calls, finish_reason}.""" + if not response or "message" not in response: + return { + "content": None, + "tool_calls": None, + "finish_reason": "error", + } + message = response["message"] + content = message.get("content", "").strip() if message.get("content") else None + tool_calls = parse_tool_calls_from_message(message) + finish_reason = "error" + if response.get("done"): + finish_reason = ( + "tool_calls" if tool_calls else "stop" if content else "error" + ) + elif tool_calls: + finish_reason = "tool_calls" + elif content: + finish_reason = "stop" + return { + "content": content, + "tool_calls": tool_calls, + "finish_reason": finish_reason, + } + def chat_with_tools( self, messages: list[dict[str, Any]], @@ -103,93 +171,12 @@ class OllamaClient(GenAIClient): "tool_calls": None, "finish_reason": "error", } - try: - request_messages = [] - for msg in messages: - msg_dict = { - "role": msg.get("role"), - "content": msg.get("content", ""), - } - if msg.get("tool_call_id"): - msg_dict["tool_call_id"] = msg["tool_call_id"] - if msg.get("name"): - msg_dict["name"] = msg["name"] - if msg.get("tool_calls"): - msg_dict["tool_calls"] = msg["tool_calls"] - request_messages.append(msg_dict) - - request_params = { - "model": self.genai_config.model, - "messages": request_messages, - } - - if tools: - request_params["tools"] = tools - if tool_choice: - if tool_choice == "none": - request_params["tool_choice"] = "none" - elif tool_choice == "required": - request_params["tool_choice"] = "required" - elif tool_choice == "auto": - request_params["tool_choice"] = "auto" - - request_params.update(self.provider_options) - - response = self.provider.chat(**request_params) - - if not response or "message" not in response: - return { - "content": None, - "tool_calls": None, - "finish_reason": "error", - } - - message = 
response["message"] - content = ( - message.get("content", "").strip() if message.get("content") else None + request_params = self._build_request_params( + messages, tools, tool_choice, stream=False ) - - tool_calls = None - if "tool_calls" in message and message["tool_calls"]: - tool_calls = [] - for tool_call in message["tool_calls"]: - try: - function_data = tool_call.get("function", {}) - arguments_str = function_data.get("arguments", "{}") - arguments = json.loads(arguments_str) - except (json.JSONDecodeError, KeyError, TypeError) as e: - logger.warning( - f"Failed to parse tool call arguments: {e}, " - f"tool: {function_data.get('name', 'unknown')}" - ) - arguments = {} - - tool_calls.append( - { - "id": tool_call.get("id", ""), - "name": function_data.get("name", ""), - "arguments": arguments, - } - ) - - finish_reason = "error" - if "done" in response and response["done"]: - if tool_calls: - finish_reason = "tool_calls" - elif content: - finish_reason = "stop" - elif tool_calls: - finish_reason = "tool_calls" - elif content: - finish_reason = "stop" - - return { - "content": content, - "tool_calls": tool_calls, - "finish_reason": finish_reason, - } - + response = self.provider.chat(**request_params) + return self._message_from_response(response) except (TimeoutException, ResponseError, ConnectionError) as e: logger.warning("Ollama returned an error: %s", str(e)) return { @@ -204,3 +191,89 @@ class OllamaClient(GenAIClient): "tool_calls": None, "finish_reason": "error", } + + async def chat_with_tools_stream( + self, + messages: list[dict[str, Any]], + tools: Optional[list[dict[str, Any]]] = None, + tool_choice: Optional[str] = "auto", + ): + """Stream chat with tools; yields content deltas then final message.""" + if self.provider is None: + logger.warning( + "Ollama provider has not been initialized. Check your Ollama configuration." 
+ ) + yield ( + "message", + { + "content": None, + "tool_calls": None, + "finish_reason": "error", + }, + ) + return + try: + request_params = self._build_request_params( + messages, tools, tool_choice, stream=True + ) + async_client = OllamaAsyncClient( + host=self.genai_config.base_url, + timeout=self.timeout, + ) + content_parts: list[str] = [] + final_message: dict[str, Any] | None = None + try: + stream = await async_client.chat(**request_params) + async for chunk in stream: + if not chunk or "message" not in chunk: + continue + msg = chunk.get("message", {}) + delta = msg.get("content") or "" + if delta: + content_parts.append(delta) + yield ("content_delta", delta) + if chunk.get("done"): + full_content = "".join(content_parts).strip() or None + tool_calls = parse_tool_calls_from_message(msg) + final_message = { + "content": full_content, + "tool_calls": tool_calls, + "finish_reason": "tool_calls" if tool_calls else "stop", + } + break + finally: + await async_client.close() + + if final_message is not None: + yield ("message", final_message) + else: + yield ( + "message", + { + "content": "".join(content_parts).strip() or None, + "tool_calls": None, + "finish_reason": "stop", + }, + ) + except (TimeoutException, ResponseError, ConnectionError) as e: + logger.warning("Ollama streaming error: %s", str(e)) + yield ( + "message", + { + "content": None, + "tool_calls": None, + "finish_reason": "error", + }, + ) + except Exception as e: + logger.warning( + "Unexpected error in Ollama chat_with_tools_stream: %s", str(e) + ) + yield ( + "message", + { + "content": None, + "tool_calls": None, + "finish_reason": "error", + }, + ) diff --git a/frigate/genai/utils.py b/frigate/genai/utils.py new file mode 100644 index 000000000..93d4552b9 --- /dev/null +++ b/frigate/genai/utils.py @@ -0,0 +1,70 @@ +"""Shared helpers for GenAI providers and chat (OpenAI-style messages, tool call parsing).""" + +import json +import logging +from typing import Any, List, Optional + 
+logger = logging.getLogger(__name__) + + +def parse_tool_calls_from_message( + message: dict[str, Any], +) -> Optional[list[dict[str, Any]]]: + """ + Parse tool_calls from an OpenAI-style message dict. + + Message may have "tool_calls" as a list of: + {"id": str, "function": {"name": str, "arguments": str}, ...} + + Returns a list of {"id", "name", "arguments"} with arguments parsed as dict, + or None if no tool_calls. Used by Ollama and LlamaCpp (non-stream) responses. + """ + raw = message.get("tool_calls") + if not raw or not isinstance(raw, list): + return None + result = [] + for tool_call in raw: + function_data = tool_call.get("function") or {} + try: + arguments_str = function_data.get("arguments") or "{}" + arguments = json.loads(arguments_str) + except (json.JSONDecodeError, KeyError, TypeError) as e: + logger.warning( + "Failed to parse tool call arguments: %s, tool: %s", + e, + function_data.get("name", "unknown"), + ) + arguments = {} + result.append( + { + "id": tool_call.get("id", ""), + "name": function_data.get("name", ""), + "arguments": arguments, + } + ) + return result if result else None + + +def build_assistant_message_for_conversation( + content: Any, + tool_calls_raw: Optional[List[dict[str, Any]]], +) -> dict[str, Any]: + """ + Build the assistant message dict in OpenAI format for appending to a conversation. + + tool_calls_raw: list of {"id", "name", "arguments"} (arguments as dict), or None. 
+ """ + msg: dict[str, Any] = {"role": "assistant", "content": content} + if tool_calls_raw: + msg["tool_calls"] = [ + { + "id": tc["id"], + "type": "function", + "function": { + "name": tc["name"], + "arguments": json.dumps(tc.get("arguments") or {}), + }, + } + for tc in tool_calls_raw + ] + return msg diff --git a/web/src/pages/Chat.tsx b/web/src/pages/Chat.tsx index c562a982d..37fdd6450 100644 --- a/web/src/pages/Chat.tsx +++ b/web/src/pages/Chat.tsx @@ -62,6 +62,7 @@ export default function ChatPage() { setMessages((prev) => [...prev, assistantMessage]); let buffer = ""; + let hadStreamError = false; for (;;) { const { done, value } = await reader.read(); if (done) break; @@ -81,6 +82,14 @@ export default function ChatPage() { } catch { continue; } + if (data.type === "error" && "error" in data) { + setError((data as { error?: string }).error ?? t("error")); + setMessages((prev) => + prev.filter((m) => !(m.role === "assistant" && m.content === "")), + ); + hadStreamError = true; + break; + } if (data.type === "tool_calls" && data.tool_calls?.length) { setMessages((prev) => { const next = [...prev]; @@ -105,8 +114,11 @@ export default function ChatPage() { }); } } + if (hadStreamError) break; } - if (buffer.trim()) { + if (hadStreamError) { + // already set error and cleaned up + } else if (buffer.trim()) { try { const data = JSON.parse(buffer.trim()) as { type: string; @@ -130,13 +142,15 @@ export default function ChatPage() { } } - setMessages((prev) => { - const next = [...prev]; - const last = next[next.length - 1]; - if (last?.role === "assistant" && last.content === "") - next[next.length - 1] = { ...last, content: " " }; - return next; - }); + if (!hadStreamError) { + setMessages((prev) => { + const next = [...prev]; + const last = next[next.length - 1]; + if (last?.role === "assistant" && last.content === "") + next[next.length - 1] = { ...last, content: " " }; + return next; + }); + } } catch { setError(t("error")); setMessages((prev) => diff --git 
a/web/vite.config.ts b/web/vite.config.ts index 148048995..98a9afde1 100644 --- a/web/vite.config.ts +++ b/web/vite.config.ts @@ -4,7 +4,7 @@ import { defineConfig } from "vite"; import react from "@vitejs/plugin-react-swc"; import monacoEditorPlugin from "vite-plugin-monaco-editor"; -const proxyHost = process.env.PROXY_HOST || "localhost:5000"; +const proxyHost = process.env.PROXY_HOST || "192.168.50.106:5002"; // https://vitejs.dev/config/ export default defineConfig({