mirror of
				https://github.com/blakeblackshear/frigate.git
				synced 2025-10-27 10:52:11 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			579 lines
		
	
	
		
			18 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			579 lines
		
	
	
		
			18 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
"""Utils for reading and writing object detection data."""
 | 
						|
 | 
						|
import datetime
 | 
						|
import logging
 | 
						|
import math
 | 
						|
from collections import defaultdict
 | 
						|
 | 
						|
import cv2
 | 
						|
import numpy as np
 | 
						|
from peewee import DoesNotExist
 | 
						|
 | 
						|
from frigate.config import DetectConfig, ModelConfig
 | 
						|
from frigate.const import (
 | 
						|
    LABEL_CONSOLIDATION_DEFAULT,
 | 
						|
    LABEL_CONSOLIDATION_MAP,
 | 
						|
    LABEL_NMS_DEFAULT,
 | 
						|
    LABEL_NMS_MAP,
 | 
						|
)
 | 
						|
from frigate.detectors.detector_config import PixelFormatEnum
 | 
						|
from frigate.models import Event, Regions, Timeline
 | 
						|
from frigate.util.image import (
 | 
						|
    area,
 | 
						|
    calculate_region,
 | 
						|
    clipped,
 | 
						|
    intersection,
 | 
						|
    intersection_over_union,
 | 
						|
    yuv_region_2_bgr,
 | 
						|
    yuv_region_2_rgb,
 | 
						|
    yuv_region_2_yuv,
 | 
						|
)
 | 
						|
 | 
						|
logger = logging.getLogger(__name__)
 | 
						|
 | 
						|
GRID_SIZE = 8
 | 
						|
 | 
						|
 | 
						|
