大數據分析工程師入門9-Spark SQL

2020-12-05 海牛大數據

本文為《大數據分析師入門課程》系列的第9篇,在本系列的第8篇-Spark基礎中,已經對Spark做了一個入門介紹,在此基礎上本篇拎出Spark SQL,主要站在使用者的角度來進行講解,需要注意的是本文中的例子的代碼均使用Scala語言。

主要包括以下內容:

你該了解的Spark SQL簡單入門操作不得不說的數據源一你該了解的Spark SQL

1.什麼是Spark SQL?

Spark SQL是Spark專門用來處理結構化數據的模塊,是Spark的核心組件,在1.0時發布。

SparkSQL替代的是HIVE的查詢引擎,HIVE的默認引擎查詢效率低是由於其基於MapReduce實現SQL查詢,而MapReduce的shuffle是基於磁碟的。

2.Spark SQL特性

其實最初Spark團隊推出的是Shark-基於Hive對內存管理、物理計劃、執行做了優化,底層使用Spark基於內存的計算引擎,對比Hive性能提升一個數量級。

即便如此高的性能提升,但是由於Shark底層依賴Hive的語法解析器、查詢優化器等組件制約其性能的進一步提升。最終Spark團隊放棄了Shark,推出了Spark SQL項目,其具備以下特性:

標準的數據連接,支持多種數據源多種性能優化技術組件的可擴展性支持多語言開發:Scala、Java、Python、R兼容Hive3.Spark SQL可以做什麼?

大數據處理使用SQL進行大數據處理,使傳統的RDBMS人員也可以進行大數據處理,不需要掌握像mapreduce的編程方法。

使用高級API進行開發SparkSQL支持SQL API,DataFrame和Dataset API多種API,使用這些高級API進行編程和採用Sparkcore的RDD API 進行編程有很大的不同。

使用RDD進行編程時,開發人員在採用不同的程式語言和不同的方式開發應用程式時,其應用程式的性能千差萬別,但如果使用DataFrame和Dataset進行開發時,資深開發人員和初級開發人員開發的程序性能差異很小,這是因為SparkSQL 內部使用Catalyst optimizer 對執行計劃做了很好的優化。

二簡 單 入 門 操 作

1.構建入口

Spark SQL中所有功能的入口點是SparkSession類-Spark 2.0引入的新概念,它為用戶提供統一的切入點。

早期Spark的切入點是SparkContext,通過它來創建和操作數據集,對於不同的API需要不同的context。

比如:使用sql-需要sqlContext,使用hive-需要hiveContext,使用streaming-需要StreamingContext。SparkSession封裝了SparkContext和SQLContext。

要創建一個 SparkSession使用SparkSession.builder():

import org.apache.spark.sql.SparkSession val spark = SparkSession .builder() .appName("Spark SQL basic example") .config("spark.some.config.option", "some-value") .getOrCreate()

2.創建DataFrame

在一個SparkSession中,應用程式可以從結構化的數據文件、Hive的table、外部資料庫和RDD中創建一個DataFrame。

舉個例子, 下面就是基於一個JSON文件創建一個DataFrame:

val df =spark.read.json("examples/src/main/resources/people.json") // 顯示出DataFrame的內容df.show()// +----+-------+// | age| name|// +----+-------+// |null|Michael|// | 30| Andy|// | 19| Justin|// +----+-------+

3.執行SQL查詢

// 將DataFrame註冊成一個臨時視圖df.createOrReplaceTempView("people") val sqlDF = spark.sql("SELECT * FROM people")sqlDF.show()// +----+-------+// | age| name|// +----+-------+// |null|Michael|// | 30| Andy|// | 19|Justin|// +----+-------+

SparkSession的SQL函數可以讓應用程式以編程的方式運行SQL查詢,並將結果作為一個 DataFrame返回。

例子中createOrReplaceTempView創建的臨時視圖是session級別的,也就是會隨著session的消失而消失。如果你想讓一個臨時視圖在所有session中相互傳遞並且可用,直到Spark 應用退出,你可以建立一個全局的臨時視圖,全局的臨時視圖存在於系統資料庫global_temp中,我們必須加上庫名去引用它。

