shithub: libvpx

ref: 09775194ffdb84b4979f3988e7ef301575b661df
dir: /tools/3D-Reconstruction/MotionEST/Exhaust.py/

View raw version
##  Copyright (c) 2020 The WebM project authors. All Rights Reserved.
##
##  Use of this source code is governed by a BSD-style license
##  that can be found in the LICENSE file in the root of the source
##  tree. An additional intellectual property rights grant can be found
##  in the file PATENTS.  All contributing project authors may
##  be found in the AUTHORS file in the root of the source tree.
##

# coding: utf-8
import numpy as np
import numpy.linalg as LA
from Util import MSE
from MotionEST import MotionEST
"""Exhaust Search:"""


class Exhaust(MotionEST):
  """Exhaustive block-matching motion estimator.

  Every candidate position inside the search window is scored with `metric`
  and the position with the lowest distortion is selected.

  Constructor:
      cur_f: current frame
      ref_f: reference frame
      blk_size: block size
      wnd_size: search window size
      metric: metric used to compare the distortion of two blocks

  The attributes blk_sz, width, height, num_row, num_col and mf, as well as
  block_dist(), are not defined in this class; they are expected to come
  from the MotionEST base class.
  """

  def __init__(self, cur_f, ref_f, blk_size, wnd_size, metric=MSE):
    self.name = 'exhaust'
    self.wnd_sz = wnd_size
    self.metric = metric
    super(Exhaust, self).__init__(cur_f, ref_f, blk_size)

  def search(self, cur_r, cur_c):
    """Find the best matching reference position for one block.

    Args:
        cur_r: block row index
        cur_c: block column index

    Returns:
        (ref_x, ref_y): pixel coordinates of the selected position. When no
        candidate is strictly better than zero motion, the block's own
        position is returned.
    """
    cur_x = cur_c * self.blk_sz
    cur_y = cur_r * self.blk_sz
    # Zero motion is the baseline every candidate has to beat.
    min_loss = self.block_dist(cur_r, cur_c, [0, 0], self.metric)
    ref_x, ref_y = cur_x, cur_y
    # Clamp the half-open window [cur - wnd_sz, cur + wnd_sz) to the valid
    # positions 0 <= x < width - blk_sz and 0 <= y < height - blk_sz once,
    # instead of testing every candidate inside the loops.
    y_begin = max(0, cur_y - self.wnd_sz)
    y_end = min(self.height - self.blk_sz, cur_y + self.wnd_sz)
    x_begin = max(0, cur_x - self.wnd_sz)
    x_end = min(self.width - self.blk_sz, cur_x + self.wnd_sz)
    for y in range(y_begin, y_end):
      for x in range(x_begin, x_end):
        loss = self.block_dist(cur_r, cur_c, [y - cur_y, x - cur_x],
                               self.metric)
        # Strict comparison: on ties the earliest candidate is kept.
        if loss < min_loss:
          min_loss = loss
          ref_x = x
          ref_y = y
    return ref_x, ref_y

  def motion_field_estimation(self):
    """Run search() on every block and store the result in self.mf.

    Each entry of self.mf is written as [row offset, column offset] in
    pixels relative to the block's own position.
    """
    for i in range(self.num_row):
      for j in range(self.num_col):
        ref_x, ref_y = self.search(i, j)
        self.mf[i, j] = np.array(
            [ref_y - i * self.blk_sz, ref_x - j * self.blk_sz])

"""Exhaust with Neighbor Constraint"""