def get_camera_regions_grid(
 | 
						|
    name: str,
 | 
						|
    detect: DetectConfig,
 | 
						|
    min_region_size: int,
 | 
						|
) -> list[list[dict[str, any]]]:
 | 
						|
    """Build a grid of expected region sizes for a camera."""
 | 
						|
    # get grid from db if available
 | 
						|
    try:
 | 
						|
        regions: Regions = Regions.select().where(Regions.camera == name).get()
 | 
						|
        grid = regions.grid
 | 
						|
        last_update = regions.last_update
 | 
						|
    except DoesNotExist:
 | 
						|
        grid = []
 | 
						|
        for x in range(GRID_SIZE):
 | 
						|
            row = []
 | 
						|
            for y in range(GRID_SIZE):
 | 
						|
                row.append({"sizes": []})
 | 
						|
            grid.append(row)
 | 
						|
        last_update = 0
 | 
						|
 | 
						|
    # get events for timeline entries
 | 
						|
    events = (
 | 
						|
        Event.select(Event.id)
 | 
						|
        .where(Event.camera == name)
 | 
						|
        .where((Event.false_positive == None) | (Event.false_positive == False))
 | 
						|
        .where(Event.start_time > last_update)
 | 
						|
    )
 | 
						|
    valid_event_ids = [e["id"] for e in events.dicts()]
 | 
						|
    logger.debug(f"Found {len(valid_event_ids)} new events for {name}")
 | 
						|
 | 
						|
    # no new events, return as is
 | 
						|
    if not valid_event_ids:
 | 
						|
        return grid
 | 
						|
 | 
						|
    new_update = datetime.datetime.now().timestamp()
 | 
						|
    timeline = (
 | 
						|
        Timeline.select(
 | 
						|
            *[
 | 
						|
                Timeline.camera,
 | 
						|
                Timeline.source,
 | 
						|
                Timeline.data,
 | 
						|
            ]
 | 
						|
        )
 | 
						|
        .where(Timeline.source_id << valid_event_ids)
 | 
						|
        .limit(10000)
 | 
						|
        .dicts()
 | 
						|
    )
 | 
						|
 | 
						|
    logger.debug(f"Found {len(timeline)} new entries for {name}")
 | 
						|
 | 
						|
    width = detect.width
 | 
						|
    height = detect.height
 | 
						|
 | 
						|
    for t in timeline:
 | 
						|
        if t.get("source") != "tracked_object":
 | 
						|
            continue
 | 
						|
 | 
						|
        box = t["data"]["box"]
 | 
						|
 | 
						|
        # calculate centroid position
 | 
						|
        x = box[0] + (box[2] / 2)
 | 
						|
        y = box[1] + (box[3] / 2)
 | 
						|
 | 
						|
        x_pos = int(x * GRID_SIZE)
 | 
						|
        y_pos = int(y * GRID_SIZE)
 | 
						|
 | 
						|
        calculated_region = calculate_region(
 | 
						|
            (height, width),
 | 
						|
            box[0] * width,
 | 
						|
            box[1] * height,
 | 
						|
            (box[0] + box[2]) * width,
 | 
						|
            (box[1] + box[3]) * height,
 | 
						|
            min_region_size,
 | 
						|
            1.35,
 | 
						|
        )
 | 
						|
        # save width of region to grid as relative
 | 
						|
        grid[x_pos][y_pos]["sizes"].append(
 | 
						|
            (calculated_region[2] - calculated_region[0]) / width
 | 
						|
        )
 | 
						|
 | 
						|
    for x in range(GRID_SIZE):
 | 
						|
        for y in range(GRID_SIZE):
 | 
						|
            cell = grid[x][y]
 | 
						|
 | 
						|
            if len(cell["sizes"]) == 0:
 | 
						|
                continue
 | 
						|
 | 
						|
            std_dev = np.std(cell["sizes"])
 | 
						|
            mean = np.mean(cell["sizes"])
 | 
						|
            logger.debug(f"std dev: {std_dev} mean: {mean}")
 | 
						|
            cell["x"] = x
 | 
						|
            cell["y"] = y
 | 
						|
            cell["std_dev"] = std_dev
 | 
						|
            cell["mean"] = mean
 | 
						|
 | 
						|
    # update db with new grid
 | 
						|
    region = {
 | 
						|
        Regions.camera: name,
 | 
						|
        Regions.grid: grid,
 | 
						|
        Regions.last_update: new_update,
 | 
						|
    }
 | 
						|
    (
 | 
						|
        Regions.insert(region)
 | 
						|
        .on_conflict(
 | 
						|
            conflict_target=[Regions.camera],
 | 
						|
            update=region,
 | 
						|
        )
 | 
						|
        .execute()
 | 
						|
    )
 | 
						|
 | 
						|
    return grid
 | 
						|
 | 
						|
 | 
						|
def get_cluster_region_from_grid(frame_shape, min_region, cluster, boxes, region_grid):
 | 
						|
    min_x = frame_shape[1]
 | 
						|
    min_y = frame_shape[0]
 | 
						|
    max_x = 0
 | 
						|
    max_y = 0
 | 
						|
    for b in cluster:
 | 
						|
        min_x = min(boxes[b][0], min_x)
 | 
						|
        min_y = min(boxes[b][1], min_y)
 | 
						|
        max_x = max(boxes[b][2], max_x)
 | 
						|
        max_y = max(boxes[b][3], max_y)
 | 
						|
    return get_region_from_grid(
 | 
						|
        frame_shape, [min_x, min_y, max_x, max_y], min_region, region_grid
 | 
						|
    )
 | 
						|
 | 
						|
 | 
						|
