PySpark SQL——SQL和pd.DataFrame的結合體

2021-03-02 簡說Python

作者|luanhz

來源|小數志

之前推文PySpark環境搭建和簡介,今天開始介紹PySpark中的第一個重要組件SQL/DataFrame,實際上從名字便可看出這是關係型資料庫SQL和pandas.DataFrame的結合體,功能也幾乎恰是這樣,所以如果具有良好的SQL基本功和熟練的pandas運用技巧,學習PySpark SQL會感到非常熟悉和舒適。

慣例開局一張圖

前文提到,Spark是大數據生態圈中的一個快速分布式計算引擎,支持多種應用場景。例如Spark core中的RDD是最為核心的數據抽象,定位是替代傳統的MapReduce計算框架;SQL是基於RDD的一個新的組件,集成了關係型資料庫和數倉的主要功能,基本數據抽象是DataFrame,與pandas.DataFrame極為相近,適用於體量中等的數據查詢和處理。

那麼,在已經有了RDD的基礎上,Spark為什麼還要推出SQL呢?為此,Spark團隊還專門為此發表論文做以介紹,原文可查找《Spark SQL: Relational Data Processing in Spark》一文。這裡只節選其中的關鍵一段:

核心有兩層意思,一是為了解決用戶從多種數據源(包括結構化、半結構化和非結構化數據)執行數據ETL的需要;二是滿足更為高級的數據分析需求,例如機器學習、圖處理等。而為了實現這一目的,Spark團隊推出SQL組件,一方面滿足了多種數據源的處理問題,另一方面也為機器學習提供了全新的數據結構DataFrame(對應ml子模塊)。

了解了Spark SQL的起源,那麼其功能定位自然也十分清晰:基於DataFrame這一核心數據結構,提供類似資料庫和數倉的核心功能,貫穿大部分數據處理流程:從ETL到數據處理到數據挖掘(機器學習)。

註:由於Spark是基於scala語言實現,所以PySpark在變量和函數命名中也普遍採用駝峰命名法(首單詞小寫,後面單次首字母大寫,例如someFunction),而非Python中的蛇形命名(各單詞均小寫,由下劃線連接,例如some_funciton)為了支撐上述功能需求和定位,PySpark中核心的類主要包括以下幾個:
from pyspark import SparkContext
from pyspark.sql import SparkSession
sc = SparkContext()
spark = SparkSession(sc)

註:這裡的Window為單獨的類,用於建立窗口函數over中的對象;functions子模塊中還有window函數,其主要用於對時間類型數據完成重採樣操作。

DataFrame是PySpark中核心的數據抽象和定義,理解DataFrame的最佳方式是從以下2個方面:

換言之,記憶PySpark中的DataFrame只需對比SQL+pd.DataFrame即可。下面對DataFrame對象的主要功能進行介紹:

數據讀寫及類型轉換。

1)創建DataFrame的方式主要有兩大類:

從其他數據類型轉換,包括RDD、嵌套list、pd.DataFrame等,主要是通過spark.createDataFrame()接口創建

從文件、資料庫中讀取創建,文件包括Json、csv等,資料庫包括主流關係型資料庫MySQL,以及數倉Hive,主要是通過sprak.read屬性+相應數據源類型進行讀寫,例如spark.read.csv()用於讀取csv文件,spark.read.jdbc()則可用於讀取資料庫

2)數據寫入。與spark.read屬性類似,.write則可用於將DataFrame對象寫入相應文件,包括寫入csv文件、寫入資料庫等

3)數據類型轉換。DataFrame既然可以通過其他類型數據結構創建,那麼自然也可轉換為相應類型,常用的轉換其實主要還是DataFrame=>rdd和DataFrame=>pd.DataFrame,前者通過屬性可直接訪問,後者則需相應接口:

df.rdd  # PySpark SQL DataFrame => RDD
df.toPandas()  # PySpark SQL DataFrame => pd.DataFrame

df = spark.createDataFrame([("John", 17), ("Tom", 18)], schema=["name", "age"])
df.select('name')  # DataFrame[name: string]
df['name']  # Column<b'name'>
df.name  # Column<b'name'>

除了提取單列外,select還支持類似SQL中"*"提取所有列,以及對單列進行簡單的運算和變換,具體應用場景可參考pd.DataFrame中賦值新列的用法,例如下述例子中首先通過"*"關鍵字提取現有的所有列,而後通過df.age+1構造了名字為(age+1)的新列。