// 將一個DataFrame註冊成一個全局臨時視圖df.createGlobalTempView("people") // 注意這裡的global_tempspark.sql("SELECT * FROM global_temp.people").show()// +----+-------+// | age| name|// +----+-------+// |null|Michael|// | 30| Andy|// | 19|Justin|// +----+-------+// 新的session同樣可以訪問spark.newSession().sql("SELECT * FROM global_temp.people").show()// +----+-------+// | age| name|// +----+-------+// |null|Michael|// | 30| Andy|// | 19|Justin|// +----+-------+

4.DataFrame操作示例

importspark.implicits._//導入隱式轉換的包//列印schemadf.printSchema() // root// |-- age: long (nullable = true)// |-- name: string (nullable = true)//選擇一列進行列印df.select("name").show() // +-------+// | name|// +-------+// |Michael|// | Andy|// | Justin|// +-------+//年齡加1df.select($"name", $"age" +1).show()// +-------+---------+// | name|(age + 1)|// +-------+---------+// |Michael| null|// | Andy| 31|// | Justin| 20|// +-------+---------+//選取年齡大於21的df.filter($"age" > 21).show() // +---+----+// |age|name|// +---+----+// | 30|Andy|// +---+----+//聚合操作df.groupBy("age").count().show() // +----+-----+// | age|count|// +----+-----+// | 19| 1|// |null| 1|// | 30| 1|// +----+-----+

5.創建DataSet

Dataset和RDD比較類似,與RDD不同的是實現序列化和反序列化的方式,RDD是使用Java serialization或者Kryo,而Dataset是使用Encoder。

Encoder的動態特性使得Spark可以在執行filtering、sorting和hashing等許多操作時無需把字節反序列化為對象。

// 一個簡單的Seq轉成DataSet,會有默認的schemaval primitiveDS = Seq(1, 2, 3).toDS().show// +-----+// |value|// +-----+// | 1|// | 2|// | 3|// +-----+ case classPerson(name: String, age: Long) // 通過反射轉換為DataSetval caseClassDS = Seq(Person("Andy",32)).toDS()caseClassDS.show()// +----+---+// |name|age|// +----+---+// |Andy| 32|// +----+---+// DataFrame指定一個類則為DataSetval path = "examples/src/main/resources/people.json"val peopleDS = spark.read.json(path).as[Person]peopleDS.show()// +----+-------+// | age| name|// +----+-------+// |null|Michael|// | 30| Andy|// | 19| Justin|// +----+-------+

通過上述的代碼可以看出創建DataSet的代碼很簡單,一個toDs就可以自動推斷出schema的類型,讀取json這種結構化的數據得到的是一個DataFrame,再指定它的類則為DataSet。

6.RDD的互操作性

RDD的互操作性指的是RDD和DataFrame的相互轉換,DataFrame轉RDD很簡單,複雜的是RDD轉DataFrame。

目前Spark SQL有兩種方法:

反射推斷Spark SQL 的 Scala 接口支持自動轉換一個包含 Case Class的 RDD 為DataFrame。Case Class 定義了表的Schema。Case class 的參數名使用反射讀取並且成為了列名。Case class 也可以是嵌套的或者包含像 Seq 或者 Array 這樣的複雜類型,這個 RDD 能夠被隱式轉換成一個 DataFrame 然後被註冊為一個表。

// 開啟隱式轉換import spark.implicits._ // 讀入文本文件並最終轉化成DataFrameval peopleDF = spark.sparkContext .textFile("examples/src/main/resources/people.txt") .map(_.split(",")) .map(attributes => Person(attributes(0), attributes(1).trim.toInt)) .toDF()// 將DataFrame註冊成表peopleDF.createOrReplaceTempView("people") // 執行一條sql查詢val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19") // 通過map操作後得到的是RDDteenagersDF.map(teenager => "Name: " +teenager(0)).show()// +------------+// | value|// +------------+// |Name: Justin|// +------------+

另一種更加簡單的操作是將RDD中每一行類型變為tuple類型,然後使用toDF依次賦予欄位名,需要注意的是使用tuple最高可以支持22個欄位。

// 開啟隱式轉換import spark.implicits._ // 讀入文本文件並最終轉化成DataFrameval peopleDF = spark.sparkContext .textFile("examples/src/main/resources/people.txt") .map(_.split(",")) .map(attributes => (attributes(0), attributes(1).trim.toInt)) .toDF("name","age")//peopleDF: org.apache.spark.sql.DataFrame = [name:string, age: int]

