Spark中的combineByKey

2021-03-02 Spark技術日報

在數據分析中,處理Key,Value的Pair數據是極為常見的場景。譬如說,對Pair數據按照key分組、聚合,又或者更抽象的,則是根據key對value進行fold運算。

如果我們對編碼的態度有些敷衍,大約會將其分別定義為三個函數:gruopByKey、aggregateByKey、foldByKey。站在調用者的角度,如此設計無可厚非,相反我還得擊節讚嘆。因為從函數名來看,確實體貼地照顧了用戶的知識結構。換個角度,站在實現這一邊,你可能會發現這三個函數乃孿生兄弟,具有相同的血統。

所謂「抽象」,就是要尋找實現上的共同特徵。OO也好,FP也罷,都格外重視抽象的能力,因為抽象能在很大程度上化繁為簡,且具備應對變化的能力。只是各自抽象的層次不同罷了。與OO中抽象接口的設計思路是等同的,個人認為,FP只是將抽象做到了極致,落實到了類型之上(暫且放開業務的羈絆),函數不過就是類型的轉換罷了。

在Spark的語境中,RDD是核心數據結構,從Scala的語法出發,可以認為RDD是一個類型類,或者Monad容器(個人如此認為,若有不妥,還請方家指正)。這個容器到底裝了什麼數據呢?得看你如何為其裝載數據,前面提到了Pair類型,在本文場景下,RDD實則是一個RDD[(K, V)]類型的數據。

「前戲」到此結束,現在來看看groupByKey、aggregateByKey和foldByKey到底要做什麼(what to do)?

groupByKey是將一堆結構形如(K, V)的數據 根據K分組 ,我們暫且將這個分組過程看做是一個黑盒子,想一想, 輸出會是什麼 ?

aggregateByKey是將一堆結構形如(K, V)的數據 根據K 對數據進行 聚合 運算, 它的輸出又會是什麼呢 ?

fold是針對一個集合中的數據進行 摺疊 運算,運算符則取決傳入的函數,例如sum或者produce等。因而foldByKey就是將一堆結構形如(K, V)的數據 根據K 對數據進行摺疊運算,那麼, 它的輸出又會是什麼 ?

現在需要一點推演的能力。首先,不管過程如何,這三個運算接收的參數總是相同的,皆為(K, V)。輸出結果定然不同,但它們卻又具有相同的特徵,即它們都是 根據K 分別對數據進行運算,換言之,計算可能不同,計算的結果可能不同,但結果一定是 根據K 來組織的。這裡的K不就是Pair的key嗎?對應的Value呢?我們還不知道,然而不管是阿貓阿狗,它總可以用一個抽象的類型參數來代表,記為C,結果就變成了(K, C)。這就是我們要找的一個抽象:

RDD[(K, V)] -> RDD[(K, C)]

然而,這個抽象還不夠,我們需要找到將V(可能是多個V)變成C的方法。

現在假設我們要尋找的抽象是一臺超級酷的果汁機。它能同時接受各種各樣的水果,然後聰明地按照水果的種類分別榨出不同的果汁。蘋果歸蘋果汁,橙子歸橙汁,西瓜歸西瓜汁。我們為水果定義類型為Fruit,果汁定義為Juice,根據前面的分析,這個過程就是將RDD[(String, Fruit)]轉換為RDD[(String, Juice)]。

注意,在榨果汁前,水果可能有很多,即使是相同類型的水果,也會作為不同的RDD元素:

("apple", apple1), ("orange", orange1), ("apple", apple2)

轉換的結果是每種水果只有一杯果汁(只是容量不同罷了):

("apple", appleJuice), ("orange", orangeJuice)

那麼,這個果汁機該由什么元件構成呢?現在,我們化身為機械師,想想這個果汁機的組成元件:

注意第二個函數和第三個函數的區別,前者只提供混合功能,即能夠將不同容器的果汁裝到一個容器中,而後者的輸入已有一個前提,那就是已經按照水果類型放到不同的區域,果汁機在混合果汁時,並不會混淆不同區域的果汁。否則你得到的果汁就不是蘋果汁或者橙汁,而是混合味兒的果汁。