def get_region_from_grid(
 | 
						|
    frame_shape: tuple[int],
 | 
						|
    cluster: list[int],
 | 
						|
    min_region: int,
 | 
						|
    region_grid: list[list[dict[str, any]]],
 | 
						|
) -> list[int]:
 | 
						|
    """Get a region for a box based on the region grid."""
 | 
						|
    box = calculate_region(
 | 
						|
        frame_shape, cluster[0], cluster[1], cluster[2], cluster[3], min_region
 | 
						|
    )
 | 
						|
    centroid = (
 | 
						|
        box[0] + (min(frame_shape[1], box[2]) - box[0]) / 2,
 | 
						|
        box[1] + (min(frame_shape[0], box[3]) - box[1]) / 2,
 | 
						|
    )
 | 
						|
    grid_x = int(centroid[0] / frame_shape[1] * GRID_SIZE)
 | 
						|
    grid_y = int(centroid[1] / frame_shape[0] * GRID_SIZE)
 | 
						|
 | 
						|
    cell = region_grid[grid_x][grid_y]
 | 
						|
 | 
						|
    # if there is no known data, use original region calculation
 | 
						|
    if not cell or not cell["sizes"]:
 | 
						|
        return box
 | 
						|
 | 
						|
    # convert the calculated region size to relative
 | 
						|
    calc_size = (box[2] - box[0]) / frame_shape[1]
 | 
						|
 | 
						|
    # if region is within expected size, don't resize
 | 
						|
    if (
 | 
						|
        (cell["mean"] - cell["std_dev"])
 | 
						|
        <= calc_size
 | 
						|
        <= (cell["mean"] + cell["std_dev"])
 | 
						|
    ):
 | 
						|
        return box
 | 
						|
    # TODO not sure how to handle case where cluster is larger than expected region
 | 
						|
    elif calc_size > (cell["mean"] + cell["std_dev"]):
 | 
						|
        return box
 | 
						|
 | 
						|
    size = cell["mean"] * frame_shape[1]
 | 
						|
 | 
						|
    # get region based on grid size
 | 
						|
    return calculate_region(
 | 
						|
        frame_shape,
 | 
						|
        max(0, centroid[0] - size / 2),
 | 
						|
        max(0, centroid[1] - size / 2),
 | 
						|
        min(frame_shape[1], centroid[0] + size / 2),
 | 
						|
        min(frame_shape[0], centroid[1] + size / 2),
 | 
						|
        min_region,
 | 
						|
    )
 | 
						|
 | 
						|
 | 
						|
def is_object_filtered(obj, objects_to_track, object_filters):
 | 
						|
    object_name = obj[0]
 | 
						|
    object_score = obj[1]
 | 
						|
    object_box = obj[2]
 | 
						|
    object_area = obj[3]
 | 
						|
    object_ratio = obj[4]
 | 
						|
 | 
						|
    if object_name not in objects_to_track:
 | 
						|
        return True
 | 
						|
 | 
						|
    if object_name in object_filters:
 | 
						|
        obj_settings = object_filters[object_name]
 | 
						|
 | 
						|
        # if the min area is larger than the
 | 
						|
        # detected object, don't add it to detected objects
 | 
						|
        if obj_settings.min_area > object_area:
 | 
						|
            return True
 | 
						|
 | 
						|
        # if the detected object is larger than the
 | 
						|
        # max area, don't add it to detected objects
 | 
						|
        if obj_settings.max_area < object_area:
 | 
						|
            return True
 | 
						|
 | 
						|
        # if the score is lower than the min_score, skip
 | 
						|
        if obj_settings.min_score > object_score:
 | 
						|
            return True
 | 
						|
 | 
						|
        # if the object is not proportionally wide enough
 | 
						|
        if obj_settings.min_ratio > object_ratio:
 | 
						|
            return True
 | 
						|
 | 
						|
        # if the object is proportionally too wide
 | 
						|
        if obj_settings.max_ratio < object_ratio:
 | 
						|
            return True
 | 
						|
 | 
						|
        if obj_settings.mask is not None:
 | 
						|
            # compute the coordinates of the object and make sure
 | 
						|
            # the location isn't outside the bounds of the image (can happen from rounding)
 | 
						|
            object_xmin = object_box[0]
 | 
						|
            object_xmax = object_box[2]
 | 
						|
            object_ymax = object_box[3]
 | 
						|
            y_location = min(int(object_ymax), len(obj_settings.mask) - 1)
 | 
						|
            x_location = min(
 | 
						|
                int((object_xmax + object_xmin) / 2.0),
 | 
						|
                len(obj_settings.mask[0]) - 1,
 | 
						|
            )
 | 
						|
 | 
						|
            # if the object is in a masked location, don't add it to detected objects
 | 
						|
            if obj_settings.mask[y_location][x_location] == 0:
 | 
						|
                return True
 | 
						|
 | 
						|
    return False
 | 
						|
 | 
						|
 | 
						|