df = spark.createDataFrame([("John", 17), ("Tom", 18)], schema=["name", "age"])
df.select('*', df.age+1).show()
"""
+----+---+----+
|name|age|(age + 1)|
+----+---+----+
|John| 17|       18|
| Tom| 18|       19|
+----+---+----+
"""

df.select('*', (df.age+1).alias('age1')).show()
"""
+----+---+----+
|name|age|age1|
+----+---+----+
|John| 17|  18|
| Tom| 18|  19|
+----+---+----+
"""

df.where(df.age==18).show()
df.filter(df.age==18).show()
df.where('age=18').show()
df.filter('age=18').show()
"""
+----+---+
|name|age|
+----+---+
| Tom| 18|
+----+---+
"""

值得指出的是在pandas.DataFrame中類似的用法是query函數,不同的是query()中表達相等的條件符號是"==",而這裡filter或where的相等條件判斷則是更符合SQL語法中的單等號"="。

# 原始DataFrame
df.show()
"""
+----+---+----+
|name|age|               time|
+----+---+----+
|John| 17|2020-09-06 15:11:00|
| Tom| 17|2020-09-06 15:12:00|
| Joy| 17|2020-09-06 15:13:00|
| Tim| 18|2020-09-06 15:16:00|
+----+---+----+
"""
#  gorupby+pivot實現數據透視表
df.groupby(fn.substring('name', 1, 1).alias('firstName')).pivot('age').count().show()
"""
+----+---+----+
|firstName| 17|  18|
+----+---+----+
|        T|  1|   1|
|        J|  2|null|
+----+---+----+
"""
#  window函數實現時間重採樣
df.groupby(fn.window('time', '5 minutes')).count().show()
"""
+++
|              window|count|
+++
|[2020-09-06 15:10...|    3|
|[2020-09-06 15:15...|    1|
+++
"""

# 多列排序,默認升序
df.sort('name', 'age').show()
"""
+----+---+----+
|name|age|               time|
+----+---+----+
|John| 17|2020-09-06 15:11:00|
| Joy| 17|2020-09-06 15:13:00|
| Tim| 18|2020-09-06 15:16:00|
| Tom| 17|2020-09-06 15:12:00|
+----+---+----+
"""
#  多列排序,並制定不同排序規則
df.sort(['age', 'name'], ascending=[True, False]).show()
"""
+----+---+----+
|name|age|               time|
+----+---+----+
| Tom| 17|2020-09-06 15:12:00|
| Joy| 17|2020-09-06 15:13:00|
|John| 17|2020-09-06 15:11:00|
| Tim| 18|2020-09-06 15:16:00|
+----+---+----+
"""

另外,類似於SQL中count和distinct關鍵字,DataFrame中也有相同的用法。

以上主要是類比SQL中的關鍵字用法介紹了DataFrame部分主要操作,而學習DataFrame的另一個主要參照物就是pandas.DataFrame,例如以下操作:

dropna:刪除空值行

實際上也可以接收指定列名或閾值,當接收列名時則僅當相應列為空時才刪除;當接收閾值參數時,則根據各行空值個數是否達到指定閾值進行刪除與否

dropDuplicates/drop_duplicates:刪除重複行

二者為同名函數,與pandas中的drop_duplicates函數功能完全一致

fillna:空值填充

與pandas中fillna功能一致,根據特定規則對空值進行填充,也可接收字典參數對各列指定不同填充

fill:廣義填充

drop:刪除指定列

最後,再介紹DataFrame的幾個通用的常規方法:

# 根據age列創建一個名為ageNew的新列
df.withColumn('ageNew', df.age+100).show()
"""
+----+---+----+-+
|name|age|               time|ageNew|
+----+---+----+-+
|John| 17|2020-09-06 15:11:00|   117|
| Tom| 17|2020-09-06 15:12:00|   117|
| Joy| 17|2020-09-06 15:13:00|   117|
| Tim| 18|2020-09-06 15:16:00|   118|
+----+---+----+-+
"""

注意到,withColumn實現的功能完全可以由select等價實現,二者的區別和聯繫是:withColumn是在現有DataFrame基礎上增加或修改一列,並返回新的DataFrame(包括原有其他列),適用於僅創建或修改單列;而select準確的講是篩選新列,僅僅是在篩選過程中可以通過添加運算或表達式實現創建多個新列,返回一個篩選新列的DataFrame,而且是篩選多少列就返回多少列,適用於同時創建多列的情況(官方文檔建議出於性能考慮和防止內存溢出,在創建多列時首選select)

