一、 Spark的應用現狀
1.1 Spark需求背景
隨著數據規模的持續增長,數據需求越來越多,原有的以MapReduce為代表的Hadoop平臺越來越顯示出其局限性。主要體現在2點:
1) 任務執行時間比較長。特別是某些複雜的SQL任務,或者一些複雜的機器學習迭代。
2) 不能很好的支持像機器學習、實時處理這種新的大數據處理需求。
Spark作為新一代大數據處理的計算平臺,使得我們可以用Spark這一種平臺統一處理數據處理的各種複雜需求,非常好的支持了我們目前現有的業務。與原有MapReduce模型相比,其具有下面3個特點:
1) 充分使用內存作為框架計算過程存儲的介質,與磁碟相比大大提高了數據讀取速度。利用內存緩存,顯著降低算法迭代時頻繁讀取數據的開銷。
2) 更好的DAG框架。原有在MapReduce M-R-M-R的模型,在Spark框架下,更類似與M-R-R,優化掉無用流程節點。
3) 豐富的組件支持。如支持對結構化數據執行SQL操作的組件Spark-SQL,支持實時處理的組件Spark-Streaming,支持機器學習的組件Mllib,支持圖形學習的Graphx。
1.2 以Spark為核心的數據平臺結構
2.1 基於SparkStreaming的實時處理需求
商業數據部內部有大量的實時數據處理需求,如實時廣告收入計算,實時線上ctr預估,實時廣告重定向等,目前主要通過SparkStreaming完成。
實時數據處理的第一步,需要有實時的數據。360的用戶產品,幾乎全國各地都部署有機房,主要有4大主力機房。實時數據的收集過程如下:
1) 使用Apache flume實時將伺服器的日誌上傳至本地機房的Kafka,數據延遲在100ms以內
2) 使用Kafka MirorMaker將各大主力機房的數據匯總至洛陽中心機房,數據延遲在200ms以內。由於公司的網絡環境不是很好,為了保證低延遲,在MirorMaker機房的機器上,申請了帶寬的QOS保證,以降低延遲。
數據處理的實時鏈路如所示:
1) 1種方式是通過Apache Flume實時寫入Hdfs,用於第二天全量數據的離線計算
2) 1種方式是通過SparkSteaming實時處理,處理後數據會回流至Kafka或者Redis,便於後續流程使用。
2.2 基於SparkSQL和DataFrame的數據分析需求
SparkSQL是Spark的核心組件,作為新一代的SQL on Hadoop的解決方案,完美的支持了對現有Hive數據的存取。在與Hive進行集成的同時,Spark SQL也提供了JDBC/ODBC接口,便於第三方工具如Tableau、Qlik等通過該接口接入Spark SQL。
由於之前大部分數據分析工作都是通過使用hive命令行完成的,為了將遷移至SparkSQL的代價最小,360系統部的同事開發了SparkSQL的命令行版本spark-hive。原有的以hive 命令運行的腳本,簡單的改成spark-hive便可以運行。360系統部的同事也做了大量兼容性的工作。spark-hive目前已經比較穩定,成為數據分析的首選。
DataFrame是Spark 1.3引入的新API,與RDD類似,DataFrame也是一個分布式數據容器。
但與RDD不同的是,DataFrame除了數據以外,還掌握更多數據的結構信息,即schema。同時,與Hive類似,DataFrame也支持嵌套數據類型(struct、array和map)。從API易用性的角度上 看,DataFrame API提供的是一套高層的關係操作,比函數式的RDD API要更加友好,門檻更低。
大數據開發過程中,可能會遇到各種類型的數據源,而DataFrame與生俱來就支持各種數據類型,如下圖,包括JSON文件、Parquet文件、Hive表格、本地文件系統、分布式文件系統(HDFS)以及雲存儲(S3)。同時,配合JDBC,它還可以讀取外部關係型資料庫系統如Mysql,Oracle中的數據。對於自帶Schema的數據類型,如Parquet,DataFrame還能夠自動解析列類型。
通過組合使用DataFrame和SparkSQL,與MapReduce比較大大減少了代碼行數,同時執行效率也得到了提升。如下示例是處理廣告主位置信息的scala代碼。
2.3 基於MLLib的機器學習需求
360DMP提供人群擴展功能(Look-alike)。所謂人群擴展,是基於廣告主創建的種子用戶,根據這些種子用戶的特徵,挖掘、篩選、識別、拓展更多具有相似特徵的用戶,以增加廣告的受眾。
業界的Look-alike有2種做法。第一種做法就是顯性的定位。廣告主先選中一部分種子用戶,根據種子用戶的標籤再定位擴展一部分其他用戶。比如如果種子用戶選擇的都是「化妝品-護膚」這個標籤,那麼根據這個標籤可以找到其他的用戶,作為擴展用戶。這種做法的缺點是不夠精確,擴展出來的用戶過大。第二種方法是通過一個機器學習的模型,將問題轉化為機器學習模型,來定位廣告主的潛在用戶。我們採用的是這種方法。
在做Look-alike的過程中,用到了Spark中的Mlilib庫。Mlilib算法庫的核心庫如上,選擇的是Classification中LR算法,主要原因有兩個:
1)模型比較簡單,易於理解和實現
2)模型訓練起來速度比較快,時間可控。
LookAlike的第一步是建立模型。在這裡,廣告主會首先提交一批種子用戶,作為機器學習的正樣本。其他的非種子用戶作為負樣本。於是問題就轉化為一個二分類的模型,正負樣本組成學習的樣本。訓練模型之後,通過模型預測,最後得到廣告主需要的目標人群。
三、部分經驗總結
3.1 使用Direct模式處理kafka數據
SparkStreaming讀取Kafka數據時,有兩種方法:Direct和Receiver。我們選擇的是Direct方法。與基於Receiver的方法相比,Direct具有以下優點:
1)簡化並行性:無需創建多個輸入Kafka流和聯合它們。使用directStream,Spark Streaming將創建與要消費的Kafka分區一樣多的RDD分區,這將從Kafka並行讀取數據。因此,Kafka和RDD分區之間存在一對一映射,這更容易理解和調整。
2)效率:在第一種方法中實現零數據丟失需要將數據存儲在預寫日誌中,該日誌進一步複製數據。這實際上是低效的,因為數據有效地被複製兩次。第二種方法消除了問題,因為沒有接收器,因此不需要預寫日誌。
3)Exactly-once語義:第一種方法使用Kafka的高級API在Zookeeper中存儲消耗的偏移量。這是傳統上消費Kafka數據的方式。雖然這種方法(與預寫日誌結合)可以確保零數據丟失(即至少一次語義),但是一些記錄在一些故障下可能被消費兩次,這是因為Spark Streaming可靠接收的數據與Zookeeper跟蹤的偏移之間存在不一致。因此,在第二種方法中,我們使用不基於Zookeeper的簡單的Kafka API,偏移由Spark Streaming在其檢查點內跟蹤。這消除了Spark Streaming和Zookeeper / Kafka之間的不一致,所以每個記錄被Spark Streaming有效地接收一次。
Direct方法需要自己控制消費的kafka offset,參考代碼如下。
3.2 SparkSQL中使用Parquet
相比傳統的行式存儲引擎,列式存儲引擎因其更高的壓縮比,更少的IO操作而越來越受到重視。這是因為在網際網路公司的大數據應用中,大部分情況下,數據量很大並且數據欄位數目比較多,但是大部分查詢只是查詢其中的部分行,部分列。這個時候,使用列式存儲就能極大的發揮其優勢。
Parquet是Spark中優先支持的列存方案。與使用文本相比,Parquet 讓 Spark SQL 的性能平均提高了 10 倍,這要感謝初級的讀取器過濾器、高效的執行計劃,以及 Spark 1.6.0 中經過改進的掃描吞吐量。
SparSQL的Parquet的幾個操作:
1)創建Parquet格式的Hive表
CREATE TABLE parquet_table(age INT, name STRING) STORED AS PARQUET;
2)讀取Parquet格式的文件
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext.read.parquet("/input/parquet")
3)保存為Parquet格式文件
df.write.parquet("/output/parquet")
3.3 Spark參數調優
1)spark.sql.shuffle.partitions:在做Join或者Group的時候,可以通過適當提高該值避免數據傾斜。
2)spark.testing.reserveMemory:Spark executor jvm啟動的時候,會默認保留一部分內存,默認為300m。適當的減少這個值,可以增加 spark執行時Storage Memory的值。設置方式是啟動spark shell的時候加上參數:--conf spark.testing.reservedMemory= 104857600。
3)spark.serializer:Spark內部會涉及到很多對數據進行序列化的地方,默認使用的是Java的序列化機制。Spark同時支持使用Kryo序列化庫,Kryo序列化類庫的性能比Java序列化類庫的性能要高很多。官方介紹,Kryo序列化機制比Java序列化機制,性能高10倍左右。Spark之所以默認沒有使用Kryo作為序列化類庫,是因為Kryo要求最好要註冊所有需要進行序列化的自定義類型,因此對於開發者來說,這種方式比較麻煩。設置方法是conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")。
關於作者:
王曉偉,360大數據開發工程師,從事大數據相關平臺開發和數據倉庫開發,曾經為多個開源框架,如Yarn、Pig、Hive、Tez貢獻代碼。
關於360商業數據部:
360商業數據部專注於360自有海量數據的深度挖掘及分析,在保護個人隱私及數據安全前提下,多維分析用戶需求和偏好,運用數據挖掘和人工智慧技術,以及場景化應用全面提升商業價值,已形成包括360商易、360DMP和360分析在內的數據營銷產品體系。360商易基於海量數據洞察人群畫像及品牌現狀,為營銷決策提供支持;360DMP對數據進行整合管理,精準圈定目標人群,提升轉化效果;360分析支持推廣效果評估及流量分析,實時優化投放。該大數據產品體系,結合360點睛實效平臺,共同為廣告主提供大數據精準營銷閉環服務。