| ## Copyright (c) 2020 The WebM project authors. All Rights Reserved. |
| ## |
| ## Use of this source code is governed by a BSD-style license |
| ## that can be found in the LICENSE file in the root of the source |
| ## tree. An additional intellectual property rights grant can be found |
| ## in the file PATENTS. All contributing project authors may |
| ## be found in the AUTHORS file in the root of the source tree. |
| ## |
| |
| # coding: utf-8 |
| import numpy as np |
| import numpy.linalg as LA |
| from scipy.ndimage.filters import gaussian_filter |
| from scipy.sparse import csc_matrix |
| from scipy.sparse.linalg import inv |
| from MotionEST import MotionEST |
| """Anandan Model""" |
| |
| |
| class Anandan(MotionEST): |
| """ |
| constructor: |
| cur_f: current frame |
| ref_f: reference frame |
| blk_sz: block size |
| beta: smooth constrain weight |
| k1,k2,k3: confidence coefficients |
| max_iter: maximum number of iterations |
| """ |
| |
| def __init__(self, cur_f, ref_f, blk_sz, beta, k1, k2, k3, max_iter=100): |
| super(Anandan, self).__init__(cur_f, ref_f, blk_sz) |
| self.levels = int(np.log2(blk_sz)) |
| self.intensity_hierarchy() |
| self.c_maxs = [] |
| self.c_mins = [] |
| self.e_maxs = [] |
| self.e_mins = [] |
| for l in xrange(self.levels + 1): |
| c_max, c_min, e_max, e_min = self.get_curvature(self.cur_Is[l]) |
| self.c_maxs.append(c_max) |
| self.c_mins.append(c_min) |
| self.e_maxs.append(e_max) |
| self.e_mins.append(e_min) |
| self.beta = beta |
| self.k1, self.k2, self.k3 = k1, k2, k3 |
| self.max_iter = max_iter |
| |
| """ |
| build intensity hierarchy |
| """ |
| |
| def intensity_hierarchy(self): |
| level = 0 |
| self.cur_Is = [] |
| self.ref_Is = [] |
| #build each level itensity by using gaussian filters |
| while level <= self.levels: |
| cur_I = gaussian_filter(self.cur_yuv[:, :, 0], sigma=(2**level) * 0.56) |
| ref_I = gaussian_filter(self.ref_yuv[:, :, 0], sigma=(2**level) * 0.56) |
| self.ref_Is.append(ref_I) |
| self.cur_Is.append(cur_I) |
| level += 1 |
| |
| """ |
| get curvature of each block |
| """ |
| |
| def get_curvature(self, I): |
| c_max = np.zeros((self.num_row, self.num_col)) |
| c_min = np.zeros((self.num_row, self.num_col)) |
| e_max = np.zeros((self.num_row, self.num_col, 2)) |
| e_min = np.zeros((self.num_row, self.num_col, 2)) |
| for r in xrange(self.num_row): |
| for c in xrange(self.num_col): |
| h11, h12, h21, h22 = 0, 0, 0, 0 |
| for i in xrange(r * self.blk_sz, r * self.blk_sz + self.blk_sz): |
| for j in xrange(c * self.blk_sz, c * self.blk_sz + self.blk_sz): |
| if 0 <= i < self.height - 1 and 0 <= j < self.width - 1: |
| Ix = I[i][j + 1] - I[i][j] |
| Iy = I[i + 1][j] - I[i][j] |
| h11 += Iy * Iy |
| h12 += Ix * Iy |
| h21 += Ix * Iy |
| h22 += Ix * Ix |
| U, S, _ = LA.svd(np.array([[h11, h12], [h21, h22]])) |
| c_max[r, c], c_min[r, c] = S[0], S[1] |
| e_max[r, c] = U[:, 0] |
| e_min[r, c] = U[:, 1] |
| return c_max, c_min, e_max, e_min |
| |
| """ |
| get ssd of motion vector: |
| cur_I: current intensity |
| ref_I: reference intensity |
| center: current position |
| mv: motion vector |
| """ |
| |
| def get_ssd(self, cur_I, ref_I, center, mv): |
| ssd = 0 |
| for r in xrange(int(center[0]), int(center[0]) + self.blk_sz): |
| for c in xrange(int(center[1]), int(center[1]) + self.blk_sz): |
| if 0 <= r < self.height and 0 <= c < self.width: |
| tr, tc = r + int(mv[0]), c + int(mv[1]) |
| if 0 <= tr < self.height and 0 <= tc < self.width: |
| ssd += (ref_I[tr, tc] - cur_I[r, c])**2 |
| else: |
| ssd += cur_I[r, c]**2 |
| return ssd |
| |
| """ |
| get region match of level l |
| l: current level |
| last_mvs: matchine results of last level |
| radius: movenment radius |
| """ |
| |
| def region_match(self, l, last_mvs, radius): |
| mvs = np.zeros((self.num_row, self.num_col, 2)) |
| min_ssds = np.zeros((self.num_row, self.num_col)) |
| for r in xrange(self.num_row): |
| for c in xrange(self.num_col): |
| center = np.array([r * self.blk_sz, c * self.blk_sz]) |
| #use overlap hierarchy policy |
| init_mvs = [] |
| if last_mvs is None: |
| init_mvs = [np.array([0, 0])] |
| else: |
| for i, j in {(r, c), (r, c + 1), (r + 1, c), (r + 1, c + 1)}: |
| if 0 <= i < last_mvs.shape[0] and 0 <= j < last_mvs.shape[1]: |
| init_mvs.append(last_mvs[i, j]) |
| #use last matching results as the start position as current level |
| min_ssd = None |
| min_mv = None |
| for init_mv in init_mvs: |
| for i in xrange(-2, 3): |
| for j in xrange(-2, 3): |
| mv = init_mv + np.array([i, j]) * radius |
| ssd = self.get_ssd(self.cur_Is[l], self.ref_Is[l], center, mv) |
| if min_ssd is None or ssd < min_ssd: |
| min_ssd = ssd |
| min_mv = mv |
| min_ssds[r, c] = min_ssd |
| mvs[r, c] = min_mv |
| return mvs, min_ssds |
| |
| """ |
| smooth motion field based on neighbor constraint |
| uvs: current estimation |
| mvs: matching results |
| min_ssds: minimum ssd of matching results |
| l: current level |
| """ |
| |
| def smooth(self, uvs, mvs, min_ssds, l): |
| sm_uvs = np.zeros((self.num_row, self.num_col, 2)) |
| c_max = self.c_maxs[l] |
| c_min = self.c_mins[l] |
| e_max = self.e_maxs[l] |
| e_min = self.e_mins[l] |
| for r in xrange(self.num_row): |
| for c in xrange(self.num_col): |
| w_max = c_max[r, c] / ( |
| self.k1 + self.k2 * min_ssds[r, c] + self.k3 * c_max[r, c]) |
| w_min = c_min[r, c] / ( |
| self.k1 + self.k2 * min_ssds[r, c] + self.k3 * c_min[r, c]) |
| w = w_max * w_min / (w_max + w_min + 1e-6) |
| if w < 0: |
| w = 0 |
| avg_uv = np.array([0.0, 0.0]) |
| for i, j in {(r - 1, c), (r + 1, c), (r, c - 1), (r, c + 1)}: |
| if 0 <= i < self.num_row and 0 <= j < self.num_col: |
| avg_uv += 0.25 * uvs[i, j] |
| sm_uvs[r, c] = (w * w * mvs[r, c] + self.beta * avg_uv) / ( |
| self.beta + w * w) |
| return sm_uvs |
| |
| """ |
| motion field estimation |
| """ |
| |
| def motion_field_estimation(self): |
| last_mvs = None |
| for l in xrange(self.levels, -1, -1): |
| mvs, min_ssds = self.region_match(l, last_mvs, 2**l) |
| uvs = np.zeros(mvs.shape) |
| for _ in xrange(self.max_iter): |
| uvs = self.smooth(uvs, mvs, min_ssds, l) |
| last_mvs = uvs |
| for r in xrange(self.num_row): |
| for c in xrange(self.num_col): |
| self.mf[r, c] = uvs[r, c] |