整理 | 若名
近日,阿里雲計算部門已在 GitHub 上發布了其 Alink 平臺的「核心代碼」,並上傳了一系列算法庫,它們支持批處理和流處理,這對支持機器學習任務至關重要。
Alink 是基於 Flink 的通用算法平臺,由阿里巴巴計算平臺 PAI 團隊研發。除了支持阿里自己的平臺外,還支持 Kafka,HDFS 和 HBase 等一系列開源數據存儲平臺。
阿里雲計算和機器智能部門表示,開發者和數據分析師可以利用開原始碼來構建軟體功能,例如統計分析、機器學習、實時預測、個性化推薦和異常檢測。而 Alink 提供的一系列算法,可以幫助處理機器學習任務,例如 AI 驅動的客戶服務和產品推薦。
開源算法列表
阿里巴巴集團副總裁、阿里雲智能計算平臺事業部總裁、高級研究員賈揚清指出,對於尋求大數據和機器學習工具的開發人員而言,Alink 將是一個新的選擇。
在他看來,作為中國企業是GitHub上十大貢獻者之一,阿里致力於在軟體開發周期中儘早與開源社區建立聯繫。而在 GitHub 上開源 Alink 遵循了這一承諾。
阿里目前已將 Alink 部署到其旗下電子商務平臺天貓上。今年「雙11」期間,單日數據處理量達到 970PB,每秒處理峰值數據高達 25 億條,Alink 幫助天貓產品推薦的點擊率提高了 4%。
迄今為止,阿里的開發人員在過去八年中為整個開源社區貢獻了 180 多個項目,包括雲基礎架構、機器學習、資料庫和網絡。阿里巴巴的開放原始碼計劃包括基於 MySQL 的 AliSQL,容器工具 Pouch 和 JStorm(基於Java的 Apache Storm 版本)。
關於 Alink 的使用問題
Q:能否連接遠程 Flink 集群進行計算?
A:通過方法可以連接一個已經啟動的 Flink 集群:
useRemoteEnv(host, port, parallelism, flinkHome=None, localIp="localhost", shipAlinkAlgoJar=True, config=None)
。其中,參數
host 和 port 表示集群的地址;parallelism 表示執行作業的並行度;flinkHome 為 flink 的完整路徑,默認使用 PyAlink 自帶的 flink-1.9.0 路徑;localIp 指定實現 Flink DataStream 的列印預覽功能時所需的本機IP位址,需要 Flink 集群能訪問。默認為localhost。shipAlinkAlgoJar 是否將 PyAlink 提供的 Alink 算法包傳輸給遠程集群,如果遠程集群已經放置了 Alink 算法包,那麼這裡可以設為 False,減少數據傳輸。Q:如何停止長時間運行的Flink作業?
A:使用本地執行環境時,使用 Notebook 提供的「停止」按鈕即可。使用遠程集群時,需要使用集群提供的停止作業功能。
Q:能否直接使用 Python 腳本而不是 Notebook 運行?
A:可以。但需要在代碼最後調用 resetEnv,否則腳本不會退出。
使用步驟
使用前準備:
確保使用環境中有Python3,版本>=3.5;需要根據 Python 版本下載對應的 pyalink 包(下載連結參見GitHub);使用 easy_install 進行安裝 easy_install [存放的路徑]/pyalink-0.0.1-py3.*.egg。需要注意的是:如果之前安裝過 pyalink,請先使用 pip uninstall pyalink卸載之前的版本。如果有多個版本的 Python,可能需要使用特定版本的 easy_install,比如 easy_install-3.7。如果使用 Anaconda,則需要在 Anaconda 命令行中進行安裝。
開始使用:
阿里推薦通過 Jupyter Notebook 來使用 PyAlink,能獲得更好的使用體驗。
pyAlink
使用步驟:
在命令行中啟動Jupyter:jupyter notebook,並新建 Python 3 的 Notebook 。導入 pyalink 包:from pyalink.alink import *。使用方法創建本地運行環境:useLocalEnv(parallism, flinkHome=None, config=None)。其中,參數 parallism 表示執行所使用的並行度;flinkHome 為 flink 的完整路徑,默認使用 PyAlink 自帶的 flink-1.9.0 路徑;config為Flink所接受的配置參數。運行後出現如下所示的輸出,表示初始化運行環境成功:JVM listening on ***Python listening on ***4.開始編寫 PyAlink 代碼,例如:
source = CsvSourceBatchOp\.setSchemaStr("sepal_length double, sepal_width double, petal_length double, petal_width double, category string")\.setFilePath("http://alink-dataset.cn-hangzhou.oss.aliyun-inc.com/csv/iris.csv")res = source.select("sepal_length", "sepal_width")df = res.collectToDataframeprint(df)
編寫代碼:
在 PyAlink 中,算法組件提供的接口基本與 Java API 一致,即通過默認構造方法創建一個算法組件,然後通過 setXXX 設置參數,通過link/linkTo/linkFrom與其他組件相連。這裡利用 Jupyter 的自動補全機制可以提供書寫便利。
對於批式作業,可以通過批式組件的
print/collectToDataframe/collectToDataframes
等方法或者 BatchOperator.execute來觸發執行;對於流式作業,則通過StreamOperator.execute 來啟動作業。
如何在集群上運行Alink算法
1.準備Flink集群
wget https://archive.apache.org/dist/flink/flink-1.9.0/flink-1.9.0-bin-scala_2.11.tgztar -xf flink-1.9.0-bin-scala_2.11.tgz && cd flink-1.9.0./bin/start-cluster.sh2.準備Alink算法包
git clone https://github.com/alibaba/Alink.gitcd Alink && mvn -Dmaven.test.skip=true clean package shade:shade3.運行Java示例
./bin/flink run -p 1 -c com.alibaba.alink.ALSExample [path_to_Alink]/examples/target/alink_examples-0.1-SNAPSHOT.jar# ./bin/flink run -p 2 -c com.alibaba.alink.GBDTExample [path_to_Alink]/examples/target/alink_examples-0.1-SNAPSHOT.jar# ./bin/flink run -p 2 -c com.alibaba.alink.KMeansExample [path_to_Alink]/examples/target/alink_examples-0.1-SNAPSHOT.jar