thinkwee
2/27/2018 - 6:57 AM

disan.py

"""
@ author: xx
@ Email: xx@xxx
@ Date: August 26, 2017

Directional Self-Attention Network
Requirements: Python 3.5.2, Tensorflow 1.2
Usage: from xx.disan import disan
"""

import tensorflow as tf
from functools import reduce
from operator import mul

# Extreme magnitudes shared across the module. None of them is referenced in
# the part of the file visible here -- presumably they are consumed by masking
# helpers defined elsewhere (TODO confirm).
VERY_BIG_NUMBER = 1e30
VERY_SMALL_NUMBER = 1e-30
# Signed aliases of the large magnitude.
VERY_POSITIVE_NUMBER, VERY_NEGATIVE_NUMBER = VERY_BIG_NUMBER, -VERY_BIG_NUMBER

# ---------------   DiSAN Interface  ----------------
# NOTE(review): the `def` line that opens this signature is not present in this
# copy of the file; the two lines below are only the tail of a parameter list.
# Judging by the 'DiSAN' scope name and the module docstring's usage line this
# is presumably the `disan` entry point -- TODO confirm against the original.
keep_prob=1., is_train=None, wd=0., activation='elu',
tensor_dict=None, name=''):
# Everything is built under `scope`, falling back to 'DiSAN' when it is falsy.
with tf.variable_scope(scope or 'DiSAN'):
with tf.variable_scope('ct_attn'):
# Two directional attention passes, recorded under '<name>_fw_attn' and
# '<name>_bw_attn'.
# NOTE(review): as written both calls start at `keep_prob`; the leading
# positional arguments (input tensor, mask, direction, scope?) appear to
# have been lost -- verify before use.
fw_res = directional_attention_with_dense(
keep_prob, is_train, wd, activation,
tensor_dict=tensor_dict, name=name + '_fw_attn')
bw_res = directional_attention_with_dense(
keep_prob, is_train, wd, activation,
tensor_dict=tensor_dict, name=name + '_bw_attn')

# Join the two results along the last axis.
seq_rep = tf.concat([fw_res, bw_res], -1)

with tf.variable_scope('sent_enc_attn'):
# NOTE(review): `seq_rep` is never passed on in the visible code; the leading
# arguments of this call look truncated in the same way as the calls above.
sent_rep = multi_dimensional_attention(
keep_prob, is_train, wd, activation,
tensor_dict=tensor_dict, name=name + '_attn')
return sent_rep

# --------------- supporting networks ----------------
# NOTE(review): the `def` line for this function is missing from this copy of
# the file; only the tail of its parameter list survives. The body reads
# `rep_tensor`, `direction` and `scope`, which must come from the lost part of
# the signature. Presumably this is `directional_attention_with_dense` (the
# name called from the DiSAN interface section) -- TODO confirm.
keep_prob=1., is_train=None, wd=0., activation='elu',
tensor_dict=None, name=None):
# Squashes x into the open interval (-scale, scale) via a rescaled tanh.
def scaled_tanh(x, scale=5.):
return scale * tf.nn.tanh(1. / scale * x)

# Runtime sizes of the first three axes; the existing shape comments label
# them bs, sl, vec (assumes rep_tensor is rank 3 -- TODO confirm).
# Only `sl` is used in the visible code.
bs, sl, vec = tf.shape(rep_tensor)[0], tf.shape(rep_tensor)[1], tf.shape(rep_tensor)[2]
# Static size of axis 2, used as the width of every projection below.
ivec = rep_tensor.get_shape()[2]
# Precedence note: `%` binds tighter than `or`, so this evaluates as
# `scope or ('directional_attention_%s' % direction) or 'diag'`. The formatted
# string is never empty, so the 'diag' fallback is unreachable and a None
# direction yields the scope name 'directional_attention_None'.
with tf.variable_scope(scope or 'directional_attention_%s' % direction or 'diag'):
sl_indices = tf.range(sl, dtype=tf.int32)
sl_col, sl_row = tf.meshgrid(sl_indices, sl_indices)
if direction is None:
# Boolean sl x sl matrix: False on the diagonal, True everywhere else.
direct_mask = tf.cast(tf.diag(- tf.ones([sl], tf.int32)) + 1, tf.bool)
else:
# NOTE(review): both branches below are empty in this copy, so `direct_mask`
# is never assigned for a non-None direction and `sl_col` / `sl_row` are
# unused. The statements building the directional mask appear to be
# missing -- restore them from the original source.
if direction == 'forward':
else:

