用Spark計算引擎執行FATE聯邦學習任務

2021-03-03 亨利筆記

題圖攝於北京前門

(本文作者系 VMware 中國研發雲原生實驗室工程師,聯邦學習開源項目 KubeFATE / FATE-Operator 維護者。)

需要加入KubeFATE開源項目討論群的同學,請關注亨利筆記公眾號後回復 「kubefate」 即可。

VMware招聘聯邦學習、隱私計算開發工程師

相關文章

在Juypter Notebook中構建聯邦學習任務

雲原生聯邦學習平臺 KubeFATE 原理詳解

用KubeFATE在K8s上部署聯邦學習FATE v1.5

使用Docker Compose 部署FATE v1.5

在上篇文章 在Juypter Notebook中構建聯邦學習任務 構建任務中提到,FATE 1.5 LTS 版本支持用戶使用 Spark 作為底層的計算引擎,本文將對其實現細節以及使用進行簡單介紹,方便用戶在實際的使用過程中進行調優或者排查錯誤。

使用分布式計算引擎的意義

在 FATE 中一個比較重要的組件是 "FATE Flow",它負責對用戶任務的管理和調度。如官方文檔所示,"FATE Flow" 的工作模式有兩種,分別為單機 (standalone) 和集群(cluster) 模式。單機模式中,數據的存儲以及計算都在 "FATE Flow" 本地執行因此無法有效擴展,所以單機模式主要用於學習以及測試。而在集群模式中,數據的存儲以及計算不再通過本地而是下發到分布式的集群中執行,而集群的大小可以根據實際的業務需求來進行伸縮,可滿足不同數據集規模的需求。

FATE 默認支持使用 "eggroll" 作為其底下計算和存儲的集群,在經過了不斷的迭代和優化之後目前已經能夠滿足大多數聯邦學習應用場景的需求。eggroll 本身是一個相對獨立的集群,它對外提供一個統一的入口以及一組API,外部應用可以通過 RPC 調用的方式把任務發送到 eggroll 集群上執行,而 eggroll 本身是支持橫向擴展的因此用戶可以根據實際場景調整集群的規模。

FATE 在 v1.5 中徹底重構了 Apache Spark 作為計算引擎部分,並提供正式支持。Spark 是一個得到業界廣泛認可的內存型 (in-memory) 計算引擎,由於其簡單、高效和集群管理工具成熟等特性,因此在許多公司的生產環境中被大規模部署和使用。這也是FATE支持使用它的主要原因之一。由於技術原因,目前FATE 對使用 Spark 的支持還不夠完善,暫時還不能滿足大規模樣本(千萬級別)的訓練任務,但優化的工作已在進行中。

KubeFATE 在 1.5 版本支持了 FATE On Spark 的部署,它可以通過容器的方式啟動一個 Spark 集群來為  FATE 提供計算服務,詳情可參考下面的章節:使用 Spark 作為計算引擎。

與具體計算引擎進行對接的是 FATE FLow 服務,因此我們將簡單分析該服務的結構,以弄明白它是如何跟不同的計算引擎交互。

FATE Flow 結構簡介

在 FATE 1.5 中,"FATE Flow" 服務迎來了比較重大的重構,它把存儲、計算、聯邦傳輸(federation)等操作抽象成了不同的接口以供上層的應用使用。而接口在具體的實現中可以通過調用不同的庫來訪問不同的運行時 (runtime),通過這種方式可以非常容易地擴展對其他計算 (如spark) 或存儲 (HDFS、MySQL) 服務的支持。一般來說使用FATE進行聯邦學習任務可分為以下步驟(假設組網已完成):

調用 FATE Flow 提供的接口上傳訓練用的數據集

定義訓練任務的 pipeline 並調用 FATE Flow 接口上傳任務

根據訓練的結果不斷調整訓練參數並得到最終模型

通過 FATE Flow 上傳預測用的數據集

定義預測任務並通過 FATE Flow 執行

在上述的步驟中,除了任務的調度必需要 FATE Flow 參與之外,存儲和計算等其他部分的工作都可以藉助別的服務來完成。

如下圖所示,"FATE Flow" 這個方框內列出了部分接口,其中:

