當MongoDB遇見Spark

2021-03-06 Spark技術日報

原文:http://www.jianshu.com/p/dbac491317cc?utm_source=tuicool&utm_medium=referral

適宜讀者人群:正在使用Mongodb的開發者

傳統Spark生態系統 和 MongoDB在Spark生態的角色

傳統Spark生態系統

Spark生態系統

那麼Mongodb作為一個database, 可以擔任什麼樣的角色呢? 就是數據存儲這部分, 也就是圖中的黑色圈圈HDFS的部分, 如下圖

用MongoDB替換HDFS後的Spark生態系統

Spark+Mongodb生態系統

為什麼要用MongoDB替換HDFS

存儲方式上, HDFS以文件為單位,每個文件64MB~128MB不等, 而MongoDB作為文檔資料庫則表現得更加細顆粒化

MongoDB支持HDFS所沒有的索引的概念, 所以在讀取上更加快

MongoDB支持的增刪改功能比HDFS更加易於修改寫入後的數據

HDFS的響應級別為分鐘, 而MongoDB通常是毫秒級別

如果現有資料庫已經是MongoDB的話, 那就不用再轉存一份到HDFS上了

可以利用MongoDB強大的Aggregate做數據的篩選或預處理

MongoDB Spark Connector介紹

支持讀取和寫入,即可以將計算後的結果寫入MongoDB

將查詢拆分為n個子任務, 如Connector會將一次match,拆分為多個子任務交給spark來處理, 減少數據的全量讀取

MongoDB Spark 示例代碼

計算用類型Type=1的message字符數並按userid進行分組

開發Maven dependency配置

這裡用的是mongo-spark-connector_2.11 的2.0.0版本和spark的spark-core_2.11的2.0.2版本:

    <dependency>

        <groupId>org.mongodb.spark</groupId>

        <artifactId>mongo-spark-connector_2.11</artifactId>

        <version>2.0.0</version>

    </dependency>

    <dependency>

        <groupId>org.apache.spark</groupId>

        <artifactId>spark-core_2.11</artifactId>

        <version>2.0.2</version>

    </dependency>

