| ## Copyright (c) 2020 The WebM project authors. All Rights Reserved. |
| ## |
| ## Use of this source code is governed by a BSD-style license |
| ## that can be found in the LICENSE file in the root of the source |
| ## tree. An additional intellectual property rights grant can be found |
| ## in the file PATENTS. All contributing project authors may |
| ## be found in the AUTHORS file in the root of the source tree. |
| ## |
| |
| # coding: utf-8 |
| import numpy as np |
| import numpy.linalg as LA |
| from Util import MSE |
| from MotionEST import MotionEST |
| """Exhaust Search:""" |
| |
| |
class Exhaust(MotionEST):
    """Exhaustive block-matching motion estimator.

    Every candidate position inside the search window is scored with
    ``metric`` and the position with the smallest distortion is kept.

    Constructor arguments:
      cur_f: current frame
      ref_f: reference frame
      blk_size: block size
      wnd_size: search window size
      metric: metric used to compare the distortion between blocks
    """

    def __init__(self, cur_f, ref_f, blk_size, wnd_size, metric=MSE):
        self.name = 'exhaust'
        self.wnd_sz = wnd_size
        self.metric = metric
        super(Exhaust, self).__init__(cur_f, ref_f, blk_size)

    def search(self, cur_r, cur_c):
        """Find the best matching position for one block.

        Arguments:
          cur_r: block row index
          cur_c: block column index

        Returns:
          (ref_x, ref_y): top-left pixel position of the best match. The
          block's own position is returned when no candidate is strictly
          better than the zero displacement.
        """
        # The zero motion vector is the baseline every candidate must beat.
        min_loss = self.block_dist(cur_r, cur_c, [0, 0], self.metric)
        cur_x = cur_c * self.blk_sz
        cur_y = cur_r * self.blk_sz
        ref_x = cur_x
        ref_y = cur_y
        # Limits for the top-left corner of a candidate block.
        # NOTE(review): the upper bound is exclusive, so a block touching
        # the right/bottom frame edge is never tried -- presumably this
        # mirrors the check inside block_dist; confirm before changing.
        max_x = self.width - self.blk_sz
        max_y = self.height - self.blk_sz
        # The window is half-open: displacements run over [-wnd_sz, wnd_sz).
        # `range` (instead of the Python 2 only `xrange`) runs on 2 and 3.
        for y in range(cur_y - self.wnd_sz, cur_y + self.wnd_sz):
            if not 0 <= y < max_y:
                continue
            for x in range(cur_x - self.wnd_sz, cur_x + self.wnd_sz):
                if not 0 <= x < max_x:
                    continue
                # Motion vectors are ordered [dy, dx].
                loss = self.block_dist(cur_r, cur_c,
                                       [y - cur_y, x - cur_x], self.metric)
                if loss < min_loss:
                    min_loss = loss
                    ref_x = x
                    ref_y = y
        return ref_x, ref_y

    def motion_field_estimation(self):
        """Run the search for every block and store [dy, dx] in self.mf."""
        for i in range(self.num_row):
            for j in range(self.num_col):
                ref_x, ref_y = self.search(i, j)
                self.mf[i, j] = np.array(
                    [ref_y - i * self.blk_sz, ref_x - j * self.blk_sz])
