PySpark源碼解析,用Python調用高效Scala接口,搞定大規模數據分析

2021-01-03 機器之心Pro

機器之心專欄

作者:匯量科技-陳緒

相較於Scala語言而言,Python具有其獨有的優勢及廣泛應用性,因此Spark也推出了PySpark,在框架上提供了利用Python語言的接口,為數據科學家使用該框架提供了便利。

眾所周知,Spark 框架主要是由 Scala 語言實現,同時也包含少量 Java 代碼。Spark 面向用戶的編程接口,也是 Scala。然而,在數據科學領域,Python 一直佔據比較重要的地位,仍然有大量的數據工程師在使用各類 Python 數據處理和科學計算的庫,例如 numpy、Pandas、scikit-learn 等。同時,Python 語言的入門門檻也顯著低於 Scala。

為此,Spark 推出了 PySpark,在 Spark 框架上提供一套 Python 的接口,方便廣大數據科學家使用。本文主要從源碼實現層面解析 PySpark 的實現原理,包括以下幾個方面:

PySpark 的多進程架構;Python 端調用 Java、Scala 接口;Python Driver 端 RDD、SQL 接口;Executor 端進程間通信和序列化;Pandas UDF;總結。PySpark項目地址:https://github.com/apache/spark/tree/master/python

1、PySpark 的多進程架構

PySpark 採用了 Python、JVM 進程分離的多進程架構,在 Driver、Executor 端均會同時有 Python、JVM 兩個進程。當通過 spark-submit 提交一個 PySpark 的 Python 腳本時,Driver 端會直接運行這個 Python 腳本,並從 Python 中啟動 JVM;而在 Python 中調用的 RDD 或者 DataFrame 的操作,會通過 Py4j 調用到 Java 的接口。

在 Executor 端恰好是反過來,首先由 Driver 啟動了 JVM 的 Executor 進程,然後在 JVM 中去啟動 Python 的子進程,用以執行 Python 的 UDF,這其中是使用了 socket 來做進程間通信。總體的架構圖如下所示:

2、Python Driver 如何調用 Java 的接口

上面提到,通過 spark-submit 提交 PySpark 作業後,Driver 端首先是運行用戶提交的 Python 腳本,然而 Spark 提供的大多數 API 都是 Scala 或者 Java 的,那麼就需要能夠在 Python 中去調用 Java 接口。這裡 PySpark 使用了 Py4j 這個開源庫。當創建 Python 端的 SparkContext 對象時,實際會啟動 JVM,並創建一個 Scala 端的 SparkContext 對象。代碼實現在 python/pyspark/context.py:

def_ensure_initialized(cls,instance=None,gateway=None,conf=None):"""CheckswhetheraSparkContextisinitializedornot.ThrowserrorifaSparkContextisalreadyrunning."""withSparkContext._lock:ifnotSparkContext._gateway:SparkContext._gateway=gatewayorlaunch_gateway(conf)SparkContext._jvm=SparkContext._gateway.jvm

在 launch_gateway (python/pyspark/java_gateway.py) 中,首先啟動 JVM 進程:

SPARK_HOME=_find_spark_home()#LaunchthePy4jgatewayusingSpark'sruncommandsothatwepickupthe#properclasspathandsettingsfromspark-env.shon_windows=platform.system()=="Windows"script="./bin/spark-submit.cmd"ifon_windowselse"./bin/spark-submit"command=[os.path.join(SPARK_HOME,script)]

然後創建 JavaGateway 並 import 一些關鍵的 class:

gateway=JavaGateway(gateway_parameters=GatewayParameters(port=gateway_port,auth_token=gateway_secret,auto_convert=True))#ImporttheclassesusedbyPySparkjava_import(gateway.jvm,"org.apache.spark.SparkConf")java_import(gateway.jvm,"org.apache.spark.api.java.*")java_import(gateway.jvm,"org.apache.spark.api.python.*")java_import(gateway.jvm,"org.apache.spark.ml.python.*")java_import(gateway.jvm,"org.apache.spark.mllib.api.python.*")#TODO(davies):moveintosqljava_import(gateway.jvm,"org.apache.spark.sql.*")java_import(gateway.jvm,"org.apache.spark.sql.api.python.*")java_import(gateway.jvm,"org.apache.spark.sql.hive.*")java_import(gateway.jvm,"scala.Tuple2")

