import argparse
import glob
import os
import shutil
import uuid
from threading import Thread
from typing import Callable, List, Optional, Tuple

import cv2
import numpy as np
import torch
import torchvision
from PIL import Image
from torch import Tensor, nn
from torch.nn import functional as F
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms as T
from torchvision.models.resnet import Bottleneck, ResNet
from torchvision.transforms.functional import to_pil_image
from tqdm import tqdm
# --------------- HomographicAlignment ---------------


class HomographicAlignment:
    """
    Apply homographic alignment on the background to match it with the source image.
    """
def crop_patch(self,
               x: torch.Tensor,
               idx: Tuple[torch.Tensor, torch.Tensor, torch.Tensor],
               size: int,
               padding: int):
    """
    Crops selected patches from image given indices.

    Inputs:
        x: image (B, C, H, W).
        idx: selection indices Tuple[(P,), (P,), (P,)], where the 3 values are (B, H, W) index.
        size: center size of the patch, also stride of the crop.
        padding: expansion size of the patch.
    Output:
        patch: (P, C, h, w), where h = w = size + 2 * padding.
    """
    if padding != 0:
        x = F.pad(x, (padding,) * 4)
    if self.patch_crop_method == 'unfold':
        # Use unfold. Best performance for PyTorch and TorchScript.
        return x.permute(0, 2, 3, 1) \
                .unfold(1, size + 2 * padding, size) \
                .unfold(2, size + 2 * padding, size)[idx[0], idx[1], idx[2]]
    elif self.patch_crop_method == 'roi_align':
        # Use roi_align. Best compatibility for ONNX.
        idx = idx[0].type_as(x), idx[1].type_as(x), idx[2].type_as(x)
        b = idx[0]
        x1 = idx[2] * size - 0.5
        y1 = idx[1] * size - 0.5
        x2 = idx[2] * size + size + 2 * padding - 0.5
        y2 = idx[1] * size + size + 2 * padding - 0.5
        boxes = torch.stack([b, x1, y1, x2, y2], dim=1)
        return torchvision.ops.roi_align(x, boxes, size + 2 * padding, sampling_ratio=1)
    else:
        # Use gather. Crops out patches pixel by pixel.
        idx_pix = self.compute_pixel_indices(x, idx, size, padding)
        pat = torch.gather(x.view(-1), 0, idx_pix.view(-1))
        pat = pat.view(-1, x.size(1), size + 2 * padding, size + 2 * padding)
        return pat
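# Illustrative sketch (dummy tensor and indices, not from the original code): the 'unfold'
# cropping path above, showing how size and padding determine the patch grid and patch size.
img = torch.arange(2 * 3 * 16 * 16, dtype=torch.float32).view(2, 3, 16, 16)  # (B, C, H, W)
size, padding = 4, 1
img_pad = F.pad(img, (padding,) * 4)
patches = img_pad.permute(0, 2, 3, 1) \
                 .unfold(1, size + 2 * padding, size) \
                 .unfold(2, size + 2 * padding, size)
# Select batch 0 at grid cell (2, 3) and batch 1 at grid cell (0, 0).
idx = (torch.tensor([0, 1]), torch.tensor([2, 0]), torch.tensor([3, 0]))
print(patches[idx[0], idx[1], idx[2]].shape)  # torch.Size([2, 3, 6, 6]) = (P, C, size + 2*padding, size + 2*padding)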
def replace_patch(self,
                  x: torch.Tensor,
                  y: torch.Tensor,
                  idx: Tuple[torch.Tensor, torch.Tensor, torch.Tensor]):
    """
    Replaces patches back into image given index.

    Inputs:
        x: image (B, C, H, W)
        y: patches (P, C, h, w)
        idx: selection indices Tuple[(P,), (P,), (P,)] where the 3 values are (B, H, W) index.
    Output:
        image: (B, C, H, W), where patches at idx locations are replaced with y.
    """
    xB, xC, xH, xW = x.shape
    yB, yC, yH, yW = y.shape
    if self.patch_replace_method == 'scatter_nd':
        # Use scatter_nd. Best performance for PyTorch and TorchScript. Replacing patch by patch.
        x = x.view(xB, xC, xH // yH, yH, xW // yW, yW).permute(0, 2, 4, 1, 3, 5)
        x[idx[0], idx[1], idx[2]] = y
        x = x.permute(0, 3, 1, 4, 2, 5).view(xB, xC, xH, xW)
        return x
    else:
        # Use scatter_element. Best compatibility for ONNX. Replacing pixel by pixel.
        idx_pix = self.compute_pixel_indices(x, idx, size=4, padding=0)
        return x.view(-1).scatter_(0, idx_pix.view(-1), y.view(-1)).view(x.shape)
def compute_pixel_indices(self,
                          x: torch.Tensor,
                          idx: Tuple[torch.Tensor, torch.Tensor, torch.Tensor],
                          size: int,
                          padding: int):
    """
    Compute selected pixel indices in the tensor.
    Used for crop_method == 'gather' and replace_method == 'scatter_element', which
    crop and replace pixel by pixel.

    Inputs:
        x: image (B, C, H, W)
        idx: selection indices Tuple[(P,), (P,), (P,)], where the 3 values are (B, H, W) index.
        size: center size of the patch, also stride of the crop.
        padding: expansion size of the patch.
    Output:
        idx: (P, C, O, O) long tensor, where O is the output size: size + 2 * padding,
             and P is the number of patches. The elements are indices pointing into x.view(-1).
    """
    B, C, H, W = x.shape
    S, P = size, padding
    O = S + 2 * P
    b, y, x = idx
    n = b.size(0)
    c = torch.arange(C)
    o = torch.arange(O)
    idx_pat = (c * H * W).view(C, 1, 1).expand([C, O, O]) \
            + (o * W).view(1, O, 1).expand([C, O, O]) \
            + o.view(1, 1, O).expand([C, O, O])
    idx_loc = b * W * H + y * W * S + x * S
    idx_pix = idx_loc.view(-1, 1, 1, 1).expand([n, C, O, O]) \
            + idx_pat.view(1, C, O, O).expand([n, C, O, O])
    return idx_pix
def load_matched_state_dict(model, state_dict, print_stats=True):
    """
    Only loads weights that matched in key and shape. Ignore other weights.
    """
    num_matched, num_total = 0, 0
    curr_state_dict = model.state_dict()
    for key in curr_state_dict.keys():
        num_total += 1
        if key in state_dict and curr_state_dict[key].shape == state_dict[key].shape:
            curr_state_dict[key] = state_dict[key]
            num_matched += 1
    model.load_state_dict(curr_state_dict)
    if print_stats:
        print(f'Loaded state_dict: {num_matched}/{num_total} matched')
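# Quick illustration (toy modules, not from the original code): the first Linear matches in
# shape and is copied, while the second differs in shape and is skipped.
src_model = nn.Sequential(nn.Linear(4, 8), nn.Linear(8, 2))
dst_model = nn.Sequential(nn.Linear(4, 8), nn.Linear(8, 3))
load_matched_state_dict(dst_model, src_model.state_dict())  # prints 'Loaded state_dict: 2/4 matched'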
def _make_divisible(v: float, divisor: int, min_value: Optional[int] = None) -> int:
    """
    This function is taken from the original tf repo.
    It ensures that all layers have a channel number that is divisible by 8.
    It can be seen here:
    https://github.com/tensorflow/models/blob/master/research/slim/nets/mobilenet/mobilenet.py
    """
    if min_value is None:
        min_value = divisor
    new_v = max(min_value, int(v + divisor / 2) // divisor * divisor)
    # Make sure that rounding down does not go down by more than 10%.
    if new_v < 0.9 * v:
        new_v += divisor
    return new_v
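# Worked values (illustrative): channels are rounded to the nearest multiple of the divisor,
# but never reduced by more than 10%.
print(_make_divisible(32 * 1.4, 8))  # -> 48
print(_make_divisible(17, 8))        # -> 16 (rounds down, within the 10% tolerance)
print(_make_divisible(11, 8))        # -> 16 (8 would lose more than 10% of 11, so bump up)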
class ConvNormActivation(torch.nn.Sequential):
    def __init__(
        self,
        in_channels: int,
        out_channels: int,
        kernel_size: int = 3,
        stride: int = 1,
        padding: Optional[int] = None,
        groups: int = 1,
        norm_layer: Optional[Callable[..., torch.nn.Module]] = torch.nn.BatchNorm2d,
        activation_layer: Optional[Callable[..., torch.nn.Module]] = torch.nn.ReLU,
        dilation: int = 1,
        inplace: bool = True,
    ) -> None:
        if padding is None:
            padding = (kernel_size - 1) // 2 * dilation
        layers = [torch.nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding,
                                  dilation=dilation, groups=groups, bias=norm_layer is None)]
        if norm_layer is not None:
            layers.append(norm_layer(out_channels))
        if activation_layer is not None:
            layers.append(activation_layer(inplace=inplace))
        super().__init__(*layers)
        self.out_channels = out_channels
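# Minimal usage sketch (dummy input): Conv -> BatchNorm -> ReLU6 with stride 2, as used for
# the stem of MobileNetV2 below. The default padding keeps 'same' spatial size at stride 1.
stem = ConvNormActivation(3, 32, kernel_size=3, stride=2, activation_layer=nn.ReLU6)
print(stem(torch.randn(1, 3, 224, 224)).shape)  # torch.Size([1, 32, 112, 112])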
def forward(self, x: Tensor) -> Tensor:
    # Inverted residual block forward: add the skip connection only when input and output
    # shapes match (use_res_connect is set for stride 1 and equal in/out channels).
    if self.use_res_connect:
        return x + self.conv(x)
    else:
        return self.conv(x)
class MobileNetV2(nn.Module):
    def __init__(
        self,
        num_classes: int = 1000,
        width_mult: float = 1.0,
        inverted_residual_setting: Optional[List[List[int]]] = None,
        round_nearest: int = 8,
        block: Optional[Callable[..., nn.Module]] = None,
        norm_layer: Optional[Callable[..., nn.Module]] = None
    ) -> None:
        """
        MobileNet V2 main class

        Args:
            num_classes (int): Number of classes
            width_mult (float): Width multiplier - adjusts number of channels in each layer by this amount
            inverted_residual_setting: Network structure
            round_nearest (int): Round the number of channels in each layer to be a multiple of this number.
                Set to 1 to turn off rounding
            block: Module specifying inverted residual building block for mobilenet
            norm_layer: Module specifying the normalization layer to use
        """
        super(MobileNetV2, self).__init__()
        if block is None:
            block = InvertedResidual
        if norm_layer is None:
            norm_layer = nn.BatchNorm2d
        input_channel = 32
        last_channel = 1280

        if inverted_residual_setting is None:
            inverted_residual_setting = [
                # t, c, n, s
                [1, 16, 1, 1],
                [6, 24, 2, 2],
                [6, 32, 3, 2],
                [6, 64, 4, 2],
                [6, 96, 3, 1],
                [6, 160, 3, 2],
                [6, 320, 1, 1],
            ]

        # only check the first element, assuming user knows t, c, n, s are required
        if len(inverted_residual_setting) == 0 or len(inverted_residual_setting[0]) != 4:
            raise ValueError("inverted_residual_setting should be non-empty "
                             "or a 4-element list, got {}".format(inverted_residual_setting))

        # building first layer
        input_channel = _make_divisible(input_channel * width_mult, round_nearest)
        self.last_channel = _make_divisible(last_channel * max(1.0, width_mult), round_nearest)
        features: List[nn.Module] = [ConvNormActivation(3, input_channel, stride=2,
                                                         norm_layer=norm_layer,
                                                         activation_layer=nn.ReLU6)]
        # building inverted residual blocks
        for t, c, n, s in inverted_residual_setting:
            output_channel = _make_divisible(c * width_mult, round_nearest)
            for i in range(n):
                stride = s if i == 0 else 1
                features.append(block(input_channel, output_channel, stride,
                                      expand_ratio=t, norm_layer=norm_layer))
                input_channel = output_channel
        # building last several layers
        features.append(ConvNormActivation(input_channel, self.last_channel, kernel_size=1,
                                           norm_layer=norm_layer, activation_layer=nn.ReLU6))
        # make it nn.Sequential
        self.features = nn.Sequential(*features)
        # building classifier
        self.classifier = nn.Sequential(
            nn.Dropout(0.2),
            nn.Linear(self.last_channel, num_classes),
        )
        # weight initialization
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out')
                if m.bias is not None:
                    nn.init.zeros_(m.bias)
            elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)):
                nn.init.ones_(m.weight)
                nn.init.zeros_(m.bias)
            elif isinstance(m, nn.Linear):
                nn.init.normal_(m.weight, 0, 0.01)
                nn.init.zeros_(m.bias)
    def _forward_impl(self, x: Tensor) -> Tensor:
        # This exists since TorchScript doesn't support inheritance, so the superclass method
        # (this one) needs to have a name other than `forward` that can be accessed in a subclass.
        x = self.features(x)
        # Cannot use "squeeze" as batch-size can be 1
        x = nn.functional.adaptive_avg_pool2d(x, (1, 1))
        x = torch.flatten(x, 1)
        x = self.classifier(x)
        return x
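# A quick shape sanity check (illustrative only): since the InvertedResidual block itself is
# not included in this excerpt, the sketch uses torchvision's reference mobilenet_v2 rather
# than the class above.
ref = torchvision.models.mobilenet_v2()
print(ref(torch.randn(1, 3, 224, 224)).shape)  # torch.Size([1, 1000])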
class MobileNetV2Encoder(MobileNetV2):
    """
    MobileNetV2Encoder inherits from torchvision's official MobileNetV2. It is modified to
    use dilation on the last block to maintain output stride 16, and deletes the classifier
    block that was originally used for classification. The forward method additionally
    returns the feature maps at all resolutions for the decoder's use.
    """
    def __init__(self, in_channels):
        super().__init__()

        # Replace first conv layer if in_channels doesn't match.
        if in_channels != 3:
            self.features[0][0] = nn.Conv2d(in_channels, 32, 3, 2, 1, bias=False)
        # Remove last block
        self.features = self.features[:-1]
        # Change to use dilation to maintain output stride = 16
        self.features[14].conv[1][0].stride = (1, 1)
        for feature in self.features[15:]:
            feature.conv[1][0].dilation = (2, 2)
            feature.conv[1][0].padding = (2, 2)
        # Delete classifier
        del self.classifier
    def forward(self, x):
        x0 = x  # 1/1
        x = self.features[0](x)
        x = self.features[1](x)
        x1 = x  # 1/2
        x = self.features[2](x)
        x = self.features[3](x)
        x2 = x  # 1/4
        x = self.features[4](x)
        x = self.features[5](x)
        x = self.features[6](x)
        x3 = x  # 1/8
        x = self.features[7](x)
        x = self.features[8](x)
        x = self.features[9](x)
        x = self.features[10](x)
        x = self.features[11](x)
        x = self.features[12](x)
        x = self.features[13](x)
        x = self.features[14](x)
        x = self.features[15](x)
        x = self.features[16](x)
        x = self.features[17](x)
        x4 = x  # 1/16
        return x4, x3, x2, x1, x0
def forward(self, x4, x3, x2, x1, x0):
    # Decoder forward: progressively upsample x4 and fuse it with the higher-resolution
    # shortcut feature maps x3, x2, x1, x0.
    x = F.interpolate(x4, size=x3.shape[2:], mode='bilinear', align_corners=False)
    x = torch.cat([x, x3], dim=1)
    x = self.conv1(x)
    x = self.bn1(x)
    x = self.relu(x)
    x = F.interpolate(x, size=x2.shape[2:], mode='bilinear', align_corners=False)
    x = torch.cat([x, x2], dim=1)
    x = self.conv2(x)
    x = self.bn2(x)
    x = self.relu(x)
    x = F.interpolate(x, size=x1.shape[2:], mode='bilinear', align_corners=False)
    x = torch.cat([x, x1], dim=1)
    x = self.conv3(x)
    x = self.bn3(x)
    x = self.relu(x)
    x = F.interpolate(x, size=x0.shape[2:], mode='bilinear', align_corners=False)
    x = torch.cat([x, x0], dim=1)
    x = self.conv4(x)
    return x
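# Minimal standalone sketch of one upsample-and-concatenate step used above; the tensor
# sizes are made up for illustration.
feat_lo = torch.randn(1, 320, 16, 16)   # coarse feature map (e.g. 1/16 resolution)
skip_hi = torch.randn(1, 32, 32, 32)    # higher-resolution shortcut (e.g. 1/8 resolution)
up = F.interpolate(feat_lo, size=skip_hi.shape[2:], mode='bilinear', align_corners=False)
fused = torch.cat([up, skip_hi], dim=1)
print(fused.shape)  # torch.Size([1, 352, 32, 32]) -> ready for the next conv/bn/relu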
# Replace first conv layer if in_channels doesn't match.
if in_channels != 3:
    self.conv1 = nn.Conv2d(in_channels, 64, 7, 2, 3, bias=False)
# Delete fully-connected layer
del self.avgpool
del self.fc
def forward(self, x):
    x0 = x  # 1/1
    x = self.conv1(x)
    x = self.bn1(x)
    x = self.relu(x)
    x1 = x  # 1/2
    x = self.maxpool(x)
    x = self.layer1(x)
    x2 = x  # 1/4
    x = self.layer2(x)
    x3 = x  # 1/8
    x = self.layer3(x)
    x = self.layer4(x)
    x4 = x  # 1/16
    return x4, x3, x2, x1, x0
class Base(nn.Module):
    """
    A generic implementation of the base encoder-decoder network inspired by DeepLab.
    Accepts arbitrary channels for input and output.
    """
    def forward(self, x):
        x, *shortcuts = self.backbone(x)
        x = self.aspp(x)
        x = self.decoder(x, *shortcuts)
        return x
    def load_pretrained_deeplabv3_state_dict(self, state_dict, print_stats=True):
        # Pretrained DeepLabV3 models are provided by a third-party repository.
        # This method converts and loads their pretrained state_dict to match our model structure.
        # This method is not needed if you are not planning to train from DeepLab weights.
        # Use load_state_dict() for normal weight loading.
        # Convert state_dict naming for the aspp module.
        state_dict = {k.replace('classifier.classifier.0', 'aspp'): v for k, v in state_dict.items()}
        if isinstance(self.backbone, ResNetEncoder):
            # ResNet backbone does not need change.
            load_matched_state_dict(self, state_dict, print_stats)
        else:
            # Change MobileNetV2 backbone to state_dict format, then change back after loading.
            backbone_features = self.backbone.features
            self.backbone.low_level_features = backbone_features[:4]
            self.backbone.high_level_features = backbone_features[4:]
            del self.backbone.features
            load_matched_state_dict(self, state_dict, print_stats)
            self.backbone.features = backbone_features
            del self.backbone.low_level_features
            del self.backbone.high_level_features
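# Toy illustration of the key renaming performed above when converting a DeepLabV3
# checkpoint; the keys here are made up for demonstration.
toy = {'classifier.classifier.0.convs.0.0.weight': torch.zeros(1),
       'backbone.low_level_features.0.0.weight': torch.zeros(1)}
renamed = {k.replace('classifier.classifier.0', 'aspp'): v for k, v in toy.items()}
print(list(renamed))  # ['aspp.convs.0.0.weight', 'backbone.low_level_features.0.0.weight']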
def forward(self, src, bgr):
    assert src.size() == bgr.size(), 'src and bgr must have the same shape'
    assert src.size(2) // 4 * 4 == src.size(2) and src.size(3) // 4 * 4 == src.size(3), \
        'src and bgr must have width and height that are divisible by 4'
parser.add_argument('--device', type=str, choices=['cpu', 'cuda'], default='cuda')
parser.add_argument('--num-workers', type=int, default=0,
                    help='number of worker threads used in DataLoader. '
                         'Note that Windows needs to use single thread (0).')
parser.add_argument('--preprocess-alignment', action='store_true')
assert 'err' not in args.output_types or args.model_type in ['mattingbase', 'mattingrefine'], \
    'Only mattingbase and mattingrefine support err output'
assert 'ref' not in args.output_types or args.model_type in ['mattingrefine'], \
    'Only mattingrefine supports ref output'
# --------------- Main ---------------
device = torch.device(args.device)
# Load model
if args.model_type == 'mattingbase':
    model = MattingBase(args.model_backbone)
elif args.model_type == 'mattingrefine':
    model = MattingRefine(
        args.model_backbone,
        args.model_backbone_scale,
        args.model_refine_mode,
        args.model_refine_sample_pixels,
        args.model_refine_threshold,
        args.model_refine_kernel_size)
model = model.to(device).eval()
model.load_state_dict(torch.load(args.model_checkpoint, map_location=device), strict=False)
# # Create output directory
# if os.path.exists(args.output_dir):
#     if args.y or input(f'Directory {args.output_dir} already exists. Override? [Y/N]: ').lower() == 'y':
#         shutil.rmtree(args.output_dir)
#     else:
#         exit()
for output_type in args.output_types:
    if not os.path.exists(os.path.join(args.output_dir, output_type)):
        os.makedirs(os.path.join(args.output_dir, output_type))
# Worker function
def writer(img, path):
    img = to_pil_image(img[0].cpu())
    img.save(path)
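# Inside the conversion loop, writes are typically dispatched on background threads so that
# disk I/O does not block GPU inference. A minimal helper sketch (the name write_async and
# its use are illustrative, not from the original script):
def write_async(img, path):
    Thread(target=writer, args=(img, path)).start()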
# Conversion loop
with torch.no_grad():
    for i, (src, bgr) in enumerate(tqdm(dataloader)):
        src = src.to(device, non_blocking=True)
        bgr = bgr.to(device, non_blocking=True)