# non-linear
rep_map = bn_dense_layer(rep_tensor, ivec, True, 0., 'bn_dense_map', activation,
False, wd, keep_prob, is_train)
rep_map_tile = tf.tile(tf.expand_dims(rep_map, 1), [1, sl, 1, 1])  # bs,sl,sl,vec
# The dropped-out copy feeds the attention projection only; the tile above
# and the fusion gate below use the un-dropped `rep_map`.
rep_map_dp = dropout(rep_map, keep_prob, is_train)

# attention
with tf.variable_scope('attention'):  # bs,sl,sl,vec
f_bias = tf.get_variable('f_bias', [ivec], tf.float32, tf.constant_initializer(0.))
dependent = linear(rep_map_dp, ivec, False, scope='linear_dependent')  # bs,sl,vec
dependent_etd = tf.expand_dims(dependent, 1)  # bs,1,sl,vec

# NOTE(review): `head_etd` is not defined anywhere in the visible code; the
# lines computing it appear to be missing.
logits = scaled_tanh(dependent_etd + head_etd + f_bias, 5.0)  # bs,sl,sl,vec

# NOTE(review): `logits_masked` is likewise undefined here, and neither `logits`
# nor `direct_mask` is consumed in the visible code -- the masking step that
# presumably connects them is missing.
attn_score = tf.nn.softmax(logits_masked, 2)  # bs,sl,sl,vec

# Weighted sum over axis 2 of the tiled representation.
attn_result = tf.reduce_sum(attn_score * rep_map_tile, 2)  # bs,sl,vec

with tf.variable_scope('output'):
o_bias = tf.get_variable('o_bias', [ivec], tf.float32, tf.constant_initializer(0.))
# input gate
# Sigmoid gate computed from projections of both the mapped input and the
# attention result, plus a learned bias.
fusion_gate = tf.nn.sigmoid(
linear(rep_map, ivec, True, 0., 'linear_fusion_i', False, wd, keep_prob, is_train) +
linear(attn_result, ivec, True, 0., 'linear_fusion_a', False, wd, keep_prob, is_train) +
o_bias)
# Element-wise blend of the mapped input and the attention result.
output = fusion_gate * rep_map + (1 - fusion_gate) * attn_result

# save attn
# Intermediate tensors are recorded only when both a dict and a name are given.
if tensor_dict is not None and name is not None:
tensor_dict[name + '_dependent'] = dependent
tensor_dict[name] = attn_score
tensor_dict[name + '_gate'] = fusion_gate
return output

# NOTE(review): the `def` line for this function is missing from this copy of
# the file; only the tail of its parameter list survives. The body reads
# `rep_tensor` and `scope`, which must come from the lost part of the
# signature. Presumably this is `multi_dimensional_attention` (matching the
# default scope name below) -- TODO confirm.
keep_prob=1., is_train=None, wd=0., activation='elu',
tensor_dict=None, name=None):
# Runtime sizes of the first three axes; none of the three names is used in
# the visible code.
bs, sl, vec = tf.shape(rep_tensor)[0], tf.shape(rep_tensor)[1], tf.shape(rep_tensor)[2]
# Static size of axis 2, used as the width of both dense layers.
ivec = rep_tensor.get_shape()[2]
with tf.variable_scope(scope or 'multi_dimensional_attention'):
# Two stacked dense layers: the first uses `activation`, the second is
# linear; both are called with batch norm disabled (enable_bn=False).
map1 = bn_dense_layer(rep_tensor, ivec, True, 0., 'bn_dense_map1', activation,
False, wd, keep_prob, is_train)
map2 = bn_dense_layer(map1, ivec, True, 0., 'bn_dense_map2', 'linear',
False, wd, keep_prob, is_train)

# NOTE(review): `map2_masked` is not defined in the visible code and `map2` is
# never consumed; the masking step between them appears to be missing.
# Softmax is taken over axis 1, separately for every position on the last axis.
soft = tf.nn.softmax(map2_masked, 1)  # bs,sl,vec
# Weighted sum over axis 1 collapses that axis.
attn_output = tf.reduce_sum(soft * rep_tensor, 1)  # bs, vec

# save attn
# The attention weights are recorded only when both a dict and a name are given.
if tensor_dict is not None and name is not None:
tensor_dict[name] = soft

return attn_output

