Spark調優 | 一文搞定 Join 優化

2021-12-25 大數據技術團隊

SparkSQL總體流程

在闡述Join實現之前,我們首先簡單介紹SparkSQL的總體流程,一般地,我們有兩種方式使用SparkSQL,一種是直接寫sql語句,這個需要有元資料庫支持,例如Hive等,另一種是通過Dataset/DataFrame編寫Spark應用程式。如下圖所示,sql語句被語法解析(SQL AST)成查詢計劃,或者我們通過Dataset/DataFrame提供的APIs組織成查詢計劃,查詢計劃分為兩大類:邏輯計劃和物理計劃,這個階段通常叫做邏輯計劃,經過語法分析(Analyzer)、一系列查詢優化(Optimizer)後得到優化後的邏輯計劃,最後被映射成物理計劃,轉換成RDD執行。

對於語法解析、語法分析以及查詢優化,本文不做詳細闡述,本文重點介紹Join的物理執行過程。

Join基本要素

如下圖所示,Join大致包括三個要素:Join方式、Join條件以及過濾條件。其中過濾條件也可以通過AND語句放在Join條件中。

Spark支持所有類型的Join,包括:

inner join

left outer join

right outer join

full outer join

left semi join

left anti join

下面分別闡述這幾種Join的實現。

Join基本實現流程

總體上來說,Join的基本實現流程如下圖所示,Spark將參與Join的兩張表抽象為流式遍歷表(streamIter)和查找表(buildIter),通常streamIter為大表,buildIter為小表,我們不用擔心哪個表為streamIter,哪個表為buildIter,這個spark會根據join語句自動幫我們完成。

在實際計算時,spark會基於streamIter來遍歷,每次取出streamIter中的一條記錄rowA,根據Join條件計算keyA,然後根據該keyA去buildIter中查找所有滿足Join條件(keyB==keyA)的記錄rowBs,並將rowBs中每條記錄分別與rowAjoin得到join後的記錄,最後根據過濾條件得到最終join的記錄。

從上述計算過程中不難發現,對於每條來自streamIter的記錄,都要去buildIter中查找匹配的記錄,所以buildIter一定要是查找性能較優的數據結構。spark提供了三種join實現:sort merge join、broadcast join以及hash join。

sort merge join實現

要讓兩條記錄能join到一起,首先需要將具有相同key的記錄在同一個分區,所以通常來說,需要做一次shuffle,map階段根據join條件確定每條記錄的key,基於該key做shuffle write,將可能join到一起的記錄分到同一個分區中,這樣在shuffle read階段就可以將兩個表中具有相同key的記錄拉到同一個分區處理。前面我們也提到,對於buildIter一定要是查找性能較優的數據結構,通常我們能想到hash表,但是對於一張較大的表來說,不可能將所有記錄全部放到hash表中,另外也可以對buildIter先排序,查找時按順序查找,查找代價也是可以接受的,我們知道,spark shuffle階段天然就支持排序,這個是非常好實現的,下面是sort merge join示意圖。

在shuffle read階段,分別對streamIter和buildIter進行merge sort,在遍歷streamIter時,對於每條記錄,都採用順序查找的方式從buildIter查找對應的記錄,由於兩個表都是排序的,每次處理完streamIter的一條記錄後,對於streamIter的下一條記錄,只需從buildIter中上一次查找結束的位置開始查找,所以說每次在buildIter中查找不必重頭開始,整體上來說,查找性能還是較優的。

broadcast join實現

為了能具有相同key的記錄分到同一個分區,我們通常是做shuffle,那麼如果buildIter是一個非常小的表,那麼其實就沒有必要大動幹戈做shuffle了,直接將buildIter廣播到每個計算節點,然後將buildIter放到hash表中,如下圖所示。

從上圖可以看到,不用做shuffle,可以直接在一個map中完成,通常這種join也稱之為map join。那麼問題來了,什麼時候會用broadcast join實現呢?這個不用我們擔心,spark sql自動幫我們完成,當buildIter的估計大小不超過參數spark.sql.autoBroadcastJoinThreshold設定的值(默認10M),那麼就會自動採用broadcast join,否則採用sort merge join。