示例代碼

    import com.mongodb.spark._

    import org.apache.spark.{SparkConf, SparkContext}

    import org.bson._

    val conf = new SparkConf()

      .setMaster("local")

      .setAppName("Mingdao-Score")

      //同時還支持mongo驅動的readPreference配置, 可以只從secondary讀取數據

      .set("spark.mongodb.input.uri", "mongodb://xxx.xxx.xxx.xxx:27017,xxx.xxx.xxx:27017,xxx.xxx.xxx:27017/inputDB.collectionName")

      .set("spark.mongodb.output.uri", "mongodb://xxx.xxx.xxx.xxx:27017,xxx.xxx.xxx:27017,xxx.xxx.xxx:27017/outputDB.collectionName")

    val sc = new SparkContext(conf)

    // 創建rdd

    val originRDD = MongoSpark.load(sc)

    // 構造查詢

    val dateQuery = new BsonDocument()

      .append("$gte", new BsonDateTime(start.getTime))

      .append("$lt", new BsonDateTime(end.getTime))

    val matchQuery = new Document("$match", BsonDocument.parse("{\"type\":\"1\"}"))

    // 構造Projection

    val projection1 = new BsonDocument("$project", BsonDocument.parse("{\"userid\":\"$userid\",\"message\":\"$message\"}")

    val aggregatedRDD = originRDD.withPipeline(Seq(matchQuery, projection1))

    //比如計算用戶的消息字符數

    val rdd1 = aggregatedRDD.keyBy(x=>{

      Map(

        "userid" -> x.get("userid")

      )

    })

    val rdd2 = rdd1.groupByKey.map(t=>{

      (t._1, t._2.map(x => {

        x.getString("message").length

      }).sum)

    })

    rdd2.collect().foreach(x=>{

        println(x)

    })

    //保持統計結果至MongoDB outputurl 所指定的資料庫

    MongoSpark.save(rdd2)

總結

MongoDB Connector 的文檔只有基礎的示例代碼, 具體詳情需要看GitHub中的example和部分源碼。

參考連結

相關焦點

  • 實戰課堂 | 手把手教你用MongoDB Spark Connector構建分析應用
    (https://www.tutorialspoint.com/pyspark/pyspark_quick_guide.htm)下載 Sparkcd /home/mongo-sparkwget http://mirrors.tuna.tsinghua.edu.cn/apache/spark/spark-2.4.4/spark-2.4.4
  • 代碼 | Spark讀取mongoDB數據寫入Hive普通表和分區表
    dependency>      <groupId>org.mongodb</groupId>      <artifactId>mongo-java-driver</artifactId>      <version>3.6.3</version>    </dependency>
  • pyspark操作MongoDB
    """@author: zhangslob@file: spark_count.py @time: 2019/01/03@desc:    不要安裝最新的pyspark版本    `pip3 install pyspark==2.3.2`    更多pyspark操作MongoDB請看https://docs.mongodb.com
  • MongoDB 常見問題:MongoDB診斷
    [¶](https://docs.mongodb.com/manual/faq/diagnostics/#what-tools-are-available-for-monitoring-mongodb)Starting in version 4.0, MongoDB offers [free Cloud monitoring](https://docs.mongodb.com/manual
  • MongoDB 不得不知的 12 個知識點
    最簡單的入手就是存log,因為mongodb本身存的就是json,可以很方便的接入各種存儲日誌的地方。然後可以做成相關監控報表,比如說APM,NPM等,比如說千尋位置2. 其他的話要看題主所在的行業了,不同的行業有不同的用法,比如說信息的展示等等3.
  • MongoDB 如何上手和避坑?
    最簡單的入手就是存log,因為mongodb本身存的就是json,可以很方便的接入各種存儲日誌的地方。然後可以做成相關監控報表,比如說APM,NPM等,比如說千尋位置2. 其他的話要看題主所在的行業了,不同的行業有不同的用法,比如說信息的展示等等3.
  • MongoDB安裝筆記
    _64-rhel70-4.2.2/ mongodb-4.2.24.3.5 建立數據目錄mkdir /tongfu.net/env/mongodb-4.2.2/confmkdir /tongfu.net/env/mongodb-4.2.2/datamkdir /tongfu.net/env/mongodb-4.2.2/logs4.3.6
  • 不容忽視:MongoDB的JavaScript性能
    【IT168 技術】mongodb使用javascript做shell, mongodb的db.eval可以提供給數據驅動與這種javascript shell類似的js接口。
  • mongodb的文檔查詢命令find
    具體表現為:db.col.find().pretty()find和sql的類比1.查詢所有的結果sql:select * from t_usermongodb:db.user.find()2.查詢指定的列sql:select name,age from t_usermongodb:db.user.find({},{'name':1,'age':1})
  • mall整合Mongodb實現文檔操作
    Mongodb的安裝和使用1.下載Mongodb安裝包,下載地址:https://fastdl.mongodb.org/win32/mongodb-win32-x86_64-2008plus-ssl-3.2.21-signed.msi2.選擇安裝路徑進行安裝
  • 一文讀懂MongoDB,從理論到實踐
    安裝和部署流程下載和安裝小強公司的生產環境無法訪問外網,因此小強直接從mongodb的安裝源下載了這5個rpm安裝包:https://repo.mongodb.org/yum/redhat/7/mongodb-org/4.0/x86_64/RPMS/root@:~/mongo# rpm -ivh *.rpmwarning: mongodb-org
  • MongoDB 2.0.6 發布,分布式文檔資料庫
    模式自由(schema-free),意味著對於存儲在mongodb資料庫中的文件,我們不需要知道它的任何結構定義。如果需要的話,你完全可以把不同結構的文件存儲在同一個資料庫裡。 存儲在集合中的文檔,被存儲為鍵-值對的形式。鍵用於唯一標識一個文檔,為字符串類型,而值則可以是各中複雜的文件類型。
  • Spark 2.0系列之SparkSession詳解
    的各項功能,用戶不但可以使用DataFrame和Dataset的各種API,學習Spark的難度也會大大降低。SparkSession的功能首先,我們從一個Spark應用案例入手:SparkSessionZipsExample可以從JSON文件中讀取郵政編碼,通過DataFrame API進行分析,同時還能夠使用Spark SQL語句實施查詢。
  • MongoDB學習筆記整理,請趕緊收藏起來
    mongodb是最近幾年最火的nosql資料庫,在很多大型企業應用廣泛,今天就一起學習它的用法。花了一周時間,整理下面的學習記錄,希望對大家有所幫助。mongodb1. mongodb的安裝,windows 10可以安裝,一般作為服務都是安裝在Linux伺服器上。
  • 「實戰記錄」mongodb與php的坑,我是踩了
    於是直接查看連接埠,全部是mongodb的!問題解決我期間還加大php進程到一百多個,結果還是跑滿!百度之後才知道,原來php7.1後會跟mongodb建立長連接關係!彩蛋mongodb數據是強類型,php是弱類型!一定要注意php的變量做參數時的數據類型要跟mongodb存儲的一致,否則查詢不出數據!
  • 一、Spark概述
    2、易用性(可以通過java/scala/python/R開發spark應用程式)3、通用性(可以使用spark sql/spark streaming/mlib/Graphx)4、兼容性(spark程序可以運行在standalone/yarn/mesos)Spark 框架模塊整個Spark 框架模塊包含
  • 『 Spark 』2. spark 基本概念解析
    過程中的理解記錄 + 對參考文章中的一些理解 + 個人實踐spark過程中的一些心得而來。寫這樣一個系列僅僅是為了梳理個人學習spark的筆記記錄,並非為了做什麼教程,所以一切以個人理解梳理為主,沒有必要的細節就不會記錄了。若想深入了解,最好閱讀參考文章和官方文檔。其次,本系列是基於目前最新的 spark 1.6.0 系列開始的,spark 目前的更新速度很快,記錄一下版本好還是必要的。1.
  • mongoHelper 0.3.9 發布,spring-data-mongodb 增強工具包
    mongoHelper 是基於 spring-data-mongodb 的增強工具包,簡化 CRUD 操作,提供類 jpa 的資料庫操作。
  • MongoDB-CRUD&運維工具介紹以及授權認證
    2.MongoDB 不支持多文檔事務(mongodb4.0 開始支持 ACID)。但是 MongoDB 確實在一個文檔上提供了原子操作。儘管集合中的文檔通常都是相同的,但是 MongoDB 中的集合不需要指定 schema。3.MongoDB 不支持 SQL 但是支持自己的豐富的查詢語言。4.在 MongoDB 中,存儲在集合中的每個文檔都需要一個唯一的 _id 欄位,作為主鍵。
  • Spark【面試】
     spark用戶提交的任務成為application,一個application對應一個sparkcontext,app中存在多個job,每觸發一次action操作就會產生一個job。 spark:spark的shuffle是在DAGSchedular劃分Stage的時候產生的,TaskSchedule要分發Stage到各個worker的executor。 減少shuffle可以提高性能。23、RDD機制?