前言

在很多商业和工业场景中,“目标检测”往往只是第一步。比如在景区、商场或交通路口,仅仅知道“画面里有多少人和车”是不够的,客户通常需要更精确的数据:

  • “今天有多少人进入了商场,多少人离开了?”
  • “这个路口从左往右开过去了多少辆车?”

要实现这种统计,我们需要把目标检测(Object Detection)升级为多目标跟踪(Multi-Object Tracking, MOT),并在帧与帧之间记住每一个目标的身份(ID)。

本文将带你使用 YOLO (v8/v11) 与当前最前沿的 ByteTrack 跟踪算法,配合巧妙的几何数学计算,从零实现一个人流与车流双向越界实时计数系统


一、核心技术原理

1. 为什么选择 ByteTrack?

传统的跟踪算法(如 DEEPSORT)依赖昂贵的外观特征提取网络,不仅运行慢,而且在目标被遮挡时容易丢失 ID。

ByteTrack(发表于 ECCV 2022)的核心思想非常优雅:“不放弃低置信度的检测框”

  • 第一步(Association 1):在高置信度检测框中与已有的跟踪轨迹进行匹配。
  • 第二步(Association 2):对于那些因为遮挡、模糊导致置信度降低的低分框,再次与上一帧的未匹配轨迹进行二次匹配。

这种“变废为宝”的设计,让 ByteTrack 在小目标、遮挡和运动模糊场景下拥有极强的鲁棒性,且无需任何深度特征提取器,推理速度极快

2. 双向越界判定:向量叉乘法

我们要统计“跨过某条虚拟线段”的物体数量,如何通过代码精准判定?

如果使用简单的坐标大小对比,由于目标移动速度不同,很容易在跨线瞬间发生“漏检”或“重复计数”。最严谨、工业界最通用的方法是几何向量叉乘法

假设我们在屏幕上绘制了一条虚拟检测线段,起点为 $A(x_A, y_A)$,终点为 $B(x_B, y_B)$。
对于物体的中心点(或足部中点),我们在前一帧的坐标为 $P_{prev}$,当前帧的坐标为 $P_{curr}$。

要判定运动线段 $P_{prev}P_{curr}$ 是否穿过了检测线段 $AB$,我们需要满足两个基本条件:

  1. 快速排斥实验:两条线段的包围盒必须相交。
  2. 跨立实验:线段 $AB$ 的两个端点在线段 $P_{prev}P_{curr}$ 的两侧,且线段 $P_{prev}P_{curr}$ 的两个端点也在线段 $AB$ 的两侧。

这可以通过向量的**叉乘(Cross Product)**正负号来高效判定。当叉乘结果发生符号翻转时,即说明物体“跨越”了线段。


二、开发环境搭建

只需安装以下三个 Python 核心库即可开始:

1
pip install ultralytics opencv-python numpy

注意ultralytics 库中已经内置封装了 ByteTrack 跟踪器,我们不需要下载额外的 ByteTrack 代码仓库,这极大简化了工程部署难度。


三、完整实现源码

新建文件 object_counting.py,粘贴以下经过工业级优化的完整代码。代码中包含了平滑轨迹绘制、动态双向计数和优雅的 OpenCV 半透明看板展示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
import cv2
import numpy as np
from collections import defaultdict
from ultralytics import YOLO

# ==================== 1. 配置参数 ====================
MODEL_PATH = "yolo11n.pt" # 可换为 yolo8n.pt 或其他版本
VIDEO_PATH = "test_video.mp4" # 你的输入视频路径,若为摄像头则填 0
OUTPUT_PATH = "counting_result.mp4"

# 虚拟检测线 A(x, y) 到 B(x, y),请根据你的视频分辨率微调
LINE_START = (200, 450)
LINE_END = (1080, 450)

# 只跟踪特定类别(COCO 数据集:0-person, 2-car, 3-motorcycle, 5-bus, 7-truck)
CLASS_IDS = [0, 2, 3, 5, 7]

# ==================== 2. 几何算法辅助函数 ====================
def get_cross_product(p1, p2, p3):
"""计算向量 (p2-p1) 和 (p3-p1) 的叉乘"""
return (p2[0] - p1[0]) * (p3[1] - p1[1]) - (p2[1] - p1[1]) * (p3[0] - p1[0])