def get_min_region_size(model_config: ModelConfig) -> int:
 | 
						|
    """Get the min region size."""
 | 
						|
    return max(model_config.height, model_config.width)
 | 
						|
 | 
						|
 | 
						|
def create_tensor_input(frame, model_config: ModelConfig, region):
 | 
						|
    if model_config.input_pixel_format == PixelFormatEnum.rgb:
 | 
						|
        cropped_frame = yuv_region_2_rgb(frame, region)
 | 
						|
    elif model_config.input_pixel_format == PixelFormatEnum.bgr:
 | 
						|
        cropped_frame = yuv_region_2_bgr(frame, region)
 | 
						|
    else:
 | 
						|
        cropped_frame = yuv_region_2_yuv(frame, region)
 | 
						|
 | 
						|
    # Resize if needed
 | 
						|
    if cropped_frame.shape != (model_config.height, model_config.width, 3):
 | 
						|
        cropped_frame = cv2.resize(
 | 
						|
            cropped_frame,
 | 
						|
            dsize=(model_config.width, model_config.height),
 | 
						|
            interpolation=cv2.INTER_LINEAR,
 | 
						|
        )
 | 
						|
 | 
						|
    # Expand dimensions since the model expects images to have shape: [1, height, width, 3]
 | 
						|
    return np.expand_dims(cropped_frame, axis=0)
 | 
						|
 | 
						|
 | 
						|
def box_overlaps(b1, b2):
 | 
						|
    if b1[2] < b2[0] or b1[0] > b2[2] or b1[1] > b2[3] or b1[3] < b2[1]:
 | 
						|
        return False
 | 
						|
    return True
 | 
						|
 | 
						|
 | 
						|
def box_inside(b1, b2):
 | 
						|
    # check if b2 is inside b1
 | 
						|
    if b2[0] >= b1[0] and b2[1] >= b1[1] and b2[2] <= b1[2] and b2[3] <= b1[3]:
 | 
						|
        return True
 | 
						|
    return False
 | 
						|
 | 
						|
 | 
						|
def reduce_boxes(boxes, iou_threshold=0.0):
 | 
						|
    clusters = []
 | 
						|
 | 
						|
    for box in boxes:
 | 
						|
        matched = 0
 | 
						|
        for cluster in clusters:
 | 
						|
            if intersection_over_union(box, cluster) > iou_threshold:
 | 
						|
                matched = 1
 | 
						|
                cluster[0] = min(cluster[0], box[0])
 | 
						|
                cluster[1] = min(cluster[1], box[1])
 | 
						|
                cluster[2] = max(cluster[2], box[2])
 | 
						|
                cluster[3] = max(cluster[3], box[3])
 | 
						|
 | 
						|
        if not matched:
 | 
						|
            clusters.append(list(box))
 | 
						|
 | 
						|
    return [tuple(c) for c in clusters]
 | 
						|
 | 
						|
 | 
						|
def average_boxes(boxes: list[list[int, int, int, int]]) -> list[int, int, int, int]:
 | 
						|
    """Return a box that is the average of a list of boxes."""
 | 
						|
    x_mins = []
 | 
						|
    y_mins = []
 | 
						|
    x_max = []
 | 
						|
    y_max = []
 | 
						|
 | 
						|
    for box in boxes:
 | 
						|
        x_mins.append(box[0])
 | 
						|
        y_mins.append(box[1])
 | 
						|
        x_max.append(box[2])
 | 
						|
        y_max.append(box[3])
 | 
						|
 | 
						|
    return [np.mean(x_mins), np.mean(y_mins), np.mean(x_max), np.mean(y_max)]
 | 
						|
 | 
						|
 | 
						|
