用戶畫像建設是產品進行精細化運營中非常重要的一環,像今日頭條的精準個性化推薦、京東及淘寶的商品推薦、手機SDK的消息推送等,都是基於用戶畫像數據進行精細化運營的。在用戶畫像建設中,如何對用戶畫像的數據進行存儲顯得尤為重要,因為數據存儲結構非常影響數據的查詢性能。上一篇文章《基於hbase進行用戶畫像查詢(一)》已經探討了基於base存儲並基於其filter查詢方案的可行性,從實驗的結論可以看出方案不可行。本文探討一下ElasticSearch+Hbase進行用戶畫像查詢。
一、ElasticSearch全文搜尋引擎
ElasticSearch是一個基於Lucene的開源搜尋引擎,底層使用Java開發並使用Lucene作為其核心來實現所有索引和搜索功能,並通過RESTful API來隱藏Lucene的複雜性,從而讓全文檢索變得簡單。下一篇文章將對ElasticSearch進行詳細的介紹。
ElasticSearch是面向文檔(document)的,其底層以文檔的形式存儲,並索引每個文檔的內容使之可以被搜索。ElasticSearch使用JavaScript對象符號JSON(JavaScript Object Notation)作為文檔序列化格式。
在ElasticSearch體系中,有一個重要的概念:索引,這個詞在屬性上可分為名詞和動詞兩種。索引(名詞):一個索引(index)就像是傳統關係資料庫中的資料庫,它是相關文檔存儲的地方;索引(動詞):索引一個文檔表示一個文檔存儲到索引(名詞)裡,以便它可以被搜索,很像SQL中的insert關鍵字,區別是:如果文檔已存在,新的文檔覆蓋舊的文檔。
ElasticSearch數據架構的主要概念與關係資料庫Mysql對比:
ElasticSearchMysqlIndexDatabaseTypeTableDocumentRowFieldColumn二、HBase與ElasticSearch整合
HBase的特性決定了在根據用戶查詢相關標籤屬性的查詢性能上非常有優勢,但是在根據標籤查詢相關用戶的查詢性能上卻非常糟糕。為了解決這個問題,通常需要在Hbase的上層建立二級索引。
解決方案:HBase存儲全量用戶的基礎信息數據與標籤屬性數據,並採用ElasticSearch索引用戶的標籤數據作為二級索引。一、根據用戶查詢對應的標籤數據只需要查詢HBase即可;二、根據標籤查詢用戶則需要通過term在ElasticSearch中搜索對應的rowkey信息,然後通過rowkey在HBase中查詢對應的用戶信息數據。
整合HBase、ElasticSearch的數據寫入及查詢功能,並對外提供接口能力。由於ElasticSearch並不屬於Hadoop生態圈體系的組件,在整合HBase與ElasticSearch的過程中,遇到一些版本衝突問題,詳細描述如下:
1)ElasticSearch對JDK版本的要求
ElasticSearch2.x及以下ElasticSearch5.x及以上Java 8 update 20 or later, or Java 7 update 55 or laterJava version 1.8.0_73 or later2)HBase對JDK版本的支持
HBase VersionJDK 7JDK 8JDK 9JDK 102.0不支持支持不支持不支持1.3及以下支持支持不支持不支持3)Spark對ElasticSearch的支持
ElasticSearch2.x及以下ElasticSearch5.x及以上Spark 1.6.xSpark 2.x4)第三方依賴guava、netty、protobuf
由於集群HBase版本為1.1.3,Spark版本為1.6.2,因此選擇ElasticSearch2.4.5版本,但是在項目搭建的過程中發現thirdparty dependency出現版本衝突。
為了解決第三方版本不一致的衝突問題,需要對ElasticSearch引入的第三方依賴進行shade操作,
1<dependencies>
2<dependency>
3 <groupId>org.elasticsearch</groupId>
4 <artifactId>elasticsearch</artifactId>
5 <version>${elsearch.version}</version>
6</dependency>
7</dependencies>
8<build>
9<plugins>
10 <plugin>
11 <groupId>org.apache.maven.plugins</groupId>
12 <artifactId>maven-shade-plugin</artifactId>
13 <version>2.4.1</version>
14 <configuration>
15 <createDependencyReducedPom>false</createDependencyReducedPom>
16 </configuration>
17 <executions>
18 <execution>
19 <phase>package</phase>
20 <goals>
21 <goal>shade</goal>
22 </goals>
23 <configuration>
24 <relocations>
25 <relocation>
26 <pattern>com.google.guava</pattern>
27 <shadedPattern>com.aspire.elsearch.guava</shadedPattern>
28 </relocation>
29 <relocation>
30 <pattern>org.joda</pattern>
31 <shadedPattern>com.aspire.elsearch.joda</shadedPattern>
32 </relocation>
33 <relocation>
34 <pattern>com.google.common</pattern>
35 <shadedPattern>com.aspire.elsearch.common</shadedPattern>
36 </relocation>
37 <relocation>
38 <pattern>com.google.thirdparty</pattern>
39 <shadedPattern>com.aspire.elsearch.thirdparty</shadedPattern>
40 </relocation>
41 <relocation>
42 <pattern>io.netty</pattern>
43 <shadedPattern>com.aspire.elsearch.netty</shadedPattern>
44 </relocation>
45 </relocations>
46 <transformers>
47 <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer" />
48 </transformers>
49 </configuration>
50 </execution>
51 </executions>
52 </plugin>
53</plugins>
54</build>
執行clean install後,在項目的pom引入依賴。
1<dependency>
2 <groupId>com.aspire.elsearch</groupId>
3 <artifactId>es-shade</artifactId>
4 <version>1.0.0-RELEASE</version>
5 <scope>compile</scope>
6 <exclusions>
7 <exclusion>
8 <groupId>org.elasticsearch</groupId>
9 <artifactId>elasticsearch</artifactId>
10 </exclusion>
11 </exclusions>
12</dependency>
三、批量數據導入
第一種方案:通過Java代碼Scan全表數據,然後通過addIndex或者bulkloader入庫。
1 long start = System.currentTimeMillis();
2 Connection conn = null;
3 HbaseClient hclient = null;
4 Client esClient = null;
5 try {
6 esClient = ElsearchClient.getEsClient();
7 hclient = getHServer(request);
8 conn = hclient.getConnection();
9 Table table = HbaseUtil.getTable(conn, NAMESPACE, TABLE_NAME);
10 Scan scan = new Scan();
11 scan.setStartRow(Bytes.toBytes(STARTROW));
12 scan.setStopRow(Bytes.toBytes(STOPROW));
13 ResultScanner rs = table.getScanner(scan);
14 long count = 0L;
15 for(Result r : rs) {
16 count++;
17 String rk = Bytes.toString(r.getRow());
18 Map<String, Object> docMap = new HashedMap();
19 List<Cell> cells = r.listCells();
20 for (Cell cell : cells) {
21 String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
22 String val = Bytes.toString(CellUtil.cloneValue(cell));
23 docMap.put(qualifier, val);
24 }
25 ElsearchUtil.addDoc(esClient, ES_IDX, ES_TPYE, rk, docMap);
26 if (count % 1000 == 0) {
27 long mid = System.currentTimeMillis();
28 String line = String.format("Had insert %s rows, took time %s s.", count, ((mid-start)/1000));
29 StandardLogHelper.writeHbase2EsLog(ResultCode.SUCCESS, start, line);
30 }
31 }
32 } catch (Exception e) {
33 throw e;
34 } finally {
35 if(esClient != null) {
36 ElsearchClient.close();
37 }
38 if(conn != null) {
39 hclient.close(conn);
40 }
41 }
第二種方案:通過Spark程序Scan全表數據並轉成esRDD,然後通過EsSpark入庫。
1val hbaseRdd = sc.newAPIHadoopRDD(hconf,
2 classOf[TableInputFormat],
3 classOf[ImmutableBytesWritable],
4 classOf[Result])
5val esRdd = hbaseRdd.map(tpl => {
6 val ret = tpl._2
7 val rk = Bytes.toString(ret.getRow)
8 val docMap = mutable.HashMap[String, String]()
9 docMap.put("id", rk)
10 val cells = ret.listCells()
11 val it =cells.iterator()
12 while(it.hasNext) {
13 val cell = it.next()
14 val qualifier = Bytes.toString(CellUtil.cloneQualifier(cell))
15 val value = Bytes.toString(CellUtil.cloneValue(cell))
16 docMap.put(qualifier, value)
17 }
18 docMap
19})
20EsSpark.saveToEs(esRdd, "hbase/es_usertags", Map("es.mapping.id" -> "id"))
通過測試,第一種方案效率非常慢,使用單線程addIndex導入數據,即使使用多線程或bulkloader的方式進行數據導入,其效果提升也不明顯。其結果如下:
第二種方案效率比第一種方案的效率有很大幅度的提升,8000萬+的數據導入大約需要3小時(task fail是因為一些特殊字符導致,先忽略)。
結論:在多線程導入數據到ElasticSearch時,由於ElasticSearch內部的分片數及單機多線程上限的影響下,其效率不會特別高。通過Spark程序導入數據的過程中,發現已插入的數據越多後插入的效率就越慢。待解決:8000萬數據導入需要3小時(12個單核executor),對於大數據能力來說性能較差,是否還有更好的方案?修改ElasticSearch的分片數能否提升導入的效率?
四、搜索性能
1 long start = System.currentTimeMillis();
2 Client client = ElsearchClient.getEsClient();
3 String index = "hbase";
4 String type = "es_usertags";
5 String tagValue = "201803";
6 String[] tagCodes = {"01003001", "01002004"};
7 int pageNo = 5;
8 int pageSize = 100;
9 List<Map<String, Object>> list = ElsearchUtil.multiSearch(
10 client,
11 pageNo,
12 pageSize,
13 index, type, tagValue, tagCodes);
14 long end = System.currentTimeMillis();
15 System.out.println("This multiSearch took time " + (end-start)
16 + ", size of list is " + (list==null? 0 : list.size()));
測試結果(含建立連接):This multiSearch took time 6830, size of list is 100
測試結果(不含建立連接):This multiSearch took time 532, size of list is 100
從測試結果來看,獲取tcp連接比較耗時,而實際檢索結果秒級內響應,性能非常高,生產應用只需要使用長連接即可。