PySpark 原理解析

2021-03-02 匯量技術

眾所周知,Spark 框架主要是由 Scala 語言實現,同時也包含少量 Java 代碼。Spark 面向用戶的編程接口,也是 Scala。然而,在數據科學領域,Python 一直佔據比較重要的地位,仍然有大量的數據工程師在使用各類 Python 數據處理和科學計算的庫,例如 numpy、Pandas、scikit-learn 等。同時,Python 語言的入門門檻也顯著低於 Scala。為此,Spark 推出了 PySpark,在 Spark 框架上提供一套 Python 的接口,方便廣大數據科學家使用。本文主要從源碼實現層面解析 PySpark 的實現原理,包括以下幾個方面:

PySpark 的多進程架構;

Python 端調用 Java、Scala 接口;

Python Driver 端 RDD、SQL 接口;

Executor 端進程間通信和序列化;

Pandas UDF;

總結;

1、PySpark 的多進程架構

PySpark 採用了 Python、JVM 進程分離的多進程架構,在 Driver、Executor 端均會同時有 Python、JVM 兩個進程。當通過 spark-submit 提交一個 PySpark 的 Python 腳本時,Driver 端會直接運行這個 Python 腳本,並從 Python 中啟動 JVM;而在 Python 中調用的 RDD 或者 DataFrame 的操作,會通過 Py4j 調用到 Java 的接口。在 Executor 端恰好是反過來,首先由 Driver 啟動了 JVM 的 Executor 進程,然後在 JVM 中去啟動 Python 的子進程,用以執行 Python 的 UDF,這其中是使用了 socket 來做進程間通信。總體的架構圖如下所示:

2、Python Driver 如何調用 Java 的接口

上面提到,通過 spark-submit 提交 PySpark 作業後,Driver 端首先是運行用戶提交的 Python 腳本,然而 Spark 提供的大多數 API 都是 Scala 或者 Java 的,那麼就需要能夠在 Python 中去調用 Java 接口。這裡 PySpark 使用了 Py4j 這個開源庫。當創建 Python 端的 SparkContext 對象時,實際會啟動 JVM,並創建一個 Scala 端的 SparkContext 對象。代碼實現在 python/pyspark/context.py:

def _ensure_initialized(cls, instance=None, gateway=None, conf=None):    """    Checks whether a SparkContext is initialized or not.    Throws error if a SparkContext is already running.    """    with SparkContext._lock:        if not SparkContext._gateway:            SparkContext._gateway = gateway or launch_gateway(conf)            SparkContext._jvm = SparkContext._gateway.jvm

在 launch_gateway (python/pyspark/java_gateway.py)中,首先啟動JVM 進程:

SPARK_HOME = _find_spark_home()on_windows = platform.system() == "Windows"script = "./bin/spark-submit.cmd" if on_windows else "./bin/spark-submit"command = [os.path.join(SPARK_HOME, script)]

然後創建 JavaGateway 並 import 一些關鍵的 class:

gateway = JavaGateway(        gateway_parameters=GatewayParameters(port=gateway_port, auth_token=gateway_secret,                                             auto_convert=True))java_import(gateway.jvm, "org.apache.spark.SparkConf")java_import(gateway.jvm, "org.apache.spark.api.java.*")java_import(gateway.jvm, "org.apache.spark.api.python.*")java_import(gateway.jvm, "org.apache.spark.ml.python.*")java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")java_import(gateway.jvm, "org.apache.spark.sql.*")java_import(gateway.jvm, "org.apache.spark.sql.api.python.*")java_import(gateway.jvm, "org.apache.spark.sql.hive.*")java_import(gateway.jvm, "scala.Tuple2")

拿到 JavaGateway 對象,即可以通過它的 jvm 屬性,去調用 Java 的類了,例如:

gateway = JavaGateway()jvm = gateway.jvml = jvm.java.util.ArrayList()

然後會繼續創建 JVM 中的 SparkContext 對象:

def _initialize_context(self, jconf):    """    Initialize SparkContext in function to allow subclass specific initialization    """    return self._jvm.JavaSparkContext(jconf)
self._jsc = jsc or self._initialize_context(self._conf._jconf)

3、Python Driver 端的 RDD、SQL 接口

在 PySpark 中,繼續初始化一些 Python 和 JVM 的環境後,Python 端的 SparkContext 對象就創建好了,它實際是對 JVM 端接口的一層封裝。和 Scala API 類似,SparkContext 對象也提供了各類創建 RDD 的接口,和 Scala API 基本一一對應,我們來看一些例子。

def newAPIHadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None,                     valueConverter=None, conf=None, batchSize=0):    jconf = self._dictToJavaMap(conf)    jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, path, inputFormatClass, keyClass,                                                valueClass, keyConverter, valueConverter,                                                jconf, batchSize)    return RDD(jrdd, self)

可以看到,這裡 Python 端基本就是直接調用了 Java/Scala 接口。而 PythonRDD (core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala),則是一個 Scala 中封裝的伴生對象,提供了常用的 RDD IO 相關的接口。另外一些接口會通過 self._jsc 對象去創建 RDD。其中 self._jsc 就是 JVM 中的 SparkContext 對象。拿到 RDD 對象之後,可以像 Scala、Java API 一樣,對 RDD 進行各類操作,這些大部分都封裝在 python/pyspark/rdd.py 中。

這裡的代碼中出現了 jrdd 這樣一個對象,這實際上是 Scala 為提供 Java 互操作的 RDD 的一個封裝,用來提供 Java 的 RDD 接口,具體實現在 core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala 中。可以看到每個 Python 的 RDD 對象需要用一個 JavaRDD 對象去創建。

對於 DataFrame 接口,Python 層也同樣提供了 SparkSession、DataFrame 對象,它們也都是對 Java 層接口的封裝,這裡不一一贅述。

4、Executor 端進程間通信和序列化

對於 Spark 內置的算子,在 Python 中調用 RDD、DataFrame 的接口後,從上文可以看出會通過 JVM 去調用到 Scala 的接口,最後執行和直接使用 Scala 並無區別。而對於需要使用 UDF 的情形,在 Executor 端就需要啟動一個 Python worker 子進程,然後執行 UDF 的邏輯。那麼 Spark 是怎樣判斷需要啟動子進程的呢?

在 Spark 編譯用戶的 DAG 的時候,Catalyst Optimizer 會創建 BatchEvalPython 或者 ArrowEvalPython 這樣的 Logical Operator,隨後會被轉換成 PythonEvals 這個 Physical Operator。在 PythonEvals(sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala)中:

object PythonEvals extends Strategy {  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {    case ArrowEvalPython(udfs, output, child, evalType) =>      ArrowEvalPythonExec(udfs, output, planLater(child), evalType) :: Nil    case BatchEvalPython(udfs, output, child) =>      BatchEvalPythonExec(udfs, output, planLater(child)) :: Nil    case _ =>      Nil  }}

創建了 ArrowEvalPythonExec 或者 BatchEvalPythonExec,而這二者內部會創建 ArrowPythonRunner、PythonUDFRunner 等類的對象實例,並調用了它們的 compute 方法。由於它們都繼承了 BasePythonRunner,基類的 compute 方法中會去啟動 Python 子進程:

