基於HBase+ElasticSearch進行用戶畫像查詢(二)

2021-02-14 Hadoop大數據之路

用戶畫像建設是產品進行精細化運營中非常重要的一環,像今日頭條的精準個性化推薦、京東及淘寶的商品推薦、手機SDK的消息推送等,都是基於用戶畫像數據進行精細化運營的。在用戶畫像建設中,如何對用戶畫像的數據進行存儲顯得尤為重要,因為數據存儲結構非常影響數據的查詢性能。上一篇文章《基於hbase進行用戶畫像查詢(一)》已經探討了基於base存儲並基於其filter查詢方案的可行性,從實驗的結論可以看出方案不可行。本文探討一下ElasticSearch+Hbase進行用戶畫像查詢。

一、ElasticSearch全文搜尋引擎

ElasticSearch是一個基於Lucene的開源搜尋引擎,底層使用Java開發並使用Lucene作為其核心來實現所有索引和搜索功能,並通過RESTful API來隱藏Lucene的複雜性,從而讓全文檢索變得簡單。下一篇文章將對ElasticSearch進行詳細的介紹。

ElasticSearch是面向文檔(document)的,其底層以文檔的形式存儲,並索引每個文檔的內容使之可以被搜索。ElasticSearch使用JavaScript對象符號JSON(JavaScript Object Notation)作為文檔序列化格式。

在ElasticSearch體系中,有一個重要的概念:索引,這個詞在屬性上可分為名詞和動詞兩種。索引(名詞):一個索引(index)就像是傳統關係資料庫中的資料庫,它是相關文檔存儲的地方;索引(動詞):索引一個文檔表示一個文檔存儲到索引(名詞)裡,以便它可以被搜索,很像SQL中的insert關鍵字,區別是:如果文檔已存在,新的文檔覆蓋舊的文檔。

ElasticSearch數據架構的主要概念與關係資料庫Mysql對比:

ElasticSearchMysqlIndexDatabaseTypeTableDocumentRow‍FieldColumn
‍‍MappingSchemaDSLSQL

二、HBase與ElasticSearch整合

HBase的特性決定了在根據用戶查詢相關標籤屬性的查詢性能上非常有優勢,但是在根據標籤查詢相關用戶的查詢性能上卻非常糟糕。為了解決這個問題,通常需要在Hbase的上層建立二級索引。

解決方案:HBase存儲全量用戶的基礎信息數據與標籤屬性數據,並採用ElasticSearch索引用戶的標籤數據作為二級索引。一、根據用戶查詢對應的標籤數據只需要查詢HBase即可;二、根據標籤查詢用戶則需要通過term在ElasticSearch中搜索對應的rowkey信息,然後通過rowkey在HBase中查詢對應的用戶信息數據。

整合HBase、ElasticSearch的數據寫入及查詢功能,並對外提供接口能力。由於ElasticSearch並不屬於Hadoop生態圈體系的組件,在整合HBase與ElasticSearch的過程中,遇到一些版本衝突問題,詳細描述如下:

1)ElasticSearch對JDK版本的要求

ElasticSearch2.x及以下ElasticSearch5.x及以上Java 8 update 20 or later, or Java 7 update 55 or laterJava version 1.8.0_73 or later

2)HBase對JDK版本的支持

HBase VersionJDK 7JDK 8JDK 9JDK 102.0不支持支持不支持不支持1.3及以下支持支持不支持不支持

3)Spark對ElasticSearch的支持

ElasticSearch2.x及以下ElasticSearch5.x及以上Spark 1.6.xSpark 2.x

4)第三方依賴guava、netty、protobuf

由於集群HBase版本為1.1.3,Spark版本為1.6.2,因此選擇ElasticSearch2.4.5版本,但是在項目搭建的過程中發現thirdparty dependency出現版本衝突。


nettyguavaprotobufElasticSearch 2.4.53.x默認18.0
HBase 1.1.34.0.x16.0.x及以下2.5.0

為了解決第三方版本不一致的衝突問題,需要對ElasticSearch引入的第三方依賴進行shade操作,

