機器學習算法與自然語言處理出品
@公眾號原創專欄作者 鎢額音
學校 | 西湖大學文本智能實驗室研究助理
本文首發於 TFSEQ Part II: 序列模型的實現細節,留檔。
介紹 tf.placeholder 和 tf.data 兩種構建數據 pipeline 的方案,並提供了性能上的分析。介紹了 tf.data 的參數設置以及使用時的考量。最後總結了一個簡單的 NLP dataset 模板,以及分享了一些 tf.data 的使用技巧。
介紹了 bucketing 的原理和實現,並分析了其對收斂效果的影響。
介紹了變長序列 attention 的常見實現錯誤,並分享了一個簡單的數值穩定實現
介紹了對長句的處理方案。討論了長句導致的內存溢出問題以及解決方案。介紹了 使用 TBPTT 對序列任務進行加速的方案,並對其與 BPTT 的區別進行了分析。
前言序列模型組件如 RNN 和 Attention 在自然語言處理中有廣泛的應用。但由於序列長度不一且變化範圍較大,為保證效率和穩定性,有許多實現上的細節需要考慮。同樣的理論在不同實現下效果往往會有神秘的差異,甚至會出現不能收斂的情況。本文總結了一些用 Tensorflow 實現序列模型的一些做法,並分析了效率和精度上的權衡。
本文假設讀者已經有深度學習在自然語言處理應用上的基本知識,並用 Tensorflow 實現過一些序列模型。為了避免翻譯帶來的歧義,部分術語會直接使用英文表述(使用中文的話會在括號裡加上英文術語),所以中英混雜的文風難以避免。為了討論方便,以下先做一些術語的規定。
min-batch SGD 是一種迭代式優化(iterative optimization)的算法,每一次迭代都包括以下三個步驟:
讀取 mini-batch,使用模型進行前饋計算(feedforward or forward)
計算 loss,並利用 loss 的值進行反向傳播(backpropagation or backprop),得到各個參數的梯度(gradient)
根據算出的梯度,利用選定的優化算法(Tensorflow 中稱為 Optimizer),如標準 SGD 或者更加流行的 Adam 對參數進行更新。
數據集通常會先進行隨機打亂(shuffle),然後再將整個數據集分成固定大小(batch size)的mini-batch。這是為了保證模型見到的數據是獨立同分布的(i.i.d., independent and identically distributed),從而達到無偏的梯度估計。整個數據集全部輸入並訓練一遍稱為一個 epoch。通常每個 epoch 開始之前都會將數據集 shuffle 一遍。
1. 數據預處理的加速數據預處理的過程即從原始數據到將切分好的 mini-batch 載入模型的整個過程。tensorflow 的文檔將整個流程劃分為 Extract,Transform 和 Load(ETL)
A typical TensorFlow training input pipeline can be framed as an ETL process: 1. Extract: Read data from persistent storage — either local (e.g. HDD or SSD) or remote (e.g. GCS or HDFS). 2. Transform: Use CPU cores to parse and perform preprocessing operations on the data such as image decompression, data augmentation transformations (such as random crop, flips, and color distortions), shuffling, and batching. 3. Load: Load the transformed data onto the accelerator device(s) (for example, GPU(s) or TPU(s)) that execute the machine learning model.1.1 Placeholder 模式
tensorflow 最初推薦使用的是 tf.placeholder 模式。這個模式的流程如下:
Extract:先將整個數據集讀入內存
Transform:在Python中將數據轉化成 numpy 的矩陣
Load:使用 tf.placeholder 將 numpy 數據拷貝到 tensorflow 的內存中,再從 tensorflow 拷貝到 GPU 中。
tf.placeholder 模式的圖示,虛線框值指的是計算操作,實線框指的是存儲。曲線數指系統線程數。所有步驟共享同一線程。這種模式雖然簡單,但缺點也很多:
當數據集很大的時候,可能無法將整個數據集載入內存。
python 代碼的運行速度較慢,多線程由於 python GIL 的存在無法有效並行
多了不必要的拷貝
最明顯的缺點是,在整個處理流程中 GPU 一直是閒置的。
所以 tensorflow 如今主推的是 tf.data api,tf.placeholder的介紹已經很難在文檔中找到了。但在實際線上服務的時候,tf.placeholder 仍然是把數據載入模型的主要方式。
1.2 Input pipeline
input pipeline 在 tensorflow 早期就已經存在,真正做到易用還是從 tensorflow 1.2 發布 tf.contrib.data 接口開始,其前身是雜亂無章的 QueueRunner 和 Queue。
為了解決 tf.placeholder 的效率問題,tensorflow 採用了 cache 和 pipeline 的思想。
Extract:將數據集順序讀入一個固定長度的 queue 中。如果 queue 中的數據達到其容量上線,則阻塞直到下遊任務讀取數據空出容量。
Transform:從 extract 的 queue 中讀取(dequeue)一定量的數據,開啟多條線程並行處理數據,將得到的 mini-batch 放在一個固定長度的 queue 中。
Load:從 transform 後的 queue 中讀取(dequeue) 一個 mini-batch,傳到 GPU。新版本的tensorflow 還提供了 tf.contrib.data.prefetch_to_device 接口,支持將上一步的queue 放在 GPU 內存裡,從而進一步減少等待時間。
通過維護一個固定長度的 queue 作為 cache 來解決內存佔用的問題。三個步驟通過 cache 獨立開來,有獨立的線程去維護,需要調整參數保證各個 queue 不會排空,從而避免阻塞。為了加快處理速度,transform 這一步還加入了多線程支持。
下圖總結了各個步驟的核心函數,圖中寫出的參數是核心參數。
tf.data 模式的圖示,虛線框值指的是計算操作,實線框指的是存儲。曲線數指系統線程數。所有步驟都有各自的線程。對於文本數據而言,一個典型的 dataset 如下
def build_dataset(filenames,
parse_func,
filter_func,
pad_id,
batch_size,
buffer_size,
shuffle=False,
repeat=False):
dataset = tf.data.TextLineDataset(filenames, buffer_size=buffer_size)
if shuffle:
dataset = dataset.shuffle(buffer_size)
if repeat:
# repeat forever
dataset = dataset.repeat()
# filter invalid lines out
dataset = dataset.filter(lambda line: filter_func(line))
dataset = dataset.map(lambda line: parse_func(line),
num_parallel_calls=1)
# suppose element is (word_ids, length)
dataset = dataset.padded_batch(batch_size,
padded_shapes=(
tf.TensorShape([None]),
tf.TensorShape([])),
padding_values=(pad_id, 0))
dataset = dataset.prefetch(8)
return dataset
1.3 Data pipeline 的優化
理解上述過程後,對 buffer_size 和 num_parallel_calls 這兩個參數的調試目標就是在保證 cache 不被排空的基礎上,儘量少地佔用計算資源(如果你還有別的任務要跑的話)。tf.data.Dataset.prefetch 通常只放在最後一步,其 buffer_size 一般設成 8 左右。對於不複雜的 Transform,tf.data.Dataset.map 的 num_parallel_calls 一般設成 1~2。
pipeline 的運行和模型訓練是獨立的,所以只需要保證處理數據的速度比訓練的速度快即可,而這在文本數據處理中很容易做到。由於 tensorflow 對文本處理的支持不好,建議將比較複雜的文本處理先離線做好存成數據文件,再用 tensorflow 讀取。
需要注意的是 tf.data.Dataset.shuffle 的使用。因為 shuffle 是在 Queue 上做的,當 buffer_size 比較小時,對數據集的 shuffle 會不夠充分。實際使用時建議先將硬碟中的數據全局 shuffle 一遍:shuf data.txt > shuf_data.txt,並將 buffer_size 設大一點。
1.4 Data pipeline 的技巧
在你的訓練流程中,如果 GPU 的計算需要依賴沒有 GPU 實現的操作,而這部分操作又不會很重的話,可以把它放在 input pipeline 中,充分利用 tf.data.Dataset.prefetch 的 cache 功能。這裡舉兩個例子:
在實現 GPU 版 word2vec 時,可以把 negative sampling 部分放在 input pipeline 中。
在實現多卡訓練的時候,如果只維護一條 input pipeline,可以將大 batch 切分的處理放在 input pipeline 裡做。個人發現這比這個討論提到的方案都要好一些。
def split_batches(num_splits, batches):
batch_size = tf.shape(batches[0])[0]
# evenly distributed sizes
divisible_sizes = tf.fill([num_splits],
tf.floor_div(batch_size, num_splits))
remainder_sizes = tf.sequence_mask(tf.mod(batch_size, num_splits),
maxlen=num_splits,
dtype=tf.int32)
frag_sizes = divisible_sizes + remainder_sizes
batch_frags_list = []
for batch in batches:
batch_frags = tf.split(batch, frag_sizes, axis=0)
batch_frags_list.append(batch_frags)
frag_batches_list = zip(*batch_frags_list)
# fix corner case
for i, frag_batches in enumerate(frag_batches_list):
if len(frag_batches) == 1:
frag_batches_list[i] = frag_batches[0]
return frag_batches_list
dataset = dataset.map(lambda seqs_batch, lengths_batch:
split_batches(num_splits,
[seqs_batch, lengths_batch]),
num_parallel_calls=4)
更多的建議(如fused transformation)可以參照官方文檔。
2. 變長序列的處理2.1 處理 mini-batch 中的序列長度變化:Padding 和 Bucketing
由於數據集中的序列的長度不一,我們需要對 mini-batch 中的短句進行補長(padding)的操作:可用指定的符號(token)拼到短句末位,將短句補到 mini-batch 裡面最長句子的長度。這是因為模型的大部分操作都是基於矩陣運算。補長用的符號對應的特徵通常會在後續的計算開始前置零。一般補長操作都是在做完詞表映射後做的。
當 mini-batch 中的句長範圍過大的時候(如1~100),GPU 大部分時間都在處理最長的句子,短句有效的部分很快就處理完了。為了更充分利用GPU的並行計算資源,我們可以控制 mini-batch 的句長範圍。
Bucketing 的想法就是在構建 mini-batch 之前,先將序列按長度分組(bucket),然後再從每個 bucket 裡面隨機選取序列構建 mini-batch。按長度分組的操作可以通過簡單的 hash 實現。
Bucketing + padding 可以藉由 tf.data 接口實現:
def bucket_pad_batch(dataset, pad_id):
# dataset element is (seq, length). seq is an integer tuple.
def key_func(seq, length):
bucket_id = length // bucket_width
return tf.to_int64(bucket_id)
def reduce_func(unused_key, windowed_data):
return windowed_data.padded_batch(
batch_size,
padded_shapes=(
tf.TensorShape([None]),
tf.TensorShape([]), # 表示不做 padding
),
padding_values=(pad_id,
0, # 因為不做padding,這只是佔位符
))
dataset = dataset.apply(tf.contrib.data.group_by_window(
key_func=key_func,
reduce_func=reduce_func,
window_size=batch_size))
return dataset
當數據集的句長分布範圍較大的時候,bucketing 可以顯著提升訓練速度。
但 bucketing 分組的操作會妨礙 shuffle 的效果,使訓練數據偏離 i.i.d.(Independent and identically distributed) 的假設,使梯度估計有偏,在訓練過程中使 loss 的抖動加劇,模型的收斂性變差。實際使用中需要調整 bucket 的數量,對訓練速度和收斂性能進行 trade-off。在這篇水文中有這個圖可以參考:
Bucket 數量對訓練速度和收斂速度的影響。圖來自Accelerating recurrent neural network training using sequence bucketing and multi-GPU data parallelization2.2 變長序列的 Soft Attention
soft attention 的計算方法如下:
其中 為第 個詞的權重, 為 attention key 和第 個詞算出來的相似度。我們可以簡單地調用 tf.nn.softmax 進行計算。attention 後的句子特徵為
其中 是第 個詞的特徵向量。
# 實現 1
# feat_seqs: shape=[batch_size, maxlen, feature] # 每個詞的特徵
# sims: shape=[batch_size, maxlen] # 每個詞的 attention 相似度
# lengths: shape=[batch_size] # 句長
atns = tf.nn.softmax(sims) # [batch_size, maxlen]
atns = tf.expand_dims(atns, axis=-1) # [batch_size, maxlen, 1]
feat_seqs = tf.transpose(feat_seqs, [0,2,1]) # [batch_size, feature, maxlen]
feat = tf.matmul(feat_seqs, atns)
在 mini batch 訓練過程中,大多數序列後面都會帶上佔位符 <pad>,如果不做處理的話,上述實現對 attention 權重的計算應為下式:
代表佔位符對應的相似度, 代表詞對應的相似度。 為上述代碼計算的 attention 權重。attention 後的特徵則變成了
如果佔位符的特徵不為 ,算出來的句子表徵就會偏離真實的表徵。但即使佔位符的特徵都為 , 這一項的存在也會將句子的特徵向量的模 變小: 。
常見的錯誤解決方案是直接將 置為零:
# 實現 2
# feat_seqs: shape=[batch_size, maxlen, feature] # 每個詞的特徵
# sims: shape=[batch_size, maxlen] # 每個詞的 attention 相似度
# lengths: shape=[batch_size] # 句長
mask = tf.sequence_mask(seq_lens, maxlen=shape[seq_axis], dtype=tf.float32)
atns = tf.nn.softmax(sims*mask) # [batch_size, maxlen]
atns = tf.expand_dims(atns, axis=-1) # [batch_size, maxlen, 1]
feat_seqs = tf.transpose(feat_seqs, [0,2,1]) # [batch_size, feature, maxlen]
feat = tf.matmul(feat_seqs, atns)
但由於 ,實際上 <pad> 的特徵還是會對句子特徵產生影響。
比較正確的做法是將 置為零:
# 實現 3
# feat_seqs: shape=[batch_size, maxlen, feature] # 每個詞的特徵
# sims: shape=[batch_size, maxlen] # 每個詞的 attention 相似度
# lengths: shape=[batch_size] # 句長
mask = tf.sequence_mask(seq_lens, maxlen=shape[seq_axis], dtype=tf.float32)
exp_sims = tf.exp(sims)*mask
atns = exp_sims/tf.reduce_sum(exp_sims, axis=-1, keepdims=True)
atns = tf.expand_dims(atns, axis=-1) # [batch_size, maxlen, 1]
feat_seqs = tf.transpose(feat_seqs, [0,2,1]) # [batch_size, feature, maxlen]
feat = tf.matmul(feat_seqs, atns)
但由於 tf.float32 的範圍為 , 有 overflow 的風險。我們可以使用下面這個常用的 trick:
其中 為常數,通常取 來防止 overflow。
然而這種做法也是不穩定的,此時存在 underflow 的風險。對參數隨機初始化後,各個詞的 通常非常相近,在 variance 較小的情況下 ,最後導致所有 attention 權重之和 遠遠偏離 1。這裡的 overflow 或者 underflow 是導致模型出現 NAN 或者發散的元兇之一。
我們可以動態調整 ,以及將數值計算轉化到 tf.float64 進行計算。我們也可以直接使用 tf.clip_by_value 將最小值取為一個很小的數,防止全部變成 0。
所幸 Tensorflow 的 tf.nn.softmax 對 softmax 的實現是數值穩定的。同時注意到這個 trick:
所以我們只需將 實現1 的 atns 中對應 <pad> 的值置零,再重新對非零的值做歸一化便可:
# 實現 4
# feat_seqs: shape=[batch_size, maxlen, feature] # 每個詞的特徵
# sims: shape=[batch_size, maxlen] # 每個詞的 attention 相似度
# lengths: shape=[batch_size] # 句長
mask = tf.sequence_mask(seq_lens, maxlen=shape[seq_axis], dtype=tf.float32)
atns = tf.nn.softmax(sims) # [batch_size, maxlen]
atns = mask*atns
atns = atns/tf.reduce_sum(atns, axis=-1, keepdims=True)
atns = tf.expand_dims(atns, axis=-1) # [batch_size, maxlen, 1]
feat_seqs = tf.transpose(feat_seqs, [0,2,1]) # [batch_size, feature, maxlen]
feat = tf.matmul(feat_seqs, atns)
2.3 過長句子的處理
2.3.1 處理 GPU 內存限制
RNN 的訓練用的是 BPTT(Back-propagation through time),每次 forward 的時候需要將每一個 step 的中間狀態保存在內存中。GPU 內存的限制導致我們無法處理過長的序列(~4000)。
當過長的句子在訓練數據中佔少數時,我們可以通過簡單的截斷操作,只保留序列的前 位進行處理。但當長句的佔比較多的時候,截斷會丟失很多訓練信息。我們可以通過設置 tf.nn.dynamic_rnn 函數中的 swap_memory=True 來解決 GPU 內存限制的問題。這個操作會將所有 step 的中間狀態放在 CPU 的內存中。
2.3.2 序列任務的加速:TBPTT(Truncated Back-propagation through time)
如果你的任務從序列的一部分就能夠構建監督信號(如 language modeling,或者其他序列預測任務),用整句長句(~4000)的 loss 去構建一個 gradient 便顯得有點浪費。我們可以將長句 切成短句:,並在處理完每一個短句後便更新模型。在和 BPTT 同樣的計算量下,一個長句子可以更新 次參數(BPTT 只能更新一次)。TBPTT 在像語言模型這種序列預測任務中很流行。
2.3.2.1 TBPTT 第一種實現
一個簡單的實現是把切割出來的短句當成獨立的句子直接訓練: 每個子序列 的初始狀態(initial state)都設成全零向量(或者事先指定的默認向量)。和 BPTT 相比,這種方法會去掉數據集中的長程依賴(long term dependency):
TBPTT 第一種實現圖示假設子句 的順序沒有被打亂,切出的短句句長 , 圖中 指第 位的詞, 為計算第 步梯度使用的 loss,從對應的子句 的詞(紅框內)構建。initial state 是 RNN 計算每個子句 時的初始狀態。可以看到在子句間切換時,子句依賴的信息被 initial state 的置零打斷了。
即便如此,這種方案的好處是可以讓模型冷啟動的性能更好:RNN 形成頭幾個詞的表徵時不能過分依賴初始狀態,需要很快地將輸入的有效信息放到 RNN cell 中。如果訓練完的序列模型是作為短文本任務的特徵提取器(如ELMo),冷啟動的性能將是比較重要的考量。
2.3.2.2 TBPTT 第二種實現
為了保存數據集中的長程依賴(long term dependency),我們可以將更新的 batch 順序限制為 ,並將 的 final state 作為 的 initial state。這裡的本意是使用 state 的傳遞保存子句間的依賴,也是大部分 language modeling 任務的做法。
看起來這種實現像是保存了長程依賴。但在實際訓練時,模型並沒有用長程依賴的信息去訓練。從監督信號的角度來看,因為 loss 最多由句長為 的子句構建,模型最多只能學到長度為 的依賴。見下圖:
TBPTT 第二種實現圖示但 TBPTT 和 BPTT 的差異還不只長程依賴的學習這一點。記 RNN 的函數為
為使用參數 在子句 上進行 forward 計算後的 , 為對應的每一步的輸出。再記 和 分別為計算第 和第 個子句的 loss 所用的參數。
在 TBPTT 中,RNN 計算所用參數為
在 BPTT 中,RNN 計算所用參數為
注意 的上標的不同。兩種使用的初始化狀態是用不同的參數計算出來的。假如在 TBPTT 中計算不同子句的 loss 後不會更新參數,那麼 ,進而, RNN 的計算是一樣的。但實際上參數在處理完每個子句後都會被更新,所以 。由於 RNN 是非線性系統, 的差異會隨著子句長度 的增大而很快被放大。
TBPTT 還可以拓展成一般形式,實現會更加複雜,但實際效果和方案2差不多。感興趣的讀者可以參考這篇文章。
下面是構建計算圖的代碼片段:
# 兩個重要變量:
# reset_flags: 是否需要要使用default_h
# max_batch_size: 如果輸入的batch_size可變,這個值是最大可能的batch_size
def build_state_var(name):
return tf.get_local_variable(name,
[max_batch_size, state_size],
initializer=tf.zeros_initializer(),
trainable=False)
cache_h = build_state_var("cache_h")
default_h = build_state_var("default_h")
initial_state = tf.where(reset_flags,
x=default_h[:batch_size, :],
y=cache_h[:batch_size, :])
outputs, final_state = tf.nn.dynamic_rnn(gru_cell,
embed_seqs,
seq_lens,
initial_state=initial_state)
# 構建train_op
# ...
# 將cache update綁定train_op
with tf.control_dependencies([train_op]):
cache_update = tf.assign(initial_state, final_state)
以及 input pipeline 切分子句的代碼片段:
def truncate_dateset(batched_dataset, chunk_length):
# input batch_dataset element: (ids_batch, length_batch)
# output batch_dataset element: (reset_flag, ids_batch, length_batch)
# do not shuffle the output dataset!!!
def truncate_map(ids_batch, length_batch):
chunk_nums = tf.ceil(tf.truediv(length_batch, chunk_length))
chunk_nums = tf.cast(chunk_nums, dtype=tf.int32)
max_chunk_num = tf.reduce_max(chunk_nums)
batch_size = tf.shape(length_batch)[0]
divisible_lens = tf.sequence_mask(chunk_nums - 1, dtype=tf.int32) * chunk_length
remainder_lens = tf.expand_dims(length_batch - chunk_length * (chunk_nums - 1), 1)
chunck_lengths = tf.reverse_sequence(tf.concat([remainder_lens, divisible_lens],
axis=1),
seq_lengths=chunk_nums,
seq_axis=1)
ids_batch = ids_batch
reset_flags = tf.concat([tf.ones([batch_size, 1], dtype=tf.bool),
tf.zeros([batch_size, max_chunk_num - 1],
dtype=tf.bool)],
axis=1)
def get_batch(index):
chunk_reset_flag = reset_flags[:, index]
chunk_length_batch = chunck_lengths[:, index]
chunk_ids_batch = ids_batch[:, index * chunk_length:(index + 1) * chunk_length]
return chunk_reset_flag, chunk_length_batch, chunk_ids_batch
indices = tf.range(0, max_chunk_num)
return tf.data.Dataset.from_tensor_slices(indices).map(get_batch)
return batched_dataset.flat_map(truncate_map)
推薦閱讀:
【長文詳解】從Transformer到BERT模型
賽爾譯文 | 從頭開始了解Transformer
百聞不如一碼!手把手教你用Python搭一個Transformer