def compute(      inputIterator: Iterator[IN],      partitionIndex: Int,      context: TaskContext): Iterator[OUT] = {  // .
val worker: Socket = env.createPythonWorker(pythonExec, envVars.asScala.toMap) // Start a thread to feed the process input from our parent's iterator val writerThread = newWriterThread(env, worker, inputIterator, partitionIndex, context) writerThread.start() val stream = new DataInputStream(new BufferedInputStream(worker.getInputStream, bufferSize))
val stdoutIterator = newReaderIterator( stream, writerThread, startTime, env, worker, releasedOrClosed, context) new InterruptibleIterator(context, stdoutIterator)

這裡 env.createPythonWorker 會通過 PythonWorkerFactory (core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala)去啟動 Python 進程。Executor 端啟動 Python 子進程後,會創建一個 socket 與 Python 建立連接。所有 RDD 的數據都要序列化後,通過 socket 發送,而結果數據需要同樣的方式序列化傳回 JVM。

對於直接使用 RDD 的計算,或者沒有開啟 spark.sql.execution.arrow.enabled 的 DataFrame,是將輸入數據按行發送給 Python,可想而知,這樣效率極低。

在 Spark 2.2 後提供了基於 Arrow 的序列化、反序列化的機制(從 3.0 起是默認開啟),從 JVM 發送數據到 Python 進程的代碼在 sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala。這個類主要是重寫了 newWriterThread 這個方法,使用了 ArrowWriter 向 socket 發送數據:

val arrowWriter = ArrowWriter.create(root)val writer = new ArrowStreamWriter(root, null, dataOut)writer.start()
while (inputIterator.hasNext) {val nextBatch = inputIterator.next()
while (nextBatch.hasNext) { arrowWriter.write(nextBatch.next())}
arrowWriter.finish()writer.writeBatch()arrowWriter.reset()

可以看到, 每次取出一個batch,填充給 ArrowWriter,實際數據會保存在 root 對象中,然後由 ArrowStreamWriter 將 root 對象中的整個 batch 的數據寫入到 socket 的 DataOutputStream 中去。ArrowStreamWriter 會調用 writeBatch 方法去序列化消息並寫數據,代碼參考 ArrowWriter.java#L131。

protected ArrowBlock writeRecordBatch(ArrowRecordBatch batch) throws IOException {  ArrowBlock block = MessageSerializer.serialize(out, batch, option);  LOGGER.debug("RecordBatch at {}, metadata: {}, body: {}",      block.getOffset(), block.getMetadataLength(), block.getBodyLength());  return block;}

在 MessageSerializer 中,使用了 flatbuffer 來序列化數據。flatbuffer 是一種比較高效的序列化協議,它的主要優點是反序列化的時候,不需要解碼,可以直接通過裸 buffer 來讀取欄位,可以認為反序列化的開銷為零。我們來看看 Python 進程收到消息後是如何反序列化的。

Python 子進程實際上是執行了 worker.py 的 main 函數 (python/pyspark/worker.py):

if __name__ == '__main__':        java_port = int(os.environ["PYTHON_WORKER_FACTORY_PORT"])    auth_secret = os.environ["PYTHON_WORKER_FACTORY_SECRET"]    (sock_file, _) = local_connect_and_auth(java_port, auth_secret)    main(sock_file, sock_file)

這裡會去向 JVM 建立連接,並從 socket 中讀取指令和數據。對於如何進行序列化、反序列化,是通過 UDF 的類型來區分:

eval_type = read_int(infile)if eval_type == PythonEvalType.NON_UDF:    func, profiler, deserializer, serializer = read_command(pickleSer, infile)else:    func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)

在 read_udfs 中,如果是 PANDAS 類的 UDF,會創建 ArrowStreamPandasUDFSerializer,其餘的 UDF 類型創建 BatchedSerializer。我們來看看 ArrowStreamPandasUDFSerializer(python/pyspark/serializers.py):

def dump_stream(self, iterator, stream):    import pyarrow as pa    writer = None    try:        for batch in iterator:            if writer is None:                writer = pa.RecordBatchStreamWriter(stream, batch.schema)            writer.write_batch(batch)    finally:        if writer is not None:            writer.close()
def load_stream(self, stream): import pyarrow as pa reader = pa.ipc.open_stream(stream) for batch in reader: yield batch

可以看到,這裡雙向的序列化、反序列化,都是調用了 PyArrow 的 ipc 的方法,和前面看到的 Scala 端是正好對應的,也是按 batch 來讀寫數據。對於 Pandas 的 UDF,讀到一個 batch 後,會將 Arrow 的 batch 轉換成 Pandas Series。

def arrow_to_pandas(self, arrow_column):    from pyspark.sql.types import _check_series_localize_timestamps
s = arrow_column.to_pandas(date_as_object=True)
s = _check_series_localize_timestamps(s, self._timezone) return s
def load_stream(self, stream): """ Deserialize ArrowRecordBatches to an Arrow table and return as a list of pandas.Series. """ batches = super(ArrowStreamPandasSerializer, self).load_stream(stream) import pyarrow as pa for batch in batches: yield [self.arrow_to_pandas(c) for c in pa.Table.from_batches([batch]).itercolumns()]


5、Pandas UDF

前面我們已經看到,PySpark 提供了基於 Arrow 的進程間通信來提高效率,那麼對於用戶在 Python 層的 UDF,是不是也能直接使用到這種高效的內存格式呢?答案是肯定的,這就是 PySpark 推出的 Pandas UDF。區別於以往以行為單位的 UDF,Pandas UDF 是以一個 Pandas Series 為單位,batch 的大小可以由 spark.sql.execution.arrow.maxRecordsPerBatch 這個參數來控制。這是一個來自官方文檔的示例:

def multiply_func(a, b):    return a * b
multiply = pandas_udf(multiply_func, returnType=LongType())
df.select(multiply(col("x"), col("x"))).show()

上文已經解析過,PySpark 會將 DataFrame 以 Arrow 的方式傳遞給 Python 進程,Python 中會轉換為 Pandas Series,傳遞給用戶的 UDF。在 Pandas UDF 中,可以使用 Pandas 的 API 來完成計算,在易用性和性能上都得到了很大的提升。

6、總結

PySpark 為用戶提供了 Python 層對 RDD、DataFrame 的操作接口,同時也支持了 UDF,通過 Arrow、Pandas 向量化的執行,對提升大規模數據處理的吞吐是非常重要的,一方面可以讓數據以向量的形式進行計算,提升 cache 命中率,降低函數調用的開銷,另一方面對於一些 IO 的操作,也可以降低網絡延遲對性能的影響。

然而 PySpark 仍然存在著一些不足,主要有:

進程間通信消耗額外的 CPU 資源;

編程接口仍然需要理解 Spark 的分布式計算原理;

Pandas UDF 對返回值有一定的限制,返回多列數據不太方便;

Databricks 提出了新的 Koalas 接口來使得用戶可以以接近單機版 Pandas 的形式來編寫分布式的 Spark 計算作業,對數據科學家會更加友好。而 Vectorized Execution 的推進,有望在 Spark 內部一切數據都是用 Arrow 的格式來存放,對跨語言支持將會更加友好。同時也能看到,在這裡仍然有很大的性能、易用性的優化空間,這也是我們平臺近期的主要發力方向之一。

最後打一個廣告:

Mobvista (匯量科技) 誠招 分布式計算引擎研發,對 Spark/Tensorflow/K8s ,對 C++/Scala/Rust/Go 有經驗或者有興趣深入研究的,歡迎加入我們,一起打造雲原生架構下全棧高性能數據智能計算平臺。請聯繫 xu.chen@mobvista.com 。

相關焦點

  • PySpark工作原理
    其中,Python因為入門簡單、開發效率高(人生苦短,我用Python),廣受大數據工程師喜歡,本文主要探討Pyspark的工作原理。因為我的環境是Mac,所以本文一切以Mac環境為前提,不過其它環境過車過都是差不多的。
  • PySpark源碼解析,用Python調用高效Scala接口,搞定大規模數據分析
    本文主要從源碼實現層面解析 PySpark 的實現原理,包括以下幾個方面:PySpark 的多進程架構;Python 端調用 Java、Scala 接口;Python Driver 端 RDD、SQL 接口;
  • pyspark操作MongoDB
    有幾點需要注意的:不要安裝最新的pyspark版本,請安裝`pip3 install pyspark==2.3.2`
  • 數據分析工具篇——pyspark應用詳解
    不是的~現階段流批一體盛行,Flink也逐漸進入大家的視野,大有發展壯大的趨勢,我們後面會單獨講解這一工具,這篇文章我們重點講解一下基於spark運算的pyspark工具。pyspark不是所有的代碼都在spark環境應用,可以將一些主要的運算單元切到spark環境運算完成,然後輸出運算結果到本地,最後在本地運行一些簡單的數據處理邏輯。
  • 【PySpark源碼解析】教你用Python調用高效Scala接口
    本文主要從源碼實現層面解析 PySpark 的實現原理,包括以下幾個方面:PySpark項目地址:https://github.com/apache/spark/tree/master/pythonPySpark 採用了 Python、JVM 進程分離的多進程架構,在 Driver、Executor 端均會同時有 Python、JVM 兩個進程。
  • Pyspark推薦算法實戰(一)
    前言由於最近轉向商品推薦的工作,因此從本文起,開始介紹一些利用pyspark在推薦算法中的具體應用。目前最新版本的spark已經出到2.4.5,我們主要使用的還是2.3版本,所以後續涉及代碼開發,全部基於2.3.4版本。
  • PySpark源碼解析,教你用Python調用高效Scala接口,搞定大規模數據分析
    本文主要從源碼實現層面解析 PySpark 的實現原理,包括以下幾個方面:PySpark項目地址:https://github.com/apache/spark/tree/master/pythonPySpark 採用了 Python、JVM 進程分離的多進程架構,在 Driver、Executor 端均會同時有 Python、JVM 兩個進程。
  • 手把手教你用PySpark構建機器學習模型
    數據集可以從Kaggle上下載:https://www.kaggle.com/uciml/pima-indians-diabetes-databasefrom pyspark.sql import SparkSessionspark = SparkSession.builder.appName('ml-diabetes').getOrCreate()df
  • 基於Pyspark的銷量預測
    ds,qty as label from xxx.store_sku_sale where ds>='2020-05-22' and store_code in ('Z001','Z002') """)1.2 特徵生成1).dayofweek等函數是從from pyspark.sql.functions
  • PySpark 之Spark DataFrame入門
    header=True)import pandas as pddf_pd=pd.read_csv("iris.csv")# import pysparkclass Row from module sqlfrom pyspark.sql import *# Create Example Data - Departments and Employees# Create the Departmentsdepartment1 = Row(id='123456', name='Computer Science
  • 開窗函數之累積和,PySpark,Pandas和SQL版實現
    PySpark DataFrame版本from pyspark import SparkConfconf=SparkConf()conf.set("spark.sql.execution.arrow.enabled", "true")from pyspark.sql import SparkSessionspark
  • 獨家 | PySpark和SparkSQL基礎:如何利用Python編程執行Spark(附代碼)
    第二步:在Anaconda Prompt終端中輸入「conda install pyspark」並回車來安裝PySpark包。第三步:在Anaconda Prompt終端中輸入「conda install pyarrow」並回車來安裝PyArrow包。當PySpark和PyArrow包安裝完成後,僅需關閉終端,回到Jupyter Notebook,並在你代碼的最頂部導入要求的包。
  • 螺旋鑽機工作原理是什麼?鑽機原理解析
    螺旋鑽機也可以叫做鑽土機,我們在使用鑽土機的時候想要知道鑽土機工作原理是什麼?其實鑽土機原理很簡單哦,下面小編就為大家帶來了鑽土機原理解析,一起來看看吧!鑽土機工作原理是什麼?液壓鑽坑機鑽土機原理解析: ANT螺旋鑽機根據最新國際標準製造,使用進口藥優質EN系列齒輪鋼材和最新的加工技術製作。
  • 信陽市304L角鋼工作原理解析
    信陽市304L角鋼工作原理解析   無錫新同巨不鏽鋼有限公司生產信陽市304L角鋼,廠家直銷,價格上有優勢,歡迎詢價,不報虛價!    信陽市304L角鋼工作原理解析同時採暖季限產結束後不鏽鋼板廠家復產勢必將引發鐵礦石需求的集中。熱交換器。部分地區將補漲。    江蘇省的限產方案短期內改變了供求結構。
  • 81 質譜原理和ESI-MS結構解析
    81 質譜原理和ESI-MS結構解析
  • 關於dhcp relay工作原理的解析
    關於dhcp relay工作原理的解析 下面我們來對dhcp relay工作原理進行一下解析。那麼通過文章的介紹,相信大家都能夠掌握這個原理內容。望對大家有所幫助。
  • 微波爐原理解析
    完全是免費訂閱,請放心關注。註:本文轉載自網絡,不代表本平臺立場,僅供讀者參考,著作權屬歸原創者所有。我們分享此文出於傳播更多資訊之目的。如有侵權,請在後臺留言聯繫我們進行刪除,謝謝!  微波爐是一種常用的電器,不論是在家裡或者是辦公室,甚至是餐廳導出都可以看到它的身影,這主要是因為其效率高、加熱快、佔地小等特點,說起微波爐,大家首先想到就是微波爐能夠很快的加熱食物,而加熱也是目前人們使用微波爐烹調食物時使用的最多的功能之一。那麼,你知道微波爐原理是什麼嗎?
  • 彩色印刷的著色原理解析
    彩色印刷的著色原理解析彩色印刷簡單來說就是用彩色方式複製圖像或文字的複製方式,那麼彩色印刷的著色原理是什麼呢?採用紅、綠、藍三原色和黑色色料(油墨或燃料)按減色混合原理實現全彩色複製的平板印刷方法。CMYK模式在本質上與RGB模式沒有什麼區別,只是產生色彩的原理不同,在RGB模式中由光源發出的色光混合生成顏色,而在CMYK模式中由光線照到有不同比例C、M、Y、K油墨的紙上,部分光譜被吸收後,反射到人眼的光產生顏色。由於C、M、Y、K在混合成色時,隨著C、M、Y、K四種成分的增多,反射到人眼的光會越來越少,光線的亮度會越來越低,所有CMYK模式產生顏色的方法又被稱為色光減色法。
  • 瀏覽器加載解析渲染網頁原理
    一、瀏覽器加載網頁資源的原理1、HTML支持的組要資源類型在瀏覽器內核有一個管理資源的對象CachedResource類,在CachedResource類下有很多子類來分工不同的資源管理,這些資源管理子類分別是:資源
  • 氧量分析儀傳感器原理解析
    氧量分析儀傳感器原理解析在化工企業生產過程中,尤其是存在化學反應的的生產過程中,氣體分析儀表的應用非常的廣泛,對生產過程控制及工藝指導起著關鍵作用。隨著技術發展,氣體分析儀器的種類也越來越多,一般根據氣檢測原理來劃分,如電化學原理、熱導原理、紅外原理、雷射原理;今天諾科儀器主要介紹一下氧量分析儀的傳感器原理。電化學式傳感器是利用被檢測氣體的電化學活性,將其氧化或還原,從而檢測氣體含量。