def median_of_boxes(boxes: list[list[int, int, int, int]]) -> list[int, int, int, int]:
 | 
						|
    """Return a box that is the median of a list of boxes."""
 | 
						|
    sorted_boxes = sorted(boxes, key=lambda x: area(x))
 | 
						|
    return sorted_boxes[int(len(sorted_boxes) / 2.0)]
 | 
						|
 | 
						|
 | 
						|
def intersects_any(box_a, boxes):
 | 
						|
    for box in boxes:
 | 
						|
        if box_overlaps(box_a, box):
 | 
						|
            return True
 | 
						|
    return False
 | 
						|
 | 
						|
 | 
						|
def inside_any(box_a, boxes):
 | 
						|
    for box in boxes:
 | 
						|
        # check if box_a is inside of box
 | 
						|
        if box_inside(box, box_a):
 | 
						|
            return True
 | 
						|
    return False
 | 
						|
 | 
						|
 | 
						|
def get_cluster_boundary(box, min_region):
 | 
						|
    # compute the max region size for the current box (box is 10% of region)
 | 
						|
    box_width = box[2] - box[0]
 | 
						|
    box_height = box[3] - box[1]
 | 
						|
    max_region_area = abs(box_width * box_height) / 0.1
 | 
						|
    max_region_size = max(min_region, int(math.sqrt(max_region_area)))
 | 
						|
 | 
						|
    centroid = (box_width / 2 + box[0], box_height / 2 + box[1])
 | 
						|
 | 
						|
    max_x_dist = int(max_region_size - box_width / 2 * 1.1)
 | 
						|
    max_y_dist = int(max_region_size - box_height / 2 * 1.1)
 | 
						|
 | 
						|
    return [
 | 
						|
        int(centroid[0] - max_x_dist),
 | 
						|
        int(centroid[1] - max_y_dist),
 | 
						|
        int(centroid[0] + max_x_dist),
 | 
						|
        int(centroid[1] + max_y_dist),
 | 
						|
    ]
 | 
						|
 | 
						|
 | 
						|
def get_cluster_candidates(frame_shape, min_region, boxes):
 | 
						|
    # and create a cluster of other boxes using it's max region size
 | 
						|
    # only include boxes where the region is an appropriate(except the region could possibly be smaller?)
 | 
						|
    # size in the cluster. in order to be in the cluster, the furthest corner needs to be within x,y offset
 | 
						|
    # determined by the max_region size minus half the box + 20%
 | 
						|
    # TODO: see if we can do this with numpy
 | 
						|
    cluster_candidates = []
 | 
						|
    used_boxes = []
 | 
						|
    # loop over each box
 | 
						|
    for current_index, b in enumerate(boxes):
 | 
						|
        if current_index in used_boxes:
 | 
						|
            continue
 | 
						|
        cluster = [current_index]
 | 
						|
        used_boxes.append(current_index)
 | 
						|
        cluster_boundary = get_cluster_boundary(b, min_region)
 | 
						|
        # find all other boxes that fit inside the boundary
 | 
						|
        for compare_index, compare_box in enumerate(boxes):
 | 
						|
            if compare_index in used_boxes:
 | 
						|
                continue
 | 
						|
 | 
						|
            # if the box is not inside the potential cluster area, cluster them
 | 
						|
            if not box_inside(cluster_boundary, compare_box):
 | 
						|
                continue
 | 
						|
 | 
						|
            # get the region if you were to add this box to the cluster
 | 
						|
            potential_cluster = cluster + [compare_index]
 | 
						|
            cluster_region = get_cluster_region(
 | 
						|
                frame_shape, min_region, potential_cluster, boxes
 | 
						|
            )
 | 
						|
            # if region could be smaller and either box would be too small
 | 
						|
            # for the resulting region, dont cluster
 | 
						|
            should_cluster = True
 | 
						|
            if (cluster_region[2] - cluster_region[0]) > min_region:
 | 
						|
                for b in potential_cluster:
 | 
						|
                    box = boxes[b]
 | 
						|
                    # boxes should be more than 5% of the area of the region
 | 
						|
                    if area(box) / area(cluster_region) < 0.05:
 | 
						|
                        should_cluster = False
 | 
						|
                        break
 | 
						|
 | 
						|
            if should_cluster:
 | 
						|
                cluster.append(compare_index)
 | 
						|
                used_boxes.append(compare_index)
 | 
						|
        cluster_candidates.append(cluster)
 | 
						|
 | 
						|
    # return the unique clusters only
 | 
						|
    unique = {tuple(sorted(c)) for c in cluster_candidates}
 | 
						|
    return [list(tup) for tup in unique]
 | 
						|
 | 
						|
 | 
						|
