# ResNet152Layers
from __future__ import print_function
from collections import OrderedDict
import os
import numpy
try:
from PIL import Image
available = True
except ImportError as e:
available = False
_import_error = e
from chainer.dataset.convert import concat_examples
from chainer.dataset import download
from chainer import flag
from chainer.functions.activation.relu import relu
from chainer.functions.activation.softmax import softmax
from chainer.functions.array.reshape import reshape
from chainer.functions.math.sum import sum
from chainer.functions.pooling.average_pooling_2d import average_pooling_2d
from chainer.functions.pooling.max_pooling_2d import max_pooling_2d
from chainer.initializers import constant
from chainer.initializers import normal
from chainer import link
from chainer.links.connection.convolution_2d import Convolution2D
from chainer.links.connection.linear import Linear
from chainer.links.normalization.batch_normalization import BatchNormalization
from chainer.serializers import npz
from chainer.utils import imgproc
from chainer.variable import Variable
class ResNet152Layers(link.Chain):

    """ResNet-152 network exposed as an ordered sequence of named layers.

    The network consists of a stem (``conv1``/``bn1``), four residual
    stages (``res2`` to ``res5``) and a 1000-way linear layer (``fc6``).
    Intermediate feature maps can be fetched by name; see
    ``available_layers``.

    Args:
        pretrained_model (str): If ``'auto'``, the weights are obtained
            through ``_retrieve`` (a cached npz file, or a caffemodel placed
            in the dataset directory that is converted on first use).
            Any other true value is treated as the path of an npz file and
            loaded with ``npz.load_npz``. If it is false (e.g. ``None``),
            no weights are loaded and the convolutions are initialized
            with ``HeNormal(scale=1.0)``.

    """

    def __init__(self, pretrained_model='auto'):
        if pretrained_model:
            # As a sampling process is time-consuming,
            # we employ a zero initializer for faster computation.
            kwargs = {'initialW': constant.Zero()}
        else:
            # employ default initializers used in the original paper
            kwargs = {'initialW': normal.HeNormal(scale=1.0)}
        super(ResNet152Layers, self).__init__(
            conv1=Convolution2D(3, 64, 7, 2, 3, nobias=True, **kwargs),
            bn1=BatchNormalization(64),
            res2=BuildingBlock(3, 64, 64, 256, 1, **kwargs),
            res3=BuildingBlock(8, 256, 128, 512, 2, **kwargs),
            res4=BuildingBlock(36, 512, 256, 1024, 2, **kwargs),
            res5=BuildingBlock(3, 1024, 512, 2048, 2, **kwargs),
            fc6=Linear(2048, 1000),
        )
        if pretrained_model == 'auto':
            _retrieve(
                'ResNet-152-model.npz', 'ResNet-152-model.caffemodel', self)
        elif pretrained_model:
            npz.load_npz(pretrained_model, self)

    @property
    def functions(self):
        """Ordered mapping from layer name to the callables that compute it.

        The mapping is rebuilt on every access instead of being stored on
        the instance. A stored mapping would keep references to the link
        objects that existed at construction time, so a chain obtained
        through ``copy()`` (whose child links are replaced by copies) would
        silently keep running the original's links. Resolving the links
        through ``self`` here always yields the current children.
        """
        return OrderedDict([
            ('conv1', [self.conv1, self.bn1, relu]),
            ('pool1', [lambda x: max_pooling_2d(x, ksize=3, stride=2)]),
            ('res2', [self.res2]),
            ('res3', [self.res3]),
            ('res4', [self.res4]),
            ('res5', [self.res5]),
            ('pool5', [_global_average_pooling_2d]),
            ('fc6', [self.fc6]),
            ('prob', [softmax]),
        ])

    @property
    def available_layers(self):
        """List of layer names accepted by ``__call__`` and ``extract``."""
        return list(self.functions.keys())

    @classmethod
    def convert_caffemodel_to_npz(cls, path_caffemodel, path_npz):
        """Converts a pre-trained caffemodel to a chainer model.

        Args:
            path_caffemodel (str): Path of the pre-trained caffemodel.
            path_npz (str): Path of the converted chainer model.

        """
        # As CaffeFunction uses shortcut symbols,
        # we import CaffeFunction here.
        from chainer.links.caffe.caffe_function import CaffeFunction
        caffemodel = CaffeFunction(path_caffemodel)
        # pretrained_model=None avoids a recursive retrieval of weights.
        chainermodel = cls(pretrained_model=None)
        _transfer_resnet152(caffemodel, chainermodel)
        npz.save_npz(path_npz, chainermodel, compression=False)

    # NOTE: the list default is only read (converted to a set), never
    # mutated, so sharing it between calls is harmless.
    def __call__(self, x, layers=['prob'], test=True):
        """Computes all the feature maps specified by ``layers``.

        Layers are evaluated in the order of ``functions`` and the
        computation stops as soon as every requested layer has been
        produced. Names that do not appear in ``available_layers`` are
        never produced and are therefore absent from the result.

        Args:
            x (~chainer.Variable): Input variable.
            layers (list of str): The list of layer names you want to extract.
            test (bool): If ``True``, BatchNormalization runs in test mode.

        Returns:
            Dictionary of ~chainer.Variable: A dictionary in which
            the key contains the layer name and the value contains
            the corresponding feature map variable.

        """
        h = x
        activations = {}
        target_layers = set(layers)
        for key, funcs in self.functions.items():
            if not target_layers:
                break
            for func in funcs:
                # Only these links take the ``test`` switch; plain
                # functions and the other links are called with the
                # input alone.
                if isinstance(func, (BatchNormalization, BuildingBlock)):
                    h = func(h, test=test)
                else:
                    h = func(h)
            if key in target_layers:
                activations[key] = h
                target_layers.remove(key)
        return activations

    # NOTE: the list default is passed through unchanged and never mutated.
    def extract(self, images, layers=['pool5'], size=(224, 224),
                test=True, volatile=flag.OFF):
        """Extracts all the feature maps of given images.

        The difference of directly executing ``__call__`` is that
        it directly accepts images as an input and automatically
        transforms them to a proper variable. That is,
        it is also interpreted as a shortcut method that implicitly calls
        ``prepare`` and ``__call__`` functions.

        Args:
            images (iterable of PIL.Image or numpy.ndarray): Input images.
            layers (list of str): The list of layer names you want to extract.
            size (pair of ints): The resolution of resized images used as
                an input of CNN. All the given images are not resized
                if this argument is ``None``, but the resolutions of
                all the images should be the same.
            test (bool): If ``True``, BatchNormalization runs in test mode.
            volatile (~chainer.Flag): Volatility flag used for input variables.

        Returns:
            Dictionary of ~chainer.Variable: A dictionary in which
            the key contains the layer name and the value contains
            the corresponding feature map variable.

        """
        x = concat_examples([prepare(img, size=size) for img in images])
        x = Variable(self.xp.asarray(x), volatile=volatile)
        return self(x, layers=layers, test=test)

    def predict(self, images, oversample=True):
        """Computes all the probabilities of given images.

        Args:
            images (iterable of PIL.Image or numpy.ndarray): Input images.
            oversample (bool): If ``True``, it averages results across
                center, corners, and mirrors. Otherwise, it uses only the
                center.

        Returns:
            ~chainer.Variable: Output that contains the class probabilities
            of given images.

        """
        x = concat_examples([prepare(img, size=(256, 256)) for img in images])
        if oversample:
            x = imgproc.oversample(x, crop_dims=(224, 224))
        else:
            # Central 224x224 window of the 256x256 input.
            x = x[:, :, 16:240, 16:240]
        # Set volatile option to ON to reduce memory consumption
        x = Variable(self.xp.asarray(x), volatile=flag.ON)
        y = self(x, layers=['prob'])['prob']
        if oversample:
            # Every image contributed 10 crops; average them per image.
            n = y.data.shape[0] // 10
            y_shape = y.data.shape[1:]
            y = reshape(y, (n, 10) + y_shape)
            y = sum(y, axis=1) / 10
        return y