構造Schema在無法提前定義schema的情況下,RDD轉DataFrame或者DataSet需要構造Schema。

構建一個Schema並將它應用到一個已存在的RDD編程接口需要以下四個步驟:

a.從原始的RDD創建一個tuple或者列表類型的RDD

b.創建一個StructType來匹配RDD中的結構

c.將生成的RDD轉換成Row類型的RDD

d.通過createDataFrame方法將Schema應用到RDD

//需要導入類型相關的包import org.apache.spark.sql.types._ //讀取hdfs上的文本文件,保存到rdd中val peopleRDD =spark.sparkContext.textFile("examples/src/main/resources/people.txt") // 這裡的schema是一個字符串,可以來自於其他未知內容的文件,需要注意的是-這裡明確寫出來只是為了演示,並不代表提前知道schema信息。val schemaString = "name age"// 將有schema信息的字符串轉變為StructField類型val fields = schemaString.split(" ") .map(fieldName => StructField(fieldName, StringType, nullable =true)) //通過StructType方法讀入schemaval schema = StructType(fields) // 將RDD轉換成Row類型的RDDval rowRDD = peopleRDD .map(_.split(",")) .map(attributes => Row(attributes(0), attributes(1).trim)) // 應用schema信息到Row類型的RDDval peopleDF = spark.createDataFrame(rowRDD,schema)

三不得不說的數據源

在工作中使用Spark SQL進行處理數據的第一步就是讀取數據,Spark SQL通過統一的接口去讀取和寫入數據。主要是read和write操作,不同的數據源相應的Option(附加設置)會有所不同,下面通過例子來具體說明。

1.數據讀取

parquet1)讀取Parquet文件

parquet文件自帶schema,讀取後是DataFrame格式。

val usersDF =spark.read.load("examples/src/main/resources/users.parquet")//usersDF: org.apache.spark.sql.DataFrame = [name:string, favorite_color: string ... 1 more field]

2)解析分區信息

parquet文件中如果帶有分區信息,那麼SparkSQL會自動解析分區信息。比如,這樣一份人口數據按照gender和country進行分區存儲,目錄結構如下:

test└── spark-sql └── test ├──gender=male │ │ │ ├── country=US │ │ └── data.parquet │ ├── country=CN │ │ └── data.parquet │ └── ... └──gender=female │ ├── country=US │ └── data.parquet ├── country=CN │ └── data.parquet └── ...

通過spark.read.load讀取該目錄下的文件SparkSQL將自動解析分區信息,返回的DataFrame的Schema如下:

root|--name:string(nullable=true)|--age:long(nullable=true)|--gender:string(nullable=true)|--country:string(nullable=true)

目前自動解析分區支持數值類型和字符串類型。

自動解析分區類型的參數為:spark.sql.sources.partitionColumnTypeInference.enabled,默認值為true。可以關閉該功能,直接將該參數設置為disabled。此時,分區列數據格式將被默認設置為string類型,不會再進行類型解析。

3)Schema合併

如果讀取的多個parquet文件中的Schema信息不一致,Spark SQL可以設置參數進行合併,但是Schema合併是一個高消耗的操作,在大多數情況下並不需要,所以Spark SQL從1.5.0開始默認關閉了該功能。

可以通過下面兩種方式開啟該功能:

a.讀取文件的時候,開啟合併功能,只對本次讀取文件進行合併Schema操作

b.設置全局SQL選項spark.sql.parquet.mergeSchema為true,每次讀取文件都會進行合併Schema操作

具體請看下面的例子:

// sqlContext是之前例子中生成的// 導入隱式轉換import sqlContext.implicits._ // 創建一個簡單的DataFrame並保存val df1 = sc.makeRDD(1 to 5).map(i => (i, i *2)).toDF("single", "double")df1.write.parquet("data/test_table/key=1") // 創建另一個DataFrame,注意欄位名val df2 = sc.makeRDD(6 to 10).map(i => (i, i *3)).toDF("single", "triple")df2.write.parquet("data/test_table/key=2") // 讀取這兩個parquet文件,增加開啟合併Schema的設置val df3 =sqlContext.read.option("mergeSchema","true").parquet("data/test_table")df3.printSchema() // 不同名稱的欄位都保留下來了// root// |-- single: int (nullable = true)// |-- double: int (nullable = true)// |-- triple: int (nullable = true)// |-- key : int (nullable = true)

