"""
YoloV5 code adapted from https://github.com/ultralytics/yolov5/blob/master/models/yolo.py
"""
import math
from typing import Union, Type, List
import torch
import torch.nn as nn
from super_gradients.training.models.csp_darknet53 import width_multiplier, Conv, BottleneckCSP, CSPDarknet53
from super_gradients.training.models.sg_module import SgModule
from super_gradients.training.utils.detection_utils import non_max_suppression, scale_img, \
check_anchor_order, check_img_size_divisibilty, matrix_non_max_suppression, NMS_Type, \
DetectionPostPredictionCallback, Anchors
from super_gradients.training.utils.export_utils import ExportableHardswish
from super_gradients.training.utils.utils import HpmStruct, get_param, print_once
import numpy as np
COCO_DETECTION_80_CLASSES_BBOX_ANCHORS = Anchors([[10, 13, 16, 30, 33, 23],
[30, 61, 62, 45, 59, 119],
[116, 90, 156, 198, 373, 326]],
strides=[8, 16, 32]) # output strides of all yolo outputs
DEFAULT_YOLOV5_ARCH_PARAMS = {
'anchors': COCO_DETECTION_80_CLASSES_BBOX_ANCHORS, # The sizes of the anchors predicted by the model
'num_classes': 80, # Number of classes to predict
'depth_mult_factor': 1.0, # depth multiplier for the entire model
'width_mult_factor': 1.0, # width multiplier for the entire model
'backbone_struct': [3, 9, 9, 3], # the number of blocks in every stage of the backbone
    'channels_in': 3,  # number of channels in the model's input image
'skip_connections_dict': {12: [6], 16: [4], 19: [14], 22: [10], 24: [17, 20]},
    # A dictionary defining skip connections. The format is 'target: [source1, source2, ...]'. Each item defines a
    # skip connection from all sources to the target, by layer index (indices continue from the backbone into the head)
'connection_layers_input_channel_size': [1024, 1024, 512],
    # default number of channels for the connecting points between the backbone and the head
    'fuse_conv_and_bn': False,  # Fuse sequential Conv and BatchNorm layers into a single layer
'add_nms': False, # Add the NMS module to the computational graph
    'nms_conf': 0.25,  # When add_nms is True, predictions with confidence lower than this are discarded during NMS
    'nms_iou': 0.45,  # When add_nms is True, the IoU threshold for the NMS algorithm
    # (with a smaller value, more boxes are considered "the same" and removed)
}
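
# Illustrative sketch (not part of the module): any of the defaults above can be overridden by
# passing the same keys through arch_params; unspecified keys fall back to the defaults, e.g.:
#
#   arch_params = HpmStruct(**{'num_classes': 20, 'add_nms': True, 'nms_conf': 0.4})
#   model = YoLoV5S(arch_params)
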
class YoloV5PostPredictionCallback(DetectionPostPredictionCallback):
"""Non-Maximum Suppression (NMS) module"""
def __init__(self, conf: float = 0.001, iou: float = 0.6, classes: List[int] = None,
nms_type: NMS_Type = NMS_Type.ITERATIVE, max_predictions: int = 300):
"""
:param conf: confidence threshold
:param iou: IoU threshold (used in NMS_Type.ITERATIVE)
:param classes: (optional list) filter by class (used in NMS_Type.ITERATIVE)
:param nms_type: the type of nms to use (iterative or matrix)
:param max_predictions: maximum number of boxes to output (used in NMS_Type.MATRIX)
"""
super(YoloV5PostPredictionCallback, self).__init__()
self.conf = conf
self.iou = iou
self.classes = classes
self.nms_type = nms_type
self.max_predictions = max_predictions
    def forward(self, x, device: str = None):
if self.nms_type == NMS_Type.ITERATIVE:
return non_max_suppression(x[0], conf_thres=self.conf, iou_thres=self.iou, classes=self.classes)
else:
return matrix_non_max_suppression(x[0], conf_thres=self.conf, max_num_of_detections=self.max_predictions)
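
# Illustrative sketch (not part of the module): applying the callback to raw model output.
# Assumes `model` is a YoloV5 model in eval mode and `images` is a (batch, 3, H, W) tensor:
#
#   post_prediction = YoloV5PostPredictionCallback(conf=0.25, iou=0.45)
#   with torch.no_grad():
#       raw_output = model(images)            # (decoded predictions, raw feature maps)
#   detections = post_prediction(raw_output)  # per-image detection tensors (box coords, confidence, class)
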
class Concat(nn.Module):
    """ CONCATENATE A LIST OF TENSORS ALONG A GIVEN DIMENSION"""
def __init__(self, dimension=1):
super().__init__()
self.dimension = dimension
    def forward(self, x):
return torch.cat(x, self.dimension)
class Detect(nn.Module):
def __init__(self, num_classes: int, anchors: Anchors, channels: list = None,
width_mult_factor: float = 1.0):
super().__init__()
# CHANGING THE WIDTH OF EACH OF THE DETECTION LAYERS
channels = [width_multiplier(channel, width_mult_factor) for channel in channels]
self.num_classes = num_classes
self.num_outputs = num_classes + 5
self.detection_layers_num = anchors.detection_layers_num
self.num_anchors = anchors.num_anchors
self.grid = [torch.zeros(1)] * self.detection_layers_num # init grid
self.register_buffer('stride', anchors.stride)
self.register_buffer('anchors', anchors.anchors)
self.register_buffer('anchor_grid', anchors.anchor_grid)
self.m = nn.ModuleList(nn.Conv2d(x, self.num_outputs * self.num_anchors, 1) for x in channels) # output conv
    def forward(self, x):
z = [] # inference output
for i in range(self.detection_layers_num):
x[i] = self.m[i](x[i]) # conv
bs, _, ny, nx = x[i].shape # x(bs,255,20,20) to x(bs,3,20,20,85)
x[i] = x[i].view(bs, self.num_anchors, self.num_outputs, ny, nx).permute(0, 1, 3, 4, 2).contiguous()
if not self.training: # inference
if self.grid[i].shape[2:4] != x[i].shape[2:4]:
self.grid[i] = self._make_grid(nx, ny).to(x[i].device)
y = x[i].sigmoid()
xy = (y[..., 0:2] * 2. - 0.5 + self.grid[i].to(x[i].device)) * self.stride[i] # xy
wh = (y[..., 2:4] * 2) ** 2 * self.anchor_grid[i].view(1, self.num_anchors, 1, 1, 2) # wh
y = torch.cat([xy, wh, y[..., 4:]], dim=4)
z.append(y.view(bs, -1, self.num_outputs))
return x if self.training else (torch.cat(z, 1), x)
@staticmethod
def _make_grid(nx=20, ny=20):
yv, xv = torch.meshgrid([torch.arange(ny), torch.arange(nx)])
return torch.stack((xv, yv), 2).view((1, 1, ny, nx, 2)).float()
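
# Worked example (illustrative) of the box decoding in Detect.forward above, for a stride-8 output
# with an anchor of size (10, 13):
#   xy = (sigmoid(t_xy) * 2. - 0.5 + grid) * 8   -> each center may move within (-0.5, 1.5) cells of
#                                                   its grid cell, i.e. (-4, 12) px at stride 8
#   wh = (sigmoid(t_wh) * 2) ** 2 * (10, 13)     -> width/height are bounded to (0, 4) times the anchor
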
class AbstractYoLoV5Backbone:
def __init__(self, arch_params):
# CREATE A LIST CONTAINING THE LAYERS TO EXTRACT FROM THE BACKBONE AND ADD THE FINAL LAYER
self._layer_idx_to_extract = [idx for sub_l in arch_params.skip_connections_dict.values() for idx in sub_l]
self._layer_idx_to_extract.append(len(self._modules_list) - 1)
    def forward(self, x):
        """:return: A list with the length of self._modules_list, containing the output of a layer if its index
        is specified in self._layer_idx_to_extract and None otherwise"""
extracted_intermediate_layers = []
for layer_idx, layer_module in enumerate(self._modules_list):
# PREDICT THE NEXT LAYER'S OUTPUT
x = layer_module(x)
            # IF INDICATED, APPEND THE OUTPUT TO extracted_intermediate_layers; OTHERWISE APPEND None
            if layer_idx in self._layer_idx_to_extract:
                extracted_intermediate_layers.append(x)
            else:
                extracted_intermediate_layers.append(None)
return extracted_intermediate_layers
class YoLoV5DarknetBackbone(AbstractYoLoV5Backbone, CSPDarknet53):
    """Implements the CSP_Darknet53 module and inherits the forward pass to extract layers indicated in arch_params"""
def __init__(self, arch_params):
arch_params.backbone_mode = True
CSPDarknet53.__init__(self, arch_params)
AbstractYoLoV5Backbone.__init__(self, arch_params)
    def forward(self, x):
return AbstractYoLoV5Backbone.forward(self, x)
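
# Illustrative note (not part of the module): with the default skip_connections_dict
# {12: [6], 16: [4], 19: [14], 22: [10], 24: [17, 20]} and the default CSPDarknet53 backbone
# (module indices 0-9), only sources 4 and 6 plus the last backbone layer are kept, e.g.:
#
#   outputs = YoLoV5DarknetBackbone(arch_params)(image)
#   # outputs[4], outputs[6] and outputs[-1] are tensors; every other entry is None
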
class YoLoV5Head(nn.Module):
def __init__(self, arch_params):
super().__init__()
# PARSE arch_params
num_classes = arch_params.num_classes
depth_mult_factor = arch_params.depth_mult_factor
width_mult_factor = arch_params.width_mult_factor
anchors = arch_params.anchors
self._skip_connections_dict = arch_params.skip_connections_dict
# FLATTEN THE SOURCE LIST INTO A LIST OF INDICES
self._layer_idx_to_extract = [idx for sub_l in self._skip_connections_dict.values() for idx in sub_l]
# GET THREE CONNECTING POINTS CHANNEL INPUT SIZE
connector = arch_params.connection_layers_input_channel_size
width_mult = lambda channels: width_multiplier(channels, arch_params.width_mult_factor)
        # THE MODULES LIST IS ACCESSIBLE FROM OUTSIDE THE CLASS, SO ITS STRUCTURE CAN BE CHANGED
self._modules_list = nn.ModuleList()
self._modules_list.append(Conv(width_mult(connector[0]), width_mult(512), 1, 1)) # 10
self._modules_list.append(nn.Upsample(None, 2, 'nearest')) # 11
self._modules_list.append(Concat(1)) # 12
self._modules_list.append(BottleneckCSP(connector[1], 512, 3, False, width_mult_factor=width_mult_factor,
depth_mult_factor=depth_mult_factor)) # 13
self._modules_list.append(Conv(width_mult(512), width_mult(256), 1)) # 14
self._modules_list.append(nn.Upsample(None, 2, 'nearest')) # 15
self._modules_list.append(Concat(1)) # 16
self._modules_list.append(BottleneckCSP(connector[2], 256, 3, False, width_mult_factor=width_mult_factor,
depth_mult_factor=depth_mult_factor)) # 17
self._modules_list.append(Conv(width_mult(256), width_mult(256), 3, 2)) # 18
self._modules_list.append(Concat(1)) # 19
self._modules_list.append(BottleneckCSP(512, 512, 3, False, width_mult_factor=width_mult_factor,
depth_mult_factor=depth_mult_factor)) # 20
self._modules_list.append(Conv(width_mult(512), width_mult(512), 3, 2)) # 21
self._modules_list.append(Concat(1)) # 22
self._modules_list.append(BottleneckCSP(1024, 1024, 3, False, width_mult_factor=width_mult_factor,
depth_mult_factor=depth_mult_factor)) # 23
self._modules_list.append(Detect(num_classes, anchors, channels=[256, 512, 1024],
width_mult_factor=width_mult_factor)) # 24
    def forward(self, intermediate_output):
        """
        :param intermediate_output: A list of the intermediate predictions of the layers specified in
            self._layer_idx_to_extract, as returned by the backbone
        """
# COUNT THE NUMBER OF LAYERS IN THE BACKBONE TO CONTINUE THE COUNTER
num_layers_in_backbone = len(intermediate_output)
# INPUT TO HEAD IS THE LAST ELEMENT OF THE BACKBONE'S OUTPUT
out = intermediate_output[-1]
# RUN OVER THE MODULE LIST WITHOUT THE FINAL LAYER & START COUNTER FROM THE END OF THE BACKBONE
for layer_idx, layer_module in enumerate(self._modules_list[:-1], start=num_layers_in_backbone):
            # IF THE LAYER INDEX IS A KEY, FEED IT THE PREVIOUS OUTPUT TOGETHER WITH THE INDICATED SKIP CONNECTION
out = layer_module([out, intermediate_output[self._skip_connections_dict[layer_idx][0]]]) \
if layer_idx in self._skip_connections_dict.keys() else layer_module(out)
            # IF INDICATED, APPEND THE OUTPUT TO intermediate_output; OTHERWISE APPEND None
            if layer_idx in self._layer_idx_to_extract:
                intermediate_output.append(out)
            else:
                intermediate_output.append(None)
        # FEED THE SAVED SKIP-CONNECTION OUTPUTS AND THE LAST OUTPUT INTO THE Detect LAYER
last_idx = len(self._modules_list) + num_layers_in_backbone - 1
return self._modules_list[-1]([intermediate_output[self._skip_connections_dict[last_idx][0]],
intermediate_output[self._skip_connections_dict[last_idx][1]],
out])
class YoLoV5Base(SgModule):
def __init__(self, backbone: Type[nn.Module], arch_params: HpmStruct, initialize_module: bool = True):
super().__init__()
# DEFAULT PARAMETERS TO BE OVERWRITTEN BY DUPLICATES THAT APPEAR IN arch_params
self.arch_params = HpmStruct(**DEFAULT_YOLOV5_ARCH_PARAMS)
self.arch_params.override(**arch_params.to_dict())
self.num_classes = self.arch_params.num_classes
# THE MODEL'S MODULES
self._backbone = backbone(arch_params=self.arch_params)
self._nms = nn.Identity()
# A FLAG TO DEFINE augment_forward IN INFERENCE
self.augmented_inference = False
# RUN SPECIFIC INITIALIZATION OF YOLO-V5
if initialize_module:
self._head = YoLoV5Head(self.arch_params)
self._initialize_module()
    def forward(self, x):
return self._augment_forward(x) if self.augmented_inference else self._forward_once(x)
def _forward_once(self, x):
out = self._backbone(x)
out = self._head(out)
        # THIS HAS NO EFFECT UNLESS add_nms WAS SET IN arch_params (self._nms DEFAULTS TO nn.Identity)
out = self._nms(out)
return out
def _augment_forward(self, x):
"""Multi-scale forward pass"""
img_size = x.shape[-2:] # height, width
s = [1, 0.83, 0.67] # scales
f = [None, 3, None] # flips (2-ud, 3-lr)
y = [] # outputs
for si, fi in zip(s, f):
xi = scale_img(x.flip(fi) if fi else x, si)
yi = self._forward_once(xi)[0] # forward
yi[..., :4] /= si # de-scale
if fi == 2:
yi[..., 1] = img_size[0] - yi[..., 1] # de-flip ud
elif fi == 3:
yi[..., 0] = img_size[1] - yi[..., 0] # de-flip lr
y.append(yi)
return torch.cat(y, 1), None # augmented inference, train
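
    # Illustrative note: with the scales and flips above, _augment_forward runs three passes per image
    # (original; 0.83x scale with left-right flip; 0.67x scale) and maps the resulting box coordinates
    # back to the original image frame before concatenating the predictions.
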
    def load_state_dict(self, state_dict, strict=True):
try:
super().load_state_dict(state_dict, strict)
except RuntimeError as e:
raise RuntimeError(f"Got exception {e}, if a mismatch between expected and given state_dict keys exist, "
f"checkpoint may have been saved after fusing conv and bn. use fuse_conv_bn before loading.")
def _initialize_module(self):
self._check_strides_and_anchors()
self._initialize_biases()
self._initialize_weights()
if self.arch_params.add_nms:
nms_conf = self.arch_params.nms_conf
nms_iou = self.arch_params.nms_iou
self._nms = YoloV5PostPredictionCallback(nms_conf, nms_iou)
    def update_param_groups(self, param_groups: list, lr: float, epoch: int, iter: int,
training_params: HpmStruct, total_batch: int) -> list:
lr_warmup_epochs = get_param(training_params, 'lr_warmup_epochs', 0)
if epoch < lr_warmup_epochs and iter is not None:
            # OVERRIDE THE lr FROM SgModel WITH initial_lr, SINCE SgModel MANIPULATES THE ORIGINAL VALUE
print_once('Using Yolo v5 warm-up lr (overriding ModelBase lr function)')
lr = training_params.initial_lr
momentum = get_param(training_params.optimizer_params, 'momentum')
warmup_momentum = get_param(training_params, 'warmup_momentum', momentum)
warmup_bias_lr = get_param(training_params, 'warmup_bias_lr', lr)
nw = lr_warmup_epochs * total_batch
ni = epoch * total_batch + iter
xi = [0, nw] # x interp
for x in param_groups:
# BIAS LR FALLS FROM 0.1 TO LR0, ALL OTHER LRS RISE FROM 0.0 TO LR0
x['lr'] = np.interp(ni, xi, [warmup_bias_lr if x['name'] == 'bias' else 0.0, lr])
if 'momentum' in x:
x['momentum'] = np.interp(ni, xi, [warmup_momentum, momentum])
return param_groups
else:
return super().update_param_groups(param_groups, lr, epoch, iter, training_params, total_batch)
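
    # Worked example (illustrative): with lr_warmup_epochs=2, total_batch=100, initial_lr=0.01 and
    # warmup_bias_lr=0.1, nw = 200; halfway through warm-up (ni = 100), np.interp gives
    #   bias lr:   np.interp(100, [0, 200], [0.1, 0.01]) = 0.055  (falls from 0.1 toward lr)
    #   other lrs: np.interp(100, [0, 200], [0.0, 0.01]) = 0.005  (rises from 0.0 toward lr)
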
def _check_strides_and_anchors(self):
m = self._head._modules_list[-1] # Detect()
# Do inference in train mode on a dummy image to get output stride of each head output layer
s = 128 # twice the minimum acceptable image size
dummy_input = torch.zeros(1, self.arch_params.channels_in, s, s)
stride = torch.tensor([s / x.shape[-2] for x in self._forward_once(dummy_input)])
if not torch.equal(m.stride, stride):
raise RuntimeError('Provided anchor strides do not match the model strides')
check_anchor_order(m)
self.register_buffer('stride', m.stride) # USED ONLY FOR CONVERSION
def _initialize_biases(self, cf=None):
"""initialize biases into Detect(), cf is class frequency"""
# TODO: UNDERSTAND WHAT IS THIS cf AND IF WE NEED IT
# cf = torch.bincount(torch.tensor(np.concatenate(dataset.labels, 0)[:, 0]).long(), minlength=nc) + 1.
m = self._head._modules_list[-1] # Detect() module
        for mi, s in zip(m.m, m.stride):  # iterate over the output convs and their strides
b = mi.bias.view(m.num_anchors, -1) # conv.bias(255) to (3,85)
with torch.no_grad():
b[:, 4] += math.log(8 / (640 / s) ** 2) # obj (8 objects per 640 image)
b[:, 5:] += math.log(0.6 / (m.num_classes - 0.99)) if cf is None else torch.log(cf / cf.sum()) # cls
mi.bias = torch.nn.Parameter(b.view(-1), requires_grad=True)
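
    # Worked example (illustrative): for the stride-8 output of a 640px image, the objectness bias becomes
    #   log(8 / (640 / 8) ** 2) = log(8 / 6400) ≈ -6.68, i.e. sigmoid(-6.68) ≈ 0.00125,
    # matching the prior of ~8 objects per 640px image spread across the 80x80 = 6400 stride-8 cells.
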
def _initialize_weights(self):
for m in self.modules():
t = type(m)
if t is nn.Conv2d:
pass # nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
elif t is nn.BatchNorm2d:
m.eps = 1e-3
m.momentum = 0.03
elif t in [nn.LeakyReLU, nn.ReLU, nn.ReLU6, nn.Hardswish]:
m.inplace = True
    def initialize_param_groups(self, lr: float, training_params: HpmStruct) -> list:
        """
        initialize_param_groups - Builds the optimizer parameter groups for the model;
        weight decay is applied *only* to weights that are neither biases nor BatchNorm parameters
        :param lr: lr to set for the optimizer
        :param training_params:
        :return: A list of param groups to initialize the optimizer with
        """
optimizer_params = get_param(training_params, 'optimizer_params')
# OPTIMIZER PARAMETER GROUPS
default_param_group, weight_decay_param_group, biases_param_group = [], [], []
for name, m in self.named_modules():
if hasattr(m, 'bias') and isinstance(m.bias, nn.Parameter): # bias
biases_param_group.append((name, m.bias))
if isinstance(m, nn.BatchNorm2d): # weight (no decay)
default_param_group.append((name, m.weight))
elif hasattr(m, 'weight') and isinstance(m.weight, nn.Parameter): # weight (with decay)
weight_decay_param_group.append((name, m.weight))
# EXTRACT weight_decay FROM THE optimizer_params IN ORDER TO ASSIGN THEM MANUALLY
weight_decay = optimizer_params.pop('weight_decay') if 'weight_decay' in optimizer_params.keys() else 0
param_groups = [{'named_params': default_param_group, 'lr': lr, **optimizer_params, 'name': 'default'},
{'named_params': weight_decay_param_group, 'weight_decay': weight_decay, 'name': 'wd'},
{'named_params': biases_param_group, 'name': 'bias'}]
# Assert that all parameters were added to optimizer param groups
params_total = sum(p.numel() for p in self.parameters())
optimizer_params_total = sum(p.numel() for g in param_groups for _, p in g['named_params'])
assert params_total == optimizer_params_total, \
f"Parameters {[n for n, _ in self.named_parameters() if 'weight' not in n and 'bias' not in n]} " \
f"weren't added to optimizer param groups"
return param_groups
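
    # Illustrative sketch (not part of the module): the returned groups can be passed to an optimizer
    # once 'named_params' is unpacked into 'params', conceptually:
    #
    #   groups = model.initialize_param_groups(lr=0.01, training_params=training_params)
    #   optimizer = torch.optim.SGD(
    #       [{'params': [p for _, p in g.pop('named_params')], **g} for g in groups], lr=0.01)
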
    def prep_model_for_conversion(self, input_size: Union[tuple, list] = None, **kwargs):
"""
A method for preparing the YoloV5 model for conversion to other frameworks (ONNX, CoreML etc)
:param input_size: expected input size
:return:
"""
assert not self.training, 'model has to be in eval mode to be converted'
        # Verify that the dummy_input from the converter is a multiple of the max stride (grid size)
max_stride = int(max(self.stride))
# Validate the image size
image_dims = input_size[-2:] # assume torch uses channels first layout
for dim in image_dims:
res_flag, suggestion = check_img_size_divisibilty(dim, max_stride)
if not res_flag:
raise ValueError(f'Invalid input size: {input_size}. The input size must be multiple of max stride: '
f'{max_stride}. The closest suggestions are: {suggestion[0]}x{suggestion[0]} or '
f'{suggestion[1]}x{suggestion[1]}')
# Update the model with exportable operators
for k, m in self.named_modules():
if isinstance(m, Conv) and isinstance(m.act, nn.Hardswish):
m._non_persistent_buffers_set = set() # pytorch 1.6.0 compatibility
m.act = ExportableHardswish() # assign activation
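
    # Illustrative sketch (not part of the module): exporting to ONNX after preparation,
    # assuming a 640x640 input (divisible by the maximum stride of 32):
    #
    #   model.eval()
    #   model.prep_model_for_conversion(input_size=(1, 3, 640, 640))
    #   torch.onnx.export(model, torch.zeros(1, 3, 640, 640), 'yolov5s.onnx')
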
    def get_include_attributes(self) -> list:
        return ["grid", "anchors", "anchor_grid"]
class Custom_YoLoV5(YoLoV5Base):
def __init__(self, arch_params: HpmStruct):
backbone = get_param(arch_params, 'backbone', YoLoV5DarknetBackbone)
super().__init__(backbone=backbone, arch_params=arch_params)
class YoLoV5S(YoLoV5Base):
def __init__(self, arch_params: HpmStruct):
arch_params.depth_mult_factor = 0.33
arch_params.width_mult_factor = 0.50
super().__init__(backbone=YoLoV5DarknetBackbone, arch_params=arch_params)
class YoLoV5M(YoLoV5Base):
def __init__(self, arch_params: HpmStruct):
arch_params.depth_mult_factor = 0.67
arch_params.width_mult_factor = 0.75
super().__init__(backbone=YoLoV5DarknetBackbone, arch_params=arch_params)
class YoLoV5L(YoLoV5Base):
def __init__(self, arch_params: HpmStruct):
arch_params.depth_mult_factor = 1.0
arch_params.width_mult_factor = 1.0
super().__init__(backbone=YoLoV5DarknetBackbone, arch_params=arch_params)
class YoLoV5X(YoLoV5Base):
def __init__(self, arch_params: HpmStruct):
arch_params.depth_mult_factor = 1.33
arch_params.width_mult_factor = 1.25
super().__init__(backbone=YoLoV5DarknetBackbone, arch_params=arch_params)
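
# Illustrative usage sketch (not part of the module), assuming the default 80-class COCO setup:
#
#   arch_params = HpmStruct(**{'num_classes': 80})
#   model = YoLoV5S(arch_params)
#   model.eval()
#   with torch.no_grad():
#       output = model(torch.zeros(1, 3, 640, 640))   # (decoded predictions, raw feature maps)
#   detections = YoloV5PostPredictionCallback(conf=0.25, iou=0.45)(output)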