Spark + Iceberg (一):開篇學習

2021-12-16 data duck

目標:從 iceberg 找到 spark 相關類就算成功

取得 plan:ReplaceData、MergeInto、DynamicFileFilterExec、ExtendedBatchScan

版本:spark 3.0.1,iceberg 0.11.0

數據源路徑:file:///Users/bjhl/tmp/icebergData

創建一個 maven 項目,pom.xml 文件如下
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">  <modelVersion>4.0.0</modelVersion>  <groupId>org.example</groupId>  <artifactId>spark-3.x-worker</artifactId>  <version>1.0-SNAPSHOT</version>  <inceptionYear>2008</inceptionYear>  <properties>    <scala.version>2.12.8</scala.version>  </properties>
<repositories> <repository> <id>scala-tools.org</id> <name>Scala-Tools Maven2 Repository</name> <url>http://scala-tools.org/repo-releases</url> </repository> </repositories>
<pluginRepositories> <pluginRepository> <id>scala-tools.org</id> <name>Scala-Tools Maven2 Repository</name> <url>http://scala-tools.org/repo-releases</url> </pluginRepository> </pluginRepositories>
<dependencies> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.4</version> <scope>test</scope> </dependency> <dependency> <groupId>org.specs</groupId> <artifactId>specs</artifactId> <version>1.2.5</version> <scope>test</scope> </dependency>
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>3.0.1</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.12</artifactId> <version>3.0.1</version> <scope>provided</scope> </dependency>
<dependency> <groupId>org.apache.iceberg</groupId> <artifactId>iceberg-spark3-runtime</artifactId> <version>0.11.0</version> </dependency> <dependency> <groupId>org.apache.avro</groupId> <artifactId>avro</artifactId> <version>1.9.2</version> </dependency> </dependencies>
<build> <sourceDirectory>src/main/scala</sourceDirectory> <testSourceDirectory>src/test/scala</testSourceDirectory> <plugins> <plugin> <groupId>org.scala-tools</groupId> <artifactId>maven-scala-plugin</artifactId> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> </execution> </executions> <configuration> <scalaVersion>${scala.version}</scalaVersion> <args> <arg>-target:jvm-1.5</arg> </args> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-eclipse-plugin</artifactId> <configuration> <downloadSources>true</downloadSources> <buildcommands> <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand> </buildcommands> <additionalProjectnatures> <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature> </additionalProjectnatures> <classpathContainers> <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer> <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer> </classpathContainers> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>6</source> <target>6</target> </configuration> </plugin> </plugins> </build> <reporting> <plugins> <plugin> <groupId>org.scala-tools</groupId> <artifactId>maven-scala-plugin</artifactId> <configuration> <scalaVersion>${scala.version}</scalaVersion> </configuration> </plugin> </plugins> </reporting></project>

SparkSession 配置
    val spark = SparkSession      .builder()      .config("spark.sql.catalog.hadoop_prod.type", "hadoop")       .config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkSessionCatalog")      .config("spark.sql.catalog.hadoop_prod.warehouse", "file:///Users/bjhl/tmp/icebergData")       .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")       .appName(this.getClass.getSimpleName)      .master("local[*]")      .getOrCreate()

創建一張表
        val hdpCatalog = spark.sessionState.catalogManager.catalog("hadoop_prod").asInstanceOf[SparkCatalog]    val namespaces = Array("test")        val identifier = new SimpleLocalIdentifierImpl("/Users/bjhl/tmp/icebergData/test/table_a", namespaces)    val options = new util.HashMap[String, String]()        val schema = new StructType()      .add("c1", IntegerType, true)      .add("c2", StringType, true)      .add("c3", StringType, true)        hdpCatalog.createTable(identifier, schema, null, options)        spark.sql("insert into hadoop_prod.test.table_a VALUES (1, \"wlq\",\"zyc\")")

生成的結構如下

包含元數據信息和數據信息,test 類比 庫名,table_a 是表名