關於schema合併,有一點需要特別關注,那就是當不同parquet文件的schema有衝突時,合併會失敗,如同名的欄位,其類型不一致的情況。這時如果你讀取的是hive數據源,可能會出現讀取失敗或者讀取欄位值全部為NULL的情況。如果大家遇到類型場景,可以考慮是否是這個因素導致。

jsonjson文件和parquet文件一樣也是帶有schema信息,不過需要指明是json文件,才能準確的讀取。

val peopleDF =spark.read.format("json").load("examples/src/main/resources/people.json")//peopleDF: org.apache.spark.sql.DataFrame = [age:bigint, name: string]

MySQL讀取MySQL中的數據是通過jdbc的方式,需要知道要訪問的MySQL資料庫、表等信息,具體請看下面的代碼:

//MySQL數據的訪問ip、埠號和資料庫名val url ="jdbc:mysql://192.168.100.101:3306/testdb"//要訪問的表名val table = "test"//建立一個配置變量val properties = new Properties()//將用戶名存入配置變量properties.setProperty("user","root")//將密碼存入配置變量properties.setProperty("password","root")//需要傳入Mysql的URL、表名、配置變量val df = sqlContext.read.jdbc(url,table,properties)

這裡要注意的一個點是,讀取MySQL需要運行作業時,classpath下有MySQL的驅動jar,或者通過--jars添加驅動jar。

hive讀取hive數據的前提是要進行相關的配置,需要將hive-site.xml、core-site.xml、hdfs-site.xml以及hive的lib依賴放入spark的classpath下,或者在提交作業時通過--files和--jars來指定這些配置文件和jar包。之後,就可以很方便的使用hive的數據表了,示例代碼如下:

import java.io.File import org.apache.spark.sql.Rowimport org.apache.spark.sql.SparkSession case classRecord(key: Int, value: String) // 數倉地址指向默認設置val warehouseLocation = newFile("spark-warehouse").getAbsolutePath val spark = SparkSession .builder() .appName("Spark Hive Example") .config("spark.sql.warehouse.dir", warehouseLocation) .enableHiveSupport() //增加支持hive特性 .getOrCreate() import spark.implicits._import spark.sql //使用sql創建一個表,並將hdfs中的文件導入到表中sql("CREATE TABLE IF NOT EXISTS src (key INT,value STRING) USING hive")sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") // 使用sql直接指向sql查詢sql("SELECT * FROM src").show()// +---+-------+// |key| value|// +---+-------+// |238|val_238|// | 86| val_86|// |311|val_311|// ...

2.數據保存

write保存用write方法,先看一個簡單的例子,將一個DataFrame保存到parquet文件中。

//選取DataFrame中的兩列保存到parquet文件中usersDF.select("name","favorite_color").write.save("namesAndFavColors.parquet")

formatformat可以指定保存文件的格式,支持json、csv、orc等

//選取DataFrame中的兩列保存到json文件中usersDF.select("name", "favorite_color").write.format("json").save("namesAndFavColors.json")

mode在保存數據的時候,要不要考慮數據存不存在?是覆蓋還是追加呢?通過mode可以進行設置。

//選取DataFrame中的兩列保並追加到parquet文件中usersDF.select("name","favorite_color").write.mode(SaveMode.append).save("namesAndFavColors.parquet")

除了append還有下列選項:

總結

本文通過什麼是Spark SQL,有哪些特性,可以做什麼讓讀者對Spark SQL有個整體的了解,然後著重講解了如何進行入門操作和多種數據源操作。

掌握了以上技能,大數據分析工程師面對Spark SQL相關的工作一定可以遊刃有餘。

以上就是今天的內容了,如果對你有幫助,希望你能夠關注、點讚、轉發一鍵三連支持一下。需要完整學習線路和配套課堂筆記,請回復111。

