SHC:使用 Spark SQL 高效地讀寫 HBase

2021-03-02 過往記憶大數據

本文原文(點擊下面閱讀原文即可進入) https://www.iteblog.com/archives/2522.html。

Apache Spark 和 Apache HBase 是兩個使用比較廣泛的大數據組件。很多場景需要使用 Spark 分析/查詢 HBase 中的數據,而目前 Spark 內置是支持很多數據源的,其中就包括了 HBase,但是內置的讀取數據源還是使用了 TableInputFormat 來讀取 HBase 中的數據。這個 TableInputFormat 有一些缺點:

一個 Task 裡面只能啟動一個 Scan 去 HBase 中讀取數據;

TableInputFormat 中不支持 BulkGet;

不能享受到 Spark SQL 內置的 catalyst 引擎的優化。

基於這些問題,來自 Hortonworks 的工程師們為我們帶來了全新的 Apache Spark—Apache HBase Connector,下面簡稱 SHC。通過這個類庫,我們可以直接使用 Spark SQL 將 DataFrame 中的數據寫入到 HBase 中;而且我們也可以使用 Spark SQL 去查詢 HBase 中的數據,在查詢 HBase 的時候充分利用了 catalyst 引擎做了許多優化,比如分區修剪(partition pruning),列修剪(column pruning),謂詞下推(predicate pushdown)和數據本地性(data locality)等等。因為有了這些優化,通過 Spark 查詢 HBase 的速度有了很大的提升。

注意:SHC 同時還提供了將 DataFrame 中的數據直接寫入到 HBase 中,但是整個代碼並沒有什麼優化的地方,所以本文對這部分不進行介紹。感興趣的讀者可以直接到這裡查看相關寫數據到 HBase 的代碼。

SHC 是如何實現查詢優化的呢

SHC 主要使用下面的幾種優化,使得 Spark 獲取 HBase 的數據掃描範圍得到減少,提高了數據讀取的效率。

將使用 Rowkey 的查詢轉換成 get 查詢

我們都知道,HBase 中使用 Get 查詢的效率是非常高的,所以如果查詢的過濾條件是針對 RowKey 進行的,那麼我們可以將它轉換成 Get 查詢。為了說明這點,我們使用下面的例子進行說明。假設我們定義好的 HBase catalog 如下:

那麼如果有類似下面的查詢

因為查詢條件直接是針對 RowKey 進行的,所以這種情況直接可以轉換成 Get 或者 BulkGet 請求的。第一個 SQL 查詢過程類似於下面過程

如果想及時了解Spark、Hadoop或者Hbase相關的文章,歡迎關注微信公共帳號:iteblog_hadoop

後面兩條 SQL 查詢其實是等效的,在實現上會把 key in (x1, x2, x3..) 轉換成 (key == x1) or (key == x2) or ... 的。整個查詢流程如下:

如果想及時了解Spark、Hadoop或者Hbase相關的文章,歡迎關注微信公共帳號:iteblog_hadoop

如果我們的查詢裡面有 Rowkey 還有其他列的過濾,比如下面的例子

sqlContext.sql("select id, col6, col8 from iteblog_table where id = 1 and col7 = 'xxx'")

那麼上面的 SQL 翻譯成 HBase 的下面查詢

如果有多個 and 條件,都是使用 SingleColumnValueFilter 進行過濾的,這個都好理解。如果我們有下面的查詢

sqlContext.sql("select id, col6, col8 from iteblog_table where id = 1 or col7 = 'xxx'")

那麼在 shc 裡面是怎麼進行的呢?事實上,如果碰到非 RowKey 的過濾,那麼這種查詢是需要掃描 HBase 的全表的。上面的查詢在 shc 裡面就是將 HBase 裡面的所有數據拿到,然後傳輸到 Spark ,再通過 Spark 裡面進行過濾,可見 shc 在這種情況下效率是很低下的。

注意,上面的查詢在 shc 返回的結果是錯誤的。具體原因是在將 id = 1 or col7 = 'xxx' 查詢條件進行合併時,丟棄了所有的查找條件,相當於返回表的所有數據。定位到代碼可以參見下面的

同理,類似於下面的查詢在 shc 裡面其實都是全表掃描,並且將所有的數據返回到 Spark 層面上再進行一次過濾。

很顯然,這種方式查詢效率並不高,一種可行的方案是將算子下推到 HBase 層面,在 HBase 層面通過 SingleColumnValueFilter 過濾一部分數據,然後再返回到 Spark,這樣可以節省很多數據的傳輸。