1<dependencies>
2<dependency>
3  <groupId>org.elasticsearch</groupId>
4  <artifactId>elasticsearch</artifactId>
5  <version>${elsearch.version}</version>
6</dependency>
7</dependencies>
8<build>
9<plugins>
10  <plugin>
11    <groupId>org.apache.maven.plugins</groupId>
12    <artifactId>maven-shade-plugin</artifactId>
13    <version>2.4.1</version>
14    <configuration>
15      <createDependencyReducedPom>false</createDependencyReducedPom>
16    </configuration>
17    <executions>
18      <execution>
19        <phase>package</phase>
20        <goals>
21          <goal>shade</goal>
22        </goals>
23        <configuration>
24          <relocations>
25            <relocation>
26              <pattern>com.google.guava</pattern>
27              <shadedPattern>com.aspire.elsearch.guava</shadedPattern>
28            </relocation>
29            <relocation>
30              <pattern>org.joda</pattern>
31              <shadedPattern>com.aspire.elsearch.joda</shadedPattern>
32            </relocation>
33            <relocation>
34              <pattern>com.google.common</pattern>
35              <shadedPattern>com.aspire.elsearch.common</shadedPattern>
36            </relocation>
37            <relocation>
38              <pattern>com.google.thirdparty</pattern>
39              <shadedPattern>com.aspire.elsearch.thirdparty</shadedPattern>
40            </relocation>
41            <relocation>
42              <pattern>io.netty</pattern>
43              <shadedPattern>com.aspire.elsearch.netty</shadedPattern>
44            </relocation>
45          </relocations>
46          <transformers>
47            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer" />
48          </transformers>
49        </configuration>
50      </execution>
51    </executions>
52  </plugin>
53</plugins>
54</build>

執行clean install後,在項目的pom引入依賴。

1<dependency>
2    <groupId>com.aspire.elsearch</groupId>
3    <artifactId>es-shade</artifactId>
4    <version>1.0.0-RELEASE</version>
5    <scope>compile</scope>
6    <exclusions>
7        <exclusion>
8            <groupId>org.elasticsearch</groupId>
9            <artifactId>elasticsearch</artifactId>
10        </exclusion>
11    </exclusions>
12</dependency>


三、批量數據導入

第一種方案:通過Java代碼Scan全表數據,然後通過addIndex或者bulkloader入庫。

1    long start = System.currentTimeMillis();
2    Connection conn = null;
3    HbaseClient hclient = null;
4    Client esClient = null;
5    try {
6        esClient = ElsearchClient.getEsClient();
7        hclient = getHServer(request);
8        conn = hclient.getConnection();
9        Table table = HbaseUtil.getTable(conn, NAMESPACE, TABLE_NAME);
10        Scan scan = new Scan();
11        scan.setStartRow(Bytes.toBytes(STARTROW));
12        scan.setStopRow(Bytes.toBytes(STOPROW));
13        ResultScanner rs = table.getScanner(scan);
14        long count = 0L;
15        for(Result r : rs) {
16            count++;
17            String rk = Bytes.toString(r.getRow());
18            Map<String, Object> docMap = new HashedMap();
19            List<Cell> cells = r.listCells();
20            for (Cell cell : cells) {
21                String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
22                String val = Bytes.toString(CellUtil.cloneValue(cell));
23                docMap.put(qualifier, val);
24            }
25            ElsearchUtil.addDoc(esClient, ES_IDX, ES_TPYE, rk, docMap);
26            if (count % 1000 == 0) {
27                long mid = System.currentTimeMillis();
28                String line = String.format("Had insert %s rows, took time %s s.", count, ((mid-start)/1000));
29                StandardLogHelper.writeHbase2EsLog(ResultCode.SUCCESS, start, line);
30            }
31        }
32    } catch (Exception e) {
33        throw e;
34    } finally {
35        if(esClient != null) {
36            ElsearchClient.close();
37        }
38        if(conn != null) {
39            hclient.close(conn);
40        }
41    }

第二種方案:通過Spark程序Scan全表數據並轉成esRDD,然後通過EsSpark入庫。

1val hbaseRdd = sc.newAPIHadoopRDD(hconf,
2  classOf[TableInputFormat],
3  classOf[ImmutableBytesWritable],
4  classOf[Result])
5val esRdd = hbaseRdd.map(tpl => {
6  val ret = tpl._2
7  val rk = Bytes.toString(ret.getRow)
8  val docMap = mutable.HashMap[String, String]()
9  docMap.put("id", rk)
10  val cells = ret.listCells()
11  val it =cells.iterator()
12  while(it.hasNext) {
13    val cell = it.next()
14    val qualifier = Bytes.toString(CellUtil.cloneQualifier(cell))
15    val value = Bytes.toString(CellUtil.cloneValue(cell))
16    docMap.put(qualifier, value)
17  }
18  docMap
19})
20EsSpark.saveToEs(esRdd, "hbase/es_usertags", Map("es.mapping.id" -> "id"))