相關焦點

  • 大數據分析工程師面試集錦3-SQL/SparkSql/HiveQL
    大數據分析工程師80%的時間都在與SQL打交道,通過SQL完成業務方的各種臨時性需求分析和常規性報表統計。熟練的SQL技能能夠大大提高工作效率。本文將SQL/SparkSql/HiveQL放在一起來梳理一份常見題型的面試題庫。
  • 數據分析工程師面試集錦5——Spark面試指南
    可以說Spark幾乎是企業搭建大數據平臺必備組件,作為數據分析工程師在工作中執行程序、調試程序、查詢數據都會和Spark打交道,所以對Spark知識的考察也就順理成章了。怎麼去準備Spark的面試?Spark 同時支持Scala、Python、Java 、R四種應用程式API編程接口和編程方式, 考慮到大數據處理的特性,一般會優先使用Scala進行編程。2、Spark有什麼特點,處理大數據有什麼優勢?
  • Spark在360商業數據部的應用實踐
    由於之前大部分數據分析工作都是通過使用hive命令行完成的,為了將遷移至SparkSQL的代價最小,360系統部的同事開發了SparkSQL的命令行版本spark-hive。原有的以hive 命令運行的腳本,簡單的改成spark-hive便可以運行。360系統部的同事也做了大量兼容性的工作。spark-hive目前已經比較穩定,成為數據分析的首選。
  • sparksql 窗口函數原理
    [ROWS | RANGEBETWEEN (CURRENTROW | (UNBOUNDED |[num]) PRECEDING) AND (CURRENTROW | ( UNBOUNDED | [num]) FOLLOWING)])上面是sql的語法,相信大家比較難看懂舉個例子:我們常用的row_number()來說select row_number() over(partitionby
  • 停止使用Pandas並開始使用Spark+Scala
    為什麼數據科學家和工程師應該考慮將Spark與Scala結合使用以替代Pandas,以及如何入門    以數據工程師的經驗,我發現在Pandas中建立數據管道經常需要我們定期增加資源,以跟上不斷增加的內存使用量。 此外,由於意外的數據類型或空值,我們經常會看到許多運行時錯誤。 通過將Spark與Scala結合使用,解決方案感覺更強大,重構和擴展更容易。
  • 深入對比數據科學工具箱: SparkR vs Sparklyr
    Parquet 是一種高性能列式存儲文件格式,比CSV文件強在內建索引,可以快速查詢數據,目前普遍應用在模型訓練過程。它要求先定義數據源表,再通過一系列dplyr操作惰性求值,直到執行 head() 或者 collect() 等觸發函數,才會執行計算過程,並將數據返回。如此設計是因為大數據集如果立即處理是無法優化數據處理流程的,通過惰性求值的方式,系統會在遠程機器上自動優化數據處理流程。
  • PySpark源碼解析,用Python調用高效Scala接口,搞定大規模數據分析
    然而,在數據科學領域,Python 一直佔據比較重要的地位,仍然有大量的數據工程師在使用各類 Python 數據處理和科學計算的庫,例如 numpy、Pandas、scikit-learn 等。同時,Python 語言的入門門檻也顯著低於 Scala。
  • 新手如何快速入門數據分析?
    CDA數據分析研究院原創作品, 轉載需授權隨著網際網路迅猛發展,各大公司沉澱了很多的數據,如何找出藏在這些數據背後的規律,利用這些數據來給公司創造價值,作為一個新手面對這些問題的時候,你是不是考慮怎麼快速學習數據分析呢?
  • 大數據掃盲——什麼是spark
    關於大數據技術之前的文章裡已經提到了HDFS和MapReduce。HDFS解決了大數據的存儲問題,MapReduce解決了大數據的運算問題。既能存儲又能運算,貌似這樣已經很完美了。spark的出現就彌補了MapReduce的不足。 spark是一種基於內存的快速、通用、可擴展的大數據計算引擎。它集批處理、實時流處理、交互式查詢、圖計算與機器學習於一體Spark應用場景批處理可用於ETL(抽取、轉換、加載)。 機器學習可用於自動判斷淘寶的買家評論是好評還是差評。 交互式分析可用於查詢Hive數據倉庫。
  • Spark 3.0發布啦,改進SQL,棄Python 2,增強擴展,性能大幅提升
    Apache Spark 3.0.0Spark是一個開源的大數據處理、數據科學、機器學習和數據分析工作負載的統一引擎,自2010年首次發布以來,已經成長為最活躍的開源項目之一;支持Java、Scala、Python、R等語言
  • 每個數據科學家都得會一點SparkMagic
    其實,數據科學家生產率低下的主要原因在於數據準備工作的雙重性:· 快速訪問、合併和聚合存儲在企業數據湖中的大數據· 探索和可視化數據中具有複雜依賴關係的Python數據包中的數據和統計信息大數據大多是非結構化的,常常存儲在具有企業管理和安全限制的生產環境中。
  • SQL Server2008中的9種數據挖掘算法淺析
    【IT168 技術文檔】  在sql server2008中提供了9種常用的數據挖掘算法,這些算法用在不同數據挖掘的應用場景下,下面我們就各個算法逐個分析討論。  1.決策樹算法  決策樹,又稱判定樹,是一種類似二叉樹或多叉樹的樹結構。
  • 【大數據】最新大數據學習路線(完整詳細版】
    ,redis)Spark(scala,spark,spark core,spark sql,spark streaming,spark mllib,spark graphx)Python(python,spark python)?
  • R+SQL Server的大數據管理
    這是大數據的問題嗎?怎麼那麼不小心就被我碰上了?今天我們就談談「大數據」這個老話題。自2012年以來,大數據(Big Data)已經上過《紐約時報》《華爾街日報》的專欄封面,進入美國白宮官網的新聞,在國內更是被炒得熱火朝天,網際網路、工商業、高校等都紛紛進行探索討論。大數據戰略甚至成為了我們國家「十三五」十四大戰略之一。
  • Databricks連城:Spark SQL結構化數據分析
    數據科學家們早已熟悉的R和Pandas等傳統數據分析框架 雖然提供了直觀易用的API,卻局限於單機,無法覆蓋分布式大數據場景。此外,Spark 1.2.0中引入的外部數據源API也得到了進一步的完善,集成了完整的數據寫入支持,從而補全了Spark SQL多數據源互操作的最後一塊拼圖。借小數據分析之力,撼大數據分析之巨石;四兩撥千斤,不亦樂乎!
  • 大數據基礎知識:Hadoop分布式系統介紹
    隨著智能化、萬物互聯時代的快速發展,數據量開始暴增,一方面我們需要開始思考如何高效可靠地存儲海量的數據,另一方面我們還需要對這些數據進行分析處理,以獲得更多有價值的信息。這時期我們就需要用到Hadoop了。
  • GPU上的隨機森林:比Apache Spark快2000倍
    SparkApache Spark是一個在Scala中構建的開源大數據處理引擎,它有一個Python接口,可以調用Scala/JVM代碼。它是Hadoop處理生態系統中的一個重要組成部分,圍繞MapReduce範例構建,並且具有用於數據幀和機器學習的接口。
  • Java大數據開發是做什麼的?要掌握哪些技能
    Java開發是大數據的經典崗位,行業當中存在普遍的需求,Web開發、Android開發、遊戲開發等崗位,基本上Java語言是主力隊伍。而進入大數據時代,Java又在大數據方向上有了用武之地。今天我們主要來講講Java大數據開發是做什麼的?要掌握哪些技能?
  • 為什麼說,大數據是從流式計算開始切入的?
    大數據說了很多年,我說雷聲大,雨點小,這您同意嗎?為什麼?關鍵在創造的價值,如果僅僅是輔助決策,效果難以顯現,如何才能夠立竿見影?從技術上,對應的就是流式計算,因為它對應的是業務,能夠帶來收入的應用。什麼是流式大數據?有哪些應用?現在就讓我們一起回顧下流計算平臺的發展歷史,以及如何在企業中運用。
  • Python數據分析入門教程(一):獲取數據源
    作者 | CDA數據分析師俗話說,巧婦難為無米之炊。不管你廚藝有多好,如果沒有食材,也做不出香甜可口的飯菜來,所以想要做出飯菜來,首先要做的就是要買米買菜。而數據分析就好比是做飯,首先也應該是準備食材,也就是獲取數據源。一、導入外部數據導入數據主要用到的是Pandas裡的read_x()方法,x表示待導入文件的格式。