讀取並更新,列印執行計劃
    // 獲取 表結構信息    val df = spark.table("hadoop_prod.test.table_a")    df.printSchema()
df.show()// val dfTableA = spark.read.format("iceberg").load("/Users/bjhl/tmp/icebergData/default/table_a")// dfTableA.show()
spark.sql("merge into hadoop_prod.test.table_a t " + "using (select 1 as c1, \"zyc\" as c2, \"wlq\" as c3) s " + "on t.c1 = s.c1 " + "when matched " + "then update set t.c3 = s.c3").explain()
df.show()
println("讀寫取 iceberg 數據結束")

注意:這裡直接 read.format 方式一直使用的是 HiveCatalog 去獲取信息,老是報錯,目前還沒定位出問題

效果如下:

更新數據後,存儲路徑目錄變化如下

元數據和數據都有新增相應的版本,猜測是以快照的方式實現?

表結構

更新前數據

更新後數據

重點:物理執行計劃,如下

結合 iceberg

clone iceberg 代碼構建下,上面的類來自 iceberg-spark3-extensions

後面就是根據代碼驗證猜想的過程

結束語

注意:

        "spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" 要設置,才能支持 merge into 等功能

疑問:

   1.  iceberg 是以什麼方式做的更新?

    2. 對於 iceberg 的存儲方式,spark 任務的運行過程哪個階段性能有所提升或者有所下降?

    3. 對於 iceberg 的實現方式,spark 基於其做了哪些優化?