通過測試,第一種方案效率非常慢,使用單線程addIndex導入數據,即使使用多線程或bulkloader的方式進行數據導入,其效果提升也不明顯。其結果如下:

第二種方案效率比第一種方案的效率有很大幅度的提升,8000萬+的數據導入大約需要3小時(task fail是因為一些特殊字符導致,先忽略)。

結論:在多線程導入數據到ElasticSearch時,由於ElasticSearch內部的分片數及單機多線程上限的影響下,其效率不會特別高。通過Spark程序導入數據的過程中,發現已插入的數據越多後插入的效率就越慢。待解決:8000萬數據導入需要3小時(12個單核executor),對於大數據能力來說性能較差,是否還有更好的方案?修改ElasticSearch的分片數能否提升導入的效率?

四、搜索性能

1    long start = System.currentTimeMillis();
2    Client client = ElsearchClient.getEsClient();
3    String index = "hbase";
4    String type = "es_usertags";
5    String tagValue = "201803";
6    String[] tagCodes = {"01003001", "01002004"};
7    int pageNo = 5;
8    int pageSize = 100;
9    List<Map<String, Object>> list = ElsearchUtil.multiSearch(
10            client,
11            pageNo,
12            pageSize,
13            index, type, tagValue, tagCodes);
14    long end = System.currentTimeMillis();
15    System.out.println("This multiSearch took time " + (end-start)
16            + ", size of list is " + (list==null? 0 : list.size()));

測試結果(含建立連接):This multiSearch took time 6830, size of list is 100

測試結果(不含建立連接):This multiSearch took time 532, size of list is 100

從測試結果來看,獲取tcp連接比較耗時,而實際檢索結果秒級內響應,性能非常高,生產應用只需要使用長連接即可。