"Storage Interface"用於數據集的管理,如上傳本地數據、刪除上傳的數據等。

"Storage Meta Interface"用於數據集的元數據管理。

"Computing Interface"用於執行計算任務。

"Federation Interface"用於在各個訓練參與方之間傳輸數據。

綠色的方格是接口的具體實現,深灰色方格是用於跟遠端服務進行交互的客戶端,而藍色的方格則對應著獨立於 FATE Flow 服務之外的其他運行時。例如,對於計算接口來說,具體實現了該接口的類是 "Table",而 Table 的類型又分為了兩種,其中一種使用 "rollpair" 來跟 "eggroll" 集群交互;而另一種則使用 "pyspark" 中的 "rdd" 來跟 "spark" 集群進行交互。

使用不同計算引擎之間的差異

在上一節中提到,FATE Flow 通過抽象出來的接口可以使用不同的計算、存儲等服務,但由於依賴以及實現機制等原因,這些服務在選擇上有一定的制約,但具體可分為兩類:

使用eggroll作為計算引擎

使用spark作為計算引擎

當使用 eggroll 作為計算引擎時,FATE 的整體架構如下: 

eggroll 集群中有三種不同類型的節點,分別是 Cluster Manager、Node Manager 和 Rollsite,其中 Cluster Manager 負責提供服務入口以及分配資源,Node Manager 是實際執行計算和存儲的節點,而 Rollsite 則提供傳輸服務。

而當使用 Spark 作為計算引擎時,FATE 的整體架構如下: 

由於Spark 本身是一個內存 (in-memory) 計算框架,一般需要藉助其他服務來持久化輸出,因此,要在 FATE 中使用 Spark 作為計算引擎還需要藉助 HDFS 來實現數據的持久化。至於聯邦傳輸則分為了兩部分,分別是指令 (pipeline) 的同步和訓練過程中消息的同步,它們分別藉助 nginx 和 rabbitmq 服務來完成。

使用Spark作為計算引擎

前置條件

前面的小節提到,要使用 Spark 作為計算引擎還需要依賴於 Nginx、RabbitMQ 以及HDFS 等服務,對於完整的服務安裝部署以及配置有三種方式可供參考:

基於裸機的集群安裝部署可以參考FATE ON Spark部署指南。

基於 "docker-compose" 的集群安裝部署可以參考使用Docker Compose 部署 FATE,只需要把配置文件中的computing_backend設置成spark即可,在真正部署的時候會以容器的方式拉起 HDFS、Spark 等服務。

基於 Kubernetes 的集群部署可以參考Kubernetes部署方案,創建集群時使用 cluster-spark.yaml 文件即可創建一個基於 K8s 的 FATE On Spark 集群,至於Spark 節點的數量也可以在這個文件中定義。

注意:目前經過驗證的 Spark、HDFS 以及 RabbitMQ 的版本分別為2.4,2.7和3.8

對於想要利用現有 Spark 集群的用戶來說,除了需要部署其他依賴的服務之外還需要解決 FATE 的 python 包依賴,具體措施是給所有要運行聯邦學習工作負載的Spark節點進行如下操作:

創建新目錄放置文件

$ mkdir -p /data/projects
$ cd /data/projects

下載和安裝"miniconda"

$ wget https://repo.anaconda.com/miniconda/Miniconda3-4.5.4-Linux-x86_64.sh
$ sh Miniconda3-4.5.4-Linux-x86_64.sh -b -p /data/projects/miniconda3
$ miniconda3/bin/pip install virtualenv
$ miniconda3/bin/virtualenv -p /data/projects/miniconda3/bin/python3.6 --no-wheel --no-setuptools --no-download /data/projects/python/venv

下載FATE項目代碼

$ git clone https://github.com/FederatedAI/FATE/tree/v1.5.0

// 添加python依賴路徑
$ echo "export PYTHONPATY=/data/projects/fate/python" >> /data/projects/python/venv/bin/activate

進入python的虛擬環境

$ source /data/projects/python/venv/bin/activate

修改並下載python庫

// 剔除tensorflow和pytorch依賴
$ sed -i -e '23,25d' ./requirements.txt
$ pip install setuptools-42.0.2-py2.py3-none-any.whl
$ pip install -r /data/projects/python/requirements.txt