| |
| |
| """Exhaust with Neighbor Constraint""" |
| |
| |
class ExhaustNeighbor(MotionEST):
    """Exhaustive search with a neighbor-consistency penalty.

    The cost of a candidate is its block distortion plus ``beta`` times
    the summed distance between the candidate motion vector and the
    motion vectors already assigned to the 4-connected neighbor blocks.

    Constructor arguments:
      cur_f: current frame
      ref_f: reference frame
      blk_size: block size
      wnd_size: search window size
      beta: neighbor loss weight
      metric: metric used to compare the distortion between blocks
    """

    # (row, col) offsets of the 4-connected neighbors. A tuple keeps the
    # accumulation order fixed and avoids rebuilding a set on every call.
    _NEIGHBOR_OFFSETS = ((-1, 0), (1, 0), (0, 1), (0, -1))

    def __init__(self, cur_f, ref_f, blk_size, wnd_size, beta, metric=MSE):
        self.name = 'exhaust + neighbor'
        self.wnd_sz = wnd_size
        self.beta = beta
        self.metric = metric
        super(ExhaustNeighbor, self).__init__(cur_f, ref_f, blk_size)
        # True once a block has received its motion vector. The builtin
        # `bool` is used because the `np.bool` alias no longer exists in
        # current NumPy releases.
        self.assign = np.zeros((self.num_row, self.num_col), dtype=bool)

    def neighborLoss(self, cur_r, cur_c, mv):
        """Estimate the neighbor loss of a candidate motion vector.

        Arguments:
          cur_r: current block row
          cur_c: current block column
          mv: candidate motion vector [dy, dx] (list or array)

        Returns:
          Sum of the Euclidean distances between ``mv`` and the motion
          vectors of the in-frame neighbors that are already assigned;
          0 when there is no such neighbor.
        """
        mv = np.asarray(mv)
        loss = 0
        for d_r, d_c in self._NEIGHBOR_OFFSETS:
            nb_r = cur_r + d_r
            nb_c = cur_c + d_c
            inside = 0 <= nb_r < self.num_row and 0 <= nb_c < self.num_col
            # Only blocks that already have a motion vector contribute.
            if inside and self.assign[nb_r, nb_c]:
                loss += LA.norm(mv - self.mf[nb_r, nb_c])
        return loss

    def search(self, cur_r, cur_c):
        """Find the best matching position for one block.

        Arguments:
          cur_r: block row index
          cur_c: block column index

        Returns:
          (ref_x, ref_y): top-left pixel position minimizing
          distortion + beta * neighbor loss. The block's own position is
          returned when no candidate is strictly better.
        """
        # Cost of the zero motion vector is the baseline to beat.
        dist_loss = self.block_dist(cur_r, cur_c, [0, 0], self.metric)
        nb_loss = self.neighborLoss(cur_r, cur_c, np.array([0, 0]))
        min_loss = dist_loss + self.beta * nb_loss
        cur_x = cur_c * self.blk_sz
        cur_y = cur_r * self.blk_sz
        ref_x = cur_x
        ref_y = cur_y
        # Exclusive upper limits for a candidate's top-left corner.
        max_x = self.width - self.blk_sz
        max_y = self.height - self.blk_sz
        # The window is half-open: displacements run over [-wnd_sz, wnd_sz).
        # `range` (instead of the Python 2 only `xrange`) runs on 2 and 3.
        for y in range(cur_y - self.wnd_sz, cur_y + self.wnd_sz):
            if not 0 <= y < max_y:
                continue
            for x in range(cur_x - self.wnd_sz, cur_x + self.wnd_sz):
                if not 0 <= x < max_x:
                    continue
                mv = [y - cur_y, x - cur_x]  # ordered [dy, dx]
                dist_loss = self.block_dist(cur_r, cur_c, mv, self.metric)
                nb_loss = self.neighborLoss(cur_r, cur_c, mv)
                loss = dist_loss + self.beta * nb_loss
                if loss < min_loss:
                    min_loss = loss
                    ref_x = x
                    ref_y = y
        return ref_x, ref_y

    def motion_field_estimation(self):
        """Estimate the motion field block by block in raster order.

        Each block is flagged in ``self.assign`` right after its vector is
        stored, so later blocks are penalized against it.
        """
        for i in range(self.num_row):
            for j in range(self.num_col):
                ref_x, ref_y = self.search(i, j)
                self.mf[i, j] = np.array(
                    [ref_y - i * self.blk_sz, ref_x - j * self.blk_sz])
                self.assign[i, j] = True