再回到函數這邊來。從函數的抽象層面看,這些操作具有共同的特徵,都是將類型為RDD[(K,V)]的數據處理為RDD[(K,C)]。這裡的V和C可以是相同類型,也可以是不同類型。這種操作並非單純地對Pair的value進行map,而是針對不同的key值對原有的value進行聯合(Combine)。因而,不僅類型可能不同,元素個數也可能不同。

於是,我們百折千回地尋找到了這個高度抽象的操作combineByKey。該方法在Spark的定義如下所示:

def combineByKey[C](

      createCombiner: V => C,

      mergeValue: (C, V) => C,

      mergeCombiners: (C, C) => C,

      partitioner: Partitioner,

      mapSideCombine: Boolean = true,

      serializer: Serializer = null): RDD[(K, C)] = {

    //實現略

  }

函數式風格與命令式風格不同之處在於它說明了代碼做了什麼(what to do),而不是怎麼做(how to do)。combineByKey函數主要接受了三個函數作為參數,分別為createCombiner、mergeValue、mergeCombiners。這三個函數足以說明它究竟做了什麼。理解了這三個函數,就可以很好地理解combineByKey。

要將RDD[(K,V)]combine為RDD[(K,C)],就需要提供一個函數,能夠完成從V到C的combine,稱之為combiner。如果V和C類型一致,則函數為V => V。倘若C是一個集合,例如Iterable[V],則createCombiner為V => Iterable[V]。

mergeValue則將原RDD中Pair的Value合併為操作後的C類型數據。合併操作的實現決定了結果的運算方式。所以,mergeValue更像是聲明了一種合併方式,它是由整個combine運算的結果來導向的。函數的輸入為原RDD中Pair的V,輸出為結果RDD中Pair的C。

最後的mergeCombiners則會根據每個Key對應的多個C,進行歸併。

再回到果汁機的案例。果汁機的功能類似於groupByKey+foldByKey操作。但我們沒有必要自己去實現這個榨取果汁的功能,可以直接調用combineByKey函數:

case class Juice(volumn: Int) {

    def add(j: Juice):Juice = Juice(volumn + j.volumn)

}

case class Fruit(kind: String, weight: Int) {

    def makeJuice:Juice = Juice(weight * 100)

}

val apple1 = Fruit("apple", 5)

val apple2 = Fruit("apple", 8)

val orange1 = Fruit("orange", 10)

val fruit = sc.parallelize(List(("apple", apple1) , ("orange", orange1) , ("apple", apple2))) 

val juice = fruit.combineByKey(

    f => f.makeJuice,

    (j:Juice, f) => j.add(f.makeJuice),

    (j1:Juice, j2:Juice) => j1.add(j2) 

)

執行juice.collect,結果為:

Array[(String, Juice)] = Array((orange, Juice(1000)), (apple, Juice(1300)))

RDD中有許多針對Pair RDD的操作在內部實現都調用了combineByKey函數(Spark的2.0版本則實現為combineByKeyWithClassTag)。例如groupByKey:

class PairRDDFunctions[K, V](self: RDD[(K, V)])

    (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null)

  extends Logging

  with SparkHadoopMapReduceUtil

  with Serializable {

    def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = {

        val createCombiner = (v: V) => CompactBuffer(v)

        val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v

        val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2

        val bufs = combineByKey[CompactBuffer[V]](

          createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine=false)

        bufs.asInstanceOf[RDD[(K, Iterable[V])]]

      }

}

groupByKey函數針對PairRddFunctions的RDD[(K, V)]按照key對value進行分組。它在內部調用了combineByKey函數,傳入的三個函數分別承擔了如下職責:

    createCombiner是將原RDD中的K類型轉換為Iterable[V]類型,實現為CompactBuffer。

    mergeValue實則就是將原RDD的元素追加到CompactBuffer中,即將追加操作(+=)視為合併操作。

    mergeCombiners則負責針對每個key值所對應的Iterable[V],提供合併功能。