拿到 JavaGateway 對象,即可以通過它的 jvm 屬性,去調用 Java 的類了,例如:

gateway = JavaGateway()

gateway=JavaGateway()jvm=gateway.jvml=jvm.java.util.ArrayList()

然後會繼續創建 JVM 中的 SparkContext 對象:

def_initialize_context(self,jconf):"""InitializeSparkContextinfunctiontoallowsubclassspecificinitialization"""returnself._jvm.JavaSparkContext(jconf)#CreatetheJavaSparkContextthroughPy4Jself._jsc=jscorself._initialize_context(self._conf._jconf)

3、Python Driver 端的 RDD、SQL 接口

在 PySpark 中,繼續初始化一些 Python 和 JVM 的環境後,Python 端的 SparkContext 對象就創建好了,它實際是對 JVM 端接口的一層封裝。和 Scala API 類似,SparkContext 對象也提供了各類創建 RDD 的接口,和 Scala API 基本一一對應,我們來看一些例子。

defnewAPIHadoopFile(self,path,inputFormatClass,keyClass,valueClass,keyConverter=None,valueConverter=None,conf=None,batchSize=0):jconf=self._dictToJavaMap(conf)jrdd=self._jvm.PythonRDD.newAPIHadoopFile(self._jsc,path,inputFormatClass,keyClass,valueClass,keyConverter,valueConverter,jconf,batchSize)returnRDD(jrdd,self)

可以看到,這裡 Python 端基本就是直接調用了 Java/Scala 接口。而 PythonRDD (core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala),則是一個 Scala 中封裝的伴生對象,提供了常用的 RDD IO 相關的接口。另外一些接口會通過 self._jsc 對象去創建 RDD。其中 self._jsc 就是 JVM 中的 SparkContext 對象。拿到 RDD 對象之後,可以像 Scala、Java API 一樣,對 RDD 進行各類操作,這些大部分都封裝在 python/pyspark/rdd.py 中。

這裡的代碼中出現了 jrdd 這樣一個對象,這實際上是 Scala 為提供 Java 互操作的 RDD 的一個封裝,用來提供 Java 的 RDD 接口,具體實現在 core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala 中。可以看到每個 Python 的 RDD 對象需要用一個 JavaRDD 對象去創建。

對於 DataFrame 接口,Python 層也同樣提供了 SparkSession、DataFrame 對象,它們也都是對 Java 層接口的封裝,這裡不一一贅述。

4、Executor 端進程間通信和序列化

對於 Spark 內置的算子,在 Python 中調用 RDD、DataFrame 的接口後,從上文可以看出會通過 JVM 去調用到 Scala 的接口,最後執行和直接使用 Scala 並無區別。而對於需要使用 UDF 的情形,在 Executor 端就需要啟動一個 Python worker 子進程,然後執行 UDF 的邏輯。那麼 Spark 是怎樣判斷需要啟動子進程的呢?

在 Spark 編譯用戶的 DAG 的時候,Catalyst Optimizer 會創建 BatchEvalPython 或者 ArrowEvalPython 這樣的 Logical Operator,隨後會被轉換成 PythonEvals 這個 Physical Operator。在 PythonEvals(sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala)中:

objectPythonEvalsextendsStrategy{overridedefapply(plan:LogicalPlan):Seq[SparkPlan]=planmatch{caseArrowEvalPython(udfs,output,child,evalType)=>ArrowEvalPythonExec(udfs,output,planLater(child),evalType)::NilcaseBatchEvalPython(udfs,output,child)=>BatchEvalPythonExec(udfs,output,planLater(child))::Nilcase_=>Nil}}

創建了 ArrowEvalPythonExec 或者 BatchEvalPythonExec,而這二者內部會創建 ArrowPythonRunner、PythonUDFRunner 等類的對象實例,並調用了它們的 compute 方法。由於它們都繼承了 BasePythonRunner,基類的 compute 方法中會去啟動 Python 子進程:

