「 本文闡述基於Pyspark的sql數據讀取、特徵處理、尋找最優參數、使用最優參數預測未來銷量的全過程,重在預測流程和Pyspark相關知識點的講解,展示可供企業級開發落地的demo。」
01 數據讀取與預處理
1.1 數據讀取
df = spark.sql(""" select store_code,goods_code,ds,qty as label from xxx.store_sku_sale where ds>='2020-05-22' and store_code in ('Z001','Z002') """)
1.2 特徵生成
1).dayofweek等函數是從from pyspark.sql.functions import *中functions的類而來;
2).數據此時是spark.dataframe格式,用類sql的形式進行操作;
3).withColumn函數為新增一列;
4).為說明問題簡化了特徵預處理,只是使用是否月末和星期的OneHotEncoder作為特徵。
df = df.withColumn('dayofweek', dayofweek('ds'))df = df.withColumn("dayofweek", df["dayofweek"].cast(StringType()))# 是否月末編碼df = df.withColumn('day', dayofmonth('ds'))df = df.withColumn('day', df["day"].cast(StringType()))df = df.withColumn('month_end', when(df['day'] <= 25, 0).otherwise(1))# 星期編碼--將星期轉化為了0-1變量,從周一至周天dayofweek_ind = StringIndexer(inputCol='dayofweek', outputCol='dayofweek_index')dayofweek_ind_model = dayofweek_ind.fit(df)dayofweek_ind_ = dayofweek_ind_model.transform(df)onehotencoder = OneHotEncoder(inputCol='dayofweek_index', outputCol='dayofweek_Vec')df = onehotencoder.transform(dayofweek_ind_)1.3 數據集的劃分
此時產生的dayofweek_Vec是一個向量,在時序領域,統計特徵非常重要,比如mean,std,這些特徵是可以由sql來完成,但one_hot這類特徵使用sql可能乏力,於是可以藉助pyspark.ml中的特徵處理模塊,如果還是無法很好的處理特徵,便需要藉助numpy等使用spark自定義函數udf進行操作。還需留意一點,pyspark.ml中默認的輸入特徵是轉換後名為features的稠密向量(DenseVector),也就是多行row合併在一起,另外,作為有監督學習,和features對應的標籤默認名為label。
inputCols = [ "dayofweek_Vec", "month_end"]assembler = VectorAssembler(inputCols=inputCols, outputCol="features")#數據集劃分,此時是隨機切分,不考慮時間的順序train_data_1, test_data_1 = df.randomSplit([0.7, 0.3])train_data=assembler.transform(train_data_1)test_data = assembler.transform(test_data_1)02 模型構建和調優
下面我們以最簡單的回歸模型作為演示。
2.1 設置參數空間
線性回歸模型最重要的三個參數為:regParam--正則係數;fitIntercept--是否帶截距elasticNetParam--彈性網,[0,1]之間,0表示L2;1表示L1;此時我們為每個參數構建一個參數空間。
lr_params = ({'regParam': 0.00}, {'fitIntercept': True}, {'elasticNetParam': 0.5})lr = LinearRegression(maxIter=100, regParam=lr_params[0]['regParam'], \ fitIntercept=lr_params[1]['fitIntercept'], \ elasticNetParam=lr_params[2]['elasticNetParam'])
lrParamGrid = ParamGridBuilder() \ .addGrid(lr.regParam, [0.005, 0.01, 0.1, 0.5]) \ .addGrid(lr.fitIntercept, [False, True]) \ .addGrid(lr.elasticNetParam, [0.0, 0.1, 0.5, 1.0]) \ .build()2.2 交叉驗證
使用五折交叉驗證,在備選空間上尋找最優參數,此時的lr_best_params為向量,可以使用type(lr_best_params)查看該對象的數據類型。
cross_valid = CrossValidator(estimator=lr, estimatorParamMaps=lrParamGrid, evaluator=RegressionEvaluator(), numFolds=5)
cvModel = cross_valid.fit(train_data)
best_parameters = [( [{key.name: paramValue} for key, paramValue in zip(params.keys(), params.values())], metric) \ for params, metric in zip( cvModel.getEstimatorParamMaps(), cvModel.avgMetrics)]
lr_best_params = sorted(best_parameters, key=lambda el: el[1], reverse=True)[0]2.3 dataframe轉換
下面借用pd.DataFrame把以上關鍵參數轉換為結構化數據,方便後面直接轉換為spark.dataframe。為了檢查使用最優參數前面的評價指標,如mape是否有下降,是可以把類似的這些參數一併寫入資料庫。
pd_best_params = pd.DataFrame({ 'regParam':[lr_best_params[0][0]['regParam']], 'fitIntercept':[lr_best_params[0][1]['fitIntercept']], 'elasticNetParam':[lr_best_params[0][2]['elasticNetParam']]})pd_best_params['update_date'] = todaypd_best_params['update_time'] = update_timepd_best_params['model_type'] = 'linear'以上交叉驗證自然是比較費時的,也可以使用sample函數隨機抽取一定比例的數據放入模型中。
2.4 dataframe最優參數保存至資料庫
pd.DataFrame-->spark.dataframe,以追加的形式寫入hive,得到的最優參數供後續模型預測使用。
spark.createDataFrame(pd_best_params).write.mode("append").format('hive').saveAsTable( 'temp.regression_model_best_param')03 讀取預測數據集和最佳參數3.1 生成並讀取預測數據集
通過union合併真實銷售數據,並使用join on 1=1產生門店/商品/時間維度的笛卡爾集。
df = spark.sql(f""" select store_code,goods_code,ds,qty from xxx.test_store_sale where ds>='{prev28_day}' and ds<'{today}' union select s.store_code,s.goods_code,d.ds,0 as qty from (select stat_date as ds from xxl.dim_date where stat_date<'{after7_day}' and stat_date>='{today}') d join (select distinct store_code,goods_code from xxx.test_store_sale ) s on 1=1""")
3.2 讀取最佳參數
僅以regparam參數為例,把從sql中讀取出來的數據轉化為標量,然後轉換為可供模型函數調用的實際參數值,(因表中數據很小所以使用了collect)。
best_param_set=spark.sql(f"select regparam,fitIntercept, elasticNetParam from scmtemp.regression_model_best_param order by update_date desc,update_time desc limit 1 ").collect()reg_vec=best_param_set.select('regparam')reg_b= [row.regparam for row in reg_vec][0]reg_b=float(reg_b)04 模型預測並寫入sql在上文的交叉驗證階段我們對數據集的劃分為形式為random,在預測階段,需按照指定時間劃分。
train_data=df.where(df['ds'] <today)test_data=df.where(df['ds'] >=today)train_mod01 = assembler.transform(train_data)train_mod02 = train_mod01.selectExpr("features","qty as label")
test_mod01 = assembler.transform(test_data)test_mod02 = test_mod01.select("store_code","goods_code","ds","features")
# build train the modellr = LinearRegression(maxIter=100,regParam=reg_b, fitIntercept=inter_b,elasticNetParam=elastic_b, solver="normal")model = lr.fit(train_mod02)predictions = model.transform(test_mod02)predictions.select("store_code","goods_code","ds","prediction").show(5)test_store_predict=predictions.select("store_code","goods_code","ds","prediction").createOrReplaceTempView('test_store_predict')spark.sql(f"""create table xxx.regression_test_store_predict as select * from test_store_predict""")
在交叉驗證階段使用的是evaluate函數放入測試集進行模型評估,在正式的預測場景使用的是transform來預測,如果預放入模型的特徵已經轉換為名為features的向量,在transform預測階段放入的數據是可以帶入時間和store_code,sku_code等列,預測輸出默認列名為prediction。
結語
以上流程其實是兩個階段,分別為模型交叉驗證尋找最優參數與使用最優參數預測訓練,其中,尋找最優參數階段,雖然我們已經用到了spark這個大數據處理利器,但是參數空間和本身放入的數據往往都不小,所以在實際使用過程中,出於計算性能考慮和實際需要,最優參數更新的頻率可能低於模型預測周期,比如,因每天產生銷售數據,預測未來的模型是每天執行,但是最優參數的更新周期可能是一周執行一次,除了節省資源考慮,還有交叉驗證得到的最優參數往往會較為穩定。
這就是本文分享的Pyspark銷量預測全流程,為書寫編輯便利,合併和簡化了某些步驟,比如特徵處理,只是點到其中關鍵點,如興趣可以沿著帶過的知識點擴增其他,最後,歡迎交流指正。
(本文涉及到的全部代碼請點擊文末的閱讀原文)
參考:
1.http://spark.apache.org/docs/2.3.0/api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf>
2.http://spark.apache.org/docs/2.3.0/api/python/pyspark.ml.html