組合 RowKey 的查詢優化

shc 還支持組合 RowKey 的方式來建表,具體如下: 

上面的 col00 和 col01 兩列組合成一個 rowkey,並且 col00 排在前面,col01 排在後面。比如 col00 ='row002',col01 = 2,那麼組合的 rowkey 為 row002\x00\x00\x00\x02。那麼在組合 Rowkey 的查詢 shc 都有哪些優化呢?現在我們有如下查詢

df.sqlContext.sql("select col00, col01, col1 from iteblog where col00 = 'row000' and col01 = 0").show()

根據上面的信息,RowKey 其實是由 col00 和 col01 組合而成的,那麼上面的查詢其實可以將 col00 和 col01 進行拼接,然後組合成一個 RowKey,然後上面的查詢其實可以轉換成一個 Get 查詢。但是在 shc 裡面,上面的查詢是轉換成一個 scan 和一個 get 查詢的。scan 的 startRow 為 row000,endRow 為 row000\xff\xff\xff\xff;get 的 rowkey 為 row000\xff\xff\xff\xff,然後再將所有符合條件的數據返回,最後再在 Spark 層面上做一次過濾,得到最後查詢的結果。因為 shc 裡面組合鍵查詢的代碼還沒完善,所以當前實現應該不是最終的。

在 shc 裡面下面兩條 SQL 查詢下沉到 HBase 的邏輯一致

df.sqlContext.sql("select col00, col01, col1 from iteblog where col00 = 'row000'").show()

df.sqlContext.sql("select col00, col01, col1 from iteblog where col00 = 'row000' and col01 = 0").show()

唯一區別是在 Spark 層面上的過濾。

scan 查詢優化

如果我們的查詢有 < 或 > 等查詢過濾條件,比如下面的查詢條件:

df.sqlContext.sql("select col00, col01, col1 from iteblog where col00 > 'row000' and col00 < 'row005'").show()

這個在 shc 裡面轉換成 HBase 的過濾為一條 get 和 一個 scan,具體為 get 的 Rowkey 為 row0005\xff\xff\xff\xff;scan 的 startRow 為 row000,endRow 為 row005\xff\xff\xff\xff,然後將查詢的結果返回到 spark 層面上進行過濾。

