介紹
長期以來,我聽說時間序列問題只能用統計方法(AR[1],AM[2],ARMA[3],ARIMA[4])。這些技術通常被數學家使用,他們試圖不斷改進這些技術來約束平穩和非平穩的時間序列。
幾個月前,我的一個朋友(數學家、統計學教授、非平穩時間序列專家)提出讓我研究如何驗證和改進重建恆星光照曲線的技術。事實上,克卜勒衛星[11]和其他許多衛星一樣,無法連續測量附近恆星的光通量強度。克卜勒衛星在2009年至2016年間致力於尋找太陽系外的行星,稱為太陽系外行星或系外行星。
正如你們所理解的,我們將比我們的行星地球走得更遠一點,並利用機器學習進入銀河之旅。天體物理學一直是我的摯愛。
這個notebook可以在Github上找到:https://github.com/Christophe-pere/Time_series_RNN。
RNN,LSTM,GRU,雙向,CNN-x
那麼我們將在哪個模型上進行這項研究?我們將使用循環神經網絡(RNN[5]),LSTM[6]、GRU[7]、Stacked LSTM、Stacked GRU、雙向LSTM[8]、雙向GRU以及CNN-LSTM[9]。
對於那些熱衷於樹的人,你可以在這裡找到一篇關於XGBoost和時間序列的文章,作者是jasonbrownley。github上提供了一個關於時間序列的很好的存儲庫:https://github.com/Jenniferz28/Time-Series-ARIMA-XGBOOST-RNN
對於那些不熟悉RNN家族的人,把它們看作是具有記憶效應和遺忘能力的學習方法。雙向來自體系結構,它是指兩個RNN,它將在一個方向(從左到右)和另一個方向(從右到左)「讀取」數據,以便能夠最好地表示長期依賴關係。
數據
如前文所述,這些數據對應於幾顆恆星的通量測量值。實際上,在每一個時間增量(小時),衛星都會測量來自附近恆星的通量。這個通量,或者說是光強度,隨時間而變化。這有幾個原因,衛星的正確移動、旋轉、視角等都會有所不同。因此,測量到的光子數會發生變化,恆星是一個熔化的物質球(氫和氦聚變),它有自己的運動,因此光子的發射取決於它的運動。這對應於光強度的波動。
但是,也可能有行星,系外行星,它們幹擾恆星,甚至從恆星之間穿過衛星的視線(凌日方法[12])。這條通道遮住了恆星,衛星接收到的光子較少,因為它們被前面經過的行星擋住了(一個具體的例子是月球引起的日食)。
通量測量的集合被稱為光曲線。光曲線是什麼樣子的?以下是一些示例:
不同恆星之間的通量非常不同。有的噪音很大,有的則很穩定。通量仍然呈現異常。在光照曲線中可以看到孔或缺少測量。我們的目標是看是否有可能在沒有測量的情況下預測光曲線的行為。
數據縮減
為了能夠使用模型中的數據,有必要進行數據簡化。這裡將介紹兩種方法,移動平均法和窗口法。
移動平均線:
移動平均包括取X個連續點並計算它們的平均值。這種方法可以減少變異性,消除噪聲。這也減少了點的數量,這是一種下採樣方法。
下面的函數允許我們從點列表中計算移動平均值,方法計算點的平均值和標準差的數字。
def moving_mean(time, flux, lag=5): ''' 該函數通過設定平均值,使數據去噪,減少數據量。 @param time: (list) 時間值列表 @param flux: (list) 浮點列表->恆星通量 @param lag: (int) 平均值個數,默認值5 @return X: (list) 時間調整 @return y: (list) 通量按平均值重新標定 @return y_std: (list) 標準差列表 ''' # 讓我們做一些簡單的代碼 # 空列表 X = [] y = [] y_std = [] j = 0 # 增量 for i in range(int(len(flux)/lag)): X.append(np.mean(time[(i+j):(i+j+lag)])) y.append(np.mean(flux[(i+j):(i+j+lag)])) y_std.append(np.std(flux[(i+j):(i+j+lag)])) j+= lag return X, y, y_stdn可以看到函數在輸入中接受3個參數。時間和通量是時間序列的x和y。lag 是控制計算時間和通量平均值以及通量標準差時所考慮的數據個數。
現在,我們可以看看如何使用這個函數以及通過轉換得到的結果。
# #導入所需的包matplotlib inlineimport scipyimport pandas as pdimport numpy as npimport matplotlib.pyplot as pltimport sklearnimport tensorflow as tf# 讓我們看看進度條from tqdm import tqdmtqdm().pandas()現在我們需要導入數據。文件kep_lightcurves.csv包含著數據。每顆恆星有4列,原始磁通量(「…orig」),重新縮放的通量是原始磁通量減去平均磁通量(「…rscl』」)、差值(「…diff」)和殘差(「…_res」)。總共52列。
# 20個數據點x, y, y_err = moving_mean(df.index,df["001724719_rscl"], 20)df.index表示時間序列的時間
df[" 001724719_rscl "] 重新縮放的通量(" 001724719 ")
lag=20是計算平均值和std的數據點的個數
前面3條光照曲線的結果:
窗口方法
第二種方法是窗口方法,它是如何工作的?
你需要取很多點,在前一個例子中是20,然後計算平均值(與前面的方法沒有區別),這個點是新時間序列的開始,它在位置20(偏移19個點)。但是,窗口不是移動到下20個點,而是移動一個點,用之前的20個點計算平均值,然後通過向後移動一步,以此類推。
這不是一種下採樣方法,而是一種清理方法,因為其效果是平滑數據點。
讓我們看看代碼:
def mean_sliding_windows(time, flux, lag=5): ''' 該函數通過設定平均值,使數據去噪。 @param time: (list) 時間值列表 @param flux: (list) 浮點列表->恆星通量 @param lag: (int) 平均值個數,默認值5 @return X: (list) 時間調整 @return y: (list) 通量按平均值重新標定 @return y_std: (list) 標準差列表 ''' # 讓我們做一些簡單的代碼 # 空列表 X = [] y = [] y_std = [] j = 0 # 增量 for i in range(int(len(flux)-lag)): _flux = flux[i:(i+lag)] _time = time[i:(i+lag)] X.append(np.mean(_time)) y.append(np.mean(_flux)) y_std.append(np.std(_flux)) j+= 1 # 我們只移動一步 return X, y, y_std你可以很容易地這樣使用它:
# 使用20個點x, y, y_err = mean_sliding_windows(df.index,df["001724719_rscl"], 40)df.index表示時間序列的時間
df[" 001724719_rscl "] 重新縮放的通量(" 001724719 ")
lag=40是計算平均值和std的數據點的個數
現在,看看結果:
嗯,還不錯。將lag設置為40允許「預測」或在小孔中擴展新的時間序列。但是,如果你仔細看,你會發現在紅線的開始和結束部分有一個分歧。可以改進函數以避免這些偽影。
在接下來的研究中,我們將使用移動平均法獲得的時間序列。
將x軸從值更改為日期:
如果需要日期,可以更改軸。克卜勒任務開始於2009年3月7日,結束於2017年。Pandas有一個叫做pd.data_range()的函數。此函數允許你從不斷遞增的列表中創建日期。
df.index = pd.date_range(『2009–03–07』, periods=len(df.index), freq=』h』)這行代碼將創建一個頻率為小時的新索引。列印結果如下所示。
$ df.indexDatetimeIndex(['2009-03-07 00:00:00', '2009-03-07 01:00:00', '2009-03-07 02:00:00', '2009-03-07 03:00:00', '2009-03-07 04:00:00', '2009-03-07 05:00:00', '2009-03-07 06:00:00', '2009-03-07 07:00:00', '2009-03-07 08:00:00', '2009-03-07 09:00:00', ... '2017-04-29 17:00:00', '2017-04-29 18:00:00', '2017-04-29 19:00:00', '2017-04-29 20:00:00', '2017-04-29 21:00:00', '2017-04-29 22:00:00', '2017-04-29 23:00:00', '2017-04-30 00:00:00', '2017-04-30 01:00:00', '2017-04-30 02:00:00'], dtype='datetime64[ns]', length=71427, freq='H')現在,對於原始時間序列,你有了一個很好的時間刻度。
生成數據集
因此,既然已經創建了數據簡化函數,我們可以將它們組合到另一個函數中(如下所示),該函數將考慮初始數據集和數據集中的恆星名稱(這部分可以在函數中完成)。
def reduced_data(df,stars): ''' Function to automatically reduced a dataset @param df: (pandas dataframe) 包含所有數據的dataframe @param stars: (list) 包含我們想要簡化數據的每個恆星的名稱的列表 @return df_mean: 包含由減少平均值的數據的dataframe @return df_slide: 包含通過滑動窗口方法減少的數據 ''' df_mean = pd.DataFrame() df_slide = pd.DataFrame() for i in tqdm(stars): x , y, y_std = moving_average(df.index, df[i+"_rscl"], lag=25) df_mean[i+"_rscl_x"] = x df_mean[i+"_rscl_y"] = y df_mean[i+"_rscl_y_std"] = y_std x , y, y_std = mean_sliding_windows(df.index, df[i+"_rscl"], lag=40) df_slide[i+"_rscl_x"]= x df_slide[i+"_rscl_y"]= y df_slide[i+"_rscl_y_std"]= y_std return df_mean, df_slide要生成新的數據幀,請執行以下操作:
stars = df.columnsstars = list(set([i.split("_")[0] for i in stars]))print(f"The number of stars available is: {len(stars)}")> The number of stars available is: 13我們有13顆恆星,有4種數據類型,對應52列。
df_mean, df_slide = reduced_data(df,stars)很好,在這一點上,你有兩個新的數據集,其中包含移動平均和窗口方法減少的數據。
方法
準備數據:
為了使用機器學習算法來預測時間序列,必須相應地準備數據。數據不能僅僅設置在(x,y)個數據點。數據必須採用序列[x1,x2,x3,…,xn]和預測值y的形式。
下面的函數演示如何設置數據集:
def create_dataset(values, look_back=1): ''' 函數準備一列(x, y)數據指向用於時間序列學習的數據 @param values: (list) 值列表 @param look_back: (int) x列表的值[x1, x2, x3,…默認值1 @return _x: x時間序列的值 @return _y: y時間序列的值 ''' # 空列表 _x, _y = [], [] for i in range(len(values)-look_back-1): a = values[i:(i+look_back)] _x.append(a) # 集合x _y.append(values[i + look_back]) # 集合y return np.array(_x), np.array(_y)開始之前有兩件重要的事。
1.需要重新縮放數據
當數據在[0,1]範圍內時,深度學習算法對時間序列的預測效果更好。為此,scikit learn提供了MinMaxScaler()函數。你可以配置feature_range參數,但默認值為(0,1)。並清除nan值的數據(如果不刪除nan值,則損失函數將輸出nan)。
# 縮放數據num = 2 # 選擇數據集中的第三顆星values = df_model[stars[num]+"_rscl_y"].values # 提取值scaler = MinMaxScaler(feature_range=(0, 1)) # 創建MinMaxScaler的實例dataset = scaler.fit_transform(values[~np.isnan(values)].reshape(-1, 1)) # 數據將清除nan值,重新縮放並改變形狀2.需要將數據轉換為x list和y
現在,我們將使用create_values()函數為模型生成數據。但是,以前,我更喜歡通過以下方式保存原始數據:
df_model = df_mean.save()# 分成訓練和測試集setstrain_size = int(len(dataset) * 0.8) # 生成80%的訓練數據train = dataset[:train_size] # 設置訓練數據test = dataset[train_size:] # 設置測試數據#重塑為X=t和Y=t+1look_back = 20trainX, trainY = create_dataset(train, look_back)testX, testY = create_dataset(test, look_back)# 將輸入重塑為[示例、時間點、特徵]trainX = np.reshape(trainX, (trainX.shape[0], trainX.shape[1], 1))testX = np.reshape(testX, (testX.shape[0], testX.shape[1], 1))看看結果吧
trainX[0]> array([[0.7414906], [0.76628096], [0.79901113], [0.62779976], [0.64012722], [0.64934765], [0.68549234], [0.64054092], [0.68075644], [0.73782449], [0.68319294], [0.64330245], [0.61339268], [0.62758265], [0.61779702], [0.69994317], [0.64737128], [0.64122564], [0.62016833], [0.47867125]]) # x數據的第一個有20個值trainY[0]> array([0.46174275]) # 對應的y值度量
我們用什麼指標來預測時間序列?我們可以使用平均絕對誤差和均方誤差。它們由函數給出:
def metrics_time_series(y_true, y_pred): ''' 從sklearn.metrics計算MAE和MSE度量 @param y_true: (list) 真實值列表 @param y_pred: (list) 預測值列表 @return mae, mse: (float), (float) mae和mse的度量值 ''' mae = round(mean_absolute_error(y_true, y_pred), 2) mse = round(mean_squared_error(y_true, y_pred), 2) print(f"The mean absolute error is: {mae}") print(f"The mean squared error is: {mse}") return mae, mse首先需要導入函數:
from sklearn.metrics import mean_absolute_error, mean_squared_errorRNNs:
你可以用幾行代碼輕鬆地用Keras實現RNN家族。在這裡你可以使用這個功能來配置你的RNN。你需要首先從Keras導入不同的模型,如:
# 導入一些包import tensorflow as tffrom keras.layers import SimpleRNN, LSTM, GRU, Bidirectional, Conv1D, MaxPooling1D, Dropout現在,我們有從Keras導入的模型。下面的函數可以生成一個簡單的模型(SimpleRNN,LSTM,GRU)。或者,兩個模型(相同的)可以堆疊,或者用於雙向或兩個雙向模型的堆棧中。你還可以添加帶有MaxPooling1D和dropout的CNN部分(Conv1D)。
def time_series_deep_learning(x_train, y_train, x_test, y_test, model_dl=LSTM , unit=4, look_back=20, cnn=False, bidirection=False, stacked=False): ''' 生成不同組合的RNN模型。可以是簡單的、堆疊的或雙向的。模型也可以與CNN部分一起使用。x是(樣本、時間步長、特徵)的訓練數據 @param x_train: (matrix) 訓練數據,維度為 (samples, time steps, features) @param y_train: (list) 預測 @param x_test: (matrix) 測試數據,維度為 (samples, time steps, features) @param y_test: (list) 預測 @param model_dl: (model) RNN類型 @param unit: (int) RNN單元數量 @param look_back: (int) x列表中值的數量,配置RNN的形狀 @param cnn: (bool) 添加cnn部分模型,默認為false @param bidirection: (bool) 為RNN添加雙向模型,默認為false @param stacked: (bool) 堆疊的兩層RNN模型,默認為假 @return train_predict: (list) x_train的預測值 @return train_y: (list) 真實y值 @return test_predict: (list) x_test的預測值 @return test_y: (list) 真實y值 @return (dataframe) 包含模型和度量的名稱 ''' #配置提前停止的回調,以避免過擬合 es = tf.keras.callbacks.EarlyStopping( monitor='loss', patience=5, verbose=0, mode='auto', ) # 序列模型的實例 model = Sequential() if cnn: # 如果cnn部分是需要的 print("CNN") model.add(Conv1D(128, 5, activation='relu')) model.add(MaxPooling1D(pool_size=4)) model.add(Dropout(0.2)) if not bidirection and not stacked: # 如果需要簡單的模型 print("Simple Model") name = model_dl.__name__ model.add(model_dl(unit, input_shape=(look_back, 1))) elif not bidirection: # 測試是否需要雙向模型 print("Stacked Model") name = "Stacked_"+model_dl.__name__ model.add(model_dl(unit, input_shape=(look_back, 1), return_sequences=True)) model.add(model_dl(unit, input_shape=(look_back, 1))) elif not stacked: # 測試是否需要堆疊模型 print("Bidirectional Model") name = "Bi_"+model_dl.__name__ model.add(Bidirectional(model_dl(unit, input_shape=(look_back, 1)))) else: # 測試是否需要雙向和堆疊模型 print("Stacked Bidirectional Model") name = "Stacked_Bi_"+model_dl.__name__ model.add(Bidirectional(model_dl(unit, input_shape=(look_back, 1), return_sequences=True))) model.add(Bidirectional(model_dl(unit, input_shape=(look_back, 1)))) if cnn: # 更新名稱與cnn部分 name = "CNN_"+name # 添加單層稠密層和激活函數線性來預測連續值 model.add(Dense(1)) model.compile(loss='mean_squared_error', optimizer='adam') # MSE loss可以被'mean_absolute_error'替代 model.fit(trainX, trainY, epochs=1000, batch_size=100, callbacks=[es], verbose=0) # 做出預測 train_predict = model.predict(x_train) test_predict = model.predict(x_test) # 反預測 train_predict = scaler.inverse_transform(train_predict) train_y = scaler.inverse_transform(y_train) test_predict = scaler.inverse_transform(test_predict) test_y = scaler.inverse_transform(y_test) # 計算度量 print("Train") mae_train, mse_train = metrics_time_series( train_y, train_predict) print("Test") mae_test, mse_test = metrics_time_series( test_y, test_predict) return train_predict, train_y, test_predict, test_y, pd.DataFrame([name, mae_train, mse_train, mae_test, mse_test], index=["Name", "mae_train", "mse_train", "mae_test", "mse_test"]).T此函數計算訓練部分和測試部分的度量,並以數據幀的形式返回結果。舉五個例子。
LSTM
# 訓練模型並計算度量> x_train_predict_lstm, y_train_lstm,x_test_predict_lstm, y_test_lstm, res= time_series_deep_learning(train_x, train_y, test_x, test_y, model_dl=LSTM , unit=12, look_back=20)# 畫出預測的結果> plotting_predictions(dataset, look_back, x_train_predict_lstm, x_test_predict_lstm)# 將每個模型的指標保存在數據框df_results中> df_results = df_results.append(res)GRU
# 訓練模型並計算度量> x_train_predict_lstm, y_train_lstm,x_test_predict_lstm, y_test_lstm, res= time_series_deep_learning(train_x, train_y, test_x, test_y, model_dl=GRU, unit=12, look_back=20)堆疊LSTM:
# 訓練模型並計算度量> x_train_predict_lstm, y_train_lstm,x_test_predict_lstm, y_test_lstm, res= time_series_deep_learning(train_x, train_y, test_x, test_y, model_dl=LSTM , unit=12, look_back=20, stacked=True)雙向LSTM:
# 訓練模型並計算度量> x_train_predict_lstm, y_train_lstm,x_test_predict_lstm, y_test_lstm, res= time_series_deep_learning(train_x, train_y, test_x, test_y, model_dl=LSTM , unit=12, look_back=20, bidirection=True)CNN-LSTM:
# 訓練模型並計算度量> x_train_predict_lstm, y_train_lstm,x_test_predict_lstm, y_test_lstm, res= time_series_deep_learning(train_x, train_y, test_x, test_y, model_dl=LSTM , unit=12, look_back=20, cnn=True)結果
考慮到這些數據,結果相當不錯。我們可以看出,深度學習RNN可以很好地再現數據的準確性。下圖顯示了LSTM模型的預測結果。
表1:不同RNN模型的結果,顯示了MAE和MSE指標