作者|luanhz
來源|小數志
之前推文PySpark環境搭建和簡介,今天開始介紹PySpark中的第一個重要組件SQL/DataFrame,實際上從名字便可看出這是關係型資料庫SQL和pandas.DataFrame的結合體,功能也幾乎恰是這樣,所以如果具有良好的SQL基本功和熟練的pandas運用技巧,學習PySpark SQL會感到非常熟悉和舒適。
慣例開局一張圖
前文提到,Spark是大數據生態圈中的一個快速分布式計算引擎,支持多種應用場景。例如Spark core中的RDD是最為核心的數據抽象,定位是替代傳統的MapReduce計算框架;SQL是基於RDD的一個新的組件,集成了關係型資料庫和數倉的主要功能,基本數據抽象是DataFrame,與pandas.DataFrame極為相近,適用於體量中等的數據查詢和處理。
那麼,在已經有了RDD的基礎上,Spark為什麼還要推出SQL呢?為此,Spark團隊還專門為此發表論文做以介紹,原文可查找《Spark SQL: Relational Data Processing in Spark》一文。這裡只節選其中的關鍵一段:核心有兩層意思,一是為了解決用戶從多種數據源(包括結構化、半結構化和非結構化數據)執行數據ETL的需要;二是滿足更為高級的數據分析需求,例如機器學習、圖處理等。而為了實現這一目的,Spark團隊推出SQL組件,一方面滿足了多種數據源的處理問題,另一方面也為機器學習提供了全新的數據結構DataFrame(對應ml子模塊)。
了解了Spark SQL的起源,那麼其功能定位自然也十分清晰:基於DataFrame這一核心數據結構,提供類似資料庫和數倉的核心功能,貫穿大部分數據處理流程:從ETL到數據處理到數據挖掘(機器學習)。
註:由於Spark是基於scala語言實現,所以PySpark在變量和函數命名中也普遍採用駝峰命名法(首單詞小寫,後面單次首字母大寫,例如someFunction),而非Python中的蛇形命名(各單詞均小寫,由下劃線連接,例如some_funciton)為了支撐上述功能需求和定位,PySpark中核心的類主要包括以下幾個:from pyspark import SparkContext
from pyspark.sql import SparkSession
sc = SparkContext()
spark = SparkSession(sc)註:這裡的Window為單獨的類,用於建立窗口函數over中的對象;functions子模塊中還有window函數,其主要用於對時間類型數據完成重採樣操作。
DataFrame是PySpark中核心的數據抽象和定義,理解DataFrame的最佳方式是從以下2個方面:
換言之,記憶PySpark中的DataFrame只需對比SQL+pd.DataFrame即可。下面對DataFrame對象的主要功能進行介紹:
數據讀寫及類型轉換。
1)創建DataFrame的方式主要有兩大類:
從其他數據類型轉換,包括RDD、嵌套list、pd.DataFrame等,主要是通過spark.createDataFrame()接口創建
從文件、資料庫中讀取創建,文件包括Json、csv等,資料庫包括主流關係型資料庫MySQL,以及數倉Hive,主要是通過sprak.read屬性+相應數據源類型進行讀寫,例如spark.read.csv()用於讀取csv文件,spark.read.jdbc()則可用於讀取資料庫
2)數據寫入。與spark.read屬性類似,.write則可用於將DataFrame對象寫入相應文件,包括寫入csv文件、寫入資料庫等
3)數據類型轉換。DataFrame既然可以通過其他類型數據結構創建,那麼自然也可轉換為相應類型,常用的轉換其實主要還是DataFrame=>rdd和DataFrame=>pd.DataFrame,前者通過屬性可直接訪問,後者則需相應接口:
df.rdd # PySpark SQL DataFrame => RDD
df.toPandas() # PySpark SQL DataFrame => pd.DataFramedf = spark.createDataFrame([("John", 17), ("Tom", 18)], schema=["name", "age"])
df.select('name') # DataFrame[name: string]
df['name'] # Column<b'name'>
df.name # Column<b'name'>除了提取單列外,select還支持類似SQL中"*"提取所有列,以及對單列進行簡單的運算和變換,具體應用場景可參考pd.DataFrame中賦值新列的用法,例如下述例子中首先通過"*"關鍵字提取現有的所有列,而後通過df.age+1構造了名字為(age+1)的新列。
df = spark.createDataFrame([("John", 17), ("Tom", 18)], schema=["name", "age"])
df.select('*', df.age+1).show()
"""
+----+---+----+
|name|age|(age + 1)|
+----+---+----+
|John| 17| 18|
| Tom| 18| 19|
+----+---+----+
"""df.select('*', (df.age+1).alias('age1')).show()
"""
+----+---+----+
|name|age|age1|
+----+---+----+
|John| 17| 18|
| Tom| 18| 19|
+----+---+----+
"""df.where(df.age==18).show()
df.filter(df.age==18).show()
df.where('age=18').show()
df.filter('age=18').show()
"""
+----+---+
|name|age|
+----+---+
| Tom| 18|
+----+---+
"""值得指出的是在pandas.DataFrame中類似的用法是query函數,不同的是query()中表達相等的條件符號是"==",而這裡filter或where的相等條件判斷則是更符合SQL語法中的單等號"="。
# 原始DataFrame
df.show()
"""
+----+---+----+
|name|age| time|
+----+---+----+
|John| 17|2020-09-06 15:11:00|
| Tom| 17|2020-09-06 15:12:00|
| Joy| 17|2020-09-06 15:13:00|
| Tim| 18|2020-09-06 15:16:00|
+----+---+----+
"""
# gorupby+pivot實現數據透視表
df.groupby(fn.substring('name', 1, 1).alias('firstName')).pivot('age').count().show()
"""
+----+---+----+
|firstName| 17| 18|
+----+---+----+
| T| 1| 1|
| J| 2|null|
+----+---+----+
"""
# window函數實現時間重採樣
df.groupby(fn.window('time', '5 minutes')).count().show()
"""
+++
| window|count|
+++
|[2020-09-06 15:10...| 3|
|[2020-09-06 15:15...| 1|
+++
"""# 多列排序,默認升序
df.sort('name', 'age').show()
"""
+----+---+----+
|name|age| time|
+----+---+----+
|John| 17|2020-09-06 15:11:00|
| Joy| 17|2020-09-06 15:13:00|
| Tim| 18|2020-09-06 15:16:00|
| Tom| 17|2020-09-06 15:12:00|
+----+---+----+
"""
# 多列排序,並制定不同排序規則
df.sort(['age', 'name'], ascending=[True, False]).show()
"""
+----+---+----+
|name|age| time|
+----+---+----+
| Tom| 17|2020-09-06 15:12:00|
| Joy| 17|2020-09-06 15:13:00|
|John| 17|2020-09-06 15:11:00|
| Tim| 18|2020-09-06 15:16:00|
+----+---+----+
"""另外,類似於SQL中count和distinct關鍵字,DataFrame中也有相同的用法。
以上主要是類比SQL中的關鍵字用法介紹了DataFrame部分主要操作,而學習DataFrame的另一個主要參照物就是pandas.DataFrame,例如以下操作:
dropna:刪除空值行
實際上也可以接收指定列名或閾值,當接收列名時則僅當相應列為空時才刪除;當接收閾值參數時,則根據各行空值個數是否達到指定閾值進行刪除與否
dropDuplicates/drop_duplicates:刪除重複行
二者為同名函數,與pandas中的drop_duplicates函數功能完全一致
fillna:空值填充
與pandas中fillna功能一致,根據特定規則對空值進行填充,也可接收字典參數對各列指定不同填充
fill:廣義填充
drop:刪除指定列
最後,再介紹DataFrame的幾個通用的常規方法:
# 根據age列創建一個名為ageNew的新列
df.withColumn('ageNew', df.age+100).show()
"""
+----+---+----+-+
|name|age| time|ageNew|
+----+---+----+-+
|John| 17|2020-09-06 15:11:00| 117|
| Tom| 17|2020-09-06 15:12:00| 117|
| Joy| 17|2020-09-06 15:13:00| 117|
| Tim| 18|2020-09-06 15:16:00| 118|
+----+---+----+-+
"""注意到,withColumn實現的功能完全可以由select等價實現,二者的區別和聯繫是:withColumn是在現有DataFrame基礎上增加或修改一列,並返回新的DataFrame(包括原有其他列),適用於僅創建或修改單列;而select準確的講是篩選新列,僅僅是在篩選過程中可以通過添加運算或表達式實現創建多個新列,返回一個篩選新列的DataFrame,而且是篩選多少列就返回多少列,適用於同時創建多列的情況(官方文檔建議出於性能考慮和防止內存溢出,在創建多列時首選select)
實際上show是spark中的action算子,即會真正執行計算並返回結果;而前面的很多操作則屬於transform,僅加入到DAG中完成邏輯添加,並不實際執行計算另外,DataFrame還有一個重要操作:在session中註冊為虛擬表,而後即可真正像執行SQL查詢一樣完成相應SQL操作。
df.createOrReplaceTempView('person') # 將df註冊為表名叫person的臨時表
spark.sql('select * from person').show() # 通過sql接口在person臨時表中執行SQL操作
"""
+----+---+----+
|name|age| time|
+----+---+----+
|John| 17|2020-09-06 15:11:00|
| Tom| 17|2020-09-06 15:12:00|
| Joy| 17|2020-09-06 15:13:00|
| Tim| 18|2020-09-06 15:16:00|
+----+---+----+
"""基於DataFrame可以實現SQL中大部分功能,同時為了進一步實現SQL中的運算操作,spark.sql還提供了幾乎所有的SQL中的函數,確實可以實現SQL中的全部功能。按照功能,functions子模塊中的功能可以主要分為以下幾類:
聚合統計類,也是最為常用的,除了常規的max、min、avg(mean)、count和sum外,還支持窗口函數中的row_number、rank、dense_rank、ntile,以及前文提到的可用於時間重採樣的窗口函數window等
數值處理類,主要是一些數學函數,包括sqrt、abs、ceil、floor、sin、log等
字符串類,包括子字符串提取substring、字符串拼接concat、concat_ws、split、strim、lpad等
時間處理類,主要是對timestamp類型數據進行處理,包括year、month、hour提取相應數值,timestamp轉換為時間戳、date_format格式化日期、datediff求日期差等
這些函數數量較多,且與SQL中相應函數用法和語法幾乎一致,無需全部記憶,僅在需要時查找使用即可。
本文較為系統全面的介紹了PySpark中的SQL組件以及其核心數據抽象DataFrame,總體而言:該組件是PySpark中的一個重要且常用的子模塊,功能豐富,既繼承了Spark core中RDD的基本特點(算子和延遲執行特性),也是Spark.ml機器學習子模塊的基礎數據結構,其作用自然不言而喻。
與此同時,DataFrame學習成本並不高,大致相當於關係型資料庫SQL+pandas.DataFrame的結合體,很多接口和功能都可以觸類旁通。
如果覺得文章有點用的話,請毫不留情地素質三連吧,分享、點讚、在看、收藏,我不挑,因為這將是我寫作更多優質文章的最強動力。