【導讀】使用人工神經網絡來構建聊天機器人越來越流行,然而,教計算機進行對話非常困難,並且通常需要大而複雜的語言模型。通過TensorFlow 2.0,我們可以輕鬆構建複雜的模型。 在這篇文章中,我們將演示如何構建Transformer聊天機器人。 本文聚焦於:使用TensorFlow Dataset並使用tf.data創建輸入管道來使用Cornell Movie-Dialogs Corpus,使用Model子類化實現MultiHeadAttention,使用Functional API實現Transformer。
Transformer 網絡結構
Attention is All You Need 提出的Transformer是一種神經網絡結構。
Transformer模型使用自注意力堆棧而不是RNN或CNN來處理可變大小的輸入。這種通用架構具有許多優點:
它沒有假設數據的時間/空間關係。這是處理一組對象的理想選擇。
可以並行計算層輸出,而不是像RNN那樣的序列處理。
遠距離的元素可以影響彼此的輸出,而不會經過許多重複步驟或卷積層。
它可以學習遠程依賴。
這種架構的缺點:
數據集
我們使用Cornell Movie-Dialogs Corpus作為我們的數據集,其中包含超過10萬對電影角色之間的超過220k個對話。
「+++ $ +++」被用作語料庫數據集中所有文件的欄位分隔符。
movie_conversations.txt具有以下格式:兩個對話者的id,發生此對話的電影的ID以及行ID列表。 角色和電影信息可分別在movie_characters_metadata.txt和movie_titles_metadata.txt中找到。
u0 +++$+++ u2 +++$+++ m0 +++$+++ [『L194』, 『L195』, 『L196』, 『L197』]u0 +++$+++ u2 +++$+++ m0 +++$+++ [『L198』, 『L199』]u0 +++$+++ u2 +++$+++ m0 +++$+++ [『L200』, 『L201』, 『L202』, 『L203』]u0 +++$+++ u2 +++$+++ m0 +++$+++ [『L204』, 『L205』, 『L206』]u0 +++$+++ u2 +++$+++ m0 +++$+++ [『L207』, 『L208』]movie_lines.txt具有以下格式:會話行的ID,該行角色的ID,電影的ID,角色的名稱和行的文本。
L901 +++$+++ u5 +++$+++ m0 +++$+++ KAT +++$+++ He said everyone was doing it. So I did it.L900 +++$+++ u0 +++$+++ m0 +++$+++ BIANCA +++$+++ As in…L899 +++$+++ u5 +++$+++ m0 +++$+++ KAT +++$+++ Now I do. Back then, was a different story.L898 +++$+++ u0 +++$+++ m0 +++$+++ BIANCA +++$+++ But you hate JoeyL897 +++$+++ u5 +++$+++ m0 +++$+++ KAT +++$+++ He was, like, a total babe我們將使用以下步驟構建輸入管道:
從move_conversations.txt和movie_lines.txt中提取對話對的列表
通過刪除每個句子中的特殊字符來預處理每個句子。
使用TensorFlow數據集SubwordTextEncoder構建標記生成器(將文本映射到ID和ID到文本)。
對每個句子進行標記並添加START_TOKEN和END_TOKEN以指示每個句子的開頭和結尾。
過濾掉包含超過MAX_LENGTH 個令牌的句子。
將標記化句子填充到MAX_LENGTH
使用標記化句子構建tf.data.Dataset
請注意,Transformer是一個自回歸模型,它一次預測一個部分,並使用其輸出到目前為止決定下一步做什麼。 在訓練期間,此示例使用teach-Forcing。 無論模型在當前時間步驟預測什麼,teach-forcing都會將真實輸出傳遞到下一個時間步。
完整的預處理代碼可以在文末代碼連結的Prepare Dataset部分找到。
i really , really , really wanna go , but i can t . not unless my sister goes .i m workin on it . but she doesn t seem to be goin for him .
Attention
與許多序列到序列模型一樣,Transformer也包括編碼器和解碼器。 但是,Transformer不使用循環或卷積層,而是使用多頭注意力層,其中包含多個縮放的點積注意力。
縮放點積注意力
縮放的點積注意函數有三個輸入:Q(查詢),K(鍵),V(值)。 用於計算注意力的等式是:
當softmax應用於K時,其值決定了給查詢的重要性。 輸出表示注意力量和值的乘積。 這可以確保我們想要關注的單詞保持原樣,並且不相關的單詞被刷新。
def scaled_dot_product_attention(query, key, value, mask): matmul_qk = tf.matmul(query, key, transpose_b=True)
depth = tf.cast(tf.shape(key)[-1], tf.float32) logits = matmul_qk / tf.math.sqrt(depth)
if mask is not None: logits += (mask * -1e9)
attention_weights = tf.nn.softmax(logits, axis=-1)
return tf.matmul(attention_weights, value)多頭注意層
Sequential模型允許我們通過簡單地將層疊在彼此之上來非常快速地構建模型;但是,對於更複雜和非順序的模型,需要Functional API和Model子類。 tf.keras API允許我們混合和匹配不同的API樣式。我最喜歡的Model子類化功能是調試功能。我可以在call()方法中設置一個斷點,並觀察每個層的輸入和輸出的值,就像一個numpy數組,這使調試變得更加簡單。
在這裡,我們使用Model子類來實現我們的MultiHeadAttention層。
多頭注意力由四部分組成:
線性圖層並分成頭部。
縮放點產品注意力。
頭部的連接。
最後的線性層。
每個多頭注意塊都以字典作為輸入,包括查詢,鍵和值。請注意,當使用帶有Functional API的Model子類時,輸入必須保存為單個參數,因此我們必須將查詢,鍵和值包裝為字典。
然後輸入通過密集層並分成多個頭。上面定義的scaled_dot_product_attention()應用於每個頭。必須在注意步驟中使用適當的面罩。然後將每個頭部的注意力輸出連接起來並穿過最後的緻密層。
查詢,鍵和值不是一個單獨的注意頭,而是分成多個頭,因為它允許模型共同處理來自不同表示空間的不同位置的信息。在拆分之後,每個頭部具有降低的維度,因此總計算成本與具有全維度的單個頭部注意力相同。
class MultiHeadAttention(tf.keras.layers.Layer):
def __init__(self, d_model, num_heads, name="multi_head_attention"): super(MultiHeadAttention, self).__init__(name=name) self.num_heads = num_heads self.d_model = d_model
assert d_model % self.num_heads == 0
self.depth = d_model // self.num_heads
self.query_dense = tf.keras.layers.Dense(units=d_model) self.key_dense = tf.keras.layers.Dense(units=d_model) self.value_dense = tf.keras.layers.Dense(units=d_model)
self.dense = tf.keras.layers.Dense(units=d_model)
def split_heads(self, inputs, batch_size): inputs = tf.reshape( inputs, shape=(batch_size, -1, self.num_heads, self.depth)) return tf.transpose(inputs, perm=[0, 2, 1, 3])
def call(self, inputs):query, key, value, mask = inputs['query'], inputs['key'], inputs['value'], inputs['mask'] batch_size = tf.shape(query)[0]
# linear layersquery = self.query_dense(query)key = self.key_dense(key)value = self.value_dense(value)
# split headsquery = self.split_heads(query, batch_size)key = self.split_heads(key, batch_size)value = self.split_heads(value, batch_size)
scaled_attention = scaled_dot_product_attention(query, key, value, mask)
scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3])
concat_attention = tf.reshape(scaled_attention, (batch_size, -1, self.d_model))
outputs = self.dense(concat_attention)
return outputsTransformer
Transformer使用堆疊的多頭注意力和密集層用於編碼器和解碼器。 編碼器將符號表示的輸入序列映射到連續表示序列。 然後,解碼器採用連續表示並一次一個元素地生成符號的輸出序列。
位置編碼
由於Transformer不包含任何重複或卷積,因此添加位置編碼以向模型提供關於句子中單詞的相對位置的一些信息。
將位置編碼矢量添加到嵌入矢量。 嵌入表示在d維空間中的標記,其中具有相似含義的標記將彼此更接近。 但嵌入不會編碼句子中單詞的相對位置。 因此,在添加位置編碼之後,基於在d維空間中它們的含義和它們在句子中的位置的相似性,單詞將彼此更接近。
我們使用Model子類化實現了Positional Encoding,我們將編碼矩陣應用於call()中的輸入。
class PositionalEncoding(tf.keras.layers.Layer):
def __init__(self, position, d_model): super(PositionalEncoding, self).__init__() self.pos_encoding = self.positional_encoding(position, d_model)
def get_angles(self, position, i, d_model): angles = 1 / tf.pow(10000, (2 * (i // 2)) / tf.cast(d_model, tf.float32)) return position * angles
def positional_encoding(self, position, d_model): angle_rads = self.get_angles( position=tf.range(position, dtype=tf.float32)[:, tf.newaxis], i=tf.range(d_model, dtype=tf.float32)[tf.newaxis, :], d_model=d_model) sines = tf.math.sin(angle_rads[:, 0::2]) cosines = tf.math.cos(angle_rads[:, 1::2])
pos_encoding = tf.concat([sines, cosines], axis=-1) pos_encoding = pos_encoding[tf.newaxis, ...] return tf.cast(pos_encoding, tf.float32)
def call(self, inputs): return inputs + self.pos_encoding[:, :tf.shape(inputs)[1], :]Transformer實現
可以堆疊類似於Sequential模型的層,但沒有它作為順序模型的約束,並且不像模型子類化那樣預先聲明我們需要的所有變量和層。 Functional API的一個優點是它在構建模型時驗證模型,例如檢查每個層的輸入和輸出形狀,並在出現不匹配時引發有意義的錯誤消息。
我們正在使用Functional API實現我們的編碼層,編碼器,解碼層,解碼器和Transformer本身。
嵌入層
每個嵌入層由子層組成:
def encoder_layer(units, d_model, num_heads, dropout, name="encoder_layer"): inputs = tf.keras.Input(shape=(None, d_model), name="inputs") padding_mask = tf.keras.Input(shape=(1, 1, None), name="padding_mask")
attention = MultiHeadAttention( d_model, num_heads, name="attention")({ 'query': inputs, 'key': inputs, 'value': inputs, 'mask': padding_mask }) attention = tf.keras.layers.Dropout(rate=dropout)(attention) attention = tf.keras.layers.LayerNormalization( epsilon=1e-6)(inputs + attention)
outputs = tf.keras.layers.Dense(units=units, activation='relu')(attention) outputs = tf.keras.layers.Dense(units=d_model)(outputs) outputs = tf.keras.layers.Dropout(rate=dropout)(outputs) outputs = tf.keras.layers.LayerNormalization( epsilon=1e-6)(attention + outputs)
return tf.keras.Model( inputs=[inputs, padding_mask], outputs=outputs, name=name)我們可以使用tf.keras.utils.plot_model()來可視化我們的模型。
編碼器
編碼器包括:
輸入通過嵌入進行,嵌入與位置編碼相加。 該求和的輸出是編碼器層的輸入。 編碼器的輸出是解碼器的輸入。
def encoder(vocab_size, num_layers, units, d_model, num_heads, dropout, name="encoder"): inputs = tf.keras.Input(shape=(None,), name="inputs") padding_mask = tf.keras.Input(shape=(1, 1, None), name="padding_mask")
embeddings = tf.keras.layers.Embedding(vocab_size, d_model)(inputs) embeddings *= tf.math.sqrt(tf.cast(d_model, tf.float32)) embeddings = PositionalEncoding(vocab_size, d_model)(embeddings)
outputs = tf.keras.layers.Dropout(rate=dropout)(embeddings)
for i in range(num_layers): outputs = encoder_layer( units=units, d_model=d_model, num_heads=num_heads, dropout=dropout, name="encoder_layer_{}".format(i), )([outputs, padding_mask])
return tf.keras.Model( inputs=[inputs, padding_mask], outputs=outputs, name=name)
解碼器層
每個解碼器層由子層組成:
當查詢從解碼器的第一個注意塊接收輸出,並且鍵接收編碼器輸出時,注意權重表示基於編碼器輸出給予解碼器輸入的重要性。 換句話說,解碼器通過查看編碼器輸出並自我關注其自己的輸出來預測下一個字。
def decoder_layer(units, d_model, num_heads, dropout, name="decoder_layer"): inputs = tf.keras.Input(shape=(None, d_model), name="inputs") enc_outputs = tf.keras.Input(shape=(None, d_model), name="encoder_outputs") look_ahead_mask = tf.keras.Input( shape=(1, None, None), name="look_ahead_mask") padding_mask = tf.keras.Input(shape=(1, 1, None), name='padding_mask')
attention1 = MultiHeadAttention( d_model, num_heads, name="attention_1")(inputs={ 'query': inputs, 'key': inputs, 'value': inputs, 'mask': look_ahead_mask }) attention1 = tf.keras.layers.LayerNormalization( epsilon=1e-6)(attention1 + inputs)
attention2 = MultiHeadAttention( d_model, num_heads, name="attention_2")(inputs={ 'query': attention1, 'key': enc_outputs, 'value': enc_outputs, 'mask': padding_mask }) attention2 = tf.keras.layers.Dropout(rate=dropout)(attention2) attention2 = tf.keras.layers.LayerNormalization( epsilon=1e-6)(attention2 + attention1)
outputs = tf.keras.layers.Dense(units=units, activation='relu')(attention2) outputs = tf.keras.layers.Dense(units=d_model)(outputs) outputs = tf.keras.layers.Dropout(rate=dropout)(outputs) outputs = tf.keras.layers.LayerNormalization( epsilon=1e-6)(outputs + attention2)
return tf.keras.Model( inputs=[inputs, enc_outputs, look_ahead_mask, padding_mask], outputs=outputs, name=name)解碼器
解碼器包括:
目標通過嵌入與位置編碼相加。 該求和的輸出是解碼器層的輸入。 解碼器的輸出是最終線性層的輸入。
def decoder(vocab_size, num_layers, units, d_model, num_heads, dropout, name='decoder'): inputs = tf.keras.Input(shape=(None,), name='inputs') enc_outputs = tf.keras.Input(shape=(None, d_model), name='encoder_outputs') look_ahead_mask = tf.keras.Input( shape=(1, None, None), name='look_ahead_mask') padding_mask = tf.keras.Input(shape=(1, 1, None), name='padding_mask') embeddings = tf.keras.layers.Embedding(vocab_size, d_model)(inputs) embeddings *= tf.math.sqrt(tf.cast(d_model, tf.float32)) embeddings = PositionalEncoding(vocab_size, d_model)(embeddings)
outputs = tf.keras.layers.Dropout(rate=dropout)(embeddings)
for i in range(num_layers): outputs = decoder_layer( units=units, d_model=d_model, num_heads=num_heads, dropout=dropout, name='decoder_layer_{}'.format(i), )(inputs=[outputs, enc_outputs, look_ahead_mask, padding_mask])
return tf.keras.Model( inputs=[inputs, enc_outputs, look_ahead_mask, padding_mask], outputs=outputs, name=name)Transformer
Transformer由編碼器,解碼器和最終線性層組成。 解碼器的輸出是線性層的輸入,並返回其輸出。
enc_padding_mask和dec_padding_mask用於屏蔽所有填充token。 look_ahead_mask用於屏蔽序列中的未來標記。 隨著掩碼的長度隨著輸入序列長度的變化而變化,我們將使用Lambda層創建這些掩碼。
def transformer(vocab_size, num_layers, units, d_model, num_heads, dropout, name="transformer"): inputs = tf.keras.Input(shape=(None,), name="inputs") dec_inputs = tf.keras.Input(shape=(None,), name="dec_inputs")
enc_padding_mask = tf.keras.layers.Lambda( create_padding_mask, output_shape=(1, 1, None), name='enc_padding_mask')(inputs) look_ahead_mask = tf.keras.layers.Lambda( create_look_ahead_mask, output_shape=(1, None, None), name='look_ahead_mask')(dec_inputs) dec_padding_mask = tf.keras.layers.Lambda( create_padding_mask, output_shape=(1, 1, None), name='dec_padding_mask')(inputs)
enc_outputs = encoder( vocab_size=vocab_size, num_layers=num_layers, units=units, d_model=d_model, num_heads=num_heads, dropout=dropout, )(inputs=[inputs, enc_padding_mask])
dec_outputs = decoder( vocab_size=vocab_size, num_layers=num_layers, units=units, d_model=d_model, num_heads=num_heads, dropout=dropout, )(inputs=[dec_inputs, enc_outputs, look_ahead_mask, dec_padding_mask])
outputs = tf.keras.layers.Dense(units=vocab_size, name="outputs")(dec_outputs)
return tf.keras.Model(inputs=[inputs, dec_inputs], outputs=outputs, name=name)訓練模型
我們可以按如下方式初始化Transformer:
NUM_LAYERS = 2D_MODEL = 256NUM_HEADS = 8UNITS = 512DROPOUT = 0.1
model = transformer( vocab_size=VOCAB_SIZE, num_layers=NUM_LAYERS, units=UNITS, d_model=D_MODEL, num_heads=NUM_HEADS, dropout=DROPOUT)在定義了我們的損失函數,優化器和度量之後,我們可以使用model.fit()簡單地訓練我們的模型。 請注意,我們必須屏蔽我們的損失函數,以便忽略填充標記,我們可以自定義學習速率。
def loss_function(y_true, y_pred):y_true = tf.reshape(y_true, shape=(-1, MAX_LENGTH - 1)) loss = tf.keras.losses.SparseCategoricalCrossentropy( from_logits=True, reduction='none')(y_true, y_pred)
mask = tf.cast(tf.not_equal(y_true, 0), tf.float32)loss = tf.multiply(loss, mask)
return tf.reduce_mean(loss)
class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
def __init__(self, d_model, warmup_steps=4000):super(CustomSchedule, self).__init__()
self.d_model = d_modelself.d_model = tf.cast(self.d_model, tf.float32)
self.warmup_steps = warmup_steps
def __call__(self, step):arg1 = tf.math.rsqrt(step)arg2 = step * (self.warmup_steps**-1.5)
return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)
learning_rate = CustomSchedule(D_MODEL)
optimizer = tf.keras.optimizers.Adam( learning_rate, beta_1=0.9, beta_2=0.98, epsilon=1e-9)
def accuracy(y_true, y_pred): # ensure labels have shape (batch_size, MAX_LENGTH - 1)y_true = tf.reshape(y_true, shape=(-1, MAX_LENGTH - 1))accuracy = tf.metrics.SparseCategoricalAccuracy()(y_true, y_pred)return accuracy
model.compile(optimizer=optimizer, loss=loss_function, metrics=[accuracy])
EPOCHS = 20
model.fit(dataset, epochs=EPOCHS)評估
為了評估,我們必須一次一個地推斷一個步驟,並將前一個時間步的輸出作為輸入傳遞。
請注意,我們通常不會在推理期間應用dropout,但是我們沒有為模型指定訓練參數。 這是因為我們已經內置了訓練和掩碼,如果我們想運行模型進行評估,我們可以簡單地調用模型(輸入,訓練= False)來以推理模式運行模型。
def evaluate(sentence):sentence = preprocess_sentence(sentence)
sentence = tf.expand_dims(START_TOKEN + tokenizer.encode(sentence) + END_TOKEN, axis=0)
output = tf.expand_dims(START_TOKEN, 0)
for i in range(MAX_LENGTH):predictions = model(inputs=[sentence, output], training=False)
# select the last word from the seq_len dimensionpredictions = predictions[:, -1:, :]predicted_id = tf.cast(tf.argmax(predictions, axis=-1), tf.int32)
# return the result if the predicted_id is equal to the end tokenif tf.equal(predicted_id, END_TOKEN[0]):break
# concatenated the predicted_id to the output which is given to the decoder as its input.output = tf.concat([output, predicted_id], axis=-1)
return tf.squeeze(output, axis=0)
def predict(sentence):prediction = evaluate(sentence)predicted_sentence = tokenizer.decode([i for i in prediction if i < tokenizer.vocab_size])return predicted_sentence為了測試我們的模型,我們可以調用預測(句子)。
>>> output = predict(『Where have you been?』)>>> print(output)i don t know . i m not sure . i m a paleontologist .
代碼連結:
https://github.com/tensorflow/examples/blob/master/community/en/transformer_chatbot.ipynb
原文連結:
https://medium.com/tensorflow/a-transformer-chatbot-tutorial-with-tensorflow-2-0-88bf59e66fe2
-END-
專知,專業可信的人工智慧知識分發,讓認知協作更快更好!歡迎登錄www.zhuanzhi.ai,註冊登錄專知,獲取更多AI知識資料!
歡迎微信掃一掃加入專知人工智慧知識星球群,獲取最新AI專業乾貨知識教程視頻資料和與專家交流諮詢!
請加專知小助手微信(掃一掃如下二維碼添加),加入專知人工智慧主題群,諮詢技術商務合作~