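It is often said that when a large table is joined with a small table, the standard optimization is to broadcast the small table. With Spark joins we tend to assume that Spark will broadcast the small table and then join,
and the Details for Query graph in the Spark UI often shows exactly that.
In practice, though, you will run into cases where BroadcastHashJoin is not used: the join is clearly between a small table and a large table, yet Spark does not choose BroadcastHashJoin. Can Spark not work this out automatically?
To understand not just what happens but why, we have to read the source code.
The join logic lives in SparkStrategies.scala, which contains the JoinSelection object.
The key code is: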
    def apply(plan: LogicalPlan): Seq[SparkPlan] = {
      plan match {

        // --- BroadcastHashJoin --------------------------------------------------------------------
        // For BroadcastHashJoin, only the two cases below matter.

        // broadcast hints were specified
        case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
          if canBroadcastByHints(joinType, left, right) =>
          val buildSide = broadcastSideByHints(joinType, left, right)
          Seq(joins.BroadcastHashJoinExec(
            leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right)))

        // broadcast hints were not specified, so need to infer it from size and configuration.
        case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
          if canBroadcastBySizes(joinType, left, right) =>
          val buildSide = broadcastSideBySizes(joinType, left, right)
          Seq(joins.BroadcastHashJoinExec(
            leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right)))

        // --- ShuffledHashJoin ---------------------------------------------------------------------
        ......
The crux is canBroadcastByHints and canBroadcastBySizes.

First, the implementation of canBroadcastByHints:
1: left is equi-joined with right, the join type is inner-like (InnerLike) or right outer, and left carries a broadcast hint.
2: left is equi-joined with right, the join type is inner-like, left outer, left semi, left anti, or ExistenceJoin, and right carries a broadcast hint.
3: Satisfying either of the two conditions above is enough.

    private def canBroadcastByHints(joinType: JoinType, left: LogicalPlan, right: LogicalPlan)
      : Boolean = {
      val buildLeft = canBuildLeft(joinType) && left.stats.hints.broadcast
      val buildRight = canBuildRight(joinType) && right.stats.hints.broadcast
      buildLeft || buildRight
    }

Next, the implementation of canBroadcastBySizes:
1: left is equi-joined with right, the join type is inner-like or right outer, and the estimated size of left (stats.sizeInBytes, not the raw file size) is <= autoBroadcastJoinThreshold.
2: left is equi-joined with right, the join type is inner-like, left outer, left semi, left anti, or ExistenceJoin, and the estimated size of right is <= autoBroadcastJoinThreshold.
3: Satisfying either of the two conditions above is enough.

    private def canBroadcastBySizes(joinType: JoinType, left: LogicalPlan, right: LogicalPlan)
      : Boolean = {
      val buildLeft = canBuildLeft(joinType) && canBroadcast(left)
      val buildRight = canBuildRight(joinType) && canBroadcast(right)
      buildLeft || buildRight
    }

    private def canBuildRight(joinType: JoinType): Boolean = joinType match {
      case _: InnerLike | LeftOuter | LeftSemi | LeftAnti | _: ExistenceJoin => true
      case _ => false
    }

    private def canBuildLeft(joinType: JoinType): Boolean = joinType match {
      case _: InnerLike | RightOuter => true
      case _ => false
    }

    private def canBroadcast(plan: LogicalPlan): Boolean = {
      plan.stats.sizeInBytes >= 0 && plan.stats.sizeInBytes <= conf.autoBroadcastJoinThreshold
    }
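To make the rule easier to play with, here is a minimal standalone sketch of the size-based check. It is not Spark code: the `JoinType` hierarchy, the object name `BroadcastDecisionModel`, and the plain `Long` sizes are simplified stand-ins invented for illustration, and the 10 MB threshold is the documented default of `spark.sql.autoBroadcastJoinThreshold`.

```scala
// Simplified model of the checks quoted above.
// All types here are illustrative stand-ins, not Spark's own classes.
object BroadcastDecisionModel {
  sealed trait JoinType
  case object Inner extends JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType
  case object FullOuter extends JoinType
  case object LeftSemi extends JoinType
  case object LeftAnti extends JoinType

  // The left side may be the build (broadcast) side only for inner and right outer joins.
  def canBuildLeft(joinType: JoinType): Boolean = joinType match {
    case Inner | RightOuter => true
    case _ => false
  }

  // The right side may be the build side for inner, left outer, left semi and left anti joins.
  def canBuildRight(joinType: JoinType): Boolean = joinType match {
    case Inner | LeftOuter | LeftSemi | LeftAnti => true
    case _ => false
  }

  // A side is small enough when its estimated size is within the threshold.
  def canBroadcast(sizeInBytes: Long, threshold: Long): Boolean =
    sizeInBytes >= 0 && sizeInBytes <= threshold

  def canBroadcastBySizes(
      joinType: JoinType, leftSize: Long, rightSize: Long, threshold: Long): Boolean = {
    val buildLeft = canBuildLeft(joinType) && canBroadcast(leftSize, threshold)
    val buildRight = canBuildRight(joinType) && canBroadcast(rightSize, threshold)
    buildLeft || buildRight
  }

  def main(args: Array[String]): Unit = {
    val threshold = 10L * 1024 * 1024 // 10 MB
    val small = 1024L                 // 1 KB
    val big = 1L << 40                // 1 TB
    // Small table on the left of a LEFT OUTER join: neither side qualifies.
    println(canBroadcastBySizes(LeftOuter, small, big, threshold))  // false
    // Same sizes with a RIGHT OUTER join: the small left side can be built.
    println(canBroadcastBySizes(RightOuter, small, big, threshold)) // true
  }
}
```

The model shows that table size alone does not decide the outcome: the same pair of sizes gives different answers depending on the join type and on which side the small table sits.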
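Pay attention, this is the key point: a large table joined with a small table does not automatically get a broadcast join.
The join type matters too, and on reflection that makes sense.
If the query is A left join B, where A is the small table and B is the large table, Spark will not pick a broadcast join and, with default settings, falls back to a sort merge join. Think about it: if A were broadcast, each task would hold all of A but only one slice of B, so no single task could tell whether a row of A is truly unmatched across all of B and must be emitted with nulls.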
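The problem with broadcasting the preserved side of an outer join can be illustrated with plain Scala collections. This is only a toy simulation, not how Spark executes joins: the object name `BroadcastLeftOuterPitfall`, the integer keys, and the two hand-made "partitions" of B are all made up for the example.

```scala
// Toy simulation: why the preserved (left) side of a LEFT OUTER join
// cannot simply be broadcast to tasks that each see only part of B.
object BroadcastLeftOuterPitfall {
  // Reference result: A LEFT JOIN B on equal keys, computed over all of B at once.
  def leftOuterJoin(a: Seq[Int], b: Seq[Int]): Seq[(Int, Option[Int])] =
    a.flatMap { k =>
      val matches = b.filter(_ == k)
      if (matches.isEmpty) Seq((k, Option.empty[Int]))
      else matches.map(m => (k, Option(m)))
    }

  // Hypothetical "broadcast A" plan: every task gets all of A and one partition of B,
  // and decides on its own which rows of A are unmatched.
  def broadcastLeftNaive(a: Seq[Int], bPartitions: Seq[Seq[Int]]): Seq[(Int, Option[Int])] =
    bPartitions.flatMap(part => leftOuterJoin(a, part))

  def main(args: Array[String]): Unit = {
    val a = Seq(1, 2)
    val bPartitions = Seq(Seq(1, 3), Seq(2, 4))
    // Correct result: both keys of A have a match somewhere in B.
    println(leftOuterJoin(a, bPartitions.flatten))
    // Naive per-partition result: each task also emits a spurious unmatched row.
    println(broadcastLeftNaive(a, bPartitions))
  }
}
```

Each task only knows that a key is missing from its own slice of B, so the naive plan emits extra null-padded rows for keys that do match elsewhere. Broadcasting the right side of a left join has no such problem, because every row of A is then seen by exactly one task together with all of B.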
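So in this situation we can rewrite the query so that it is able to use a broadcast join:
change A left join B into A left join (B join A).
This works because A is the driving table, so only keys that exist in A matter. The inner B join A only serves to keep the rows of B whose keys appear in A (a left semi join, or a join against A's distinct keys, avoids duplicating rows when A has repeated keys). Its result is usually very small, it satisfies the broadcast join rules because A can be the build side, and the final step joins two small inputs, so shuffle I/O goes down as well.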
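A rough sketch of this rewrite with the DataFrame API might look like the following. The table contents, column names, and the object name `LeftJoinRewriteSketch` are made up for illustration. The sketch uses a left semi join against A's distinct keys instead of a plain inner join, and it adds explicit `broadcast` hints, which is an assumption on my part: a hint forces the broadcast regardless of estimated size, so it is only appropriate when the filtered B is known to be small.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.broadcast

object LeftJoinRewriteSketch {
  // Returns (original plan, rewritten plan) so the two can be compared.
  def build(spark: SparkSession): (DataFrame, DataFrame) = {
    import spark.implicits._

    // Hypothetical data: a is the small driving table, b stands in for the large one.
    val a = Seq((1, "a1"), (2, "a2"), (-1, "a_missing")).toDF("id", "a_val")
    val b = spark.range(0, 100000)
      .selectExpr("cast(id as int) as id", "concat('b', id) as b_val")

    // Original form: the small table is on the left of a left join,
    // so it cannot be the build side.
    val original = a.join(b, Seq("id"), "left")

    // Step 1: keep only the rows of b whose key appears in a.
    // For a left semi join the right side may be broadcast.
    val bFiltered = b.join(broadcast(a.select("id").distinct()), Seq("id"), "left_semi")

    // Step 2: a left join with the filtered b on the right.
    // The hint assumes bFiltered is small enough to fit in memory.
    val rewritten = a.join(broadcast(bFiltered), Seq("id"), "left")

    (original, rewritten)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("left-join-rewrite-sketch")
      .getOrCreate()
    val (original, rewritten) = build(spark)
    // Compare the physical plans of the two forms.
    original.explain()
    rewritten.explain()
    spark.stop()
  }
}
```

Comparing the two `explain()` outputs on real data is the quickest way to confirm whether the rewrite actually changes the chosen join strategy in a given Spark version.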
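So the next time an interviewer asks whether Spark always uses a broadcast join for an equi-join between a large table and a small table,
the answer should be: not necessarily. The reason? Read the source: broadcast joins have preconditions too.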