This section uses decision-tree binary classification to analyze the StumbleUpon dataset, predicting whether a web page is ephemeral or evergreen, and tunes the parameters to find the combination that gives the best prediction accuracy.
StumbleUpon is a personalized search engine that recommends pages based on a user's interests, page ratings, and other records. Some pages are ephemeral, such as news articles that are only meaningful to readers for a limited period, while others are evergreen, meaning readers remain interested in them for a long time.
Our goal in this section is to build a model with decision-tree binary classification and use it to predict whether a page is ephemeral or evergreen; this is a simple binary classification problem.
The data can be viewed and downloaded at https://www.kaggle.com/c/stumbleupon/data
Note: downloading the data requires registration; in some regions a VPN may be needed during registration so that the verification API can load.
cp train.tsv ~/pythonwork/data
cp test.tsv ~/pythonwork/data
hadoop fs -put *.tsv /user/hduser/data
We must extract the feature fields and the label field from the raw dataset, build the LabeledPoint format required for training, and randomly split the data 8:1:1 into three parts: a training set, a validation set, and a test set.
Training set (trainData): used to train the model
Validation set (validationData): used to evaluate the model
Test set (testData): used as test data
HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop pyspark --master yarn --deploy-mode client
rawDataWithHeader = sc.textFile("/user/hduser/stum/data/train.tsv")
Note: you need to create this directory on HDFS yourself.
Looking at the data file, there are several issues:
1. The first record contains the field names.
2. The fields of each record are separated by "\t".
3. Some fields have no data and are filled with "?".
The following steps address these issues.
header = rawDataWithHeader.first()
rawData = rawDataWithHeader.filter(lambda x: x != header)
rData = rawData.map(lambda x: x.replace("\"", ""))
lines = rData.map(lambda x: x.split("\t"))
import numpy as np

def extract_features(field, categoriesMap, featureEnd):
    # extract the categorical feature field
    categoryIdx = categoriesMap[field[3]]            # convert the page category to a numeric index
    categoryFeatures = np.zeros(len(categoriesMap))  # initialize categoryFeatures
    categoryFeatures[categoryIdx] = 1                # set the corresponding position to 1
    # extract the numerical feature fields
    numericalFeatures = [convert_float(x) for x in field[4:featureEnd]]
    # return "categorical features" + "numerical features"
    return np.concatenate((categoryFeatures, numericalFeatures))

def convert_float(x):
    # if the field is missing ("?") return 0, otherwise convert it to float
    return 0 if x == "?" else float(x)
categoriesMap = lines.map(lambda fields: fields[3]).distinct().zipWithIndex().collectAsMap()
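categoriesMap is an ordinary Python dict mapping each distinct category name in field 3 to a numeric index; its length determines the size of the one-hot vector built in extract_features. A quick, optional check in the interactive session:
print(len(categoriesMap))   # number of distinct categories = length of the one-hot vector
print(categoriesMap)        # {category name: index, ...}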
def extract_label(field):
    label = field[-1]
    return float(label)
The field parameter of this function is a single record; field[-1] retrieves the last field, which is the label field, and float(label) converts it to a float before returning it.
from pyspark.mllib.regression import LabeledPoint

labelpointRDD = lines.map(lambda r: LabeledPoint(
    extract_label(r),
    extract_features(r, categoriesMap, len(r) - 1)))
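As an optional sanity check (not part of the original steps), you can inspect the first LabeledPoint to confirm the conversion worked:
p = labelpointRDD.first()
print(p.label)          # 0.0 (ephemeral) or 1.0 (evergreen)
print(len(p.features))  # one-hot category features + numerical features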
(trainData,validationData,testData) = labelpointRDD.randomSplit([8,1,1])
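randomSplit normalizes the weights, so [8, 1, 1] is equivalent to the ratio 0.8 : 0.1 : 0.1. To confirm the split is roughly 8:1:1, you can count each subset (optional):
print(trainData.count(), validationData.count(), testData.count())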
def PrepareData(sc):
    global Path
    if sc.master[0:5] == "local":
        Path = "file:/root/pythonwork/stum/"
    else:
        Path = "hdfs://master:9000/user/hduser/stum/"
    print("Loading data.....")
    rawDataWithHeader = sc.textFile(Path + "data/train.tsv")
    header = rawDataWithHeader.first()
    rawData = rawDataWithHeader.filter(lambda x: x != header)
    rData = rawData.map(lambda x: x.replace("\"", ""))
    lines = rData.map(lambda x: x.split("\t"))
    print("Total: " + str(lines.count()) + " items")
    categoriesMap = lines.map(lambda fields: fields[3]) \
                         .distinct().zipWithIndex().collectAsMap()
    labelpointRDD = lines.map(lambda r: LabeledPoint(
        extract_label(r),
        extract_features(r, categoriesMap, len(r) - 1)))
    (trainData, validationData, testData) = labelpointRDD.randomSplit([8, 1, 1])
    return (trainData, validationData, testData, categoriesMap)
(trainData,validationData,testData,categoriesMap) = PrepareData(sc)
trainData.persist()
validationData.persist()
testData.persist()
from pyspark.mllib.tree import DecisionTree

model = DecisionTree.trainClassifier(
    trainData, numClasses=2, categoricalFeaturesInfo={},
    impurity="entropy", maxDepth=5, maxBins=5)
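If you want to inspect the decision rules the model has learned, the trained DecisionTreeModel can be printed with toDebugString() (optional):
print(model.toDebugString())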
After the model has been built, it can be used to predict the test.tsv data. test.tsv contains only features; we use these feature fields to predict whether each page is ephemeral or evergreen.
def PredictData(sc, model, categoriesMap):
    global Path
    if sc.master[0:5] == "local":
        Path = "file:/root/pythonwork/stum/"
    else:
        Path = "hdfs://master:9000/user/hduser/stum/"
    print("Loading data.....")
    rawDataWithHeader = sc.textFile(Path + "data/test.tsv")
    header = rawDataWithHeader.first()
    rawData = rawDataWithHeader.filter(lambda x: x != header)
    rData = rawData.map(lambda x: x.replace("\"", ""))
    lines = rData.map(lambda x: x.split("\t"))
    print("Total: " + str(lines.count()) + " items")
    # test.tsv has no label field, so every field after the URL is a feature
    dataRDD = lines.map(lambda r: (r[0], extract_features(r, categoriesMap, len(r))))
    DescDict = {
        0: "ephemeral",
        1: "evergreen"}
    for data in dataRDD.take(10):
        predictResult = model.predict(data[1])
        print("Website: " + str(data[0]) + "\n" +
              "  ==> Predict: " + str(predictResult) +
              "  Notes: " + DescDict[predictResult] + "\n")
print("*******Predicting*********")PredictData(sc,model,categoriesMap)
For binary classification, model quality is commonly measured with the ROC curve and AUC. From the confusion matrix we compute the true positive rate TPR = TP / (TP + FN) and the false positive rate FPR = FP / (FP + TN); plotting TPR against FPR gives the ROC curve, and AUC is the area under that curve. An AUC of 1 means a perfect classifier, while 0.5 is no better than random guessing.
score = model.predict(validationData.map(lambda p: p.features))
scoreAndLabels = score.zip(validationData.map(lambda p: p.label))
scoreAndLabels.take(5)
from pyspark.mllib.evaluation import BinaryClassificationMetrics

metrics = BinaryClassificationMetrics(scoreAndLabels)
print("AUC=" + str(metrics.areaUnderROC))
def evaluateModel(model, validationData):
    score = model.predict(validationData.map(lambda p: p.features))
    scoreAndLabels = score.zip(validationData.map(lambda p: p.label))
    metrics = BinaryClassificationMetrics(scoreAndLabels)
    return metrics.areaUnderROC
from time import time

def trainEvaluateModel(trainData, validationData, impurityParm, maxDepthParm, maxBinsParm):
    startTime = time()
    model = DecisionTree.trainClassifier(
        trainData, numClasses=2, categoricalFeaturesInfo={},
        impurity=impurityParm, maxDepth=maxDepthParm, maxBins=maxBinsParm)
    AUC = evaluateModel(model, validationData)
    duration = time() - startTime
    print("training evaluate: " +
          "impurity=" + str(impurityParm) +
          ", maxDepth=" + str(maxDepthParm) +
          ", maxBins=" + str(maxBinsParm) + "\n" +
          "==> duration=" + str(duration) +
          ", Result AUC=" + str(AUC))
    return (AUC, duration, impurityParm, maxDepthParm, maxBinsParm, model)
Explanation: trainEvaluateModel trains a decision-tree model with the given impurity, maxDepth, and maxBins parameters, measures how long training takes, evaluates the model's AUC on the validation set, and returns the AUC, duration, parameters, and model. For example:
(AUC, duration, impurityParm, maxDepthParm, maxBinsParm, model) = \
    trainEvaluateModel(trainData, validationData, "entropy", 5, 5)
impurity=["gini","entropy"]maxDepthList=[10]maxBinsList=[10]metric=[trainEvaluateModel(trainData,validationData,impurity,maxDepthList,maxBinsList)] for impurity in impurityList for maxDepth in maxDepthList for maxBins in maxBinsList
import pandas as pd

IndexList = impurityList
df = pd.DataFrame(metrics, index=IndexList,
                  columns=['AUC', 'duration', 'impurity', 'maxDepth', 'maxBins', 'model'])
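With the results collected in a DataFrame, you can, for example, sort by AUC to see which parameter combination performed best (a small pandas sketch using the columns defined above):
bestDf = df.sort_values(by='AUC', ascending=False)
print(bestDf[['impurity', 'maxDepth', 'maxBins', 'AUC', 'duration']])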
def evalParameter(trainData, validationData, evalparm, impurityList, maxDepthList, maxBinsList):
    # train and evaluate a model for every parameter combination
    metrics = [trainEvaluateModel(trainData, validationData, impurity, maxDepth, maxBins)
               for impurity in impurityList
               for maxDepth in maxDepthList
               for maxBins in maxBinsList]
    # use the parameter being evaluated as the DataFrame index
    if evalparm == "impurity":
        IndexList = impurityList[:]
    elif evalparm == "maxDepth":
        IndexList = maxDepthList[:]
    elif evalparm == "maxBins":
        IndexList = maxBinsList[:]
    df = pd.DataFrame(metrics, index=IndexList,
                      columns=['AUC', 'duration', 'impurity', 'maxDepth', 'maxBins', 'model'])
    return df
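For example, to examine how maxDepth alone affects AUC and training time, evalParameter can be called with a single impurity value and a range of depths; the concrete value lists below are illustrative choices, not taken from the original text:
evalParameter(trainData, validationData, "maxDepth",
              impurityList=["entropy"],
              maxDepthList=[3, 5, 10, 15, 20, 25],
              maxBinsList=[10])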
def evalAllParameter(trainData, validationData, impurityList, maxDepthList, maxBinsList):
    # train and evaluate a model for every combination of parameters
    metrics = [trainEvaluateModel(trainData, validationData, impurity, maxDepth, maxBins)
               for impurity in impurityList
               for maxDepth in maxDepthList
               for maxBins in maxBinsList]
    # sort by AUC in descending order and take the best combination
    Smetrics = sorted(metrics, key=lambda k: k[0], reverse=True)
    bestParameter = Smetrics[0]
    # return the model trained with the best parameter combination
    return bestParameter[5]
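evalAllParameter runs trainEvaluateModel over the full grid of parameter combinations and returns the model with the highest validation AUC. A possible call (the parameter lists are illustrative, not from the original text):
bestModel = evalAllParameter(trainData, validationData,
                             ["gini", "entropy"],
                             [3, 5, 10, 15],
                             [3, 5, 10, 50, 100])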
Finally, evaluate the model on the test set:
evaluateModel(model,testData)
If this value differs greatly from the AUC obtained during the training and validation stages, the model is overfitted; if the difference is small, the model is fine.
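A minimal sketch of this overfitting check, comparing the AUC on the training data with the AUC on the test data (assuming the evaluateModel function and the model obtained above):
trainAUC = evaluateModel(model, trainData)
testAUC = evaluateModel(model, testData)
print("train AUC=" + str(trainAUC) + ", test AUC=" + str(testAUC))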