def prepare(image, size=(224, 224)):
    """Converts the given image to the numpy array for ResNets.

    Call this before ``__call__``: the pre-trained ResNet model expects
    an input that has been resized, converted from RGB to BGR,
    mean-subtracted and laid out as ``(channels, height, width)``.

    Args:
        image (PIL.Image or numpy.ndarray): Input image.
            If an input is ``numpy.ndarray``, its shape must be
            ``(height, width)``, ``(height, width, channels)``,
            or ``(channels, height, width)``, and
            the order of the channels must be RGB.
        size (pair of ints): Size of converted images, handed unchanged
            to the PIL image's ``resize``.
            If ``None``, the given image is not resized.

    Returns:
        numpy.ndarray: The converted output array.

    Raises:
        ImportError: If PIL could not be imported.

    """
    if not available:
        message = ('PIL cannot be loaded. Install Pillow!\n'
                   'The actual import error is as follows:\n' +
                   str(_import_error))
        raise ImportError(message)

    if isinstance(image, numpy.ndarray):
        if image.ndim == 3:
            leading = image.shape[0]
            if leading == 1:
                # Single-channel, channel-first input: drop the channel axis.
                image = image[0, :, :]
            elif leading == 3:
                # Channel-first input: move channels to the last axis.
                image = image.transpose((1, 2, 0))
        image = Image.fromarray(image.astype(numpy.uint8))

    image = image.convert('RGB')
    if size:
        image = image.resize(size)

    # Reverse the channel axis (RGB -> BGR) on a float32 copy of the pixels.
    bgr = numpy.asarray(image, dtype=numpy.float32)[:, :, ::-1]
    # NOTE: in the original paper they subtract a fixed mean image,
    # however, in order to support arbitrary size we instead use the
    # mean pixel (rather than mean image) as with VGG team. The mean
    # value used in ResNet is slightly different from that of VGG16.
    bgr -= numpy.array(
        [103.063, 115.903, 123.152], dtype=numpy.float32)
    return bgr.transpose((2, 0, 1))