| |
| |
| """Exhaust with Neighbor Constraint and Feature Score""" |
| |
| |
class ExhaustNeighborFeatureScore(MotionEST):
    """Exhaustive search followed by feature-score weighted smoothing.

    Each block is first matched by plain exhaustive search. The resulting
    vectors are then smoothed iteratively: a block with a high feature
    score keeps its matched vector, a block with a low score is pulled
    toward the weighted average of its neighbors.

    Constructor arguments:
      cur_f: current frame
      ref_f: reference frame
      blk_size: block size
      wnd_size: search window size
      beta: neighbor loss weight
      max_iter: maximum number of iterations
      metric: metric used to compare the distortion between blocks
    """

    # (row, col) offsets of the edge-sharing and the diagonal neighbors.
    _EDGE_OFFSETS = ((-1, 0), (1, 0), (0, -1), (0, 1))
    _DIAG_OFFSETS = ((-1, -1), (-1, 1), (1, -1), (1, 1))

    def __init__(self,
                 cur_f,
                 ref_f,
                 blk_size,
                 wnd_size,
                 beta=1,
                 max_iter=100,
                 metric=MSE):
        self.name = 'exhaust + neighbor+feature score'
        self.wnd_sz = wnd_size
        self.beta = beta
        self.metric = metric
        self.max_iter = max_iter
        super(ExhaustNeighborFeatureScore, self).__init__(
            cur_f, ref_f, blk_size)
        self.fs = self.getFeatureScore()

    def getFeatureScore(self):
        """Get the feature score of each block.

        For every block the forward differences of channel 0 of
        ``self.cur_yuv`` are accumulated into the 2x2 matrix
        [[IxIx, IxIy], [IxIy, IyIy]]. The score is the product of its two
        eigenvalues divided by their sum, clamped at 0.

        Returns:
          Array of shape (num_row, num_col) holding the scores.
        """
        fs = np.zeros((self.num_row, self.num_col))
        blk = self.blk_sz
        for r in range(self.num_row):
            for c in range(self.num_col):
                y0 = r * blk
                x0 = c * blk
                # Work in float so the differences cannot wrap around if
                # the frame is stored in an unsigned integer type.
                # assumes cur_yuv is a (height, width, channels) ndarray
                # with channel 0 being luma -- TODO confirm
                plane = np.asarray(
                    self.cur_yuv[y0:y0 + blk, x0:x0 + blk, 0], dtype=float)
                base = plane[:-1, :-1]
                Ix = plane[:-1, 1:] - base  # horizontal forward difference
                Iy = plane[1:, :-1] - base  # vertical forward difference
                IxIx = np.sum(Ix * Ix)
                IyIy = np.sum(Iy * Iy)
                IxIy = np.sum(Ix * Iy)
                # Closed-form eigenvalues of the symmetric 2x2 matrix.
                root = np.sqrt(4 * IxIy * IxIy + (IxIx - IyIy)**2)
                lambda_max = 0.5 * ((IxIx + IyIy) + root)
                lambda_min = 0.5 * ((IxIx + IyIy) - root)
                # The 1e-6 term avoids dividing by zero on flat blocks.
                score = lambda_max * lambda_min / (
                    1e-6 + lambda_max + lambda_min)
                fs[r, c] = max(score, 0)
        return fs

    def search(self, cur_r, cur_c):
        """Do the exhaustive search for one block.

        Arguments:
          cur_r: block row index
          cur_c: block column index

        Returns:
          (ref_x, ref_y): top-left pixel position of the best match. The
          block's own position is returned when no candidate is strictly
          better than the zero displacement.
        """
        min_loss = self.block_dist(cur_r, cur_c, [0, 0], self.metric)
        cur_x = cur_c * self.blk_sz
        cur_y = cur_r * self.blk_sz
        ref_x = cur_x
        ref_y = cur_y
        # Exclusive upper limits for a candidate's top-left corner.
        max_x = self.width - self.blk_sz
        max_y = self.height - self.blk_sz
        # The window is half-open: displacements run over [-wnd_sz, wnd_sz).
        # `range` (instead of the Python 2 only `xrange`) runs on 2 and 3.
        for y in range(cur_y - self.wnd_sz, cur_y + self.wnd_sz):
            if not 0 <= y < max_y:
                continue
            for x in range(cur_x - self.wnd_sz, cur_x + self.wnd_sz):
                if not 0 <= x < max_x:
                    continue
                # Motion vectors are ordered [dy, dx].
                loss = self.block_dist(cur_r, cur_c,
                                       [y - cur_y, x - cur_x], self.metric)
                if loss < min_loss:
                    min_loss = loss
                    ref_x = x
                    ref_y = y
        return ref_x, ref_y

    def smooth(self, uvs, mvs):
        """Apply one iteration of the smoothness constraint.

        Arguments:
          uvs: current smoothed motion field
          mvs: motion field obtained from block matching

        Returns:
          New motion field where every block is the blend
          (fs * mv + beta * avg) / (beta + fs), with ``avg`` the weighted
          sum of the neighbors in ``uvs``.

        NOTE: the result is 0/0 for a block whose feature score is 0 when
        beta is also 0.
        """
        sm_uvs = np.zeros(uvs.shape)
        for r in range(self.num_row):
            for c in range(self.num_col):
                avg_uv = np.array([0.0, 0.0])
                # Edge neighbors weigh 1/6 and diagonal ones 1/12.
                # Neighbors outside the grid are skipped and the weights
                # are not renormalized.
                for d_r, d_c in self._EDGE_OFFSETS:
                    i = r + d_r
                    j = c + d_c
                    if 0 <= i < self.num_row and 0 <= j < self.num_col:
                        avg_uv += uvs[i, j] / 6.0
                for d_r, d_c in self._DIAG_OFFSETS:
                    i = r + d_r
                    j = c + d_c
                    if 0 <= i < self.num_row and 0 <= j < self.num_col:
                        avg_uv += uvs[i, j] / 12.0
                weight = self.fs[r, c]
                sm_uvs[r, c] = (weight * mvs[r, c] + self.beta * avg_uv) / (
                    self.beta + weight)
        return sm_uvs

    def motion_field_estimation(self):
        """Match every block, then smooth the field for max_iter rounds."""
        # Get the matching results.
        mvs = np.zeros(self.mf.shape)
        for r in range(self.num_row):
            for c in range(self.num_col):
                ref_x, ref_y = self.search(r, c)
                mvs[r, c] = np.array(
                    [ref_y - r * self.blk_sz, ref_x - c * self.blk_sz])
        # Add the smoothness constraint, starting from an all-zero field.
        uvs = np.zeros(self.mf.shape)
        for _ in range(self.max_iter):
            uvs = self.smooth(uvs, mvs)
        self.mf = uvs