import math
import os
from abc import abstractmethod
from collections import namedtuple

import numpy as np
import paddle
from paddle import ParamAttr
import paddle.nn as nn
import paddle.nn.functional as F
from paddle.nn import Layer
from paddle.utils import try_import

def read_config(fp=None):
    if fp is None:
        dir_name = os.path.dirname(os.path.abspath(__file__))
        fp = os.path.join(dir_name, "visual_backbone.yaml")
    with open(fp, "r") as fin:
        yacs_config = try_import("yacs.config")
        cfg = yacs_config.CfgNode().load_cfg(fin)
    return cfg

[docs]class Conv2d(nn.Conv2D): def __init__(self, *args, **kwargs): norm = kwargs.pop("norm", None) activation = kwargs.pop("activation", None) super(Conv2d, self).__init__(*args, **kwargs) self.norm = norm self.activation = activation
[docs] def forward(self, x): x = super(Conv2d, self).forward(x) if self.norm is not None: x = self.norm(x) if self.activation is not None: x = self.activation(x) return x
[docs]class CNNBlockBase(Layer): def __init__(self, in_channels, out_channels, stride): """ The `__init__` method of any subclass should also contain these arguments. Args: in_channels (int): out_channels (int): stride (int): """ super(CNNBlockBase, self).__init__() self.in_channels = in_channels self.out_channels = out_channels self.stride = stride def freeze(self): for p in self.parameters(): p.stop_gradient = True
ResNetBlockBase = CNNBlockBase
[docs]class ShapeSpec(namedtuple("_ShapeSpec", ["channels", "height", "width", "stride"])): def __new__(cls, channels=None, height=None, width=None, stride=None): return super().__new__(cls, channels, height, width, stride)
[docs]def get_norm(norm, out_channels): """ Args: norm (str or callable): either one of BN, SyncBN, FrozenBN, GN; or a callable that takes a channel number and returns the normalization layer as a nn.Layer. out_channels (int): out_channels Returns: nn.Layer or None: the normalization layer """ if norm is None: return None if isinstance(norm, str): if len(norm) == 0: return None norm = { "BN": nn.BatchNorm, "SyncBN": nn.SyncBatchNorm, "FrozenBN": FrozenBatchNorm, }[norm] return norm(out_channels)
[docs]class FrozenBatchNorm(nn.BatchNorm): def __init__(self, num_channels): param_attr = ParamAttr(learning_rate=0.0, trainable=False) bias_attr = ParamAttr(learning_rate=0.0, trainable=False) super(FrozenBatchNorm, self).__init__( num_channels, param_attr=param_attr, bias_attr=bias_attr, use_global_stats=True )
[docs]class Backbone(nn.Layer): def __init__(self): super(Backbone, self).__init__()
[docs] @abstractmethod def forward(self, *args): pass
@property def size_divisibility(self) -> int: return 0 def output_shape(self): # this is a backward-compatible default return { name: ShapeSpec(channels=self._out_feature_channels[name], stride=self._out_feature_strides[name]) for name in self._out_features }
[docs]class BasicBlock(CNNBlockBase): """ The basic residual block for ResNet-18 and ResNet-34 defined in :paper:`ResNet`, with two 3x3 conv layers and a projection shortcut if needed. """ def __init__(self, in_channels, out_channels, *, stride=1, norm="BN"): raise NotImplementedError
[docs]class BottleneckBlock(CNNBlockBase): """ The standard bottleneck residual block used by ResNet-50, 101 and 152 defined in :paper:`ResNet`. It contains 3 conv layers with kernels 1x1, 3x3, 1x1, and a projection shortcut if needed. """ def __init__( self, in_channels, out_channels, *, bottleneck_channels, stride=1, num_groups=1, norm="BN", stride_in_1x1=False, dilation=1, ): super(BottleneckBlock, self).__init__(in_channels, out_channels, stride) if in_channels != out_channels: self.shortcut = Conv2d( in_channels, out_channels, kernel_size=1, stride=stride, bias_attr=False, norm=get_norm(norm, out_channels), ) else: self.shortcut = None stride_1x1, stride_3x3 = (stride, 1) if stride_in_1x1 else (1, stride) self.conv1 = Conv2d( in_channels, bottleneck_channels, kernel_size=1, stride=stride_1x1, bias_attr=False, norm=get_norm(norm, bottleneck_channels), ) self.conv2 = Conv2d( bottleneck_channels, bottleneck_channels, kernel_size=3, stride=stride_3x3, padding=1 * dilation, bias_attr=False, groups=num_groups, dilation=dilation, norm=get_norm(norm, bottleneck_channels), ) self.conv3 = Conv2d( bottleneck_channels, out_channels, kernel_size=1, bias_attr=False, norm=get_norm(norm, out_channels), ) # init code is removed cause pretrained model will be loaded
[docs] def forward(self, x): out = self.conv1(x) out = F.relu(out) out = self.conv2(out) out = F.relu(out) out = self.conv3(out) if self.shortcut is not None: shortcut = self.shortcut(x) else: shortcut = x out += shortcut out = F.relu(out) return out
[docs]class DeformBottleneckBlock(CNNBlockBase): """ Similar to :class:`BottleneckBlock`, but with :paper:`deformable conv <deformconv>` in the 3x3 convolution. """ def __init__( self, in_channels, out_channels, *, bottleneck_channels, stride=1, num_groups=1, norm="BN", stride_in_1x1=False, dilation=1, deform_modulated=False, deform_num_groups=1, ): raise NotImplementedError
[docs]class BasicStem(CNNBlockBase): """ The standard ResNet stem (layers before the first residual block), with a conv, relu and max_pool. """ def __init__(self, in_channels=3, out_channels=64, norm="BN"): """ Args: norm (str or callable): norm after the first conv layer. See :func:`layers.get_norm` for supported format. """ super(BasicStem, self).__init__(in_channels, out_channels, 4) self.in_channels = in_channels self.conv1 = Conv2d( in_channels, out_channels, kernel_size=7, stride=2, padding=3, bias_attr=False, norm=get_norm(norm, out_channels), )
[docs] def forward(self, x): x = self.conv1(x) x = F.relu(x) x = F.max_pool2d(x, kernel_size=3, stride=2, padding=1) return x
[docs]class ResNet(Backbone): def __init__(self, stem, stages, num_classes=None, out_features=None, freeze_at=0): super(ResNet, self).__init__() self.stem = stem self.num_classes = num_classes current_stride = self.stem.stride self._out_feature_strides = {"stem": current_stride} self._out_feature_channels = {"stem": self.stem.out_channels} self.stage_names, self.stages = [], [] if out_features is not None: num_stages = max([{"res2": 1, "res3": 2, "res4": 3, "res5": 4}.get(f, 0) for f in out_features]) stages = stages[:num_stages] for i, blocks in enumerate(stages): assert len(blocks) > 0, len(blocks) for block in blocks: assert isinstance(block, CNNBlockBase), block name = "res" + str(i + 2) stage = nn.Sequential(*blocks) self.add_sublayer(name, stage) self.stage_names.append(name) self.stages.append(stage) self._out_feature_strides[name] = current_stride = int( current_stride *[k.stride for k in blocks]) ) self._out_feature_channels[name] = curr_channels = blocks[-1].out_channels self.stage_names = tuple(self.stage_names) if num_classes is not None: self.avgpool = nn.AdaptiveAvgPool2D(1) self.linear = nn.Linear(curr_channels, num_classes) name = "linear" if out_features is None: out_features = [name] self._out_features = out_features assert len(self._out_features) children = [x[0] for x in self.named_children()] for out_feature in self._out_features: assert out_feature in children, "Available children: {}".format(", ".join(children)) self.freeze(freeze_at)
[docs] def forward(self, x): """ Args: x: Tensor of shape (N,C,H,W). H, W must be a multiple of ``self.size_divisibility``. Returns: dict[str->Tensor]: names and the corresponding features """ assert x.dim() == 4, f"ResNet takes an input of shape (N, C, H, W). Got {x.shape} instead!" outputs = {} x = self.stem(x) if "stem" in self._out_features: outputs["stem"] = x for name, stage in zip(self.stage_names, self.stages): x = stage(x) if name in self._out_features: outputs[name] = x if self.num_classes is not None: x = self.avgpool(x) x = paddle.flatten(x, 1) x = self.linear(x) if "linear" in self._out_features: outputs["linear"] = x return outputs
def output_shape(self): return { name: ShapeSpec(channels=self._out_feature_channels[name], stride=self._out_feature_strides[name]) for name in self._out_features }
[docs] @staticmethod def make_stage(block_class, num_blocks, *, in_channels, out_channels, **kwargs): """ Create a list of blocks of the same type that forms one ResNet stage. Args: block_class (type): a subclass of CNNBlockBase that's used to create all blocks in this stage. A module of this type must not change spatial resolution of inputs unless its stride != 1. num_blocks (int): number of blocks in this stage in_channels (int): input channels of the entire stage. out_channels (int): output channels of **every block** in the stage. kwargs: other arguments passed to the constructor of `block_class`. If the argument name is "xx_per_block", the argument is a list of values to be passed to each block in the stage. Otherwise, the same argument is passed to every block in the stage. Returns: list[CNNBlockBase]: a list of block module. Examples: :: stage = ResNet.make_stage( BottleneckBlock, 3, in_channels=16, out_channels=64, bottleneck_channels=16, num_groups=1, stride_per_block=[2, 1, 1], dilations_per_block=[1, 1, 2] ) Usually, layers that produce the same feature map spatial size are defined as one "stage" (in :paper:`FPN`). Under such definition, ``stride_per_block[1:]`` should all be 1. """ blocks = [] for i in range(num_blocks): curr_kwargs = {} for k, v in kwargs.items(): if k.endswith("_per_block"): assert len(v) == num_blocks, ( f"Argument '{k}' of make_stage should have the " f"same length as num_blocks={num_blocks}." ) newk = k[: -len("_per_block")] assert newk not in kwargs, f"Cannot call make_stage with both {k} and {newk}!" curr_kwargs[newk] = v[i] else: curr_kwargs[k] = v blocks.append(block_class(in_channels=in_channels, out_channels=out_channels, **curr_kwargs)) in_channels = out_channels return blocks
[docs] @staticmethod def make_default_stages(depth, block_class=None, **kwargs): """ Created list of ResNet stages from pre-defined depth (one of 18, 34, 50, 101, 152). If it doesn't create the ResNet variant you need, please use :meth:`make_stage` instead for fine-grained customization. Args: depth (int): depth of ResNet block_class (type): the CNN block class. Has to accept `bottleneck_channels` argument for depth > 50. By default it is BasicBlock or BottleneckBlock, based on the depth. kwargs: other arguments to pass to `make_stage`. Should not contain stride and channels, as they are predefined for each depth. Returns: list[list[CNNBlockBase]]: modules in all stages; see arguments of :class:`ResNet.__init__`. """ num_blocks_per_stage = { 18: [2, 2, 2, 2], 34: [3, 4, 6, 3], 50: [3, 4, 6, 3], 101: [3, 4, 23, 3], 152: [3, 8, 36, 3], }[depth] if block_class is None: block_class = BasicBlock if depth < 50 else BottleneckBlock if depth < 50: in_channels = [64, 64, 128, 256] out_channels = [64, 128, 256, 512] else: in_channels = [64, 256, 512, 1024] out_channels = [256, 512, 1024, 2048] ret = [] for (n, s, i, o) in zip(num_blocks_per_stage, [1, 2, 2, 2], in_channels, out_channels): if depth >= 50: kwargs["bottleneck_channels"] = o // 4 ret.append( ResNet.make_stage( block_class=block_class, num_blocks=n, stride_per_block=[s] + [1] * (n - 1), in_channels=i, out_channels=o, **kwargs, ) ) return ret
def freeze(self, freeze_at=0): if freeze_at >= 1: self.stem.freeze() for idx, stage in enumerate(self.stages, start=2): if freeze_at >= idx: for block in stage.children(): block.freeze() return self
[docs]class LastLevelMaxPool(nn.Layer): """ This module is used in the original FPN to generate a downsampled P6 feature from P5. """ def __init__(self): super(LastLevelMaxPool, self).__init__() self.num_levels = 1 self.in_feature = "p5"
[docs] def forward(self, x): return [F.max_pool2d(x, kernel_size=1, stride=2, padding=0)]
def _assert_strides_are_log2_contiguous(strides): """ Assert that each stride is 2x times its preceding stride, i.e. "contiguous in log2". """ for i, stride in enumerate(strides[1:], 1): assert stride == 2 * strides[i - 1], "Strides {} {} are not log2 contiguous".format(stride, strides[i - 1])
[docs]class FPN(Backbone): def __init__(self, bottom_up, in_features, out_channels, norm="", top_block=None, fuse_type="sum"): super(FPN, self).__init__() assert isinstance(bottom_up, Backbone) assert in_features, in_features # Feature map strides and channels from the bottom up network (e.g. ResNet) input_shapes = bottom_up.output_shape() strides = [input_shapes[f].stride for f in in_features] in_channels_per_feature = [input_shapes[f].channels for f in in_features] _assert_strides_are_log2_contiguous(strides) lateral_convs = [] output_convs = [] use_bias = norm == "" for idx, in_channels in enumerate(in_channels_per_feature): lateral_norm = get_norm(norm, out_channels) output_norm = get_norm(norm, out_channels) lateral_conv = Conv2d(in_channels, out_channels, kernel_size=1, bias_attr=use_bias, norm=lateral_norm) output_conv = Conv2d( out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias_attr=use_bias, norm=output_norm, ) stage = int(math.log2(strides[idx])) self.add_sublayer("fpn_lateral{}".format(stage), lateral_conv) self.add_sublayer("fpn_output{}".format(stage), output_conv) lateral_convs.append(lateral_conv) output_convs.append(output_conv) # Place convs into top-down order (from low to high resolution) # to make the top-down computation in forward clearer. self.lateral_convs = lateral_convs[::-1] self.output_convs = output_convs[::-1] self.top_block = top_block self.in_features = tuple(in_features) self.bottom_up = bottom_up # Return feature names are "p<stage>", like ["p2", "p3", ..., "p6"] self._out_feature_strides = {"p{}".format(int(math.log2(s))): s for s in strides} # top block output feature maps. if self.top_block is not None: for s in range(stage, stage + self.top_block.num_levels): self._out_feature_strides["p{}".format(s + 1)] = 2 ** (s + 1) self._out_features = list(self._out_feature_strides.keys()) self._out_feature_channels = {k: out_channels for k in self._out_features} self._size_divisibility = strides[-1] assert fuse_type in {"avg", "sum"} self._fuse_type = fuse_type @property def size_divisibility(self): return self._size_divisibility
[docs] def forward(self, x): """ Args: x (dict[str->Tensor]): mapping feature map name (e.g., "res5") to feature map tensor for each feature level in high to low resolution order. Returns: dict[str->Tensor]: mapping from feature map name to FPN feature map tensor in high to low resolution order. Returned feature names follow the FPN paper convention: "p<stage>", where stage has stride = 2 ** stage e.g., ["p2", "p3", ..., "p6"]. """ bottom_up_features = self.bottom_up(x) results = [] prev_features = self.lateral_convs[0](bottom_up_features[self.in_features[-1]]) results.append(self.output_convs[0](prev_features)) # Reverse feature maps into top-down order (from low to high resolution) for idx, (lateral_conv, output_conv) in enumerate(zip(self.lateral_convs, self.output_convs)): if idx > 0: features = self.in_features[-idx - 1] features = bottom_up_features[features] top_down_features = F.interpolate(prev_features, scale_factor=2.0, mode="nearest") lateral_features = lateral_conv(features) prev_features = lateral_features + top_down_features if self._fuse_type == "avg": prev_features /= 2 results.insert(0, output_conv(prev_features)) if self.top_block is not None: if self.top_block.in_feature in bottom_up_features: top_block_in_feature = bottom_up_features[self.top_block.in_feature] else: top_block_in_feature = results[self._out_features.index(self.top_block.in_feature)] results.extend(self.top_block(top_block_in_feature)) assert len(self._out_features) == len(results) return {f: res for f, res in zip(self._out_features, results)}
def output_shape(self): return { name: ShapeSpec(channels=self._out_feature_channels[name], stride=self._out_feature_strides[name]) for name in self._out_features }
[docs]def make_stage(*args, **kwargs): """ Deprecated alias for backward compatibiltiy. """ return ResNet.make_stage(*args, **kwargs)
[docs]def build_resnet_backbone(cfg, input_shape=None): """ Create a ResNet instance from config. Returns: ResNet: a :class:`ResNet` instance. """ # need registration of new blocks/stems? if input_shape is None: ch = 3 else: ch = input_shape.channels norm = cfg.MODEL.RESNETS.NORM stem = BasicStem( in_channels=ch, out_channels=cfg.MODEL.RESNETS.STEM_OUT_CHANNELS, norm=norm, ) # fmt: off freeze_at = cfg.MODEL.BACKBONE.FREEZE_AT # default as 2 out_features = cfg.MODEL.RESNETS.OUT_FEATURES depth = cfg.MODEL.RESNETS.DEPTH num_groups = cfg.MODEL.RESNETS.NUM_GROUPS width_per_group = cfg.MODEL.RESNETS.WIDTH_PER_GROUP bottleneck_channels = num_groups * width_per_group in_channels = cfg.MODEL.RESNETS.STEM_OUT_CHANNELS out_channels = cfg.MODEL.RESNETS.RES2_OUT_CHANNELS stride_in_1x1 = cfg.MODEL.RESNETS.STRIDE_IN_1X1 res5_dilation = cfg.MODEL.RESNETS.RES5_DILATION deform_on_per_stage = cfg.MODEL.RESNETS.DEFORM_ON_PER_STAGE deform_modulated = cfg.MODEL.RESNETS.DEFORM_MODULATED deform_num_groups = cfg.MODEL.RESNETS.DEFORM_NUM_GROUPS # fmt: on assert res5_dilation in {1, 2}, "res5_dilation cannot be {}.".format(res5_dilation) num_blocks_per_stage = { 18: [2, 2, 2, 2], 34: [3, 4, 6, 3], 50: [3, 4, 6, 3], 101: [3, 4, 23, 3], 152: [3, 8, 36, 3], }[depth] if depth in [18, 34]: assert out_channels == 64, "Must set MODEL.RESNETS.RES2_OUT_CHANNELS = 64 for R18/R34" assert not any(deform_on_per_stage), "MODEL.RESNETS.DEFORM_ON_PER_STAGE unsupported for R18/R34" assert res5_dilation == 1, "Must set MODEL.RESNETS.RES5_DILATION = 1 for R18/R34" assert num_groups == 1, "Must set MODEL.RESNETS.NUM_GROUPS = 1 for R18/R34" stages = [] for idx, stage_idx in enumerate(range(2, 6)): # res5_dilation is used this way as a convention in R-FCN & Deformable Conv paper dilation = res5_dilation if stage_idx == 5 else 1 first_stride = 1 if idx == 0 or (stage_idx == 5 and dilation == 2) else 2 stage_kargs = { "num_blocks": num_blocks_per_stage[idx], "stride_per_block": [first_stride] + [1] * (num_blocks_per_stage[idx] - 1), "in_channels": in_channels, "out_channels": out_channels, "norm": norm, } # Use BasicBlock for R18 and R34. if depth in [18, 34]: stage_kargs["block_class"] = BasicBlock else: stage_kargs["bottleneck_channels"] = bottleneck_channels stage_kargs["stride_in_1x1"] = stride_in_1x1 stage_kargs["dilation"] = dilation stage_kargs["num_groups"] = num_groups if deform_on_per_stage[idx]: stage_kargs["block_class"] = DeformBottleneckBlock stage_kargs["deform_modulated"] = deform_modulated stage_kargs["deform_num_groups"] = deform_num_groups else: stage_kargs["block_class"] = BottleneckBlock blocks = ResNet.make_stage(**stage_kargs) in_channels = out_channels out_channels *= 2 bottleneck_channels *= 2 stages.append(blocks) return ResNet(stem, stages, out_features=out_features, freeze_at=freeze_at)
def build_resnet_fpn_backbone(cfg, input_shape=None): bottom_up = build_resnet_backbone(cfg, input_shape) in_features = cfg.MODEL.FPN.IN_FEATURES out_channels = cfg.MODEL.FPN.OUT_CHANNELS backbone = FPN( bottom_up=bottom_up, in_features=in_features, out_channels=out_channels, norm=cfg.MODEL.FPN.NORM, top_block=LastLevelMaxPool(), fuse_type=cfg.MODEL.FPN.FUSE_TYPE, ) return backbone
[docs]class VisualBackbone(Layer): def __init__(self, config): super(VisualBackbone, self).__init__() self.cfg = read_config() self.backbone = build_resnet_fpn_backbone(self.cfg) # syncbn is removed cause that will cause import of torch assert len(self.cfg.MODEL.PIXEL_MEAN) == len(self.cfg.MODEL.PIXEL_STD) num_channels = len(self.cfg.MODEL.PIXEL_MEAN) self.register_buffer("pixel_mean", paddle.to_tensor(self.cfg.MODEL.PIXEL_MEAN).reshape([num_channels, 1, 1])) self.register_buffer("pixel_std", paddle.to_tensor(self.cfg.MODEL.PIXEL_STD).reshape([num_channels, 1, 1])) self.out_feature_key = "p2" # is_deterministic is disabled here. self.pool = nn.AdaptiveAvgPool2D(config["image_feature_pool_shape"][:2]) if len(config["image_feature_pool_shape"]) == 2: config["image_feature_pool_shape"].append(self.backbone.output_shape()[self.out_feature_key].channels) assert self.backbone.output_shape()[self.out_feature_key].channels == config["image_feature_pool_shape"][2]
[docs] def forward(self, images): images_input = (paddle.to_tensor(images) - self.pixel_mean) / self.pixel_std features = self.backbone(images_input) features = features[self.out_feature_key] features = self.pool(features).flatten(start_axis=2).transpose([0, 2, 1]) return features