相關焦點

  • ElasticSearch安裝
    它提供了一個分布式多用戶能力的全文搜尋引擎[2],基於RESTful web接口。Elasticsearch是用Java語言開發的,並作為Apache許可條款下的開放源碼發布,是一種流行的企業級搜尋引擎。Elasticsearch用於雲計算[3]中,能夠達到實時搜索,穩定,可靠,快速,安裝使用方便。
  • Elasticsearch高級調優方法論之——根治慢查詢!
    3、考慮冷熱數據分離架構(適用於基於時間的索引)以及Elasticsearch中的翻轉索引(rollover)/壓縮索引(shrink)功能,以有效管理分片計數。推薦閱讀:我在 Elasticsearch 集群內應該設置多少個分片?
  • 在elasticsearch中使用function_score查詢
    前端時間一直在研究kubernets+.net core相關,本打算寫幾篇k8s的文章,但在公司聽到同事遇到個需求一直沒有搞定,於是出於程式設計師的本能了解了一下,就是需要根據左上角,右下角的經緯度坐標查詢一批在地圖上均勻分布!
  • Hbase協處理器實踐總結(hbase數據同步)
    >6.8.5</elasticsearch.version> <hbase.version>1.0.0</hbase.version> <!
  • Spring Data ElasticSearch 使用
    操作,將原始操作elasticSearch的客戶端API 進行封裝 。spring data API 簡化 elasticSearch操作,將原始操作elasticSearch的客戶端API 進行封裝 \n" + " Spring Data為Elasticsearch Elasticsearch項目提供集成搜尋引擎"); articleService.save(article);
  • Elasticsearch(三):實戰
    的核心概念進行比較詳細的介紹,但是在實際生產中我們如何使用elasticsearch呢?本篇文章我們先介紹一些elasticsearch常見的使用方法,然後通過一個實際的例子來加深對elasticsearch使用的理解。這個實際例子是網站上收集的用戶點擊菜單的行為日誌數據存儲在elasticsearch上,並可以通過工具可以通過一些圖表來分析用戶的行為。總體的目標:滿足多個維度圖表的查看,索引可以定期歸檔或者存儲不用人工幹預。
  • ElasticSearch 極簡教程
    ElasticSearch是一個基於Lucene的搜索伺服器。它提供了一個分布式多用戶能力的全文搜尋引擎,基於RESTful web接口。Elasticsearch是用Java開發的,並作為Apache許可條款下的開放源碼發布,是當前流行的企業級搜尋引擎。設計用於雲計算中,能夠達到實時搜索,穩定,可靠,快速,安裝使用方便。ElasticSearch 架構
  • Elasticsearch專題
    ,基於RESTful風格的全文搜尋引擎ES是分布式文檔資料庫。一條數據就是一個文檔,這個文檔用JSON作為載體ES安裝下載安裝包,解壓,修改elasticsearch.yml啟動啟動報錯ES啟動不能使用root用戶
  • Linxu安裝ElasticSearch
    它提供了一個分布式多用戶能力的全文搜尋引擎,基於RESTful web接口。Elasticsearch是用Java語言開發的,並作為Apache許可條款下的開放源碼發布,是一種流行的企業級搜尋引擎。Elasticsearch用於雲計算中,能夠達到實時搜索,穩定,可靠,快速,安裝使用方便。
  • ElasticSearch
    Lucene的搜索伺服器,是一個分布式、高擴展、高實時的搜索與數據分析引擎,基於RESTful web接口 2.Elasticsearch是用Java語言開發的,並作為Apache許可條款下的開放源碼發布,是一種流行的企業級搜尋引擎 3.應用場景 3.1 搜索:海量數據的查詢 3.2 日誌數據分析 3.3 實時數據分析1.2 ES和MySQL的區別 • MySQL有事務性
  • ElasticSearch初體驗
    Elastic Stack構建在開源基礎之上, Elastic Stack 讓您能夠安全可靠地獲取任何來源、任何格式的數據,並且能夠實時地對數據進行搜索、分析和可視化Elasticsearch 是基於 JSON 的分布式搜索和分析引擎,專為實現水平擴展、高可用和管理便捷性而設計。
  • ElasticSearch聚合查詢Restful語法和JavaApi詳解(基於ES7.6)
    (一)概述在前面關於ES的一系列文章中,已經介紹了ES的概念、常用操作、JavaAPI以及實際的一個小demo,但是在真實的應用場景中,還有可能會有更高階的一些用法,今天主要介紹兩種相對來說會更難一些的操作,聚合查詢。該文檔基於ElasticSearch7.6,將介紹restful查詢語法以及JavaApi。
  • Elastic App Search初體驗
    開發人員通過AppSearch提供的Api降數據提交給AppSearch,AppSearch富化數據並索引到Elasticsearch,客戶可以在網站或App等各種端進行數據的檢索;了解了AppSearch之後,接下來我們進行實戰演練。
  • ElasticSearch介紹
    /cn/products/elasticsearchGithub:https://github.com/elastic/elasticsearch總結:1、elasticsearch是一個基於Lucene的高擴展的分布式搜索伺服器,支持開箱即用。
  • Elasticsearch Suggester詳解
    "}此時blogs索引裡已經有一些文檔了,可以進行下一步的探索。Term suggester正如其名,只基於analyze過的單個term去提供建議,並不會考慮多個term之間的關係。API調用方只需為每個token挑選options裡的詞,組合在一起返回給用戶前端即可。 那麼有無更直接辦法,API直接給出和用戶輸入文本相似的內容? 答案是有,這就要求助Phrase Suggester了。
  • Elasticsearch SQL用法詳解
    /bin/elasticsearch-plugin install file:/elasticsearch-sql-5.6.3.0.zip3.重啟ES服務 執行完上述三步,你就可以使用SQL探索數據了,以kibana中的使用為例:二、6.4 Elasticsearch SQL用法
  • 網際網路公司Elasticsearch應用案例分享
    一、京東到家訂單中心 Elasticsearch 演進歷程三、去哪兒:訂單中心基於elasticsearch 的解決方案四、Elasticsearch 在58集團信息安全部的應用國內現在有大量的公司都在使用 Elasticsearch,包括攜程、滴滴、今日頭條、餓了麼、360安全、小米、vivo等諸多知名公司。
  • SpringBoot 操作 ElasticSearch 詳解
    >一、ElasticSearch 簡介1、簡介ElasticSearch 是一個基於 Lucene 的搜索伺服器。它提供了一個分布式多員工能力的全文搜尋引擎,基於 RESTful web 接口。Elasticsearch 是用 Java 語言開發的,並作為 Apache 許可條款下的開放源碼發布,是一種流行的企業級搜尋引擎。ElasticSearch 用於雲計算中,能夠達到實時搜索,穩定,可靠,快速,安裝使用方便。
  • Elasticsearch 的快照插件
    插件的安裝與查詢插件可以通過命令行 elasticsearch-plugin 來安裝(官方文檔[1]):sudo bin/elasticsearch-plugin install $plugin_namesudo
  • Elasticsearch訪問日誌配置
    一、ElasticsearchElasticsearch是一個基於Lucene[1]的搜索伺服器。