GPU上的隨機森林:比Apache Spark快2000倍

2021-01-09 人工智慧遇見磐創

隨機森林是一種機器學習算法,以其魯棒性、準確性和可擴展性而受到許多數據科學家的信賴。

該算法通過bootstrap聚合訓練出多棵決策樹,然後通過集成對輸出進行預測。由於其集成特徵的特點,隨機森林是一種可以在分布式計算環境中實現的算法。樹可以在集群中跨進程和機器並行訓練,結果比使用單個進程的訓練時間快得多。

在本文中,我們探索了使用Apache Spark在CPU機器集群上實現分布式隨機森林訓練,並將其與使用NVIDIA RAPIDS和Dask的GPU機器集群上的訓練性能進行了比較。

雖然GPU計算傳統上是為深度學習應用而保留的,但RAPIDS是一個在GPU上執行數據處理和非深度學習ML工作的庫,與在cpu上執行相比,它可以大大提高性能。

我們使用3億個實例訓練了一個隨機森林模型:Spark在20個節點CPU集群上耗時37分鐘,而RAPIDS在20個節點GPU集群上耗時1秒。GPU的速度提高了2000倍以上!

實驗概述

我們使用公共可用的紐約計程車數據集,並訓練一個隨機森林回歸器,該回歸器可以使用與乘客接送相關的屬性來預測計程車的票價金額。以2017年、2018年和2019年的計程車出行量為訓練集,共計300700143個實例。

數據集連結:https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page

Spark和RAPIDS代碼可以在Jupyter Notebook中找到。

硬體

Spark集群使用Amazon EMR進行管理,而Dask/RAPIDS集群則使用Saturn Cloud進行管理。

兩個集群都有20個工作節點,具有以下AWS實例類型:

Spark:r5.2xlarge

8個CPU,64 GB RAM按需價格:0.504美元/小時RAPIDS:g4dn.xlarge

4個CPU,16 GB RAM1個GPU,16 GB GPU RAM(NVIDIA T4)按需價格:0.526美元/小時Saturn Cloud也可以用NVIDIA特斯拉V100 GPU來啟動Dask集群,但我們在這個練習中選擇了g4dn.xlarge,保持與Spark集群相似的小時成本概況。

Spark

Apache Spark是一個在Scala中構建的開源大數據處理引擎,它有一個Python接口,可以調用Scala/JVM代碼。

它是Hadoop處理生態系統中的一個重要組成部分,圍繞MapReduce範例構建,並且具有用於數據幀和機器學習的接口。

設置Spark集群不在本文的討論範圍之內,但是一旦準備好集群,就可以在Jupyter Notebook中運行以下命令來初始化Spark:

import findsparkfindspark.init()from pyspark.sql import SparkSessionspark = (SparkSession .builder .config('spark.executor.memory', '36g') .getOrCreate())findspark包檢測系統上的Spark安裝位置;如果可以知道Spark包的安裝位置,則可能不需要這樣做。

要獲得有性能的Spark代碼,需要設置幾個配置設置,這取決於集群設置和工作流。在這種情況下,我們設置spark.executor.memory以確保我們不會遇到任何內存溢出或Java堆錯誤。

RAPIDS

NVIDIA RAPIDS是一個開源的Python框架,它在gpu而不是cpu上執行數據科學代碼。類似於在訓練深度學習模型時所看到的,這將為數據科學工作帶來巨大的性能提升。

RAPIDS有數據幀、ML、圖形分析等接口。RAPIDS使用Dask來處理與具有多個gpu的機器的並行化,以及每個具有一個或多個gpu的機器集群。

設置GPU機器可能有點棘手,但是Saturn Cloud已經為啟動GPU集群預構建了映像,所以你只需幾分鐘就可以啟動並運行了!要初始化指向群集的Dask客戶端,可以運行以下命令:

from dask.distributed import Clientfrom dask_saturn import SaturnClustercluster = SaturnCluster()client = Client(cluster)要自己設置Dask集群,請參閱此docs頁面:https://docs.dask.org/en/latest/setup.html

數據加載

數據文件託管在一個公共的S3 bucket上,因此我們可以直接從那裡讀取csv。S3 bucket的所有文件都在同一個目錄中,所以我們使用s3fs來選擇我們想要的文件:

