SparkR:數據科學家的新利器

2020-11-28 CSDN技術社區

摘要:R是數據科學家中最流行的程式語言和環境之一,在Spark中加入對R的支持是社區中較受關注的話題。作為增強Spark對數據科學家群體吸引力的最新舉措,最近發布的Spark 1.4版本在現有的Scala/Java/Python API之外增加了R API(SparkR)。SparkR使得熟悉R的用戶可以在Spark的分布式計算平臺基礎上結合R本身強大的統計分析功能和豐富的第三方擴展包,對大規模數據集進行分析和處理。本文將回顧SparkR項目的背景,對其當前的特性作總體的概覽,闡述其架構和若干技術關鍵點,最後進行展望和總結。

項目背景

R是非常流行的數據統計分析和製圖的語言及環境,有一項調查顯示,R語言在數據科學家中使用的程度僅次於SQL。但目前R語言的核心運行環境是單線程的,能處理的數據量受限於單機的內存容量,大數據時代的海量數據處理對R構成了挑戰。

為了解決R的可伸縮性問題,R社區已經有一些方案,比如parallel和snow包,可以在計算機集群上並行運行R代碼。但它們的缺陷在於沒有解決數據分布式存儲,數據仍然需要在主節點集中表示,分片後再傳輸給工作節點,不適用於大數據處理的場景。另外,數據處理模型過於簡單,即數據分片在工作節點處理後,結果收集回主節點,缺少一個象MapReduce那樣通用的分布式數據編程模型。

Hadoop是流行的大數據處理平臺,它的HDFS分布式文件系統和之上的MapReduce編程模型比較好地解決了大數據分布式存儲和處理的問題。RHadoop項目的出現使得用戶具備了在R中使用Hadoop處理大數據的能力。

Apache頂級開源項目Spark是Hadoop之後備受關注的新一代分布式計算平臺。和Hadoop相比,Spark提供了分布式數據集的抽象,編程模型更靈活和高效,能夠充分利用內存來提升性能。為了方便數據科學家使用Spark進行數據挖掘,社區持續往Spark中加入吸引數據科學家的各種特性,例如0.7.0版本中加入的python API (PySpark);1.3版本中加入的DataFrame等。