def is_intersect(p1, p2, q1, q2):
"""
判断线段 P1P2 (物体的运动轨迹) 是否与线段 Q1Q2 (检测线) 相交
"""
# 快速排斥实验(Bounding Box Check)
if (max(p1[0], p2[0]) < min(q1[0], q2[0]) or min(p1[0], p2[0]) > max(q1[0], q2[0]) or
max(p1[1], p2[1]) < min(q1[1], q2[1]) or min(p1[1], p2[1]) > max(q1[1], q2[1])):
return False

# 跨立实验(Cross Product Check)
cp1 = get_cross_product(q1, q2, p1)
cp2 = get_cross_product(q1, q2, p2)
cp3 = get_cross_product(p1, p2, q1)
cp4 = get_cross_product(p1, p2, q2)

# 符号不同说明跨立在两侧,即相交
return (cp1 * cp2 < 0) and (cp3 * cp4 < 0)

# ==================== 3. 主计数引擎 ====================
def run_counting():
# 初始化 YOLO 模型
model = YOLO(MODEL_PATH)

# 打开视频源
cap = cv2.VideoCapture(VIDEO_PATH)
if not cap.isOpened():
print(f"Error: 无法打开视频 {VIDEO_PATH}")
return

# 获取视频属性
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = int(cap.get(cv2.CAP_PROP_FPS))

# 创建视频编写器
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
writer = cv2.VideoWriter(OUTPUT_PATH, fourcc, fps, (width, height))

# 状态数据结构
track_history = defaultdict(list) # 存储轨迹点 {track_id: [(x1, y1), ...]}
counted_ids = set() # 已经计过数的目标 ID 集合

count_in = 0 # 从下往上(或从左往右)跨线的计数
count_out = 0 # 从上往下(或从右往左)跨线的计数

print("开始处理视频,按 'q' 键可提前退出...")

while cap.isOpened():
ret, frame = cap.read()
if not ret:
break

# 运行 YOLO 检测并激活内置 ByteTrack 跟踪器
# persist=True 表示帧间保持 ID 关联,conf 控制置信度过滤
results = model.track(
source=frame,
persist=True,
tracker="bytetrack.yaml",
classes=CLASS_IDS,
conf=0.3,
verbose=False
)

# 提取跟踪结果
boxes = results[0].boxes.xyxy.cpu().numpy() if results[0].boxes.xyxy is not None else []
ids = results[0].boxes.id.int().cpu().numpy() if results[0].boxes.id is not None else []
clss = results[0].boxes.cls.int().cpu().numpy() if results[0].boxes.cls is not None else []

# 绘制半透明计数看板背景
overlay = frame.copy()
cv2.rectangle(overlay, (20, 20), (320, 160), (40, 40, 40), -1)
cv2.addWeighted(overlay, 0.6, frame, 0.4, 0, frame)

# 绘制检测线
cv2.line(frame, LINE_START, LINE_END, (0, 255, 255), 3) # 黄色检测线
cv2.circle(frame, LINE_START, 6, (0, 0, 255), -1)
cv2.circle(frame, LINE_END, 6, (0, 0, 255), -1)

# 遍历当前帧中检测到的所有目标
for box, track_id, cls_id in zip(boxes, ids, clss):
x1, y1, x2, y2 = box

# 使用目标的底边中心点(足部/车轮位置)作为越界判定点
cx = int((x1 + x2) / 2)
cy = int(y2) # 或者选择中心点: int((y1 + y2) / 2)
current_point = (cx, cy)

# 更新当前 ID 的轨迹历史
track_history[track_id].append(current_point)
if len(track_history[track_id]) > 30: # 限制轨迹最大保留 30 帧
track_history[track_id].pop(0)

# 开始判定是否跨线
if len(track_history[track_id]) >= 2 and track_id not in counted_ids:
prev_point = track_history[track_id][-2] # 获取上一帧的坐标位置

# 判定当前物体的运动线段 prev_point -> current_point 是否穿过检测线
if is_intersect(prev_point, current_point, LINE_START, LINE_END):
# 判断方向:根据检测线是横向还是纵向,通过 y 坐标或 x 坐标大小关系判定方向
# 这里以横向线为例: cy 变小说明从下往上运动 (IN);cy 变大说明从上往下运动 (OUT)
if current_point[1] < prev_point[1]:
count_in += 1
else:
count_out += 1

counted_ids.add(track_id) # 防止同一个目标被反复计数

