基於Pyspark的銷量預測

2021-02-20 Fitzgerald

 本文闡述基於Pyspark的sql數據讀取、特徵處理、尋找最優參數、使用最優參數預測未來銷量的全過程,重在預測流程和Pyspark相關知識點的講解,展示可供企業級開發落地的demo。

01 數據讀取與預處理

1.1 數據讀取

df = spark.sql("""    select store_code,goods_code,ds,qty as label  from xxx.store_sku_sale  where ds>='2020-05-22' and store_code in ('Z001','Z002')    """)


1.2 特徵生成

1).dayofweek等函數是從from pyspark.sql.functions import *中functions的類而來;

2).數據此時是spark.dataframe格式,用類sql的形式進行操作;

3).withColumn函數為新增一列;

4).為說明問題簡化了特徵預處理,只是使用是否月末和星期的OneHotEncoder作為特徵。

df = df.withColumn('dayofweek', dayofweek('ds'))df = df.withColumn("dayofweek", df["dayofweek"].cast(StringType()))# 是否月末編碼df = df.withColumn('day', dayofmonth('ds'))df = df.withColumn('day', df["day"].cast(StringType()))df = df.withColumn('month_end', when(df['day'] <= 25, 0).otherwise(1))# 星期編碼--將星期轉化為了0-1變量,從周一至周天dayofweek_ind = StringIndexer(inputCol='dayofweek', outputCol='dayofweek_index')dayofweek_ind_model = dayofweek_ind.fit(df)dayofweek_ind_ = dayofweek_ind_model.transform(df)onehotencoder = OneHotEncoder(inputCol='dayofweek_index',                                           outputCol='dayofweek_Vec')df = onehotencoder.transform(dayofweek_ind_)

1.3 數據集的劃分

此時產生的dayofweek_Vec是一個向量,在時序領域,統計特徵非常重要,比如mean,std,這些特徵是可以由sql來完成,但one_hot這類特徵使用sql可能乏力,於是可以藉助pyspark.ml中的特徵處理模塊,如果還是無法很好的處理特徵,便需要藉助numpy等使用spark自定義函數udf進行操作。還需留意一點,pyspark.ml中默認的輸入特徵是轉換後名為features的稠密向量(DenseVector),也就是多行row合併在一起,另外,作為有監督學習,和features對應的標籤默認名為label。

inputCols = [    "dayofweek_Vec",    "month_end"]assembler = VectorAssembler(inputCols=inputCols, outputCol="features")#數據集劃分,此時是隨機切分,不考慮時間的順序train_data_1, test_data_1 = df.randomSplit([0.7, 0.3])train_data=assembler.transform(train_data_1)test_data = assembler.transform(test_data_1)

02 模型構建和調優

下面我們以最簡單的回歸模型作為演示。

2.1 設置參數空間

線性回歸模型最重要的三個參數為:regParam--正則係數;fitIntercept--是否帶截距elasticNetParam--彈性網,[0,1]之間,0表示L2;1表示L1;此時我們為每個參數構建一個參數空間。

lr_params = ({'regParam': 0.00}, {'fitIntercept': True}, {'elasticNetParam': 0.5})lr = LinearRegression(maxIter=100, regParam=lr_params[0]['regParam'], \                      fitIntercept=lr_params[1]['fitIntercept'], \                      elasticNetParam=lr_params[2]['elasticNetParam'])
lrParamGrid = ParamGridBuilder() \ .addGrid(lr.regParam, [0.005, 0.01, 0.1, 0.5]) \ .addGrid(lr.fitIntercept, [False, True]) \ .addGrid(lr.elasticNetParam, [0.0, 0.1, 0.5, 1.0]) \ .build()

2.2 交叉驗證

使用五折交叉驗證,在備選空間上尋找最優參數,此時的lr_best_params為向量,可以使用type(lr_best_params)查看該對象的數據類型。

