Apache Spark 2.4 是在11月08日正式發布的,其帶來了很多新的特性具體可以參見Apache Spark 2.4 正式發布,重要功能詳細介紹,本文主要介紹這次為複雜數據類型新引入的內置函數和高階函數。本次 Spark 發布共引入了29個新的內置函數來處理複雜類型(例如,數組類型),包括高階函數。
如果想及時了解Spark、Hadoop或者Hbase相關的文章,歡迎關注微信公共帳號:iteblog_hadoop
在 Spark 2.4 之前,為了直接操作複雜類型,有兩種典型的解決方案:
新的內置函數可以直接操作複雜類型,高階函數可以使用匿名 lambda 函數直接操作複雜值,類似於UDF,但具有更好的性能。
在本博客中,通過一些示例,我們將展示一些新的內置函數以及如何使用它們來處理複雜的數據類型。
典型處理方式讓我們首先通過以下示例來回顧一下 Spark 2.4 之前的典型解決方案。
Option 1 – Explode and Collect我們使用 explode 函數將數組拆分成多行,並計算 val + 1,最後再使用 collect_list 重構數組,如下所示:
SELECT id,
collect_list(val + 1) AS vals
FROM (SELECT id,
explode(vals) AS val
FROM iteblog) x
GROUP BY id
這種方式容易出錯並且效率低下,主要體現為三個方面。 首先,我們必須努力確保通過使用唯一鍵(unique key)來進行分組以便將新生成的數組完全組成為原始數組。其次,我們需要進行 group by 操作 ,這意味著需要進行一次 shuffle 操作; 但是 shuffle 操作並不保證重組後的數組和原始數組中數據的順序一致;最後,使用這種方式非常低效。
Option 2 – User Defined Function接下來,我們選擇使用 Scala UDF,它可以接收 Seq[Int] 並對其中每個元素進行加 1 操作:
def addOne(values: Seq[Int]): Seq[Int] = {
values.map(value => value + 1)
}
val plusOneInt = spark.udf.register("plusOneInt", addOne(_: Seq[Int]): Seq[Int])
或者,我們也可以使用 Python UDF,如下:
from pyspark.sql.types import IntegerType
from pyspark.sql.types import ArrayType
def add_one_to_els(elements):
return [el + 1 for el in elements]
spark.udf.register("plusOneIntPython", add_one_to_els, ArrayType(IntegerType()))
然後我們可以在 SQL 裡面如下使用:
SELECT id, plusOneInt(vals) as vals FROM iteblog
這種方式更加簡單快速,並且可以避免很多陷阱。但這種方式可能仍然效率低下,因為 Scala 或 Python 中的數據序列化可能很昂貴。
新的內置函數下面我們來看看直接操作複雜類型的新內置函數。 這裡 列舉了每個函數的示例。 每個函數的名稱和參數標註了它們處理數據類型,T 或 U 表示數組;K,V 表示 map 類型。
高階函數(Higher-Order Functions)為了進一步處理數組和 map 類型,我們使用了 SQL 中支持的匿名 lambda 函數或高階函數語法,使用 lambda 函數作為參數。lambda 函數的語法如下:
argument -> function body
(argument1, argument2, ...) -> function body
符號 -> 左邊表示參數列表,符號右邊定義函數體,在函數體中可以使用參數和其他變量來計算新的值。
使用匿名 Lambda 函數進行轉換首先我們來看一下使用帶有匿名 lambda 函數的 transform 函數的例子。假設有一個表,包含三列數據:integer 類型的 key,integer 數組的 values,Integer 類型的二維數組 nested_values。如下:
keyvaluesnested_values1[1, 2, 3][[1, 2, 3], [], [4, 5]]當我們執行下面 SQL 的時候:
SELECT TRANSFORM(values, element -> element + 1) FROM iteblog;
transform 函數通過執行lambda 函數遍歷數組種的每個元素並進行加一操作,然後創建一個新數組。
除了參數之外,我們還可以在 lambda 函數中使用其他變量,例如:key,這是表的另外一列:
SELECT TRANSFORM(values, element -> element + key) FROM iteblog;
如果你想要處理更複雜的嵌套類型,比如表中的 nested_values 列,你可以使用嵌套的 lambda 函數:
SELECT TRANSFORM(
nested_values,
arr -> TRANSFORM(arr,
element -> element + key + SIZE(arr)))
FROM iteblog;
在內層的 lambda 函數中你可以使用 key 和 arr 這些在 lambda 函數上下文之外的變量以及表的其他欄位值。
總結Spark 2.4 引入了 24 個新的內置函數,如 array_union,array_max ,array_min等,以及 5 個高階函數,如 transform, filter 等,這些函數都可以用於處理複雜類型。完整的列表可以參見這裡 。
本文原文:https://www.iteblog.com/archives/2457.html(點擊 下面 閱讀原文 即可進入),翻譯自:https://databricks.com/blog/2018/11/16/introducing-new-built-in-functions-and-higher-order-functions-for-complex-data-types-in-apache-spark.html?spm=a2c4e.11153940.blogcont672027.10.33b9731eaiP4B9