class ExhaustNeighbor(MotionEST):
  """Exhaustive search with a neighbor-consistency penalty.

  The cost of a candidate is its block distortion plus `beta` times the sum
  of the distances between the candidate motion vector and the motion
  vectors of the 4-connected neighbors that have already been assigned.

  Constructor:
      cur_f: current frame
      ref_f: reference frame
      blk_size: block size
      wnd_size: search window size
      beta: neighbor loss weight
      metric: metric used to compare the distortion of two blocks

  The attributes blk_sz, width, height, num_row, num_col and mf, as well as
  block_dist(), are not defined in this class; they are expected to come
  from the MotionEST base class.
  """

  def __init__(self, cur_f, ref_f, blk_size, wnd_size, beta, metric=MSE):
    self.name = 'exhaust + neighbor'
    self.wnd_sz = wnd_size
    self.beta = beta
    self.metric = metric
    super(ExhaustNeighbor, self).__init__(cur_f, ref_f, blk_size)
    # Marks the blocks whose motion vector has already been estimated.
    # The builtin bool is used because the np.bool alias no longer exists
    # in current NumPy releases.
    self.assign = np.zeros((self.num_row, self.num_col), dtype=bool)

  def neighborLoss(self, cur_r, cur_c, mv):
    """Estimate the neighbor loss of a candidate motion vector.

    Args:
        cur_r: current block row
        cur_c: current block column
        mv: candidate motion vector (list or array of two values)

    Returns:
        Sum of the norms of (mv - neighbor motion vector) over the
        4-connected neighbors that lie inside the motion field and are
        flagged in self.assign; 0 when there is no such neighbor.
    """
    mv = np.asarray(mv)
    loss = 0
    for d_r, d_c in ((-1, 0), (1, 0), (0, 1), (0, -1)):
      nb_r = cur_r + d_r
      nb_c = cur_c + d_c
      inside = 0 <= nb_r < self.num_row and 0 <= nb_c < self.num_col
      if inside and self.assign[nb_r, nb_c]:
        loss += LA.norm(mv - self.mf[nb_r, nb_c])
    return loss

  def search(self, cur_r, cur_c):
    """Find the best reference position for one block.

    Args:
        cur_r: block row index
        cur_c: block column index

    Returns:
        (ref_x, ref_y): pixel coordinates of the position that minimizes
        distortion + beta * neighbor loss. When no candidate is strictly
        better than zero motion, the block's own position is returned.
    """
    cur_x = cur_c * self.blk_sz
    cur_y = cur_r * self.blk_sz
    # Zero motion is the baseline every candidate has to beat.
    zero_mv = [0, 0]
    min_loss = (
        self.block_dist(cur_r, cur_c, zero_mv, self.metric) +
        self.beta * self.neighborLoss(cur_r, cur_c, zero_mv))
    ref_x, ref_y = cur_x, cur_y
    # Clamp the half-open window [cur - wnd_sz, cur + wnd_sz) to the valid
    # positions 0 <= x < width - blk_sz and 0 <= y < height - blk_sz once,
    # instead of testing every candidate inside the loops.
    y_begin = max(0, cur_y - self.wnd_sz)
    y_end = min(self.height - self.blk_sz, cur_y + self.wnd_sz)
    x_begin = max(0, cur_x - self.wnd_sz)
    x_end = min(self.width - self.blk_sz, cur_x + self.wnd_sz)
    for y in range(y_begin, y_end):
      for x in range(x_begin, x_end):
        mv = [y - cur_y, x - cur_x]
        dist_loss = self.block_dist(cur_r, cur_c, mv, self.metric)
        nb_loss = self.neighborLoss(cur_r, cur_c, mv)
        loss = dist_loss + self.beta * nb_loss
        # Strict comparison: on ties the earliest candidate is kept.
        if loss < min_loss:
          min_loss = loss
          ref_x = x
          ref_y = y
    return ref_x, ref_y

  def motion_field_estimation(self):
    """Estimate the motion field block by block, row by row.

    Each entry of self.mf is written as [row offset, column offset] in
    pixels relative to the block's own position. A block is flagged in
    self.assign right after its vector is stored, so the blocks visited
    later take it into account through neighborLoss().
    """
    for i in range(self.num_row):
      for j in range(self.num_col):
        ref_x, ref_y = self.search(i, j)
        self.mf[i, j] = np.array(
            [ref_y - i * self.blk_sz, ref_x - j * self.blk_sz])
        self.assign[i, j] = True


"""Exhaust with Neighbor Constraint and Feature Score"""