def get_cluster_region(frame_shape, min_region, cluster, boxes):
 | 
						|
    min_x = frame_shape[1]
 | 
						|
    min_y = frame_shape[0]
 | 
						|
    max_x = 0
 | 
						|
    max_y = 0
 | 
						|
    for b in cluster:
 | 
						|
        min_x = min(boxes[b][0], min_x)
 | 
						|
        min_y = min(boxes[b][1], min_y)
 | 
						|
        max_x = max(boxes[b][2], max_x)
 | 
						|
        max_y = max(boxes[b][3], max_y)
 | 
						|
    return calculate_region(
 | 
						|
        frame_shape, min_x, min_y, max_x, max_y, min_region, multiplier=1.35
 | 
						|
    )
 | 
						|
 | 
						|
 | 
						|
def get_startup_regions(
 | 
						|
    frame_shape: tuple[int],
 | 
						|
    region_min_size: int,
 | 
						|
    region_grid: list[list[dict[str, any]]],
 | 
						|
) -> list[list[int]]:
 | 
						|
    """Get a list of regions to run on startup."""
 | 
						|
    # return 8 most popular regions for the camera
 | 
						|
    all_cells = np.concatenate(region_grid).flat
 | 
						|
    startup_cells = sorted(all_cells, key=lambda c: len(c["sizes"]), reverse=True)[0:8]
 | 
						|
    regions = []
 | 
						|
 | 
						|
    for cell in startup_cells:
 | 
						|
        # rest of the cells are empty
 | 
						|
        if not cell["sizes"]:
 | 
						|
            break
 | 
						|
 | 
						|
        x = frame_shape[1] / GRID_SIZE * (0.5 + cell["x"])
 | 
						|
        y = frame_shape[0] / GRID_SIZE * (0.5 + cell["y"])
 | 
						|
        size = cell["mean"] * frame_shape[1]
 | 
						|
        regions.append(
 | 
						|
            calculate_region(
 | 
						|
                frame_shape,
 | 
						|
                x - size / 2,
 | 
						|
                y - size / 2,
 | 
						|
                x + size / 2,
 | 
						|
                y + size / 2,
 | 
						|
                region_min_size,
 | 
						|
                multiplier=1,
 | 
						|
            )
 | 
						|
        )
 | 
						|
 | 
						|
    return regions
 | 
						|
 | 
						|
 | 
						|
