在數據分析中,處理Key,Value的Pair數據是極為常見的場景。譬如說,對Pair數據按照key分組、聚合,又或者更抽象的,則是根據key對value進行fold運算。
如果我們對編碼的態度有些敷衍,大約會將其分別定義為三個函數:gruopByKey、aggregateByKey、foldByKey。站在調用者的角度,如此設計無可厚非,相反我還得擊節讚嘆。因為從函數名來看,確實體貼地照顧了用戶的知識結構。換個角度,站在實現這一邊,你可能會發現這三個函數乃孿生兄弟,具有相同的血統。
所謂「抽象」,就是要尋找實現上的共同特徵。OO也好,FP也罷,都格外重視抽象的能力,因為抽象能在很大程度上化繁為簡,且具備應對變化的能力。只是各自抽象的層次不同罷了。與OO中抽象接口的設計思路是等同的,個人認為,FP只是將抽象做到了極致,落實到了類型之上(暫且放開業務的羈絆),函數不過就是類型的轉換罷了。
在Spark的語境中,RDD是核心數據結構,從Scala的語法出發,可以認為RDD是一個類型類,或者Monad容器(個人如此認為,若有不妥,還請方家指正)。這個容器到底裝了什麼數據呢?得看你如何為其裝載數據,前面提到了Pair類型,在本文場景下,RDD實則是一個RDD[(K, V)]類型的數據。
「前戲」到此結束,現在來看看groupByKey、aggregateByKey和foldByKey到底要做什麼(what to do)?
groupByKey是將一堆結構形如(K, V)的數據 根據K分組 ,我們暫且將這個分組過程看做是一個黑盒子,想一想, 輸出會是什麼 ?
aggregateByKey是將一堆結構形如(K, V)的數據 根據K 對數據進行 聚合 運算, 它的輸出又會是什麼呢 ?
fold是針對一個集合中的數據進行 摺疊 運算,運算符則取決傳入的函數,例如sum或者produce等。因而foldByKey就是將一堆結構形如(K, V)的數據 根據K 對數據進行摺疊運算,那麼, 它的輸出又會是什麼 ?
現在需要一點推演的能力。首先,不管過程如何,這三個運算接收的參數總是相同的,皆為(K, V)。輸出結果定然不同,但它們卻又具有相同的特徵,即它們都是 根據K 分別對數據進行運算,換言之,計算可能不同,計算的結果可能不同,但結果一定是 根據K 來組織的。這裡的K不就是Pair的key嗎?對應的Value呢?我們還不知道,然而不管是阿貓阿狗,它總可以用一個抽象的類型參數來代表,記為C,結果就變成了(K, C)。這就是我們要找的一個抽象:
RDD[(K, V)] -> RDD[(K, C)]
然而,這個抽象還不夠,我們需要找到將V(可能是多個V)變成C的方法。
現在假設我們要尋找的抽象是一臺超級酷的果汁機。它能同時接受各種各樣的水果,然後聰明地按照水果的種類分別榨出不同的果汁。蘋果歸蘋果汁,橙子歸橙汁,西瓜歸西瓜汁。我們為水果定義類型為Fruit,果汁定義為Juice,根據前面的分析,這個過程就是將RDD[(String, Fruit)]轉換為RDD[(String, Juice)]。
注意,在榨果汁前,水果可能有很多,即使是相同類型的水果,也會作為不同的RDD元素:
("apple", apple1), ("orange", orange1), ("apple", apple2)
轉換的結果是每種水果只有一杯果汁(只是容量不同罷了):
("apple", appleJuice), ("orange", orangeJuice)
那麼,這個果汁機該由什么元件構成呢?現在,我們化身為機械師,想想這個果汁機的組成元件:
注意第二個函數和第三個函數的區別,前者只提供混合功能,即能夠將不同容器的果汁裝到一個容器中,而後者的輸入已有一個前提,那就是已經按照水果類型放到不同的區域,果汁機在混合果汁時,並不會混淆不同區域的果汁。否則你得到的果汁就不是蘋果汁或者橙汁,而是混合味兒的果汁。
再回到函數這邊來。從函數的抽象層面看,這些操作具有共同的特徵,都是將類型為RDD[(K,V)]的數據處理為RDD[(K,C)]。這裡的V和C可以是相同類型,也可以是不同類型。這種操作並非單純地對Pair的value進行map,而是針對不同的key值對原有的value進行聯合(Combine)。因而,不僅類型可能不同,元素個數也可能不同。
於是,我們百折千回地尋找到了這個高度抽象的操作combineByKey。該方法在Spark的定義如下所示:
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null): RDD[(K, C)] = {
//實現略
}
函數式風格與命令式風格不同之處在於它說明了代碼做了什麼(what to do),而不是怎麼做(how to do)。combineByKey函數主要接受了三個函數作為參數,分別為createCombiner、mergeValue、mergeCombiners。這三個函數足以說明它究竟做了什麼。理解了這三個函數,就可以很好地理解combineByKey。
要將RDD[(K,V)]combine為RDD[(K,C)],就需要提供一個函數,能夠完成從V到C的combine,稱之為combiner。如果V和C類型一致,則函數為V => V。倘若C是一個集合,例如Iterable[V],則createCombiner為V => Iterable[V]。
mergeValue則將原RDD中Pair的Value合併為操作後的C類型數據。合併操作的實現決定了結果的運算方式。所以,mergeValue更像是聲明了一種合併方式,它是由整個combine運算的結果來導向的。函數的輸入為原RDD中Pair的V,輸出為結果RDD中Pair的C。
最後的mergeCombiners則會根據每個Key對應的多個C,進行歸併。
再回到果汁機的案例。果汁機的功能類似於groupByKey+foldByKey操作。但我們沒有必要自己去實現這個榨取果汁的功能,可以直接調用combineByKey函數:
case class Juice(volumn: Int) {
def add(j: Juice):Juice = Juice(volumn + j.volumn)
}
case class Fruit(kind: String, weight: Int) {
def makeJuice:Juice = Juice(weight * 100)
}
val apple1 = Fruit("apple", 5)
val apple2 = Fruit("apple", 8)
val orange1 = Fruit("orange", 10)
val fruit = sc.parallelize(List(("apple", apple1) , ("orange", orange1) , ("apple", apple2)))
val juice = fruit.combineByKey(
f => f.makeJuice,
(j:Juice, f) => j.add(f.makeJuice),
(j1:Juice, j2:Juice) => j1.add(j2)
)
執行juice.collect,結果為:
Array[(String, Juice)] = Array((orange, Juice(1000)), (apple, Juice(1300)))
RDD中有許多針對Pair RDD的操作在內部實現都調用了combineByKey函數(Spark的2.0版本則實現為combineByKeyWithClassTag)。例如groupByKey:
class PairRDDFunctions[K, V](self: RDD[(K, V)])
(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null)
extends Logging
with SparkHadoopMapReduceUtil
with Serializable {
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = {
val createCombiner = (v: V) => CompactBuffer(v)
val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
val bufs = combineByKey[CompactBuffer[V]](
createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine=false)
bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}
}
groupByKey函數針對PairRddFunctions的RDD[(K, V)]按照key對value進行分組。它在內部調用了combineByKey函數,傳入的三個函數分別承擔了如下職責:
createCombiner是將原RDD中的K類型轉換為Iterable[V]類型,實現為CompactBuffer。
mergeValue實則就是將原RDD的元素追加到CompactBuffer中,即將追加操作(+=)視為合併操作。
mergeCombiners則負責針對每個key值所對應的Iterable[V],提供合併功能。
再例如,我們要針對科目對成績求平均值:
val scores = sc.parallelize(List(("chinese", 88.0) , ("chinese", 90.5) , ("math", 60.0), ("math", 87.0)))
平均值並不能一次獲得,而是需要求得各個科目的總分以及科目的數量。因此,我們需要針對scores進行combine,從(String, Float)combine為(String, (Float, Int))。在調用combineByKey函數後,再通過map來獲得平均值。代碼如下:
val avg = scores.combineByKey(
(v) => (v, 1),
(acc: (Float, Int), v) => (acc._1 + v, acc._2 + 1),
(acc1:(Float, Int), acc2:(Float, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
).map{ case (key, value) => (key, value._1 / value._2.toFloat) }
除了可以進行group、average之外,根據傳入的函數實現不同,我們還可以利用combineByKey完成諸如aggregate、fold等操作。這是一個高度的抽象,但從聲明的角度來看,卻又不需要了解過多的實現細節。這正是函數式編程的魅力。