class BuildingBlock(link.Chain):

    """A building block that consists of several Bottleneck layers.

    The block is one ``BottleneckA`` (registered as ``a``) followed by
    ``n_layer - 1`` ``BottleneckB`` links (registered as ``b1``, ``b2``,
    ...), applied in that order.

    Args:
        n_layer (int): Number of layers used in the building block.
        in_channels (int): Number of channels of input arrays.
        mid_channels (int): Number of channels of intermediate arrays.
        out_channels (int): Number of channels of output arrays.
        stride (int or tuple of ints): Stride of filter application.
        initialW: Initial weight value forwarded to the convolutional
            layers of every bottleneck (``ResNet152Layers`` passes
            initializer objects here).

    """

    def __init__(self, n_layer, in_channels, mid_channels,
                 out_channels, stride, initialW=None):
        links = [
            ('a', BottleneckA(
                in_channels, mid_channels, out_channels, stride, initialW))
        ]
        for i in range(n_layer - 1):
            name = 'b{}'.format(i + 1)
            bottleneck = BottleneckB(out_channels, mid_channels, initialW)
            links.append((name, bottleneck))
        super(BuildingBlock, self).__init__(**dict(links))
        # Kept as (name, link) pairs so the attribute looks the same as
        # before; only the names are used when the block is applied.
        self.forward = links

    def __call__(self, x, test=True):
        """Applies every bottleneck of the block in order.

        Args:
            x (~chainer.Variable): Input variable.
            test (bool): Forwarded to every bottleneck.

        Returns:
            ~chainer.Variable: Output of the last bottleneck.

        """
        for name, _ in self.forward:
            # Look the link up by name instead of using the object stored
            # in ``self.forward``: after ``copy()`` the children of the
            # copy are new objects, while the stored list still refers to
            # the links of the original chain.
            x = getattr(self, name)(x, test=test)
        return x