# 绘制目标边界框及发光效果
color = (0, 255, 0) if track_id in counted_ids else (255, 100, 0)
cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), color, 2)

# 绘制 ID 和类别名
label = f"ID: {track_id} {model.names[cls_id]}"
cv2.putText(frame, label, (int(x1), int(y1) - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

# 绘制物体的足部当前点和运动轨迹线
cv2.circle(frame, current_point, 4, color, -1)
points = np.hstack(track_history[track_id]).astype(np.int32).reshape((-1, 1, 2))
cv2.polylines(frame, [points], isClosed=False, color=(255, 255, 255), thickness=1)

# 绘制高大上的仪表看板文本
cv2.putText(frame, "AI Real-time Counting", (40, 50),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
cv2.putText(frame, f"IN (Upstream): {count_in}", (40, 90),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
cv2.putText(frame, f"OUT (Downstream): {count_out}", (40, 130),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 165, 255), 2)

# 写入帧到输出视频
writer.write(frame)

# 屏幕实时显示
cv2.imshow("YOLO + ByteTrack Real-time Object Counting", frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break

# 释放资源
cap.release()
writer.release()
cv2.destroyAllWindows()
print(f"处理完成!输出视频已保存至: {OUTPUT_PATH}")

if __name__ == "__main__":
run_counting()

四、核心代码精讲

1. model.track 究竟做了什么?

在上一篇文章中,我们使用的是普通的检测方法 model(frame)。在这段代码里,我们将其换成了支持持续跟踪的 model.track() 方法。

1
results = model.track(source=frame, persist=True, tracker="bytetrack.yaml")
  • persist=True:这是激活跟踪器的关键。它告诉 YOLO 在处理当前帧时,要参考上一帧的跟踪状态,并确保同一个物体的 box.id 保持不变。
  • tracker="bytetrack.yaml":Ultralytics 内置了两种跟踪器,即 "bytetrack.yaml""botsort.yaml"。相比之下,ByteTrack 速度更快,适合实时性要求高、算力相对有限的场景。

2. 为什么选择足部中心点 cy = int(y2) 而不是几何中心?

对于人流和车流,它们都是在地面上移动的。由于相机的透视投影关系,物体的上半身容易在跨线前发生倾斜,从而导致越界判定不准。
因此,选用**检测框底边中点(足部/轮胎贴地位置)**作为基准点,其跨线判定的物理位置最为准确。

3. 如何解决“同个 ID 往返跨线”的情况?

代码中使用了一个 counted_ids 集合进行去重:

1
2
3
if ... and track_id not in counted_ids:
...
counted_ids.add(track_id)

这是一种单次越界去重机制。在一些更为复杂的场景中,若需要允许同一个目标在往返跨越时多次计数,可以改为记录目标的“前侧状态”(例如用一个字典记录 {track_id: "IN_SIDE"})。一旦检测到物体的状态从 IN_SIDE 变为 OUT_SIDE,则触发一次计数并更新状态,从而支持往返多次累计计数。


五、工业部署调优与提速策略

  1. 多线程读取与跳帧机制
    对于高分辨率摄像头,可以使用多线程方式读取视频流,避免主推理线程因为 OpenCV 的 read() 发生 I/O 阻塞。同时,在保证精度的前提下采用隔帧检测(例如每 2 帧调用一次 YOLO 推理,其余帧采用快速光流或在上一帧的跟踪位置上预测),可大幅将 FPS 提高 50% 以上。

  2. 合理控制置信度与早停阈值
    ByteTrack 的强大在于匹配。但如果视频中背景极为嘈杂,过低的第一阶段阈值会引入过多负样本噪声。在 model.track() 中,建议将第一层过滤置信度设为 0.3~0.4 之间。

  3. 多摄像头并行与服务器端集成
    在企业落地时,可通过 multiprocessing 为每个摄像头启动一个独立的 Python 推理进程,最终通过 Redis 消息队列或 RabbitMQ 将计数数据实时写入后端的 PostgreSQL 数据库或报警后台。


结语

通过 YOLO + ByteTrack + 几何叉乘判定,我们只用了 100 多行代码就构建起了一套逻辑严密、抗遮挡、不惧漏检的高性能计数系统。

下一篇文章中,我们将带大家把 YOLO 塞入更宏大的工业级安防场景——结合 GB28181 和 JT1078 国标监控协议的视频流,实现多路高并发实时 AI 安全告警系统,敬请期待!