hash join實現

除了上面兩種join實現方式外,spark還提供了hash join實現方式,在shuffle read階段不對記錄排序,反正來自兩格表的具有相同key的記錄會在同一個分區,只是在分區內不排序,將來自buildIter的記錄放到hash表中,以便查找,如下圖所示。

不難發現,要將來自buildIter的記錄放到hash表中,那麼每個分區來自buildIter的記錄不能太大,否則就存不下,默認情況下hash join的實現是關閉狀態,如果要使用hash join,必須滿足以下四個條件:

buildIter總體估計大小超過spark.sql.autoBroadcastJoinThreshold設定的值,即不滿足broadcast join條件

開啟嘗試使用hash join的開關,spark.sql.join.preferSortMergeJoin=false

每個分區的平均大小不超過spark.sql.autoBroadcastJoinThreshold設定的值,即shuffle read階段每個分區來自buildIter的記錄要能放到內存中

streamIter的大小是buildIter三倍以上

所以說,使用hash join的條件其實是很苛刻的,在大多數實際場景中,即使能使用hash join,但是使用sort merge join也不會比hash join差很多,所以儘量使用hash

下面我們分別闡述不同Join方式的實現流程。

inner join

inner join是一定要找到左右表中滿足join條件的記錄,我們在寫sql語句或者使用DataFrmae時,可以不用關心哪個是左表,哪個是右表,在spark sql查詢優化階段,spark會自動將大表設為左表,即streamIter,將小表設為右表,即buildIter。這樣對小表的查找相對更優。其基本實現流程如下圖所示,在查找階段,如果右表不存在滿足join條件的記錄,則跳過。

left outer join

left outer join是以左表為準,在右表中查找匹配的記錄,如果查找失敗,則返回一個所有欄位都為null的記錄。我們在寫sql語句或者使用DataFrmae時,一般讓大表在左邊,小表在右邊。其基本實現流程如下圖所示。

right outer join

right outer join是以右表為準,在左表中查找匹配的記錄,如果查找失敗,則返回一個所有欄位都為null的記錄。所以說,右表是streamIter,左表是buildIter,我們在寫sql語句或者使用DataFrmae時,一般讓大表在右邊,小表在左邊。其基本實現流程如下圖所示。

full outer join

full outer join相對來說要複雜一點,總體上來看既要做left outer join,又要做right outer join,但是又不能簡單地先left outer join,再right outer join,最後union得到最終結果,因為這樣最終結果中就存在兩份inner join的結果了。因為既然完成left outer join又要完成right outer join,所以full outer join僅採用sort merge join實現,左邊和右表既要作為streamIter,又要作為buildIter,其基本實現流程如下圖所示。

由於左表和右表已經排好序,首先分別順序取出左表和右表中的一條記錄,比較key,如果key相等,則joinrowA和rowB,並將rowA和rowB分別更新到左表和右表的下一條記錄;如果keyA<keyB,則說明右表中沒有與左表rowA對應的記錄,那麼joinrowA與nullRow,緊接著,rowA更新到左表的下一條記錄;如果keyA>keyB,則說明左表中沒有與右表rowB對應的記錄,那麼joinnullRow與rowB,緊接著,rowB更新到右表的下一條記錄。如此循環遍歷直到左表和右表的記錄全部處理完。

left semi join

left semi join是以左表為準,在右表中查找匹配的記錄,如果查找成功,則僅返回左邊的記錄,否則返回null,其基本實現流程如下圖所示。

left anti join

left anti join與left semi join相反,是以左表為準,在右表中查找匹配的記錄,如果查找成功,則返回null,否則僅返回左邊的記錄,其基本實現流程如下圖所示。

總結

Join是資料庫查詢中一個非常重要的語法特性,在資料庫領域可以說是「得join者的天下」,SparkSQL作為一種分布式數據倉庫系統,給我們提供了全面的join支持,並在內部實現上無聲無息地做了很多優化,了解join的實現將有助於我們更深刻的了解我們的應用程式的運行軌跡。

擴展閱讀:【阿里巴巴大數據之路】資料,公眾號「數據愛好者社區」後臺回復「666」,轉發即可下載。