相關焦點

  • Tip of the iceberg?
    Tip of the iceberg, you see, literally means the very tip or top of the iceberg, which is a gigantic piece of ice floating in the sea.
  • 一、Spark概述
    一、 Spark 是什麼定義Apache Spark是用於大規模數據(large-scala data)處理的統一(unified)分析引擎。2、易用性(可以通過java/scala/python/R開發spark應用程式)3、通用性(可以使用spark sql/spark streaming/mlib/Graphx)4、兼容性(spark程序可以運行在standalone/yarn/mesos)Spark 框架模塊整個Spark 框架模塊包含
  • 英語口語表達:the tip of the iceberg
    今天要和大家分享的口語表達是:the tip of the iceberg.iceberg是「冰山」的意思,the tip of the iceberg 就是「冰山一角,重要情況或問題顯露出的小部分」。We have to pull the plug on this new service.
  • 「冰山一角」別說成「a corner of the iceberg」
    大家好,歡迎來的餅哥英語的頻道,今天我們分享一個非常有用且地道的表達——冰山一角, 這個短語的英文不是指「corner of a iceberg」,其正確的表達是:tip of the iceberg 冰山一角(重大問題的)冰山一角,端倪,明顯的一小部分
  • Spark學習記錄|RDD分區的那些事
    以前在工作中主要寫Spark SQL相關的代碼,對於RDD的學習有些疏漏。
  • 用Spark計算引擎執行FATE聯邦學習任務
    VMware招聘聯邦學習、隱私計算開發工程師相關文章在Juypter Notebook中構建聯邦學習任務雲原生聯邦學習平臺 KubeFATE 原理詳解用KubeFATE在K8s上部署聯邦學習FATE v1.5
  • Audio news: Big iceberg threatens village
    Audio news: Big iceberg threatens village chinadaily.com.cn 2018-07-18 16:25
  • Spark與深度學習框架——H2O、deeplearning4j、SparkNet
    深度學習是當前正在進行中的Spark項目之一。本文我們將介紹一些Spark能用的深度學習框架。深度學習因其高準確率及通用性,成為機器學習中最受關注的領域。這種算法在2011—2012年期間出現,並超過了很多競爭對手。最開始,深度學習在音頻及圖像識別方面取得了成功。此外,像機器翻譯之類的自然語言處理或者畫圖也能使用深度學習算法來完成。
  • Spark 2.0系列之SparkSession詳解
    的各種API,學習Spark的難度也會大大降低。SparkSession的功能首先,我們從一個Spark應用案例入手:SparkSessionZipsExample可以從JSON文件中讀取郵政編碼,通過DataFrame API進行分析,同時還能夠使用Spark SQL語句實施查詢。
  • 『 Spark 』2. spark 基本概念解析
    作者:李濤濤,通聯數據|優礦|量化工程師,關注大數據,量化投資博客:http://litaotao.github.io郵箱:taotao.engineer@gmail.comGitHub: http://github.com/litaotao寫在前面本系列是綜合了自己在學習spark
  • Spark-TFRecord: Spark將全面支持TFRecord
    從事 Hadoop 內核開發,目前專注於機器學習、深度學習大數據平臺的建設。簡介:在機器學習領域,Apache Spark 由於其支持 SQL 類型的操作以及高效的數據處理,被廣泛的用於數據預處理流程,同時 TensorFlow 作為廣受歡迎的深度學習框架被廣泛的用於模型訓練。
  • 我們在學習Spark的時候,到底在學習什麼?
    很多小夥伴在群裡或者私信留言問我關於Spark的學習路徑問題。Spark發展至今,應該說已經非常成熟了。是大數據計算領域不得不學習的框架。尤其是Spark在穩定性和社區發展的成熟度方面,基本可以吊打其他的大數據處理框架。
  • Jack 60s 美語講堂:tip of the iceberg 冰山一角 (附音頻)
    英文常用名:Jack曾用英文名:Jack、Michael國內實戰派英語教學老師曾任新東方、李陽瘋狂英語講師Hey,guys  I'm Jack 歡迎來到Jack 60s美語講堂今天我要分享的英語表達是:Tip of表示「...的尖端」,iceberg
  • 『 Spark 』13. Spark 2.0 Release Notes 中文版
    量化工程師,關注大數據,量化投資博客:http://litaotao.github.io郵箱:taotao.engineer@gmail.comGitHub: http://github.com/litaotao相關閱讀:寫在前面本系列是綜合了自己在學習
  • 簡化TensorFlow和Spark互操作性:LinkedIn開源Spark-TFRecord
    TensorFlow 是市場上最流行的深度學習框架,而 Apache Spark 仍然是被廣泛採用的數據計算平臺之一,從大型企業到初創公司都能見到它們的身影。很自然會有公司嘗試將這兩者結合起來。雖然有一些框架能夠讓 TensorFlow 適應 Spark,但互操作性挑戰的根源性往往在於數據級別上。
  • 【溫故知新】還記得 the tip of the iceberg 是什麼意思嗎?
  • Spark簡介
    作者:梁偉雄作者簡介:Spark愛好者前置知識在我們一起學習在Spark發展的過程中,Spark和Hive之間的整合就從未間斷,從一開始的Shark到後面的Spark SQL。目前,Hive使用的場景基本都可以使用Spark來替代。Hive更多的用於T+1的場景,而Spark更加靈活,數據處理速度會更快,對時間要求高的業務場景可以考慮Spark的使用。
  • 『 Spark 』9. 搭建 IPython + Notebook + Spark 開發環境
    量化工程師,關注大數據,量化投資博客:http://litaotao.github.io郵箱:taotao.engineer@gmail.comGitHub: http://github.com/litaotao寫在前面本系列是綜合了自己在學習
  • 【Spark重點難點】SparkSQL YYDS(上)!
    而 DataFrame的表達能力卻很弱,它定義了一套DSL算子(Domain Specific Language)。注意:所謂的高階函數指的是,指的是形參為函數的函數,或是返回類型為函數的函數。import org.apache.spark.sql.types.
  • Spark調優 | 一文搞定 Join 優化
    如下圖所示,sql語句被語法解析(SQL AST)成查詢計劃,或者我們通過Dataset/DataFrame提供的APIs組織成查詢計劃,查詢計劃分為兩大類:邏輯計劃和物理計劃,這個階段通常叫做邏輯計劃,經過語法分析(Analyzer)、一系列查詢優化(Optimizer)後得到優化後的邏輯計劃,最後被映射成物理計劃,轉換成RDD執行。