cross_valid = CrossValidator(estimator=lr, estimatorParamMaps=lrParamGrid, evaluator=RegressionEvaluator(),                          numFolds=5)
cvModel = cross_valid.fit(train_data)
best_parameters = [( [{key.name: paramValue} for key, paramValue in zip(params.keys(), params.values())], metric) \ for params, metric in zip( cvModel.getEstimatorParamMaps(), cvModel.avgMetrics)]
lr_best_params = sorted(best_parameters, key=lambda el: el[1], reverse=True)[0]

2.3 dataframe轉換

下面借用pd.DataFrame把以上關鍵參數轉換為結構化數據,方便後面直接轉換為spark.dataframe。為了檢查使用最優參數前面的評價指標,如mape是否有下降,是可以把類似的這些參數一併寫入資料庫。

pd_best_params = pd.DataFrame({    'regParam':[lr_best_params[0][0]['regParam']],    'fitIntercept':[lr_best_params[0][1]['fitIntercept']],    'elasticNetParam':[lr_best_params[0][2]['elasticNetParam']]})pd_best_params['update_date'] = todaypd_best_params['update_time'] = update_timepd_best_params['model_type'] = 'linear'

以上交叉驗證自然是比較費時的,也可以使用sample函數隨機抽取一定比例的數據放入模型中。

2.4 dataframe最優參數保存至資料庫

pd.DataFrame-->spark.dataframe,以追加的形式寫入hive,得到的最優參數供後續模型預測使用。

spark.createDataFrame(pd_best_params).write.mode("append").format('hive').saveAsTable(    'temp.regression_model_best_param')

03 讀取預測數據集和最佳參數

3.1 生成並讀取預測數據集

通過union合併真實銷售數據,並使用join on 1=1產生門店/商品/時間維度的笛卡爾集。

df = spark.sql(f"""    select store_code,goods_code,ds,qty   from xxx.test_store_sale   where ds>='{prev28_day}' and ds<'{today}'    union    select s.store_code,s.goods_code,d.ds,0 as qty    from    (select stat_date as ds from xxl.dim_date where stat_date<'{after7_day}' and   stat_date>='{today}') d    join    (select    distinct    store_code,goods_code    from xxx.test_store_sale    ) s on 1=1""")


3.2 讀取最佳參數

僅以regparam參數為例,把從sql中讀取出來的數據轉化為標量,然後轉換為可供模型函數調用的實際參數值,(因表中數據很小所以使用了collect)。

best_param_set=spark.sql(f"select regparam,fitIntercept, elasticNetParam from scmtemp.regression_model_best_param order by update_date desc,update_time desc limit 1 ").collect()reg_vec=best_param_set.select('regparam')reg_b= [row.regparam for row in reg_vec][0]reg_b=float(reg_b)

04 模型預測並寫入sql

在上文的交叉驗證階段我們對數據集的劃分為形式為random,在預測階段,需按照指定時間劃分。

train_data=df.where(df['ds'] <today)test_data=df.where(df['ds'] >=today)train_mod01 = assembler.transform(train_data)train_mod02 = train_mod01.selectExpr("features","qty as label")
test_mod01 = assembler.transform(test_data)test_mod02 = test_mod01.select("store_code","goods_code","ds","features")
# build train the modellr = LinearRegression(maxIter=100,regParam=reg_b, fitIntercept=inter_b,elasticNetParam=elastic_b, solver="normal")model = lr.fit(train_mod02)predictions = model.transform(test_mod02)predictions.select("store_code","goods_code","ds","prediction").show(5)test_store_predict=predictions.select("store_code","goods_code","ds","prediction").createOrReplaceTempView('test_store_predict')spark.sql(f"""create table xxx.regression_test_store_predict as select * from test_store_predict""")


在交叉驗證階段使用的是evaluate函數放入測試集進行模型評估,在正式的預測場景使用的是transform來預測,如果預放入模型的特徵已經轉換為名為features的向量,在transform預測階段放入的數據是可以帶入時間和store_code,sku_code等列,預測輸出默認列名為prediction。


 結語

