# Hosting page banner captured with the source (not part of the module): "Spaces - Running on L40S".
"""PyTorch layer for estimating optical flow by a residual flow pyramid.

This approach of estimating optical flow between two images can be traced back
to [1], but is also used by later neural optical flow computation methods such
as SpyNet [2] and PWC-Net [3].

The basic idea is that the optical flow is first estimated at a coarse
resolution, then the flow is upsampled to warp the higher-resolution image, and
then a residual correction is computed and added to the estimated flow. This
process is repeated over a pyramid in coarse-to-fine order to successively
increase the resolution of both the optical flow and the warped image.

Here, the optical flow predictor is used as an internal component of the
film_net frame interpolator, to warp the two input images into the in-between
target frame.

[1] F. Glazer, Hierarchical motion detection. PhD thesis, 1987.
[2] A. Ranjan and M. J. Black, Optical Flow Estimation using a Spatial Pyramid
    Network. 2016
[3] D. Sun, X. Yang, M.-Y. Liu and J. Kautz, PWC-Net: CNNs for Optical Flow
    Using Pyramid, Warping, and Cost Volume, 2017
"""
from typing import List | |
import torch | |
from torch import nn | |
from torch.nn import functional as F | |
import util | |
class FlowEstimator(nn.Module):
    """Small-receptive field predictor for computing the flow between two images.

    This is used to compute the residual flow fields in PyramidFlowEstimator.

    The network is a stack of `num_convs` 3x3 convolutions, followed by two
    extra 1x1 convolutions that are always appended: one that reduces the
    channel count to `num_filters // 2`, and a final one without activation
    that is configured with 2 output channels (the flow vector).

    Args:
        in_channels: Number of channels fed to the first convolution. Because
            `forward` concatenates both inputs along dim 1, this must equal
            the channels of `features_a` plus the channels of `features_b`.
        num_convs: Number of 3x3 convolutions to apply.
        num_filters: Number of filters in each 3x3 convolution.
    """

    def __init__(self, in_channels: int, num_convs: int, num_filters: int):
        super().__init__()

        self._convs = nn.ModuleList()
        for _ in range(num_convs):
            self._convs.append(
                util.Conv2d(in_channels=in_channels,
                            out_channels=num_filters,
                            size=3))
            # Every convolution after the first consumes the previous output.
            in_channels = num_filters

        self._convs.append(util.Conv2d(in_channels, num_filters // 2, size=1))
        in_channels = num_filters // 2

        # For the final convolution, we want no activation at all to predict
        # the optical flow vector values. We have done extensive testing on
        # explicitly bounding these values using sigmoid, but it turned out
        # that having no activation gives better results.
        self._convs.append(util.Conv2d(in_channels, 2, size=1, activation=None))

    def forward(self, features_a: torch.Tensor,
                features_b: torch.Tensor) -> torch.Tensor:
        """Estimates optical flow between two images.

        Args:
            features_a: per pixel feature vectors for image A (B x C x H x W)
            features_b: per pixel feature vectors for image B (B x C x H x W)

        Returns:
            A tensor with optical flow from A to B: the output of the final
            convolution, which is built with 2 output channels.
        """
        # Stack both feature maps along dim 1, the axis `in_channels` counts.
        net = torch.cat([features_a, features_b], dim=1)
        for conv in self._convs:
            net = conv(net)
        return net
class PyramidFlowEstimator(nn.Module):
    """Predicts optical flow by coarse-to-fine refinement.

    One FlowEstimator is built per entry of `flow_convs`; entry `i` is sized
    for pyramid level `i`, where level 0 is the finest. The last predictor is
    shared: it handles its own level and every coarser level of the pyramid.

    Args:
        filters: Base channel count used to size the predictors' inputs.
            Level 0 expects `2 * filters` input channels and each following
            level adds `filters << (i + 2)` on top of the previous level.
        flow_convs: Number of 3x3 convolutions for each predictor, finest
            level first. Must not be empty.
        flow_filters: Number of filters for each predictor, finest level
            first. Must have at least as many entries as `flow_convs`; any
            extra entries are ignored.

    Raises:
        ValueError: If `flow_convs` is empty or `flow_filters` is shorter
            than `flow_convs`.
    """

    def __init__(self, filters: int = 64,
                 flow_convs: tuple = (3, 3, 3, 3),
                 flow_filters: tuple = (32, 64, 128, 256)):
        super().__init__()

        if len(flow_convs) == 0:
            raise ValueError('flow_convs must contain at least one entry.')
        if len(flow_filters) < len(flow_convs):
            raise ValueError(
                'flow_filters has {} entries but flow_convs has {}; one filter '
                'count is needed per predictor.'.format(
                    len(flow_filters), len(flow_convs)))

        in_channels = filters << 1
        predictors = []
        # zip stops at the end of flow_convs, so surplus flow_filters entries
        # are ignored.
        for i, (num_convs, num_filters) in enumerate(
                zip(flow_convs, flow_filters)):
            predictors.append(
                FlowEstimator(
                    in_channels=in_channels,
                    num_convs=num_convs,
                    num_filters=num_filters))
            in_channels += filters << (i + 2)

        # The coarsest predictor is shared by all remaining coarse levels.
        self._predictor = predictors[-1]
        # The other predictors are stored coarse-to-fine, which is the order
        # in which forward() walks them.
        self._predictors = nn.ModuleList(predictors[:-1][::-1])

    def forward(self, feature_pyramid_a: List[torch.Tensor],
                feature_pyramid_b: List[torch.Tensor]) -> List[torch.Tensor]:
        """Estimates residual flow pyramids between two image pyramids.

        Each image pyramid is represented as a list of tensors in
        fine-to-coarse order. Each individual image is represented as a tensor
        where each pixel is a vector of image features. Dims 2 and 3 of every
        tensor are used as the spatial size when upsampling the flow.

        util.flow_pyramid_synthesis can be used to convert the residual flow
        pyramid returned by this method into a flow pyramid, where each level
        encodes the flow instead of a residual correction.

        Args:
            feature_pyramid_a: image pyramid as a list in fine-to-coarse order
            feature_pyramid_b: image pyramid as a list in fine-to-coarse order

        Returns:
            List of flow tensors, in fine-to-coarse order, each level encoding
            the difference against the bilinearly upsampled version from the
            coarser level. The coarsest flow tensor, i.e. the last element in
            the list, is the 'DC-term', i.e. not a residual (alternatively you
            can think of it being a residual against zero).

        Raises:
            ValueError: If the two pyramids differ in length, or if they have
                fewer levels than there are predictors.
        """
        levels = len(feature_pyramid_a)
        num_specialized = len(self._predictors)

        if len(feature_pyramid_b) != levels:
            raise ValueError(
                'Feature pyramids must have the same number of levels, got {} '
                'and {}.'.format(levels, len(feature_pyramid_b)))
        # The shared predictor consumes the coarsest level, so the pyramid
        # needs one level more than there are per-level predictors. Without
        # this check the per-level loop would revisit the coarsest level or
        # index past the end of the pyramid.
        if levels <= num_specialized:
            raise ValueError(
                'Feature pyramids need at least {} levels, got {}.'.format(
                    num_specialized + 1, levels))

        v = self._predictor(feature_pyramid_a[-1], feature_pyramid_b[-1])
        residuals = [v]

        # Coarse levels without a dedicated predictor reuse the shared one.
        for i in range(levels - 2, num_specialized - 1, -1):
            # Upsamples the flow to match the current pyramid level. Also,
            # scales the magnitude by two to reflect the new size.
            level_size = feature_pyramid_a[i].shape[2:4]
            v = F.interpolate(2 * v, size=level_size, mode='bilinear')
            # Warp feature_pyramid_b[i] image based on the current flow
            # estimate.
            warped = util.warp(feature_pyramid_b[i], v)
            # Estimate the residual flow between pyramid_a[i] and warped image:
            v_residual = self._predictor(feature_pyramid_a[i], warped)
            residuals.insert(0, v_residual)
            v = v_residual + v

        # Remaining finer levels each use their own predictor. They are stored
        # coarse-to-fine, so position k maps to pyramid level
        # num_specialized - 1 - k.
        for k, predictor in enumerate(self._predictors):
            i = num_specialized - 1 - k
            # Upsamples the flow to match the current pyramid level. Also,
            # scales the magnitude by two to reflect the new size.
            level_size = feature_pyramid_a[i].shape[2:4]
            v = F.interpolate(2 * v, size=level_size, mode='bilinear')
            # Warp feature_pyramid_b[i] image based on the current flow
            # estimate.
            warped = util.warp(feature_pyramid_b[i], v)
            # Estimate the residual flow between pyramid_a[i] and warped image:
            v_residual = predictor(feature_pyramid_a[i], warped)
            residuals.insert(0, v_residual)
            v = v_residual + v

        return residuals