spark2.4 join 淺談一

2020-11-07 豆豆杭哲

我們經常說,如果大表join小表,那麼常用的優化手段就是將 小表 廣播出去,而在spark join的過程中,我們都這麼認為 spark會把小表 broadcast,然後join,

我們看一些 Details for Query 圖的時候就可以看到這樣的情況:


但是,實際情況中,會遇到不走 BroadcastHashJoin 的情況,明明是 小表 和 大表 做關聯,但是就是不走 BroadcastHashJoin,難道是 spark無法自動判斷嗎?

知其然知其所以然,就需要看源碼了。

關於join的邏輯,要看 SparkStrategy 這個文件了,內部有 JoinSelection 這個對象。

關鍵代碼就是

def apply(plan: LogicalPlan): Seq[SparkPlan] = { plan match { // --- BroadcastHashJoin -------------------------------------------------------------------- 我們現在看 BroadcastHashJoin,只要看下面的代碼就好 // broadcast hints were specified case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right) if canBroadcastByHints(joinType, left, right) => val buildSide = broadcastSideByHints(joinType, left, right) Seq(joins.BroadcastHashJoinExec( leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right))) // broadcast hints were not specified, so need to infer it from size and configuration. case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right) if canBroadcastBySizes(joinType, left, right) => val buildSide = broadcastSideBySizes(joinType, left, right) Seq(joins.BroadcastHashJoinExec( leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right))) // --- ShuffledHashJoin --------------------------------------------------------------------- ...... 關鍵之處就是 canBroadcastByHints 和 canBroadcastBySizes 我們看 canBroadcastByHints 的實現, 1:如果 left 等值關聯 right,且 join的類型是 inner join或者是right join ,且有left有broadcast的hint 2:如果 left 等值關聯 right,且 join的類型是 inner、left、left_semi、left_anti join或者是 ExistenceJoin ,且有 right 有broadcast的hint 3:上面只要一種情況符合就可以了 private def canBroadcastByHints(joinType: JoinType, left: LogicalPlan, right: LogicalPlan) : Boolean = { val buildLeft = canBuildLeft(joinType) && left.stats.hints.broadcast val buildRight = canBuildRight(joinType) && right.stats.hints.broadcast buildLeft || buildRight } 我們看 canBroadcastBySizes 的實現, 1:如果 left 等值關聯 right,且 join的類型是 inner join或者是right join ,且有left 的文件大小 <=autoBroadcastJoinThreshold 2:如果 left 等值關聯 right,且 join的類型是 inner、left、left_semi、left_anti join或者是 ExistenceJoin ,且有 right 的文件大小 <=autoBroadcastJoinThreshold 3:上面只要一種情況符合就可以了 private def canBroadcastBySizes(joinType: JoinType, left: LogicalPlan, right: LogicalPlan) : Boolean = { val buildLeft = canBuildLeft(joinType) && canBroadcast(left) val buildRight = canBuildRight(joinType) && canBroadcast(right) buildLeft || buildRight } private def canBuildRight(joinType: JoinType): Boolean = joinType match { case _: InnerLike | LeftOuter | LeftSemi | LeftAnti | _: ExistenceJoin => true case _ => false } private def canBuildLeft(joinType: JoinType): Boolean = joinType match { case _: InnerLike | RightOuter => true case _ => false } private def canBroadcast(plan: LogicalPlan): Boolean = { plan.stats.sizeInBytes >= 0 && plan.stats.sizeInBytes <= conf.autoBroadcastJoinThreshold }

敲黑板,這是重點,不是 只要 大表 關聯 小表 就一定會broadcast join

還需要看 join的類型,其實想當然也是一樣,

如果 A left join B ,A 是小表,B是大小,那麼spark只會走 sort merge join,而不會是broadcast join,你想:如果把A表廣播出去了,如果存在一些 B表中存在一些A表不存在的數據,而廣播的是A表的數據,怎麼處理,

所以遇到這種情況,我們可以做優化,優化後的結果就可以走broadcast join

將 A left join B 改成 A left join (B join A)

這樣就可以了,因為A表是 主表,只要A存在的數據,而B join A 的結果就是非常小了,而且也符合broadcast join 的規則,最後兩份小表join,shuffle 的io也小下去了。

所以以後面試官問 ,大表 和 小表 等值關聯,spark一定會走broadcast嗎?

應該回答,不一定,理由呢?看源碼,broadcast 也有條件的。

相關焦點

  • spark join優化 布隆過濾器
    我們在通過spark進行 join的時候,有時候會出現這樣的場景。A 表 1000萬, B 表5000 萬,兩份數據 join的key沒有重複結果的形式是 A left join B 所以 一般情況下 ,走 sort merge join ,兩張表join下。
  • 6000字總結Spark的5種join策略(建議收藏)
    參數必須設置為 false,參數是從 Spark 2.0.0 版本引入的,默認值為 true,也就是默認情況下選擇 Sort Merge Join;•小表的大小(plan.stats.sizeInBytes)必須小於 spark.sql.autoBroadcastJoinThreshold * spark.sql.shuffle.partitions;而且小表大小(stats.sizeInBytes
  • 面試必知的 Spark SQL 幾種 Join 實現
    如下圖所示,sql語句被語法解析(SQL AST)成查詢計劃,或者我們通過Dataset/DataFrame提供的APIs組織成查詢計劃,查詢計劃分為兩大類:邏輯計劃和物理計劃,這個階段通常叫做邏輯計劃,經過語法分析(Analyzer)、一系列查詢優化(Optimizer)後得到優化後的邏輯計劃,最後被映射成物理計劃,轉換成RDD執行。
  • Spark ShuffledHashJoinExec 「1」
    最近在看spark的join,內部的實現邏輯太負責,看的很累,不過每天去看點,慢慢堅持。, &34;), &34; -> SQLMetrics.createSizeMetric(sparkContext, &34;), &34; -> SQLMetrics.createTimingMetric(sparkContext, &34;)) override def requiredChildDistribution: Seq[Distribution] =
  • sparksql序列化異常
    在sparksql中顯示的指定了mapjoin,導致廣播的數據量太大,導致序列化超過指定大小。To avoid this, increase spark.kryoserializer.buffer.max value.
  • left semi join和left join區別
    1.4 left semi joinselect*from wedw_dw.t_user t1left semi join wedw_dw.t_ordert2on t1.user_id = t2.user_id;如圖所示:只能展示a表的欄位,因為left semi join 只傳遞表的 join key 給 map 階段
  • Spark—15分鐘教程
    (ab[cd]{2,4})|(aa[abcde]{1,2/data/sales_parquet")'''SELECT *FROM sales_tableWHERE bill_raw_text RLIKE '(ab[cd]{2,4})|(aa[abcde]{1,2})''''sales_table_execution_plan = sales_table.where( col(&
  • SPARK RDD 介紹
    一、什麼事RDD RDD(Resilient Distributed DataSet)叫做彈性分布式數據集,是spark中最基本的數據抽象,它代表一個不可變、可分區、裡面元素可並行計算的集合,RDD具有數據流模型得特點,自動容錯、位置感知性調度和可伸縮性,RDD允許用戶在執行多個查詢時顯示的將工作集緩存在內存中,後續的查詢能夠重用工作集
  • Spark 性能優化(一)——調優基本原則
    我們是在 YARN 框架下採用這個調整來實現 executor 數量改變的,一種典型辦法是,一個 host 只跑一個 worker,然後配置 spark.executor.cores 為 host 上 CPU core 的 N 分之一,同時設置spark.executor.memory
  • sparksql合併小文件
    查看sparksql支持的參數:spark-sql set -v需要注意這種方式對Spark的版本有要求,建議在Spark2.4.X及以上版本使用,示例: INSERT ... SELECT /*+ COALESCE(numPartitions) */ ... INSERT ...
  • spark job 裝載率統計
    可問題是 spark怎麼做迭代計算,難道是for循環嗎?我本來一直在像 能否通過一次shuffle解決問題,苦苦思索,並沒有這樣的實現。因為一次shuffle,怎麼做到多段之間的依賴關係呢,先給大家看看表結構Car station orderx1 A 1x1 B 2x1 C 3x2 A 1x2 B 2
  • Spark 性能優化(二)——數據傾斜優化
    , attr(3), attr(4).toInt, attr(5).toInt, attr(6))).toDSds: org.apache.spark.sql.Dataset[brower] = [id: int, time: bigint ... 5 more fields]34;sourceTable&執行新的查詢scala> val newSource = spark.sql(&34;)
  • 數據分析工程師面試集錦5——Spark面試指南
    /examples/jars/spark-examples_2.11-2.3.1.jar 1000030、Spark在提交程序的時候如何引入外部jar包?1)對於大小表join的時候,使用map-side join替換join;2)在join之前對表進行篩選,減少join的數據量3)避免出現笛卡爾積,關聯欄位最好不要有重複的值,可以在join之前做去重處理。
  • Hive優化之Spark執行引擎參數調優(二)
    案例1複雜任務執行失敗,大約有400行sql,較為複雜,join聚合函數操作較多。手動重試任務後仍然報錯。查看異常任務SQL發現任務中由10多個SQL語句構成,一個語句大概有200+行,union all、join、sum操作較多。
  • 大數據分析工程師面試集錦5——Spark面試指南
    在2.x之前,對於不同的功能,需要使用不同的Context,比如1.創建和操作RDD時,使用SparkContext2.使用Streaming時,使用StreamingContext3.使用SQL時,使用SQLContext4.使用Hive時,使用HiveContext在2.x中,為了統一上述的Context,引入SparkSession
  • Spark2.4 jdbc中加入hint
    Spark 我們知道是可以連接資料庫的,可以通過spark的API spark.read.jdbc 中可以讀取oracle的數據。但是很多時候我們讀取資料庫的時候不會全表讀,需要加入查詢條件,例如 創建時間。這個時候我們調用spark的jdbc的時候,需要指定查詢的分區。
  • 從0開始學習spark的學習筆記(1)
    3.1 spark下載選擇4. spark的關鍵概念:4.1 Master4.2 Worker4.3 Application4.4 RDD:彈性式分布式數據集(resilient distributed dataset)4.5 job4.6 SparkContext4.7 Driver5. saprk名詞解釋:
  • 解決SparkStreaming推測機制存在的數據傾斜問題
    過程在尋找這個問題答案的過程中,老劉正好在學習spark框架的實時計算模塊SparkStreaming,它裡面就有一個非常經典的問題,關於推測機制的!什麼是推測機制?如果有很多個task都在運行,很多task一下就完成了自己的任務,但是有一個task運行的很慢。在實時計算任務中,如果對實時性要求比較高,就算是兩三秒也要在乎這些。
  • 淺談Java的Fork/Join並發框架
    2.int middle = (this.start + end) / 2;            ForkJoinTest left = new ForkJoinTest(list, this.start, middle, threshold);            ForkJoinTest right = new ForkJoinTest(list, middle, end, threshold)
  • Spark 數據傾斜及其解決方案
    程序實現: 比如說在 Hive 中,經常遇到 count(distinct)操作,這樣會導致最終只有一個 reduce,我們可以先 group 再在外面包一層 count,就可以了;在 Spark 中使用 reduceByKey 替代 groupByKey 等。