R和Spark的強強結合應運而生。2013年9月SparkR作為一個獨立項目啟動於加州大學伯克利分校的大名鼎鼎的AMPLAB實驗室,與Spark源出同門。2014年1月,SparkR項目在github上開源(https://github.com/amplab-extras/SparkR-pkg)。隨後,來自工業界的Alteryx、Databricks、Intel等公司和來自學術界的普渡大學,以及其它開發者積極參與到開發中來,最終在2015年4月成功地合併進Spark代碼庫的主幹分支,並在Spark 1.4版本中作為重要的新特性之一正式宣布。

當前特性

SparkR往Spark中增加了R語言API和運行時支持。Spark的 API由Spark Core的API以及各個內置的高層組件(Spark Streaming,Spark SQL,ML Pipelines和MLlib,Graphx)的API組成,目前SparkR只提供了Spark的兩組API的R語言封裝,即Spark Core的RDD API和Spark SQL的DataFrame API。

需要指出的是,在Spark 1.4版本中,SparkR的RDD API被隱藏起來沒有開放,主要是出於兩點考慮:

  1. RDD API雖然靈活,但比較底層,R用戶可能更習慣於使用更高層的API;
  2. RDD API的實現上目前不夠健壯,可能會影響用戶體驗,比如每個分區的數據必須能全部裝入到內存中的限制,對包含複雜數據類型的RDD的處理可能會存在問題等。

目前社區正在討論是否開放RDD API的部分子集,以及如何在RDD API的基礎上構建一個更符合R用戶習慣的高層API。

RDD API

用戶使用SparkR RDD API在R中創建RDD,並在RDD上執行各種操作。

目前SparkR RDD實現了Scala RDD API中的大部分方法,可以滿足大多數情況下的使用需求:

SparkR支持的創建RDD的方式有:

  • 從R list或vector創建RDD(parallelize())
  • 從文本文件創建RDD(textFile())
  • 從object文件載入RDD(objectFile())

SparkR支持的RDD的操作有:

  • 數據緩存,持久化控制:cache(),persist(),unpersist()
  • 數據保存:saveAsTextFile(),saveAsObjectFile()
  • 常用的數據轉換操作,如map(),flatMap(),mapPartitions()等
  • 數據分組、聚合操作,如partitionBy(),groupByKey(),reduceByKey()等
  • RDD間join操作,如join(), fullOuterJoin(), leftOuterJoin()等
  • 排序操作,如sortBy(), sortByKey(), top()等
  • Zip操作,如zip(), zipWithIndex(), zipWithUniqueId()
  • 重分區操作,如coalesce(), repartition()
  • 其它雜項方法

和Scala RDD API相比,SparkR RDD API有一些適合R的特點:

  • SparkR RDD中存儲的元素是R的數據類型。
  • SparkR RDD transformation操作應用的是R函數。
  • RDD是一組分布式存儲的元素,而R是用list來表示一組元素的有序集合,因此SparkR將RDD整體上視為一個分布式的list。Scala API 中RDD的每個分區的數據由iterator來表示和訪問,而在SparkR RDD中,每個分區的數據用一個list來表示,應用到分區的轉換操作,如mapPartitions(),接收到的分區數據是一個list而不是iterator。
  • 為了符合R用戶經常使用lapply()對一個list中的每一個元素應用某個指定的函數的習慣,SparkR在RDD類上提供了SparkR專有的transformation方法:lapply()、lapplyPartition()、lapplyPartitionsWithIndex(),分別對應於Scala API的map()、mapPartitions()、mapPartitionsWithIndex()。

DataFrame API

Spark 1.3版本引入了DataFrame API。相較於RDD API,DataFrame API更受社區的推崇,這是因為:

  1. DataFrame的執行過程由Catalyst優化器在內部進行智能的優化,比如過濾器下推,表達式直接生成字節碼。
  2. 基於Spark SQL的外部數據源(external data sources) API訪問(裝載,保存)廣泛的第三方數據源。
  3. 使用R或Python的DataFrame API能獲得和Scala近乎相同的性能。而使用R或Python的RDD API的性能比起Scala RDD API來有較大的性能差距。

Spark的DataFrame API是從R的 Data Frame數據類型和Python的pandas庫借鑑而來,因而對於R用戶而言,SparkR的DataFrame API是很自然的。更重要的是,SparkR DataFrame API性能和Scala DataFrame API幾乎相同,所以推薦儘量用SparkR DataFrame來編程。

目前SparkR的DataFrame API已經比較完善,支持的創建DataFrame的方式有:

  • 從R原生data.frame和list創建
  • 從SparkR RDD創建
  • 從特定的數據源(JSON和Parquet格式的文件)創建
  • 從通用的數據源創建
  • 將指定位置的數據源保存為外部SQL表,並返回相應的DataFrame
  • 從Spark SQL表創建
  • 從一個SQL查詢的結果創建

支持的主要的DataFrame操作有:

  • 數據緩存,持久化控制:cache(),persist(),unpersist()
  • 數據保存:saveAsParquetFile(), saveDF() (將DataFrame的內容保存到一個數據源),saveAsTable() (將DataFrame的內容保存存為數據源的一張表)
  • 集合運算:unionAll(),intersect(), except()
  • Join操作:join(),支持inner、full outer、left/right outer和semi join。
  • 數據過濾:filter(), where()
  • 排序:sortDF(), orderBy()
  • 列操作:增加列- withColumn(),列名更改- withColumnRenamed(),選擇若干列 -select()、selectExpr()。為了更符合R用戶的習慣,SparkR還支持用$、[]、[[]]操作符選擇列,可以用$<列名> <- 的語法來增加、修改和刪除列
  • RDD map類操作:lapply()/map(),flatMap(),lapplyPartition()/mapPartitions(),foreach(),foreachPartition()
  • 數據聚合:groupBy(),agg()
  • 轉換為RDD:toRDD(),toJSON()
  • 轉換為表:registerTempTable(),insertInto()
  • 取部分數據:limit(),take(),first(),head()

編程示例

總體上看,SparkR程序和Spark程序結構很相似。

基於RDD API的示例

要基於RDD API編寫SparkR程序,首先調用sparkR.init()函數來創建SparkContext。然後用SparkContext作為參數,調用parallelize()或者textFile()來創建RDD。有了RDD對象之後,就可以對它們進行各種transformation和action操作。下面的代碼是用SparkR編寫的Word Count示例:

library(SparkR)#初始化SparkContextsc <- sparkR.init("local", "RWordCount") #從HDFS上的一個文本文件創建RDD lines <- textFile(sc, "hdfs://localhost:9000/my_text_file")#調用RDD的transformation和action方法來計算word count#transformation用的函數是R代碼words <- flatMap(lines, function(line) { strsplit(line, " ")[[1]] })wordCount <- lapply(words, function(word) { list(word, 1L) })counts <- reduceByKey(wordCount, "+", 2L)output <- collect(counts)

基於DataFrame API的示例

基於DataFrame API的SparkR程序首先創建SparkContext,然後創建SQLContext,用SQLContext來創建DataFrame,再操作DataFrame裡的數據。下面是用SparkR DataFrame API計算平均年齡的示例:

library(SparkR)#初始化SparkContext和SQLContextsc <- sparkR.init("local", "AverageAge") sqlCtx <- sparkRSQL.init(sc)#從當前目錄的一個JSON文件創建DataFramedf <- jsonFile(sqlCtx, "person.json")#調用DataFrame的操作來計算平均年齡df2 <- agg(df, age="avg")averageAge <- collect(df2)[1, 1]

對於上面兩個示例要注意的一點是SparkR RDD和DataFrame API的調用形式和Java/Scala API有些不同。假設rdd為一個RDD對象,在Java/Scala API中,調用rdd的map()方法的形式為:rdd.map(…),而在SparkR中,調用的形式為:map(rdd, …)。這是因為SparkR使用了R的S4對象系統來實現RDD和DataFrame類。

架構

SparkR主要由兩部分組成:SparkR包和JVM後端。SparkR包是一個R擴展包,安裝到R中之後,在R的運行時環境裡提供了RDD和DataFrame API。


圖1  SparkR軟體棧

SparkR的整體架構如圖2所示。


圖2 SparkR架構

R JVM後端

SparkR API運行在R解釋器中,而Spark Core運行在JVM中,因此必須有一種機制能讓SparkR API調用Spark Core的服務。R JVM後端是Spark Core中的一個組件,提供了R解釋器和JVM虛擬機之間的橋接功能,能夠讓R代碼創建Java類的實例、調用Java對象的實例方法或者Java類的靜態方法。JVM後端基於Netty實現,和R解釋器之間用TCP socket連接,用自定義的簡單高效的二進位協議通信。

R Worker

SparkR RDD API和Scala RDD API相比有兩大不同:SparkR RDD是R對象的分布式數據集,SparkR RDD transformation操作應用的是R函數。SparkR RDD API的執行依賴於Spark Core但運行在JVM上的Spark Core既無法識別R對象的類型和格式,又不能執行R的函數,因此如何在Spark的分布式計算核心的基礎上實現SparkR RDD API是SparkR架構設計的關鍵。

SparkR設計了Scala RRDD類,除了從數據源創建的SparkR RDD外,每個SparkR RDD對象概念上在JVM端有一個對應的RRDD對象。RRDD派生自RDD類,改寫了RDD的compute()方法,在執行時會啟動一個R worker進程,通過socket連接將父RDD的分區數據、序列化後的R函數以及其它信息傳給R worker進程。R worker進程反序列化接收到的分區數據和R函數,將R函數應到到分區數據上,再把結果數據序列化成字節數組傳回JVM端。

從這裡可以看出,與Scala RDD API相比,SparkR RDD API的實現多了幾項開銷:啟動R worker進程,將分區數據傳給R worker和R worker將結果返回,分區數據的序列化和反序列化。這也是SparkR RDD API相比Scala RDD API有較大性能差距的原因。

DataFrame API的實現

由於SparkR DataFrame API不需要傳入R語言的函數(UDF()方法和RDD相關方法除外),而且DataFrame中的數據全部是以JVM的數據類型存儲,所以和SparkR RDD API的實現相比,SparkR DataFrame API的實現簡單很多。R端的DataFrame對象就是對應的JVM端DataFrame對象的wrapper,一個DataFrame方法的實現基本上就是簡單地調用JVM端DataFrame的相應方法。這種情況下,R Worker就不需要了。這是使用SparkR DataFrame API能獲得和ScalaAPI近乎相同的性能的原因。

當然,DataFrame API還包含了一些RDD API,這些RDD API方法的實現是先將DataFrame轉換成RDD,然後調用RDD 的相關方法。

展望

SparkR目前來說還不是非常成熟,一方面RDD API在對複雜的R數據類型的支持、穩定性和性能方面還有較大的提升空間,另一方面DataFrame API在功能完備性上還有一些缺失,比如對用R代碼編寫UDF的支持、序列化/反序列化對嵌套類型的支持,這些問題相信會在後續的開發中得到改善和解決。如何讓DataFrame API對熟悉R原生Data Frame和流行的R package如dplyr的用戶更友好是一個有意思的方向。此外,下一步的開發計劃包含幾個大的特性,比如普渡大學正在做的在SparkR中支持Spark Streaming,還有Databricks正在做的在SparkR中支持ML pipeline等。SparkR已經成為Spark的一部分,相信社區中會有越來越多的人關注並使用SparkR,也會有更多的開發者參與對SparkR的貢獻,其功能和使用性將會越來越強。

總結

Spark將正式支持R API對熟悉R語言的數據科學家是一個福音,他們可以在R中無縫地使用RDD和Data Frame API,藉助Spark內存計算、統一軟體棧上支持多種計算模型的優勢,高效地進行分布式數據計算和分析,解決大規模數據集帶來的挑戰。工欲善其事,必先利其器,SparkR必將成為數據科學家在大數據時代的又一門新利器。

(責編/仲浩)

作者:孫銳,英特爾大數據團隊工程師,HIVE和Shark項目貢獻者,SparkR主力貢獻者之一。

本文為CSDN原創文章,未經允許不得轉載,如需轉載請聯繫market#csdn.net(#換成@)

相關焦點

  • 「sparkr+sparkr mini」 打火機手電筒完美合體
    「sparkr」與「sparkr mini」將他們「古怪的」設計特色與生產日常用品的願望相結合。這套造型別致的多功能全新系列作品能夠讓使用者們隨時隨地生火或照明。經過無數次的討論,power practical的工業設計師mckay nilson 開始著手模擬「sparkr」可能的外形。設計定稿後,整個團隊立即開始打造產品原型進行測試。
  • 辦公必備的大數據分析利器,數據分析工具推薦
    說到數據分析,很多小夥伴可能第一時間聯想到複雜的算法,龐大的數據,甚至是讓人眼花繚亂的代碼。但實際上,運營做數據分析並不需要懂這些,關鍵是你對業務流程的理解,以及用數據解決問題的思維。本文將介紹在一些領域被高頻率使用,且不可缺少的大數據分析利器,使用尚可的數據分析工具。
  • SQL Server 2008 R2數據管理新利器剖析
    【IT168專稿】我們在SQL Server 2008 R2數據管理新紀元一文中介紹了SQL Server 2008 R2的管理利器——SQL Server Utility。這一次我們將深入剖析這一管理工具。
  • 辦公必備的大數據分析利器,值得推薦數據分析工具
    說到數據分析,很多小夥伴可能第一時間聯想到複雜的算法,龐大的數據,甚至是讓人眼花繚亂的代碼。但實際上,運營做數據分析並不需要懂這些,關鍵是你對業務流程的理解,以及用數據解決問題的思維。本文將介紹在一些領域被高頻率使用,且不可缺少的大數據分析利器,使用尚可的數據分析工具。
  • 深入對比數據科學工具箱: SparkR vs Sparklyr
    SparkR:Sys.setenv("SPARKR_SUBMIT_ARGS"="--master yarn-client sparkr-shell")sc <- SparkR::sparkR.session(enableHiveSupport = T,
  • 涪陵頁巖氣田新添採氣「新利器」
    涪陵頁巖氣田新添採氣「新利器」日前,涪陵頁巖氣田重複壓裂技術、「水淹」井氣舉技術接連完成試驗,並順利投用,為頁巖氣田的開採增添了「新利器」。在焦頁4號平臺,通過重複壓裂技術,焦頁4HF井獲得每天18.38萬方的測試產量,讓老井煥發出「新活力」。
  • 研究利器 OncoLnc:TGCA數據挖掘工具
    研究利器| OncoLnc:TGCA數據挖掘工具
  • KDNuggets:數據科學家使用工具調查
    工具/產品/解決方案是數據科學家洞察數據的利器。KDNuggets網站對此觀點進行了年度調查,來分析數據科學家在用哪些類型的工具,並提供了調查的匿名原始數據。
  • 中國科學家自主研發出治癌利器
    據中國醫學科學院腫瘤醫院、國家癌症中心赫捷院士、全國腫瘤登記中心主任陳萬青教授等人基於國家中央癌症登記處的高質量數據所撰寫的研究報告《Cancer Statistics in China, 2015》顯示,2015 年中國患癌趨勢為: 429.2 萬例新發腫瘤病例、 281.4 萬例死亡病例。相當於平均每天 12000人新患癌症、 7500人死於癌症。
  • 從邊緣到數據中心到雲,HCP對象存儲八大利器
    掘金數字新基建,對象存儲迎來全新發展期  數據不僅成為企業的新石油,更是價值的新來源,當數據成為企業數位化進程中的「主角」時,如何尋求更好的配角來展現出數據更多的魅力。  來自多個分析機構的數據綜合說明,非結構化數據隨著雲計算、大數據、物聯網等新興技術的蓬勃發展呈現出井噴式的增長。
  • SparkR編程系列 | 機器學習+數據可視化
    朝樂門@ruc為您詳細講解SparkR的KMeans機器學習和數據可視化的知識,並給出實際程序代碼、代碼介紹及運行結果  —— 一份你的大腦也可以直接執行和調試的SparkR源程序。本案例由朝樂門設計與編寫。
  • 泗洪警方新添防暴利器「劍齒虎」裝甲車
    泗洪警方新添防暴利器「劍齒虎」裝甲車 2020-10-26 18:33 來源:澎湃新聞·澎湃號·政務
  • 法醫科學利器,語言學家克萊爾·博尼爾博士
    1標題:法醫科學利器雷射燒蝕電感耦合等離子體質譜來源:bing摘要:法醫科學利器雷射燒蝕電感耦合等離子體質譜雷射燒蝕電感耦合等離子體質譜是一項強大的分析技術,可直接對固體樣品進行高度敏感的元素和同位素分析。
  • 科學家終於搞清楚T細胞如何產生蛋白質 或成為治療癌症的一大利器
    科學家終於搞清楚T細胞如何產生蛋白質 或成為治療癌症的一大利器  Emma Chou • 2020-05-21 18:20
  • 冷凍電鏡:新時代蛋白質科學和藥物研發的利器
    經歷幾十年的技術積累和硬體發展,2013年,隨著直接電子探測技術的發展,冷凍電鏡技術迎來了「解析度革命」,經美國科學院外籍院士程亦凡等科學家的努力,解析精度獲得了近原子解析度的突破性進步。2017年,杜博歇、弗蘭克和亨德森這3位科學家也憑藉其在冷凍電鏡技術建立過程中發揮的開創性作用獲得了諾貝爾化學獎。
  • 金屬鎵成為抗菌利器
    作為抗菌利器,鎵可以抑制關鍵的鐵依賴性細菌酶,增加細菌對氧化劑的敏感性。此外,鎵耐藥性發展緩慢,其活性與某些抗生素有協同作用,而且鎵不會降低宿主巨噬細胞的抗菌活性。       研究小組通過小鼠模型和臨床試驗對鎵的抗菌活性進行了驗證。小鼠模型研究顯示,靜脈注射鎵可減少感染了綠膿桿菌的小鼠體內的細菌數量,提高小鼠的存活率。
  • 中國專家造出了抗癌「利器」,可治療多種腫瘤
    國家癌症中心數據顯示,我國癌症患病病例總數佔全球將近20%,且有逐漸上升的趨勢,2015年新增392.9萬人,死亡人數新增233.8萬人。面對如此嚴峻的現狀,我國科學家也在一直努力,終於皇天不負有心人,找到了新的突破、新的抗癌「利器」。
  • 美國太空事業慘劇:一件利器被風吹壞,徹底廢了!科學家心痛不已
    22日,美國傳來一大悲劇消息:美國的一個太空探測利器,因在風暴中受損,被宣布無法修復,美國科學家們紛紛叫苦:太慘了!根據作用在第二根斷裂的纜繩上的應力數據,工程師們得出結論,剩餘的鋼索可能比之前認為的要更加的脆弱,造成很大危險。美國國家科學基金會審查了獨立工程公司的多項評估,得出的結論是,望遠鏡的結構「面臨災難性故障的危險」,其鋼索可能不再能夠承載其設計支持的負載。此外,幾項評估指出,任何維修企圖都可能使工人面臨生命危險。
  • 合成生物學:生命科學的「利器」(開卷知新)
    合成生物學,能利用大腸桿菌生產大宗化工材料,擺脫石油原料的束縛;酵母菌生產青蒿酸和稀有人參皂苷,降低成本,促進新藥研發;工程菌不「誤傷」正常細胞,專一攻擊癌細胞;創製載有人工基因組的「人造細胞」,探究生命進化之路;利用DNA儲存數據信息並開發生物計算機……作為科學界的新生力量,合成生物學進展迅速,並已在化工、能源、材料、農業、醫藥、環境和健康等領域展現出廣闊的應用前景。
  • 美國一州科學家因拒絕篡改新冠肺炎數據被解聘
    我們來看看詳細報導:這位科學家說(譯):"謹慎而言,我不希望新聞媒體像他們一樣作假,我所堅信追求的是前兩個月的數據的透明度,畢竟,我的使命就是嚴謹辯證,至於原因我不想多言。「這位偉大的科學家因拒絕造假確診數據而被解僱