至此,依賴安裝完畢,之後在 FATE 中提交任務時還需通過配置spark.pyspark.python來指定使用該 python 環境。

示例

當 Spark 集群準備完畢後就可以在 FATE 中使用它了,使用的方式則是在任務的定義中把backend設置為1,這樣 "FATE Flow" 就會在後續的調度中,通過 "spark-submit" 工具把提交到 Spark 集群上執行。

需要注意的是,雖然 Spark 集群的 master 可以在任務的配置中指定,但是 HDFS、RabbitMQ 以及 Nginx 等服務需要在 FATE Flow 啟動之前通過配置文件的方式指定,因此當這些服務地址發生改變時,需要更新配置文件並重啟 FATE Flow 服務。

當 FATE Flow 服務正常啟動後,可以使用"toy_example"來驗證環境。步驟如下:

修改toy_example_config文件如下:

{
"initiator": {
"role": "guest",
"party_id": 9999
},
"job_parameters": {
"work_mode": 0,
"backend": 1,
"spark_run": {
"executor-memory": "4G",
"total-executor-cores": 4
}
},
"role": {
"guest": [
9999
],
"host": [
9999
]
},
"role_parameters": {
"guest": {
"secure_add_example_0": {
"seed": [
123
]
}
},
"host": {
"secure_add_example_0": {
"seed": [
321
]
}
}
},
"algorithm_parameters": {
"secure_add_example_0": {
"partition": 4,
"data_num": 1000
}
}
}

其中spark_run裡面定義了要提供給"spark-submit"的參數,因此還可以通過master來指定master的地址、或者conf來指定spark.pyspark.python的路徑等,一個簡單的例子如下:

...
"job_parameters": {
"work_mode": 0,
"backend": 0,
"spark_run": {
"master": "spark://127.0.0.1:7077"
"conf": "spark.pyspark.python=/data/projects/python/venv/bin/python"
},
},
...

如果沒有設置spark_run欄位則默認讀取${SPARK_HOME}/conf/spark-defaults.conf中的配置。更多的關於spark的參數可參考Spark Configuration。

提交任務以及查看任務狀態 運行以下命令提交任務運行"toy_example"。

$ python run_toy_example.py -b 1 9999 9999 1

查看 fate_board :

查看 Spark 的面板 :

根據上面的輸出可以看到,FATE 通過 Spark 集群成功運行了"toy_example"的測試。

總結

本文主要梳理了分布式系統對FATE的重要性,同時也對比了 FATE 所支持兩種計算引擎 "eggroll" 和 "spark" 之間的差異,最後詳細描述了如何在 FATE 中使用 Saprk 運行任務。由於篇幅有限,關於在如何 FATE 中使用 Spark 的部分只作了簡單介紹,更多的內容如節點資源分配、參數調優等還待用戶探索。

與 EggRoll 相比,目前 FATE 對 Spark 作為計算引擎的支持還在完善中,相信再經過幾個版本的迭代,其使用體驗和穩定性以及效率上會達到更高的水準。


需要加入KubeFATE開源項目討論群的同學,請先關注亨利筆記公眾號,然後回復 「kubefate」 即可。

相關連結

要想了解雲原生、人工智慧和區塊鏈等技術原理,請立即長按以下二維碼,關注本公眾號亨利筆記 ( henglibiji ),以免錯過更新。

歡迎轉發、收藏和點 「在看」,或者點擊閱讀原文。