def bn_dense_layer(input_tensor, hn, bias, bias_start=0.0, scope=None,
                   activation='relu', enable_bn=True,
                   wd=0., keep_prob=1.0, is_train=None):
    """Dense projection, optional batch normalisation, then an activation.

    Args:
        input_tensor: Tensor handed to ``linear`` as its input.
        hn: Output width of the projection.
        bias: Whether the projection adds a bias term.
        bias_start: Initial value for that bias.
        scope: Variable scope name; falls back to ``'bn_dense_layer'``.
        activation: One of ``'linear'``, ``'relu'``, ``'elu'`` or ``'selu'``.
        enable_bn: Apply ``tf.contrib.layers.batch_norm`` after the projection.
        wd: Forwarded to ``linear``.
        keep_prob: Forwarded to ``linear`` as its input keep probability.
        is_train: Training flag forwarded to ``linear`` and to batch norm;
            ``None`` is treated as ``False``.

    Returns:
        The activated tensor.

    Raises:
        AttributeError: If ``activation`` is not one of the supported names.
    """
    training = False if is_train is None else is_train

    # Each entry resolves its function lazily, so only the selected
    # activation is ever looked up.
    resolvers = (
        ('linear', lambda: tf.identity),
        ('relu', lambda: tf.nn.relu),
        ('elu', lambda: tf.nn.elu),
        ('selu', lambda: selu),
    )
    for known_name, resolve in resolvers:
        if activation == known_name:
            activation_func = resolve()
            break
    else:
        # The loop finished without a match: reject before building any op.
        raise AttributeError('no activation function named as %s' % activation)

    with tf.variable_scope(scope or 'bn_dense_layer'):
        projected = linear(input_tensor, hn, bias, bias_start, 'linear_map',
                           False, wd, keep_prob, training)
        if enable_bn:
            projected = tf.contrib.layers.batch_norm(
                projected, center=True, scale=True, is_training=training, scope='bn')
        return activation_func(projected)

def dropout(x, keep_prob, is_train, noise_shape=None, seed=None, name=None):
    """Apply dropout to ``x`` only when ``is_train`` evaluates to true.

    Args:
        x: Tensor to drop units from.
        keep_prob: Keep probability; values that are not below 1.0 make this
            function return ``x`` unchanged.
        is_train: Predicate handed to ``tf.cond``; must not be ``None``.
        noise_shape: Forwarded to ``tf.nn.dropout``.
        seed: Forwarded to ``tf.nn.dropout``.
        name: Name scope; falls back to ``"dropout"``.

    Returns:
        ``x`` itself, or a ``tf.cond`` choosing between the dropped-out tensor
        and ``x``.
    """
    with tf.name_scope(name or "dropout"):
        assert is_train is not None
        # Nothing to drop: hand the input straight back.
        if not (keep_prob < 1.0):
            return x
        # The dropout op is built outside the cond; the cond only selects
        # which of the two tensors flows through.
        dropped = tf.nn.dropout(x, keep_prob, noise_shape=noise_shape, seed=seed)
        return tf.cond(is_train, lambda: dropped, lambda: x)

def linear(args, output_size, bias, bias_start=0.0, scope=None, squeeze=False, wd=0.0, input_keep_prob=1.0,
is_train=None):
# Dense projection of one tensor, or of several tensors concatenated on their
# last axis, to `output_size` features. Inputs are flattened to 2-D, passed
# through `_linear`, and reshaped using the leading dimensions of the first
# input.
#
# args: a tensor, or a non-empty list/tuple of tensors.
# output_size: width of the projection.
# bias / bias_start: whether `_linear` adds a bias, and its initial value.
# scope: variable scope name forwarded to `_linear`.
# squeeze: when true, the last axis of the result is squeezed away.
# wd: see the NOTE(review) next to `if wd:` below.
# input_keep_prob / is_train: dropout on the flattened inputs, applied through
#     tf.cond with `is_train` as the predicate.
# Raises ValueError when `args` is None or an empty list/tuple.
if args is None or (isinstance(args, (tuple, list)) and not args):
raise ValueError("`args` must be specified")
# Normalise a single tensor to a one-element list.
if not isinstance(args, (tuple, list)):
args = [args]