def reduce_detections(
 | 
						|
    frame_shape: tuple[int],
 | 
						|
    all_detections: list[tuple[any]],
 | 
						|
) -> list[tuple[any]]:
 | 
						|
    """Take a list of detections and reduce overlaps to create a list of confident detections."""
 | 
						|
 | 
						|
    def reduce_overlapping_detections(detections: list[tuple[any]]) -> list[tuple[any]]:
 | 
						|
        """apply non-maxima suppression to suppress weak, overlapping bounding boxes."""
 | 
						|
        detected_object_groups = defaultdict(lambda: [])
 | 
						|
        for detection in detections:
 | 
						|
            detected_object_groups[detection[0]].append(detection)
 | 
						|
 | 
						|
        selected_objects = []
 | 
						|
        for group in detected_object_groups.values():
 | 
						|
            label = group[0][0]
 | 
						|
            # o[2] is the box of the object: xmin, ymin, xmax, ymax
 | 
						|
            # apply max/min to ensure values do not exceed the known frame size
 | 
						|
            boxes = [
 | 
						|
                (
 | 
						|
                    o[2][0],
 | 
						|
                    o[2][1],
 | 
						|
                    o[2][2] - o[2][0],
 | 
						|
                    o[2][3] - o[2][1],
 | 
						|
                )
 | 
						|
                for o in group
 | 
						|
            ]
 | 
						|
 | 
						|
            # reduce confidences for objects that are on edge of region
 | 
						|
            # 0.6 should be used to ensure that the object is still considered and not dropped
 | 
						|
            # due to min score requirement of NMSBoxes
 | 
						|
            confidences = [0.6 if clipped(o, frame_shape) else o[1] for o in group]
 | 
						|
 | 
						|
            indices = cv2.dnn.NMSBoxes(
 | 
						|
                boxes, confidences, 0.5, LABEL_NMS_MAP.get(label, LABEL_NMS_DEFAULT)
 | 
						|
            )
 | 
						|
 | 
						|
            # add objects
 | 
						|
            for index in indices:
 | 
						|
                index = index if isinstance(index, np.int32) else index[0]
 | 
						|
                obj = group[index]
 | 
						|
                selected_objects.append(obj)
 | 
						|
 | 
						|
        # set the detections list to only include top objects
 | 
						|
        return selected_objects
 | 
						|
 | 
						|
    def get_consolidated_object_detections(detections: list[tuple[any]]):
 | 
						|
        """Drop detections that overlap too much."""
 | 
						|
        detected_object_groups = defaultdict(lambda: [])
 | 
						|
        for detection in detections:
 | 
						|
            detected_object_groups[detection[0]].append(detection)
 | 
						|
 | 
						|
        consolidated_detections = []
 | 
						|
        for group in detected_object_groups.values():
 | 
						|
            # if the group only has 1 item, skip
 | 
						|
            if len(group) == 1:
 | 
						|
                consolidated_detections.append(group[0])
 | 
						|
                continue
 | 
						|
 | 
						|
            # sort smallest to largest by area
 | 
						|
            sorted_by_area = sorted(group, key=lambda g: g[3])
 | 
						|
 | 
						|
            for current_detection_idx in range(0, len(sorted_by_area)):
 | 
						|
                current_detection = sorted_by_area[current_detection_idx]
 | 
						|
                current_label = current_detection[0]
 | 
						|
                current_box = current_detection[2]
 | 
						|
                overlap = 0
 | 
						|
                for to_check_idx in range(
 | 
						|
                    min(current_detection_idx + 1, len(sorted_by_area)),
 | 
						|
                    len(sorted_by_area),
 | 
						|
                ):
 | 
						|
                    to_check = sorted_by_area[to_check_idx][2]
 | 
						|
 | 
						|
                    # if area of current detection / area of check < 5% they should not be compared
 | 
						|
                    # this covers cases where a large car parked in a driveway doesn't block detections
 | 
						|
                    # of cars in the street behind it
 | 
						|
                    if area(current_box) / area(to_check) < 0.05:
 | 
						|
                        continue
 | 
						|
 | 
						|
                    intersect_box = intersection(current_box, to_check)
 | 
						|
                    # if % of smaller detection is inside of another detection, consolidate
 | 
						|
                    if intersect_box is not None and area(intersect_box) / area(
 | 
						|
                        current_box
 | 
						|
                    ) > LABEL_CONSOLIDATION_MAP.get(
 | 
						|
                        current_label, LABEL_CONSOLIDATION_DEFAULT
 | 
						|
                    ):
 | 
						|
                        overlap = 1
 | 
						|
                        break
 | 
						|
                if overlap == 0:
 | 
						|
                    consolidated_detections.append(
 | 
						|
                        sorted_by_area[current_detection_idx]
 | 
						|
                    )
 | 
						|
 | 
						|
        return consolidated_detections
 | 
						|
 | 
						|
    return get_consolidated_object_detections(
 | 
						|
        reduce_overlapping_detections(all_detections)
 | 
						|
    )
 |