import s3fsfs = s3fs.S3FileSystem(anon=True)files = [f"s3://{x}" for x in fs.ls('s3://nyc-tlc/trip data/') if 'yellow' in x and ('2019' in x or '2018' in x or '2017' in x)]cols = ['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count', 'trip_distance', 'RatecodeID', 'store_and_fwd_flag', 'PULocationID', 'DOLocationID', 'payment_type', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount']使用Spark,我們需要單獨讀取每個CSV文件,然後將它們組合在一起:

import functoolsfrom pyspark.sql.types import *import pyspark.sql.functions as Ffrom pyspark.sql import DataFrame# 手動指定模式,因為read.csv中的inferSchema非常慢schema = StructType([ StructField('VendorID', DoubleType()), StructField('tpep_pickup_datetime', TimestampType()), ... # 參考notebook獲得完整對象模式]) def read_csv(path): df = spark.read.csv(path, header=True, schema=schema, timestampFormat='yyyy-MM-dd HH:mm:ss', ) df = df.select(cols) return dfdfs = []for tf in files: df = read_csv(tf) dfs.append(df)taxi = functools.reduce(DataFrame.unionAll, dfs)taxi.count()使用Dask+RAPIDS,我們可以一次性讀取所有CSV文件:

import dask_cudftaxi = dask_cudf.read_csv(files, assume_missing=True, parse_dates=[1,2], usecols=cols, storage_options={'anon': True})len(taxi)特徵工程

我們將根據時間生成一些特徵,然後保存數據幀。在這兩個框架中,這將執行所有CSV加載和預處理,並將結果存儲在RAM中(在RAPIDS的情況下是GPU RAM)。我們將用於訓練的特徵包括:

features = ['pickup_weekday', 'pickup_hour', 'pickup_minute', 'pickup_week_hour', 'passenger_count', 'VendorID', 'RatecodeID', 'store_and_fwd_flag', 'PULocationID', 'DOLocationID']對於Spark,我們需要將特徵收集到向量類中:

from pyspark.ml.feature import VectorAssemblerfrom pyspark.ml.pipeline import Pipelinetaxi = taxi.withColumn('pickup_weekday', F.dayofweek(taxi.tpep_pickup_datetime).cast(DoubleType()))taxi = taxi.withColumn('pickup_hour', F.hour(taxi.tpep_pickup_datetime).cast(DoubleType()))taxi = taxi.withColumn('pickup_minute', F.minute(taxi.tpep_pickup_datetime).cast(DoubleType()))taxi = taxi.withColumn('pickup_week_hour', ((taxi.pickup_weekday * 24) + taxi.pickup_hour).cast(DoubleType()))taxi = taxi.withColumn('store_and_fwd_flag', F.when(taxi.store_and_fwd_flag == 'Y', 1).otherwise(0))taxi = taxi.withColumn('label', taxi.total_amount) taxi = taxi.fillna(-1)assembler = VectorAssembler( inputCols=features, outputCol='features',)pipeline = Pipeline(stages=[assembler])assembler_fitted = pipeline.fit(taxi)X = assembler_fitted.transform(taxi)X.cache()X.count()對於RAPIDS,我們將所有浮點值轉換為float32,以便進行GPU計算:

from dask import persistfrom dask.distributed import waittaxi['pickup_weekday'] = taxi.tpep_pickup_datetime.dt.weekdaytaxi['pickup_hour'] = taxi.tpep_pickup_datetime.dt.hourtaxi['pickup_minute'] = taxi.tpep_pickup_datetime.dt.minutetaxi['pickup_week_hour'] = (taxi.pickup_weekday * 24) + taxi.pickup_hourtaxi['store_and_fwd_flag'] = (taxi.store_and_fwd_flag == 'Y').astype(float)taxi = taxi.fillna(-1)X = taxi[features].astype('float32')y = taxi['total_amount']X, y = persist(X, y)_ = wait([X, y])len(X)訓練隨機森林

我們只需要幾行代碼就可以訓練隨機森林。

Spark:

from pyspark.ml.regression import RandomForestRegressorrf = RandomForestRegressor(numTrees=100, maxDepth=10, seed=42)fitted = rf.fit(X)RAPIDS:

from cuml.dask.ensemble import RandomForestRegressorrf = RandomForestRegressor(n_estimators=100, max_depth=10, seed=42)_ = rf.fit(X, y)結果

我們對Spark(CPU)和RAPIDS(GPU)集群上的300700143個紐約計程車數據實例訓練了一個隨機森林模型。兩個集群都有20個工作節點,每小時價格大致相同。以下是工作流每個部分的結果:

Task Spark RAPIDS Load/rowcount 20.6 seconds 25.5 seconds Feature engineering 54.3 seconds 23.1 seconds Random forest 36.9 minutes 1.02 seconds

37分鐘的Spark 與1秒的RAPIDS!

GPU勝利!想一想,一次擬合你不需要等待37分鐘了,這將加快之後迭代和改進模型的速度。而在CPU上,一旦添加了超參數調優或測試不同的模型,迭代都很容易累積到數小時或數天。

你需要看到才能相信嗎?你可以在這裡找到Notebook,然後自己運行測試:https://github.com/saturncloud/saturn-cloud-examples/tree/main/machine_learning/random_forest

你需要更快的隨機森林嗎

對!你可以在幾秒鐘內用Saturn Cloud進入Dask/RAPIDS。Saturn處理所有工具基礎設施、安全性和部署方面的難題,讓你立即啟動並運行RAPIDS。點擊這裡在你的AWS帳戶免費試用Saturn:https://manager.aws.saturnenterprise.io/registe

相關焦點

  • Spark運行模式——Local模式
    首先需要下載Spark1.官網地址 http://spark.apache.org/2.文檔查看地址 https://spark.apache.org/docs/2.1.1/3.下載地址 https://archive.apache.org/dist/spark/Local
  • 停止使用Pandas並開始使用Spark+Scala
    · Spark是Apache開源框架  · 它可用作庫並在"本地"集群上運行,或在Spark集群上運行  · 在Spark集群上,可以以分布式方式執行代碼,其中一個主節點和多個工作節點共享負載  · 即使在本地群集上,您仍然可以看到與Pandas相比的性能提升,我們將在下面介紹原因  為什麼要使用Spark?
  • 利用Spark 和 scikit-learn 將你的模型訓練加快 100 倍
    元估計器的例子有決策樹集合(隨機林和額外隨機樹)、超參數調解器(網格搜索和隨機搜索)和多分類技術(一對多和多對一)。import timefrom sklearn import datasets, svmfrom skdist.distribute.search import DistGridSearchCVfrom pyspark.sql import SparkSession # instantiate spark sessionspark
  • 大數據分析工程師入門9-Spark SQL
    本文為《大數據分析師入門課程》系列的第9篇,在本系列的第8篇-Spark基礎中,已經對Spark做了一個入門介紹,在此基礎上本篇拎出Spark SQL,主要站在使用者的角度來進行講解,需要注意的是本文中的例子的代碼均使用Scala語言。
  • Apache DolphinScheduler 1.3.2 發布,性能提升 2~3 倍
    DolphinScheduler-1.3.2 有超過 30 名貢獻者參與開發,性能較 1.2 版本有 2 ~ 3 倍的提升,相對 1.2 版本,1.3.x 增加了諸如 K8s支持、多目錄管理等重要的新特性和新的任務類型。
  • Spark在360商業數據部的應用實踐
    與使用文本相比,Parquet 讓 Spark SQL 的性能平均提高了 10 倍,這要感謝初級的讀取器過濾器、高效的執行計劃,以及 Spark 1.6.0 中經過改進的掃描吞吐量。SparSQL的Parquet的幾個操作:1)創建Parquet格式的Hive表CREATE TABLE parquet_table(age INT, name STRING) STORED AS PARQUET;2)讀取Parquet格式的文件val sqlContext = new org.apache.spark.sql.SQLContext
  • 基於Bert和通用句子編碼的Spark-NLP文本分類
    更不用說經典和流行的機器學習分類器,如隨機森林或Logistic回歸,有150多個深度學習框架提出了各種文本分類問題。文本分類問題中使用了幾個基準數據集,可以在nlpprogress.com上跟蹤最新的基準。以下是關於這些數據集的基本統計數據。
  • 手把手教你在本機安裝spark
    本文轉載自【微信公眾號:五角錢的程式設計師,ID:xianglin965】,經微信公眾號授權轉載,如需轉載與原文作者聯繫今天是spark系列的第一篇文章。最近由於一直work from home節省了很多上下班路上的時間,加上今天的LeetCode的文章篇幅較小,所以抽出了點時間加更了一篇,和大家分享一下最近在學習的spark相關的內容。
  • 數據分析工程師面試集錦5——Spark面試指南
    導語本篇文章為大家帶來spark面試指南,文內會有兩種題型,問答題和代碼題,題目大部分來自於網絡上,有小部分是來自於工作中的總結,每個題目會給出一個參考答案。為什麼考察Spark?Spark作為大數據組件中的執行引擎,具備以下優勢特性。
  • 烏海spark培訓_博雅環球教育放心之選
    烏海spark培訓,博雅環球教育放心之選,是以網際網路企業技術研發、軟體開發、大數據分析、雲計算、人工智慧開發應用等網際網路技術為依託,以校企專業共建,崗前技能實訓,高薪就業安置,網際網路人才外包服務等業務為核心的高端就業培訓。烏海spark培訓, Oracle認證講師、Microsoft認證講師。
  • 隨機森林(Random Forest)算法原理
    二、隨機森林的特點(1)優點l在當前所有的算法中,具有極好的準確率,與其他算法相比有很大優勢。l訓練速度快,容易做成並行化方法,且在訓練過程中能夠檢測到特徵之間的影響。l隨機森林能夠評估那些特徵比較重要。
  • 具有貝葉斯優化的XGBoost和隨機森林
    XGBoost(XGB)和隨機森林(RF)都是集成學習方法,並通過組合各個決策樹的輸出(我們假設基於樹的XGB或RF)來預測(分類或回歸)。隨機森林隨機森林(RF)使用隨機數據樣本獨立訓練每棵樹。這種隨機性有助於使模型比單個決策樹更健壯。由於RF不太可能過度擬合訓練數據。隨機森林應用示例隨機森林差異性已被用於各種應用,例如,基於組織標記數據找到患者群。
  • 深入對比數據科學工具箱: SparkR vs Sparklyr
    SparkR 文檔:http://spark.apachecn.org/docs/cn/2.3.0/structured-streaming-programming-guide.html       Sparklyr 文檔:https://spark.rstudio.com
  • 大數據分析工程師面試集錦3-SQL/SparkSql/HiveQL
    題11:假設除了以上Products表,還有一張存儲銷售產品供應商的表Vendors,表中欄位信息如下,如何得到表Products和Vendors兩表能關聯上的部分中Products的數據。答:(1)構建入口import org.apache.spark.sql.SparkSessionval spark = SparkSession.builder()
  • 隨機森林(Random Forest)
    那說了這麼多,那隨機森林到底是怎樣的一種算法呢?如果讀者接觸過決策樹(Decision Tree)的話,那麼會很容易理解什麼是隨機森林。隨機森林就是通過集成學習的思想將多棵樹集成的一種算法,它的基本單元是決策樹,而它的本質屬於機器學習的一大分支——集成學習(Ensemble Learning)方法。
  • SparkCore——專業術語及流程圖
    輸入可能以多個文件的形式存儲在HDFS上,每個File都包含了很多塊,稱為Block。當Spark讀取這些文件作為輸入時,會根據具體數據格式對應的InputFormat進行解析,一般是將若干個Block合併成一個輸入分片,稱為InputSplit,注意InputSplit不能跨越文件。
  • 亞馬遜加持,英偉達A100 GPU將無人匹敵?
    單個NVIDIA DGX A100系統(帶有8個A100 gpu)在某些AI應用上可以提供與近1000臺雙插槽CPU伺服器相同的性能。 英偉達負責加速計算的副總裁伊恩 巴克(Ian Buck)在公布基準業績後表示:「每個行業都在尋求更好的方式,應用人工智慧來提供新的服務,並擴大業務,我們正處於一個歷史的轉折點。」