題圖攝於北京前門
(本文作者系 VMware 中國研發雲原生實驗室工程師,聯邦學習開源項目 KubeFATE / FATE-Operator 維護者。)需要加入KubeFATE開源項目討論群的同學,請關注亨利筆記公眾號後回復 「kubefate」 即可。
VMware招聘聯邦學習、隱私計算開發工程師
相關文章
在Juypter Notebook中構建聯邦學習任務
雲原生聯邦學習平臺 KubeFATE 原理詳解
用KubeFATE在K8s上部署聯邦學習FATE v1.5
使用Docker Compose 部署FATE v1.5
在上篇文章 在Juypter Notebook中構建聯邦學習任務 構建任務中提到,FATE 1.5 LTS 版本支持用戶使用 Spark 作為底層的計算引擎,本文將對其實現細節以及使用進行簡單介紹,方便用戶在實際的使用過程中進行調優或者排查錯誤。
使用分布式計算引擎的意義在 FATE 中一個比較重要的組件是 "FATE Flow",它負責對用戶任務的管理和調度。如官方文檔所示,"FATE Flow" 的工作模式有兩種,分別為單機 (standalone) 和集群(cluster) 模式。單機模式中,數據的存儲以及計算都在 "FATE Flow" 本地執行因此無法有效擴展,所以單機模式主要用於學習以及測試。而在集群模式中,數據的存儲以及計算不再通過本地而是下發到分布式的集群中執行,而集群的大小可以根據實際的業務需求來進行伸縮,可滿足不同數據集規模的需求。
FATE 默認支持使用 "eggroll" 作為其底下計算和存儲的集群,在經過了不斷的迭代和優化之後目前已經能夠滿足大多數聯邦學習應用場景的需求。eggroll 本身是一個相對獨立的集群,它對外提供一個統一的入口以及一組API,外部應用可以通過 RPC 調用的方式把任務發送到 eggroll 集群上執行,而 eggroll 本身是支持橫向擴展的因此用戶可以根據實際場景調整集群的規模。
FATE 在 v1.5 中徹底重構了 Apache Spark 作為計算引擎部分,並提供正式支持。Spark 是一個得到業界廣泛認可的內存型 (in-memory) 計算引擎,由於其簡單、高效和集群管理工具成熟等特性,因此在許多公司的生產環境中被大規模部署和使用。這也是FATE支持使用它的主要原因之一。由於技術原因,目前FATE 對使用 Spark 的支持還不夠完善,暫時還不能滿足大規模樣本(千萬級別)的訓練任務,但優化的工作已在進行中。
KubeFATE 在 1.5 版本支持了 FATE On Spark 的部署,它可以通過容器的方式啟動一個 Spark 集群來為 FATE 提供計算服務,詳情可參考下面的章節:使用 Spark 作為計算引擎。
與具體計算引擎進行對接的是 FATE FLow 服務,因此我們將簡單分析該服務的結構,以弄明白它是如何跟不同的計算引擎交互。
FATE Flow 結構簡介
在 FATE 1.5 中,"FATE Flow" 服務迎來了比較重大的重構,它把存儲、計算、聯邦傳輸(federation)等操作抽象成了不同的接口以供上層的應用使用。而接口在具體的實現中可以通過調用不同的庫來訪問不同的運行時 (runtime),通過這種方式可以非常容易地擴展對其他計算 (如spark) 或存儲 (HDFS、MySQL) 服務的支持。一般來說使用FATE進行聯邦學習任務可分為以下步驟(假設組網已完成):
調用 FATE Flow 提供的接口上傳訓練用的數據集
定義訓練任務的 pipeline 並調用 FATE Flow 接口上傳任務
根據訓練的結果不斷調整訓練參數並得到最終模型
通過 FATE Flow 上傳預測用的數據集
定義預測任務並通過 FATE Flow 執行
在上述的步驟中,除了任務的調度必需要 FATE Flow 參與之外,存儲和計算等其他部分的工作都可以藉助別的服務來完成。
如下圖所示,"FATE Flow" 這個方框內列出了部分接口,其中:
"Storage Interface"用於數據集的管理,如上傳本地數據、刪除上傳的數據等。
"Storage Meta Interface"用於數據集的元數據管理。
"Computing Interface"用於執行計算任務。
"Federation Interface"用於在各個訓練參與方之間傳輸數據。
綠色的方格是接口的具體實現,深灰色方格是用於跟遠端服務進行交互的客戶端,而藍色的方格則對應著獨立於 FATE Flow 服務之外的其他運行時。例如,對於計算接口來說,具體實現了該接口的類是 "Table",而 Table 的類型又分為了兩種,其中一種使用 "rollpair" 來跟 "eggroll" 集群交互;而另一種則使用 "pyspark" 中的 "rdd" 來跟 "spark" 集群進行交互。
使用不同計算引擎之間的差異
在上一節中提到,FATE Flow 通過抽象出來的接口可以使用不同的計算、存儲等服務,但由於依賴以及實現機制等原因,這些服務在選擇上有一定的制約,但具體可分為兩類:
使用eggroll作為計算引擎
使用spark作為計算引擎
當使用 eggroll 作為計算引擎時,FATE 的整體架構如下:
eggroll 集群中有三種不同類型的節點,分別是 Cluster Manager、Node Manager 和 Rollsite,其中 Cluster Manager 負責提供服務入口以及分配資源,Node Manager 是實際執行計算和存儲的節點,而 Rollsite 則提供傳輸服務。
而當使用 Spark 作為計算引擎時,FATE 的整體架構如下:
由於Spark 本身是一個內存 (in-memory) 計算框架,一般需要藉助其他服務來持久化輸出,因此,要在 FATE 中使用 Spark 作為計算引擎還需要藉助 HDFS 來實現數據的持久化。至於聯邦傳輸則分為了兩部分,分別是指令 (pipeline) 的同步和訓練過程中消息的同步,它們分別藉助 nginx 和 rabbitmq 服務來完成。
使用Spark作為計算引擎
前置條件
前面的小節提到,要使用 Spark 作為計算引擎還需要依賴於 Nginx、RabbitMQ 以及HDFS 等服務,對於完整的服務安裝部署以及配置有三種方式可供參考:
基於裸機的集群安裝部署可以參考FATE ON Spark部署指南。
基於 "docker-compose" 的集群安裝部署可以參考使用Docker Compose 部署 FATE,只需要把配置文件中的computing_backend設置成spark即可,在真正部署的時候會以容器的方式拉起 HDFS、Spark 等服務。
基於 Kubernetes 的集群部署可以參考Kubernetes部署方案,創建集群時使用 cluster-spark.yaml 文件即可創建一個基於 K8s 的 FATE On Spark 集群,至於Spark 節點的數量也可以在這個文件中定義。
注意:目前經過驗證的 Spark、HDFS 以及 RabbitMQ 的版本分別為2.4,2.7和3.8
對於想要利用現有 Spark 集群的用戶來說,除了需要部署其他依賴的服務之外還需要解決 FATE 的 python 包依賴,具體措施是給所有要運行聯邦學習工作負載的Spark節點進行如下操作:
創建新目錄放置文件
$ mkdir -p /data/projects
$ cd /data/projects
下載和安裝"miniconda"
$ wget https://repo.anaconda.com/miniconda/Miniconda3-4.5.4-Linux-x86_64.sh
$ sh Miniconda3-4.5.4-Linux-x86_64.sh -b -p /data/projects/miniconda3
$ miniconda3/bin/pip install virtualenv
$ miniconda3/bin/virtualenv -p /data/projects/miniconda3/bin/python3.6 --no-wheel --no-setuptools --no-download /data/projects/python/venv
下載FATE項目代碼
$ git clone https://github.com/FederatedAI/FATE/tree/v1.5.0
// 添加python依賴路徑
$ echo "export PYTHONPATY=/data/projects/fate/python" >> /data/projects/python/venv/bin/activate
進入python的虛擬環境
$ source /data/projects/python/venv/bin/activate
修改並下載python庫
// 剔除tensorflow和pytorch依賴
$ sed -i -e '23,25d' ./requirements.txt
$ pip install setuptools-42.0.2-py2.py3-none-any.whl
$ pip install -r /data/projects/python/requirements.txt
至此,依賴安裝完畢,之後在 FATE 中提交任務時還需通過配置spark.pyspark.python來指定使用該 python 環境。
示例
當 Spark 集群準備完畢後就可以在 FATE 中使用它了,使用的方式則是在任務的定義中把backend設置為1,這樣 "FATE Flow" 就會在後續的調度中,通過 "spark-submit" 工具把提交到 Spark 集群上執行。
需要注意的是,雖然 Spark 集群的 master 可以在任務的配置中指定,但是 HDFS、RabbitMQ 以及 Nginx 等服務需要在 FATE Flow 啟動之前通過配置文件的方式指定,因此當這些服務地址發生改變時,需要更新配置文件並重啟 FATE Flow 服務。
當 FATE Flow 服務正常啟動後,可以使用"toy_example"來驗證環境。步驟如下:
修改toy_example_config文件如下:
{
"initiator": {
"role": "guest",
"party_id": 9999
},
"job_parameters": {
"work_mode": 0,
"backend": 1,
"spark_run": {
"executor-memory": "4G",
"total-executor-cores": 4
}
},
"role": {
"guest": [
9999
],
"host": [
9999
]
},
"role_parameters": {
"guest": {
"secure_add_example_0": {
"seed": [
123
]
}
},
"host": {
"secure_add_example_0": {
"seed": [
321
]
}
}
},
"algorithm_parameters": {
"secure_add_example_0": {
"partition": 4,
"data_num": 1000
}
}
}
其中spark_run裡面定義了要提供給"spark-submit"的參數,因此還可以通過master來指定master的地址、或者conf來指定spark.pyspark.python的路徑等,一個簡單的例子如下:
...
"job_parameters": {
"work_mode": 0,
"backend": 0,
"spark_run": {
"master": "spark://127.0.0.1:7077"
"conf": "spark.pyspark.python=/data/projects/python/venv/bin/python"
},
},
...
如果沒有設置spark_run欄位則默認讀取${SPARK_HOME}/conf/spark-defaults.conf中的配置。更多的關於spark的參數可參考Spark Configuration。
提交任務以及查看任務狀態 運行以下命令提交任務運行"toy_example"。
$ python run_toy_example.py -b 1 9999 9999 1
查看 fate_board :
查看 Spark 的面板 :
根據上面的輸出可以看到,FATE 通過 Spark 集群成功運行了"toy_example"的測試。
總結
本文主要梳理了分布式系統對FATE的重要性,同時也對比了 FATE 所支持兩種計算引擎 "eggroll" 和 "spark" 之間的差異,最後詳細描述了如何在 FATE 中使用 Saprk 運行任務。由於篇幅有限,關於在如何 FATE 中使用 Spark 的部分只作了簡單介紹,更多的內容如節點資源分配、參數調優等還待用戶探索。
與 EggRoll 相比,目前 FATE 對 Spark 作為計算引擎的支持還在完善中,相信再經過幾個版本的迭代,其使用體驗和穩定性以及效率上會達到更高的水準。
需要加入KubeFATE開源項目討論群的同學,請先關注亨利筆記公眾號,然後回復 「kubefate」 即可。
相關連結
要想了解雲原生、人工智慧和區塊鏈等技術原理,請立即長按以下二維碼,關注本公眾號亨利筆記 ( henglibiji ),以免錯過更新。
歡迎轉發、收藏和點 「在看」,或者點擊閱讀原文。