實際上show是spark中的action算子,即會真正執行計算並返回結果;而前面的很多操作則屬於transform,僅加入到DAG中完成邏輯添加,並不實際執行計算

另外,DataFrame還有一個重要操作:在session中註冊為虛擬表,而後即可真正像執行SQL查詢一樣完成相應SQL操作。

df.createOrReplaceTempView('person')  # 將df註冊為表名叫person的臨時表
spark.sql('select * from person').show()  # 通過sql接口在person臨時表中執行SQL操作
"""
+----+---+----+
|name|age|               time|
+----+---+----+
|John| 17|2020-09-06 15:11:00|
| Tom| 17|2020-09-06 15:12:00|
| Joy| 17|2020-09-06 15:13:00|
| Tim| 18|2020-09-06 15:16:00|
+----+---+----+
"""

基於DataFrame可以實現SQL中大部分功能,同時為了進一步實現SQL中的運算操作,spark.sql還提供了幾乎所有的SQL中的函數,確實可以實現SQL中的全部功能。按照功能,functions子模塊中的功能可以主要分為以下幾類:

聚合統計類,也是最為常用的,除了常規的max、min、avg(mean)、count和sum外,還支持窗口函數中的row_number、rank、dense_rank、ntile,以及前文提到的可用於時間重採樣的窗口函數window等

數值處理類,主要是一些數學函數,包括sqrt、abs、ceil、floor、sin、log等

字符串類,包括子字符串提取substring、字符串拼接concat、concat_ws、split、strim、lpad等

時間處理類,主要是對timestamp類型數據進行處理,包括year、month、hour提取相應數值,timestamp轉換為時間戳、date_format格式化日期、datediff求日期差等

這些函數數量較多,且與SQL中相應函數用法和語法幾乎一致,無需全部記憶,僅在需要時查找使用即可。

本文較為系統全面的介紹了PySpark中的SQL組件以及其核心數據抽象DataFrame,總體而言:該組件是PySpark中的一個重要且常用的子模塊,功能豐富,既繼承了Spark core中RDD的基本特點(算子和延遲執行特性),也是Spark.ml機器學習子模塊的基礎數據結構,其作用自然不言而喻。

與此同時,DataFrame學習成本並不高,大致相當於關係型資料庫SQL+pandas.DataFrame的結合體,很多接口和功能都可以觸類旁通。

如果覺得文章有點用的話,請毫不留情地素質三連吧,分享、點讚、在看、收藏,我不挑,因為這將是我寫作更多優質文章的最強動力。

