mirror of
https://github.com/blakeblackshear/frigate.git
synced 2024-11-26 19:06:11 +01:00
e8763b3697
* Ignore entire __pycache__ folder instead of individual *.pyc files * Ignore .mypy_cache in git * Rework config YAML parsing to use only ruamel.yaml PyYAML silently overrides keys when encountering duplicates, but ruamel raises and exception by default. Since we're already using it elsewhere, dropping PyYAML is an easy choice to make. * Added EnvString in config to slim down runtime_config() * Added gitlens to devcontainer * Automatically call FrigateConfig.runtime_config() runtime_config needed to be called manually before. Now, it's been removed, but the same code is run by a pydantic validator. * Fix handling of missing -segment_time * Removed type annotation on FrigateConfig's parse I'd like to keep them, but then mypy complains about some fundamental errors with how the pydantic model is structured. I'd like to fix it, but I'd rather work towards moving some of this config to the database.
742 lines
21 KiB
Python
742 lines
21 KiB
Python
"""Main api runner."""
|
|
|
|
import copy
|
|
import json
|
|
import logging
|
|
import os
|
|
import traceback
|
|
from datetime import datetime, timedelta
|
|
from functools import reduce
|
|
from typing import Optional
|
|
|
|
import requests
|
|
from flask import Blueprint, Flask, current_app, jsonify, make_response, request
|
|
from markupsafe import escape
|
|
from peewee import operator
|
|
from playhouse.sqliteq import SqliteQueueDatabase
|
|
from werkzeug.middleware.proxy_fix import ProxyFix
|
|
|
|
from frigate.api.auth import AuthBp, get_jwt_secret, limiter
|
|
from frigate.api.event import EventBp
|
|
from frigate.api.export import ExportBp
|
|
from frigate.api.media import MediaBp
|
|
from frigate.api.notification import NotificationBp
|
|
from frigate.api.preview import PreviewBp
|
|
from frigate.api.review import ReviewBp
|
|
from frigate.config import FrigateConfig
|
|
from frigate.const import CONFIG_DIR
|
|
from frigate.embeddings import EmbeddingsContext
|
|
from frigate.events.external import ExternalEventProcessor
|
|
from frigate.models import Event, Timeline
|
|
from frigate.plus import PlusApi
|
|
from frigate.ptz.onvif import OnvifController
|
|
from frigate.stats.emitter import StatsEmitter
|
|
from frigate.storage import StorageMaintainer
|
|
from frigate.util.builtin import (
|
|
clean_camera_user_pass,
|
|
get_tz_modifiers,
|
|
update_yaml_from_url,
|
|
)
|
|
from frigate.util.services import ffprobe_stream, restart_frigate, vainfo_hwaccel
|
|
from frigate.version import VERSION
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
bp = Blueprint("frigate", __name__)
|
|
bp.register_blueprint(EventBp)
|
|
bp.register_blueprint(ExportBp)
|
|
bp.register_blueprint(MediaBp)
|
|
bp.register_blueprint(PreviewBp)
|
|
bp.register_blueprint(ReviewBp)
|
|
bp.register_blueprint(AuthBp)
|
|
bp.register_blueprint(NotificationBp)
|
|
|
|
|
|
def create_app(
|
|
frigate_config,
|
|
database: SqliteQueueDatabase,
|
|
embeddings: Optional[EmbeddingsContext],
|
|
detected_frames_processor,
|
|
storage_maintainer: StorageMaintainer,
|
|
onvif: OnvifController,
|
|
external_processor: ExternalEventProcessor,
|
|
plus_api: PlusApi,
|
|
stats_emitter: StatsEmitter,
|
|
):
|
|
app = Flask(__name__)
|
|
|
|
@app.before_request
|
|
def check_csrf():
|
|
if request.method in ["GET", "HEAD", "OPTIONS", "TRACE"]:
|
|
pass
|
|
if "origin" in request.headers and "x-csrf-token" not in request.headers:
|
|
return jsonify({"success": False, "message": "Missing CSRF header"}), 401
|
|
|
|
@app.before_request
|
|
def _db_connect():
|
|
if database.is_closed():
|
|
database.connect()
|
|
|
|
@app.teardown_request
|
|
def _db_close(exc):
|
|
if not database.is_closed():
|
|
database.close()
|
|
|
|
app.frigate_config = frigate_config
|
|
app.embeddings = embeddings
|
|
app.detected_frames_processor = detected_frames_processor
|
|
app.storage_maintainer = storage_maintainer
|
|
app.onvif = onvif
|
|
app.external_processor = external_processor
|
|
app.plus_api = plus_api
|
|
app.camera_error_image = None
|
|
app.stats_emitter = stats_emitter
|
|
app.jwt_token = get_jwt_secret() if frigate_config.auth.enabled else None
|
|
# update the request_address with the x-forwarded-for header from nginx
|
|
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1)
|
|
# initialize the rate limiter for the login endpoint
|
|
limiter.init_app(app)
|
|
if frigate_config.auth.failed_login_rate_limit is None:
|
|
limiter.enabled = False
|
|
|
|
app.register_blueprint(bp)
|
|
|
|
return app
|
|
|
|
|
|
@bp.route("/")
|
|
def is_healthy():
|
|
return "Frigate is running. Alive and healthy!"
|
|
|
|
|
|
@bp.route("/config/schema.json")
|
|
def config_schema():
|
|
return current_app.response_class(
|
|
current_app.frigate_config.schema_json(), mimetype="application/json"
|
|
)
|
|
|
|
|
|
@bp.route("/go2rtc/streams")
|
|
def go2rtc_streams():
|
|
r = requests.get("http://127.0.0.1:1984/api/streams")
|
|
if not r.ok:
|
|
logger.error("Failed to fetch streams from go2rtc")
|
|
return make_response(
|
|
jsonify({"success": False, "message": "Error fetching stream data"}),
|
|
500,
|
|
)
|
|
stream_data = r.json()
|
|
for data in stream_data.values():
|
|
for producer in data.get("producers", []):
|
|
producer["url"] = clean_camera_user_pass(producer.get("url", ""))
|
|
return jsonify(stream_data)
|
|
|
|
|
|
@bp.route("/go2rtc/streams/<camera_name>")
|
|
def go2rtc_camera_stream(camera_name: str):
|
|
r = requests.get(
|
|
f"http://127.0.0.1:1984/api/streams?src={camera_name}&video=all&audio=allµphone"
|
|
)
|
|
if not r.ok:
|
|
logger.error("Failed to fetch streams from go2rtc")
|
|
return make_response(
|
|
jsonify({"success": False, "message": "Error fetching stream data"}),
|
|
500,
|
|
)
|
|
stream_data = r.json()
|
|
for producer in stream_data.get("producers", []):
|
|
producer["url"] = clean_camera_user_pass(producer.get("url", ""))
|
|
return jsonify(stream_data)
|
|
|
|
|
|
@bp.route("/version")
|
|
def version():
|
|
return VERSION
|
|
|
|
|
|
@bp.route("/stats")
|
|
def stats():
|
|
return jsonify(current_app.stats_emitter.get_latest_stats())
|
|
|
|
|
|
@bp.route("/stats/history")
|
|
def stats_history():
|
|
keys = request.args.get("keys", default=None)
|
|
|
|
if keys:
|
|
keys = keys.split(",")
|
|
|
|
return jsonify(current_app.stats_emitter.get_stats_history(keys))
|
|
|
|
|
|
@bp.route("/config")
|
|
def config():
|
|
config_obj: FrigateConfig = current_app.frigate_config
|
|
config: dict[str, dict[str, any]] = config_obj.model_dump(
|
|
mode="json", warnings="none", exclude_none=True
|
|
)
|
|
|
|
# remove the mqtt password
|
|
config["mqtt"].pop("password", None)
|
|
|
|
# remove the proxy secret
|
|
config["proxy"].pop("auth_secret", None)
|
|
|
|
for camera_name, camera in current_app.frigate_config.cameras.items():
|
|
camera_dict = config["cameras"][camera_name]
|
|
|
|
# clean paths
|
|
for input in camera_dict.get("ffmpeg", {}).get("inputs", []):
|
|
input["path"] = clean_camera_user_pass(input["path"])
|
|
|
|
# add clean ffmpeg_cmds
|
|
camera_dict["ffmpeg_cmds"] = copy.deepcopy(camera.ffmpeg_cmds)
|
|
for cmd in camera_dict["ffmpeg_cmds"]:
|
|
cmd["cmd"] = clean_camera_user_pass(" ".join(cmd["cmd"]))
|
|
|
|
# ensure that zones are relative
|
|
for zone_name, zone in config_obj.cameras[camera_name].zones.items():
|
|
camera_dict["zones"][zone_name]["color"] = zone.color
|
|
|
|
config["plus"] = {"enabled": current_app.plus_api.is_active()}
|
|
config["model"]["colormap"] = config_obj.model.colormap
|
|
|
|
for detector_config in config["detectors"].values():
|
|
detector_config["model"]["labelmap"] = (
|
|
current_app.frigate_config.model.merged_labelmap
|
|
)
|
|
|
|
return jsonify(config)
|
|
|
|
|
|
@bp.route("/config/raw")
|
|
def config_raw():
|
|
config_file = os.environ.get("CONFIG_FILE", "/config/config.yml")
|
|
|
|
# Check if we can use .yaml instead of .yml
|
|
config_file_yaml = config_file.replace(".yml", ".yaml")
|
|
|
|
if os.path.isfile(config_file_yaml):
|
|
config_file = config_file_yaml
|
|
|
|
if not os.path.isfile(config_file):
|
|
return make_response(
|
|
jsonify({"success": False, "message": "Could not find file"}), 404
|
|
)
|
|
|
|
with open(config_file, "r") as f:
|
|
raw_config = f.read()
|
|
f.close()
|
|
|
|
return raw_config, 200
|
|
|
|
|
|
@bp.route("/config/save", methods=["POST"])
|
|
def config_save():
|
|
save_option = request.args.get("save_option")
|
|
|
|
new_config = request.get_data().decode()
|
|
|
|
if not new_config:
|
|
return make_response(
|
|
jsonify(
|
|
{"success": False, "message": "Config with body param is required"}
|
|
),
|
|
400,
|
|
)
|
|
|
|
# Validate the config schema
|
|
try:
|
|
FrigateConfig.parse_yaml(new_config)
|
|
except Exception:
|
|
return make_response(
|
|
jsonify(
|
|
{
|
|
"success": False,
|
|
"message": f"\nConfig Error:\n\n{escape(str(traceback.format_exc()))}",
|
|
}
|
|
),
|
|
400,
|
|
)
|
|
|
|
# Save the config to file
|
|
try:
|
|
config_file = os.environ.get("CONFIG_FILE", "/config/config.yml")
|
|
|
|
# Check if we can use .yaml instead of .yml
|
|
config_file_yaml = config_file.replace(".yml", ".yaml")
|
|
|
|
if os.path.isfile(config_file_yaml):
|
|
config_file = config_file_yaml
|
|
|
|
with open(config_file, "w") as f:
|
|
f.write(new_config)
|
|
f.close()
|
|
except Exception:
|
|
return make_response(
|
|
jsonify(
|
|
{
|
|
"success": False,
|
|
"message": "Could not write config file, be sure that Frigate has write permission on the config file.",
|
|
}
|
|
),
|
|
400,
|
|
)
|
|
|
|
if save_option == "restart":
|
|
try:
|
|
restart_frigate()
|
|
except Exception as e:
|
|
logging.error(f"Error restarting Frigate: {e}")
|
|
return make_response(
|
|
jsonify(
|
|
{
|
|
"success": True,
|
|
"message": "Config successfully saved, unable to restart Frigate",
|
|
}
|
|
),
|
|
200,
|
|
)
|
|
|
|
return make_response(
|
|
jsonify(
|
|
{
|
|
"success": True,
|
|
"message": "Config successfully saved, restarting (this can take up to one minute)...",
|
|
}
|
|
),
|
|
200,
|
|
)
|
|
else:
|
|
return make_response(
|
|
jsonify({"success": True, "message": "Config successfully saved."}),
|
|
200,
|
|
)
|
|
|
|
|
|
@bp.route("/config/set", methods=["PUT"])
|
|
def config_set():
|
|
config_file = os.environ.get("CONFIG_FILE", f"{CONFIG_DIR}/config.yml")
|
|
|
|
# Check if we can use .yaml instead of .yml
|
|
config_file_yaml = config_file.replace(".yml", ".yaml")
|
|
|
|
if os.path.isfile(config_file_yaml):
|
|
config_file = config_file_yaml
|
|
|
|
with open(config_file, "r") as f:
|
|
old_raw_config = f.read()
|
|
f.close()
|
|
|
|
try:
|
|
update_yaml_from_url(config_file, request.url)
|
|
with open(config_file, "r") as f:
|
|
new_raw_config = f.read()
|
|
f.close()
|
|
# Validate the config schema
|
|
try:
|
|
config_obj = FrigateConfig.parse_yaml(new_raw_config)
|
|
except Exception:
|
|
with open(config_file, "w") as f:
|
|
f.write(old_raw_config)
|
|
f.close()
|
|
logger.error(f"\nConfig Error:\n\n{str(traceback.format_exc())}")
|
|
return make_response(
|
|
jsonify(
|
|
{
|
|
"success": False,
|
|
"message": "Error parsing config. Check logs for error message.",
|
|
}
|
|
),
|
|
400,
|
|
)
|
|
except Exception as e:
|
|
logging.error(f"Error updating config: {e}")
|
|
return make_response(
|
|
jsonify({"success": False, "message": "Error updating config"}),
|
|
500,
|
|
)
|
|
|
|
json = request.get_json(silent=True) or {}
|
|
|
|
if json.get("requires_restart", 1) == 0:
|
|
current_app.frigate_config = FrigateConfig.parse_object(
|
|
config_obj, plus_api=current_app.plus_api
|
|
)
|
|
|
|
return make_response(
|
|
jsonify(
|
|
{
|
|
"success": True,
|
|
"message": "Config successfully updated, restart to apply",
|
|
}
|
|
),
|
|
200,
|
|
)
|
|
|
|
|
|
@bp.route("/ffprobe", methods=["GET"])
|
|
def ffprobe():
|
|
path_param = request.args.get("paths", "")
|
|
|
|
if not path_param:
|
|
return make_response(
|
|
jsonify({"success": False, "message": "Path needs to be provided."}), 404
|
|
)
|
|
|
|
if path_param.startswith("camera"):
|
|
camera = path_param[7:]
|
|
|
|
if camera not in current_app.frigate_config.cameras.keys():
|
|
return make_response(
|
|
jsonify(
|
|
{"success": False, "message": f"{camera} is not a valid camera."}
|
|
),
|
|
404,
|
|
)
|
|
|
|
if not current_app.frigate_config.cameras[camera].enabled:
|
|
return make_response(
|
|
jsonify({"success": False, "message": f"{camera} is not enabled."}), 404
|
|
)
|
|
|
|
paths = map(
|
|
lambda input: input.path,
|
|
current_app.frigate_config.cameras[camera].ffmpeg.inputs,
|
|
)
|
|
elif "," in clean_camera_user_pass(path_param):
|
|
paths = path_param.split(",")
|
|
else:
|
|
paths = [path_param]
|
|
|
|
# user has multiple streams
|
|
output = []
|
|
|
|
for path in paths:
|
|
ffprobe = ffprobe_stream(current_app.frigate_config.ffmpeg, path.strip())
|
|
output.append(
|
|
{
|
|
"return_code": ffprobe.returncode,
|
|
"stderr": (
|
|
ffprobe.stderr.decode("unicode_escape").strip()
|
|
if ffprobe.returncode != 0
|
|
else ""
|
|
),
|
|
"stdout": (
|
|
json.loads(ffprobe.stdout.decode("unicode_escape").strip())
|
|
if ffprobe.returncode == 0
|
|
else ""
|
|
),
|
|
}
|
|
)
|
|
|
|
return jsonify(output)
|
|
|
|
|
|
@bp.route("/vainfo", methods=["GET"])
|
|
def vainfo():
|
|
vainfo = vainfo_hwaccel()
|
|
return jsonify(
|
|
{
|
|
"return_code": vainfo.returncode,
|
|
"stderr": (
|
|
vainfo.stderr.decode("unicode_escape").strip()
|
|
if vainfo.returncode != 0
|
|
else ""
|
|
),
|
|
"stdout": (
|
|
vainfo.stdout.decode("unicode_escape").strip()
|
|
if vainfo.returncode == 0
|
|
else ""
|
|
),
|
|
}
|
|
)
|
|
|
|
|
|
@bp.route("/logs/<service>", methods=["GET"])
|
|
def logs(service: str):
|
|
def download_logs(service_location: str):
|
|
try:
|
|
file = open(service_location, "r")
|
|
contents = file.read()
|
|
file.close()
|
|
return jsonify(contents)
|
|
except FileNotFoundError as e:
|
|
logger.error(e)
|
|
return make_response(
|
|
jsonify({"success": False, "message": "Could not find log file"}),
|
|
500,
|
|
)
|
|
|
|
log_locations = {
|
|
"frigate": "/dev/shm/logs/frigate/current",
|
|
"go2rtc": "/dev/shm/logs/go2rtc/current",
|
|
"nginx": "/dev/shm/logs/nginx/current",
|
|
"chroma": "/dev/shm/logs/chroma/current",
|
|
}
|
|
service_location = log_locations.get(service)
|
|
|
|
if not service_location:
|
|
return make_response(
|
|
jsonify({"success": False, "message": "Not a valid service"}),
|
|
404,
|
|
)
|
|
|
|
if request.args.get("download", type=bool, default=False):
|
|
return download_logs(service_location)
|
|
|
|
start = request.args.get("start", type=int, default=0)
|
|
end = request.args.get("end", type=int)
|
|
|
|
try:
|
|
file = open(service_location, "r")
|
|
contents = file.read()
|
|
file.close()
|
|
|
|
# use the start timestamp to group logs together``
|
|
logLines = []
|
|
keyLength = 0
|
|
dateEnd = 0
|
|
currentKey = ""
|
|
currentLine = ""
|
|
|
|
for rawLine in contents.splitlines():
|
|
cleanLine = rawLine.strip()
|
|
|
|
if len(cleanLine) < 10:
|
|
continue
|
|
|
|
# handle cases where S6 does not include date in log line
|
|
if " " not in cleanLine:
|
|
cleanLine = f"{datetime.now()} {cleanLine}"
|
|
|
|
if dateEnd == 0:
|
|
dateEnd = cleanLine.index(" ")
|
|
keyLength = dateEnd - (6 if service_location == "frigate" else 0)
|
|
|
|
newKey = cleanLine[0:keyLength]
|
|
|
|
if newKey == currentKey:
|
|
currentLine += f"\n{cleanLine[dateEnd:].strip()}"
|
|
continue
|
|
else:
|
|
if len(currentLine) > 0:
|
|
logLines.append(currentLine)
|
|
|
|
currentKey = newKey
|
|
currentLine = cleanLine
|
|
|
|
logLines.append(currentLine)
|
|
|
|
return make_response(
|
|
jsonify({"totalLines": len(logLines), "lines": logLines[start:end]}),
|
|
200,
|
|
)
|
|
except FileNotFoundError as e:
|
|
logger.error(e)
|
|
return make_response(
|
|
jsonify({"success": False, "message": "Could not find log file"}),
|
|
500,
|
|
)
|
|
|
|
|
|
@bp.route("/restart", methods=["POST"])
|
|
def restart():
|
|
try:
|
|
restart_frigate()
|
|
except Exception as e:
|
|
logging.error(f"Error restarting Frigate: {e}")
|
|
return make_response(
|
|
jsonify(
|
|
{
|
|
"success": False,
|
|
"message": "Unable to restart Frigate.",
|
|
}
|
|
),
|
|
500,
|
|
)
|
|
|
|
return make_response(
|
|
jsonify(
|
|
{
|
|
"success": True,
|
|
"message": "Restarting (this can take up to one minute)...",
|
|
}
|
|
),
|
|
200,
|
|
)
|
|
|
|
|
|
@bp.route("/labels")
|
|
def get_labels():
|
|
camera = request.args.get("camera", type=str, default="")
|
|
|
|
try:
|
|
if camera:
|
|
events = Event.select(Event.label).where(Event.camera == camera).distinct()
|
|
else:
|
|
events = Event.select(Event.label).distinct()
|
|
except Exception as e:
|
|
logger.error(e)
|
|
return make_response(
|
|
jsonify({"success": False, "message": "Failed to get labels"}), 404
|
|
)
|
|
|
|
labels = sorted([e.label for e in events])
|
|
return jsonify(labels)
|
|
|
|
|
|
@bp.route("/sub_labels")
|
|
def get_sub_labels():
|
|
split_joined = request.args.get("split_joined", type=int)
|
|
|
|
try:
|
|
events = Event.select(Event.sub_label).distinct()
|
|
except Exception:
|
|
return make_response(
|
|
jsonify({"success": False, "message": "Failed to get sub_labels"}),
|
|
404,
|
|
)
|
|
|
|
sub_labels = [e.sub_label for e in events]
|
|
|
|
if None in sub_labels:
|
|
sub_labels.remove(None)
|
|
|
|
if split_joined:
|
|
original_labels = sub_labels.copy()
|
|
|
|
for label in original_labels:
|
|
if "," in label:
|
|
sub_labels.remove(label)
|
|
parts = label.split(",")
|
|
|
|
for part in parts:
|
|
if part.strip() not in sub_labels:
|
|
sub_labels.append(part.strip())
|
|
|
|
sub_labels.sort()
|
|
return jsonify(sub_labels)
|
|
|
|
|
|
@bp.route("/timeline")
|
|
def timeline():
|
|
camera = request.args.get("camera", "all")
|
|
source_id = request.args.get("source_id", type=str)
|
|
limit = request.args.get("limit", 100)
|
|
|
|
clauses = []
|
|
|
|
selected_columns = [
|
|
Timeline.timestamp,
|
|
Timeline.camera,
|
|
Timeline.source,
|
|
Timeline.source_id,
|
|
Timeline.class_type,
|
|
Timeline.data,
|
|
]
|
|
|
|
if camera != "all":
|
|
clauses.append((Timeline.camera == camera))
|
|
|
|
if source_id:
|
|
clauses.append((Timeline.source_id == source_id))
|
|
|
|
if len(clauses) == 0:
|
|
clauses.append((True))
|
|
|
|
timeline = (
|
|
Timeline.select(*selected_columns)
|
|
.where(reduce(operator.and_, clauses))
|
|
.order_by(Timeline.timestamp.asc())
|
|
.limit(limit)
|
|
.dicts()
|
|
)
|
|
|
|
return jsonify([t for t in timeline])
|
|
|
|
|
|
@bp.route("/timeline/hourly")
|
|
def hourly_timeline():
|
|
"""Get hourly summary for timeline."""
|
|
cameras = request.args.get("cameras", "all")
|
|
labels = request.args.get("labels", "all")
|
|
before = request.args.get("before", type=float)
|
|
after = request.args.get("after", type=float)
|
|
limit = request.args.get("limit", 200)
|
|
tz_name = request.args.get("timezone", default="utc", type=str)
|
|
|
|
_, minute_modifier, _ = get_tz_modifiers(tz_name)
|
|
minute_offset = int(minute_modifier.split(" ")[0])
|
|
|
|
clauses = []
|
|
|
|
if cameras != "all":
|
|
camera_list = cameras.split(",")
|
|
clauses.append((Timeline.camera << camera_list))
|
|
|
|
if labels != "all":
|
|
label_list = labels.split(",")
|
|
clauses.append((Timeline.data["label"] << label_list))
|
|
|
|
if before:
|
|
clauses.append((Timeline.timestamp < before))
|
|
|
|
if after:
|
|
clauses.append((Timeline.timestamp > after))
|
|
|
|
if len(clauses) == 0:
|
|
clauses.append((True))
|
|
|
|
timeline = (
|
|
Timeline.select(
|
|
Timeline.camera,
|
|
Timeline.timestamp,
|
|
Timeline.data,
|
|
Timeline.class_type,
|
|
Timeline.source_id,
|
|
Timeline.source,
|
|
)
|
|
.where(reduce(operator.and_, clauses))
|
|
.order_by(Timeline.timestamp.desc())
|
|
.limit(limit)
|
|
.dicts()
|
|
.iterator()
|
|
)
|
|
|
|
count = 0
|
|
start = 0
|
|
end = 0
|
|
hours: dict[str, list[dict[str, any]]] = {}
|
|
|
|
for t in timeline:
|
|
if count == 0:
|
|
start = t["timestamp"]
|
|
else:
|
|
end = t["timestamp"]
|
|
|
|
count += 1
|
|
|
|
hour = (
|
|
datetime.fromtimestamp(t["timestamp"]).replace(
|
|
minute=0, second=0, microsecond=0
|
|
)
|
|
+ timedelta(
|
|
minutes=minute_offset,
|
|
)
|
|
).timestamp()
|
|
if hour not in hours:
|
|
hours[hour] = [t]
|
|
else:
|
|
hours[hour].insert(0, t)
|
|
|
|
return jsonify(
|
|
{
|
|
"start": start,
|
|
"end": end,
|
|
"count": count,
|
|
"hours": hours,
|
|
}
|
|
)
|