Introduction to Random Forests
The random forest is one of the most widely used methods in machine learning. It is built from decision trees constructed in a randomized way, and the trees in the forest are independent of one another. Once the forest has been built, a new input sample is classified as follows: every tree in the forest classifies the sample independently, and the class that receives the most votes becomes the forest's prediction (for classification). Random forests can also be used for unsupervised clustering and outlier detection.

A decision tree is a tree structure (binary or non-binary). Each non-leaf node represents a test on one feature, each branch represents the outcome of that test over some range of values, and each leaf node stores a class label. To classify a sample with a decision tree, start at the root, test the corresponding feature of the sample, follow the branch matching its value, and repeat until a leaf is reached; the class stored at that leaf is the decision. For details, see our earlier post.
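To make the voting rule concrete, here is a minimal sketch. The names are illustrative only: trees is assumed to be a list of fitted tree objects, each exposing a hypothetical classify(sample) method that returns a class label.

from collections import Counter

def forest_predict(trees, sample):
    """Majority vote over the forest: each tree casts one vote for a class,
    and the class with the most votes is the forest's prediction."""
    votes = [tree.classify(sample) for tree in trees]  # hypothetical per-tree API
    return Counter(votes).most_common(1)[0][0]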
Algorithm Workflow
Each tree in a random forest is grown according to the following rules:
1. If the training set contains N samples, then for each tree, draw N samples from the training set at random with replacement to form that tree's training set. Note: this means every tree's training set is different, and each contains duplicate samples.
2. If each sample has M feature dimensions, specify a constant m << M and randomly select a subset of m of the M features; whenever the tree splits a node, choose one of these m features as the split attribute according to some criterion (for example, information gain).
3. Grow each tree as deep as possible, with no pruning.
4. Repeat steps 1-3 to build a large number of decision trees; together they constitute the random forest.

The word "random" in "random forest" refers precisely to these two kinds of randomness, sketched in code below. Both are critical to the forest's classification performance: thanks to them, random forests do not easily overfit and have good noise tolerance (for example, they are not sensitive to missing values).
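Here is a minimal sketch of the two sampling steps, assuming X is a pandas DataFrame of features and y a pandas Series of labels; the helper name is illustrative. Note that in the canonical formulation the feature subset is re-drawn at every split, whereas this sketch, like the full implementation later in this post, draws it once per tree for simplicity.

import random
import pandas as pd

def draw_tree_training_set(X, y, m):
    """Illustrative sketch of the two sources of randomness for one tree:
    bootstrap row sampling plus random column (feature) sampling."""
    n = len(X)
    # Randomness 1: draw N rows with replacement (bootstrap), so duplicates
    # are expected and each tree sees a different training set.
    rows = [random.randrange(n) for _ in range(n)]
    # Randomness 2: pick m << M candidate features for this tree's splits.
    cols = random.sample(list(X.columns), m)
    return X.iloc[rows][cols], y.iloc[rows]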
Advantages of Random Forests
1. It can handle very high-dimensional data (many features) without explicit feature selection, because feature subsets are sampled at random.
2. Building the forest yields an unbiased (out-of-bag) estimate of the generalization error, so the model generalizes well.
3. Training is fast and easy to parallelize, since the trees are grown independently of one another.
4. Interactions between features can be detected during training.
5. It can balance errors on class-imbalanced datasets.

Random Forest (RF) is applied mainly to regression and classification, and it has broad application prospects, from marketing to healthcare and insurance: it can be used to model marketing simulations and to track customer acquisition, retention, and churn, as well as to predict disease risk and patient susceptibility. In recent competitions at home and abroad, including the 2013 Baidu campus movie recommendation contest, the 2014 Alibaba Tianchi big data competition, and Kaggle data science competitions, a considerable share of entrants used random forests. Moreover, random forests achieve a large improvement in accuracy without a significant increase in computational cost.
Code Implementation
Implementing a random forest classifier from scratch (the code below follows reference [3]):
import pandas as pd
import numpy as np
import random
import math
import collections
from joblib import Parallel, delayed  # sklearn.externals.joblib is deprecated; use joblib directly


class Tree(object):
    """A single decision tree."""
    def __init__(self):
        self.split_feature = None
        self.split_value = None
        self.leaf_value = None
        self.tree_left = None
        self.tree_right = None

    def calc_predict_value(self, dataset):
        """Recursively walk the tree to find the leaf a sample belongs to."""
        if self.leaf_value is not None:
            return self.leaf_value
        elif dataset[self.split_feature] <= self.split_value:
            return self.tree_left.calc_predict_value(dataset)
        else:
            return self.tree_right.calc_predict_value(dataset)

    def describe_tree(self):
        """Print the tree in a JSON-like form for easy inspection of its structure."""
        if not self.tree_left and not self.tree_right:
            leaf_info = "{leaf_value:" + str(self.leaf_value) + "}"
            return leaf_info
        left_info = self.tree_left.describe_tree()
        right_info = self.tree_right.describe_tree()
        tree_structure = "{split_feature:" + str(self.split_feature) + \
                         ",split_value:" + str(self.split_value) + \
                         ",left_tree:" + left_info + \
                         ",right_tree:" + right_info + "}"
        return tree_structure


class RandomForestClassifier(object):
    def __init__(self, n_estimators=10, max_depth=-1, min_samples_split=2, min_samples_leaf=1,
                 min_split_gain=0.0, colsample_bytree=None, subsample=0.8, random_state=None):
        """
        Random forest parameters
        n_estimators:      number of trees
        max_depth:         tree depth; -1 means unlimited depth
        min_samples_split: minimum number of samples required to split a node;
                           below this the node stops splitting
        min_samples_leaf:  minimum number of samples required at a leaf;
                           below this the leaf is merged into its parent
        min_split_gain:    minimum gain required to split; below this the node
                           stops splitting
        colsample_bytree:  column sampling scheme, one of [sqrt, log2].
                           sqrt selects sqrt(n_features) features at random,
                           log2 selects log(n_features) features at random;
                           any other value disables column sampling
        subsample:         row sampling ratio
        random_state:      random seed; once set, the n_estimators sample sets
                           do not change between runs, so experiments are reproducible
        """
        self.n_estimators = n_estimators
        self.max_depth = max_depth if max_depth != -1 else float('inf')
        self.min_samples_split = min_samples_split
        self.min_samples_leaf = min_samples_leaf
        self.min_split_gain = min_split_gain
        self.colsample_bytree = colsample_bytree
        self.subsample = subsample
        self.random_state = random_state
        self.trees = None
        self.feature_importances_ = dict()

    def fit(self, dataset, targets):
        """Entry point for model training."""
        assert targets.unique().__len__() == 2, "There must be two classes for targets!"
        targets = targets.to_frame(name='label')

        if self.random_state:
            random.seed(self.random_state)
        random_state_stages = random.sample(range(self.n_estimators), self.n_estimators)

        # Two column-sampling schemes
        if self.colsample_bytree == "sqrt":
            self.colsample_bytree = int(len(dataset.columns) ** 0.5)
        elif self.colsample_bytree == "log2":
            self.colsample_bytree = int(math.log(len(dataset.columns)))
        else:
            self.colsample_bytree = len(dataset.columns)

        # Build the decision trees in parallel
        self.trees = Parallel(n_jobs=-1, verbose=0, backend="threading")(
            delayed(self._parallel_build_trees)(dataset, targets, random_state)
            for random_state in random_state_stages)

    def _parallel_build_trees(self, dataset, targets, random_state):
        """Draw a bootstrap (with replacement) training sample and build one tree."""
        subcol_index = random.sample(dataset.columns.tolist(), self.colsample_bytree)
        dataset_stage = dataset.sample(n=int(self.subsample * len(dataset)), replace=True,
                                       random_state=random_state).reset_index(drop=True)
        dataset_stage = dataset_stage.loc[:, subcol_index]
        targets_stage = targets.sample(n=int(self.subsample * len(dataset)), replace=True,
                                       random_state=random_state).reset_index(drop=True)

        tree = self._build_single_tree(dataset_stage, targets_stage, depth=0)
        print(tree.describe_tree())
        return tree

    def _build_single_tree(self, dataset, targets, depth):
        """Recursively build a decision tree."""
        # If all samples at this node share one class, or the node has fewer samples
        # than min_samples_split, stop splitting and take the most common class.
        if len(targets['label'].unique()) <= 1 or dataset.__len__() <= self.min_samples_split:
            tree = Tree()
            tree.leaf_value = self.calc_leaf_value(targets['label'])
            return tree

        if depth < self.max_depth:
            best_split_feature, best_split_value, best_split_gain = \
                self.choose_best_feature(dataset, targets)
            left_dataset, right_dataset, left_targets, right_targets = \
                self.split_dataset(dataset, targets, best_split_feature, best_split_value)

            tree = Tree()
            # If splitting would leave either child with fewer samples than
            # min_samples_leaf, stop splitting at this node.
            if left_dataset.__len__() <= self.min_samples_leaf or \
                    right_dataset.__len__() <= self.min_samples_leaf or \
                    best_split_gain <= self.min_split_gain:
                tree.leaf_value = self.calc_leaf_value(targets['label'])
                return tree
            else:
                # If this feature is used for a split, add 1 to its importance
                self.feature_importances_[best_split_feature] = \
                    self.feature_importances_.get(best_split_feature, 0) + 1

                tree.split_feature = best_split_feature
                tree.split_value = best_split_value
                tree.tree_left = self._build_single_tree(left_dataset, left_targets, depth+1)
                tree.tree_right = self._build_single_tree(right_dataset, right_targets, depth+1)
                return tree
        # If the tree exceeds the preset depth, stop splitting
        else:
            tree = Tree()
            tree.leaf_value = self.calc_leaf_value(targets['label'])
            return tree

    def choose_best_feature(self, dataset, targets):
        """Find the best partition of the data: the best split feature, threshold, and gain."""
        best_split_gain = 1
        best_split_feature = None
        best_split_value = None

        for feature in dataset.columns:
            if dataset[feature].unique().__len__() <= 100:
                unique_values = sorted(dataset[feature].unique().tolist())
            # If the feature takes too many distinct values, use 100 percentile
            # values as candidate split thresholds.
            else:
                unique_values = np.unique([np.percentile(dataset[feature], x)
                                           for x in np.linspace(0, 100, 100)])

            # Compute the split gain of each candidate threshold; keep the best one.
            for split_value in unique_values:
                left_targets = targets[dataset[feature] <= split_value]
                right_targets = targets[dataset[feature] > split_value]
                split_gain = self.calc_gini(left_targets['label'], right_targets['label'])

                if split_gain < best_split_gain:
                    best_split_feature = feature
                    best_split_value = split_value
                    best_split_gain = split_gain
        return best_split_feature, best_split_value, best_split_gain

    @staticmethod
    def calc_leaf_value(targets):
        """Use the most common class among the samples as the leaf value."""
        label_counts = collections.Counter(targets)
        major_label = max(zip(label_counts.values(), label_counts.keys()))
        return major_label[1]

    @staticmethod
    def calc_gini(left_targets, right_targets):
        """The classification tree uses the Gini index to choose the best split point."""
        split_gain = 0
        for targets in [left_targets, right_targets]:
            gini = 1
            # Count the samples in each class, then compute the Gini index
            label_counts = collections.Counter(targets)
            for key in label_counts:
                prob = label_counts[key] * 1.0 / len(targets)
                gini -= prob ** 2
            split_gain += len(targets) * 1.0 / (len(left_targets) + len(right_targets)) * gini
        return split_gain

    @staticmethod
    def split_dataset(dataset, targets, split_feature, split_value):
        """Split the samples into two parts by feature and threshold:
        left <= threshold, right > threshold."""
        left_dataset = dataset[dataset[split_feature] <= split_value]
        left_targets = targets[dataset[split_feature] <= split_value]
        right_dataset = dataset[dataset[split_feature] > split_value]
        right_targets = targets[dataset[split_feature] > split_value]
        return left_dataset, right_dataset, left_targets, right_targets

    def predict(self, dataset):
        """Predict the class of each input sample."""
        res = []
        for _, row in dataset.iterrows():
            pred_list = []
            # Collect each tree's prediction and take the most common class
            # as the final result.
            for tree in self.trees:
                pred_list.append(tree.calc_predict_value(row))

            pred_label_counts = collections.Counter(pred_list)
            pred_label = max(zip(pred_label_counts.values(), pred_label_counts.keys()))
            res.append(pred_label[1])
        return np.array(res)


if __name__ == '__main__':
    df = pd.read_csv("source/wine.txt")
    df = df[df['label'].isin([1, 2])].sample(frac=1, random_state=66).reset_index(drop=True)

    clf = RandomForestClassifier(n_estimators=5,
                                 max_depth=5,
                                 min_samples_split=6,
                                 min_samples_leaf=2,
                                 min_split_gain=0.0,
                                 colsample_bytree="sqrt",
                                 subsample=0.8,
                                 random_state=66)
    train_count = int(0.7 * len(df))
    feature_list = ["Alcohol", "Malic acid", "Ash", "Alcalinity of ash", "Magnesium",
                    "Total phenols", "Flavanoids", "Nonflavanoid phenols", "Proanthocyanins",
                    "Color intensity", "Hue", "OD280/OD315 of diluted wines", "Proline"]
    clf.fit(df.loc[:train_count, feature_list], df.loc[:train_count, 'label'])

    from sklearn import metrics
    print(metrics.accuracy_score(df.loc[:train_count, 'label'],
                                 clf.predict(df.loc[:train_count, feature_list])))
    print(metrics.accuracy_score(df.loc[train_count:, 'label'],
                                 clf.predict(df.loc[train_count:, feature_list])))
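A few notes on this implementation's design choices: feature_importances_ simply counts how many times each feature is chosen for a split, which is cruder than the impurity-weighted importance sklearn reports; calc_gini returns the weighted Gini impurity of the two children, so choose_best_feature minimizes it rather than maximizing a gain; and subsample=0.8 means each tree is trained on a sample of 80% of the rows drawn with replacement.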
In practice, there is rarely a need to implement a random forest by hand: you can call the API in sklearn directly.
from sklearn.ensemble import RandomForestClassifier
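A minimal usage sketch with the real sklearn estimator follows; the hyperparameter values are illustrative, and sklearn's bundled wine dataset stands in for the wine.txt file used above.

from sklearn.datasets import load_wine
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

X, y = load_wine(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=66)

# max_features="sqrt" mirrors the colsample_bytree="sqrt" setting above.
clf = RandomForestClassifier(n_estimators=100, max_depth=5, max_features="sqrt", random_state=66)
clf.fit(X_train, y_train)
print(accuracy_score(y_test, clf.predict(X_test)))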
References
[1] https://blog.csdn.net/yangyin007/article/details/82385967
[2] https://zhuanlan.zhihu.com/p/22097796
[3] https://github.com/zhaoxingfeng/RandomForest/blob/master/RandomForestClassification.py