class ExhaustNeighborFeatureScore(MotionEST):
  """Exhaustive search followed by feature-score weighted smoothing.

  Block matching gives one motion vector per block. The field is then
  smoothed iteratively: each block is pulled towards a weighted average of
  its neighbors, and the pull towards its own matched vector grows with the
  block's feature score.

  Constructor:
      cur_f: current frame
      ref_f: reference frame
      blk_size: block size
      wnd_size: search window size
      beta: neighbor loss weight
      max_iter: maximum number of iterations
      metric: metric used to compare the distortion of two blocks

  The attributes blk_sz, width, height, num_row, num_col, mf and cur_yuv, as
  well as block_dist(), are not defined in this class; they are expected to
  come from the MotionEST base class.
  """

  def __init__(self,
               cur_f,
               ref_f,
               blk_size,
               wnd_size,
               beta=1,
               max_iter=100,
               metric=MSE):
    self.name = 'exhaust + neighbor+feature score'
    self.wnd_sz = wnd_size
    self.beta = beta
    self.metric = metric
    self.max_iter = max_iter
    super(ExhaustNeighborFeatureScore, self).__init__(cur_f, ref_f, blk_size)
    self.fs = self.getFeatureScore()

  def getFeatureScore(self):
    """Get the feature score of each block.

    Forward differences of channel 0 of self.cur_yuv are accumulated into
    the sums IxIx, IyIy and IxIy over the block (excluding its last row and
    column). The score is lambda_max * lambda_min /
    (1e-6 + lambda_max + lambda_min), where the lambdas are the eigenvalues
    of [[IxIx, IxIy], [IxIy, IyIy]], clamped at zero.

    Returns:
        Array of shape (num_row, num_col) with one score per block.
    """
    fs = np.zeros((self.num_row, self.num_col))
    # Work in floating point so that the differences stay signed even if the
    # frame is stored with an unsigned integer type.
    plane = np.asarray(self.cur_yuv)[:, :, 0].astype(np.float64)
    for r in range(self.num_row):
      for c in range(self.num_col):
        top = r * self.blk_sz
        left = c * self.blk_sz
        blk = plane[top:top + self.blk_sz, left:left + self.blk_sz]
        base = blk[:-1, :-1]
        Ix = blk[:-1, 1:] - base  # difference with the pixel to the right
        Iy = blk[1:, :-1] - base  # difference with the pixel below
        IxIx = np.sum(Ix * Ix)
        IyIy = np.sum(Iy * Iy)
        IxIy = np.sum(Ix * Iy)
        #get maximum and minimum eigenvalues
        root = np.sqrt(4 * IxIy * IxIy + (IxIx - IyIy)**2)
        lambda_max = 0.5 * ((IxIx + IyIy) + root)
        lambda_min = 0.5 * ((IxIx + IyIy) - root)
        score = lambda_max * lambda_min / (1e-6 + lambda_max + lambda_min)
        # Rounding can push the score slightly below zero.
        fs[r, c] = max(score, 0.0)
    return fs

  def search(self, cur_r, cur_c):
    """Do an exhaustive search for one block.

    Args:
        cur_r: block row index
        cur_c: block column index

    Returns:
        (ref_x, ref_y): pixel coordinates of the selected position. When no
        candidate is strictly better than zero motion, the block's own
        position is returned.
    """
    cur_x = cur_c * self.blk_sz
    cur_y = cur_r * self.blk_sz
    # Zero motion is the baseline every candidate has to beat.
    min_loss = self.block_dist(cur_r, cur_c, [0, 0], self.metric)
    ref_x, ref_y = cur_x, cur_y
    # Clamp the half-open window [cur - wnd_sz, cur + wnd_sz) to the valid
    # positions 0 <= x < width - blk_sz and 0 <= y < height - blk_sz once,
    # instead of testing every candidate inside the loops.
    y_begin = max(0, cur_y - self.wnd_sz)
    y_end = min(self.height - self.blk_sz, cur_y + self.wnd_sz)
    x_begin = max(0, cur_x - self.wnd_sz)
    x_end = min(self.width - self.blk_sz, cur_x + self.wnd_sz)
    for y in range(y_begin, y_end):
      for x in range(x_begin, x_end):
        loss = self.block_dist(cur_r, cur_c, [y - cur_y, x - cur_x],
                               self.metric)
        # Strict comparison: on ties the earliest candidate is kept.
        if loss < min_loss:
          min_loss = loss
          ref_x = x
          ref_y = y
    return ref_x, ref_y

  def smooth(self, uvs, mvs):
    """Apply one iteration of the smoothness constraint.

    Args:
        uvs: motion field produced by the previous iteration
        mvs: motion vectors obtained from block matching

    Returns:
        New array with the shape of uvs. Each entry is
        (fs * mv + beta * avg_uv) / (beta + fs), where avg_uv adds the
        edge-adjacent neighbors of uvs with weight 1/6 and the diagonal
        neighbors with weight 1/12. Neighbors outside the motion field are
        skipped and therefore contribute nothing.
    """
    sm_uvs = np.zeros(uvs.shape)
    for r in range(self.num_row):
      for c in range(self.num_col):
        avg_uv = np.array([0.0, 0.0])
        for i, j in ((r - 1, c), (r + 1, c), (r, c - 1), (r, c + 1)):
          if 0 <= i < self.num_row and 0 <= j < self.num_col:
            avg_uv += uvs[i, j] / 6.0
        for i, j in ((r - 1, c - 1), (r - 1, c + 1), (r + 1, c - 1),
                     (r + 1, c + 1)):
          if 0 <= i < self.num_row and 0 <= j < self.num_col:
            avg_uv += uvs[i, j] / 12.0
        weight = self.beta + self.fs[r, c]
        if weight == 0:
          # Degenerate case (e.g. beta == 0 on a block with zero feature
          # score): keep the matched vector instead of producing 0/0 = NaN.
          sm_uvs[r, c] = mvs[r, c]
        else:
          sm_uvs[r, c] = (self.fs[r, c] * mvs[r, c] +
                          self.beta * avg_uv) / weight
    return sm_uvs

  def motion_field_estimation(self):
    """Estimate the motion field and replace self.mf with the result.

    Block matching is run on every block first; then smooth() is applied
    max_iter times, starting from an all-zero field.
    """
    #get matching results
    mvs = np.zeros(self.mf.shape)
    for r in range(self.num_row):
      for c in range(self.num_col):
        ref_x, ref_y = self.search(r, c)
        mvs[r, c] = np.array([ref_y - r * self.blk_sz, ref_x - c * self.blk_sz])
    #add smoothness constraint
    uvs = np.zeros(self.mf.shape)
    for _ in range(self.max_iter):
      uvs = self.smooth(uvs, mvs)
    self.mf = uvs