再例如,我們要針對科目對成績求平均值:

val scores = sc.parallelize(List(("chinese", 88.0) , ("chinese", 90.5) , ("math", 60.0), ("math", 87.0)))

平均值並不能一次獲得,而是需要求得各個科目的總分以及科目的數量。因此,我們需要針對scores進行combine,從(String, Float)combine為(String, (Float, Int))。在調用combineByKey函數後,再通過map來獲得平均值。代碼如下:

val avg = scores.combineByKey(

    (v) => (v, 1),

    (acc: (Float, Int), v) => (acc._1 + v, acc._2 + 1),

    (acc1:(Float, Int), acc2:(Float, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)

).map{ case (key, value) => (key, value._1 / value._2.toFloat) }

除了可以進行group、average之外,根據傳入的函數實現不同,我們還可以利用combineByKey完成諸如aggregate、fold等操作。這是一個高度的抽象,但從聲明的角度來看,卻又不需要了解過多的實現細節。這正是函數式編程的魅力。

相關焦點

  • 一網打盡Spark高頻面試題
    類比Hadoop的19888注意:由於Spark只負責計算,所有並沒有Hadoop中存儲數據的埠50070Spark任務通過什麼形式進行提交?作業提交參數都有哪些?使用shell腳本提交。reduceByKey、foldByKey、aggregateByKey、combineByKey區別?
  • ​Spark Core基礎面試題總結(上)
    In addition, * [[org.apache.spark.rdd.PairRDDFunctions]] contains operations available only on RDDs of key-value * pairs, such as `groupByKey` and `join`; * [[org.apache.spark.rdd.DoubleRDDFunctions
  • 大數據學習路線教程圖,如何快速入門Spark
    Spark是大數據中重要的框架之一,下面為大家分享如何快速入門spark。Apache Spark是在大數據工業界裡用的最多的基於內存的技術框架,尤其是RDD的特性以及應用,對幫助理解Spark和任務提交的流程以及緩存機制。通過以上教程可以讓大家掌握Spark的環境搭建,任務調度流程,以及RDD代碼的應用。
  • Spark【面試】
    中進行數據預處理的操作6、簡單說一下hadoop和spark的shuffle過程hadoop:map端保存分片數據,通過網絡收集到reduce端 spark:spark的shuffle是在DAGSchedular劃分Stage的時候產生的,TaskSchedule要分發Stage到各個worker
  • Apache Spark大數據分析入門(一)
    tar -xvzf ~/spark-1.5.0-bin-hadoop2.4.tgz運行Python Shellcd spark-1.5.0-bin-hadoop2.4./bin/pyspark在本節中不會使用Python Shell進行演示。Scala交互式命令行由於運行在JVM上,能夠使用java庫。
  • 『 Spark 』2. spark 基本概念解析
    過程中的理解記錄 + 對參考文章中的一些理解 + 個人實踐spark過程中的一些心得而來。Worker Node集群中任何一個可以運行spark應用代碼的節點。Worker Node就是物理節點,可以在上面啟動Executor進程。5.
  • Spark面試高頻考點,必知必會!
    能問這樣的問題,已經暗示面試官的水平不低了,那麼我們該如何回答呢:        reduceByKey:按照key進行聚合,在shuffle之前有combine(預聚合)操作,返回結果是RDD[k,v]。
  • 如何將 MapReduce 轉化為 Spark
    此外,類似於 Impala 這樣的項目也開始逐漸進入到我們的架構中,Impala 提供 SQL 語義,能查詢存儲在 Hadoop 的 HDFS 和 HBase 中的 PB 級大數據。之前也有類似的項目,例如 Hive。Hive 系統雖然也提供了 SQL 語義,但由於 Hive 底層執行使用的是 MapReduce 引擎,仍然是一個批處理過程,難以滿足查詢的交互性。
  • 面試必知的 Spark SQL 幾種 Join 實現
    從上述計算過程中不難發現,對於每條來自streamIter的記錄,都要去buildIter中查找匹配的記錄,所以buildIter一定要是查找性能較優的數據結構。spark提供了三種join實現:sort merge join、broadcast join以及hash join。
  • 大數據分析工程師入門9-Spark SQL
    ("spark.some.config.option", "some-value") .getOrCreate()2.創建DataFrame在一個SparkSession中,應用程式可以從結構化的數據文件、Hive的table、外部資料庫和RDD中創建一個DataFrame。
  • 黑馬程式設計師:技術筆記大數據面試題之spark相關(二)
    Hadoop MapReduce 是 sort-based,進入 combine() 和 reduce() 的 records 必須先 sort。這樣的好處在於 combine/reduce() 可以處理大規模的數據,因為其輸入數據可以通過外排得到(mapper 對每段數據先做排序,reducer 的 shuffle 對排好序的每段數據做歸併)。
  • 不可不知的spark shuffle
    一個spark的RDD有一組固定的分區組成,每個分區有一系列的記錄組成。
  • 2小時入門Spark之RDD編程
    import findspark#指定spark_home為剛才的解壓路徑,指定python路徑spark_home = "/Users/liangyun/ProgramFiles/spark-3.0.1-bin-hadoop3.2"python_path
  • Apache Spark 2.4 中解決複雜數據類型的內置函數和高階函數介紹
    在本博客中,通過一些示例,我們將展示一些新的內置函數以及如何使用它們來處理複雜的數據類型。典型處理方式讓我們首先通過以下示例來回顧一下 Spark 2.4 之前的典型解決方案。首先,我們必須努力確保通過使用唯一鍵(unique key)來進行分組以便將新生成的數組完全組成為原始數組。其次,我們需要進行 group by 操作 ,這意味著需要進行一次 shuffle 操作; 但是 shuffle 操作並不保證重組後的數組和原始數組中數據的順序一致;最後,使用這種方式非常低效。
  • Spark入門介紹
    )中計算引擎的可插拔替換(MR,Spark,Tez,Presto等),至此,Spark可以一展身手了。input文件夾裡的文件):import org.apache.spark.SparkConfimport org.apache.spark.SparkContextimport org.apache.spark.rdd.RDD
  • Spark 數據傾斜及其解決方案
    知道數據傾斜發生在哪一個 stage 之後,接著我們就需要根據 stage 劃分原理,推算出來發生傾斜的那個 stage 對應代碼中的哪一部分,這部分代碼中肯定會有一個 shuffle 類算子。可以通過 countByKey 查看各個 key 的分布。TIPS數據傾斜只會發生在 shuffle 過程中。
  • Spark性能優化指南
    方案實現思路:如果我們判斷那少數幾個數據量特別多的key,對作業的執行和計算結果不是特別重要的話,那麼乾脆就直接過濾掉那少數幾個key。比如,在Spark SQL中可以使用where子句過濾掉這些key或者在Spark Core中對RDD執行filter算子過濾掉這些key。
  • 2小時入門SparkSQL編程
    美中不足的是,SparkSQL的靈活性會稍差一些,其默認支持的數據類型通常只有 Int,Long,Float,Double,String,Boolean 等這些標準SQL數據類型, 類型擴展相對繁瑣。對於一些較為SQL中不直接支持的功能,通常可以藉助於用戶自定義函數(UDF)來實現,如果功能更加複雜,則可以轉成RDD來進行實現。
  • 【大數據嗶嗶集20210117】Spark面試題靈魂40問
    Hadoop MapReduce 是 sort-based,進入 combine() 和 reduce() 的 records 必須先 sort。這樣的好處在於 combine/reduce() 可以處理大規模的數據,因為其輸入數據可以通過外排得到(mapper 對每段數據先做排序,reducer 的 shuffle 對排好序的每段數據做歸併)。
  • Spark性能優化指南——基礎篇
    shuffle過程中,各個節點上的相同key都會先寫入本地磁碟文件中,然後其他節點需要通過網絡傳輸拉取各個節點上的磁碟文件中的相同key。而且相同key都拉取到同一個節點進行聚合操作時,還有可能會因為一個節點上處理的key過多,導致內存不夠存放,進而溢寫到磁碟文件中。