本文為《大數據分析師入門課程》系列的第9篇,在本系列的第8篇-Spark基礎中,已經對Spark做了一個入門介紹,在此基礎上本篇拎出Spark SQL,主要站在使用者的角度來進行講解,需要注意的是本文中的例子的代碼均使用Scala語言。
主要包括以下內容:
你該了解的Spark SQL簡單入門操作不得不說的數據源一你該了解的Spark SQL
1.什麼是Spark SQL?
Spark SQL是Spark專門用來處理結構化數據的模塊,是Spark的核心組件,在1.0時發布。
SparkSQL替代的是HIVE的查詢引擎,HIVE的默認引擎查詢效率低是由於其基於MapReduce實現SQL查詢,而MapReduce的shuffle是基於磁碟的。
2.Spark SQL特性
其實最初Spark團隊推出的是Shark-基於Hive對內存管理、物理計劃、執行做了優化,底層使用Spark基於內存的計算引擎,對比Hive性能提升一個數量級。
即便如此高的性能提升,但是由於Shark底層依賴Hive的語法解析器、查詢優化器等組件制約其性能的進一步提升。最終Spark團隊放棄了Shark,推出了Spark SQL項目,其具備以下特性:
標準的數據連接,支持多種數據源多種性能優化技術組件的可擴展性支持多語言開發:Scala、Java、Python、R兼容Hive3.Spark SQL可以做什麼?
大數據處理使用SQL進行大數據處理,使傳統的RDBMS人員也可以進行大數據處理,不需要掌握像mapreduce的編程方法。
使用高級API進行開發SparkSQL支持SQL API,DataFrame和Dataset API多種API,使用這些高級API進行編程和採用Sparkcore的RDD API 進行編程有很大的不同。
使用RDD進行編程時,開發人員在採用不同的程式語言和不同的方式開發應用程式時,其應用程式的性能千差萬別,但如果使用DataFrame和Dataset進行開發時,資深開發人員和初級開發人員開發的程序性能差異很小,這是因為SparkSQL 內部使用Catalyst optimizer 對執行計劃做了很好的優化。
二簡 單 入 門 操 作
1.構建入口
Spark SQL中所有功能的入口點是SparkSession類-Spark 2.0引入的新概念,它為用戶提供統一的切入點。
早期Spark的切入點是SparkContext,通過它來創建和操作數據集,對於不同的API需要不同的context。
比如:使用sql-需要sqlContext,使用hive-需要hiveContext,使用streaming-需要StreamingContext。SparkSession封裝了SparkContext和SQLContext。
要創建一個 SparkSession使用SparkSession.builder():
import org.apache.spark.sql.SparkSession val spark = SparkSession .builder() .appName("Spark SQL basic example") .config("spark.some.config.option", "some-value") .getOrCreate()
2.創建DataFrame
在一個SparkSession中,應用程式可以從結構化的數據文件、Hive的table、外部資料庫和RDD中創建一個DataFrame。
舉個例子, 下面就是基於一個JSON文件創建一個DataFrame:
val df =spark.read.json("examples/src/main/resources/people.json") // 顯示出DataFrame的內容df.show()// +----+-------+// | age| name|// +----+-------+// |null|Michael|// | 30| Andy|// | 19| Justin|// +----+-------+
3.執行SQL查詢
// 將DataFrame註冊成一個臨時視圖df.createOrReplaceTempView("people") val sqlDF = spark.sql("SELECT * FROM people")sqlDF.show()// +----+-------+// | age| name|// +----+-------+// |null|Michael|// | 30| Andy|// | 19|Justin|// +----+-------+
SparkSession的SQL函數可以讓應用程式以編程的方式運行SQL查詢,並將結果作為一個 DataFrame返回。
例子中createOrReplaceTempView創建的臨時視圖是session級別的,也就是會隨著session的消失而消失。如果你想讓一個臨時視圖在所有session中相互傳遞並且可用,直到Spark 應用退出,你可以建立一個全局的臨時視圖,全局的臨時視圖存在於系統資料庫global_temp中,我們必須加上庫名去引用它。
// 將一個DataFrame註冊成一個全局臨時視圖df.createGlobalTempView("people") // 注意這裡的global_tempspark.sql("SELECT * FROM global_temp.people").show()// +----+-------+// | age| name|// +----+-------+// |null|Michael|// | 30| Andy|// | 19|Justin|// +----+-------+// 新的session同樣可以訪問spark.newSession().sql("SELECT * FROM global_temp.people").show()// +----+-------+// | age| name|// +----+-------+// |null|Michael|// | 30| Andy|// | 19|Justin|// +----+-------+
4.DataFrame操作示例
importspark.implicits._//導入隱式轉換的包//列印schemadf.printSchema() // root// |-- age: long (nullable = true)// |-- name: string (nullable = true)//選擇一列進行列印df.select("name").show() // +-------+// | name|// +-------+// |Michael|// | Andy|// | Justin|// +-------+//年齡加1df.select($"name", $"age" +1).show()// +-------+---------+// | name|(age + 1)|// +-------+---------+// |Michael| null|// | Andy| 31|// | Justin| 20|// +-------+---------+//選取年齡大於21的df.filter($"age" > 21).show() // +---+----+// |age|name|// +---+----+// | 30|Andy|// +---+----+//聚合操作df.groupBy("age").count().show() // +----+-----+// | age|count|// +----+-----+// | 19| 1|// |null| 1|// | 30| 1|// +----+-----+
5.創建DataSet
Dataset和RDD比較類似,與RDD不同的是實現序列化和反序列化的方式,RDD是使用Java serialization或者Kryo,而Dataset是使用Encoder。
Encoder的動態特性使得Spark可以在執行filtering、sorting和hashing等許多操作時無需把字節反序列化為對象。
// 一個簡單的Seq轉成DataSet,會有默認的schemaval primitiveDS = Seq(1, 2, 3).toDS().show// +-----+// |value|// +-----+// | 1|// | 2|// | 3|// +-----+ case classPerson(name: String, age: Long) // 通過反射轉換為DataSetval caseClassDS = Seq(Person("Andy",32)).toDS()caseClassDS.show()// +----+---+// |name|age|// +----+---+// |Andy| 32|// +----+---+// DataFrame指定一個類則為DataSetval path = "examples/src/main/resources/people.json"val peopleDS = spark.read.json(path).as[Person]peopleDS.show()// +----+-------+// | age| name|// +----+-------+// |null|Michael|// | 30| Andy|// | 19| Justin|// +----+-------+
通過上述的代碼可以看出創建DataSet的代碼很簡單,一個toDs就可以自動推斷出schema的類型,讀取json這種結構化的數據得到的是一個DataFrame,再指定它的類則為DataSet。
6.RDD的互操作性
RDD的互操作性指的是RDD和DataFrame的相互轉換,DataFrame轉RDD很簡單,複雜的是RDD轉DataFrame。
目前Spark SQL有兩種方法:
反射推斷Spark SQL 的 Scala 接口支持自動轉換一個包含 Case Class的 RDD 為DataFrame。Case Class 定義了表的Schema。Case class 的參數名使用反射讀取並且成為了列名。Case class 也可以是嵌套的或者包含像 Seq 或者 Array 這樣的複雜類型,這個 RDD 能夠被隱式轉換成一個 DataFrame 然後被註冊為一個表。
// 開啟隱式轉換import spark.implicits._ // 讀入文本文件並最終轉化成DataFrameval peopleDF = spark.sparkContext .textFile("examples/src/main/resources/people.txt") .map(_.split(",")) .map(attributes => Person(attributes(0), attributes(1).trim.toInt)) .toDF()// 將DataFrame註冊成表peopleDF.createOrReplaceTempView("people") // 執行一條sql查詢val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19") // 通過map操作後得到的是RDDteenagersDF.map(teenager => "Name: " +teenager(0)).show()// +------------+// | value|// +------------+// |Name: Justin|// +------------+
另一種更加簡單的操作是將RDD中每一行類型變為tuple類型,然後使用toDF依次賦予欄位名,需要注意的是使用tuple最高可以支持22個欄位。
// 開啟隱式轉換import spark.implicits._ // 讀入文本文件並最終轉化成DataFrameval peopleDF = spark.sparkContext .textFile("examples/src/main/resources/people.txt") .map(_.split(",")) .map(attributes => (attributes(0), attributes(1).trim.toInt)) .toDF("name","age")//peopleDF: org.apache.spark.sql.DataFrame = [name:string, age: int]
構造Schema在無法提前定義schema的情況下,RDD轉DataFrame或者DataSet需要構造Schema。
構建一個Schema並將它應用到一個已存在的RDD編程接口需要以下四個步驟:
a.從原始的RDD創建一個tuple或者列表類型的RDD
b.創建一個StructType來匹配RDD中的結構
c.將生成的RDD轉換成Row類型的RDD
d.通過createDataFrame方法將Schema應用到RDD
//需要導入類型相關的包import org.apache.spark.sql.types._ //讀取hdfs上的文本文件,保存到rdd中val peopleRDD =spark.sparkContext.textFile("examples/src/main/resources/people.txt") // 這裡的schema是一個字符串,可以來自於其他未知內容的文件,需要注意的是-這裡明確寫出來只是為了演示,並不代表提前知道schema信息。val schemaString = "name age"// 將有schema信息的字符串轉變為StructField類型val fields = schemaString.split(" ") .map(fieldName => StructField(fieldName, StringType, nullable =true)) //通過StructType方法讀入schemaval schema = StructType(fields) // 將RDD轉換成Row類型的RDDval rowRDD = peopleRDD .map(_.split(",")) .map(attributes => Row(attributes(0), attributes(1).trim)) // 應用schema信息到Row類型的RDDval peopleDF = spark.createDataFrame(rowRDD,schema)
三不得不說的數據源
在工作中使用Spark SQL進行處理數據的第一步就是讀取數據,Spark SQL通過統一的接口去讀取和寫入數據。主要是read和write操作,不同的數據源相應的Option(附加設置)會有所不同,下面通過例子來具體說明。
1.數據讀取
parquet1)讀取Parquet文件
parquet文件自帶schema,讀取後是DataFrame格式。
val usersDF =spark.read.load("examples/src/main/resources/users.parquet")//usersDF: org.apache.spark.sql.DataFrame = [name:string, favorite_color: string ... 1 more field]
2)解析分區信息
parquet文件中如果帶有分區信息,那麼SparkSQL會自動解析分區信息。比如,這樣一份人口數據按照gender和country進行分區存儲,目錄結構如下:
test└── spark-sql └── test ├──gender=male │ │ │ ├── country=US │ │ └── data.parquet │ ├── country=CN │ │ └── data.parquet │ └── ... └──gender=female │ ├── country=US │ └── data.parquet ├── country=CN │ └── data.parquet └── ...
通過spark.read.load讀取該目錄下的文件SparkSQL將自動解析分區信息,返回的DataFrame的Schema如下:
root|--name:string(nullable=true)|--age:long(nullable=true)|--gender:string(nullable=true)|--country:string(nullable=true)
目前自動解析分區支持數值類型和字符串類型。
自動解析分區類型的參數為:spark.sql.sources.partitionColumnTypeInference.enabled,默認值為true。可以關閉該功能,直接將該參數設置為disabled。此時,分區列數據格式將被默認設置為string類型,不會再進行類型解析。
3)Schema合併
如果讀取的多個parquet文件中的Schema信息不一致,Spark SQL可以設置參數進行合併,但是Schema合併是一個高消耗的操作,在大多數情況下並不需要,所以Spark SQL從1.5.0開始默認關閉了該功能。
可以通過下面兩種方式開啟該功能:
a.讀取文件的時候,開啟合併功能,只對本次讀取文件進行合併Schema操作
b.設置全局SQL選項spark.sql.parquet.mergeSchema為true,每次讀取文件都會進行合併Schema操作
具體請看下面的例子:
// sqlContext是之前例子中生成的// 導入隱式轉換import sqlContext.implicits._ // 創建一個簡單的DataFrame並保存val df1 = sc.makeRDD(1 to 5).map(i => (i, i *2)).toDF("single", "double")df1.write.parquet("data/test_table/key=1") // 創建另一個DataFrame,注意欄位名val df2 = sc.makeRDD(6 to 10).map(i => (i, i *3)).toDF("single", "triple")df2.write.parquet("data/test_table/key=2") // 讀取這兩個parquet文件,增加開啟合併Schema的設置val df3 =sqlContext.read.option("mergeSchema","true").parquet("data/test_table")df3.printSchema() // 不同名稱的欄位都保留下來了// root// |-- single: int (nullable = true)// |-- double: int (nullable = true)// |-- triple: int (nullable = true)// |-- key : int (nullable = true)
關於schema合併,有一點需要特別關注,那就是當不同parquet文件的schema有衝突時,合併會失敗,如同名的欄位,其類型不一致的情況。這時如果你讀取的是hive數據源,可能會出現讀取失敗或者讀取欄位值全部為NULL的情況。如果大家遇到類型場景,可以考慮是否是這個因素導致。
jsonjson文件和parquet文件一樣也是帶有schema信息,不過需要指明是json文件,才能準確的讀取。
val peopleDF =spark.read.format("json").load("examples/src/main/resources/people.json")//peopleDF: org.apache.spark.sql.DataFrame = [age:bigint, name: string]
MySQL讀取MySQL中的數據是通過jdbc的方式,需要知道要訪問的MySQL資料庫、表等信息,具體請看下面的代碼:
//MySQL數據的訪問ip、埠號和資料庫名val url ="jdbc:mysql://192.168.100.101:3306/testdb"//要訪問的表名val table = "test"//建立一個配置變量val properties = new Properties()//將用戶名存入配置變量properties.setProperty("user","root")//將密碼存入配置變量properties.setProperty("password","root")//需要傳入Mysql的URL、表名、配置變量val df = sqlContext.read.jdbc(url,table,properties)
這裡要注意的一個點是,讀取MySQL需要運行作業時,classpath下有MySQL的驅動jar,或者通過--jars添加驅動jar。
hive讀取hive數據的前提是要進行相關的配置,需要將hive-site.xml、core-site.xml、hdfs-site.xml以及hive的lib依賴放入spark的classpath下,或者在提交作業時通過--files和--jars來指定這些配置文件和jar包。之後,就可以很方便的使用hive的數據表了,示例代碼如下:
import java.io.File import org.apache.spark.sql.Rowimport org.apache.spark.sql.SparkSession case classRecord(key: Int, value: String) // 數倉地址指向默認設置val warehouseLocation = newFile("spark-warehouse").getAbsolutePath val spark = SparkSession .builder() .appName("Spark Hive Example") .config("spark.sql.warehouse.dir", warehouseLocation) .enableHiveSupport() //增加支持hive特性 .getOrCreate() import spark.implicits._import spark.sql //使用sql創建一個表,並將hdfs中的文件導入到表中sql("CREATE TABLE IF NOT EXISTS src (key INT,value STRING) USING hive")sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") // 使用sql直接指向sql查詢sql("SELECT * FROM src").show()// +---+-------+// |key| value|// +---+-------+// |238|val_238|// | 86| val_86|// |311|val_311|// ...
2.數據保存
write保存用write方法,先看一個簡單的例子,將一個DataFrame保存到parquet文件中。
//選取DataFrame中的兩列保存到parquet文件中usersDF.select("name","favorite_color").write.save("namesAndFavColors.parquet")
formatformat可以指定保存文件的格式,支持json、csv、orc等
//選取DataFrame中的兩列保存到json文件中usersDF.select("name", "favorite_color").write.format("json").save("namesAndFavColors.json")
mode在保存數據的時候,要不要考慮數據存不存在?是覆蓋還是追加呢?通過mode可以進行設置。
//選取DataFrame中的兩列保並追加到parquet文件中usersDF.select("name","favorite_color").write.mode(SaveMode.append).save("namesAndFavColors.parquet")
除了append還有下列選項:
總結
本文通過什麼是Spark SQL,有哪些特性,可以做什麼讓讀者對Spark SQL有個整體的了解,然後著重講解了如何進行入門操作和多種數據源操作。
掌握了以上技能,大數據分析工程師面對Spark SQL相關的工作一定可以遊刃有餘。
以上就是今天的內容了,如果對你有幫助,希望你能夠關注、點讚、轉發一鍵三連支持一下。需要完整學習線路和配套課堂筆記,請回復111。