defcompute(inputIterator:Iterator[IN],partitionIndex:Int,context:TaskContext):Iterator[OUT]={//......valworker:Socket=env.createPythonWorker(pythonExec,envVars.asScala.toMap)//Startathreadtofeedtheprocessinputfromourparent'siteratorvalwriterThread=newWriterThread(env,worker,inputIterator,partitionIndex,context)writerThread.start()valstream=newDataInputStream(newBufferedInputStream(worker.getInputStream,bufferSize))valstdoutIterator=newReaderIterator(stream,writerThread,startTime,env,worker,releasedOrClosed,context)newInterruptibleIterator(context,stdoutIterator)

這裡 env.createPythonWorker 會通過PythonWorkerFactory(core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala)去啟動 Python 進程。Executor 端啟動 Python 子進程後,會創建一個 socket 與 Python 建立連接。所有 RDD 的數據都要序列化後,通過 socket 發送,而結果數據需要同樣的方式序列化傳回 JVM。

對於直接使用 RDD 的計算,或者沒有開啟 spark.sql.execution.arrow.enabled 的 DataFrame,是將輸入數據按行發送給 Python,可想而知,這樣效率極低。

在 Spark 2.2 後提供了基於 Arrow 的序列化、反序列化的機制(從 3.0 起是默認開啟),從 JVM 發送數據到 Python 進程的代碼在 sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala。這個類主要是重寫了 newWriterThread 這個方法,使用了 ArrowWriter 向 socket 發送數據:

valarrowWriter=ArrowWriter.create(root)valwriter=newArrowStreamWriter(root,null,dataOut)writer.start()while(inputIterator.hasNext){valnextBatch=inputIterator.next()while(nextBatch.hasNext){arrowWriter.write(nextBatch.next())}arrowWriter.finish()writer.writeBatch()arrowWriter.reset()

可以看到,每次取出一個 batch,填充給 ArrowWriter,實際數據會保存在 root 對象中,然後由 ArrowStreamWriter 將 root 對象中的整個 batch 的數據寫入到 socket 的 DataOutputStream 中去。ArrowStreamWriter 會調用 writeBatch 方法去序列化消息並寫數據,代碼參考 ArrowWriter.java#L131。

protectedArrowBlockwriteRecordBatch(ArrowRecordBatchbatch)throwsIOException{ArrowBlockblock=MessageSerializer.serialize(out,batch,option);LOGGER.debug("RecordBatchat{},metadata:{},body:{}",block.getOffset(),block.getMetadataLength(),block.getBodyLength());returnblock;}

在 MessageSerializer 中,使用了 flatbuffer 來序列化數據。flatbuffer 是一種比較高效的序列化協議,它的主要優點是反序列化的時候,不需要解碼,可以直接通過裸 buffer 來讀取欄位,可以認為反序列化的開銷為零。我們來看看 Python 進程收到消息後是如何反序列化的。

Python 子進程實際上是執行了 worker.py 的 main 函數 (python/pyspark/worker.py):

if__name__=='__main__':#ReadinformationabouthowtoconnectbacktotheJVMfromtheenvironment.java_port=int(os.environ["PYTHON_WORKER_FACTORY_PORT"])auth_secret=os.environ["PYTHON_WORKER_FACTORY_SECRET"](sock_file,_)=local_connect_and_auth(java_port,auth_secret)main(sock_file,sock_file)

這裡會去向 JVM 建立連接,並從 socket 中讀取指令和數據。對於如何進行序列化、反序列化,是通過 UDF 的類型來區分:

eval_type=read_int(infile)ifeval_type==PythonEvalType.NON_UDF:func,profiler,deserializer,serializer=read_command(pickleSer,infile)else:func,profiler,deserializer,serializer=read_udfs(pickleSer,infile,eval_type)

在 read_udfs 中,如果是 PANDAS 類的 UDF,會創建 ArrowStreamPandasUDFSerializer,其餘的 UDF 類型創建 BatchedSerializer。我們來看看 ArrowStreamPandasUDFSerializer(python/pyspark/serializers.py):

defdump_stream(self,iterator,stream):importpyarrowaspawriter=Nonetry:forbatchiniterator:ifwriterisNone:writer=pa.RecordBatchStreamWriter(stream,batch.schema)writer.write_batch(batch)finally:ifwriterisnotNone:writer.close()defload_stream(self,stream):importpyarrowaspareader=pa.ipc.open_stream(stream)forbatchinreader:yieldbatch

可以看到,這裡雙向的序列化、反序列化,都是調用了 PyArrow 的 ipc 的方法,和前面看到的 Scala 端是正好對應的,也是按 batch 來讀寫數據。對於 Pandas 的 UDF,讀到一個 batch 後,會將 Arrow 的 batch 轉換成 Pandas Series。

defarrow_to_pandas(self,arrow_column):frompyspark.sql.typesimport_check_series_localize_timestamps#Ifthegivencolumnisadatetypecolumn,createsaseriesofdatetime.datedirectly#insteadofcreatingdatetime64[ns]asintermediatedatatoavoidoverflowcausedby#datetime64[ns]typehandling.s=arrow_column.to_pandas(date_as_object=True)s=_check_series_localize_timestamps(s,self._timezone)returnsdefload_stream(self,stream):"""DeserializeArrowRecordBatchestoanArrowtableandreturnasalistofpandas.Series."""batches=super(ArrowStreamPandasSerializer,self).load_stream(stream)importpyarrowaspaforbatchinbatches:yield[self.arrow_to_pandas(c)forcinpa.Table.from_batches([batch]).itercolumns()]

5、Pandas UDF

前面我們已經看到,PySpark 提供了基於 Arrow 的進程間通信來提高效率,那麼對於用戶在 Python 層的 UDF,是不是也能直接使用到這種高效的內存格式呢?答案是肯定的,這就是 PySpark 推出的 Pandas UDF。區別於以往以行為單位的 UDF,Pandas UDF 是以一個 Pandas Series 為單位,batch 的大小可以由 spark.sql.execution.arrow.maxRecordsPerBatch 這個參數來控制。這是一個來自官方文檔的示例:

defmultiply_func(a,b):returna*bmultiply=pandas_udf(multiply_func,returnType=LongType())df.select(multiply(col("x"),col("x"))).show()

上文已經解析過,PySpark 會將 DataFrame 以 Arrow 的方式傳遞給 Python 進程,Python 中會轉換為 Pandas Series,傳遞給用戶的 UDF。在 Pandas UDF 中,可以使用 Pandas 的 API 來完成計算,在易用性和性能上都得到了很大的提升。

6、總結

PySpark 為用戶提供了 Python 層對 RDD、DataFrame 的操作接口,同時也支持了 UDF,通過 Arrow、Pandas 向量化的執行,對提升大規模數據處理的吞吐是非常重要的,一方面可以讓數據以向量的形式進行計算,提升 cache 命中率,降低函數調用的開銷,另一方面對於一些 IO 的操作,也可以降低網絡延遲對性能的影響。

然而 PySpark 仍然存在著一些不足,主要有:

進程間通信消耗額外的 CPU 資源;編程接口仍然需要理解 Spark 的分布式計算原理;Pandas UDF 對返回值有一定的限制,返回多列數據不太方便。Databricks 提出了新的 Koalas 接口來使得用戶可以以接近單機版 Pandas 的形式來編寫分布式的 Spark 計算作業,對數據科學家會更加友好。而 Vectorized Execution 的推進,有望在 Spark 內部一切數據都是用 Arrow 的格式來存放,對跨語言支持將會更加友好。同時也能看到,在這裡仍然有很大的性能、易用性的優化空間,這也是我們平臺近期的主要發力方向之一。

陳緒,匯量科技(Mobvista)高級算法科學家,負責匯量科技大規模數據智能計算引擎和平臺的研發工作。在此之前陳緒是阿里巴巴高級技術專家,負責阿里集團大規模機器學習平臺的研發。

相關焦點

  • 用python分析上海二手房數據,用幾十行代碼爬取大規模數據!
    ,因此在爬取過程中,會有重複的數據,如下圖,3000條數據中有523條重複(為避免重複可以嘗試倒序循環爬取)。1.1 爬取目的一個朋友在學習自考,作業是爬取數據進行數據分析,正好最近我在學習python,所以他委託我幫他完成這一工作1.2使用模塊requests進行網絡請求、bs4進行數據解析、xlwt進行excel表格存儲2、網頁結構分析2.1 首頁分析,獲取數據網頁連結:http://sh.lianjia.com
  • python接口自動化-Json數據處理
    一般常見的接口返回數據也是json格式的,我們在做判斷時候,往往只需要提取其中幾個關鍵的參數就行,這時候就需要json來解析返回的數據了。一、json模塊簡介1.Json簡介:Json,全名 JavaScript Object Notation,是一種輕量級的數據交換格式,常用於http請求中2.可以用help(json),查看對應的源碼注釋內容
  • Scala對於大數據開發重要嗎?Scala基礎學習建議
    對於大數據開發者而言,Scala主要是與Spark和Kafka兩個大數據組件緊密相關,採用Scala編寫的源碼,對於大數據開發者而言,要想真正把技術理論和框架吃透,研讀源碼是非常關鍵的。Scala作為一門面向對象的函數式程式語言,把面向對象編程與函數式編程結合起來,使得代碼更簡潔高效易於理解。
  • Python爬蟲技術路線?
    首先展示一下如何用python爬蟲requests庫進行爬取,requests庫是python爬蟲最基礎也必須掌握的庫。Requests庫教程:1、Requests庫速成教程;2、Requests全面教程。
  • 486頁Android源碼精編解析火爆網際網路,完整版免費下載
    Retrofit源碼Retrofit簡介與其他網絡請求開源庫對比Retrofit 的具體使用源碼分析創建網絡請求接口的實例外觀模式代理模式OkHttp 源碼OkHttp 3.7源碼分析(一)——整體架構簡單使用總體架構OkHttp 3.7源碼分析(二)——攔截器&一個實際網絡請求的實現OkHttp 3.7源碼分析(三)——任務隊列OkHttp 3.7源碼分析(四)——緩存策略OkHttp 3.7源碼分析
  • 如何在Apache Pyspark中運行Scikit-learn模型
    如果您已經準備好了機器學習模型,則可以直接跳到「 pyspark wrapper」部分,也可以通過以下步驟創建一個簡單的scikit learn機器學習模型。scikit learn機器學習模型:我們使用Python創建一個簡單的機器學習模型:將機器學習模型保存到磁碟pyspark wrapper讓我們考慮一下pyspark dataframe (df)中提供的運行預測所需的特徵創建一個python函數,該函數接受這四個特性作為參數,並將預測的分數作為輸出進行返回
  • 通過WordCount解析Spark RDD內部源碼機制
    的運行結果如下:二、解析RDD生成的內部機制下面詳細解析一下wordcount.scala的運行原理。在wordcount.scala的基礎上,我們從數據流動的視角分析數據到底是怎麼處理的。下面有一張WordCount數據處理過程圖,由於圖片較大,為了方便閱讀,將原圖分成兩張圖,如下面兩張圖所示。數據在生產環境中默認在HDFS中進行分布式存儲,如果在分布式集群中,我們的機器會分成不同的節點對數據進行處理,這裡我們在本地測試,重點關注數據是怎麼流動的。
  • python進階之源碼分析:如何將一個類方法變為多個方法?
    作者:豌豆花下貓 來源:Python貓在之前的文章《python 中如何實現參數化測試?》中,我提到了在 python 中實現參數化測試的幾個庫,並留下一個問題:它們是如何做到把一個方法變成多個方法,並且將每個方法與相應的參數綁定起來的呢?
  • 0485-如何在代碼中指定PySpark的Python運行環境
    randomfrom operator import addfrom pyspark.sql import SparkSessionspark = SparkSession \ .builder \ .appName("PythonPi") \ .config("spark.pyspark.python", "python/bin/python3.6") \ .config("spark.pyspark.driver.python
  • Python在pickle序列化後數據如何處理
    曾經接觸過一個項目,是關於Python用pickle序列化後的數據。大概就是可能需要將之前python寫的一套數據傳輸的代碼,改用Java實現,提高開發效率;python代碼結構很清晰,分為兩部分:抽取接口數據,pickle序列化後打包上傳到雲伺服器;從雲伺服器下載,解析入庫;我主要做的是先將解析入庫這部分Java化,其中一個很大的問題就是解析pickle序列化後的數據,處理這樣的問題對當時作為新手的我還是費了一番心思。
  • 大數據分析工程師面試集錦2-Scala
    開發需要最簡單和重要的理由是開發需要,大數據分析工程師是需要掌握大數據相關組件的想要通過Scala的面試,除了平時在學習和工作中的總結以外,刷題是一個很好的辦法,本文會結合數據分析工程師工作中需要掌握的知識點做一個篩選,最終挑選出如下的考題,主要分為問答題和手寫題,仔細看看有沒有你不知道的知識點?
  • python 進階之源碼分析:如何將一個類方法變為多個方法?
    關於第一點,它跟 ddt 是相似的,只是一些命名風格上的差異,以及參數的解析及綁定不同,不值得太關注。,存入待調用的列表裡。跟前面分析的兩個庫不同,它並沒有在此創建新的測試方法,而是復用了已有的方法。,真的是自討苦吃……不過,依稀大致可以看出,它在實現參數化時,使用的是生成器的方案,遍歷一個參數則調用一次測試方法,而前面的 ddt 和 parameterized 則是一次性把所有參數解析完,生成 n 個新的測試方法,再交給測試框架去調度。
  • 最全步驟整理:Python大數據分析-使用PySpark分析多個Excel文件
    Apache Spark是一個用於大規模數據分析處理的引擎。它支持Java、Scala、Python和R語言,在數據分析人工智慧領域 Python的使用已經遠超其它語言。其中Spark還支持一組豐富的高級工具,包括用於SQL和結構化數據處理的Spark SQL、用於機器學習的MLlib、用於圖形處理的GraphX以及用於增量計算和流處理的Spark Steaming。本文使用PySpark的SQL module 來實現對CSV文件數據分析及處理。
  • Python實現線程的高效非阻塞I/O調用
    使用協程可以實現高效的並發任務。Python3.5之後出現的async/await的使用方法,本文將詳細講述async/await的使用以及結合tornado實現非阻塞伺服器的方法。python協程與I/O調用的現狀協程的一般使用方法import asyncioasync def do_some_work(x): print(&39;, x) 此處先掛起,再執行await的協程,最後執行return
  • Python爬蟲「學前班」!別踩坑了!輕鬆爬取大規模數據
    前言爬蟲應用的廣泛,例如搜尋引擎、採集數據、廣告過濾、數據分析等。當我們對少數網站內容進行爬取時寫多個爬蟲還是有可能的,但是對於需要爬取多個網站內容的項目來說是不可能編寫多個爬蟲的,這個時候我們就需要智能爬蟲。
  • TOP 3大開源Python數據分析工具!
    /venvs/python-big-data/bin/activate  $ pip install ipython  $ pip install pandas  $ pip install pyspark  $ pip install scikit-learn  $ pip install scipy  本文選取的示例數據是最近幾天從某網站獲取的實際生產日誌數據,從技術層面來看,這些數據並不能算作是大數據
  • 成都python快速入門培訓:Python如何調用接口講解
    成都python快速入門培訓:Python如何調用接口講解最近成都達內小編有研究接口測試,然後查了查資料,發現有兩種方法,一種是使用urllib庫,一種是使用requests庫。而在這裡,我使用的是requests庫,為什麼要用這個呢?從官方文檔看出,python的標準庫urllib2提供了大部分需要的HTTP功能,但是呢?
  • 我拿到了阿里大牛(清華畢業生)總結的大數據學習路線+視頻教程
    Storm架構分析Storm編程模型、Tuple源碼、並發度分析Storm WordCount案例及常用Api分析Storm集群部署實戰Storm+Kafka+Redis業務指標計算Storm源碼下載編譯Strom集群啟動及源碼分析Storm任務提交及源碼分析
  • 好程式設計師Python培訓分享Python如何調用RPC接口
    Python如何調用RPC接口是很多Python開發工程師比較關心的問題,本篇文章好程式設計師Python培訓小編就給喜歡Python開發的小夥伴們分享一下Python調用RPC接口的詳解,文中有詳細的代碼列出有很好的參考價值,喜歡的小夥伴就隨小編一起來看一下吧,希望對大家有所幫助。