一、Lucas-Kanade 算法
基本概念
Lucas-Kanade 算法是一种经典的光流估计算法,用于追踪视频序列中的运动对象或检测图像之间的运动。光流是指一幅图像中的许多点在第二幅图像中的位置,用于描述物体在两帧图像之间的位移。
Lucas-Kanade 算法基于以下假设:
- 亮度不变假设:物体表面的亮度在运动过程中保持不变。
- 小位移假设:物体在相邻两帧之间的运动位移很小。
- 空间一致性假设:基于物理世界的刚体,相邻像素具有相似的运动。
数学推导
亮度不变假设
假设 t 时刻,位于 (x,y) 像素位置的物体,在 t+Δt 时刻位于 (x+u,y+v) 位置,基于亮度不变假设,有:
$$ I(x, y, t)=I\left(x+u, y+v, t+\Delta_{t}\right) $$
对该方程进行一阶泰勒展开,并忽略高阶项,得到:
$$ I\left(x+u, y+v, t+\Delta_{t}\right)\approx I(x, y, t)+uI_{x}^{\prime}+vI_{y}^{\prime}+\Delta_{t}I_{t}^{\prime} $$
其中,$I_{x}^{\prime}, I_{y}^{\prime}, I_{t}^{\prime}$分别是图像在 x 方向、y 方向和时间上的偏导数。可以简化为:
$$ u I_{x}^{\prime}+v I_{y}^{\prime}+\Delta_{t} I_{t}^{\prime} =0 $$
由$\Delta_{t} I_{t}^{\prime} = \Delta I_{t}$并化为矩阵形式得到
$$ \left[I_{x}^{\prime}, I_{y}^{\prime}\right]\left[\begin{array}{l}u \\ v\end{array}\right]=-\Delta I_{t} $$
给定两张图像$I_{x}^{\prime}, I_{y}^{\prime},\Delta I_{t}$均为已知量,$u,v$即为待求的光流,但只有一个方程,因此要求得解需要借助下一个假设。
邻域光流一致
LK 算法认为,待求点附近的小窗口内,所有点的光流都是一致的,在窗口$m \times m$内可以得到
$$ Ax=b $$
其中
$$ \mathbf{A} = \begin{pmatrix} I'_x(1) & I'_y(1) \\ I'_x(2) & I'_y(2) \\ \vdots & \vdots \\ I'_x(n) & I'_y(n) \end{pmatrix}, \quad \mathbf{x} = \begin{pmatrix} u \\ v \end{pmatrix}, \quad \mathbf{b} = \begin{pmatrix} -\Delta I_t(1) \\ -\Delta I_t(2) \\ \vdots \\ -\Delta I_t(n) \end{pmatrix} $$
需要最小化误差函数
$$ \sum\nolimits_i w^2(i)(I'_x(i)u + I'_y(i)v + \Delta I_t(i))^2 $$
通过最小二乘法求解,得到:
$$ \mathbf{x} = (A^T A)^{-1} A^T \mathbf{b} $$
存在问题
这一算法结合待求点附近窗口的光流信息,只能处理小范围的移动,在物体运动幅度大的场景下表现不好。因此引入 LK 金字塔。
二、实现 LK 金字塔
基本概念
当物体运动速度较快,可以缩小图像尺寸,这样同样的窗口可以检测更大范围的运动。因此 LK 金字塔使用图像金字塔,在各个层上使用相同的窗口大小,这样在高层可以检测更大幅度的运动,在底层可以检测小幅度的运动,这样能够由粗到细的检测光流。
算法实现
建立金字塔
算法需要构建前后两张图片的图像金字塔和 x,y 方向上的梯度金字塔。
def build_pyramid(self, prevImg, nextImg):
def _pyr(img):
pyr = [img]
gradX = [cv2.Sobel(img, cv2.CV_32F, 1, 0, ksize=3)]
gradY = [cv2.Sobel(img, cv2.CV_32F, 0, 1, ksize=3)]
for i in range(1, self.maxLevel):
img = cv2.pyrDown(img)
pyr.append(img)
gradX.append(cv2.Scharr(img, cv2.CV_32F, 1, 0))
gradY.append(cv2.Scharr(img, cv2.CV_32F, 0, 1))
return pyr, gradX, gradY
self.prevPyramid, self.prevGradXPyr, self.prevGradYPyr = _pyr(prevImg)
self.nextPyramid, self.nextGradXPyr, self.nextGradYPyr = _pyr(nextImg)
迭代计算光流
对于任何一个点若要求得它在下一帧中的位置,首先在顶层进行估计。在顶层,首先假设预测点和原始点在同一位置,取两个点所在窗口计算误差。
def calc_patch_diff_with_bilinear(self, prevImg, nextImg, prevPt, nextPt):
if (not self.in_border((prevPt[0] - self.winSize // 2, prevPt[1] - self.winSize // 2), prevImg.shape) or
not self.in_border((prevPt[0] + self.winSize // 2, prevPt[1] + self.winSize // 2), prevImg.shape) or
not self.in_border((nextPt[0] - self.winSize // 2, nextPt[1] - self.winSize // 2), nextImg.shape) or
not self.in_border((nextPt[0] + self.winSize // 2, nextPt[1] + self.winSize // 2), nextImg.shape)):
return 1e10
patch = np.zeros((self.winSize, self.winSize), dtype=np.float32)
patchNext = np.zeros((self.winSize, self.winSize), dtype=np.float32)
for u in range(self.winSize):
for v in range(self.winSize):
patch[u][v] = self.cvMatAt(prevImg, prevPt[0] - self.winSize // 2 + v, prevPt[1] - self.winSize//2 + u)
patchNext[u][v] = self.cvMatAt(nextImg, nextPt[0] - self.winSize//2 + v, nextPt[1] - self.winSize//2 + u)
patchDiff = patch - patchNext
err = np.sqrt(np.mean(patchDiff * patchDiff))
return err
邻域点内的所有误差可记为
$$ \varepsilon(\bar{\nu})=\varepsilon\left(\nu_{x}, \nu_{y}\right)=\sum_{x=p_{x}-\omega_{x}}^{p_{x}+\omega_{x}} \sum_{y=p_{y}-\omega_{y}}^{p_{y}+\omega_{y}}\left(A(x, y)-B\left(x+\nu_{x}, y+\nu_{y}\right)\right)^{2} $$
即两个窗口内对应位置像素灰度亮度差的平方和。
要得到使误差最小的向量 v,可以使用最小二乘法。
$$ \bar{\nu}_{\mathrm{opt}}=G^{-1} \bar{b} $$
其中
$G \doteq \sum_{x=p_{x}-\omega_{x}}^{p_{x}+\omega_{x}} \sum_{y=p_{y}-\omega_{y}}^{p_{y}+\omega_{y}}\left[\begin{array}{cc}I_{x}^{2} & I_{x} I_{y} \\ I_{x} I_{y} & I_{y}^{2}\end{array}\right]$$\bar{b} \doteq \sum_{x=p_{x}-\omega_{x}}^{p_{x}+\omega_{x}} \sum_{y=p_{y}-\omega_{y}}^{p_{y}+\omega_{y}}\left[\begin{array}{l}\delta I I_{x} \ \delta I I_{y}\end{array}\right]$
G = np.zeros((2, 2))
b = np.zeros(2)
weight_sum = 0.0
for y in range(self.winSize):
for x in range(self.winSize):
JNext = np.array([self.cvMatAt(patchGradXNext, x, y), self.cvMatAt(patchGradYNext, x, y)])
weight = 1.0
'''
# Weighted adjustment
if WEIGHTED:
J = np.array([patchGradX[y, x], patchGradY[y, x]])
weight = 1.0 / (np.linalg.norm(J - JNext) + 1.0)
'''
G += np.outer(JNext, JNext) * weight
di = self.cvMatAt(patch, x, y) - self.cvMatAt(patchNext, x, y)
b += di * JNext * weight
weight_sum += weight
G /= weight_sum
b /= weight_sum
dp = np.linalg.inv(G).dot(b)
最后在每层内需要使用多次迭代的方法计算最优解。
# 多次迭代
for _ in range(10):
flowOnLevel = self.calc_optical_flow_single_level(prevLevel, nextLevel, pointOnLevel, pointOnLevelPred, level)
pointOnLevelPred = pointOnLevelPred + flowOnLevel
updated_error = self.calc_patch_diff_with_bilinear(prevLevel, nextLevel, pointOnLevel, pointOnLevelPred)
# 误差增大退出
if updated_error > error_org + 1e-3:
pointOnLevelPred = pointOnLevelPred - flowOnLevel
break
error_org = updated_error
反馈
现在能够计算同一层前一张图的每个点在后一张图的预测值了,也就是说能够计算每一层的预测结果了。
记每层的初始光流和剩余光流分别为$\mathrm{g}^{L}, \mathrm{d}^{L}$下一层的初始光流为
$$ \mathrm{g}^{L-1}=2\left(\mathrm{~g}^{\mathrm{L}}+\mathrm{d}^{L}\right) $$
其中的系数由金字塔缩放产生的倍率。
0 层的最终光流为$\mathrm{d}=\mathrm{g}^{0}+\mathrm{d}^{0}$由顶层的初始光流为 0,因此得到,$\mathbf{d}=\sum_{L=0}^{L_{m}} 2^{L} \mathbf{d}^{L}$即各层的剩余光流之和。
displacement = np.zeros(2)
for level in range(self.maxLevel - 1, -1, -1):
prevLevel = self.prevPyramid[level]
nextLevel = self.nextPyramid[level]
pointOnLevel = point * (1.0 / (1 << level))
pointOnLevelPred = (point + displacement) * (1.0 / (1 << level))
error_org = self.calc_patch_diff_with_bilinear(prevLevel, nextLevel, pointOnLevel, pointOnLevelPred)
# 多次迭代
for _ in range(10):
flowOnLevel = self.calc_optical_flow_single_level(prevLevel, nextLevel, pointOnLevel, pointOnLevelPred, level)
pointOnLevelPred = pointOnLevelPred + flowOnLevel
updated_error = self.calc_patch_diff_with_bilinear(prevLevel, nextLevel, pointOnLevel, pointOnLevelPred)
# 误差增大退出
if updated_error > error_org + 1e-3:
pointOnLevelPred = pointOnLevelPred - flowOnLevel
break
error_org = updated_error
displacement = (pointOnLevelPred - pointOnLevel) * (1 << level)
在实际代码中直接将每层的结果折算到 0 层然后再折算回下一层参与计算即可通过迭代的方式,从粗到细的得到 0 层的最终光流。
误差检验
patch = prevImg[int(prevPt[1] - self.winSize // 2):int(prevPt[1] + self.winSize // 2),
int(prevPt[0] - self.winSize // 2):int(prevPt[0] + self.winSize // 2)]
patchNext = nextImg[int(keypoints2i[1] - self.winSize // 2):int(keypoints2i[1] + self.winSize // 2),
int(keypoints2i[0] - self.winSize // 2):int(keypoints2i[0] + self.winSize // 2)]
patchDiff = patch - patchNext
err.append(np.sqrt(np.mean(patchDiff * patchDiff)))
if err[i] < 10.0:
status.append(1)
else:
status.append(0)
对于那些经过 N 次迭代后误差仍较大的点,算法认为追踪失败了,他们可能由于在这些特征点已经离开下一张图像或者由于噪声等因素,无法再下一张图像中追踪了,我们需要将其标记。这也是 LK 算法作为稀疏光流算法的体现。
效果
三、使用 LK 金字塔实现视频增稳
基本原理
视频增稳主要分析视频序列中两帧之间的差异,通过跟踪特征点在视频中的位置,记录特征点在每一帧中的位移。这些轨迹能够反映出特征点的运动趋势。通过平滑算法(如累积平均、高斯滤波、卡尔曼滤波等)对特征点的轨迹进行处理,消除短暂的抖动和噪声。最后根据平滑后的特征点轨迹,计算帧与帧之间的全局变换矩阵。应用这些变换矩阵到原始视频中,实现防抖效果。
主要用到角点检测、LK 光流法。
算法实现
提取抖动
for i in range(n_frames - 1):
# 提取前一帧的特征点
prev_pts = cv2.goodFeaturesToTrack(prev_gray, maxCorners=200, qualityLevel=0.01, minDistance=10, blockSize=3)
# 读下一帧
success, curr = cap.read()
if not success:
break
curr_gray = cv2.cvtColor(curr, cv2.COLOR_BGR2GRAY)
# 计算光流
curr_pts, status, err = cv2.calcOpticalFlowPyrLK(prev_gray, curr_gray, prev_pts, None)
# 过滤有效点
prev_pts = prev_pts[status == 1]
curr_pts = curr_pts[status == 1]
if distance_threshold is not None:
distances = np.sqrt(np.sum((prev_pts - curr_pts) ** 2, axis=1))
prev_pts = prev_pts[distances < distance_threshold]
curr_pts = curr_pts[distances < distance_threshold]
# 找到变换矩阵
m, inlier = cv2.estimateAffine2D(prev_pts, curr_pts)
# 只提取位移和旋转
dx = m[0, 2]
dy = m[1, 2]
da = np.arctan2(m[1, 0], m[0, 0])
transforms[i+1] = [dx, dy, da]
# 移到下一帧
prev_gray = curr_gray
print("Frame: " + str(i+2) + "/" + str(n_frames) +
" - Tracked points : " + str(len(prev_pts)))
算法首先通过 goodFeaturesToTrack 提取前一帧的特征点,然后使用 calcOpticalFlowPyrLK 得到光流,过滤后通过 estimateAffine2D 得到点变换的仿射变换矩阵。其中 estimateAffine2D 是通过最小二乘法最小化平方误差得到两组点之间的仿射变换矩阵的。
平滑轨迹
# 对变换曲线平滑
def convolve(curve, radius):
window_size = 2 * radius + 1
f = np.ones(window_size) / window_size
# 添加填充
curve_pad = np.pad(curve, (radius, radius), 'reflect')
# 应用卷积
curve_smoothed = np.convolve(curve_pad, f, mode='same')
# 返回平滑曲线
return curve_smoothed[radius:-radius]
def smooth(trace):
smoothed_trace = trace.copy()
# 过滤x, y和角度曲线
for i in range(3):
smoothed_trace[:, i] = convolve(trace[:, i], radius=SMOOTHING_RADIUS)
return smoothed_trace
# 计算轨迹
trajectory = np.cumsum(transforms, axis=0)
smoothed_trajectory = smooth(trajectory)
# 计算位移
difference = smoothed_trajectory - trajectory
smoothed_transforms = difference
得到每一帧的特征点运动情况,通过求累积和就能得到特征点的运动轨迹,应用均值滤波卷积就能得到平滑的运动轨迹,再与原轨迹做差就能得到每一帧的位移。
应用变换
success, frame = cap.read()
if not success:
print('read fail')
break
# 从新的转换数组中提取转换
dx = smoothed_transforms[i, 0]
dy = smoothed_transforms[i, 1]
da = smoothed_transforms[i, 2]
# 重构变换矩阵
m = np.array([[np.cos(da), -np.sin(da), dx], [np.sin(da), np.cos(da), dy]])
# 应用仿射
frame_stabilized = cv2.warpAffine(frame, m, (width, height))
# 1.1x裁切去黑边
T = cv2.getRotationMatrix2D((width / 2, height / 2), 0, 1.1)
frame_cut = cv2.warpAffine(frame_stabilized, T, (width, height))
最后通过 warpAffine 将仿射变换矩阵应用到原视频并进行 1.1x 裁切来去除黑边,就得到了稳定后的视频。
https://nankai.feishu.cn/drive/folder/RFdVfwP1Hl1QB5dU9WkcIwYMngh?from=from_copylink