class BottleneckA(link.Chain):

    """A bottleneck layer that reduces the resolution of the feature map.

    The main path is 1x1 -> 3x3 -> 1x1 convolutions, each followed by
    batch normalization; the shortcut path is a single strided 1x1
    convolution with batch normalization. Both paths are summed and
    passed through ReLU.

    Args:
        in_channels (int): Number of channels of input arrays.
        mid_channels (int): Number of channels of intermediate arrays.
        out_channels (int): Number of channels of output arrays.
        stride (int or tuple of ints): Stride of filter application.
        initialW (4-D array): Initial weight value used in
            the convolutional layers.

    """

    def __init__(self, in_channels, mid_channels, out_channels,
                 stride=2, initialW=None):
        # Options shared by every convolution of the bottleneck.
        conv_opts = {'initialW': initialW, 'nobias': True}
        super(BottleneckA, self).__init__(
            conv1=Convolution2D(
                in_channels, mid_channels, 1, stride, 0, **conv_opts),
            bn1=BatchNormalization(mid_channels),
            conv2=Convolution2D(
                mid_channels, mid_channels, 3, 1, 1, **conv_opts),
            bn2=BatchNormalization(mid_channels),
            conv3=Convolution2D(
                mid_channels, out_channels, 1, 1, 0, **conv_opts),
            bn3=BatchNormalization(out_channels),
            conv4=Convolution2D(
                in_channels, out_channels, 1, stride, 0, **conv_opts),
            bn4=BatchNormalization(out_channels),
        )

    def __call__(self, x, test=True):
        """Applies the bottleneck to ``x``.

        Args:
            x (~chainer.Variable): Input variable.
            test (bool): Forwarded to every batch normalization link.

        Returns:
            ~chainer.Variable: ReLU of the sum of both paths.

        """
        # Main path.
        residual = self.conv1(x)
        residual = relu(self.bn1(residual, test=test))
        residual = self.conv2(residual)
        residual = relu(self.bn2(residual, test=test))
        residual = self.conv3(residual)
        residual = self.bn3(residual, test=test)
        # Projection shortcut.
        shortcut = self.conv4(x)
        shortcut = self.bn4(shortcut, test=test)
        return relu(residual + shortcut)
class BottleneckB(link.Chain):

    """A bottleneck layer that maintains the resolution of the feature map.

    Three convolutions (1x1 -> 3x3 -> 1x1, all stride 1), each followed by
    batch normalization, are added to the unchanged input and passed
    through ReLU.

    Args:
        in_channels (int): Number of channels of input and output arrays.
        mid_channels (int): Number of channels of intermediate arrays.
        initialW (4-D array): Initial weight value used in
            the convolutional layers.

    """

    def __init__(self, in_channels, mid_channels, initialW=None):
        # Options shared by every convolution of the bottleneck.
        conv_opts = {'initialW': initialW, 'nobias': True}
        super(BottleneckB, self).__init__(
            conv1=Convolution2D(
                in_channels, mid_channels, 1, 1, 0, **conv_opts),
            bn1=BatchNormalization(mid_channels),
            conv2=Convolution2D(
                mid_channels, mid_channels, 3, 1, 1, **conv_opts),
            bn2=BatchNormalization(mid_channels),
            conv3=Convolution2D(
                mid_channels, in_channels, 1, 1, 0, **conv_opts),
            bn3=BatchNormalization(in_channels),
        )

    def __call__(self, x, test=True):
        """Applies the bottleneck to ``x``.

        Args:
            x (~chainer.Variable): Input variable.
            test (bool): Forwarded to every batch normalization link.

        Returns:
            ~chainer.Variable: ReLU of the residual added to ``x``.

        """
        residual = self.conv1(x)
        residual = relu(self.bn1(residual, test=test))
        residual = self.conv2(residual)
        residual = relu(self.bn2(residual, test=test))
        residual = self.conv3(residual)
        residual = self.bn3(residual, test=test)
        # Identity shortcut: the input is added unchanged.
        return relu(residual + x)