總體來說,shc 能在一定程度上對查詢進行優化,避免了全表掃描。但是經過評測,shc 其實還有很多地方不夠完善,算子下沉並沒有下沉到 HBase 層面上進行。目前這個項目正在和 hbase 自帶的 connectors 進行整合(https://github.com/apache/hbase-connectors),相關 issue 參見 Enhance the current spark-hbase connector。

相關焦點

  • spark結構化數據處理:Spark SQL、DataFrame和Dataset
    文中使用Scala對Spark SQL進行講解,並且代碼大多都能在spark-shell中運行,關於這點請知曉。概述相比於Spark RDD API,Spark SQL包含了對結構化數據和在其上的運算的更多信息,Spark SQL使用這些信息進行了額外的優化,使對結構化數據的操作更加高效和方便。
  • HBase的讀寫和javaAPI的使用
    一、hbase系統管理表hbase:namespace,記錄了hbase中所有namespace的信息 ,當前系統下有哪些namespace信息scan 'hbase:namespace'hbase:meta,記錄了region信息scan 'hbase:meta'二、讀寫思想client(get、scan)rowkey條件(1)由於rowkey是存儲在region
  • 0827-7.1.4-如何在CDP中使用Spark SQL CLI
    = "" ];then        ((i++))        sparksql=${sparksql}"spark.sql(\"$splitchar\").show();"     else        break     fi  doneelse  sparksql = "spark.sql(\"$sql\").show();
  • Spark SQL重點知識總結
    /spark/spark2.4.1/examples/src/main/resources/people.txt")val schemaString="name age"val filed=schemaString.split(" ").map(filename=> org.apache.spark.sql.types.StructField(filename,org.apache.spark.sql.types.StringType
  • 搞懂Hadoop、MapReduce、Hive、HBase、YARN及Spark的區別與聯繫
    HRegionServer:HBase中最核心的模塊,主要負責響應用戶I/O請求,向HDFS文件系統中讀寫。hbase具體操作使用在HBase中,namespace命名空間,是對一組表的邏輯分組,類似RDBMS中的database,方便對表在業務上劃分。
  • 2小時入門SparkSQL編程
    我們使用pyspark進行RDD編程時,在Excutor上跑的很多時候就是Python代碼,當然,少數時候也會跑java字節碼。因此,使用SparkSQL的編程範式進行編程,我們能夠取得幾乎和直接使用scala/java進行編程相當的效率(忽略語法解析時間差異)。此外SparkSQL提供了非常方便的數據讀寫API,我們可以用它和Hive表,HDFS,mysql表,Cassandra,Hbase等各種存儲媒介進行數據交換。
  • 大數據分析工程師入門9-Spark SQL
    比如:使用sql-需要sqlContext,使用hive-需要hiveContext,使用streaming-需要StreamingContext。SparkSession封裝了SparkContext和SQLContext。
  • SparkSQL基礎及實戰練習
    >    //(13,王英,女 , 80)    // val common: RDD[(String, (Int, Int))] = com1.join(com2)    // common.foreach(println)    // (12,楊春,女 , (70,70))    // (13,王英,女 , (73,80))    // 使用
  • 面試必知的 Spark SQL 幾種 Join 實現
    SparkSQL總體流程介紹在闡述Join實現之前,我們首先簡單介紹SparkSQL的總體流程,一般地,我們有兩種方式使用SparkSQL,一種是直接寫sql語句,這個需要有元資料庫支持,例如Hive等,另一種是通過Dataset/DataFrame編寫Spark應用程式。
  • Spark【面試】
     存的是和hdfs的映射關係,hive是邏輯上的數據倉庫,實際操作的都是hdfs上的文件,HQL就是用sql語法來寫的mr程序。8、Hive與關係型資料庫的關係?沒有關係,hive是數據倉庫,不能和資料庫一樣進行實時的CURD操作。
  • SparkSQL 50道練習題
    import org.apache.spark.SparkContextimport org.apache.spark.rdd.RDDimport org.apache.spark.sql*/    spark.udf.register("sub",(str:String,num1:Int,num2:Int)=>str.substring(num1,num2))    spark.sql("select sno,sname,sbirthday from student where sub(Sbirthday,0,5) = (select sub(
  • HBase二級索引方案
    為了HBase的數據查詢更高效、適應更多的場景, 諸如使用非rowkey欄位檢索也能做到秒級響應,或者支持各個欄位進行模糊查詢和多欄位組合查詢等, 因此需要在HBase上面構建二級索引, 以滿足現實中更複雜多樣的業務需求。
  • 使用Golang實現Spark UDF
    本文記錄了如何實現Golang寫spark udf。注意以下操作也可以對UDAF使用。就是聚合的UDF。為何做這個操作你可能會質疑這個方案能否落地。結論是這個方案很好用也很有效。我們的spark任務當然也需要集成這些特點:在生產環境運行了個一段時間、經過驗證和測試的GO語言倉庫維護了一些自動化pipeline,讓GO倉庫和Schema保持同步權衡下來,與其重新建立自動化pipeline和倉庫、CI任務,我們選擇把go代碼打進jar包可以作為spark udf使用這個操作。
  • SparkSQL操作insert overwrite table到hive慢
    """ | INSERT OVERWRITE TABLE app.table_name PARTITION (dt) | SELECT | id, | name, | class, | dt | FROM tempMonth """.stripMargin sparkSession.sql
  • Spark-TFRecord: Spark將全面支持TFRecord
    現有的項目和之前的嘗試在 Spark-TFRecord 項目之前,社區提供 Spark-TensorFlow-Connector , 在 Spark 中讀寫 TFRecord 。Spark-TensorFlow-Connector 是 TensorFlow 生態圈的一部分,並且是由 DataBricks,spark 的創始公司提供。
  • RDD和SparkSQL綜合應用
    通常,我們會使用SparkSQL的DataFrame來負責項目中數據讀寫相關的任務。對於一些能夠表達為表合併,表拼接,表分組等常規SQL操作的任務,我們也自然傾向於使用DataFrame來表達我們的邏輯。
  • 詭異 | Spark使用get_json_object函數
    一、問題現象:使用spark sql調用get_json_object函數後,報如下錯誤:yarn 容器被
  • 獨家 | PySpark和SparkSQL基礎:如何利用Python編程執行Spark(附代碼)
    import pandas as pdfrom pyspark.sql import SparkSessionfrom pyspark.context import SparkContextfrom pyspark.sql.functionsimport *from pyspark.sql.typesimport *from
  • Spark 2.0系列之SparkSession詳解
    SparkSession的功能首先,我們從一個Spark應用案例入手:SparkSessionZipsExample可以從JSON文件中讀取郵政編碼,通過DataFrame API進行分析,同時還能夠使用Spark SQL語句實施查詢。