flat_args = [flatten(arg, 1) for arg in args]  # for dense layer [(-1, d)]
if input_keep_prob < 1.0:
assert is_train is not None
# Each lambda is invoked by tf.cond during its own loop iteration, so `arg`
# is read before it is rebound.
flat_args = [tf.cond(is_train, lambda: tf.nn.dropout(arg, input_keep_prob), lambda: arg)
# for dense layer [(-1, d)]
for arg in flat_args]
flat_out = _linear(flat_args, output_size, bias, bias_start=bias_start, scope=scope)  # dense
out = reconstruct(flat_out, args[0], 1)  # leading dims taken from args[0]
if squeeze:
# Index of the last axis of the first input.
out = tf.squeeze(out, [len(args[0].get_shape().as_list()) - 1])

# NOTE(review): this `if` has no body in this copy of the file. Presumably a
# weight-decay / regularisation registration call belongs here -- restore it
# from the original source; `return out` below must stay unconditional.
if wd:

return out

def _linear(xs, output_size, bias, bias_start=0., scope=None):
    """Concatenate ``xs`` on the last axis and apply ``x @ W (+ bias)``.

    Args:
        xs: Tensors to concatenate along their last axis.
        output_size: Number of output features (second dimension of ``W``).
        bias: When truthy, a ``'bias'`` variable is created and added.
        bias_start: Constant the bias variable is initialised with.
        scope: Variable scope name; falls back to ``'linear_layer'``.

    Returns:
        The projected tensor.
    """
    with tf.variable_scope(scope or 'linear_layer'):
        joined = tf.concat(xs, -1)
        in_dim = joined.get_shape()[-1]
        # No initializer is passed for the weight, so whatever default
        # tf.get_variable / the enclosing scope provides applies.
        weight = tf.get_variable(
            'W', shape=[in_dim, output_size], dtype=tf.float32)
        if bias:
            offset = tf.get_variable(
                'bias', shape=[output_size], dtype=tf.float32,
                initializer=tf.constant_initializer(bias_start))
            return tf.matmul(joined, weight) + offset
        return tf.matmul(joined, weight)

def flatten(tensor, keep):
    """Collapse all but the last ``keep`` axes of ``tensor`` into one axis.

    Args:
        tensor: Tensor to reshape.
        keep: Number of trailing axes left untouched.

    Returns:
        ``tensor`` reshaped to ``[product of leading dims] + trailing dims``.

    Raises:
        TypeError: From ``reduce`` when there is no leading axis to collapse
            (``keep`` is at least the rank of ``tensor``).
    """
    static_dims = tensor.get_shape().as_list()
    split_at = len(static_dims) - keep

    def dim(axis):
        # Prefer the statically known size; fall back to the runtime size
        # only when the static one is unknown (or zero).
        return static_dims[axis] or tf.shape(tensor)[axis]

    leading = reduce(mul, [dim(axis) for axis in range(split_at)])
    trailing = [dim(axis) for axis in range(split_at, len(static_dims))]
    return tf.reshape(tensor, [leading] + trailing)

def reconstruct(tensor, ref, keep, dim_reduced_keep=None):
    """Reshape ``tensor`` so its leading axes match those of ``ref``.

    The target shape is the first ``rank(ref) - keep`` dimensions of ``ref``
    followed by the last ``dim_reduced_keep`` dimensions of ``tensor``.

    Args:
        tensor: Tensor to reshape.
        ref: Tensor whose leading dimensions are reused.
        keep: Number of trailing axes of ``ref`` that are *not* reused.
        dim_reduced_keep: Number of trailing axes of ``tensor`` to retain;
            a falsy value means "same as ``keep``".

    Returns:
        The reshaped tensor.
    """
    tail_count = dim_reduced_keep or keep

    ref_dims = ref.get_shape().as_list()
    cur_dims = tensor.get_shape().as_list()
    lead_count = len(ref_dims) - keep
    tail_from = len(cur_dims) - tail_count

    # Static sizes are used where known; otherwise the runtime size is read.
    leading = [ref_dims[axis] or tf.shape(ref)[axis]
               for axis in range(lead_count)]
    trailing = [cur_dims[axis] or tf.shape(tensor)[axis]
                for axis in range(tail_from, len(cur_dims))]
    return tf.reshape(tensor, leading + trailing)

def selu(x):
    """Return ``scale * (x if x >= 0 else alpha * elu(x))`` element-wise.

    The ops are created inside a name scope called ``'elu'``.
    """
    alpha = 1.6732632423543772848170429916717
    scale = 1.0507009873554804934193349852946
    with tf.name_scope('elu'):
        non_negative = x >= 0.0
        negative_branch = alpha * tf.nn.elu(x)
        return scale * tf.where(non_negative, x, negative_branch)