def _global_average_pooling_2d(x):
    """Averages each channel over its whole spatial extent.

    Args:
        x (~chainer.Variable): Variable whose data has four dimensions.

    Returns:
        ~chainer.Variable: Variable reshaped to the first two dimensions
        of the input.

    """
    batch_size, n_channels, height, width = x.data.shape
    # A pooling window as large as the feature map yields one value per
    # channel.
    pooled = average_pooling_2d(x, (height, width), stride=1)
    return reshape(pooled, (batch_size, n_channels))
def _transfer_components(src, dst_conv, dst_bn, bname, cname):
    """Copies one convolution and its batch normalization from ``src``.

    The source layers are looked up on ``src`` as
    ``res<bname>_branch<cname>``, ``bn<bname>_branch<cname>`` and
    ``scale<bname>_branch<cname>``. All values are written in place into
    the destination arrays.

    Args:
        src: Object holding the source layers as attributes.
        dst_conv: Destination convolution; its ``W`` is overwritten.
        dst_bn: Destination batch normalization; its running statistics,
            ``gamma`` and ``beta`` are overwritten.
        bname (str): Block part of the source layer names.
        cname (str): Branch part of the source layer names.

    """
    layer_id = '{}_branch{}'.format(bname, cname)
    conv_layer = getattr(src, 'res' + layer_id)
    bn_layer = getattr(src, 'bn' + layer_id)
    scale_layer = getattr(src, 'scale' + layer_id)

    dst_conv.W.data[:] = conv_layer.W.data
    # Running statistics come from the bn layer, the affine parameters
    # from the scale layer.
    dst_bn.avg_mean[:] = bn_layer.avg_mean
    dst_bn.avg_var[:] = bn_layer.avg_var
    dst_bn.gamma.data[:] = scale_layer.W.data
    dst_bn.beta.data[:] = scale_layer.bias.b.data
def _transfer_bottleneckA(src, dst, name):
    """Copies the four conv/bn pairs of a ``BottleneckA`` from ``src``.

    ``conv1``..``conv3`` are filled from branches ``2a``, ``2b`` and
    ``2c``; ``conv4`` (the shortcut) is filled from branch ``1``.

    Args:
        src: Object holding the source layers as attributes.
        dst: Destination bottleneck.
        name (str): Block part of the source layer names.

    """
    for index, branch in enumerate(('2a', '2b', '2c', '1'), start=1):
        conv = getattr(dst, 'conv{}'.format(index))
        bn = getattr(dst, 'bn{}'.format(index))
        _transfer_components(src, conv, bn, name, branch)
def _transfer_bottleneckB(src, dst, name):
    """Copies the three conv/bn pairs of a ``BottleneckB`` from ``src``.

    ``conv1``..``conv3`` are filled from branches ``2a``, ``2b`` and
    ``2c`` of the source block.

    Args:
        src: Object holding the source layers as attributes.
        dst: Destination bottleneck.
        name (str): Block part of the source layer names.

    """
    for index, branch in enumerate(('2a', '2b', '2c'), start=1):
        conv = getattr(dst, 'conv{}'.format(index))
        bn = getattr(dst, 'bn{}'.format(index))
        _transfer_components(src, conv, bn, name, branch)