相關焦點

  • 雲原生聯邦學習平臺 KubeFATE 原理詳解
    VMware招聘聯邦學習、隱私計算開發工程師相關文章聯邦學習平臺 KubeFATE 部署 FATE 的配置說明在Juypter Notebook中構建聯邦學習任務用KubeFATE在K8s上上部署聯邦學習FATE v1.5使用Docker Compose 部署FATE v1.5關於FATE FATE 是一個聯邦學習的開源項目,旨在提供一個安全的計算框架來支持聯合AI生態系統。
  • 用KubeFATE在Kubernetes上部署聯邦學習集群
    之前我們在文章《使用KubeFATE快速部署聯邦學習實驗開發環境(一)》、《使用KubeFATE快速部署聯邦學習實驗開發環境(二)》和《使用FATE進行圖片識別的深度神經網絡聯邦學習》中介紹過如何使用 KubeFATE 部署一個基於   Docker Compose 的 FA TE 聯邦學習集群,以便於快速嘗試體驗聯邦學習。但隨著聯邦學習的正式投入使用,訓練集、模型都會逐漸變大。
  • 數據說話:大數據處理引擎Spark與Flink比拼
    常見的問題包括:  非常陡峭的學習曲線。剛接觸這個領域的人經常會被需要學習的技術的數量砸暈。不像經過幾十年發展的資料庫一個系統可以解決大部分數據處理需求,Hadoop 等大數據生態裡的一個系統往往在一些數據處理場景上比較擅長,另一些場景湊合能用,還有一些場景完全無法滿足需求。結果就是需要好幾個系統來處理不同的場景。
  • 一、Spark概述
    /docs/2.4.3/Hadoop YARN集群模式(生產環境使用):運行在 yarn 集群之上,由 yarn 負責資源管理,Spark 負責任務調度和計算,好處:計算資源按需伸縮,集群利用率高,共享底層存儲,避免數據跨集群遷移。
  • Spark入門介紹
    Spark是一種基於內存的快速,通用,可擴展的大數據計算引擎框架。在大家急需一種新的計算引擎出現的情況下,Spark應運而生,不過Spark真正與Hadoop產生聯繫,還需要Yarn幫忙。 在2013年10月發布Hadoop2.X之前,Hadoop1.x是沒有Yarn存在的,存儲和計算緊密耦合,並且負責調度任務和資源的JobTracker苦不堪言,所有工作都堆在了它身上。
  • 如何將 MapReduce 轉化為 Spark
    在目前的環境下,我們需要將數據複製到穩定的存儲系統,比如 HDFS,以便在不同的計算引擎中進行分享。然而,這樣的複製可能比真正的計算所花費的代價要大,所以以流水線的形式將多個系統組合起來效率並不高。適用範圍的局限性:如果一個應用不適合一個專有的計算系統,那麼使用者只能換一個系統,或者重寫一個新的計算系統。
  • 聯邦學習 OR 遷移學習?No,我們需要聯邦遷移學習
    2.1.3 本文小結本文提出了一個安全的聯邦遷移學習(FTL)框架,引入了同態加密和密文共享兩種保護隱私的安全方法。同態加密方法簡單,但計算成本較高。密文共享方法的優點是沒有精度損失,計算速度也較快,但其主要缺點是需要離線生成和存儲多個三元組才能執行在線的聯邦學習。
  • Spark簡介
    存儲在HDFS上的數據要發揮其價值,需要相關的工具來對數據進行處理(包括ETL、分析、計算等),繼而得到各種業務指標、報表或者數據產品。而上述所提到的相關的工具可以是Spark、MapReduce等。Spark大數據計算引擎,可以對大規模數據集進行基於內存的分布式計算。
  • 最簡大數據Spark-2.1.0
    Spark有先進的DAG(有向無環圖)執行引擎還有其基於內存的計算方式有關。易用。支持多語言Java,Scala,Python,R。同時能跟Hadoop生態圈很好的融合,比如Hive,HDFS,YARN。在近些年Hadoop已經慢慢成為大數據事實標準的大環境下,能抱住Hadoop的大腿,使其快速火起來。
  • 『 Spark 』2. spark 基本概念解析
    寫這樣一個系列僅僅是為了梳理個人學習spark的筆記記錄,並非為了做什麼教程,所以一切以個人理解梳理為主,沒有必要的細節就不會記錄了。若想深入了解,最好閱讀參考文章和官方文檔。其次,本系列是基於目前最新的 spark 1.6.0 系列開始的,spark 目前的更新速度很快,記錄一下版本好還是必要的。1.
  • Spark學習記錄|RDD分區的那些事
    本周工作中學習了一些簡單的RDD的知識,主要是關於RDD分區相關的內容。下面的內容都是自己親身實踐所得,如果有錯誤的地方,還希望大家批評指正。考慮一下機器學習中網格搜索策略,比如隨機森林中,我們想得到n_estimators和max_depth兩個參數的最優組合,我們會對給出的參數取值範圍計算笛卡爾積,然後對每一種組合訓練得到一個效果,並選取效果最好的一組參數。假設我們想使用spark把這個過程並行化,但是參數組合數量太多,沒有足夠的計算資源,只能一個task上運行幾組參數。
  • Spark【面試】
    的都是並行計算,那麼他們有什麼相同和區別兩者都是用mr模型來進行並行計算,hadoop的一個作業稱為job,job裡面分為map task和reduce task,每個task都是在自己的進程中運行的,當task結束時,進程也會結束spark用戶提交的任務成為application,一個application對應一個
  • 騰訊TDW千臺Spark千億節點對相似度計算
    相似度計算在信息檢索、數據挖掘等領域有著廣泛的應用,是目前推薦引擎中的重要組成部分。隨著網際網路用戶數目和內容的爆炸性增長,對大規模數據進行相似度計算的需求變得日益強烈。在傳統的MapReduce框架下進行相似度計算會引入大量的網絡開銷,導致性能低下。
  • Spark—15分鐘教程
    當我們編寫Spark程序時,首先要知道的是,當我們執行代碼時,我們不一定要對數據執行任何操作。實際上,該工具有兩種類型的API調用:轉換和操作。Spark轉換背後的範例被稱為「延後計算」,這意味著實際的數據計算在我們要求採取行動之前不會開始。
  • 數據分析工程師面試集錦5——Spark面試指南
    導語本篇文章為大家帶來spark面試指南,文內會有兩種題型,問答題和代碼題,題目大部分來自於網絡上,有小部分是來自於工作中的總結,每個題目會給出一個參考答案。為什麼考察Spark?Spark作為大數據組件中的執行引擎,具備以下優勢特性。
  • 分布式機器學習平臺大比拼:Spark、PMLS、TensorFlow、MXNet
    Spark 用戶將計算建模為 DAG,該 DAG 表示了在 RDD 上執行的轉換和動作。DAG 進而被編譯為多個 Stage。每個 Stage 執行為一系列並行運行的任務(Task),每個分區(Partition)對應於一個任務。
  • spark結構化數據處理:Spark SQL、DataFrame和Dataset
    但無論是哪種API或者是程式語言,它們都是基於同樣的執行引擎,因此你可以在不同的API之間隨意切換,它們各有各的特點,看你喜歡。SQL使用Spark SQL的一種方式就是通過SQL語句來執行SQL查詢。當在程式語言中使用SQL時,其返回結果將被封裝為一個DataFrame。
  • SparkCore——專業術語及流程圖
    隨後這些具體的Task每個都會被分配到集群上的某個節點的某個Executor去執行。·每個節點可以起一個或多個Executor。·每個Executor由若干core組成,每個Executor的每個core一次只能執行一個Task。·每個Task執行的結果就是生成了目標RDD的一個partiton。
  • 當傳統聯邦學習面臨異構性挑戰,不妨嘗試這些個性化聯邦學習算法
    的目標是找到一個全局模型ω,可以用它作為初始化全局模型,進一步對損失函數執行梯度更新(步長為 α)來得到它的個性化模型θ_i(ω)。這一階段的具體學習操作取決於採用的個性化聯邦學習機制。例如,遷移學習、多任務學習、元學習、知識蒸餾、混合模型等。進一步,在邊緣設備上進行本地模型聚合,也有助於避免大量設備通過昂貴的主幹網帶寬與雲伺服器直接通信,從而降低通信開銷。通過執行個性化處理,可以在一些資源有限的設備上部署輕量級的個性化模型(例如,通過模型修剪或傳輸學習)。這將有助於減輕設備在通信和計算資源方面的異構性。
  • 揭秘Apache頂級項目大數據分析引擎 Flink:與Spark的對比與分析
    快速Flink利用基於內存的數據流並將迭代處理算法深度集成到了系統的運行時中,這就使得系統能夠以極快的速度來處理數據密集型和迭代任務。2. 可靠性和擴展性當伺服器內存被耗盡時,Flink也能夠很好的運行,這是因為Flink包含自己的內存管理組件、序列化框架和類型推理引擎。3.