相關焦點

  • Spark調優101
    但是,作為一款開源的項目,Spark並不對其運行效率負責,也就是說完成Spark程序的開發僅僅是萬裡長徵的第一步,更艱巨的挑戰來自於如何對Spark程序進行調優,從而穩定高效地執行日常計算任務。Spark優化策略 針對不同場景的Spark應用而言,調優策略可能迥然不同,歸納而言有以下幾個方面:1.  基於Spark參數的調優:直接調整Spark集群配置和運行時參數可以極大地改善了程序執行效率。2.
  • 發現華為FI性能調優文檔對Spark Broadcast Join的一處誤解
    一、前言今天在查看華為FI性能調優文檔時,發現有下面一處介紹:進行廣播操作,對表有要求:1. 至少有一個表不是空表;2. 表不能是「external table」;3.  2G --conf spark.sql.hive.convertMetastoreParquet=false spark.sql.crossJoin.enabled=true說明:spark.sql.hive.convertMetastoreParquet=false是為了解決spark查詢hive表時,所有欄位內容解析都是
  • Spark性能優化指南
    繼基礎篇講解了每個Spark開發人員都必須熟知的開發調優與資源調優之後,本文作為《Spark性能優化指南》的高級篇,將深入分析數據傾斜調優與shuffle調優,以解決更加棘手的性能問題。調優概述有的時候,我們可能會遇到大數據計算中一個最棘手的問題——數據傾斜,此時Spark作業的性能會比期望差很多。
  • Spark性能優化指南——基礎篇
    我們需要根據不同的業務場景以及數據情況,對Spark作業進行綜合性的分析,然後進行多個方面的調節和優化,才能獲得最佳性能。筆者根據之前的Spark作業開發經驗以及實踐積累,總結出了一套Spark作業的性能優化方案。整套方案主要分為開發調優、資源調優、數據傾斜調優、shuffle調優幾個部分。
  • Spark SQL 之 Join 實現
    如下圖所示,sql語句被語法解析(SQL AST)成查詢計劃,或者我們通過Dataset/DataFrame提供的APIs組織成查詢計劃,查詢計劃分為兩大類:邏輯計劃和物理計劃,這個階段通常叫做邏輯計劃,經過語法分析(Analyzer)、一系列查詢優化(Optimizer)後得到優化後的邏輯計劃,最後被映射成物理計劃,轉換成RDD執行。
  • Spark性能優化指南——高級篇
    源 / 頂級程式設計師   文 / 李雪蕤Spark性能優化指南——基礎篇繼基礎篇講解了每個Spark開發人員都必須熟知的開發調優與資源調優之後,本文作為《Spark性能優化指南》的高級篇,將深入分析數據傾斜調優與shuffle調優,以解決更加棘手的性能問題。
  • 面試必知的 Spark SQL 幾種 Join 實現
    如下圖所示,sql語句被語法解析(SQL AST)成查詢計劃,或者我們通過Dataset/DataFrame提供的APIs組織成查詢計劃,查詢計劃分為兩大類:邏輯計劃和物理計劃,這個階段通常叫做邏輯計劃,經過語法分析(Analyzer)、一系列查詢優化(Optimizer)後得到優化後的邏輯計劃,最後被映射成物理計劃,轉換成RDD執行。
  • Spark Streaming 調優實踐 | 附:贈書活動
    改動之後,我們可以監控 GC 的頻率和時間消耗,看看有沒有達到優化的效果。對於優化 GC,主要還是從降低全局 GC 的頻率出發,executor 中對於 GC 優化的配置可以通過 spark.executor.extraJavaOptions 來配置。4.
  • 大數據入門:SparkCore開發調優原則
    在大數據計算引擎當中,Spark受到的重視是越來越多的,尤其是對數據處理實時性的要求越來越高,Hadoop原生的MapReduce引擎受到詬病,Spark的性能也需要不斷調整優化。今天的大數據入門分享,我們就來講講SparkCore開發調優原則。
  • Apache Spark 中支持的七種 Join 類型簡介
    在介紹下文之前,假設我們有顧客(customer)和訂單(order)相關的兩張表,如下:scala> val order = spark.sparkContext.parallelize(Seq( | (1, 101,2500), (2,102,1110), (3,103,500), (4 ,102,400) | )).toDF
  • 黑馬程式設計師:技術筆記大數據面試題之spark相關(二)
    昨天分享了大數據面試題之spark相關一,看到有很大的反響,今天就分享接下來的二,希望能更好的幫助到大家!spark的迭代計算都是在內存中進行的,API中提供了大量的RDD操作如join,groupby等,而且通過DAG圖可以實現良好的容錯。13.RDD機制? 答:rdd分布式彈性數據集,簡單的理解成一種數據結構,是spark框架上的通用貨幣。
  • GC調優在Spark應用中的實踐
    圖 1 分年代的Heap結構 而G1 GC則完全改變了這一傳統思路。我們可以通過spark.storage.memoryFraction參數調節這兩塊內存的比例,Spark會控制緩存RDD總大小不超過heap空間體積乘以這個參數所設置的值,而這塊緩存RDD的空間中沒有使用的部分也可以為JVM運行時所用。因此,分析Spark應用GC問題時應當分別分析兩部分內存的使用情況。
  • Spark 數據傾斜及其解決方案
    業務邏輯: 我們從業務邏輯的層面上來優化數據傾斜,比如要統計不同城市的訂單情況,那麼我們單獨對這一線城市來做 count,最後和其它城市做整合。程序實現: 比如說在 Hive 中,經常遇到 count(distinct)操作,這樣會導致最終只有一個 reduce,我們可以先 group 再在外面包一層 count,就可以了;在 Spark 中使用 reduceByKey 替代 groupByKey 等。
  • 數據分析工程師面試集錦5——Spark面試指南
    導語本篇文章為大家帶來spark面試指南,文內會有兩種題型,問答題和代碼題,題目大部分來自於網絡上,有小部分是來自於工作中的總結,每個題目會給出一個參考答案。為什麼考察Spark?Spark作為大數據組件中的執行引擎,具備以下優勢特性。
  • Spark【面試】
    中提供了大量的RDD操作如join,groupby等,而且通過DAG圖可以實現良好的容錯4、為什麼要用flume導入hdfs,hdfs的構架是怎樣的flume可以實時的導入數據到hdfs中,當hdfs上的文件達到一個指定大小的時候會形成一個文件,或者超過指定時間的話也形成一個文件
  • 2小時入門Spark之MLlib
    一,MLlib基本介紹MLlib是Spark的機器學習庫,包括以下主要功能。實用工具:線性代數,統計,數據處理等工具特徵工程:特徵提取,特徵轉換,特徵選擇常用算法:分類,回歸,聚類,協同過濾,降維模型優化:模型評估,參數優化。MLlib庫包括兩個不同的部分。
  • Spark SQL重點知識總結
    一、Spark SQL的概念理解Spark SQL是spark套件中一個模板,它將數據的計算任務通過SQL的形式轉換成了RDD的計算,類似於Hive
  • 【譯】Oracle調優技巧22:Hash Outer Join
    查找Oracle相關優化過程中,遇到這此系列文章,感覺解釋的簡單易懂,翻譯過來分享給大家。順序以個人查找優化相關的次序為主,可通過查看原文看作者原版文章。原文若無法直接打開,請自行通過梯子嘗試。然而在 Hash outer join 中,這個決策過程變得無關緊要,因為默認情況下父表會被選為hash表,而不管它是小表還是大表。原因是連接條件(joining condition)將決定哪個表將成為此 Hash outer join 中的 hash 表,而不是cost , 後者是用於標識 hash equi-join中hash表的度量。
  • Spark SQL 重點知識總結
    ↓推薦關注↓一、Spark SQL的概念理解Spark
  • spark結構化數據處理:Spark SQL、DataFrame和Dataset
    文中使用Scala對Spark SQL進行講解,並且代碼大多都能在spark-shell中運行,關於這點請知曉。概述相比於Spark RDD API,Spark SQL包含了對結構化數據和在其上的運算的更多信息,Spark SQL使用這些信息進行了額外的優化,使對結構化數據的操作更加高效和方便。