def _transfer_block(src, dst, names):
    """Copies the weights of one whole building block from ``src``.

    Args:
        src: Object holding the source layers as attributes.
        dst: Destination block with children ``a``, ``b1``, ``b2``, ...
        names (list of str): Source block names; the first one feeds
            ``dst.a`` and the remaining ones feed ``b1``, ``b2``, ...
            in order.

    """
    _transfer_bottleneckA(src, dst.a, names[0])
    for index, src_name in enumerate(names[1:], start=1):
        bottleneck = getattr(dst, 'b{}'.format(index))
        _transfer_bottleneckB(src, bottleneck, src_name)
def _transfer_resnet152(src, dst):
    """Copies every weight of the ResNet-152 network from ``src`` to ``dst``.

    Args:
        src: Object holding the source layers as attributes.
        dst: Destination network; its arrays are overwritten in place.

    """
    # Stem. The destination conv1 is created with nobias=True, so only
    # its weight is copied.
    dst.conv1.W.data[:] = src.conv1.W.data
    stem_bn = dst.bn1
    stem_bn.avg_mean[:] = src.bn_conv1.avg_mean
    stem_bn.avg_var[:] = src.bn_conv1.avg_var
    stem_bn.gamma.data[:] = src.scale_conv1.W.data
    stem_bn.beta.data[:] = src.scale_conv1.bias.b.data

    # Residual stages. res3 and res4 number their repeated bottlenecks
    # (3b1..3b7, 4b1..4b35) instead of using letters.
    _transfer_block(src, dst.res2, ['2a', '2b', '2c'])
    res3_names = ['3a'] + ['3b{}'.format(i) for i in range(1, 8)]
    _transfer_block(src, dst.res3, res3_names)
    res4_names = ['4a'] + ['4b{}'.format(i) for i in range(1, 36)]
    _transfer_block(src, dst.res4, res4_names)
    _transfer_block(src, dst.res5, ['5a', '5b', '5c'])

    # Final linear layer.
    dst.fc6.W.data[:] = src.fc1000.W.data
    dst.fc6.b.data[:] = src.fc1000.b.data
def _make_npz(path_npz, path_caffemodel, model):
    """Converts the caffemodel to an npz file and loads it into ``model``.

    Args:
        path_npz (str): Path the converted npz file is written to.
        path_caffemodel (str): Path of the pre-trained caffemodel.
        model: Link the converted weights are loaded into.

    Returns:
        The given ``model``.

    Raises:
        IOError: If ``path_caffemodel`` does not exist.

    """
    print('Now loading caffemodel (usually it may take few minutes)')
    if os.path.exists(path_caffemodel):
        ResNet152Layers.convert_caffemodel_to_npz(path_caffemodel, path_npz)
        npz.load_npz(path_npz, model)
        return model
    # The caffemodel is not downloaded automatically; tell the user where
    # to get it and where to put it.
    raise IOError(
        'The pre-trained caffemodel does not exist. Please download it '
        'from \'https://github.com/KaimingHe/deep-residual-networks\', '
        'and place it on {}'.format(path_caffemodel))
def _retrieve(name_npz, name_caffemodel, model):
    """Loads pre-trained weights into ``model`` via the dataset directory.

    Both file names are resolved inside the directory returned by
    ``download.get_dataset_directory('pfnet/chainer/models/')``. The
    choice between creating the npz file and loading an existing one is
    delegated to ``download.cache_or_load_file``.

    Args:
        name_npz (str): File name of the npz model.
        name_caffemodel (str): File name of the caffemodel.
        model: Link the weights are loaded into.

    Returns:
        Whatever ``download.cache_or_load_file`` returns.

    """
    root = download.get_dataset_directory('pfnet/chainer/models/')
    path = os.path.join(root, name_npz)
    path_caffemodel = os.path.join(root, name_caffemodel)

    def create(path):
        # Build the npz file from the caffemodel, then load it.
        return _make_npz(path, path_caffemodel, model)

    def load(path):
        return npz.load_npz(path, model)

    return download.cache_or_load_file(path, create, load)