原文:http://www.jianshu.com/p/dbac491317cc?utm_source=tuicool&utm_medium=referral
適宜讀者人群:正在使用Mongodb的開發者
傳統Spark生態系統 和 MongoDB在Spark生態的角色
傳統Spark生態系統
Spark生態系統
那麼Mongodb作為一個database, 可以擔任什麼樣的角色呢? 就是數據存儲這部分, 也就是圖中的黑色圈圈HDFS的部分, 如下圖
用MongoDB替換HDFS後的Spark生態系統
Spark+Mongodb生態系統
為什麼要用MongoDB替換HDFS
存儲方式上, HDFS以文件為單位,每個文件64MB~128MB不等, 而MongoDB作為文檔資料庫則表現得更加細顆粒化
MongoDB支持HDFS所沒有的索引的概念, 所以在讀取上更加快
MongoDB支持的增刪改功能比HDFS更加易於修改寫入後的數據
HDFS的響應級別為分鐘, 而MongoDB通常是毫秒級別
如果現有資料庫已經是MongoDB的話, 那就不用再轉存一份到HDFS上了
可以利用MongoDB強大的Aggregate做數據的篩選或預處理
MongoDB Spark Connector介紹
支持讀取和寫入,即可以將計算後的結果寫入MongoDB
將查詢拆分為n個子任務, 如Connector會將一次match,拆分為多個子任務交給spark來處理, 減少數據的全量讀取
MongoDB Spark 示例代碼
計算用類型Type=1的message字符數並按userid進行分組
開發Maven dependency配置
這裡用的是mongo-spark-connector_2.11 的2.0.0版本和spark的spark-core_2.11的2.0.2版本:
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId>mongo-spark-connector_2.11</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.0.2</version>
</dependency>
示例代碼
import com.mongodb.spark._
import org.apache.spark.{SparkConf, SparkContext}
import org.bson._
val conf = new SparkConf()
.setMaster("local")
.setAppName("Mingdao-Score")
//同時還支持mongo驅動的readPreference配置, 可以只從secondary讀取數據
.set("spark.mongodb.input.uri", "mongodb://xxx.xxx.xxx.xxx:27017,xxx.xxx.xxx:27017,xxx.xxx.xxx:27017/inputDB.collectionName")
.set("spark.mongodb.output.uri", "mongodb://xxx.xxx.xxx.xxx:27017,xxx.xxx.xxx:27017,xxx.xxx.xxx:27017/outputDB.collectionName")
val sc = new SparkContext(conf)
// 創建rdd
val originRDD = MongoSpark.load(sc)
// 構造查詢
val dateQuery = new BsonDocument()
.append("$gte", new BsonDateTime(start.getTime))
.append("$lt", new BsonDateTime(end.getTime))
val matchQuery = new Document("$match", BsonDocument.parse("{\"type\":\"1\"}"))
// 構造Projection
val projection1 = new BsonDocument("$project", BsonDocument.parse("{\"userid\":\"$userid\",\"message\":\"$message\"}")
val aggregatedRDD = originRDD.withPipeline(Seq(matchQuery, projection1))
//比如計算用戶的消息字符數
val rdd1 = aggregatedRDD.keyBy(x=>{
Map(
"userid" -> x.get("userid")
)
})
val rdd2 = rdd1.groupByKey.map(t=>{
(t._1, t._2.map(x => {
x.getString("message").length
}).sum)
})
rdd2.collect().foreach(x=>{
println(x)
})
//保持統計結果至MongoDB outputurl 所指定的資料庫
MongoSpark.save(rdd2)
總結
MongoDB Connector 的文檔只有基礎的示例代碼, 具體詳情需要看GitHub中的example和部分源碼。
參考連結