以上流程其實是兩個階段,分別為模型交叉驗證尋找最優參數與使用最優參數預測訓練,其中,尋找最優參數階段,雖然我們已經用到了spark這個大數據處理利器,但是參數空間和本身放入的數據往往都不小,所以在實際使用過程中,出於計算性能考慮和實際需要,最優參數更新的頻率可能低於模型預測周期,比如,因每天產生銷售數據,預測未來的模型是每天執行,但是最優參數的更新周期可能是一周執行一次,除了節省資源考慮,還有交叉驗證得到的最優參數往往會較為穩定。

這就是本文分享的Pyspark銷量預測全流程,為書寫編輯便利,合併和簡化了某些步驟,比如特徵處理,只是點到其中關鍵點,如興趣可以沿著帶過的知識點擴增其他,最後,歡迎交流指正。


(本文涉及到的全部代碼請點擊文末的閱讀原文


參考:

1.http://spark.apache.org/docs/2.3.0/api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf>

2.http://spark.apache.org/docs/2.3.0/api/python/pyspark.ml.html



相關焦點

  • Pyspark推薦算法實戰(一)
    前言由於最近轉向商品推薦的工作,因此從本文起,開始介紹一些利用pyspark在推薦算法中的具體應用。目前最新版本的spark已經出到2.4.5,我們主要使用的還是2.3版本,所以後續涉及代碼開發,全部基於2.3.4版本。
  • 手把手教你用PySpark構建機器學習模型
    這個數據集來自美國國家糖尿病與消化與腎病研究所,分類目標是預測病人是否得了糖尿病(是/否)。預測變量包括懷孕次數、BMI(譯者註:Body Mass Index,身體質量指數)、胰島素水平、年齡等等。同樣的,隨機森林算法基於數據樣本創建決策樹,然後從每個決策樹獲得預測,最後通過投票的方式來選擇最佳方案。這是一種比單一決策樹更好的集成方法,因為它通過平均所有結果來減少過度擬合。
  • pyspark操作MongoDB
    有幾點需要注意的:不要安裝最新的pyspark版本,請安裝`pip3 install pyspark==2.3.2`
  • 數據分析工具篇——pyspark應用詳解
    不是的~現階段流批一體盛行,Flink也逐漸進入大家的視野,大有發展壯大的趨勢,我們後面會單獨講解這一工具,這篇文章我們重點講解一下基於spark運算的pyspark工具。pyspark不是所有的代碼都在spark環境應用,可以將一些主要的運算單元切到spark環境運算完成,然後輸出運算結果到本地,最後在本地運行一些簡單的數據處理邏輯。
  • PySpark 原理解析
    代碼實現在 python/pyspark/context.py:def _ensure_initialized(cls, instance=None, gateway=None, conf=None): """ Checks whether a SparkContext is initialized or not.
  • PySpark 之Spark DataFrame入門
    header=True)import pandas as pddf_pd=pd.read_csv("iris.csv")# import pysparkclass Row from module sqlfrom pyspark.sql import *# Create Example Data - Departments and Employees# Create the Departmentsdepartment1 = Row(id='123456', name='Computer Science
  • PySpark工作原理
    create --clone base -n test% source activate test% conda install pyspark=2.4.4% conda install openjdk% conda install jupyterlab% jupyter-lab到此會啟動一個基於瀏覽器的開發環境
  • 開窗函數之累積和,PySpark,Pandas和SQL版實現
    PySpark DataFrame版本from pyspark import SparkConfconf=SparkConf()conf.set("spark.sql.execution.arrow.enabled", "true")from pyspark.sql import SparkSessionspark
  • 年銷量爭霸賽:預測!2019年,銷量前20的品牌銷量排名曝光?
    但具有品牌競爭力和綜合實力的企業突飛猛進,群雄爭霸,究竟誰能完成銷量目標,捍衛自己的江湖地位?旺季市場,還在火熱進行中,而距離年底只剩4個多月,《電動車風雲》現推出「年銷量爭霸賽」系列文章,分析預測2019年行業前20名品牌銷量。
  • PySpark源碼解析,用Python調用高效Scala接口,搞定大規模數據分析
    代碼實現在 python/pyspark/context.py:def_ensure_initialized(cls,instance=None,gateway=None,conf=None):"""CheckswhetheraSparkContextisinitializedornot.ThrowserrorifaSparkContextisalreadyrunning
  • 揭秘天貓商家銷量預測神器!準確度超90%,現實版「水晶球」來了
    揭秘天貓商家銷量預測神器!如果銷量預測相差比較大,可能會導致缺貨或貨物積壓。小熊電器面臨的挑戰,也是絕大多數品牌商家的痛點。天貓為此推出了全球首個銷售精準預測平臺,預測消費正在成為可能。5月8日,2018天貓TES(天貓消費電子生態峰會)上,天貓智慧供應鏈發布行業首個可精準預測銷量的產品「水晶球」。
  • 獨家 | PySpark和SparkSQL基礎:如何利用Python編程執行Spark(附代碼)
    第二步:在Anaconda Prompt終端中輸入「conda install pyspark」並回車來安裝PySpark包。第三步:在Anaconda Prompt終端中輸入「conda install pyarrow」並回車來安裝PyArrow包。當PySpark和PyArrow包安裝完成後,僅需關閉終端,回到Jupyter Notebook,並在你代碼的最頂部導入要求的包。
  • 基於CNN預測股票漲跌幅
    本文我們基於CNN進行建模單變量時間序列,預測股票的漲跌。關於神經網絡學習和實現,大家可以參考我們公眾號上推出的一些文章。
  • 【PySpark源碼解析】教你用Python調用高效Scala接口
    代碼實現在 python/pyspark/context.py:def _ensure_initialized(cls, instance=None, gateway=None, conf=None):    """    Checks whether a SparkContext is initialized or not.
  • 天貓智慧供應鏈「水晶球」來了 能為商家預測未來銷量精準度超90%
    全新亮相的天貓消費電子事業組宣布,將通過打通閒魚、支付寶、優酷等全消費場景,降低獲客成本;提供供應鏈金融,縮短商家帳期;發布全球首個銷量精準預測產品,提升供應鏈流轉效率,持續打造球消費電子類最佳經營環境。
  • 銷量英語怎麼說?
    企業產品銷售量模糊預測研究.3、In this article a new sales volume forecast model is set up.建立了一個新的銷量預測模型。一種產品銷售量預測及管理系統的設計與實現.5、Analysis of sales volume of green textile with combinatorial forecast method.組合預測法分析綠色紡織品的市場銷量.
  • 一文讀懂 PySpark 數據框
    描述指定列如果我們要看一下數據框中某指定列的概要信息,我們會用describe方法。這個方法會提供我們指定列的統計概要信息,如果沒有指定列名,它會提供這個數據框對象的統計信息。執行SQL查詢我們還可以直接將SQL查詢語句傳遞給數據框,為此我們需要通過使用registerTempTable方法從數據框上創建一張表,然後再使用sqlContext.sql()來傳遞SQL查詢語句。
  • PySpark源碼解析,教你用Python調用高效Scala接口,搞定大規模數據分析
    代碼實現在 python/pyspark/context.py:def _ensure_initialized(cls, instance=None, gateway=None, conf=None):    """    Checks whether a SparkContext is initialized or not.
  • 算法預測銷量準確率90%!智能決策公司ToB,想用AI做人想不到的事
    有這樣一家AI技術公司在用深度學習等算法,為各類公司進行企業決策,這戰績還真不少:為大型藥廠預測流感藥銷量,從數百個影響因素中挑出70個構建深度學習模型,準確率達到了80%。連續籤約了四家大型車企,運用算法幫助他們提升銷量。
  • 疫情下的汽車銷量預測:未來半年或同比下滑14%
    時代商學院通過SARIMA模型(季節性自回歸移動平均模型)對2020年的汽車銷量進行預測,預測結果顯示,今年3月份我國汽車銷量或同比下降35%,未來6個月總銷量或同比下降14%。2018年我國汽車銷量出現拐點,自入世後首現負增長,同比下降2.76%。而2019年我國汽車銷量同比下降8.2%,跌幅擴大5.4個百分點。   未來六個月預計總銷量同比下降14%。從2004年至今我國汽車銷量總體呈上升趨勢,且其波動具有周期性,因此建立SARIMA模型對數據進行擬合,並對未來六月汽車銷量做出預測。