Figure 1: Overall structure of the Transformer
Inside the Transformer, the overall structure is still an Encoder-Decoder; zooming in on the Transformer part of Figure 1 gives Figure 2 below.
Figure 2: Internal structure of the Transformer
Figure 3: The Encoder and Decoder parts broken down
Figure 4: Internal structure of an Encoder Block
Figure 5: block2block
Figure 6: Visualization of Self-Attention
Figure 9: Matrix computation of Self-Attention
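In matrix form, Self-Attention is computed as

$$\mathrm{Attention}(Q, K, V) = \mathrm{softmax}\!\left(\frac{QK^{\top}}{\sqrt{d_k}}\right)V$$

where $d_k$ is the dimension of the key vectors. The following TensorFlow function implements exactly this, with an optional mask added to the scaled logits before the softmax: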
```python
import tensorflow as tf

def scaled_dot_product_attention(q, k, v, mask):
    # Attention scores: (..., seq_len_q, seq_len_k)
    matmul_qk = tf.matmul(q, k, transpose_b=True)

    # Scale by sqrt(d_k) so the softmax stays in a well-behaved range.
    dk = tf.cast(tf.shape(k)[-1], tf.float32)
    scaled_attention_logits = matmul_qk / tf.math.sqrt(dk)

    # Push masked positions towards negative infinity so softmax gives them ~0 weight.
    if mask is not None:
        scaled_attention_logits += (mask * -1e9)

    attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1)
    output = tf.matmul(attention_weights, v)
    return output, attention_weights
```

A mask is needed because the input sequences have different lengths: longer sequences are truncated, and shorter ones are filled with a "Padding" token. These padding tokens carry no real meaning, so they must be hidden when computing Self-Attention. The approach is to build a mask matrix that marks the padded positions; at those positions -1e9 (effectively negative infinity) is added to the attention logits, so that after the Softmax their weights become 0 and the padded fields are successfully masked out.
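The article does not show how this mask is built. As a minimal sketch, assuming the common convention from the TensorFlow Transformer tutorial (token id 0 is the padding id, and a 1 in the mask marks a position to be suppressed), a helper, here called create_padding_mask, could look like this:

```python
def create_padding_mask(seq):
    # 1 marks a padding position (token id 0 is assumed to be the padding id);
    # these entries receive the -1e9 penalty inside scaled_dot_product_attention.
    seq = tf.cast(tf.math.equal(seq, 0), tf.float32)
    # Shape (batch_size, 1, 1, seq_len) so it broadcasts over heads and query positions.
    return seq[:, tf.newaxis, tf.newaxis, :]
```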
Next, look at the Embedding part. Here a Token Embedding and a Position Embedding are combined into the Embedding layer:

```python
class Embedding(tf.keras.layers.Layer):
    def __init__(self, vocab_size, d_model):
        super(Embedding, self).__init__()
        self.vocab_size = vocab_size
        self.d_model = d_model
        # Token embedding
        self.embedding = tf.keras.layers.Embedding(vocab_size, d_model)
        # Position embedding table (the positional_encoding helper is not shown in this excerpt)
        self.pos_encoding = positional_encoding(vocab_size, d_model)

    def call(self, x):
        embed_x = self.embedding(x)
        embed_x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        # Slice the position encoding to the actual sequence length
        embed_x += self.pos_encoding[:, :tf.shape(x)[1], :]
        return embed_x
```

The position-wise Feed Forward part is a simple two-layer fully connected network:

```python
def point_wise_feed_forward_network(d_model, dff):
    return tf.keras.Sequential([
        tf.keras.layers.Dense(dff, activation='relu'),
        tf.keras.layers.Dense(d_model)
    ])
```

Next is the Encoder Block, which is assembled essentially according to the structure above. The one addition is a Dropout layer after both the Self-Attention layer and the Feed Forward layer to improve the model's generalization.

```python
class EncoderLayer(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads, dff, rate=0.1):
        super(EncoderLayer, self).__init__()
        # Multi-head attention layer (defined outside this excerpt)
        self.mha = MultiHeadAttention(d_model, num_heads)
        self.ffn = point_wise_feed_forward_network(d_model, dff)

        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

        self.dropout1 = tf.keras.layers.Dropout(rate)
        self.dropout2 = tf.keras.layers.Dropout(rate)

    def call(self, x, training, mask):
        # Self-Attention -> dropout -> residual connection -> LayerNorm
        attn_output, _ = self.mha(x, x, x, mask)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(x + attn_output)

        # Feed Forward -> dropout -> residual connection -> LayerNorm
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        out2 = self.layernorm2(out1 + ffn_output)
        return out2
```

The Decoder Block is similar and assembles the modules according to the structure described above. Note that it uses two attention layers: the first is Self-Attention, the second is Encoder-Decoder-Attention. One further difference is that the mask used in the Decoder Block's Self-Attention is not the Padding Mask of the Encoder Block but a Look Ahead Mask, which corresponds to a lower-triangular matrix that reveals the words of the decoder input one at a time. Its purpose is that, when each word is generated, it can only attend to the preceding words and never to the following ones (a sketch of how such a mask can be built follows the DecoderLayer code below).

```python
class DecoderLayer(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads, dff, rate=0.1):
        super().__init__()
        self.mha1 = MultiHeadAttention(d_model, num_heads)
        self.mha2 = MultiHeadAttention(d_model, num_heads)
        self.ffn = point_wise_feed_forward_network(d_model, dff)

        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm3 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

        self.dropout1 = tf.keras.layers.Dropout(rate)
        self.dropout2 = tf.keras.layers.Dropout(rate)
        self.dropout3 = tf.keras.layers.Dropout(rate)

    def call(self, x, enc_output, training, look_ahead_mask, padding_mask):
        # Masked Self-Attention over the decoder input
        attn1, attn_weights_block1 = self.mha1(x, x, x, look_ahead_mask)
        attn1 = self.dropout1(attn1, training=training)
        out1 = self.layernorm1(attn1 + x)

        # Encoder-Decoder-Attention over the encoder output
        attn2, attn_weights_block2 = self.mha2(enc_output, enc_output, out1, padding_mask)
        attn2 = self.dropout2(attn2, training=training)
        out2 = self.layernorm2(out1 + attn2)

        # Feed Forward
        ffn_output = self.ffn(out2)
        ffn_output = self.dropout3(ffn_output, training=training)
        out3 = self.layernorm3(ffn_output + out2)
        return out3, attn_weights_block1, attn_weights_block2
```
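The Look Ahead Mask itself is also not shown in the article. A minimal sketch, assuming the same convention as the padding mask above (a 1 marks a position that gets the -1e9 penalty), could be:

```python
def create_look_ahead_mask(size):
    # Keep the lower triangle (each position may attend to itself and to earlier
    # positions); the remaining 1s mark the "future" positions to be masked out.
    return 1 - tf.linalg.band_part(tf.ones((size, size)), -1, 0)  # shape (size, size)
```

With this convention, the positions a decoder token is allowed to attend to form exactly the lower-triangular matrix described above.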
Finally, look at the model as a whole. First, the Encoder stacks num_layers Encoder Blocks and likewise adds a Dropout layer to improve generalization.

```python
class Encoder(tf.keras.layers.Layer):
    def __init__(self, num_layers, d_model, num_heads, dff, rate=0.1):
        super().__init__()
        self.d_model = d_model
        self.num_layers = num_layers
        self.enc_layers = [EncoderLayer(d_model, num_heads, dff, rate)
                           for _ in range(num_layers)]
        self.dropout = tf.keras.layers.Dropout(rate)

    def call(self, x, training, mask):
        # x is the already embedded input
        x = self.dropout(x, training=training)
        for i in range(self.num_layers):
            x = self.enc_layers[i](x, training, mask)
        return x
```

Next comes the relatively more complex Decoder. Here the Encoder-Decoder-Attention weights are multiplied with the Encoder output to obtain the context vector, which is then combined with the decoder input and the decoder state to compute the p_gen coefficient (only its call method is shown):

```python
def call(self, x, enc_output, training, look_ahead_mask, padding_mask):
    attention_weights = {}
    out = self.dropout(x, training=training)

    for i in range(self.num_layers):
        out, block1, block2 = self.dec_layers[i](out, enc_output, training,
                                                 look_ahead_mask, padding_mask)
        attention_weights['decoder_layer{}_block1'.format(i + 1)] = block1
        attention_weights['decoder_layer{}_block2'.format(i + 1)] = block2

    # Weight the encoder output by the Encoder-Decoder-Attention of the last
    # block (block2), head by head, to obtain the context vector.
    enc_out_shape = tf.shape(enc_output)
    context = tf.reshape(enc_output, (enc_out_shape[0], enc_out_shape[1],
                                      self.num_heads, self.depth))  # (batch, enc_len, heads, depth)
    context = tf.transpose(context, [0, 2, 1, 3])                   # (batch, heads, enc_len, depth)
    context = tf.expand_dims(context, axis=2)                       # (batch, heads, 1, enc_len, depth)

    attn = tf.expand_dims(block2, axis=-1)                          # (batch, heads, dec_len, enc_len, 1)
    context = context * attn                                        # (batch, heads, dec_len, enc_len, depth)
    context = tf.reduce_sum(context, axis=3)                        # (batch, heads, dec_len, depth)
    context = tf.transpose(context, [0, 2, 1, 3])                   # (batch, dec_len, heads, depth)
    context = tf.reshape(context, (tf.shape(context)[0],
                                   tf.shape(context)[1],
                                   self.d_model))                   # (batch, dec_len, d_model)

    # p_gen is computed from the decoder input x, the decoder state out,
    # and the context vector.
    a = self.Wx(x)
    b = self.Ws(out)
    c = self.Wh(context)
    p_gens = tf.sigmoid(self.V(a + b + c))

    return out, attention_weights, p_gens
```

Finally, the PGN_TRANSFORMER model itself. The calc_final_dist used here to compute the final generated distribution is exactly the same as in the earlier PGN model (again, only call is shown):

```python
def call(self, inp, extended_inp, max_oov_len, tar, training,
         enc_padding_mask, look_ahead_mask, dec_padding_mask):
    # Shared embedding for the encoder and decoder inputs
    embed_x = self.embedding(inp)
    embed_dec = self.embedding(tar)

    enc_output = self.encoder(embed_x, training, enc_padding_mask)
    dec_output, attention_weights, p_gens = self.decoder(embed_dec, enc_output, training,
                                                         look_ahead_mask, dec_padding_mask)

    # Vocabulary distribution
    final_output = self.final_layer(dec_output)
    final_output = tf.nn.softmax(final_output)

    # Encoder-Decoder attention of the last decoder block, averaged over heads
    attn_dists = attention_weights['decoder_layer{}_block2'.format(self.params["num_blocks"])]
    attn_dists = tf.reduce_sum(attn_dists, axis=1) / self.params["num_heads"]

    # Mix the vocabulary and attention distributions with p_gen, step by step
    final_dists = calc_final_dist(extended_inp,
                                  tf.unstack(final_output, axis=1),
                                  tf.unstack(attn_dists, axis=1),
                                  tf.unstack(p_gens, axis=1),
                                  max_oov_len,
                                  self.params["vocab_size"],
                                  self.params["batch_size"])

    outputs = dict(logits=tf.stack(final_dists, 1), attentions=attn_dists)
    return outputs
```
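calc_final_dist itself is not repeated in this article. As a rough sketch of the pointer-generator mixture such a function computes for a single decoding step (the function name final_dist_step and the assumption of Python-int shape arguments are mine, not the author's implementation), one step could look like this:

```python
def final_dist_step(vocab_dist, attn_dist, p_gen, extended_inp,
                    max_oov_len, vocab_size, batch_size):
    """Hypothetical one-step pointer-generator mixture:
    P(w) = p_gen * P_vocab(w) + (1 - p_gen) * sum_i a_i * [inp_i == w].
    Assumed shapes: vocab_dist (batch, vocab_size), attn_dist (batch, enc_len),
    p_gen (batch, 1), extended_inp (batch, enc_len) with ids in the extended vocab."""
    vocab_dist = p_gen * vocab_dist
    attn_dist = (1.0 - p_gen) * attn_dist

    # Extend the vocabulary distribution with zeros for the in-article OOV ids.
    extra_zeros = tf.zeros((batch_size, max_oov_len))
    vocab_dist_extended = tf.concat([vocab_dist, extra_zeros], axis=-1)

    # Scatter the attention weights onto the extended vocabulary; attention on
    # repeated source tokens is summed.
    enc_len = tf.shape(extended_inp)[1]
    batch_idx = tf.tile(tf.expand_dims(tf.range(batch_size), 1), [1, enc_len])
    indices = tf.stack([batch_idx, tf.cast(extended_inp, tf.int32)], axis=-1)
    attn_dist_projected = tf.scatter_nd(indices, attn_dist,
                                        [batch_size, vocab_size + max_oov_len])

    return vocab_dist_extended + attn_dist_projected
```

In the call above, calc_final_dist receives these quantities unstacked along the decoding-step axis and returns one such distribution per step, which is then stacked back into the model's logits.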