相關焦點

  • 獨家 | PySpark和SparkSQL基礎:如何利用Python編程執行Spark(附代碼)
    import pandas as pdfrom pyspark.sql import SparkSessionfrom pyspark.context import SparkContextfrom pyspark.sql.functionsimport *from pyspark.sql.typesimport *from
  • PySpark 之Spark DataFrame入門
    `iris.parquet`")df = spark.sql("SELECT * FROM csv.`iris.csv`")df = spark.sql("SELECT * FROM json.df_pd=pd.read_csv("iris.csv")# import pyspark class Row from module sqlfrom pyspark.sql import *# Create Example Data - Departments
  • 基於Pyspark的銷量預測
    ;2).數據此時是spark.dataframe格式,用類sql的形式進行操作;3).withColumn函數為新增一列;4).為說明問題簡化了特徵預處理,只是使用是否月末和星期的OneHotEncoder作為特徵。
  • 開窗函數之累積和,PySpark,Pandas和SQL版實現
    今天主要是結合Pandas, Koalas和PySpark來介紹下開窗函數的累積和。測試環境Jupyter NotebookiPython-sqlPySpark 3.0KoalasPandas名詞解釋累積和(Cumulative Sum,CUSUM)是一種序貫分析法,由劍橋大學的 E. S. Page 於1954年首先提出。
  • 數據分析工具篇——pyspark應用詳解
    和spark,從離線和實時解決了大數據分析過程中遇到的大部分問題,但是這是否是就代表了大數據計算引擎?個人理解pyspark是本地環境和spark環境的結合用法,spark中的函數是打開本地環境到spark環境的大門,本地的數據和邏輯按照spark運算規則整理好之後,通過spark函數推到spark環境中完成運算。
  • PandaSQL:一個讓你能夠通過SQL語句進行pandas的操作的python包
    它允許切片、分組、連接和執行任意數據轉換。如果你熟練的使用SQL,那麼這篇文章將介紹一種更直接、簡單的使用Pandas處理大多數數據操作案例。假設你對SQL非常的熟悉,或者你想有更可讀的代碼。或者您只是想在dataframe上運行一個特殊的SQL查詢。或者,也許你來自R,想要一個sqldf的替代品。
  • 一場pandas與SQL的巔峰大戰(七)
    基本使用:import pandas as pdfrom pandasql import sqldf#d導入sqldfdata = pd.read_excel('orderamt.xlsx')#讀取文件獲得dataftame,也可以用其他方式取得s
  • spark結構化數據處理:Spark SQL、DataFrame和Dataset
    對於SQLContext,目前只有一個簡單的SQL語法解析器sql,而對於HiveContext,則可以使用hiveql和sql兩個解析器,默認是hiveql,我們可以通過如下語句來修改默認解析器:SQLContext.setConf("spark.sql.dialect", "sql")不過就目前來說,HiveQL解析器更加完善,因此推薦使用
  • 如何利用Python實現SQL自動化?
    importSqlsql =Sql('database123') # initialise the Sql objectdirectory =r'C:\\User\medium\data\\' # this is where our generic data is storedfile_list = os.listdir(directory) # get a list
  • DataFrame(4):DataFrame的創建方式
    (data)display(df)結果如下:"王五":{"Java":85, "Python":94}}df = pd.DataFrame(data)display(df)data = { "Java":{"張三":90,"李四":82,"王五":85}, "Python":{"張三":89,"李四":95,"王五":94}, "Hive":{"張三":78,"李四":96}}
  • 2小時入門SparkSQL編程
    公眾號後臺回復關鍵字:pyspark,獲取本項目github地址。本節將介紹SparkSQL編程基本概念和基本用法。但我們使用pyspark進行SparkSQL編程時,在Excutor上跑的全部是java字節碼,pyspark在Driver端就將相應的Python代碼轉換成了java任務然後放到Excutor上執行。
  • Pyspark推薦算法實戰(一)
    首先看下pyspark的算法包內容,pyspark算法包主要有ml和mllib兩個庫,這兩個庫從圖一看到,整體內容基本一致,都能滿足我們日常的算法需求,不過從2.0開始,mllib主要進行維護狀況,不再新增功能,加上ml庫與scikit-learn使用非常類似,對於經常使用後者建模的同學來說,ml能夠非常快速入手,建議更多時候使用ml算法包。
  • DataFrame(3):DataFrame的創建方式
    ]df = pd.DataFrame(data)display(df)結果如下:, "Python":95, "Hive":96},    "王五":{"Java":85, "Python":94}}df = pd.DataFrame(data)display(df)data = {    "Java":{"張三":90,"李四":82,"王五":85},    "Python":{"張三":89,"李四"
  • 適用於初學者和分析師的SQL –使用Python入門SQL
    將Pandas DataFrame加載到SQLite資料庫中熊貓讓我們可以使用to_sql()方法將數據從數據幀快速寫入資料庫。該方法將表名和Connection對象作為其參數。我將在DataHack平臺上使用來自Food Demand Forecasting hackathon的數據幀,該數據幀具有三個數據幀:訂單信息,進餐信息和中心履行信息。
  • 代碼詳解:用SQL GROUP BY語句,找出最強精靈寶可夢
    /Pokemon.csv') #Original datacolumns = ['#','name','type1','type2','total','hp','attack','defense',\'sp_atk','sp_def','speed','generation','legendary']
  • SparkSQL操作insert overwrite table到hive慢
    在印象中spark的速度和hive on mr模式比特別快,但實際上SparkSQL操作insert overwrite table到hive特別慢
  • 用SQL GROUP BY語句,找出最強精靈寶可夢!
    import pandas as pdcnx = sqlite3.connect(':memory:')csvfile = ('/Users/randy/Documents/GitHub/Pokemon-Stat-Predictor/Pokemon.csv') #Original datacolumns = ['#','name','type1','type2','total
  • pyspark操作MongoDB
    有幾點需要注意的:不要安裝最新的pyspark版本,請安裝`pip3 install pyspark==2.3.2`
  • RDD和SparkSQL綜合應用
    公眾號後臺回復關鍵詞:pyspark,獲取本項目github地址。在pyspark大數據項目實踐中,我們往往要綜合應用SparkSQL和RDD來完成任務。我們往往會將DataFrame轉化為RDD,在RDD中應用Python中的列表和字典等數據結構的操作來實現這個邏輯,然後再將RDD轉回成DataFrame。下面以一個DBSCAN聚類算法的分布式實現為例,來說明綜合應用SparkSQL和RDD的方法。